1. 程式人生 > 程式設計 >twitter公司redis&memcached中介軟體twemproxy原始碼分析(一)

twitter公司redis&memcached中介軟體twemproxy原始碼分析(一)

twitter公司redis&memcached中介軟體twemproxy原始碼分析(一)

twemproxy是redis和memcached連線池中介軟體

github專案地址:github.com/twitter/twe…

專案簡介參考:github.com/twitter/twe…

檔案參考:github.com/twitter/twe…

核心流程

主流程就是啟動了一個事件迴圈,所有的邏輯通過事件出發呼叫回撥函式執行

訊息流轉流程

訊息會在三種角色的fd中進行流轉:client,server,proxy

角色 作用
client 代表應用程式端接入twemproxy的fd
proxy 代表twemproxy伺服器的fd,等待應用程式端接入
server 代表twemproxy連結redis/memcache的fd,proxy fd處理程式會把從client fd讀到的資料轉到server fd中

核心流程如下:

事件系統:epoll/kqueue/evport封裝

一般linux使用epoll, mac使用kqueue,Sun Solaris使用evport

twemproxy為了統一底層api呼叫,對上面的三種事件處理api進行了介面的統一: event/nc_event.h,核心api參考:

函式 作用
event_base_create() 建立事件迴圈管理fd
event_add_in() 將一個fd的讀事件納入事件管理器的管理中
event_add_out() 將一個fd的讀寫事件納入事件管理器的管理中
event_add_conn() 將一個fd的讀寫事件納入事件管理器的管理中
event_wait() 從事件管理器中獲取準備好讀/寫的fd列表,並呼叫cb進行處理

核心結構體event_base參考: src/event/nc_event.h

struct event_base {
    int                ep;      /* epoll descriptor */
    struct epoll_event
*event;
/* event[] - events that were triggered int nevent; /* # event */ event_cb_t cb; /* event callback */ }; 複製程式碼
欄位 作用
ep 事件管理器fd
struct epoll_event 這個事件管理器可以處理的事件型別(讀、寫、hup訊號)
int nevent 最大取多少個事件
event_cb_t cb 事件處理回撥函式

twemproxy中大量使用事件回撥來驅動資料流轉,上面的event_cb_t就是一個回撥函式

typedef int (*event_cb_t)(void *,uint32_t);
複製程式碼

核心資料結構和演演算法

array 變長陣列

struct array {
    uint32_t nelem;  /* # element */
    void     *elem;  /* element */
    size_t   size;   /* element size */
    uint32_t nalloc; /* # allocated element */
};
複製程式碼

通過調整nelem和*elem指標內容實現動態陣列

string 變長字串

struct string {
    uint32_t len;   /* string length */
    uint8_t  *data; /* string data */
};
複製程式碼

紅黑樹

struct rbnode {
    struct rbnode *left;     /* left link */
    struct rbnode *right;    /* right link */
    struct rbnode *parent;   /* parent link */
    int64_t       key;       /* key for ordering */
    void          *data;     /* opaque data */
    uint8_t       color;     /* red | black */
};
複製程式碼

一致性hash

參考:src/hashkit/nc_ketama.c

佇列

twemproxy使用的佇列封裝支援數種不同型別的佇列

參考:src/nc_queue.h,這個佇列檔案來自freebsd linx核心:github.com/freebsd/fre…

配置檔案解析

twemproxy使用yaml格式的配置檔案,使用libyaml庫解析這個yaml格式的配置檔案,參考:github.com/yaml/libyam…

twemproxy配置檔案解析邏輯參考:src/nc_conf.h src/nc_conf.c

協議抽象介面

twemproxy支援memcache,redis兩種協議,抽象成介面可以把這個協議從twemproxy主流程中解耦、這樣也利於新增新的協議支援

memcached

void memcache_parse_req(struct msg *r);
void memcache_parse_rsp(struct msg *r);
bool memcache_failure(struct msg *r);
void memcache_pre_coalesce(struct msg *r);
void memcache_post_coalesce(struct msg *r);
rstatus_t memcache_add_auth(struct context *ctx,struct conn *c_conn,struct conn *s_conn);
rstatus_t memcache_fragment(struct msg *r,uint32_t ncontinuum,struct msg_tqh *frag_msgq);
rstatus_t memcache_reply(struct msg *r);
void memcache_post_connect(struct context *ctx,struct conn *conn,struct server *server);
void memcache_swallow_msg(struct conn *conn,struct msg *pmsg,struct msg *msg);
複製程式碼

