鍍金池/ 教程/ C/ 消息傳遞框架與完整的ATM示例
3.4 本章總結(jié)
6.3 基于鎖設(shè)計(jì)更加復(fù)雜的數(shù)據(jù)結(jié)構(gòu)
6.1 為并發(fā)設(shè)計(jì)的意義何在?
5.2 <code>C++</code>中的原子操作和原子類型
A.7 自動(dòng)推導(dǎo)變量類型
2.1 線程管理的基礎(chǔ)
8.5 在實(shí)踐中設(shè)計(jì)并發(fā)代碼
2.4 運(yùn)行時(shí)決定線程數(shù)量
2.2 向線程函數(shù)傳遞參數(shù)
第4章 同步并發(fā)操作
2.3 轉(zhuǎn)移線程所有權(quán)
8.3 為多線程性能設(shè)計(jì)數(shù)據(jù)結(jié)構(gòu)
6.4 本章總結(jié)
7.3 對(duì)于設(shè)計(jì)無鎖數(shù)據(jù)結(jié)構(gòu)的指導(dǎo)建議
關(guān)于這本書
A.1 右值引用
2.6 本章總結(jié)
D.2 &lt;condition_variable&gt;頭文件
A.6 變參模板
6.2 基于鎖的并發(fā)數(shù)據(jù)結(jié)構(gòu)
4.5 本章總結(jié)
A.9 本章總結(jié)
前言
第10章 多線程程序的測(cè)試和調(diào)試
5.4 本章總結(jié)
第9章 高級(jí)線程管理
5.1 內(nèi)存模型基礎(chǔ)
2.5 識(shí)別線程
第1章 你好,C++的并發(fā)世界!
1.2 為什么使用并發(fā)?
A.5 Lambda函數(shù)
第2章 線程管理
4.3 限定等待時(shí)間
D.3 &lt;atomic&gt;頭文件
10.2 定位并發(fā)錯(cuò)誤的技術(shù)
附錄B 并發(fā)庫的簡(jiǎn)單比較
5.3 同步操作和強(qiáng)制排序
A.8 線程本地變量
第8章 并發(fā)代碼設(shè)計(jì)
3.3 保護(hù)共享數(shù)據(jù)的替代設(shè)施
附錄D C++線程庫參考
第7章 無鎖并發(fā)數(shù)據(jù)結(jié)構(gòu)設(shè)計(jì)
D.7 &lt;thread&gt;頭文件
D.1 &lt;chrono&gt;頭文件
4.1 等待一個(gè)事件或其他條件
A.3 默認(rèn)函數(shù)
附錄A 對(duì)<code>C++</code>11語言特性的簡(jiǎn)要介紹
第6章 基于鎖的并發(fā)數(shù)據(jù)結(jié)構(gòu)設(shè)計(jì)
封面圖片介紹
7.2 無鎖數(shù)據(jù)結(jié)構(gòu)的例子
8.6 本章總結(jié)
8.1 線程間劃分工作的技術(shù)
4.2 使用期望等待一次性事件
8.4 設(shè)計(jì)并發(fā)代碼的注意事項(xiàng)
D.5 &lt;mutex&gt;頭文件
3.1 共享數(shù)據(jù)帶來的問題
資源
9.3 本章總結(jié)
10.3 本章總結(jié)
10.1 與并發(fā)相關(guān)的錯(cuò)誤類型
D.4 &lt;future&gt;頭文件
3.2 使用互斥量保護(hù)共享數(shù)據(jù)
9.1 線程池
1.1 何謂并發(fā)
9.2 中斷線程
4.4 使用同步操作簡(jiǎn)化代碼
A.2 刪除函數(shù)
1.3 C++中的并發(fā)和多線程
1.4 開始入門
第5章 C++內(nèi)存模型和原子類型操作
消息傳遞框架與完整的ATM示例
8.2 影響并發(fā)代碼性能的因素
7.1 定義和意義
D.6 &lt;ratio&gt;頭文件
A.4 常量表達(dá)式函數(shù)
7.4 本章總結(jié)
1.5 本章總結(jié)
第3章 線程間共享數(shù)據(jù)

消息傳遞框架與完整的ATM示例

