定期將來自原始Observable的數(shù)據(jù)分解為一個Observable窗口,發(fā)射這些窗口,而不是每次發(fā)射一項數(shù)據(jù)
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/window.C.png" alt="window" />
Window
和Buffer
類似,但不是發(fā)射來自原始Observable的數(shù)據(jù)包,它發(fā)射的是Observables,這些Observables中的每一個都發(fā)射原始Observable數(shù)據(jù)的一個子集,最后發(fā)射一個onCompleted
通知。
和Buffer
一樣,Window
有很多變體,每一種都以自己的方式將原始Observable分解為多個作為結(jié)果的Observable,每一個都包含一個映射原始數(shù)據(jù)的window
。用Window
操作符的術(shù)語描述就是,當(dāng)一個窗口打開(when a window "opens")意味著一個新的Observable已經(jīng)發(fā)射(產(chǎn)生)了,而且這個Observable開始發(fā)射來自原始Observable的數(shù)據(jù);當(dāng)一個窗口關(guān)閉(when a window "closes")意味著發(fā)射(產(chǎn)生)的Observable停止發(fā)射原始Observable的數(shù)據(jù),并且發(fā)射終止通知onCompleted
給它的觀察者們。
在RxJava中有許多種Window
操作符的變體。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/window1.png" alt="window1" />
window
的這個變體會立即打開它的第一個窗口。每當(dāng)它觀察到closingSelector
返回的Observable發(fā)射了一個對象時,它就關(guān)閉當(dāng)前打開的窗口并立即打開一個新窗口。用這個方法,這種window
變體發(fā)射一系列不重疊的窗口,這些窗口的數(shù)據(jù)集合與原始Observable發(fā)射的數(shù)據(jù)是一一對應(yīng)的。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/window2.png" alt="window2" />
無論何時,只要window
觀察到windowOpenings
這個Observable發(fā)射了一個Opening
對象,它就打開一個窗口,并且同時調(diào)用closingSelector
生成一個與那個窗口關(guān)聯(lián)的關(guān)閉(closing)Observable。當(dāng)這個關(guān)閉(closing)Observable發(fā)射了一個對象時,window
操作符就會關(guān)閉那個窗口。對這個變體來說,由于當(dāng)前窗口的關(guān)閉和新窗口的打開是由單獨的Observable管理的,它創(chuàng)建的窗口可能會存在重疊(重復(fù)某些來自原始Observable的數(shù)據(jù))或間隙(丟棄某些來自原始Observable的數(shù)據(jù))。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/window3.png" alt="window3" />
這個window
的變體立即打開它的第一個窗口。每當(dāng)當(dāng)前窗口發(fā)射了count
項數(shù)據(jù),它就關(guān)閉當(dāng)前窗口并打開一個新窗口。如果從原始Observable收到了onError
或onCompleted
通知它也會關(guān)閉當(dāng)前窗口。這種window
變體發(fā)射一系列不重疊的窗口,這些窗口的數(shù)據(jù)集合與原始Observable發(fā)射的數(shù)據(jù)是一一對應(yīng)的。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/window4.png" alt="window4" />
這個window
的變體立即打開它的第一個窗口。原始Observable每發(fā)射skip
項數(shù)據(jù)它就打開一個新窗口(例如,如果skip
等于3,每到第三項數(shù)據(jù),它會打開一耳光新窗口)。每當(dāng)當(dāng)前窗口發(fā)射了count
項數(shù)據(jù),它就關(guān)閉當(dāng)前窗口并打開一個新窗口。如果從原始Observable收到了onError
或onCompleted
通知它也會關(guān)閉當(dāng)前窗口。如果skip=count
,它的行為與window(source, count)
相同;如果skip < count
,窗口可會有count - skip
個重疊的數(shù)據(jù);如果skip > count
,在兩個窗口之間會有skip - count
項數(shù)據(jù)被丟棄。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/window5.png" alt="window5" />
這個window
的變體立即打開它的第一個窗口。每當(dāng)過了timespan
這么長的時間它就關(guān)閉當(dāng)前窗口并打開一個新窗口(時間單位是unit
,可選在調(diào)度器scheduler
上執(zhí)行)。如果從原始Observable收到了onError
或onCompleted
通知它也會關(guān)閉當(dāng)前窗口。這種window
變體發(fā)射一系列不重疊的窗口,這些窗口的數(shù)據(jù)集合與原始Observable發(fā)射的數(shù)據(jù)也是一一對應(yīng)的。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/window6.png" alt="window6" />
這個window
的變體立即打開它的第一個窗口。這個變體是window(count)
和window(timespan, unit[, scheduler])
的結(jié)合,每當(dāng)過了timespan
的時長或者當(dāng)前窗口收到了count
項數(shù)據(jù),它就關(guān)閉當(dāng)前窗口并打開另一個。如果從原始Observable收到了onError
或onCompleted
通知它也會關(guān)閉當(dāng)前窗口。這種window
變體發(fā)射一系列不重疊的窗口,這些窗口的數(shù)據(jù)集合與原始Observable發(fā)射的數(shù)據(jù)也是一一對應(yīng)的。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/window7.png" alt="window7" />
buffer(timespan,?timeshift,?unit)
在每一個timeshift
時期內(nèi)都創(chuàng)建一個新的List
,然后用原始Observable發(fā)射的每一項數(shù)據(jù)填充這個列表(在把這個List
當(dāng)做自己的數(shù)據(jù)發(fā)射前,從創(chuàng)建時開始,直到過了timespan
這么長的時間)。如果timespan
長于timeshift
,它發(fā)射的數(shù)據(jù)包將會重疊,因此可能包含重復(fù)的數(shù)據(jù)項。
這個window
的變體立即打開它的第一個窗口。隨后每當(dāng)過了timeshift
的時長就打開一個新窗口(時間單位是unit
,可選在調(diào)度器scheduler
上執(zhí)行),當(dāng)窗口打開的時長達到timespan
,它就關(guān)閉當(dāng)前打開的窗口。如果從原始Observable收到了onError
或onCompleted
通知它也會關(guān)閉當(dāng)前窗口。窗口的數(shù)據(jù)可能重疊也可能有間隙,取決于你設(shè)置的timeshift
和timespan
的值。
這個變體的window
默認(rèn)在computation
調(diào)度器上執(zhí)行它的定時器。
你可以使用Window
操作符實現(xiàn)反壓backpressure
(意思是,處理這樣一個Observable:它產(chǎn)生數(shù)據(jù)的數(shù)據(jù)可能比它的觀察者消費數(shù)據(jù)的數(shù)據(jù)快)。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/bp.window1.png" alt="bp.window1" />
Window操作符可以將大量的數(shù)據(jù)序列縮減為較少的數(shù)據(jù)窗口序列,讓它們更容易處理。例如,你可以按固定的時間間隔,定期關(guān)閉和發(fā)射來自一個爆發(fā)性O(shè)bservable的數(shù)據(jù)窗口。
示例代碼
Observable<Observable<Integer>> burstyWindowed = bursty.window(500, TimeUnit.MILLISECONDS);
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/bp.window2.png" alt="bp.window2" />
你還可以選擇每當(dāng)收到爆發(fā)性O(shè)bservable的N項數(shù)據(jù)時發(fā)射一個新的數(shù)據(jù)窗口。
示例代碼
Observable<Observable<Integer>> burstyWindowed = bursty.window(5);