1. 程式人生 > >netty原始碼閱讀之效能優化工具類之Recycle異執行緒獲取物件

netty原始碼閱讀之效能優化工具類之Recycle異執行緒獲取物件

在這篇《netty原始碼閱讀之效能優化工具類之Recycler獲取物件》文章裡面,我們還有一個scavenge()方法沒有解析,也就是在別的執行緒裡面回收物件。下面我們開始介紹,從這個方法開始進入:


        boolean scavenge() {
            // continue an existing scavenge, if any
            if (scavengeSome()) {
                return true;
            }

            // reset our scavenge cursor
            prev = null;
            cursor = head;
            return false;
        }

如果scavengeSome也就是回收到物件了,那就返回true。否則就重置,這個cursor就是當前開始搜尋的節點,也就是這次沒有回收到,下次從頭節點開始搜尋。

然後看scavengeSome方法:

        boolean scavengeSome() {
            WeakOrderQueue cursor = this.cursor;
            if (cursor == null) {
                cursor = head;
                if (cursor == null) {
                    return false;
                }
            }

            boolean success = false;
            WeakOrderQueue prev = this.prev;
            do {
                if (cursor.transfer(this)) {
                    success = true;
                    break;
                }

                WeakOrderQueue next = cursor.next;
                if (cursor.owner.get() == null) {
                    // If the thread associated with the queue is gone, unlink it, after
                    // performing a volatile read to confirm there is no data left to collect.
                    // We never unlink the first queue, as we don't want to synchronize on updating the head.
                    if (cursor.hasFinalData()) {
                        for (;;) {
                            if (cursor.transfer(this)) {
                                success = true;
                            } else {
                                break;
                            }
                        }
                    }
                    if (prev != null) {
                        prev.next = next;
                    }
                } else {
                    prev = cursor;
                }

                cursor = next;

            } while (cursor != null && !success);

            this.prev = prev;
            this.cursor = cursor;
            return success;
        }

有點長,一點點來。

看這一段:

            WeakOrderQueue cursor = this.cursor;
            if (cursor == null) {
                cursor = head;
                if (cursor == null) {
                    return false;
                }
            }

就說cursor為空,就指向頭,頭也為空,就說明這個stack沒有和其他執行緒相關的WeakOrderQueue了,返回false。

下面有個while迴圈,這個迴圈的意思就是我不停地往WeakOrderQueue獲取物件。

看這個:

                if (cursor.transfer(this)) {
                    success = true;
                    break;
                }

意思就是把當前cursor也就是WeakOrderQueue裡面的物件裝換為this(也就是當前stack)的物件。回收成功的話就可以跳出迴圈並返回了。

如果沒有回收成功,那就調到cursor的下一個節點:

WeakOrderQueue next = cursor.next;

cursor.owner.get()代表與當前WeakOrderQueue關聯的執行緒,owner是一個弱應用,我們上一篇文章分析過。

如果關聯的執行緒不在了,就要做一些清理工作,把WeakOrderQueue移除掉等。看原始碼:

 if (cursor.owner.get() == null) {
                    // If the thread associated with the queue is gone, unlink it, after
                    // performing a volatile read to confirm there is no data left to collect.
                    // We never unlink the first queue, as we don't want to synchronize on updating the head.
                    if (cursor.hasFinalData()) {
                        for (;;) {
                            if (cursor.transfer(this)) {
                                success = true;
                            } else {
                                break;
                            }
                        }
                    }
                    if (prev != null) {
                        prev.next = next;
                    }
                }

如果節點有資料,也就是cursor.hasFinalData那就把節點裡面的資料移到stack裡面:cursor.trasfer(this))。如果成功,那就繼續迴圈,由於我們cursor.transfer每次傳遞一個Link而已,所以需要用一個for迴圈不停地把WeakOrderQueue的Link傳輸到stack裡面。

看這個程式碼:

  if (prev != null) {
                        prev.next = next;
                    }

