鍍金池/ 教程/ Scala/ Finagle 介紹
Java 與 Scala
Searchbird
更多的集合
高級(jí)類型
集合
基礎(chǔ)
使用 specs 測試
簡單構(gòu)建工具
Scala 并發(fā)編程
Finagle 介紹
類型和多態(tài)基礎(chǔ)
模式匹配與函數(shù)組合
基礎(chǔ)知識(shí)(續(xù))

Finagle 介紹

Finagle-Friendly REPL

我們將要討論的不是標(biāo)準(zhǔn) Scala 的代碼。如果你喜歡使用 REPL 學(xué)習(xí),你可能想知道如何獲得一個(gè)加入 Finagle 及其依賴的 Scala REPL。

你可以在這里獲取 Finagle 源代碼。

如果你在 finagle 目錄下有 Finagle 的源代碼,你可以通過下面的命令得到一個(gè)控制臺(tái)

$ cd finagle
$ ./sbt "project finagle-http" console
 ...build output...
scala>

Futures

Finagle 使用 com.twitter.util.Future [1]編碼延遲操作。Future 是尚未生成的值的一個(gè)句柄。Finagle 使用 Future 作為其異步 API 的返回值。同步 API 會(huì)在返回前等待結(jié)果;但是異步 API 則不會(huì)等待。例如,個(gè)對(duì)互聯(lián)網(wǎng)上一些服務(wù)的HTTP請(qǐng)求可能半秒都不會(huì)返回。你不希望你的程序阻塞等待半秒?!奥钡?API 可以立即返回一個(gè) Future,然后在需要解析其值時(shí)“填充”。

val myFuture = MySlowService(request) // returns right away
   ...do other things...
val serviceResult = myFuture.get() // blocks until service "fills in" myFuture

在實(shí)踐中,你不會(huì)發(fā)送一個(gè)請(qǐng)求,然后在幾行代碼后調(diào)用 myFuture.get。Future 提供了注冊(cè)回調(diào)的方法,在值變得可用時(shí)會(huì)調(diào)用注冊(cè)的回調(diào)函數(shù)。

如果你用過其他異步 API,當(dāng)看到“回調(diào)”你也許會(huì)畏縮。你可能會(huì)聯(lián)想到他們難以辨認(rèn)的代碼流,被調(diào)用的函數(shù)藏在離調(diào)用處遠(yuǎn)遠(yuǎn)的地方。但是,F(xiàn)uture 可以利用 Scala 中“函數(shù)是一等公民”的特性編寫出更可讀的代碼流。你可以在調(diào)用它的地方簡單地定義一個(gè)處理函數(shù)。

例如,寫代碼調(diào)度請(qǐng)求,然后“處理”回應(yīng),你可以保持代碼在一起:

val future = dispatch(req) // returns immediately, but future is "empty"
future onSuccess { reply => // when the future gets "filled", use its value
  println(reply)
}

你可以在 REPL 中用體驗(yàn)一下 Future。雖然不是學(xué)習(xí)如何在實(shí)際代碼中使用他們的好方法,但可以幫助理解 API。當(dāng)你使用 REPL,Promise 是一個(gè)方便的類。它是 Future 抽象類的一個(gè)具體子類。你可以用它來創(chuàng)建一個(gè)還沒有值的 Future。

scala> import com.twitter.util.{Future,Promise}
import com.twitter.util.{Future, Promise}

scala> val f6 = Future.value(6) // create already-resolved future
f6: com.twitter.util.Future[Int] = com.twitter.util.ConstFuture@c63a8af

scala> f6.get()
res0: Int = 6

scala> val fex = Future.exception(new Exception) // create resolved sad future
fex: com.twitter.util.Future[Nothing] = com.twitter.util.ConstFuture@38ddab20

scala> fex.get()
java.lang.Exception
  ... stack trace ...

scala> val pr7 = new Promise[Int] // create unresolved future
pr7: com.twitter.util.Promise[Int] = Promise@1994943491(...)

scala> pr7.get()
  ...console hangs, waiting for future to resolve...
Ctrl-C
Execution interrupted by signal.

scala> pr7.setValue(7)

scala> pr7.get()
res1: Int = 7

