鍍金池/ 教程/ C/ 8.4 設(shè)計并發(fā)代碼的注意事項
3.4 本章總結(jié)
6.3 基于鎖設(shè)計更加復(fù)雜的數(shù)據(jù)結(jié)構(gòu)
6.1 為并發(fā)設(shè)計的意義何在?
5.2 <code>C++</code>中的原子操作和原子類型
A.7 自動推導(dǎo)變量類型
2.1 線程管理的基礎(chǔ)
8.5 在實踐中設(shè)計并發(fā)代碼
2.4 運行時決定線程數(shù)量
2.2 向線程函數(shù)傳遞參數(shù)
第4章 同步并發(fā)操作
2.3 轉(zhuǎn)移線程所有權(quán)
8.3 為多線程性能設(shè)計數(shù)據(jù)結(jié)構(gòu)
6.4 本章總結(jié)
7.3 對于設(shè)計無鎖數(shù)據(jù)結(jié)構(gòu)的指導(dǎo)建議
關(guān)于這本書
A.1 右值引用
2.6 本章總結(jié)
D.2 &lt;condition_variable&gt;頭文件
A.6 變參模板
6.2 基于鎖的并發(fā)數(shù)據(jù)結(jié)構(gòu)
4.5 本章總結(jié)
A.9 本章總結(jié)
前言
第10章 多線程程序的測試和調(diào)試
5.4 本章總結(jié)
第9章 高級線程管理
5.1 內(nèi)存模型基礎(chǔ)
2.5 識別線程
第1章 你好,C++的并發(fā)世界!
1.2 為什么使用并發(fā)?
A.5 Lambda函數(shù)
第2章 線程管理
4.3 限定等待時間
D.3 &lt;atomic&gt;頭文件
10.2 定位并發(fā)錯誤的技術(shù)
附錄B 并發(fā)庫的簡單比較
5.3 同步操作和強制排序
A.8 線程本地變量
第8章 并發(fā)代碼設(shè)計
3.3 保護(hù)共享數(shù)據(jù)的替代設(shè)施
附錄D C++線程庫參考
第7章 無鎖并發(fā)數(shù)據(jù)結(jié)構(gòu)設(shè)計
D.7 &lt;thread&gt;頭文件
D.1 &lt;chrono&gt;頭文件
4.1 等待一個事件或其他條件
A.3 默認(rèn)函數(shù)
附錄A 對<code>C++</code>11語言特性的簡要介紹
第6章 基于鎖的并發(fā)數(shù)據(jù)結(jié)構(gòu)設(shè)計
封面圖片介紹
7.2 無鎖數(shù)據(jù)結(jié)構(gòu)的例子
8.6 本章總結(jié)
8.1 線程間劃分工作的技術(shù)
4.2 使用期望等待一次性事件
8.4 設(shè)計并發(fā)代碼的注意事項
D.5 &lt;mutex&gt;頭文件
3.1 共享數(shù)據(jù)帶來的問題
資源
9.3 本章總結(jié)
10.3 本章總結(jié)
10.1 與并發(fā)相關(guān)的錯誤類型
D.4 &lt;future&gt;頭文件
3.2 使用互斥量保護(hù)共享數(shù)據(jù)
9.1 線程池
1.1 何謂并發(fā)
9.2 中斷線程
4.4 使用同步操作簡化代碼
A.2 刪除函數(shù)
1.3 C++中的并發(fā)和多線程
1.4 開始入門
第5章 C++內(nèi)存模型和原子類型操作
消息傳遞框架與完整的ATM示例
8.2 影響并發(fā)代碼性能的因素
7.1 定義和意義
D.6 &lt;ratio&gt;頭文件
A.4 常量表達(dá)式函數(shù)
7.4 本章總結(jié)
1.5 本章總結(jié)
第3章 線程間共享數(shù)據(jù)

8.4 設(shè)計并發(fā)代碼的注意事項

