1. 程式人生 > >ZeroMQ 中文指南 第三章 高階請求-應答模式【轉載】

ZeroMQ 中文指南 第三章 高階請求-應答模式【轉載】

作者資訊如下。
ZMQ 指南

作者: Pieter Hintjens [email protected], CEO iMatix Corporation.

翻譯: 張吉 [email protected], 安居客集團 好租網工程師

NOTE: 此翻譯涵蓋2011年10月份的ZMQ穩定版本,即2.1.0 stable release。但讀者仍然可以通過此文了解ZMQ的一些基本概念和哲學。

第三章 高階請求-應答模式

在第二章中我們通過開發一系列的小應用來熟悉ØMQ的基本使用方法,每個應用會引入一些新的特性。本章會沿用這種方式,來探索更多建立在ØMQ請求-應答模式之上的高階工作模式。

本章涉及的內容有:

  • 在請求-應答模式中建立和使用訊息信封
  • 使用REQ、REP、DEALER和ROUTER套接字
  • 使用標識來手工指定應答目標
  • 使用自定義離散路由模式
  • 使用自定義最近最少使用路由模式
  • 構建高層訊息封裝類
  • 構建基本的請求應答代理
  • 合理命名套接字
  • 模擬client-worker叢集
  • 構建可擴充套件的請求-應答叢集雲
  • 使用管道套接字監控執行緒

Request-Reply Envelopes

在請求-應答模式中,信封裡儲存了應答目標的位置。這就是為什麼ØMQ網路雖然是無狀態的,但仍能完成請求-應答的過程。

在一般使用過程中,你並不需要知道請求-應答信封的工作原理。使用REQ、REP時,ØMQ會自動處理訊息信封。下一章講到的裝置(device),使用時也只需保證讀取和寫入所有的資訊即可。ØMQ使用多段訊息的方式來儲存信封,所以在複製訊息時也會複製信封。

然而,在使用高階請求-應答模式之前是需要了解信封這一機制的,以下是信封機制在ROUTER中的工作原理:

  • 從ROUTER中讀取一條訊息時,ØMQ會包上一層信封,上面註明了訊息的來源。
  • 向ROUTER寫入一條訊息時(包含信封),ØMQ會將信封拆開,並將訊息遞送給相應的物件。

如果將從ROUTER A中獲取的訊息(包含信封)寫入ROUTER B(即將訊息傳送給一個DEALER,該DEALER連線到了ROUTER),那麼在從ROUTER B中獲取該訊息時就會包含兩層信封。

信封機制的根本作用是讓ROUTER知道如何將訊息遞送給正確的應答目標,你需要做的就是在程式中保留好該信封。回顧一下REP套接字,它會將收到訊息的信封逐個拆開,將訊息本身傳送給應用程式。而在傳送時,又會在訊息外層包裹該信封,傳送給ROUTER,從而傳遞給正確的應答目標。

我們可以使用上述原理建立起一個ROUTER-DEALER裝置:

[REQ] <--> [REP]
[REQ] <--> [ROUTER--DEALER] <--> [REP]
[REQ] <--> [ROUTER--DEALER] <--> [ROUTER--DEALER] <--> [REP]
...etc.

當你用REQ套接字去連線ROUTER套接字,併發送一條請求訊息,你會從ROUTER中獲得一條如下所示的訊息:

1

  • 第三幀是應用程式傳送給REQ套接字的訊息;
  • 第二幀的空資訊是REQ套接字在傳送訊息給ROUTER之前新增的;
  • 第一幀即信封,是由ROUTER套接字新增的,記錄了訊息的來源。

如果我們在一條裝置鏈路上傳遞該訊息,最終會得到包含多層信封的訊息。最新的信封會在訊息的頂部。

2

以下將詳述我們在請求-應答模式中使用到的四種套接字型別:

  • DEALER是一種負載均衡,它會將訊息分發給已連線的節點,並使用公平佇列的機制處理接受到的訊息。DEALER的作用就像是PUSH和PULL的結合。

  • REQ傳送訊息時會在訊息頂部插入一個空幀,接受時會將空幀移去。其實REQ是建立在DEALER之上的,但REQ只有當訊息傳送並接受到迴應後才能繼續執行。

  • ROUTER在收到訊息時會在頂部新增一個信封,標記訊息來源。傳送時會通過該信封決定哪個節點可以獲取到該條訊息。

  • REP在收到訊息時會將第一個空幀之前的所有資訊儲存起來,將原始資訊傳送給應用程式。在傳送訊息時,REP會用剛才儲存的資訊包裹應答訊息。REP其實是建立在ROUTER之上的,但和REQ一樣,必須完成接受和傳送這兩個動作後才能繼續。

