摘要:正確使用并行流錯用并行流而產生錯誤的首要原因,就是使用的算法改變了某些共享狀態。高效使用并行流留意裝箱有些操作本身在并行流上的性能就比順序流差還要考慮流的操作流水線的總計算成本。
一、并行流 1.將順序流轉換為并行流
對順序流調用parallel方法:
public static long parallelSum(long n) { return Stream.iterate(1L, i -> i + 1) .limit(n) .parallel() .reduce(0L, Long::sum); }
它在內部實際上就是設了一個boolean標志,表示你想讓調用parallel之后進行的所有操作都并行執行。類似地,你只需要對并行流調用sequential方法就可以把它變成順序流。但最后一次parallel或sequential調用會影響整個流水線。
2.測量流性能iterate生成的是裝箱的對象,必須拆箱成數字才能求和;
我們很難把iterate分成多個獨立塊來并行執行。
iterate很難分割成能夠獨立執行的小塊,因為每次應用這個函數都要依賴前一次應用的結果,整張數字列表在歸納過程開始時沒有準備好,因而無法有效地把流劃分為小塊來并行處理。把流標記成并行,你其實是給順序處理增加了開銷,它還要把每次求和操作分到一個不同的線程上。
3.正確使用并行流錯用并行流而產生錯誤的首要原因,就是使用的算法改變了某些共享狀態。
public class Accumulator { public long total = 0; public void add(long value) { total += value; } } public static long sideEffectParallelSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add); return accumulator.total; }
上面的示例在本質上就是順序的,每次訪問total都會出現數據競爭.由于多個線程在同時訪問累加器,執行total += value,而這一句雖然看似簡單,卻不是一個原子操作。所得的結果也是不可控的(錯誤的)。4.高效使用并行流
留意裝箱
有些操作本身在并行流上的性能就比順序流差
還要考慮流的操作流水線的總計算成本。設N是要處理的元素的總數,Q是一個元素通過流水線的大致處理成本,則N*Q就是這個對成本的一個粗略的定性估計。Q值較高就意味著使用并行流時性能好的可能性比較大
對于較小的數據量,選擇并行流幾乎從來都不是一個好的決定
要考慮流背后的數據結構是否易于分解
流自身的特點,以及流水線中的中間操作修改流的方式,都可能會改變分解過程的性能。
還要考慮終端操作中合并步驟的代價是大是小
二、分支/合并框架(Fork/Join)詳見第六章相關內容
注意:不應該在RecursiveTask內部使用ForkJoinPool的invoke方法。相反,你應該始終直接調用compute或fork方法,只有順序代碼才應該用invoke來啟動并行計算。
Spliterator是Java 8中加入的另一個新接口;這個名字代表“可分迭代器”(splitable iterator)。和Iterator一樣,Spliterator也用于遍歷數據源中的元素,但它是為了并行執行而設計的。
Spliterator接口
public interface Spliterator{ boolean tryAdvance(Consumer super T> action); Spliterator trySplit(); long estimateSize(); int characteristics(); }
與往常一樣,T是Spliterator遍歷的元素的類型。tryAdvance方法的行為類似于普通的Iterator,因為它會按順序一個一個使用Spliterator中的元素,并且如果還有其他元素要遍歷就返回true。但trySplit是專為Spliterator接口設計的,因為它可以把一些元素劃出去分給第二個Spliterator(由該方法返回),讓它們兩個并行處理。Spliterator還可通過estimateSize方法估計還剩下多少元素要遍歷,因為即使不那么確切,能快速算出來是一個值也有助于讓拆分均勻一點.
1.拆分過程將Stream拆分成多個部分的算法是一個遞歸過程,如圖所示。第一步是對第一個Spliterator調用trySplit,生成第二個Spliterator。第二步對這兩個Spliterator調用trysplit,這樣總共就有了四個Spliterator。這個框架不斷對Spliterator調用trySplit直到它返回null,表明它處理的數據結構不能再分割,如第三步所示。最后,這個遞歸拆分過程到第四步就終止了,這時所有的Spliterator在調用trySplit時都返回了null。
文中提到了reduce的三參數重載方法
U reduce(U identity,BiFunction accumulator,BinaryOperator combiner)它的三個參數:
identity: 一個初始化的值;這個初始化的值其類型是泛型U,與Reduce方法返回的類型一致;注意此時Stream中元素的類型是T,與U可以不一樣也可以一樣,這樣的話操作空間就大了;不管Stream中存儲的元素是什么類型,U都可以是任何類型,如U可以是一些基本數據類型的包裝類型Integer、Long等;或者是String,又或者是一些集合類型ArrayList等;后面會說到這些用法。
accumulator: 其類型是BiFunction,輸入是U與T兩個類型的數據,而返回的是U類型;也就是說返回的類型與輸入的第一個參數類型是一樣的,而輸入的第二個參數類型與Stream中元素類型是一樣的。
combiner: 其類型是BinaryOperator,支持的是對U類型的對象進行操作;
第三個參數combiner主要是使用在并行計算的場景下;如果Stream是非并行時,第三個參數實際上是不生效的。
代碼實現:
class WordCounter { private final int counter; private final boolean lastSpace; public WordCounter(int counter, boolean lastSpace) { this.counter = counter; this.lastSpace = lastSpace; } public WordCounter accumulate(Character c) { if (Character.isWhitespace(c)) { return lastSpace ? this : new WordCounter(counter, true); } else { return lastSpace ? new WordCounter(counter + 1, false) : this; } } public WordCounter combine(WordCounter wordCounter) { return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace); } public int getCounter() { return counter; } }
class WordCounterSpliterator implements Spliterator{ private final String string; private int currentChar = 0; public WordCounterSpliterator(String string) { this.string = string; } @Override public boolean tryAdvance(Consumer action) { action.accept(string.charAt(currentChar++)); return currentChar < string.length(); } @Override public Spliterator trySplit() { int currentSize = string.length() - currentChar; if (currentSize < 10) { return null; } for (int splitPos = (currentSize / 2) + currentChar; splitPos < string.length(); splitPos++) { if (Character.isWhitespace(string.charAt(splitPos))) { Spliterator spliterator = new WordCounterSpliterator(string.substring( currentChar, splitPos)); currentChar = splitPos; return spliterator; } } return null; } @Override public long estimateSize() { return string.length() - currentChar; } @Override public int characteristics() { return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE; } }
final String SENTENCE = " Nel mezzo del cammin di nostra vita " + "mi ritrovai in una selva oscura" + " ché la dritta via era smarrita "; private int countWords(Streamstream) { WordCounter wordCounter = stream.reduce(new WordCounter(0, true), WordCounter::accumulate, WordCounter::combine); return wordCounter.getCounter(); } Spliterator spliterator = new WordCounterSpliterator(SENTENCE); Stream stream = StreamSupport.stream(spliterator, true); System.out.println("Found " + countWords(stream) + " words");
最后打印顯示
Found 19 words
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/74284.html
摘要:實戰讀書筆記第一章從方法傳遞到接著上次的,繼續來了解一下,如果繼續簡化代碼。去掉并且生成的數字是萬,所消耗的時間循序流并行流至于為什么有時候并行流效率比循序流還低,這個以后的文章會解釋。 《Java8實戰》-讀書筆記第一章(02) 從方法傳遞到Lambda 接著上次的Predicate,繼續來了解一下,如果繼續簡化代碼。 把方法作為值來傳遞雖然很有用,但是要是有很多類似與isHeavy...
摘要:限制編寫并行流,存在一些與非并行流不一樣的約定。底層框架并行流在底層沿用的框架,遞歸式的分解問題,然后每段并行執行,最終由合并結果,返回最后的值。 本書第六章的讀書筆記,也是我這個系列的最后一篇讀書筆記。后面7、8、9章分別講的測試、調試與重構、設計和架構的原則以及使用Lambda表達式編寫并發程序,因為筆記不好整理,就不寫了,感興趣的同學自己買書來看吧。 并行化流操作 關于并行與并發...
摘要:第四章引入流一什么是流流是的新成員,它允許你以聲明性方式處理數據集合通過查詢語句來表達,而不是臨時編寫一個實現。 第四章 引入流 一、什么是流 流是Java API的新成員,它允許你以聲明性方式處理數據集合(通過查詢語句來表達,而不是臨時編寫一個實現)。就現在來說,你可以把它們看成遍歷數據集的高級迭代器。此外,流還可以透明地并行處理,你無需寫任何多線程代碼。 下面兩段代碼都是用來返回低...
摘要:分區函數返回一個布爾值,這意味著得到的分組的鍵類型是,于是它最多可以分為兩組是一組,是一組。當遍歷到流中第個元素時,這個函數執行時會有兩個參數保存歸約結果的累加器已收集了流中的前個項目,還有第個元素本身。 一、收集器簡介 把列表中的交易按貨幣分組: Map transactionsByCurrencies = transactions.stream().collect(groupi...
摘要:內部迭代與使用迭代器顯式迭代的集合不同,流的迭代操作是在背后進行的。流只能遍歷一次請注意,和迭代器類似,流只能遍歷一次。 流(Stream) 流是什么 流是Java API的新成員,它允許你以聲明性方式處理數據集合(通過查詢語句來表達,而不是臨時編寫一個實現)。就現在來說,你可以把它們看成遍歷數據集的高級迭代器。此外,流還可以透明地并行處理,你無需寫任何多線程代碼了!我會在后面的筆記中...
摘要:方法接受一個生產者作為參數,返回一個對象,該對象完成異步執行后會讀取調用生產者方法的返回值。該方法接收一個對象構成的數組,返回由第一個執行完畢的對象的返回值構成的。 一、Future 接口 在Future中觸發那些潛在耗時的操作把調用線程解放出來,讓它能繼續執行其他有價值的工作,不再需要呆呆等待耗時的操作完成。打個比方,你可以把它想象成這樣的場景:你拿了一袋子衣服到你中意的干洗店去洗。...
閱讀 2880·2021-11-11 10:58
閱讀 1936·2021-10-11 10:59
閱讀 3501·2019-08-29 16:23
閱讀 2349·2019-08-29 11:11
閱讀 2797·2019-08-28 17:59
閱讀 3848·2019-08-27 10:56
閱讀 2094·2019-08-23 18:37
閱讀 3123·2019-08-23 16:53