鍍金池/ 教程/ C/ 8.5 在實踐中設(shè)計并發(fā)代碼
3.4 本章總結(jié)
6.3 基于鎖設(shè)計更加復(fù)雜的數(shù)據(jù)結(jié)構(gòu)
6.1 為并發(fā)設(shè)計的意義何在?
5.2 <code>C++</code>中的原子操作和原子類型
A.7 自動推導(dǎo)變量類型
2.1 線程管理的基礎(chǔ)
8.5 在實踐中設(shè)計并發(fā)代碼
2.4 運行時決定線程數(shù)量
2.2 向線程函數(shù)傳遞參數(shù)
第4章 同步并發(fā)操作
2.3 轉(zhuǎn)移線程所有權(quán)
8.3 為多線程性能設(shè)計數(shù)據(jù)結(jié)構(gòu)
6.4 本章總結(jié)
7.3 對于設(shè)計無鎖數(shù)據(jù)結(jié)構(gòu)的指導(dǎo)建議
關(guān)于這本書
A.1 右值引用
2.6 本章總結(jié)
D.2 &lt;condition_variable&gt;頭文件
A.6 變參模板
6.2 基于鎖的并發(fā)數(shù)據(jù)結(jié)構(gòu)
4.5 本章總結(jié)
A.9 本章總結(jié)
前言
第10章 多線程程序的測試和調(diào)試
5.4 本章總結(jié)
第9章 高級線程管理
5.1 內(nèi)存模型基礎(chǔ)
2.5 識別線程
第1章 你好,C++的并發(fā)世界!
1.2 為什么使用并發(fā)?
A.5 Lambda函數(shù)
第2章 線程管理
4.3 限定等待時間
D.3 &lt;atomic&gt;頭文件
10.2 定位并發(fā)錯誤的技術(shù)
附錄B 并發(fā)庫的簡單比較
5.3 同步操作和強制排序
A.8 線程本地變量
第8章 并發(fā)代碼設(shè)計
3.3 保護共享數(shù)據(jù)的替代設(shè)施
附錄D C++線程庫參考
第7章 無鎖并發(fā)數(shù)據(jù)結(jié)構(gòu)設(shè)計
D.7 &lt;thread&gt;頭文件
D.1 &lt;chrono&gt;頭文件
4.1 等待一個事件或其他條件
A.3 默認函數(shù)
附錄A 對<code>C++</code>11語言特性的簡要介紹
第6章 基于鎖的并發(fā)數(shù)據(jù)結(jié)構(gòu)設(shè)計
封面圖片介紹
7.2 無鎖數(shù)據(jù)結(jié)構(gòu)的例子
8.6 本章總結(jié)
8.1 線程間劃分工作的技術(shù)
4.2 使用期望等待一次性事件
8.4 設(shè)計并發(fā)代碼的注意事項
D.5 &lt;mutex&gt;頭文件
3.1 共享數(shù)據(jù)帶來的問題
資源
9.3 本章總結(jié)
10.3 本章總結(jié)
10.1 與并發(fā)相關(guān)的錯誤類型
D.4 &lt;future&gt;頭文件
3.2 使用互斥量保護共享數(shù)據(jù)
9.1 線程池
1.1 何謂并發(fā)
9.2 中斷線程
4.4 使用同步操作簡化代碼
A.2 刪除函數(shù)
1.3 C++中的并發(fā)和多線程
1.4 開始入門
第5章 C++內(nèi)存模型和原子類型操作
消息傳遞框架與完整的ATM示例
8.2 影響并發(fā)代碼性能的因素
7.1 定義和意義
D.6 &lt;ratio&gt;頭文件
A.4 常量表達式函數(shù)
7.4 本章總結(jié)
1.5 本章總結(jié)
第3章 線程間共享數(shù)據(jù)

8.5 在實踐中設(shè)計并發(fā)代碼

當為一個特殊的任務(wù)設(shè)計并發(fā)代碼時,需要根據(jù)任務(wù)本身來考慮之前所提到的問題。為了展示以上的注意事項是如何應(yīng)用的,我們將看一下在C++標準庫中三個標準函數(shù)的并行實現(xiàn)。當你遇到問題時,這里的例子可以作為很好的參照。在有較大的并發(fā)任務(wù)進行輔助下,我們也將實現(xiàn)一些函數(shù)。

我主要演示這些實現(xiàn)使用的技術(shù),不過可能這些技術(shù)并不是最先進的;更多優(yōu)秀的實現(xiàn)可以更好的利用硬件并發(fā),不過這些實現(xiàn)可能需要到與并行算法相關(guān)的學術(shù)文獻,或者是多線程的專家?guī)熘?比如:Inter的TBB[4])才能看到。

并行版的std::for_each可以看作為能最直觀體現(xiàn)并行概念,就讓我們從并行版的std::for_each開始吧!

8.5.1 并行實現(xiàn):std::for_each

std::for_each的原理很簡單:其對某個范圍中的元素,依次調(diào)用用戶提供的函數(shù)。并行和串行調(diào)用的最大區(qū)別就是函數(shù)的調(diào)用順序。std::for_each是對范圍中的第一個元素調(diào)用用戶函數(shù),接著是第二個,以此類推,而在并行實現(xiàn)中對于每個元素的處理順序就不能保證了,并且它們可能(我們希望如此)被并發(fā)的處理。