REP要求訊息中的信封由一個空幀結束,所以如果你沒有用REQ傳送訊息,則需要自己在訊息中新增這個空幀。

你肯定會問,ROUTER是怎麼標識訊息的來源的?答案當然是套接字的標識。我們之前講過,一個套接字可能是瞬時的,它所連線的套接字(如ROUTER)則會給它生成一個標識,與之相關聯。一個套接字也可以顯式地給自己定義一個標識,這樣其他套接字就可以直接使用了。

這是一個瞬時的套接字,ROUTER會自動生成一個UUID來標識訊息的來源。

3

這是一個持久的套接字,標識由訊息來源自己指定。

4

下面讓我們在例項中觀察上述兩種操作。下列程式會打印出ROUTER從兩個REP套接字中獲得的訊息,其中一個沒有指定標識,另一個指定了“Hello”作為標識。

identity.c

// 
// 以下程式演示瞭如何在請求-應答模式中使用套接字標識。
// 需要注意的是s_開頭的函式是由zhelpers.h提供的。
// 我們沒有必要重複編寫那些程式碼。
//
#include "zhelpers.h"

int main (void)
{
    void *context = zmq_init (1);

    void *sink = zmq_socket (context, ZMQ_ROUTER);
    zmq_bind (sink, "inproc://example");

    // 第一個套接字由0MQ自動設定標識
    void *anonymous = zmq_socket (context, ZMQ_REQ);
    zmq_connect (anonymous, "inproc://example");
    s_send (anonymous, "ROUTER uses a generated UUID");
    s_dump (sink);

    // 第二個由自己設定
    void *identified = zmq_socket (context, ZMQ_REQ);
    zmq_setsockopt (identified, ZMQ_IDENTITY, "Hello", 5);
    zmq_connect (identified, "inproc://example");
    s_send (identified, "ROUTER socket uses REQ's socket identity");
    s_dump (sink);

    zmq_close (sink);
    zmq_close (anonymous);
    zmq_close (identified);
    zmq_term (context);
    return 0;
}

執行結果:

----------------------------------------
[017] 00314F043F46C441E28DD0AC54BE8DA727
[000]
[026] ROUTER uses a generated UUID
----------------------------------------
[005] Hello
[000]
[038] ROUTER socket uses REQ's socket identity

自定義請求-應答路由

我們已經看到ROUTER套接字是如何使用信封將訊息傳送給正確的應答目標的,下面我們從一個角度來定義ROUTER:在傳送訊息時使用一定格式的信封提供正確的路由目標,ROUTER就能夠將該條訊息非同步地傳送給對應的節點。

所以說ROUTER的行為是完全可控的。在深入理解這一特性之前,讓我們先近距離觀察一下REQ和REP套接字,賦予他們一些鮮活的角色:

  • REQ是一個“媽媽”套接字,不會耐心聽別人說話,但會不斷地丟擲問題尋求解答。REQ是嚴格同步的,它永遠位於訊息鏈路的請求端;
  • REP則是一個“爸爸”套接字,只會回答問題,不會主動和別人對話。REP也是嚴格同步的,並一直位於應答端。

關於“媽媽”套接字,正如我們小時候所經歷的,只能等她向你開口時你們才能對話。媽媽不像爸爸那麼開明,也不會像DEALER套接字一樣接受模稜兩可的回答。所以,想和REQ套接字對話只有等它主動發出請求後才行,之後它就會一直等待你的回答,不管有多久。

“爸爸”套接字則給人一種強硬、冷漠的感覺,他只做一件事:無論你提出什麼問題,都會給出一個精確的回答。不要期望一個REP套接字會主動和你對話或是將你倆的交談傳達給別人,它不會這麼做的。

我們通常認為請求-應答模式一定是有來有往、有去有回的過程,但實際上這個過程是可以非同步進行的。我們只需獲得相應節點的地址,即可通過ROUTER套接字來非同步地傳送訊息。ROUTER是ZMQ中唯一一個可以定位訊息來源的套接字。

