鍍金池/ 教程/ C/ 7.2 無鎖數(shù)據(jù)結(jié)構(gòu)的例子
3.4 本章總結(jié)
6.3 基于鎖設(shè)計更加復(fù)雜的數(shù)據(jù)結(jié)構(gòu)
6.1 為并發(fā)設(shè)計的意義何在?
5.2 <code>C++</code>中的原子操作和原子類型
A.7 自動推導(dǎo)變量類型
2.1 線程管理的基礎(chǔ)
8.5 在實踐中設(shè)計并發(fā)代碼
2.4 運行時決定線程數(shù)量
2.2 向線程函數(shù)傳遞參數(shù)
第4章 同步并發(fā)操作
2.3 轉(zhuǎn)移線程所有權(quán)
8.3 為多線程性能設(shè)計數(shù)據(jù)結(jié)構(gòu)
6.4 本章總結(jié)
7.3 對于設(shè)計無鎖數(shù)據(jù)結(jié)構(gòu)的指導(dǎo)建議
關(guān)于這本書
A.1 右值引用
2.6 本章總結(jié)
D.2 &lt;condition_variable&gt;頭文件
A.6 變參模板
6.2 基于鎖的并發(fā)數(shù)據(jù)結(jié)構(gòu)
4.5 本章總結(jié)
A.9 本章總結(jié)
前言
第10章 多線程程序的測試和調(diào)試
5.4 本章總結(jié)
第9章 高級線程管理
5.1 內(nèi)存模型基礎(chǔ)
2.5 識別線程
第1章 你好,C++的并發(fā)世界!
1.2 為什么使用并發(fā)?
A.5 Lambda函數(shù)
第2章 線程管理
4.3 限定等待時間
D.3 &lt;atomic&gt;頭文件
10.2 定位并發(fā)錯誤的技術(shù)
附錄B 并發(fā)庫的簡單比較
5.3 同步操作和強制排序
A.8 線程本地變量
第8章 并發(fā)代碼設(shè)計
3.3 保護共享數(shù)據(jù)的替代設(shè)施
附錄D C++線程庫參考
第7章 無鎖并發(fā)數(shù)據(jù)結(jié)構(gòu)設(shè)計
D.7 &lt;thread&gt;頭文件
D.1 &lt;chrono&gt;頭文件
4.1 等待一個事件或其他條件
A.3 默認函數(shù)
附錄A 對<code>C++</code>11語言特性的簡要介紹
第6章 基于鎖的并發(fā)數(shù)據(jù)結(jié)構(gòu)設(shè)計
封面圖片介紹
7.2 無鎖數(shù)據(jù)結(jié)構(gòu)的例子
8.6 本章總結(jié)
8.1 線程間劃分工作的技術(shù)
4.2 使用期望等待一次性事件
8.4 設(shè)計并發(fā)代碼的注意事項
D.5 &lt;mutex&gt;頭文件
3.1 共享數(shù)據(jù)帶來的問題
資源
9.3 本章總結(jié)
10.3 本章總結(jié)
10.1 與并發(fā)相關(guān)的錯誤類型
D.4 &lt;future&gt;頭文件
3.2 使用互斥量保護共享數(shù)據(jù)
9.1 線程池
1.1 何謂并發(fā)
9.2 中斷線程
4.4 使用同步操作簡化代碼
A.2 刪除函數(shù)
1.3 C++中的并發(fā)和多線程
1.4 開始入門
第5章 C++內(nèi)存模型和原子類型操作
消息傳遞框架與完整的ATM示例
8.2 影響并發(fā)代碼性能的因素
7.1 定義和意義
D.6 &lt;ratio&gt;頭文件
A.4 常量表達式函數(shù)
7.4 本章總結(jié)
1.5 本章總結(jié)
第3章 線程間共享數(shù)據(jù)

7.2 無鎖數(shù)據(jù)結(jié)構(gòu)的例子

為了演示一些在設(shè)計無鎖數(shù)據(jù)結(jié)構(gòu)中所使用到的技術(shù),我們將看到一些無鎖實現(xiàn)的簡單數(shù)據(jù)結(jié)構(gòu)。這里不僅要在每個例子中描述一個有用的數(shù)據(jù)結(jié)構(gòu)實現(xiàn),還將使用這些例子的某些特別之處來闡述對于無鎖數(shù)據(jù)結(jié)構(gòu)的設(shè)計。

如之前所提到的,無鎖結(jié)構(gòu)依賴與原子操作和內(nèi)存序及相關(guān)保證,以確保多線程以正確的順序訪問數(shù)據(jù)結(jié)構(gòu)。最初,所有原子操作默認使用的是memory_order_seq_cst內(nèi)存序;因為簡單,所以使用(所有memory_order_seq_cst都遵循一種順序)。不過,在后面的例子中,我們將會降低內(nèi)存序的要求,使用memory_order_acquire, memory_order_release, 甚至memory_order_relaxed。雖然這個例子中沒有直接的使用鎖,但需要注意的是對std::atomic_flag的使用。一些平臺上的無鎖結(jié)構(gòu)實現(xiàn)(實際上在C++的標準庫的實現(xiàn)中),使用了內(nèi)部鎖(詳見第5章)。另一些平臺上,基于鎖的簡單數(shù)據(jù)結(jié)構(gòu)可能會更適合;當然,還有很多平臺不能一一說明;在選擇一種實現(xiàn)前,需要明確需求,并且配置各種選項以滿足要求。

那么,回到數(shù)據(jù)結(jié)構(gòu)上來吧,最簡單的數(shù)據(jù)結(jié)構(gòu)——棧。

7.2.1 寫一個無鎖的線程安全棧

棧的要求很簡單:查詢順序是添加順序的逆序——先入后出(LIFO)。所以,要確保一個值安全的添加入棧就十分重要,因為很可能在添加后,馬上被其他線程索引,同時確保只有一個線程能索引到給定值也是很重要。最簡單的棧就是鏈表,head指針指向第一個節(jié)點(可能是下一個被索引到的節(jié)點),并且每個節(jié)點依次指向下一個節(jié)點。

在這樣的情況下,添加一個節(jié)點相對來說很簡單:

  1. 創(chuàng)建一個新節(jié)點。
  2. 將當新節(jié)點的next指針指向當前的head節(jié)點。
  3. 讓head節(jié)點指向新節(jié)點。

在單線程的上下文中,這種方式?jīng)]有問題,不過當多線程對棧進行修改時,這幾步就不夠用了。至關(guān)重要的是,當有兩個線程同時添加節(jié)點的時候,在第2步和第3步的時候會產(chǎn)生條件競爭:一個線程可能在修改head的值時,另一個線程正在執(zhí)行第2步,并且在第3步中對head進行更新。這就會使之前那個線程的工作被丟棄,亦或是造成更加糟糕的后果。在了解如何解決這個條件競爭之前,還要注意一個很重要的事:當head更新,并指向了新節(jié)點時,另一個線程就能讀取到這個節(jié)點了。因此,在head設(shè)置為指向新節(jié)點前,讓新節(jié)點完全準備就緒就變得很重要了;因為,在這之后就不能對節(jié)點進行修改了。

OK,那如何應(yīng)對討厭的條件競爭呢?答案就是:在第3步的時候使用一個原子“比較/交換”操作,來保證當步驟2對head進行讀取時,不會對head進行修改。當有修改時,可以循環(huán)“比較/交換”操作。下面的代碼就展示了,不用鎖來實現(xiàn)線程安全的push()函數(shù)。

清單7.2 不用鎖實現(xiàn)push()

template<typename T>
class lock_free_stack
{
private:
  struct node
  {
    T data;
    node* next;

    node(T const& data_):  // 1
     data(data_)
    {}
  };

  std::atomic<node*> head;
public:
  void push(T const& data)
  {
    node* const new_node=new node(data); // 2
    new_node->next=head.load();  // 3
    while(!head.compare_exchange_weak(new_node->next,new_node));  // 4
  }
};