scala>

在實(shí)際代碼中使用 Future 時(shí),你通常不會(huì)調(diào)用 get,而是使用回調(diào)函數(shù)。get 僅僅是方便在 REPL 修修補(bǔ)補(bǔ)。

順序組合

Future 有類似集合 API 中的組合子(如 map, flatMap) ?;仡櫼幌录辖M合子,它讓你可以表達(dá)如 “我有一個(gè)整數(shù) List 和一個(gè) square 函數(shù):map 那個(gè)列表獲得整數(shù)平方的列表”這樣的操作。這種表達(dá)方式很靈巧;你可以把組合子函數(shù)和另一個(gè)函數(shù)放在一起有效地組成一個(gè)新函數(shù)。面向 Future 的組合子可以讓你這樣表達(dá):“我有一個(gè)期望整數(shù)的 Future 和一個(gè) square 函數(shù):map 那個(gè) Future 獲得一個(gè)期望整數(shù)平方的 Future”。

如果你在定義一個(gè)異步 API,傳入一個(gè)請(qǐng)求值,你的 API 應(yīng)該返回一個(gè)包裝在 Future 中的響應(yīng)。因此,這些把輸入和函數(shù)加入 Future 的組合子是相當(dāng)有用的:它們幫助你根據(jù)其它異步 API 定義你自己的異步 API。

最重要的 Future 的組合子是 flatMap[2]

def Future[A].flatMap[B](f: A => Future[B]): Future[B]

flatMap 序列化兩個(gè) Future。即,它接受一個(gè)Future和一個(gè)異步函數(shù),并返回另一個(gè) Future。方法簽名中是這樣寫的:給定一個(gè) Future 成功的值,函數(shù)f提供下一個(gè) Future。如果/當(dāng)輸入的 Future 成功完成,flatMap 自動(dòng)調(diào)用f。只有當(dāng)這兩個(gè) Future 都已完成,此操作所代表的 Future才算完成。如果任何一個(gè) Future 失敗,則操作確定的 Future 也將失敗。這種隱交織的錯(cuò)誤讓我們只需要在必要時(shí)來處理錯(cuò)誤,所以語法意義很大。flatMap 是這些語義組合子的標(biāo)準(zhǔn)名稱。

如果你有一個(gè) Future 并且想在異步 API 使用其值,使用 flatMap。例如,假設(shè)你有一個(gè) Future[User],需要一個(gè) Future[Boolean]表示用戶是否已被禁止。有一個(gè) isBanned 的異步 API 來判斷一個(gè)用戶是否已被禁止。此時(shí)可以使用 flatMap :

scala> import com.twitter.util.{Future,Promise}
import com.twitter.util.{Future, Promise}

scala> class User(n: String) { val name = n }
defined class User

scala> def isBanned(u: User) = { Future.value(false) }
isBanned: (u: User)com.twitter.util.Future[Boolean]

scala> val pru = new Promise[User]
pru: com.twitter.util.Promise[User] = Promise@897588993(...)

scala> val futBan = pru flatMap isBanned // apply isBanned to future
futBan: com.twitter.util.Future[Boolean] = Promise@1733189548(...)

scala> futBan.get()
  ...REPL hangs, futBan not resolved yet...
Ctrl-C
Execution interrupted by signal.

scala> pru.setValue(new User("prudence"))

scala> futBan.get()
res45: Boolean = false

scala>

同樣,如果要在 Future 中應(yīng)用一個(gè)同步函數(shù),可以使用 map。例如,假設(shè)你有一個(gè) Future[RawCredentials] 需要一個(gè) Future[Credentials]。你有一個(gè)的同步的 normalize 函數(shù)將 RawCredentials 轉(zhuǎn)換成 Credentials??梢允褂?map:

scala> class RawCredentials(u: String, pw: String) {
     |   val username = u
     |   val password = pw
     | }
defined class RawCredentials

scala> class Credentials(u: String, pw: String) {
     |   val username = u
     |   val password = pw
     | }
defined class Credentials

scala> def normalize(raw: RawCredentials) = {
     |   new Credentials(raw.username.toLowerCase(), raw.password)
     | }
normalize: (raw: RawCredentials)Credentials

