鍍金池/ 教程/ C/ 9.1 線程池
3.4 本章總結
6.3 基于鎖設計更加復雜的數(shù)據(jù)結構
6.1 為并發(fā)設計的意義何在?
5.2 <code>C++</code>中的原子操作和原子類型
A.7 自動推導變量類型
2.1 線程管理的基礎
8.5 在實踐中設計并發(fā)代碼
2.4 運行時決定線程數(shù)量
2.2 向線程函數(shù)傳遞參數(shù)
第4章 同步并發(fā)操作
2.3 轉移線程所有權
8.3 為多線程性能設計數(shù)據(jù)結構
6.4 本章總結
7.3 對于設計無鎖數(shù)據(jù)結構的指導建議
關于這本書
A.1 右值引用
2.6 本章總結
D.2 &lt;condition_variable&gt;頭文件
A.6 變參模板
6.2 基于鎖的并發(fā)數(shù)據(jù)結構
4.5 本章總結
A.9 本章總結
前言
第10章 多線程程序的測試和調試
5.4 本章總結
第9章 高級線程管理
5.1 內存模型基礎
2.5 識別線程
第1章 你好,C++的并發(fā)世界!
1.2 為什么使用并發(fā)?
A.5 Lambda函數(shù)
第2章 線程管理
4.3 限定等待時間
D.3 &lt;atomic&gt;頭文件
10.2 定位并發(fā)錯誤的技術
附錄B 并發(fā)庫的簡單比較
5.3 同步操作和強制排序
A.8 線程本地變量
第8章 并發(fā)代碼設計
3.3 保護共享數(shù)據(jù)的替代設施
附錄D C++線程庫參考
第7章 無鎖并發(fā)數(shù)據(jù)結構設計
D.7 &lt;thread&gt;頭文件
D.1 &lt;chrono&gt;頭文件
4.1 等待一個事件或其他條件
A.3 默認函數(shù)
附錄A 對<code>C++</code>11語言特性的簡要介紹
第6章 基于鎖的并發(fā)數(shù)據(jù)結構設計
封面圖片介紹
7.2 無鎖數(shù)據(jù)結構的例子
8.6 本章總結
8.1 線程間劃分工作的技術
4.2 使用期望等待一次性事件
8.4 設計并發(fā)代碼的注意事項
D.5 &lt;mutex&gt;頭文件
3.1 共享數(shù)據(jù)帶來的問題
資源
9.3 本章總結
10.3 本章總結
10.1 與并發(fā)相關的錯誤類型
D.4 &lt;future&gt;頭文件
3.2 使用互斥量保護共享數(shù)據(jù)
9.1 線程池
1.1 何謂并發(fā)
9.2 中斷線程
4.4 使用同步操作簡化代碼
A.2 刪除函數(shù)
1.3 C++中的并發(fā)和多線程
1.4 開始入門
第5章 C++內存模型和原子類型操作
消息傳遞框架與完整的ATM示例
8.2 影響并發(fā)代碼性能的因素
7.1 定義和意義
D.6 &lt;ratio&gt;頭文件
A.4 常量表達式函數(shù)
7.4 本章總結
1.5 本章總結
第3章 線程間共享數(shù)據(jù)

9.1 線程池

很多公司里,雇員通常會在辦公室度過他們的辦公時光(偶爾也會外出訪問客戶或供應商),或是參加貿(mào)易展會。雖然外出可能很有必要,并且可能需要很多人一起去,不過對于一些特別的雇員來說,一趟可能就是幾個月,甚至是幾年。公司要給每個雇員都配一輛車,這基本上是不可能的,不過公司可以提供一些共用車輛;這樣就會有一定數(shù)量車,來讓所有雇員使用。當一個員工要去異地旅游時,那么他就可以從共用車輛中預定一輛,并在返回公司的時候將車交還。如果某天沒有閑置的共用車輛,雇員就得不延后其旅程了。

