1. 程式人生 > 其它 >muduo筆記 網路庫(五)事件迴圈EventLoop

muduo筆記 網路庫(五)事件迴圈EventLoop

目錄

事件驅動與EventLoop

前面(muduo筆記 網路庫(一)總覽)講過,muduo網路庫處理事件是Reactor模式,one loop per thread,一個執行緒一個事件迴圈。這個迴圈稱為EventLoop,這種以事件為驅動的程式設計模式,稱為事件驅動模式。

這種事件驅動模型要求所有任務是非阻塞的,其典型特點是:
如果一個任務需要很長時間才能完成,或者中間可能導致阻塞,就需要對任務進行分段,將其設定為非阻塞的,每次監聽到前次任務完成,觸發事件回撥,從而接著完成後續任務。例如,要傳送一個大檔案,可以先發送一段,完成後,在寫完成事件回撥中又傳送下一段,這樣每次都發生一段,從而完成整個檔案傳送。

EventLoop是實現事件驅動模型的關鍵之一。核心是為執行緒提供執行迴圈,不斷監聽事件、處理事件,為使用者提供在loop迴圈中執行的介面。

還是那張圖,EventLoop實現事件驅動相關類圖關係:

聚合關係:has-a,表示擁有的關係,兩種生命週期沒有必然關聯,可以獨立存在。

組合關係:contain-a,表包含的關係,是一種強聚合關係,強調整體與部分,生命週期一致。


EventLoop

EventLoop是一個介面類,不宜暴露太多內部細節給客戶,介面及其使用應儘量簡潔。EventLoop的主要職責是:
1)提供定時執行使用者指定任務的方法,支援一次性、週期執行使用者任務;
2)提供一個執行迴圈,每當Poller監聽到有通道對應事件發生時,會將通道加入啟用通道列表,執行迴圈要不斷從取出啟用通道,然後呼叫事件回撥處理事件;
3)每個EventLoop對應一個執行緒,不允許一對多或者多對一,提供判斷當前執行緒是否為建立EventLoop物件的執行緒的方法;
4)允許在其他執行緒中呼叫EventLoop的public介面,但同時要確保執行緒安全;

下面來看看EventLoop類宣告:

/**
* Reactor模式, 每個執行緒最多一個EventLoop (One loop per thread).
* 介面類, 不要暴露太多細節給客戶
*/
class EventLoop : public noncopyable
{
public:
    typedef std::function<void()> Functor;

    EventLoop();
    ~EventLoop(); // force out-line dtor, for std::unique_ptr members.

    /* loop迴圈, 執行一個死迴圈.
     * 必須在當前物件的建立執行緒中執行.
     */
    void loop();

    /*
     * 退出loop迴圈.
     * 如果通過原始指標(raw pointer)呼叫, 不是100%執行緒安全;
     * 為了100%安全, 最好通過shared_ptr<EventLoop>呼叫
     */
    void quit();

    /*
     * Poller::poll()返回的時間, 通常意味著有資料達到.
     * 對於PollPoller, 是呼叫完poll(); 對於EPollPoller, 是呼叫完epoll_wait()
     */
    Timestamp pollReturnTime() const { return pollReturnTime_; }

    /* 獲取loop迴圈次數 */
    int64_t iterator() const { return iteration_; }

    /*
     * 在loop執行緒中, 立即執行回撥cb.
     * 如果沒在loop執行緒, 就會喚醒loop, (排隊)執行回撥cb.
     * 如果使用者在同一個loop執行緒, cb會在該函式內執行; 否則, 會在loop執行緒中排隊執行.
     * 因此, 在其他執行緒中呼叫該函式是安全的.
     */
    void runInLoop(Functor cb);

    /* 排隊回撥cb進loop執行緒.
     * 回撥cb在loop中完成polling後執行.
     * 從其他執行緒呼叫是安全的.
     */
    void queueInLoop(Functor cb);

    /* 排隊的回撥cb個數 */
    size_t queueSize() const;

    // timers

    /*
     * 在指定時間點執行回撥cb.
     * 從其他執行緒呼叫安全.
     */
    TimerId runAt(Timestamp time, TimerCallback cb);

    /*
     * 在當前時間點+delay延時後執行回撥cb.
     * 從其他執行緒呼叫安全.
     */
    TimerId runAfter(double delay, TimerCallback cb);

