鍍金池/ 教程/ C/ 6.2 基于鎖的并發(fā)數(shù)據(jù)結(jié)構(gòu)
3.4 本章總結(jié)
6.3 基于鎖設(shè)計(jì)更加復(fù)雜的數(shù)據(jù)結(jié)構(gòu)
6.1 為并發(fā)設(shè)計(jì)的意義何在?
5.2 <code>C++</code>中的原子操作和原子類型
A.7 自動(dòng)推導(dǎo)變量類型
2.1 線程管理的基礎(chǔ)
8.5 在實(shí)踐中設(shè)計(jì)并發(fā)代碼
2.4 運(yùn)行時(shí)決定線程數(shù)量
2.2 向線程函數(shù)傳遞參數(shù)
第4章 同步并發(fā)操作
2.3 轉(zhuǎn)移線程所有權(quán)
8.3 為多線程性能設(shè)計(jì)數(shù)據(jù)結(jié)構(gòu)
6.4 本章總結(jié)
7.3 對于設(shè)計(jì)無鎖數(shù)據(jù)結(jié)構(gòu)的指導(dǎo)建議
關(guān)于這本書
A.1 右值引用
2.6 本章總結(jié)
D.2 &lt;condition_variable&gt;頭文件
A.6 變參模板
6.2 基于鎖的并發(fā)數(shù)據(jù)結(jié)構(gòu)
4.5 本章總結(jié)
A.9 本章總結(jié)
前言
第10章 多線程程序的測試和調(diào)試
5.4 本章總結(jié)
第9章 高級線程管理
5.1 內(nèi)存模型基礎(chǔ)
2.5 識(shí)別線程
第1章 你好,C++的并發(fā)世界!
1.2 為什么使用并發(fā)?
A.5 Lambda函數(shù)
第2章 線程管理
4.3 限定等待時(shí)間
D.3 &lt;atomic&gt;頭文件
10.2 定位并發(fā)錯(cuò)誤的技術(shù)
附錄B 并發(fā)庫的簡單比較
5.3 同步操作和強(qiáng)制排序
A.8 線程本地變量
第8章 并發(fā)代碼設(shè)計(jì)
3.3 保護(hù)共享數(shù)據(jù)的替代設(shè)施
附錄D C++線程庫參考
第7章 無鎖并發(fā)數(shù)據(jù)結(jié)構(gòu)設(shè)計(jì)
D.7 &lt;thread&gt;頭文件
D.1 &lt;chrono&gt;頭文件
4.1 等待一個(gè)事件或其他條件
A.3 默認(rèn)函數(shù)
附錄A 對<code>C++</code>11語言特性的簡要介紹
第6章 基于鎖的并發(fā)數(shù)據(jù)結(jié)構(gòu)設(shè)計(jì)
封面圖片介紹
7.2 無鎖數(shù)據(jù)結(jié)構(gòu)的例子
8.6 本章總結(jié)
8.1 線程間劃分工作的技術(shù)
4.2 使用期望等待一次性事件
8.4 設(shè)計(jì)并發(fā)代碼的注意事項(xiàng)
D.5 &lt;mutex&gt;頭文件
3.1 共享數(shù)據(jù)帶來的問題
資源
9.3 本章總結(jié)
10.3 本章總結(jié)
10.1 與并發(fā)相關(guān)的錯(cuò)誤類型
D.4 &lt;future&gt;頭文件
3.2 使用互斥量保護(hù)共享數(shù)據(jù)
9.1 線程池
1.1 何謂并發(fā)
9.2 中斷線程
4.4 使用同步操作簡化代碼
A.2 刪除函數(shù)
1.3 C++中的并發(fā)和多線程
1.4 開始入門
第5章 C++內(nèi)存模型和原子類型操作
消息傳遞框架與完整的ATM示例
8.2 影響并發(fā)代碼性能的因素
7.1 定義和意義
D.6 &lt;ratio&gt;頭文件
A.4 常量表達(dá)式函數(shù)
7.4 本章總結(jié)
1.5 本章總結(jié)
第3章 線程間共享數(shù)據(jù)

6.2 基于鎖的并發(fā)數(shù)據(jù)結(jié)構(gòu)

基于鎖的并發(fā)數(shù)據(jù)結(jié)構(gòu)設(shè)計(jì),需要確保訪問線程持有鎖的時(shí)間最短。對于只有一個(gè)互斥量的數(shù)據(jù)結(jié)構(gòu)來說,這十分困難。需要保證數(shù)據(jù)不被鎖之外的操作所訪問到,并且還要保證不會(huì)在固有結(jié)構(gòu)上產(chǎn)生條件競爭(如第3章所述)。當(dāng)你使用多個(gè)互斥量來保護(hù)數(shù)據(jù)結(jié)構(gòu)中不同的區(qū)域時(shí),問題會(huì)暴露的更加明顯,當(dāng)操作需要獲取多個(gè)互斥鎖時(shí),就有可能產(chǎn)生死鎖。所以,在設(shè)計(jì)時(shí),使用多個(gè)互斥量時(shí)需要格外小心。

在本節(jié)中,你將使用6.1.1節(jié)中的指導(dǎo)建議,來設(shè)計(jì)一些簡單的數(shù)據(jù)結(jié)構(gòu)——使用互斥量和鎖的方式來保護(hù)數(shù)據(jù)。每一個(gè)例子中,都是在保證數(shù)據(jù)結(jié)構(gòu)是線程安全的前提下,對數(shù)據(jù)結(jié)構(gòu)并發(fā)訪問的概率(機(jī)會(huì))進(jìn)行提高。

我們先來看看在第3章中的實(shí)現(xiàn),這個(gè)實(shí)現(xiàn)就是一個(gè)十分簡單的數(shù)據(jù)結(jié)構(gòu),它只使用了一個(gè)互斥量。但是,這個(gè)結(jié)構(gòu)是線程安全的嗎?它離真正的并發(fā)訪問又有多遠(yuǎn)呢?

6.2.1 線程安全棧——使用鎖

我們先把第3章中線程安全的棧拿過來看看:(這里試圖實(shí)現(xiàn)一個(gè)線程安全版的std:stack<>)

清單6.1 線程安全棧的類定義

#include <exception>

struct empty_stack: std::exception
{
  const char* what() const throw();
};

template<typename T>
class threadsafe_stack
{
private:
  std::stack<T> data;
  mutable std::mutex m;
public:
  threadsafe_stack(){}
  threadsafe_stack(const threadsafe_stack& other)
  {
    std::lock_guard<std::mutex> lock(other.m);
    data=other.data;
  }

  threadsafe_stack& operator=(const threadsafe_stack&) = delete;

