1. 程式人生 > 程式設計 >redis網路通訊模組設計與實現分析

redis網路通訊模組設計與實現分析

redis的通訊模組封裝得非常簡單易用,可以直接用到自己專案中,學習下也是很有價值的。

本文基於redis原始碼4.0.1寫成,redis原始碼下載:github.com/antirez/red…

檔案結構

redis的網路通訊模組由8個檔案構成

作用如下

檔案 作用
ae.c 統一epoll、evport、kqueue、select網路事件處理介面,函式實現
ae.h 統一epoll、evport、kqueue、select網路事件處理介面,函式原型,共享結構體定義
ae_epoll.c 封裝epoll網路事件處理庫到統一的介面
ae_evport.c 封裝evport網路事件處理庫到統一的介面
ae_kqueue.c 封裝kqueue網路事件處理庫到統一的介面
ae_select.c 封裝select網路事件處理庫到統一的介面

統一網路庫底層介面

被統一的網路事件處理介面如下,參考:ae_epoll.c,ae_evport.c,ae_kqueue.c,ae_select.c

介面 作用
aeApiState 底層網路庫需要的fd、事件等資料共享結構體
aeApiCreate 建立網路控制程式碼,比如epoll控制程式碼,在這個基礎上才能進行網路事件的監聽處理
aeApiResize 修改網路庫存放事件的容器大小,就是修改aeApiState結構體的events陣列的大小
aeApiFree 刪除aeApiState這個共享結構體
aeApiAddEvent 將網路fd的讀寫操作交給網路庫進行處理,比如給這個epoll進行處理
aeApiDelEvent 從網路操作庫中刪除對某個fd的監聽,一般伺服器往客戶端寫完資料後,主動斷開客戶端連線時會使用
aeApiPoll 輪詢獲取網路正在發生io讀寫事件的事件
aeApiName 獲取底層網路庫的名字,比如epool,kqueue等

統一網路庫上層介面

參考 ae.h,ae.c,封裝通用的結構體和函式

網路讀寫事件aeFileEvent結構體

/* File event structure */
typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE) */ aeFileProc *rfileProc; aeFileProc *wfileProc; void *clientData; } aeFileEvent; 複製程式碼

這個結構體封裝了網路資料讀寫事件的處理函式原型,和客戶端傳過來的資料。

aeFileProc是處理函式的原型的巨集定義,巨集定義參考如下:

typedef void aeFileProc(
    struct aeEventLoop *eventLoop,int fd,void *clientData,int mask
);
複製程式碼

實際使用時,網路讀寫事件的函式處理需要自行編寫

定時事件aeTimeEvent結構體

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;
複製程式碼

這個定時事件是redis用來處理後臺定時任務使用的,比如處理客戶端連線超時,服務端斷開操作

活躍的fd

/* A fired event */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;
複製程式碼

就是正在等待讀、正在等待寫的網路fd,一般以一個陣列出現

mask欄位表示是等待讀,還是等待寫,對應的值如下

#define AE_READABLE 1
#define AE_WRITABLE 2
複製程式碼

網路庫上層介面核心結構體aeEventLoop

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;
複製程式碼

核心欄位如下 * 事件列表*events,index表示fd,表示網路fd的讀寫事件發生時,執行的回撥函式 * *fired 正在等待讀和寫的fd列表,通過輪詢得到,比如epoll通過epoll_wait得到的 * beforesleep 在epoll_wait前執行 * aftersleep 在epoll_wait後執行

操作aeEventLoop相關函式

主要是對aeEventLoop結構體進行操作,這裡拿幾個核心的函式舉例

aeCreateEventLoop()

這個函式主要是建立aeEventLoop結構體,並根據setsize引數給events,fired分配內容空間 然後再呼叫aeApiCreate驅動底層的網路庫,比如epoll的epoll_create(),然後儲存到aeEventLoop的apidata屬性上

aeMain()

主要是啟動無限迴圈、輪詢處理活躍的fd,如果epoll,底層則呼叫epoll_wait獲取可讀寫的fd:ae.c::aeMain() -> ae.c::aeProcessEvents() -> ae_epoll.c::aeApiPool() -> ae_epoll.c::epoll_wait()

aeCeateFileEvent()

註冊網路fd到底層網路庫上,並註冊當讀寫事件發生時,需要執行的業務回撥函式 拿epoll舉例,呼叫到epoll經過的步驟:ae.c::aeCreateFileEvent() -> ae_epoll.c::aeApiAddEvent() -> epoll_ctl()

anet.h & anet.c

主要是封裝socket操作,遮蔽系統底層socket操作的差異性,提供更好用的api,比如建立tcp server等

這裡就拿建立tcp server來看看它的流程

anetTcpServer()

anet.c::anetTcpServer() -> anet.c::_anetTcpServer() -> <sys/socket.h>::socket() -> anet.c::anetListen() -> <sys/socket.h>::listen()

核心程式碼就是呼叫系統socket庫的listen函式建立起了一個tcp的server

示例

這裡使用ae.h和anet.h來建立一個簡單的tcp伺服器,並把給客戶端傳回一個"Hello World"

程式碼如下:

#include <stdio.h>
#include <zconf.h>
#include "../src/ae.h"
#include "../src/anet.h"

char myerr[ANET_ERR_LEN] = {0};
void acceptFd(struct aeEventLoop *eventLoop,void *clientdata,int mask) {
    char myerr[ANET_ERR_LEN] = {0};
    printf("獲取客戶端連線的fd.\n");
    char ip[20] = {0};
    int port = 0;
    int clientfd = anetTcpAccept(myerr,fd,ip,sizeof(ip),&port);

    if (clientfd == AE_ERR) {
        printf("獲取客戶端連線的fd異常!! \n");
        return;
    }

    printf("客戶端ip %s port %d \n",port);

    int ret = anetNonBlock(myerr,clientfd);
    if (ret == ANET_OK) {
        printf("客戶端事件非阻塞處理設定成功\n\n");
    }
    anetEnableTcpNoDelay(myerr,clientfd);
    write(clientfd,"Hello Client!\n",14);
}

int main() {
    aeEventLoop *eventLoop = aeCreateEventLoop(1024);
    if(!eventLoop){
        return 1;
    }
    int fd = anetTcpServer(myerr,8080,"0.0.0.0",511);

    if (fd != ANET_ERR) {
        anetNonBlock(NULL,fd);
        if (aeCreateFileEvent(eventLoop,AE_READABLE,acceptFd,NULL)) {
            printf("註冊tcp伺服器接收客戶端連線的事件處理函式異常");
        }
    }

    printf("伺服器在 0.0.0.0:8080 埠監聽了! \n");
    aeMain(eventLoop);
    aeDeleteEventLoop(eventLoop);
    return 0;
}
複製程式碼

然後進行編譯並啟動,編譯可以使用gcc,效果如下

gcc -o myserver src/ae.c src/anet.c src/zmalloc.c myserver.c
複製程式碼

可以使用telnet進行測試

可以看到客戶端已經成功收到伺服器傳回的Hello World了

一些注意的點

Mac OS 10.14.5下如果無法找到c的標準庫,需要安裝這個檔案:/Library/Developer/CommandLineTools/Packages/macOS_SDK_headers_for_macOS_10.14.pkg

參考資料

  1. blog.51cto.com/leejia/1021…
  2. blog.csdn.net/yusiguyuan/…