我們對請求-應答模式下的路由做一個小結:

  • 對於瞬時的套接字,ROUTER會動態生成一個UUID來標識它,因此從ROUTER中獲取到的訊息裡會包含這個標識;
  • 對於持久的套接字,可以自定義標識,ROUTER會如直接將該標識放入訊息之中;
  • 具有顯式宣告標識的節點可以連線到其他型別的套接字;
  • 節點可以通過配置檔案等機制提前獲知對方節點的標識,作出相應的處理。

我們至少有三種模式來實現和ROUTER的連線:

  • ROUTER-DEALER
  • ROUTER-REQ
  • ROUTER-REP

每種模式下我們都可以完全掌控訊息的路由方式,但不同的模式會有不一樣的應用場景和訊息流,下一節開始我們會逐一解釋。

自定義路由也有一些注意事項:

  • 自定義路由讓節點能夠控制訊息的去向,這一點有悖ØMQ的規則。使用自定義路由的唯一理由是ØMQ缺乏更多的路由演算法供我們選擇;
  • 未來的ØMQ版本可能包含一些我們自定義的路由方式,這意味著我們現在設計的程式碼可能無法在新版本的ØMQ中執行,或者成為一種多餘;
  • 內建的路由機制是可擴充套件的,且對裝置友好,但自定義路由就需要自己解決這些問題。

所以說自定義路由的成本是比較高的,更多情況下應當交由ØMQ來完成。不過既然我們已經講到這兒了,就繼續深入下去吧!

ROUTER-DEALER路由

ROUTER-DEALDER是一種最簡單的路由方式。將ROUTER和多個DEALER相連線,用一種合適的演算法來決定如何分發訊息給DEALER。DEALER可以是一個黑洞(只負責處理訊息,不給任何返回)、代理(將訊息轉發給其他節點)或是服務(會發送返回資訊)。

如果你要求DEALER能夠進行回覆,那就要保證只有一個ROUTER連線到DEALER,因為DEALER並不知道哪個特定的節點在聯絡它,如果有多個節點,它會做負載均衡,將訊息分發出去。但如果DEALER是一個黑洞,那就可以連線任何數量的節點。

ROUTER-DEALER路由可以用來做什麼呢?如果DEALER會將它完成任務的時間回覆給ROUTER,那ROUTER就可以知道這個DEALER的處理速度有多快了。因為ROUTER和DEALER都是非同步的套接字,所以我們要用zmq_poll()來處理這種情況。

下面例子中的兩個DEALER不會返回訊息給ROUTER,我們的路由採用加權隨機演算法:傳送兩倍多的資訊給其中的一個DEALER。

5

rtdealer.c

//
// 自定義ROUTER-DEALER路由
//
// 這個例項是單個程序,這樣方便啟動。
// 每個執行緒都有自己的ZMQ上下文,所以可以認為是多個程序在執行。
//
#include "zhelpers.h"
#include <pthread.h>

// 這裡定義了兩個worker,其程式碼是一樣的。
//
static void *
worker_task_a (void *args)
{
    void *context = zmq_init (1);
    void *worker = zmq_socket (context, ZMQ_DEALER);
    zmq_setsockopt (worker, ZMQ_IDENTITY, "A", 1);
    zmq_connect (worker, "ipc://routing.ipc");

    int total = 0;
    while (1) {
        // 我們只接受到訊息的第二部分
        char *request = s_recv (worker);
        int finished = (strcmp (request, "END") == 0);
        free (request);
        if (finished) {
            printf ("A received: %d\n", total);
            break;
        }
        total++;
    }
    zmq_close (worker);
    zmq_term (context);
    return NULL;
}

static void *
worker_task_b (void *args)
{
    void *context = zmq_init (1);
    void *worker = zmq_socket (context, ZMQ_DEALER);
    zmq_setsockopt (worker, ZMQ_IDENTITY, "B", 1);
    zmq_connect (worker, "ipc://routing.ipc");

    int total = 0;
    while (1) {
        // 我們只接受到訊息的第二部分
        char *request = s_recv (worker);
        int finished = (strcmp (request, "END") == 0);
        free (request);
        if (finished) {
            printf ("B received: %d\n", total);
            break;
        }
        total++;
    }
    zmq_close (worker);
    zmq_term (context);
    return NULL;
}