上面代碼近乎能匹配之前所說的三個步驟:創(chuàng)建一個新節(jié)點②,設(shè)置新節(jié)點的next指針指向當前head③,并設(shè)置head指針指向新節(jié)點④。node結(jié)構(gòu)用其自身的構(gòu)造函數(shù)來進行數(shù)據(jù)填充①,必須保證節(jié)點在構(gòu)造完成后隨時能被彈出。之后需要使用compare_exchange_weak()來保證在被存儲到new_node->next的head指針和之前的一樣③。代碼的亮點是使用“比較/交換”操作:當其返回false時,因為比較失敗(例如,head被其他線程鎖修改),new_node->next作為操作的第一個參數(shù),將會更新head。循環(huán)中不需要每次都重新加載head指針,因為編譯器會幫你完成這件事。同樣,因為循環(huán)可能直接就失敗了,所以這里使用compare_exchange_weak要好于使用compare_exchange_strong(詳見第5章)。

所以,這里暫時不需要pop()操作,可以先快速檢查一下push()的實現(xiàn)是否有違指導(dǎo)意見。這里唯一一個能拋出異常的地方就構(gòu)造新node的時候①,不過其會自行處理,且鏈表中的內(nèi)容沒有被修改,所以這里是安全的。因為在構(gòu)建數(shù)據(jù)的時候,是將其作為node的一部分作為存儲的,并且使用compare_exchange_weak()來更新head指針,所以這里沒有惡性的條件競爭。“比較/交換”成功時,節(jié)點已經(jīng)準備就緒,且隨時可以提取。因為這里沒有鎖,所以就不存在死鎖的情況,這里的push()函數(shù)實現(xiàn)的很成功。

那么,你現(xiàn)在已經(jīng)有往棧中添加數(shù)據(jù)的方法了,現(xiàn)在需要刪除數(shù)據(jù)的方法。其步驟如下,也很簡單:

  1. 讀取當前head指針的值。
  2. 讀取head->next。
  3. 設(shè)置head到head->next。
  4. 通過索引node,返回data數(shù)據(jù)。
  5. 刪除索引節(jié)點。

但在多線程環(huán)境下,就不像看起來那么簡單了。當有兩個線程要從棧中移除數(shù)據(jù),兩個線程可能在步驟1中讀取到同一個head(值相同)。當其中一個線程處理到步驟5,而另一個線程還在處理步驟2時,這個還在處理步驟2的線程將會解引用一個懸空指針。這只是寫無鎖代碼所遇到的最大問題之一,所以現(xiàn)在只能跳過步驟5,讓節(jié)點泄露。

另一個問題就是:當兩個線程讀取到同一個head值,他們將返回同一個節(jié)點。這就違反了棧結(jié)構(gòu)的意圖,所以你需要避免這樣的問題產(chǎn)生。你可以像在push()函數(shù)中解決條件競爭那樣來解決這個問題:使用“比較/交換”操作更新head。當“比較/交換”操作失敗時,不是一個新節(jié)點已被推入,就是其他線程已經(jīng)彈出了想要彈出的節(jié)點。無論是那種情況,都得返回步驟1(“比較/交換”操作將會重新讀取head)。

當“比較/交換”成功,就可以確定當前線程是彈出給定節(jié)點的唯一線程,之后就可以放心的執(zhí)行步驟4了。這里先看一下pop()的雛形:

template<typename T>
class lock_free_stack
{
public:
  void pop(T& result)
  {
    node* old_head=head.load();
    while(!head.compare_exchange_weak(old_head,old_head->next));
    result=old_head->data;
  }
};

雖然這段代碼很優(yōu)雅,但這里還有兩個節(jié)點泄露的問題。首先,這段代碼在空鏈表的時候不工作:當head指針式一個空指針時,當要訪問next指針時,將引起未定義行為。這很容易通過對nullptr的檢查進行修復(fù)(在while循環(huán)中),要不對空棧拋出一個異常,要不返回一個bool值來表明成功與否。

第二個問題就是異常安全問題。當在第3章中介紹棧結(jié)構(gòu)時,了解了在返回值的時候會出現(xiàn)異常安全問題:當有異常被拋出時,復(fù)制的值將丟失。在這種情況下,傳入引用是一種可以接受的解決方案;因為這樣就能保證,當有異常拋出時,棧上的數(shù)據(jù)不會丟失。不幸的是,不能這樣做;只能在單一線程對值進行返回的時候,才進行拷貝,以確??截惒僮鞯陌踩?,這就意味著在拷貝結(jié)束后這個節(jié)點就被刪除了。因此,通過引用獲取返回值的方式就沒有任何優(yōu)勢:直接返回也是可以的。若想要安全的返回值,你必須使用第3章中的其他方法:返回指向數(shù)據(jù)值的(智能)指針。

當返回的是智能指針時,就能返回nullptr以表明沒有值可返回,但是要求在堆上對智能指針進行內(nèi)存分配。將分配過程做為pop()的一部分時(也沒有更好的選擇了),堆分配時可能會拋出一個異常。與此相反,可以在push()操作中對內(nèi)存進行分配——無論怎樣,都得對node進行內(nèi)存分配。返回一個std::shared_ptr<>不會拋出異常,所以在pop()中進行分配就是安全的。將上面的觀點放在一起,就能看到如下的代碼。

清單7.3 帶有節(jié)點泄露的無鎖棧

template<typename T>
class lock_free_stack
{
private:
  struct node
  {
    std::shared_ptr<T> data;  // 1 指針獲取數(shù)據(jù)
    node* next;

    node(T const& data_):
      data(std::make_shared<T>(data_))  // 2 讓std::shared_ptr指向新分配出來的T
    {}
  };

  std::atomic<node*> head;
public:
  void push(T const& data)
  {
    node* const new_node=new node(data);
    new_node->next=head.load();
    while(!head.compare_exchange_weak(new_node->next,new_node));
  }
  std::shared_ptr<T> pop()
  {
    node* old_head=head.load();
    while(old_head && // 3 在解引用前檢查old_head是否為空指針
      !head.compare_exchange_weak(old_head,old_head->next));
    return old_head ? old_head->data : std::shared_ptr<T>();  // 4
  }
};

智能指針指向當前數(shù)據(jù)①,這里必須在堆上為數(shù)據(jù)分配內(nèi)存(在node結(jié)構(gòu)體中)②。而后,在compare_exchage_weak()循環(huán)中③,需要在old_head指針前,檢查指針是否為空。最終,如果存在相關(guān)節(jié)點,那么將會返回相關(guān)節(jié)點的值;當不存在時,將返回一個空指針④。注意,結(jié)構(gòu)是無鎖的,但并不是無等待的,因為在push()和pop()函數(shù)中都有while循環(huán),當compare_exchange_weak()總是失敗的時候,循環(huán)將會無限循環(huán)下去。

7.2.2 停止內(nèi)存泄露:使用無鎖數(shù)據(jù)結(jié)構(gòu)管理內(nèi)存

第一次了解pop()時,為了避免條件競爭(當有線程刪除一個節(jié)點的同時,其他線程還持有指向該節(jié)點的指針,并且要解引用)選擇了帶有內(nèi)存泄露的節(jié)點。但是,不論什么樣的C++程序,存在內(nèi)存泄露都不可接受。所以,現(xiàn)在來解決這個問題!

基本問題在于,當要釋放一個節(jié)點時,需要確認其他線程沒有持有這個節(jié)點。當只有一個線程調(diào)用pop(),就可以放心的進行釋放。當節(jié)點添加入棧后,push()就不會與節(jié)點有任何的關(guān)系了,所以只有調(diào)用pop()函數(shù)的線程與已加入節(jié)點有關(guān),并且能夠安全的將節(jié)點刪除。

另一方面,當棧同時處理多線程對pop()的調(diào)用時,就需要知道節(jié)點在什么時候被刪除。這實際上就需要你寫一個節(jié)點專用的垃圾收集器。這聽起來有些可怖,同時也相當棘手,不過并不是多么糟糕:這里需要檢查節(jié)點,并且檢查哪些節(jié)點被pop()訪問。不需要對push()中的節(jié)點有所擔心,因為這些節(jié)點推到棧上以后,才能被訪問到,而多線程只能通過pop()訪問同一節(jié)點。