  void push(T new_value)
  {
    std::lock_guard<std::mutex> lock(m);
    data.push(std::move(new_value));  // 1
  }
  std::shared_ptr<T> pop()
  {
    std::lock_guard<std::mutex> lock(m);
    if(data.empty()) throw empty_stack();  // 2
    std::shared_ptr<T> const res(
      std::make_shared<T>(std::move(data.top())));  // 3
    data.pop();  // 4
    return res;
  }
  void pop(T& value)
  {
    std::lock_guard<std::mutex> lock(m);
    if(data.empty()) throw empty_stack();
    value=std::move(data.top());  // 5
    data.pop();  // 6
  }
  bool empty() const
  {
    std::lock_guard<std::mutex> lock(m);
    return data.empty();
  }
};

來看看指導(dǎo)意見是如何應(yīng)用的。

首先,互斥量m能保證基本的線程安全,那就是對每個(gè)成員函數(shù)進(jìn)行加鎖保護(hù)。這就保證在同一時(shí)間內(nèi),只有一個(gè)線程可以訪問到數(shù)據(jù),所以能夠保證,數(shù)據(jù)結(jié)構(gòu)的“不變量”被破壞時(shí),不會(huì)被其他線程看到。

其次,在empty()和pop()成員函數(shù)之間會(huì)存在潛在的競爭,不過代碼會(huì)在pop()函數(shù)上鎖時(shí),顯式的查詢棧是否為空,所以這里的競爭是非惡性的。pop()通過對彈出值的直接返回,就可避免std::stack<>中top()和pop()兩成員函數(shù)之間的潛在競爭。

再次,這個(gè)類中也有一些異常源。對互斥量上鎖可能會(huì)拋出異常,因?yàn)樯湘i操作是每個(gè)成員函數(shù)所做的第一個(gè)操作,所以這是極其罕見的(因?yàn)檫@意味這問題不在鎖上,就是在系統(tǒng)資源上)。因無數(shù)據(jù)修改,所以其是安全的。因解鎖一個(gè)互斥量是不會(huì)失敗的,所以段代碼很安全,并且使用std::lock_guard<>也能保證互斥量上鎖的狀態(tài)。

對data.push()①的調(diào)用可能會(huì)拋出一個(gè)異常,不是拷貝/移動(dòng)數(shù)據(jù)值時(shí),就是內(nèi)存不足的時(shí)候。不管是哪種,std::stack<>都能保證其實(shí)安全的,所以這里也沒有問題。

在第一個(gè)重載pop()中,代碼可能會(huì)拋出一個(gè)empty_stack的異常②,不過數(shù)據(jù)沒有被修改,所以其是安全的。對于res的創(chuàng)建③,也可能會(huì)拋出一個(gè)異常,這有兩方面的原因:對std::make_shared的調(diào)用,可能無法分配出足夠的內(nèi)存去創(chuàng)建新的對象,并且內(nèi)部數(shù)據(jù)需要對新對象進(jìn)行引用;或者,在拷貝或移動(dòng)構(gòu)造到新分配的內(nèi)存中返回時(shí)拋出異常。兩種情況下,c++運(yùn)行庫和標(biāo)準(zhǔn)庫能確保這里不會(huì)出現(xiàn)內(nèi)存泄露,并且新創(chuàng)建的對象(如果有的話)都能被正確銷毀。因?yàn)闆]有對棧進(jìn)行任何修改,所以這里也不會(huì)有問題。當(dāng)調(diào)用data.pop()④時(shí),其能確保不拋出異常,并且返回結(jié)果,所以這個(gè)重載pop()函數(shù)“異常-安全”。

第二個(gè)重載pop()類似,除了在拷貝賦值或移動(dòng)賦值的時(shí)候會(huì)拋出異常⑤,當(dāng)構(gòu)造一個(gè)新對象和一個(gè)std::shared_ptr實(shí)例時(shí)都不會(huì)拋出異常。同樣,在調(diào)用data.pop()⑥(這個(gè)成員函數(shù)保證不會(huì)拋出異常)之前,依舊沒有對數(shù)據(jù)結(jié)構(gòu)進(jìn)行修改,所以這個(gè)函數(shù)也為“異常-安全”。

最后,empty()也不會(huì)修改任何數(shù)據(jù),所以也是“異常-安全”函數(shù)。

當(dāng)調(diào)用持有一個(gè)鎖的用戶代碼時(shí),這里有兩個(gè)地方可能會(huì)產(chǎn)生死鎖:進(jìn)行拷貝構(gòu)造或移動(dòng)構(gòu)造(①,③)和在對數(shù)據(jù)項(xiàng)進(jìn)行拷貝賦值或移動(dòng)賦值操作⑤的時(shí)候;還有一個(gè)潛在死鎖的地方在于用戶定義的操作符new。當(dāng)這些函數(shù),無論是以直接調(diào)用棧的成員函數(shù)的方式,還是在成員函數(shù)進(jìn)行操作時(shí),對已經(jīng)插入或刪除的數(shù)據(jù)進(jìn)行操作的方式,對鎖進(jìn)行獲取,都可能造成死鎖。不過,用戶要對棧負(fù)責(zé),當(dāng)棧未對一個(gè)數(shù)據(jù)進(jìn)行拷貝或分配時(shí),用戶就不能想當(dāng)然的將其添加到棧中。

所有成員函數(shù)都使用st::lock_guard<>來保護(hù)數(shù)據(jù),所以棧的成員函數(shù)能有“線程安全”的表現(xiàn)。當(dāng)然,構(gòu)造與析構(gòu)函數(shù)不是“線程安全”的,不過這也不成問題,因?yàn)閷?shí)例的構(gòu)造與析構(gòu)只能有一次。調(diào)用一個(gè)不完全構(gòu)造對象或是已銷毀對象的成員函數(shù),無論在那種編程方式下,都不可取。所以,用戶就要保證在棧對象完成構(gòu)建前,其他線程無法對其進(jìn)行訪問;并且,一定要保證在棧對象銷毀后,所有線程都要停止對其進(jìn)行訪問。

即使在多線程情況下,并發(fā)的調(diào)用成員函數(shù)是安全的(因?yàn)槭褂面i),也要保證在單線程的情況下,數(shù)據(jù)結(jié)構(gòu)做出正確反應(yīng)。序列化線程會(huì)隱性的限制程序性能,這就是棧爭議聲最大的地方:當(dāng)一個(gè)線程在等待鎖時(shí),它就會(huì)無所事事。同樣的,對于棧來說,等待添加元素也是沒有意義的,所以當(dāng)一個(gè)線程需要等待時(shí),其會(huì)定期檢查empty()或pop(),以及對empty_stack異常進(jìn)行關(guān)注。這樣的現(xiàn)實(shí)會(huì)限制棧的實(shí)現(xiàn)的方式,在線程等待的時(shí)候,會(huì)浪費(fèi)寶貴的資源去檢查數(shù)據(jù),或是要求用戶寫寫外部等待和提示代碼(例如,使用條件變量),這就使內(nèi)部鎖失去存在的意義——這就意味著資源的浪費(fèi)。第4章中的隊(duì)列,就是一種使用條件內(nèi)部變量進(jìn)行等待的數(shù)據(jù)結(jié)構(gòu),接下來我們就來了解一下。

