std::thread::hardware_concurrency()
在新版C++標(biāo)準(zhǔn)庫(kù)中是一個(gè)很有用的函數(shù)。這個(gè)函數(shù)將返回能同時(shí)并發(fā)在一個(gè)程序中的線程數(shù)量。例如,多核系統(tǒng)中,返回值可以是CPU核芯的數(shù)量。返回值也僅僅是一個(gè)提示,當(dāng)系統(tǒng)信息無法獲取時(shí),函數(shù)也會(huì)返回0。但是,這也無法掩蓋這個(gè)函數(shù)對(duì)啟動(dòng)線程數(shù)量的幫助。
清單2.8實(shí)現(xiàn)了一個(gè)并行版的std::accumulate
。代碼中將整體工作拆分成小任務(wù)交給每個(gè)線程去做,其中設(shè)置最小任務(wù)數(shù),是為了避免產(chǎn)生太多的線程。程序可能會(huì)在操作數(shù)量為0的時(shí)候拋出異常。比如,std::thread
構(gòu)造函數(shù)無法啟動(dòng)一個(gè)執(zhí)行線程,就會(huì)拋出一個(gè)異常。在這個(gè)算法中討論異常處理,已經(jīng)超出現(xiàn)階段的討論范圍,這個(gè)問題我們將在第8章中再來討論。
清單2.8 原生并行版的std::accumulate
template<typename Iterator,typename T>
struct accumulate_block
{
void operator()(Iterator first,Iterator last,T& result)
{
result=std::accumulate(first,last,result);
}
};
template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long const length=std::distance(first,last);
if(!length) // 1
return init;
unsigned long const min_per_thread=25;
unsigned long const max_threads=
(length+min_per_thread-1)/min_per_thread; // 2
unsigned long const hardware_threads=
std::thread::hardware_concurrency();
unsigned long const num_threads= // 3
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size=length/num_threads; // 4
std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads-1); // 5
Iterator block_start=first;
for(unsigned long i=0; i < (num_threads-1); ++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size); // 6
threads[i]=std::thread( // 7
accumulate_block<Iterator,T>(),
block_start,block_end,std::ref(results[i]));
block_start=block_end; // 8
}
accumulate_block<Iterator,T>()(
block_start,last,results[num_threads-1]); // 9
std::for_each(threads.begin(),threads.end(),
std::mem_fn(&std::thread::join)); // 10
return std::accumulate(results.begin(),results.end(),init); // 11
}
函數(shù)看起來很長(zhǎng),但不復(fù)雜。如果輸入的范圍為空①,就會(huì)得到init的值。反之,如果范圍內(nèi)多于一個(gè)元素時(shí),都需要用范圍內(nèi)元素的總數(shù)量除以線程(塊)中最小任務(wù)數(shù),從而確定啟動(dòng)線程的最大數(shù)量②,這樣能避免無謂的計(jì)算資源的浪費(fèi)。比如,一臺(tái)32芯的機(jī)器上,只有5個(gè)數(shù)需要計(jì)算,卻啟動(dòng)了32個(gè)線程。
計(jì)算量的最大值和硬件支持線程數(shù)中,較小的值為啟動(dòng)線程的數(shù)量③。因?yàn)樯舷挛念l繁的切換會(huì)降低線程的性能,所以你肯定不想啟動(dòng)的線程數(shù)多于硬件支持的線程數(shù)量。當(dāng)std::thread::hardware_concurrency()
返回0,你可以選擇一個(gè)合適的數(shù)作為你的選擇;在本例中,我選擇了"2"。你也不想在一臺(tái)單核機(jī)器上啟動(dòng)太多的線程,因?yàn)檫@樣反而會(huì)降低性能,有可能最終讓你放棄使用并發(fā)。
每個(gè)線程中處理的元素?cái)?shù)量,是范圍中元素的總量除以線程的個(gè)數(shù)得出的④。對(duì)于分配是否得當(dāng),我們會(huì)在后面討論。
現(xiàn)在,確定了線程個(gè)數(shù),通過創(chuàng)建一個(gè)std::vector<T>
容器存放中間結(jié)果,并為線程創(chuàng)建一個(gè)std::vector<std::thread>
容器⑤。這里需要注意的是,啟動(dòng)的線程數(shù)必須比num_threads少1個(gè),因?yàn)樵趩?dòng)之前已經(jīng)有了一個(gè)線程(主線程)。
使用簡(jiǎn)單的循環(huán)來啟動(dòng)線程:block_end迭代器指向當(dāng)前塊的末尾⑥,并啟動(dòng)一個(gè)新線程為當(dāng)前塊累加結(jié)果⑦。當(dāng)?shù)髦赶虍?dāng)前塊的末尾時(shí),啟動(dòng)下一個(gè)塊⑧。
啟動(dòng)所有線程后,⑨中的線程會(huì)處理最終塊的結(jié)果。對(duì)于分配不均,因?yàn)橹雷罱K塊是哪一個(gè),那么這個(gè)塊中有多少個(gè)元素就無所謂了。
當(dāng)累加最終塊的結(jié)果后,可以等待std::for_each
⑩創(chuàng)建線程的完成(如同在清單2.7中做的那樣),之后使用std::accumulate
將所有結(jié)果進(jìn)行累加?。
結(jié)束這個(gè)例子之前,需要明確:T類型的加法運(yùn)算不滿足結(jié)合律(比如,對(duì)于float型或double型,在進(jìn)行加法操作時(shí),系統(tǒng)很可能會(huì)做截?cái)嗖僮?,因?yàn)閷?duì)范圍中元素的分組,會(huì)導(dǎo)致parallel_accumulate得到的結(jié)果可能與std::accumulate
得到的結(jié)果不同。同樣的,這里對(duì)迭代器的要求更加嚴(yán)格:必須都是向前迭代器,而std::accumulate
可以在只傳入迭代器的情況下工作。對(duì)于創(chuàng)建出results容器,需要保證T有默認(rèn)構(gòu)造函數(shù)。對(duì)于算法并行,通常都要這樣的修改;不過,需要根據(jù)算法本身的特性,選擇不同的并行方式。算法并行會(huì)在第8章有更加深入的討論。需要注意的:因?yàn)椴荒苤苯訌囊粋€(gè)線程中返回一個(gè)值,所以需要傳遞results容器的引用到線程中去。另一個(gè)辦法,通過地址來獲取線程執(zhí)行的結(jié)果;第4章中,我們將使用期望(futures)完成這種方案。
當(dāng)線程運(yùn)行時(shí),所有必要的信息都需要傳入到線程中去,包括存儲(chǔ)計(jì)算結(jié)果的位置。不過,并非總需如此:有時(shí)候這是識(shí)別線程的可行方案,可以傳遞一個(gè)標(biāo)識(shí)數(shù),例如清單2.7中的i。不過,當(dāng)需要標(biāo)識(shí)的函數(shù)在調(diào)用棧的深層,同時(shí)其他線程也可調(diào)用該函數(shù),那么標(biāo)識(shí)數(shù)就會(huì)變的捉襟見肘。好消息是在設(shè)計(jì)C++的線程庫(kù)時(shí),就有預(yù)見了這種情況,在之后的實(shí)現(xiàn)中就給每個(gè)線程附加了唯一標(biāo)識(shí)符。