鍍金池/ 教程/ Android/ 第一個(gè)例子
調(diào)度器 Scheduler
Empty/Never/Throw
Replay
這個(gè)頁面展示了創(chuàng)建Observable的各種方法。
ObserveOn
ReactiveX
TimeInterval
Window
本頁展示的操作符用于對整個(gè)序列執(zhí)行算法操作或其它操作,由于這些操作必須等待數(shù)據(jù)發(fā)射完成(通常也必須緩存這些數(shù)據(jù)),它們對于非常長
IgnoreElements
Distinct
Last
Start
And/Then/When
Switch
創(chuàng)建操作
Materialize/Dematerialize
CombineLatest
Catch
實(shí)現(xiàn)自己的操作符
StringObservable
Map
ConnectableObservable
Using
Take
BlockingObservable
TakeLast
Defer
RxJavaSchedulersHook
First
FlatMap
這個(gè)頁面的操作符可用于根據(jù)條件發(fā)射或變換Observables,或者對它們做布爾運(yùn)算:
Do
Repeat
Serialize
這個(gè)頁面展示的操作符可用于過濾和選擇Observable發(fā)射的數(shù)據(jù)序列。
這個(gè)頁面列出了很多用于Observable的輔助操作符
Single
Retry
從錯(cuò)誤中恢復(fù)的技術(shù)
Sample
Merge
算術(shù)和聚合操作
Range
Timestamp
RxJava Issues
From
Subscribe
Subject
Delay
Skip
SubscribeOn
Filter
按字母順序排列的全部操作符列表
Timeout
Scan
onError
Zip
RxJava文檔和教程
Publish
ElementAt
第一個(gè)例子
SkipLast
Just
Timer
Debounce
GroupBy
條件和布爾操作
這個(gè)頁面展示了可用于對Observable發(fā)射的數(shù)據(jù)執(zhí)行變換操作的各種操作符。
Introduction
rxjava-async
介紹響應(yīng)式編程
這個(gè)頁面展示的操作符可用于組合多個(gè)Observables。
ReactiveX
Connect
操作符分類
StartWith
Interval
Join
To
Buffer
RefCount
介紹
Observable

第一個(gè)例子

你可以在這里找到JVM平臺(tái)幾種語言的例子 language adaptor:

下面的示例從一個(gè)字符串列表創(chuàng)建一個(gè)Observable,然后使用一個(gè)方法訂閱這個(gè)Observable。

Java

public static void hello(String... names) {
    Observable.from(names).subscribe(new Action1<String>() {

        @Override
        public void call(String s) {
            System.out.println("Hello " + s + "!");
        }

    });
}
hello("Ben", "George");
Hello Ben!
Hello George!

Groovy

def hello(String[] names) {
    Observable.from(names).subscribe { println "Hello ${it}!" }
}
hello("Ben", "George")
Hello Ben!
Hello George!

Clojure

(defn hello
  [&rest]
  (-> (Observable/from &rest)
    (.subscribe #(println (str "Hello " % "!")))))
(hello ["Ben" "George"])
Hello Ben!
Hello George!

Scala

import rx.lang.scala.Observable

def hello(names: String*) {
  Observable.from(names) subscribe { n =>
    println(s"Hello $n!")
  }
}
hello("Ben", "George")
Hello Ben!
Hello George!

如何使用RxJava

要使用RxJava,首先你需要?jiǎng)?chuàng)建Observable(它們發(fā)射數(shù)據(jù)序列),使用Observable操作符變換那些Observables,獲取嚴(yán)格符合你要求的數(shù)據(jù),然后觀察并處理對這些數(shù)據(jù)序列(通過實(shí)現(xiàn)觀察者或訂閱者,然后訂閱變換后的Observable)。

創(chuàng)建Observables

要?jiǎng)?chuàng)建Observable,你可以手動(dòng)實(shí)現(xiàn)Observable的行為,也可以傳遞一個(gè)函數(shù)給create(?),還可以使用這些 創(chuàng)建操作符 將一個(gè)已有的數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)換為Observable。

