鍍金池/ 教程/ Scala/ Searchbird
Java 與 Scala
Searchbird
更多的集合
高級類型
集合
基礎(chǔ)
使用 specs 測試
簡單構(gòu)建工具
Scala 并發(fā)編程
Finagle 介紹
類型和多態(tài)基礎(chǔ)
模式匹配與函數(shù)組合
基礎(chǔ)知識(續(xù))

Searchbird

我們要使用 Scala 和先前介紹的 Finagle 框架構(gòu)建一個簡單的分布式搜索引擎。

設(shè)計目標:大圖景

從廣義上講,我們的設(shè)計目標包括 抽象 (abstraction:在不知道其內(nèi)部的所有細節(jié)的前提下,利用該系統(tǒng)功能的能力)、 模塊化 (modularity:把系統(tǒng)分解為小而簡單的片段,從而更容易被理解和/或被更換的能力)和 擴展性 (scalability:用簡單直接的方法給系統(tǒng)擴容的能力)。

我們要描述的系統(tǒng)有三個部分: (1) 客戶端 發(fā)出請求,(2) 服務(wù)端 接收請求并應(yīng)答,和(3) 傳送 機制來這些通信包裝起來。通常情況下,客戶端和服務(wù)器位于不同的機器上,通過網(wǎng)絡(luò)上的一個特定的端口進行通信,但在這個例子中,它們將運行在同一臺機器上(而且仍然使用端口進行通信) 。在我們的例子中,客戶端和服務(wù)器將用 Scala 編寫,傳送協(xié)議將使用 Thrift 處理。本教程的主要目的是展示一個簡單的具有良好可擴展性的服務(wù)器和客戶端。

探索默認的引導(dǎo)程序項目

首先,使用 scala-bootstrapper 創(chuàng)建一個骨架項目( “ Searchbird ” )。這將創(chuàng)建一個簡單的基于 Finagle 和 key-value 內(nèi)存存儲的 Scala 服務(wù)。我們將擴展這個工程以支持搜索值,并進而支持多進程多個內(nèi)存存儲的搜索。

$ mkdir searchbird ; cd searchbird
$ scala-bootstrapper searchbird
writing build.sbt
writing config/development.scala
writing config/production.scala
writing config/staging.scala
writing config/test.scala
writing console
writing Gemfile
writing project/plugins.sbt
writing README.md
writing sbt
writing src/main/scala/com/twitter/searchbird/SearchbirdConsoleClient.scala
writing src/main/scala/com/twitter/searchbird/SearchbirdServiceImpl.scala
writing src/main/scala/com/twitter/searchbird/config/SearchbirdServiceConfig.scala
writing src/main/scala/com/twitter/searchbird/Main.scala
writing src/main/thrift/searchbird.thrift
writing src/scripts/searchbird.sh
writing src/scripts/config.sh
writing src/scripts/devel.sh
writing src/scripts/server.sh
writing src/scripts/service.sh
writing src/test/scala/com/twitter/searchbird/AbstractSpec.scala
writing src/test/scala/com/twitter/searchbird/SearchbirdServiceSpec.scala
writing TUTORIAL.md

首先,來看下 scala-bootstrapper 為我們創(chuàng)建的默認項目。這是一個模板。雖然最終將替換它的大部分內(nèi)容,不過作為支架它還是很方便的。它定義了一個簡單(但完整)的 key-value 存儲,并包含了配置、thrift 接口、統(tǒng)計輸出和日志記錄。

在我們看代碼之前,先運行一個客戶端和服務(wù)器,看看它是如何工作的。這里是我們構(gòu)建的:

http://wiki.jikexueyuan.com/project/scala/images/searchbird.png" alt="" />

這里是我們的服務(wù)輸出的接口。由于 Searchbird 服務(wù)是一個 Thrift 服務(wù)(和我們大部分服務(wù)一樣),因而其外部接口使用 Thrift IDL(“接口描述語言”)定義。

src/main/thrift/searchbird.thrift

service SearchbirdService {
  string get(1: string key) throws(1: SearchbirdException ex)

  void put(1: string key, 2: string value)
}

這是非常直觀的:我們的服務(wù) SearchbirdService 輸出兩個 RPC 方法 get 和 put 。他們組成了一個到 key-value 存儲的簡單接口。