    /*
     * 每隔interval sec週期執行回撥cb.
     * 從其他執行緒呼叫安全.
     */
    TimerId runEvery(double interval, TimerCallback cb);

    /*
     * 取消定時器, timerId唯一標識定時器Timer
     * 從其他執行緒呼叫安全.
     */
    void cancel(TimerId timerId);

    // internal usage

    /* 喚醒loop執行緒, 沒有事件就緒時, loop執行緒可能阻塞在poll()/epoll_wait() */
    void wakeup();
    /* 更新Poller監聽的channel, 只能在channel所屬loop執行緒中呼叫 */
    void updateChannel(Channel* channel);
    /* 移除Poller監聽的channel, 只能在channel所屬loop執行緒中呼叫 */
    void removeChannel(Channel* channel);
    /* 判斷Poller是否正在監聽channel, 只能在channel所屬loop執行緒中呼叫 */
    bool hasChannel(Channel* channel);

    // pid_t threadId() const { return threadId_; }
    /* 斷言當前執行緒是建立當前物件的執行緒, 如果不是就終止程式(LOG_FATAL) */
    void assertInLoopThread();
    /* 判斷前執行緒是否建立當前物件的執行緒.
     * threadId_是建立當前EventLoop物件時, 記錄的執行緒tid
     */
    bool isInLoopThread() const;
    /*
     * 判斷是否有待呼叫的回撥函式(pending functor).
     * 由其他執行緒呼叫runAt/runAfter/runEvery, 會導致回撥入佇列待呼叫.
     */
    bool callingPendingFunctors() const
    { return callingPendingFunctors_; }

    /*
     * 判斷loop執行緒是否正在處理事件, 執行事件回撥.
     * loop執行緒正在遍歷,執行啟用channels時, eventHandling_會置位; 其餘時候, 會清除.
     */
    bool eventHandling() const
    { return eventHandling_; }
    /* context_ 用於應用程式傳參, 由網路庫使用者定義資料 */
    void setContext(const boost::any& context)
    { context_ = context; }
    const boost::any& getContext() const
    { return context_; }
    boost::any* getMutableContext()
    { return &context_; }

    /* 獲取當前執行緒的EventLoop物件指標 */
    static EventLoop* getEventLoopOfCurrentThread();

private:
    /* 終止程式(LOG_FATAL), 當前執行緒不是建立當前EventLoop物件的執行緒時,
     * 由assertInLoopThread()呼叫  */
    void abortNotInLoopThread();
    /* 喚醒所屬loop執行緒, 也是wakeupFd_的事件回撥 */
    void handleRead(); // waked up
    /* 處理pending函式 */
    void doPendingFunctors();
    /* 列印啟用通道的事件資訊, 用於debug */
    void printActiveChannels() const; // DEBUG

    typedef std::vector<Channel*> ChannelList;

    bool looping_;                /* atomic, true表示loop迴圈執行中 */
    std::atomic<bool> quit_;      /* loop迴圈退出條件 */
    bool eventHandling_;          /* atomic, true表示loop迴圈正在處理事件回撥 */
    bool callingPendingFunctors_; /* atomic, true表示loop迴圈正在呼叫pending函式 */
    int64_t iteration_;           /* loop迭代次數 */
    const pid_t threadId_;                   /* 執行緒id, 物件構造時初始化 */
    Timestamp pollReturnTime_;               /* poll()返回時間點 */
    std::unique_ptr<Poller> poller_;         /* 輪詢器, 用於監聽事件 */
    std::unique_ptr<TimerQueue> timerQueue_; /* 定時器佇列 */
    int wakeupFd_;                           /* 喚醒loop執行緒的eventfd */
    /* 用於喚醒loop執行緒的channel.
     * 不像TimerQueue是內部類, 不應該暴露Channel給客戶. */
    std::unique_ptr<Channel> wakeupChannel_;
    boost::any context_;            /* 用於應用程式通過當前物件傳參的變數, 由使用者定義資料 */

    /* 臨時輔助變數 */
    ChannelList activeChannels_;    /* 啟用事件的通道列表 */
    Channel* currentActiveChannel_; /* 當前啟用的通道, 即正在呼叫事件回撥的通道 */

    mutable MutexLock mutex_;
    /* 待呼叫函式列表, 存放不在loop執行緒的其他執行緒呼叫 runAt/runAfter/runEvery, 而要執行的函式 */
    std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);
};

