鍍金池/ 教程/ Java/ 消息傳遞
標(biāo)準(zhǔn)輸入與輸出
消息傳遞
循環(huán)
注釋
Rust for Mac OS
幾種智能指針
Cell, RefCell
trait對象 (trait object)
rust web 開發(fā)
Unsafe、原始指針
Macro
迭代器
函數(shù)
Borrow, BorrowMut, ToOwned
快速上手
二叉樹
編輯器
測試與評測
Deref
安裝Rust
哈希表 HashMap
原生類型
17.錯誤處理
VS Code 安裝配置
動態(tài)數(shù)組Vec
模式匹配
操作符和格式化字符串
Rust for Linux
函數(shù)參數(shù)
Visual Studio
vim/GVim安裝配置
閉包作為參數(shù)和返回值
安全(Safety)
Cow
生命周期( Lifetime )
閉包的實現(xiàn)
所有權(quán)(Ownership)
Atom
將Rust編譯成庫
類型、運算符和字符串
類型系統(tǒng)中的幾個常見 trait
特性
屬性和編譯器參數(shù)
Spacemacs
集合類型
Rust json處理
Heap & Stack
并行
標(biāo)準(zhǔn)庫示例
基本程序結(jié)構(gòu)
鏈表
trait 和 trait對象
前期準(zhǔn)備
代碼風(fēng)格
編譯器參數(shù)
基于語義化版本的項目版本聲明與管理
Rust 版本管理工具: rustup
引用&借用(References&Borrowing)
注釋與文檔
10.1 trait關(guān)鍵字
模式
調(diào)用ffi函數(shù)
unsafe
并發(fā),并行,多線程編程
AsRef 和 AsMut
Rust旅程
Rust for Windows
結(jié)構(gòu)體與枚舉
條件分支
附錄I-術(shù)語表
變量綁定與原生類型
Mutex 與 RwLock
泛型
裸指針
常用數(shù)據(jù)結(jié)構(gòu)實現(xiàn)
系統(tǒng)命令:調(diào)用grep
Into/From 及其在 String 和 &str 互轉(zhuǎn)上的應(yīng)用
共享內(nèi)存
Sublime
網(wǎng)絡(luò)模塊:W貓的回音
函數(shù)返回值
包和模塊
高階函數(shù)
函數(shù)與方法
match關(guān)鍵字
隊列
目錄操作:簡單grep
語句和表達式
并發(fā)編程
閉包
測試
閉包的語法
同步
迭代器
String
Send 和 Sync
Rc 和 Arc
屬性
Emacs
優(yōu)先隊列
Prelude
cargo簡介
控制流(control flow)
數(shù)組、動態(tài)數(shù)組和字符串
FFI
模塊和包系統(tǒng)、Prelude
實戰(zhàn)篇
Rust 是一門系統(tǒng)級編程語言,被設(shè)計為保證內(nèi)存和線程安全,并防止段錯誤。作為系統(tǒng)級編程語言,它的基本理念是 “零開銷抽象”。理
運算符重載
Any和反射
rust數(shù)據(jù)庫操作
輸入輸出流
復(fù)合類型
性能測試

消息傳遞

消息傳遞

稍加考慮,上一節(jié)的練習(xí)題其實是不完整的,它只是評分系統(tǒng)中的一環(huán),一個評分系統(tǒng)是需要先把信息從數(shù)據(jù)庫或文件中讀取出來,然后才是評分,最后還需要把評分結(jié)果再保存到數(shù)據(jù)庫或文件中去。如果一步一步串行地做這三個步驟,是完全沒有問題的。那么我們是否可以用三個線程來分別做這三個步驟呢?上一節(jié)練習(xí)題我們已經(jīng)用了一個線程來實現(xiàn)評分,那么我們是否也可以再用一個線程來讀取成績,再用另個線程來實現(xiàn)保存呢? 如果能這樣的話,那么我們就可以利用上多核多cpu的優(yōu)勢,加快整個評分的效率。既然在此提出這個問題,答案就很明顯了。問題在于我們要怎么在Rust中來實現(xiàn),關(guān)鍵在于三個線程怎么交換信息,以達到串行的邏輯處理順序?

為了解決這個問題,下面將介紹一種Rust在標(biāo)準(zhǔn)庫中支持的消息傳遞技術(shù)。消息傳遞是并發(fā)模型里面大家比較推崇的模式,不僅僅是因為使用起來比較簡單,關(guān)鍵還在于它可以減少數(shù)據(jù)競爭,提高并發(fā)效率,為此值得深入學(xué)習(xí)。Rust是通過一個叫做通道(channel)的東西來實現(xiàn)這種模式的,下面直接進入主題。