int main (void)
{
    void *context = zmq_init (1);
    void *client = zmq_socket (context, ZMQ_ROUTER);
    zmq_bind (client, "ipc://routing.ipc");

    pthread_t worker;
    pthread_create (&worker, NULL, worker_task_a, NULL);
    pthread_create (&worker, NULL, worker_task_b, NULL);

    // 等待執行緒連線至套接字,否則我們傳送的訊息將不能被正確路由
    sleep (1);

    // 傳送10個任務,給A兩倍多的量
    int task_nbr;
    srandom ((unsigned) time (NULL));
    for (task_nbr = 0; task_nbr < 10; task_nbr++) {
        // 傳送訊息的兩個部分:第一部分是目標地址
        if (randof (3) > 0)
            s_sendmore (client, "A");
        else
            s_sendmore (client, "B");

        // 然後是任務
        s_send (client, "This is the workload");
    }
    s_sendmore (client, "A");
    s_send (client, "END");

    s_sendmore (client, "B");
    s_send (client, "END");

    zmq_close (client);
    zmq_term (context);
    return 0;
}

對上述程式碼的兩點說明:

  • ROUTER並不知道DEALER何時會準備好,我們可以用訊號機制來解決,但為了不讓這個例子太過複雜,我們就用sleep(1)的方式來處理。如果沒有這句話,那ROUTER一開始發出的訊息將無法被路由,ØMQ會丟棄這些訊息。
  • 需要注意的是,除了ROUTER會丟棄無法路由的訊息外,PUB套接字當沒有SUB連線它時也會丟棄傳送出去的訊息。其他套接字則會將無法傳送的訊息儲存起來,直到有節點來處理它們。

在將訊息路由給DEALER時,我們手工建立了這樣一個信封:

6

ROUTER套接字會移除第一幀,只將第二幀的內容傳遞給相應的DEALER。當DEALER傳送訊息給ROUTER時,只會傳送一幀,ROUTER會在外層包裹一個信封(新增第一幀),返回給我們。

如果你定義了一個非法的信封地址,ROUTER會直接丟棄該訊息,不作任何提示。對於這一點我們也無能為力,因為出現這種情況只有兩種可能,一是要送達的目標節點不復存在了,或是程式中錯誤地指定了目標地址。如何才能知道訊息會被正確地路由?唯一的方法是讓路由目標傳送一些反饋訊息給我們。後面幾章會講述這一點。

DEALER的工作方式就像是PUSH和PULL的結合。但是,我們不能用PULL或PUSH去構建請求-應答模式。

最近最少使用演算法路由(LRU模式)

我們之前講過REQ套接字永遠是對話的發起方,然後等待對方回答。這一特性可以讓我們能夠保持多個REQ套接字等待調配。換句話說,REQ套接字會告訴我們它已經準備好了。

你可以將ROUTER和多個REQ相連,請求-應答的過程如下:

  • REQ傳送訊息給ROUTER
  • ROUTER返回訊息給REQ
  • REQ傳送訊息給ROUTER
  • ROUTER返回訊息給REQ

和DEALER相同,REQ只能和一個ROUTER連線,除非你想做類似多路冗餘路由這樣的事(我甚至不想在這裡解釋),其複雜度會超過你的想象並迫使你放棄的。

7

ROUTER-REQ模式可以用來做什麼?最常用的做法就是最近最少使用演算法(LRU)路由了,ROUTER發出的請求會讓等待最久的REQ來處理。請看示例:

//
// 自定義ROUTER-REQ路由
//
#include "zhelpers.h"
#include <pthread.h>

#define NBR_WORKERS 10

static void *
worker_task(void *args) {
    void *context = zmq_init(1);
    void *worker = zmq_socket(context, ZMQ_REQ);

    // s_set_id()函式會根據套接字生成一個可列印的字串,
    // 並以此作為該套接字的標識。 
    s_set_id(worker);
    zmq_connect(worker, "ipc://routing.ipc");

    int total = 0;
    while (1) {
        // 告訴ROUTER我已經準備好了
        s_send(worker, "ready");

        // 從ROUTER中獲取工作,直到收到結束的資訊
        char *workload = s_recv(worker);
        int finished = (strcmp(workload, "END") == 0);
        free(workload);
        if (finished) {
            printf("Processed: %d tasks\n", total);
            break;
        }
        total++;

        // 隨機等待一段時間
        s_sleep(randof(1000) + 1);
    }
    zmq_close(worker);
    zmq_term(context);
    return NULL;
}