redis

void redis_parse_req(struct msg *r);
void redis_parse_rsp(struct msg *r);
bool redis_failure(struct msg *r);
void redis_pre_coalesce(struct msg *r);
void redis_post_coalesce(struct msg *r);
rstatus_t redis_add_auth(struct context *ctx,struct conn *s_conn);
rstatus_t redis_fragment(struct msg *r,struct msg_tqh *frag_msgq);
rstatus_t redis_reply(struct msg *r);
void redis_post_connect(struct context *ctx,struct server *server);
void redis_swallow_msg(struct conn *conn,struct msg *msg);
複製程式碼

可以看到redis和memcached協議被統一封裝成了兩組對等的介面,對應實現參考:nc_memcache.c nc_redis.c

監控系統

twemproxy使用單獨的執行緒處理監控資料獲取請求,參考src/nc_stats.c::stats_start_aggregator

static rstatus_t
stats_start_aggregator(struct stats *st)
{
    rstatus_t status;
    // error handle...
    status = stats_listen(st);
    // error handle...

    status = pthread_create(&st->tid,NULL,stats_loop,st);
    // error handle...
    return NC_OK;
}
複製程式碼

這個監控執行緒和主執行緒間使用結構體共享監控資料,主執行緒往這個結構體中寫入實時監控資料,監控執行緒接到客戶端獲取監控資料的請求後,從這個結構體中讀取監控資料輸出給客戶端,核心結構體參考:src/nc_stats.h

typedef enum stats_type {
    // ...
} stats_type_t;

struct stats_metric {
    // ...
};

struct stats_server {
    // ...
};

struct stats_pool {
    // ...
};

struct stats_buffer {
    // ...
};

struct stats {
    // ...
};
複製程式碼

使用命令curl -s 127.0.0.1:22222 | jq .可以檢視實時的監控資訊

訊號處理

參考:src/nc_signal.h src/nc_signal.c

static struct signal signals[] = {
    { SIGUSR1,"SIGUSR1",0,signal_handler },{ SIGUSR2,"SIGUSR2",{ SIGTTIN,"SIGTTIN",{ SIGTTOU,"SIGTTOU",{ SIGHUP,"SIGHUP",{ SIGINT,"SIGINT",{ SIGSEGV,"SIGSEGV",(int)SA_RESETHAND,{ SIGPIPE,"SIGPIPE",SIG_IGN },{ 0,NULL }
};
複製程式碼

可以看到twemproxy對多個訊號進行回撥:signal_handler

void
signal_handler(int signo)
{
    //...

    switch (signo) {
    case SIGUSR1:
        break;

    case SIGUSR2:
        break;

    case SIGTTIN:
        actionstr = ",up logging level";
        action = log_level_up;
        break;

    case SIGTTOU:
        actionstr = ",down logging level";
        action = log_level_down;
        break;

    case SIGHUP:
        actionstr = ",reopening log file";
        action = log_reopen;
        break;

    case SIGINT:
        done = true;
        actionstr = ",exiting";
        break;

    case SIGSEGV:
        log_stacktrace();
        actionstr = ",core dumping";
        raise(SIGSEGV);
        break;

    default:
        NOT_REACHED();
    }

    log_safe("signal %d (%s) received%s",signo,sig->signame,actionstr);

    if (action != NULL) {
        action();
    }

    if (done) {
        exit(1);
    }
}
複製程式碼

可以看到使用SIGTTIN/訊號增加日誌輸出級別,SIGTTOU降低日誌輸入級別,SIGHUP重新開啟日誌檔案,SIGINT退出程式,SIGSEGV輸出記憶體core dump檔案

一些注意的點

本文基於twemproxy 0.4.1寫成

儘管codis目前已經成為redis中介軟體的事實選擇,但是twemproxy作為第一款memcache&redis中介軟體,仍具有比較大學習價值,並且支援memcache

參考資料

  1. man7.org/linux/man-p…
  2. blog.just4fun.site/command-too…
  3. unix.stackexchange.com/questions/1…
  4. www.jianshu.com/p/83ac498e1…