為了實現(xiàn)這個函數(shù)的并行版本,需要對每個線程上處理的元素進行劃分。你事先知道元素數(shù)量,所以可以處理前對數(shù)據(jù)進行劃分(詳見8.1.1節(jié))。假設(shè)只有并行任務(wù)運行,就可以使用std::thread::hardware_concurrency()來決定線程的數(shù)量。同樣,這些元素都能被獨立的處理,所以可以使用連續(xù)的數(shù)據(jù)塊來避免偽共享(詳見8.2.3節(jié))。

這里的算法有點類似于并行版的std::accumulate(詳見8.4.1節(jié)),不過比起計算每一個元素的加和,這里對每個元素僅僅使用了一個指定功能的函數(shù)。因為不需要返回結(jié)果,可以假設(shè)這可能會對簡化代碼,不過想要將異常傳遞給調(diào)用者,就需要使用std::packaged_taskstd::future機制對線程中的異常進行轉(zhuǎn)移。這里展示一個樣本實現(xiàn)。

清單8.7 并行版std::for_each

template<typename Iterator,typename Func>
void parallel_for_each(Iterator first,Iterator last,Func f)
{
  unsigned long const length=std::distance(first,last);

  if(!length)
    return;

  unsigned long const min_per_thread=25;
  unsigned long const max_threads=
    (length+min_per_thread-1)/min_per_thread;

  unsigned long const hardware_threads=
    std::thread::hardware_concurrency();

  unsigned long const num_threads=
    std::min(hardware_threads!=0?hardware_threads:2,max_threads);

  unsigned long const block_size=length/num_threads;

  std::vector<std::future<void> > futures(num_threads-1);  // 1
  std::vector<std::thread> threads(num_threads-1);
  join_threads joiner(threads);

  Iterator block_start=first;
  for(unsigned long i=0;i<(num_threads-1);++i)
  {
    Iterator block_end=block_start;
    std::advance(block_end,block_size);
    std::packaged_task<void(void)> task(  // 2
      [=]()
      {
        std::for_each(block_start,block_end,f);
      });
    futures[i]=task.get_future();
    threads[i]=std::thread(std::move(task));  // 3
    block_start=block_end;
  }
  std::for_each(block_start,last,f);
  for(unsigned long i=0;i<(num_threads-1);++i)
  {
    futures[i].get();  // 4
  }
}

代碼結(jié)構(gòu)與清單8.4的差不多。最重要的不同在于futures向量對std::future<void>類型①變量進行存儲,因為工作線程不會返回值,并且簡單的lambda函數(shù)會對block_start到block_end上的任務(wù)②執(zhí)行f函數(shù)。這是為了避免傳入線程的構(gòu)造函數(shù)③。當工作線程不需要返回一個值時,調(diào)用futures[i].get()④只是提供檢索工作線程異常的方法;如果不想把異常傳遞出去,就可以省略這一步。

實現(xiàn)并行std::accumulate的時候,使用std::async會簡化代碼;同樣,parallel_for_each也可以使用std::async。實現(xiàn)如下所示。

清單8.8 使用std::async實現(xiàn)std::for_each

template<typename Iterator,typename Func>
void parallel_for_each(Iterator first,Iterator last,Func f)
{
  unsigned long const length=std::distance(first,last);

  if(!length)
    return;

  unsigned long const min_per_thread=25;

  if(length<(2*min_per_thread))
  {
    std::for_each(first,last,f);  // 1
  }
  else
  {
    Iterator const mid_point=first+length/2;
    std::future<void> first_half=  // 2
      std::async(&parallel_for_each<Iterator,Func>,
                 first,mid_point,f);
    parallel_for_each(mid_point,last,f);  // 3
    first_half.get();  // 4
  }
}

和基于std::async的parallel_accumulate(清單8.5)一樣,是在運行時對數(shù)據(jù)進行迭代劃分的,而非在執(zhí)行前劃分好,這是因為你不知道你的庫需要使用多少個線程。像之前一樣,當你將每一級的數(shù)據(jù)分成兩部分,異步執(zhí)行另外一部分②,剩下的部分就不能再進行劃分了,所以直接運行這一部分③;這樣就可以直接對std::for_each①進行使用了。這里再次使用std::asyncstd::future的get()成員函數(shù)④來提供對異常的傳播。

回到算法,函數(shù)需要對每一個元素執(zhí)行同樣的操作(這樣的操作有很多種,初學者可能會想到std::countstd::replace),一個稍微復(fù)雜一些的例子就是使用std::find。

8.5.2 并行實現(xiàn):std::find

接下來是std::find算法,因為這是一種不需要對數(shù)據(jù)元素做任何處理的算法。比如,當?shù)谝粋€元素就滿足查找標準,那就沒有必要對其他元素進行搜索了。將會看到,算法屬性對于性能具有很大的影響,并且對并行實現(xiàn)的設(shè)計有著直接的影響。這個算法是一個很特別的例子,數(shù)據(jù)訪問模式都會對代碼的設(shè)計產(chǎn)生影響(詳見8.3.2節(jié))。該類中的另一些算法包括std::equalstd::any_of

當你和妻子或者搭檔,在一個紀念盒中找尋一張老照片,當找到這張照片時,就不會再看另外的照片了。不過,你得讓其他人知道你已經(jīng)找到照片了(比如,大喊一聲“找到了!”),這樣其他人就會停止搜索了。很多算法的特性就是要對每一個元素進行處理,所以它們沒有辦法像std::find一樣,一旦找到合適數(shù)據(jù)就停止執(zhí)行。因此,你需要設(shè)計代碼對其進行使用——當?shù)玫较胍拇鸢妇椭袛嗥渌蝿?wù)的執(zhí)行,所以不能等待線程處理對剩下的元素進行處理。