線程池就是類似的一種方式,在大多數(shù)系統(tǒng)中,將每個任務指定給某個線程是不切實際的,不過可以利用現(xiàn)有的并發(fā)性,進行并發(fā)執(zhí)行。線程池就提供了這樣的功能,提交到線程池中的任務將并發(fā)執(zhí)行,提交的任務將會掛在任務隊列上。隊列中的每一個任務都會被池中的工作線程所獲取,當任務執(zhí)行完成后,再回到線程池中獲取下一個任務。

創(chuàng)建一個線程池時,會遇到幾個關鍵性的設計問題,比如:可使用的線程數(shù)量,高效的任務分配方式,以及是否需要等待一個任務完成。

在本節(jié),我們將看到線程池是如何解決這些問題的,從最簡單的線程池開始吧!

9.1.1 最簡單的線程池

作為最簡單的線程池,其擁有固定數(shù)量的工作線程(通常工作線程數(shù)量與std::thread::hardware_concurrency()相同)。當工作需要完成時,可以調用函數(shù)將任務掛在任務隊列中。每個工作線程都會從任務隊列上獲取任務,然后執(zhí)行這個任務,執(zhí)行完成后再回來獲取新的任務。在最簡單的線程池中,線程就不需要等待其他線程完成對應任務了。如果需要等待,就需要對同步進行管理。

下面清單中的代碼就展示了一個最簡單的線程池實現(xiàn)。

清單9.1 簡單的線程池

class thread_pool
{
  std::atomic_bool done;
  thread_safe_queue<std::function<void()> > work_queue;  // 1
  std::vector<std::thread> threads;  // 2
  join_threads joiner;  // 3

  void worker_thread()
  {
    while(!done)  // 4
    {
      std::function<void()> task;
      if(work_queue.try_pop(task))  // 5
      {
        task();  // 6
      }
      else
      {
        std::this_thread::yield();  // 7
      }
    }
  }

public:
  thread_pool():
    done(false),joiner(threads)
  {
    unsigned const thread_count=std::thread::hardware_concurrency();  // 8

    try
    {
      for(unsigned i=0;i<thread_count;++i)
      {
        threads.push_back( 
          std::thread(&thread_pool::worker_thread,this));  // 9
      }
    }
    catch(...)
    {
      done=true;  // 10
      throw;
    }
  }

  ~thread_pool()
  {
    done=true;  // 11
  }

  template<typename FunctionType>
  void submit(FunctionType f)
  {
    work_queue.push(std::function<void()>(f));  // 12
  }
};

實現(xiàn)中有一組工作線程②,并且使用了一個線程安全隊列(見第6章)①來管理任務隊列。這種情況下,用戶不用等待任務,并且任務不需要返回任何值,所以可以使用std::function<void()>對任務進行封裝。submit()函數(shù)會將函數(shù)或可調用對象包裝成一個std::function<void()>實例,并將其推入隊列中?。

線程始于構造函數(shù):使用std::thread::hardware_concurrency()來獲取硬件支持多少個并發(fā)線程⑧,這些線程會在worker_thread()成員函數(shù)中執(zhí)行⑨。

當有異常拋出時,線程啟動就會失敗,所以需要保證任何已啟動的線程都能停止,并且能在這種情況下清理干凈。當有異常拋出時,通過使用try-catch來設置done標志⑩,還有join_threads類的實例(來自于第8章)③用來匯聚所有線程。當然也需要析構函數(shù):僅設置done標志?,并且join_threads確保所有線程在線程池銷毀前全部執(zhí)行完成。注意成員聲明的順序很重要:done標志和worker_queue必須在threads數(shù)組之前聲明,而數(shù)據(jù)必須在joiner前聲明。這就能確保成員能以正確的順序銷毀;比如,所有線程都停止運行時,隊列就可以安全的銷毀了。

worker_thread函數(shù)很簡單:從任務隊列上獲取任務⑤,以及同時執(zhí)行這些任務⑥,執(zhí)行一個循環(huán)直到done標志被設置④。如果任務隊列上沒有任務,函數(shù)會調用std::this_thread::yield()讓線程休息⑦,并且給予其他線程向任務隊列上推送任務的機會。