當沒有線程調(diào)用pop()時,這時可以刪除棧上的任意節(jié)點。因此,當添加節(jié)點到“可刪除”列表中時,就能從中提取數(shù)據(jù)了。而后,當沒有線程通過pop()訪問節(jié)點時,就可以安全的刪除這些節(jié)點了。那怎么知道沒有線程調(diào)用pop()了呢?很簡單——計數(shù)即可。當計數(shù)器數(shù)值增加時,就是有節(jié)點推入;當減少時,就是有節(jié)點被刪除。這樣從“可刪除”列表中刪除節(jié)點就很安全了,直到計數(shù)器的值為0為止。當然,這個計數(shù)器必須是原子的,這樣它才能在多線程的情況下正確的進行計數(shù)。下面的清單中,展示了修改后的pop()函數(shù),有些支持功能的實現(xiàn)將在清單7.5中給出。

清單7.4 沒有線程通過pop()訪問節(jié)點時,就對節(jié)點進行回收

template<typename T>
class lock_free_stack
{
private:
  std::atomic<unsigned> threads_in_pop;  // 1 原子變量
  void try_reclaim(node* old_head);
public:
  std::shared_ptr<T> pop()
  {
    ++threads_in_pop;  // 2 在做事之前,計數(shù)值加1
    node* old_head=head.load();
    while(old_head &&
      !head.compare_exchange_weak(old_head,old_head->next));
    std::shared_ptr<T> res;
    if(old_head)
    { 
      res.swap(old_head->data);  // 3 回收刪除的節(jié)點
    }
    try_reclaim(old_head);  // 4 從節(jié)點中直接提取數(shù)據(jù),而非拷貝指針
    return res;
  }
};

threads_in_pop①原子變量用來記錄有多少線程試圖彈出棧中的元素。當pop()②函數(shù)調(diào)用的時候,計數(shù)器加一;當調(diào)用try_reclaim()時,計數(shù)器減一,當這個函數(shù)被節(jié)點調(diào)用時,說明這個節(jié)點已經(jīng)被刪除④。因為暫時不需要將節(jié)點刪除,可以通過swap()函數(shù)來刪除節(jié)點上的數(shù)據(jù)③(而非只是拷貝指針),當不再需要這些數(shù)據(jù)的時候,這些數(shù)據(jù)會自動刪除,而不是持續(xù)存在著(因為這里還有對未刪除節(jié)點的引用)。接下來看一下try_reclaim()是如何實現(xiàn)的。

清單7.5 采用引用計數(shù)的回收機制

template<typename T>
class lock_free_stack
{
private:
  std::atomic<node*> to_be_deleted;

  static void delete_nodes(node* nodes)
  {
    while(nodes)
    {
      node* next=nodes->next;
      delete nodes;
      nodes=next;
    }
  }
  void try_reclaim(node* old_head)
  {
    if(threads_in_pop==1)  // 1
    {
      node* nodes_to_delete=to_be_deleted.exchange(nullptr);  // 2 聲明“可刪除”列表
      if(!--threads_in_pop)  // 3 是否只有一個線程調(diào)用pop()?
      {
        delete_nodes(nodes_to_delete);  // 4
      }
      else if(nodes_to_delete)  // 5
      {
         chain_pending_nodes(nodes_to_delete);  // 6
      }
      delete old_head;  // 7
    }
    else
    {
      chain_pending_node(old_head);  // 8
      --threads_in_pop;
    }
  }
  void chain_pending_nodes(node* nodes)
  {
    node* last=nodes;
    while(node* const next=last->next)  // 9 讓next指針指向鏈表的末尾
    {
      last=next;
    }
    chain_pending_nodes(nodes,last);
  }

  void chain_pending_nodes(node* first,node* last)
  {
    last->next=to_be_deleted;  // 10
    while(!to_be_deleted.compare_exchange_weak(  // 11 用循環(huán)來保證last->next的正確性
      last->next,first));
    }
    void chain_pending_node(node* n)
    {
      chain_pending_nodes(n,n);  // 12
    }
};

回收節(jié)點時①,threads_in_pop的數(shù)值是1,也就是當前線程正在對pop()進行訪問,這時就可以安全的將節(jié)點進行刪除了⑦(將等待節(jié)點刪除也是安全的)。當數(shù)值不是1時,刪除任何節(jié)點都不安全,所以需要向等待列表中繼續(xù)添加節(jié)點⑧。

假設(shè)在某一時刻,threads_in_pop的值為1。那就可以嘗試回收等待列表,如果不回收,節(jié)點就會繼續(xù)等待,直到整個棧被銷毀。要做到回收,首先要通過一個原子exchange操作聲明②刪除列表,并將計數(shù)器減一③。如果之后計數(shù)的值為0,就意味著沒有其他線程訪問等待節(jié)點鏈表。出現(xiàn)新的等待節(jié)點時,不必為其煩惱,因為它們將被安全的回收。而后,可以使用delete_nodes對鏈表進行迭代,并將其刪除④。

當計數(shù)值在減后不為0,回收節(jié)點就不安全;所以如果存在⑤,就需要將其掛在等待刪除鏈表之后⑥,這種情況會發(fā)生在多個線程同時訪問數(shù)據(jù)結(jié)構(gòu)的時候。一些線程在第一次測試threads_in_pop①和對“回收”鏈表的聲明②操作間調(diào)用pop(),這可能新填入一個已經(jīng)被線程訪問的節(jié)點到鏈表中。在圖7.1中,線程C添加節(jié)點Y到to_be_deleted鏈表中,即使線程B仍將其引用作為old_head,之后會嘗試訪問其next指針。在線程A刪除節(jié)點的時候,會造成線程B發(fā)生未定義的行為。

http://wiki.jikexueyuan.com/project/cplusplus-concurrency-action/images/chapter7/7-1.png" alt="" />

圖7.1 三個線程同時調(diào)用pop(),說明為什么要在try_reclaim()對聲明節(jié)點進行刪除前,對threads_in_pop進行檢查。

為了將等待刪除的節(jié)點添加入等待刪除鏈表,需要復(fù)用節(jié)點的next指針將等待刪除節(jié)點鏈接在一起。在這種情況下,將已存在的鏈表鏈接到刪除鏈表后面,通過遍歷的方式找到鏈表的末尾⑨,將最后一個節(jié)點的next指針替換為當前to_be_deleted指針⑩,并且將鏈表中的第一個節(jié)點作為新的to_be_deleted指針進行存儲?。這里需要在循環(huán)中使用compare_exchange_weak來保證,通過其他線程添加進來的節(jié)點不會發(fā)生內(nèi)存泄露。這樣,在鏈表發(fā)生改變時,更新next指針很方便。添加單個節(jié)點是一種特殊情況,因為這需要將這個節(jié)點作為第一個節(jié)點,同時也是最后一個節(jié)點進行添加?。

在低負荷的情況下,這種方式?jīng)]有問題,因為在沒有線程訪問pop(),有一個合適的靜態(tài)指針。不過,這只是一個瞬時的狀態(tài),也就是為什么在回收前,需要檢查threads_in_pop計數(shù)為0③的原因;同樣也是刪除節(jié)點⑦前進行對計數(shù)器檢查的原因。刪除節(jié)點是一項耗時的工作,并且希望其他線程能對鏈表做的修改越小越好。從第一次發(fā)現(xiàn)threads_in_pop是1,到嘗試刪除節(jié)點,會用很長的時間,這樣就會讓線程有機會調(diào)用pop(),會讓threads_in_pop不為0,阻止節(jié)點的刪除操作。

在高負荷的情況,不會存在靜態(tài);因為,其他線程在初始化之后,都能進入pop()。在這樣的情況下,to_ne_deleted鏈表將會無界的增加,并且會再次泄露。當這里不存在任何靜態(tài)的情況時,就得為回收節(jié)點尋找替代機制。關(guān)鍵是要確定沒有線程訪問一個給定節(jié)點,那么這個節(jié)點就能被回收?,F(xiàn)在,最簡單的替換機制就是使用風險指針(hazard pointer)。

7.2.3 檢測使用風險指針(不可回收)的節(jié)點

“風險指針”這個術(shù)語引用于Maged Michael的技術(shù)發(fā)現(xiàn)[1]。之所以這樣叫,是因為刪除一個節(jié)點可能會讓其他引用其的線程處于危險之中。當其他線程持有這個刪除的節(jié)點的指針,并且解引用進行操作的時候,將會出現(xiàn)未定義行為。這里的基本觀點就是,當有線程去訪問要被(其他線程)刪除的對象時,會先設(shè)置對這個對象設(shè)置一個風險指針,而后通知其他線程,刪除這個指針是一個危險的行為。一旦這個對象不再被需要,那么就可以清除風險指針了。如果了解牛津/劍橋的龍舟比賽,那么這里使用到的機制和龍舟比賽開賽時差不多:每個船上的舵手都舉起手來,以表示他們還沒有準備好。只要有舵手的手是舉著的,那么裁判就不能讓比賽開始。當所有舵手的手都放下后,比賽才能開始;在比賽還未開始或感覺自己船隊的情況有變時,舵手可以再次舉手。