現(xiàn)在,讓我們運行默認的服務(wù),啟動客戶端連接到這個服務(wù),并通過這個接口來探索他們。打開兩個窗口,一個用于服務(wù)器,一個用于客戶端。

在第一個窗口中,用交互模式啟動 SBT(在命令行中運行 ./sbt [1]),然后構(gòu)建和運行項目內(nèi) SBT。這會運行 Main.scala 定義的 主 進程。

$ ./sbt
...
> compile
> run -f config/development.scala
...
[info] Running com.twitter.searchbird.Main -f config/development.scala

配置文件 (development.scala) 實例化一個新的服務(wù),并監(jiān)聽 9999 端口??蛻舳丝梢赃B接到 9999 端口使用此服務(wù)。

現(xiàn)在,我們將使用 控制臺 shell腳本初始化和運行一個客戶端實例,即 SearchbirdConsoleClient 實例 (SearchbirdConsoleClient.scala) 。在另一個窗口中運行此腳本:

$ ./console 127.0.0.1 9999
[info] Running com.twitter.searchbird.SearchbirdConsoleClient 127.0.0.1 9999
'client' is bound to your thrift client.

finagle-client> 

客戶端對象 client 現(xiàn)在連接到本地計算機上的 9999 端口,并可以跟服務(wù)交互了。接下來我們發(fā)送一些請求:

scala> client.put("marius", "Marius Eriksen")
res0: ...

scala> client.put("stevej", "Steve Jenson")
res1: ...

scala> client.get("marius")
res2: com.twitter.util.Future[String] = ...

scala> client.get("marius").get()
res3: String = Marius Eriksen

(第二個 get() 調(diào)用解析 client.get() 返回的 Future 類型值,阻塞直到該值準備好。)

該服務(wù)器還輸出運行統(tǒng)計(配置文件中指定這些信息在 9900 端口)。這不僅方便對各個服務(wù)器進行檢查,也利于聚集全局的服務(wù)統(tǒng)計(以機器可讀的 JSON 接口)。打開第三個窗口來查看這些統(tǒng)計:

$ curl localhost:9900/stats.txt
counters:
  Searchbird/connects: 1
  Searchbird/received_bytes: 264
  Searchbird/requests: 3
  Searchbird/sent_bytes: 128
  Searchbird/success: 3
  jvm_gc_ConcurrentMarkSweep_cycles: 1
  jvm_gc_ConcurrentMarkSweep_msec: 15
  jvm_gc_ParNew_cycles: 24
  jvm_gc_ParNew_msec: 191
  jvm_gc_cycles: 25
  jvm_gc_msec: 206
gauges:
  Searchbird/connections: 1
  Searchbird/pending: 0
  jvm_fd_count: 135
  jvm_fd_limit: 10240
  jvm_heap_committed: 85000192
  jvm_heap_max: 530186240
  jvm_heap_used: 54778640
  jvm_nonheap_committed: 89657344
  jvm_nonheap_max: 136314880
  jvm_nonheap_used: 66238144
  jvm_num_cpus: 4
  jvm_post_gc_CMS_Old_Gen_used: 36490088
  jvm_post_gc_CMS_Perm_Gen_used: 54718880
  jvm_post_gc_Par_Eden_Space_used: 0
  jvm_post_gc_Par_Survivor_Space_used: 1315280
  jvm_post_gc_used: 92524248
  jvm_start_time: 1345072684280
  jvm_thread_count: 16
  jvm_thread_daemon_count: 7
  jvm_thread_peak_count: 16
  jvm_uptime: 1671792
labels:
metrics:
  Searchbird/handletime_us: (average=9598, count=4, maximum=19138, minimum=637, p25=637, p50=4265, p75=14175, p90=19138, p95=19138, p99=19138, p999=19138, p9999=19138, sum=38393)
  Searchbird/request_latency_ms: (average=4, count=3, maximum=9, minimum=0, p25=0, p50=5, p75=9, p90=9, p95=9, p99=9, p999=9, p9999=9, sum=14)

除了我們自己的服務(wù)統(tǒng)計信息以外,還有一些通用的 JVM 統(tǒng)計。

