很多情況下,使用信號(hào)來終止一個(gè)長時(shí)間運(yùn)行的線程是合理的。這種線程的存在,可能是因?yàn)楣ぷ骶€程所在的線程池被銷毀,或是用戶顯式的取消了這個(gè)任務(wù),亦或其他各種原因。不管是什么原因,原理都一樣:需要使用信號(hào)來讓未結(jié)束線程停止運(yùn)行。這里需要一種合適的方式讓線程主動(dòng)的停下來,而非讓線程戛然而止。
你可能會(huì)給每種情況制定一個(gè)獨(dú)立的機(jī)制,這樣做的意義不大。不僅因?yàn)橛媒y(tǒng)一的機(jī)制會(huì)更容易在之后的場(chǎng)景中實(shí)現(xiàn),而且寫出來的中斷代碼不用擔(dān)心在哪里使用。C++11標(biāo)準(zhǔn)沒有提供這樣的機(jī)制,不過實(shí)現(xiàn)這樣的機(jī)制也并不困難。
在了解一下應(yīng)該如何實(shí)現(xiàn)這種機(jī)制前,先來了解一下啟動(dòng)和中斷線程的接口。
先看一下外部接口,需要從可中斷線程上獲取些什么?最起碼需要和std::thread
相同的接口,還要多加一個(gè)interrupt()函數(shù):
class interruptible_thread
{
public:
template<typename FunctionType>
interruptible_thread(FunctionType f);
void join();
void detach();
bool joinable() const;
void interrupt();
};
類內(nèi)部可以使用std::thread
來管理線程,并且使用一些自定義數(shù)據(jù)結(jié)構(gòu)來處理中斷?,F(xiàn)在,從線程的角度能看到什么呢?“能用這個(gè)類來中斷線程”——需要一個(gè)斷點(diǎn)(interruption point)。在不添加多余的數(shù)據(jù)的前提下,為了使斷點(diǎn)能夠正常使用,就需要使用一個(gè)沒有參數(shù)的函數(shù):interruption_point()。這意味著中斷數(shù)據(jù)結(jié)構(gòu)可以訪問thread_local變量,并在線程運(yùn)行時(shí),對(duì)變量進(jìn)行設(shè)置,因此當(dāng)線程調(diào)用interruption_point()函數(shù)時(shí),就會(huì)去檢查當(dāng)前運(yùn)行線程的數(shù)據(jù)結(jié)構(gòu)。我們將在后面看到interruption_point()的具體實(shí)現(xiàn)。
thread_local標(biāo)志是不能使用普通的std::thread
管理線程的主要原因;需要使用一種方法分配出一個(gè)可訪問的interruptible_thread實(shí)例,就像新啟動(dòng)一個(gè)線程一樣。在使用已提供函數(shù)來做這件事情前,需要將interruptible_thread實(shí)例傳遞給std::thread
的構(gòu)造函數(shù),創(chuàng)建一個(gè)能夠執(zhí)行的線程,就像下面的代碼清單所實(shí)現(xiàn)。
清單9.9 interruptible_thread的基本實(shí)現(xiàn)
class interrupt_flag
{
public:
void set();
bool is_set() const;
};
thread_local interrupt_flag this_thread_interrupt_flag; // 1
class interruptible_thread
{
std::thread internal_thread;
interrupt_flag* flag;
public:
template<typename FunctionType>
interruptible_thread(FunctionType f)
{
std::promise<interrupt_flag*> p; // 2
internal_thread=std::thread([f,&p]{ // 3
p.set_value(&this_thread_interrupt_flag);
f(); // 4
});
flag=p.get_future().get(); // 5
}
void interrupt()
{
if(flag)
{
flag->set(); // 6
}
}
};
提供函數(shù)f是包裝了一個(gè)lambda函數(shù)③,線程將會(huì)持有f副本和本地promise變量(p)的引用②。在新線程中,lambda函數(shù)設(shè)置promise變量的值到this_thread_interrupt_flag(在thread_local①中聲明)的地址中,為的是讓線程能夠調(diào)用提供函數(shù)的副本④。調(diào)用線程會(huì)等待與其future相關(guān)的promise就緒,并且將結(jié)果存入到flag成員變量中⑤。注意,即使lambda函數(shù)在新線程上執(zhí)行,對(duì)本地變量p進(jìn)行懸空引用,都沒有問題,因?yàn)樵谛戮€程返回之前,interruptible_thread構(gòu)造函數(shù)會(huì)等待變量p,直到變量p不被引用。實(shí)現(xiàn)沒有考慮處理匯入線程,或分離線程。所以,需要flag變量在線程退出或分離前已經(jīng)聲明,這樣就能避免懸空問題。
interrupt()函數(shù)相對(duì)簡(jiǎn)單:需要一個(gè)線程去做中斷時(shí),需要一個(gè)合法指針作為一個(gè)中斷標(biāo)志,所以可以僅對(duì)標(biāo)志進(jìn)行設(shè)置⑥。
現(xiàn)在就可以設(shè)置中斷標(biāo)志了,不過不檢查線程是否被中斷,這樣的意義就不大了。使用interruption_point()函數(shù)最簡(jiǎn)單的情況;可以在一個(gè)安全的地方調(diào)用這個(gè)函數(shù),如果標(biāo)志已經(jīng)設(shè)置,就可以拋出一個(gè)thread_interrupted異常:
void interruption_point()
{
if(this_thread_interrupt_flag.is_set())
{
throw thread_interrupted();
}
}
代碼中可以在適當(dāng)?shù)牡胤绞褂眠@個(gè)函數(shù):
void foo()
{
while(!done)
{
interruption_point();
process_next_item();
}
}
雖然也能工作,但不理想。最好實(shí)在線程等待或阻塞的時(shí)候中斷線程,因?yàn)檫@時(shí)的線程不能運(yùn)行,也就不能調(diào)用interruption_point()函數(shù)!在線程等待的時(shí)候,什么方式才能去中斷線程呢?
OK,需要仔細(xì)選擇中斷的位置,并通過顯式調(diào)用interruption_point()進(jìn)行中斷,不過在線程阻塞等待的時(shí)候,這種辦法就顯得蒼白無力了,例如:等待條件變量的通知。就需要一個(gè)新函數(shù)——interruptible_wait()——就可以運(yùn)行各種需要等待的任務(wù),并且可以知道如何中斷等待。之前提到,可能會(huì)等待一個(gè)條件變量,所以就從它開始:如何做才能中斷一個(gè)等待的條件變量呢?最簡(jiǎn)單的方式是,當(dāng)設(shè)置中斷標(biāo)志時(shí),需要提醒條件變量,并在等待后立即設(shè)置斷點(diǎn)。為了讓其工作,需要提醒所有等待對(duì)應(yīng)條件變量的線程,就能確保感謝興趣的線程能夠蘇醒。偽蘇醒是無論如何都要處理的,所以其他線程(非感興趣線程)將會(huì)被當(dāng)作偽蘇醒處理——兩者之間沒什么區(qū)別。interrupt_flag結(jié)構(gòu)需要存儲(chǔ)一個(gè)指針指向一個(gè)條件變量,所以用set()函數(shù)對(duì)其進(jìn)行提醒。為條件變量實(shí)現(xiàn)的interruptible_wait()可能會(huì)看起來像下面清單中所示。
清單9.10 為std::condition_variable
實(shí)現(xiàn)的interruptible_wait有問題版
void interruptible_wait(std::condition_variable& cv,
std::unique_lock<std::mutex>& lk)
{
interruption_point();
this_thread_interrupt_flag.set_condition_variable(cv); // 1
cv.wait(lk); // 2
this_thread_interrupt_flag.clear_condition_variable(); // 3
interruption_point();
}
假設(shè)函數(shù)能夠設(shè)置和清除相關(guān)條件變量上的中斷標(biāo)志,代碼會(huì)檢查中斷,通過interrupt_flag為當(dāng)前線程關(guān)聯(lián)條件變量①,等待條件變量②,清理相關(guān)條件變量③,并且再次檢查中斷。如果線程在等待期間被條件變量所中斷,中斷線程將廣播條件變量,并喚醒等待該條件變量的線程,所以這里就可以檢查中斷。不幸的是,代碼有兩個(gè)問題。第一個(gè)問題比較明顯,如果想要線程安全:std::condition_variable::wait()
可以拋出異常,所以這里會(huì)直接退出,而沒有通過條件變量刪除相關(guān)的中斷標(biāo)志。這個(gè)問題很容易修復(fù),就是在析構(gòu)函數(shù)中添加相關(guān)刪除操作即可。
第二個(gè)問題就不大明顯了,這段代碼存在條件競(jìng)爭(zhēng)。雖然,線程可以通過調(diào)用interruption_point()被中斷,不過在調(diào)用wait()后,條件變量和相關(guān)中斷標(biāo)志就沒有什么系了,因?yàn)榫€程不是等待狀態(tài),所以不能通過條件變量的方式喚醒。就需要確保線程不會(huì)在最后一次中斷檢查和調(diào)用wait()間被喚醒。這里,不對(duì)std::condition_variable
的內(nèi)部結(jié)構(gòu)進(jìn)行研究;不過,可通過一種方法來解決這個(gè)問題:使用lk上的互斥量對(duì)線程進(jìn)行保護(hù),這就需要將lk傳遞到set_condition_variable()函數(shù)中去。不幸的是,這將產(chǎn)生兩個(gè)新問題:需要傳遞一個(gè)互斥量的引用到一個(gè)不知道生命周期的線程中去(這個(gè)線程做中斷操作)為該線程上鎖(調(diào)用interrupt()的時(shí)候)。這里可能會(huì)死鎖,并且可能訪問到一個(gè)已經(jīng)銷毀的互斥量,所以這種方法不可取。當(dāng)不能完全確定能中斷條件變量等待——沒有interruptible_wait()情況下也可以時(shí)(可能有些嚴(yán)格),那有沒有其他選擇呢?一個(gè)選擇就是放置超時(shí)等待,使用wait_for()并帶有一個(gè)簡(jiǎn)單的超時(shí)量(比如,1ms)。在線程被中斷前,算是給了線程一個(gè)等待的上限(以時(shí)鐘刻度為基準(zhǔn))。如果這樣做了,等待線程將會(huì)看到更多因?yàn)槌瑫r(shí)而“偽”蘇醒的線程,不過超時(shí)也不輕易的就幫助到我們。與interrupt_flag相關(guān)的實(shí)現(xiàn)的一個(gè)實(shí)現(xiàn)放在下面的清單中展示。
清單9.11 為std::condition_variable
在interruptible_wait中使用超時(shí)
class interrupt_flag
{
std::atomic<bool> flag;
std::condition_variable* thread_cond;
std::mutex set_clear_mutex;
public:
interrupt_flag():
thread_cond(0)
{}
void set()
{
flag.store(true,std::memory_order_relaxed);
std::lock_guard<std::mutex> lk(set_clear_mutex);
if(thread_cond)
{
thread_cond->notify_all();
}
}
bool is_set() const
{
return flag.load(std::memory_order_relaxed);
}
void set_condition_variable(std::condition_variable& cv)
{
std::lock_guard<std::mutex> lk(set_clear_mutex);
thread_cond=&cv;
}
void clear_condition_variable()
{
std::lock_guard<std::mutex> lk(set_clear_mutex);
thread_cond=0;
}
struct clear_cv_on_destruct
{
~clear_cv_on_destruct()
{
this_thread_interrupt_flag.clear_condition_variable();
}
};
};
void interruptible_wait(std::condition_variable& cv,
std::unique_lock<std::mutex>& lk)
{
interruption_point();
this_thread_interrupt_flag.set_condition_variable(cv);
interrupt_flag::clear_cv_on_destruct guard;
interruption_point();
cv.wait_for(lk,std::chrono::milliseconds(1));
interruption_point();
}
如果有謂詞(相關(guān)函數(shù))進(jìn)行等待,1ms的超時(shí)將會(huì)完全在謂詞循環(huán)中完全隱藏:
template<typename Predicate>
void interruptible_wait(std::condition_variable& cv,
std::unique_lock<std::mutex>& lk,
Predicate pred)
{
interruption_point();
this_thread_interrupt_flag.set_condition_variable(cv);
interrupt_flag::clear_cv_on_destruct guard;
while(!this_thread_interrupt_flag.is_set() && !pred())
{
cv.wait_for(lk,std::chrono::milliseconds(1));
}
interruption_point();
}
這會(huì)讓謂詞被檢查的次數(shù)增加許多,不過對(duì)于簡(jiǎn)單調(diào)用wait()這套實(shí)現(xiàn)還是很好用的。超時(shí)變量很容易實(shí)現(xiàn):通過制定時(shí)間,比如:1ms或更短。OK,對(duì)于std::condition_variable
的等待,就需要小心應(yīng)對(duì)了;std::condition_variable_any
呢?還是能做的更好嗎?
std::condition_variable_any
中斷等待std::condition_variable_any
與std::condition_variable
的不同在于,std::condition_variable_any
可以使用任意類型的鎖,而不僅有std::unique_lock<std::mutex>
??梢宰屖虑樽銎饋砀雍?jiǎn)單,并且std::condition_variable_any
可以比std::condition_variable
做的更好。因?yàn)槟芘c任意類型的鎖一起工作,就可以設(shè)計(jì)自己的鎖,上鎖/解鎖interrupt_flag的內(nèi)部互斥量set_clear_mutex,并且鎖也支持等待調(diào)用,就像下面的代碼。
清單9.12 為std::condition_variable_any
設(shè)計(jì)的interruptible_wait
class interrupt_flag
{
std::atomic<bool> flag;
std::condition_variable* thread_cond;
std::condition_variable_any* thread_cond_any;
std::mutex set_clear_mutex;
public:
interrupt_flag():
thread_cond(0),thread_cond_any(0)
{}
void set()
{
flag.store(true,std::memory_order_relaxed);
std::lock_guard<std::mutex> lk(set_clear_mutex);
if(thread_cond)
{
thread_cond->notify_all();
}
else if(thread_cond_any)
{
thread_cond_any->notify_all();
}
}
template<typename Lockable>
void wait(std::condition_variable_any& cv,Lockable& lk)
{
struct custom_lock
{
interrupt_flag* self;
Lockable& lk;
custom_lock(interrupt_flag* self_,
std::condition_variable_any& cond,
Lockable& lk_):
self(self_),lk(lk_)
{
self->set_clear_mutex.lock(); // 1
self->thread_cond_any=&cond; // 2
}
void unlock() // 3
{
lk.unlock();
self->set_clear_mutex.unlock();
}
void lock()
{
std::lock(self->set_clear_mutex,lk); // 4
}
~custom_lock()
{
self->thread_cond_any=0; // 5
self->set_clear_mutex.unlock();
}
};
custom_lock cl(this,cv,lk);
interruption_point();
cv.wait(cl);
interruption_point();
}
// rest as before
};
template<typename Lockable>
void interruptible_wait(std::condition_variable_any& cv,
Lockable& lk)
{
this_thread_interrupt_flag.wait(cv,lk);
}
自定義的鎖類型在構(gòu)造的時(shí)候,需要所鎖住內(nèi)部set_clear_mutex①,對(duì)thread_cond_any指針進(jìn)行設(shè)置,并引用std::condition_variable_any
傳入鎖的構(gòu)造函數(shù)中②。Lockable引用將會(huì)在之后進(jìn)行存儲(chǔ),其變量必須被鎖住。現(xiàn)在可以安心的檢查中斷,不用擔(dān)心競(jìng)爭(zhēng)了。如果這時(shí)中斷標(biāo)志已經(jīng)設(shè)置,那么標(biāo)志一定是在鎖住set_clear_mutex時(shí)設(shè)置的。當(dāng)條件變量調(diào)用自定義鎖的unlock()函數(shù)中的wait()時(shí),就會(huì)對(duì)Lockable對(duì)象和set_clear_mutex進(jìn)行解鎖③。這就允許線程可以嘗試中斷其他線程獲取set_clear_mutex鎖;以及在內(nèi)部wait()調(diào)用之后,檢查thread_cond_any指針。這就是在替換std::condition_variable
后,所擁有的功能(不包括管理)。當(dāng)wait()結(jié)束等待(因?yàn)榈却?,或因?yàn)閭翁K醒),因?yàn)榫€程將會(huì)調(diào)用lock()函數(shù),這里依舊要求鎖住內(nèi)部set_clear_mutex,并且鎖住Lockable對(duì)象④。現(xiàn)在,在wait()調(diào)用時(shí),custom_lock的析構(gòu)函數(shù)中⑤清理thread_cond_any指針(同樣會(huì)解鎖set_clear_mutex)之前,可以再次對(duì)中斷進(jìn)行檢查。
這次輪到中斷條件變量的等待了,不過其他阻塞情況,比如:互斥鎖,等待future等等,該怎么辦呢?通常情況下,可以使用std::condition_variable
的超時(shí)選項(xiàng),因?yàn)樵趯?shí)際運(yùn)行中不可能很快的將條件變量的等待終止(不訪問內(nèi)部互斥量或future的話)。不過,在某些情況下,你知道知道你在等待什么,這樣就可以讓循環(huán)在interruptible_wait()函數(shù)中運(yùn)行。作為一個(gè)例子,這里為std::future<>
重載了interruptible_wait()的實(shí)現(xiàn):
template<typename T>
void interruptible_wait(std::future<T>& uf)
{
while(!this_thread_interrupt_flag.is_set())
{
if(uf.wait_for(lk,std::chrono::milliseconds(1)==
std::future_status::ready)
break;
}
interruption_point();
}
等待會(huì)在中斷標(biāo)志設(shè)置好的時(shí)候,或future準(zhǔn)備就緒的時(shí)候停止,不過實(shí)現(xiàn)中每次等待future的時(shí)間只有1ms。這就意味著,中斷請(qǐng)求被確定前,平均等待的時(shí)間為0.5ms(這里假設(shè)存在一個(gè)高精度的時(shí)鐘)。通常wait_for至少會(huì)等待一個(gè)時(shí)鐘周期,所以如果時(shí)鐘周期為15ms,那么結(jié)束等待的時(shí)間將會(huì)是15ms,而不是1ms。接受與不接受這種情況,都得視情況而定。如果這必要,且時(shí)鐘支持的話,可以持續(xù)削減超時(shí)時(shí)間。這種方式將會(huì)讓線程蘇醒很多次,來檢查標(biāo)志,并且增加線程切換的開銷。
OK,我們已經(jīng)了解如何使用interruption_point()和interruptible_wait()函數(shù)檢查中斷。
當(dāng)中斷被檢查出來了,要如何處理它呢?
從中斷線程的角度看,中斷就是thread_interrupted異常,因此能像處理其他異常那樣進(jìn)行處理。
特別是使用標(biāo)準(zhǔn)catch塊對(duì)其進(jìn)行捕獲:
try
{
do_something();
}
catch(thread_interrupted&)
{
handle_interruption();
}
捕獲中斷,進(jìn)行處理。其他線程再次調(diào)用interrupt()時(shí),線程將會(huì)再次被中斷,這就被稱為斷點(diǎn)(interruption point)。如果線程執(zhí)行的是一系列獨(dú)立的任務(wù),就會(huì)需要斷點(diǎn);中斷一個(gè)任務(wù),就意味著這個(gè)任務(wù)被丟棄,并且該線程就會(huì)執(zhí)行任務(wù)列表中的其他任務(wù)。
因?yàn)閠hread_interrupted是一個(gè)異常,在能夠被中斷的代碼中,之前線程安全的注意事項(xiàng)都是適用的,就是為了確保資源不會(huì)泄露,并在數(shù)據(jù)結(jié)構(gòu)中留下對(duì)應(yīng)的退出狀態(tài)。通常,讓線程中斷是可行的,所以只需要讓異常傳播即可。不過,當(dāng)異常傳入std::thread
的析構(gòu)函數(shù)時(shí),std::terminate()
將會(huì)調(diào)用,并且整個(gè)程序?qū)?huì)終止。為了避免這種情況,需要在每個(gè)將interruptible_thread變量作為參數(shù)傳入的函數(shù)中放置catch(thread_interrupted)處理塊,可以將catch塊包裝進(jìn)interrupt_flag的初始化過程中。因?yàn)楫惓?huì)終止獨(dú)立進(jìn)程,就能保證未處理的中斷是異常安全的。interruptible_thread構(gòu)造函數(shù)中對(duì)線程的初始化,實(shí)現(xiàn)如下:
internal_thread=std::thread([f,&p]{
p.set_value(&this_thread_interrupt_flag);
try
{
f();
}
catch(thread_interrupted const&)
{}
});
下面,我們來看個(gè)更加復(fù)雜的例子。
試想,在桌面上查找一個(gè)應(yīng)用。這就需要與用戶互動(dòng),應(yīng)用的狀態(tài)需要能在顯示器上顯示,就能看出應(yīng)用有什么改變。為了避免影響GUI的響應(yīng)時(shí)間,通常會(huì)將處理線程放在后臺(tái)運(yùn)行。后臺(tái)進(jìn)程需要一直執(zhí)行,直到應(yīng)用退出;后臺(tái)線程會(huì)作為應(yīng)用啟動(dòng)的一部分被啟動(dòng),并且在應(yīng)用終止的時(shí)候停止運(yùn)行。通常這樣的應(yīng)用只有在機(jī)器關(guān)閉時(shí),才會(huì)退出,因?yàn)閼?yīng)用需要更新應(yīng)用最新的狀態(tài),就需要全時(shí)間運(yùn)行。在某些情況下,當(dāng)應(yīng)用被關(guān)閉,需要使用有序的方式將后臺(tái)線程關(guān)閉,其中一種方式就是中斷。
下面清單中為一個(gè)系統(tǒng)實(shí)現(xiàn)了簡(jiǎn)單的線程管理部分。
清單9.13 在后臺(tái)監(jiān)視文件系統(tǒng)
std::mutex config_mutex;
std::vector<interruptible_thread> background_threads;
void background_thread(int disk_id)
{
while(true)
{
interruption_point(); // 1
fs_change fsc=get_fs_changes(disk_id); // 2
if(fsc.has_changes())
{
update_index(fsc); // 3
}
}
}
void start_background_processing()
{
background_threads.push_back(
interruptible_thread(background_thread,disk_1));
background_threads.push_back(
interruptible_thread(background_thread,disk_2));
}
int main()
{
start_background_processing(); // 4
process_gui_until_exit(); // 5
std::unique_lock<std::mutex> lk(config_mutex);
for(unsigned i=0;i<background_threads.size();++i)
{
background_threads[i].interrupt(); // 6
}
for(unsigned i=0;i<background_threads.size();++i)
{
background_threads[i].join(); // 7
}
}
啟動(dòng)時(shí),后臺(tái)線程就已經(jīng)啟動(dòng)④。之后,對(duì)應(yīng)線程將會(huì)處理GUI⑤。當(dāng)用戶要求進(jìn)程退出時(shí),后臺(tái)進(jìn)程將會(huì)被中斷⑥,并且主線程會(huì)等待每一個(gè)后臺(tái)線程結(jié)束后才退出⑦。后臺(tái)線程運(yùn)行在一個(gè)循環(huán)中,并時(shí)刻檢查磁盤的變化②,對(duì)其序號(hào)進(jìn)行更新③。調(diào)用interruption_point()函數(shù),可以在循環(huán)中對(duì)中斷進(jìn)行檢查。
為什么中斷線程前,對(duì)線程進(jìn)行等待?為什么不中斷每個(gè)線程,讓它們執(zhí)行下一個(gè)任務(wù)?答案就是“并發(fā)”。線程被中斷后,不會(huì)馬上結(jié)束,因?yàn)樾枰獙?duì)下一個(gè)斷點(diǎn)進(jìn)行處理,并且在退出前執(zhí)行析構(gòu)函數(shù)和代碼異常處理部分。因?yàn)樾枰獏R聚每個(gè)線程,所以就會(huì)讓中斷線程等待,即使線程還在做著有用的工作——中斷其他線程。只有當(dāng)沒有工作時(shí)(所有線程都被中斷),不需要等待。這就允許中斷線程并行的處理自己的中斷,并更快的完成中斷。
中斷機(jī)制很容易擴(kuò)展到更深層次的中斷調(diào)用,或在特定的代碼塊中禁用中斷,這就當(dāng)做留給讀者的作業(yè)吧。