package rwbigfile; import; import; import; import; import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.Channels; import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; import java.nio.channels.ReadableByteChannel; import; import; import util.StopWatch; /** * NIO写大文件比较 * @author Will * */ public class WriteBigFileComparison { // data chunk be written per time private static final int DATA_CHUNK = 128 * 1024 * 1024; // total data size is 2G private static final long LEN = 2L * 1024 * 1024 * 1024L; public static void writeWithFileChannel() throws IOException { File file = new File("e:/test/fc.dat"); if (file.exists()) { file.delete(); } RandomAccessFile raf = new RandomAccessFile(file, "rw"); FileChannel fileChannel = raf.getChannel(); byte[] data = null; long len = LEN; ByteBuffer buf = ByteBuffer.allocate(DATA_CHUNK); int dataChunk = DATA_CHUNK / (1024 * 1024); while (len >= DATA_CHUNK) { System.out.println("write a data chunk: " + dataChunk + "MB"); buf.clear(); // clear for re-write data = new byte[DATA_CHUNK]; for (int i = 0; i < DATA_CHUNK; i++) { buf.put(data[i]); } data = null; buf.flip(); // switches a Buffer from writing mode to reading mode fileChannel.write(buf); fileChannel.force(true); len -= DATA_CHUNK; } if (len > 0) { System.out.println("write rest data chunk: " + len + "B"); buf = ByteBuffer.allocateDirect((int) len); data = new byte[(int) len]; for (int i = 0; i < len; i++) { buf.put(data[i]); } buf.flip(); // switches a Buffer from writing mode to reading mode, position to 0, limit not changed fileChannel.write(buf); fileChannel.force(true); data = null; } fileChannel.close(); raf.close(); } /** * write big file with MappedByteBuffer * @throws IOException */ public static void writeWithMappedByteBuffer() throws IOException { File file = new File("e:/test/mb.dat"); if (file.exists()) { file.delete(); } RandomAccessFile raf = new RandomAccessFile(file, 
"rw"); FileChannel fileChannel = raf.getChannel(); int pos = 0; MappedByteBuffer mbb = null; byte[] data = null; long len = LEN; int dataChunk = DATA_CHUNK / (1024 * 1024); while (len >= DATA_CHUNK) { System.out.println("write a data chunk: " + dataChunk + "MB"); mbb =, pos, DATA_CHUNK); data = new byte[DATA_CHUNK]; mbb.put(data); data = null; len -= DATA_CHUNK; pos += DATA_CHUNK; } if (len > 0) { System.out.println("write rest data chunk: " + len + "B"); mbb =, pos, len); data = new byte[(int) len]; mbb.put(data); } data = null; unmap(mbb); // release MappedByteBuffer fileChannel.close(); } public static void writeWithTransferTo() throws IOException { File file = new File("e:/test/transfer.dat"); if (file.exists()) { file.delete(); } RandomAccessFile raf = new RandomAccessFile(file, "rw"); FileChannel toFileChannel = raf.getChannel(); long len = LEN; byte[] data = null; ByteArrayInputStream bais = null; ReadableByteChannel fromByteChannel = null; long position = 0; int dataChunk = DATA_CHUNK / (1024 * 1024); while (len >= DATA_CHUNK) { System.out.println("write a data chunk: " + dataChunk + "MB"); data = new byte[DATA_CHUNK]; bais = new ByteArrayInputStream(data); fromByteChannel = Channels.newChannel(bais); long count = DATA_CHUNK; toFileChannel.transferFrom(fromByteChannel, position, count); data = null; position += DATA_CHUNK; len -= DATA_CHUNK; } if (len > 0) { System.out.println("write rest data chunk: " + len + "B"); data = new byte[(int) len]; bais = new ByteArrayInputStream(data); fromByteChannel = Channels.newChannel(bais); long count = len; toFileChannel.transferFrom(fromByteChannel, position, count); } data = null; toFileChannel.close(); fromByteChannel.close(); } /** * 在MappedByteBuffer释放后再对它进行读操作的话就会引发jvm crash,在并发情况下很容易发生 * 正在释放时另一个线程正开始读取,于是crash就发生了。所以为了系统稳定性释放前一般需要检 * 查是否还有线程在读或写 * @param mappedByteBuffer */ public static void unmap(final MappedByteBuffer mappedByteBuffer) { try { if (mappedByteBuffer == null) { return; } mappedByteBuffer.force(); 
AccessController.doPrivileged(new PrivilegedAction
MappedByteBuffer需要占用“双倍”的内存(对象JVM堆内存和Direct Byte Buffer内存),可以通过-XX:MaxDirectMemorySize参数设置后者最大大小
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

/**
 * Append-only writer backed by a fixed-size, memory-mapped file.
 *
 * <p>The file is mapped read/write over {@code [0, MAX_FILE_SIZE)}. Appended bytes are
 * copied into the mapping and forced to disk once either the amount of unflushed data
 * or the time since the previous flush exceeds its threshold.
 *
 * <p>All state-changing methods are {@code synchronized} on this instance.
 */
public class MappedFile {

    /** Size of the mapped region in bytes (50 MB); the file can never hold more than this. */
    private static final long MAX_FILE_SIZE = 50L * 1024 * 1024;

    /** Unflushed-byte threshold (512 KB): an append that leaves more than this pending forces a flush. */
    private final long MAX_FLUSH_DATA_SIZE = 1024 * 512;

    /** Flush interval in milliseconds: an append with pending data after this gap forces a flush. */
    private final long MAX_FLUSH_TIME_GAP = 1000;

    /** File name, relative to {@link #fileDirPath}. */
    private String fileName;

    /** Path of the directory that contains the file. */
    private String fileDirPath;

    /** The backing file. */
    private File file;

    /** Read/write mapping of the backing file; {@code null} until a bind succeeds. */
    private MappedByteBuffer mappedByteBuffer;

    /** Whether the most recent bind attempt succeeded. */
    private boolean boundSuccess = false;

    /** Number of bytes appended so far, i.e. the offset of the next write. */
    private long writePosition = 0;

    /** Wall-clock time (ms since epoch) of the last flush; 0 if never flushed. */
    private long lastFlushTime;

    /** Value of {@link #writePosition} at the last flush. */
    private long lastFlushFilePosition = 0;

    /**
     * Creates a handle for {@code fileDirPath/fileName}, creating the file if it does not exist.
     * Creation is best-effort: a failure is reported on stderr and surfaces later when binding.
     *
     * @param fileName    name of the file
     * @param fileDirPath directory that contains (or will contain) the file
     */
    public MappedFile(String fileName, String fileDirPath) {
        this.fileName = fileName;
        this.fileDirPath = fileDirPath;
        this.file = new File(fileDirPath + "/" + fileName);
        if (!file.exists()) {
            try {
                file.createNewFile();
            } catch (IOException e) {
                System.err.println("Failed to create " + file);
                e.printStackTrace();
            }
        }
    }

    /**
     * Maps the backing file into memory.
     *
     * <p>The channel used to create the mapping is closed before returning; a mapping stays
     * valid after its channel is closed, so no file handle is kept open. If the file is
     * re-bound after data was appended, the new buffer is positioned at the current write
     * offset so later appends do not overwrite earlier ones.
     *
     * @return {@code true} if the mapping was established, {@code false} otherwise
     */
    public synchronized boolean boundChannelToByteBuffer() {
        MappedByteBuffer buffer;
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw");
             FileChannel channel = raf.getChannel()) {
            buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, MAX_FILE_SIZE);
        } catch (IOException | SecurityException e) {
            e.printStackTrace();
            this.boundSuccess = false;
            return false;
        }
        // writePosition never exceeds MAX_FILE_SIZE (50 MB), so the narrowing cast is safe.
        buffer.position((int) writePosition);
        this.mappedByteBuffer = buffer;
        this.boundSuccess = true;
        return true;
    }

    /**
     * Intended to replace the file's content with {@code data}. Not implemented.
     *
     * @param data bytes to write (currently ignored)
     * @return always {@code false}
     */
    public synchronized boolean writeData(byte[] data) {
        return false;
    }

    /**
     * Appends {@code data} at the current write position, binding the file first if needed.
     *
     * <p>If the data does not fit in the remaining space, nothing is written, pending data
     * is flushed, and {@code false} is returned. A write that exactly fills the file is accepted.
     *
     * @param data bytes to append; must not be {@code null}
     * @return {@code true} if the data was written, {@code false} if it would exceed the size limit
     * @throws NullPointerException if {@code data} is {@code null}
     * @throws IOException          if the file could not be mapped
     * @throws Exception            declared for backward compatibility with existing callers
     */
    public synchronized boolean appendData(byte[] data) throws Exception {
        if (data == null) {
            throw new NullPointerException("data must not be null");
        }
        if (!boundSuccess && !boundChannelToByteBuffer()) {
            throw new IOException("Unable to map file " + file);
        }

        // Compute the new offset first so a rejected write leaves the state untouched.
        long newPosition = writePosition + data.length;
        if (newPosition > MAX_FILE_SIZE) {
            flush();
            System.out.println("File=" + file.toURI().toString() + " is written full.");
            System.out.println("already write data length:" + writePosition
                    + ", max file size=" + MAX_FILE_SIZE);
            return false;
        }

        this.mappedByteBuffer.put(data);
        writePosition = newPosition;

        // Force to disk when too much data is pending, or when data is pending and too old.
        long pendingBytes = writePosition - lastFlushFilePosition;
        boolean tooMuchPending = pendingBytes > this.MAX_FLUSH_DATA_SIZE;
        boolean tooOld = System.currentTimeMillis() - lastFlushTime > this.MAX_FLUSH_TIME_GAP
                && pendingBytes > 0;
        if (tooMuchPending || tooOld) {
            flush();
        }
        return true;
    }

    /**
     * Forces the mapped content to disk and records the flush time and position.
     * Does nothing if the file has not been mapped yet.
     */
    public synchronized void flush() {
        if (this.mappedByteBuffer == null) {
            return;
        }
        this.mappedByteBuffer.force();
        this.lastFlushTime = System.currentTimeMillis();
        this.lastFlushFilePosition = writePosition;
    }

    /** @return time of the last flush in ms since epoch, or 0 if never flushed */
    public long getLastFlushTime() {
        return lastFlushTime;
    }

    /** @return the file name given at construction */
    public String getFileName() {
        return fileName;
    }

    /** @return the directory path given at construction */
    public String getFileDirPath() {
        return fileDirPath;
    }

    /**
     * @return whether the most recent bind attempt succeeded
     *         (method name spelling kept for compatibility with existing callers)
     */
    public boolean isBundSuccess() {
        return boundSuccess;
    }

    /** @return the backing file */
    public File getFile() {
        return file;
    }

    /** @return the fixed size of the mapped region in bytes */
    public static long getMaxFileSize() {
        return MAX_FILE_SIZE;
    }

    /** @return number of bytes appended so far */
    public long getWritePosition() {
        return writePosition;
    }

    /** @return the write position recorded at the last flush */
    public long getLastFlushFilePosition() {
        return lastFlushFilePosition;
    }

    /** @return the unflushed-byte threshold that triggers a flush */
    public long getMAX_FLUSH_DATA_SIZE() {
        return MAX_FLUSH_DATA_SIZE;
    }

    /** @return the flush interval in milliseconds */
    public long getMAX_FLUSH_TIME_GAP() {
        return MAX_FLUSH_TIME_GAP;
    }
}
Copyright © 2009-2022 成都快上网科技有限公司 版权所有 蜀ICP备19037934号