一些簡單的情況,這樣線程池就足以滿足要求,特別是任務沒有返回值,或需要執(zhí)行一些阻塞操作的時候。不過,在很多情況下,這樣簡單的線程池完全不夠用,其他情況使用這樣簡單的線程池可能會出現(xiàn)問題,比如:死鎖。同樣,在簡單例子中,使用std::async能提供更好的功能(如第8章中的例子)。

在本章中,我們將了解一下更加復雜的線程池實現(xiàn),通過添加特性滿足用戶需求,或減少問題的發(fā)生幾率。

首先,從已經(jīng)提交的任務開始說起。

9.1.2 等待提交到線程池中的任務

第8章中的例子中,線程間的任務劃分完成后,代碼會顯式生成新線程,主線程通常就是等待新線程在返回調用之前結束,確保所有任務都完成。使用線程池,就需要等待任務提交到線程池中,而非直接提交給單個線程。這與基于std::async的方法(第8章等待future的例子)類似,使用清單9.1中的簡單線程池,使用第4章中提到的工具:條件變量和future。雖然,會增加代碼的復雜度,不過,要比直接對任務進行等待的方式好很多。

通過增加線程池的復雜度,可以直接等待任務完成。使用submit()函數(shù)返回一個對任務描述的句柄,用來等待任務的完成。任務句柄會用條件變量或future進行包裝,這樣能使用線程池來簡化代碼。

一種特殊的情況是,執(zhí)行任務的線程需要返回一個結果到主線程上進行處理。你已經(jīng)在本書中看到多個這樣的例子,比如:parallel_accumulate()(第2章)。這種情況下,需要用future對最終的結果進行轉移。清單9.2展示了對簡單線程池的修改,通過修改就能等待任務完成,以及在工作線程完成后,返回一個結果到等待線程中去,不過std::packaged_task<>實例是不可拷貝的,僅是可移動的,所以不能再使用std::function<>來實現(xiàn)任務隊列,因為std::function<>需要存儲可復制構造的函數(shù)對象。包裝一個自定義函數(shù),用來處理只可移動的類型。這就是一個帶有函數(shù)操作符的類型擦除類。只需要處理那些沒有函數(shù)和無返回的函數(shù),所以這是一個簡單的虛函數(shù)調用。

清單9.2 可等待任務的線程池

class function_wrapper
{
  struct impl_base {
    virtual void call()=0;
    virtual ~impl_base() {}
  };

  std::unique_ptr<impl_base> impl;
  template<typename F>
  struct impl_type: impl_base
  {
    F f;
    impl_type(F&& f_): f(std::move(f_)) {}
    void call() { f(); }
  };
public:
  template<typename F>
  function_wrapper(F&& f):
    impl(new impl_type<F>(std::move(f)))
  {}

  void operator()() { impl->call(); }

  function_wrapper() = default;

  function_wrapper(function_wrapper&& other):
    impl(std::move(other.impl))
  {}

  function_wrapper& operator=(function_wrapper&& other)
  {
    impl=std::move(other.impl);
    return *this;
  }

  function_wrapper(const function_wrapper&)=delete;
  function_wrapper(function_wrapper&)=delete;
  function_wrapper& operator=(const function_wrapper&)=delete;
};

class thread_pool
{
  thread_safe_queue<function_wrapper> work_queue;  // 使用function_wrapper,而非使用std::function

  void worker_thread()
  {
    while(!done)
    {
      function_wrapper task;
      if(work_queue.try_pop(task))
      {
        task();
      }
      else
      {
        std::this_thread::yield();
      }
    }
  }
public:
  template<typename FunctionType>
  std::future<typename std::result_of<FunctionType()>::type>  // 1
    submit(FunctionType f)
  {
    typedef typename std::result_of<FunctionType()>::type
      result_type;  // 2

    std::packaged_task<result_type()> task(std::move(f));  // 3
    std::future<result_type> res(task.get_future());  // 4
    work_queue.push(std::move(task));  // 5
    return res;  // 6
  }
  // 休息一下
};

