稍加考慮,上一節(jié)的練習(xí)題其實是不完整的,它只是評分系統(tǒng)中的一環(huán),一個評分系統(tǒng)是需要先把信息從數(shù)據(jù)庫或文件中讀取出來,然后才是評分,最后還需要把評分結(jié)果再保存到數(shù)據(jù)庫或文件中去。如果一步一步串行地做這三個步驟,是完全沒有問題的。那么我們是否可以用三個線程來分別做這三個步驟呢?上一節(jié)練習(xí)題我們已經(jīng)用了一個線程來實現(xiàn)評分,那么我們是否也可以再用一個線程來讀取成績,再用另個線程來實現(xiàn)保存呢? 如果能這樣的話,那么我們就可以利用上多核多cpu的優(yōu)勢,加快整個評分的效率。既然在此提出這個問題,答案就很明顯了。問題在于我們要怎么在Rust中來實現(xiàn),關(guān)鍵在于三個線程怎么交換信息,以達到串行的邏輯處理順序?
為了解決這個問題,下面將介紹一種Rust在標(biāo)準(zhǔn)庫中支持的消息傳遞技術(shù)。消息傳遞是并發(fā)模型里面大家比較推崇的模式,不僅僅是因為使用起來比較簡單,關(guān)鍵還在于它可以減少數(shù)據(jù)競爭,提高并發(fā)效率,為此值得深入學(xué)習(xí)。Rust是通過一個叫做通道(channel
)的東西來實現(xiàn)這種模式的,下面直接進入主題。
Rust的通道(channel
)可以把一個線程的消息(數(shù)據(jù))傳遞到另一個線程,從而讓信息在不同的線程中流動,從而實現(xiàn)協(xié)作。詳情請參見std::sync::mpsc
。通道的兩端分別是發(fā)送者(Sender
)和接收者(Receiver
),發(fā)送者負責(zé)從一個線程發(fā)送消息,接收者則在另一個線程中接收該消息。下面我們來看一個簡單的例子:
use std::sync::mpsc;
use std::thread;
fn main() {
// 創(chuàng)建一個通道
let (tx, rx): (mpsc::Sender<i32>, mpsc::Receiver<i32>) =
mpsc::channel();
// 創(chuàng)建線程用于發(fā)送消息
thread::spawn(move || {
// 發(fā)送一個消息,此處是數(shù)字id
tx.send(1).unwrap();
});
// 在主線程中接收子線程發(fā)送的消息并輸出
println!("receive {}", rx.recv().unwrap());
}
程序說明參見代碼中的注釋,程序執(zhí)行結(jié)果為:
receive 1
結(jié)果表明main
所在的主線程接收到了新建線程發(fā)送的消息,用Rust在線程間傳遞消息就是這么簡單!
雖然簡單,但使用過其他語言就會知道,通道有多種使用方式,且比較靈活,為此我們需要進一步考慮關(guān)于Rust
的Channel
的幾個問題:
讓我們帶著這些問題和思考進入下一個小節(jié),那里有相關(guān)的答案。
上面的例子中,我們傳遞的消息類型為i32
,除了這種類型之外,是否還可以傳遞更多的原始類型,或者更復(fù)雜的類型,和自定義類型?下面我們嘗試發(fā)送一個更復(fù)雜的Rc
類型的消息:
use std::fmt;
use std::sync::mpsc;
use std::thread;
use std::rc::Rc;
pub struct Student {
id: u32
}
impl fmt::Display for Student {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "student {}", self.id)
}
}
fn main() {
// 創(chuàng)建一個通道
let (tx, rx): (mpsc::Sender<Rc<Student>>, mpsc::Receiver<Rc<Student>>) =
mpsc::channel();
// 創(chuàng)建線程用于發(fā)送消息
thread::spawn(move || {
// 發(fā)送一個消息,此處是數(shù)字id
tx.send(Rc::new(Student{
id: 1,
})).unwrap();
});
// 在主線程中接收子線程發(fā)送的消息并輸出
println!("receive {}", rx.recv().unwrap());
}
編譯代碼,奇跡沒有出現(xiàn),編譯時錯誤,錯誤提示:
error: the trait `core::marker::Send` is not
implemented for the type `alloc::rc::Rc<Student>` [E0277]
note: `alloc::rc::Rc<Student>` cannot be sent between threads safely
看來并不是所有類型的消息都可以通過通道發(fā)送,消息類型必須實現(xiàn)marker trait Send
。Rust之所以這樣強制要求,主要是為了解決并發(fā)安全的問題,再一次強調(diào),安全是Rust考慮的重中之重。如果一個類型是Send
,則表明它可以在線程間安全的轉(zhuǎn)移所有權(quán)(ownership
),當(dāng)所有權(quán)從一個線程轉(zhuǎn)移到另一個線程后,同一時間就只會存在一個線程能訪問它,這樣就避免了數(shù)據(jù)競爭,從而做到線程安全。ownership
的強大又一次顯示出來了。通過這種做法,在編譯時即可要求所有的代碼必須滿足這一約定,這種方式方法值得借鑒,trait
也是非常強大。
看起來問題得到了完美的解決,然而由于Send
本身是一個不安全的marker trait
,并沒有實際的API
,所以實現(xiàn)它很簡單,但沒有強制保障,就只能靠開發(fā)者自己約束,否則還是可能引發(fā)并發(fā)安全問題。對于這一點,也不必太過擔(dān)心,因為Rust中已經(jīng)存在的類,都已經(jīng)實現(xiàn)了Send
或!Send
,我們只要使用就行。Send
是一個默認應(yīng)用到所有Rust已存在類的trait,所以我們用!Send
顯式標(biāo)明該類沒有實現(xiàn)Send
。目前幾乎所有的原始類型都是Send
,例如前面例子中發(fā)送的i32
。對于開發(fā)者而言,我們可能會更關(guān)心哪些是非Send
,也就是實現(xiàn)了!Send
,因為這會導(dǎo)致線程不安全。更全面的信息參見Send
官網(wǎng)API。
對于不是Send
的情況(!Send
),大致分為兩類:
*mut T
和*const T
,因為不同線程通過指針都可以訪問數(shù)據(jù),從而可能引發(fā)線程安全問題。Rc
和Weak
也不是,因為引用計數(shù)會被共享,但是并沒有做并發(fā)控制。雖然有這些!Send
的情況,但是逃不過編譯器的火眼金睛,只要你錯誤地使用了消息類型,編譯器都會給出類似于上面的錯誤提示。我們要擔(dān)心的不是這些,因為錯誤更容易出現(xiàn)在新創(chuàng)建的自定義類,有下面兩點需要注意:
如果自定義類的所有字段都是Send
,那么這個自定義類也是Send
。
反之,如果有一個字段是!Send
,那么這個自定義類也是!Send
。
如果類的字段存在遞歸包含的情況,按照該原則以此類推來推論類是Send
還是!Send
。
Send
或者!Send
時,必須確保符合它的約定。到此,消息類型的相關(guān)知識已經(jīng)介紹完了,說了這么久,也該讓大家自己練習(xí)一下了:請實現(xiàn)一個自定義類,該類包含一個Rc字段,讓這個類變成可以在通道中發(fā)送的消息類型。
在粗略地嘗試通道之后,是時候更深入一下了。Rust的標(biāo)準(zhǔn)庫其實提供了兩種類型的通道:異步通道和同步通道。上面的例子都是使用的異步通道,為此這一小節(jié)我們優(yōu)先進一步介紹異步通道,后續(xù)再介紹同步通道。異步通道指的是:不管接收者是否正在接收消息,消息發(fā)送者在發(fā)送消息時都不會阻塞。為了驗證這一點,我們嘗試多增加一個線程來發(fā)送消息:
use std::sync::mpsc;
use std::thread;
// 線程數(shù)量
const THREAD_COUNT :i32 = 2;
fn main() {
// 創(chuàng)建一個通道
let (tx, rx): (mpsc::Sender<i32>, mpsc::Receiver<i32>) = mpsc::channel();
// 創(chuàng)建線程用于發(fā)送消息
for id in 0..THREAD_COUNT {
// 注意Sender是可以clone的,這樣就可以支持多個發(fā)送者
let thread_tx = tx.clone();
thread::spawn(move || {
// 發(fā)送一個消息,此處是數(shù)字id
thread_tx.send(id + 1).unwrap();
println!("send {}", id + 1);
});
}
thread::sleep_ms(2000);
println!("wake up");
// 在主線程中接收子線程發(fā)送的消息并輸出
for _ in 0..THREAD_COUNT {
println!("receive {}", rx.recv().unwrap());
}
}
運行結(jié)果:
send 1
send 2
wake up
receive 1
receive 2
在代碼中,我們故意讓main
所在的主線程睡眠2秒,從而讓發(fā)送者所在線程優(yōu)先執(zhí)行,通過結(jié)果可以發(fā)現(xiàn),發(fā)送者發(fā)送消息時確實沒有阻塞。還記得在前面提到過很多關(guān)于通道的問題嗎?從這個例子里面還發(fā)現(xiàn)什么沒?除了不阻塞之外,我們還能發(fā)現(xiàn)另外的三個特征:
1.通道是可以同時支持多個發(fā)送者的,通過clone
的方式來實現(xiàn)。
這類似于Rc
的共享機制。
其實從Channel
所在的庫名std::sync::mpsc
也可以知道這點。
因為mpsc
就是多生產(chǎn)者單消費者(Multiple Producers Single Consumer)的簡寫。
可以有多個發(fā)送者,但只能有一個接收者,即支持的N:1模式。
2.異步通道具備消息緩存的功能,因為1和2是在沒有接收之前就發(fā)了的,在此之后還能接收到這兩個消息。
那么通道到底能緩存多少消息?在理論上是無窮的,嘗試一下便知:
use std::sync::mpsc;
use std::thread;
fn main() {
// 創(chuàng)建一個通道
let (tx, rx): (mpsc::Sender<i32>, mpsc::Receiver<i32>) = mpsc::channel();
// 創(chuàng)建線程用于發(fā)送消息
let new_thread = thread::spawn(move || {
// 發(fā)送無窮多個消息
let mut i = 0;
loop {
i = i + 1;
// add code here
println!("send {}", i);
match tx.send(i) {
Ok(_) => (),
Err(e) => {
println!("send error: {}, count: {}", e, i);
return;
},
}
}
});
// 在主線程中接收子線程發(fā)送的消息并輸出
new_thread.join().unwrap();
println!("receive {}", rx.recv().unwrap());
}
最后的結(jié)果就是耗費內(nèi)存為止。
3.消息發(fā)送和接收的順序是一致的,滿足先進先出原則。
上面介紹的內(nèi)容大多是關(guān)于發(fā)送者和通道的,下面開始考察一下接收端。通過上面的幾個例子,細心一點的可能已經(jīng)發(fā)現(xiàn)接收者的recv
方法應(yīng)該會阻塞當(dāng)前線程,如果不阻塞,在多線程的情況下,發(fā)送的消息就不可能接收完全。所以沒有發(fā)送者發(fā)送消息,那么接收者將會一直等待,這一點要謹記。在某些場景下,一直等待是符合實際需求的。但某些情況下并不需一直等待,那么就可以考慮釋放通道,只要通道釋放了,recv
方法就會立即返回。
異步通道的具有良好的靈活性和擴展性,針對業(yè)務(wù)需要,可以靈活地應(yīng)用于實際項目中,實在是必備良藥!
同步通道在使用上同異步通道一樣,接收端也是一樣的,唯一的區(qū)別在于發(fā)送端,我們先來看下面的例子:
use std::sync::mpsc;
use std::thread;
fn main() {
// 創(chuàng)建一個同步通道
let (tx, rx): (mpsc::SyncSender<i32>, mpsc::Receiver<i32>) = mpsc::sync_channel(0);
// 創(chuàng)建線程用于發(fā)送消息
let new_thread = thread::spawn(move || {
// 發(fā)送一個消息,此處是數(shù)字id
println!("before send");
tx.send(1).unwrap();
println!("after send");
});
println!("before sleep");
thread::sleep_ms(5000);
println!("after sleep");
// 在主線程中接收子線程發(fā)送的消息并輸出
println!("receive {}", rx.recv().unwrap());
new_thread.join().unwrap();
}
運行結(jié)果:
before sleep
before send
after sleep
receive 1
after send
除了多了一些輸出代碼之外,上面這段代碼幾乎和前面的異步通道的沒有什么區(qū)別,唯一不同的在于創(chuàng)建同步通道的那行代碼。同步通道是sync_channel
,對應(yīng)的發(fā)送者也變成了SyncSender
。為了顯示出同步通道的區(qū)別,故意添加了一些打印。和異步通道相比,存在兩點不同:
對照上面兩點和運行結(jié)果來分析,由于主線程在接收消息前先睡眠了,從而子線程這個時候會被調(diào)度執(zhí)行發(fā)送消息,由于通道能緩存的消息為0,而這個時候接收者還沒有接收,所以tx.send(1).unwrap()
就會阻塞子線程,直到主線程接收消息,即執(zhí)行println!("receive {}", rx.recv().unwrap());
。運行結(jié)果印證了這點,要是沒阻塞,那么在before send
之后就應(yīng)該是after send
了。
相比較而言,異步通道更沒有責(zé)任感一些,因為消息發(fā)送者一股腦的只管發(fā)送,不管接收者是否能快速處理。這樣就可能出現(xiàn)通道里面緩存大量的消息得不到處理,從而占用大量的內(nèi)存,最終導(dǎo)致內(nèi)存耗盡。而同步通道則能避免這種問題,把接受者的壓力能傳遞到發(fā)送者,從而一直傳遞下去。