ATM:自動(dòng)取款機(jī)。

1回到第4章,我舉了一個(gè)使用消息傳遞框架在線程間發(fā)送信息的例子。這里就會(huì)使用這個(gè)實(shí)現(xiàn)來完成ATM功能。下面完整代碼就是功能的實(shí)現(xiàn),包括消息傳遞框架。

清單C.1實(shí)現(xiàn)了一個(gè)消息隊(duì)列。其可以將消息以指針(指向基類)的方式存儲(chǔ)在列表中;指定消息類型會(huì)由基類派生模板進(jìn)行處理。推送包裝類的構(gòu)造實(shí)例,以及存儲(chǔ)指向這個(gè)實(shí)例的指針;彈出實(shí)例的時(shí)候,將會(huì)返回指向其的指針。因?yàn)閙essage_base類沒有任何成員函數(shù),在訪問存儲(chǔ)消息之前,彈出線程就需要將指針轉(zhuǎn)為wrapped_message指針。

清單C.1 簡(jiǎn)單的消息隊(duì)列

#include <mutex>
#include <condition_variable>
#include <queue>
#include <memory>

namespace messaging
{
  struct message_base  // 隊(duì)列項(xiàng)的基礎(chǔ)類
  {
    virtual ~message_base()
    {}
  };

  template<typename Msg>
  struct wrapped_message:  // 每個(gè)消息類型都需要特化
    message_base
  {
    Msg contents;

    explicit wrapped_message(Msg const& contents_):
      contents(contents_)
    {}
  };

  class queue  // 我們的隊(duì)列
  {
    std::mutex m;
    std::condition_variable c;
    std::queue<std::shared_ptr<message_base> > q;  // 實(shí)際存儲(chǔ)指向message_base類指針的隊(duì)列
  public:
    template<typename T>
    void push(T const& msg)
    {
      std::lock_guard<std::mutex> lk(m);
      q.push(std::make_shared<wrapped_message<T> >(msg));  // 包裝已傳遞的信息,存儲(chǔ)指針
      c.notify_all();
    }

    std::shared_ptr<message_base> wait_and_pop()
    {
      std::unique_lock<std::mutex> lk(m);
      c.wait(lk,[&]{return !q.empty();});  // 當(dāng)隊(duì)列為空時(shí)阻塞
      auto res=q.front();
      q.pop();
      return res;
    }
  };
}

發(fā)送通過sender類(見清單C.2)實(shí)例處理過的消息。只能對(duì)已推送到隊(duì)列中的消息進(jìn)行包裝。對(duì)sender實(shí)例的拷貝,只是拷貝了指向隊(duì)列的指針,而非隊(duì)列本身。

清單C.2 sender類

namespace messaging
{
  class sender
  {
    queue*q;  // sender是一個(gè)隊(duì)列指針的包裝類
  public:
    sender():  // sender無隊(duì)列(默認(rèn)構(gòu)造函數(shù))
      q(nullptr)
    {}

    explicit sender(queue*q_):  // 從指向隊(duì)列的指針進(jìn)行構(gòu)造
      q(q_)
    {}

    template<typename Message>
    void send(Message const& msg)
    {
      if(q)
      {
        q->push(msg);  // 將發(fā)送信息推送給隊(duì)列
      }
    }
  };
}

接收信息部分有些麻煩。不僅要等待隊(duì)列中的消息,還要檢查消息類型是否與所等待的消息類型匹配,并調(diào)用處理函數(shù)進(jìn)行處理。那么就從receiver類的實(shí)現(xiàn)開始吧。

清單C.3 receiver類

namespace messaging
{
  class receiver
  {
    queue q;  // 接受者擁有對(duì)應(yīng)隊(duì)列
  public:
    operator sender()  // 允許將類中隊(duì)列隱式轉(zhuǎn)化為一個(gè)sender隊(duì)列
    {
      return sender(&q);
    }
    dispatcher wait()  // 等待對(duì)隊(duì)列進(jìn)行調(diào)度
    {
      return dispatcher(&q);
    }
  };
}

