鍍金池/ 教程/ Android/ 實(shí)現(xiàn)自己的操作符
調(diào)度器 Scheduler
Empty/Never/Throw
Replay
這個(gè)頁面展示了創(chuàng)建Observable的各種方法。
ObserveOn
ReactiveX
TimeInterval
Window
本頁展示的操作符用于對(duì)整個(gè)序列執(zhí)行算法操作或其它操作,由于這些操作必須等待數(shù)據(jù)發(fā)射完成(通常也必須緩存這些數(shù)據(jù)),它們對(duì)于非常長(zhǎng)
IgnoreElements
Distinct
Last
Start
And/Then/When
Switch
創(chuàng)建操作
Materialize/Dematerialize
CombineLatest
Catch
實(shí)現(xiàn)自己的操作符
StringObservable
Map
ConnectableObservable
Using
Take
BlockingObservable
TakeLast
Defer
RxJavaSchedulersHook
First
FlatMap
這個(gè)頁面的操作符可用于根據(jù)條件發(fā)射或變換Observables,或者對(duì)它們做布爾運(yùn)算:
Do
Repeat
Serialize
這個(gè)頁面展示的操作符可用于過濾和選擇Observable發(fā)射的數(shù)據(jù)序列。
這個(gè)頁面列出了很多用于Observable的輔助操作符
Single
Retry
從錯(cuò)誤中恢復(fù)的技術(shù)
Sample
Merge
算術(shù)和聚合操作
Range
Timestamp
RxJava Issues
From
Subscribe
Subject
Delay
Skip
SubscribeOn
Filter
按字母順序排列的全部操作符列表
Timeout
Scan
onError
Zip
RxJava文檔和教程
Publish
ElementAt
第一個(gè)例子
SkipLast
Just
Timer
Debounce
GroupBy
條件和布爾操作
這個(gè)頁面展示了可用于對(duì)Observable發(fā)射的數(shù)據(jù)執(zhí)行變換操作的各種操作符。
Introduction
rxjava-async
介紹響應(yīng)式編程
這個(gè)頁面展示的操作符可用于組合多個(gè)Observables。
ReactiveX
Connect
操作符分類
StartWith
Interval
Join
To
Buffer
RefCount
介紹
Observable

實(shí)現(xiàn)自己的操作符

你可以實(shí)現(xiàn)你自己的Observable操作符,本文展示怎么做。

如果你的操作符是被用于創(chuàng)造一個(gè)Observable,而不是變換或者響應(yīng)一個(gè)Observable,使用 create(?) 方法,不要試圖手動(dòng)實(shí)現(xiàn) Observable。另外,你可以按照下面的用法說明創(chuàng)建一個(gè)自定義的操作符。

如果你的操作符是用于Observable發(fā)射的單獨(dú)的數(shù)據(jù)項(xiàng),按照下面的說明做:Sequence Operators 。如果你的操作符是用于變換Observable發(fā)射的整個(gè)數(shù)據(jù)序列,按照這個(gè)說明做:Transformational Operators 。