初試通道(channel)

Rust的通道(channel)可以把一個線程的消息(數(shù)據(jù))傳遞到另一個線程,從而讓信息在不同的線程中流動,從而實現(xiàn)協(xié)作。詳情請參見std::sync::mpsc。通道的兩端分別是發(fā)送者(Sender)和接收者(Receiver),發(fā)送者負責(zé)從一個線程發(fā)送消息,接收者則在另一個線程中接收該消息。下面我們來看一個簡單的例子:

use std::sync::mpsc;
use std::thread;

fn main() {
    // 創(chuàng)建一個通道
    let (tx, rx): (mpsc::Sender<i32>, mpsc::Receiver<i32>) = 
        mpsc::channel();

    // 創(chuàng)建線程用于發(fā)送消息
    thread::spawn(move || {
        // 發(fā)送一個消息,此處是數(shù)字id
        tx.send(1).unwrap();
    });

    // 在主線程中接收子線程發(fā)送的消息并輸出
    println!("receive {}", rx.recv().unwrap());
}

程序說明參見代碼中的注釋,程序執(zhí)行結(jié)果為:

receive 1

結(jié)果表明main所在的主線程接收到了新建線程發(fā)送的消息,用Rust在線程間傳遞消息就是這么簡單!

雖然簡單,但使用過其他語言就會知道,通道有多種使用方式,且比較靈活,為此我們需要進一步考慮關(guān)于RustChannel的幾個問題:

  1. 通道能保證消息的順序嗎?是否先發(fā)送的消息,先接收?
  2. 通道能緩存消息嗎?如果能的話能緩存多少?
  3. 通道的發(fā)送者和接收者支持N:1,1:N,N:M模式嗎?
  4. 通道能發(fā)送任何數(shù)據(jù)嗎?
  5. 發(fā)送后的數(shù)據(jù),在線程中繼續(xù)使用沒有問題嗎?

讓我們帶著這些問題和思考進入下一個小節(jié),那里有相關(guān)的答案。

消息類型

上面的例子中,我們傳遞的消息類型為i32,除了這種類型之外,是否還可以傳遞更多的原始類型,或者更復(fù)雜的類型,和自定義類型?下面我們嘗試發(fā)送一個更復(fù)雜的Rc類型的消息:

use std::fmt;
use std::sync::mpsc;
use std::thread;
use std::rc::Rc;

pub struct Student {
    id: u32
}

impl fmt::Display for Student {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "student {}", self.id)
    }
}

fn main() {
    // 創(chuàng)建一個通道
    let (tx, rx): (mpsc::Sender<Rc<Student>>, mpsc::Receiver<Rc<Student>>) = 
        mpsc::channel();

    // 創(chuàng)建線程用于發(fā)送消息
    thread::spawn(move || {
        // 發(fā)送一個消息,此處是數(shù)字id
        tx.send(Rc::new(Student{
            id: 1,
        })).unwrap();
    });

    // 在主線程中接收子線程發(fā)送的消息并輸出
    println!("receive {}", rx.recv().unwrap());
}

編譯代碼,奇跡沒有出現(xiàn),編譯時錯誤,錯誤提示:

error: the trait `core::marker::Send` is not 
implemented for the type `alloc::rc::Rc<Student>` [E0277]
note: `alloc::rc::Rc<Student>` cannot be sent between threads safely

看來并不是所有類型的消息都可以通過通道發(fā)送,消息類型必須實現(xiàn)marker trait Send。Rust之所以這樣強制要求,主要是為了解決并發(fā)安全的問題,再一次強調(diào),安全是Rust考慮的重中之重。如果一個類型是Send,則表明它可以在線程間安全的轉(zhuǎn)移所有權(quán)(ownership),當(dāng)所有權(quán)從一個線程轉(zhuǎn)移到另一個線程后,同一時間就只會存在一個線程能訪問它,這樣就避免了數(shù)據(jù)競爭,從而做到線程安全。ownership的強大又一次顯示出來了。通過這種做法,在編譯時即可要求所有的代碼必須滿足這一約定,這種方式方法值得借鑒,trait也是非常強大。

