Netty原始碼分析第2章(NioEventLoop)---->第8節: 執行任務佇列
Netty原始碼分析第二章: NioEventLoop
第八節: 執行任務佇列
繼續回到NioEventLoop的run()方法:
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue ;
case SelectStrategy.SELECT:
//輪詢io事件(1)
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
//預設是50
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {
//記錄下開始時間
final long ioStartTime = System.nanoTime();
try {
//處理輪詢到的key(2)
processSelectedKeys();
} finally {
//計算耗時
final long ioTime = System.nanoTime() - ioStartTime;
//執行task(3)
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
//程式碼省略
}
}
我們看到處理完輪詢到的key之後, 首先記錄下耗時, 然後通過runAllTasks(ioTime * (100 - ioRatio) / ioRatio)執行taskQueue中的任務
我們知道ioRatio預設是50, 所以執行完ioTime * (100 - ioRatio) / ioRatio後, 方法傳入的值為ioTime, 也就是processSelectedKeys()的執行時間:
跟進方法:
protected boolean runAllTasks(long timeoutNanos) {
//定時任務佇列中聚合任務
fetchFromScheduledTaskQueue();
//從普通taskQ裡面拿一個任務
Runnable task = pollTask();
//task為空, 則直接返回
if (task == null) {
//跑完所有的任務執行收尾的操作
afterRunningAllTasks();
return false;
}
//如果佇列不為空
//首先算一個截止時間(+50毫秒, 因為執行任務, 不要超過這個時間)
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
//執行每一個任務
for (;;) {
safeExecute(task);
//標記當前跑完的任務
runTasks ++;
//當跑完64個任務的時候, 會計算一下當前時間
if ((runTasks & 0x3F) == 0) {
//定時任務初始化到當前的時間
lastExecutionTime = ScheduledFutureTask.nanoTime();
//如果超過截止時間則不執行(nanoTime()是耗時的)
if (lastExecutionTime >= deadline) {
break;
}
}
//如果沒有超過這個時間, 則繼續從普通任務佇列拿任務
task = pollTask();
//直到沒有任務執行
if (task == null) {
//記錄下最後執行時間
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
//收尾工作
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
首先會執行fetchFromScheduledTaskQueue()這個方法, 這個方法的意思是從定時任務佇列中聚合任務, 也就是將定時任務中找到可以執行的任務新增到taskQueue中, 我們跟進去:
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
//從定時任務佇列中抓取第一個定時任務
//尋找截止時間為nanoTime的任務
Runnable scheduledTask = pollScheduledTask(nanoTime);
//如果該定時任務佇列不為空, 則塞到普通任務佇列裡面
while (scheduledTask != null) {
//如果新增到普通任務佇列過程中失敗
if (!taskQueue.offer(scheduledTask)) {
//則重新新增到定時任務佇列中
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
//繼續從定時任務佇列中拉取任務
//方法執行完成之後, 所有符合執行條件的定時任務佇列, 都新增到了普通任務佇列中
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
long nanoTime = AbstractScheduledEventExecutor.nanoTime()代表從定時任務初始化到現在過去了多長時間
Runnable scheduledTask= pollScheduledTask(nanoTime)代表從定時任務佇列中拿到小於nanoTime時間的任務, 因為小於初始化到現在的時間, 說明該任務需要執行了
跟到其父類AbstractScheduledEventExecutor的pollScheduledTask(nanoTime)方法中:
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
//拿到定時任務佇列
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
//peek()方法拿到第一個任務
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}
if (scheduledTask.deadlineNanos() <= nanoTime) {
//從佇列中刪除
scheduledTaskQueue.remove();
//返回該
return scheduledTask;
}
return null;
}
我們看到首先獲得當前類繫結的定時任務佇列的成員變數
如果不為空, 則通過scheduledTaskQueue.peek()彈出第一個任務
如果當前任務小於傳來的時間, 說明該任務需要執行, 則從定時任務佇列中刪除
我們繼續回到fetchFromScheduledTaskQueue()方法中:
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
//從定時任務佇列中抓取第一個定時任務
//尋找截止時間為nanoTime的任務
Runnable scheduledTask = pollScheduledTask(nanoTime);
//如果該定時任務佇列不為空, 則塞到普通任務佇列裡面
while (scheduledTask != null) {
//如果新增到普通任務佇列過程中失敗
if (!taskQueue.offer(scheduledTask)) {
//則重新新增到定時任務佇列中
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
//繼續從定時任務佇列中拉取任務
//方法執行完成之後, 所有符合執行條件的定時任務佇列, 都新增到了普通任務佇列中
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
彈出需要執行的定時任務之後, 我們通過taskQueue.offer(scheduledTask)新增到taskQueue中, 如果新增失敗, 則通過scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)重新新增到定時任務佇列中
如果新增成功, 則通過pollScheduledTask(nanoTime)方法繼續新增, 直到沒有需要執行的任務
這樣就將定時任務佇列需要執行的任務新增到了taskQueue中
回到runAllTasks(long timeoutNanos)方法中:
首先通過Runnable task = pollTask()從taskQueue中拿一個任務
任務不為空, 則通過final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos計算一個截止時間, 任務的執行時間不能超過這個時間
然後在for迴圈中通過safeExecute(task)執行task, 我們跟到safeExecute(task)中:
protected static void safeExecute(Runnable task) {
try {
//直接呼叫run()方法執行
task.run();
} catch (Throwable t) {
//發生異常不終止
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
這裡直接呼叫task的run()方法進行執行, 其中發生異常, 只打印一條日誌, 代表發生異常不終止, 繼續往下執行
回到runAllTasks(long timeoutNanos)方法:
每次執行完task, runTasks自增
這裡if ((runTasks & 0x3F) == 0)代表是否執行了64個任務, 如果執行了64個任務, 則會通過lastExecutionTime = ScheduledFutureTask.nanoTime()記錄定時任務初始化到現在的時間, 如果這個時間超過了截止時間, 則退出迴圈
如果沒有超過截止時間, 則通過task = pollTask()繼續彈出任務執行
這裡執行64個任務統計一次時間, 而不是每次執行任務都統計, 主要原因是因為獲取系統時間是個比較耗時的操作, 這裡是netty的一種優化方式
如果沒有task需要執行, 則通過afterRunningAllTasks()做收尾工作, 最後記錄下最後的執行時間
以上就是有關執行任務佇列的相關邏輯
本章總結
本章學習了有關NioEventLoopGroup的建立, NioEventLoop的建立和啟動, 以及多路複用器的輪詢處理和task執行的相關邏輯, 通過本章學習, 我們應該掌握如下內容:
1.NioEventLoopGroup如何選擇分配NioEventLoop
2.NioEventLoop如何開啟
3.NioEventLoop如何進行select操作
4.NioEventLoop如何執行task