當線程想要刪除一個對象,那么它就必須檢查系統(tǒng)中其他線程是否持有風險指針。當沒有風險指針的時候,那么它就可以安全刪除對象。否則,它就必須等待風險指針的消失了。這樣,線程就得周期性的檢查其想要刪除的對象是否能安全刪除。

看起來很簡單,在C++中應(yīng)該怎么做呢?

首先,需要一個地點能存儲指向訪問對象的指針,這個地點就是風險指針。這個地點必須能讓所有線程看到,需要其中一些線程可以對數(shù)據(jù)結(jié)構(gòu)進行訪問。如何正確和高效的分配這些線程,的確是一個挑戰(zhàn),所以這個問題可以放在后面解決,而后假設(shè)你有一個get_hazard_pointer_for_current_thread()的函數(shù),這個函數(shù)可以返回風險指針的引用。當你讀取一個指針,并且想要解引用它的時候,你就需要這個函數(shù)——在這種情況下head數(shù)值源于下面的列表:

std::shared_ptr<T> pop()
{
  std::atomic<void*>& hp=get_hazard_pointer_for_current_thread();
  node* old_head=head.load();  // 1
  node* temp;
  do
  {
    temp=old_head;
    hp.store(old_head);  // 2
    old_head=head.load();
  } while(old_head!=temp); // 3
  // ...
}

在while循環(huán)中就能保證node不會在讀取舊head指針①時,以及在設(shè)置風險指針的時被刪除了。這種模式下,其他線程不知道有線程對這個給定的節(jié)點進行了訪問。幸運的是,當舊head節(jié)點要被刪除時,head本身是要改變的,所以需要對head進行檢查,并持續(xù)循環(huán),直到head指針中的值與風險指針中的值相同③。使用風險指針,如同依賴對已刪除對象的引用。當使用默認的new和delete操作對風險指針進行操作時,會出現(xiàn)未定義行為,所以需要確定實現(xiàn)是否支持這樣的操作,或使用自定義分配器來保證這種用法的正確性。

現(xiàn)在已經(jīng)設(shè)置了風險指針,那就可以對pop()進行處理了,基于現(xiàn)在了解到的安全知識,這里不會有其他線程來刪除節(jié)點。啊哈!這里每一次重新加載old_head時,解引用剛剛讀取到的指針時,就需要更新風險指針。當從鏈表中提取一個節(jié)點時,就可以將風險指針清除了。如果沒有其他風險指針引用節(jié)點,就可以安全的刪除節(jié)點了;否則,就需要將其添加到鏈表中,之后再將其刪除。下面的代碼就是對該方案的完整實現(xiàn)。

清單7.6 使用風險指針實現(xiàn)的pop()

std::shared_ptr<T> pop()
{
  std::atomic<void*>& hp=get_hazard_pointer_for_current_thread();
  node* old_head=head.load();
  do
  {
    node* temp;
    do  // 1 直到將風險指針設(shè)為head指針
    {
      temp=old_head;
      hp.store(old_head);
      old_head=head.load();
    } while(old_head!=temp);
  }
  while(old_head &&
    !head.compare_exchange_strong(old_head,old_head->next));
  hp.store(nullptr);  // 2 當聲明完成,清除風險指針
  std::shared_ptr<T> res;
  if(old_head)
  {
    res.swap(old_head->data);
    if(outstanding_hazard_pointers_for(old_head))  // 3 在刪除之前對風險指針引用的節(jié)點進行檢查
    {
      reclaim_later(old_head);  // 4
    }
    else
    {
      delete old_head;  // 5
    }
    delete_nodes_with_no_hazards();  // 6
  }
  return res;
}

首先,循環(huán)內(nèi)部會對風險指針進行設(shè)置,在當“比較/交換”操作失敗會重載old_head,再次進行設(shè)置①。使用compare_exchange_strong(),是因為需要在循環(huán)內(nèi)部做一些實際的工作:當compare_exchange_weak()偽失敗后,風險指針將被重置(沒有必要)。這個過程能保證風險指針在解引用(old_head)之前,能被正確的設(shè)置。當已聲明了一個風險指針,那么就可以將其清除了②。如果想要獲取一個節(jié)點,就需要檢查其他線程上的風險指針,檢查是否有其他指針引用該節(jié)點③。如果有,就不能刪除節(jié)點,只能將其放在鏈表中,之后再進行回收④;如果沒有,就能直接將這個節(jié)點刪除了⑤。最后,如果需要對任意節(jié)點進行檢查,可以調(diào)用reclaim_later()。如果鏈表上沒有任何風險指針引用節(jié)點,就可以安全的刪除這些節(jié)點⑥。當有節(jié)點持有風險指針,就只能讓下一個調(diào)用pop()的線程離開。

當然,這些函數(shù)——get_hazard_pointer_for_current_thread(), reclaim_later(), outstanding_hazard_pointers_for(), 和delete_nodes_with_no_hazards()——的實現(xiàn)細節(jié)我們還沒有看到,先來看看它們是如何工作的。

為線程分配風險指針實例的具體方案:使用get_hazard_pointer_for_current_thread()與程序邏輯的關(guān)系并不大(不過會影響效率,接下會看到具體的情況)??梢允褂靡粋€簡單的結(jié)構(gòu)體:固定長度的“線程ID-指針”數(shù)組。get_hazard_pointer_for_curent_thread()就可以通過這個數(shù)據(jù)來找到第一個釋放槽,并將當前線程的ID放入到這個槽中。當線程退出時,槽就再次置空,可以通過默認構(gòu)造std::thread::id()將線程ID放入槽中。這個實現(xiàn)就如下所示:

清單7.7 get_hazard_pointer_for_current_thread()函數(shù)的簡單實現(xiàn)

unsigned const max_hazard_pointers=100;
struct hazard_pointer
{
  std::atomic<std::thread::id> id;
  std::atomic<void*> pointer;
};
hazard_pointer hazard_pointers[max_hazard_pointers];

class hp_owner
{
  hazard_pointer* hp;

public:
  hp_owner(hp_owner const&)=delete;
  hp_owner operator=(hp_owner const&)=delete;
  hp_owner():
    hp(nullptr)
  {
    for(unsigned i=0;i<max_hazard_pointers;++i)
    {
      std::thread::id old_id;
      if(hazard_pointers[i].id.compare_exchange_strong(  // 6 嘗試聲明風險指針的所有權(quán)
         old_id,std::this_thread::get_id()))
      {
        hp=&hazard_pointers[i];
        break;  // 7
      }
    }
    if(!hp)  // 1
    {
      throw std::runtime_error("No hazard pointers available");
    }
  }

  std::atomic<void*>& get_pointer()
  {
    return hp->pointer;
  }

  ~hp_owner()  // 2
  {
    hp->pointer.store(nullptr);  // 8
    hp->id.store(std::thread::id());  // 9
  }
};

std::atomic<void*>& get_hazard_pointer_for_current_thread()  // 3
{
  thread_local static hp_owner hazard;  // 4 每個線程都有自己的風險指針
  return hazard.get_pointer();  // 5
}

get_hazard_pointer_for_current_thread()的實現(xiàn)看起來很簡單③:一個hp_owner④類型的thread_local(本線程所有)變量,用來存儲當前線程的風險指針,可以返回這個變量所持有的指針⑤。之后的工作:第一次有線程調(diào)用這個函數(shù)時,新hp_owner實例就被創(chuàng)建。這個實例的構(gòu)造函數(shù)⑥,會通過查詢“所有者/指針”表,尋找沒有所有者的記錄。其用compare_exchange_strong()來檢查某個記錄是否有所有者,并進行析構(gòu)②。當compare_exchange_strong()失敗,其他線程的擁有這個記錄,所以可以繼續(xù)執(zhí)行下去。當交換成功,當前線程就擁有了這條記錄,而后對其進行存儲,并停止搜索⑦。當遍歷了列表也沒有找到物所有權(quán)的記錄①,就說明有很多線程在使用風險指針,所以這里將拋出一個異常。