如果不中斷其他線程,那么串行版本的性能可能會超越并行版,因為串行算法可以在找到匹配元素的時候,停止搜索并返回。如果系統(tǒng)能支持四個并發(fā)線程,那么每個線程就可以對總數(shù)據(jù)量的1/4進行檢查,并且在我們的實現(xiàn)只需要單核完成的1/4的時間,就能完成對所有元素的查找。如果匹配的元素在第一個1/4塊中,串行算法將會返回第一個,因為算法不需要對剩下的元素進行處理了。

一種辦法,中斷其他線程的一個辦法就是使用一個原子變量作為一個標識,在處理過每一個元素后就對這個標識進行檢查。如果標識被設(shè)置,那么就有線程找到了匹配元素,所以算法就可以停止并返回了。用這種方式來中斷線程,就可以將那些沒有處理的數(shù)據(jù)保持原樣,并且在更多的情況下,相較于串行方式,性能能提升很多。缺點就是,加載原子變量是一個很慢的操作,會阻礙每個線程的運行。

如何返回值和傳播異常呢?現(xiàn)在你有兩個選擇。你可以使用一個future數(shù)組,使用std::packaged_task來轉(zhuǎn)移值和異常,在主線程上對返回值和異常進行處理;或者使用std::promise對工作線程上的最終結(jié)果直接進行設(shè)置。這完全依賴于你想怎么樣處理工作線程上的異常。如果想停止第一個異常(即使還沒有對所有元素進行處理),就可以使用std::promise對異常和最終值進行設(shè)置。另外,如果想要讓其他工作線程繼續(xù)查找,可以使用std::packaged_task來存儲所有的異常,當線程沒有找到匹配元素時,異常將再次拋出。

這種情況下,我會選擇std::promise,因為其行為和std::find更為接近。這里需要注意一下搜索的元素是不是在提供的搜索范圍內(nèi)。因此,在所有線程結(jié)束前,獲取future上的結(jié)果。如果被future阻塞住,所要查找的值不在范圍內(nèi),就會持續(xù)的等待下去。實現(xiàn)代碼如下。

清單8.9 并行find算法實現(xiàn)

template<typename Iterator,typename MatchType>
Iterator parallel_find(Iterator first,Iterator last,MatchType match)
{
  struct find_element  // 1
  {
    void operator()(Iterator begin,Iterator end,
                    MatchType match,
                    std::promise<Iterator>* result,
                    std::atomic<bool>* done_flag)
    {
      try
      {
        for(;(begin!=end) && !done_flag->load();++begin)  // 2
        {
          if(*begin==match)
          {
            result->set_value(begin);  // 3
            done_flag->store(true);  // 4
            return;
          }
        }
      }
      catch(...)  // 5
      {
        try
        {
          result->set_exception(std::current_exception());  // 6
          done_flag->store(true);
        }
        catch(...)  // 7
        {}
      }
    }
  };

  unsigned long const length=std::distance(first,last);

  if(!length)
    return last;

  unsigned long const min_per_thread=25;
  unsigned long const max_threads=
    (length+min_per_thread-1)/min_per_thread;

  unsigned long const hardware_threads=
    std::thread::hardware_concurrency();

  unsigned long const num_threads=
    std::min(hardware_threads!=0?hardware_threads:2,max_threads);

  unsigned long const block_size=length/num_threads;

  std::promise<Iterator> result;  // 8
  std::atomic<bool> done_flag(false);  // 9
  std::vector<std::thread> threads(num_threads-1);
  {  // 10
    join_threads joiner(threads);

    Iterator block_start=first;
    for(unsigned long i=0;i<(num_threads-1);++i)
    {
      Iterator block_end=block_start;
      std::advance(block_end,block_size);
      threads[i]=std::thread(find_element(),  // 11
                             block_start,block_end,match,
                             &result,&done_flag);
      block_start=block_end;
    }
    find_element()(block_start,last,match,&result,&done_flag);  // 12
  }
  if(!done_flag.load())  //13
  {
    return last;
  }
  return result.get_future().get();  // 14
}

清單8.9中的函數(shù)主體與之前的例子相似。這次,由find_element類①的函數(shù)調(diào)用操作實現(xiàn),來完成查找工作的。循環(huán)通過在給定數(shù)據(jù)塊中的元素,檢查每一步上的標識②。如果匹配的元素被找到,就將最終的結(jié)果設(shè)置到promise③當中,并且在返回前對done_flag④進行設(shè)置。

如果有一個異常被拋出,那么它就會被通用處理代碼⑤捕獲,并且在promise⑥嘗中試存儲前,對done_flag進行設(shè)置。如果對應(yīng)promise已經(jīng)被設(shè)置,設(shè)置在promise上的值可能會拋出一個異常,所以這里⑦發(fā)生的任何異常,都可以捕獲并丟棄。

這意味著,當線程調(diào)用find_element查詢一個值,或者拋出一個異常時,如果其他線程看到done_flag被設(shè)置,那么其他線程將會終止。如果多線程同時找到匹配值或拋出異常,它們將會對promise產(chǎn)生競爭。不過,這是良性的條件競爭;因為,成功的競爭者會作為“第一個”返回線程,因此這個結(jié)果可以接受。

