摘要:信號會由于發射源的不同發射到多次而和僅會發射其中一個,且只發射一次,標志著流的結束。以此類推直到其中某個流發出結束信號,整個被合并后的流結束,不再發射數據。
前言
forkJoin, zip, combineLatest是rxjs中的合并操作符,用于對多個流進行合并。很多人第一次接觸rxjs時往往分不清它們之間的區別,其實這很正常,因為當你準備用來合并的流是那種只會發射一次數據就關閉的流時(比如http請求),就結果而言這三個操作符沒有任何區別。
const ob1 = Rx.Observable.of(1).delay(1000); const ob2 = Rx.Observable.of(2).delay(2000); const ob3 = Rx.Observable.of(3).delay(3000); Rx.Observable.forkJoin(ob1, ob2, ob3).subscribe((data) => console.log(data)); Rx.Observable.zip(ob1, ob2, ob3).subscribe((data) => console.log(data)); Rx.Observable.combineLatest(ob1, ob2, ob3).subscribe((data) => console.log(data)); // [1, 2, 3] // [1, 2, 3] // [1, 2, 3] // 都是在3秒的時候打印
rxjs中很多操作符功能相近,只有當其操作的流會多次發射數據時才會體現出它們之間的區別,下面我們來詳細解釋forkJoin, zip, 和combineLatest。
一個基本概念首先我們要知道,一個流(或者說Observable序列)的生命周期中,每次發射數據會發出next信號(Notification),結束發射時會發出complete信號,發生錯誤時發出error信號,三個信號分別對應observer的三個方法。next信號會由于發射源的不同發射0到多次;而complete和error僅會發射其中一個,且只發射一次,標志著流的結束。
subscribe接收一個observer對象用來處理上述三種信號,只傳入一個函數會被認為是next方法,因此傳入subscribe的next方法會執行0到N次,N為序列正常發射信號的次數。
用forkJoin合并的流,會在每個被合并的流都發出結束信號時發射一次也是唯一一次數據。假設我們有兩個流:
const ob1 = Rx.Observable.interval(1000).map(d => `ob1:$iimcuyw`).take(3); const ob2 = Rx.Observable.interval(2000).map(d => `ob2:$ocuyqgk`).take(2); Rx.Observable.forkJoin(ob1, ob2).subscribe((data) => console.log(data)); // ["ob1:2", "ob2:1"]
ob1會在發射完第三個數據時停止發射,ob2會在發射完第二個數據時停止,而forkJoin合并后的流會等到ob1和ob2都結束時,發射一次數據,也就是觸發一次subscribe里的回調,接收到的數據為ob1和ob2發射的最后一次數據的數組。
zipzip工作原理如下,當每個傳入zip的流都發射完畢第一次數據時,zip將這些數據合并為數組并發射出去;當這些流都發射完第二次數據時,zip再次將它們合并為數組并發射。以此類推直到其中某個流發出結束信號,整個被合并后的流結束,不再發射數據。
const ob1 = Rx.Observable.interval(1000).map(d => `ob1:$4u2cy2w`).take(3); const ob2 = Rx.Observable.interval(2000).map(d => `ob2:$eqw0qss`).take(2); Rx.Observable.zip(ob1, ob2).subscribe({ next: (data) => console.log(data), complete: () => console.log("complete") }); // ["ob1:0", "ob2:0"] ob1等待ob2發射數據,之后合并 // ["ob1:1", "ob2:1"] 此時ob2結束,整個合并的流也結束 // "complete"
zip和forkJoin的區別在于,forkJoin僅會合并各個子流最后發射的一次數據,觸發一次回調;zip會等待每個子流都發射完一次數據然后合并發射,之后繼續等待,直到其中某個流結束(因為此時不能使合并的數據包含每個子流的數據)。
combineLatestcombineLatest與zip很相似,combineLatest一開始也會等待每個子流都發射完一次數據,但是在合并時,如果子流1在等待其他流發射數據期間又發射了新數據,則使用子流最新發射的數據進行合并,之后每當有某個流發射新數據,不再等待其他流同步發射數據,而是使用其他流之前的最近一次數據進行合并。
const ob1 = Rx.Observable.interval(1000).map(d => `ob1:$muimsgq`).take(3); const ob2 = Rx.Observable.interval(2000).map(d => `ob2:$e22wcqg`).take(2); Rx.Observable.combineLatest(ob1, ob2).subscribe({ next: (data) => console.log(data), complete: () => console.log("complete") }); // ["ob1:1", "ob2:0"] ob1等待ob2發射,當ob2發射時ob1已經發射了第二次數據,使用ob1的第二次數據 // ["ob1:2", "ob2:0"] ob1繼續發射第三次也是最后一次數據,ob2雖然還未發射,但是可以使用它上一次的數據 // ["ob1:2", "ob2:1"] ob2發射第二次也是最后一次數據,使ob1上一次的數據。 // "complete"
本期內容結束,下一期會繼續帶來rxjs的其他操作符或者概念詳解。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/92383.html
摘要:原文各種各樣的操作符按照不同的目的,分類幾個大類創建,變化,過濾,組合,廣播,錯誤處理,使用工具等等。 原文:http://reactivex.io/rxjs/manu... 各種各樣的操作符按照不同的目的,分類幾個大類:創建,變化,過濾,組合,廣播(multicasting),錯誤處理,使用工具等等。 以下的列表,按照分類羅列了全部的操作符: 創建操作符 Creation Opera...
摘要:有哪些新變化于年月日正式發布,為開發人員帶來了一些令人興奮的增補和改進。不要移除包,直到你將所有的鏈式操作修改為管道操作符。 RxJS 6有哪些新變化? RxJs 6于2018年4月24日正式發布,為開發人員帶來了一些令人興奮的增補和改進。Ben Lesh, rxJS核心開發成員,強調: RxJS 6在擁有更小API的同時,帶來了更整潔的引入方式 提供一個npm包,該package可...
摘要:輸出流只有在所有的輸入流都完成以后才能完成,任何一條輸入流上的錯誤都將立即推送到輸出流上。如果沒有轉入輸入流,輸出流將會立即發出結束通知。返回值以數組形式獲取到的每一個輸入流的值,或者來自映射函數的值。返回值僅從最新的內部流上取值的流。 接著上一節的操作符繼續,讓我們直奔主題。 組合類操作符 組合類的操作符可以將不同流數據按一定的規則進行合并,從而獲得所需要的完整數據。 combine...
摘要:是一種針對異步數據流編程工具,或者叫響應式擴展編程可不管如何解釋其目標就是異步編程,引入為了就是讓異步可控更簡單。最合理的方式在調用它。當組件需要向組件傳遞數據時,我們可以在組件中使用。的作用是使指令或組件能自定義事件。 RxJS是一種針對異步數據流編程工具,或者叫響應式擴展編程;可不管如何解釋RxJS其目標就是異步編程,Angular引入RxJS為了就是讓異步可控、更簡單。 而今就是...
摘要:加上以后的操作符大都是直接將輸入流映射到一個輸出流,并且它們都不關心輸入流上的值。如果輸入流沒發出任何值,只發出完成通知,那么發出一個默認值。與錯誤相關的一些操作符,如已經提到的當輸入流上有錯誤時,可以發出重試,傳入的參數就是重試的次數。 這個系列不知不覺已經寫到10了,單純從使用上來說的話,大部分的知識點也都講過了,本來不打算寫了,剛好今天有同學在群里說希望能總結一下常用的操作符,那...
閱讀 851·2021-11-15 17:58
閱讀 3652·2021-11-12 10:36
閱讀 3788·2021-09-22 16:06
閱讀 965·2021-09-10 10:50
閱讀 1332·2019-08-30 11:19
閱讀 3315·2019-08-29 16:26
閱讀 937·2019-08-29 10:55
閱讀 3347·2019-08-26 13:48