scala> val praw = new Promise[RawCredentials]
praw: com.twitter.util.Promise[RawCredentials] = Promise@1341283926(...)

scala> val fcred = praw map normalize // apply normalize to future
fcred: com.twitter.util.Future[Credentials] = Promise@1309582018(...)

scala> fcred.get()
   ...REPL hangs, fcred doesn't have a value yet...
Ctrl-C
Execution interrupted by signal.

scala> praw.setValue(new RawCredentials("Florence", "nightingale"))

scala> fcred.get().username
res48: String = florence

scala>

Scala 有快捷語法來調(diào)用 flatMap:for 表達(dá)式。假設(shè)你想通過異步 API 驗(yàn)證登錄請(qǐng)求,然后通過另一個(gè)異步 API 檢查用戶是否被禁止。在 for 表達(dá)式的幫助下,我們可以這樣寫:

scala> def authenticate(req: LoginRequest) = {
     |   // TODO: we should check the password
     |   Future.value(new User(req.username))
     | }
authenticate: (req: LoginRequest)com.twitter.util.Future[User]

scala> val f = for {
     |  u <- authenticate(request)
     |  b <- isBanned(u)
     | } yield (u, b)
f: com.twitter.util.Future[(User, Boolean)] = Promise@35785606(...)

scala>

它產(chǎn)生一個(gè) f: Future[(User, Boolean)],包含用戶對(duì)象和一個(gè)表示該用戶是否已被禁止的布爾值。注意這里是怎樣實(shí)現(xiàn)順序組合的:isBanned 使用了 authenticate 的輸出作為其輸入。

并發(fā)組合

你可能想一次獲取來自多個(gè)服務(wù)的數(shù)據(jù)。例如,如果你正在編寫一個(gè) Web 服務(wù)來顯示內(nèi)容和廣告,它可能會(huì)從兩個(gè)服務(wù)中分別獲取內(nèi)容和廣告。但是,你怎么告訴代碼來等待兩份答復(fù)呢?如果必須自己實(shí)現(xiàn)可能會(huì)非常棘手,幸運(yùn)的是你可以使用并發(fā)組合子。

Future 提供了一些并發(fā)組合子。一般來說,他們都是將 Future 的一個(gè)序列轉(zhuǎn)換成包含一個(gè)序列的 Future,只是方式略微不同。這是很好的,因?yàn)樗ū举|(zhì)上)可以讓你把幾個(gè) Future 封裝成一個(gè)單一的 Future。

object Future {
  …
  def collect[A](fs: Seq[Future[A]]): Future[Seq[A]]
  def join(fs: Seq[Future[_]]): Future[Unit]
  def select(fs: Seq[Future[A]]) : Future[(Try[A], Seq[Future[A]])]
}

collect 參數(shù)是具有相同類型 Future 的一個(gè)集合,返回一個(gè) Future,其類型是包含那個(gè)類型值的一個(gè)序列。當(dāng)所有的 Future 都成功完成或者當(dāng)中任何一個(gè)失敗,都會(huì)使這個(gè) Future 完成。返回序列的順序和傳入序列的順序相對(duì)應(yīng)。

scala> val f2 = Future.value(2)
f2: com.twitter.util.Future[Int] = com.twitter.util.ConstFuture@13ecdec0

scala> val f3 = Future.value(3)
f3: com.twitter.util.Future[Int] = com.twitter.util.ConstFuture@263bb672

scala> val f23 = Future.collect(Seq(f2, f3))
f23: com.twitter.util.Future[Seq[Int]] = Promise@635209178(...)

scala> val f5 = f23 map (_.sum)
f5: com.twitter.util.Future[Int] = Promise@1954478838(...)

scala> f5.get()
res9: Int = 5

join 參數(shù)是混合類型的 Future 序列,返回一個(gè) Future[Unit],當(dāng)所有的相關(guān) Future 完成時(shí)(無論他們是否失敗)該 Future 完成。其作用是標(biāo)識(shí)一組異構(gòu)操作完成。對(duì)那個(gè)內(nèi)容和廣告的例子來說,這可能是一個(gè)很好的解決方案。