6.2.2 線程安全隊(duì)列——使用鎖和條件變量

第4章中的線程安全隊(duì)列,在清單6.2中重現(xiàn)一下。和使用仿std::stack<>建立的棧很像,這里隊(duì)列的建立也是參照了std::queue<>。不過,與標(biāo)準(zhǔn)容器的接口不同,我們要設(shè)計(jì)的是能在多線程下安全并發(fā)訪問的數(shù)據(jù)結(jié)構(gòu)。

清單6.2 使用條件變量實(shí)現(xiàn)的線程安全隊(duì)列

template<typename T>
class threadsafe_queue
{
private:
  mutable std::mutex mut;
  std::queue<T> data_queue;
  std::condition_variable data_cond;

public:
  threadsafe_queue()
  {}

  void push(T new_value)
  {
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(std::move(data));
    data_cond.notify_one();  // 1
  }

  void wait_and_pop(T& value)  // 2
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk,[this]{return !data_queue.empty();});
    value=std::move(data_queue.front());
    data_queue.pop();
  }

  std::shared_ptr<T> wait_and_pop()  // 3
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk,[this]{return !data_queue.empty();});  // 4
    std::shared_ptr<T> res(
      std::make_shared<T>(std::move(data_queue.front())));
    data_queue.pop();
    return res;
  }

  bool try_pop(T& value)
  {
    std::lock_guard<std::mutex> lk(mut);
    if(data_queue.empty())
      return false;
    value=std::move(data_queue.front());
    data_queue.pop();
    return true;
  }

  std::shared_ptr<T> try_pop()
  {
    std::lock_guard<std::mutex> lk(mut);
    if(data_queue.empty())
      return std::shared_ptr<T>();  // 5
    std::shared_ptr<T> res(
      std::make_shared<T>(std::move(data_queue.front())));
    data_queue.pop();
    return res;
  }

  bool empty() const
  {
    std::lock_guard<std::mutex> lk(mut);
    return data_queue.empty();
  }
};

除了在push()①中調(diào)用data_cond.notify_one(),以及wait_and_pop()②③,6.2中對隊(duì)列的實(shí)現(xiàn)與6.1中對棧的實(shí)現(xiàn)十分相近。兩個(gè)重載try_pop()除了在隊(duì)列為空時(shí)拋出異常,其他的與6.1中pop()函數(shù)完全一樣。不同的是,在6.1中對值的檢索會(huì)返回一個(gè)bool值,而在6.2中,當(dāng)指針指向空值的時(shí)候會(huì)返回NULL指針⑤,這同樣也是實(shí)現(xiàn)棧的一個(gè)有效途徑。所以,即使排除掉wait_and_pop()函數(shù),之前對棧的分析依舊適用于這里。

wiat_and_pop()函數(shù)是等待隊(duì)列向棧進(jìn)行輸入的一個(gè)解決方案;比起持續(xù)調(diào)用empty(),等待線程調(diào)用wait_and_pop()函數(shù)和數(shù)據(jù)結(jié)構(gòu)處理等待中的條件變量的方式要好很多。對于data_cond.wait()的調(diào)用,直到隊(duì)列中有一個(gè)元素的時(shí)候,才會(huì)返回,所以你就不用擔(dān)心會(huì)出現(xiàn)一個(gè)空隊(duì)列的情況了,還有,數(shù)據(jù)會(huì)一直被互斥鎖保護(hù)。因?yàn)椴蛔兞窟@里并未發(fā)生變化,所以函數(shù)不會(huì)添加新的條件競爭或是死鎖的可能。

異常安全在這里的會(huì)有一些變化,當(dāng)不止一個(gè)線程等待對隊(duì)列進(jìn)行推送操作是,只會(huì)有一個(gè)線程,因得到data_cond.notify_one(),而繼續(xù)工作著。但是,如果這個(gè)工作線程在wait_and_pop()中拋出一個(gè)異常,例如:構(gòu)造新的std::shared_ptr<>對象④時(shí)拋出異常,那么其他線程則會(huì)永世長眠。當(dāng)這種情況是不可接受時(shí),這里的調(diào)用就需要改成data_cond.notify_all(),這個(gè)函數(shù)將喚醒所有的工作線程,不過,當(dāng)大多線程發(fā)現(xiàn)隊(duì)列依舊是空時(shí),又會(huì)耗費(fèi)很多資源讓線程重新進(jìn)入睡眠狀態(tài)。第二種替代方案是,當(dāng)有異常拋出的時(shí)候,讓wait_and_pop()函數(shù)調(diào)用notify_one(),從而讓個(gè)另一個(gè)線程可以去嘗試索引存儲(chǔ)的值。第三種替代方案就是,將std::shared_ptr<>的初始化過程移到push()中,并且存儲(chǔ)std::shared_ptr<>實(shí)例,而非直接使用數(shù)據(jù)的值。將std::shared_ptr<>拷貝到內(nèi)部std::queue<>中,就不會(huì)拋出異常了,這樣wait_and_pop()又是安全的了。下面的程序清單,就是根據(jù)第三種方案進(jìn)行修改的。

清單6.3 持有std::shared_ptr<>實(shí)例的線程安全隊(duì)列

template<typename T>
class threadsafe_queue
{
private:
  mutable std::mutex mut;
  std::queue<std::shared_ptr<T> > data_queue;
  std::condition_variable data_cond;
public:
  threadsafe_queue()
  {}

  void wait_and_pop(T& value)
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk,[this]{return !data_queue.empty();});
    value=std::move(*data_queue.front());  // 1
    data_queue.pop();
  }

  bool try_pop(T& value)
  {
    std::lock_guard<std::mutex> lk(mut);
    if(data_queue.empty())
      return false;
    value=std::move(*data_queue.front());  // 2
    data_queue.pop();
    return true;
  }

  std::shared_ptr<T> wait_and_pop()
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk,[this]{return !data_queue.empty();});
    std::shared_ptr<T> res=data_queue.front();  // 3
    data_queue.pop();
    return res;
  }

  std::shared_ptr<T> try_pop()
  {
    std::lock_guard<std::mutex> lk(mut);
    if(data_queue.empty())
      return std::shared_ptr<T>();
    std::shared_ptr<T> res=data_queue.front();  // 4
    data_queue.pop();
    return res;
  }

  void push(T new_value)
  {
    std::shared_ptr<T> data(
    std::make_shared<T>(std::move(new_value)));  // 5
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(data);
    data_cond.notify_one();
  }

  bool empty() const
  {
    std::lock_guard<std::mutex> lk(mut);
    return data_queue.empty();
  }
};