就是把這個cursor節點釋放。

如果關聯的執行緒還在,就把pre節點和cursor節點往下移,知道cursor為null也就是沒有下一個節點,到尾了:

else {
                    prev = cursor;
                }

                cursor = next;

            } while (cursor != null && !success);

那麼我們現在分析cursor.transfer(this)這個方法:

        // transfer as many items as we can from this queue to the stack, returning true if any were transferred
        @SuppressWarnings("rawtypes")
        boolean transfer(Stack<?> dst) {
            Link head = this.head;
            if (head == null) {
                return false;
            }

            if (head.readIndex == LINK_CAPACITY) {
                if (head.next == null) {
                    return false;
                }
                this.head = head = head.next;
            }

            final int srcStart = head.readIndex;
            int srcEnd = head.get();
            final int srcSize = srcEnd - srcStart;
            if (srcSize == 0) {
                return false;
            }

            final int dstSize = dst.size;
            final int expectedCapacity = dstSize + srcSize;

            if (expectedCapacity > dst.elements.length) {
                final int actualCapacity = dst.increaseCapacity(expectedCapacity);
                srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
            }

            if (srcStart != srcEnd) {
                final DefaultHandle[] srcElems = head.elements;
                final DefaultHandle[] dstElems = dst.elements;
                int newDstSize = dstSize;
                for (int i = srcStart; i < srcEnd; i++) {
                    DefaultHandle element = srcElems[i];
                    if (element.recycleId == 0) {
                        element.recycleId = element.lastRecycledId;
                    } else if (element.recycleId != element.lastRecycledId) {
                        throw new IllegalStateException("recycled already");
                    }
                    srcElems[i] = null;

                    if (dst.dropHandle(element)) {
                        // Drop the object.
                        continue;
                    }
                    element.stack = dst;
                    dstElems[newDstSize ++] = element;
                }

                if (srcEnd == LINK_CAPACITY && head.next != null) {
                    // Add capacity back as the Link is GCed.
                    reclaimSpace(LINK_CAPACITY);

                    this.head = head.next;
                }

                head.readIndex = srcEnd;
                if (dst.size == newDstSize) {
                    return false;
                }
                dst.size = newDstSize;
                return true;
            } else {
                // The destination stack is full already.
                return false;
            }
        }

看第一段:

  Link head = this.head;
            if (head == null) {
                return false;
            }

這個既說明我們的頭結點Link沒有資料,也就是整一個WeakOrderQueue都沒有資料了直接返回。

然後:


            if (head.readIndex == LINK_CAPACITY) {
                if (head.next == null) {
                    return false;
                }
                this.head = head = head.next;
            }

在head的readIndex已經達到了LINK_CAPACITY之後,head裡面的所有資料都沒有了,那就把head指向下一個節點,head就可以丟棄了,因為沒有物件指向它了。如果head的next為空,說明整個WeakOrderQueue都沒有資料了返回false。

能夠走到下面,就證明head裡面有handle物件,然後我們找到head的readIndex,就表明我們可以從這個地方開始取物件,看原始碼:

final int srcStart = head.readIndex;

然後看結束位置:

int srcEnd = head.get();

由於head是Link,link繼承自AtomicInteger,link的get就代表link的長度,就表明我裡面移動有多少物件。 

然後:

            final int srcSize = srcEnd - srcStart;
            if (srcSize == 0) {
                return false;
            }

看這個srcSize就是現在我需要傳輸多少個物件到stack裡面,如果srcSize=0,就說明沒有物件傳輸,返回false。

繼續:

            final int dstSize = dst.size;
            final int expectedCapacity = dstSize + srcSize;

dst.size代表stack裡面有的資料的大小,加上srcSize就是傳輸之後的大小。

然後:

            if (expectedCapacity > dst.elements.length) {
                final int actualCapacity = dst.increaseCapacity(expectedCapacity);
                srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
            }