看起來問題得到了完美的解決,然而由于Send本身是一個不安全的marker trait,并沒有實際的API,所以實現(xiàn)它很簡單,但沒有強制保障,就只能靠開發(fā)者自己約束,否則還是可能引發(fā)并發(fā)安全問題。對于這一點,也不必太過擔(dān)心,因為Rust中已經(jīng)存在的類,都已經(jīng)實現(xiàn)了Send!Send,我們只要使用就行。Send是一個默認應(yīng)用到所有Rust已存在類的trait,所以我們用!Send顯式標(biāo)明該類沒有實現(xiàn)Send。目前幾乎所有的原始類型都是Send,例如前面例子中發(fā)送的i32。對于開發(fā)者而言,我們可能會更關(guān)心哪些是非Send,也就是實現(xiàn)了!Send,因為這會導(dǎo)致線程不安全。更全面的信息參見Send官網(wǎng)API。

對于不是Send的情況(!Send),大致分為兩類:

  1. 原始指針,包括*mut T*const T,因為不同線程通過指針都可以訪問數(shù)據(jù),從而可能引發(fā)線程安全問題。
  2. RcWeak也不是,因為引用計數(shù)會被共享,但是并沒有做并發(fā)控制。

雖然有這些!Send的情況,但是逃不過編譯器的火眼金睛,只要你錯誤地使用了消息類型,編譯器都會給出類似于上面的錯誤提示。我們要擔(dān)心的不是這些,因為錯誤更容易出現(xiàn)在新創(chuàng)建的自定義類,有下面兩點需要注意:

  1. 如果自定義類的所有字段都是Send,那么這個自定義類也是Send。 反之,如果有一個字段是!Send,那么這個自定義類也是!Send。 如果類的字段存在遞歸包含的情況,按照該原則以此類推來推論類是Send還是!Send。

  2. 在為一個自定義類實現(xiàn)Send或者!Send時,必須確保符合它的約定。

到此,消息類型的相關(guān)知識已經(jīng)介紹完了,說了這么久,也該讓大家自己練習(xí)一下了:請實現(xiàn)一個自定義類,該類包含一個Rc字段,讓這個類變成可以在通道中發(fā)送的消息類型。

異步通道(Channel)

在粗略地嘗試通道之后,是時候更深入一下了。Rust的標(biāo)準(zhǔn)庫其實提供了兩種類型的通道:異步通道和同步通道。上面的例子都是使用的異步通道,為此這一小節(jié)我們優(yōu)先進一步介紹異步通道,后續(xù)再介紹同步通道。異步通道指的是:不管接收者是否正在接收消息,消息發(fā)送者在發(fā)送消息時都不會阻塞。為了驗證這一點,我們嘗試多增加一個線程來發(fā)送消息:

use std::sync::mpsc;
use std::thread;

// 線程數(shù)量
const THREAD_COUNT :i32 = 2;

fn main() {
    // 創(chuàng)建一個通道
    let (tx, rx): (mpsc::Sender<i32>, mpsc::Receiver<i32>) = mpsc::channel();

    // 創(chuàng)建線程用于發(fā)送消息
    for id in 0..THREAD_COUNT {
        // 注意Sender是可以clone的,這樣就可以支持多個發(fā)送者
        let thread_tx = tx.clone();
        thread::spawn(move || {
            // 發(fā)送一個消息,此處是數(shù)字id
            thread_tx.send(id + 1).unwrap();
            println!("send {}", id + 1);
        });
    }

    thread::sleep_ms(2000);
    println!("wake up");
    // 在主線程中接收子線程發(fā)送的消息并輸出
    for _ in 0..THREAD_COUNT {
        println!("receive {}", rx.recv().unwrap());
    }
}

運行結(jié)果:

send 1
send 2
wake up
receive 1
receive 2

在代碼中,我們故意讓main所在的主線程睡眠2秒,從而讓發(fā)送者所在線程優(yōu)先執(zhí)行,通過結(jié)果可以發(fā)現(xiàn),發(fā)送者發(fā)送消息時確實沒有阻塞。還記得在前面提到過很多關(guān)于通道的問題嗎?從這個例子里面還發(fā)現(xiàn)什么沒?除了不阻塞之外,我們還能發(fā)現(xiàn)另外的三個特征:

1.通道是可以同時支持多個發(fā)送者的,通過clone的方式來實現(xiàn)。 這類似于Rc的共享機制。 其實從Channel所在的庫名std::sync::mpsc也可以知道這點。 因為mpsc就是多生產(chǎn)者單消費者(Multiple Producers Single Consumer)的簡寫。 可以有多個發(fā)送者,但只能有一個接收者,即支持的N:1模式。

2.異步通道具備消息緩存的功能,因為1和2是在沒有接收之前就發(fā)了的,在此之后還能接收到這兩個消息。