為讓std::shared_ptr<>持有數(shù)據(jù)的結(jié)果顯而易見:彈出函數(shù)會(huì)持有一個(gè)變量的引用,為了接收這個(gè)新值,必須對存儲(chǔ)的指針進(jìn)行解引用①,②;并且,在返回到調(diào)用函數(shù)前,彈出函數(shù)都會(huì)返回一個(gè)std::shared_ptr<>實(shí)例,這里實(shí)例可以在隊(duì)列中做檢索③,④。

std::shared_ptr<>持有數(shù)據(jù)的好處:新的實(shí)例分配結(jié)束時(shí),不會(huì)被鎖在push()⑤當(dāng)中(而在清單6.2中,只能在pop()持有鎖時(shí)完成)。因?yàn)閮?nèi)存分配操作的需要在性能上付出很高的代價(jià)(性能較低),所以使用std::shared_ptr<>的方式對隊(duì)列的性能有很大的提升,其減少了互斥量持有的時(shí)間,允許其他線程在分配內(nèi)存的同時(shí),對隊(duì)列進(jìn)行其他的操作。

如同棧的例子,使用互斥量保護(hù)整個(gè)數(shù)據(jù)結(jié)構(gòu),不過會(huì)限制隊(duì)列對并發(fā)的支持;雖然,多線程可能被隊(duì)列中的各種成員函數(shù)所阻塞,但是仍有一個(gè)線程能在任意時(shí)間內(nèi)進(jìn)行工作。不過,這種限制的部分來源是因?yàn)樵趯?shí)現(xiàn)中使用了std::queue<>;因?yàn)槭褂脴?biāo)準(zhǔn)容器的原因,數(shù)據(jù)處于保護(hù)中。要對數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)進(jìn)行具體的控制,需要提供更多細(xì)粒度鎖,來完成更高級的并發(fā)。

6.2.3 線程安全隊(duì)列——使用細(xì)粒度鎖和條件變量

在清單6.2和6.3中,使用一個(gè)互斥量對一個(gè)數(shù)據(jù)隊(duì)列(data_queue)進(jìn)行保護(hù)。為了使用細(xì)粒度鎖,需要看一下隊(duì)列內(nèi)部的組成結(jié)構(gòu),并且將一個(gè)互斥量與每個(gè)數(shù)據(jù)相關(guān)聯(lián)。

對于隊(duì)列來說,最簡單的數(shù)據(jù)結(jié)構(gòu)就是單鏈表了,就如圖6.1那樣。隊(duì)列里包含一個(gè)頭指針,其指向鏈表中的第一個(gè)元素,并且每一個(gè)元素都會(huì)指向下一個(gè)元素。從隊(duì)列中刪除數(shù)據(jù),其實(shí)就是將頭指針指向下一個(gè)元素,并將之前頭指針指向的值進(jìn)行返回。

向隊(duì)列中添加元素是要從結(jié)尾進(jìn)行的。為了做到這點(diǎn),隊(duì)列里還有一個(gè)尾指針,其指向鏈表中的最后一個(gè)元素。新節(jié)點(diǎn)的加入將會(huì)改變尾指針的next指針,之前最后一個(gè)元素將會(huì)指向新添加進(jìn)來的元素,新添加進(jìn)來的元素的next將會(huì)使新的尾指針。當(dāng)鏈表為空時(shí),頭/尾指針皆為NULL。

http://wiki.jikexueyuan.com/project/cplusplus-concurrency-action/images/chapter6/6-1.png" alt="" />

圖6.1 用單鏈表表示的隊(duì)列

下面的清單中的代碼,是一個(gè)簡單隊(duì)列的實(shí)現(xiàn),基于清單6.2代碼的精簡版本;因?yàn)檫@個(gè)隊(duì)列僅供單線程使用,所以這實(shí)現(xiàn)中只有一個(gè)try_pop()函數(shù);并且,沒有wait_and_pop()函數(shù)。

清單6.4 隊(duì)列實(shí)現(xiàn)——單線程版

template<typename T>
class queue
{
private:
  struct node
  {
    T data;
    std::unique_ptr<node> next;

    node(T data_):
    data(std::move(data_))
    {}
  };

  std::unique_ptr<node> head;  // 1
  node* tail;  // 2

public:
  queue()
  {}
  queue(const queue& other)=delete;
  queue& operator=(const queue& other)=delete;
  std::shared_ptr<T> try_pop()
  {
    if(!head)
    {
      return std::shared_ptr<T>();
    }
    std::shared_ptr<T> const res(
      std::make_shared<T>(std::move(head->data)));
    std::unique_ptr<node> const old_head=std::move(head);
    head=std::move(old_head->next);  // 3
    return res;
  }

  void push(T new_value)
  {
    std::unique_ptr<node> p(new node(std::move(new_value)));
    node* const new_tail=p.get();
    if(tail)
    {
      tail->next=std::move(p);  // 4
    }
    else
    {
      head=std::move(p);  // 5
    }
    tail=new_tail;  // 6
  }
};

首先,注意在清單呢6.4中使用了std::unique_ptr<node>來管理節(jié)點(diǎn),因?yàn)槠淠鼙WC節(jié)點(diǎn)(其引用數(shù)據(jù)的值)在刪除時(shí)候,不需要使用delete操作顯式刪除。這樣的關(guān)系鏈表,管理著從頭結(jié)點(diǎn)到尾節(jié)點(diǎn)的每一個(gè)原始指針。

雖然,這種實(shí)現(xiàn)對于單線程來說沒什么問題,但是,當(dāng)你在多線程情況下,嘗試使用細(xì)粒度鎖時(shí),就會(huì)出現(xiàn)問題。因?yàn)樵诮o定的實(shí)現(xiàn)中有兩個(gè)數(shù)據(jù)項(xiàng)(head①和tail②);即使,使用兩個(gè)互斥量,來保護(hù)頭指針和尾指針,也會(huì)出現(xiàn)問題。