現(xiàn)在,讓我們來看看配置、服務(wù)器和客戶端的實現(xiàn)代碼。

…/config/SearchbirdServiceConfig.scala

配置是一個 Scala 的特質(zhì),有一個方法 apply: RuntimeEnvironment => T 來創(chuàng)建一些 T 。在這個意義上,配置是“工廠” 。在運行時,配置文件(通過使用Scala編譯器庫)被取值為一個腳本,并產(chǎn)生一個配置對象。 RuntimeEnvironment 是一個提供各種運行參數(shù)(命令行標志, JVM 版本,編譯時間戳等)查詢的一個對象。

SearchbirdServiceConfig 類就是這樣一個配置類。它使用其默認值一起指定配置參數(shù)。 (Finagle 支持一個通用的跟蹤系統(tǒng),我們在本教程將不會介紹: Zipkin 一個集合/聚合軌跡的 分布式跟蹤系統(tǒng)。)

class SearchbirdServiceConfig extends ServerConfig[SearchbirdService.ThriftServer] {
  var thriftPort: Int = 9999
  var tracerFactory: Tracer.Factory = NullTracer.factory

  def apply(runtime: RuntimeEnvironment) = new SearchbirdServiceImpl(this)
}

在我們的例子中,我們要創(chuàng)建一個 SearchbirdService.ThriftServer。這是由 thrift 代碼生成器生成的服務(wù)器類型[2]。

…/Main.scala

在 SBT 控制臺中鍵入“run”調(diào)用 main ,這將配置和初始化服務(wù)器。它讀取配置(在 development.scala 中指定,并會作為參數(shù)傳給“run”),創(chuàng)建 SearchbirdService.ThriftServer ,并啟動它。 RuntimeEnvironment.loadRuntimeConfig 執(zhí)行配置賦值,并把自身作為一個參數(shù)來調(diào)用 apply [3]

object Main {
  private val log = Logger.get(getClass)

  def main(args: Array[String]) {
    val runtime = RuntimeEnvironment(this, args)
    val server = runtime.loadRuntimeConfig[SearchbirdService.ThriftServer]
    try {
      log.info("Starting SearchbirdService")
      server.start()
    } catch {
      case e: Exception =>
        log.error(e, "Failed starting SearchbirdService, exiting")
        ServiceTracker.shutdown()
        System.exit(1)
    }
  }
}

…/SearchbirdServiceImpl.scala

這是實質(zhì)的服務(wù):我們用自己的實現(xiàn)擴展 SearchbirdService.ThriftServer ?;貞浺幌?thrift 為我們生成的 SearchbirdService.ThriftServer 。它為每一個 thrift 方法生成一個 Scala 方法。到目前為止,在我們的例子中生成的接口是:

trait SearchbirdService {
  def put(key: String, value: String): Future[Void]
  def get(key: String): Future[String]
}

返回值是 Future[Value] 而不是直接返回值,可以推遲它們的計算(finagle 的文檔有 Future 更多的細節(jié))。對本教程的目的來說,你唯一需要知道的有關(guān) Future 的知識點是,可以通過 get() 獲取其值。

scala-bootstrapper 默認實現(xiàn)的 key-value 存儲很簡單:它提供了一個通過 get 和 put 訪問的 數(shù)據(jù)庫 數(shù)據(jù)結(jié)構(gòu)。

class SearchbirdServiceImpl(config: SearchbirdServiceConfig) extends SearchbirdService.ThriftServer {
  val serverName = "Searchbird"
  val thriftPort = config.thriftPort
  override val tracerFactory = config.tracerFactory

  val database = new mutable.HashMap[String, String]()

  def get(key: String) = {
    database.get(key) match {
      case None =>
        log.debug("get %s: miss", key)
        Future.exception(SearchbirdException("No such key"))
      case Some(value) =>
        log.debug("get %s: hit", key)
        Future(value)
    }
  }

  def put(key: String, value: String) = {
    log.debug("put %s", key)
    database(key) = value
    Future.Unit
  }

  def shutdown() = {
    super.shutdown(0.seconds)
  }
}

其結(jié)果是構(gòu)建在 Scala HashMap 上的一個簡單 thrift 接口。

一個簡單的搜索引擎

