Redis 提供在線數(shù)據(jù)遷移的能力,把自身的數(shù)據(jù)往其他 Redis 服務器上遷移。如果需要將部分數(shù)據(jù)遷移到另一臺Redis 服務器上,這個命令會非常有用。
redis migraiton 的實現(xiàn)比較簡單。首先將需要遷移的命令打包好,發(fā)送到指定的 Redis 服務器上,回復ok 后則刪除本地的鍵值對。
這里面用了前面講到的 rio:讀寫對象既可以是文件也可以是內(nèi)存,只需要安裝相應的讀寫函數(shù)即可。這里不難理解。
網(wǎng)絡傳輸部分,用到了 Redis 內(nèi)部的 syncio 模塊,syncio 即同步 io,每讀/寫入一部分數(shù)據(jù)會用 IO 多路復用的技術等待下一次可讀寫/的機會。在 migrateCommand() 的實現(xiàn)中,先用非阻塞的方式建立一個連接,接著將打包好的遷移數(shù)據(jù)發(fā)送到目標 Redis 服務器上,并等待目標 Redis 服務器的相應。
下面通過 migrateCommand() 來了解數(shù)據(jù)遷移是如何實現(xiàn)的:
/* MIGRATE host port key dbid timeout */
void migrateCommand(redisClient *c) {
int fd;
long timeout;
long dbid;
long long ttl = 0, expireat;
robj *o;
rio cmd, payload;
// 準備需要遷移的數(shù)據(jù),這個數(shù)據(jù)可以由客戶端來指定
......
// 建立一個非阻塞連接
/* Connect */
fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
atoi(c->argv[2]->ptr));
if (fd == -1) {
addReplyErrorFormat(c,"Can't connect to target node: %s",
server.neterr);
return;
}
// 等待建立成功
if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
return;
}
// rio 的讀寫可以對應的是文件讀寫,也可以是內(nèi)存的讀寫,只需要安裝相應的讀寫
// 函數(shù)
// 初始化一塊空的sds buffer
/* Create RESTORE payload and generate the protocol to call the command. */
rioInitWithBuffer(&cmd,sdsempty());
// 把需要遷移的數(shù)據(jù)打包追加到
......
// 可以指定過期時間
expireat = getExpire(c->db,c->argv[3]);
if (expireat != -1) {
ttl = expireat-mstime();
if (ttl < 1) ttl = 1;
}
// 生成restore 命令
......
// 生成包含 Redis 版本和校驗字段的 payload
/* Finally the last argument that is the serailized object payload
* in the DUMP format. */
createDumpPayload(&payload,o);
// 寫入到遷移內(nèi)容中
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr)));
sdsfree(payload.io.buffer.ptr);
// 將遷移數(shù)據(jù)送往目標 Redis 服務器
/* Tranfer the query to the other node in 64K chunks. */
{
sds buf = cmd.io.buffer.ptr;
size_t pos = 0, towrite;
int nwritten = 0;
// 最多只傳送64K
while ((towrite = sdslen(buf)-pos) > 0) {
towrite = (towrite > (64*1024) ? (64*1024) : towrite);
// 同步寫
nwritten = syncWrite(fd,buf+pos,towrite,timeout);
if (nwritten != (signed)towrite) goto socket_wr_err;
pos += nwritten;
}
}
// 讀取目標 Redis 服務器的回復
/* Read back the reply. */
{
char buf1[1024];
char buf2[1024];
/* Read the two replies */
if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
goto socket_rd_err;
if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
goto socket_rd_err;
if (buf1[0] == '-' || buf2[0] == '-') {
addReplyErrorFormat(c,"Target instance replied with error: %s",
(buf1[0] == '-') ? buf1+1 : buf2+1);
} else {
// 回復內(nèi)容正常,說明數(shù)據(jù)已經(jīng)遷移成功,刪除原始 Redis 服務器的key-value
robj *aux;
dbDelete(c->db,c->argv[3]);
signalModifiedKey(c->db,c->argv[3]);
addReply(c,shared.ok);
server.dirty++;
// 將變更更新到從機,并寫入AOF 文件
/* Translate MIGRATE as DEL for replication/AOF. */
aux = createStringObject("DEL",3);
rewriteClientCommandVector(c,2,aux,c->argv[3]);
decrRefCount(aux);
}
}
// 一些清理工作和錯誤處理
......
}
migrateCommand() 只是數(shù)據(jù)遷移的一部分代碼,目標機器還要負責將數(shù)據(jù)存儲到目標機器上,有興趣可以參考 restoreCommand() 的實現(xiàn),基本上和 migrateCommand() 是逆過來的。