顯而易見的問題就是push()可以同時(shí)修改頭指針⑤和尾指針⑥,所以push()函數(shù)會(huì)同時(shí)獲取兩個(gè)互斥量。雖然會(huì)將兩個(gè)互斥量都上鎖,但這還不是太糟糕的問題。糟糕的問題是push()和pop()都能訪問next指針指向的節(jié)點(diǎn):push()可更新tail->next④,而后try_pop()讀取read->next③。當(dāng)隊(duì)列中只有一個(gè)元素時(shí),head==tail,所以head->next和tail->next是同一個(gè)對象,并且這個(gè)對象需要保護(hù)。不過,“在同一個(gè)對象在未被head和tail同時(shí)訪問時(shí),push()和try_pop()鎖住的是同一個(gè)鎖”,就不對了。所以,你就沒有比之間實(shí)現(xiàn)更好的選擇了。這里會(huì)“柳暗花明又一村”嗎?

通過分離數(shù)據(jù)實(shí)現(xiàn)并發(fā)

你可以使用“預(yù)分配一個(gè)虛擬節(jié)點(diǎn)(無數(shù)據(jù)),確保這個(gè)節(jié)點(diǎn)永遠(yuǎn)在隊(duì)列的最后,用來分離頭尾指針能訪問的節(jié)點(diǎn)”的辦法,走出這個(gè)困境。對于一個(gè)空隊(duì)列來說,head和tail都屬于虛擬指針,而非空指針。這個(gè)辦法挺好,因?yàn)楫?dāng)隊(duì)列為空時(shí),try_pop()不能訪問head->next了。當(dāng)添加一個(gè)節(jié)點(diǎn)入隊(duì)列時(shí)(這時(shí)有真實(shí)節(jié)點(diǎn)了),head和tail現(xiàn)在指向不同的節(jié)點(diǎn),所以就不會(huì)在head->next和tail->next上產(chǎn)生競爭。這里的缺點(diǎn)是,你必須額外添加一個(gè)間接層次的指針數(shù)據(jù),來做虛擬節(jié)點(diǎn)。下面的代碼描述了這個(gè)方案如何實(shí)現(xiàn)。

清單6.5 帶有虛擬節(jié)點(diǎn)的隊(duì)列

template<typename T>
class queue
{
private:
  struct node
  {
    std::shared_ptr<T> data;  // 1
    std::unique_ptr<node> next;
  };

  std::unique_ptr<node> head;
  node* tail;

public:
  queue():
    head(new node),tail(head.get())  // 2
  {}
  queue(const queue& other)=delete;
  queue& operator=(const queue& other)=delete;

  std::shared_ptr<T> try_pop()
  {
    if(head.get()==tail)  // 3
    {
      return std::shared_ptr<T>();
    }
    std::shared_ptr<T> const res(head->data);  // 4
    std::unique_ptr<node> old_head=std::move(head);
    head=std::move(old_head->next);  // 5
    return res;  // 6
  }

  void push(T new_value)
  {
    std::shared_ptr<T> new_data(
      std::make_shared<T>(std::move(new_value)));  // 7
    std::unique_ptr<node> p(new node);  //8
    tail->data=new_data;  // 9
    node* const new_tail=p.get();
    tail->next=std::move(p);
    tail=new_tail;
  }
};

try_pop()不需要太多的修改。首先,你可以拿head和tail③進(jìn)行比較,這就要比檢查指針是否為空的好,因?yàn)樘摂M節(jié)點(diǎn)意味著head不可能是空指針。head是一個(gè)std::unique_ptr<node>對象,你需要使用head.get()來做比較。其次,因?yàn)閚ode現(xiàn)在存在數(shù)據(jù)指針中①,你就可以對指針進(jìn)行直接檢索④,而非構(gòu)造一個(gè)T類型的新實(shí)例。push()函數(shù)改動(dòng)最大:首先,你必須在堆上創(chuàng)建一個(gè)T類型的實(shí)例,并且讓其與一個(gè)std::shared_ptr<>對象相關(guān)聯(lián)⑦(節(jié)點(diǎn)使用std::make_shared就是為了避免內(nèi)存二次分配,避免增加引用次數(shù))。創(chuàng)建的新節(jié)點(diǎn)就成為了虛擬節(jié)點(diǎn),所以你不需要為new_value提供構(gòu)造函數(shù)⑧。反而這里你需要將new_value的副本賦給之前的虛擬節(jié)點(diǎn)⑨。最終,為了讓虛擬節(jié)點(diǎn)存在在隊(duì)列中,你不得不使用構(gòu)造函數(shù)來創(chuàng)建它②。

那么現(xiàn)在,我確信你會(huì)對如何對如何修改隊(duì)列,讓其變成一個(gè)線程安全的隊(duì)列感到驚訝。好吧,現(xiàn)在的push()只能訪問tail,而不能訪問head,這就是一個(gè)進(jìn)步try_pop()可以訪問head和tail,但是tail只需在最初進(jìn)行比較,所以所存在的時(shí)間很短。重大的提升在于,虛擬節(jié)點(diǎn)意味著try_pop()和push()不能對同一節(jié)點(diǎn)進(jìn)行操作,所以這里已經(jīng)不再需要互斥了。那么,你只需要使用一個(gè)互斥量來保護(hù)head和tail就夠了。那么,現(xiàn)在應(yīng)該鎖哪里?

我們的目的是為了最大程度的并發(fā)化,所以你需要上鎖的時(shí)間,要盡可能的小。push()很簡單:互斥量需要對tail的訪問進(jìn)行上鎖,這就意味著你需要對每一個(gè)新分配的節(jié)點(diǎn)進(jìn)行上鎖⑧,還有在你對當(dāng)前尾節(jié)點(diǎn)進(jìn)行賦值的時(shí)候⑨也需要上鎖。鎖需要持續(xù)到函數(shù)結(jié)束時(shí)才能解開。

try_pop()就不簡單了。首先,你需要使用互斥量鎖住head,一直到head彈出。實(shí)際上,互斥量決定了哪一個(gè)線程來進(jìn)行彈出操作。一旦head被改變⑤,你才能解鎖互斥量;當(dāng)在返回結(jié)果時(shí),互斥量就不需要進(jìn)行上鎖了⑥。這使得訪問tail需要一個(gè)尾互斥量。因?yàn)?,你需要只需要訪問tail一次,且只有在訪問時(shí)才需要互斥量。這個(gè)操作最好是通過函數(shù)進(jìn)行包裝。事實(shí)上,因?yàn)榇a只有在成員需要head時(shí),互斥量才上鎖,這項(xiàng)也需要包含在包裝函數(shù)中。最終代碼如下所示。

清單6.6 線程安全隊(duì)列——細(xì)粒度鎖版

template<typename T>
class threadsafe_queue
{
private:
  struct node
  {
    std::shared_ptr<T> data;
    std::unique_ptr<node> next;
  };
  std::mutex head_mutex;
  std::unique_ptr<node> head;
  std::mutex tail_mutex;
  node* tail;