現(xiàn)在,我們將擴展現(xiàn)有的例子,來創(chuàng)建一個簡單的搜索引擎。然后,我們將進一步擴展它成為由多個分片組成的 分布式 搜索引擎,使我們能夠適應(yīng)比單臺機器內(nèi)存更大的語料庫。

為了簡單起見,我們將最小化擴展目前的 thrift 服務(wù),以支持搜索操作。使用模型是用 put 把文件加入搜索引擎,其中每個文件包含了一系列的記號(詞),那么我們就可以輸入一串記號,然后搜索會返回包含這個串中所有記號的所有文件。該體系結(jié)構(gòu)是與前面的例子相同,但增加了一個新的 @search@ 調(diào)用。

http://wiki.jikexueyuan.com/project/scala/images/search.png" alt="" />

要實現(xiàn)這樣一個搜索引擎需要修改以下兩個文件:

src/main/thrift/searchbird.thrift

service SearchbirdService {
  string get(1: string key) throws(1: SearchbirdException ex)

  void put(1: string key, 2: string value)

  list<string> search(1: string query)
}

我們增加了一個 search 方法來搜索當前哈希表,返回其值與查詢匹配的鍵列表。實現(xiàn)也很簡單直觀:

…/SearchbirdServiceImpl.scala

大部分修改都在這個文件中。

現(xiàn)在的 數(shù)據(jù)庫 HashMap 保存一個正向索引來持有到文檔的鍵映射。我們重命名它為 forward 并增加一個 倒排(reverse) 索引(映射記號到所有包含該記號的文件)。所以在 SearchbirdServiceImpl.scala 中,更換 database 定義:

val forward = new mutable.HashMap[String, String]
  with mutable.SynchronizedMap[String, String]
val reverse = new mutable.HashMap[String, Set[String]]
  with mutable.SynchronizedMap[String, Set[String]]

在 get 調(diào)用中,使用 forward 替換 數(shù)據(jù)庫 即可,在其他方面 get 保持不變(僅執(zhí)行正向查找)。不過 put 還需要改變:我們還需要為文件中的每個令牌填充反向索引,把文件的鍵附加到令牌關(guān)聯(lián)的列表中。用下面的代碼替換 put 調(diào)用。給定一個特定的搜索令牌,我們現(xiàn)在可以使用反向映射來查找文件。

def put(key: String, value: String) = {
  log.debug("put %s", key)

  forward(key) = value

  // serialize updaters
  synchronized {
    value.split(" ").toSet foreach { token =>
      val current = reverse.getOrElse(token, Set())
      reverse(token) = current + key
    }
  }

  Future.Unit
}

需要注意的是(即使 HashMap 是線程安全的)同時只能有一個線程可以更新倒排索引,以確保對映射條目的 讀-修改-寫 是一個原子操作。 (這段代碼過于保守;在進行 檢索-修改-寫 操作時,它鎖定了整個映射,而不是鎖定單個條目。)。另外還要注意使用 Set 作為數(shù)據(jù)結(jié)構(gòu);這可以確保即使一個文件中兩次出現(xiàn)同樣的符號,它也只會被 foreach 循環(huán)處理一次。

這個實現(xiàn)仍然有一個問題,作為留給讀者的一個練習(xí):當我們用一個新文檔覆蓋的一個鍵的時候,我們誒有刪除任何倒排索引中引用的舊文件。

現(xiàn)在進入搜索引擎的核心:新的 search 方法。他應(yīng)該解析查詢,尋找匹配的文檔,然后對這些列表做相交操作。這將產(chǎn)生包含所有查詢中的標記的文件列表。在 Scala 中可以很直接地表達;添加這段代碼到 SearchbirdServiceImpl 類中:

def search(query: String) = Future.value {
  val tokens = query.split(" ")
  val hits = tokens map { token => reverse.getOrElse(token, Set()) }
  val intersected = hits reduceLeftOption { _ & _ } getOrElse Set()
  intersected.toList
}

在這段短短的代碼中有幾件事情是值得關(guān)注的。在構(gòu)建命中列表時,如果鍵( token )沒有被發(fā)現(xiàn), getOrElse 會返回其第二個參數(shù)(在這種情況下,一個空 Set )。我們使用 left-reduce 執(zhí)行實際的相交操作。特別是當 reduceLeftOption 發(fā)現(xiàn) hits 為空時將不會繼續(xù)嘗試執(zhí)行 reduce 操作。這使我們能夠提供一個默認值,而不是拋出一個異常。其實這相當于:

def search(query: String) = Future.value {
  val tokens = query.split(" ")
  val hits = tokens map { token => reverse.getOrElse(token, Set()) }
  if (hits.isEmpty)
    Nil
  else
    hits reduceLeft { _ & _ } toList
}

使用哪種方式大多是個人喜好的問題,雖然函數(shù)式風(fēng)格往往會避開帶有合理默認值的條件語句。

現(xiàn)在,我們可以嘗試在控制臺中實驗我們新的實現(xiàn)。重啟服務(wù)器:

$ ./sbt
...
> compile
> run -f config/development.scala
...
[info] Running com.twitter.searchbird.Main -f config/development.scala

然后再從 searchbird 目錄,啟動客戶端:

$ ./console 127.0.0.1 9999
...
[info] Running com.twitter.searchbird.SearchbirdConsoleClient 127.0.0.1 9999
'client' is bound to your thrift client.

finagle-client> 

粘貼以下說明到控制臺:

client.put("basics", " values functions classes methods inheritance try catch finally expression oriented")
client.put("basics", " case classes objects packages apply update functions are objects (uniform access principle) pattern")
client.put("collections", " lists maps functional combinators (map foreach filter zip")
client.put("pattern", " more functions! partialfunctions more pattern")
client.put("type", " basic types and type polymorphism type inference variance bounds")
client.put("advanced", " advanced types view bounds higher kinded types recursive types structural")
client.put("simple", " all about sbt the standard scala build")
client.put("more", " tour of the scala collections")
client.put("testing", " write tests with specs a bdd testing framework for")
client.put("concurrency", " runnable callable threads futures twitter")
client.put("java", " java interop using scala from")
client.put("searchbird", " building a distributed search engine using")

現(xiàn)在,我們可以執(zhí)行一些搜索,返回包含搜索詞的文件的鍵。

> client.search("functions").get()
res12: Seq[String] = ArrayBuffer(basics)

> client.search("java").get()
res13: Seq[String] = ArrayBuffer(java)

> client.search("java scala").get()
res14: Seq[String] = ArrayBuffer(java)

> client.search("functional").get()
res15: Seq[String] = ArrayBuffer(collections)

> client.search("sbt").get()
res16: Seq[String] = ArrayBuffer(simple)

> client.search("types").get()
res17: Seq[String] = ArrayBuffer(type, advanced)

回想一下,如果調(diào)用返回一個 Future ,我們必須使用一個阻塞的 get() 來獲取其中包含的值。我們可以使用 Future.collect 命令來創(chuàng)建多個并發(fā)請求,并等待所有請求成功返回:

> import com.twitter.util.Future
...
> Future.collect(Seq(
    client.search("types"),
    client.search("sbt"),
    client.search("functional")
  )).get()
res18: Seq[Seq[String]] = ArrayBuffer(ArrayBuffer(type, advanced), ArrayBuffer(simple), ArrayBuffer(collections))

分發(fā)我們的服務(wù)

單臺機器上一個簡單的內(nèi)存搜索引擎將無法搜索超過內(nèi)存大小的語料庫?,F(xiàn)在,我們要大膽改進,用一個簡單的分片計劃來構(gòu)建分布式節(jié)點。下面是框圖:

http://wiki.jikexueyuan.com/project/scala/images/s1.png" alt="" />

抽象

為了幫助我們的工作,我們會先介紹另一個抽象索引來解耦 SearchbirdService 對索引實現(xiàn)的依賴。這是一個直觀的重構(gòu)。我們首先添加一個索引文件到構(gòu)建 (創(chuàng)建文件 searchbird/src/main/scala/com/twitter/searchbird/Index.scala ):

…/Index.scala

package com.twitter.searchbird

import scala.collection.mutable
import com.twitter.util._
import com.twitter.conversions.time._
import com.twitter.logging.Logger
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.thrift.ThriftClientFramedCodec