由於我們的stack也是用陣列實現的,所以要傳輸的資料不能超過陣列的大小,如果超過了就擴容,最後算出srcEnd,srcEnd就是我們需要傳輸的head的結束的位置。actualCapacity-dstSize就是實際能傳輸的大小,實際能傳輸的大小加上srcStart就是srcEnd,這個srcEnd也不能操作原來head的結束位置,所以取兩者的最小值。

接下去這一段:
 

                final DefaultHandle[] srcElems = head.elements;
                final DefaultHandle[] dstElems = dst.elements;
                int newDstSize = dstSize;
                for (int i = srcStart; i < srcEnd; i++) {
                    DefaultHandle element = srcElems[i];
                    if (element.recycleId == 0) {
                        element.recycleId = element.lastRecycledId;
                    } else if (element.recycleId != element.lastRecycledId) {
                        throw new IllegalStateException("recycled already");
                    }
                    srcElems[i] = null;

                    if (dst.dropHandle(element)) {
                        // Drop the object.
                        continue;
                    }
                    element.stack = dst;
                    dstElems[newDstSize ++] = element;
                }

這裡就是賦值操作。

有個細節:

if (element.recycleId == 0) {
                        element.recycleId = element.lastRecycledId;
                    } else if (element.recycleId != element.lastRecycledId) {
                        throw new IllegalStateException("recycled already");
                    }

如果沒有被回收過也就是element.recycleId==0,就是recycledId賦值lastRecycledId,而這個lastRecycleId我們前面也分析過,就是這個WeakOrderQueue的id。那麼如果被回收過,並且和當前的id不同也就是element.recycleId!=element.lastRecycledId,那麼就可能已經在其他執行緒回收過,丟擲異常。

srcElems[i]=null就是把Link對應的位置置空。

dropHandle我們前面文章也分析過,控制我們回收的頻率。

最後element.stack=dst和dstElems[newDstSize++]=element就是真正的賦值操作,把element的stack指向我們的stack,並且把值付給stack的element。

 

然後看這個:

                if (srcEnd == LINK_CAPACITY && head.next != null) {
                    // Add capacity back as the Link is GCed.
                    reclaimSpace(LINK_CAPACITY);

                    this.head = head.next;
                }

我們的srcEnd==LINK_CAPACITY那就說明link都回收完了,並且head.next!=null那就說明還有下一個link,那麼我們就把link的這個位置騰出來,也就是呼叫reclaimSpce(LINK_CAPACITY):

        private void reclaimSpace(int space) {
            assert space >= 0;
            availableSharedCapacity.addAndGet(space);
        }

太簡單就是把之前使用的空間重新拿出來,availableSharedCapacity又變大了。

 

再看這個原始碼:

 head.readIndex = srcEnd;

把head.readIndex賦值為srcEnd,下次我們又可以從src開始獲取物件了。

 

再看:

                if (dst.size == newDstSize) {
                    return false;
                }

這段程式碼就是說我們newDstSize和之前dst.size和一樣,表明我們其實沒有回收到物件,那就返回false了。

否則,說明回收到了物件,把dist.size重新賦值,並且返回true:

                dst.size = newDstSize;
                return true;

最後一個return false表明stack滿了,因為srcEnd最終還是和srcStart相同,說明這個地方沒有擴容成功:

            if (expectedCapacity > dst.elements.length) {
                final int actualCapacity = dst.increaseCapacity(expectedCapacity);
                srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
            }

沒有擴容成功才會導致srcEnd=srcStart。

 

這個transfer就是儘量把我們當前的link裡面的物件傳輸到stack那裡,成功返回true,否則返回false。

 

 

最後我們總結一下異執行緒回收和獲取物件,假設A執行緒建立了物件,B執行緒回收物件,B執行緒回收的時候回放到A執行緒對應的stack對應的WeakOrderQueue裡面,A執行緒獲取的時候就直接從這個WeakOrderQueue裡面取就可以了。