首先,修改的是submit()函數(shù)①返回一個std::future<>保存任務的返回值,并且允許調用者等待任務完全結束。因為需要知道提供函數(shù)f的返回類型,所以使用std::result_of<>std::result_of<FunctionType()>::type是FunctionType類型的引用實例(如,f),并且沒有參數(shù)。同樣,函數(shù)中可以對result_type typedef②使用std::result_of<>。

然后,將f包裝入std::packaged_task<result_type()>③,因為f是一個無參數(shù)的函數(shù)或是可調用對象,能夠返回result_type類型的實例。向任務隊列推送任務⑤和返回future⑥前,就可以從std::packaged_task<>中獲取future④。注意,要將任務推送到任務隊列中時,只能使用std::move(),因為std::packaged_task<>是不可拷貝的。為了對任務進行處理,隊列里面存的就是function_wrapper對象,而非std::function<void()>對象。

現(xiàn)在線程池允許等待任務,并且返回任務后的結果。下面的清單就展示了,如何讓parallel_accumuate函數(shù)使用線程池。

清單9.3 parallel_accumulate使用一個可等待任務的線程池

template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
  unsigned long const length=std::distance(first,last);

  if(!length)
    return init;

  unsigned long const block_size=25;
  unsigned long const num_blocks=(length+block_size-1)/block_size;  // 1

  std::vector<std::future<T> > futures(num_blocks-1);
  thread_pool pool;

  Iterator block_start=first;
  for(unsigned long i=0;i<(num_blocks-1);++i)
  {
    Iterator block_end=block_start;
    std::advance(block_end,block_size);
    futures[i]=pool.submit(accumulate_block<Iterator,T>());  // 2
    block_start=block_end;
  }
  T last_result=accumulate_block<Iterator,T>()(block_start,last);
  T result=init;
  for(unsigned long i=0;i<(num_blocks-1);++i)
  {
    result+=futures[i].get();
  }
  result += last_result;
  return result;
}

與清單8.4相比,有幾個點需要注意一下。首先,工作量是依據(jù)使用的塊數(shù)(num_blocks①),而不是線程的數(shù)量。為了利用線程池的最大化可擴展性,需要將工作塊劃分為最小工作塊。當線程池中線程不多時,每個線程將會處理多個工作塊,不過隨著硬件可用線程數(shù)量的增長,會有越來越多的工作塊并發(fā)執(zhí)行。

當你選擇“因為能并發(fā)執(zhí)行,最小工作塊值的一試”時,就需要謹慎了。向線程池提交一個任務有一定的開銷;讓工作線程執(zhí)行這個任務,并且將返回值保存在std::future<>中,對于太小的任務,這樣的開銷不劃算。如果任務塊太小,使用線程池的速度可能都不及單線程。

假設,任務塊的大小合理,就不用為這些事而擔心:打包任務、獲取future或存儲之后要匯入的std::thread對象;使用線程池的時候,這些都需要注意。之后,就是調用submit()來提交任務②。

線程池也需要注意異常安全。任何異常都會通過submit()返回給future,并在獲取future的結果時,拋出異常。如果函數(shù)因為異常退出,線程池的析構函數(shù)會丟掉那些沒有完成的任務,等待線程池中的工作線程完成工作。

在簡單的例子中,這個線程池工作的還算不錯,因為這里的任務都是相互獨立的。不過,當任務隊列中的任務有依賴關系時,這個線程池就不能勝任了。

9.1.3 等待依賴任務

快速排序算法為例,原理很簡單:數(shù)據(jù)與中軸數(shù)據(jù)項比較,在中軸項兩側分為大于和小于的兩個序列,然后再對這兩組序列進行排序。這兩組序列會遞歸排序,最后會整合成一個全排序序列。要將這個算法寫成并發(fā)模式,需要保證遞歸調用能夠使用硬件的并發(fā)能力。

回到第4章,第一次接觸這個例子,我們使用std::async來執(zhí)行每一層的調用,讓標準庫來選擇,是在新線程上執(zhí)行這個任務,還是當對應get()調用時,進行同步執(zhí)行。運行起來很不錯,因為每一個任務都在其自己的線程上執(zhí)行,或當需要的時候進行調用。