  node* get_tail()
  {
    std::lock_guard<std::mutex> tail_lock(tail_mutex);
    return tail;
  }

  std::unique_ptr<node> pop_head()
  {
    std::lock_guard<std::mutex> head_lock(head_mutex);
    if(head.get()==get_tail())
    {
      return nullptr;
    }
    std::unique_ptr<node> old_head=std::move(head);
    head=std::move(old_head->next);
    return old_head;
  }
public:
  threadsafe_queue():
  head(new node),tail(head.get())
  {}
  threadsafe_queue(const threadsafe_queue& other)=delete;
  threadsafe_queue& operator=(const threadsafe_queue& other)=delete;

  std::shared_ptr<T> try_pop()
  {
     std::unique_ptr<node> old_head=pop_head();
     return old_head?old_head->data:std::shared_ptr<T>();
  }

  void push(T new_value)
  {
    std::shared_ptr<T> new_data(
      std::make_shared<T>(std::move(new_value)));
    std::unique_ptr<node> p(new node);
    node* const new_tail=p.get();
    std::lock_guard<std::mutex> tail_lock(tail_mutex);
    tail->data=new_data;
    tail->next=std::move(p);
    tail=new_tail;
  }
};

讓我們用挑剔的目光來看一下上面的代碼,并考慮6.1.1節(jié)中給出的指導(dǎo)意見。在你觀察不變量前,你需要確定的狀態(tài)有:

  • tail->next == nullptr

  • tail->data == nullptr

  • head == taill(意味著空列表)

  • 單元素列表 head->next = tail

  • 在列表中的每一個(gè)節(jié)點(diǎn)x,x!=tail且x->data指向一個(gè)T類型的實(shí)例,并且x->next指向列表中下一個(gè)節(jié)點(diǎn)。x->next == tail意味著x就是列表中最后一個(gè)節(jié)點(diǎn)

  • 順著head的next節(jié)點(diǎn)找下去,最終會(huì)找到tail

這里的push()很簡單:僅修改了被tail_mutex的數(shù)據(jù),因?yàn)樾碌奈补?jié)點(diǎn)是一個(gè)空節(jié)點(diǎn),并且其data和next都為舊的尾節(jié)點(diǎn)(實(shí)際上的尾節(jié)點(diǎn))設(shè)置好,所以其能維持不變量的狀態(tài)。

有趣的部分在于try_pop()上。事實(shí)證明,不僅需要對tail_mutex上鎖,來保護(hù)對tail的讀?。贿€要保證在從頭讀取數(shù)據(jù)時(shí),不會(huì)產(chǎn)生數(shù)據(jù)競爭。如果沒有這些互斥量,當(dāng)一個(gè)線程調(diào)用try_pop()的同時(shí),另一個(gè)線程調(diào)用push(),那么這里操作順序?qū)⒉豢深A(yù)測。盡管,每一個(gè)成員函數(shù)都持有一個(gè)互斥量,這些互斥量能保護(hù)數(shù)據(jù)不會(huì)同時(shí)被多個(gè)線程訪問到;并且,隊(duì)列中的所有數(shù)據(jù)來源,都是通過調(diào)用push()得到的。因?yàn)榫€程可能會(huì)無序的方位同一數(shù)據(jù),所以這里就會(huì)有數(shù)據(jù)競爭(正如你在第5章看到的那樣),以及未定義行為。幸運(yùn)的是,在get_tail()中的tail_mutex解決了所有的問題。因?yàn)檎{(diào)用get_tail()將會(huì)鎖住同名鎖,就像push()一樣,這就為兩個(gè)操作規(guī)定好了順序。要不就是get_tail()在push()之前被調(diào)用,這種情況下,線程可以看到舊的尾節(jié)點(diǎn),要不就是在push()之后完成,這種情況下,線程就能看到tail的新值,以及新數(shù)據(jù)前的真正tail的值。

當(dāng)get_tail()調(diào)用前,head_mutex已經(jīng)上鎖,這一步也是很重要的哦。如果不這樣,調(diào)用pop_head()時(shí)就會(huì)被get_tail()和head_mutex所卡住,因?yàn)槠渌€程調(diào)用try_pop()(以及pop_head())時(shí),都需要先獲取鎖,然后阻止從下面的過程中初始化線程:

std::unique_ptr<node> pop_head() // 這是個(gè)有缺陷的實(shí)現(xiàn)
{
  node* const old_tail=get_tail();  // ① 在head_mutex范圍外獲取舊尾節(jié)點(diǎn)的值
  std::lock_guard<std::mutex> head_lock(head_mutex);

  if(head.get()==old_tail)  // ②
  {
    return nullptr;
  }
  std::unique_ptr<node> old_head=std::move(head);
  head=std::move(old_head->next);  // ③
  return old_head;
}

這是一個(gè)有缺陷的實(shí)現(xiàn),調(diào)用get_tail()是在鎖的范圍之外,你可能也許會(huì)發(fā)現(xiàn)head和tail,在你初始化線程,并獲取head_mutex時(shí),發(fā)生了改變。并且,不只是返回尾節(jié)點(diǎn)時(shí),返回的不是尾節(jié)點(diǎn)了,其值甚至都不列表中的值了。即使head是最后一個(gè)節(jié)點(diǎn),這也意味著head和old_tail②比較失敗。因此,當(dāng)你更新head③時(shí),可能會(huì)將head移到tail之后,這樣的話就意味著數(shù)據(jù)結(jié)構(gòu)遭到了破壞。在正確實(shí)現(xiàn)中(清單6.6),需要保證在head_mutex保護(hù)的范圍內(nèi)調(diào)用get_tail()。這就能保證沒有其他線程能對head進(jìn)行修改,并且tail會(huì)向正確的方向移動(dòng)(當(dāng)有新節(jié)點(diǎn)添加進(jìn)來時(shí)),這樣就很安全了。head不會(huì)傳遞給get_tail()的返回值,所以不變量的狀態(tài)時(shí)穩(wěn)定的。

當(dāng)使用pop_head()更新head時(shí)(從隊(duì)列中刪除節(jié)點(diǎn)),互斥量就已經(jīng)上鎖了,并且try_pop()可以提取數(shù)據(jù),并在確實(shí)有個(gè)數(shù)據(jù)的時(shí)候刪除一個(gè)節(jié)點(diǎn)(若沒有數(shù)據(jù),則返回std::shared_ptr<>的空實(shí)例),因?yàn)橹挥幸粋€(gè)線程可以訪問這個(gè)節(jié)點(diǎn),所以根據(jù)我們所掌握的知識(shí),認(rèn)為這個(gè)操作是安全的。

