国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

如何以并發方式在同一個流上執行多種操作?--復制流

王晗 / 3493人閱讀

摘要:正常情況下,一個流在執行一次終端操作之后便結束了。本文通過復制流內數據的方式,曲折的實現了同一個流上執行多次操作。只是思路,其性能并不一定高效,尤其是數據都在內存中處理時復制的開銷很大。但如果流涉及大量,也許性能會有提高。

正常情況下,一個流在執行一次終端操作之后便結束了。本文通過復制流內數據的方式,曲折的實現了同一個流上執行多次操作。
Demo只是思路,其性能并不一定高效,尤其是數據都在內存中處理時復制的開銷很大。但如果流涉及大量I/O,也許性能會有提高。

public class StreamForker {
    private final Stream stream;
    private final Map, ?>> forks = new HashMap<>();

    public StreamForker(Stream stream) {
        this.stream = stream;
    }

    public StreamForker fork(Object key, Function, ?> f) {
        forks.put(key, f);
        return this;
    }

    public Results getResults() {
        ForkingStreamConsumer consumer = build();
        try {
            stream.sequential().forEach(consumer);
        } finally {
            consumer.finish();
        }
        return consumer;
    }

    private ForkingStreamConsumer build() {
        List> queues = new ArrayList<>();

        Map> actions = forks.entrySet().stream().reduce(new HashMap>(),
                (map, e) -> {
                    map.put(e.getKey(), getOperationResult(queues, e.getValue()));
                    return map;
                }, (m1, m2) -> {
                    m1.putAll(m2);
                    return m1;
                });

        return new ForkingStreamConsumer<>(queues, actions);
    }

    private Future getOperationResult(List> queues, Function, ?> f) {
        BlockingQueue queue = new LinkedBlockingQueue<>();
        queues.add(queue);
        Spliterator spliterator = new BlockingQueueSpliterator<>(queue);
        Stream source = StreamSupport.stream(spliterator, false);
        return CompletableFuture.supplyAsync(() -> f.apply(source));
    }
}

accept方法將原始流中所有的數據添加到各個BlockingQueue內,此處實現了復制

class ForkingStreamConsumer implements Consumer, Results {
    static final Object END_OF_STREAM = new Object();

    private final List> queues;
    private final Map> actions;

    public ForkingStreamConsumer(List> queues, Map> actions) {
        this.queues = queues;
        this.actions = actions;
    }

    @Override
    public void accept(T t) {
        queues.forEach(q -> q.add(t));
    }

    @SuppressWarnings("unchecked")
    void finish() {
        accept((T) END_OF_STREAM);
    }

    @SuppressWarnings("unchecked")
    @Override
    public  R get(Object key) {
        try {
            return ((Future) actions.get(key)).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

此處重寫了tryAdvance接口,只是簡單的從BlockingQueue中取出數據,執行action。業務邏輯中復制流是為了做什么事情,action就是這件事情。ForkingStreamConsumer.END_OF_STREAM是Queue中數據結束的標示

class BlockingQueueSpliterator implements Spliterator {
    private final BlockingQueue q;

    BlockingQueueSpliterator(BlockingQueue q) {
        this.q = q;
    }

    @Override
    public boolean tryAdvance(Consumer action) {
        T t;
        while (true) {
            try {
                t = q.take();
                break;
            } catch (InterruptedException e) {
            }
        }

        if (t != ForkingStreamConsumer.END_OF_STREAM) {
            action.accept(t);
            return true;
        }

        return false;
    }

    @Override
    public Spliterator trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return 0;
    }

    @Override
    public int characteristics() {
        return 0;
    }
}

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77289.html

相關文章

  • 從命令式到響應式(五)

    摘要:輸出流只有在所有的輸入流都完成以后才能完成,任何一條輸入流上的錯誤都將立即推送到輸出流上。如果沒有轉入輸入流,輸出流將會立即發出結束通知。返回值以數組形式獲取到的每一個輸入流的值,或者來自映射函數的值。返回值僅從最新的內部流上取值的流。 接著上一節的操作符繼續,讓我們直奔主題。 組合類操作符 組合類的操作符可以將不同流數據按一定的規則進行合并,從而獲得所需要的完整數據。 combine...

    CoderBear 評論0 收藏0
  • 從命令式到響應式(四)

    摘要:使用的操作符這條從左到右的橫線代表經過操作符轉換后的輸出流。返回值通過判定函數檢測的值組成的流。返回值持續發出輸入流上的值,直到通知流上發出值為止。 上期介紹過了rxjs中的三大件,Observable,subscription,subject,但是在開發過程我們最常接觸到的東西非操作符莫屬。比如上期代碼中曾出現過的from就是一個操作符。rxjs中的操作符大致上可以分為幾類,創建類,...

    jaysun 評論0 收藏0
  • 巧妙復制一個

    摘要:場景實際業務中可能出現重復消費一個可讀流的情況,比如在前置過濾器解析請求體,拿到進行相關權限及身份認證認證通過后框架或者后置過濾器再次解析請求體傳遞給業務上下文。 場景 實際業務中可能出現重復消費一個可讀流的情況,比如在前置過濾器解析請求體,拿到body進行相關權限及身份認證;認證通過后框架或者后置過濾器再次解析請求體傳遞給業務上下文。因此,重復消費同一個流的需求并不奇葩,這類似于js...

    wenzi 評論0 收藏0
  • 探索 RxJS - Core Concept

    摘要:但不同的是,在的遍歷調用過程中,如果一個事件還沒有觸發完畢獲取到返回值,就觸發了下一個事件,則將忽略返回的值。這樣,我們就可以避免異步的返回值因為返回較慢,反而覆蓋了之后異步的返回值。 Steam in ReactiveX showImg(https://segmentfault.com/img/bVFReX?w=100&h=100); ReactiveX,又稱 Reactive Ex...

    Neilyo 評論0 收藏0

發表評論

0條評論

王晗

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<