目前為止,在本章中我們已經(jīng)看到了很多線程間劃分工作的方法,影響性能的因素,以及這些因素是如何影響你選擇數(shù)據(jù)訪問模式和數(shù)據(jù)結(jié)構(gòu)的。雖然,已經(jīng)有了很多設(shè)計并發(fā)代碼的內(nèi)容。你還需要考慮很多事情,比如異常安全和可擴展性。隨著系統(tǒng)中核數(shù)的增加,性能越來越高(無論是在減少執(zhí)行時間,還是增加吞吐率),這樣的代碼稱為“可擴展”代碼。理想狀態(tài)下,性能隨著核數(shù)的增加線性增長,也就是當(dāng)系統(tǒng)有100個處理器時,其性能是系統(tǒng)只有1核時的100倍。

雖然,非擴展性代碼依舊可以正常工作——單線程應(yīng)用就無法擴展——例如,異常安全是一個正確性問題。如果你的代碼不是異常安全的,最終會破壞不變量,或是造成條件競爭,亦或是你的應(yīng)用意外終止,因為某個操作會拋出異常。有了這個想法,我們就率先來看一下異常安全的問題。

8.4.1 并行算法中的異常安全

異常安全是衡量C++代碼一個很重要的指標(biāo),并發(fā)代碼也不例外。實際上,相較于串行算法,并行算法常會格外要求注意異常問題。當(dāng)一個操作在串行算法中拋出一個異常,算法只需要考慮對其本身進(jìn)行處理,以避免資源泄露和損壞不變量;這里可以允許異常傳遞給調(diào)用者,由調(diào)用者對異常進(jìn)行處理。通過對比,在并行算法中很多操作要運行在獨立的線程上。在這種情況下,異常就不再允許被傳播,因為這將會使調(diào)用堆棧出現(xiàn)問題。如果一個函數(shù)在創(chuàng)建一個新線程后帶著異常退出,那么這個應(yīng)用將會終止。

作為一個具體的例子,讓我們回顧一下清單2.8中的parallel_accumulate函數(shù):

清單8.2 std::accumulate的原始并行版本(源于清單2.8)

template<typename Iterator,typename T>
struct accumulate_block
{
  void operator()(Iterator first,Iterator last,T& result)
  {
    result=std::accumulate(first,last,result);  // 1
  }
};

template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
  unsigned long const length=std::distance(first,last);  // 2

  if(!length)
    return init;

  unsigned long const min_per_thread=25;
  unsigned long const max_threads=
    (length+min_per_thread-1)/min_per_thread;

  unsigned long const hardware_threads=
    std::thread::hardware_concurrency();

  unsigned long const num_threads=
    std::min(hardware_threads!=0?hardware_threads:2,max_threads);

  unsigned long const block_size=length/num_threads;

  std::vector<T> results(num_threads);  // 3
  std::vector<std::thread> threads(num_threads-1);  // 4

  Iterator block_start=first;  // 5
  for(unsigned long i=0;i<(num_threads-1);++i)
  {
    Iterator block_end=block_start;  // 6
    std::advance(block_end,block_size);
    threads[i]=std::thread(  // 7
      accumulate_block<Iterator,T>(),
      block_start,block_end,std::ref(results[i]));
    block_start=block_end;  // 8
  }
  accumulate_block()(block_start,last,results[num_threads-1]);  // 9

  std::for_each(threads.begin(),threads.end(),
    std::mem_fn(&std::thread::join));

  return std::accumulate(results.begin(),results.end(),init);  // 10
}

現(xiàn)在讓我們來看一下異常要在哪拋出:基本上就是在調(diào)用函數(shù)的地方拋出異常,或在用戶定義類型上執(zhí)行某個操作時可能拋出異常。

