redis網路通訊模組設計與實現分析
redis的通訊模組封裝得非常簡單易用,可以直接用到自己專案中,學習下也是很有價值的。
本文基於redis原始碼4.0.1寫成,redis原始碼下載:github.com/antirez/red…
檔案結構
redis的網路通訊模組由8個檔案構成
作用如下檔案 | 作用 |
---|---|
ae.c | 統一epoll、evport、kqueue、select網路事件處理介面,函式實現 |
ae.h | 統一epoll、evport、kqueue、select網路事件處理介面,函式原型,共享結構體定義 |
ae_epoll.c | 封裝epoll網路事件處理庫到統一的介面 |
ae_evport.c | 封裝evport網路事件處理庫到統一的介面 |
ae_kqueue.c | 封裝kqueue網路事件處理庫到統一的介面 |
ae_select.c | 封裝select網路事件處理庫到統一的介面 |
統一網路庫底層介面
被統一的網路事件處理介面如下,參考:ae_epoll.c,ae_evport.c,ae_kqueue.c,ae_select.c
介面 | 作用 |
---|---|
aeApiState | 底層網路庫需要的fd、事件等資料共享結構體 |
aeApiCreate | 建立網路控制程式碼,比如epoll控制程式碼,在這個基礎上才能進行網路事件的監聽處理 |
aeApiResize | 修改網路庫存放事件的容器大小,就是修改aeApiState結構體的events陣列的大小 |
aeApiFree | 刪除aeApiState這個共享結構體 |
aeApiAddEvent | 將網路fd的讀寫操作交給網路庫進行處理,比如給這個epoll進行處理 |
aeApiDelEvent | 從網路操作庫中刪除對某個fd的監聽,一般伺服器往客戶端寫完資料後,主動斷開客戶端連線時會使用 |
aeApiPoll | 輪詢獲取網路正在發生io讀寫事件的事件 |
aeApiName | 獲取底層網路庫的名字,比如epool,kqueue等 |
統一網路庫上層介面
參考 ae.h,ae.c,封裝通用的結構體和函式
網路讀寫事件aeFileEvent結構體
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
複製程式碼
這個結構體封裝了網路資料讀寫事件的處理函式原型,和客戶端傳過來的資料。
aeFileProc是處理函式的原型的巨集定義,巨集定義參考如下:
typedef void aeFileProc(
struct aeEventLoop *eventLoop,int fd,void *clientData,int mask
);
複製程式碼
實際使用時,網路讀寫事件的函式處理需要自行編寫
定時事件aeTimeEvent結構體
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *next;
} aeTimeEvent;
複製程式碼
這個定時事件是redis用來處理後臺定時任務使用的,比如處理客戶端連線超時,服務端斷開操作
活躍的fd
/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
複製程式碼
就是正在等待讀、正在等待寫的網路fd,一般以一個陣列出現
mask欄位表示是等待讀,還是等待寫,對應的值如下
#define AE_READABLE 1
#define AE_WRITABLE 2
複製程式碼
網路庫上層介面核心結構體aeEventLoop
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
複製程式碼
核心欄位如下 * 事件列表*events,index表示fd,表示網路fd的讀寫事件發生時,執行的回撥函式 * *fired 正在等待讀和寫的fd列表,通過輪詢得到,比如epoll通過epoll_wait得到的 * beforesleep 在epoll_wait前執行 * aftersleep 在epoll_wait後執行
操作aeEventLoop相關函式
主要是對aeEventLoop結構體進行操作,這裡拿幾個核心的函式舉例
aeCreateEventLoop()
這個函式主要是建立aeEventLoop結構體,並根據setsize引數給events,fired分配內容空間 然後再呼叫aeApiCreate驅動底層的網路庫,比如epoll的epoll_create(),然後儲存到aeEventLoop的apidata屬性上
aeMain()
主要是啟動無限迴圈、輪詢處理活躍的fd,如果epoll,底層則呼叫epoll_wait獲取可讀寫的fd:ae.c::aeMain() -> ae.c::aeProcessEvents() -> ae_epoll.c::aeApiPool() -> ae_epoll.c::epoll_wait()
aeCeateFileEvent()
註冊網路fd到底層網路庫上,並註冊當讀寫事件發生時,需要執行的業務回撥函式 拿epoll舉例,呼叫到epoll經過的步驟:ae.c::aeCreateFileEvent() -> ae_epoll.c::aeApiAddEvent() -> epoll_ctl()
anet.h & anet.c
主要是封裝socket操作,遮蔽系統底層socket操作的差異性,提供更好用的api,比如建立tcp server等
這裡就拿建立tcp server來看看它的流程
anetTcpServer()
anet.c::anetTcpServer() -> anet.c::_anetTcpServer() -> <sys/socket.h>::socket() -> anet.c::anetListen() -> <sys/socket.h>::listen()
核心程式碼就是呼叫系統socket庫的listen函式建立起了一個tcp的server
示例
這裡使用ae.h和anet.h來建立一個簡單的tcp伺服器,並把給客戶端傳回一個"Hello World"
程式碼如下:
#include <stdio.h>
#include <zconf.h>
#include "../src/ae.h"
#include "../src/anet.h"
char myerr[ANET_ERR_LEN] = {0};
void acceptFd(struct aeEventLoop *eventLoop,void *clientdata,int mask) {
char myerr[ANET_ERR_LEN] = {0};
printf("獲取客戶端連線的fd.\n");
char ip[20] = {0};
int port = 0;
int clientfd = anetTcpAccept(myerr,fd,ip,sizeof(ip),&port);
if (clientfd == AE_ERR) {
printf("獲取客戶端連線的fd異常!! \n");
return;
}
printf("客戶端ip %s port %d \n",port);
int ret = anetNonBlock(myerr,clientfd);
if (ret == ANET_OK) {
printf("客戶端事件非阻塞處理設定成功\n\n");
}
anetEnableTcpNoDelay(myerr,clientfd);
write(clientfd,"Hello Client!\n",14);
}
int main() {
aeEventLoop *eventLoop = aeCreateEventLoop(1024);
if(!eventLoop){
return 1;
}
int fd = anetTcpServer(myerr,8080,"0.0.0.0",511);
if (fd != ANET_ERR) {
anetNonBlock(NULL,fd);
if (aeCreateFileEvent(eventLoop,AE_READABLE,acceptFd,NULL)) {
printf("註冊tcp伺服器接收客戶端連線的事件處理函式異常");
}
}
printf("伺服器在 0.0.0.0:8080 埠監聽了! \n");
aeMain(eventLoop);
aeDeleteEventLoop(eventLoop);
return 0;
}
複製程式碼
然後進行編譯並啟動,編譯可以使用gcc,效果如下
gcc -o myserver src/ae.c src/anet.c src/zmalloc.c myserver.c
複製程式碼
可以使用telnet進行測試
可以看到客戶端已經成功收到伺服器傳回的Hello World了一些注意的點
Mac OS 10.14.5下如果無法找到c的標準庫,需要安裝這個檔案:/Library/Developer/CommandLineTools/Packages/macOS_SDK_headers_for_macOS_10.14.pkg