Spark Source Code Analysis: NioBufferedFileInputStream
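NioBufferedFileInputStream is a byte stream implemented by Spark. It combines an internal buffer with NIO-based file reading, and it uses a direct buffer to avoid copying data between the Java heap and native memory. The JDK offers no ready-to-use byte stream with both features: sun.nio.ch.ChannelInputStream can read a file through NIO, but it does not buffer.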
The implementation of NioBufferedFileInputStream is somewhat similar to that of BufferedInputStream. Both provide:
a fill method (named refill in Spark)
a read method
a skip method
an available method
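Unlike BufferedInputStream, NioBufferedFileInputStream does not support the markPos mark, which BufferedInputStream uses to return to an earlier position in data that has already been read. The class does not override mark or reset, so it inherits the InputStream defaults and markSupported() returns false.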
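To make the mark/reset difference concrete, here is a minimal sketch that uses only JDK classes. MarkResetDemo, rereadAfterReset and withoutMarkSupport are hypothetical names chosen for illustration; the anonymous stream merely stands in for any InputStream that, like the class discussed here, does not override mark and reset.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

final class MarkResetDemo {

    /** Marks the stream, reads two bytes, resets, and returns the next byte read. */
    static int rereadAfterReset(InputStream in) throws IOException {
        in.mark(16);      // remember the current position (read limit of 16 bytes)
        in.read();
        in.read();
        in.reset();       // go back to the marked position
        return in.read(); // the first byte is delivered again
    }

    /** Hypothetical stand-in: a stream that inherits the InputStream mark defaults. */
    static InputStream withoutMarkSupport(byte[] data) {
        InputStream source = new ByteArrayInputStream(data);
        return new InputStream() {
            @Override
            public int read() throws IOException {
                return source.read();
            }
        };
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {10, 20, 30};
        InputStream buffered = new BufferedInputStream(new ByteArrayInputStream(data));
        System.out.println(buffered.markSupported());                 // true
        System.out.println(rereadAfterReset(buffered));               // 10
        System.out.println(withoutMarkSupport(data).markSupported()); // false
    }
}
```

Code that needs to rewind over data read through a stream without mark support would have to wrap it in a stream that provides one, or reopen the file.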
The source code of NioBufferedFileInputStream is as follows:
```java
package org.apache.spark.io;

import org.apache.spark.storage.StorageUtils;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;

/**
 * {@link InputStream} implementation which uses direct buffer
 * to read a file to avoid extra copy of data between Java and
 * native memory which happens when using {@link java.io.BufferedInputStream}.
 * Unfortunately, this is not something already available in JDK,
 * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
 * but does not support buffering.
 */
public final class NioBufferedFileInputStream extends InputStream {

  private static final int DEFAULT_BUFFER_SIZE_BYTES = 8192;

  private final ByteBuffer byteBuffer;

  private final FileChannel fileChannel;

  public NioBufferedFileInputStream(File file, int bufferSizeInBytes) throws IOException {
    byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
    fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
    byteBuffer.flip();
  }

  public NioBufferedFileInputStream(File file) throws IOException {
    this(file, DEFAULT_BUFFER_SIZE_BYTES);
  }

  /**
   * Checks whether data is left to be read from the input stream.
   * @return true if data is left, false otherwise
   * @throws IOException
   */
  private boolean refill() throws IOException {
    if (!byteBuffer.hasRemaining()) {
      byteBuffer.clear();
      int nRead = 0;
      while (nRead == 0) {
        nRead = fileChannel.read(byteBuffer);
      }
      if (nRead < 0) {
        return false;
      }
      byteBuffer.flip();
    }
    return true;
  }

  @Override
  public synchronized int read() throws IOException {
    if (!refill()) {
      return -1;
    }
    return byteBuffer.get() & 0xFF;
  }

  @Override
  public synchronized int read(byte[] b, int offset, int len) throws IOException {
    if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
      throw new IndexOutOfBoundsException();
    }
    if (!refill()) {
      return -1;
    }
    len = Math.min(len, byteBuffer.remaining());
    byteBuffer.get(b, offset, len);
    return len;
  }

  @Override
  public synchronized int available() throws IOException {
    return byteBuffer.remaining();
  }

  @Override
  public synchronized long skip(long n) throws IOException {
    if (n <= 0L) {
      return 0L;
    }
    if (byteBuffer.remaining() >= n) {
      // The buffered content is enough to skip
      byteBuffer.position(byteBuffer.position() + (int) n);
      return n;
    }
    long skippedFromBuffer = byteBuffer.remaining();
    long toSkipFromFileChannel = n - skippedFromBuffer;
    // Discard everything we have read in the buffer.
    byteBuffer.position(0);
    byteBuffer.flip();
    return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel);
  }

  private long skipFromFileChannel(long n) throws IOException {
    long currentFilePosition = fileChannel.position();
    long size = fileChannel.size();
    if (n > size - currentFilePosition) {
      fileChannel.position(size);
      return size - currentFilePosition;
    } else {
      fileChannel.position(currentFilePosition + n);
      return n;
    }
  }

  @Override
  public synchronized void close() throws IOException {
    fileChannel.close();
    StorageUtils.dispose(byteBuffer);
  }

  @Override
  protected void finalize() throws IOException {
    close();
  }
}
```
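The refill() method
It works as follows:
If the byteBuffer has been fully consumed and has no remaining bytes, the method clears it and then reads data from fileChannel into it.
If the end of the stream has been reached, it returns false; otherwise it returns true. The buffer is not guaranteed to be filled completely: the loop repeats fileChannel.read only while it returns 0, so it stops as soon as at least one byte has been read or end-of-file is reported.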
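The clear, read, flip cycle described above can be sketched with plain JDK classes. This is an illustration under stated assumptions rather than Spark's code: RefillSketch and countBytes are hypothetical names, and the sketch flips the buffer before checking for end-of-file (a small, deliberate difference from the listing) so that the buffer is left empty once the file is exhausted.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class RefillSketch {

    /** Counts the bytes of a file by repeatedly refilling and draining a direct buffer. */
    static long countBytes(Path file, int bufferSize) throws IOException {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        ByteBuffer buf = ByteBuffer.allocateDirect(bufferSize);
        buf.flip(); // start empty: position == limit == 0, so hasRemaining() is false
        long total = 0;
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            while (true) {
                if (!buf.hasRemaining()) {
                    buf.clear();                 // switch to "write into buffer" mode
                    int nRead = 0;
                    while (nRead == 0) {         // retry only when nothing was transferred
                        nRead = channel.read(buf);
                    }
                    buf.flip();                  // switch back to "read from buffer" mode
                    if (nRead < 0) {
                        break;                   // end of file
                    }
                }
                total += buf.remaining();
                buf.position(buf.limit());       // pretend the caller consumed everything
            }
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("refill-sketch", ".bin");
        try {
            Files.write(file, new byte[20000]);
            System.out.println(countBytes(file, 8192)); // 20000
        } finally {
            Files.delete(file);
        }
    }
}
```

The total is the same however many bytes each individual channel read delivers, which is why a caller does not need the buffer to be completely full after a refill.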
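The read() method
It calls refill() once and then calls the byteBuffer's get method, which returns the byte at the current position and advances the position by one. The result is masked with 0xFF so that the byte is returned as an int in the range 0 to 255.
The logic can again be summarized as: refill when nothing is left, read when something is left. The only difference from BufferedInputStream is that the check for remaining data lives inside refill().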
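The `& 0xFF` mask in read() matters because Java bytes are signed: without it, a data byte of 0xFF would come back as -1 and be indistinguishable from the end-of-stream marker. A small sketch (UnsignedByteDemo and toUnsigned are hypothetical names used only here):

```java
import java.nio.ByteBuffer;

final class UnsignedByteDemo {

    /** Converts a signed byte to the 0..255 int that InputStream.read() is expected to return. */
    static int toUnsigned(byte b) {
        return b & 0xFF;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap(new byte[] {(byte) 0xFF});
        byte raw = buf.get();                // get() advances the position by one
        System.out.println(raw);             // -1, would look like end-of-stream
        System.out.println(toUnsigned(raw)); // 255
    }
}
```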
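The read(byte[] b, int offset, int len) method
It calls refill() once and then calls the byteBuffer's get(byte[] b, int offset, int len) method. It does not necessarily read the requested number of bytes: when the byteBuffer holds fewer than len remaining bytes, only those remaining bytes are read. The method returns the number of bytes actually read.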
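Because a single call may return fewer bytes than requested, a caller that needs a full array typically loops until the array is filled or the stream ends. The sketch below shows that pattern against any InputStream. ReadFullyDemo, readFully and chunked are hypothetical names, and chunked is only a stand-in that caps each read at a few bytes to imitate a nearly drained buffer.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

final class ReadFullyDemo {

    /** Reads until dst is full or the stream ends; returns the number of bytes stored. */
    static int readFully(InputStream in, byte[] dst) throws IOException {
        int total = 0;
        while (total < dst.length) {
            int n = in.read(dst, total, dst.length - total);
            if (n < 0) {
                break; // end of stream before dst was filled
            }
            total += n;
        }
        return total;
    }

    /** Hypothetical stand-in stream that hands out at most {@code chunk} bytes per call. */
    static InputStream chunked(byte[] data, int chunk) {
        return new ByteArrayInputStream(data) {
            @Override
            public synchronized int read(byte[] b, int off, int len) {
                return super.read(b, off, Math.min(len, chunk));
            }
        };
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        // A single call is capped at 3 bytes by the stand-in stream.
        System.out.println(chunked(data, 3).read(new byte[10], 0, 10)); // 3
        // Looping collects all 10 bytes.
        System.out.println(readFully(chunked(data, 3), new byte[10]));  // 10
    }
}
```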
The available() method
It simply calls the byteBuffer's remaining() method and returns the number of bytes left in the byteBuffer.
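The skip(long n) method
If the number of bytes remaining in the byteBuffer is at least n, the byteBuffer's position is simply advanced by n.
If the byteBuffer does not hold enough remaining bytes to cover n, the skip is done in two steps:
1. Handle the byteBuffer. Skip all of its remaining bytes, then reset the position and discard everything that was read into the byteBuffer. Part of the requested skip is still outstanding at this point; call it m.
2. Handle the fileChannel. Advance the fileChannel's position by m. If fewer than m bytes remain before the fileChannel's size, move the position straight to size, the end of the file.
Finally, return the number of bytes actually skipped.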
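The arithmetic behind the two-step skip can be written as a pure function, which makes the edge cases easy to check without touching a file. This is a sketch of the bookkeeping only: SkipPlan and skipped are hypothetical names, and it assumes the channel position never exceeds the file size.

```java
final class SkipPlan {

    /**
     * Returns how many bytes a skip of n would cover, given the bytes still buffered,
     * the channel's current position, and the file size.
     */
    static long skipped(long n, int bufferedRemaining, long channelPosition, long fileSize) {
        if (n <= 0L) {
            return 0L;                    // nothing to skip
        }
        if (bufferedRemaining >= n) {
            return n;                     // the buffer alone covers the request
        }
        long fromBuffer = bufferedRemaining;
        long stillWanted = n - fromBuffer;                 // "m" in the description above
        long leftInFile = fileSize - channelPosition;
        long fromChannel = Math.min(stillWanted, leftInFile);
        return fromBuffer + fromChannel;
    }

    public static void main(String[] args) {
        // 100 bytes buffered, channel at 8192 in a 10000-byte file.
        System.out.println(skipped(50, 100, 8192, 10000));   // 50
        System.out.println(skipped(1000, 100, 8192, 10000)); // 1000
        System.out.println(skipped(5000, 100, 8192, 10000)); // 1908
    }
}
```

In the last call only 100 buffered bytes plus the 1808 bytes left in the file can be skipped, so the result is smaller than the requested 5000.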