trait Index {
  def get(key: String): Future[String]
  def put(key: String, value: String): Future[Unit]
  def search(key: String): Future[List[String]]
}

class ResidentIndex extends Index {
  val log = Logger.get(getClass)

  val forward = new mutable.HashMap[String, String]
    with mutable.SynchronizedMap[String, String]
  val reverse = new mutable.HashMap[String, Set[String]]
    with mutable.SynchronizedMap[String, Set[String]]

  def get(key: String) = {
    forward.get(key) match {
      case None =>
        log.debug("get %s: miss", key)
        Future.exception(SearchbirdException("No such key"))
      case Some(value) =>
        log.debug("get %s: hit", key)
        Future(value)
    }
  }

  def put(key: String, value: String) = {
    log.debug("put %s", key)

    forward(key) = value

    // admit only one updater.
    synchronized {
      (Set() ++ value.split(" ")) foreach { token =>
        val current = reverse.get(token) getOrElse Set()
        reverse(token) = current + key
      }
    }

    Future.Unit
  }

  def search(query: String) = Future.value {
    val tokens = query.split(" ")
    val hits = tokens map { token => reverse.getOrElse(token, Set()) }
    val intersected = hits reduceLeftOption { _ & _ } getOrElse Set()
    intersected.toList
  }
}

現(xiàn)在,我們把 thrift 服務(wù)轉(zhuǎn)換成一個簡單的調(diào)度機制:為每一個索引實例提供一個 thrift 接口。這是一個強大的抽象,因為它分離了索引實現(xiàn)和服務(wù)實現(xiàn)。服務(wù)不再知道索引的任何細節(jié);索引可以是本地的或遠程的,甚至可能是許多索引的組合,但服務(wù)并不關(guān)心,索引實現(xiàn)可能會更改但是不用修改服務(wù)。

將 SearchbirdServiceImpl 類定義更換為以下(簡單得多)的代碼(其中不再包含索引實現(xiàn)細節(jié))。注意初始化服務(wù)器現(xiàn)在需要第二個參數(shù) Index 。

…/SearchbirdServiceImpl.scala

class SearchbirdServiceImpl(config: SearchbirdServiceConfig, index: Index) extends SearchbirdService.ThriftServer {
  val serverName = "Searchbird"
  val thriftPort = config.thriftPort

  def get(key: String) = index.get(key)
  def put(key: String, value: String) =
    index.put(key, value) flatMap { _ => Future.Unit }
  def search(query: String) = index.search(query)

  def shutdown() = {
    super.shutdown(0.seconds)
  }
}

…/config/SearchbirdServiceConfig.scala

相應(yīng)地更新 SearchbirdServiceConfig 的 apply 調(diào)用:

class SearchbirdServiceConfig extends ServerConfig[SearchbirdService.ThriftServer] {
  var thriftPort: Int = 9999
  var tracerFactory: Tracer.Factory = NullTracer.factory

  def apply(runtime: RuntimeEnvironment) = new SearchbirdServiceImpl(this, new ResidentIndex)
}

我們將建立一個簡單的分布式系統(tǒng),一個主節(jié)點組織查詢其子節(jié)點。為了實現(xiàn)這一目標,我們將需要兩個新的 Index 類型。一個代表遠程索引,另一種是其他多個 Index 實例的組合索引。這樣我們的服務(wù)就可以實例化多個遠程索引的復(fù)合索引來構(gòu)建分布式索引。請注意這兩個 Index 類型具有相同的接口,所以服務(wù)器不需要知道它們所連接的索引是遠程的還是復(fù)合的。

…/Index.scala

在 Index.scala 中定義了 CompositeIndex :

class CompositeIndex(indices: Seq[Index]) extends Index {
  require(!indices.isEmpty)

  def get(key: String) = {
    val queries = indices.map { idx =>
      idx.get(key) map { r => Some(r) } handle { case e => None }
    }

    Future.collect(queries) flatMap { results =>
      results.find { _.isDefined } map { _.get } match {
        case Some(v) => Future.value(v)
        case None => Future.exception(SearchbirdException("No such key"))
      }
    }
  }

  def put(key: String, value: String) =
    Future.exception(SearchbirdException("put() not supported by CompositeIndex"))

