鍍金池/ 教程/ Java/ google Guava 包的 ListenableFuture 解析
不可變集合
排序: Guava 強(qiáng)大的”流暢風(fēng)格比較器”
強(qiáng)大的集合工具類:java.util.Collections 中未包含的集合工具
新集合類型
常見 Object 方法
I/O
前置條件
字符串處理:分割,連接,填充
散列
原生類型
數(shù)學(xué)運(yùn)算
使用和避免 null
Throwables:簡(jiǎn)化異常和錯(cuò)誤的傳播與檢查
google Guava 包的 ListenableFuture 解析
事件總線
緩存
函數(shù)式編程
區(qū)間
集合擴(kuò)展工具類
Google-Guava Concurrent 包里的 Service 框架淺析
google Guava 包的 reflection 解析

google Guava 包的 ListenableFuture 解析

并發(fā)編程是一個(gè)難題,但是一個(gè)強(qiáng)大而簡(jiǎn)單的抽象可以顯著的簡(jiǎn)化并發(fā)的編寫。出于這樣的考慮,Guava 定義了 ListenableFuture 接口并繼承了 JDK concurrent 包下的 Future 接口。

我們強(qiáng)烈地建議你在代碼中多使用 ListenableFuture 來(lái)代替 JDK 的 Future, 因?yàn)椋?/strong>

  • 大多數(shù) Futures 方法中需要它。
  • 轉(zhuǎn)到 ListenableFuture 編程比較容易。
  • Guava 提供的通用公共類封裝了公共的操作方方法,不需要提供 Future 和 ListenableFuture 的擴(kuò)展方法。

接口

傳統(tǒng) JDK 中的 Future 通過(guò)異步的方式計(jì)算返回結(jié)果:在多線程運(yùn)算中可能或者可能在沒有結(jié)束返回結(jié)果,F(xiàn)uture 是運(yùn)行中的多線程的一個(gè)引用句柄,確保在服務(wù)執(zhí)行返回一個(gè) Result。

ListenableFuture 可以允許你注冊(cè)回調(diào)方法(callbacks),在運(yùn)算(多線程執(zhí)行)完成的時(shí)候進(jìn)行調(diào)用, 或者在運(yùn)算(多線程執(zhí)行)完成后立即執(zhí)行。這樣簡(jiǎn)單的改進(jìn),使得可以明顯的支持更多的操作,這樣的功能在 JDK concurrent 中的 Future 是不支持的。

ListenableFuture 中的基礎(chǔ)方法是 addListener(Runnable, Executor), 該方法會(huì)在多線程運(yùn)算完的時(shí)候,指定的 Runnable 參數(shù)傳入的對(duì)象會(huì)被指定的 Executor 執(zhí)行。

添加回調(diào)(Callbacks)

多數(shù)用戶喜歡使用 Futures.addCallback(ListenableFuture<V>, FutureCallback<V>, Executor)的方式, 或者 另外一個(gè)版本version(譯者注:addCallback(ListenableFuture<V> future,FutureCallback<? super V> callback)),默認(rèn)是采用 MoreExecutors.sameThreadExecutor()線程池, 為了簡(jiǎn)化使用,Callback 采用輕量級(jí)的設(shè)計(jì). FutureCallback<V> 中實(shí)現(xiàn)了兩個(gè)方法:

  • onSuccess(V),在 Future 成功的時(shí)候執(zhí)行,根據(jù) Future 結(jié)果來(lái)判斷。
  • onFailure(Throwable), 在 Future 失敗的時(shí)候執(zhí)行,根據(jù) Future 結(jié)果來(lái)判斷。

ListenableFuture 的創(chuàng)建

對(duì)應(yīng) JDK 中的 ExecutorService.submit(Callable) 提交多線程異步運(yùn)算的方式,Guava 提供了 ListeningExecutorService 接口, 該接口返回 ListenableFuture 而相應(yīng)的 ExecutorService 返回普通的 Future。將 ExecutorService 轉(zhuǎn)為 ListeningExecutorService,可以使用 MoreExecutors.listeningDecorator(ExecutorService)進(jìn)行裝飾。


    ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
    ListenableFuture explosion = service.submit(new Callable() {
      public Explosion call() {
    return pushBigRedButton();
      }
    });
    Futures.addCallback(explosion, new FutureCallback() {
      // we want this handler to run immediately after we push the big red button!
      public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
      }
      public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
      }
    });

另外, 假如你是從 FutureTask 轉(zhuǎn)換而來(lái)的, Guava 提供 ListenableFutureTask.create(Callable)ListenableFutureTask.create(Runnable, V). 和 JDK 不同的是, ListenableFutureTask 不能隨意被繼承(譯者注:ListenableFutureTask 中的 done 方法實(shí)現(xiàn)了調(diào)用 listener 的操作)。

