鍍金池/ 教程/ Scala/ 實(shí)戰(zhàn)中的 Promise 和 Future
高階函數(shù)與 DRY
序列提取
譯者結(jié)語
類型 Future
類型 Option
Scala 初學(xué)指南
類型類
模式匹配與匿名函數(shù)
路徑依賴類型
提取器
類型 Either
Try 與錯(cuò)誤處理
介紹
實(shí)戰(zhàn)中的 Promise 和 Future
柯里化和部分函數(shù)應(yīng)用
無處不在的模式

實(shí)戰(zhàn)中的 Promise 和 Future

上一章介紹了 Future 類型,以及如何用它來編寫高可讀性、高組合性的異步執(zhí)行代碼。

Future 只是整個(gè)謎團(tuán)的一部分: 它是一個(gè)只讀類型,允許你使用它計(jì)算得到的值,或者處理計(jì)算中出現(xiàn)的錯(cuò)誤。 但是在這之前,必須得有一種方法把這個(gè)值放進(jìn)去。 這一章里,你將會看到如何通過 Promise 類型來達(dá)到這個(gè)目的。

類型 Promise

之前,我們把一段順序執(zhí)行的代碼塊傳遞給了 scala.concurrent 里的 future 方法, 并且在作用域中給出了一個(gè) ExecutionContext,它神奇地異步調(diào)用代碼塊,返回一個(gè) Future 類型的結(jié)果。

雖然這種獲得 Future 的方式很簡單,但還有其他的方法來創(chuàng)建 Future 實(shí)例,并填充它,這就是 Promise。 Promise 允許你在 Future 里放入一個(gè)值,不過只能做一次,F(xiàn)uture 一旦完成,就不能更改了。

一個(gè) Future 實(shí)例總是和一個(gè)(也只能是一個(gè))Promise 實(shí)例關(guān)聯(lián)在一起。 如果你在 REPL 里調(diào)用 future 方法,你會發(fā)現(xiàn)返回的也是一個(gè) Promise:

import concurrent.Future
import concurrent.Future

scala> import concurrent.future
import concurrent.future

scala> import concurrent.ExecutionContext.Implicits.global
import concurrent.ExecutionContext.Implicits.global

scala> val f: Future[String] = future { "Hello World!" }
f: scala.concurrent.Future[String] = scala.concurrent.impl.Promise$DefaultPromise@2b509249

你得到的對象是一個(gè) DefaultPromise ,它實(shí)現(xiàn)了 FuturePromise 接口, 不過這就是具體的實(shí)現(xiàn)細(xì)節(jié)了(譯注,有興趣的讀者可翻閱其實(shí)現(xiàn)的源碼), 使用者只需要知道代碼實(shí)現(xiàn)把 Future 和對應(yīng)的 Promise 之間的聯(lián)系分的很清晰。

這個(gè)小例子說明了:除了通過 Promise,沒有其他方法可以完成一個(gè) Future, future 方法也只是一個(gè)輔助函數(shù),隱藏了具體的實(shí)現(xiàn)機(jī)制。

現(xiàn)在,讓我們動動手,看看怎樣直接使用 Promise 類型。

給出承諾

當(dāng)我們談?wù)撈鸪兄Z能否被兌現(xiàn)時(shí),一個(gè)很熟知的例子是那些政客的競選諾言。

假設(shè)被推選的政客給他的投票者一個(gè)減稅的承諾。 這可以用 Promise[TaxCut] 表示:

import concurrent.Promise
case class TaxCut(reduction: Int)
// either give the type as a type parameter to the factory method:
val taxcut = Promise[TaxCut]()
// or give the compiler a hint by specifying the type of your val:
val taxcut2: Promise[TaxCut] = Promise()
// taxcut: scala.concurrent.Promise[TaxCut] = scala.concurrent.impl.Promise$DefaultPromise@66ae2a84
// taxcut2: scala.concurrent.Promise[TaxCut] = scala.concurrent.impl.Promise$DefaultPromise@346974c6

一旦創(chuàng)建了這個(gè) Promise,就可以在它上面調(diào)用 future 方法來獲取承諾的未來:

 val taxCutF: Future[TaxCut] = taxcut.future
 // `> scala.concurrent.Future[TaxCut] `  scala.concurrent.impl.Promise$DefaultPromise@66ae2a84