當回顧第8章時,使用了一個固定線程數(shù)量(根據(jù)硬件可用并發(fā)線程數(shù))的結構體。在這樣的情況下,使用了棧來掛起要排序的數(shù)據(jù)塊。當每個線程在為一個數(shù)據(jù)塊排序前,會向數(shù)據(jù)棧上添加一組要排序的數(shù)據(jù),然后對當前數(shù)據(jù)塊排序結束后,接著對另一塊進行排序。這里,等待其他線程完成排序,可能會造成死鎖,因為這會消耗有限的線程。有一種情況很可能會出現(xiàn),就是所有線程都在等某一個數(shù)據(jù)塊被排序,不過沒有線程在做排序。通過拉取棧上數(shù)據(jù)塊的線程,對數(shù)據(jù)塊進行排序,來解決這個問題;因為,已處理的指定數(shù)據(jù)塊,就是其他線程都在等待排序的數(shù)據(jù)塊。

如果只用簡單的線程池進行替換,例如:第4章替換std::async的線程池。只有固定數(shù)量的線程,因為線程池中沒有空閑的線程,線程會等待沒有被安排的任務。因此,需要和第8章中類似的解決方案:當?shù)却硞€數(shù)據(jù)塊完成時,去處理未完成的數(shù)據(jù)塊。如果使用線程池來管理任務列表和相關線程——使用線程池的主要原因——就不用再去訪問任務列表了??梢詫€程池做一些改動,自動完成這些事情。

最簡單的方法就是在thread_pool中添加一個新函數(shù),來執(zhí)行任務隊列上的任務,并對線程池進行管理。高級線程池的實現(xiàn)可能會在等待函數(shù)中添加邏輯,或等待其他函數(shù)來處理這個任務,優(yōu)先的任務會讓其他的任務進行等待。下面清單中的實現(xiàn),就展示了一個新run_pending_task()函數(shù),對于快速排序的修改將會在清單9.5中展示。

清單9.4 run_pending_task()函數(shù)實現(xiàn)

void thread_pool::run_pending_task()
{
  function_wrapper task;
  if(work_queue.try_pop(task))
  {
    task();
  }
  else
  {
    std::this_thread::yield();
  }
}

run_pending_task()的實現(xiàn)去掉了在worker_thread()函數(shù)的主循環(huán)。函數(shù)任務隊列中有任務的時候,執(zhí)行任務;要是沒有的話,就會讓操作系統(tǒng)對線程進行重新分配。

下面快速排序算法的實現(xiàn)要比清單8.1中版本簡單許多,因為所有線程管理邏輯都被移入到線程池。

清單9.5 基于線程池的快速排序實現(xiàn)

template<typename T>
struct sorter  // 1
{
  thread_pool pool;  // 2

  std::list<T> do_sort(std::list<T>& chunk_data)
  {
    if(chunk_data.empty())
    {
      return chunk_data;
    }

    std::list<T> result;
    result.splice(result.begin(),chunk_data,chunk_data.begin());
    T const& partition_val=*result.begin();

    typename std::list<T>::iterator divide_point=
      std::partition(chunk_data.begin(),chunk_data.end(),
                     [&](T const& val){return val<partition_val;});

    std::list<T> new_lower_chunk;
    new_lower_chunk.splice(new_lower_chunk.end(),
                           chunk_data,chunk_data.begin(),
                           divide_point);

    std::future<std::list<T> > new_lower=  // 3
      pool.submit(std::bind(&sorter::do_sort,this,
                            std::move(new_lower_chunk)));

    std::list<T> new_higher(do_sort(chunk_data));

    result.splice(result.end(),new_higher);
    while(!new_lower.wait_for(std::chrono::seconds(0)) ==
      std::future_status::timeout)
    {
      pool.run_pending_task();  // 4
    }

    result.splice(result.begin(),new_lower.get());
    return result;
  }
};

template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
  if(input.empty())
  {
    return input;
  }
  sorter<T> s;

  return s.do_sort(input);
}

