摘要:正常情況下,一個流在執行一次終端操作之后便結束了。本文通過復制流內數據的方式,曲折的實現了同一個流上執行多次操作。只是思路,其性能并不一定高效,尤其是數據都在內存中處理時復制的開銷很大。但如果流涉及大量,也許性能會有提高。
正常情況下,一個流在執行一次終端操作之后便結束了。本文通過復制流內數據的方式,曲折的實現了同一個流上執行多次操作。
Demo只是思路,其性能并不一定高效,尤其是數據都在內存中處理時復制的開銷很大。但如果流涉及大量I/O,也許性能會有提高。
public class StreamForker{ private final Stream stream; private final Map
accept方法將原始流中所有的數據添加到各個BlockingQueue內,此處實現了復制
class ForkingStreamConsumerimplements Consumer , Results { static final Object END_OF_STREAM = new Object(); private final List > queues; private final Map > actions; public ForkingStreamConsumer(List > queues, Map > actions) { this.queues = queues; this.actions = actions; } @Override public void accept(T t) { queues.forEach(q -> q.add(t)); } @SuppressWarnings("unchecked") void finish() { accept((T) END_OF_STREAM); } @SuppressWarnings("unchecked") @Override public R get(Object key) { try { return ((Future ) actions.get(key)).get(); } catch (Exception e) { throw new RuntimeException(e); } } }
此處重寫了tryAdvance接口,只是簡單的從BlockingQueue中取出數據,執行action。業務邏輯中復制流是為了做什么事情,action就是這件事情。ForkingStreamConsumer.END_OF_STREAM是Queue中數據結束的標示
class BlockingQueueSpliteratorimplements Spliterator { private final BlockingQueue q; BlockingQueueSpliterator(BlockingQueue q) { this.q = q; } @Override public boolean tryAdvance(Consumer super T> action) { T t; while (true) { try { t = q.take(); break; } catch (InterruptedException e) { } } if (t != ForkingStreamConsumer.END_OF_STREAM) { action.accept(t); return true; } return false; } @Override public Spliterator trySplit() { return null; } @Override public long estimateSize() { return 0; } @Override public int characteristics() { return 0; } }
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77289.html
摘要:輸出流只有在所有的輸入流都完成以后才能完成,任何一條輸入流上的錯誤都將立即推送到輸出流上。如果沒有轉入輸入流,輸出流將會立即發出結束通知。返回值以數組形式獲取到的每一個輸入流的值,或者來自映射函數的值。返回值僅從最新的內部流上取值的流。 接著上一節的操作符繼續,讓我們直奔主題。 組合類操作符 組合類的操作符可以將不同流數據按一定的規則進行合并,從而獲得所需要的完整數據。 combine...
摘要:使用的操作符這條從左到右的橫線代表經過操作符轉換后的輸出流。返回值通過判定函數檢測的值組成的流。返回值持續發出輸入流上的值,直到通知流上發出值為止。 上期介紹過了rxjs中的三大件,Observable,subscription,subject,但是在開發過程我們最常接觸到的東西非操作符莫屬。比如上期代碼中曾出現過的from就是一個操作符。rxjs中的操作符大致上可以分為幾類,創建類,...
摘要:但不同的是,在的遍歷調用過程中,如果一個事件還沒有觸發完畢獲取到返回值,就觸發了下一個事件,則將忽略返回的值。這樣,我們就可以避免異步的返回值因為返回較慢,反而覆蓋了之后異步的返回值。 Steam in ReactiveX showImg(https://segmentfault.com/img/bVFReX?w=100&h=100); ReactiveX,又稱 Reactive Ex...
閱讀 2022·2021-09-29 09:35
閱讀 1955·2019-08-30 14:15
閱讀 2980·2019-08-30 10:56
閱讀 964·2019-08-29 16:59
閱讀 577·2019-08-29 14:04
閱讀 1310·2019-08-29 12:30
閱讀 1031·2019-08-28 18:19
閱讀 515·2019-08-26 11:51