Netty學習之旅------原始碼分析Netty執行緒本地分配機制與PooledByteBuf執行緒級物件池原理分析
阿新 • • 發佈:2019-01-08
在方法前,已經對構造方法的入參加了說明,關注如下兩個方法。 程式碼@1,建立createNormalCaches 。 由於PoolThreadCache的設計理念與PoolArena一樣,本身並不涉及到具體記憶體的儲存,PoolThreadCache內部維護MemoryRegionCache[] tinySubpageHeapCaches,MemoryRegionCache[] smallSubpageHeapCaches,其陣列長度與PoolArena相同,MemoryRegionCaches[] normalHeapCaches,快取的是noraml記憶體,Netty把大於pageSize小於chunkSize的空間成為normal記憶體。normalHeapCaches[1] 是normalHeapCaches[0] 的2倍, 先重點關注final PoolArena<byte[]> heapArena; //使用輪叫輪詢機制,每個執行緒從heapArena[]中獲取一個,用於記憶體分配。 final PoolArena<ByteBuffer> directArena; //同上 // Hold the caches for the different size classes, which are tiny, small and normal. //針對不同大小,執行緒快取的記憶體 private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches; private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches; private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches; private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches; private final MemoryRegionCache<byte[]>[] normalHeapCaches; private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches; // Used for bitshifting when calculate the index of normal caches later private final int numShiftsNormalDirect; private final int numShiftsNormalHeap; private final int freeSweepAllocationThreshold; private int allocations; private final Thread thread = Thread.currentThread(); //當前執行緒 private final Runnable freeTask = new Runnable() { //執行緒消亡後,釋放資源,下文會重點講解。 @Override public void run() { free0(); } }; // TODO: Test if adding padding helps under contention //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; /* * @param heapArena 執行緒使用的PoolArena.HeapArena * @param directArena 執行緒使用的PoolArena.DirectArena * @param tinyCacheSize, tiny記憶體快取的個數。預設為512 * @param smallCacheSize small記憶體快取的個數,預設為256個 * @param normalCacheSize normalCacheSize快取的個數,預設為64 * @param maxCacheBufferCapacity * normalHeapCaches中單個快取區域的最大大小,預設為32k 也就是normalHeapCaches[length-1]中快取的最大記憶體空間 * @param freeSweepAllocationThreshold 在本地執行緒每分配freeSweepAllocationThreshold 次記憶體後,檢測一下是否需要釋放記憶體。 */ PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena, int tinyCacheSize, int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { if (maxCachedBufferCapacity < 0) { throw new IllegalArgumentException("maxCachedBufferCapacity: " + maxCachedBufferCapacity + " (expected: >= 0)"); } if (freeSweepAllocationThreshold < 1) { throw new IllegalArgumentException("freeSweepAllocationThreshold: " + maxCachedBufferCapacity + " (expected: > 0)"); } this.freeSweepAllocationThreshold = freeSweepAllocationThreshold; this.heapArena = heapArena; this.directArena = directArena; if (directArena != null) { tinySubPageDirectCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools); smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.numSmallSubpagePools); numShiftsNormalDirect = log2(directArena.pageSize); normalDirectCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, directArena); } else { // No directArea is configured so just null out all caches tinySubPageDirectCaches = null; smallSubPageDirectCaches = null; normalDirectCaches = null; numShiftsNormalDirect = -1; } if (heapArena != null) { // Create the caches for the heap allocations tinySubPageHeapCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools); smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.numSmallSubpagePools); numShiftsNormalHeap = log2(heapArena.pageSize); normalHeapCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, heapArena); //@1 } else { // No heapArea is configured so just null out all caches tinySubPageHeapCaches = null; smallSubPageHeapCaches = null; normalHeapCaches = null; numShiftsNormalHeap = -1; } // The thread-local cache will keep a list of pooled buffers which must be returned to // the pool when the thread is not alive anymore. ThreadDeathWatcher.watch(thread, freeTask); }
引數 numCaches,為SubPageMemoryRegionCache[]陣列的長度,而cacheSize,為每一個SubPageMemoryRegionCache中快取的記憶體個數,也就是SubPageMemoryRegionCache中entries[]的長度。這裡的cacheSize,就是PooledByteBufAllocator DEFAULT_TINY_CACHE_SIZE=512,DEFAULT_SMALL_CACHE_SIZE=256,DEFAULT_NORMAL_SIZE=64,其實這裡的取名為DEFAULT_TINY_CACHE_LENGTH更加貼切。 程式碼@1,其實應該不需要與area.chunkSize做比較,因為如果超過chunkSize的記憶體,netty不會重複使用,直接在整個堆空間或堆外空間申請並釋放。這裡可能是出於程式碼的自我保護,得到normalHeapCaches中單個 Entry所持有的記憶體不超過該值。 程式碼@2,計算normalHeapCaches陣列的長度,這裡有優化的空間,用位運算:int arraySize = Math.max(1, max >> numShiftsNormalHeap ),其中numShiftsNormalHeap為 log2(pageSize)。這樣做的原因,也就是normalHeapCaches 陣列中的元素的大小,是以2的冪倍pageSize遞增的。cacheSize預設為64,引數值來源於PooledByteBufAllocator。接下來關注PoolThreadCache的allocateTiny方法: 1.2 PoolThreadCache allocateTiny方法private static <T> NormalMemoryRegionCache<T>[] createNormalCaches( int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) { if (cacheSize > 0) { int max = Math.min(area.chunkSize, maxCachedBufferCapacity); //@1 int arraySize = Math.max(1, max / area.pageSize); //@2 @SuppressWarnings("unchecked") NormalMemoryRegionCache<T>[] cache = new NormalMemoryRegionCache[arraySize]; for (int i = 0; i < cache.length; i++) { cache[i] = new NormalMemoryRegionCache<T>(cacheSize); } return cache; } else { return null; } }
/**
* Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.tinyIdx(normCapacity);
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}
/**
* Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
}
private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
if (area.isDirect()) {
int idx = log2(normCapacity >> numShiftsNormalDirect);
return cache(normalDirectCaches, idx);
}
int idx = log2(normCapacity >> numShiftsNormalHeap); //@1
return cache(normalHeapCaches, idx);
}
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
if (cache == null) {
// no cache found so just return false here
return false;
}
boolean allocated = cache.allocate(buf, reqCapacity); //@2
if (++ allocations >= freeSweepAllocationThreshold) {
allocations = 0;
trim(); //@3
}
return allocated;
}
程式碼@1,根據需要申請的記憶體定位陣列的下標,根據上文講解的陣列長度計算邏輯,相應的定位演算法就顯而易見了。
程式碼@2,MeomoryRegionCache內部持有的 Entry entries[]陣列是真正持有記憶體的單元,故現在將重點轉移到MemoryRegionCache的講解中。
程式碼@3,如果分配次數達到freeSweepAllocationThreshold,進行一次嘗試釋放一次。具體程式碼見 trim()方法的講解。
1.2.2 關於PoolThreadCache allocateForTiny 之MemoryRegionCache 原始碼解讀【針對1.2程式碼@2】
1)MemoryRegionCache屬性與構造方法詳解private final Entry<T>[] entries; //MemoryRegionCache真正持有記憶體的地方
/*
private static final class Entry<T> {
PoolChunk<T> chunk; //具體的PoolChunk
long handle; //記憶體持有偏移量,高32位儲存的是bitmaIdx,低32位儲存的是memoryMapIdx
}
*/
private final int maxUnusedCached; //表示允許的最大的沒有使用的記憶體數量(已經被快取),預設為size的一半。
private int head; // 作用類似於ByteBuf的readerIndex,從該位置獲取一個快取的Entiry。
private int tail; // 作用類似於ByteBuf的writerIndex,從該位置增加一個加入一個新的Entity
private int maxEntriesInUse; // 在使用中最大的entry數量
private int entriesInUse; // 目前使用中的entry數量
@SuppressWarnings("unchecked")
MemoryRegionCache(int size) { // size 預設的大小為 512, 256, 64
entries = new Entry[powerOfTwo(size)];
for (int i = 0; i < entries.length; i++) {
entries[i] = new Entry<T>();
}
maxUnusedCached = size / 2; //允許被快取,但沒有使用的最大數量,超過該值,則會觸發記憶體釋放操作。
}
初始狀態的MemoryRegionCache的各個屬性的值分別為:maxUnusedCached : 256,128,32,為size的一半;head:0 ;tail:0 ; maxEntriesInUse : 0; entriesInUse : 0 2)MemoryRegionCache的allocate方法詳解
/**
* Allocate something out of the cache if possible and remove the entry from the cache.
*/
public boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
Entry<T> entry = entries[head]; //@1
if (entry.chunk == null) { //@2
return false;
}
entriesInUse ++; //@3
if (maxEntriesInUse < entriesInUse) {
maxEntriesInUse = entriesInUse;
}
initBuf(entry.chunk, entry.handle, buf, reqCapacity); //@4
// only null out the chunk as we only use the chunk to check if the buffer is full or not.
entry.chunk = null; //@5
head = nextIdx(head); //@6
return true;
}
程式碼@1,從entries陣列中獲取一個entry,head指標表示下一個快取的Entry。
程式碼@2,如果entry.chunk為空,則表示執行緒裡暫未快取記憶體,返回false,表示從本地執行緒中分配失敗。
程式碼@3,每分配出一個Entry,則entriesInUse加1,表示正在使用的entry個數。
程式碼@5,用entry中的記憶體初始化ByteBuf。
程式碼@6,head指標加一,如果超過entries的length,則重新從0開始,其實也就是 (head + 1) % (entires.length - 1),這裡使用的是位運算。如果成功分配,則返回true, 結束本次記憶體的分配。
1.2.3 關於PoolThreadCache allocateForTiny 之程式碼@3,trim方法詳解:
該方法的目的是在本地執行緒分配達到一定次數後,檢測一下從本地執行緒快取分配的效率,如果總是分配不到,就是雖然本地有快取一定的記憶體,但每次分配都沒有找到合適記憶體供分配,此時需要釋記憶體回全域性分配池,避免浪費記憶體。
void trim() {
trim(tinySubPageDirectCaches);
trim(smallSubPageDirectCaches);
trim(normalDirectCaches);
trim(tinySubPageHeapCaches);
trim(smallSubPageHeapCaches);
trim(normalHeapCaches);
}
private static void trim(MemoryRegionCache<?>[] caches) {
if (caches == null) {
return;
}
for (MemoryRegionCache<?> c: caches) {
trim(c);
}
}
private static void trim(MemoryRegionCache<?> cache) {
if (cache == null) {
return;
}
cache.trim();
}
trim的具體實現是MemoryRegionCache,現在進入到MemoryRegionCache詳解:
/**
* Free up cached {@link PoolChunk}s if not allocated frequently enough.
*/
private void trim() {
int free = size() - maxEntriesInUse; //@1
entriesInUse = 0;
maxEntriesInUse = 0; //@2
if (free <= maxUnusedCached) { //@3
return;
}
int i = head;
for (; free > 0; free--) {
if (!freeEntry(entries[i])) {
// all freed
break;
}
i = nextIdx(i);
}
// Update head to point to te correct entry
// See https://github.com/netty/netty/issues/2924
head = i;
}
在進行該方法的實現邏輯之前,我先提供一張草圖,形象的反映head,tail等說明:程式碼@1,size()方法返回的是 (tail-head) & (length-1),表示當前快取了但未被使用的個數。maxEntriesInUse的值,其實就是entiryesInUse的值。 程式碼@2,程式碼@3,如果快取的並且未使用的個數如果小於允許的值(maxUnusedCached)值是放棄本次記憶體釋放,否則,需要將head到tail這部分的記憶體全部釋放,返回給全域性記憶體分配池。這裡我可能沒有理解透徹,如果是我實現的話,entriesInUse該值不會設定為空,而是直接釋放掉 tail-head這部分的記憶體就好,釋放演算法在記憶體分配與釋放篇已經做過詳細解讀,這裡不重複講解:
@SuppressWarnings({ "unchecked", "rawtypes" })
private static boolean freeEntry(Entry entry) {
PoolChunk chunk = entry.chunk;
if (chunk == null) {
return false;
}
// need to synchronize on the area from which it was allocated before.
synchronized (chunk.arena) {
chunk.parent.free(chunk, entry.handle);
}
entry.chunk = null;
return true;
}
掃描一下MemoryRegionCache類,還有一個方法我們未曾分析過,就是add方法,預設一開始MemoryRegionCache類中的Entry[] entries中的PoolChunk與handle都是空的,只有通過該add方法,將執行緒用過的記憶體快取起來才能重複使用。我們要養成這樣一個習慣,一個ByteBuf用過後,需要呼叫realse方法將其釋放,具體到池化的PooledByteBuf,呼叫其realse方法,並不會將記憶體直接返還給JVM堆,而是放入到記憶體池,供重複使用,由於引入了執行緒本地快取,所以在呼叫PooledByteBuf的release方法時,並不會將它立馬返回給記憶體池(PoolArena),而是放入到本地執行緒快取中。