接下來,外部接口就相當(dāng)于清單6.2代碼中的子集了,所以同樣的分析結(jié)果:對于固有接口來說,不存在條件競爭。

異常是很有趣的東西。雖然,你已經(jīng)改變了數(shù)據(jù)的分配模式,但是異??赡軓膭e的地方襲來。try_pop()中的對鎖的操作會(huì)產(chǎn)生異常,并直到鎖獲取才能對數(shù)據(jù)進(jìn)行修改。因此,try_pop()是異常安全的。另一方面,push()可以在堆上新分配出一個(gè)T的實(shí)例,以及一個(gè)node的新實(shí)例,這里可能會(huì)拋出異常。但是,所有分配的對象都賦給了智能指針,那么當(dāng)異常發(fā)生時(shí),他們就會(huì)被釋放掉。一旦鎖被獲取,push()中的操作就不會(huì)拋出異常,所以push()也是異常安全的。

因?yàn)闆]有修改任何接口,所以不會(huì)死鎖。在實(shí)現(xiàn)內(nèi)部也不會(huì)有死鎖;唯一需要獲取兩個(gè)鎖的是pop_head(),這個(gè)函數(shù)需要獲取head_mutex和tail_mutex,所以不會(huì)產(chǎn)生死鎖。

那么剩下的問題就都在實(shí)際并發(fā)的可行性上了。這個(gè)結(jié)構(gòu)對并發(fā)訪問的考慮要多于清單6.2中的代碼,因?yàn)檫@里鎖粒度更加的小,并且更多的數(shù)據(jù)不在鎖的保護(hù)范圍內(nèi)。比如,在push()中,新節(jié)點(diǎn)和新數(shù)據(jù)的分配都不需要鎖來保護(hù)。這就意味著多線程情況下,節(jié)點(diǎn)及數(shù)據(jù)的分配是“安全”并發(fā)的。同一時(shí)間內(nèi),只有一個(gè)線程可以將它的節(jié)點(diǎn)和數(shù)據(jù)添加到隊(duì)列中,所以代碼中只是簡單使用了指針賦值的形式,相較于基于std::queue<>的實(shí)現(xiàn)中,對于std::queue<>的內(nèi)部操作進(jìn)行上鎖,這個(gè)結(jié)構(gòu)中就不需要了。

同樣,try_pop()持有tail_mutex也只有很短的時(shí)間,只為保護(hù)對tail的讀取。因此,當(dāng)有數(shù)據(jù)push進(jìn)隊(duì)列后,try_pop()幾乎及可以完全并發(fā)調(diào)用了。同樣在執(zhí)行中,對head_mutex的持有時(shí)間也是極短的。當(dāng)并發(fā)訪問時(shí),這就會(huì)增加對try_pop()的訪問次數(shù);且只有一個(gè)線程,在同一時(shí)間內(nèi)可以訪問pop_head(),且多線程情況下可以刪除隊(duì)列中的舊節(jié)點(diǎn),并且安全的返回?cái)?shù)據(jù)。

等待數(shù)據(jù)彈出

OK,所以清單6.6提供了一個(gè)使用細(xì)粒度鎖的線程安全隊(duì)列,不過只有try_pop()可以并發(fā)訪問(且只有一個(gè)重載存在)。那么在清單6.2中方便的wait_and_pop()呢?你能通過細(xì)粒度鎖實(shí)現(xiàn)一個(gè)相同功能的接口嗎?

當(dāng)然,答案是“是的”,不過的確有些困難,困難在哪里?修改push()是相對簡單的:只需要在函數(shù)體末尾添加data_cond.notify_ont()函數(shù)的調(diào)用即可(如同清單6.2中那樣)。當(dāng)然,事實(shí)并沒有那么簡單:你使用細(xì)粒度鎖,是為了保證最大程度的并發(fā)。當(dāng)將互斥量和notify_one()混用的時(shí),如果被通知的線程在互斥量解鎖后被喚醒,那么這個(gè)線程就不得不等待互斥量上鎖。另一方面,當(dāng)解鎖操作在notify_one()之前調(diào)用,那么互斥量可能會(huì)等待線程醒來,來獲取互斥鎖(假設(shè)沒有其他線程對互斥量上鎖)。這可能是一個(gè)微小的改動(dòng),但是對于一些情況來說,就顯的很重要了。

wait_and_pop()就有些復(fù)雜了,因?yàn)樾枰_定在哪里等待,也就是函數(shù)在哪里執(zhí)行,并且需要確定哪些互斥量需要上鎖。等待的條件是“隊(duì)列非空”,這就意味著head!=tail。這樣寫的話,就需要同時(shí)獲取head_mutex和tail_mutex,并對其進(jìn)行上鎖,不過在清單6.6中已經(jīng)使用tail_mutex來保護(hù)對tail的讀取,以及不用和自身記性比較,所以這種邏輯也同樣適用于這里。如果有函數(shù)讓head!=get_tail(),你只需要持有head_mutex,然后你就可以使用鎖,對data_cond.wait()的調(diào)用進(jìn)行保護(hù)。當(dāng)你將等待邏輯添加入結(jié)構(gòu)當(dāng)中,那么實(shí)現(xiàn)的方式與try_pop()基本上是一樣的。

對于try_pop()和wait_and_pop()的重載都需要深思熟慮。當(dāng)你將返回std::shared_ptr<>替換為從“old_head后索引出的值,并且拷貝賦值給value參數(shù)”進(jìn)行返回時(shí),那么這里將會(huì)存在異常安全問題。數(shù)據(jù)項(xiàng)在互斥鎖未上鎖的情況下被刪除,將剩下的數(shù)據(jù)返回給調(diào)用者。不過,當(dāng)拷貝賦值拋出異常(可能性很大)時(shí),數(shù)據(jù)項(xiàng)將會(huì)丟失,因?yàn)樗鼪]有被返回隊(duì)列原來的位置上。

當(dāng)T類型有無異常拋出的移動(dòng)賦值操作,或無異常拋出的交換操作時(shí),你可以使用它,不過,你肯定更喜歡一種通用的解決方案,無論T是什么類型,這個(gè)方案都能使用。在這種情況下,在節(jié)點(diǎn)從列表中刪除前,你就不得不將有可能拋出異常的代碼,放在鎖保護(hù)的范圍內(nèi),來保證異常安全性。這也就意味著你需要對pop_head()進(jìn)行重載,查找索引值在列表改動(dòng)前的位置。

相比之下,empty()就更加的簡單:只需要鎖住head_mutex,并且檢查head==get_tail()(詳見清單6.10)就可以了。最終的代碼,在清單6.7,6.8,6.9和6.10中。

清單6.7 可上鎖和等待的線程安全隊(duì)列——內(nèi)部機(jī)構(gòu)及接口