提示: 在一個(gè)類似于Groovy的語言Xtend中,你可以以 extension methods 的方式實(shí)現(xiàn)你自己的操作符 ,不使用本文的方法,它們也可以鏈?zhǔn)秸{(diào)用。詳情參見 RxJava and Xtend

序列操作符

下面的例子向你展示了怎樣使用lift(?)操作符將你的自定義操作符(在這個(gè)例子中是 myOperator)與標(biāo)準(zhǔn)的RxJava操作符(如ofTypemap)一起使用:

fooObservable = barObservable.ofType(Integer).map({it*2}).lift(new MyOperator<T>()).map({"transformed by myOperator: " + it});

下面這部分向你展示了你的操作符的腳手架形式,以便它能正確的與lift()搭配使用。

實(shí)現(xiàn)你的操作符

將你的自定義操作符定義為實(shí)現(xiàn)了 Operator 接口的一個(gè)公開類, 就像這樣:

public class MyOperator<T> implements Operator<T> {
  public MyOperator( /* any necessary params here */ ) {
    /* 這里添加必要的初始化代碼 */
  }

  @Override
  public Subscriber<? super T> call(final Subscriber<? super T> s) {
    return new Subscriber<t>(s) {
      @Override
      public void onCompleted() {
        /* 這里添加你自己的onCompleted行為,或者僅僅傳遞完成通知: */
        if(!s.isUnsubscribed()) {
          s.onCompleted();
        }
      }

      @Override
      public void onError(Throwable t) {
        /* 這里添加你自己的onError行為, 或者僅僅傳遞錯(cuò)誤通知:*/
        if(!s.isUnsubscribed()) {
          s.onError(t);
        }
      }

      @Override
      public void onNext(T item) {
        /* 這個(gè)例子對(duì)結(jié)果的每一項(xiàng)執(zhí)行排序操作,然后返回這個(gè)結(jié)果 */
        if(!s.isUnsubscribed()) {
          transformedItem = myOperatorTransformOperation(item);
          s.onNext(transformedItem);
        }
      }
    };
  }
}

變換操作符

下面的例子向你展示了怎樣使用 compose(?) 操作符將你得自定義操作符(在這個(gè)例子中,是一個(gè)名叫myTransformer的操作符,它將一個(gè)發(fā)射整數(shù)的Observable轉(zhuǎn)換為發(fā)射字符串的)與標(biāo)準(zhǔn)的RxJava操作符(如ofTypemap)一起使用:

fooObservable = barObservable.ofType(Integer).map({it*2}).compose(new MyTransformer<Integer,String>()).map({"transformed by myOperator: " + it});

下面這部分向你展示了你的操作符的腳手架形式,以便它能正確的與compose()搭配使用。

實(shí)現(xiàn)你的變換器

將你的自定義操作符定義為實(shí)現(xiàn)了 Transformer 接口的一個(gè)公開類,就像這樣:

public class MyTransformer<Integer,String> implements Transformer<Integer,String> {
  public MyTransformer( /* any necessary params here */ ) {
    /* 這里添加必要的初始化代碼 */
  }

  @Override
  public Observable<String> call(Observable<Integer> source) {
    /* 
     * 這個(gè)簡(jiǎn)單的例子Transformer應(yīng)用一個(gè)map操作,
     * 這個(gè)map操作將發(fā)射整數(shù)變換為發(fā)射整數(shù)的字符串表示。
     */
    return source.map( new Func1<Integer,String>() {
      @Override
      public String call(Integer t1) {
        return String.valueOf(t1);
      }
    } );
  }
}

參見

其它需要考慮的

  • 在發(fā)射任何數(shù)據(jù)(或者通知)給訂閱者之前,你的序列操作符可能需要檢查它的 Subscriber.isUnsubscribed(?) 狀態(tài),如果沒有訂閱者了,沒必要浪費(fèi)時(shí)間生成數(shù)據(jù)項(xiàng)。
  • 請(qǐng)注意:你的序列操作符必須復(fù)合Observable協(xié)議的核心原則:
    • 它可能調(diào)用訂閱者的 onNext(?) 方法任意次,但是這些調(diào)用必須是不重疊的。
    • 它只能調(diào)用訂閱者的 onCompleted(?)onError(?) 正好一次,但不能都調(diào)用,而且不能在這之后調(diào)用訂閱者的 onNext(?) 方法。
    • 如果你不能保證你得操作符遵從這兩個(gè)原則,你可以給它添加 serialize(?) 操作符,它會(huì)強(qiáng)制保持正確的行為。
  • 請(qǐng)關(guān)注這里 Issue #1962 &mdash;需要有一個(gè)計(jì)劃創(chuàng)建一個(gè)測(cè)試腳手架,你可以用它來寫測(cè)試驗(yàn)證你的新操作符遵從了Observable協(xié)議。
  • 不要讓你的操作符阻塞別的操作。
  • When possible, you should compose new operators by combining existing operators, rather than implementing them with new code. RxJava itself does this with some of its standard operators, for example:
  • 如果可能,你應(yīng)該組合現(xiàn)有的操作符創(chuàng)建你的新操作符,而不是從零開始實(shí)現(xiàn)它。RxJava自身的標(biāo)準(zhǔn)操作符也是這樣做的,例如:
  • 如果你的操作符使用了函數(shù)或者lambda表達(dá)式作為參數(shù),請(qǐng)注意它們可能是異常的來源,而且要準(zhǔn)備好捕獲這些異常,并且使用 onError() 通知訂閱者。
    • 某些異常被認(rèn)為是致命的,對(duì)它們來說,調(diào)用 onError()毫無意義,那樣或者是無用的,或者只是對(duì)問題的妥協(xié)。你可以使用 Exceptions.throwIfFatal(throwable) 方法過濾掉這些致命的異常,并重新拋出它們,而不是試圖發(fā)射關(guān)于它們的通知。
  • 一般說來,一旦發(fā)生錯(cuò)誤應(yīng)立即通知訂閱者,而不是首先嘗試發(fā)射更多的數(shù)據(jù)。
  • 請(qǐng)注意 null 可能是Observable發(fā)射的一個(gè)合法數(shù)據(jù)。頻繁發(fā)生錯(cuò)誤的一個(gè)來源是:測(cè)試一些變量并且將持有一個(gè)非 null 值作為是否發(fā)射了數(shù)據(jù)的替代。一個(gè)值為 null 的數(shù)據(jù)仍然是一個(gè)發(fā)射數(shù)據(jù)項(xiàng),它與沒有發(fā)射任何東西是不能等同的。
  • 想讓你的操作符在反壓(backpressure)場(chǎng)景中變得得好可能會(huì)非常棘手??梢詤⒖糄ávid Karnok的博客 Advanced RxJava,這里有一個(gè)涉及到的各種因素和怎樣處理它們的很值得看的討論。
上一篇:Start下一篇:TakeLast