netty原始碼閱讀之效能優化工具類之Recycle異執行緒獲取物件
在這篇《netty原始碼閱讀之效能優化工具類之Recycler獲取物件》文章裡面,我們還有一個scavenge()方法沒有解析,也就是在別的執行緒裡面回收物件。下面我們開始介紹,從這個方法開始進入:
boolean scavenge() { // continue an existing scavenge, if any if (scavengeSome()) { return true; } // reset our scavenge cursor prev = null; cursor = head; return false; }
如果scavengeSome也就是回收到物件了,那就返回true。否則就重置,這個cursor就是當前開始搜尋的節點,也就是這次沒有回收到,下次從頭節點開始搜尋。
然後看scavengeSome方法:
boolean scavengeSome() { WeakOrderQueue cursor = this.cursor; if (cursor == null) { cursor = head; if (cursor == null) { return false; } } boolean success = false; WeakOrderQueue prev = this.prev; do { if (cursor.transfer(this)) { success = true; break; } WeakOrderQueue next = cursor.next; if (cursor.owner.get() == null) { // If the thread associated with the queue is gone, unlink it, after // performing a volatile read to confirm there is no data left to collect. // We never unlink the first queue, as we don't want to synchronize on updating the head. if (cursor.hasFinalData()) { for (;;) { if (cursor.transfer(this)) { success = true; } else { break; } } } if (prev != null) { prev.next = next; } } else { prev = cursor; } cursor = next; } while (cursor != null && !success); this.prev = prev; this.cursor = cursor; return success; }
有點長,一點點來。
看這一段:
WeakOrderQueue cursor = this.cursor;
if (cursor == null) {
cursor = head;
if (cursor == null) {
return false;
}
}
就說cursor為空,就指向頭,頭也為空,就說明這個stack沒有和其他執行緒相關的WeakOrderQueue了,返回false。
下面有個while迴圈,這個迴圈的意思就是我不停地往WeakOrderQueue獲取物件。
看這個:
if (cursor.transfer(this)) {
success = true;
break;
}
意思就是把當前cursor也就是WeakOrderQueue裡面的物件裝換為this(也就是當前stack)的物件。回收成功的話就可以跳出迴圈並返回了。
如果沒有回收成功,那就調到cursor的下一個節點:
WeakOrderQueue next = cursor.next;
cursor.owner.get()代表與當前WeakOrderQueue關聯的執行緒,owner是一個弱應用,我們上一篇文章分析過。
如果關聯的執行緒不在了,就要做一些清理工作,把WeakOrderQueue移除掉等。看原始碼:
if (cursor.owner.get() == null) {
// If the thread associated with the queue is gone, unlink it, after
// performing a volatile read to confirm there is no data left to collect.
// We never unlink the first queue, as we don't want to synchronize on updating the head.
if (cursor.hasFinalData()) {
for (;;) {
if (cursor.transfer(this)) {
success = true;
} else {
break;
}
}
}
if (prev != null) {
prev.next = next;
}
}
如果節點有資料,也就是cursor.hasFinalData那就把節點裡面的資料移到stack裡面:cursor.trasfer(this))。如果成功,那就繼續迴圈,由於我們cursor.transfer每次傳遞一個Link而已,所以需要用一個for迴圈不停地把WeakOrderQueue的Link傳輸到stack裡面。
看這個程式碼:
if (prev != null) {
prev.next = next;
}
就是把這個cursor節點釋放。
如果關聯的執行緒還在,就把pre節點和cursor節點往下移,知道cursor為null也就是沒有下一個節點,到尾了:
else {
prev = cursor;
}
cursor = next;
} while (cursor != null && !success);
那麼我們現在分析cursor.transfer(this)這個方法:
// transfer as many items as we can from this queue to the stack, returning true if any were transferred
@SuppressWarnings("rawtypes")
boolean transfer(Stack<?> dst) {
Link head = this.head;
if (head == null) {
return false;
}
if (head.readIndex == LINK_CAPACITY) {
if (head.next == null) {
return false;
}
this.head = head = head.next;
}
final int srcStart = head.readIndex;
int srcEnd = head.get();
final int srcSize = srcEnd - srcStart;
if (srcSize == 0) {
return false;
}
final int dstSize = dst.size;
final int expectedCapacity = dstSize + srcSize;
if (expectedCapacity > dst.elements.length) {
final int actualCapacity = dst.increaseCapacity(expectedCapacity);
srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
}
if (srcStart != srcEnd) {
final DefaultHandle[] srcElems = head.elements;
final DefaultHandle[] dstElems = dst.elements;
int newDstSize = dstSize;
for (int i = srcStart; i < srcEnd; i++) {
DefaultHandle element = srcElems[i];
if (element.recycleId == 0) {
element.recycleId = element.lastRecycledId;
} else if (element.recycleId != element.lastRecycledId) {
throw new IllegalStateException("recycled already");
}
srcElems[i] = null;
if (dst.dropHandle(element)) {
// Drop the object.
continue;
}
element.stack = dst;
dstElems[newDstSize ++] = element;
}
if (srcEnd == LINK_CAPACITY && head.next != null) {
// Add capacity back as the Link is GCed.
reclaimSpace(LINK_CAPACITY);
this.head = head.next;
}
head.readIndex = srcEnd;
if (dst.size == newDstSize) {
return false;
}
dst.size = newDstSize;
return true;
} else {
// The destination stack is full already.
return false;
}
}
看第一段:
Link head = this.head;
if (head == null) {
return false;
}
這個既說明我們的頭結點Link沒有資料,也就是整一個WeakOrderQueue都沒有資料了直接返回。
然後:
if (head.readIndex == LINK_CAPACITY) {
if (head.next == null) {
return false;
}
this.head = head = head.next;
}
在head的readIndex已經達到了LINK_CAPACITY之後,head裡面的所有資料都沒有了,那就把head指向下一個節點,head就可以丟棄了,因為沒有物件指向它了。如果head的next為空,說明整個WeakOrderQueue都沒有資料了返回false。
能夠走到下面,就證明head裡面有handle物件,然後我們找到head的readIndex,就表明我們可以從這個地方開始取物件,看原始碼:
final int srcStart = head.readIndex;
然後看結束位置:
int srcEnd = head.get();
由於head是Link,link繼承自AtomicInteger,link的get就代表link的長度,就表明我裡面移動有多少物件。
然後:
final int srcSize = srcEnd - srcStart;
if (srcSize == 0) {
return false;
}
看這個srcSize就是現在我需要傳輸多少個物件到stack裡面,如果srcSize=0,就說明沒有物件傳輸,返回false。
繼續:
final int dstSize = dst.size;
final int expectedCapacity = dstSize + srcSize;
dst.size代表stack裡面有的資料的大小,加上srcSize就是傳輸之後的大小。
然後:
if (expectedCapacity > dst.elements.length) {
final int actualCapacity = dst.increaseCapacity(expectedCapacity);
srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
}
由於我們的stack也是用陣列實現的,所以要傳輸的資料不能超過陣列的大小,如果超過了就擴容,最後算出srcEnd,srcEnd就是我們需要傳輸的head的結束的位置。actualCapacity-dstSize就是實際能傳輸的大小,實際能傳輸的大小加上srcStart就是srcEnd,這個srcEnd也不能操作原來head的結束位置,所以取兩者的最小值。
接下去這一段:
final DefaultHandle[] srcElems = head.elements;
final DefaultHandle[] dstElems = dst.elements;
int newDstSize = dstSize;
for (int i = srcStart; i < srcEnd; i++) {
DefaultHandle element = srcElems[i];
if (element.recycleId == 0) {
element.recycleId = element.lastRecycledId;
} else if (element.recycleId != element.lastRecycledId) {
throw new IllegalStateException("recycled already");
}
srcElems[i] = null;
if (dst.dropHandle(element)) {
// Drop the object.
continue;
}
element.stack = dst;
dstElems[newDstSize ++] = element;
}
這裡就是賦值操作。
有個細節:
if (element.recycleId == 0) {
element.recycleId = element.lastRecycledId;
} else if (element.recycleId != element.lastRecycledId) {
throw new IllegalStateException("recycled already");
}
如果沒有被回收過也就是element.recycleId==0,就是recycledId賦值lastRecycledId,而這個lastRecycleId我們前面也分析過,就是這個WeakOrderQueue的id。那麼如果被回收過,並且和當前的id不同也就是element.recycleId!=element.lastRecycledId,那麼就可能已經在其他執行緒回收過,丟擲異常。
srcElems[i]=null就是把Link對應的位置置空。
dropHandle我們前面文章也分析過,控制我們回收的頻率。
最後element.stack=dst和dstElems[newDstSize++]=element就是真正的賦值操作,把element的stack指向我們的stack,並且把值付給stack的element。
然後看這個:
if (srcEnd == LINK_CAPACITY && head.next != null) {
// Add capacity back as the Link is GCed.
reclaimSpace(LINK_CAPACITY);
this.head = head.next;
}
我們的srcEnd==LINK_CAPACITY那就說明link都回收完了,並且head.next!=null那就說明還有下一個link,那麼我們就把link的這個位置騰出來,也就是呼叫reclaimSpce(LINK_CAPACITY):
private void reclaimSpace(int space) {
assert space >= 0;
availableSharedCapacity.addAndGet(space);
}
太簡單就是把之前使用的空間重新拿出來,availableSharedCapacity又變大了。
再看這個原始碼:
head.readIndex = srcEnd;
把head.readIndex賦值為srcEnd,下次我們又可以從src開始獲取物件了。
再看:
if (dst.size == newDstSize) {
return false;
}
這段程式碼就是說我們newDstSize和之前dst.size和一樣,表明我們其實沒有回收到物件,那就返回false了。
否則,說明回收到了物件,把dist.size重新賦值,並且返回true:
dst.size = newDstSize;
return true;
最後一個return false表明stack滿了,因為srcEnd最終還是和srcStart相同,說明這個地方沒有擴容成功:
if (expectedCapacity > dst.elements.length) {
final int actualCapacity = dst.increaseCapacity(expectedCapacity);
srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
}
沒有擴容成功才會導致srcEnd=srcStart。
這個transfer就是儘量把我們當前的link裡面的物件傳輸到stack那裡,成功返回true,否則返回false。
最後我們總結一下異執行緒回收和獲取物件,假設A執行緒建立了物件,B執行緒回收物件,B執行緒回收的時候回放到A執行緒對應的stack對應的WeakOrderQueue裡面,A執行緒獲取的時候就直接從這個WeakOrderQueue裡面取就可以了。