Netty原始碼分析第5章(ByteBuf)---->第4節: PooledByteBufAllocator簡述
Netty原始碼分析第五章: ByteBuf
第四節: PooledByteBufAllocator簡述
上一小節簡單介紹了ByteBufAllocator以及其子類UnPooledByteBufAllocator的緩衝區分類的邏輯, 這一小節開始帶大家剖析更為複雜的PooledByteBufAllocator, 我們知道PooledByteBufAllocator是通過自己取一塊連續的記憶體進行ByteBuf的封裝, 所以這裡更為複雜, 在這一小節簡單講解有關PooledByteBufAllocator分配邏輯
友情提示: 從這一節開始難度開始加大, 請各位戰友做好心理準備
PooledByteBufAllocator同樣也重寫了AbstractByteBuf的newDirectBuffer和newHeapBuffer兩個抽象方法, 我們這一小節以newDirectBuffer為例, 先簡述一下其邏輯
首先看UnPooledByteBufAllocator中newDirectBuffer這個方法:
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena <ByteBuffer> directArena = cache.directArena;
ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
if (PlatformDependent.hasUnsafe()) {
buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
}
return toLeakAwareBuffer(buf);
}
首先 PoolThreadCache cache = threadCache.get() 這一步是拿到一個執行緒區域性快取物件, 執行緒區域性快取, 顧明思議, 就是同一個執行緒共享的一個快取
threadCache是PooledByteBufAllocator類的一個成員變數, 型別是PoolThreadLocalCache(這兩個非常容易混淆, 切記):
private final PoolThreadLocalCache threadCache;
再看其型別PoolThreadLocalCache的定義:
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
@Override
protected synchronized PoolThreadCache initialValue() {
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
//程式碼省略
}
這裡繼承了一個FastThreadLocal類, 這個類相當於jdk的ThreadLocal, 只是效能更快, 有關FastThreadLocal, 我們在後面的章節會詳細剖析, 這裡我們只要知道, 繼承FastThreadLocal類並且重寫了initialValue方法, 則通過其get方法就能獲得initialValue返回的物件, 並且這個物件是執行緒共享的
在這裡我們看到, 在重寫的initialValue方法中, 初始化了heapArena和directArena兩個屬性之後, 通過new PoolThreadCache()這種方式建立了PoolThreadCache物件
這裡注意, PoolThreadLocalCache是一個FastThreadLocal, 而PoolThreadCache才是執行緒區域性快取, 這兩個類名非常非常像, 千萬別搞混了(我當初讀這段程式碼時因為搞混所以懵逼了)
其中heapArena和directArena是分別是用來分配堆和堆外記憶體用的兩個物件, 以directArena為例, 我們看到是通過leastUsedArena(directArenas)這種方式獲得的, directArenas是一個directArena型別的陣列, leastUsedArena(directArenas)這個方法是用來獲取陣列中一個使用最少的directArena物件
directArenas是PooledByteBufAllocator的成員變數, 是在其構造方法中初始化的:
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
//程式碼省略
if (nDirectArena > 0) {
directArenas = newArenaArray(nDirectArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
for (int i = 0; i < directArenas.length; i ++) {
PoolArena.DirectArena arena = new PoolArena.DirectArena(
this, pageSize, maxOrder, pageShifts, chunkSize);
directArenas[i] = arena;
metrics.add(arena);
}
directArenaMetrics = Collections.unmodifiableList(metrics);
} else {
directArenas = null;
directArenaMetrics = Collections.emptyList();
}
}
我們看到這裡通過directArenas = newArenaArray(nDirectArena)初始化了directArenas, 其中nDirectArena, 預設是cpu核心數的2倍, 這點我們可以跟蹤構造方法的呼叫鏈可以分析到
這樣保證了每一個執行緒會有一個獨享的arena
我們看newArenaArray(nDirectArena)這個方法:
private static <T> PoolArena<T>[] newArenaArray(int size) {
return new PoolArena[size];
}
這裡只是建立了一個數組, 預設長度為nDirectArena
繼續跟PooledByteBufAllocator的構造方法, 建立完了陣列, 後面在for迴圈中為陣列賦值:
首先通過new PoolArena.DirectArena建立一個DirectArena例項, 然後再為新建立的directArenas陣列賦值
再回到PoolThreadLocalCache的構造方法中:
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
@Override
protected synchronized PoolThreadCache initialValue() {
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
//程式碼省略
}
方法最後, 建立PoolThreadCache的一個物件, 我們跟進構造方法中:
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
//程式碼省略
//儲存成兩個成員變數
this.heapArena = heapArena;
this.directArena = directArena;
//程式碼省略
}
這裡省略了大段程式碼, 只需要關注這裡將兩個值儲存在PoolThreadCache的成員變數中
我們回到newDirectBuffer中:
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
if (PlatformDependent.hasUnsafe()) {
buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
}
return toLeakAwareBuffer(buf);
}
簡單分析的執行緒區域性快取初始化相關邏輯, 我們再往下跟:
PoolArena<ByteBuffer> directArena = cache.directArena;
通過上面的分析, 這步我們應該不陌生, 在PoolThreadCache構造方法中將directArena和heapArena中儲存在成員變數中, 這樣就可以直接通過cache.directArena這種方式拿到其成員變數的內容
從以上邏輯, 我們可以大概的分析一下流程, 通常會建立和執行緒數量相等的arena, 並以陣列的形式儲存在PooledByteBufAllocator的成員變數中, 每一個PoolThreadCache建立的時候, 都會在當前執行緒拿到一個arena, 並儲存在自身的成員變數中
5-4-1
PoolThreadCache除了維護了一個arena之外, 還維護了一個快取列表, 我們在重複分配ByteBuf的時候, 並不需要每次都通過arena進行分配, 可以直接從快取列表中拿一個ByteBuf
有關快取列表, 我們循序漸進的往下看:
在PooledByteBufAllocator中維護了三個值:
1. tinyCacheSize
2. smallCacheSize
3. normalCacheSize
tinyCacheSize代表tiny型別的ByteBuf能快取多少個
smallCacheSize代表small型別的ByteBuf能快取多少個
normalCacheSize代表normal型別的ByteBuf能快取多少個
具體tiny型別, small型別, normal是什麼意思, 我們會在後面講解
我們回到PoolThreadLocalCache類中看其構造方法:
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
@Override
protected synchronized PoolThreadCache initialValue() {
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
//程式碼省略
}
我們看到這三個屬性是在PoolThreadCache的構造方法中傳入的
這三個屬性是通過PooledByteBufAllocator的構造方法中初始化的, 跟隨構造方法的呼叫鏈會走到這個構造方法:
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE);
}
這裡仍然呼叫了一個過載的構造方法, 這裡我們關注這幾個引數:
DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE
這裡對應著幾個靜態的成員變數:
private static final int DEFAULT_TINY_CACHE_SIZE;
private static final int DEFAULT_SMALL_CACHE_SIZE;
private static final int DEFAULT_NORMAL_CACHE_SIZE;
我們在static塊中看其初始化過程:
static{
//程式碼省略
DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
//程式碼省略
}
在這裡我們看到, 這三個屬性分別初始化的大小是512, 256, 64, 這三個屬性就對應了PooledByteBufAllocator另外的幾個成員變數, tinyCacheSize, smallCacheSize, normalCacheSize
也就是說, tiny型別的ByteBuf在每個快取中預設快取的數量是512個, small型別的ByteBuf在每個快取中預設快取的數量是256個, normal型別的ByteBuf在每個快取中預設快取的數量是64個
我們再到PooledByteBufAllocator中過載的構造方法中:
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
super(preferDirect);
threadCache = new PoolThreadLocalCache();
this.tinyCacheSize = tinyCacheSize;
this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize;
//程式碼省略
}
篇幅原因, 這裡也省略了大段程式碼, 大家可以通過構造方法引數找到原始碼中相對的位置進行閱讀
我們關注這段程式碼:
this.tinyCacheSize = tinyCacheSize;
this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize;
在這裡將將引數的DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE的三個值儲存到了成員變數tinyCacheSize, smallCacheSize, normalCacheSize
PooledByteBufAllocator中將這三個成員變數初始化之後, 在PoolThreadLocalCache的initialValue方法中就可以使用這三個成員變數的值了
我們再次跟到initialValue方法中:
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
@Override
protected synchronized PoolThreadCache initialValue() {
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
//程式碼省略
}
這裡就可以在建立PoolThreadCache物件的的構造方法中傳入tinyCacheSize, smallCacheSize, normalCacheSize這三個成員變量了
我們再跟到PoolThreadCache的構造方法中:
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
//程式碼省略
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
this.heapArena = heapArena;
this.directArena = directArena;
if (directArena != null) {
tinySubPageDirectCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageDirectCaches = createSubPageCaches(
smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalDirect = log2(directArena.pageSize);
normalDirectCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, directArena);
directArena.numThreadCaches.getAndIncrement();
} else {
//程式碼省略
}
//程式碼省略
ThreadDeathWatcher.watch(thread, freeTask);
}
其中tinySubPageDirectCaches, smallSubPageDirectCaches, 和normalDirectCaches就代表了三種類型的快取陣列, 陣列元素是MemoryRegionCache型別的物件, MemoryRegionCache就代表一個ByeBuf的快取
以tinySubPageDirectCaches為例, 我們看到tiny型別的快取是通過createSubPageCaches這個方法建立的
這裡傳入了三個引數tinyCacheSize我們之前分析過是512, PoolArena.numTinySubpagePools這裡是32(這裡不同型別的快取大小不一樣, small型別是4, normal型別是3) , SizeClass.Tiny代表其型別是tiny型別,
我們跟到createSubPageCaches這個方法中:
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
int cacheSize, int numCaches, SizeClass sizeClass) {
if (cacheSize > 0) {
//建立陣列, 長度為32
@SuppressWarnings("unchecked")
MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
for (int i = 0; i < cache.length; i++) {
//每一個節點是ubPageMemoryRegionCache物件
cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
}
return cache;
} else {
return null;
}
}
這裡首先建立了MemoryRegionCache, 長度是我們剛才分析過的32
然後通過for迴圈, 為陣列賦值, 賦值的物件是SubPageMemoryRegionCache型別的, SubPageMemoryRegionCache就是MemoryRegionCache型別的子類, 同樣也是一個快取物件, 構造方法中, cacheSize, 就是其中快取物件的數量, 如果是tiny型別就是512, sizeClass, 代表其型別, 比如tiny, small或者normal
再簡單跟到其構造方法:
SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
super(size, sizeClass);
}
這裡呼叫了父類的構造方法, 我們繼續跟進去:
MemoryRegionCache(int size, SizeClass sizeClass) {
//size會進行規格化
this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
//佇列大小
queue = PlatformDependent.newFixedMpscQueue(this.size);
this.sizeClass = sizeClass;
}
首先會對其進行規格化, 其實就是查詢大於等於當前size的2的冪次方的數, 這裡如果是512那麼規格化之後還是512, 然後初始化一個佇列, 佇列大小就是傳入的大小, 如果是tiny, 這裡大小就是512
最後並儲存其型別
這裡我們不難看出, 其實每個快取的物件, 裡面是通過一個佇列儲存的, 有關快取佇列和ByteBuf之間的邏輯, 後面的小結會進行剖析
從上面剖析我們不難看出, PoolThreadCache中維護了三種類型的快取陣列, 每個快取陣列中的每個值中, 又通過一個佇列進行物件的儲存
5-4-2
當然這裡只舉了Direct型別的物件關係, heap型別其實都是一樣的, 這裡不再贅述
這一小節邏輯較為複雜, 同學們可以自己在原始碼中跟蹤一遍加深印象