sender只是引用一個(gè)消息隊(duì)列,而receiver是擁有一個(gè)隊(duì)列??梢允褂秒[式轉(zhuǎn)換的方式獲取sender引用的類。難點(diǎn)在于wait()中的調(diào)度。這里創(chuàng)建了一個(gè)dispatcher對(duì)象引用receiver中的隊(duì)列。dispatcher類實(shí)現(xiàn)會(huì)在下一個(gè)清單中看到;如你所見,任務(wù)是在析構(gòu)函數(shù)中完成的。在這個(gè)例子中,所要做的工作是對(duì)消息進(jìn)行等待,以及對(duì)其進(jìn)行調(diào)度。

清單C.4 dispatcher類

namespace messaging
{
  class close_queue  // 用于關(guān)閉隊(duì)列的消息
  {};

  class dispatcher
  {
    queue* q;
    bool chained;

    dispatcher(dispatcher const&)=delete;  // dispatcher實(shí)例不能被拷貝
    dispatcher& operator=(dispatcher const&)=delete;

    template<
      typename Dispatcher,
      typename Msg,
      typename Func>  // 允許TemplateDispatcher實(shí)例訪問內(nèi)部成員
    friend class TemplateDispatcher;

    void wait_and_dispatch()
    {
      for(;;)  // 1 循環(huán),等待調(diào)度消息
      {
        auto msg=q->wait_and_pop();
        dispatch(msg);
      }
    }

    bool dispatch(  // 2 dispatch()會(huì)檢查close_queue消息,然后拋出
      std::shared_ptr<message_base> const& msg)
    {
      if(dynamic_cast<wrapped_message<close_queue>*>(msg.get()))
      {
        throw close_queue();
      }
      return false;
    }
  public:
    dispatcher(dispatcher&& other):  // dispatcher實(shí)例可以移動(dòng)
      q(other.q),chained(other.chained)
    {
      other.chained=true;  // 源不能等待消息
    }

    explicit dispatcher(queue* q_):
      q(q_),chained(false)
    {}

    template<typename Message,typename Func>
    TemplateDispatcher<dispatcher,Message,Func>
    handle(Func&& f)  // 3 使用TemplateDispatcher處理指定類型的消息
    {
      return TemplateDispatcher<dispatcher,Message,Func>(
        q,this,std::forward<Func>(f));
    }

    ~dispatcher() noexcept(false)  // 4 析構(gòu)函數(shù)可能會(huì)拋出異常
    {  
      if(!chained)
      {
        wait_and_dispatch();
      }
    }
  };
}

從wait()返回的dispatcher實(shí)例將馬上被銷毀,因?yàn)槭桥R時(shí)變量,也向前文提到的,析構(gòu)函數(shù)在這里做真正的工作。析構(gòu)函數(shù)調(diào)用wait_and_dispatch()函數(shù),這個(gè)函數(shù)中有一個(gè)循環(huán)①,等待消息的傳入(這樣才能進(jìn)行彈出操作),然后將消息傳遞給dispatch()函數(shù)。dispatch()函數(shù)本身②很簡(jiǎn)單;會(huì)檢查小時(shí)是否是一個(gè)close_queue消息,當(dāng)是close_queue消息時(shí),拋出一個(gè)異常;如果不是,函數(shù)將會(huì)返回false來表明消息沒有被處理。因?yàn)闀?huì)拋出close_queue異常,所以析構(gòu)函數(shù)會(huì)標(biāo)示為noexcept(false);在沒有任何標(biāo)識(shí)的情況下,一般情況下析構(gòu)函數(shù)都noexcept(true)④型,這表示沒有任何異常拋出,并且close_queue異常將會(huì)使程序終止。

雖然,不會(huì)經(jīng)常的去調(diào)用wait()函數(shù),不過,在大多數(shù)時(shí)間里,你都希望對(duì)一條消息進(jìn)行處理。這時(shí)就需要handle()成員函數(shù)③的加入。這個(gè)函數(shù)是一個(gè)模板,并且消息類型不可推斷,所以你需要指定需要處理的消息類型,并且傳入函數(shù)(或可調(diào)用對(duì)象)進(jìn)行處理,并將隊(duì)列傳入當(dāng)前dispatcher對(duì)象的handle()函數(shù)。這將在清單C.5中展示。這就是為什么,在測(cè)試析構(gòu)函數(shù)中的chained值前,要等待消息耳朵原因;不僅是避免“移動(dòng)”類型的對(duì)象對(duì)消息進(jìn)行等待,而且允許將等待狀態(tài)轉(zhuǎn)移到新的TemplateDispatcher實(shí)例中。

