1. 程式人生 > >服務器端編程心得(三)—— 一個服務器程序的架構介紹

服務器端編程心得(三)—— 一個服務器程序的架構介紹

工具 對象管理 length 客戶端 != static turn lte ron

本文將介紹我曾經做過的一個項目的服務器架構和服務器編程的一些重要細節。

一、程序運行環境

操作系統:centos 7.0

編譯器:gcc/g++ 4.8.3 cmake 2.8.11

mysql數據庫:5.5.47

項目代碼管理工具:VS2013

二、程序結構

該程序總共有17個線程,其中分為9個數據庫工作線程D和一個日誌線程L,6個普通工作線程W,一個主線程M。(以下會用這些字母來代指這些線程)

(一)、數據庫工作線程的用途

9個數據庫工作線程在線程啟動之初,與mysql建立連接,也就是說每個線程都與mysql保持一路連接,共9個數據庫連接。

每個數據庫工作線程同時存在兩個任務隊列,第一個隊列A存放需要執行數據庫增刪查改操作的任務sqlTask,第二個隊列B存放sqlTask執行完成後的結果。sqlTask執行完成後立即放入結果隊列中,因而結果隊列中任務也是一個個的需要執行的任務。大致偽代碼如下:

void db_thread_func()
{
    while (!m_bExit)
    {
        if (NULL != (pTask = m_sqlTask.Pop()))
        {
            //從m_sqlTask中取出的任務先執行完成後,pTask將攜帶結果數據
            pTask->Execute();            
            //得到結果後,立刻將該任務放入結果任務隊列
            m_resultTask.Push(pTask);
            continue;
        }

        sleep(
1000); } }v

現在的問題來了:

1. 任務隊列A中的任務從何而來,目前只有消費者,沒有生產者,那麽生產者是誰?

2. 任務隊列B中的任務將去何方,目前只有生產者沒有消費者。

這兩個問題先放一會兒,等到後面我再來回答。

(二)工作線程和主線程

在介紹主線程和工作線程具體做什麽時,我們介紹下服務器編程中常常抽象出來的幾個概念(這裏以tcp連接為例):

1. TcpServer 即Tcp服務,服務器需要綁定ip地址和端口號,並在該端口號上偵聽客戶端的連接(往往由一個成員變量TcpListener來管理偵聽細節)。所以一個TcpServer要做的就是這些工作。除此之外,每當有新連接到來時,TcpServer需要接收新連接,當多個新連接存在時,TcpServer需要有條不紊地管理這些連接:連接的建立、斷開等,即產生和管理下文中說的TcpConnection對象。

2.一個連接對應一個TcpConnection對象,TcpConnection對象管理著這個連接的一些信息:如連接狀態、本端和對端的ip地址和端口號等。

3.數據通道對象Channel,Channel記錄了socket的句柄,因而是一個連接上執行數據收發的真正執行者,Channel對象一般作為TcpConnection的成員變量。

4. TcpSession對象,是將Channel收取的數據進行解包,或者對準備好的數據進行裝包,並傳給Channel發送。

歸納起來:一個TcpServer依靠TcpListener對新連接的偵聽和處理,依靠TcpConnection對象對連接上的數據進行管理,TcpConnection實際依靠Channel對數據進行收發,依靠TcpSession對數據進行裝包和解包。也就是說一個TcpServer存在一個TcpListener,對應多個TcpConnection,有幾個TcpConnection就有幾個TcpSession,同時也就有幾個Channel。

以上說的TcpServer、TcpListener、TcpConnection、Channel和TcpSession是服務器框架的網絡層。一個好的網絡框架,應該做到與業務代碼脫耦。即上層代碼只需要拿到數據,執行業務邏輯,而不用關註數據的收發和網絡數據包的封包和解包以及網絡狀態的變化(比如網絡斷開與重連)。

拿數據的發送來說:

當業務邏輯將數據交給TcpSession,TcpSession將數據裝好包後(裝包過程後可以有一些加密或壓縮操作),交給TcpConnection::SendData(),而TcpConnection::SendData()實際是調用Channel::SendData(),因為Channel含有socket句柄,所以Channel::SendData()真正調用send()/sendto()/write()方法將數據發出去。

對於數據的接收,稍微有一點不同:

通過select()/poll()/epoll()等IO multiplex技術,確定好了哪些TcpConnection上有數據到來後,激活該TcpConnection的Channel對象去調用recv()/recvfrom()/read()來收取數據。數據收到以後,將數據交由TcpSession來處理,最終交給業務層。註意數據收取、解包乃至交給業務層是一定要分開的。我的意思是:最好不要解包並交給業務層和數據收取的邏輯放在一起。因為數據收取是IO操作,而解包和交給業務層是邏輯計算操作。IO操作一般比邏輯計算要慢。到底如何安排要根據服務器業務來取舍,也就是說你要想好你的服務器程序的性能瓶頸在網絡IO還是邏輯計算,即使是網絡IO,也可以分為上行操作和下行操作,上行操作即客戶端發數據給服務器,下行即服務器發數據給客戶端。有時候數據上行少,下行大。(如遊戲服務器,一個npc移動了位置,上行是該客戶端通知服務器自己最新位置,而下行確是服務器要告訴在場的每個客戶端)。

在我的博文《服務器端編程心得(一)—— 主線程與工作線程的分工》中介紹了,工作線程的流程:

while (!m_bQuit)
{
    epoll_or_select_func();

    handle_io_events();

    handle_other_things();
}

其中epoll_or_select_func()即是上文所說的通過select()/poll()/epoll()等IO multiplex技術,確定好了哪些TcpConnection上有數據到來。我的服務器代碼中一般只會監測socket可讀事件,而不會監測socket可寫事件。至於如何發數據,文章後面會介紹。所以對於可讀事件,以epoll為例,這裏需要設置的標識位是:

EPOLLIN 普通可讀事件(當連接正常時,產生這個事件,recv()/read()函數返回收到的字節數;當連接關閉,這兩個函數返回0,也就是說我們設置這個標識已經可以監測到新來數據和對端關閉事件)

EPOLLRDHUP 對端關閉事件(linux man手冊上說這個事件可以監測對端關閉,但我實際調試時發送即使對端關閉也沒觸發這個事件,仍然是EPOLLIN,只不過此時調用recv()/read()函數,返回值會為0,所以實際項目中是否可以通過設置這個標識來監測對端關閉,仍然待考證)

EPOLLPRI 帶外數據

muduo裏面將epoll_wait的超時事件設置為1毫秒,我的另一個項目將epoll_wait超時時間設置為10毫秒。這兩個數值供大家參考。

這個項目中,工作線程和主線程都是上文代碼中的邏輯,主線程監聽偵聽socket上的可讀事件,也就是監測是否有新連接來了。主線程和每個工作線程上都存在一個epollfd。如果新連接來了,則在主線程的handle_io_events()中接收新連接。產生的新連接的socket句柄掛接到哪個線程的epollfd上呢?這裏采取的做法是round-robin算法,即存在一個對象CWorkerThreadManager記錄了各個工作線程上工作狀態。偽碼大致如下:

void attach_new_fd(int newsocketfd)
{
    workerthread = get_next_worker_thread(next);
    workerthread.attach_to_epollfd(newsocketfd);
    ++next;
    if (next > max_worker_thread_num)
        next = 0;
}

即先從第一個工作線程的epollfd開始掛接新來socket,接著累加索引,這樣下次就是第二個工作線程了。如果所以超出工作線程數目,則從第一個工作重新開始。這裏解決了新連接socket“負載均衡”的問題。在實際代碼中還有個需要註意的細節就是:epoll_wait的函數中的struct epoll_event 數量開始到底要設置多少個才合理?存在的顧慮是,多了浪費,少了不夠用,我在曾經一個項目中直接用的是4096:

const int EPOLL_MAX_EVENTS = 4096;
const int dwSelectTimeout = 10000;
struct epoll_event events[EPOLL_MAX_EVENTS];
int nfds = epoll_wait(m_fdEpoll, events, EPOLL_MAX_EVENTS, dwSelectTimeout / 1000);

我在陳碩的muduo網絡庫中發現作者才用了一個比較好的思路,即動態擴張數量:開始是n個,當發現有事件的fd數量已經到達n個後,將struct epoll_event數量調整成2n個,下次如果還不夠,則變成4n個,以此類推,作者巧妙地利用stl::vector在內存中的連續性來實現了這種思路:

//初始化代碼
std::vector<struct epoll_event> events_(16);

//線程循環裏面的代碼
while (m_bExit)
{
    int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), 1);
    if (numEvents > 0)
    {
        if (static_cast<size_t>(numEvents) == events_.size())
        {
            events_.resize(events_.size() * 2);
        }
    }
}

讀到這裏,你可能覺得工作線程所做的工作也不過就是調用handle_io_events()來接收網絡數據,其實不然,工作線程也可以做程序業務邏輯上的一些工作。也就是在handle_other_things()裏面。那如何將這些工作加到handle_other_things()中去做呢?寫一個隊列,任務先放入隊列,再讓handle_other_things()從隊列中取出來做?我在該項目中也借鑒了muduo庫的做法。即handle_other_things()中調用一系列函數指針,偽碼如下:

void do_other_things()
{
    somefunc();
}