假如你喜歡抽象的方式來(lái)設(shè)置 future 的值,而不是想實(shí)現(xiàn)接口中的方法,可以考慮繼承抽象類 AbstractFuture 或者直接使用 SettableFuture 。

假如你必須將其他 API 提供的 Future 轉(zhuǎn)換成 ListenableFuture,你沒有別的方法只能采用硬編碼的方式 JdkFutureAdapters.listenInPoolThread(Future) 來(lái)將 Future 轉(zhuǎn)換成 ListenableFuture。盡可能地采用修改原生的代碼返回 ListenableFuture 會(huì)更好一些。

Application

使用 ListenableFuture 最重要的理由是它可以進(jìn)行一系列的復(fù)雜鏈?zhǔn)降漠惒讲僮鳌?/p>


    ListenableFuture rowKeyFuture = indexService.lookUp(query);
    AsyncFunction<RowKey, QueryResult> queryFunction =
    new AsyncFunction<RowKey, QueryResult>() {
    public ListenableFuture apply(RowKey rowKey) {
    return dataService.read(rowKey);
    }
    };
    ListenableFuture queryFuture = Futures.transform(rowKeyFuture, queryFunction, queryExecutor);

其他更多的操作可以更加有效的支持而 JDK 中的 Future 是沒法支持的.

不同的操作可以在不同的 Executors 中執(zhí)行,單獨(dú)的 ListenableFuture 可以有多個(gè)操作等待。

當(dāng)一個(gè)操作開始的時(shí)候其他的一些操作也會(huì)盡快開始執(zhí)行–“fan-out”–ListenableFuture 能夠滿足這樣的場(chǎng)景:促發(fā)所有的回調(diào)(callbacks)。反之更簡(jiǎn)單的工作是,同樣可以滿足“fan-in”場(chǎng)景,促發(fā) ListenableFuture 獲?。╣et)計(jì)算結(jié)果,同時(shí)其它的 Futures 也會(huì)盡快執(zhí)行:可以參考 the implementation of Futures.allAsList 。(譯者注:fan-in 和 fan-out 是軟件設(shè)計(jì)的一個(gè)術(shù)語(yǔ),可以參考這里:http://baike.baidu.com/view/388892.htm#1 或者看這里的解析 Design Principles: Fan-In vs Fan-Out,這里 fan-out 的實(shí)現(xiàn)就是封裝的 ListenableFuture 通過(guò)回調(diào),調(diào)用其它代碼片段。fan-in 的意義是可以調(diào)用其它 Future)

方法 描述 參考
transform(ListenableFuture<A>, AsyncFunction<A, B>, Executor)* 返回一個(gè)新的 ListenableFuture ,該 ListenableFuture 返回的 result 是由傳入的 AsyncFunction 參數(shù)指派到傳入的 ListenableFuture 中. transform(ListenableFuture<A>, AsyncFunction<A, B>)
transform(ListenableFuture<A>, Function<A, B>, Executor) 返回一個(gè)新的 ListenableFuture ,該 ListenableFuture 返回的 result 是由傳入的Function 參數(shù)指派到傳入的 ListenableFuture 中. transform(ListenableFuture<A>, Function<A, B>)
allAsList(Iterable<ListenableFuture<V>>) 返回一個(gè) ListenableFuture ,該ListenableFuture 返回的 result 是一個(gè) List,List 中的值是每個(gè) ListenableFuture 的返回值,假如傳入的其中之一 fails 或者 cancel,這 個(gè)Future fails 或者 canceled allAsList(ListenableFuture<V>...)
successfulAsList(Iterable<ListenableFuture<V>>) 返回一個(gè) ListenableFuture ,該 Future 的結(jié)果包含所有成功的 Future,按照原來(lái)的順序,當(dāng)其中之一 Failed 或者 cancel,則用 null 替代 successfulAsList(ListenableFuture<V>...)

AsyncFunction<A, B> 中提供一個(gè)方法 ListenableFuture<B> apply(A input),它可以被用于異步變換值。


    List<ListenableFuture> queries;
    // The queries go to all different data centers, but we want to wait until they're all done or failed.

    ListenableFuture<List> successfulQueries = Futures.successfulAsList(queries);

    Futures.addCallback(successfulQueries, callbackOnSuccessfulQueries);

CheckedFuture

Guava 也提供了 CheckedFuture<V, X extends Exception> 接口。CheckedFuture 是一個(gè) ListenableFuture ,其中包含了多個(gè)版本的 get 方法,方法聲明拋出檢查異常.這樣使得創(chuàng)建一個(gè)在執(zhí)行邏輯中可以拋出異常的 Future 更加容易 。將 ListenableFuture 轉(zhuǎn)換成 CheckedFuture,可以使用 Futures.makeChecked(ListenableFuture<V>, Function<Exception, X>)