摘要:某些編程語言被設(shè)計為可以將并發(fā)任務(wù)彼此隔離,這些語言通常被稱為函數(shù)性語言。通過使用多線程機(jī)制,這些獨立任務(wù)也被稱為子任務(wù)中的每一個都將由執(zhí)行線程來驅(qū)動。
并發(fā)
之前學(xué)的都是順序編程的知識,學(xué)習(xí)并發(fā)編程就好像進(jìn)入了一個全新的領(lǐng)域,有點類似于學(xué)習(xí)了一門新的編程語言,或者至少是學(xué)習(xí)了一整套新的語言概念。要理解并發(fā)編程,其難度與理解面向?qū)ο缶幊滩畈欢?。如果花點兒功夫,就能明白其基本機(jī)制,但想要抓住其本質(zhì),就需要深入的學(xué)習(xí)和理解。所以看完《Java編程思想》或許會變得過分自信,但寫復(fù)雜的多線程時,應(yīng)該多看其他多線程的書籍,關(guān)鍵還是多動手。
“并發(fā)是一種具有可論證的確定性,但實際上具有不可確定性?!?
使用并發(fā)時,你的自食其力,并且只有變得多疑而自信,才能用Java編寫出可靠的多線程代碼。
用并發(fā)解決的問題大體可以分為“速度”和“設(shè)計可管理性”兩種。 速度并發(fā)解決“速度”問題不僅僅是利用多個CPU去解決分片的問題,也就是說并發(fā)不僅僅是多個CPU的事情,也是單個CPU的事情。如果提高程序在單個CPU的性能,就得考慮具體情況,正常情況單個CPU運行多任務(wù)(task)是有上下文切換的性能損耗。但在阻塞(Blocking)的情況下就不同了。
我們先看看阻塞的定義:如果程序中的某個任務(wù)因為該程序控制范圍之外的某些條件(通常是I/O),那我們就說這個任務(wù)或線程阻塞了。
如果使用并發(fā)來寫這個阻塞程序,在一個任務(wù)阻塞時,程序中的其他任務(wù)還可以繼續(xù)執(zhí)行。這樣性能會有很大的提升。所以如果沒有阻塞的情況,在單CPU使用并發(fā),就沒必要了。
在單個CPU的系統(tǒng)中性能提高的常見示例:事件驅(qū)動編程(event-driven programing)。
實現(xiàn)并發(fā)最直接的方式是在操作系統(tǒng)級別使用進(jìn)程(process)。多任務(wù)操作系統(tǒng)可以通過周期性地將CPU從一個進(jìn)程切換到另一個進(jìn)程,來實現(xiàn)同時運行多個進(jìn)程(程序)。
某些編程語言被設(shè)計為可以將并發(fā)任務(wù)彼此隔離,這些語言通常被稱為函數(shù)性語言。Erlang就是這樣的語言,它包含針對任務(wù)之間彼此通信的安全機(jī)制。如果你發(fā)現(xiàn)程序中某個部分必須大量使用并發(fā),并且你在試圖構(gòu)建這個部分時遇到過多的問題。那么你可以考慮使用像Erlang這類專門的并發(fā)語言來創(chuàng)建這個部分。
Java語言采用更加傳統(tǒng)的方式,在順序語言的基礎(chǔ)上提供對線程的支持。 Java的目的是“編寫一次,到處運行”,所以在OSX之前的Macintosh操作系統(tǒng)版本是不支持多任務(wù),因此Java支持多線程機(jī)制,讓并發(fā)Java程序能夠移植到Macintosh和類似的平臺上。
設(shè)計可管理性設(shè)計可管理性,我更愿意說是一個解決問題的方法模型(程序設(shè)計)。線程使你能夠創(chuàng)建更加松散耦合的設(shè)計。
在單CPU上使用多任務(wù)的程序(代碼)在任意時刻仍然只能執(zhí)行一項任務(wù),因此理論上講,肯定可以不用任何任務(wù)就可以編寫相同的程序。但是,這樣寫來的代碼可能會很混亂,不方便維護(hù)。因此并發(fā)提供一種重要的組織結(jié)構(gòu)上的好處:你的程序設(shè)計可以極大地簡化。某些類似的問題,例如仿真,沒有并發(fā)的支持是很難解決的。
搶占式調(diào)度指的是每條線程執(zhí)行的時間、線程的切換都是由系統(tǒng)控制,每條線程可能都分同樣的的執(zhí)行時間片(CPU切片),也可能是在某些線程執(zhí)行的時間片較長,甚至某些線程得不到執(zhí)行時間片。這種機(jī)制下,優(yōu)點是一個線程阻塞不會導(dǎo)致整個進(jìn)程堵塞,缺點就是上下文切換開銷大。
協(xié)同式調(diào)度指的是某一條線程執(zhí)行完后主動通知系統(tǒng)切到另一條線程上執(zhí)行。線程的執(zhí)行時間由線程本身控制,線程切換可以預(yù)知。優(yōu)點是不存在多線程同步問題,上下文切換開銷小,缺點是如果一個線程阻塞了,那么可能造成整個系統(tǒng)崩潰。
Java線程機(jī)制是搶占式.
線程讓出cpu的情況:
1.當(dāng)前運行線程主動放棄CPU,JVM暫時放棄CPU操作(基于時間片輪轉(zhuǎn)調(diào)度的JVM操作系統(tǒng)不會讓線程永久放棄CPU,或者說放棄本次時間片的執(zhí)行權(quán)),例如調(diào)用yield()方法。
2.當(dāng)前運行線程因為某些原因進(jìn)入阻塞狀態(tài),例如阻塞在I/O上。
3.當(dāng)前運行線程結(jié)束,即運行完run()方法里面的任務(wù)
并發(fā)需要付出代價,包含復(fù)雜性代價。但這些代價與優(yōu)化程序設(shè)計、資源負(fù)載均衡以及用戶體驗上的改進(jìn)相比,這些代價就顯得微不足道。
線程帶來設(shè)計上的演變為了獲取線程的結(jié)果,于是產(chǎn)生輪詢,然后再后來為了解決輪詢,引進(jìn)了靜態(tài)方法的回調(diào),再后來帶來實例方法的回調(diào),最后引出設(shè)計模式:策略模式 和Java5引進(jìn)多線程編程的新方法,通過隱藏細(xì)節(jié)可以更容易地處理回調(diào)——ExecutorService和Futrue
輪詢例子:
package com.jc.thread; import com.jc.thinkinjava.io.util.Directory; import javax.xml.bind.DatatypeConverter; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.security.DigestInputStream; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.List; /** * 回調(diào)例子-前序 * * 計算文件的256位的SHA-2消息摘要 * 由于瓶頸在IO上,所以采用多線程 * * 嘗試去獲取線程返回的值,但發(fā)現(xiàn)需要另外個線程不停的輪詢,這是很耗cpu資源 */ @SuppressWarnings("Duplicates") public class ReturnDigest extends Thread { private String fileName; private byte[] digest; public ReturnDigest(String fileName) { this.fileName = fileName; } @Override public void run() { try { // System.out.println(fileName); FileInputStream in = new FileInputStream(fileName); MessageDigest sha = MessageDigest.getInstance("SHA-256"); DigestInputStream digestInputStream = new DigestInputStream(in, sha); while (digestInputStream.read() != -1) ; digestInputStream.close(); digest = sha.digest(); //注意,不是DigestInputStream的方法哦 StringBuilder sb = new StringBuilder(fileName); sb.append(":").append(DatatypeConverter.printHexBinary(digest)); System.out.println(sb.toString()); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (NoSuchAlgorithmException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } public byte[] getDigest() { return this.digest; } public static void main(String[] args) { File[] files = Directory.local(".", ".*"); ListfileList = new ArrayList (); for (int i = 0; i < files.length; i++) { File file = files[i]; if (!file.isDirectory()) { fileList.add(file); } } ReturnDigest[] digests = new ReturnDigest[fileList.size()]; for (int i = 0; i < fileList.size(); i++) { File file = fileList.get(0); digests[i] = new ReturnDigest(file.getAbsolutePath()); digests[i].start(); } for(int i=0;i 然后為了解決輪詢,產(chǎn)生了靜態(tài)方法的回調(diào):
package com.jc.thread.callback; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.security.DigestInputStream; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; /** * 回調(diào)例子 * 靜態(tài)方法的回調(diào) */ @SuppressWarnings("Duplicates") public class CallbackDigest implements Runnable{ private String fileName; public CallbackDigest(String fileName) { this.fileName = fileName; } @Override public void run() { try { // System.out.println(fileName); FileInputStream in = new FileInputStream(fileName); MessageDigest sha = MessageDigest.getInstance("SHA-256"); DigestInputStream digestInputStream = new DigestInputStream(in, sha); while (digestInputStream.read() != -1) ; digestInputStream.close(); byte[] digest = sha.digest(); //注意,不是DigestInputStream的方法哦 CallbackDigestUserInterface.receiveDigest(digest,fileName); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (NoSuchAlgorithmException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }package com.jc.thread.callback; import com.jc.thinkinjava.io.util.Directory; import com.jc.thread.DigestRunnable; import javax.xml.bind.DatatypeConverter; import java.io.File; /** * 回調(diào)例子 * 靜態(tài)方法的回調(diào) */ public class CallbackDigestUserInterface { public static void receiveDigest(byte[] digest,String fileName){ StringBuilder sb = new StringBuilder(fileName); sb.append(":").append(DatatypeConverter.printHexBinary(digest)); System.out.println(sb.toString()); } public static void main(String[] args) { File[] files = Directory.local(".", ".*"); for (File file : files) { if (!file.isDirectory()) new Thread(new DigestRunnable(file.getAbsolutePath())).start(); } } }實例方法的回調(diào):
package com.jc.thread.callback; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.security.DigestInputStream; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; public class InstanceCallbackDigest implements Runnable{ private String fileName; private InstanceCallbackDigestUserInterface callback; public InstanceCallbackDigest(String fileName, InstanceCallbackDigestUserInterface instanceCallbackDigestUserInterface) { this.fileName = fileName; this.callback = instanceCallbackDigestUserInterface; } @Override public void run() { try { // System.out.println(fileName); FileInputStream in = new FileInputStream(fileName); MessageDigest sha = MessageDigest.getInstance("SHA-256"); DigestInputStream digestInputStream = new DigestInputStream(in, sha); while (digestInputStream.read() != -1) ; digestInputStream.close(); byte[] digest = sha.digest(); //注意,不是DigestInputStream的方法哦 callback.receiveDigest(digest); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (NoSuchAlgorithmException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }package com.jc.thread.callback; import com.jc.thinkinjava.io.util.Directory; import com.jc.thread.ReturnDigest; import javax.xml.bind.DatatypeConverter; import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.List; /** * 回調(diào)例子 ** 使用實例方法代替靜態(tài)方法進(jìn)行回調(diào) *
* 雖然復(fù)雜點,但優(yōu)點很多。如: * 1. 主類(InstanceCallbackDigestUserInterface)的各個實例映射為一個文件,可以很自然地記錄跟蹤這個文件的信息,而不需要額外的數(shù)據(jù)結(jié)構(gòu) * 2. 這個實例在有必要時可以容易地重新計算某個特定文件的摘要 *
* 實際上,經(jīng)證明,這種機(jī)制有更大的靈活性。 *
* 這種機(jī)制,也稱為:觀察者模式,如Swing、AWT */ public class InstanceCallbackDigestUserInterface { private String fileName; private byte[] digest; public InstanceCallbackDigestUserInterface(String fileName) { this.fileName = fileName; } public void calculateDigest() { InstanceCallbackDigest instanceCallbackDigest = new InstanceCallbackDigest(fileName, this); new Thread(instanceCallbackDigest).start(); } public void receiveDigest(byte[] digest) { this.digest = digest; System.out.println(this); } @Override public String toString() { String result = fileName + ": "; if (digest != null) { result += DatatypeConverter.printHexBinary(digest); } else { result += "digest not available"; } return result; } public static void main(String[] args) { File[] files = Directory.local(".", ".*"); List
fileList = new ArrayList (); for (int i = 0; i < files.length; i++) { File file = files[i]; if (!file.isDirectory()) { fileList.add(file); } } for (int i = 0; i < fileList.size(); i++) { File file = fileList.get(0); InstanceCallbackDigestUserInterface instanceCallbackDigestUserInterface = new InstanceCallbackDigestUserInterface(file.getAbsolutePath()); instanceCallbackDigestUserInterface.calculateDigest(); } } } Java5引進(jìn)的新方法,ExecutorService和Future:
package com.jc.thread.callback; import java.util.concurrent.Callable; public class FindMaxTask implements Callable{ private int[] data; private int start; private int end; public FindMaxTask(int[] data, int start, int end) { this.data = data; this.start = start; this.end = end; } @Override public Integer call() throws Exception { int max = Integer.MAX_VALUE; for (int i = start; i < end; i++) { if (data[i] > max) max = data[i]; } return max; } } package com.jc.thread.callback; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * * Java5引入了多線程編程的一個新方法,通過隱藏細(xì)節(jié)可以更容易地處理回調(diào) * 使用回調(diào)實現(xiàn)的Futrue */ public class MultithreadedMaxFinder { public static int max(int[] data) throws ExecutionException, InterruptedException { if (data.length == 1) { return data[0]; } else if (data.length == 0) { throw new IllegalArgumentException(); } FindMaxTask task1 = new FindMaxTask(data,0,data.length/2); FindMaxTask task2 = new FindMaxTask(data,data.length/2,data.length); ExecutorService executorService = Executors.newFixedThreadPool(2); Future基本線程機(jī)制future1 = executorService.submit(task1); Future future2 = executorService.submit(task2); //調(diào)用future1.get()時,這個方法會進(jìn)行阻塞,等待第一個FindMaxTask完成。只有當(dāng)?shù)谝粋€FindMaxTask完成,才會調(diào)用future2.get() return Math.max(future1.get(),future2.get()); } } 并發(fā)編程使我們可以將程序劃分為多個分離的、獨立運行的任務(wù)。通過使用多線程機(jī)制,這些獨立任務(wù)(也被稱為子任務(wù))中的每一個都將由執(zhí)行線程來驅(qū)動。
線程模型:一個線程就是進(jìn)程中的一個單一順序控制流,因此單個進(jìn)程可以擁有多個并發(fā)執(zhí)行的任務(wù),感覺每個任務(wù)都好像有其CPU一樣,其底層機(jī)制是切分CPU時間,但通常不用考慮CPU的切片。
線程模型為編程帶來便利,它簡化了在單一程序中同時交織在一起的多個操作的處理。在使用線程時,CPU將輪流給每個任務(wù)分配其占用時間。線程的一大好處是可以使你從這一個層次抽身出來,即代碼不必知道它是運行在具有一個還是多個CPU的機(jī)子上。
所以,使用線程機(jī)制是一種建立透明的、可擴(kuò)展的程序的方法,如果程序運行得太慢,為機(jī)器增添一個CPU就能容易地加快程序的運行速度。多任務(wù)和多線程往往是使用多處理器系統(tǒng)的最合理方式。//此方法調(diào)用是對 線程調(diào)度器 的一種建議:我已經(jīng)執(zhí)行完生命周期中最重要的部分了,此刻正是切換給其他任務(wù)執(zhí)行一段時間的大好時機(jī)。 Thread.yield();Thread.yield();這個方法叫“讓步”,不過沒有任何機(jī)制保證它將會被采納。
術(shù)語在Java中學(xué)習(xí)并發(fā)編程,總是會讓人困惑。讓人困惑是那些概念,特別是涉及到線程。
要執(zhí)行的任務(wù)和驅(qū)動它的線程,這里的任務(wù)和線程是不同的,在Java中會更明細(xì),因為你對Thread類實際沒有任何控制權(quán)(特別是使用Executor時候)。通過某種方式,將任務(wù)附著到線程,以使這個線程可以驅(qū)動任務(wù)。
在Java中,Thread類自身不執(zhí)行任何操作,它只是驅(qū)動賦予它的任務(wù),但是線程的一些研究中,總是使用這樣的話語“線程執(zhí)行這項或那項動作”,仿佛“線程就是任務(wù)”。這一點是讓新人是十分困惑的。因為會讓人覺得任務(wù)和線程是一種“是一個”的關(guān)系。覺得應(yīng)該從Thread繼承出一個任務(wù)。但實際不是,所以用Task名字會更好。
那為什么Java設(shè)計者不用Task而用Thread或Runnable呢? 之所以有上述的困惑(概念混淆),那是因為,雖然從概念上講,我們應(yīng)該只關(guān)注任務(wù),而不需要關(guān)注線程的細(xì)節(jié),我們只需要定義任務(wù),然后說“開始”就好。但實際情況是,在物理上,創(chuàng)建線程可能會代價很高,因此需要人工去保存和管理它們。而且Java的線程機(jī)制是基于C的低級的P線程(pthread)方式。所以才導(dǎo)致任務(wù)和線程這兩個概念總是混在一起。站在實現(xiàn)和更抽象的角度,這兩者應(yīng)該分開,所以編寫代碼時,你必須遵守規(guī)則。為了描述更清楚,因為定義為要執(zhí)行的工作則為“任務(wù)”,引用到驅(qū)動任務(wù)的具體機(jī)制時,用“線程”。 如果只是概念級別上討論系統(tǒng),則只用“任務(wù)”就行。
加入一個線程一個線程可以調(diào)用其他線程的join()方法,其效果是等待一段時間直到第二個線程結(jié)束才繼續(xù)執(zhí)行。
package com.jc.concurrency; /** * 一個線程可以等待一個線程完成,那就是用join * @author * */ class Sleeper extends Thread { private int duration; public Sleeper(String name, int sleepTime) { super(name); duration = sleepTime; start(); } public void run() { try { sleep(duration); } catch (InterruptedException e) { //異常捕獲時會將Interrupted這個標(biāo)志位重置為false,所以在這里輸出false System.out.println(getName() + " was interrupted. " + "isInterrupted(): " + isInterrupted()); return; } System.out.println(getName() + " has awakened"); } } class Joiner extends Thread { private Sleeper sleeper; public Joiner(String name, Sleeper sleeper) { super(name); this.sleeper = sleeper; start(); } public void run() { try { sleeper.join(); } catch (InterruptedException e) { System.out.println("Interrupted"); } System.out.println(getName() + " join completed"); } } public class Joining { public static void main(String[] args) { Sleeper sleepy = new Sleeper("Sleepy", 1500), grumpy = new Sleeper("Grumpy", 1500); Joiner dopey = new Joiner("Dopey", sleepy), doc = new Joiner("Doc", grumpy); grumpy.interrupt(); } }捕獲異常在main方法是無法捕獲到線程里的異常。為解決這個問題,我們修改Executor產(chǎn)生線程的方式。Java SE5中的新接口:Thread.UncaughtExceptionHandler
package com.jc.concurrency; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; /** * 使用Thread.UncaughtExceptionHandler處理線程拋出的異常 * * MyUncaughtExceptionHandler會新建線程去處理其他線程跑出來的異常 * * @author * */ class ExceptionThread2 implements Runnable { public void run() { Thread t = Thread.currentThread(); System.out.println("run() by " + t); System.out.println("eh = " + t.getUncaughtExceptionHandler()); throw new RuntimeException(); } } class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { public void uncaughtException(Thread t, Throwable e) { System.out.println("caught " + t + ""s " + e); } } class HandlerThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable r) { System.out.println(this + " creating new Thread"); Thread t = new Thread(r); System.out.println("created " + t); t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler()); System.out.println("eh = " + t.getUncaughtExceptionHandler()); return t; } } public class CaptureUncaughtException { public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(new HandlerThreadFactory()); exec.execute(new ExceptionThread2()); } }/* * output: * * com.jc.concurrency.HandlerThreadFactory@4e25154f creating new Thread * created Thread[Thread-0,5,main] eh = * com.jc.concurrency.MyUncaughtExceptionHandler@70dea4e run() by * Thread[Thread-0,5,main] eh = * com.jc.concurrency.MyUncaughtExceptionHandler@70dea4e * com.jc.concurrency.HandlerThreadFactory@4e25154f creating new Thread * created Thread[Thread-1,5,main] eh = * com.jc.concurrency.MyUncaughtExceptionHandler@5490c2f5 caught * Thread[Thread-0,5,main]"s java.lang.RuntimeException * * * */還可以設(shè)置默認(rèn)異常處理器:
package com.jc.concurrency; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 設(shè)置默認(rèn)的線程異常處理類 * @author * */ public class SettingDefaultHandler { public static void main(String[] args) { Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler()); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new ExceptionThread()); } }線程狀態(tài)(Thread state)1.新建(new):一個線程可以處于四種狀態(tài)之一:新建(new),就緒(Runnable),阻塞(Blocked),死亡(Dead)。
進(jìn)入阻塞狀態(tài)
1.新建(new):這是個短暫狀態(tài),當(dāng)線程被創(chuàng)建時,它只會短暫地處于這種狀態(tài)。此時它已經(jīng)分配了必須的系統(tǒng)資源,并執(zhí)行了初始化。此刻線程已經(jīng)有資格獲取CPU時間了,之后調(diào)度器將把這個線程轉(zhuǎn)變?yōu)榭蛇\行狀態(tài)或阻塞狀態(tài)。
2.就緒(Runnable):在這種狀態(tài)下,只要調(diào)度器把時間片分配給線程,線程就可以運行。也就是說,在任意時刻,此狀態(tài)的線程可以運行也可以不運行。不同于死亡和阻塞狀態(tài)。
3.阻塞(Blocked):線程能夠運行,但有某個條件阻止它的運行。當(dāng)線程處于阻塞狀態(tài)時,調(diào)度器將忽略線程,不會分配給線程任何CPU時間。直到線程重新進(jìn)入了就緒狀態(tài),它才有可能執(zhí)行操作。
4.死亡(Dead):處于死亡或終止?fàn)顟B(tài)的線程將不再是可調(diào)度的,并且再也不會得到CPU時間,它的任務(wù)已結(jié)束,或不再是可運行的。任務(wù)死亡的通常方式是從run()方法返回,但是任務(wù)的線程還可以被中斷,中斷也是屬于死亡。一個任務(wù)進(jìn)入阻塞狀態(tài),可能要有如下原因:
通過調(diào)用sleep(milliseconds)使任務(wù)進(jìn)入休眠狀態(tài),在這種情況下,任務(wù)在指定的時間內(nèi)不會運行。
通過調(diào)用wait()使線程掛起。直到線程得到了notify()或notifyAll()消息(或者在Java SE5的java.util.concurrent類庫中等價的signal()或signalAll()消息),線程才會進(jìn)入就緒狀態(tài)。
任務(wù)在等待某個輸入/輸出完成。
任務(wù)試圖在某個對象上調(diào)用其同步控制方法,但是對象鎖不可用,因為另一個任務(wù)已經(jīng)獲取了這個鎖。
在較早的代碼中,會有suspend()和resume()來阻塞和喚醒線程,因為容易導(dǎo)致死鎖,所以被廢止了。
中斷在阻塞狀態(tài)的線程,可以通過中斷來終止該阻塞的任務(wù)。Thread類包含interrupt()方法來中斷。如果使用Executor,則使用Future的cancel()來中斷任務(wù)。其實Executor的shutdownNow()方法,就是將發(fā)送一個interrupt()調(diào)用給它所啟動的所有線程。
package com.jc.concurrency; import java.io.IOException; import java.io.InputStream; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * 中斷處于阻塞狀態(tài)的線程例子 * 發(fā)現(xiàn)只有sleep()操作的才能中斷,其余的io和同步都不能被中斷 * @author * */ class SleepBlocked implements Runnable { public void run() { try { TimeUnit.SECONDS.sleep(100); } catch (InterruptedException e) { System.out.println("InterruptedException"); } System.out.println("Exiting SleepBlocked.run()"); } } class IOBlocked implements Runnable { private InputStream in; public IOBlocked(InputStream is) { in = is; } public void run() { try { System.out.println("Waiting for read():"); in.read(); } catch (IOException e) { if (Thread.currentThread().isInterrupted()) { System.out.println("Interrupted from blocked I/O"); } else { throw new RuntimeException(e); } } System.out.println("Exiting IOBlocked.run()"); } } class SynchronizedBlocked implements Runnable { public synchronized void f() { while (true) // Never releases lock Thread.yield(); } public SynchronizedBlocked() { new Thread() { public void run() { f(); // Lock acquired by this thread } }.start(); } public void run() { System.out.println("Trying to call f()"); f(); System.out.println("Exiting SynchronizedBlocked.run()"); } } public class Interrupting { private static ExecutorService exec = Executors.newCachedThreadPool(); static void test(Runnable r) throws InterruptedException { Future> f = exec.submit(r); TimeUnit.MILLISECONDS.sleep(100); System.out.println("Interrupting " + r.getClass().getName()); f.cancel(true); // Interrupts if running System.out.println("Interrupt sent to " + r.getClass().getName()); } public static void main(String[] args) throws Exception { test(new SleepBlocked()); test(new IOBlocked(System.in)); test(new SynchronizedBlocked()); TimeUnit.SECONDS.sleep(3); System.out.println("Aborting with System.exit(0)"); System.exit(0); // ... since last 2 interrupts failed } }發(fā)現(xiàn)只有sleep()操作的才能中斷,其余的io和同步都不能被中斷。所以有個比較不優(yōu)雅,但有效的關(guān)閉方式:
package com.jc.concurrency; import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 中斷IO阻塞的線程的方式:關(guān)閉資源 * @author * */ public class CloseResource { public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); ServerSocket server = new ServerSocket(8080); InputStream socketInput = new Socket("localhost", 8080).getInputStream(); exec.execute(new IOBlocked(socketInput)); exec.execute(new IOBlocked(System.in)); TimeUnit.MILLISECONDS.sleep(100); System.out.println("Shutting down all threads"); exec.shutdownNow(); TimeUnit.SECONDS.sleep(1); System.out.println("Closing " + socketInput.getClass().getName()); socketInput.close(); // Releases blocked thread TimeUnit.SECONDS.sleep(1); System.out.println("Closing " + System.in.getClass().getName()); System.in.close(); // Releases blocked thread } }之所以要sleep,是想要interrupt都傳到各個線程里面。以達(dá)到中斷的效果。
NIO提供了優(yōu)雅的I/O中斷。
/** * NIO提供了優(yōu)雅的I/O中斷 * @author * */ class NIOBlocked implements Runnable { private final SocketChannel sc; public NIOBlocked(SocketChannel sc) { this.sc = sc; } public void run() { try { System.out.println("Waiting for read() in " + this); sc.read(ByteBuffer.allocate(1)); } catch (ClosedByInterruptException e) { System.out.println("ClosedByInterruptException" + this); } catch (AsynchronousCloseException e) { System.out.println("AsynchronousCloseException" + this); } catch (IOException e) { throw new RuntimeException(e); } System.out.println("Exiting NIOBlocked.run() " + this); } } public class NIOInterruption { public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); ServerSocket server = new ServerSocket(8080); InetSocketAddress isa = new InetSocketAddress("localhost", 8080); SocketChannel sc1 = SocketChannel.open(isa); SocketChannel sc2 = SocketChannel.open(isa); System.out.println(sc1); System.out.println(sc2); Future> f = exec.submit(new NIOBlocked(sc1)); exec.execute(new NIOBlocked(sc2)); exec.shutdown(); TimeUnit.SECONDS.sleep(1); // Produce an interrupt via cancel: f.cancel(true); TimeUnit.SECONDS.sleep(1); // Release the block by closing the channel: sc2.close(); } }SleepBlocked例子展示了synchronized的鎖是不可以中斷,這是很危險的。所以ReentrantLock提供了可中斷的能力
package com.jc.concurrency; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * SleepBlocked例子展示了synchronized的鎖是不可以中斷,這是很危險的。 * 所以ReentrantLock提供了可中斷的能力 * @author * */ class BlockedMutex { private Lock lock = new ReentrantLock(); public BlockedMutex() { // Acquire it right away, to demonstrate interruption // of a task blocked on a ReentrantLock: lock.lock(); } public void f() { try { // This will never be available to a second task lock.lockInterruptibly(); // Special call System.out.println("lock acquired in f()"); } catch (InterruptedException e) { System.out.println("Interrupted from lock acquisition in f()"); } } } class Blocked2 implements Runnable { BlockedMutex blocked = new BlockedMutex(); public void run() { System.out.println("Waiting for f() in BlockedMutex"); blocked.f(); System.out.println("Broken out of blocked call"); } } public class Interrupting2 { public static void main(String[] args) throws Exception { Thread t = new Thread(new Blocked2()); t.start(); TimeUnit.SECONDS.sleep(1); System.out.println("Issuing t.interrupt()"); t.interrupt(); } }/**output: Waiting for f() in BlockedMutex Issuing t.interrupt() Interrupted from lock acquisition in f() Broken out of blocked call **/在沒有阻塞的語句時,通過Thread.interrupted()判斷線程被中斷:
package com.jc.concurrency; import java.util.concurrent.TimeUnit; /** * 在沒有阻塞的語句時,通過Thread.interrupted()判斷線程被中斷 * @author * */ class NeedsCleanup { private final int id; public NeedsCleanup(int ident) { id = ident; System.out.println("NeedsCleanup " + id); } public void cleanup() { System.out.println("Cleaning up " + id); } } class Blocked3 implements Runnable { private volatile double d = 0.0; public void run() { // try { while (!Thread.interrupted()) { // point1 NeedsCleanup n1 = new NeedsCleanup(1); // Start try-finally immediately after definition // of n1, to guarantee proper cleanup of n1: try { System.out.println("Sleeping"); // TimeUnit.SECONDS.sleep(1); // point2 NeedsCleanup n2 = new NeedsCleanup(2); // Guarantee proper cleanup of n2: try { System.out.println("Calculating"); // A time-consuming, non-blocking operation: for (int i = 1; i < 2500000; i++) d = d + (Math.PI + Math.E) / d; System.out.println("Finished time-consuming operation"); } finally { n2.cleanup(); } } finally { n1.cleanup(); } } System.out.println("Exiting via while() test"); // } catch (InterruptedException e) { // System.out.println("Exiting via InterruptedException"); // } } } public class InterruptingIdiom { public static void main(String[] args) throws Exception { if (args.length != 1) { System.out.println("usage: java InterruptingIdiom delay-in-mS"); System.exit(1); } Thread t = new Thread(new Blocked3()); t.start(); TimeUnit.MILLISECONDS.sleep(new Integer(args[0])); t.interrupt(); } }線程協(xié)作wait()和notify()wait()、notify()以及nofityAll()有一個比較特殊的方面,那就是這些方法都是基類Object的方法,而不是Thread的一部分。一開始或許有這種困惑,覺得很奇怪。明明是線程的功能,為啥要放在Object里。那時因為這些方法需要操作鎖,當(dāng)一個任務(wù)在方法里遇到wait()的調(diào)用時,線程的執(zhí)行被掛起(阻塞狀態(tài)),對象上的鎖會被是否。因此wait()方法需放在同步控制塊里(與之相對比是sleep()因為不用操作鎖,所以可以放在非同步控制塊里,而且還是Thread的方法)。如果在非同步控制調(diào)用這些方法,程序能通過編譯,但運行時會拋IllegalMonitorStateException差異。例子:
package com.jc.concurrency; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * wait()和notifyAll()例子,notifyAll會將該對象的wait()方法所阻塞的線程 * @author * */ class Car { private boolean waxOn = false; public synchronized void waxed() { waxOn = true; // Ready to buff notifyAll(); } public synchronized void buffed() { waxOn = false; // Ready for another coat of wax notifyAll(); } public synchronized void waitForWaxing() throws InterruptedException { while (waxOn == false) wait(); } public synchronized void waitForBuffing() throws InterruptedException { while (waxOn == true) wait(); } } class WaxOn implements Runnable { private Car car; public WaxOn(Car c) { car = c; } public void run() { try { while (!Thread.interrupted()) { System.out.print("Wax On! "); TimeUnit.MILLISECONDS.sleep(200); car.waxed(); car.waitForBuffing(); } } catch (InterruptedException e) { System.out.println("Exiting via interrupt"); } System.out.println("Ending Wax On task"); } } class WaxOff implements Runnable { private Car car; public WaxOff(Car c) { car = c; } public void run() { try { while (!Thread.interrupted()) { car.waitForWaxing(); System.out.print("Wax Off! "); TimeUnit.MILLISECONDS.sleep(200); car.buffed(); } } catch (InterruptedException e) { System.out.println("Exiting via interrupt"); } System.out.println("Ending Wax Off task"); } } public class WaxOMatic { public static void main(String[] args) throws Exception { Car car = new Car(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new WaxOff(car)); exec.execute(new WaxOn(car)); TimeUnit.SECONDS.sleep(5); // Run for a while... exec.shutdownNow(); // Interrupt all tasks } }notify()和nofityAll()因為可能有多個任務(wù)在單個Car對象上處于wait()狀態(tài),因此調(diào)用nofityAll()比只調(diào)用notify()要更安全。所以上面那個程序,只有一個任務(wù),因此可以使用notify()來代替notifyAll()。
使用 notify()而不是notifyAll()是一種優(yōu)化。除非知道notify()會喚醒具體哪個任務(wù),不如還是notifyAll()保守點
在有關(guān)Java的線程機(jī)制的討論中,有一個令人困惑的描述:notifyAll()將喚醒“所有正在等待的任務(wù)”。其實更準(zhǔn)確是:當(dāng)notifyAll()因某個特定鎖而被調(diào)用時,只有等待這個鎖的任務(wù)才會被喚醒:package com.jc.concurrency; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 當(dāng)notifyAll()因某個特定鎖而被調(diào)用時,只有等待這個鎖的任務(wù)才會被喚醒 * @author * */ class Blocker { synchronized void waitingCall() { try { while (!Thread.interrupted()) { wait(); System.out.print(Thread.currentThread() + " "); } } catch (InterruptedException e) { // OK to exit this way } } synchronized void prod() { notify(); } synchronized void prodAll() { notifyAll(); } } class Task implements Runnable { static Blocker blocker = new Blocker(); public void run() { blocker.waitingCall(); } } class Task2 implements Runnable { // A separate Blocker object: static Blocker blocker = new Blocker(); public void run() { blocker.waitingCall(); } } public class NotifyVsNotifyAll { public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); for (int i = 0; i < 5; i++) exec.execute(new Task()); exec.execute(new Task2()); Timer timer = new Timer(); timer.scheduleAtFixedRate(new TimerTask() { boolean prod = true; public void run() { if (prod) { System.out.print(" notify() "); Task.blocker.prod(); prod = false; } else { System.out.print(" notifyAll() "); Task.blocker.prodAll(); prod = true; } } }, 400, 400); // Run every .4 second TimeUnit.SECONDS.sleep(5); // Run for a while... timer.cancel(); System.out.println(" Timer canceled"); TimeUnit.MILLISECONDS.sleep(500); System.out.print("Task2.blocker.prodAll() "); Task2.blocker.prodAll(); TimeUnit.MILLISECONDS.sleep(500); System.out.println(" Shutting down"); exec.shutdownNow(); // Interrupt all tasks } }使用wait()和notifyAll()實現(xiàn)生產(chǎn)者和消費者:一個飯店,有一個廚師和一個服務(wù)員,這個服務(wù)員必須等待廚師準(zhǔn)備好食物,當(dāng)廚師準(zhǔn)備好后就會通知服務(wù)員,之后服務(wù)員上菜,然后服務(wù)員繼續(xù)等待。
package com.jc.concurrency; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 簡單的生產(chǎn)者消費者例子 * 此例子有點局限因為不能有多線程的生產(chǎn)者、多線程的消費者。 * 這例子僅僅展示如果使用wait()和notify()保證有序 * @author * */ class Meal { private final int orderNum; public Meal(int orderNum) { this.orderNum = orderNum; } public String toString() { return "Meal " + orderNum; } } class WaitPerson implements Runnable { private Restaurant restaurant; public WaitPerson(Restaurant r) { restaurant = r; } public void run() { try { while (!Thread.interrupted()) { synchronized (this) { while (restaurant.meal == null) wait(); // ... for the chef to produce a meal } System.out.println("Waitperson got " + restaurant.meal); synchronized (restaurant.chef) { restaurant.meal = null; restaurant.chef.notifyAll(); // Ready for another } } } catch (InterruptedException e) { System.out.println("WaitPerson interrupted"); } } } class Chef implements Runnable { private Restaurant restaurant; private int count = 0; public Chef(Restaurant r) { restaurant = r; } public void run() { try { while (!Thread.interrupted()) { synchronized (this) { while (restaurant.meal != null) wait(); // ... for the meal to be taken } if (++count == 10) { System.out.println("Out of food, closing"); restaurant.exec.shutdownNow(); } System.out.println("Order up! "); synchronized (restaurant.waitPerson) { restaurant.meal = new Meal(count); restaurant.waitPerson.notifyAll(); } TimeUnit.MILLISECONDS.sleep(100); } } catch (InterruptedException e) { System.out.println("Chef interrupted"); } } } public class Restaurant { Meal meal; ExecutorService exec = Executors.newCachedThreadPool(); WaitPerson waitPerson = new WaitPerson(this); Chef chef = new Chef(this); public Restaurant() { exec.execute(chef); exec.execute(waitPerson); } public static void main(String[] args) { new Restaurant(); } }使用顯式鎖Lock和Condition對象:
package com.jc.concurrency.waxomatic2; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 使用顯式的Lock和Condition對象來修改WaxOMatic例子 * @author * */ class Car { private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private boolean waxOn = false; public void waxed() { lock.lock(); try { waxOn = true; // Ready to buff condition.signalAll(); } finally { lock.unlock(); } } public void buffed() { lock.lock(); try { waxOn = false; // Ready for another coat of wax condition.signalAll(); } finally { lock.unlock(); } } public void waitForWaxing() throws InterruptedException { lock.lock(); try { while (waxOn == false) condition.await(); } finally { lock.unlock(); } } public void waitForBuffing() throws InterruptedException { lock.lock(); try { while (waxOn == true) condition.await(); } finally { lock.unlock(); } } } class WaxOn implements Runnable { private Car car; public WaxOn(Car c) { car = c; } public void run() { try { while (!Thread.interrupted()) { System.out.print("Wax On! "); TimeUnit.MILLISECONDS.sleep(200); car.waxed(); car.waitForBuffing(); } } catch (InterruptedException e) { System.out.println("Exiting via interrupt"); } System.out.println("Ending Wax On task"); } } class WaxOff implements Runnable { private Car car; public WaxOff(Car c) { car = c; } public void run() { try { while (!Thread.interrupted()) { car.waitForWaxing(); System.out.print("Wax Off! "); TimeUnit.MILLISECONDS.sleep(200); car.buffed(); } } catch (InterruptedException e) { System.out.println("Exiting via interrupt"); } System.out.println("Ending Wax Off task"); } } public class WaxOMatic2 { public static void main(String[] args) throws Exception { Car car = new Car(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new WaxOff(car)); exec.execute(new WaxOn(car)); TimeUnit.SECONDS.sleep(5); exec.shutdownNow(); } }基于Lock和鏈表存儲結(jié)構(gòu)寫的一個消息隊列:
package com.jc.framework.queue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class JcBlockingQueueExecutorService的shutdown{ private JcQueueData head; private JcQueueData tail; private int size = 0; private int maxSize = Integer.MAX_VALUE; private final Lock lock; private final Condition full; private final Condition empty; public JcBlockingQueue() { lock = new ReentrantLock(); full = lock.newCondition(); //角度是生產(chǎn)者 empty = lock.newCondition(); //角度是消費者 } public void enQueue(T t) throws InterruptedException { lock.lock(); if (size == maxSize) { full.await(); } if (head == null) { head = new JcQueueData<>(t, null); tail = head; size++; empty.signalAll(); lock.unlock(); return; } JcQueueData jcQueueData = new JcQueueData<>(t, null); tail.setNext(jcQueueData); tail = jcQueueData; size++; if (size == 1) empty.signalAll(); lock.unlock(); } public T deQueue() throws InterruptedException { lock.lock(); while (head == null) { empty.await(); } T t = head.getData(); if (head.next != null) { JcQueueData next = head.next; head.next = null; head = next; } else { head = null; tail = null; } size--; if(size==maxSize-1) full.signalAll(); lock.unlock(); return t; } public int size() { return size; } private class JcQueueData { private T data; private JcQueueData next; public JcQueueData(T data, JcQueueData next) { this.data = data; this.next = next; } public T getData() { return data; } public void setData(T data) { this.data = data; } public JcQueueData getNext() { return next; } public void setNext(JcQueueData next) { this.next = next; } } } ExecutorService的shutdown方法,這有可能還有工作正在執(zhí)行或準(zhǔn)備執(zhí)行,這情況下,它只是通知線程池再沒有更多任務(wù)需要增加到它的內(nèi)部隊列,而且一旦完成所有等待的工作,就應(yīng)當(dāng)關(guān)閉。
對應(yīng)的還有shutdownNow(),此方法中止當(dāng)前處理中的任務(wù),并忽略所有等待的任務(wù)。
參考:《Java編程思想》
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/69285.html
摘要:流程源處理源代碼,例如過濾任何值。工藝類從編譯后處理生成的文件,例如對類進(jìn)行字節(jié)碼增強(qiáng)。整合后的測試執(zhí)行集成測試后執(zhí)行所需的操作。校驗運行任何檢查以驗證包裝是否有效并符合質(zhì)量標(biāo)準(zhǔn)。 nodejs和es6 nodejs的語法和es6不一樣,如模塊系統(tǒng),一個是CommonJS的require、一個是es6的import,寫模塊也不一樣。 nodejs的npm 我來理解,nodejs類似與j...
閱讀 1387·2021-10-08 10:04
閱讀 2710·2021-09-22 15:23
閱讀 2733·2021-09-04 16:40
閱讀 1185·2019-08-29 17:29
閱讀 1504·2019-08-29 17:28
閱讀 3001·2019-08-29 14:02
閱讀 2230·2019-08-29 13:18
閱讀 856·2019-08-23 18:35