首先,需要調(diào)用distance②,其會對用戶定義的迭代器類型進(jìn)行操作。因為,這時還沒有做任何事情,所以對于調(diào)用線程來說,所有事情都沒問題。接下來,就需要分配results③和threads④。再后,調(diào)用線程依舊沒有做任何事情,或產(chǎn)生新的線程,所以到這里也是沒有問題的。當(dāng)然,如果在構(gòu)造threads拋出異常,那么對已經(jīng)分配的results將會被清理,析構(gòu)函數(shù)會幫你打理好一切。

跳過block_start⑤的初始化(因為也是安全的),來到了產(chǎn)生新線程的循環(huán)⑥⑦⑧。當(dāng)在⑦處創(chuàng)建了第一個線程,如果再拋出異常,就會出問題的;對于新的std::thread對象將會銷毀,程序?qū)⒄{(diào)用std::terminate來中斷程序的運行。使用std::terminate的地方,可不是什么好地方。

accumulate_block⑨的調(diào)用就可能拋出異常,就會產(chǎn)生和上面類似的結(jié)果;線程對象將會被銷毀,并且調(diào)用std::terminate。另一方面,最終調(diào)用std::accumulate⑩可能會拋出異常,不過處理起來沒什么難度,因為所有的線程在這里已經(jīng)匯聚回主線程了。

上面只是對于主線程來說的,不過還有很多地方會拋出異常:對于調(diào)用accumulate_block的新線程來說就會拋出異常①。沒有任何catch塊,所以這個異常不會被處理,并且當(dāng)異常發(fā)生的時候會調(diào)用std::terminater()來終止應(yīng)用的運行。

也許這里的異常問題并不明顯,不過這段代碼是非異常安全的。

添加異常安全

好吧,我們已經(jīng)確定所有拋出異常的地方了,并且知道異常所帶來的惡性后果。能為其做些什么呢?就讓我們來解決一下在新線程上的異常問題。

在第4章時已經(jīng)使用過工具來做這件事。如果你仔細(xì)的了解過新線程用來完成什么樣的工作,要返回一個計算的結(jié)果的同時,允許代碼產(chǎn)生異常。這可以將std::packaged_taskstd::future相結(jié)合,來解決這個問題。如果使用std::packaged_task重新構(gòu)造代碼,代碼可能會是如下模樣。

清單8.3 使用std::packaged_task的并行std::accumulate

template<typename Iterator,typename T>
struct accumulate_block
{
  T operator()(Iterator first,Iterator last)  // 1
  {
    return std::accumulate(first,last,T());  // 2
  }
};

template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
  unsigned long const length=std::distance(first,last);

  if(!length)
    return init;

  unsigned long const min_per_thread=25;
  unsigned long const max_threads=
    (length+min_per_thread-1)/min_per_thread;

  unsigned long const hardware_threads=
    std::thread::hardware_concurrency();

  unsigned long const num_threads=
    std::min(hardware_threads!=0?hardware_threads:2,max_threads);

  unsigned long const block_size=length/num_threads;

  std::vector<std::future<T> > futures(num_threads-1);  // 3
  std::vector<std::thread> threads(num_threads-1);

  Iterator block_start=first;
  for(unsigned long i=0;i<(num_threads-1);++i)
  {
    Iterator block_end=block_start;
    std::advance(block_end,block_size);
    std::packaged_task<T(Iterator,Iterator)> task(  // 4
      accumulate_block<Iterator,T>());
    futures[i]=task.get_future();  // 5
    threads[i]=std::thread(std::move(task),block_start,block_end);  // 6
    block_start=block_end;
  }
  T last_result=accumulate_block()(block_start,last);  // 7

  std::for_each(threads.begin(),threads.end(),
    std::mem_fn(&std::thread::join));

  T result=init;  // 8
  for(unsigned long i=0;i<(num_threads-1);++i)
  {
    result+=futures[i].get();  // 9
  }
  result += last_result;  // 10
  return result;
}