int main(void) {
    void *context = zmq_init(1);
    void *client = zmq_socket(context, ZMQ_ROUTER);
    zmq_bind(client, "ipc://routing.ipc");
    srandom((unsigned) time(NULL));

    int worker_nbr;
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
        pthread_t worker;
        pthread_create(&worker, NULL, worker_task, NULL);
    }
    int task_nbr;
    for (task_nbr = 0; task_nbr < NBR_WORKERS * 10; task_nbr++) {
        // 最近最少使用的worker就在訊息佇列中
        char *address = s_recv(client);
        char *empty = s_recv(client);
        free(empty);
        char *ready = s_recv(client);
        free(ready);

        s_sendmore(client, address);
        s_sendmore(client, "");
        s_send(client, "This is the workload");
        free(address);
    }
    // 通知所有REQ套接字結束工作
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
        char *address = s_recv(client);
        char *empty = s_recv(client);
        free(empty);
        char *ready = s_recv(client);
        free(ready);

        s_sendmore(client, address);
        s_sendmore(client, "");
        s_send(client, "END");
        free(address);
    }
    zmq_close(client);
    zmq_term(context);
    return 0;
}

在這個示例中,實現LRU演算法並沒有用到特別的資料結構,因為ØMQ的訊息佇列機制已經提供了等價的實現。一個更為實際的LRU演算法應該將已準備好的worker收集起來,儲存在一個佇列中進行分配。以後我們會講到這個例子。

程式的執行結果會將每個worker的執行次數打印出來。由於REQ套接字會隨機等待一段時間,而我們也沒有做負載均衡,所以我們希望看到的是每個worker執行相近的工作量。這也是程式執行的結果。

Processed: 8 tasks
Processed: 8 tasks
Processed: 11 tasks
Processed: 7 tasks
Processed: 9 tasks
Processed: 11 tasks
Processed: 14 tasks
Processed: 11 tasks
Processed: 11 tasks
Processed: 10 tasks

關於以上程式碼的幾點說明:

  • 我們不需要像前一個例子一樣等待一段時間,因為REQ套接字會明確告訴ROUTER它已經準備好了。

  • 我們使用了zhelpers.h提供的s_set_id()函式來為套接字生成一個可列印的字串標識,這是為了讓例子簡單一些。在現實環境中,REQ套接字都是匿名的,你需要直接呼叫zmq_recv()和zmq_send()來處理訊息,因為s_recv()和s_send()只能處理字串標識的套接字。

  • 更糟的是,我們使用了隨機的標識,不要在現實環境中使用隨機標識的持久套接字,這樣做會將節點消耗殆盡。

  • 如果你只是將上面的程式碼拷貝過來,沒有充分理解,那你就像是看到蜘蛛人從屋頂上飛下來,你也照著做了,後果自負吧。

在將訊息路由給REQ套接字時,需要注意一定的格式,即地址-空幀-訊息:

8

使用地址進行路由

在經典的請求-應答模式中,ROUTER一般不會和REP套接字通訊,而是由DEALER去和REP通訊。DEALER會將訊息隨機分發給多個REP,並獲得結果。ROUTER更適合和REQ套接字通訊。

我們應該記住,ØMQ的經典模型往往是執行得最好的,畢竟人走得多的路往往是條好路,如果不按常理出牌,那很有可能會跌入無敵深潭。下面我們就將ROUTER和REP進行連線,看看會發生什麼。

REP套接字有兩個特點:

  • 它需要完成完整的請求-應答週期;
  • 它可以接受任意大小的信封,並能完整地返回該信封。

在一般的請求-應答模式中,REP是匿名的,可以隨時替換。因為我們這裡在將自定義路由,就要做到將一條訊息傳送給REP A,而不是REP B。這樣才能保證網路的一端是你,另一端是特定的REP。

ØMQ的核心理念之一是周邊的節點應該儘可能的智慧,且數量眾多,而中介軟體則是固定和簡單的。這就意味著周邊節點可以向其他特定的節點發送訊息,比如可以連線到一個特定的REP。這裡我們先不討論如何在多個節點之間進行路由,只看最後一步中ROUTER如何和特定的REP通訊的。

9

