MULTI,EXEC,DISCARD,WATCH 四個命令是 Redis 事務(wù)的四個基礎(chǔ)命令。其中:
在介紹 Redis 事務(wù)之前,先來展開 Redis 命令隊(duì)列的內(nèi)部實(shí)現(xiàn)。
Redis 允許一個客戶端不間斷執(zhí)行多條命令:發(fā)送 MULTI 后,用戶鍵入多條命令;再發(fā)送 EXEC 即可不間斷執(zhí)行之前輸入的多條命令。因?yàn)?,Redis 是單進(jìn)程單線的工作模式,因此多條命令的執(zhí)行是不會被中斷的。
> MULTI
OK
> INCR foo
QUEUED
> INCR bar
QUEUED
> EXEC
1) (integer) 1
2) (integer) 1
內(nèi)部實(shí)現(xiàn)不難:Redis 服務(wù)器收到來自客戶端的 MULTI 命令后,為客戶端保存一個命令隊(duì)列結(jié)構(gòu)體,直到收到 EXEC 后才開始執(zhí)行命令隊(duì)列中的命令。
下面是命令隊(duì)列的數(shù)據(jù)結(jié)構(gòu):
// 命令結(jié)構(gòu)體,命令隊(duì)列專用
/* Client MULTI/EXEC state */
typedef struct multiCmd {
// 命令參數(shù)
robj **argv;
// 參數(shù)個數(shù)
int argc;
// 命令結(jié)構(gòu)體,包含了與命令相關(guān)的參數(shù),譬如命令執(zhí)行函數(shù)
// 如需更詳細(xì)了解,參看redis.c 中的redisCommandTable 全局參數(shù)
struct redisCommand *cmd;
} multiCmd;
// 命令隊(duì)列結(jié)構(gòu)體
typedef struct multiState {
// 命令隊(duì)列
multiCmd *commands; /* Array of MULTI commands */
// 命令的個數(shù)
int count; /* Total number of MULTI commands */
// 以下兩個參數(shù)暫時沒有用到,和主從復(fù)制有關(guān)
int minreplicas; /* MINREPLICAS for synchronous replication */
time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
} multiState;
通由上面給出的 Redis 客戶端操作,來看看 Redis 服務(wù)器的狀態(tài)變化:
> MULTI
OK
> INCR foo
QUEUED
> INCR bar
QUEUED
> EXEC
1) (integer) 1
2) (integer) 1
http://wiki.jikexueyuan.com/project/redis/images/r4.png" alt="" />
processCommand() 函數(shù)中的一段代碼可以窺探命令入隊(duì)的操作:
// 執(zhí)行命令
int processCommand(redisClient *c) {
......
// 加入命令隊(duì)列的情況
/* Exec the command */
if (c->flags & REDIS_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
// 命令入隊(duì)
queueMultiCommand(c);
addReply(c,shared.queued);
// 真正執(zhí)行命令。
// 注意,如果是設(shè)置了多命令模式,那么不是直接執(zhí)行命令,而是讓命令入隊(duì)
} else {
call(c,REDIS_CALL_FULL);
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
return REDIS_OK;
}
稍后再展開事務(wù)執(zhí)行和取消的部分。
Redis 的官方文檔上說,WATCH 命令是為了讓 Redis 擁有 check-and-set(CAS) 的特性。CAS 的意思是,一個客戶端在修改某個值之前,要檢測它是否更改;如果沒有更改,修改操作才能成功。
一個不含 CAS 特性的例子:
http://wiki.jikexueyuan.com/project/redis/images/r5.png" alt="" />
含有 CAS 特性的例子:
http://wiki.jikexueyuan.com/project/redis/images/r6.png" alt="" />
在后一個例子中,client A 第一次嘗試修改失敗,因?yàn)?client B 修改了 score.client A 失敗過后,再次嘗試修改才成功。Redis 事務(wù)的 CAS 特性借助了鍵值的監(jiān)視。
Redis 數(shù)據(jù)集結(jié)構(gòu)體 redisDB 和客戶端結(jié)構(gòu)體 redisClient 都會保存鍵值監(jiān)視的相關(guān)數(shù)據(jù)。
http://wiki.jikexueyuan.com/project/redis/images/r7.png" alt="" />
監(jiān)視鍵值的過程:
// WATCH 命令執(zhí)行函數(shù)
void watchCommand(redisClient *c) {
int j;
// WATCH 命令不能在MULTI 和EXEC 之間調(diào)用
if (c->flags & REDIS_MULTI) {
addReplyError(c,"WATCH inside MULTI is not allowed");
return;
}
// 監(jiān)視所給出的鍵
for (j = 1; j < c->argc; j++)
watchForKey(c,c->argv[j]);
addReply(c,shared.ok);
}
// 監(jiān)視鍵值函數(shù)
/* Watch for the specified key */
void watchForKey(redisClient *c, robj *key) {
list *clients = NULL;
listIter li;
listNode *ln;
watchedKey *wk;
// 是否已經(jīng)監(jiān)視該鍵值
/* Check if we are already watching for this key */
listRewind(c->watched_keys,&li);
while((ln = listNext(&li))) {
wk = listNodeValue(ln);
if (wk->db == c->db && equalStringObjects(key,wk->key))
return; /* Key already watched */
}
// 獲取監(jiān)視該鍵值的客戶端鏈表
/* This key is not already watched in this DB. Let's add it */
clients = dictFetchValue(c->db->watched_keys,key);
// 如果不存在鏈表,需要新建一個
if (!clients) {
clients = listCreate();
dictAdd(c->db->watched_keys,key,clients);
incrRefCount(key);
}
// 尾插法。將客戶端添加到鏈表尾部
listAddNodeTail(clients,c);
// 將監(jiān)視鍵添加到redisClient.watched_keys 的尾部
/* Add the new key to the list of keys watched by this client */
wk = zmalloc(sizeof(*wk));
wk->key = key;
wk->db = c->db;
incrRefCount(key);
listAddNodeTail(c->watched_keys,wk);
}
當(dāng)客戶端鍵值被修改的時候,監(jiān)視該鍵值的所有客戶端都會被標(biāo)記為 REDISDIRTY-CAS,表示此該鍵值對被修改過,因此如果這個客戶端已經(jīng)進(jìn)入到事務(wù)狀態(tài),它命令隊(duì)列中的命令是不會被執(zhí)行的。
touchWatchedKey() 是標(biāo)記某鍵值被修改的函數(shù),它一般不被 signalModifyKey() 函數(shù)包裝。下面是 touchWatchedKey() 的實(shí)現(xiàn)。
// 標(biāo)記鍵值對的客戶端為REDIS_DIRTY_CAS,表示其所監(jiān)視的數(shù)據(jù)已經(jīng)被修改過
/* "Touch" a key, so that if this key is being WATCHed by some client the
* next EXEC will fail. */
void touchWatchedKey(redisDb *db, robj *key) {
list *clients;
listIter li;
listNode *ln;
// 獲取監(jiān)視key 的所有客戶端
if (dictSize(db->watched_keys) == 0) return;
clients = dictFetchValue(db->watched_keys, key);
if (!clients) return;
// 標(biāo)記監(jiān)視key 的所有客戶端REDIS_DIRTY_CAS
/* Mark all the clients watching this key as REDIS_DIRTY_CAS */
/* Check if we are already watching for this key */
listRewind(clients,&li);
while((ln = listNext(&li))) {
redisClient *c = listNodeValue(ln);
// REDIS_DIRTY_CAS 更改的時候會設(shè)置此標(biāo)記
c->flags |= REDIS_DIRTY_CAS;
}
}
當(dāng)用戶發(fā)出 EXEC 的時候,在它 MULTI 命令之后提交的所有命令都會被執(zhí)行。從代碼的實(shí)現(xiàn)來看,如果客戶端監(jiān)視的數(shù)據(jù)被修改,它會被標(biāo)記 REDIS_DIRTY_CAS,會調(diào)用 discardTransaction() 從而取消該事務(wù)。特別的,用戶開啟一個事務(wù)后會提交多個命令,如果命令在入隊(duì)過程中出現(xiàn)錯誤,譬如提交的命令本身不存在,參數(shù)錯誤和內(nèi)存超額等,都會導(dǎo)致客戶端被標(biāo)記 REDIS_DIRTY_EXEC,被標(biāo)記 REDIS_DIRTY_EXEC 會導(dǎo)致事務(wù)被取消。
因此總結(jié)一下:
下面是執(zhí)行事務(wù)的過程:
// 執(zhí)行事務(wù)內(nèi)的所有命令
void execCommand(redisClient *c) {
int j;
robj **orig_argv;
int orig_argc;
struct redisCommand *orig_cmd;
int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
// 必須設(shè)置多命令標(biāo)記
if (!(c->flags & REDIS_MULTI)) {
addReplyError(c,"EXEC without MULTI");
return;
}
// 停止執(zhí)行事務(wù)命令的情況:
// 1. 被監(jiān)視的數(shù)據(jù)被修改
// 2. 命令隊(duì)列中的命令執(zhí)行失敗
/* Check if we need to abort the EXEC because:
* 1) Some WATCHed key was touched.
* 2) There was a previous error while queueing commands.
* A failed EXEC in the first case returns a multi bulk nil object
* (technically it is not an error but a special behavior), while
* in the second an EXECABORT error is returned. */
if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {
addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :
shared.nullmultibulk);
discardTransaction(c);
goto handle_monitor;
}
// 執(zhí)行隊(duì)列中的所有命令
/* Exec all the queued commands */
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
// 保存當(dāng)前的命令,一般為MULTI,在執(zhí)行完所有的命令后會恢復(fù)。
orig_argv = c->argv;
orig_argc = c->argc;
orig_cmd = c->cmd;
addReplyMultiBulkLen(c,c->mstate.count);
for (j = 0; j < c->mstate.count; j++) {
// 命令隊(duì)列中的命令被賦值給當(dāng)前的命令
c->argc = c->mstate.commands[j].argc;
c->argv = c->mstate.commands[j].argv;
c->cmd = c->mstate.commands[j].cmd;
// 遇到包含寫操作的命令需要將MULTI 命令寫入AOF 文件
/* Propagate a MULTI request once we encounter the first write op.
* This way we'll deliver the MULTI/..../EXEC block as a whole and
* both the AOF and the replication link will have the same consistency
* and atomicity guarantees. */
if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) {
execCommandPropagateMulti(c);
must_propagate = 1;
}
// 調(diào)用call() 執(zhí)行
call(c,REDIS_CALL_FULL);
// 這幾句是多余的
/* Commands may alter argc/argv, restore mstate. */
c->mstate.commands[j].argc = c->argc;
c->mstate.commands[j].argv = c->argv;
c->mstate.commands[j].cmd = c->cmd;
}
// 恢復(fù)當(dāng)前的命令,一般為MULTI
c->argv = orig_argv;
c->argc = orig_argc;
c->cmd = orig_cmd;
// 事務(wù)已經(jīng)執(zhí)行完畢,清理與此事務(wù)相關(guān)的信息,如命令隊(duì)列和客戶端標(biāo)記
discardTransaction(c);
/* Make sure the EXEC command will be propagated as well if MULTI
* was already propagated. */
if (must_propagate) server.dirty++;
......
}
如上所說,被監(jiān)視的鍵值被修改或者命令入隊(duì)出錯都會導(dǎo)致事務(wù)被取消:
// 取消事務(wù)
void discardTransaction(redisClient *c) {
// 清空命令隊(duì)列
freeClientMultiState(c);
// 初始化命令隊(duì)列
initClientMultiState(c);
// 取消標(biāo)記flag
c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC);;
unwatchAllKeys(c);
}
你可能已經(jīng)注意到「事務(wù)」這個詞。在學(xué)習(xí)數(shù)據(jù)庫原理的時候有提到過事務(wù)的 ACID,即原子性、一致性、隔離性、持久性。接下來,看看 Redis 事務(wù)是否支持 ACID。
原子性,即一個事務(wù)中的所有操作,要么全部完成,要么全部不完成,不會結(jié)束在中間某個環(huán)節(jié)。Redis 事務(wù)不支持原子性,最明顯的是 Redis 不支持回滾操作。一致性,在事務(wù)開始之前和事務(wù)結(jié)束以后,數(shù)據(jù)庫的完整性沒有被破壞。這一點(diǎn),Redis 事務(wù)能夠保證。
隔離性,當(dāng)兩個或者多個事務(wù)并發(fā)訪問(此處訪問指查詢和修改的操作)數(shù)據(jù)庫的同一數(shù)據(jù)時所表現(xiàn)出的相互關(guān)系。Redis 不存在多個事務(wù)的問題,因?yàn)?Redis 是單進(jìn)程單線程的工作模式。
持久性,在事務(wù)完成以后,該事務(wù)對數(shù)據(jù)庫所作的更改便持久地保存在數(shù)據(jù)庫之中,并且是完全的。Redis 提供兩種持久化的方式,即 RDB 和 AOF。RDB 持久化只備份當(dāng)前內(nèi)存中的數(shù)據(jù)集,事務(wù)執(zhí)行完畢時,其數(shù)據(jù)還在內(nèi)存中,并未立即寫入到磁盤,所以 RDB 持久化不能保證 Redis 事務(wù)的持久性。再來討論 AOF 持久化,我在《深入剖析 Redis AOF 持久化策略》中討論過:Redis AOF 有后臺執(zhí)行和邊服務(wù)邊備份兩種方式。后臺執(zhí)行和 RDB 持久化類似,只能保存當(dāng)前內(nèi)存中的數(shù)據(jù)集;邊備份邊服務(wù)的方式中,因?yàn)?Redis 只是每間隔 2s 才進(jìn)行一次備份,因此它的持久性也是不完整的!
當(dāng)然,我們可以自己修改源碼保證 Redis 事務(wù)的持久性,這不難。
還有一個亮點(diǎn),就是 check-and-set CAS。一個修改操作不斷的判斷X 值是否已經(jīng)被修改,直到 X 值沒有被其他操作修改,才設(shè)置新的值。Redis 借助 WATCH/MULTI 命令來實(shí)現(xiàn) CAS 操作的。
實(shí)際操作中,多個線程嘗試修改一個全局變量,通常我們會用鎖,從讀取這個變量的時候就開始鎖住這個資源從而阻擋其他線程的修改,修改完畢后才釋放鎖,這是悲觀鎖的做法。相對應(yīng)的有一種樂觀鎖,樂觀鎖假定其他用戶企圖修改你正在修改的對象的概率很小,直到提交變更的時候才加鎖,讀取和修改的情況都不加鎖。一般情況下,不同客戶端會訪問修改不同的鍵值對,因此一般 check 一次就可以 set 了,而不需要重復(fù) check 多次。