第一個修改就是調(diào)用accumulate_block的操作現(xiàn)在就是直接將結(jié)果返回,而非使用引用將結(jié)果存儲在某個地方①。使用std::packaged_taskstd::future是線程安全的,所以你可以使用它們來對結(jié)果進(jìn)行轉(zhuǎn)移。當(dāng)調(diào)用std::accumulate②時,需要你顯示傳入T的默認(rèn)構(gòu)造函數(shù),而非復(fù)用result的值,不過這只是一個小改動。

下一個改動就是,不用向量來存儲結(jié)果,而使用futures向量為每個新生線程存儲std::future<T>③。在新線程生成循環(huán)中,首先要為accumulate_block創(chuàng)建一個任務(wù)④。std::packaged_task<T(Iterator,Iterator)>聲明,需要操作的兩個Iterators和一個想要獲取的T。然后,從任務(wù)中獲取future⑤,再將需要處理的數(shù)據(jù)塊的開始和結(jié)束信息傳入⑥,讓新線程去執(zhí)行這個任務(wù)。當(dāng)任務(wù)執(zhí)行時,future將會獲取對應(yīng)的結(jié)果,以及任何拋出的異常。

使用future,就不能獲得到一組結(jié)果數(shù)組,所以需要將最終數(shù)據(jù)塊的結(jié)果賦給一個變量進(jìn)行保存⑦,而非對一個數(shù)組進(jìn)行填槽。同樣,因為需要從future中獲取結(jié)果,使用簡單的for循環(huán),就要比使用std::accumulate好的多;循環(huán)從提供的初始值開始⑧,并且將每個future上的值進(jìn)行累加⑨。如果相關(guān)任務(wù)拋出一個異常,那么異常就會被future捕捉到,并且使用get()的時候獲取數(shù)據(jù)時,這個異常會再次拋出。最后,在返回結(jié)果給調(diào)用者之前,將最后一個數(shù)據(jù)塊上的結(jié)果添加入結(jié)果中⑩。

這樣,一個問題就已經(jīng)解決:在工作線程上拋出的異常,可以在主線程上拋出。如果不止一個工作線程拋出異常,那么只有一個能在主線程中拋出,不過這不會有產(chǎn)生太大的問題。如果這個問題很重要,你可以使用類似std::nested_exception來對所有拋出的異常進(jìn)行捕捉。

剩下的問題就是,當(dāng)生成第一個新線程和當(dāng)所有線程都匯入主線程時,拋出異常;這樣會讓線程產(chǎn)生泄露。最簡單的方法就是捕獲所有拋出的線程,匯入的線程依舊是joinable()的,并且會再次拋出異常:

try
{
  for(unsigned long i=0;i<(num_threads-1);++i)
  {
    // ... as before
  }
  T last_result=accumulate_block()(block_start,last);

  std::for_each(threads.begin(),threads.end(),
  std::mem_fn(&std::thread::join));
}
catch(...)
{
  for(unsigned long i=0;i<(num_thread-1);++i)
  {
  if(threads[i].joinable())
    thread[i].join();
  }
  throw;
}

現(xiàn)在好了,無論線程如何離開這段代碼,所有線程都可以被匯入。不過,try-catch很不美觀,并且這里有重復(fù)代碼??梢詫ⅰ罢!笨刂屏魃系木€程在catch塊上執(zhí)行的線程進(jìn)行匯入。重復(fù)代碼是沒有必要的,因為這就意味著更多的地方需要改變。不過,現(xiàn)在讓我們來提取一個對象的析構(gòu)函數(shù);畢竟,析構(gòu)函數(shù)是C++中處理資源的慣用方式??匆幌履愕念悾?/p>

class join_threads
{
  std::vector<std::thread>& threads;
public:
  explicit join_threads(std::vector<std::thread>& threads_):
    threads(threads_)
  {}
  ~join_threads()
  {
    for(unsigned long i=0;i<threads.size();++i)
    {
      if(threads[i].joinable())
        threads[i].join();
    }
  }
};

這個類和在清單2.3中看到的thread_guard類很相似,除了使用向量的方式來擴展線程量。用這個類簡化后的代碼如下所示:

清單8.4 異常安全版std::accumulate

template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
  unsigned long const length=std::distance(first,last);

  if(!length)
    return init;

  unsigned long const min_per_thread=25;
  unsigned long const max_threads=
    (length+min_per_thread-1)/min_per_thread;

  unsigned long const hardware_threads=
    std::thread::hardware_concurrency();

  unsigned long const num_threads=
    std::min(hardware_threads!=0?hardware_threads:2,max_threads);

  unsigned long const block_size=length/num_threads;

  std::vector<std::future<T> > futures(num_threads-1);
  std::vector<std::thread> threads(num_threads-1);
  join_threads joiner(threads);  // 1

  Iterator block_start=first;
  for(unsigned long i=0;i<(num_threads-1);++i)
  {
    Iterator block_end=block_start;
    std::advance(block_end,block_size);
    std::packaged_task<T(Iterator,Iterator)> task(
      accumulate_block<Iterator,T>());
    futures[i]=task.get_future();
    threads[i]=std::thread(std::move(task),block_start,block_end);
    block_start=block_end;
  }
  T last_result=accumulate_block()(block_start,last);
  T result=init;
  for(unsigned long i=0;i<(num_threads-1);++i)
  {
    result+=futures[i].get();  // 2
  }
  result += last_result;
  return result;
}

當(dāng)創(chuàng)建了線程容器,就對新類型創(chuàng)建了一個實例①,可讓退出線程進(jìn)行匯入。然后,可以再顯式的匯入循環(huán)中將線程刪除,在原理上來說是安全的:因為線程,無論怎么樣退出,都需要匯入主線程。注意這里對futures[i].get()②的調(diào)用,將會阻塞線程,直到結(jié)果準(zhǔn)備就緒,所以這里不需要顯式的將線程進(jìn)行匯入。和清單8.2中的原始代碼不同:原始代碼中,你需要將線程匯入,以確保results向量被正確填充。不僅需要異常安全的代碼,還需要較短的函數(shù)實現(xiàn),因為這里已經(jīng)將匯入部分的代碼放到新(可復(fù)用)類型中去了。

std::async()的異常安全

現(xiàn)在,你已經(jīng)了解了,當(dāng)需要顯式管理線程的時候,需要代碼是異常安全的。那現(xiàn)在讓我們來看一下使用std::async()是怎么樣完成異常安全的。在本例中,標(biāo)準(zhǔn)庫對線程進(jìn)行了較好的管理,并且當(dāng)“期望”處以就緒狀態(tài)的時候,就能生成一個新的線程。對于異常安全,還需要注意一件事,如果在沒有等待的情況下對“期望”實例進(jìn)行銷毀,析構(gòu)函數(shù)會等待對應(yīng)線程執(zhí)行完畢后才執(zhí)行。這就能橋面的必過線程泄露的問題,因為線程還在執(zhí)行,且持有數(shù)據(jù)的引用。下面的代碼將展示使用std::async()完成異常安全的實現(xiàn)。

清單8.5 異常安全并行版std::accumulate——使用std::async()

template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
  unsigned long const length=std::distance(first,last);  // 1
  unsigned long const max_chunk_size=25;
  if(length<=max_chunk_size)
  {
    return std::accumulate(first,last,init);  // 2
  }
  else
  {
    Iterator mid_point=first;
    std::advance(mid_point,length/2);  // 3
    std::future<T> first_half_result=
      std::async(parallel_accumulate<Iterator,T>,  // 4
        first,mid_point,init);
    T second_half_result=parallel_accumulate(mid_point,last,T());  // 5
    return first_half_result.get()+second_half_result;  // 6
  }
}