scala> val ready = Future.join(Seq(f2, f3))
ready: com.twitter.util.Future[Unit] = Promise@699347471(...)

scala> ready.get() // doesn't ret value, but I know my futures are done

scala>

當(dāng)傳入的 Future 序列的第一個(gè) Future 完成的時(shí)候,select 會(huì)返回一個(gè) Future。它會(huì)將那個(gè)完成的 Future 和其它未完成的 Future 一起放在 Seq 中返回。 (它不會(huì)做任何事情來取消剩余的 Future。你可以等待更多的回應(yīng),或者忽略他們)

scala> val pr7 = new Promise[Int] // unresolved future
pr7: com.twitter.util.Promise[Int] = Promise@1608532943(...)

scala> val sel = Future.select(Seq(f2, pr7)) // select from 2 futs, one resolved
sel: com.twitter.util.Future[...] = Promise@1003382737(...)

scala> val(complete, stragglers) = sel.get()
complete: com.twitter.util.Try[Int] = Return(2)
stragglers: Seq[...] = List(...)

scala> complete.get()
res110: Int = 2

scala> stragglers(0).get() // our list of not-yet-finished futures has one item
  ...get() hangs the REPL because this straggling future is not finished...
Ctrl-C
Execution interrupted by signal.

scala> pr7.setValue(7)

scala> stragglers(0).get()
res113: Int = 7

scala>

組合例子:緩存速率限制

這些組合子表達(dá)了典型的網(wǎng)絡(luò)服務(wù)操作。這段假設(shè)的代碼在對(duì)速率進(jìn)行限制(為了保持本地速率限制緩存)的同時(shí),將用戶的請(qǐng)求調(diào)度到后臺(tái)服務(wù):

// Find out if user is rate-limited. This can be slow; we have to ask
// the remote server that keeps track of who is rate-limited.
def isRateLimited(u: User): Future[Boolean] = {
  ...
}

// Notice how you can swap this implementation out now with something that might
// implement a different, more restrictive policy.

// Check the cache to find out if user is rate-limited. This cache
// implementation is just a Map, and can return a value right way. But we
// return a Future anyhow in case we need to use a slower implementation later.
def isLimitedByCache(u: User): Future[Boolean] =  Future.value(limitCache(u))

// Update the cache
def setIsLimitedInCache(user: User, v: Boolean) { limitCache(user) = v }

// Get a timeline of tweets... unless the user is rate-limited (then throw
// an exception instead)
def getTimeline(cred: Credentials): Future[Timeline] =
  isLimitedByCache(cred.user) flatMap {
    case true => Future.exception(new Exception("rate limited"))
    case false =>

      // First we get auth'd user then we get timeline.
      // Sequential composition of asynchronous APIs: use flatMap
      val timeline = auth(cred) flatMap(getTimeline)
      val limited = isRateLimited(cred.user) onSuccess(
                                       setIsLimitedInCache(cred.user, _))

      // 'join' concurrently combines differently-typed futures
      // 'flatMap' sequentially combines, specifies what to do next
      timeline join limited flatMap {
        case (_, true) => Future.exception(new Exception("rate limited"))
        case (timeline, _) => Future.value(timeline)
      }
  }
}

這個(gè)例子結(jié)合了順序和并發(fā)組合。請(qǐng)注意,除了給轉(zhuǎn)化速率限制回應(yīng)一個(gè)異常以外,沒有明確的錯(cuò)誤處理。如果任何 Future 在這里失敗,它會(huì)自動(dòng)傳播到返回的 Future 中。

組合例子:網(wǎng)絡(luò)爬蟲

你已經(jīng)看到了怎樣使用 Future 組合子的例子,不過也許意猶未盡。假設(shè)你有一個(gè)簡單的互聯(lián)網(wǎng)模型。該互聯(lián)網(wǎng)中只有 HTML 網(wǎng)頁和圖片,其中頁面可以鏈接到圖像和其他網(wǎng)頁。你可以獲取一個(gè)頁面或圖像,但 API 是異步的。這個(gè)假設(shè)的 API 成這些“可獲取”的數(shù)據(jù)為資源:

import com.twitter.util.{Try,Future,Promise}

