twitter公司redis&memcached中介軟體twemproxy原始碼分析(一)
twitter公司redis&memcached中介軟體twemproxy原始碼分析(一)
twemproxy是redis和memcached連線池中介軟體
github專案地址:github.com/twitter/twe…
專案簡介參考:github.com/twitter/twe…
核心流程
主流程就是啟動了一個事件迴圈,所有的邏輯通過事件出發呼叫回撥函式執行
訊息流轉流程
訊息會在三種角色的fd中進行流轉:client,server,proxy
角色 | 作用 |
---|---|
client | 代表應用程式端接入twemproxy的fd |
proxy | 代表twemproxy伺服器的fd,等待應用程式端接入 |
server | 代表twemproxy連結redis/memcache的fd,proxy fd處理程式會把從client fd讀到的資料轉到server fd中 |
核心流程如下:
事件系統:epoll/kqueue/evport封裝
一般linux使用epoll, mac使用kqueue,Sun Solaris使用evport
twemproxy為了統一底層api呼叫,對上面的三種事件處理api進行了介面的統一: event/nc_event.h,核心api參考:
函式 | 作用 |
---|---|
event_base_create() | 建立事件迴圈管理fd |
event_add_in() | 將一個fd的讀事件納入事件管理器的管理中 |
event_add_out() | 將一個fd的讀寫事件納入事件管理器的管理中 |
event_add_conn() | 將一個fd的讀寫事件納入事件管理器的管理中 |
event_wait() | 從事件管理器中獲取準備好讀/寫的fd列表,並呼叫cb進行處理 |
核心結構體event_base參考: src/event/nc_event.h
struct event_base {
int ep; /* epoll descriptor */
struct epoll_event *event; /* event[] - events that were triggered
int nevent; /* # event */
event_cb_t cb; /* event callback */
};
複製程式碼
欄位 | 作用 |
---|---|
ep | 事件管理器fd |
struct epoll_event | 這個事件管理器可以處理的事件型別(讀、寫、hup訊號) |
int nevent | 最大取多少個事件 |
event_cb_t cb | 事件處理回撥函式 |
twemproxy中大量使用事件回撥來驅動資料流轉,上面的event_cb_t就是一個回撥函式
typedef int (*event_cb_t)(void *,uint32_t);
複製程式碼
核心資料結構和演演算法
array 變長陣列
struct array {
uint32_t nelem; /* # element */
void *elem; /* element */
size_t size; /* element size */
uint32_t nalloc; /* # allocated element */
};
複製程式碼
通過調整nelem和*elem指標內容實現動態陣列
string 變長字串
struct string {
uint32_t len; /* string length */
uint8_t *data; /* string data */
};
複製程式碼
紅黑樹
struct rbnode {
struct rbnode *left; /* left link */
struct rbnode *right; /* right link */
struct rbnode *parent; /* parent link */
int64_t key; /* key for ordering */
void *data; /* opaque data */
uint8_t color; /* red | black */
};
複製程式碼
一致性hash
參考:src/hashkit/nc_ketama.c
佇列
twemproxy使用的佇列封裝支援數種不同型別的佇列
參考:src/nc_queue.h,這個佇列檔案來自freebsd linx核心:github.com/freebsd/fre…
配置檔案解析
twemproxy使用yaml格式的配置檔案,使用libyaml庫解析這個yaml格式的配置檔案,參考:github.com/yaml/libyam…
twemproxy配置檔案解析邏輯參考:src/nc_conf.h src/nc_conf.c
協議抽象介面
twemproxy支援memcache,redis兩種協議,抽象成介面可以把這個協議從twemproxy主流程中解耦、這樣也利於新增新的協議支援
memcached
void memcache_parse_req(struct msg *r);
void memcache_parse_rsp(struct msg *r);
bool memcache_failure(struct msg *r);
void memcache_pre_coalesce(struct msg *r);
void memcache_post_coalesce(struct msg *r);
rstatus_t memcache_add_auth(struct context *ctx,struct conn *c_conn,struct conn *s_conn);
rstatus_t memcache_fragment(struct msg *r,uint32_t ncontinuum,struct msg_tqh *frag_msgq);
rstatus_t memcache_reply(struct msg *r);
void memcache_post_connect(struct context *ctx,struct conn *conn,struct server *server);
void memcache_swallow_msg(struct conn *conn,struct msg *pmsg,struct msg *msg);
複製程式碼
redis
void redis_parse_req(struct msg *r);
void redis_parse_rsp(struct msg *r);
bool redis_failure(struct msg *r);
void redis_pre_coalesce(struct msg *r);
void redis_post_coalesce(struct msg *r);
rstatus_t redis_add_auth(struct context *ctx,struct conn *s_conn);
rstatus_t redis_fragment(struct msg *r,struct msg_tqh *frag_msgq);
rstatus_t redis_reply(struct msg *r);
void redis_post_connect(struct context *ctx,struct server *server);
void redis_swallow_msg(struct conn *conn,struct msg *msg);
複製程式碼
可以看到redis和memcached協議被統一封裝成了兩組對等的介面,對應實現參考:nc_memcache.c nc_redis.c
監控系統
twemproxy使用單獨的執行緒處理監控資料獲取請求,參考src/nc_stats.c::stats_start_aggregator
static rstatus_t
stats_start_aggregator(struct stats *st)
{
rstatus_t status;
// error handle...
status = stats_listen(st);
// error handle...
status = pthread_create(&st->tid,NULL,stats_loop,st);
// error handle...
return NC_OK;
}
複製程式碼
這個監控執行緒和主執行緒間使用結構體共享監控資料,主執行緒往這個結構體中寫入實時監控資料,監控執行緒接到客戶端獲取監控資料的請求後,從這個結構體中讀取監控資料輸出給客戶端,核心結構體參考:src/nc_stats.h
typedef enum stats_type {
// ...
} stats_type_t;
struct stats_metric {
// ...
};
struct stats_server {
// ...
};
struct stats_pool {
// ...
};
struct stats_buffer {
// ...
};
struct stats {
// ...
};
複製程式碼
使用命令curl -s 127.0.0.1:22222 | jq .
可以檢視實時的監控資訊
訊號處理
參考:src/nc_signal.h src/nc_signal.c
static struct signal signals[] = {
{ SIGUSR1,"SIGUSR1",0,signal_handler },{ SIGUSR2,"SIGUSR2",{ SIGTTIN,"SIGTTIN",{ SIGTTOU,"SIGTTOU",{ SIGHUP,"SIGHUP",{ SIGINT,"SIGINT",{ SIGSEGV,"SIGSEGV",(int)SA_RESETHAND,{ SIGPIPE,"SIGPIPE",SIG_IGN },{ 0,NULL }
};
複製程式碼
可以看到twemproxy對多個訊號進行回撥:signal_handler
void
signal_handler(int signo)
{
//...
switch (signo) {
case SIGUSR1:
break;
case SIGUSR2:
break;
case SIGTTIN:
actionstr = ",up logging level";
action = log_level_up;
break;
case SIGTTOU:
actionstr = ",down logging level";
action = log_level_down;
break;
case SIGHUP:
actionstr = ",reopening log file";
action = log_reopen;
break;
case SIGINT:
done = true;
actionstr = ",exiting";
break;
case SIGSEGV:
log_stacktrace();
actionstr = ",core dumping";
raise(SIGSEGV);
break;
default:
NOT_REACHED();
}
log_safe("signal %d (%s) received%s",signo,sig->signame,actionstr);
if (action != NULL) {
action();
}
if (done) {
exit(1);
}
}
複製程式碼
可以看到使用SIGTTIN/訊號增加日誌輸出級別,SIGTTOU降低日誌輸入級別,SIGHUP重新開啟日誌檔案,SIGINT退出程式,SIGSEGV輸出記憶體core dump檔案
一些注意的點
本文基於twemproxy 0.4.1寫成
儘管codis目前已經成為redis中介軟體的事實選擇,但是twemproxy作為第一款memcache&redis中介軟體,仍具有比較大學習價值,並且支援memcache