這個版本對數(shù)據(jù)進(jìn)行遞歸劃分,而非在預(yù)計算后對數(shù)據(jù)進(jìn)行分塊;因此,這個版本要比之前的版本簡單很多,并且這個版本也是異常安全的。和之前一樣,一開始要確定序列的長度①,如果其長度小于數(shù)據(jù)塊包含數(shù)據(jù)的最大數(shù)量,那么可以直接調(diào)用std::accumulate②。如果元素的數(shù)量超出了數(shù)據(jù)塊包含數(shù)據(jù)的最大數(shù)量,那么就需要找到數(shù)量中點③,將這個數(shù)據(jù)塊分成兩部分,然后再生成一個異步任務(wù)對另一半數(shù)據(jù)進(jìn)行處理④。第二半的數(shù)據(jù)是通過直接的遞歸調(diào)用來處理的⑤,之后將兩個塊的結(jié)果加和到一起⑥。標(biāo)準(zhǔn)庫能保證std::async的調(diào)用能夠充分的利用硬件線程,并且不會產(chǎn)生線程的超額認(rèn)購,一些“異步”調(diào)用是在調(diào)用get()⑥后同步執(zhí)行的。

優(yōu)雅的地方,不僅在于利用硬件并發(fā)的優(yōu)勢,并且還能保證異常安全。如果有異常在遞歸調(diào)用⑤中拋出,通過調(diào)用std::async④所產(chǎn)生的“期望”,將會在異常傳播時被銷毀。這就需要依次等待異步任務(wù)的完成,因此也能避免懸空線程的出現(xiàn)。另外,當(dāng)異步任務(wù)拋出異常,且被future所捕獲,在對get()⑥調(diào)用的時候,future中存儲的異常,會再次拋出。

除此之外,在設(shè)計并發(fā)代碼的時候還要考慮哪些其他因素?讓我們來看一下擴展性 (scalability)。隨著系統(tǒng)中核數(shù)的增加,應(yīng)用性能如何提升?

8.4.2 可擴展性和Amdahl定律

擴展性代表了應(yīng)用利用系統(tǒng)中處理器執(zhí)行任務(wù)的能力。一種極端就是將應(yīng)用寫死為單線程運行,這種應(yīng)用就是完全不可擴展的;即使添加了100個處理器到你的系統(tǒng)中,應(yīng)用的性能都不會有任何改變。另一種就是像SETI@Home[3]項目一樣,讓應(yīng)用使用系統(tǒng)中成千上萬的處理器(以個人電腦的形式加入網(wǎng)絡(luò)的用戶)成為可能。

對于任意的多線程程序,在程序運行的時候,運行的工作線程數(shù)量會有所不同。應(yīng)用初始階段只有一個線程,之后會在這個線程上衍生出新的線程。理想狀態(tài):每個線程都做著有用的工作,不過這種情況幾乎是不可能發(fā)生的。線程通常會花時間進(jìn)行互相等待,或等待I/O操作的完成。

一種簡化的方式就是就是將程序劃分成“串行”部分和“并行”部分。串行部分:只能由單線程執(zhí)行一些工作的地方。并行部分:可以讓所有可用的處理器一起工作的部分。當(dāng)在多處理系統(tǒng)上運行你的應(yīng)用時,“并行”部分理論上會完成的相當(dāng)快,因為其工作被劃分為多份,放在不同的處理器上執(zhí)行?!按小辈糠謩t不同,還是只能一個處理器執(zhí)行所有工作。這樣(簡化)假設(shè)下,就可以對隨著處理數(shù)量的增加,估計一下性能的增益:當(dāng)程序“串行”部分的時間用fs來表示,那么性能增益(P)就可以通過處理器數(shù)量(N)進(jìn)行估計:

http://wiki.jikexueyuan.com/project/cplusplus-concurrency-action/images/chapter8/amdahl_law.png" alt="" />

這就是Amdahl定律,在討論并發(fā)程序性能的時候都會引用到的公式。如果每行代碼都能并行化,串行部分就為0,那么性能增益就為N?;蛘撸?dāng)串行部分為1/3時,當(dāng)處理器數(shù)量無限增長,你都無法獲得超過3的性能增益。