返回的 Future 可能并不和 Promise 一樣,但在同一個(gè) Promise 上調(diào)用 future 方法總是返回同一個(gè)對象, 以確保 Promise 和 Future 之間一對一的關(guān)系。

結(jié)束承諾

一旦給出了承諾,并告訴全世界會在不遠(yuǎn)的將來兌現(xiàn)它,那最好盡力去實(shí)現(xiàn)。 在 Scala 中,可以結(jié)束一個(gè) Promise,無論成功還是失敗。

兌現(xiàn)承諾

為了成功結(jié)束一個(gè) Promise,你可以調(diào)用它的 success 方法,并傳遞一個(gè)大家期許的結(jié)果:

  taxcut.success(TaxCut(20))

這樣做之后,Promise 就無法再寫入其他值了,如果偏要再寫,會產(chǎn)生異常。

此時(shí),和 Promise 關(guān)聯(lián)的 Future 也成功完成,注冊的回調(diào)會開始執(zhí)行, 或者說對這個(gè) Future 進(jìn)行了映射,那這個(gè)時(shí)候,映射函數(shù)也該執(zhí)行了。

一般來說,Promise 的完成和對返回的 Future 的處理發(fā)生在不同的線程。 很可能你創(chuàng)建了 Promise,并立即返回和它關(guān)聯(lián)的 Future 給調(diào)用者,而實(shí)際上,另外一個(gè)線程還在計(jì)算它。

為了說明這一點(diǎn),我們拿減稅來舉個(gè)例子:

object Government {
  def redeemCampaignPledge(): Future[TaxCut] = {
    val p = Promise[TaxCut]()
    Future {
      println("Starting the new legislative period.")
      Thread.sleep(2000)
      p.success(TaxCut(20))
      println("We reduced the taxes! You must reelect us!!!!1111")
    }
    p.future
  }
}

這個(gè)例子中使用了 Future 伴生對象,不過不要被它搞混淆了,這個(gè)例子的重點(diǎn)是:Promise 并不是在調(diào)用者的線程里完成的。

現(xiàn)在我們來兌現(xiàn)當(dāng)初的競選宣言,在 Future 上添加一個(gè) onComplete 回調(diào):

import scala.util.{Success, Failure}
val taxCutF: Future[TaxCut] = Government.redeemCampaignPledge()
println("Now that they're elected, let's see if they remember their promises...")
taxCutF.onComplete {
  case Success(TaxCut(reduction)) =>
    println(s"A miracle! They really cut our taxes by $reduction percentage points!")
  case Failure(ex) =>
    println(s"They broke their promises! Again! Because of a ${ex.getMessage}")
}

多次運(yùn)行這個(gè)例子,會發(fā)現(xiàn)顯示屏輸出的結(jié)果順序是不確定的,而且,最終回調(diào)函數(shù)會執(zhí)行,進(jìn)入成功的那個(gè) case 。

違背諾言

政客習(xí)慣違背諾言,Scala 程序員有時(shí)候也只能這樣做。 調(diào)用 failure 方法,傳遞一個(gè)異常,結(jié)束 Promise:

case class LameExcuse(msg: String) extends Exception(msg)
object Government {
  def redeemCampaignPledge(): Future[TaxCut] = {
     val p = Promise[TaxCut]()
     Future {
       println("Starting the new legislative period.")
       Thread.sleep(2000)
       p.failure(LameExcuse("global economy crisis"))
       println("We didn't fulfill our promises, but surely they'll understand.")
     }
     p.future
   }
}

這個(gè) redeemCampaignPledge 實(shí)現(xiàn)最終會違背承諾。 一旦用 failure 結(jié)束這個(gè) Promise,也無法再次寫入了,正如 success 方法一樣。 相關(guān)聯(lián)的 Future 也會以 Failure 收場。

如果已經(jīng)有了一個(gè) Try,那可以直接把它傳遞給 Promise 的 complete 方法,以此來結(jié)束這個(gè)它。 如果這個(gè) Try 是一個(gè) Success,關(guān)聯(lián)的 Future 會成功完成,否則,就失敗。

基于 Future 的編程實(shí)踐