回到parallel_find函數(shù)本身,其擁有用來停止搜索的promise⑧和標識⑨;隨著對范圍內(nèi)的元素的查找?,promise和標識會傳遞到新線程中。主線程也使用find_element來對剩下的元素進行查找?。像之前提到的,需要在全部線程結(jié)束前,對結(jié)果進行檢查,因為結(jié)果可能是任意位置上的匹配元素。這里將“啟動-匯入”代碼放在一個塊中⑩,所以所有線程都會在找到匹配元素時?進行匯入。如果找到匹配元素,就可以調(diào)用std::future<Iterator>(來自promise?)的成員函數(shù)get()來獲取返回值或異常。

不過,這里假設(shè)你會使用硬件上所有可用的的并發(fā)線程,或使用其他機制對線程上的任務(wù)進行提前劃分。就像之前一樣,可以使用std::async,以及遞歸數(shù)據(jù)劃分的方式來簡化實現(xiàn)(同時使用C++標準庫中提供的自動縮放工具)。使用std::async的parallel_find實現(xiàn)如下所示。

清單8.10 使用std::async實現(xiàn)的并行find算法

template<typename Iterator,typename MatchType>  // 1
Iterator parallel_find_impl(Iterator first,Iterator last,MatchType match,
                            std::atomic<bool>& done)
{
  try
  {
    unsigned long const length=std::distance(first,last);
    unsigned long const min_per_thread=25;  // 2
    if(length<(2*min_per_thread))  // 3
    {
      for(;(first!=last) && !done.load();++first)  // 4
      {
        if(*first==match)
        {
          done=true;  // 5
          return first;
        }
      }
      return last;  // 6
    }
    else
    { 
      Iterator const mid_point=first+(length/2);  // 7
      std::future<Iterator> async_result=
        std::async(&parallel_find_impl<Iterator,MatchType>,  // 8
                   mid_point,last,match,std::ref(done));
      Iterator const direct_result=
        parallel_find_impl(first,mid_point,match,done);  // 9
      return (direct_result==mid_point)?
        async_result.get():direct_result;  // 10
    }
  }
  catch(...)
  {
    done=true;  // 11
    throw;
  }
}

template<typename Iterator,typename MatchType>
Iterator parallel_find(Iterator first,Iterator last,MatchType match)
{
  std::atomic<bool> done(false);
  return parallel_find_impl(first,last,match,done);  // 12
}

如果想要在找到匹配項時結(jié)束,就需要在線程之間設(shè)置一個標識來表明匹配項已經(jīng)被找到。因此,需要將這個標識遞歸的傳遞。通過函數(shù)①的方式來實現(xiàn)是最簡單的辦法,只需要增加一個參數(shù)——一個done標識的引用,這個表示通過程序的主入口點傳入?。

核心實現(xiàn)和之前的代碼一樣。通常函數(shù)的實現(xiàn)中,會讓單個線程處理最少的數(shù)據(jù)項②;如果數(shù)據(jù)塊大小不足于分成兩半,就要讓當前線程完成所有的工作了③。實際算法在一個簡單的循環(huán)當中(給定范圍),直到在循環(huán)到指定范圍中的最后一個,或找到匹配項,并對標識進行設(shè)置④。如果找到匹配項,標識done就會在返回前進行設(shè)置⑤。無論是因為已經(jīng)查找到最后一個,還是因為其他線程對done進行了設(shè)置,都會停止查找。如果沒有找到,會將最后一個元素last進行返回⑥。

如果給定范圍可以進行劃分,首先要在st::async在對第二部分進行查找⑧前,要找數(shù)據(jù)中點⑦,而且需要使用std::ref將done以引用的方式傳遞。同時,可以通過對第一部分直接進行遞歸查找。兩部分都是異步的,并且在原始范圍過大時,直接遞歸查找的部分可能會再細化。

如果直接查找返回的是mid_point,這就意味著沒有找到匹配項,所以就要從異步查找中獲取結(jié)果。如果在另一半中沒有匹配項的話,返回的結(jié)果就一定是last,這個值的返回就代表了沒有找到匹配的元素⑩。如果“異步”調(diào)用被延遲(非真正的異步),那么實際上這里會運行g(shù)et();這種情況下,如果對下半部分的元素搜索成功,那么就不會執(zhí)行對上半部分元素的搜索了。如果異步查找真實的運行在其他線程上,那么async_result變量的析構(gòu)函數(shù)將會等待該線程完成,所以這里不會有線程泄露。

像之前一樣,std::async可以用來提供“異常-安全”和“異常-傳播”特性。如果直接遞歸拋出異常,future的析構(gòu)函數(shù)就能讓異步執(zhí)行的線程提前結(jié)束;如果異步調(diào)用拋出異常,那么這個異常將會通過對get()成員函數(shù)的調(diào)用進行傳播⑩。使用try/catch塊只能捕捉在done發(fā)生的異常,并且當有異常拋出?時,所有線程都能很快的終止運行。不過,不使用try/catch的實現(xiàn)依舊沒問題,不同的就是要等待所有線程的工作是否完成。

實現(xiàn)中一個重要的特性就是,不能保證所有數(shù)據(jù)都能被std::find串行處理。其他并行算法可以借鑒這個特性,因為要讓一個算法并行起來這是必須具有的特性。如果有順序問題,元素就不能并發(fā)的處理了。如果每個元素獨立,雖然對于parallel_for_each不是很重要,不過對于parallel_find,即使在開始部分已經(jīng)找到了匹配元素,也有可能返回范圍中最后一個元素;如果在知道結(jié)果的前提下,這樣的結(jié)果會讓人很驚訝。