Amdahl定律明確了,對代碼最大化并發(fā)可以保證所有處理器都能用來做有用的工作。如果將“串行”部分的減小,或者減少線程的等待,就可以在多處理器的系統(tǒng)中獲取更多的性能收益。或者,當(dāng)能提供更多的數(shù)據(jù)讓系統(tǒng)進(jìn)行處理,并且讓并行部分做最重要的工作,就可以減少“串行”部分,以獲取更高的性能增益。

擴展性:當(dāng)有更多的處理器加入時,減少一個動作的執(zhí)行時間,或在給定時間內(nèi)做更多工作。有時這兩個指標(biāo)是等價的(如果處理器的速度相當(dāng)快,那么就可以處理更多的數(shù)據(jù)),有時不是。選擇線程間的工作劃分的技術(shù)前,辨別哪些方面是能否擴展的就十分的重要。

本節(jié)開始已經(jīng)提到,線程并非任何時候都做的是有用的工作。有時,它們會等待其他線程,或者等待I/O完成,亦或是等待其他的事情。如果線程在等待的時候,系統(tǒng)中還有必要的任務(wù)需要完成時,就可以將等待“隱藏”起來。

8.4.3 使用多線程隱藏延遲

之前討論了很多有關(guān)多線程性能的話題?,F(xiàn)在假設(shè),線程在一個處理器上運行時不會偷懶,并且做的工作都很有用。當(dāng)然,這只是假設(shè);在實際應(yīng)用中,線程會經(jīng)常因為等待某些事情而阻塞。

不論等待的理由是什么,如果有和系統(tǒng)中物理單元相同數(shù)量的線程,那么線程阻塞就意味著在等待CPU時間片。處理器將會在阻塞的時間內(nèi)運行另一個線程,而不是什么事情都不做。因此,當(dāng)知道一些線程需要像這樣耗費相當(dāng)一段時間進(jìn)行等待時,可以利用CPU的空閑時間去運行一個或多個線程。

試想一個病毒掃描程序,使用流水線對線程間的工作進(jìn)行劃分。第一個線程對文件系統(tǒng)中的文件進(jìn)行檢查,并將它們放入一個隊列中。同時,另一個線程從隊列中獲取文件名,加載文件,之后對它們進(jìn)行病毒掃描。線程對文件系統(tǒng)中的文件進(jìn)行掃描就會受到I/O操作的限制,所以可以通過執(zhí)行額外的掃描線程,充分利用CPU的“空閑”時間。這時還需要一個文件搜索線程,以及足夠多的掃描線程。當(dāng)掃描線程為了掃描文件,還要從磁盤上讀取到重要部分的文件時,就能體會到多掃描線程的意義所在了。不過,在某些時候線程也過于多,系統(tǒng)將會因為越來越多的任務(wù)切換而降低效率,就像8.2.5節(jié)描述的那樣。

同之前一樣,這也是一種優(yōu)化,對修改(線程數(shù)量)前后性能的測量很重要;優(yōu)化的線程數(shù)量高度依賴要完成工作的先天屬性,以及等待時間所占的百分比。

應(yīng)用可能不用額外的線程,而使用CPU的空閑時間。例如,如果一個線程因為I/O操作被阻塞,這個線程可能會使用異步I/O(如果可以用的話),當(dāng)I/O操作在后臺執(zhí)行完成后,線程就可以做其他有用的工作了。在其他情況下,當(dāng)一個線程等待其他線程去執(zhí)行一個操作時,比起阻塞,不如讓阻塞線程自己來完成這個操作,就像在第7章中看到的無鎖隊列那樣。在一個極端的例子中,當(dāng)一個線程等待一個任務(wù)完成,并且這個任務(wù)還沒有被其他任何線程所執(zhí)行時,等待線程就可以執(zhí)行這個任務(wù),或執(zhí)行另一個不完整的任務(wù)。在清單8.1中看到這樣的例子,排序函數(shù)持續(xù)的嘗試對數(shù)據(jù)進(jìn)行排序,即使那些數(shù)據(jù)已經(jīng)不需要排序了。