// a fetchable thing
trait Resource {
  def imageLinks(): Seq[String]
  def links(): Seq[String]
}

// HTML pages can link to Imgs and to other HTML pages.
class HTMLPage(val i: Seq[String], val l: Seq[String]) extends Resource {
  def imageLinks() = i
  def links = l
}

// IMGs don't actually link to anything else
class Img() extends Resource {
  def imageLinks() = Seq()
  def links() = Seq()
}

// profile.html links to gallery.html and has an image link to portrait.jpg
val profile = new HTMLPage(Seq("portrait.jpg"), Seq("gallery.html"))
val portrait = new Img

// gallery.html links to profile.html and two images
val gallery = new HTMLPage(Seq("kitten.jpg", "puppy.jpg"), Seq("profile.html"))
val kitten = new Img
val puppy = new Img

val internet = Map(
  "profile.html" -> profile,
  "gallery.html" -> gallery,
  "portrait.jpg" -> portrait,
  "kitten.jpg" -> kitten,
  "puppy.jpg" -> puppy
)

// fetch(url) attempts to fetch a resource from our fake internet.
// Its returned Future might contain a Resource or an exception
def fetch(url: String) = { new Promise(Try(internet(url))) }

順序組合

假設(shè)給定一個(gè)頁面 URL,而你希望獲取該頁面的第一個(gè)圖。也許你正在做一個(gè)網(wǎng)站,在上面用戶可以發(fā)布有趣的網(wǎng)頁鏈接。為了幫助其他用戶決定某個(gè)鏈接是否值得追蹤,你打算顯示那個(gè)鏈接中第一張圖像的縮略圖。

即使你不知道組合子,你仍然可以寫一個(gè)縮略圖獲取函數(shù):

def getThumbnail(url: String): Future[Resource]={
  val returnVal = new Promise[Resource]

  fetch(url) onSuccess { page => // callback for successful page fetch
    fetch(page.imageLinks()(0)) onSuccess { p => // callback for successful img fetch
      returnVal.setValue(p)
    } onFailure { exc => // callback for failed img fetch
      returnVal.setException(exc)
    }
  } onFailure { exc => // callback for failed page fetch
    returnVal.setException(exc)
  }
  returnVal
}

這個(gè)版本的函數(shù)能工作。它的大部分內(nèi)容用來解析 Future,然后把他們的內(nèi)容傳給另一個(gè) Future。

我們希望得到一個(gè)頁面,然后從該頁面獲得一個(gè)圖像。如果你想獲得 A,然后再獲得 B 的,這通常意味著順序組合。由于 B 是異步的,所以需要使用 flatMap:

def getThumbnail(url: String): Future[Resource] =
  fetch(url) flatMap { page => fetch(page.imageLinks()(0)) }

通過并發(fā)組合

抓取頁面的第一個(gè)圖片是好的,但也許我們應(yīng)該獲取所有圖片,并讓用戶自己進(jìn)行選擇。我們可以使用for循環(huán)一個(gè)個(gè)地抓取,但這需要很長時(shí)間;所以我們想并行獲取它們。如果你想的事情“并行”發(fā)生,這通常意味著并發(fā)組合。所以我們使用 Future.collect 的提取所有的圖像:

def getThumbnails(url:String): Future[Seq[Resource]] =
  fetch(url) flatMap { page =>
    Future.collect(
      page.imageLinks map { u => fetch(u) }
    )
  }

如果這對(duì)你有意義,那太好了。你可能會(huì)看不懂這行代碼 page.imageLinks map { u => fetch(u) }:它使用 map 和 map 后的函數(shù)返回一個(gè) Future。當(dāng)接下來的事情是返回一個(gè)Future時(shí),我們不是應(yīng)該使用flatMap嗎?但是請(qǐng)注意,在 map 前的不是一個(gè) Future;它是一個(gè)集合。collection map function 返回一個(gè)集合;我們使用 Future.collect 收集 Future 的集合到一個(gè) Future 中。

并發(fā) + 遞歸

除了頁面中的圖片以外,我們可能會(huì)想獲取它鏈接的其他頁面。通過遞歸我們可以構(gòu)建一個(gè)簡單的網(wǎng)絡(luò)爬蟲。

