你可以實(shí)現(xiàn)你自己的Observable操作符,本文展示怎么做。
如果你的操作符是被用于創(chuàng)造一個(gè)Observable,而不是變換或者響應(yīng)一個(gè)Observable,使用 create(?)
方法,不要試圖手動(dòng)實(shí)現(xiàn) Observable
。另外,你可以按照下面的用法說明創(chuàng)建一個(gè)自定義的操作符。
如果你的操作符是用于Observable發(fā)射的單獨(dú)的數(shù)據(jù)項(xiàng),按照下面的說明做:Sequence Operators 。如果你的操作符是用于變換Observable發(fā)射的整個(gè)數(shù)據(jù)序列,按照這個(gè)說明做:Transformational Operators 。
提示: 在一個(gè)類似于Groovy的語言Xtend中,你可以以 extension methods 的方式實(shí)現(xiàn)你自己的操作符 ,不使用本文的方法,它們也可以鏈?zhǔn)秸{(diào)用。詳情參見 RxJava and Xtend
下面的例子向你展示了怎樣使用lift(?)
操作符將你的自定義操作符(在這個(gè)例子中是 myOperator
)與標(biāo)準(zhǔn)的RxJava操作符(如ofType
和map
)一起使用:
fooObservable = barObservable.ofType(Integer).map({it*2}).lift(new MyOperator<T>()).map({"transformed by myOperator: " + it});
下面這部分向你展示了你的操作符的腳手架形式,以便它能正確的與lift()
搭配使用。
將你的自定義操作符定義為實(shí)現(xiàn)了 Operator
接口的一個(gè)公開類, 就像這樣:
public class MyOperator<T> implements Operator<T> {
public MyOperator( /* any necessary params here */ ) {
/* 這里添加必要的初始化代碼 */
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> s) {
return new Subscriber<t>(s) {
@Override
public void onCompleted() {
/* 這里添加你自己的onCompleted行為,或者僅僅傳遞完成通知: */
if(!s.isUnsubscribed()) {
s.onCompleted();
}
}
@Override
public void onError(Throwable t) {
/* 這里添加你自己的onError行為, 或者僅僅傳遞錯(cuò)誤通知:*/
if(!s.isUnsubscribed()) {
s.onError(t);
}
}
@Override
public void onNext(T item) {
/* 這個(gè)例子對(duì)結(jié)果的每一項(xiàng)執(zhí)行排序操作,然后返回這個(gè)結(jié)果 */
if(!s.isUnsubscribed()) {
transformedItem = myOperatorTransformOperation(item);
s.onNext(transformedItem);
}
}
};
}
}
下面的例子向你展示了怎樣使用 compose(?)
操作符將你得自定義操作符(在這個(gè)例子中,是一個(gè)名叫myTransformer
的操作符,它將一個(gè)發(fā)射整數(shù)的Observable轉(zhuǎn)換為發(fā)射字符串的)與標(biāo)準(zhǔn)的RxJava操作符(如ofType
和map
)一起使用:
fooObservable = barObservable.ofType(Integer).map({it*2}).compose(new MyTransformer<Integer,String>()).map({"transformed by myOperator: " + it});
下面這部分向你展示了你的操作符的腳手架形式,以便它能正確的與compose()
搭配使用。
將你的自定義操作符定義為實(shí)現(xiàn)了 Transformer
接口的一個(gè)公開類,就像這樣:
public class MyTransformer<Integer,String> implements Transformer<Integer,String> {
public MyTransformer( /* any necessary params here */ ) {
/* 這里添加必要的初始化代碼 */
}
@Override
public Observable<String> call(Observable<Integer> source) {
/*
* 這個(gè)簡(jiǎn)單的例子Transformer應(yīng)用一個(gè)map操作,
* 這個(gè)map操作將發(fā)射整數(shù)變換為發(fā)射整數(shù)的字符串表示。
*/
return source.map( new Func1<Integer,String>() {
@Override
public String call(Integer t1) {
return String.valueOf(t1);
}
} );
}
}
Subscriber.isUnsubscribed(?)
狀態(tài),如果沒有訂閱者了,沒必要浪費(fèi)時(shí)間生成數(shù)據(jù)項(xiàng)。onNext(?)
方法任意次,但是這些調(diào)用必須是不重疊的。onCompleted(?)
或 onError(?)
正好一次,但不能都調(diào)用,而且不能在這之后調(diào)用訂閱者的 onNext(?)
方法。serialize(?)
操作符,它會(huì)強(qiáng)制保持正確的行為。first(?)
被定義為 take(1).single(?)ignoreElements(?)
被定義為 filter(alwaysFalse(?))reduce(a)
被定義為 scan(a).last(?)onError()
通知訂閱者。
onError()
毫無意義,那樣或者是無用的,或者只是對(duì)問題的妥協(xié)。你可以使用 Exceptions.throwIfFatal(throwable)
方法過濾掉這些致命的異常,并重新拋出它們,而不是試圖發(fā)射關(guān)于它們的通知。null
可能是Observable發(fā)射的一個(gè)合法數(shù)據(jù)。頻繁發(fā)生錯(cuò)誤的一個(gè)來源是:測(cè)試一些變量并且將持有一個(gè)非 null
值作為是否發(fā)射了數(shù)據(jù)的替代。一個(gè)值為 null
的數(shù)據(jù)仍然是一個(gè)發(fā)射數(shù)據(jù)項(xiàng),它與沒有發(fā)射任何東西是不能等同的。