OK,現(xiàn)在你已經(jīng)使用了并行化的std::find。如在本節(jié)開始說的那樣,其他相似算法不需要對每一個數(shù)據(jù)元素進行處理,并且同樣的技術(shù)可以使用到這些類似的算法上去。我們將在第9章中看到“中斷線程”的問題。

為了完成我們的并行“三重奏”,我們將換一個角度來看一下std::partial_sum。對于這個算法,沒有太多的文獻可參考,不過讓這個算法并行起來是一件很有趣的事。

8.5.3 并行實現(xiàn):std::partial_sum

std::partial_sum會計算給定范圍中的每個元素,并用計算后的結(jié)果將原始序列中的值替換掉。比如,有一個序列[1,2,3,4,5],在執(zhí)行該算法后會成為:[1,3(1+2),6(1+2+3),10(1+2+3+4),15(1+2+3+4+5)]。讓這樣一個算法并行起來會很有趣,因為這里不能講任務(wù)分塊,對每一塊進行獨立的計算。比如,原始序列中的第一個元素需要加到后面的一個元素中去。

確定某個范圍部分和的一種的方式,就是在獨立塊中計算部分和,然后將第一塊中最后的元素的值,與下一塊中的所有元素進行相加,依次類推。如果有個序列[1,2,3,4,5,6,7,8,9],然后將其分為三塊,那么在第一次計算后就能得到[{1,3,6},{4,9,15},{7,15,24}]。然后將6(第一塊的最后一個元素)加到第二個塊中,那么就得到[{1,3,6},{10,15,21},{7,15,24}]。然后再將第二塊的最后一個元素21加到第三塊中去,就得到[{1,3,6},{10,15,21},{28,36,55}]。

將原始數(shù)據(jù)分割成塊,加上之前塊的部分和就能夠并行了。如果每個塊中的末尾元素都是第一個被更新的,那么塊中其他的元素就能被其他線程所更新,同時另一個線程對下一塊進行更新,等等。當處理的元素比處理核心的個數(shù)多的時候,這樣完成工作沒問題,因為每一個核芯在每一個階段都有合適的數(shù)據(jù)可以進行處理。

如果有很多的處理器(就是要比處理的元素個數(shù)多),那么之前的方式就無法正常工作了。如果還是將工作劃分給每個處理器,那么在第一步就沒必要去做了。這種情況下,傳遞結(jié)果就意味著讓處理器進行等待,這時需要給這些處于等待中的處理器一些工作。所以,可以采用完全不同的方式來處理這個問題。比起將數(shù)據(jù)塊中的最后一個元素的結(jié)果向后面的元素塊傳遞,可以對部分結(jié)果進行傳播:第一次與相鄰的元素(距離為1)相加和(和之前一樣),之后和距離為2的元素相加,在后來和距離為4的元素相加,以此類推。比如,初始序列為[1,2,3,4,5,6,7,8,9],第一次后為[1,3,5,7,9,11,13,15,17],第二次后為[1,3,6,10,14,18, 22,26,30],下一次就要隔4個元素了。第三次后[1, 3, 6, 10, 15, 21, 28, 36, 44],下一次就要隔8個元素了。第四次后[1, 3, 6, 10, 15, 21, 28, 36, 45],這就是最終的結(jié)果。雖然,比起第一種方法多了很多步驟,不過在可并發(fā)平臺下,這種方法提高了并行的可行性;每個處理器可在每一步中處理一個數(shù)據(jù)項。

總體來說,當有N個操作時(每步使用一個處理器)第二種方法需要log(N)[底為2]步;在本節(jié)中,N就相當于數(shù)據(jù)鏈表的長度。比起第一種,每個線程對分配塊做N/k個操作,然后在做N/k次結(jié)果傳遞(這里的k是線程的數(shù)量)。因此,第一種方法的時間復(fù)雜度為O(N),不過第二種方法的時間復(fù)雜度為Q(Nlog(N))。當數(shù)據(jù)量和處理器數(shù)量相近時,第二種方法需要每個處理器上log(N)個操作,第一種方法中每個處理器上執(zhí)行的操作數(shù)會隨著k的增加而增多,因為需要對結(jié)果進行傳遞。對于處理單元較少的情況,第一種方法會比較合適;對于大規(guī)模并行系統(tǒng),第二種方法比較合適。

不管怎么樣,先將效率問題放一邊,讓我們來看一些代碼。下面清單實現(xiàn)的,就是第一種方法。

清單8.11 使用劃分的方式來并行的計算部分和

