1. 程式人生 > 實用技巧 >netty之微信-資料傳輸載體 ByteBuf (八)

netty之微信-資料傳輸載體 ByteBuf (八)

資料傳輸載體 ByteBuf 介紹

在前面一小節,我們已經瞭解到 Netty 裡面資料讀寫是以 ByteBuf 為單位進行互動的,這一小節,我們就來詳細剖析一下 ByteBuf

ByteBuf結構

首先,我們先來了解一下 ByteBuf 的結構

以上就是一個 ByteBuf 的結構圖,從上面這幅圖可以看到

ByteBuf 是一個位元組容器,容器裡面的的資料分為三個部分,第一個部分是已經丟棄的位元組,這部分資料是無效的;第二部分是可讀位元組,這部分資料是 ByteBuf 的主體資料, 從 ByteBuf 裡面讀取的資料都來自這一部分;最後一部分的資料是可寫位元組,所有寫到 ByteBuf 的資料都會寫到這一段。最後一部分虛線表示的是該 ByteBuf 最多還能擴容多少容量

以上三段內容是被兩個指標給劃分出來的,從左到右,依次是讀指標(readerIndex)、寫指標(writerIndex),然後還有一個變數 capacity,表示 ByteBuf 底層記憶體的總容量
從 ByteBuf 中每讀取一個位元組,readerIndex 自增1,ByteBuf 裡面總共有 writerIndex-readerIndex 個位元組可讀, 由此可以推論出當 readerIndex 與 writerIndex 相等的時候,ByteBuf 不可讀
寫資料是從 writerIndex 指向的部分開始寫,每寫一個位元組,writerIndex 自增1,直到增到 capacity,這個時候,表示 ByteBuf 已經不可寫了
ByteBuf 裡面其實還有一個引數 maxCapacity,當向 ByteBuf 寫資料的時候,如果容量不足,那麼這個時候可以進行擴容,直到 capacity 擴容到 maxCapacity,超過 maxCapacity 就會報錯
Netty 使用 ByteBuf 這個資料結構可以有效地區分可讀資料和可寫資料,讀寫之間相互沒有衝突,當然,ByteBuf 只是對二進位制資料的抽象,具體底層的實現我們在下面的小節會講到,在這一小節,我們 只需要知道 Netty 關於資料讀寫只認 ByteBuf,下面,我們就來學習一下 ByteBuf 常用的 API

容量 API
capacity()

表示 ByteBuf 底層佔用了多少位元組的記憶體(包括丟棄的位元組、可讀位元組、可寫位元組),不同的底層實現機制有不同的計算方式,後面我們講 ByteBuf 的分類的時候會講到

maxCapacity()

表示 ByteBuf 底層最大能夠佔用多少位元組的記憶體,當向 ByteBuf 中寫資料的時候,如果發現容量不足,則進行擴容,直到擴容到 maxCapacity,超過這個數,就拋異常

readableBytes() 與 isReadable()

readableBytes() 表示 ByteBuf 當前可讀的位元組數,它的值等於 writerIndex-readerIndex,如果兩者相等,則不可讀,isReadable() 方法返回 false

writableBytes()、 isWritable() 與 maxWritableBytes()

writableBytes() 表示 ByteBuf 當前可寫的位元組數,它的值等於 capacity-writerIndex,如果兩者相等,則表示不可寫,isWritable() 返回 false,但是這個時候,並不代表不能往 ByteBuf 中寫資料了, 如果發現往 ByteBuf 中寫資料寫不進去的話,Netty 會自動擴容 ByteBuf,直到擴容到底層的記憶體大小為 maxCapacity,而 maxWritableBytes() 就表示可寫的最大位元組數,它的值等於 maxCapacity-writerIndex

讀寫指標相關的 API
readerIndex() 與 readerIndex(int)

前者表示返回當前的讀指標 readerIndex, 後者表示設定讀指標

writeIndex() 與 writeIndex(int)

前者表示返回當前的寫指標 writerIndex, 後者表示設定寫指標

markReaderIndex() 與 resetReaderIndex()

前者表示把當前的讀指標儲存起來,後者表示把當前的讀指標恢復到之前儲存的值,下面兩段程式碼是等價的

