
Redis Design and Implementation (Chapter 10) -- AOF Persistence


Overview


Besides RDB, Redis also provides AOF persistence. Unlike RDB, which saves the database's key-value pairs, AOF records the database state by logging the write commands the database executes. When AOF is enabled, the server checks at startup whether an AOF file exists; if it does, the server loads the AOF file, otherwise it loads the RDB file. This chapter covers:

1. AOF implementation

2. Loading and restoring the AOF file

3. AOF rewriting and its implementation

AOF Implementation

AOF persistence is implemented in three steps: command appending, file writing, and file synchronization.

Command Appending

When AOF persistence is on, after the server finishes executing a write command it appends that command, in protocol format, to the end of the aof_buf buffer. The relevant fields appear in the redisServer structure:

/* AOF persistence */
    int aof_state;                  /* REDIS_AOF_(ON|OFF|WAIT_REWRITE) */
    int aof_fsync;                  /* Kind of fsync() policy */
    char *aof_filename;             /* Name of the AOF file */
    int aof_no_fsync_on_rewrite;    /* Don't fsync if a rewrite is in prog. */
    int aof_rewrite_perc;           /* Rewrite AOF if % growth is > M and... */
    off_t aof_rewrite_min_size;     /* the AOF file is at least N bytes. */
    off_t aof_rewrite_base_size;    /* AOF size on latest startup or rewrite. */
    off_t aof_current_size;         /* AOF current size. */
    int aof_rewrite_scheduled;      /* Rewrite once BGSAVE terminates. */
    pid_t aof_child_pid;            /* PID if rewriting process */
    list *aof_rewrite_buf_blocks;   /* Hold changes during an AOF rewrite. */
    sds aof_buf;                    /* AOF buffer, written before entering the event loop (note that its type is sds) */
    int aof_fd;                     /* File descriptor of currently selected AOF file */
    int aof_selected_db;            /* Currently selected DB in AOF */

For example, when a client executes `set key value`, the server, after running the command, translates it into the protocol format and appends it to the end of the buffer.


File Writing and Synchronization

The Redis server is essentially an event loop: file events handle client requests and replies, while time events run functions that must execute periodically, such as serverCron.

While processing file events the server may execute write commands, some of which get appended to the AOF buffer. So before finishing an event-loop iteration, the server calls the flushAppendOnlyFile function to decide whether the buffer contents need to be written and synced to the AOF file. Its source is as follows:

void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    mstime_t latency;

    if (sdslen(server.aof_buf) == 0) return;  // nothing to flush if the buffer is empty

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)  // check the configured fsync policy
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponing, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall through, and go write since we can't wait
             * over two seconds. */
            server.aof_delayed_fsync++;
            redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }

    /* ... the rest of the function (omitted in this excerpt) writes aof_buf
     * to the AOF file and fsyncs it according to the configured policy ... */
}

The `appendfsync` option (server.aof_fsync) has three settings, with AOF_FSYNC_EVERYSEC as the default: AOF_FSYNC_ALWAYS (fsync after every write command, safest but slowest), AOF_FSYNC_EVERYSEC (fsync once per second in a background thread), and AOF_FSYNC_NO (never fsync explicitly; let the OS decide when to flush).

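These policies are selected in redis.conf. A minimal fragment showing the relevant directives (all of them are real redis.conf options):

```conf
appendonly yes
# appendfsync controls when Redis fsyncs the AOF to disk:
#   always   - fsync after every write command (safest, slowest)
#   everysec - fsync once per second in a background thread (default)
#   no       - let the OS decide when to flush (fastest, least safe)
appendfsync everysec
# Skip fsync while a rewrite child is running (maps to aof_no_fsync_on_rewrite):
no-appendfsync-on-rewrite no
```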

Loading the AOF File

Once an AOF file has been generated, the server loads it preferentially at startup. How does the loading work?

First, the Redis server forks a fake client with no network connection (commands can only be executed through a client, and as described earlier the AOF file stores executed commands). It then reads and parses one command from the AOF file and executes it through the fake client, looping over read, parse, execute until the file is exhausted.


AOF Rewriting

As mentioned above, the AOF file stores the executed commands. Suppose a client runs four commands: set msg 1, set msg 2, set msg 3, set msg 4. The final value of msg is 4, so only the last command actually needs to be kept, yet the implementation described so far stores all four. Because of this mechanism the AOF file keeps growing, and many commands fall into this pattern. An oversized file not only burdens the host but also lengthens loading and restore times, so the AOF file must be rewritten to reduce its size. In the case above, a single command suffices after the rewrite. So how is the rewrite implemented?

Implementation

Although this feature is called AOF rewriting, it performs no reads, analysis, or writes on the existing AOF file. Instead, it is implemented by reading the server's current database state.

Take the set msg commands above: the AOF holds four of them, but the fastest approach is not to analyze the AOF file; it is to read the current value of msg (4) straight from the database and write it to the new AOF file as a single set msg 4. Lists, hashes, and the other types are handled the same way, so after the rewrite only one command per key remains. The source is as follows:

int rewriteAppendOnlyFile(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    rio aof;
    FILE *fp;
    char tmpfile[256];
    int j;
    long long now = mstime();
    char byte;
    size_t processed = 0;

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return REDIS_ERR;
    }

    server.aof_child_diff = sdsempty();
    rioInitWithFile(&aof,fp);
    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
    for (j = 0; j < server.dbnum; j++) {
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);
        if (!di) {
            fclose(fp);
            return REDIS_ERR;
        }

        /* SELECT the new DB */
        if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
        if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;

        /* Iterate this DB writing every entry */
        while((de = dictNext(di)) != NULL) {
            sds keystr;
            robj key, *o;
            long long expiretime;

            keystr = dictGetKey(de);
            o = dictGetVal(de);
            initStaticStringObject(key,keystr);

            expiretime = getExpire(db,&key);

            /* If this key is already expired skip it */
            if (expiretime != -1 && expiretime < now) continue;

            /* Save the key and associated value */
            if (o->type == REDIS_STRING) {
                /* Emit a SET command */
                char cmd[]="*3\r\n$3\r\nSET\r\n";
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                /* Key and value */
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkObject(&aof,o) == 0) goto werr;
            } else if (o->type == REDIS_LIST) {
                if (rewriteListObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_SET) {
                if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_ZSET) {
                if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_HASH) {
                if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
            } else {
                redisPanic("Unknown object type");
            }
            /* Save the expire time */
            if (expiretime != -1) {
                char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
            }
            /* Read some diff from the parent process from time to time. */
            if (aof.processed_bytes > processed+1024*10) {
                processed = aof.processed_bytes;
                aofReadDiffFromParent();
            }
        }
        dictReleaseIterator(di);
        di = NULL;
    }

    /* Do an initial slow fsync here while the parent is still sending
     * data, in order to make the next final fsync faster. */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;

    /* Read again a few times to get more data from the parent.
     * We can't read forever (the server may receive data from clients
     * faster than it is able to send data to the child), so we try to read
     * some more data in a loop as soon as there is a good chance more data
     * will come. If it looks like we are wasting time, we abort (this
     * happens after 20 ms without new data). */
    int nodata = 0;
    mstime_t start = mstime();
    while(mstime()-start < 1000 && nodata < 20) {
        if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
        {
            nodata++;
            continue;
        }
        nodata = 0; /* Start counting from zero, we stop on N *contiguous*
                       timeouts. */
        aofReadDiffFromParent();
    }

    /* Ask the master to stop sending diffs. */
    if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
    if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
        goto werr;
    /* We read the ACK from the server using a 10 seconds timeout. Normally
     * it should reply ASAP, but just in case we lose its reply, we are sure
     * the child will eventually get terminated. */
    if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
        byte != '!') goto werr;
    redisLog(REDIS_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");

    /* Read the final diff if any. */
    aofReadDiffFromParent();

    /* Write the received diff to the file. */
    redisLog(REDIS_NOTICE,
        "Concatenating %.2f MB of AOF diff received from parent.",
        (double) sdslen(server.aof_child_diff) / (1024*1024));
    if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
        goto werr;

    /* Make sure data will not remain on the OS's output buffers */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }
    redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
    return REDIS_OK;

werr:
    redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
    fclose(fp);
    unlink(tmpfile);
    if (di) dictReleaseIterator(di);
    return REDIS_ERR;
}

During an AOF rewrite, a child process performs the rewriting so the server can keep handling other requests. This creates a problem: while the rewrite is in progress, other clients may send commands that modify the database state, leaving the server's current state inconsistent with the state saved in the rewritten AOF file. For example:

Suppose the database holds only k1 when the rewrite starts. While the child is rewriting, the keys k2, k3, and k4 are created; the state captured by the rewritten AOF (k1) then disagrees with the current database state (k1, k2, k3, k4).


To solve this, Redis maintains an AOF rewrite buffer: whenever the server executes a write command while a rewrite is in progress, it appends the command to both the AOF buffer and the AOF rewrite buffer.

When the child finishes the AOF rewrite, it sends a signal to the parent, which then calls a handler that does the following:

1. Writes the contents of the AOF rewrite buffer into the new AOF file, so that the database state it saves matches the current database state;

2. Renames the new AOF file, atomically replacing the existing AOF file and completing the swap.