template<typename Iterator>
void parallel_partial_sum(Iterator first,Iterator last)
{
  typedef typename Iterator::value_type value_type;

  struct process_chunk  // 1
  {
    void operator()(Iterator begin,Iterator last,
                    std::future<value_type>* previous_end_value,
                    std::promise<value_type>* end_value)
    {
      try
      {
        Iterator end=last;
        ++end;
        std::partial_sum(begin,end,begin);  // 2
        if(previous_end_value)  // 3
        {
          value_type& addend=previous_end_value->get();  // 4
          *last+=addend;  // 5
          if(end_value)
          {
            end_value->set_value(*last);  // 6
          }
          std::for_each(begin,last,[addend](value_type& item)  // 7
                        {
                          item+=addend;
                        });
         }
         else if(end_value)
         {
           end_value->set_value(*last);  // 8
         }
       }
       catch(...)  // 9
       {
         if(end_value)
         {
           end_value->set_exception(std::current_exception());  // 10
         }
         else
         {
           throw;  // 11
         }
       }
     }
   };

  unsigned long const length=std::distance(first,last);

  if(!length)
    return last;

  unsigned long const min_per_thread=25;  // 12
  unsigned long const max_threads=
    (length+min_per_thread-1)/min_per_thread;

  unsigned long const hardware_threads=
    std::thread::hardware_concurrency();

  unsigned long const num_threads=
    std::min(hardware_threads!=0?hardware_threads:2,max_threads);

  unsigned long const block_size=length/num_threads;

  typedef typename Iterator::value_type value_type;

  std::vector<std::thread> threads(num_threads-1);  // 13
  std::vector<std::promise<value_type> >
    end_values(num_threads-1);  // 14
  std::vector<std::future<value_type> >
    previous_end_values;  // 15
  previous_end_values.reserve(num_threads-1);  // 16
  join_threads joiner(threads);

  Iterator block_start=first;
  for(unsigned long i=0;i<(num_threads-1);++i)
  {
    Iterator block_last=block_start;
    std::advance(block_last,block_size-1);  // 17
    threads[i]=std::thread(process_chunk(),  // 18
                           block_start,block_last,
                           (i!=0)?&previous_end_values[i-1]:0,
                           &end_values[i]);
    block_start=block_last;
    ++block_start;  // 19
    previous_end_values.push_back(end_values[i].get_future());  // 20
  }
  Iterator final_element=block_start;
  std::advance(final_element,std::distance(block_start,last)-1);  // 21
  process_chunk()(block_start,final_element,  // 22
                  (num_threads>1)?&previous_end_values.back():0,
                  0);
}

這個實現(xiàn)中,使用的結(jié)構(gòu)體和之前算法中的一樣,將問題進行分塊解決,每個線程處理最小的數(shù)據(jù)塊?。其中,有一組線程?和一組promise?,用來存儲每塊中的最后一個值;并且實現(xiàn)中還有一組future?,用來對前一塊中的最后一個值進行檢索??梢詾閒uture?做些儲備,以避免生成新線程時,再分配內(nèi)存。

主循環(huán)和之前一樣,不過這次是讓迭代器指向了每個數(shù)據(jù)塊的最后一個元素,而不是作為一個普通值傳遞到最后?,這樣就方便向其他塊傳遞當前塊的最后一個元素了。實際處理是在process_chunk函數(shù)對象中完成的,這個結(jié)構(gòu)體看上去不是很長;當前塊的開始和結(jié)束迭代器和前塊中最后一個值的future一起,作為參數(shù)進行傳遞,并且promise用來保留當前范圍內(nèi)最后一個值的原始值?。

生成新的線程后,就對開始塊的ID進行更新,別忘了傳遞最后一個元素?,并且將當前塊的最后一個元素存儲到future,上面的數(shù)據(jù)將在循環(huán)中再次使用到?。

在處理最后一個數(shù)據(jù)塊前,需要獲取之前數(shù)據(jù)塊中最后一個元素的迭代器(21),這樣就可以將其作為參數(shù)傳入process_chunk(22)中了。std::partial_sum不會返回一個值,所以在最后一個數(shù)據(jù)塊被處理后,就不用再做任何事情了。當所有線程的操作完成時,求部分和的操作也就算完成了。

OK,現(xiàn)在來看一下process_chunk函數(shù)對象①。對于整塊的處理是始于對std::partial_sum的調(diào)用,包括對于最后一個值的處理②,不過得要知道當前塊是否是第一塊③。如果當前塊不是第一塊,就會有一個previous_end_value值從前面的塊傳過來,所以這里需要等待這個值的產(chǎn)生④。為了將算法最大程度的并行,首先需要對最后一個元素進行更新⑤,這樣你就能將這個值傳遞給下一個數(shù)據(jù)塊(如果有下一個數(shù)據(jù)塊的話)⑥。當完成這個操作,就可以使用std::for_each和簡單的lambda函數(shù)⑦對剩余的數(shù)據(jù)項進行更新。

如果previous_end_value值為空,當前數(shù)據(jù)塊就是第一個數(shù)據(jù)塊,所以只需要為下一個數(shù)據(jù)塊更新end_value⑧(如果有下一個數(shù)據(jù)塊的話——當前數(shù)據(jù)塊可能是唯一的數(shù)據(jù)塊)。

最后,如果有任意一個操作拋出異常,就可以將其捕獲⑨,并且存入promise⑩,如果下一個數(shù)據(jù)塊嘗試獲取前一個數(shù)據(jù)塊的最后一個值④時,異常會再次拋出。處理最后一個數(shù)據(jù)塊時,異常會全部重新拋出?,因為拋出動作一定會在主線程上進行。

因為線程間需要同步,這里的代碼就不容易使用std::async重寫。任務(wù)等待會讓線程中途去執(zhí)行其他的任務(wù),所以所有的任務(wù)必須同時執(zhí)行。

基于塊,以傳遞末尾元素值的方法就介紹到這里,讓我們來看一下第二種計算方式。

實現(xiàn)以2的冪級數(shù)為距離部分和算法