// 程式碼片段1
int readerIndex = buffer.readerIndex();
// .. 其他操作
buffer.readerIndex(readerIndex);
 
 
// 程式碼片段二
buffer.markReaderIndex();
// .. 其他操作
buffer.resetReaderIndex();

希望大家多多使用程式碼片段二這種方式,不需要自己定義變數,無論 buffer 當作引數傳遞到哪裡,呼叫 resetReaderIndex() 都可以恢復到之前的狀態,在解析自定義協議的資料包的時候非常常見,推薦大家使用這一對 API

markWriterIndex() 與 resetWriterIndex()

這一對 API 的作用與上述一對 API 類似,這裡不再 贅述

讀寫 API
本質上,關於 ByteBuf 的讀寫都可以看作從指標開始的地方開始讀寫資料

writeBytes(byte[] src) 與 buffer.readBytes(byte[] dst)

writeBytes() 表示把位元組陣列 src 裡面的資料全部寫到 ByteBuf,而 readBytes() 指的是把 ByteBuf 裡面的資料全部讀取到 dst,這裡 dst 位元組陣列的大小通常等於 readableBytes(),而 src 位元組陣列大小的長度通常小於等於 writableBytes()

writeByte(byte b) 與 buffer.readByte()

writeByte() 表示往 ByteBuf 中寫一個位元組,而 buffer.readByte() 表示從 ByteBuf 中讀取一個位元組,類似的 API 還有 writeBoolean()、writeChar()、writeShort()、writeInt()、writeLong()、writeFloat()、writeDouble() 與 readBoolean()、readChar()、readShort()、readInt()、readLong()、readFloat()、readDouble() 這裡就不一一贅述了,相信讀者應該很容易理解這些 API

與讀寫 API 類似的 API 還有 getBytes、getByte() 與 setBytes()、setByte() 系列,唯一的區別就是 get/set 不會改變讀寫指標,而 read/write 會改變讀寫指標,這點在解析資料的時候千萬要注意

release() 與 retain()

由於 Netty 使用了堆外記憶體,而堆外記憶體是不被 jvm 直接管理的,也就是說申請到的記憶體無法被垃圾回收器直接回收,所以需要我們手動回收。有點類似於c語言裡面,申請到的記憶體必須手工釋放,否則會造成記憶體洩漏。

Netty 的 ByteBuf 是通過引用計數的方式管理的,如果一個 ByteBuf 沒有地方被引用到,需要回收底層記憶體。預設情況下,當建立完一個 ByteBuf,它的引用為1,然後每次呼叫 retain() 方法, 它的引用就加一, release() 方法原理是將引用計數減一,減完之後如果發現引用計數為0,則直接回收 ByteBuf 底層的記憶體。

slice()、duplicate()、copy()

這三個方法通常情況會放到一起比較,這三者的返回值都是一個新的 ByteBuf 物件

slice() 方法從原始 ByteBuf 中擷取一段,這段資料是從 readerIndex 到 writeIndex,同時,返回的新的 ByteBuf 的最大容量 maxCapacity 為原始 ByteBuf 的 readableBytes()
duplicate() 方法把整個 ByteBuf 都截取出來,包括所有的資料,指標資訊
slice() 方法與 duplicate() 方法的相同點是:底層記憶體以及引用計數與原始的 ByteBuf 共享,也就是說經過 slice() 或者 duplicate() 返回的 ByteBuf 呼叫 write 系列方法都會影響到 原始的 ByteBuf,但是它們都維持著與原始 ByteBuf 相同的記憶體引用計數和不同的讀寫指標
slice() 方法與 duplicate() 不同點就是:slice() 只擷取從 readerIndex 到 writerIndex 之間的資料,它返回的 ByteBuf 的最大容量被限制到 原始 ByteBuf 的 readableBytes(), 而 duplicate() 是把整個 ByteBuf 都與原始的 ByteBuf 共享
slice() 方法與 duplicate() 方法不會拷貝資料,它們只是通過改變讀寫指標來改變讀寫的行為,而最後一個方法 copy() 會直接從原始的 ByteBuf 中拷貝所有的資訊,包括讀寫指標以及底層對應的資料,因此,往 copy() 返回的 ByteBuf 中寫資料不會影響到原始的 ByteBuf
slice() 和 duplicate() 不會改變 ByteBuf 的引用計數,所以原始的 ByteBuf 呼叫 release() 之後發現引用計數為零,就開始釋放記憶體,呼叫這兩個方法返回的 ByteBuf 也會被釋放,這個時候如果再對它們進行讀寫,就會報錯。因此,我們可以通過呼叫一次 retain() 方法 來增加引用,表示它們對應的底層的記憶體多了一次引用,引用計數為2,在釋放記憶體的時候,需要呼叫兩次 release() 方法,將引用計數降到零,才會釋放記憶體
這三個方法均維護著自己的讀寫指標,與原始的 ByteBuf 的讀寫指標無關,相互之間不受影響
retainedSlice() 與 retainedDuplicate()