清單C.5 TemplateDispatcher類模板

namespace messaging
{
  template<typename PreviousDispatcher,typename Msg,typename Func>
  class TemplateDispatcher
  {
    queue* q;
    PreviousDispatcher* prev;
    Func f;
    bool chained;

    TemplateDispatcher(TemplateDispatcher const&)=delete;
    TemplateDispatcher& operator=(TemplateDispatcher const&)=delete;

    template<typename Dispatcher,typename OtherMsg,typename OtherFunc>
    friend class TemplateDispatcher;  // 所有特化的TemplateDispatcher類型實(shí)例都是友元類

    void wait_and_dispatch()
    {
      for(;;)
      {
        auto msg=q->wait_and_pop();
        if(dispatch(msg))  // 1 如果消息處理過后,會(huì)跳出循環(huán)
          break;
      }
    }

    bool dispatch(std::shared_ptr<message_base> const& msg)
    {
      if(wrapped_message<Msg>* wrapper=
         dynamic_cast<wrapped_message<Msg>*>(msg.get()))  // 2 檢查消息類型,并且調(diào)用函數(shù)
      {
        f(wrapper->contents);
        return true;
      }
      else
      {
        return prev->dispatch(msg);  // 3 鏈接到之前的調(diào)度器上
      }
    }
  public:
    TemplateDispatcher(TemplateDispatcher&& other):
        q(other.q),prev(other.prev),f(std::move(other.f)),
        chained(other.chained)
    {
      other.chained=true;
    }
    TemplateDispatcher(queue* q_,PreviousDispatcher* prev_,Func&& f_):
        q(q_),prev(prev_),f(std::forward<Func>(f_)),chained(false)
    {
      prev_->chained=true;
    }

    template<typename OtherMsg,typename OtherFunc>
    TemplateDispatcher<TemplateDispatcher,OtherMsg,OtherFunc>
    handle(OtherFunc&& of)  // 4 可以鏈接其他處理器
    {
      return TemplateDispatcher<
          TemplateDispatcher,OtherMsg,OtherFunc>(
          q,this,std::forward<OtherFunc>(of));
    }

    ~TemplateDispatcher() noexcept(false)  // 5 這個(gè)析構(gòu)函數(shù)也是noexcept(false)的
    {
      if(!chained)
      {
        wait_and_dispatch();
      }
    }
  };
}

TemplateDispatcher<>類模板仿照了dispatcher類,二者幾乎相同。特別是在析構(gòu)函數(shù)上,都是調(diào)用wait_and_dispatch()等待處理消息。