第二種算法通過增加距離的方式,讓更多的處理器充分發(fā)揮作用。在這種情況下,沒有進一步同步的必要了,因為所有中間結(jié)果都直接傳遞到下一個處理器上去了。不過,在實際中我們很少見到,單個處理器處理對一定數(shù)量的元素執(zhí)行同一條指令,這種方式成為單指令-多數(shù)據(jù)流(SIMD)。因此,代碼必須能處理通用情況,并且需要在每步上對線程進行顯式同步。

完成這種功能的一種方式是使用柵欄(barrier)——一種同步機制:只有所有線程都到達柵欄處,才能進行之后的操作;先到達的線程必須等待未到達的線程。C++11標準庫沒有直接提供這樣的工具,所以你得自行設(shè)計一個。

試想游樂場中的過山車。如果有適量的游客在等待,那么過山車管理員就要保證,在過山車啟動前,每一個位置都得坐一個游客。柵欄的工作原理也一樣:你已經(jīng)知道了“座位”的數(shù)量,線程就是要等待所有“座位”都坐滿。當?shù)却€程夠數(shù),那么它們可以繼續(xù)運行;這時,柵欄會重置,并且會讓下一撥線程開始扥帶。通常,會在循環(huán)中這樣做,當同一個線程再次到達柵欄處,它會再次等待。這種方法是為了讓線程同步,所以不會有線程在其他未完成的情況下,就去完成下一個任務(wù)。如果有線程提前執(zhí)行,對于這樣一個算法,就是一場災(zāi)難,因為提前出發(fā)的線程可能會修改要被其他線程使用到的數(shù)據(jù),后面線程獲取到的數(shù)據(jù)就不是正確數(shù)據(jù)了。

下面的代碼就簡單的實現(xiàn)了一個柵欄。

清單8.12 簡單的柵欄類

class barrier
{
  unsigned const count;
  std::atomic<unsigned> spaces;
  std::atomic<unsigned> generation;
public:
  explicit barrier(unsigned count_):  // 1
    count(count_),spaces(count),generation(0)
  {}

  void wait()
  {
    unsigned const my_generation=generation;  // 2
    if(!--spaces)  // 3
    {
      spaces=count;  // 4
      ++generation;  // 5
    }
    else
    {
      while(generation==my_generation)  // 6
        std::this_thread::yield();  // 7
    }
  }
};

這個實現(xiàn)中,用一定數(shù)量的“座位”構(gòu)造了一個barrier①,這個數(shù)量將會存儲count變量中。起初,柵欄中的spaces與count數(shù)量相當。當有線程都在等待時,spaces的數(shù)量就會減少③。當spaces的數(shù)量減到0時,spaces的值將會重置為count④,并且generation變量會增加,以向線程發(fā)出信號,讓這些等待線程能夠繼續(xù)運行⑤。如果spaces沒有到達0,那么線程會繼續(xù)等待。這個實現(xiàn)使用了一個簡單的自旋鎖⑥,對generation的檢查會在wait()開始的時候進行②。因為generation只會在所有線程都到達柵欄的時候更新⑤,在等待的時候使用yield()⑦就不會讓CPU處于忙等待的狀態(tài)。

這個實現(xiàn)比較“簡單”的真實意義:使用自旋等待的情況下,如果讓線程等待很長時間就不會很理想,并且如果超過count數(shù)量的線程對wait()進行調(diào)用,這個實現(xiàn)就沒有辦法工作了。如果想要很好的處理這樣的情況,必須使用一個更加健壯(更加復(fù)雜)的實現(xiàn)。我依舊堅持對原子變量操作順序的一致性,因為這會讓事情更加簡單,不過有時還是需要放松這樣的約束。全局同步對于大規(guī)模并行架構(gòu)來說是消耗巨大的,因為相關(guān)處理器會穿梭于存儲柵欄狀態(tài)的緩存行中(可見8.2.2中對乒乓緩存的討論),所以需要格外的小心,來確保使用的是最佳同步方法。

不論怎么樣,這些都需要你考慮到;需要有固定數(shù)量的線程執(zhí)行同步循環(huán)。好吧,大多數(shù)情況下線程數(shù)量都是固定的。你可能還記得,代碼起始部分的幾個數(shù)據(jù)項,只需要幾步就能得到其最終值。這就意味著,無論是讓所有線程循環(huán)處理范圍內(nèi)的所有元素,還是讓柵欄來同步線程,都會遞減count的值。我會選擇后者,因為其能避免線程做不必要的工作,僅僅是等待最終步驟完成。

這意味著你要將count改為一個原子變量,這樣在多線程對其進行更新的時候,就不需要添加額外的同步:

std::atomic<unsigned> count;

初始化保持不變,不過當spaces的值被重置后,你需要顯式的對count進行l(wèi)oad()操作:

spaces=count.load();

這就是要對wait()函數(shù)的改動;現(xiàn)在需要一個新的成員函數(shù)來遞減count。這個函數(shù)命名為done_waiting(),因為當一個線程完成其工作,并在等待的時候,才能對其進行調(diào)用它:

void done_waiting()
{
  --count;  // 1
  if(!--spaces)  // 2
  {
    spaces=count.load();  // 3
    ++generation;
  }
}

實現(xiàn)中,首先要減少count①,所以下一次spaces將會被重置為一個較小的數(shù)。然后,需要遞減spaces的值②。如果不做這些操作,有些線程將會持續(xù)等待,因為spaces被舊的count初始化,大于期望值。一組當中最后一個線程需要對計數(shù)器進行重置,并且遞增generation的值③,就像在wait()里面做的那樣。最重要的區(qū)別:最后一個線程不需要等待。當最后一個線程結(jié)束,整個等待也就隨之結(jié)束!