相信讀者應該已經猜到這兩個 API 的作用了,它們的作用是在擷取記憶體片段的同時,增加記憶體的引用計數,分別與下面兩段程式碼等價

// retainedSlice 等價於
slice().retain();
 
// retainedDuplicate() 等價於
duplicate().retain()

使用到 slice 和 duplicate 方法的時候,千萬要理清記憶體共享,引用計數共享,讀寫指標不共享幾個概念,下面舉兩個常見的易犯錯的例子

  1. 多次釋放
Buffer buffer = xxx;
doWith(buffer);
// 一次釋放
buffer.release();
 
 
public void doWith(Bytebuf buffer) {
// ...    
    
// 沒有增加引用計數
Buffer slice = buffer.slice();
 
foo(slice);
 
}
 
 
public void foo(ByteBuf buffer) {
    // read from buffer
    
    // 重複釋放
    buffer.release();
}

這裡的 doWith 有的時候是使用者自定義的方法,有的時候是 Netty 的回撥方法,比如channelRead()等等

  1. 不釋放造成記憶體洩漏
Buffer buffer = xxx;
doWith(buffer);
// 引用計數為2,呼叫 release 方法之後,引用計數為1,無法釋放記憶體 
buffer.release();
 
 
public void doWith(Bytebuf buffer) {
// ...    
    
// 增加引用計數
Buffer slice = buffer.retainedSlice();
 
foo(slice);
 
// 沒有呼叫 release
 
}
 
 
public void foo(ByteBuf buffer) {
    // read from buffer
}

想要避免以上兩種情況發生,大家只需要記得一點,在一個函式體裡面,只要增加了引用計數(包括 ByteBuf 的建立和手動呼叫 retain() 方法),就必須呼叫 release() 方法

實戰
瞭解了以上 API 之後,最後我們使用上述 API 來 寫一個簡單的 demo

ByteBufTest.java

public class ByteBufTest {
    public static void main(String[] args) {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);
 
        print("allocate ByteBuf(9, 100)", buffer);
 
        // write 方法改變寫指標,寫完之後寫指標未到 capacity 的時候,buffer 仍然可寫
        buffer.writeBytes(new byte[]{1, 2, 3, 4});
        print("writeBytes(1,2,3,4)", buffer);
 
        // write 方法改變寫指標,寫完之後寫指標未到 capacity 的時候,buffer 仍然可寫, 寫完 int 型別之後,寫指標增加4
        buffer.writeInt(12);
        print("writeInt(12)", buffer);
 
        // write 方法改變寫指標, 寫完之後寫指標等於 capacity 的時候,buffer 不可寫
        buffer.writeBytes(new byte[]{5});
        print("writeBytes(5)", buffer);
 
        // write 方法改變寫指標,寫的時候發現 buffer 不可寫則開始擴容,擴容之後 capacity 隨即改變
        buffer.writeBytes(new byte[]{6});
        print("writeBytes(6)", buffer);
 
        // get 方法不改變讀寫指標
        System.out.println("getByte(3) return: " + buffer.getByte(3));
        System.out.println("getShort(3) return: " + buffer.getShort(3));
        System.out.println("getInt(3) return: " + buffer.getInt(3));
        print("getByte()", buffer);
 
 
        // set 方法不改變讀寫指標
        buffer.setByte(buffer.readableBytes() + 1, 0);
        print("setByte()", buffer);
 