  def search(query: String) = {
    val queries = indices.map { _.search(query) rescue { case _=> Future.value(Nil) } }
    Future.collect(queries) map { results => (Set() ++ results.flatten) toList }
  }
}

組合索引構(gòu)建在一組相關(guān) Index 實例的基礎(chǔ)上。注意它并不關(guān)心這些實例實際上是如何實現(xiàn)的。這種組合類型在構(gòu)建不同查詢機制的時候具有極大的靈活性。我們沒有定義拆分機制,所以復(fù)合索引不支持 put 操作。這些請求被直接交由子節(jié)點處理。 get 的實現(xiàn)是查詢所有子節(jié)點,并提取第一個成功的結(jié)果。如果沒有成功結(jié)果的話,則拋出一個異常。注意因為沒有結(jié)果是通過拋出一個異常表示的,所以我們 處理Future ,是將任何異常轉(zhuǎn)換成 None 。在實際系統(tǒng)中,我們很可能會為遺漏值填入適當?shù)腻e誤碼,而不是使用異常。異常在構(gòu)建原型時是方便和適宜的,但不能很好地組合。為了把真正的例外和遺漏值區(qū)分開,必須要檢查異常本身。相反,把這種區(qū)別直接嵌入在返回值的類型中是更好的風(fēng)格。

search 像以前一樣工作。和提取第一個結(jié)果不同,我們把它們組合起來,通過使用 Set 確保其唯一性。

RemoteIndex 提供了到遠程服務(wù)器的一個 Index 接口。

class RemoteIndex(hosts: String) extends Index {
  val transport = ClientBuilder()
    .name("remoteIndex")
    .hosts(hosts)
    .codec(ThriftClientFramedCodec())
    .hostConnectionLimit(1)
    .timeout(500.milliseconds)
    .build()
  val client = new SearchbirdService.FinagledClient(transport)

  def get(key: String) = client.get(key)
  def put(key: String, value: String) = client.put(key, value) map { _ => () }
  def search(query: String) = client.search(query) map { _.toList }
}

這樣就使用一些合理的默認值,調(diào)用代理,稍微調(diào)整類型,就構(gòu)造出一個 finagle thrift 客戶端。

全部放在一起

現(xiàn)在我們擁有了需要的所有功能。我們需要調(diào)整配置,以便能夠調(diào)用一個給定的節(jié)點,不管是主節(jié)點亦或是數(shù)據(jù)分片節(jié)點。為了做到這一點,我們將通過創(chuàng)建一個新的配置項來在系統(tǒng)中枚舉分片。我們還需要添加 Index 參數(shù)到我們的 SearchbirdServiceImpl 實例。然后,我們將使用命令行參數(shù)(還記得 Config 是如何做到的嗎)在這兩種模式中啟動服務(wù)器。

…/config/SearchbirdServiceConfig.scala

class SearchbirdServiceConfig extends ServerConfig[SearchbirdService.ThriftServer] {
  var thriftPort: Int = 9999
  var shards: Seq[String] = Seq()

  def apply(runtime: RuntimeEnvironment) = {
    val index = runtime.arguments.get("shard") match {
      case Some(arg) =>
        val which = arg.toInt
        if (which >= shards.size || which < 0)
          throw new Exception("invalid shard number %d".format(which))

        // override with the shard port
        val Array(_, port) = shards(which).split(":")
        thriftPort = port.toInt

        new ResidentIndex

      case None =>
        require(!shards.isEmpty)
        val remotes = shards map { new RemoteIndex(_) }
        new CompositeIndex(remotes)
    }

    new SearchbirdServiceImpl(this, index)
  }
}

現(xiàn)在,我們將調(diào)整配置:添加“分片”初始化到 SearchbirdServiceConfig 的初始化中(我們可以通過端口 9000 訪問分片 0,9001 訪問分片 1,依次類推)。

config/development.scala