這張圖描述了以下事件:

  • client有一條訊息,將來會通過另一個ROUTER將該訊息傳送回去。這條資訊包含了兩個地址、一個空幀、以及訊息內容;
  • client將該條訊息傳送給了ROUTER,並指定了REP的地址;
  • ROUTER將該地址移去,並以此決定其下哪個REP可以獲得該訊息;
  • REP收到該條包含地址、空幀、以及內容的訊息;
  • REP將空幀之前的所有內容移去,交給worker去處理訊息;
  • worker處理完成後將回復交給REP;
  • REP將之前儲存好的信封包裹住該條回覆,併發送給ROUTER;
  • ROUTER在該條回覆上又添加了一個註明REP的地址的幀。

這個過程看起來很複雜,但還是有必要取了解清楚的。只要記住,REP套接字會原封不動地將信封返回回去。

rtpapa.c

//
//  自定義ROUTER-REP路由
//
#include "zhelpers.h"

//  這裡使用一個程序來強調事件發生的順序性
int main (void) 
{
    void *context = zmq_init (1);

    void *client = zmq_socket (context, ZMQ_ROUTER);
    zmq_bind (client, "ipc://routing.ipc");

    void *worker = zmq_socket (context, ZMQ_REP);
    zmq_setsockopt (worker, ZMQ_IDENTITY, "A", 1);
    zmq_connect (worker, "ipc://routing.ipc");

    //  等待worker連線
    sleep (1);

    //  傳送REP的標識、地址、空幀、以及訊息內容
    s_sendmore (client, "A");
    s_sendmore (client, "address 3");
    s_sendmore (client, "address 2");
    s_sendmore (client, "address 1");
    s_sendmore (client, "");
    s_send     (client, "This is the workload");

    //  worker只會得到訊息內容
    s_dump (worker);

    //  worker不需要處理信封
    s_send (worker, "This is the reply");

    //  看看ROUTER裡收到了什麼
    s_dump (client);

    zmq_close (client);
    zmq_close (worker);
    zmq_term (context);
    return 0;
}

執行結果

----------------------------------------
[020] This is the workload
----------------------------------------
[001] A
[009] address 3
[009] address 2
[009] address 1
[000]
[017] This is the reply

關於以上程式碼的幾點說明:

  • 在現實環境中,ROUTER和REP套接字處於不同的節點。本例沒有啟用多程序,為的是讓事件的發生順序更為清楚。

  • zmq_connect()並不是瞬間完成的,REP和ROUTER連線的時候是會花費一些時間的。在現實環境中,ROUTER無從得知REP是否已經連線成功了,除非得到REP的某些迴應。本例中使用sleep(1)來處理這一問題,如果不這樣做,那REP將無法獲得訊息(自己嘗試一下吧)。

  • 我們使用REP的套接字標識來進行路由,如果你不信,可以將訊息傳送給B,看看A能不能收到。

  • 本例中的s_dump()等函式來自於zhelpers.h檔案,可以看到在進行套接字連線時程式碼都是一樣的,所以我們才能在ØMQ API的基礎上搭建上層的API。等今後我們討論到複雜應用程式的時候再詳細說明。

要將訊息路由給REP,我們需要建立它能辨別的信封:

10

請求-應答模式下的訊息代理

這一節我們將對如何使用ØMQ訊息信封做一個回顧,並嘗試編寫一個通用的訊息代理裝置。我們會建立一個佇列裝置來連線多個client和worker,裝置的路由演算法可以由我們自己決定。這裡我們選擇最近最少使用演算法,因為這和負載均衡一樣比較實用。

首先讓我們回顧一下經典的請求-應答模型,嘗試用它建立一個不斷增長的巨型服務網路。最基本的請求-應答模型是:

11

這個模型支援多個REP套接字,但如果我們想支援多個REQ套接字,就需要增加一箇中間件,它通常是ROUTER和DEALER的結合體,簡單將兩個套接字之間的資訊進行搬運,因此可以用現成的ZMQ_QUEUE裝置來實現:

+--------+  +--------+  +--------+
| Client |  | Client |  | Client |
+--------+  +--------+  +--------+
|  REQ   |  |  REQ   |  |  REQ   |
+---+----+  +---+----+  +---+----+
    |           |           |
    +-----------+-----------+
                |
            +---+----+
            | ROUTER |
            +--------+
            | Device |
            +--------+
            | DEALER |
            +---+----+
                |
    +-----------+-----------+
    |           |           |
+---+----+  +---+----+  +---+----+
|  REP   |  |  REP   |  |  REP   |
+--------+  +--------+  +--------+
| Worker |  | Worker |  | Worker |
+--------+  +--------+  +--------+