比起添加線程數(shù)量讓其對處理器進(jìn)行充分利用,有時也要在增加線程的同時,確保外部事件被及時的處理,以提高系統(tǒng)的響應(yīng)能力。

8.4.4 使用并發(fā)提高響應(yīng)能力

很多流行的圖形化用戶接口框架都是事件驅(qū)動型(event driven);對圖形化接口進(jìn)行操作是通過按下按鍵或移動鼠標(biāo)進(jìn)行,將產(chǎn)生一系列需要應(yīng)用處理的事件或信息。系統(tǒng)也可能產(chǎn)生信息或事件。為了確定所有事件和信息都能被正確的處理,應(yīng)用通常會有一個事件循環(huán),就像下面的代碼:

while(true)
{
  event_data event=get_event();
  if(event.type==quit)
    break;
  process(event);
}

顯然,API中的細(xì)節(jié)可能不同,不過結(jié)構(gòu)通常是一樣的:等待一個事件,對其做必要的處理,之后等待下一個事件。如果是一個單線程應(yīng)用,那么就會讓長期任務(wù)很難書寫,如同在8.1.3節(jié)中所描述。為了確保用戶輸入被及時的處理,無論應(yīng)時在做些什么,get_event()和process()必須以合理的頻率調(diào)用。這就意味著任務(wù)要被周期性的懸掛,并且返回到事件循環(huán)中,或get_event()/process()必須在一個合適地方進(jìn)行調(diào)用。每個選項的復(fù)雜程度取決于任務(wù)的實現(xiàn)方式。

通過使用并發(fā)分離關(guān)注,可以將一個很長的任務(wù)交給一個全新的線程,并且留下一個專用的GUI線程來處理這些事件。線程可以通過簡單的機制進(jìn)行通訊,而不是將事件處理代碼和任務(wù)代碼混在一起。下面的例子就是展示了這樣的分離。

清單8.6 將GUI線程和任務(wù)線程進(jìn)行分離

std::thread task_thread;
std::atomic<bool> task_cancelled(false);

void gui_thread()
{
  while(true)
  {
    event_data event=get_event();
    if(event.type==quit)
      break;
    process(event);
  }
}

void task()
{
  while(!task_complete() && !task_cancelled)
  {
    do_next_operation();
  }
  if(task_cancelled)
  {
    perform_cleanup();
  }
  else
  {
    post_gui_event(task_complete);
  }
}

void process(event_data const& event)
{
  switch(event.type)
  {
  case start_task:
    task_cancelled=false;
    task_thread=std::thread(task);
    break;
  case stop_task:
    task_cancelled=true;
    task_thread.join();
    break;
  case task_complete:
    task_thread.join();
    display_results();
    break;
  default:
    //...
  }
}

通過這種方式對關(guān)注進(jìn)行分離,用戶線程將總能及時的對事件進(jìn)行響應(yīng),及時完成任務(wù)需要花費很長事件。使用應(yīng)用的時候,響應(yīng)事件通常也是影響用戶體驗的重要一點;無論是特定操作被不恰當(dāng)?shù)膱?zhí)行(無論是什么操作),應(yīng)用都會被完全鎖住。通過使用專門的事件處理線程,GUI就能處理GUI指定的信息了(比如對于調(diào)整窗口的大小或顏色),而不需要中斷處理器,進(jìn)行耗時的處理;同時,還能向長期任務(wù)傳遞相關(guān)的信息。

現(xiàn)在,你可以將本章中在設(shè)計并發(fā)代碼時要考慮的所有問題進(jìn)行一下回顧。作為一個整體,它們都很具有代表性,不過當(dāng)你熟練的使用“多線程編程”時,考慮其中的很多問題將變成你習(xí)慣。如果你是初學(xué)者,我希望這些例子能讓你明白,這些問題是如何影響多線程代碼的。


[3] http://setiathome.ssl.berkeley.edu/