一旦hp_owner實例被一個給定的線程所創(chuàng)建,那么之后的訪問將會很快,因為指針在緩存中,所以表不需要再次遍歷。

當線程退出時,hp_owner的實例將會被銷毀。析構(gòu)函數(shù)會在std::thread::id()設(shè)置擁有者ID前,將指針重置為nullptr,這樣就允許其他線程對這條記錄進行復(fù)用⑧⑨。

實現(xiàn)get_hazard_pointer_for_current_thread()后,outstanding_hazard_pointer_for()實現(xiàn)就簡單了:只需要對風險指針表進行搜索,就可以找到對應(yīng)記錄。

bool outstanding_hazard_pointers_for(void* p)
{
  for(unsigned i=0;i<max_hazard_pointers;++i)
  {
    if(hazard_pointers[i].pointer.load()==p)
    {
      return true;
    }
  }
  return false;
}

實現(xiàn)都不需要對記錄的所有者進行驗證:沒有所有者的記錄會是一個空指針,所以比較代碼將總返回false,通過這種方式將代碼簡化。

reclaim_later()和delete_nodes_with_no_hazards()可以對簡單的鏈表進行操作;reclaim_later()只是將節(jié)點添加到列表中,delete_nodes_with_no_hazards()就是搜索整個列表,并將無風險指針的記錄進行刪除。下面將展示它們的具體實現(xiàn)。

清單7.8 回收函數(shù)的簡單實現(xiàn)

template<typename T>
void do_delete(void* p)
{
  delete static_cast<T*>(p);
}

struct data_to_reclaim
{
  void* data;
  std::function<void(void*)> deleter;
  data_to_reclaim* next;

  template<typename T>
  data_to_reclaim(T* p):  // 1
    data(p),
    deleter(&do_delete<T>),
    next(0)
  {}

  ~data_to_reclaim()
  {
    deleter(data);  // 2
  }
};

std::atomic<data_to_reclaim*> nodes_to_reclaim;

void add_to_reclaim_list(data_to_reclaim* node)  // 3
{
  node->next=nodes_to_reclaim.load();
  while(!nodes_to_reclaim.compare_exchange_weak(node->next,node));
}

template<typename T>
void reclaim_later(T* data)  // 4
{
  add_to_reclaim_list(new data_to_reclaim(data));  // 5
}

void delete_nodes_with_no_hazards()
{
  data_to_reclaim* current=nodes_to_reclaim.exchange(nullptr);  // 6
  while(current)
  {
    data_to_reclaim* const next=current->next;
    if(!outstanding_hazard_pointers_for(current->data))  // 7
    {
      delete current;  // 8
    }
    else
    {
      add_to_reclaim_list(current);  // 9
    }
    current=next;
  }
}

首先,reclaim_later()是一個函數(shù)模板④。因為風險指針是一個通用解決方案,所以這里就不能將棧節(jié)點的類型寫死。使用std::atomic<void*>對風險指針進行存儲。需要對任意類型的指針進行處理,不過不能使用void*形式,因為當要刪除數(shù)據(jù)項時,delete操作只能對實際類型指針進行操作。data_to_reclaim的構(gòu)造函數(shù)處理的就很優(yōu)雅:reclaim_later()只是為指針創(chuàng)建一個data_to_reclaim的實例,并且將實例添加到回收鏈表中⑤。add_to_reclaim_list()③就是使用compare_exchange_weak()循環(huán)來訪問鏈表頭(就如你之前看到的那樣)。

當將節(jié)點添加入鏈表時,data_to_reclaim的析構(gòu)函數(shù)不會被調(diào)用;析構(gòu)函數(shù)會在沒有風險指針指向節(jié)點的時候調(diào)用,這也就是delete_nodes_with_no_hazards()的作用。

delete_nodes_with_no_hazards()將已聲明的鏈表節(jié)點進行回收,使用的是exchange()函數(shù)⑥(這個步驟簡單且關(guān)鍵,是為了保證只有一個線程回收這些節(jié)點)。這樣,其他線程就能自由將節(jié)點添加到鏈表中,或在不影響回收指定節(jié)點線程的情況下,對節(jié)點進行回收。

只要有節(jié)點存在于鏈表中,就需要檢查每個節(jié)點,查看節(jié)點是否被風險指針所指向⑦。如果沒有風險指針,那么就可以安全的將記錄刪除(并且清除存儲的數(shù)據(jù))⑧。否則,就只能將這個節(jié)點添加到鏈表的后面,再進行回收⑨。

雖然這個實現(xiàn)很簡單,也的確安全的回收了被刪除的節(jié)點,不過這個過程增加了很多開銷。遍歷風險指針數(shù)組需要檢查max_hazard_pointers原子變量,并且每次pop()調(diào)用時,都需要再檢查一遍。原子操作很耗時——在臺式CPU上,100次原子操作要比100次非原子操作慢——所以,這里pop()成為了性能瓶頸。這種方式,不僅需要遍歷節(jié)點的風險指針鏈表,還要遍歷等待鏈表上的每一個節(jié)點。顯然,這種方式很糟糕。當有max_hazard_pointers在鏈表中,那么就需要檢查max_hazard_pointers多個已存儲的風險指針。我去!還有更好一點的方法嗎?

對風險指針(較好)的回收策略

當然有更好的辦法。這里只展示一個風險指針的簡單實現(xiàn),來幫助解釋技術(shù)問題。首先,要考慮的是內(nèi)存性能。比起對回收鏈表上的每個節(jié)點進行檢查都要調(diào)用pop(),除非有超過max_hazard_pointer數(shù)量的節(jié)點存在于鏈表之上,要不就不需要嘗試回收任何節(jié)點。這樣就能保證至少有一個節(jié)點能夠回收,如果只是等待鏈表中的節(jié)點數(shù)量達到max_hazard_pointers+1,那比之前的方案也沒好到哪里去。當獲取了max_hazard_pointers數(shù)量的節(jié)點時,可以調(diào)用pop()對節(jié)點進行回收,所以這樣也不是很好。不過,當有2max_hazard_pointers個節(jié)點在列表中時,就能保證至少有max_hazard_pointers可以被回收,在再次嘗試回收任意節(jié)點前,至少會對pop()有max_hazard_pointers次調(diào)用。這就很不錯了。比起檢查max_hazard_pointers個節(jié)點就調(diào)用max_hazard_pointers次pop()(而且還不一定能回收節(jié)點),當檢查2max_hazard_pointers個節(jié)點時,每max_hazard_pointers次對pop()的調(diào)用,就會有max_hazard_pointers個節(jié)點能被回收。這就意味著,對兩個節(jié)點檢查調(diào)用pop(),其中就有一個節(jié)點能被回收。

這個方法有個缺點(有增加內(nèi)存使用的情況):就是得對回收鏈表上的節(jié)點進行計數(shù),這就意味著要使用原子變量,并且還有很多線程爭相對回收鏈表進行訪問。如果還有多余的內(nèi)存,可以增加內(nèi)存的使用來實現(xiàn)更好的回收策略:每個線程中的都擁有其自己的回收鏈表,作為線程的本地變量。這樣就不需要原子變量進行計數(shù)了。這樣的話,就需要分配max_hazard_pointers x max_hazard_pointers個節(jié)點。所有節(jié)點被回收完畢前時,有線程退出,那么其本地鏈表可以像之前一樣保存在全局中,并且添加到下一個線程的回收鏈表中,讓下一個線程對這些節(jié)點進行回收。

風險指針另一個缺點:與IBM申請的專利所沖突[2]。要讓寫的軟件在一個國家中使用,那么就必須擁有合法的知識產(chǎn)權(quán),所以需要擁有合適的許可證。這對所有無鎖的內(nèi)存回收技術(shù)都適用(這是一個活躍的研究領(lǐng)域),所以很多大公司都會有自己的專利。你可能會問,“為什么用了這么大的篇幅來介紹一個大多數(shù)人都沒辦法的技術(shù)呢?”,這公平性的問題。首先,使用這種技術(shù)可能不需要買一個許可證。比如,當你使用GPL下的免費軟件許可來進行軟件開發(fā),那么你的軟件將會包含到IBM不主張聲明中。其次,也是很重要的,在設(shè)計無鎖代碼的時候,還需要從使用的技術(shù)角度進行思考,比如,高消耗的原子操作。