如果想使用基于 Future 的編程范式以增加應(yīng)用的擴(kuò)展性,那應(yīng)用從下到上都必須被設(shè)計(jì)成非阻塞模式。 這意味著,基本上應(yīng)用層所有的函數(shù)都應(yīng)該是異步的,并且返回 Future。

當(dāng)下,一個(gè)可能的使用場景是開發(fā) Web 應(yīng)用。 流行的 Scala Web 框架,允許你將響應(yīng)作為 Future[Response] 返回,而不是等到你完成響應(yīng)再返回。 這個(gè)非常重要,因?yàn)樗试S Web 服務(wù)器用少量的線程處理更多的連接。 通過賦予服務(wù)器 Future[Response] 的能力,你可以最大化服務(wù)器線程池的利用率。

而且,應(yīng)用的服務(wù)可能需要多次調(diào)用數(shù)據(jù)庫層以及(或者)某些外部服務(wù), 這時(shí)候可以獲取多個(gè) Future,用 for 語句將它們組合成新的 Future,簡單可讀! 最終,Web 層再將這樣的一個(gè) Future 變成 Future[Response]。

但是該怎樣在實(shí)踐中實(shí)現(xiàn)這些呢?需要考慮三種不同的場景:

非阻塞IO

應(yīng)用很可能涉及到大量的 IO 操作。比如,可能需要和數(shù)據(jù)庫交互,還可能作為客戶端去調(diào)用其他的 Web 服務(wù)。

如果是這樣,可以使用一些基于 Java 非阻塞 IO 實(shí)現(xiàn)的庫,也可以直接或通過 Netty 這樣的庫來使用 Java 的 NIO API。 這樣的庫可以用定量的線程池處理大量的連接。

但如果是想開發(fā)這樣的一個(gè)庫,直接和 Promise 打交道更為合適。

阻塞 IO

有時(shí)候,并沒有基于 NIO 的庫可用。比如,Java 世界里大多數(shù)的數(shù)據(jù)庫驅(qū)動都是使用阻塞 IO。 在 Web 應(yīng)用中,如果用這樣的驅(qū)動發(fā)起大量訪問數(shù)據(jù)庫的調(diào)用,要記得這些調(diào)用是發(fā)生在服務(wù)器線程里的。 為了避免這個(gè)問題,可以將所有需要和數(shù)據(jù)庫交互的代碼都放入 future 代碼塊里,就像這樣:

// get back a Future[ResultSet] or something similar:
Future {
  queryDB(query)
}

到現(xiàn)在為止,我們都是使用隱式可用的全局 ExecutionContext 來執(zhí)行這些代碼塊。 通常,更好的方式是創(chuàng)建一個(gè)專用的 ExecutionContext 放在數(shù)據(jù)庫層里。 可以從 Java的 ExecutorService 來它,這也意味著,可以異步的調(diào)整線程池來執(zhí)行數(shù)據(jù)庫調(diào)用,應(yīng)用的其他部分不受影響。

import java.util.concurrent.Executors
import concurrent.ExecutionContext
val executorService = Executors.newFixedThreadPool(4)
val executionContext = ExecutionContext.fromExecutorService(executorService)

長時(shí)間運(yùn)行的計(jì)算

取決于應(yīng)用的本質(zhì)特點(diǎn),一個(gè)應(yīng)用偶爾還會調(diào)用一些長時(shí)間運(yùn)行的任務(wù),它們完全不涉及 IO(CPU 密集的任務(wù))。 這些任務(wù)也不應(yīng)該在服務(wù)器線程中執(zhí)行,因此需要將它們變成 Future:

Future {
  longRunningComputation(data, moreData)
}

同樣,最好有一些專屬的 ExecutionContext 來處理這些 CPU 密集的計(jì)算。 怎樣調(diào)整這些線程池大小取決于應(yīng)用的特征,這些已經(jīng)超過了本文的范圍。

總結(jié)

這一章里,我們學(xué)習(xí)了 Promise - 基于 Future 的并發(fā)范式的可寫組件,以及怎樣用它來完成一個(gè) Future; 同時(shí),還給出了一些在實(shí)踐中使用它們的建議。

下一章會討論 Scala 函數(shù)式編程是如何增加代碼可用性(一個(gè)長久以來和面向?qū)ο缶幊滔嚓P(guān)聯(lián)的概念)的。

上一篇:提取器下一篇:類型 Future