與清單8.1相比,這里將實際工作放在sorter類模板的do_sort()成員函數(shù)中執(zhí)行①,即使例子中僅對thread_pool實例進行包裝②。

線程和任務管理,在線程等待的時候,就會少向線程池中提交一個任務③,并且執(zhí)行任務隊列上未完成的任務④。需要顯式的管理線程和棧上要排序的數(shù)據(jù)塊。當有任務提交到線程池中,可以使用std::bind()綁定this指針到do_sort()上,綁定是為了讓數(shù)據(jù)塊進行排序。這種情況下,需要對new_lower_chunk使用std::move()將其傳入函數(shù),數(shù)據(jù)移動要比拷貝的方式開銷少。

雖然,使用等待其他任務的方式,解決了死鎖問題,這個線程池距離理想的線程池很遠。

首先,每次對submit()的調用和對run_pending_task()的調用,訪問的都是同一個隊列。在第8章中,當多線程去修改一組數(shù)據(jù),就會對性能有所影響,所以需要解決這個問題。

9.1.4 避免隊列中的任務競爭

線程每次調用線程池的submit()函數(shù),都會推送一個任務到工作隊列中。就像工作線程為了執(zhí)行任務,從任務隊列中獲取任務一樣。這意味著隨著處理器的增加,在任務隊列上就會有很多的競爭,這會讓性能下降。使用無鎖隊列會讓任務沒有明顯的等待,但是乒乓緩存會消耗大量的時間。

為了避免乒乓緩存,每個線程建立獨立的任務隊列。這樣,每個線程就會將新任務放在自己的任務隊列上,并且當線程上的任務隊列沒有任務時,去全局的任務列表中取任務。下面列表中的實現(xiàn),使用了一個thread_local變量,來保證每個線程都擁有自己的任務列表(如全局列表那樣)。

清單9.6 線程池——線程具有本地任務隊列

class thread_pool
{
  thread_safe_queue<function_wrapper> pool_work_queue;

  typedef std::queue<function_wrapper> local_queue_type;  // 1
  static thread_local std::unique_ptr<local_queue_type>
    local_work_queue;  // 2

  void worker_thread()
  {
    local_work_queue.reset(new local_queue_type);  // 3
    while(!done)
    {
      run_pending_task();
    }
  }

public:
  template<typename FunctionType>
  std::future<typename std::result_of<FunctionType()>::type>
    submit(FunctionType f)
  {
    typedef typename std::result_of<FunctionType()>::type result_type;

    std::packaged_task<result_type()> task(f);
    std::future<result_type> res(task.get_future());
    if(local_work_queue)  // 4
    {
      local_work_queue->push(std::move(task));
    }
    else
    {
      pool_work_queue.push(std::move(task));  // 5
    }
    return res;
  }

  void run_pending_task()
  {
    function_wrapper task;
    if(local_work_queue && !local_work_queue->empty())  // 6
    {
      task=std::move(local_work_queue->front());
      local_work_queue->pop();
      task();
    }
    else if(pool_work_queue.try_pop(task))  // 7
    {
      task();
    }
    else
    {
      std::this_thread::yield();
    }
  }
// rest as before
};

因為不希望非線程池中的線程也擁有一個任務隊列,使用std::unique_ptr<>指向線程本地的工作隊列②;這個指針在worker_thread()中進行初始化③。std:unique_ptr<>的析構函數(shù)會保證在線程退出的時候,工作隊列被銷毀。

submit()會檢查當前線程是否具有一個工作隊列④。如果有,就是線程池中的線程,可以將任務放入線程的本地隊列中;否者,就像之前一樣將這個任務放在線程池中的全局隊列中⑤。

run_pending_task()⑥中的檢查和之前類似,只是要對是否存在本地任務隊列進行檢查。如果存在,就會從隊列中的第一個任務開始處理;注意本地任務隊列可以是一個普通的std::queue<>①,因為這個隊列只能被一個線程所訪問,就不存在競爭。如果本地線程上沒有任務,就會從全局工作列表上獲取任務⑦。