所以,是否有非專利的內(nèi)存回收技術(shù),且能被大多數(shù)人所使用呢?很幸運,的確有。引用計數(shù)就是這樣一種機制。

7.2.4 檢測使用引用計數(shù)的節(jié)點

回到7.2.2節(jié)的問題,“想要刪除節(jié)點還能被其他讀者線程訪問,應(yīng)該怎么辦?"。當能安全并精確的識別,節(jié)點是否還被引用,以及沒有線程訪問這些節(jié)點的具體時間,以便將對應(yīng)節(jié)點進行刪除。風險指針是通過將使用中的節(jié)點存放到鏈表中,解決問題。而引用計數(shù)是通過對每個節(jié)點上訪問的線程數(shù)量進行統(tǒng)計,解決問題。

看起來簡單粗暴……不,優(yōu)雅;實際上管理起來卻是很困難:首先,你會想到的就是由std::shared_ptr<>來完成這個任務(wù),其是有內(nèi)置引用計數(shù)的指針。不幸的是,雖然std::shared_ptr<>上的一些操作是原子的,不過其也不能保證是無鎖的。智能指針上的原子操作和對其他原子類型的操作并沒有什么不同,但是std::shared_ptr<>旨在用于有多個上下文的情況下,并且在無鎖結(jié)構(gòu)中使用原子操作,無異于對該類增加了很多性能開銷。如果平臺支持std::atomic_is_lock_free(&some_shared_ptr)實現(xiàn)返回true,那么所有內(nèi)存回收問題就都迎刃而解了。使用std::shared_ptr<node>構(gòu)成的鏈表實現(xiàn),如下所示:

清單7.9 無鎖棧——使用無鎖std::shared_ptr<>的實現(xiàn)

template<typename T>
class lock_free_stack
{
private:
  struct node
  {
    std::shared_ptr<T> data;
    std::shared_ptr<node> next;
    node(T const& data_):
      data(std::make_shared<T>(data_))
    {}
  };

  std::shared_ptr<node> head;
public:
  void push(T const& data)
  {
    std::shared_ptr<node> const new_node=std::make_shared<node>(data);
    new_node->next=head.load();
    while(!std::atomic_compare_exchange_weak(&head,
        &new_node->next,new_node));
  }
  std::shared_ptr<T> pop()
  {
    std::shared_ptr<node> old_head=std::atomic_load(&head);
    while(old_head && !std::atomic_compare_exchange_weak(&head,
        &old_head,old_head->next));
    return old_head ? old_head->data : std::shared_ptr<T>();
  }
};

在一些情況下,使用std::shared_ptr<>實現(xiàn)的結(jié)構(gòu)并非無鎖,這就需要手動管理引用計數(shù)。

一種方式是對每個節(jié)點使用兩個引用計數(shù):內(nèi)部計數(shù)和外部計數(shù)。兩個值的總和就是對這個節(jié)點的引用數(shù)。外部計數(shù)記錄有多少指針指向節(jié)點,即在指針每次進行讀取的時候,外部計數(shù)加一。當線程結(jié)束對節(jié)點的訪問時,內(nèi)部計數(shù)減一。指針在讀取時,外部計數(shù)加一;在讀取結(jié)束時,內(nèi)部計數(shù)減一。

當不需要“外部計數(shù)-指針”對時(該節(jié)點就不能被多線程所訪問了),在外部計數(shù)減一和在被棄用的時候,內(nèi)部計數(shù)將會增加。當內(nèi)部計數(shù)等于0,那么就沒有指針對該節(jié)點進行引用,就可以將該節(jié)點安全的刪除。使用原子操作來更新共享數(shù)據(jù)也很重要?,F(xiàn)在,就讓我們來看一下使用這種技術(shù)實現(xiàn)的無鎖棧,只有確定節(jié)點能安全刪除的情況下,才能進行節(jié)點回收。

下面程序清單中就展示了內(nèi)部數(shù)據(jù)結(jié)構(gòu),以及對push()簡單優(yōu)雅的實現(xiàn)。

清單7.10 使用分離引用計數(shù)的方式推送一個節(jié)點到無鎖棧中

template<typename T>
class lock_free_stack
{
private:
  struct node;

  struct counted_node_ptr  // 1
  {
    int external_count;
    node* ptr;
  };

  struct node
  {
    std::shared_ptr<T> data;
    std::atomic<int> internal_count;  // 2
    counted_node_ptr next;  // 3

    node(T const& data_):
      data(std::make_shared<T>(data_)),
      internal_count(0)
    {}
  };

  std::atomic<counted_node_ptr> head;  // 4

public:
  ~lock_free_stack()
  {
    while(pop());
  }

  void push(T const& data)  // 5
  {
    counted_node_ptr new_node;
    new_node.ptr=new node(data);
    new_node.external_count=1;
    new_node.ptr->next=head.load();
    while(!head.compare_exchange_weak(new_node.ptr->next,new_node));
  }
};

外部計數(shù)包含在counted_node_ptr的指針中①,且這個結(jié)構(gòu)體會被node中的next指針③和內(nèi)部計數(shù)②用到。counted_node_ptr是一個簡單的結(jié)構(gòu)體,所以可以使用特化std::atomic<>模板來對鏈表的頭指針進行聲明④。

且counted_node_ptr體積夠小,能夠讓std::atomic<counted_node_ptr>無鎖。在一些平臺上支持雙字比較和交換操作,可以直接對結(jié)構(gòu)體進行操作。當你的平臺不支持這樣的操作時,最好使用std::shared_ptr<>變量(如清單7.9那樣);當類型的體積過大,超出了平臺支持指令,那么原子std::atomic<>將使用鎖來保證其操作的原子性(從而會讓你的“無鎖”算法“基于鎖”來完成)。另外,如果想要限制計數(shù)器的大小,需要已知平臺上指針所占的空間(比如,地址空間只剩下48位,而一個指針就要占64位),可以將計數(shù)存在一個指針空間內(nèi),不過為了適應(yīng)平臺,也可以存在一個機器字當中。這樣的技巧需要對特定系統(tǒng)有足夠的了解,當然已經(jīng)超出本書討論的范圍。

push()相對簡單⑤,可以構(gòu)造一個counted_node_ptr實例,去引用新分配出來的(帶有相關(guān)數(shù)據(jù)的)node,并且將node中的next指針設(shè)置為當前head。之后使用compare_exchange_weak()對head的值進行設(shè)置,就像之前代碼清單中所示。因為internal_count剛被設(shè)置,所以其值為0,并且external_count是1。因為這是一個新節(jié)點,那么這個節(jié)點只有一個外部引用(head指針)。

通常,pop()都有一個從繁到簡的過程,實現(xiàn)代碼如下。

清單7.11 使用分離引用計數(shù)從無鎖棧中彈出一個節(jié)點

template<typename T>
class lock_free_stack
{
private:
  void increase_head_count(counted_node_ptr& old_counter)
  {
    counted_node_ptr new_counter;

    do
    {
      new_counter=old_counter;
      ++new_counter.external_count;
    }
    while(!head.compare_exchange_strong(old_counter,new_counter));  // 1

    old_counter.external_count=new_counter.external_count;
  }
public:
  std::shared_ptr<T> pop()
  {
    counted_node_ptr old_head=head.load();
    for(;;)
    {
      increase_head_count(old_head);
      node* const ptr=old_head.ptr;  // 2
      if(!ptr)
      {
        return std::shared_ptr<T>();
      }
      if(head.compare_exchange_strong(old_head,ptr->next))  // 3
      {
        std::shared_ptr<T> res;
        res.swap(ptr->data);  // 4

        int const count_increase=old_head.external_count-2;  // 5

        if(ptr->internal_count.fetch_add(count_increase)==  // 6
           -count_increase)
        {
          delete ptr;
        }

        return res;  // 7
      }
      else if(ptr->internal_count.fetch_sub(1)==1)
      {
        delete ptr;  // 8
      }
    }
  }
};