template<typename T>
class threadsafe_queue
{
private:
  struct node
  {
    std::shared_ptr<T> data;
    std::unique_ptr<node> next;
  };

  std::mutex head_mutex;
  std::unique_ptr<node> head;
  std::mutex tail_mutex;
  node* tail;
  std::condition_variable data_cond;
public:
  threadsafe_queue():
    head(new node),tail(head.get())
  {}
  threadsafe_queue(const threadsafe_queue& other)=delete;
  threadsafe_queue& operator=(const threadsafe_queue& other)=delete;

  std::shared_ptr<T> try_pop();
  bool try_pop(T& value);
  std::shared_ptr<T> wait_and_pop();
  void wait_and_pop(T& value);
  void push(T new_value);
  bool empty();
};

向隊(duì)列中添加新節(jié)點(diǎn)是相當(dāng)簡單的——下面的實(shí)現(xiàn)與上面的代碼差不多。

清單6.8 可上鎖和等待的線程安全隊(duì)列——推入新節(jié)點(diǎn)

template<typename T>
void threadsafe_queue<T>::push(T new_value)
{
  std::shared_ptr<T> new_data(
  std::make_shared<T>(std::move(new_value)));
  std::unique_ptr<node> p(new node);
  {
    std::lock_guard<std::mutex> tail_lock(tail_mutex);
    tail->data=new_data;
    node* const new_tail=p.get();
    tail->next=std::move(p);
    tail=new_tail;
  }
  data_cond.notify_one();
}

如同之前所提到的,復(fù)雜部分都在pop那邊,所以提供幫助性函數(shù)去簡化這部分就很重要了。下一個(gè)清單中將展示wait_and_pop()的實(shí)現(xiàn),以及先關(guān)的幫助函數(shù)。

清單6.9 可上鎖和等待的線程安全隊(duì)列——wait_and_pop()

template<typename T>
class threadsafe_queue
{
private:
  node* get_tail()
  {
    std::lock_guard<std::mutex> tail_lock(tail_mutex);
    return tail;
  }

  std::unique_ptr<node> pop_head()  // 1
  {
    std::unique_ptr<node> old_head=std::move(head);
    head=std::move(old_head->next);
    return old_head;
  }

  std::unique_lock<std::mutex> wait_for_data()  // 2
  {
    std::unique_lock<std::mutex> head_lock(head_mutex);
    data_cond.wait(head_lock,[&]{return head.get()!=get_tail();});
    return std::move(head_lock);  // 3
  }

  std::unique_ptr<node> wait_pop_head()
  {
    std::unique_lock<std::mutex> head_lock(wait_for_data());  // 4
    return pop_head();
  }

  std::unique_ptr<node> wait_pop_head(T& value)
  {
    std::unique_lock<std::mutex> head_lock(wait_for_data());  // 5
    value=std::move(*head->data);
    return pop_head();
  }
public:
  std::shared_ptr<T> wait_and_pop()
  {
    std::unique_ptr<node> const old_head=wait_pop_head();
    return old_head->data;
  }

  void wait_and_pop(T& value)
  {
    std::unique_ptr<node> const old_head=wait_pop_head(value);
  }
};

清單6.9中所示的pop部分的實(shí)現(xiàn)中有一些幫助函數(shù)來降低代碼的復(fù)雜度,例如pop_head()①和wait_for_data()②,這些函數(shù)分別是刪除頭結(jié)點(diǎn)和等待隊(duì)列中有數(shù)據(jù)彈出的。wait_for_data()特別值得關(guān)注,因?yàn)槠洳粌H等待使用lambda函數(shù)對條件變量進(jìn)行等待,而且它還會(huì)將鎖的實(shí)例返回給調(diào)用者③。這就需要確保同一個(gè)鎖在執(zhí)行與wait_pop_head()重載④⑤的相關(guān)操作時(shí),已持有鎖。pop_head()是對try_pop()代碼的復(fù)用,將在下面進(jìn)行展示:

清單6.10 可上鎖和等待的線程安全隊(duì)列——try_pop()和empty()

template<typename T>
class threadsafe_queue
{
private:
  std::unique_ptr<node> try_pop_head()
  {
    std::lock_guard<std::mutex> head_lock(head_mutex);
    if(head.get()==get_tail())
    {
      return std::unique_ptr<node>();
    }
    return pop_head();
  }

  std::unique_ptr<node> try_pop_head(T& value)
  {
    std::lock_guard<std::mutex> head_lock(head_mutex);
    if(head.get()==get_tail())
    {
      return std::unique_ptr<node>();
    }
    value=std::move(*head->data);
    return pop_head();
  }
public:
  std::shared_ptr<T> try_pop()
  {
    std::unique_ptr<node> old_head=try_pop_head();
    return old_head?old_head->data:std::shared_ptr<T>();
  }

  bool try_pop(T& value)
  {
    std::unique_ptr<node> const old_head=try_pop_head(value);
    return old_head;
  }

 ?bool empty()
  {
    std::lock_guard<std::mutex> head_lock(head_mutex);
    return (head.get()==get_tail());
  }
};

這個(gè)隊(duì)列的實(shí)現(xiàn)將作為第7章無鎖隊(duì)列的基礎(chǔ)。這是一個(gè)無界隊(duì)列;線程可以持續(xù)向隊(duì)列中添加數(shù)據(jù)項(xiàng),即使沒有元素被刪除。與之相反的就是有界隊(duì)列,在有界隊(duì)列中,隊(duì)列在創(chuàng)建的時(shí)候最大長度就已經(jīng)是固定的了。當(dāng)有界隊(duì)列滿載時(shí),嘗試在向其添加元素的操作將會(huì)失敗或者阻塞,直到有元素從隊(duì)列中彈出。在任務(wù)執(zhí)行時(shí)(詳見第8章),有界隊(duì)列對于線程間的工作花費(fèi)是很有幫助的。其會(huì)阻止線程對隊(duì)列進(jìn)行填充,并且可以避免線程從較遠(yuǎn)的地方對數(shù)據(jù)項(xiàng)進(jìn)行索引。

無界隊(duì)列的實(shí)現(xiàn),很容易擴(kuò)展成,可在push()中等待跳進(jìn)變量的定長隊(duì)列。相對于等待隊(duì)列中具有數(shù)據(jù)項(xiàng)(pop()執(zhí)行完成后),你就需要等待隊(duì)列中數(shù)據(jù)項(xiàng)小于最大值就可以了。對于有界隊(duì)列更多的討論,已經(jīng)超出了本書的范圍,就不再多說;現(xiàn)在越過隊(duì)列,向更加復(fù)雜的數(shù)據(jù)結(jié)構(gòu)進(jìn)發(fā)。