這樣就能有效避免競爭,不過當任務分配不均時,造成的結果就是:某個線程本地隊列中有很多任務的同時,其他線程無所事事。例如:舉一個快速排序的例子,只有一開始的數(shù)據(jù)塊能在線程池上被處理,因為剩余部分會放在工作線程的本地隊列上進行處理,這樣的使用方式也違背使用線程池的初衷。

幸好,這個問題是有解:本地工作隊列和全局工作隊列上沒有任務時,可從別的線程隊列中竊取任務。

9.1.5 竊取任務

為了讓沒有任務的線程能從其他線程的任務隊列中獲取任務,就需要本地任務列表可以進行訪問,這樣才能讓run_pending_tasks()竊取任務。需要每個線程在線程池隊列上進行注冊,或由線程池指定一個線程。同樣,還需要保證數(shù)據(jù)隊列中的任務適當?shù)谋煌胶捅Wo,這樣隊列的不變量就不會被破壞。

實現(xiàn)一個無鎖隊列,讓其擁有線程在其他線程竊取任務的時候,能夠推送和彈出一個任務是可能的;不過,這個隊列的實現(xiàn)就超出了本書的討論范圍。為了證明這種方法的可行性,將使用一個互斥量來保護隊列中的數(shù)據(jù)。我們希望任務竊取是一個不常見的現(xiàn)象,這樣就會減少對互斥量的競爭,并且使得簡單隊列的開銷最小。下面,實現(xiàn)了一個簡單的基于鎖的任務竊取隊列。

清單9.7 基于鎖的任務竊取隊列

class work_stealing_queue
{
private:
  typedef function_wrapper data_type;
  std::deque<data_type> the_queue;  // 1
  mutable std::mutex the_mutex;

public:
  work_stealing_queue()
  {}

  work_stealing_queue(const work_stealing_queue& other)=delete;
  work_stealing_queue& operator=(
    const work_stealing_queue& other)=delete;

  void push(data_type data)  // 2
  {
    std::lock_guard<std::mutex> lock(the_mutex);
    the_queue.push_front(std::move(data));
  }

  bool empty() const
  {
    std::lock_guard<std::mutex> lock(the_mutex);
    return the_queue.empty();
  }

  bool try_pop(data_type& res)  // 3
  {
    std::lock_guard<std::mutex> lock(the_mutex);
    if(the_queue.empty())
    {
      return false;
    }

    res=std::move(the_queue.front());
    the_queue.pop_front();
    return true;
  }

  bool try_steal(data_type& res)  // 4
  {
    std::lock_guard<std::mutex> lock(the_mutex);
    if(the_queue.empty())
    {
      return false;
    }

    res=std::move(the_queue.back());
    the_queue.pop_back();
    return true;
  }
};

這個隊列對std::deque<fuction_wrapper>進行了簡單的包裝①,就能通過一個互斥鎖來對所有訪問進行控制了。push()②和try_pop()③對隊列的前端進行操作,try_steal()④對隊列的后端進行操作。

這就說明每個線程中的“隊列”是一個后進先出的棧,最新推入的任務將會第一個執(zhí)行。從緩存角度來看,這將對性能有所提升,因為任務相關的數(shù)據(jù)一直存于緩存中,要比提前將任務相關數(shù)據(jù)推送到棧上好。同樣,這種方式很好的映射到某個算法上,例如:快速排序。之前的實現(xiàn)中,每次調用do_sort()都會推送一個任務到棧上,并且等待這個任務執(zhí)行完畢。通過對最新推入任務的處理,就可以保證在將當前所需數(shù)據(jù)塊處理完成前,其他任務是否需要這些數(shù)據(jù)塊,從而可以減少活動任務的數(shù)量和棧的使用次數(shù)。try_steal()從隊列末尾獲取任務,為了減少與try_pop()之間的競爭;使用在第6、7章中的所討論的技術來讓try_pop()和try_steal()并發(fā)執(zhí)行。