已有的數(shù)據(jù)結(jié)構(gòu)創(chuàng)建Observable

你可以使用just(?)from(?) 方法將對象,列表,對象屬性轉(zhuǎn)換為發(fā)射那些對象的Observable:

Observable<String> o = Observable.from("a", "b", "c");

def list = [5, 6, 7, 8]
Observable<Integer> o = Observable.from(list);

Observable<String> o = Observable.just("one object");

轉(zhuǎn)換后的Observable每發(fā)射一項(xiàng)數(shù)據(jù),會(huì)同步地調(diào)用任何訂閱者的onNext()方法,最后會(huì)調(diào)用訂閱者的onCompleted()方法。

使用create(?)創(chuàng)建一個(gè)Observable

使用 create(?) 方法,你可以創(chuàng)建你自己的Observable,可以實(shí)現(xiàn)異步I/O,計(jì)算操作,甚至是無限的數(shù)據(jù)流。

同步的Observable示例

/**
 * 這個(gè)例子展示了一個(gè)自定義的Observable,當(dāng)有訂閱時(shí)他會(huì)阻塞當(dāng)前線程。
 */
def customObservableBlocking() {
    return Observable.create { aSubscriber ->
        50.times { i ->
            if (!aSubscriber.unsubscribed) {
                aSubscriber.onNext("value_${i}")
            }
        }
        // after sending all values we complete the sequence
        if (!aSubscriber.unsubscribed) {
            aSubscriber.onCompleted()
        }
    }
}

// To see output:
customObservableBlocking().subscribe { println(it) }

異步的Observable示例

The following example uses Groovy to create an Observable that emits 75 strings. 下面的例子使用Groovy創(chuàng)建了一個(gè)發(fā)射75個(gè)字符串的Observable。

為了讓它更清楚,例子很詳細(xì),使用靜態(tài)類型和匿名內(nèi)部類Func1

/**
 * This example shows a custom Observable that does not block
 * when subscribed to as it spawns a separate thread.
 */
def customObservableNonBlocking() {
    return Observable.create({ subscriber ->
        Thread.start {
            for (i in 0..<75) {
                if (subscriber.unsubscribed) {
                    return
                }
                subscriber.onNext("value_${i}")
            }
            // after sending all values we complete the sequence
            if (!subscriber.unsubscribed) {
                subscriber.onCompleted()
            }
        }
    } as Observable.OnSubscribe)
}

// To see output:
customObservableNonBlocking().subscribe { println(it) }

這是一個(gè)用Clojure寫的例子,使用Future(而不是直接用線程),實(shí)現(xiàn)很簡潔:

(defn customObservableNonBlocking []
  "This example shows a custom Observable that does not block 
   when subscribed to as it spawns a separate thread.

  returns Observable<String>"
  (Observable/create 
    (fn [subscriber]
      (let [f (future 
                (doseq [x (range 50)] (-> subscriber (.onNext (str "value_" x))))
                ; after sending all values we complete the sequence
                (-> subscriber .onCompleted))
        ))
      ))
; To see output
(.subscribe (customObservableNonBlocking) #(println %))

這個(gè)例子從維基百科網(wǎng)站抓取文章,每抓取一篇會(huì)調(diào)用一次onNext

(defn fetchWikipediaArticleAsynchronously [wikipediaArticleNames]
  "Fetch a list of Wikipedia articles asynchronously.

   return Observable<String> of HTML"
  (Observable/create 
    (fn [subscriber]
      (let [f (future
                (doseq [articleName wikipediaArticleNames]
                  (-> subscriber (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
                ; after sending response to onnext we complete the sequence
                (-> subscriber .onCompleted))
        ))))
(-> (fetchWikipediaArticleAsynchronously ["Tiger" "Elephant"]) 
  (.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "...")))

回到Groovy,同樣是從維基百科抓取文章,這兒使用閉包代替匿名內(nèi)部類:

/*
 * Fetch a list of Wikipedia articles asynchronously.
 */
def fetchWikipediaArticleAsynchronously(String... wikipediaArticleNames) {
    return Observable.create { subscriber ->
        Thread.start {
            for (articleName in wikipediaArticleNames) {
                if (subscriber.unsubscribed) {
                    return
                }
                subscriber.onNext(new URL("http://en.wikipedia.org/wiki/${articleName}").text)
            }
            if (!subscriber.unsubscribed) {
                subscriber.onCompleted()
            }
        }
        return subscriber
    }
}

fetchWikipediaArticleAsynchronously("Tiger", "Elephant")
    .subscribe { println "--- Article ---\n${it.substring(0, 125)}" }

結(jié)果:

--- Article ---
 <!DOCTYPE html>
<html lang="en" dir="ltr" class="client-nojs">
<head>
<title>Tiger - Wikipedia, the free encyclopedia</title> ...
--- Article ---
 <!DOCTYPE html>
<html lang="en" dir="ltr" class="client-nojs">
<head>
<title>Elephant - Wikipedia, the free encyclopedia</tit ...

Note that all of the above examples ignore error handling, for brevity. See below for examples that include error handling.

More information can be found on the [[Observable]] and [[Creating Observables|Creating-Observables]] pages.

注意:為了簡潔,上面的所有例子都忽略了錯(cuò)誤處理,查看下面包含錯(cuò)誤處理的例子。

更多的信息可以在這里找到:ObservableCreating Observables。

使用變換操作

RxJava讓你可以鏈?zhǔn)绞褂?code>操作符用來轉(zhuǎn)換和組合多個(gè)Observables。

The following example, in Groovy, uses a previously defined, asynchronous Observable that emits 75 items, skips over the first 10 of these (skip(10)), then takes the next 5 (take(5)), and transforms them (map(...)) before subscribing and printing the items:

下面是一個(gè)Groovy的例子,使用之前的定義,它會(huì)異步發(fā)射75個(gè)字符串,跳過最開始的10個(gè)((skip(10)),然后獲取接下來的5個(gè)(take(5)),在訂閱之前使用map()轉(zhuǎn)換它們,然后打印結(jié)果字符串。

/**
 * Asynchronously calls 'customObservableNonBlocking' and defines
 * a chain of operators to apply to the callback sequence.
 */
def simpleComposition() {
    customObservableNonBlocking().skip(10).take(5)
        .map({ stringValue -> return stringValue + "_xform"})
        .subscribe({ println "onNext => " + it})
}

輸出結(jié)果

onNext => value_10_xform
onNext => value_11_xform
onNext => value_12_xform
onNext => value_13_xform
onNext => value_14_xform

這里有一個(gè)圖例解釋了轉(zhuǎn)換過程:

http://wiki.jikexueyuan.com/project/rx-docs/images/operators/Composition.1.png" width="640" height="536" />

這一個(gè)例子使用Clojure,使用了三個(gè)異步的Observable,其中一個(gè)依賴另一個(gè),使用zip組合這三個(gè)發(fā)射的數(shù)據(jù)項(xiàng)為一個(gè)單個(gè)數(shù)據(jù)項(xiàng),最后使用map()轉(zhuǎn)換這個(gè)結(jié)果:

(defn getVideoForUser [userId videoId]
  "Get video metadata for a given userId
   - video metadata
   - video bookmark position
   - user data
  return Observable<Map>"
    (let [user-observable (-> (getUser userId)
              (.map (fn [user] {:user-name (:name user) :language (:preferred-language user)})))
          bookmark-observable (-> (getVideoBookmark userId videoId)
              (.map (fn [bookmark] {:viewed-position (:position bookmark)})))
          ; getVideoMetadata requires :language from user-observable so nest inside map function
          video-metadata-observable (-> user-observable 
              (.mapMany
                ; fetch metadata after a response from user-observable is received
                (fn [user-map] 
                  (getVideoMetadata videoId (:language user-map)))))]
          ; now combine 3 observables using zip
          (-> (Observable/zip bookmark-observable video-metadata-observable user-observable 
                (fn [bookmark-map metadata-map user-map]
                  {:bookmark-map bookmark-map 
                  :metadata-map metadata-map
                  :user-map user-map}))
            ; and transform into a single response object
            (.map (fn [data]
                  {:video-id videoId
                   :video-metadata (:metadata-map data)
                   :user-id userId
                   :language (:language (:user-map data))
                   :bookmark (:viewed-position (:bookmark-map data))
                  })))))

輸出是這樣的:

{:video-id 78965, 
 :video-metadata {:video-id 78965, :title House of Cards: Episode 1, 
                  :director David Fincher, :duration 3365}, 
 :user-id 12345, :language es-us, :bookmark 0}

這里有一個(gè)圖例解釋了這個(gè)過程:

http://wiki.jikexueyuan.com/project/rx-docs/images/operators/Composition.2.png" width="640" height="742" />

The following example, in Groovy, comes from Ben Christensen’s QCon presentation on the evolution of the Netflix API. It combines two Observables with the merge operator, then uses the reduce operator to construct a single item out of the resulting sequence, then transforms that item with map before emitting it:

下面的例子使用Groovy,來自這里 Ben Christensen’s QCon presentation on the evolution of the Netflix API,它使用merge操作結(jié)合兩個(gè)Observables,使用reduce操作符從結(jié)果序列構(gòu)建一個(gè)單獨(dú)的結(jié)果數(shù)據(jù)項(xiàng),然后在發(fā)射之前,使用map()變換那個(gè)結(jié)果。

public Observable getVideoSummary(APIVideo video) {
   def seed = [id:video.id, title:video.getTitle()];
   def bookmarkObservable = getBookmark(video);
   def artworkObservable = getArtworkImageUrl(video);
   return( Observable.merge(bookmarkObservable, artworkObservable)
      .reduce(seed, { aggregate, current -> aggregate << current })
      .map({ [(video.id.toString() : it] }))
}

這里也有一個(gè)圖例解釋reduce從多個(gè)Observable的結(jié)果構(gòu)建一個(gè)單一結(jié)構(gòu)的過程:

http://wiki.jikexueyuan.com/project/rx-docs/images/operators/Composition.3.png" width="640" height="640" />

錯(cuò)誤處理

這里是另一個(gè)版本的維基百科的例子,包含錯(cuò)誤處理代碼:

/*
 * Fetch a list of Wikipedia articles asynchronously, with error handling.
 */
def fetchWikipediaArticleAsynchronouslyWithErrorHandling(String... wikipediaArticleNames) {
    return Observable.create({ subscriber ->
        Thread.start {
            try {
                for (articleName in wikipediaArticleNames) {
                    if (true == subscriber.isUnsubscribed()) {
                        return;
                    }
                    subscriber.onNext(new URL("http://en.wikipedia.org/wiki/"+articleName).getText());
                }
                if (false == subscriber.isUnsubscribed()) {
                    subscriber.onCompleted();
                }
            } catch(Throwable t) {
                if (false == subscriber.isUnsubscribed()) {
                    subscriber.onError(t);
                }
            }
            return (subscriber);
        }
    });
}

下面的例子使用Groovy,注意錯(cuò)誤發(fā)生時(shí)現(xiàn)在是如何調(diào)用onError(Throwable t)的,下面的代碼傳遞給subscribe()第二個(gè)方法用戶處理onError通知:

fetchWikipediaArticleAsynchronouslyWithErrorHandling("Tiger", "NonExistentTitle", "Elephant")
    .subscribe(
        { println "--- Article ---\n" + it.substring(0, 125) }, 
        { println "--- Error ---\n" + it.getMessage() })

查看 錯(cuò)誤處理操作符 這一夜了解更多RxJava中的錯(cuò)誤處理技術(shù),包括使用 onErrorResumeNext()onErrorReturn()等方法,它們讓你可以從錯(cuò)誤中恢復(fù)。

這里是一個(gè)Groovy的例子:

myModifiedObservable = myObservable.onErrorResumeNext({ t ->
   Throwable myThrowable = myCustomizedThrowableCreator(t);
   return (Observable.error(myThrowable));
});
上一篇:SubscribeOn下一篇:ObserveOn