在處理消息的過程中,如果不拋出異常,就需要檢查一下在循環(huán)中①,消息是否已經(jīng)得到了處理。當(dāng)成功的處理了一條消息,處理過程就可以停止,這樣就可以等待下一組消息的傳入了。當(dāng)獲取了一個(gè)和指定類型匹配的消息,使用函數(shù)調(diào)用的方式②,就要好于拋出異常(雖然,處理函數(shù)也可能會(huì)拋出異常)。如果消息類型不匹配,那么就可以鏈接前一個(gè)調(diào)度器③。在第一個(gè)實(shí)例中,dispatcher實(shí)例確實(shí)作為一個(gè)調(diào)度器,當(dāng)在handle()④函數(shù)中進(jìn)行鏈接后,就允許處理多種類型的消息。在鏈接了之前的TemplateDispatcher<>實(shí)例后,當(dāng)消息類型和當(dāng)前的調(diào)度器類型不匹配的時(shí)候,調(diào)度鏈會(huì)依次的向前尋找類型匹配的調(diào)度器。因?yàn)槿魏握{(diào)度器都可能拋出異常(包括dispatcher中對(duì)close_queue消息進(jìn)行處理的默認(rèn)處理器),析構(gòu)函數(shù)在這里會(huì)再次被聲明為noexcept(false)⑤。

這種簡(jiǎn)單的架構(gòu)允許你想隊(duì)列推送任何類型的消息,并且調(diào)度器有選擇的與接收端的消息進(jìn)行匹配。同樣,也允許為了推送消息,將消息隊(duì)列的引用進(jìn)行傳遞的同時(shí),保持接收端的私有性。

為了完成第4章的例子,消息的組成將在清單C.6中給出,各種狀態(tài)機(jī)將在清單C.7,C.8和C.9中給出。最后,驅(qū)動(dòng)代碼將在C.10給出。

清單C.6 ATM消息

struct withdraw
{
  std::string account;
  unsigned amount;
  mutable messaging::sender atm_queue;

  withdraw(std::string const& account_,
           unsigned amount_,
           messaging::sender atm_queue_):
    account(account_),amount(amount_),
    atm_queue(atm_queue_)
  {}
};

struct withdraw_ok
{};

struct withdraw_denied
{};

struct cancel_withdrawal
{
  std::string account;
  unsigned amount;
  cancel_withdrawal(std::string const& account_,
                    unsigned amount_):
    account(account_),amount(amount_)
  {}
};

struct withdrawal_processed
{
  std::string account;
  unsigned amount;
  withdrawal_processed(std::string const& account_,
                       unsigned amount_):
    account(account_),amount(amount_)
  {}
};

struct card_inserted
{
  std::string account;
  explicit card_inserted(std::string const& account_):
    account(account_)
  {}
};

struct digit_pressed
{
  char digit;
  explicit digit_pressed(char digit_):
    digit(digit_)
  {}
};

struct clear_last_pressed
{};

struct eject_card
{};

struct withdraw_pressed
{
  unsigned amount;
  explicit withdraw_pressed(unsigned amount_):
    amount(amount_)
  {}
};

struct cancel_pressed
{};

struct issue_money
{
  unsigned amount;
  issue_money(unsigned amount_):
    amount(amount_)
  {}
};

struct verify_pin
{
  std::string account;
  std::string pin;
  mutable messaging::sender atm_queue;

  verify_pin(std::string const& account_,std::string const& pin_,
             messaging::sender atm_queue_):
    account(account_),pin(pin_),atm_queue(atm_queue_)
  {}
};

struct pin_verified
{};

struct pin_incorrect
{};

struct display_enter_pin
{};

struct display_enter_card
{};

struct display_insufficient_funds
{};

struct display_withdrawal_cancelled
{};

struct display_pin_incorrect_message
{};

struct display_withdrawal_options
{};

struct get_balance
{
  std::string account;
  mutable messaging::sender atm_queue;

  get_balance(std::string const& account_,messaging::sender atm_queue_):
    account(account_),atm_queue(atm_queue_)
  {} 
};

struct balance
{
  unsigned amount;
  explicit balance(unsigned amount_):
    amount(amount_)
  {}
};

struct display_balance
{
  unsigned amount;
  explicit display_balance(unsigned amount_):
    amount(amount_)
  {}
};

struct balance_pressed
{};

清單C.7 ATM狀態(tài)機(jī)

class atm
{
  messaging::receiver incoming;
  messaging::sender bank;
  messaging::sender interface_hardware;

  void (atm::*state)();

  std::string account;
  unsigned withdrawal_amount;
  std::string pin;

  void process_withdrawal() 
  {
    incoming.wait()
      .handle<withdraw_ok>(
       [&](withdraw_ok const& msg)
       {
         interface_hardware.send(
           issue_money(withdrawal_amount));

         bank.send(
           withdrawal_processed(account,withdrawal_amount));

         state=&atm::done_processing;
       })
      .handle<withdraw_denied>(
       [&](withdraw_denied const& msg)
       {
         interface_hardware.send(display_insufficient_funds());

         state=&atm::done_processing;
       })
      .handle<cancel_pressed>(
       [&](cancel_pressed const& msg)
       {
         bank.send(
           cancel_withdrawal(account,withdrawal_amount));

         interface_hardware.send(
           display_withdrawal_cancelled());

         state=&atm::done_processing;
       });
   }

  void process_balance()
  {
    incoming.wait()
      .handle<balance>(
       [&](balance const& msg)
       {
         interface_hardware.send(display_balance(msg.amount));

         state=&atm::wait_for_action;
       })
      .handle<cancel_pressed>(
       [&](cancel_pressed const& msg)
       {
         state=&atm::done_processing;
       });
  }

  void wait_for_action()
  {
    interface_hardware.send(display_withdrawal_options());

    incoming.wait()
      .handle<withdraw_pressed>(
       [&](withdraw_pressed const& msg)
       {
         withdrawal_amount=msg.amount;
         bank.send(withdraw(account,msg.amount,incoming));
         state=&atm::process_withdrawal;
       })
      .handle<balance_pressed>(
       [&](balance_pressed const& msg)
       {
         bank.send(get_balance(account,incoming));
         state=&atm::process_balance;
       })
      .handle<cancel_pressed>(
       [&](cancel_pressed const& msg)
       {
         state=&atm::done_processing;
       });
  }

  void verifying_pin()
  {
    incoming.wait()
      .handle<pin_verified>(
       [&](pin_verified const& msg)
       {
         state=&atm::wait_for_action;
       })
      .handle<pin_incorrect>(
       [&](pin_incorrect const& msg)
       {
         interface_hardware.send(
         display_pin_incorrect_message());
         state=&atm::done_processing;
       })
      .handle<cancel_pressed>(
       [&](cancel_pressed const& msg)
       {
         state=&atm::done_processing;
       });
  }

  void getting_pin()
  {
    incoming.wait()
      .handle<digit_pressed>(
       [&](digit_pressed const& msg)
       {
         unsigned const pin_length=4;
         pin+=msg.digit;

         if(pin.length()==pin_length)
         {
           bank.send(verify_pin(account,pin,incoming));
           state=&atm::verifying_pin;
         }
       })
      .handle<clear_last_pressed>(
       [&](clear_last_pressed const& msg)
       {
         if(!pin.empty())
         {
           pin.pop_back();
         }
       })
      .handle<cancel_pressed>(
       [&](cancel_pressed const& msg)
       {
         state=&atm::done_processing;
       });
  }

  void waiting_for_card()
  {
    interface_hardware.send(display_enter_card());

    incoming.wait()
      .handle<card_inserted>(
       [&](card_inserted const& msg)
       {
         account=msg.account;
         pin="";
         interface_hardware.send(display_enter_pin());
         state=&atm::getting_pin;
       });
  }

  void done_processing()
  {
    interface_hardware.send(eject_card());
    state=&atm::waiting_for_card;
  }

  atm(atm const&)=delete;
  atm& operator=(atm const&)=delete;
public:
  atm(messaging::sender bank_,
  messaging::sender interface_hardware_):
  bank(bank_),interface_hardware(interface_hardware_)
  {}

  void done()
  {
    get_sender().send(messaging::close_queue());
  }

  void run()
  {
    state=&atm::waiting_for_card;
    try
    {
      for(;;)
      {
        (this->*state)();
      }
    }
    catch(messaging::close_queue const&)
    {
    }
  }

  messaging::sender get_sender()
  {
    return incoming;
  } 
};

清單C.8 銀行狀態(tài)機(jī)

class bank_machine
{
  messaging::receiver incoming;
  unsigned balance;
public:
  bank_machine():

  balance(199)
  {}

  void done()
  {
    get_sender().send(messaging::close_queue());
  }

  void run()
  {
    try
    {
      for(;;)
      {
        incoming.wait()
          .handle<verify_pin>(
           [&](verify_pin const& msg)
           {
             if(msg.pin=="1937")
             {
               msg.atm_queue.send(pin_verified());
             }
             else
             {
               msg.atm_queue.send(pin_incorrect());
             }
           })
          .handle<withdraw>(
           [&](withdraw const& msg)
           {
             if(balance>=msg.amount)
             {
               msg.atm_queue.send(withdraw_ok());
               balance-=msg.amount;
             }
             else
             {
               msg.atm_queue.send(withdraw_denied());
             }
           })
          .handle<get_balance>(
           [&](get_balance const& msg)
           {
             msg.atm_queue.send(::balance(balance));
           })
          .handle<withdrawal_processed>(
           [&](withdrawal_processed const& msg)
           {
           })
          .handle<cancel_withdrawal>(
           [&](cancel_withdrawal const& msg)
           {
           });
      }
    }
    catch(messaging::close_queue const&)
    {
    }
  }

  messaging::sender get_sender()
  {
  return incoming;
  }
};

清單C.9 用戶狀態(tài)機(jī)

class interface_machine
{
  messaging::receiver incoming;
public:
  void done()
  {
    get_sender().send(messaging::close_queue());
  }

  void run()
  {
    try
    {
      for(;;)
      {
        incoming.wait()
          .handle<issue_money>(
           [&](issue_money const& msg)
           {
             {
               std::lock_guard<std::mutex> lk(iom);
               std::cout<<"Issuing "
                 <<msg.amount<<std::endl;
             }
           })
          .handle<display_insufficient_funds>(
           [&](display_insufficient_funds const& msg)
           {
             {
               std::lock_guard<std::mutex> lk(iom);
               std::cout<<"Insufficient funds"<<std::endl;
             }
           })
          .handle<display_enter_pin>(
           [&](display_enter_pin const& msg)
           {
             {
               std::lock_guard<std::mutex> lk(iom);
               std::cout<<"Please enter your PIN (0-9)"<<std::endl;
             }
           })
          .handle<display_enter_card>(
           [&](display_enter_card const& msg)
           {
             {
               std::lock_guard<std::mutex> lk(iom);
               std::cout<<"Please enter your card (I)"
                 <<std::endl;
             }
           })
          .handle<display_balance>(
           [&](display_balance const& msg)
           {
             {
               std::lock_guard<std::mutex> lk(iom);
               std::cout
                 <<"The balance of your account is "
                 <<msg.amount<<std::endl;
             }
           })
          .handle<display_withdrawal_options>(
           [&](display_withdrawal_options const& msg)
           {
             {
               std::lock_guard<std::mutex> lk(iom);
               std::cout<<"Withdraw 50? (w)"<<std::endl;
               std::cout<<"Display Balance? (b)"
                 <<std::endl;
               std::cout<<"Cancel? (c)"<<std::endl;
             }
           })
          .handle<display_withdrawal_cancelled>(
           [&](display_withdrawal_cancelled const& msg)
           {
             {
               std::lock_guard<std::mutex> lk(iom);
               std::cout<<"Withdrawal cancelled"
                 <<std::endl;
             }
           })
          .handle<display_pin_incorrect_message>(
           [&](display_pin_incorrect_message const& msg)
           {
             {
               std::lock_guard<std::mutex> lk(iom);
               std::cout<<"PIN incorrect"<<std::endl;
             }
           })
          .handle<eject_card>(
           [&](eject_card const& msg)
           {
             {
               std::lock_guard<std::mutex> lk(iom);
               std::cout<<"Ejecting card"<<std::endl;
             }
           });
      }
    }
    catch(messaging::close_queue&)
    {
    }
  }

  messaging::sender get_sender()
  {
    return incoming;
  }
};

清單C.10 驅(qū)動(dòng)代碼

int main()
{
  bank_machine bank;
  interface_machine interface_hardware;

  atm machine(bank.get_sender(),interface_hardware.get_sender());

  std::thread bank_thread(&bank_machine::run,&bank);
  std::thread if_thread(&interface_machine::run,&interface_hardware);
  std::thread atm_thread(&atm::run,&machine);

  messaging::sender atmqueue(machine.get_sender());

  bool quit_pressed=false;

  while(!quit_pressed)
  {
    char c=getchar();
    switch(c)
    {
    case '0':
    case '1':
    case '2':
    case '3':
    case '4':
    case '5':
    case '6':
    case '7':
    case '8':
    case '9':
      atmqueue.send(digit_pressed(c));
      break;
    case 'b':
      atmqueue.send(balance_pressed());
      break;
    case 'w':
      atmqueue.send(withdraw_pressed(50));
      break;
    case 'c':
      atmqueue.send(cancel_pressed());
      break;
    case 'q':
      quit_pressed=true;
      break;
    case 'i':
      atmqueue.send(card_inserted("acc1234"));
      break;
    }
  }

  bank.done();
  machine.done();
  interface_hardware.done();

  atm_thread.join();
  bank_thread.join();
  if_thread.join();
}