摘要:并行流與目前,我們對(duì)集合進(jìn)行計(jì)算有兩種方式并行流而更加的靈活,我們可以配置線(xiàn)程池的大小確保整體的計(jì)算不會(huì)因?yàn)榈却l(fā)生阻塞。
【回顧Future接口
Future接口時(shí)java5引入的,設(shè)計(jì)初衷是對(duì)將來(lái)某個(gè)時(shí)刻會(huì)發(fā)生的結(jié)果建模。它建模了一種異步計(jì)算,返回了一個(gè)執(zhí)行預(yù)算結(jié)果的引用。比如,你去干洗店洗衣服,店員會(huì)告訴你什么時(shí)候可以來(lái)取衣服,而不是讓你一直在干洗店等待。要使用Future只需要將耗時(shí)操作封裝在一個(gè)Callable對(duì)象中,再將其提交給ExecutorService就可以了。
ExecutorService executor = Executors.newFixedThreadPool(10); Futurefuture = executor.submit(new Callable () { @Override public Double call() throws Exception { return doSomeLongComputation(); } }); doSomethingElse(); try { //最多等待1秒 Double result = future.get(1,TimeUnit.SECONDS); } catch (InterruptedException e) { //當(dāng)前線(xiàn)程等待過(guò)程中被打斷 e.printStackTrace(); } catch (ExecutionException e) { //計(jì)算時(shí)出現(xiàn)異常 e.printStackTrace(); } catch (TimeoutException e) { //完成計(jì)算前就超時(shí) e.printStackTrace(); }
但是Future依然有一些局限性:
無(wú)法將兩個(gè)異步計(jì)算的結(jié)果合并為一個(gè)。
等待Future集合中所有任務(wù)完成。
等待Future集合中最快任務(wù)完成(選擇最優(yōu)的執(zhí)行方案)。
通過(guò)編程的方式完成一個(gè)Future任務(wù)的執(zhí)行(手工設(shè)定異步結(jié)果處理)。
應(yīng)對(duì)Future的完成事件,當(dāng)Future的完成事件發(fā)生時(shí)會(huì)收到通知,并可以使用Future的結(jié)果進(jìn)行下一步操作,不只是簡(jiǎn)單的阻塞等待。
而CompletableFuture類(lèi)實(shí)現(xiàn)了Future接口,可以將上述的問(wèn)題全部解決。CompletableFuture與Stream的設(shè)計(jì)都遵循了類(lèi)似的設(shè)計(jì)模式:使用Lambda表達(dá)式以及流水線(xiàn)的思想,從這個(gè)角度可以說(shuō)CompletableFuture與Future的關(guān)系類(lèi)似于Stream與Collection的關(guān)系。
【構(gòu)建一個(gè)異步應(yīng)用最佳價(jià)格查詢(xún)器:查詢(xún)多個(gè)線(xiàn)上商店對(duì)同一商品的價(jià)格。
首先構(gòu)建商店對(duì)象:
package BestPriceFinder; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; public class Shop { private String name; public Shop(String name){ this.name = name; } public String getName(){ return name; } /** * 異步api:使用創(chuàng)建CompletableFuture類(lèi)提供的工廠(chǎng)方法與getPriceAsync()效果完全一致 * 可以更輕易的完成這個(gè)流程,并且不用擔(dān)心實(shí)現(xiàn)細(xì)節(jié) * @param product * @return */ public FuturegetPriceAsyncByFactory(String product){ return CompletableFuture.supplyAsync(() -> calculatePrice(product)); } /** * 異步api: * @param product * @return */ public Future getPriceAsync(String product){ //創(chuàng)建CompletableFuture對(duì)象,它將包含計(jì)算結(jié)果 CompletableFuture futurePrice = new CompletableFuture<>(); //在新線(xiàn)程中異步計(jì)算結(jié)果 new Thread(() -> { try { double price = calculatePrice(product); //需要長(zhǎng)時(shí)間計(jì)算的任務(wù)結(jié)束時(shí),設(shè)置future的返回值 futurePrice.complete(price); }catch (Exception e){ //如這里沒(méi)有使用completeExceptionally,線(xiàn)程不會(huì)結(jié)束,調(diào)用方會(huì)永遠(yuǎn)的執(zhí)行下去 futurePrice.completeExceptionally(e); } }).start(); //無(wú)需等待計(jì)算結(jié)果,直接返回future對(duì)象 return futurePrice; } /** * 同步api: * 每個(gè)商店都需要提供的查詢(xún)api:根據(jù)名稱(chēng)返回價(jià)格; * 模擬查詢(xún)數(shù)據(jù)庫(kù)等一些耗時(shí)操作:使用delay()模擬這些耗時(shí)操作。 * @param product * @return */ public double getPrice(String product){ return calculatePrice(product); } private double calculatePrice(String product){ delay(); return random.nextDouble() * product.charAt(0) + product.charAt(1); } private Random random = new Random(); /** * 模擬耗時(shí)操作:延遲一秒 */ private static void delay(){ try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
下面我們針對(duì)Shop.java提供的同步方法與異步方法來(lái)進(jìn)行測(cè)試:
package BestPriceFinder; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.stream.Collectors; /** * 最佳價(jià)格查詢(xún)器 */ public class BestFinder { Listshops = Arrays.asList( new Shop("A"), new Shop("B"), new Shop("C"), new Shop("D"), new Shop("E"), new Shop("F"), new Shop("G"), new Shop("H"), new Shop("I"), new Shop("J") ); /** * 順序查詢(xún) */ public List findPrices(String product){ return shops.stream() .map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product))) .collect(Collectors.toList()); } /** * 并行流查詢(xún) */ public List findPricesParallel(String product){ return shops.parallelStream() .map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product))) .collect(Collectors.toList()); } /** * 異步查詢(xún) * 相比并行流的話(huà)CompletableFuture更有優(yōu)勢(shì):可以對(duì)執(zhí)行器配置,設(shè)置線(xiàn)程池大小 */ @SuppressWarnings("all") private final Executor myExecutor = Executors.newFixedThreadPool(Math.min(shops.size(), 800), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); //使用守護(hù)線(xiàn)程保證不會(huì)阻止程序的關(guān)停 t.setDaemon(true); return t; } }); @SuppressWarnings("all") public List findPricesAsync(String product){ List > priceFuctures = shops.stream() .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product)),myExecutor)) .collect(Collectors.toList()); /** 這里需要使用新的stream來(lái)等待所有的子線(xiàn)程執(zhí)行完, * 因?yàn)椋喝绻谝粋€(gè)stream中使用兩個(gè)map: * List > priceFuctures = shops.parallelStream() * .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product)))) * .map(c -> c.join()).collect(Collectors.toList()) * .collect(Collectors.toList()); * 考慮到流操作之間的延遲特性。如果你在單一的流水線(xiàn)中處理流,發(fā)向不同商家的請(qǐng)求只能以同步順序的方式執(zhí)行才會(huì)成功。因此每個(gè)創(chuàng)建CompletableFuture * 對(duì)象只能在前一個(gè)操作結(jié)束之后執(zhí)行查詢(xún)商家動(dòng)作。 */ return priceFuctures.stream().map(c -> c.join()).collect(Collectors.toList()); } }
@Test public void findPrices(){ BestFinder bestFinder = new BestFinder(); long st = System.currentTimeMillis(); System.out.println(bestFinder.findPrices("iPhonX")); System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs"); } @Test public void findPricesParallel(){ BestFinder bestFinder = new BestFinder(); long st = System.currentTimeMillis(); System.out.println(bestFinder.findPrices("iPhonX")); System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs"); } @Test public void findPricesAsync(){ BestFinder bestFinder = new BestFinder(); long st = System.currentTimeMillis(); System.out.println(bestFinder.findPricesAsync("iPhonX")); System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs"); }
同步api測(cè)試結(jié)果:毫無(wú)疑問(wèn)是10秒之上
并行流獲取同步api測(cè)試結(jié)果:也是10秒之上,但是并行流不是很高效嗎?怎么會(huì)如此凄慘呢?因?yàn)檫@與并行流可以調(diào)用的系統(tǒng)核數(shù)相關(guān),我的計(jì)算機(jī)是8核,最多8個(gè)線(xiàn)程同時(shí)運(yùn)行。而商店有10個(gè),也就是說(shuō),我們的兩個(gè)線(xiàn)程會(huì)一直等待前面的某一個(gè)線(xiàn)程釋放出空閑才能繼續(xù)運(yùn)行。
異步獲取api測(cè)試結(jié)果:一秒左右
為何差距如此大呢?
明智的選擇是創(chuàng)建了一個(gè)配有線(xiàn)程池的執(zhí)行器,線(xiàn)程池中線(xiàn)程的數(shù)目取決于你的應(yīng)用需要處理的負(fù)擔(dān),但是你該如何選擇合適的線(xiàn)程數(shù)目呢?
《Java并發(fā)編程實(shí)戰(zhàn)》中給出如下公式:
Number = NCpu * Ucpu * ( 1 + W/C) Number : 線(xiàn)程數(shù)量 NCpu : 處理器核數(shù) UCpu : 期望cpu利用率 W/C : 等待時(shí)間與計(jì)算時(shí)間比
我們這里:99%d的時(shí)間是等待商店響應(yīng) W/C = 99 ,cpu利用率期望 100% ,NCpu = 9,推斷出 number = 800。但是為了避免過(guò)多的線(xiàn)程搞死計(jì)算機(jī),我們選擇商店數(shù)與計(jì)算值中較小的一個(gè)。
【并行流與CompletableFuture目前,我們對(duì)集合進(jìn)行計(jì)算有兩種方式:1.并行流 2.CompletableFuture;而CompletableFuture更加的靈活,我們可以配置線(xiàn)程池的大小確保整體的計(jì)算不會(huì)因?yàn)榈却齀O而發(fā)生阻塞。
書(shū)上給出的建議如下:
如果是計(jì)算密集型的操作并且沒(méi)有IO推薦stream接口,因?yàn)閷?shí)現(xiàn)簡(jiǎn)單效率也高,如果所有的線(xiàn)程都是計(jì)算密集型的也就沒(méi)有必要?jiǎng)?chuàng)建比核數(shù)更多的線(xiàn)程。
反之,如果任務(wù)涉及到IO,網(wǎng)絡(luò)等操作:CompletableFuture靈活性更好,因?yàn)榇蟛糠志€(xiàn)程處于等待狀態(tài),需要讓他們更加忙碌,并且再邏輯中加入異常處理可以更有效的監(jiān)控是什么原因觸發(fā)了等待。
現(xiàn)在我們知道了如何用CompletableFuture提供異步的api,后面的文章會(huì)學(xué)習(xí)如何利用CompletableFuture高效的操作同步api。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/68219.html
摘要:相比與其他操作系統(tǒng)包括其他類(lèi)系統(tǒng)有很多的優(yōu)點(diǎn),其中有一項(xiàng)就是,其上下文切換和模式切換的時(shí)間消耗非常少。因?yàn)槎嗑€(xiàn)程競(jìng)爭(zhēng)鎖時(shí)會(huì)引起上下文切換。減少線(xiàn)程的使用。很多編程語(yǔ)言中都有協(xié)程。所以如何避免死鎖的產(chǎn)生,在我們使用并發(fā)編程時(shí)至關(guān)重要。 系列文章傳送門(mén): Java多線(xiàn)程學(xué)習(xí)(一)Java多線(xiàn)程入門(mén) Java多線(xiàn)程學(xué)習(xí)(二)synchronized關(guān)鍵字(1) java多線(xiàn)程學(xué)習(xí)(二)syn...
摘要:因?yàn)槎嗑€(xiàn)程競(jìng)爭(zhēng)鎖時(shí)會(huì)引起上下文切換。減少線(xiàn)程的使用。舉個(gè)例子如果說(shuō)服務(wù)器的帶寬只有,某個(gè)資源的下載速度是,系統(tǒng)啟動(dòng)個(gè)線(xiàn)程下載該資源并不會(huì)導(dǎo)致下載速度編程,所以在并發(fā)編程時(shí),需要考慮這些資源的限制。 最近私下做一項(xiàng)目,一bug幾日未解決,總惶恐。一日頓悟,bug不可怕,怕的是項(xiàng)目不存在bug,與其懼怕,何不與其剛正面。 系列文章傳送門(mén): Java多線(xiàn)程學(xué)習(xí)(一)Java多線(xiàn)程入門(mén) Jav...
摘要:學(xué)習(xí)編程的本最佳書(shū)籍這些書(shū)涵蓋了各個(gè)領(lǐng)域,包括核心基礎(chǔ)知識(shí),集合框架,多線(xiàn)程和并發(fā),內(nèi)部和性能調(diào)優(yōu),設(shè)計(jì)模式等。擅長(zhǎng)解釋錯(cuò)誤及錯(cuò)誤的原因以及如何解決簡(jiǎn)而言之,這是學(xué)習(xí)中并發(fā)和多線(xiàn)程的最佳書(shū)籍之一。 showImg(https://segmentfault.com/img/remote/1460000018913016); 來(lái)源 | 愿碼(ChainDesk.CN)內(nèi)容編輯 愿碼Slo...
摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線(xiàn)程安全并且高效的,在并發(fā)編程中經(jīng)常可見(jiàn)它的使用,在開(kāi)始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話(huà),看看它是如何被引入的。電商秒殺和搶購(gòu),是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽(tīng)名字多牛逼,來(lái),我們一步一步來(lái)?yè)羝魄皟蓚€(gè)名詞,今天我們首先來(lái)說(shuō)說(shuō)分布式。 探究...
摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線(xiàn)程安全并且高效的,在并發(fā)編程中經(jīng)常可見(jiàn)它的使用,在開(kāi)始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話(huà),看看它是如何被引入的。電商秒殺和搶購(gòu),是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽(tīng)名字多牛逼,來(lái),我們一步一步來(lái)?yè)羝魄皟蓚€(gè)名詞,今天我們首先來(lái)說(shuō)說(shuō)分布式。 探究...
閱讀 2812·2019-08-30 15:55
閱讀 2858·2019-08-30 15:53
閱讀 2296·2019-08-26 13:47
閱讀 2558·2019-08-26 13:43
閱讀 3157·2019-08-26 13:33
閱讀 2805·2019-08-26 11:53
閱讀 1798·2019-08-23 18:35
閱讀 801·2019-08-23 17:16