鍍金池/ 教程/ 大數(shù)據(jù)/ Redis 數(shù)據(jù)遷移
Redis 數(shù)據(jù)淘汰機制
積分排行榜
小剖 Memcache
Redis 數(shù)據(jù)結(jié)構 intset
分布式鎖
從哪里開始讀起,怎么讀
Redis 數(shù)據(jù)結(jié)構 dict
不在浮沙筑高臺
Redis 集群(上)
Redis 監(jiān)視器
源碼閱讀工具
Redis 日志和斷言
內(nèi)存數(shù)據(jù)管理
Redis 數(shù)據(jù)結(jié)構綜述
源碼日志
Web 服務器存儲 session
消息中間件
Redis 與 Lua 腳本
什么樣的源代碼適合閱讀
Redis 數(shù)據(jù)結(jié)構 sds
Memcached slab 分配策略
訂閱發(fā)布機制
Redis 是如何提供服務的
Redis 事務機制
Redis 集群(下)
主從復制
Redis 應用
RDB 持久化策略
Redis 數(shù)據(jù)遷移
Redis 事件驅(qū)動詳解
初探 Redis
Redis 與 Memcache
AOF 持久化策略
Redis 數(shù)據(jù)結(jié)構 redisOb
作者簡介
Redis 數(shù)據(jù)結(jié)構 ziplist
Redis 數(shù)據(jù)結(jié)構 skiplist
Redis 哨兵機制

Redis 數(shù)據(jù)遷移

Redis 提供在線數(shù)據(jù)遷移的能力,把自身的數(shù)據(jù)往其他 Redis 服務器上遷移。如果需要將部分數(shù)據(jù)遷移到另一臺Redis 服務器上,這個命令會非常有用。

redis migraiton 的實現(xiàn)比較簡單。首先將需要遷移的命令打包好,發(fā)送到指定的 Redis 服務器上,回復ok 后則刪除本地的鍵值對。

這里面用了前面講到的 rio:讀寫對象既可以是文件也可以是內(nèi)存,只需要安裝相應的讀寫函數(shù)即可。這里不難理解。

網(wǎng)絡傳輸部分,用到了 Redis 內(nèi)部的 syncio 模塊,syncio 即同步 io,每讀/寫入一部分數(shù)據(jù)會用 IO 多路復用的技術等待下一次可讀寫/的機會。在 migrateCommand() 的實現(xiàn)中,先用非阻塞的方式建立一個連接,接著將打包好的遷移數(shù)據(jù)發(fā)送到目標 Redis 服務器上,并等待目標 Redis 服務器的相應。

下面通過 migrateCommand() 來了解數(shù)據(jù)遷移是如何實現(xiàn)的:

/* MIGRATE host port key dbid timeout */
void migrateCommand(redisClient *c) {
    int fd;
    long timeout;
    long dbid;
    long long ttl = 0, expireat;
    robj *o;
    rio cmd, payload;
    // 準備需要遷移的數(shù)據(jù),這個數(shù)據(jù)可以由客戶端來指定
    ......
    // 建立一個非阻塞連接
    /* Connect */
    fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
    atoi(c->argv[2]->ptr));
    if (fd == -1) {
        addReplyErrorFormat(c,"Can't connect to target node: %s",
        server.neterr);
        return;
    }
    // 等待建立成功
    if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
        addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
        return;
    }
    // rio 的讀寫可以對應的是文件讀寫,也可以是內(nèi)存的讀寫,只需要安裝相應的讀寫
    // 函數(shù)
    // 初始化一塊空的sds buffer
    /* Create RESTORE payload and generate the protocol to call the command. */
    rioInitWithBuffer(&cmd,sdsempty());
    // 把需要遷移的數(shù)據(jù)打包追加到
    ......
    // 可以指定過期時間
    expireat = getExpire(c->db,c->argv[3]);
    if (expireat != -1) {
        ttl = expireat-mstime();
    if (ttl < 1) ttl = 1;
    }
    // 生成restore 命令
    ......
    // 生成包含 Redis 版本和校驗字段的 payload
    /* Finally the last argument that is the serailized object payload
    * in the DUMP format. */
    createDumpPayload(&payload,o);
    // 寫入到遷移內(nèi)容中
    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,
    sdslen(payload.io.buffer.ptr)));
    sdsfree(payload.io.buffer.ptr);
    // 將遷移數(shù)據(jù)送往目標 Redis 服務器
    /* Tranfer the query to the other node in 64K chunks. */
    {
    sds buf = cmd.io.buffer.ptr;
    size_t pos = 0, towrite;
    int nwritten = 0;
    // 最多只傳送64K
    while ((towrite = sdslen(buf)-pos) > 0) {
        towrite = (towrite > (64*1024) ? (64*1024) : towrite);
        // 同步寫
        nwritten = syncWrite(fd,buf+pos,towrite,timeout);
    if (nwritten != (signed)towrite) goto socket_wr_err;
        pos += nwritten;
        }
    }
    // 讀取目標 Redis 服務器的回復
    /* Read back the reply. */
    {
    char buf1[1024];
    char buf2[1024];
    /* Read the two replies */
    if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
        goto socket_rd_err;
    if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
        goto socket_rd_err;
    if (buf1[0] == '-' || buf2[0] == '-') {
        addReplyErrorFormat(c,"Target instance replied with error: %s",
        (buf1[0] == '-') ? buf1+1 : buf2+1);
    } else {
        // 回復內(nèi)容正常,說明數(shù)據(jù)已經(jīng)遷移成功,刪除原始 Redis 服務器的key-value
        robj *aux;
        dbDelete(c->db,c->argv[3]);
        signalModifiedKey(c->db,c->argv[3]);
        addReply(c,shared.ok);
        server.dirty++;
        // 將變更更新到從機,并寫入AOF 文件
        /* Translate MIGRATE as DEL for replication/AOF. */
        aux = createStringObject("DEL",3);
        rewriteClientCommandVector(c,2,aux,c->argv[3]);
        decrRefCount(aux);
        }
    }
    // 一些清理工作和錯誤處理
    ......
}

migrateCommand() 只是數(shù)據(jù)遷移的一部分代碼,目標機器還要負責將數(shù)據(jù)存儲到目標機器上,有興趣可以參考 restoreCommand() 的實現(xiàn),基本上和 migrateCommand() 是逆過來的。

上一篇:小剖 Memcache下一篇:Redis 哨兵機制