// Return
def crawl(url: String): Future[Seq[Resource]] =
  fetch(url) flatMap { page =>
    Future.collect(
      page.links map { u => crawl(u) }
    ) map { pps => pps.flatten }
}

crawl("profile.html")
   ...hangs REPL, infinite loop...
Ctrl-C
Execution interrupted by signal.

scala>
// She's gone rogue, captain! Have to take her out!
// Calling Thread.stop on runaway Thread[Thread-93,5,main] with offending code:
// scala> crawl("profile.html")

在實(shí)踐中,這個(gè)網(wǎng)絡(luò)爬蟲不是很有用:首先我們沒有告訴它何時(shí)停止爬行;其次即使資源剛剛被獲取過,它仍然會(huì)不厭其煩地重新獲取。

服務(wù)

一個(gè) Finagle 服務(wù)用來處理 RPC,讀取請(qǐng)求并給予回復(fù)的。服務(wù)是針對(duì)請(qǐng)求和回應(yīng)的一個(gè)函數(shù)Req => Future[Rep]

abstract class Service[-Req, +Rep] extends (Req => Future[Rep])

http://wiki.jikexueyuan.com/project/scala/images/finagle_client_server.png" alt="" />

在服務(wù)中,我們要同時(shí)定義客戶端和服務(wù)器。

一個(gè) Finagle 客戶端“引入”一個(gè)網(wǎng)絡(luò)服務(wù)。從概念上講,F(xiàn)inagle 客戶端由兩部分組成

  • 一個(gè)使用服務(wù)的函數(shù):分發(fā)一個(gè) Req 并處理 Future[Rep]
  • 配置怎樣分發(fā)這些請(qǐng)求;例如,作為 HTTP 請(qǐng)求發(fā)送到 api.twitter.com 的 80 端口

同樣,F(xiàn)inagle 服務(wù)端“輸出”網(wǎng)絡(luò)服務(wù)。一個(gè)服務(wù)端由兩個(gè)部分組成:

  • 一個(gè)實(shí)現(xiàn)服務(wù)的函數(shù):傳入一個(gè) Req 并返回一個(gè) Future[Rep]
  • 配置如何“監(jiān)聽”輸入的 Reqs;例如,在 80 端口的 HTTP 請(qǐng)求。

這種設(shè)計(jì)分離了服務(wù)的“業(yè)務(wù)邏輯”和數(shù)據(jù)如何在網(wǎng)絡(luò)中流動(dòng)的配置。

http://wiki.jikexueyuan.com/project/scala/images/finagle_filter.png" alt="" />

我們也談?wù)?Finagle “過濾器”。過濾器在服務(wù)之間,修改流經(jīng)它的數(shù)據(jù)。過濾器可以很好地和服務(wù)組合在一起。例如,如果你有一個(gè)速率限制過濾器和一個(gè) tweet 服務(wù),你可以把它們組合在一起形成有速率限制的 tweet 服務(wù)。

客戶端

一個(gè) Finagle 客戶端“引入”一個(gè)網(wǎng)絡(luò)服務(wù)。它有一些配置來設(shè)定如何在網(wǎng)絡(luò)上發(fā)送數(shù)據(jù)。一個(gè)簡單的 HTTP 客戶端可能看起來像這樣:

import org.jboss.netty.handler.codec.http.{DefaultHttpRequest, HttpRequest, HttpResponse, HttpVersion, HttpMethod}
import com.twitter.finagle.Service
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.http.Http

// Don't worry, we discuss this magic "ClientBuilder" later
val client: Service[HttpRequest, HttpResponse] = ClientBuilder()
  .codec(Http())
  .hosts("twitter.com:80") // If >1 host, client does simple load-balancing
  .hostConnectionLimit(1)
  .build()

val req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/")

val f = client(req) // Client, send the request

// Handle the response:
f onSuccess { res =>
  println("got response", res)
} onFailure { exc =>
  println("failed :-(", exc)
}

服務(wù)端

一個(gè)服務(wù)端按服務(wù)進(jìn)行定義,并配置如何“監(jiān)聽”網(wǎng)絡(luò)上的請(qǐng)求。一個(gè)簡單的 HTTP 服務(wù)端可能看起來像這樣:

import com.twitter.finagle.Service
import com.twitter.finagle.http.Http
import com.twitter.util.Future
import org.jboss.netty.handler.codec.http.{DefaultHttpResponse, HttpVersion, HttpResponseStatus, HttpRequest, HttpResponse}
import java.net.{SocketAddress, InetSocketAddress}
import com.twitter.finagle.builder.{Server, ServerBuilder}
import com.twitter.finagle.builder.ServerBuilder

// Define our service: OK response for root, 404 for other paths
val rootService = new Service[HttpRequest, HttpResponse] {
  def apply(request: HttpRequest) = {
    val r = request.getUri match {
      case "/" => new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
      case _ => new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND)
    }
    Future.value(r)
  }
}

// Serve our service on a port
val address: SocketAddress = new InetSocketAddress(10000)
val server: Server = ServerBuilder()
  .codec(Http())
  .bindTo(address)
  .name("HttpServer")
  .build(rootService)

這個(gè)name是我們強(qiáng)加的,雖然沒有在例子中使用它,但這個(gè)字段對(duì)分析和調(diào)試是很有用的。

過濾器

過濾器改造服務(wù),它們可以提供通用的服務(wù)功能。例如你有幾個(gè)服務(wù)需要支持速率限制,這時(shí)可以寫一個(gè)限速過濾器并將其應(yīng)用于所有的服務(wù)就解決問題了。過濾器也可以將服務(wù)分解成不同的階段。

一個(gè)簡單的代理可能看起來像這樣:

class MyService(client: Service[..]) extends Service[HttpRequest, HttpResponse]
{
  def apply(request: HttpRequest) = {
    client(rewriteReq(request)) map { res =>
      rewriteRes(res)
    }
  }
}

其中 rewriteReq 和 rewriteRes 可以提供協(xié)議翻譯,例如。

abstract class Filter[-ReqIn, +RepOut, +ReqOut, -RepIn]
  extends ((ReqIn, Service[ReqOut, RepIn]) => Future[RepOut])

通過圖示可以更清晰地看出其類型:

    ((ReqIn, Service[ReqOut, RepIn])
         => Future[RepOut])

          (*   Service   *)
[ReqIn -> (ReqOut -> RepIn) -> RepOut]

下面的例子展示了怎樣通過過濾器來提供服務(wù)超時(shí)機(jī)制。

class TimeoutFilter[Req, Rep](
  timeout: Duration,
  exception: RequestTimeoutException,
  timer: Timer)
  extends Filter[Req, Rep, Req, Rep]
{
  def this(timeout: Duration, timer: Timer) =
    this(timeout, new IndividualRequestTimeoutException(timeout), timer)

  def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
    val res = service(request)

    res.within(timer, timeout) rescue {
      case _: java.util.concurrent.TimeoutException =>
        res.cancel()
        Trace.record(TimeoutFilter.TimeoutAnnotation)
        Future.exception(exception)
    }
  }
}

這個(gè)例子展示了怎樣(通過認(rèn)證服務(wù))提供身份驗(yàn)證來將 Service[AuthHttpReq, HttpRep] 轉(zhuǎn)換為 Service[HttpReq, HttpRep]。

class RequireAuthentication(authService: AuthService)
  extends Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep] {
  def apply(
    req: HttpReq,
    service: Service[AuthHttpReq, HttpRep]
  ) = {
    authService.auth(req) flatMap {
      case AuthResult(AuthResultCode.OK, Some(passport), _) =>
        service(AuthHttpReq(req, passport))
      case ar: AuthResult =>
        Future.exception(
          new RequestUnauthenticated(ar.resultCode))
    }
  }
}

這樣使用過濾器是有好處的。它可以幫助你將“身份驗(yàn)證邏輯”固定在一個(gè)地方。擁有一個(gè)獨(dú)立的類型執(zhí)行請(qǐng)求授權(quán),會(huì)使追查程序安全問題變得更容易。

過濾器可以使用 andThen 組合在一起。傳入一個(gè) Service 參數(shù)給 andThen 將創(chuàng)建一個(gè)(添加了過濾功能)的Service(類型用來做說明)。