EventLoop不可拷貝,因為與之關聯的不僅物件本身,還有執行緒以及thread local資料等資源。

在這裡,可以把EventLoop功能簡要分為這幾大部分:
1)提供執行迴圈;
2)執行定時任務,一次性 or 週期;
3)處理啟用通道事件;
4)執行緒安全;

對於1),loop()提供執行迴圈,quit()退出迴圈,iterator()查詢迴圈次數,wakeup()用於喚醒loop執行緒,handleRead()讀取喚醒訊息。

對於2),runInLoop()在loop執行緒中“立即”執行一次使用者任務,runAt()/runAfter()新增一次性定時任務,runEvery()新增週期定時任務,doPendingFunctors()回撥所有的pending函式,vector pendingFunctors_用於排隊待處理函式到loop執行緒執行,queueSize()獲取該vector大小;cancel()取消定時任務。

對於3),updateChannel()/removeChannel()/hasChannel()用於通道更新/移除/判斷,vector activeChannels_儲存當前所有啟用的通道,currentActiveChannel_儲存當前正在處理的啟用通道;

對於4),isInLoopThread()/assertInLoopThread()判斷/斷言 當前執行緒是建立當前EventLoop物件的執行緒,互斥鎖mutex_用來做互斥訪問需要保護資料。

值得一提的是,boost::any型別的成員context_用來給使用者提供利用EventLoop傳資料的方式,相當於C裡面的void*,使用者可利用boost::any_cast進行轉型。

EventLoop的構造與析構

EventLoop代表一個事件迴圈,是one loop per thread的體現,每個執行緒只能有一個EventLoop物件。

EventLoop建構函式要點:
1)檢查當前執行緒是否已經建立了EventLoop物件,遇到錯誤就終止程式(LOG_FATAL);
2)記住本物件所屬執行緒id(threadId_);

解構函式要點:
1)清除當前執行緒EventLoop指標,便於下次再建立EventLoop物件。

__thread EventLoop* t_loopInThisThread = 0; // thread local變數, 指向當前執行緒建立的EventLoop物件

EventLoop::EventLoop()
: looping_(false),
threadId_(CurrentThread::tid()),
{
    LOG_DEBUG << "EventLoop create " << this << " in thread " << threadId_;
    if (t_loopInThisThread) // 當前執行緒已經包含了EventLoop物件
    {
        LOG_FATAL << "Another EventLoop " << t_loopInThisThread
        << " exists in this thread " << threadId_;
    }
    else // 當前執行緒尚未包含EventLoop物件
    {
        t_loopInThisThread = this;
    }
}

EventLoop::~EventLoop()
{
    LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_
    << " destructs in thread " << CurrentThread::tid();
    t_loopInThisThread = NULL;
}

可以通過thread local變數t_loopInThisThread指向建立的EventLoop物件,來確保每個執行緒只有一個EventLoop物件。同一個執行緒內,可通過static函式getEventLoopOfCurrentThread,返回該EventLoop物件指標。

EventLoop *EventLoop::getEventLoopOfCurrentThread() // static
{
    return t_loopInThisThread;
}

特定執行緒檢查,確保執行緒安全

有些成員函式只能在EventLoop物件所線上程呼叫,如何檢查該前提條件(pre-condition)?
EventLoop提供了isInLoopThread()、assertInLoopThread(),分別用於判斷、斷言 當前執行緒為建立EventLoop物件執行緒。

從下面實現可看到,assertInLoopThread()斷言失敗時,呼叫abortNotInLoopThread()終止程式(LOG_FATAL)。

void EventLoop::assertInLoopThread() // 斷言當前執行緒(tid())是呼叫當前EventLoop物件的持有者執行緒(threadId_)
{
    if (!isInLoopThread())
    {
        abortNotInLoopThread();      // 斷言失敗則終止程式
    }
}

bool EventLoop::isInLoopThread() const // 判斷當前執行緒是否為當前EventLoop物件的持有者執行緒
{ return threadId_ == CurrentThread::tid(); }

void EventLoop::abortNotInLoopThread() // LOG_FATAL 終止程式
{
    LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
    << " was created in threadId_ = " << threadId_
    << ", current thread id = " << CurrentThread::tid();
}

loop迴圈

提供執行迴圈,不斷監聽事件、處理事件。

