很多公司里,雇員通常會在辦公室度過他們的辦公時光(偶爾也會外出訪問客戶或供應商),或是參加貿(mào)易展會。雖然外出可能很有必要,并且可能需要很多人一起去,不過對于一些特別的雇員來說,一趟可能就是幾個月,甚至是幾年。公司要給每個雇員都配一輛車,這基本上是不可能的,不過公司可以提供一些共用車輛;這樣就會有一定數(shù)量車,來讓所有雇員使用。當一個員工要去異地旅游時,那么他就可以從共用車輛中預定一輛,并在返回公司的時候將車交還。如果某天沒有閑置的共用車輛,雇員就得不延后其旅程了。
線程池就是類似的一種方式,在大多數(shù)系統(tǒng)中,將每個任務指定給某個線程是不切實際的,不過可以利用現(xiàn)有的并發(fā)性,進行并發(fā)執(zhí)行。線程池就提供了這樣的功能,提交到線程池中的任務將并發(fā)執(zhí)行,提交的任務將會掛在任務隊列上。隊列中的每一個任務都會被池中的工作線程所獲取,當任務執(zhí)行完成后,再回到線程池中獲取下一個任務。
創(chuàng)建一個線程池時,會遇到幾個關鍵性的設計問題,比如:可使用的線程數(shù)量,高效的任務分配方式,以及是否需要等待一個任務完成。
在本節(jié),我們將看到線程池是如何解決這些問題的,從最簡單的線程池開始吧!
作為最簡單的線程池,其擁有固定數(shù)量的工作線程(通常工作線程數(shù)量與std::thread::hardware_concurrency()
相同)。當工作需要完成時,可以調用函數(shù)將任務掛在任務隊列中。每個工作線程都會從任務隊列上獲取任務,然后執(zhí)行這個任務,執(zhí)行完成后再回來獲取新的任務。在最簡單的線程池中,線程就不需要等待其他線程完成對應任務了。如果需要等待,就需要對同步進行管理。
下面清單中的代碼就展示了一個最簡單的線程池實現(xiàn)。
清單9.1 簡單的線程池
class thread_pool
{
std::atomic_bool done;
thread_safe_queue<std::function<void()> > work_queue; // 1
std::vector<std::thread> threads; // 2
join_threads joiner; // 3
void worker_thread()
{
while(!done) // 4
{
std::function<void()> task;
if(work_queue.try_pop(task)) // 5
{
task(); // 6
}
else
{
std::this_thread::yield(); // 7
}
}
}
public:
thread_pool():
done(false),joiner(threads)
{
unsigned const thread_count=std::thread::hardware_concurrency(); // 8
try
{
for(unsigned i=0;i<thread_count;++i)
{
threads.push_back(
std::thread(&thread_pool::worker_thread,this)); // 9
}
}
catch(...)
{
done=true; // 10
throw;
}
}
~thread_pool()
{
done=true; // 11
}
template<typename FunctionType>
void submit(FunctionType f)
{
work_queue.push(std::function<void()>(f)); // 12
}
};
實現(xiàn)中有一組工作線程②,并且使用了一個線程安全隊列(見第6章)①來管理任務隊列。這種情況下,用戶不用等待任務,并且任務不需要返回任何值,所以可以使用std::function<void()>
對任務進行封裝。submit()函數(shù)會將函數(shù)或可調用對象包裝成一個std::function<void()>
實例,并將其推入隊列中?。
線程始于構造函數(shù):使用std::thread::hardware_concurrency()
來獲取硬件支持多少個并發(fā)線程⑧,這些線程會在worker_thread()成員函數(shù)中執(zhí)行⑨。
當有異常拋出時,線程啟動就會失敗,所以需要保證任何已啟動的線程都能停止,并且能在這種情況下清理干凈。當有異常拋出時,通過使用try-catch來設置done標志⑩,還有join_threads類的實例(來自于第8章)③用來匯聚所有線程。當然也需要析構函數(shù):僅設置done標志?,并且join_threads確保所有線程在線程池銷毀前全部執(zhí)行完成。注意成員聲明的順序很重要:done標志和worker_queue必須在threads數(shù)組之前聲明,而數(shù)據(jù)必須在joiner前聲明。這就能確保成員能以正確的順序銷毀;比如,所有線程都停止運行時,隊列就可以安全的銷毀了。
worker_thread函數(shù)很簡單:從任務隊列上獲取任務⑤,以及同時執(zhí)行這些任務⑥,執(zhí)行一個循環(huán)直到done標志被設置④。如果任務隊列上沒有任務,函數(shù)會調用std::this_thread::yield()
讓線程休息⑦,并且給予其他線程向任務隊列上推送任務的機會。
一些簡單的情況,這樣線程池就足以滿足要求,特別是任務沒有返回值,或需要執(zhí)行一些阻塞操作的時候。不過,在很多情況下,這樣簡單的線程池完全不夠用,其他情況使用這樣簡單的線程池可能會出現(xiàn)問題,比如:死鎖。同樣,在簡單例子中,使用std::async
能提供更好的功能(如第8章中的例子)。
在本章中,我們將了解一下更加復雜的線程池實現(xiàn),通過添加特性滿足用戶需求,或減少問題的發(fā)生幾率。
首先,從已經(jīng)提交的任務開始說起。
第8章中的例子中,線程間的任務劃分完成后,代碼會顯式生成新線程,主線程通常就是等待新線程在返回調用之前結束,確保所有任務都完成。使用線程池,就需要等待任務提交到線程池中,而非直接提交給單個線程。這與基于std::async
的方法(第8章等待future的例子)類似,使用清單9.1中的簡單線程池,使用第4章中提到的工具:條件變量和future。雖然,會增加代碼的復雜度,不過,要比直接對任務進行等待的方式好很多。
通過增加線程池的復雜度,可以直接等待任務完成。使用submit()函數(shù)返回一個對任務描述的句柄,用來等待任務的完成。任務句柄會用條件變量或future進行包裝,這樣能使用線程池來簡化代碼。
一種特殊的情況是,執(zhí)行任務的線程需要返回一個結果到主線程上進行處理。你已經(jīng)在本書中看到多個這樣的例子,比如:parallel_accumulate()(第2章)。這種情況下,需要用future對最終的結果進行轉移。清單9.2展示了對簡單線程池的修改,通過修改就能等待任務完成,以及在工作線程完成后,返回一個結果到等待線程中去,不過std::packaged_task<>
實例是不可拷貝的,僅是可移動的,所以不能再使用std::function<>
來實現(xiàn)任務隊列,因為std::function<>
需要存儲可復制構造的函數(shù)對象。包裝一個自定義函數(shù),用來處理只可移動的類型。這就是一個帶有函數(shù)操作符的類型擦除類。只需要處理那些沒有函數(shù)和無返回的函數(shù),所以這是一個簡單的虛函數(shù)調用。
清單9.2 可等待任務的線程池
class function_wrapper
{
struct impl_base {
virtual void call()=0;
virtual ~impl_base() {}
};
std::unique_ptr<impl_base> impl;
template<typename F>
struct impl_type: impl_base
{
F f;
impl_type(F&& f_): f(std::move(f_)) {}
void call() { f(); }
};
public:
template<typename F>
function_wrapper(F&& f):
impl(new impl_type<F>(std::move(f)))
{}
void operator()() { impl->call(); }
function_wrapper() = default;
function_wrapper(function_wrapper&& other):
impl(std::move(other.impl))
{}
function_wrapper& operator=(function_wrapper&& other)
{
impl=std::move(other.impl);
return *this;
}
function_wrapper(const function_wrapper&)=delete;
function_wrapper(function_wrapper&)=delete;
function_wrapper& operator=(const function_wrapper&)=delete;
};
class thread_pool
{
thread_safe_queue<function_wrapper> work_queue; // 使用function_wrapper,而非使用std::function
void worker_thread()
{
while(!done)
{
function_wrapper task;
if(work_queue.try_pop(task))
{
task();
}
else
{
std::this_thread::yield();
}
}
}
public:
template<typename FunctionType>
std::future<typename std::result_of<FunctionType()>::type> // 1
submit(FunctionType f)
{
typedef typename std::result_of<FunctionType()>::type
result_type; // 2
std::packaged_task<result_type()> task(std::move(f)); // 3
std::future<result_type> res(task.get_future()); // 4
work_queue.push(std::move(task)); // 5
return res; // 6
}
// 休息一下
};
首先,修改的是submit()函數(shù)①返回一個std::future<>
保存任務的返回值,并且允許調用者等待任務完全結束。因為需要知道提供函數(shù)f的返回類型,所以使用std::result_of<>
:std::result_of<FunctionType()>::type
是FunctionType類型的引用實例(如,f),并且沒有參數(shù)。同樣,函數(shù)中可以對result_type typedef②使用std::result_of<>
。
然后,將f包裝入std::packaged_task<result_type()>
③,因為f是一個無參數(shù)的函數(shù)或是可調用對象,能夠返回result_type類型的實例。向任務隊列推送任務⑤和返回future⑥前,就可以從std::packaged_task<>
中獲取future④。注意,要將任務推送到任務隊列中時,只能使用std::move()
,因為std::packaged_task<>
是不可拷貝的。為了對任務進行處理,隊列里面存的就是function_wrapper對象,而非std::function<void()>
對象。
現(xiàn)在線程池允許等待任務,并且返回任務后的結果。下面的清單就展示了,如何讓parallel_accumuate函數(shù)使用線程池。
清單9.3 parallel_accumulate使用一個可等待任務的線程池
template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long const length=std::distance(first,last);
if(!length)
return init;
unsigned long const block_size=25;
unsigned long const num_blocks=(length+block_size-1)/block_size; // 1
std::vector<std::future<T> > futures(num_blocks-1);
thread_pool pool;
Iterator block_start=first;
for(unsigned long i=0;i<(num_blocks-1);++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size);
futures[i]=pool.submit(accumulate_block<Iterator,T>()); // 2
block_start=block_end;
}
T last_result=accumulate_block<Iterator,T>()(block_start,last);
T result=init;
for(unsigned long i=0;i<(num_blocks-1);++i)
{
result+=futures[i].get();
}
result += last_result;
return result;
}
與清單8.4相比,有幾個點需要注意一下。首先,工作量是依據(jù)使用的塊數(shù)(num_blocks①),而不是線程的數(shù)量。為了利用線程池的最大化可擴展性,需要將工作塊劃分為最小工作塊。當線程池中線程不多時,每個線程將會處理多個工作塊,不過隨著硬件可用線程數(shù)量的增長,會有越來越多的工作塊并發(fā)執(zhí)行。
當你選擇“因為能并發(fā)執(zhí)行,最小工作塊值的一試”時,就需要謹慎了。向線程池提交一個任務有一定的開銷;讓工作線程執(zhí)行這個任務,并且將返回值保存在std::future<>
中,對于太小的任務,這樣的開銷不劃算。如果任務塊太小,使用線程池的速度可能都不及單線程。
假設,任務塊的大小合理,就不用為這些事而擔心:打包任務、獲取future或存儲之后要匯入的std::thread
對象;使用線程池的時候,這些都需要注意。之后,就是調用submit()來提交任務②。
線程池也需要注意異常安全。任何異常都會通過submit()返回給future,并在獲取future的結果時,拋出異常。如果函數(shù)因為異常退出,線程池的析構函數(shù)會丟掉那些沒有完成的任務,等待線程池中的工作線程完成工作。
在簡單的例子中,這個線程池工作的還算不錯,因為這里的任務都是相互獨立的。不過,當任務隊列中的任務有依賴關系時,這個線程池就不能勝任了。
快速排序算法為例,原理很簡單:數(shù)據(jù)與中軸數(shù)據(jù)項比較,在中軸項兩側分為大于和小于的兩個序列,然后再對這兩組序列進行排序。這兩組序列會遞歸排序,最后會整合成一個全排序序列。要將這個算法寫成并發(fā)模式,需要保證遞歸調用能夠使用硬件的并發(fā)能力。
回到第4章,第一次接觸這個例子,我們使用std::async
來執(zhí)行每一層的調用,讓標準庫來選擇,是在新線程上執(zhí)行這個任務,還是當對應get()調用時,進行同步執(zhí)行。運行起來很不錯,因為每一個任務都在其自己的線程上執(zhí)行,或當需要的時候進行調用。
當回顧第8章時,使用了一個固定線程數(shù)量(根據(jù)硬件可用并發(fā)線程數(shù))的結構體。在這樣的情況下,使用了棧來掛起要排序的數(shù)據(jù)塊。當每個線程在為一個數(shù)據(jù)塊排序前,會向數(shù)據(jù)棧上添加一組要排序的數(shù)據(jù),然后對當前數(shù)據(jù)塊排序結束后,接著對另一塊進行排序。這里,等待其他線程完成排序,可能會造成死鎖,因為這會消耗有限的線程。有一種情況很可能會出現(xiàn),就是所有線程都在等某一個數(shù)據(jù)塊被排序,不過沒有線程在做排序。通過拉取棧上數(shù)據(jù)塊的線程,對數(shù)據(jù)塊進行排序,來解決這個問題;因為,已處理的指定數(shù)據(jù)塊,就是其他線程都在等待排序的數(shù)據(jù)塊。
如果只用簡單的線程池進行替換,例如:第4章替換std::async
的線程池。只有固定數(shù)量的線程,因為線程池中沒有空閑的線程,線程會等待沒有被安排的任務。因此,需要和第8章中類似的解決方案:當?shù)却硞€數(shù)據(jù)塊完成時,去處理未完成的數(shù)據(jù)塊。如果使用線程池來管理任務列表和相關線程——使用線程池的主要原因——就不用再去訪問任務列表了??梢詫€程池做一些改動,自動完成這些事情。
最簡單的方法就是在thread_pool中添加一個新函數(shù),來執(zhí)行任務隊列上的任務,并對線程池進行管理。高級線程池的實現(xiàn)可能會在等待函數(shù)中添加邏輯,或等待其他函數(shù)來處理這個任務,優(yōu)先的任務會讓其他的任務進行等待。下面清單中的實現(xiàn),就展示了一個新run_pending_task()函數(shù),對于快速排序的修改將會在清單9.5中展示。
清單9.4 run_pending_task()函數(shù)實現(xiàn)
void thread_pool::run_pending_task()
{
function_wrapper task;
if(work_queue.try_pop(task))
{
task();
}
else
{
std::this_thread::yield();
}
}
run_pending_task()的實現(xiàn)去掉了在worker_thread()函數(shù)的主循環(huán)。函數(shù)任務隊列中有任務的時候,執(zhí)行任務;要是沒有的話,就會讓操作系統(tǒng)對線程進行重新分配。
下面快速排序算法的實現(xiàn)要比清單8.1中版本簡單許多,因為所有線程管理邏輯都被移入到線程池。
清單9.5 基于線程池的快速排序實現(xiàn)
template<typename T>
struct sorter // 1
{
thread_pool pool; // 2
std::list<T> do_sort(std::list<T>& chunk_data)
{
if(chunk_data.empty())
{
return chunk_data;
}
std::list<T> result;
result.splice(result.begin(),chunk_data,chunk_data.begin());
T const& partition_val=*result.begin();
typename std::list<T>::iterator divide_point=
std::partition(chunk_data.begin(),chunk_data.end(),
[&](T const& val){return val<partition_val;});
std::list<T> new_lower_chunk;
new_lower_chunk.splice(new_lower_chunk.end(),
chunk_data,chunk_data.begin(),
divide_point);
std::future<std::list<T> > new_lower= // 3
pool.submit(std::bind(&sorter::do_sort,this,
std::move(new_lower_chunk)));
std::list<T> new_higher(do_sort(chunk_data));
result.splice(result.end(),new_higher);
while(!new_lower.wait_for(std::chrono::seconds(0)) ==
std::future_status::timeout)
{
pool.run_pending_task(); // 4
}
result.splice(result.begin(),new_lower.get());
return result;
}
};
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
if(input.empty())
{
return input;
}
sorter<T> s;
return s.do_sort(input);
}
與清單8.1相比,這里將實際工作放在sorter類模板的do_sort()成員函數(shù)中執(zhí)行①,即使例子中僅對thread_pool實例進行包裝②。
線程和任務管理,在線程等待的時候,就會少向線程池中提交一個任務③,并且執(zhí)行任務隊列上未完成的任務④。需要顯式的管理線程和棧上要排序的數(shù)據(jù)塊。當有任務提交到線程池中,可以使用std::bind()
綁定this指針到do_sort()上,綁定是為了讓數(shù)據(jù)塊進行排序。這種情況下,需要對new_lower_chunk使用std::move()
將其傳入函數(shù),數(shù)據(jù)移動要比拷貝的方式開銷少。
雖然,使用等待其他任務的方式,解決了死鎖問題,這個線程池距離理想的線程池很遠。
首先,每次對submit()的調用和對run_pending_task()的調用,訪問的都是同一個隊列。在第8章中,當多線程去修改一組數(shù)據(jù),就會對性能有所影響,所以需要解決這個問題。
線程每次調用線程池的submit()函數(shù),都會推送一個任務到工作隊列中。就像工作線程為了執(zhí)行任務,從任務隊列中獲取任務一樣。這意味著隨著處理器的增加,在任務隊列上就會有很多的競爭,這會讓性能下降。使用無鎖隊列會讓任務沒有明顯的等待,但是乒乓緩存會消耗大量的時間。
為了避免乒乓緩存,每個線程建立獨立的任務隊列。這樣,每個線程就會將新任務放在自己的任務隊列上,并且當線程上的任務隊列沒有任務時,去全局的任務列表中取任務。下面列表中的實現(xiàn),使用了一個thread_local變量,來保證每個線程都擁有自己的任務列表(如全局列表那樣)。
清單9.6 線程池——線程具有本地任務隊列
class thread_pool
{
thread_safe_queue<function_wrapper> pool_work_queue;
typedef std::queue<function_wrapper> local_queue_type; // 1
static thread_local std::unique_ptr<local_queue_type>
local_work_queue; // 2
void worker_thread()
{
local_work_queue.reset(new local_queue_type); // 3
while(!done)
{
run_pending_task();
}
}
public:
template<typename FunctionType>
std::future<typename std::result_of<FunctionType()>::type>
submit(FunctionType f)
{
typedef typename std::result_of<FunctionType()>::type result_type;
std::packaged_task<result_type()> task(f);
std::future<result_type> res(task.get_future());
if(local_work_queue) // 4
{
local_work_queue->push(std::move(task));
}
else
{
pool_work_queue.push(std::move(task)); // 5
}
return res;
}
void run_pending_task()
{
function_wrapper task;
if(local_work_queue && !local_work_queue->empty()) // 6
{
task=std::move(local_work_queue->front());
local_work_queue->pop();
task();
}
else if(pool_work_queue.try_pop(task)) // 7
{
task();
}
else
{
std::this_thread::yield();
}
}
// rest as before
};
因為不希望非線程池中的線程也擁有一個任務隊列,使用std::unique_ptr<>
指向線程本地的工作隊列②;這個指針在worker_thread()中進行初始化③。std:unique_ptr<>
的析構函數(shù)會保證在線程退出的時候,工作隊列被銷毀。
submit()會檢查當前線程是否具有一個工作隊列④。如果有,就是線程池中的線程,可以將任務放入線程的本地隊列中;否者,就像之前一樣將這個任務放在線程池中的全局隊列中⑤。
run_pending_task()⑥中的檢查和之前類似,只是要對是否存在本地任務隊列進行檢查。如果存在,就會從隊列中的第一個任務開始處理;注意本地任務隊列可以是一個普通的std::queue<>
①,因為這個隊列只能被一個線程所訪問,就不存在競爭。如果本地線程上沒有任務,就會從全局工作列表上獲取任務⑦。
這樣就能有效避免競爭,不過當任務分配不均時,造成的結果就是:某個線程本地隊列中有很多任務的同時,其他線程無所事事。例如:舉一個快速排序的例子,只有一開始的數(shù)據(jù)塊能在線程池上被處理,因為剩余部分會放在工作線程的本地隊列上進行處理,這樣的使用方式也違背使用線程池的初衷。
幸好,這個問題是有解:本地工作隊列和全局工作隊列上沒有任務時,可從別的線程隊列中竊取任務。
為了讓沒有任務的線程能從其他線程的任務隊列中獲取任務,就需要本地任務列表可以進行訪問,這樣才能讓run_pending_tasks()竊取任務。需要每個線程在線程池隊列上進行注冊,或由線程池指定一個線程。同樣,還需要保證數(shù)據(jù)隊列中的任務適當?shù)谋煌胶捅Wo,這樣隊列的不變量就不會被破壞。
實現(xiàn)一個無鎖隊列,讓其擁有線程在其他線程竊取任務的時候,能夠推送和彈出一個任務是可能的;不過,這個隊列的實現(xiàn)就超出了本書的討論范圍。為了證明這種方法的可行性,將使用一個互斥量來保護隊列中的數(shù)據(jù)。我們希望任務竊取是一個不常見的現(xiàn)象,這樣就會減少對互斥量的競爭,并且使得簡單隊列的開銷最小。下面,實現(xiàn)了一個簡單的基于鎖的任務竊取隊列。
清單9.7 基于鎖的任務竊取隊列
class work_stealing_queue
{
private:
typedef function_wrapper data_type;
std::deque<data_type> the_queue; // 1
mutable std::mutex the_mutex;
public:
work_stealing_queue()
{}
work_stealing_queue(const work_stealing_queue& other)=delete;
work_stealing_queue& operator=(
const work_stealing_queue& other)=delete;
void push(data_type data) // 2
{
std::lock_guard<std::mutex> lock(the_mutex);
the_queue.push_front(std::move(data));
}
bool empty() const
{
std::lock_guard<std::mutex> lock(the_mutex);
return the_queue.empty();
}
bool try_pop(data_type& res) // 3
{
std::lock_guard<std::mutex> lock(the_mutex);
if(the_queue.empty())
{
return false;
}
res=std::move(the_queue.front());
the_queue.pop_front();
return true;
}
bool try_steal(data_type& res) // 4
{
std::lock_guard<std::mutex> lock(the_mutex);
if(the_queue.empty())
{
return false;
}
res=std::move(the_queue.back());
the_queue.pop_back();
return true;
}
};
這個隊列對std::deque<fuction_wrapper>
進行了簡單的包裝①,就能通過一個互斥鎖來對所有訪問進行控制了。push()②和try_pop()③對隊列的前端進行操作,try_steal()④對隊列的后端進行操作。
這就說明每個線程中的“隊列”是一個后進先出的棧,最新推入的任務將會第一個執(zhí)行。從緩存角度來看,這將對性能有所提升,因為任務相關的數(shù)據(jù)一直存于緩存中,要比提前將任務相關數(shù)據(jù)推送到棧上好。同樣,這種方式很好的映射到某個算法上,例如:快速排序。之前的實現(xiàn)中,每次調用do_sort()都會推送一個任務到棧上,并且等待這個任務執(zhí)行完畢。通過對最新推入任務的處理,就可以保證在將當前所需數(shù)據(jù)塊處理完成前,其他任務是否需要這些數(shù)據(jù)塊,從而可以減少活動任務的數(shù)量和棧的使用次數(shù)。try_steal()從隊列末尾獲取任務,為了減少與try_pop()之間的競爭;使用在第6、7章中的所討論的技術來讓try_pop()和try_steal()并發(fā)執(zhí)行。
OK,現(xiàn)在擁有了一個很不錯的任務隊列,并且支持竊??;那這個隊列將如何在線程池中使用呢?這里簡單的展示一下。
清單9.8 使用任務竊取的線程池
class thread_pool
{
typedef function_wrapper task_type;
std::atomic_bool done;
thread_safe_queue<task_type> pool_work_queue;
std::vector<std::unique_ptr<work_stealing_queue> > queues; // 1
std::vector<std::thread> threads;
join_threads joiner;
static thread_local work_stealing_queue* local_work_queue; // 2
static thread_local unsigned my_index;
void worker_thread(unsigned my_index_)
{
my_index=my_index_;
local_work_queue=queues[my_index].get(); // 3
while(!done)
{
run_pending_task();
}
}
bool pop_task_from_local_queue(task_type& task)
{
return local_work_queue && local_work_queue->try_pop(task);
}
bool pop_task_from_pool_queue(task_type& task)
{
return pool_work_queue.try_pop(task);
}
bool pop_task_from_other_thread_queue(task_type& task) // 4
{
for(unsigned i=0;i<queues.size();++i)
{
unsigned const index=(my_index+i+1)%queues.size(); // 5
if(queues[index]->try_steal(task))
{
return true;
}
}
return false;
}
public:
thread_pool():
done(false),joiner(threads)
{
unsigned const thread_count=std::thread::hardware_concurrency();
try
{
for(unsigned i=0;i<thread_count;++i)
{
queues.push_back(std::unique_ptr<work_stealing_queue>( // 6
new work_stealing_queue));
threads.push_back(
std::thread(&thread_pool::worker_thread,this,i));
}
}
catch(...)
{
done=true;
throw;
}
}
~thread_pool()
{
done=true;
}
template<typename FunctionType>
std::future<typename std::result_of<FunctionType()>::type> submit(
FunctionType f)
{
typedef typename std::result_of<FunctionType()>::type result_type;
std::packaged_task<result_type()> task(f);
std::future<result_type> res(task.get_future());
if(local_work_queue)
{
local_work_queue->push(std::move(task));
}
else
{
pool_work_queue.push(std::move(task));
}
return res;
}
void run_pending_task()
{
task_type task;
if(pop_task_from_local_queue(task) || // 7
pop_task_from_pool_queue(task) || // 8
pop_task_from_other_thread_queue(task)) // 9
{
task();
}
else
{
std::this_thread::yield();
}
}
};
這段代碼與清單9.6很相似。第一個不同在于,每個線程都有一個work_stealing_queue,而非只是普通的std::queue<>
②。當每個線程被創(chuàng)建,就創(chuàng)建了一個屬于自己的工作隊列⑥,每個線程自己的工作隊列將存儲在線程池的全局工作隊列中①。列表中隊列的序號,會傳遞給線程函數(shù),然后使用序號來索引對應隊列③。這就意味著線程池可以訪問任意線程中的隊列,為了給閑置線程竊取任務。run_pending_task()將會從線程的任務隊列中取出一個任務來執(zhí)行⑦,或從線程池隊列中獲取一個任務⑧,亦或從其他線程的隊列中獲取一個任務⑨。
pop_task_from_other_thread_queue()④會遍歷池中所有線程的任務隊列,然后嘗試竊取任務。為了避免每個線程都嘗試從列表中的第一個線程上竊取任務,每一個線程都會從下一個線程開始遍歷,通過自身的線程序號來確定開始遍歷的線程序號。
使用線程池有很多好處,還有很多很多的方式能為某些特殊用法提升性能,不過這就留給讀者作為作業(yè)吧。
特別是還沒有探究動態(tài)變換大小的線程池,即使線程被阻塞的時候(例如:I/O或互斥鎖),程序都能保證CPU最優(yōu)的使用率。
下面,我們將來了解一下線程管理的“高級”用法——中斷線程。