new SearchbirdServiceConfig {
  // Add your own config here
  shards = Seq(
    "localhost:9000",
    "localhost:9001",
    "localhost:9002"
  )
  ...

注釋掉 admin.httpPort 的設(shè)置(我們不希望在同一臺機器上運行多個服務(wù),而不注釋的話這些服務(wù)都會試圖打開相同的端口):

  // admin.httpPort = 9900

現(xiàn)在,如果我們不帶任何參數(shù)調(diào)用我們的服務(wù)器程序,它會啟動一個主節(jié)點來和所有分片通信。如果我們指定一個分片參數(shù),它會在指定端口啟動一個分片服務(wù)器。

讓我們試試吧!我們將啟動 3 個服務(wù):2 個分片和 1 個主節(jié)點。首先編譯改動:

$ ./sbt
> compile
...
> exit

然后啟動三個服務(wù):

$ ./sbt 'run -f config/development.scala -D shard=0'
$ ./sbt 'run -f config/development.scala -D shard=1'
$ ./sbt 'run -f config/development.scala'

您可以在 3 個不同的窗口中分別運行,或在同一窗口開始依次逐個運行,等待其啟動后,只用 ctrl-z 懸掛這個命令,并使用 bg 將它放在后臺執(zhí)行。

然后,我們將通過控制臺與它們進行互動。首先,讓我們填充一些數(shù)據(jù)在兩個分片節(jié)點。從 searchbird 目錄運行:

$ ./console localhost 9000
...
> client.put("fromShardA", "a value from SHARD_A")
> client.put("hello", "world")
$ ./console localhost 9001
...
> client.put("fromShardB", "a value from SHARD_B")
> client.put("hello", "world again")

一旦完成就可以退出這些控制臺會話。現(xiàn)在通過主節(jié)點查詢我們的數(shù)據(jù)庫(9999 端口):

$ ./console localhost 9999
[info] Running com.twitter.searchbird.SearchbirdConsoleClient localhost 9999
'client' is bound to your thrift client.

finagle-client> client.get("hello").get()
res0: String = world

finagle-client> client.get("fromShardC").get()
SearchbirdException(No such key)
...

finagle-client> client.get("fromShardA").get()
res2: String = a value from SHARD_A

finagle-client> client.search("hello").get()
res3: Seq[String] = ArrayBuffer()

finagle-client> client.search("world").get()
res4: Seq[String] = ArrayBuffer(hello)

finagle-client> client.search("value").get()
res5: Seq[String] = ArrayBuffer(fromShardA, fromShardB)

這個設(shè)計有多個數(shù)據(jù)抽象,允許更加模塊化和可擴展的實現(xiàn):

  • ResidentIndex 數(shù)據(jù)結(jié)構(gòu)對網(wǎng)絡(luò)、服務(wù)器或客戶端一無所知。
  • CompositeIndex 對其索引構(gòu)成的底層數(shù)據(jù)結(jié)構(gòu)和組合方式一無所知;它只是簡單地把請求分配給他們。
  • 服務(wù)器相同的 search 接口(特質(zhì))允許服務(wù)器查詢其本地數(shù)據(jù)結(jié)構(gòu)(ResidentIndex) ,或分發(fā)到其他服務(wù)器(CompositeIndex) 查詢,而不需要知道這個區(qū)別,這是從調(diào)用隱藏的。
  • SearchbirdServiceImpl 和 Index 現(xiàn)在是相互獨立的模塊,這使服務(wù)實現(xiàn)變得簡單,同時實現(xiàn)了服務(wù)和其數(shù)據(jù)結(jié)構(gòu)之間的分離。
  • 這個設(shè)計靈活到允許一個或多個遠程索引運行在本地機器或遠程機器上。

這個實現(xiàn)的可能改進將包括:

  • 當前的實現(xiàn)將 put() 調(diào)用發(fā)送到所有節(jié)點。取而代之,我們可以使用一個哈希表,將 put()調(diào)用只發(fā)送到一個節(jié)點,而在所有節(jié)點之間分配存儲。
    • 但是值得注意的是,在這個策略下我們失去了冗余。我們怎樣在不需要完全復(fù)制的前提下保持一定的冗余度呢?
  • 當系統(tǒng)出錯時我們沒有做任何有趣的處理(例如我們沒有處理任何異常)。

[1]本地 ./sbt 腳本只是保證該 SBT 版本和我們知道的所有庫是一致的。

[2]target/gen-scala/com/twitter/searchbird/SearchbirdService.scala 。

[3] 更多信息見 Ostrich’s README 。

上一篇:Scala 并發(fā)編程下一篇:基礎(chǔ)