/**
*  真正的工作迴圈.
*  獲得所有當前啟用事件的通道,用Poller->poll()填到activeChannels_,
*  然後呼叫Channel::handleEvent()處理每個啟用通道.
*
*  最後排隊執行所有pending函式, 通常是其他執行緒通過loop來呼叫執行使用者任務
*/
void EventLoop::loop()
{
    assert(!looping_);    // to avoid reduplicate loop
    assertInLoopThread(); // to avoid new EventLoop() and loop() are not one thread
    looping_ = true;
    quit_ = false; // FIXME: what if someone calls quit() before loop() ?
    LOG_TRACE << "EventLoop " << this << " start looping";

    while (!quit_)
    {
        activeChannels_.clear(); // 清除啟用事件的通道列表
        // 監聽所有通道, 可能阻塞執行緒, 所有啟用事件對應通道會填入activeChannels_
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
        ++iteration_; // 迴圈次數+1
        if (Logger::logLevel() <= Logger::TRACE)
        {
            printActiveChannels();
        }
        // TODO sort channel by priority
        // 處理所有啟用事件

        eventHandling_ = true;
        for (Channel* channel : activeChannels_)
        {
            currentActiveChannel_ = channel;
            // 通過Channel::handleEvent回撥事件處理函式
            currentActiveChannel_->handleEvent(pollReturnTime_);
        }
        currentActiveChannel_ = NULL;
        eventHandling_ = false;

        // 執行pending函式, 由其他執行緒請求呼叫的使用者任務
        doPendingFunctors();
    }

    LOG_TRACE << "EventLoop " << this << " stop looping";
    looping_ = false;
}

loop執行緒執行事件回撥的關鍵是,用Poller::poll()將啟用事件的通道填入通道列表activeChannels_,然後逐一呼叫每個通道的handleEvent,從而呼叫為Channel註冊的事件回撥來處理事件。

新增、更新、刪除通道

loop迴圈用來處理啟用事件,那使用者如何更新新增、更新、刪除通道事件呢?
前面已經提到,可以用updateChannel/removeChannel 更新/移除 Poller 監聽的通道。

關於 Poller這部分,可參見 muduo筆記 網路庫(二)I/O複用封裝Poller

/**
* 根據具體poller物件, 來更新通道.
* 會修改poller物件監聽的通道陣列.
* @note 必須在channel所屬loop執行緒執行
*/
void EventLoop::updateChannel(Channel *channel)
{
    assert(channel->ownerLoop() == this);
    assertInLoopThread();
    poller_->updateChannel(channel);
}

/**
* 根據具體poller物件, 來刪除通道.
* 會刪除poller物件監聽的通道陣列.
* @note 如果待移除通道正在啟用事件佇列, 應該先從啟用事件佇列中移除
*/
void EventLoop::removeChannel(Channel *channel)
{
    assert(channel->ownerLoop() == this);
    assertInLoopThread();
    if (eventHandling_)
    {
        assert(currentActiveChannel_ == channel ||
        std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end());
    }
    poller_->removeChannel(channel);
}

另外,可用hasChannel來判斷Poller是否正在監聽channel。

/**
* 判斷poller是否正在監聽通道channel
* @note 必須在channel所屬loop執行緒執行
*/
bool EventLoop::hasChannel(Channel *channel)
{
    assert(channel->ownerLoop() == this);
    assertInLoopThread();
    return poller_->hasChannel(channel);
}

定時任務

使用者呼叫runAt/runAfter/runEvery 執行定時任務,呼叫cancel取消定時任務。

/**
* 定時功能,由使用者指定絕對時間
* @details 每為定時器佇列timerQueue新增一個Timer,
* timerQueue內部就會新建一個Timer物件, TimerId就保含了這個物件的唯一標識(序列號)
* @param time 時間戳物件, 單位1us
* @param cb 超時回撥函式. 當前時間超過time代表時間時, EventLoop就會呼叫cb
* @return 一個繫結timerQueue內部新增的Timer物件的TimerId物件, 用來唯一標識該Timer物件
*/
TimerId EventLoop::runAt(Timestamp time, TimerCallback cb)
{
    return timerQueue_->addTimer(std::move(cb), time, 0.0);
}

/**
* 定時功能, 由使用者相對時間, 通過runAt實現
* @param delay 相對時間, 單位s, 精度1us(小數)
* @param cb 超時回撥
*/
TimerId EventLoop::runAfter(double delay, TimerCallback cb)
{
    Timestamp time(addTime(Timestamp::now(), delay));
    return runAt(time, std::move(cb));
}

