摘要:在上文的代碼中,多線程累加的程序之所以會比單線程還慢得多就是因為在類型的靜態(tài)變量上有兩個線程同時調(diào)用方法進行累加,這就會導(dǎo)致在這個靜態(tài)變量上存在很嚴重的沖突。使用進行任務(wù)中引入了一個新的多線程任務(wù)執(zhí)行框架,被稱為。
雖然對于一個計算機程序來說最重要的是正確性,如果一個程序沒辦法產(chǎn)出正確的結(jié)果,那么這個程序的價值就大打折扣了。但程序性能也是很重要的一個方面,如果程序運行得太慢,那也會影響到程序的適用范圍和硬件配置的成本。
在之前的文章《4.多線程中那些看不到的陷阱》中,我們了解了線程間的同步機制,這主要是為了保證程序在多線程環(huán)境下的正確性。在這篇文章中我們將會深入探究多線程程序的性能瓶頸和多種不同的優(yōu)化方式,那么我們首先就從對程序性能的測量與分析開始吧。
分析多線程程序的性能我們先來看一個使用AtomicLong進行多線程計數(shù)的程序,下面的程序中會啟動兩個線程,每個線程會對靜態(tài)變量count進行一億次(10的8次方)的累加操作,這段代碼在開始和結(jié)束的時候都獲取了當前時間,然后通過這兩個時間值計算程序的運行耗時。
public class AtomicIntegerTest { private static AtomicLong count = new AtomicLong(0); public static void main(String[] args) throws Exception { long startTime = System.currentTimeMillis(); Runnable task = new Runnable() { public void run() { for (int i = 0; i < 1e8; ++i) { count.incrementAndGet(); } } }; Thread t1 = new Thread(task); Thread t2 = new Thread(task); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("count = " + count); long endTime = System.currentTimeMillis(); System.out.println(String.format("總耗時:%.2fs", (endTime - startTime) / 1e3)); } }
在我的電腦上運行這段程序最后輸出的結(jié)果是2.44s,看起來有點長了,那么我們再看一下如果直接在單個線程中對一個整型變量累加兩億次會是什么結(jié)果。下面是一個在單個線程中累加兩億次的程序代碼:
public class SingleThreadTest { public static void main(String[] args) throws Exception { long startTime = System.currentTimeMillis(); long count = 0; for (int i = 0; i < 2e8; ++i) { count += 1; } System.out.println("count = " + count); long endTime = System.currentTimeMillis(); System.out.println(String.format("總耗時:%.2fs", (endTime - startTime) / 1e3)); } }
這段代碼運行耗時只有0.33s,我們通過兩個線程進行累加的代碼竟然比單線程的代碼還要慢得多,這不是就和我們使用多線程加快程序運行的初衷相違背了嗎?
多線程程序的線程間同步是影響多線程程序性能的關(guān)鍵所在,一方面程序中必須串行化的部分會使系統(tǒng)整體的耗時顯著增加,另一方面同步行為本身的開銷也比較大,特別是在發(fā)生沖突的情況下。在上文的代碼中,多線程累加的程序之所以會比單線程還慢得多就是因為在AtomicLong類型的靜態(tài)變量count上有兩個線程同時調(diào)用incrementAndGet方法進行累加,這就會導(dǎo)致在這個靜態(tài)變量上存在很嚴重的沖突。
當一個線程成功修改了變量count的值后,另外一個正在修改的線程就會修改失敗并且會再次重試累加操作。并且因為AtomicLong類型的對象中是用一個volatile變量來保存實際的整型值的,而我們在之前的文章《多線程中那些看不到的陷阱》中可以了解到,對volatile變量的修改操作一定要把修改后的數(shù)據(jù)從高速緩存寫回內(nèi)存當中,這也是用AtomicLong進行累加的耗時比單線程累加版本還要多這么多的主要原因。
那么我們有沒有更好的方法可以解決這個問題呢?
使用任務(wù)拆分進行優(yōu)化在上面的例子中,我們需要的只是最終累加的結(jié)果,所以為了減小線程間同步的開銷,我們可以將累加任務(wù)拆分到不同的線程中執(zhí)行,到最后再把每個線程的結(jié)果加在一起就可以得到最終的結(jié)果了。在下面的代碼中我們就使用了這種方法,t1在count1上累加一億次,t2在count2上累加一億次,最后把count1和count2相加得到最終的結(jié)果,我們來一起運行一下,看看效果如何。
public class TwoThreadTest { private static long count1 = 0; private static long count2 = 0; public static void main(String[] args) throws Exception { long startTime = System.currentTimeMillis(); Thread t1 = new Thread(new Runnable() { public void run() { for (int i = 0; i < 1e8; ++i) { count1 += 1; } } }); Thread t2 = new Thread(new Runnable() { public void run() { for (int i = 0; i < 1e8; ++i) { count2 += 1; } } }); t1.start(); t2.start(); t1.join(); t2.join(); long count = count1 + count2; System.out.println("count = " + count); long endTime = System.currentTimeMillis(); System.out.println(String.format("總耗時:%.2fs", (endTime - startTime) / 1e3)); } }
這段程序在我的電腦上的耗時是0.20s,比之前單線程的0.33s有了不小的提高,更是遙遙領(lǐng)先原先用兩個線程累加AtomicLong類型變量版本的2.44s。這說明我們之前的分析是正確的,CAS重試和volatile寫回內(nèi)存兩個操作所引起的開銷是AtomicLong版本程序性能低下的罪魁禍首。
但是這個版本的結(jié)構(gòu)還是略顯原始了,在應(yīng)付累加這種簡單的需求時可能還比較容易,但是一旦面臨復(fù)雜的并發(fā)任務(wù),那可能就要寫很多復(fù)雜的代碼,并且很容易出現(xiàn)錯誤了。例如我們?nèi)绻氚讶蝿?wù)拆分到10個線程中運行,那么我們就要首先把兩億次的累加任務(wù)拆分為10份,然后還要創(chuàng)建一個包含10個Thread對象的數(shù)組讓他們分別的對不同范圍進行累加,最后還要通過join方法等待這10個線程都執(zhí)行完成,這個任務(wù)聽起來就不太容易。沒關(guān)系,下面我們將會介紹一種目前比較常用的任務(wù)拆分與運行框架來解決這個問題,通過這個框架我們可以很容易地寫出易于編寫和擴展的任務(wù)拆分式程序。
使用ForkJoinPool進行任務(wù)JDK 1.7中引入了一個新的多線程任務(wù)執(zhí)行框架,被稱為ForkJoinPool。ForkJoinPool是一個Java類,它實現(xiàn)了代表線程池功能的ExecutorService接口,所以它在使用方法上和常用的線程池類ThreadPoolExecutor相似,但在本節(jié)中我們并不需要了解線程池的詳細用法,不過感興趣的讀者可以參考這篇文章從0到1玩轉(zhuǎn)線程池來了解一下。
線程池就是一個線程的集合,其中的線程會一直等待執(zhí)行任務(wù),所以我們可以把任務(wù)以任務(wù)對象的形式提交到線程池,然后線程池就會利用其中的線程來執(zhí)行任務(wù)。在ForkJoinPool的使用中,線程池指的就是ForkJoinPool類型的對象,而任務(wù)對象指的就是繼承自ForkJoinTask的類的對象。在下面的示例代碼中,我們使用了自定義的RecursiveTask的子類來作為任務(wù)類,RecursiveTask類就繼承自ForkJoinTask類。
Recursive的意思是遞歸,也就是說我們在這個任務(wù)類的執(zhí)行過程中可能會創(chuàng)建新的任務(wù)類對象來代表當前任務(wù)的子任務(wù),然后通過結(jié)合多個子任務(wù)的結(jié)果來返回當前任務(wù)的結(jié)果。比如一開始的任務(wù)是累加兩億次,但那么我們就可以把它分為兩個分別累加一億次的子任務(wù)的結(jié)果之和,同樣的道理,累加一億次的子任務(wù)也可以再被分為兩個累加五千萬次的子子任務(wù)。這樣的拆分會一直持續(xù)到我們認為任務(wù)規(guī)模已經(jīng)足夠小的時候,這時子任務(wù)的結(jié)果就會被計算,然后再返回給上層任務(wù)進行處理之后就得到上層任務(wù)的結(jié)果了。
如果前面這一段文字描述看不明白也沒關(guān)系,我們在代碼中找一找答案:
public class ForkJoinTest { private static class AccumulateTask extends RecursiveTask{ private long start; private long end; private long threshold; /** * 任務(wù)的構(gòu)造函數(shù) * * @param start 任務(wù)處理范圍的起始點(包含) * @param end 任務(wù)處理范圍的結(jié)束點(不包含) * @param threshold 任務(wù)拆分的閾值 */ public AccumulateTask(long start, long end, long threshold) { this.start = start; this.end = end; this.threshold = threshold; } @Override protected Long compute() { long left = start; long right = end; // 終止條件:如果當前處理的范圍小于等于閾值(threshold), // 那么就直接通過循環(huán)執(zhí)行累加操作 if (right - left <= (int) threshold) { long result = 0; for (long i = left; i < right; ++i) { result += 1; } return result; } // 獲取當前處理范圍的中心點 long mid = (start + end) / 2; // 拆分出兩個子任務(wù),一個從start到mid,一個從mid到end ForkJoinTask leftTask = new AccumulateTask(start, mid, threshold); ForkJoinTask rightTask = new AccumulateTask(mid, end, threshold); // 通過當前線程池運行兩個子任務(wù) leftTask.fork(); rightTask.fork(); try { // 獲取兩個子任務(wù)的結(jié)果并返回 return leftTask.get() + rightTask.get(); } catch (Exception e) { return 0L; } } } public static void main(String[] args) throws Exception { long startTime = System.currentTimeMillis(); // 創(chuàng)建總?cè)蝿?wù),范圍是從1到兩億(包含),閾值為10的7次方,所以最終至少會有10個任務(wù)進行for循環(huán)的累加 AccumulateTask forkJoinTask = new AccumulateTask(1, (int) 2e8+1, (long) 1e7); // 使用一個新創(chuàng)建的ForkJoinPool任務(wù)池運行ForkJoin任務(wù) new ForkJoinPool().submit(forkJoinTask); // 打印任務(wù)結(jié)果 System.out.println("count = " + forkJoinTask.get()); // 計算程序耗時并打印 long endTime = System.currentTimeMillis(); System.out.println(String.format("總耗時:%.2fs", (endTime - startTime) / 1e3)); } }
在上面的代碼中,我們會不斷創(chuàng)建在指定范圍內(nèi)累加的子任務(wù),直到任務(wù)范圍小于閾值threshold(在代碼中是10的7次方)時才不再拆分子任務(wù),而是通過循環(huán)來得到累加結(jié)果。之后子任務(wù)的返回結(jié)果在上層任務(wù)中相加并作為上層任務(wù)的結(jié)果返回。到最后我們就可以得到累加兩億次的結(jié)果了。
在這個程序中,最重要的是對任務(wù)對象的三個操作:
創(chuàng)建任務(wù)對象,代碼中使用的是new AccumulateTask(start, mid, threshold)和new AccumulateTask(mid, end, threshold),這兩段代碼會創(chuàng)建除了范圍不同其他邏輯與父任務(wù)完全一致的子任務(wù),每個子任務(wù)會負責(zé)執(zhí)行父任務(wù)范圍中的一半;
執(zhí)行子任務(wù),通過調(diào)用任務(wù)對象的fork()方法可以讓子任務(wù)被提交到當前ForkJoinPool中執(zhí)行;
等待子任務(wù)返回結(jié)果,通過調(diào)用子任務(wù)任務(wù)對象的get()方法,父任務(wù)將會等待子任務(wù)執(zhí)行完成并返回結(jié)果,然后將兩個子任務(wù)的結(jié)果相加得到父任務(wù)的執(zhí)行結(jié)果。
為什么同樣都是線程池,但是ThreadPoolExecutor類就很難實現(xiàn)這樣的執(zhí)行方式呢?細心的讀者可能已經(jīng)發(fā)現(xiàn)了,我們在一個任務(wù)中會拆分出兩個子任務(wù),并且要等待這兩個子任務(wù)都執(zhí)行完成才能返回父任務(wù)的結(jié)果。如果是在ThreadPoolExecutor中,在等待子任務(wù)運行完成得到結(jié)果時,父任務(wù)會一直阻塞并且占用一個線程,這樣的話如果父任務(wù)太多就會導(dǎo)致子任務(wù)沒有線程可供使用了,這個運行流程就沒辦法繼續(xù)執(zhí)行下去了。而ForkJoinPool這個特殊的線程池就解決了這個問題,父任務(wù)在等待子任務(wù)執(zhí)行時可以讓出線程給其他任務(wù),這樣就不會導(dǎo)致線程都被阻塞狀態(tài)的父任務(wù)所阻塞了。
這種將任務(wù)拆分為互不依賴的子任務(wù),然后分別在不同的線程上執(zhí)行,最后再將結(jié)果進行逐步合并的方法就被稱為Map-Reduce。這種方法在離線大數(shù)據(jù)技術(shù)中被廣泛應(yīng)用,甚至可以說大數(shù)據(jù)相關(guān)技術(shù)就是在Map-Reduce思想基礎(chǔ)上發(fā)展起來的也不為過。
線程內(nèi)變量通過上面的幾個例子,我們可以看到,對于多線程程序來說,共享數(shù)據(jù)就是最大的問題。共享數(shù)據(jù)不但可能引起數(shù)據(jù)競爭問題,導(dǎo)致程序出現(xiàn)問題;而且隨著引入的線程同步操作又會拉低程序的性能,甚至可能使多線程程序的執(zhí)行時間比單線程程序還長得多。在上面的例子中,我們通過使用ForkJoinPool拆分并執(zhí)行了一個累加任務(wù),各個子任務(wù)之間基本完全獨立,做到了最大程度的并行化。但是在一些情況下,我們可能沒辦法做到如此理想的方案,在一些情況下還是會留有一定的線程同步操作和對應(yīng)的代碼臨界區(qū)。
那么在這些情況下我們?nèi)绾翁幚砟茏尦绦虻男阅鼙M可能高呢?
假設(shè)我們現(xiàn)在要統(tǒng)計一個方法的調(diào)用次數(shù),如果可能有多個線程同時調(diào)用該方法,那么就需要對多個線程的調(diào)用同時計數(shù)。這種情況下我們可以考慮在每個線程里各自保留一個整型變量用于保存每個線程內(nèi)的調(diào)用次數(shù),然后在獲取總數(shù)時只需要把每個線程中的數(shù)量加在一起就能計算出來了。而在累加計數(shù)時我們只需要修改當前線程對應(yīng)的變量就可以了,自然就沒有了數(shù)據(jù)競爭問題。java.util.concurrent.atomic包中的累加器LongAdder類采用的就是這樣的思路,這種思路也有自己專門的專業(yè)術(shù)語,被稱為“線程封閉”,線程封閉指的就是這種通過線程內(nèi)變量來避免線程間共享數(shù)據(jù)的優(yōu)化方式。
Java中也有專門的ThreadLocal類可以處理線程內(nèi)變量,只是因為性能和線程銷毀時的數(shù)據(jù)保存之類的原因一般不會用于多線程累加這樣的數(shù)據(jù)聚合場景,但是在保存和獲取數(shù)據(jù)方面非常的便利,有興趣的讀者可以了解一下。
ConcurrentHashMapjava.util.concurrent包為我們提供了鎖、原子類、線程池、ForkJoinPool等一大批并發(fā)編程工具。最后,我們來了解一下java.util.concurrent包中為我們提供的一種線程安全數(shù)據(jù)結(jié)構(gòu)。
在Java中,我們常用的Map類是HashMap,但是這個類并不是線程安全的,如果我們在多個線程中同時對HashMap對象進行讀寫,那么就有可能引發(fā)一些程序問題。還有一個從JDK 1.0起就已經(jīng)存在的Hashtable類可以保證線程安全,但是我們打開這個類的源代碼可以看到,這個類中的大部分方法上都加上了synchronized標記,其中包括了最常用的get和put方法,這意味著Hashtable類的對象同一時間幾乎只能被一個線程所使用,這樣的效率相對是比較低的。
但是其實熟悉HashMap結(jié)構(gòu)的朋友可能會知道,HashMap內(nèi)部的結(jié)構(gòu)是分為很多的桶的,每個鍵值對都會根據(jù)key值的hashCode值被放到不同的桶中。其實在做修改操作時我們只需要對對應(yīng)的一個桶加鎖就可以了,而在執(zhí)行讀操作時,在大多數(shù)情況下是不用加鎖的。JDK 1.5中引入的ConcurrentHashMap基本能達到這兩點。
在這個類中,我們通過兩種方式來優(yōu)化了并發(fā)的性能:
通過限制鎖保護的代碼范圍來減少了鎖沖突發(fā)生的可能性,而且也減少了需要的鎖數(shù)量,減少了同步產(chǎn)生的開銷;
另一方面因為讀取的時候并不需要加鎖,而只有寫操作才需要加鎖,在一些讀操作較多但是寫操作較少的情況下,我們就能大大降低讀操作的成本,從而提高了程序的性能。
而且ConcurrentHashMap中的大多數(shù)方法不僅優(yōu)化了同步機制的效率,而且提供了很多原子性的類似于CAS的操作方法,下面是ConcurrentHashMap類常用的操作:
V putIfAbsent(K key, V value),原子性操作,如果map中不包含key,則執(zhí)行map.put(key, value)并將put方法的返回值返回,否則直接返回map.get(key)的值,即當前值;
boolean remove(Object key, Object value),原子性操作,如果滿足條件 (map中包含key對應(yīng)的鍵值對 && value參數(shù)等于鍵值對中當前的value值) 則移除key對應(yīng)的鍵值對并返回true,否則返回false;
boolean replace(K key, V oldValue, V newValue),原子性操作,當滿足 (map中包含key對應(yīng)的鍵值對 && oldValue參數(shù)等于鍵值對中當前的value值)時,將key對應(yīng)的值改為newValue并返回true,否則返回false。
總結(jié)我們在這篇文章中從對一個使用AtomicLong進行多線程累加的程序的性能測試開始,通過Map-Reduce思想大大優(yōu)化了這個程序的性能,在此過程中還涉及到了ForkJoinPool類的使用。之后我們通過線程內(nèi)變量正式提出了“線程封閉”的概念,如果我們能做到線程封閉,那么因為減少了線程間同步的開銷,所以線程的性能一定會有很大的提高。最后我們介紹了java.util.concurrent包中為我們提供的并發(fā)安全數(shù)據(jù)類ConcurrentHashMap。相信通過這篇文章大家能夠了解到多種多線程性能優(yōu)化方法,但最重要的還是要找出多線程程序性能的瓶頸所在,這樣才能在實際的實踐場景中根據(jù)不同的情況因時制宜,使用合適的方法解決不同的瓶頸問題。本文中的觀點是多線程程序的瓶頸主要在于共享數(shù)據(jù)引起的數(shù)據(jù)競爭問題,如果能夠讓不同的線程間不存在或者盡可能少地存在共享數(shù)據(jù)與臨界區(qū)代碼,就能夠?qū)Χ嗑€程程序的性能起到正面的影響。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77883.html
摘要:在這個范圍廣大的并發(fā)技術(shù)領(lǐng)域當中多線程編程可以說是基礎(chǔ)和核心,大多數(shù)抽象并發(fā)問題的構(gòu)思與解決都是基于多線程模型來進行的。一般來說,多線程程序會面臨三類問題正確性問題效率問題死鎖問題。 多線程編程或者說范圍更大的并發(fā)編程是一種非常復(fù)雜且容易出錯的編程方式,但是我們?yōu)槭裁催€要冒著風(fēng)險艱辛地學(xué)習(xí)各種多線程編程技術(shù)、解決各種并發(fā)問題呢? 因為并發(fā)是整個分布式集群的基礎(chǔ),通過分布式集群不僅可以大...
摘要:限制同時運行線程數(shù)使用類就行,在內(nèi)部管理著一個計數(shù)器。當計數(shù)器到時,再調(diào)用就會阻塞,直到其他線程來調(diào)用,這樣就限制了同時運行線程的數(shù)量。 事前最好了解一下什么是進程,什么是線程,什么是GIL,本文不再贅述,直接介紹模塊的使用: 推薦1,推薦2,推薦3,更多自尋 普通的python爬蟲是單進程單線程的,這樣在遇到大量重復(fù)的操作時就只能逐個進行,我們就很難過了。舉個栗子:你有1000個...
摘要:在回調(diào)隊列中,函數(shù)等待調(diào)用棧為空,因為每個語句都執(zhí)行一次。最后一個運行,并且從調(diào)用棧中彈出。它將回調(diào)以先進先出順序移動到調(diào)用棧并執(zhí)行。 翻譯:瘋狂的技術(shù)宅原文: https://medium.freecodecamp.o... 本文首發(fā)微信公眾號:前端先鋒歡迎關(guān)注,每天都給你推送新鮮的前端技術(shù)文章 Node.js 是一個 JavaScript 運行時環(huán)境。聽起來還不錯,不過這究竟...
閱讀 696·2021-11-22 09:34
閱讀 3830·2021-09-22 15:42
閱讀 1342·2021-09-03 10:28
閱讀 1079·2021-08-26 14:13
閱讀 1911·2019-08-29 15:41
閱讀 1436·2019-08-29 14:12
閱讀 3374·2019-08-26 18:36
閱讀 3316·2019-08-26 13:47