當加載head的值之后,就必須將外部引用加一,是為了表明這個節(jié)點正在引用,并且保證在解引用時的安全性。在引用計數(shù)增加前解引用指針,那么就會有線程能夠訪問這個節(jié)點,從而當前引用指針就成為了一個懸空指針。這就是將引用計數(shù)分離的主要原因:通過增加外部引用計數(shù),保證指針在訪問期間的合法性。在compare_exchange_strong()的循環(huán)中①完成增加,通過比較和設(shè)置整個結(jié)構(gòu)體來保證指針不會在同一時間內(nèi)被其他線程修改。

當計數(shù)增加,就能安全的解引用ptr,并讀取head指針的值,就能訪問指向的節(jié)點②。如果指針是空指針,那么將會訪問到鏈表的最后。當指針不為空時,就能嘗試對head調(diào)用compare_exchange_strong()來刪除這個節(jié)點③。

當compare_exchange_strong()成功時,就擁有對應(yīng)節(jié)點的所有權(quán),并且可以和data進行交換④,然后返回。這樣數(shù)據(jù)就不會持續(xù)保存,因為其他線程也會對棧進行訪問,所以會有其他指針指向這個節(jié)點。而后,可以使用原子操作fetch_add⑥,將外部計數(shù)加到內(nèi)部計數(shù)中去。如果現(xiàn)在引用計數(shù)為0,那么之前的值(fetch_add返回的值),在相加之前肯定是一個負數(shù),這種情況下就可以將節(jié)點刪除。這里需要注意的是,相加的值要比外部引用計數(shù)少2⑤;當節(jié)點已經(jīng)從鏈表中刪除,就要減少一次計數(shù),并且這個線程無法再次訪問指定節(jié)點,所以還要再減一。無論節(jié)點是否被刪除,都能完成操作,所以可以將獲取的數(shù)據(jù)進行返回⑦。

當“比較/交換”③失敗,就說明其他線程在之前把對應(yīng)節(jié)點刪除了,或者其他線程添加了一個新的節(jié)點到棧中。無論是哪種原因,需要通過“比較/交換”的調(diào)用,對具有新值的head重新進行操作。不過,首先需要減少節(jié)點(要刪除的節(jié)點)上的引用計數(shù)。這個線程將再也沒有辦法訪問這個節(jié)點了。如果當前線程是最后一個持有引用(因為其他線程已經(jīng)將這個節(jié)點從棧上刪除了)的線程,那么內(nèi)部引用計數(shù)將會為1,所以減一的操作將會讓計數(shù)器為0。這樣,你就能在循環(huán)⑧進行之前將對應(yīng)節(jié)點刪除了。

目前,使用默認std::memory_order_seq_cst內(nèi)存序來規(guī)定原子操作的執(zhí)行順序。在大多數(shù)系統(tǒng)中,這種操作方式都很耗時,且同步操作的開銷要高于內(nèi)存序。現(xiàn)在,就可以考慮對數(shù)據(jù)結(jié)構(gòu)的邏輯進行修改,對數(shù)據(jù)結(jié)構(gòu)的部分放寬內(nèi)存序要求;就沒有必要在棧上增加過度的開銷了。現(xiàn)在讓我們來檢查一下棧的操作,并且捫心自問,這里能對一些操作使用更加寬松的內(nèi)存序么?如果使用了,能確保同級安全嗎?

7.2.5 應(yīng)用于無鎖棧上的內(nèi)存模型

在修改內(nèi)存序之前,需要檢查一下操作之間的依賴關(guān)系。而后,再去確定適合這種需求關(guān)系的最小內(nèi)存序。為了保證這種方式能夠工作,需要在從線程的視角進行觀察。其中最簡單的視角就是,向棧中推入一個數(shù)據(jù)項,之后讓其他線程從棧中彈出這個數(shù)據(jù)。

即使在簡單的例子中,都需要三個重要的數(shù)據(jù)參與。1、counted_node_ptr轉(zhuǎn)移的數(shù)據(jù)head。2、head引用的node。3、節(jié)點所指向的數(shù)據(jù)項。

做push()的線程,會先構(gòu)造數(shù)據(jù)項和節(jié)點,再設(shè)置head。做pop()的線程,會先加載head的值,再做在循環(huán)中對head做“比較/交換”操作,并增加引用計數(shù),再讀取對應(yīng)的node節(jié)點,獲取next的指向的值,現(xiàn)在就可以看到一組需求關(guān)系。next的值是普通的非原子對象,所以為了保證讀取安全,這里必須確定存儲(推送線程)和加載(彈出線程)的先行關(guān)系。因為唯一的原子操作就是push()函數(shù)中的compare_exchange_weak(),這里需要釋放操作來獲取兩個線程間的先行關(guān)系,這里compare_exchange_weak()必須是std::memory_order_release或更嚴格的內(nèi)存序。當compare_exchange_weak()調(diào)用失敗,什么都不會改變,并且可以持續(xù)循環(huán)下去,所以使用std::memory_order_relaxed就足夠了。

void push(T const& data)
{
  counted_node_ptr new_node;
  new_node.ptr=new node(data);
  new_node.external_count=1;
  new_node.ptr->next=head.load(std::memory_order_relaxed)
  while(!head.compare_exchange_weak(new_node.ptr->next,new_node,
    std::memory_order_release,std::memory_order_relaxed));
}

那pop()的實現(xiàn)呢?為了確定先行關(guān)系,必須在訪問next值之前使用std::memory_order_acquire或更嚴格內(nèi)存序的操作。因為,在increase_head_count()中使用compare_exchange_strong()就獲取next指針指向的舊值,所以想要其獲取成功就需要確定內(nèi)存序。如同調(diào)用push()那樣,當交換失敗,循環(huán)會繼續(xù),所以在失敗的時候使用松散的內(nèi)存序:

void increase_head_count(counted_node_ptr& old_counter)
{
  counted_node_ptr new_counter;

  do
  {
    new_counter=old_counter;
    ++new_counter.external_count;
  }
  while(!head.compare_exchange_strong(old_counter,new_counter,
        std::memory_order_acquire,std::memory_order_relaxed));

  old_counter.external_count=new_counter.external_count;
}

當compare_exchange_strong()調(diào)用成功,那么ptr中的值就被存到old_counter中。存儲操作是push()中的一個釋放操作,并且compare_exchange_strong()操作是一個獲取操作,現(xiàn)在存儲同步于加載,并且能夠獲取先行關(guān)系。因此,在push()中存儲ptr的值,要先行于在pop()中對ptr->next的訪問?,F(xiàn)在的操作就安全了。

要注意的是,內(nèi)存序?qū)ead.load()的初始化并不妨礙分析,所以現(xiàn)在就可以使用std::memory_order_relaxed。

接下來,compare_exchange_strong()將old_head.ptr->next設(shè)置為head。是否需要做什么來保證操作線程中的數(shù)據(jù)完整性呢?當交換成功,你就能訪問ptr->data,所以這里需要保證在push()線程中已經(jīng)對ptr->data進行了存儲(在加載之前)。在increase_head_count()中的獲取操作,能保證與push()線程中的存儲和“比較/交換”同步。這里的先行關(guān)系是:在push()線程中存儲數(shù)據(jù),先行于存儲head指針;調(diào)用increase_head_count()先行于對ptr->data的加載。即使,pop()中的“比較/交換”操作使用std::memory_order_relaxed,這些操作還是能正常運行。唯一不同的地方就是,調(diào)用swap()讓ptr->data有所變化,且沒有其他線程可以對同一節(jié)點進行操作(這就是“比較/交換”操作的作用)。

當compare_exchange_strong()失敗,那么新值就不會去更新old_head,繼續(xù)循環(huán)。這里,已確定在increase_head_count()中使用std::memory_order_acquire內(nèi)存序的可行性,所以這里使用std::memory_order_relaxed也可以。

其他線程呢?是否需要設(shè)置一些更為嚴格的內(nèi)存序來保證其他線程的安全呢?回答是“不用”。因為,head只會因“比較/交換”操作有所改變;對于“讀-改-寫”操作來說,push()中的“比較/交換”操作是構(gòu)成釋放序列的一部分。因此,即使有很多線程在同一時間對head進行修改,push()中的compare_exchange_weak()與increase_head_count()(讀取已存儲的值)中的compare_exchange_strong()也是同步的。