val authFilter: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep]
val timeoutfilter[Req, Rep]: Filter[Req, Rep, Req, Rep]
val serviceRequiringAuth: Service[AuthHttpReq, HttpRep]

val authenticateAndTimedOut: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep] =
  authFilter andThen timeoutFilter

val authenticatedTimedOutService: Service[HttpReq, HttpRep] =
  authenticateAndTimedOut andThen serviceRequiringAuth

生成器(Builder)

生成器把所有組件組合在一起。一個(gè) ClientBuilder 對(duì)給定的一組參數(shù)生成一個(gè) Service,而一個(gè) ServerBuilder 獲取一個(gè) Service 的實(shí)例,并調(diào)度傳入請(qǐng)求給它。為了確定 Service 的類型,我們必須提供一個(gè)編解碼器(Codec)。編解碼器提供底層協(xié)議的實(shí)現(xiàn)(如 HTTP,thrift,memcached)。這兩個(gè) Builder 都有很多參數(shù),其中一些是必填的。

下面是一個(gè)調(diào)用 ClientBuilder 的例子(類型用來做說明)

val client: Service[HttpRequest, HttpResponse] = ClientBuilder()
  .codec(Http)
  .hosts("host1.twitter.com:10000,host2.twitter.com:10001,host3.twitter.com:10003")
  .hostConnectionLimit(1)
  .tcpConnectTimeout(1.second)
  .retries(2)
  .reportTo(new OstrichStatsReceiver)
  .build()

這將構(gòu)建一個(gè)客戶端在三個(gè)主機(jī)上進(jìn)行負(fù)載平衡,最多在每臺(tái)主機(jī)建立一個(gè)連接,并在兩次失敗嘗試后放棄。統(tǒng)計(jì)數(shù)據(jù)會(huì)報(bào)給 ostrich 。以下生成器選項(xiàng)是必須的(而且它們也被靜態(tài)強(qiáng)制填寫了):hosts 或 cluster, codec 和 hostConnectionLimit。

同樣的,你也可以使用一個(gè) ServerBuilder 來創(chuàng)建“監(jiān)聽”傳入請(qǐng)求的服務(wù):

val service = new MyService(...) // construct instance of your Finagle service
var filter = new MyFilter(...) // and maybe some filters
var filteredServce = filter andThen service
val  server = ServerBuilder()
  .bindTo(new InetSocketAddress(port))
  .codec(ThriftServerFramedCodec())
  .name("my filtered service")
//  .hostConnectionMaxLifeTime(5.minutes)
//  .readTimeout(2.minutes)
  .build(filteredService)

通過這些參數(shù)會(huì)生成一個(gè)Thrift服務(wù)器監(jiān)聽端口 port,并將請(qǐng)求分發(fā)給 service。如果我們?nèi)サ?hostConnectionMaxLifeTime 的注釋,每個(gè)連接將被允許留存長達(dá) 5 分鐘。如果我們?nèi)サ?readTimeout 的注釋,那么我們就需要在 2 分鐘之內(nèi)發(fā)送請(qǐng)求。ServerBuilder 必選項(xiàng)有:name, bindTo 和 codec。

不要阻塞(除非你用正確的方式)

Finagle 自動(dòng)操縱線程來保證服務(wù)順利運(yùn)行。但是,如果你的服務(wù)阻塞了,它會(huì)阻塞所有 Finagle 線程。

如果你的代碼調(diào)用了一個(gè)阻塞操作(apply 或 get),使用 Future 池來包裝阻塞代碼。阻塞操作將運(yùn)行在自己的線程池中,返回一個(gè) Future 來完成(或失敗)這個(gè)操作,并可以和其它 Future 組合。 如果你的代碼中使用 Future 的順序組合,不用擔(dān)心它會(huì)“阻塞”組合中的 Future。

[1]小心,還有其它“Future”類。不要將 com.twitter.util.Future 和scala.actor.Future 或 java.util.concurrent.Future 混淆起來!

[2] 如果你學(xué)習(xí)類型系統(tǒng)和/或分類理論,你會(huì)高興地發(fā)現(xiàn) flatMap 相當(dāng)于一元綁定。