/**
* 定時功能, 由使用者指定週期, 重複執行
* @param interval 執行週期, 單位s, 精度1us(小數)
* @param cb 超時回撥
* @return 一個繫結timerQueue內部新增的Timer物件的TimerId物件, 用來唯一標識該Timer物件
*/
TimerId EventLoop::runEvery(double interval, TimerCallback cb)
{
    Timestamp time(addTime(Timestamp::now(), interval));
    return timerQueue_->addTimer(std::move(cb), time, interval);
}

/**
* 取消指定定時器
* @param timerId Timer id, 唯一對應一個Timer物件
*/
void EventLoop::cancel(TimerId timerId)
{
    return timerQueue_->cancel(timerId);
}

使用者執行一個loop執行緒,並新增定時任務示例:

void threadFunc()
{
    assert(EventLoop::getEventLoopOfCurrentThread() == NULL);  // 斷言當前執行緒沒有建立EventLoop物件
    EventLoop loop; // 建立EventLoop物件
    assert(EventLoop::getEventLoopOfCurrentThread() == &loop); // 斷言當前執行緒建立了EventLoop物件
    loop.runAfter(1.0, callback); // 1sec後執行callback
    loop.loop(); // 啟動loop迴圈
}

runInLoop與queueInLoop執行使用者任務

同樣是執行使用者任務函式,runInLoop和queueInLoop都可以被多個執行緒執行,分為2種情況:1)如果當前執行緒是建立當前EventLoop物件的執行緒,那麼立即執行使用者任務;2)如果不是,那麼在loop迴圈中排隊執行(本次迴圈末尾),實際上這點也是由queueInLoop完成的。

queueInLoop只做了runInLoop的第2)種情況的工作,也就是隻會在loop迴圈中排隊執行使用者任務。

為什麼要對pendingFunctors_加鎖?
因為queueInLoop可以被多個執行緒訪問,意味著pendingFunctors_也能被多個執行緒訪問,加鎖確保執行緒安全。

/**
* 執行使用者任務
* @param cb 使用者任務函式
* @note 可以被多個執行緒執行:
* 如果當前執行緒是建立當前EventLoop物件的執行緒,直接執行;
* 否則,使用者任務函式入佇列pendingFunctors_成為一個pending functor,在loop迴圈中排隊執行
*/
void EventLoop::runInLoop(Functor cb)
{
    if (isInLoopThread())
    {
        cb();
    }
    else
    {
        queueInLoop(std::move(cb));
    }
}

/**
* 排隊進入pendingFunctors_,等待執行
* @param cb 使用者任務函式
* @note 如果當前執行緒不是建立當前EventLoop物件的執行緒 或者正在呼叫pending functor,
* 就喚醒loop執行緒,避免loop執行緒阻塞.
*/
void EventLoop::queueInLoop(Functor cb)
{
    {
        MutexLockGuard lock(mutex_);
        pendingFunctors_.push_back(std::move(cb));
    }

    if (!isInLoopThread() || callingPendingFunctors_)
    {
        wakeup();
    }
}

eventfd與wakeup()喚醒

有2處可能導致loop執行緒阻塞:
1)Poller::poll()中呼叫poll(2)/epoll_wait(7) 監聽fd,沒有事件就緒時;
2)使用者任務函式呼叫了可能導致阻塞的函式;

而當EventLoop加入使用者任務時,loop迴圈是沒辦法直接知道的,要避免無謂的等待,就需要及時喚醒loop執行緒。
muduo用eventfd技術,來喚醒執行緒。

eventfd原理

eventfd是Linux特有的(Linux 2.6以後),專用於事件通知的機制,類似於管道(pipe)、域套接字(UNIX Domain Socket)。
建立eventfd 函式原型:

#include <sys/eventfd.h>
/* 建立一個檔案描述符(event fd), 用於事件通知 
 *  initval 計數初值
 * flags 標誌位, 如果沒用到可設為0, 也可以用以下選項 按位或 取值: 
 *     EFD_CLOEXEC 為新建的fd設定close-on-exec(FD_CLOEXEC), 等效於以O_CLOEXEC方式open(2)
 *     EFD_NONBLOCK 等效於fcntl(2)設定O_NONBLOCK
 *     EFD_SEMAPHORE 將eventfd當訊號量一樣呼叫, read 將導致計數-1, write 將導致計數+1; 如果沒指定該標誌, read將返回8byte計數值, 且計數值歸0, write將計數值+指定值.
 * 返回 新建的fd, 用於事件通知, 繫結到一個eventfd物件; 失敗, 返回-1
 */