OK,現(xiàn)在擁有了一個很不錯的任務隊列,并且支持竊??;那這個隊列將如何在線程池中使用呢?這里簡單的展示一下。

清單9.8 使用任務竊取的線程池

class thread_pool
{
  typedef function_wrapper task_type;

  std::atomic_bool done;
  thread_safe_queue<task_type> pool_work_queue;
  std::vector<std::unique_ptr<work_stealing_queue> > queues;  // 1
  std::vector<std::thread> threads;
  join_threads joiner;

  static thread_local work_stealing_queue* local_work_queue;  // 2
  static thread_local unsigned my_index;

  void worker_thread(unsigned my_index_)
  {
    my_index=my_index_;
    local_work_queue=queues[my_index].get();  // 3
    while(!done)
    {
      run_pending_task();
    }
  }

  bool pop_task_from_local_queue(task_type& task)
  {
    return local_work_queue && local_work_queue->try_pop(task);
  }

  bool pop_task_from_pool_queue(task_type& task)
  {
    return pool_work_queue.try_pop(task);
  }

  bool pop_task_from_other_thread_queue(task_type& task)  // 4
  {
    for(unsigned i=0;i<queues.size();++i)
    {
      unsigned const index=(my_index+i+1)%queues.size();  // 5
      if(queues[index]->try_steal(task))
      {
        return true;
      }
    }
    return false;
  }

public:
  thread_pool():
    done(false),joiner(threads)
  {
    unsigned const thread_count=std::thread::hardware_concurrency();

    try
    {
      for(unsigned i=0;i<thread_count;++i)
      {
        queues.push_back(std::unique_ptr<work_stealing_queue>(  // 6
                         new work_stealing_queue));
        threads.push_back(
          std::thread(&thread_pool::worker_thread,this,i));
      }
    }
    catch(...)
    {
      done=true;
      throw;
    }
  }

  ~thread_pool()
  {
    done=true;
  }

  template<typename FunctionType>
  std::future<typename std::result_of<FunctionType()>::type> submit(
    FunctionType f)
  { 
    typedef typename std::result_of<FunctionType()>::type result_type;
    std::packaged_task<result_type()> task(f);
    std::future<result_type> res(task.get_future());
    if(local_work_queue)
    {
      local_work_queue->push(std::move(task));
    }
    else
    {
      pool_work_queue.push(std::move(task));
    }
    return res;
  }

  void run_pending_task()
  {
    task_type task;
    if(pop_task_from_local_queue(task) ||  // 7
       pop_task_from_pool_queue(task) ||  // 8
       pop_task_from_other_thread_queue(task))  // 9
    {
      task();
    }
    else
    {
      std::this_thread::yield();
    }
  }
};

這段代碼與清單9.6很相似。第一個不同在于,每個線程都有一個work_stealing_queue,而非只是普通的std::queue<>②。當每個線程被創(chuàng)建,就創(chuàng)建了一個屬于自己的工作隊列⑥,每個線程自己的工作隊列將存儲在線程池的全局工作隊列中①。列表中隊列的序號,會傳遞給線程函數(shù),然后使用序號來索引對應隊列③。這就意味著線程池可以訪問任意線程中的隊列,為了給閑置線程竊取任務。run_pending_task()將會從線程的任務隊列中取出一個任務來執(zhí)行⑦,或從線程池隊列中獲取一個任務⑧,亦或從其他線程的隊列中獲取一個任務⑨。

pop_task_from_other_thread_queue()④會遍歷池中所有線程的任務隊列,然后嘗試竊取任務。為了避免每個線程都嘗試從列表中的第一個線程上竊取任務,每一個線程都會從下一個線程開始遍歷,通過自身的線程序號來確定開始遍歷的線程序號。

使用線程池有很多好處,還有很多很多的方式能為某些特殊用法提升性能,不過這就留給讀者作為作業(yè)吧。

特別是還沒有探究動態(tài)變換大小的線程池,即使線程被阻塞的時候(例如:I/O或互斥鎖),程序都能保證CPU最優(yōu)的使用率。

下面,我們將來了解一下線程管理的“高級”用法——中斷線程。