剩余操作就可以用來處理fetch_add()操作(用來改變引用計數(shù)的操作),因為已知其他線程不可能對該節(jié)點的數(shù)據(jù)進行修改,所以從節(jié)點中返回數(shù)據(jù)的線程可以繼續(xù)執(zhí)行。不過,當線程獲取其他線程修改后的值時,就代表操作失敗(swap()是用來提取數(shù)據(jù)項的引用)。那么,為了避免數(shù)據(jù)競爭,需要保證swap()先行于delete操作。一種簡單的解決辦法,在“成功返回”分支中對fetch_add()使用std::memory_order_release內(nèi)存序,在“再次循環(huán)”分支中對fetch_add()使用std::memory_order_qcquire內(nèi)存序。不過,這就有點矯枉過正:只有一個線程做delete操作(將引用計數(shù)設(shè)置為0的線程),所以只有這個線程需要獲取操作。幸運的是,因為fetch_add()是一個“讀-改-寫”操作,是釋放序列的一部分,所以可以使用一個額外的load()做獲取。當“再次循環(huán)”分支將引用計數(shù)減為0時,fetch_add()可以重載引用計數(shù),這里使用std::memory_order_acquire為了保持需求的同步關(guān)系;并且,fetch_add()本身可以使用std::memory_order_relaxed。使用新pop()的棧實現(xiàn)如下。

清單7.12 基于引用計數(shù)和松散原子操作的無鎖棧

template<typename T>
class lock_free_stack
{
private:
  struct node;
  struct counted_node_ptr
  {
    int external_count;
    node* ptr;
  };

  struct node
  {
    std::shared_ptr<T> data;
    std::atomic<int> internal_count;
    counted_node_ptr next;

    node(T const& data_):
      data(std::make_shared<T>(data_)),
      internal_count(0)
    {}
  };

  std::atomic<counted_node_ptr> head;

  void increase_head_count(counted_node_ptr& old_counter)
  {
    counted_node_ptr new_counter;

    do
    {
      new_counter=old_counter;
      ++new_counter.external_count;
    }
    while(!head.compare_exchange_strong(old_counter,new_counter,
                                        std::memory_order_acquire,
                                        std::memory_order_relaxed));
    old_counter.external_count=new_counter.external_count;
  }
public:
  ~lock_free_stack()
  {
    while(pop());
  }

  void push(T const& data)
  {
    counted_node_ptr new_node;
    new_node.ptr=new node(data);
    new_node.external_count=1;
    new_node.ptr->next=head.load(std::memory_order_relaxed)
    while(!head.compare_exchange_weak(new_node.ptr->next,new_node,
                                      std::memory_order_release,
                                      std::memory_order_relaxed));
  }
  std::shared_ptr<T> pop()
  {
    counted_node_ptr old_head=
       head.load(std::memory_order_relaxed);
    for(;;)
    {
      increase_head_count(old_head);
      node* const ptr=old_head.ptr;
      if(!ptr)
      {
        return std::shared_ptr<T>();
      }
      if(head.compare_exchange_strong(old_head,ptr->next,
                                      std::memory_order_relaxed))
      {
        std::shared_ptr<T> res;
        res.swap(ptr->data);

        int const count_increase=old_head.external_count-2;

        if(ptr->internal_count.fetch_add(count_increase,
              std::memory_order_release)==-count_increase)
        {
          delete ptr;
        }

        return res;
      }
      else if(ptr->internal_count.fetch_add(-1,
                   std::memory_order_relaxed)==1)
      {
        ptr->internal_count.load(std::memory_order_acquire);
        delete ptr;
      }
    }
  }
};

這是一種鍛煉,不過鍛煉要告一段落了,我們已經(jīng)獲得比之前好很多的棧實現(xiàn)。在深思熟慮后,通過使用更多的松散操作,在不影響并發(fā)性的同時提高性能。實現(xiàn)中的pop()有37行,而功能等同于清單6.1中的那7行的基于鎖的棧實現(xiàn),和清單7.2中無內(nèi)存管理的無鎖棧實現(xiàn)。對于接下來要設(shè)計的無鎖隊列,將看到類似的情況:無鎖結(jié)構(gòu)的復(fù)雜性,主要在于內(nèi)存的管理。

7.2.6 寫一個無鎖的線程安全隊列

隊列的提供的挑戰(zhàn)與棧的有些不同,因為push()和pop()在隊列中,操作的不是同一個地方。因此,同步的需求就不一樣了。需要保證對一端的修改是正確的,且對另一端是可見的。不過,在清單6.6中隊列有一個try_pop()成員函數(shù),其作用和清單7.2中簡單的無鎖棧的pop()功能差不多,那么就可以合理的假設(shè)無鎖代碼都很相似。這是為什么呢?

如果將清單6.6中的代碼作為基礎(chǔ),就需要兩個node指針:head和tail??梢宰尪嗑€程對它們進行訪問,所以這兩個節(jié)點最好是原子的,這樣就不用考慮互斥問題了。讓我們對清單6.6中的代碼做一些修改,并且看一下應(yīng)該從哪里開始設(shè)計。先來看一下下面的代碼。

清單7.13 單生產(chǎn)者/單消費者模型下的無鎖隊列

template<typename T>
class lock_free_queue
{
private:
  struct node
  {
    std::shared_ptr<T> data;
    node* next;

    node():
       next(nullptr)
    {}
  };

  std::atomic<node*> head;
  std::atomic<node*> tail;

  node* pop_head()
  {
    node* const old_head=head.load();
    if(old_head==tail.load())  // 1
    {
      return nullptr;
    }
    head.store(old_head->next);
    return old_head;
  }
public:
  lock_free_queue():
      head(new node),tail(head.load())
  {}

  lock_free_queue(const lock_free_queue& other)=delete;
  lock_free_queue& operator=(const lock_free_queue& other)=delete;

  ~lock_free_queue()
  {
    while(node* const old_head=head.load())
    {
      head.store(old_head->next);
      delete old_head;
    }
  }
  std::shared_ptr<T> pop()
  {
    node* old_head=pop_head();
    if(!old_head)
    {
      return std::shared_ptr<T>();
    }

    std::shared_ptr<T> const res(old_head->data);  // 2
    delete old_head;
    return res;
  }

  void push(T new_value)
  {
    std::shared_ptr<T> new_data(std::make_shared<T>(new_value));
    node* p=new node;  // 3
    node* const old_tail=tail.load();  // 4
    old_tail->data.swap(new_data);  // 5
    old_tail->next=p;  // 6
    tail.store(p);  // 7
  }
};

一眼望去,這個實現(xiàn)并沒什么不好,當只有一個線程調(diào)用一次push(),且只有一個線程調(diào)用pop()。在這種情況下,隊列完美工作。push()和pop()之間的先行關(guān)系就很重要了,這直接關(guān)系到獲取到的data。對tail的存儲⑦同步于對tail的加載①;存儲之前節(jié)點的data指針⑤先行于存儲tail;并且,加載tail先行于加載data指針②,所以對data的存儲要先行于加載,一切都沒問題。因此,這是一個完美的單生產(chǎn)者,單消費者(SPSC, single-producer, single-consume)隊列。

問題在于當多線程對push()或pop()并發(fā)調(diào)用。先看一下push():如果有兩個線程并發(fā)調(diào)用push(),那么它們會新分配兩個節(jié)點作為虛擬節(jié)點③,也會讀取到相同的tail值④,因此也會同時修改同一個節(jié)點,同時設(shè)置data和next指針⑤⑥。明顯的數(shù)據(jù)競爭!

pop_head()函數(shù)也有類似的問題。當有兩個線程并發(fā)的調(diào)用這個函數(shù)時,這兩個線程就會讀取到同一個head中同樣的值,并且會同時通過next指針去復(fù)寫舊值。兩個線程現(xiàn)在都能索引到同一個節(jié)點——真是一場災(zāi)難!這里,不僅要保證只有一個pop()線程可以訪問給定項,還要保證其他線程在讀取head指針時,可以安全的訪問節(jié)點中的next。這就和無鎖棧中pop()的問題一樣了,那么就有很多解決方案可以在這里使用。

pop()的問題解決了,那么push()呢?問題在于為了獲取push()和pop()間的先行關(guān)系,就需要在為虛擬節(jié)點設(shè)置數(shù)據(jù)項前,更新tail指針。這就意味著,并發(fā)訪問push()時,因為每個線程所讀取到的是同一個tail指針,所以線程會為同一個數(shù)據(jù)項進行競爭。

多線程下的push()

第一個選擇是在兩個