Figure # - Stretched request-reply

這種結構的關鍵在於,ROUTER會將訊息來自哪個REQ記錄下來,生成一個信封。DEALER和REP套接字在傳輸訊息的過程中不會丟棄或更改信封的內容,這樣當訊息返回給ROUTER時,它就知道應該傳送給哪個REQ了。這個模型中的REP套接字是匿名的,並沒有特定的地址,所以只能提供同一種服務。

上述結構中,對REP的路由我們使用了DEADER自帶的負載均衡演算法。但是,我們想用LRU演算法來進行路由,這就要用到ROUTER-REP模式:

12

這個ROUTER-ROUTER的LRU佇列不能簡單地在兩個套接字間搬運訊息,以下程式碼會比較複雜,不過在請求-應答模式中複用性很高。

lruqueue.c

//
//  使用LRU演算法的裝置
//  client和worker處於不同的執行緒中
//
#include "zhelpers.h"
#include <pthread.h>

#define NBR_CLIENTS 10
#define NBR_WORKERS 3

//  出隊操作,使用一個可儲存任何型別的陣列實現
#define DEQUEUE(q) memmove (&(q)[0], &(q)[1], sizeof (q) - sizeof (q [0]))

//  使用REQ套接字實現基本的請求-應答模式
//  由於s_send()和s_recv()不能處理0MQ的二進位制套接字標識,
//  所以這裡會生成一個可列印的字串標識。
//
static void *
client_task (void *args)
{
    void *context = zmq_init (1);
    void *client = zmq_socket (context, ZMQ_REQ);
    s_set_id (client);          //  設定可列印的標識
    zmq_connect (client, "ipc://frontend.ipc");

    //  傳送請求並獲取應答資訊
    s_send (client, "HELLO");
    char *reply = s_recv (client);
    printf ("Client: %s\n", reply);
    free (reply);
    zmq_close (client);
    zmq_term (context);
    return NULL;
}

//  worker使用REQ套接字實現LRU演算法
//
static void *
worker_task (void *args)
{
    void *context = zmq_init (1);
    void *worker = zmq_socket (context, ZMQ_REQ);
    s_set_id (worker);          //  設定可列印的標識
    zmq_connect (worker, "ipc://backend.ipc");

    //  告訴代理worker已經準備好
    s_send (worker, "READY");

    while (1) {
        //  將訊息中空幀之前的所有內容(信封)儲存起來,
        //  本例中空幀之前只有一幀,但可以有更多。
        char *address = s_recv (worker);
        char *empty = s_recv (worker);
        assert (*empty == 0);
        free (empty);

        //  獲取請求,併發送回應
        char *request = s_recv (worker);
        printf ("Worker: %s\n", request);
        free (request);

        s_sendmore (worker, address);
        s_sendmore (worker, "");
        s_send     (worker, "OK");
        free (address);
    }
    zmq_close (worker);
    zmq_term (context);
    return NULL;
}