//m_functors是一個stl::vector,其中每一個元素為一個函數指針
void somefunc()
{
    for (size_t i = 0; i < m_functors.size(); ++i)
    {
        m_functors[i]();
    }

    m_functors.clear();
}

當任務產生時,只要我們將執行任務的函數push_back到m_functors這個stl::vector對象中即可。但是問題來了,如果是其他線程產生的任務,兩個線程同時操作m_functors,必然要加鎖,這也會影響效率。muduo是這樣做的:

void add_task(const Functor& cb)
{
    std::unique_lock<std::mutex> lock(mutex_);
    m_functors.push_back(cb);    
}

void do_task()
{
    std::vector<Functor> functors;
    {
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(m_functors);
    }

    for (size_t i = 0; i < functors.size(); ++i)
    {
        functors[i]();
    }
}

看到沒有,利用一個棧變量functors將m_functors中的任務函數指針倒換(swap)過來了,這樣大大減小了對m_functors操作時的加鎖粒度。前後變化:變化前,相當於原來A給B多少東西,B消耗多少,A給的時候,B不能消耗;B消耗的時候A不能給。現在變成A將東西放到籃子裏面去,B從籃子裏面拿,B如果拿去一部分後,只有消耗完了才會來拿,或者A通知B去籃子裏面拿,而B忙碌時,A是不會通知B來拿,這個時候A只管將東西放在籃子裏面就可以了。

bool bBusy = false;
void add_task(const Functor& cb)
{
    std::unique_lock<std::mutex> lock(mutex_);
    m_functors_.push_back(cb);

    //B不忙碌時只管往籃子裏面加,不要通知B
    if (!bBusy)
    {
        wakeup_to_do_task();
    }
}

void do_task()
{
    bBusy = true;
    std::vector<Functor> functors;
    {
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
    }

    for (size_t i = 0; i < functors.size(); ++i)
    {
        functors[i]();
    }

    bBusy = false;
}

看,多巧妙的做法!

因為每個工作線程都存在一個m_functors,現在問題來了,如何將產生的任務均衡地分配給每個工作線程。這個做法類似上文中如何將新連接的socket句柄掛載到工作線程的epollfd上,也是round-robin算法。上文已經描述,此處不再贅述。

還有種情況,就是希望任務產生時,工作線程能夠立馬執行這些任務,而不是等epoll_wait超時返回之後。這個時候的做法,就是使用一些技巧喚醒epoll_wait,linux系統可以使用socketpair或timerevent、eventfd等技巧(這個細節在我的博文《服務器端編程心得(一)—— 主線程與工作線程的分工》已經詳細介紹過了)。

上文中留下三個問題:

1. 數據庫線程任務隊列A中的任務從何而來,目前只有消費者,沒有生產者,那麽生產者是誰?

2.數據庫線程任務隊列B中的任務將去何方,目前只有生產者沒有消費者。

3.業務層的數據如何發送出去?

問題1的答案是:業務層產生任務可能會交給數據庫任務隊列A,這裏的業務層代碼可能就是工作線程中do_other_things()函數執行體中的調用。至於交給這個9個數據庫線程的哪一個的任務隊列,同樣采用了round-robin算法。所以就存在一個對象CDbThreadManager來管理這九個數據庫線程。下面的偽碼是向數據庫工作線程中加入任務:

bool CDbThreadManager::AddTask(IMysqlTask* poTask )
{
    if (m_index >= m_dwThreadsCount)
    {
        m_index = 0;
    }

    return m_aoMysqlThreads[m_index++].AddTask(poTask);
}

同理問題2中的消費者也可能就是do_other_things()函數執行體中的調用。

現在來說問題3,業務層的數據產生後,經過TcpSession裝包後,需要發送的話,產生任務丟給工作線程的do_other_things(),然後在相關的Channel裏面發送,因為沒有監測該socket上的可寫事件,所以該數據可能調用send()或者write()時會阻塞,沒關系,sleep()一會兒,繼續發送,一直嘗試,到數據發出去。偽碼如下:

bool Channel::Send()
{
    int offset = 0;
    while (true)
    {
        int n = ::send(socketfd, buf + offset, length - offset);
        if (n == -1)
        {
            if (errno == EWOULDBLOCK)
            {
                ::sleep(100);
                continue;
            }
        }
        //對方關閉了socket,這端建議也關閉
        else if (n == 0)
        {
            close(socketfd);
            return false;
        }

        offset += n;
        if (offset >= length)
            break;

    }

    return true;    
}

最後,還有一個日誌線程沒有介紹,高性能的日誌實現方案目前並不常見。限於文章篇幅,下次再介紹。

zhangyl 2016.12.02晚12:35

技術分享圖片

服務器端編程心得(三)—— 一個服務器程序的架構介紹