現(xiàn)在就準備開始寫部分和的第二個實現(xiàn)吧。在每一步中,每一個線程都在柵欄出調(diào)用wait(),來保證線程所處步驟一致,并且當所有線程都結(jié)束,那么最后一個線程會調(diào)用done_waiting()來減少count的值。如果使用兩個緩存對原始數(shù)據(jù)進行保存,柵欄也可以提供你所需要的同步。每一步中,線程都會從原始數(shù)據(jù)或是緩存中讀取數(shù)據(jù),并且將新值寫入對應(yīng)位置。如果有線程先從原始數(shù)據(jù)處獲取數(shù)據(jù),那下一步就從緩存上獲取數(shù)據(jù)(或相反)。這就能保證在讀與寫都是由獨立線程完成,并不存在條件競爭。當線程結(jié)束等待循環(huán),就能保證正確的值最終被寫入到原始數(shù)據(jù)當中。下面的代碼就是這樣的實現(xiàn)。

清單8.13 通過兩兩更新對的方式實現(xiàn)partial_sum

struct barrier
{
  std::atomic<unsigned> count;
  std::atomic<unsigned> spaces;
  std::atomic<unsigned> generation;

  barrier(unsigned count_):
    count(count_),spaces(count_),generation(0)
  {}

  void wait()
  {
    unsigned const gen=generation.load();
    if(!--spaces)
    {
      spaces=count.load();
      ++generation;
    }
    else
    {
      while(generation.load()==gen)
      {
        std::this_thread::yield();
      }
    }
  }

  void done_waiting()
  {
    --count;
    if(!--spaces)
    {
      spaces=count.load();
      ++generation;
    }
  }
};

template<typename Iterator>
void parallel_partial_sum(Iterator first,Iterator last)
{
  typedef typename Iterator::value_type value_type;

  struct process_element  // 1
  {
    void operator()(Iterator first,Iterator last,
                    std::vector<value_type>& buffer,
                    unsigned i,barrier& b)
    {
      value_type& ith_element=*(first+i);
      bool update_source=false;

      for(unsigned step=0,stride=1;stride<=i;++step,stride*=2)
      {
        value_type const& source=(step%2)?  // 2
          buffer[i]:ith_element;

        value_type& dest=(step%2)?
          ith_element:buffer[i];

        value_type const& addend=(step%2)?  // 3
          buffer[i-stride]:*(first+i-stride);

        dest=source+addend;  // 4
        update_source=!(step%2);
        b.wait();  // 5
      }
      if(update_source)  // 6
      {
        ith_element=buffer[i];
      }
      b.done_waiting();  // 7
    }
  };

  unsigned long const length=std::distance(first,last);

  if(length<=1)
    return;

  std::vector<value_type> buffer(length);
  barrier b(length);

  std::vector<std::thread> threads(length-1);  // 8
  join_threads joiner(threads);

  Iterator block_start=first;
  for(unsigned long i=0;i<(length-1);++i)
  {
    threads[i]=std::thread(process_element(),first,last,  // 9
                           std::ref(buffer),i,std::ref(b));
  }
  process_element()(first,last,buffer,length-1,b);  // 10
}

代碼的整體結(jié)構(gòu)應(yīng)該不用說了。process_element類有函數(shù)調(diào)用操作可以用來做具體的工作①,就是運行一組線程⑨,并將線程存儲到vector中⑧,同樣還需要在主線程中對其進行調(diào)用⑩。這里與之前最大的區(qū)別就是,線程的數(shù)量是根據(jù)列表中的數(shù)據(jù)量來定的,而非根據(jù)std::thread::hardware_concurrency。如我之前所說,除非你使用的是一個大規(guī)模并行的機器,因為這上面的線程都十分廉價(雖然這樣的方式并不是很好),還能為我們展示了其整體結(jié)構(gòu)。這個結(jié)構(gòu)在有較少線程的時候,每一個線程只能處理源數(shù)據(jù)中的部分數(shù)據(jù),當沒有足夠的線程支持該結(jié)構(gòu)時,效率要比傳遞算法低。

不管怎樣,主要的工作都是調(diào)用process_element的函數(shù)操作符來完成的。每一步,都會從原始數(shù)據(jù)或緩存中獲取第i個元素②,并且將獲取到的元素加到指定stride的元素中去③,如果從原始數(shù)據(jù)開始讀取的元素,加和后的數(shù)需要存儲在緩存中④。然后,在開始下一步前,會在柵欄處等待⑤。當stride超出了給定數(shù)據(jù)的范圍,當最終結(jié)果已經(jīng)存在緩存中時,就需要更新原始數(shù)據(jù)中的數(shù)據(jù),同樣這也意味著本次加和結(jié)束。最后,在調(diào)用柵欄中的done_waiting()函數(shù)⑦。

注意這個解決方案并不是異常安全的。如果某個線程在process_element執(zhí)行時拋出一個異常,其就會終止整個應(yīng)用。這里可以使用一個std::promise來存儲異常,就像在清單8.9中parallel_find的實現(xiàn),或僅使用一個被互斥量保護的std::exception_ptr即可。

總結(jié)下這三個例子。希望其能保證我們了解8.1、8.2、8.3和8.4節(jié)中提到的設(shè)計考量,并且證明了這些技術(shù)在真實的代碼中,需要承擔些什么責任。


[4] http://threadingbuildingblocks.org/