int eventfd(unsigned int initval, int flags);

建立完event fd後,可用read(2)讀取event fd,如果fd是阻塞的,read可能阻塞執行緒;如果event fd設定了EFD_NONBLOCK,read返回EAGIAN錯誤。直到另外一個執行緒對event fd進行write。

// 為wakeupChannel_設定讀回撥
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
// we are always reading the wakeupfd
// 使能wakeupChannel_讀事件
wakeupChannel_->enableReading();

eventfd使用示例:
執行緒1阻塞等待,執行緒2喚醒執行緒1。

#include <stdio.h>
#include <stdlib.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <pthread.h>

#define __STDC_FORMAT_MACROS // for 跨平臺列印
#include <inttypes.h>

void* thread_func1(void* arg) /* 等待執行緒 */
{
    int wakeupfd = *(int*)arg;
    printf("thread_func1 start\n");
    uint64_t rdata;
    int ret = read(wakeupfd, &rdata, sizeof(rdata));
    if (ret < 0) {
        perror("thread_func1 read error");
        pthread_exit(NULL);
    }
    
    printf("thread_func1 success to be waked up, rdata = %" PRId64 "\n", rdata);
}

void* thread_func2(void* arg) /* 喚醒執行緒 */
{
    int wakeupfd = *(int*)arg;
    printf("thread_func2 ready to sleep 1 sec\n");
    sleep(1);
    uint64_t wdata = 10;
    int ret = write(wakeupfd, &wdata, sizeof(wdata));
    if (ret < 0) {
        perror("thread_func2 write error");
        pthread_exit(NULL);
    }
    printf("thread_func2 success to wake up another thread, wdata = %" PRId64 "\n", wdata);
}

/* 建立2個執行緒,thread_func1阻塞等待eventfd,thread_func2喚醒等等eventfd的執行緒 */
int main()
{
    int evfd = eventfd(0, 0);
    if (evfd < 0) {
        perror("eventfd error");
        exit(1);
    }
    pthread_t th1, th2;
    pthread_create(&th1, NULL, thread_func1, (void*)&evfd);
    pthread_create(&th2, NULL, thread_func2, (void*)&evfd);
    pthread_join(th1, NULL);
    pthread_join(th2, NULL);
    return 0;
}

EventLoop使用eventfd喚醒loop執行緒

1)建立event fd
建構函式中,wakeupFd_ 初值為createEventfd()

int createEventfd()
{
    int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (evtfd < 0)
    {
        LOG_SYSERR << "Failed in eventfd";
        abort();
    }
    return evtfd;
}

2)繫結event fd與喚醒通道wakeupChannel_利用event fd構造一個Channel物件後,傳遞給wakeupChannel_,便於Poller監聽、事件回撥

// 為wakeupChannel_設定讀回撥
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
// we are always reading the wakeupfd
// 使能wakeupChannel_讀事件
wakeupChannel_->enableReading();

3)啟動loop迴圈,可能阻塞在poll(2)/epoll_wait(7)

4)其他執行緒通過queueInLoop()呼叫wakeup(),喚醒阻塞的loop執行緒

/**
* 其他執行緒喚醒等待在wakeupFd_上的執行緒, 產生讀就緒事件.
* @note write將新增8byte資料到內部計數器. 被喚醒執行緒必須呼叫read讀取8byte資料.
*/
void EventLoop::wakeup()
{
    uint64_t one = 1;
    ssize_t n = sockets::write(wakeupFd_, &one, sizeof(one));
    if (n != sizeof(one))
    {
        LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
    }
}

5)loop執行緒被喚醒後,讀取event fd

/**
* 處理wakeupChannel_讀事件
* @note read wakeupfd_
*/
void EventLoop::handleRead()
{
    uint64_t one = 1;
    ssize_t n = sockets::read(wakeupFd_, &one, sizeof(one));
    if (n != sizeof(one))
    {
        LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
    }
}

參考

https://blog.csdn.net/sinat_35261315/article/details/78329657

https://www.cnblogs.com/ailumiyana/p/10087539.html

eventfd https://blog.csdn.net/qq_28114615/article/details/97929524