那么通道到底能緩存多少消息?在理論上是無窮的,嘗試一下便知:

use std::sync::mpsc;
use std::thread;

fn main() {
    // 創(chuàng)建一個通道
    let (tx, rx): (mpsc::Sender<i32>, mpsc::Receiver<i32>) = mpsc::channel();

    // 創(chuàng)建線程用于發(fā)送消息
    let new_thread = thread::spawn(move || {
        // 發(fā)送無窮多個消息
        let mut i = 0;
        loop {
            i = i + 1;
            // add code here
            println!("send {}", i);
            match tx.send(i) {
                Ok(_) => (),
                Err(e) => {
                    println!("send error: {}, count: {}", e, i);
                    return;
                },
            }
        }
    });

    // 在主線程中接收子線程發(fā)送的消息并輸出
    new_thread.join().unwrap();
    println!("receive {}", rx.recv().unwrap());
}

最后的結(jié)果就是耗費內(nèi)存為止。

3.消息發(fā)送和接收的順序是一致的,滿足先進先出原則。

上面介紹的內(nèi)容大多是關(guān)于發(fā)送者和通道的,下面開始考察一下接收端。通過上面的幾個例子,細心一點的可能已經(jīng)發(fā)現(xiàn)接收者的recv方法應(yīng)該會阻塞當(dāng)前線程,如果不阻塞,在多線程的情況下,發(fā)送的消息就不可能接收完全。所以沒有發(fā)送者發(fā)送消息,那么接收者將會一直等待,這一點要謹記。在某些場景下,一直等待是符合實際需求的。但某些情況下并不需一直等待,那么就可以考慮釋放通道,只要通道釋放了,recv方法就會立即返回。

異步通道的具有良好的靈活性和擴展性,針對業(yè)務(wù)需要,可以靈活地應(yīng)用于實際項目中,實在是必備良藥!

同步通道

同步通道在使用上同異步通道一樣,接收端也是一樣的,唯一的區(qū)別在于發(fā)送端,我們先來看下面的例子:

use std::sync::mpsc;
use std::thread;

fn main() {
    // 創(chuàng)建一個同步通道
    let (tx, rx): (mpsc::SyncSender<i32>, mpsc::Receiver<i32>) = mpsc::sync_channel(0);

    // 創(chuàng)建線程用于發(fā)送消息
    let new_thread = thread::spawn(move || {
        // 發(fā)送一個消息,此處是數(shù)字id
        println!("before send");
        tx.send(1).unwrap();
        println!("after send");
    });

    println!("before sleep");
    thread::sleep_ms(5000);
    println!("after sleep");
    // 在主線程中接收子線程發(fā)送的消息并輸出
    println!("receive {}", rx.recv().unwrap());
    new_thread.join().unwrap();
}

運行結(jié)果:

before sleep
before send
after sleep
receive 1
after send

除了多了一些輸出代碼之外,上面這段代碼幾乎和前面的異步通道的沒有什么區(qū)別,唯一不同的在于創(chuàng)建同步通道的那行代碼。同步通道是sync_channel,對應(yīng)的發(fā)送者也變成了SyncSender。為了顯示出同步通道的區(qū)別,故意添加了一些打印。和異步通道相比,存在兩點不同:

  1. 同步通道是需要指定緩存的消息個數(shù)的,但需要注意的是,最小可以是0,表示沒有緩存。
  2. 發(fā)送者是會被阻塞的。當(dāng)通道的緩存隊列不能再緩存消息時,發(fā)送者發(fā)送消息時,就會被阻塞。

對照上面兩點和運行結(jié)果來分析,由于主線程在接收消息前先睡眠了,從而子線程這個時候會被調(diào)度執(zhí)行發(fā)送消息,由于通道能緩存的消息為0,而這個時候接收者還沒有接收,所以tx.send(1).unwrap()就會阻塞子線程,直到主線程接收消息,即執(zhí)行println!("receive {}", rx.recv().unwrap());。運行結(jié)果印證了這點,要是沒阻塞,那么在before send之后就應(yīng)該是after send了。

相比較而言,異步通道更沒有責(zé)任感一些,因為消息發(fā)送者一股腦的只管發(fā)送,不管接收者是否能快速處理。這樣就可能出現(xiàn)通道里面緩存大量的消息得不到處理,從而占用大量的內(nèi)存,最終導(dǎo)致內(nèi)存耗盡。而同步通道則能避免這種問題,把接受者的壓力能傳遞到發(fā)送者,從而一直傳遞下去。

上一篇:注釋與文檔下一篇:復(fù)合類型