        // read 方法改變讀指標
        byte[] dst = new byte[buffer.readableBytes()];
        buffer.readBytes(dst);
        print("readBytes(" + dst.length + ")", buffer);
 
    }
 
    private static void print(String action, ByteBuf buffer) {
        System.out.println("after ===========" + action + "============");
        System.out.println("capacity(): " + buffer.capacity());
        System.out.println("maxCapacity(): " + buffer.maxCapacity());
        System.out.println("readerIndex(): " + buffer.readerIndex());
        System.out.println("readableBytes(): " + buffer.readableBytes());
        System.out.println("isReadable(): " + buffer.isReadable());
        System.out.println("writerIndex(): " + buffer.writerIndex());
        System.out.println("writableBytes(): " + buffer.writableBytes());
        System.out.println("isWritable(): " + buffer.isWritable());
        System.out.println("maxWritableBytes(): " + buffer.maxWritableBytes());
        System.out.println();
    }
}

最後,控制檯輸出

after ===========allocate ByteBuf(9, 100)============
capacity(): 9
maxCapacity(): 100
readerIndex(): 0
readableBytes(): 0
isReadable(): false
writerIndex(): 0
writableBytes(): 9
isWritable(): true
maxWritableBytes(): 100
after ===========writeBytes(1,2,3,4)============
capacity(): 9
maxCapacity(): 100
readerIndex(): 0
readableBytes(): 4
isReadable(): true
writerIndex(): 4
writableBytes(): 5
isWritable(): true
maxWritableBytes(): 96
after ===========writeInt(12)============
capacity(): 9
maxCapacity(): 100
readerIndex(): 0
readableBytes(): 8
isReadable(): true
writerIndex(): 8
writableBytes(): 1
isWritable(): true
maxWritableBytes(): 92
after ===========writeBytes(5)============
capacity(): 9
maxCapacity(): 100
readerIndex(): 0
readableBytes(): 9
isReadable(): true
writerIndex(): 9
writableBytes(): 0
isWritable(): false
maxWritableBytes(): 91
after ===========writeBytes(6)============
capacity(): 64
maxCapacity(): 100
readerIndex(): 0
readableBytes(): 10
isReadable(): true
writerIndex(): 10
writableBytes(): 54
isWritable(): true
maxWritableBytes(): 90
getByte(3) return: 4
getShort(3) return: 1024
getInt(3) return: 67108864
after ===========getByte()============
capacity(): 64
maxCapacity(): 100
readerIndex(): 0
readableBytes(): 10
isReadable(): true
writerIndex(): 10
writableBytes(): 54
isWritable(): true
maxWritableBytes(): 90
after ===========setByte()============
capacity(): 64
maxCapacity(): 100
readerIndex(): 0
readableBytes(): 10
isReadable(): true
writerIndex(): 10
writableBytes(): 54
isWritable(): true
maxWritableBytes(): 90
after ===========readBytes(10)============
capacity(): 64
maxCapacity(): 100
readerIndex(): 10
readableBytes(): 0
isReadable(): false
writerIndex(): 10
writableBytes(): 54
isWritable(): true
maxWritableBytes(): 90

相信大家在瞭解了 ByteBuf 的結構之後,不難理解控制檯的輸出

總結
本小節,我們分析了 Netty 對二進位制資料的抽象 ByteBuf 的結構,本質上它的原理就是,它引用了一段記憶體,這段記憶體可以是堆內也可以是堆外的,然後用引用計數來控制這段記憶體是否需要被釋放,使用讀寫指標來控制對 ByteBuf 的讀寫,可以理解為是外觀模式的一種使用
基於讀寫指標和容量、最大可擴容容量,衍生出一系列的讀寫方法,要注意 read/write 與 get/set 的區別
多個 ByteBuf 可以引用同一段記憶體,通過引用計數來控制記憶體的釋放,遵循誰 retain() 誰 release() 的原則
最後,我們通過一個具體的例子說明 ByteBuf 的實際使用