int main (void)
{
    //  準備0MQ上下文和套接字
    void *context = zmq_init (1);
    void *frontend = zmq_socket (context, ZMQ_ROUTER);
    void *backend  = zmq_socket (context, ZMQ_ROUTER);
    zmq_bind (frontend, "ipc://frontend.ipc");
    zmq_bind (backend,  "ipc://backend.ipc");

    int client_nbr;
    for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++) {
        pthread_t client;
        pthread_create (&client, NULL, client_task, NULL);
    }
    int worker_nbr;
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_task, NULL);
    }
    //  LRU邏輯
    //  - 一直從backend中獲取訊息;當有超過一個worker空閒時才從frontend獲取訊息。
    //  - 當woker迴應時,會將該worker標記為已準備好,並轉發woker的迴應給client
    //  - 如果client傳送了請求,就將該請求轉發給下一個worker

    //  存放可用worker的佇列
    int available_workers = 0;
    char *worker_queue [10];

    while (1) {
        zmq_pollitem_t items [] = {
            { backend,  0, ZMQ_POLLIN, 0 },
            { frontend, 0, ZMQ_POLLIN, 0 }
        };
        zmq_poll (items, available_workers? 2: 1, -1);

        //  處理backend中worker的佇列
        if (items [0].revents & ZMQ_POLLIN) {
            //  將worker的地址入隊
            char *worker_addr = s_recv (backend);
            assert (available_workers < NBR_WORKERS);
            worker_queue [available_workers++] = worker_addr;

            //  跳過空幀

            char *empty = s_recv (backend);
            assert (empty [0] == 0);
            free (empty);

            // 第三幀是“READY”或是一個client的地址
            char *client_addr = s_recv (backend);

            //  如果是一個應答訊息,則轉發給client
            if (strcmp (client_addr, "READY") != 0) {
                empty = s_recv (backend);
                assert (empty [0] == 0);
                free (empty);
                char *reply = s_recv (backend);
                s_sendmore (frontend, client_addr);
                s_sendmore (frontend, "");
                s_send     (frontend, reply);
                free (reply);
                if (--client_nbr == 0)
                    break;      //  處理N條訊息後退出
            }
            free (client_addr);
        }
        if (items [1].revents & ZMQ_POLLIN) {
            //  獲取下一個client的請求,交給空閒的worker處理
            //  client請求的訊息格式是:[client地址][空幀][請求內容]
            char *client_addr = s_recv (frontend);
            char *empty = s_recv (frontend);
            assert (empty [0] == 0);
            free (empty);
            char *request = s_recv (frontend);

            s_sendmore (backend, worker_queue [0]);
            s_sendmore (backend, "");
            s_sendmore (backend, client_addr);
            s_sendmore (backend, "");
            s_send     (backend, request);

            free (client_addr);
            free (request);

            //  將該worker的地址出隊
            free (worker_queue [0]);
            DEQUEUE (worker_queue);
            available_workers--;
        }
    }
    zmq_close (frontend);
    zmq_close (backend);
    zmq_term (context);
    return 0;
}

這段程式有兩個關鍵點:1、各個套接字是如何處理信封的;2、LRU演算法。我們先來看信封的格式。

我們知道REQ套接字在傳送訊息時會向頭部新增一個空幀,接收時又會自動移除。我們要做的就是在傳輸訊息時滿足REQ的要求,處理好空幀。另外還要注意,ROUTER會在所有收到的訊息前新增訊息來源的地址。

現在我們就將完整的請求-應答流程走一遍,我們將client套接字的標識設為“CLIENT”,worker的設為“WORKER”。以下是client傳送的訊息:

13

代理從ROUTER中獲取到的訊息格式如下:

14

代理會從LRU佇列中獲取一個空閒woker的地址,作為信封附加在訊息之上,傳送給ROUTER。注意要新增一個空幀。

15

REQ(worker)收到訊息時,會將信封和空幀移去:

16

可以看到,worker收到的訊息和client端ROUTER收到的訊息是一致的。worker需要將該訊息中的信封儲存起來,只對訊息內容做操作。

在返回的過程中:

  • worker通過REQ傳輸給device訊息[client地址][空幀][應答內容];
  • device從worker端的ROUTER中獲取到[worker地址][空幀][client地址][空幀][應答內容];
  • device將worker地址儲存起來,併發送[client地址][空幀][應答內容]給client端的ROUTER;
  • client從REQ中獲得到[應答內容]。

然後再看看LRU演算法,它要求client和worker都使用REQ套接字,並正確的儲存和返回訊息信封,具體如下:

  • 建立一組poll,不斷地從backend(worker端的ROUTER)獲取訊息;只有當有空閒的worker時才從frontend(client端的ROUTER)獲取訊息;

  • 迴圈執行poll

  • 如果backend有訊息,只有兩種情況:1)READY訊息(該worker已準備好,等待分配);2)應答訊息(需要轉發給client)。兩種情況下我們都會儲存worker的地址,放入LRU佇列中,如果有應答內容,則轉發給相應的client。

  • 如果frontend有訊息,我們從LRU佇列中取出下一個worker,將該請求傳送給它。這就需要傳送[worker地址][空幀][client地址][空幀][請求內容]到worker端的ROUTER。

我們可以對該演算法進行擴充套件,如在worker啟動時做一個自我測試,計算出自身的處理速度,並隨READY訊息傳送給代理,這樣代理在分配工作時就可以做相應的安排。

ØMQ上層API的封裝

使用ØMQ提供的API操作多段訊息時是很麻煩的,如以下程式碼:

while (1) {
    //  將訊息中空幀之前的所有內容(信封)儲存起來,
    //  本例中空幀之前只有一幀,但可以有更多。
    char *address = s_recv (worker);
    char *empty = s_recv (worker);
    assert (*empty == 0);
    free (empty);

    //  獲取請