假設(shè)你在旅游,而且正在一輛在夜間運(yùn)行的火車(chē)上。在夜間,如何在正確的站點(diǎn)下車(chē)呢?一種方法是整晚都要醒著,然后注意到了哪一站。這樣,你就不會(huì)錯(cuò)過(guò)你要到達(dá)的站點(diǎn),但是這樣會(huì)讓你感到很疲倦。另外,你可以看一下時(shí)間表,估計(jì)一下火車(chē)到達(dá)目的地的時(shí)間,然后在一個(gè)稍早的時(shí)間點(diǎn)上設(shè)置鬧鈴,然后你就可以安心的睡會(huì)了。這個(gè)方法聽(tīng)起來(lái)也很不錯(cuò),也沒(méi)有錯(cuò)過(guò)你要下車(chē)的站點(diǎn),但是當(dāng)火車(chē)晚點(diǎn)的時(shí)候,你就要被過(guò)早的叫醒了。當(dāng)然,鬧鐘的電池也可能會(huì)沒(méi)電了,并導(dǎo)致你睡過(guò)站。理想的方式是,無(wú)論是早或晚,只要當(dāng)火車(chē)到站的時(shí)候,有人或其他東西能把你喚醒,就好了。
這和線程有什么關(guān)系呢?好吧,讓我們來(lái)聯(lián)系一下。當(dāng)一個(gè)線程等待另一個(gè)線程完成任務(wù)時(shí),它會(huì)有很多選擇。第一,它可以持續(xù)的檢查共享數(shù)據(jù)標(biāo)志(用于做保護(hù)工作的互斥量),直到另一線程完成工作時(shí)對(duì)這個(gè)標(biāo)志進(jìn)行重設(shè)。不過(guò),就是一種浪費(fèi):線程消耗寶貴的執(zhí)行時(shí)間持續(xù)的檢查對(duì)應(yīng)標(biāo)志,并且當(dāng)互斥量被等待線程上鎖后,其他線程就沒(méi)有辦法獲取鎖,這樣線程就會(huì)持續(xù)等待。因?yàn)橐陨戏绞綄?duì)等待線程限制資源,并且在完成時(shí)阻礙對(duì)標(biāo)識(shí)的設(shè)置。這種情況類(lèi)似與,保持清醒狀態(tài)和列車(chē)駕駛員聊了一晚上:駕駛員不得不緩慢駕駛,因?yàn)槟惴稚⒘怂淖⒁饬Γ曰疖?chē)需要更長(zhǎng)的時(shí)間,才能到站。同樣的,等待的線程會(huì)等待更長(zhǎng)的時(shí)間,這些線程也在消耗著系統(tǒng)資源。
第二個(gè)選擇是在等待線程在檢查間隙,使用std::this_thread::sleep_for()
進(jìn)行周期性的間歇(詳見(jiàn)4.3節(jié)):
bool flag;
std::mutex m;
void wait_for_flag()
{
std::unique_lock<std::mutex> lk(m);
while(!flag)
{
lk.unlock(); // 1 解鎖互斥量
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 2 休眠100ms
lk.lock(); // 3 再鎖互斥量
}
}
這個(gè)循環(huán)中,在休眠前②,函數(shù)對(duì)互斥量進(jìn)行解鎖①,并且在休眠結(jié)束后再對(duì)互斥量進(jìn)行上鎖,所以另外的線程就有機(jī)會(huì)獲取鎖并設(shè)置標(biāo)識(shí)。
這個(gè)實(shí)現(xiàn)就進(jìn)步很多,因?yàn)楫?dāng)線程休眠時(shí),線程沒(méi)有浪費(fèi)執(zhí)行時(shí)間,但是很難確定正確的休眠時(shí)間。太短的休眠和沒(méi)有休眠一樣,都會(huì)浪費(fèi)執(zhí)行時(shí)間;太長(zhǎng)的休眠時(shí)間,可能會(huì)讓任務(wù)等待線程醒來(lái)。休眠時(shí)間過(guò)長(zhǎng)是很少見(jiàn)的情況,因?yàn)檫@會(huì)直接影響到程序的行為,當(dāng)在高節(jié)奏游戲中,它意味著丟幀,或在一個(gè)實(shí)時(shí)應(yīng)用中超越了一個(gè)時(shí)間片。
第三個(gè)選擇(也是優(yōu)先的選擇)是,使用C++標(biāo)準(zhǔn)庫(kù)提供的工具去等待事件的發(fā)生。通過(guò)另一線程觸發(fā)等待事件的機(jī)制是最基本的喚醒方式(例如:流水線上存在額外的任務(wù)時(shí)),這種機(jī)制就稱(chēng)為“條件變量”。從概念上來(lái)說(shuō),一個(gè)條件變量會(huì)與多個(gè)事件或其他條件相關(guān),并且一個(gè)或多個(gè)線程會(huì)等待條件的達(dá)成。當(dāng)某些線程被終止時(shí),為了喚醒等待線程(允許等待線程繼續(xù)執(zhí)行)終止的線程將會(huì)向等待著的線程廣播“條件達(dá)成”的信息。
C++標(biāo)準(zhǔn)庫(kù)對(duì)條件變量有兩套實(shí)現(xiàn):std::condition_variable
和std::condition_variable_any
。這兩個(gè)實(shí)現(xiàn)都包含在<condition_variable>
頭文件的聲明中。兩者都需要與一個(gè)互斥量一起才能工作(互斥量是為了同步);前者僅限于與std::mutex
一起工作,而后者可以和任何滿(mǎn)足最低標(biāo)準(zhǔn)的互斥量一起工作,從而加上了_any的后綴。因?yàn)?code>std::condition_variable_any更加通用,這就可能從體積、性能,以及系統(tǒng)資源的使用方面產(chǎn)生額外的開(kāi)銷(xiāo),所以std::condition_variable
一般作為首選的類(lèi)型,當(dāng)對(duì)靈活性有硬性要求時(shí),我們才會(huì)去考慮std::condition_variable_any
。
所以,如何使用std::condition_variable
去處理之前提到的情況——當(dāng)有數(shù)據(jù)需要處理時(shí),如何喚醒休眠中的線程對(duì)其進(jìn)行處理?以下清單展示了一種使用條件變量做喚醒的方式。
清單4.1 使用std::condition_variable
處理數(shù)據(jù)等待
std::mutex mut;
std::queue<data_chunk> data_queue; // 1
std::condition_variable data_cond;
void data_preparation_thread()
{
while(more_data_to_prepare())
{
data_chunk const data=prepare_data();
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data); // 2
data_cond.notify_one(); // 3
}
}
void data_processing_thread()
{
while(true)
{
std::unique_lock<std::mutex> lk(mut); // 4
data_cond.wait(
lk,[]{return !data_queue.empty();}); // 5
data_chunk data=data_queue.front();
data_queue.pop();
lk.unlock(); // 6
process(data);
if(is_last_chunk(data))
break;
}
}
首先,你擁有一個(gè)用來(lái)在兩個(gè)線程之間傳遞數(shù)據(jù)的隊(duì)列①。當(dāng)數(shù)據(jù)準(zhǔn)備好時(shí),使用std::lock_guard
對(duì)隊(duì)列上鎖,將準(zhǔn)備好的數(shù)據(jù)壓入隊(duì)列中②,之后線程會(huì)對(duì)隊(duì)列中的數(shù)據(jù)上鎖。然后調(diào)用std::condition_variable
的notify_one()成員函數(shù),對(duì)等待的線程(如果有等待線程)進(jìn)行通知③。
在另外一側(cè),你有一個(gè)正在處理數(shù)據(jù)的線程,這個(gè)線程首先對(duì)互斥量上鎖,但在這里std::unique_lock
要比std::lock_guard
④更加合適——且聽(tīng)我細(xì)細(xì)道來(lái)。線程之后會(huì)調(diào)用std::condition_variable
的成員函數(shù)wait(),傳遞一個(gè)鎖和一個(gè)lambda函數(shù)表達(dá)式(作為等待的條件⑤)。Lambda函數(shù)是C++11
添加的新特性,它可以讓一個(gè)匿名函數(shù)作為其他表達(dá)式的一部分,并且非常合適作為標(biāo)準(zhǔn)函數(shù)的謂詞,例如wait()函數(shù)。在這個(gè)例子中,簡(jiǎn)單的lambda函數(shù)[]{return !data_queue.empty();}
會(huì)去檢查data_queue是否不為空,當(dāng)data_queue不為空——那就意味著隊(duì)列中已經(jīng)準(zhǔn)備好數(shù)據(jù)了。附錄A的A.5節(jié)有Lambda函數(shù)更多的信息。
wait()會(huì)去檢查這些條件(通過(guò)調(diào)用所提供的lambda函數(shù)),當(dāng)條件滿(mǎn)足(lambda函數(shù)返回true)時(shí)返回。如果條件不滿(mǎn)足(lambda函數(shù)返回false),wait()函數(shù)將解鎖互斥量,并且將這個(gè)線程(上段提到的處理數(shù)據(jù)的線程)置于阻塞或等待狀態(tài)。當(dāng)準(zhǔn)備數(shù)據(jù)的線程調(diào)用notify_one()通知條件變量時(shí),處理數(shù)據(jù)的線程從睡眠狀態(tài)中蘇醒,重新獲取互斥鎖,并且對(duì)條件再次檢查,在條件滿(mǎn)足的情況下,從wait()返回并繼續(xù)持有鎖。當(dāng)條件不滿(mǎn)足時(shí),線程將對(duì)互斥量解鎖,并且重新開(kāi)始等待。這就是為什么用std::unique_lock
而不使用std::lock_guard
——等待中的線程必須在等待期間解鎖互斥量,并在這這之后對(duì)互斥量再次上鎖,而std::lock_guard
沒(méi)有這么靈活。如果互斥量在線程休眠期間保持鎖住狀態(tài),準(zhǔn)備數(shù)據(jù)的線程將無(wú)法鎖住互斥量,也無(wú)法添加數(shù)據(jù)到隊(duì)列中;同樣的,等待線程也永遠(yuǎn)不會(huì)知道條件何時(shí)滿(mǎn)足。
清單4.1使用了一個(gè)簡(jiǎn)單的lambda函數(shù)用于等待⑤,這個(gè)函數(shù)用于檢查隊(duì)列何時(shí)不為空,不過(guò)任意的函數(shù)和可調(diào)用對(duì)象都可以傳入wait()。當(dāng)你已經(jīng)寫(xiě)好了一個(gè)函數(shù)去做檢查條件(或許比清單中簡(jiǎn)單檢查要復(fù)雜很多),那就可以直接將這個(gè)函數(shù)傳入wait();不一定非要放在一個(gè)lambda表達(dá)式中。在調(diào)用wait()的過(guò)程中,一個(gè)條件變量可能會(huì)去檢查給定條件若干次;然而,它總是在互斥量被鎖定時(shí)這樣做,當(dāng)且僅當(dāng)提供測(cè)試條件的函數(shù)返回true時(shí),它就會(huì)立即返回。當(dāng)?shù)却€程重新獲取互斥量并檢查條件時(shí),如果它并非直接響應(yīng)另一個(gè)線程的通知,這就是所謂的偽喚醒(spurious wakeup)。因?yàn)槿魏蝹螁拘训臄?shù)量和頻率都是不確定的,這里不建議使用一個(gè)有副作用的函數(shù)做條件檢查。當(dāng)你這樣做了,就必須做好多次產(chǎn)生副作用的心理準(zhǔn)備。
解鎖std::unique_lock
的靈活性,不僅適用于對(duì)wait()的調(diào)用;它還可以用于有待處理但還未處理的數(shù)據(jù)⑥。處理數(shù)據(jù)可能是一個(gè)耗時(shí)的操作,并且如你在第3章見(jiàn)到的,你就知道持有鎖的時(shí)間過(guò)長(zhǎng)是一個(gè)多么糟糕的主意。
使用隊(duì)列在多個(gè)線程中轉(zhuǎn)移數(shù)據(jù)(如清單4.1)是很常見(jiàn)的。做得好的話(huà),同步操作可以限制在隊(duì)列本身,同步問(wèn)題和條件競(jìng)爭(zhēng)出現(xiàn)的概率也會(huì)降低。鑒于這些好處,現(xiàn)在從清單4.1中提取出一個(gè)通用線程安全的隊(duì)列。
當(dāng)你正在設(shè)計(jì)一個(gè)通用隊(duì)列時(shí),花一些時(shí)間想想有哪些操作需要添加到隊(duì)列實(shí)現(xiàn)中去,就如之前在3.2.3節(jié)看到的線程安全的棧??梢钥匆幌翪++標(biāo)準(zhǔn)庫(kù)提供的實(shí)現(xiàn),找找靈感;std::queue<>
容器的接口展示如下:
清單4.2 std::queue
接口
template <class T, class Container = std::deque<T> >
class queue {
public:
explicit queue(const Container&);
explicit queue(Container&& = Container());
template <class Alloc> explicit queue(const Alloc&);
template <class Alloc> queue(const Container&, const Alloc&);
template <class Alloc> queue(Container&&, const Alloc&);
template <class Alloc> queue(queue&&, const Alloc&);
void swap(queue& q);
bool empty() const;
size_type size() const;
T& front();
const T& front() const;
T& back();
const T& back() const;
void push(const T& x);
void push(T&& x);
void pop();
template <class... Args> void emplace(Args&&... args);
};
當(dāng)你忽略構(gòu)造、賦值以及交換操作時(shí),你就剩下了三組操作:1. 對(duì)整個(gè)隊(duì)列的狀態(tài)進(jìn)行查詢(xún)(empty()和size());2.查詢(xún)?cè)陉?duì)列中的各個(gè)元素(front()和back());3.修改隊(duì)列的操作(push(), pop()和emplace())。這就和3.2.3中的棧一樣了,因此你也會(huì)遇到在固有接口上的條件競(jìng)爭(zhēng)。因此,你需要將front()和pop()合并成一個(gè)函數(shù)調(diào)用,就像之前在棧實(shí)現(xiàn)時(shí)合并top()和pop()一樣。與清單4.1中的代碼不同的是:當(dāng)使用隊(duì)列在多個(gè)線程中傳遞數(shù)據(jù)時(shí),接收線程通常需要等待數(shù)據(jù)的壓入。這里我們提供pop()函數(shù)的兩個(gè)變種:try_pop()和wait_and_pop()。try_pop() ,嘗試從隊(duì)列中彈出數(shù)據(jù),總會(huì)直接返回(當(dāng)有失敗時(shí)),即使沒(méi)有值可檢索;wait_and_pop(),將會(huì)等待有值可檢索的時(shí)候才返回。當(dāng)你使用之前棧的方式來(lái)實(shí)現(xiàn)你的隊(duì)列,你實(shí)現(xiàn)的隊(duì)列接口就可能會(huì)是下面這樣:
清單4.3 線程安全隊(duì)列的接口
#include <memory> // 為了使用std::shared_ptr
template<typename T>
class threadsafe_queue
{
public:
threadsafe_queue();
threadsafe_queue(const threadsafe_queue&);
threadsafe_queue& operator=(
const threadsafe_queue&) = delete; // 不允許簡(jiǎn)單的賦值
void push(T new_value);
bool try_pop(T& value); // 1
std::shared_ptr<T> try_pop(); // 2
void wait_and_pop(T& value);
std::shared_ptr<T> wait_and_pop();
bool empty() const;
};
就像之前對(duì)棧做的那樣,在這里你將很多構(gòu)造函數(shù)剪掉了,并且禁止了對(duì)隊(duì)列的簡(jiǎn)單賦值。和之前一樣,你也需要提供兩個(gè)版本的try_pop()和wait_for_pop()。第一個(gè)重載的try_pop()①在引用變量中存儲(chǔ)著檢索值,所以它可以用來(lái)返回隊(duì)列中值的狀態(tài);當(dāng)檢索到一個(gè)變量時(shí),他將返回true,否則將返回false(詳見(jiàn)A.2節(jié))。第二個(gè)重載②就不能做這樣了,因?yàn)樗怯脕?lái)直接返回檢索值的。當(dāng)沒(méi)有值可檢索時(shí),這個(gè)函數(shù)可以返回NULL指針。
那么問(wèn)題來(lái)了,如何將以上這些和清單4.1中的代碼相關(guān)聯(lián)呢?好吧,我們現(xiàn)在就來(lái)看看怎么去關(guān)聯(lián)。你可以從之前的代碼中提取push()和wait_and_pop(),如以下清單所示。
清單4.4 從清單4.1中提取push()和wait_and_pop()
#include <queue>
#include <mutex>
#include <condition_variable>
template<typename T>
class threadsafe_queue
{
private:
std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}
};
threadsafe_queue<data_chunk> data_queue; // 1
void data_preparation_thread()
{
while(more_data_to_prepare())
{
data_chunk const data=prepare_data();
data_queue.push(data); // 2
}
}
void data_processing_thread()
{
while(true)
{
data_chunk data;
data_queue.wait_and_pop(data); // 3
process(data);
if(is_last_chunk(data))
break;
}
}
線程隊(duì)列的實(shí)例中包含有互斥量和條件變量,所以獨(dú)立的變量就不需要了①,并且調(diào)用push()也不需要外部同步②。當(dāng)然,wait_and_pop()還要兼顧條件變量的等待③。
另一個(gè)wait_and_pop()函數(shù)的重載寫(xiě)起來(lái)就很瑣碎了,剩下的函數(shù)就像從清單3.5實(shí)現(xiàn)的棧中一個(gè)個(gè)的粘過(guò)來(lái)一樣。最終的隊(duì)列實(shí)現(xiàn)如下所示。
清單4.5 使用條件變量的線程安全隊(duì)列(完整版)
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
template<typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut; // 1 互斥量必須是可變的
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue()
{}
threadsafe_queue(threadsafe_queue const& other)
{
std::lock_guard<std::mutex> lk(other.mut);
data_queue=other.data_queue;
}
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return false;
value=data_queue.front();
data_queue.pop();
return true;
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
empty()是一個(gè)const成員函數(shù),并且傳入拷貝構(gòu)造函數(shù)的other形參是一個(gè)const引用;因?yàn)槠渌€程可能有這個(gè)類(lèi)型的非const引用對(duì)象,并調(diào)用變種成員函數(shù),所以這里有必要對(duì)互斥量上鎖。如果鎖住互斥量是一個(gè)可變操作,那么這個(gè)互斥量對(duì)象就會(huì)標(biāo)記為可變的①,之后他就可以在empty()和拷貝構(gòu)造函數(shù)中上鎖了。
條件變量在多個(gè)線程等待同一個(gè)事件時(shí),也是很有用的。當(dāng)線程用來(lái)分解工作負(fù)載,并且只有一個(gè)線程可以對(duì)通知做出反應(yīng),與清單4.1中使用的結(jié)構(gòu)完全相同;運(yùn)行多個(gè)數(shù)據(jù)實(shí)例——處理線程(processing thread)。當(dāng)新的數(shù)據(jù)準(zhǔn)備完成,調(diào)用notify_one()將會(huì)觸發(fā)一個(gè)正在執(zhí)行wait()的線程,去檢查條件和wait()函數(shù)的返回狀態(tài)(因?yàn)槟銉H是向data_queue添加一個(gè)數(shù)據(jù)項(xiàng))。 這里不保證線程一定會(huì)被通知到,即使只有一個(gè)等待線程被通知時(shí),所有處線程也有可能都在處理數(shù)據(jù)。
另一種可能是,很多線程等待同一事件,對(duì)于通知他們都需要做出回應(yīng)。這會(huì)發(fā)生在共享數(shù)據(jù)正在初始化的時(shí)候,當(dāng)處理線程可以使用同一數(shù)據(jù)時(shí),就要等待數(shù)據(jù)被初始化(有不錯(cuò)的機(jī)制可用來(lái)應(yīng)對(duì);可見(jiàn)第3章,3.3.1節(jié)),或等待共享數(shù)據(jù)的更新,比如,定期重新初始化(periodic reinitialization)。在這些情況下,準(zhǔn)備線程準(zhǔn)備數(shù)據(jù)數(shù)據(jù)時(shí),就會(huì)通過(guò)條件變量調(diào)用notify_all()成員函數(shù),而非直接調(diào)用notify_one()函數(shù)。顧名思義,這就是全部線程在都去執(zhí)行wait()(檢查他們等待的條件是否滿(mǎn)足)的原因。
當(dāng)?shù)却€程只等待一次,當(dāng)條件為true時(shí),它就不會(huì)再等待條件變量了,所以一個(gè)條件變量可能并非同步機(jī)制的最好選擇。尤其是,條件在等待一組可用的數(shù)據(jù)塊時(shí)。在這樣的情況下,期望(future)就是一個(gè)適合的選擇。