摘要:只要線程池未關(guān)閉該策略直接在調(diào)用者線程中運(yùn)行當(dāng)前被丟棄的任務(wù)。顯然這樣做不會(huì)真的丟棄任務(wù)但是任務(wù)提交線程的性能極有可能會(huì)急劇下降。任務(wù)并嘗試再次提交當(dāng)前任務(wù)。
1. 同步控制
synchronized的擴(kuò)展:重入鎖
同步控制不僅有synchronized配合object.wait()以及object.notify(),也有增強(qiáng)版的reentrantLock(重入鎖)
public class ReenterLock implements Runnable{ public static ReentrantLock lock=new ReentrantLock(); public static int i=0; @Override public void run() { for(int j=0;j<10000000;j++){ lock.lock(); lock.lock(); //此處演示重入性 try{ i++; }finally{ lock.unlock(); //退出臨界區(qū)必須解鎖 lock.unlock(); } } } public static void main(String[] args) throws InterruptedException { ReenterLock tl=new ReenterLock(); Thread t1=new Thread(tl); Thread t2=new Thread(tl); t1.start();t2.start(); t1.join();t2.join(); System.out.println(i); //計(jì)算結(jié)果為 20000000 } }
我們來看下reentrantlock相比synchronized鎖有何優(yōu)點(diǎn):
中斷響應(yīng)
面對(duì)死鎖,似乎synchronized沒有任何主動(dòng)解決策略,而reentrantlock則可以輕松解決
public class IntLock implements Runnable { public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; /** * 控制加鎖順序,方便構(gòu)造死鎖 * @param lock */ public IntLock(int lock) { this.lock = lock; } @Override public void run() { try { if (lock == 1) { lock1.lockInterruptibly(); //可中斷的加鎖 try{ Thread.sleep(500); }catch(InterruptedException e){} lock2.lockInterruptibly(); } else { lock2.lockInterruptibly(); try{ Thread.sleep(500); }catch(InterruptedException e){} lock1.lockInterruptibly(); } } catch (InterruptedException e) { e.printStackTrace(); System.out.println(Thread.currentThread().getName()+":線程被中斷"); } finally { if (lock1.isHeldByCurrentThread()) lock1.unlock(); if (lock2.isHeldByCurrentThread()) lock2.unlock(); System.out.println(Thread.currentThread().getName()+":線程退出"); } } public static void main(String[] args) throws InterruptedException { IntLock r1 = new IntLock(1); IntLock r2 = new IntLock(2); Thread t1 = new Thread(r1,"線程1"); Thread t2 = new Thread(r2,"線程2"); t1.start();t2.start(); Thread.sleep(1000); //中斷其中一個(gè)線程 t2.interrupt(); } } // 輸出結(jié)果: // java.lang.InterruptedException // at //java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898) // at //java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222) // at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335) // at geym.conc.ch3.synctrl.IntLock.run(IntLock.java:31) // at java.lang.Thread.run(Thread.java:745) // 線程2:線程被中斷 // 線程2:線程退出 // 線程1:線程退出
由上可知,當(dāng)t1,t2形成死鎖時(shí),可以主動(dòng)利用中斷來解開,但完成任務(wù)的只有t1,t2被中斷. 而如果換成synchronized則將無法進(jìn)行中斷
1鎖申請等待時(shí)限
lock1.tryLock(); //嘗試獲取鎖,獲得立即返回true,未獲得立即返回false lock1.tryLock(5, TimeUnit.SECONDS); //嘗試獲取鎖,5秒內(nèi)未獲得則返回false,獲得返回true
public class TryLock implements Runnable { public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; public TryLock(int lock) { this.lock = lock; } @Override public void run() { if (lock == 1) { while (true) { if (lock1.tryLock()) { try { try { Thread.sleep(500); } catch (InterruptedException e) { } if (lock2.tryLock()) { try { System.out.println(Thread.currentThread() .getId() + ":My Job done"); return; } finally { lock2.unlock(); } } } finally { lock1.unlock(); } } } } else { while (true) { if (lock2.tryLock()) { try { try { Thread.sleep(500); } catch (InterruptedException e) { } if (lock1.tryLock()) { try { System.out.println(Thread.currentThread() .getId() + ":My Job done"); return; } finally { lock1.unlock(); } } } finally { lock2.unlock(); } } } } } public static void main(String[] args) throws InterruptedException { TryLock r1 = new TryLock(1); TryLock r2 = new TryLock(2); Thread t1 = new Thread(r1); Thread t2 = new Thread(r2); t1.start(); t2.start(); } } // 15:My Job done // 14:My Job done
使用trylock可以有效地避免產(chǎn)生死鎖
公平鎖
synchronized鎖為非公平鎖,而reentrantLock既可以是公平鎖也可以是非公平鎖
非公平鎖容易產(chǎn)生饑餓,公平鎖先進(jìn)先出,但效率不敵非公平鎖
public ReentrantLock(boolean fair)
ffffd
重入鎖的搭檔Condition
Condition和object.wait(),object.notify()方法類似
condition的基本方法如下:
void await() throws InterruptedException; //使當(dāng)前線程等待,釋放鎖,能響應(yīng)signal和signalAll方法,響應(yīng)中斷 void awaitUninterruptibly(); //類似 await,但不響應(yīng)中斷 long awaitNanos(long nanosTimeout)throws InterruptedException; //等待一段時(shí)間 boolean await (long time,TimeUnit unit)throws InterruptedException; boolean awaitUntil(Date deadline)throws InterruptedException; void signal(); //喚醒一個(gè)等待中的線程 void signalAll(); //喚醒所有等待中的線程
JDK內(nèi)部就有很多對(duì)于ReentrantLock的使用,如ArrayBlockingQueue
//在 ArrayBlockingQueue中的一些定義 boolean fair = true; private final ReentrantLock lock = new ReentrantLock(fair); private final Condition notEmpty = lock.newCondition(); private final Condition notFull = lock.newCondition(); //put(方法的實(shí)現(xiàn) public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //put方法做同步 try { try { while (count == items.length) //隊(duì)列已滿 notFull.await(); //等待隊(duì)列有足夠的空間 } catch (InterruptedException ie) { notFull.signal(); throw ie; } insert(e); //notFull被通知時(shí),說明有足夠的空間 } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notFull.signal(); //通知take方法的線程,隊(duì)列已有數(shù)據(jù) } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //對(duì)take()方法做同步 try { try { while (count == 0) //如果隊(duì)列為空 notEmpty.await(); //則消費(fèi)者隊(duì)列要等待一個(gè)非空的信號(hào) } catch (InterruptedException ie) { notEmpty.signal(); throw ie; } E x = extract(); return x; } finally { lock.unlock(); } } private E extract() { final E[] items = this.items; E x = items[takeIndex]; items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); //通知put線程隊(duì)列已有空閑空間 return x; }
多線程同時(shí)訪問:信號(hào)量(semaphore)
同步鎖只能允許一個(gè)線程進(jìn)行訪問,信號(hào)量可以指定多個(gè)線程同時(shí)訪問同一個(gè)資源.
//構(gòu)造方法 public Semaphore(int permits) //傳入int表示能同時(shí)訪問的線程數(shù) public Semaphore(int permits, boolean fair) //線程數(shù),是否公平鎖 //實(shí)例方法 public void acquire() throws InterruptedException //獲取一個(gè)訪問權(quán)限,會(huì)阻塞線程,會(huì)被打斷 public void acquireUninterruptibly() //獲取一個(gè)訪問權(quán)限,會(huì)阻塞線程,不會(huì)被打斷 public boolean tryAcquire() //獲取一個(gè)訪問權(quán)限,立即返回 public boolean tryAcquire(long timeout, TimeUnit unit) //獲取一個(gè)訪問權(quán)限,嘗試一段時(shí)間 public void release() //釋放一個(gè)訪問權(quán)限
public class SemapDemo implements Runnable { final Semaphore semp = new Semaphore(5); @Override public void run() { try { semp.acquire(); Thread.sleep(2000); System.out.println(Thread.currentThread().getId() + ":done!"); } catch (InterruptedException e) { e.printStackTrace(); } finally { semp.release(); //使用完后要釋放,否則會(huì)引起信號(hào)量泄漏 } } public static void main(String[] args) { ExecutorService exec = Executors.newFixedThreadPool(20); final SemapDemo demo = new SemapDemo(); for (int i = 0; i < 20; i++) { exec.submit(demo); } } } //輸出結(jié)果 //每次輸出5個(gè)結(jié)果,對(duì)應(yīng)信號(hào)量的5個(gè)許可
讀寫鎖ReadWriteLock
讀寫鎖適用于讀多寫少的場景,讀讀之間為并行,讀寫之間為串行,寫寫之間也為串行
public class ReadWriteLockDemo { private static Lock lock=new ReentrantLock(); private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock(); //獲取讀寫鎖 private static Lock readLock = readWriteLock.readLock(); //讀鎖 private static Lock writeLock = readWriteLock.writeLock(); //寫鎖 private int value; public Object handleRead(Lock lock) throws InterruptedException{ try{ lock.lock(); //模擬讀操作 Thread.sleep(1000); //讀操作的耗時(shí)越多,讀寫鎖的優(yōu)勢就越明顯 return value; }finally{ lock.unlock(); } } public void handleWrite(Lock lock,int index) throws InterruptedException{ try{ lock.lock(); //模擬寫操作 Thread.sleep(1000); value=index; }finally{ lock.unlock(); } } public static void main(String[] args) { final ReadWriteLockDemo demo=new ReadWriteLockDemo(); Runnable readRunnale=new Runnable() { @Override public void run() { try { demo.handleRead(readLock); // demo.handleRead(lock); } catch (InterruptedException e) { e.printStackTrace(); } } }; Runnable writeRunnale=new Runnable() { @Override public void run() { try { demo.handleWrite(writeLock,new Random().nextInt()); // demo.handleWrite(lock,new Random().nextInt()); } catch (InterruptedException e) { e.printStackTrace(); } } }; for(int i=0;i<18;i++){ new Thread(readRunnale).start(); } for(int i=18;i<20;i++){ new Thread(writeRunnale).start(); } } } //結(jié)果: //讀寫鎖明顯要比單純的鎖要更快結(jié)束,說明讀寫鎖確實(shí)提升不少效率
倒計(jì)數(shù)器CountDownLatch
讓一個(gè)線程等待,知道倒計(jì)時(shí)結(jié)束
public class CountDownLatchDemo implements Runnable { static final CountDownLatch end = new CountDownLatch(10); //構(gòu)造倒計(jì)時(shí)器,倒計(jì)數(shù)為10 static final CountDownLatchDemo demo=new CountDownLatchDemo(); @Override public void run() { try { //模擬檢查任務(wù) Thread.sleep(new Random().nextInt(10)*1000); System.out.println("check complete"); end.countDown(); //倒計(jì)時(shí)器減1 } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newFixedThreadPool(10); for(int i=0;i<10;i++){ exec.submit(demo); } //等待檢查 end.await(); //主線程阻塞,待其他線程全部完成后再喚醒主線程 //發(fā)射火箭 System.out.println("Fire!"); exec.shutdown(); } }
循環(huán)柵欄CyclicBarrier
循環(huán)柵欄類似于倒計(jì)時(shí)器,但是計(jì)數(shù)器可以反復(fù)使用,cyclicBarrier比CountDownLatch稍微強(qiáng)大些,可以傳入一個(gè)barrierAction,barrierAction指每次完成計(jì)數(shù)便出發(fā)一次
public CyclicBarrier(int parties,Runnable barrierAction) //構(gòu)造方法
public class CyclicBarrierDemo { public static class Soldier implements Runnable { private String soldier; private final CyclicBarrier cyclic; Soldier(CyclicBarrier cyclic, String soldierName) { this.cyclic = cyclic; this.soldier = soldierName; } public void run() { try { //等待所有士兵到齊 cyclic.await(); //觸發(fā)一次循環(huán)柵欄,達(dá)到計(jì)數(shù)器后才會(huì)進(jìn)行下一步工作 doWork(); //等待所有士兵完成工作 cyclic.await(); //再次觸發(fā)循環(huán)柵欄,達(dá)到計(jì)數(shù)器后才會(huì)進(jìn)行下一步工作 } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } void doWork() { try { Thread.sleep(Math.abs(new Random().nextInt()%10000)); //模擬工作 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(soldier + ":任務(wù)完成"); } } public static class BarrierRun implements Runnable { //用于傳入CyclicBarrier的構(gòu)造方法,作為達(dá)到計(jì)數(shù)器數(shù)值后的觸發(fā)任務(wù), 可以被多次調(diào)用 boolean flag; int N; public BarrierRun(boolean flag, int N) { this.flag = flag; this.N = N; } public void run() { if (flag) { System.out.println("司令:[士兵" + N + "個(gè),任務(wù)完成!]"); } else { System.out.println("司令:[士兵" + N + "個(gè),集合完畢!]"); flag = true; } } } public static void main(String args[]) throws InterruptedException { final int N = 10; Thread[] allSoldier=new Thread[N]; boolean flag = false; CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N)); //設(shè)置屏障點(diǎn),主要是為了執(zhí)行這個(gè)方法 System.out.println("集合隊(duì)伍!"); for (int i = 0; i < N; ++i) { System.out.println("士兵 "+i+" 報(bào)道!"); allSoldier[i]=new Thread(new Soldier(cyclic, "士兵 " + i)); allSoldier[i].start(); } } }
注意: 一旦其中一個(gè)被interrupt后,很可能會(huì)拋出一個(gè)interruptExpection和9個(gè)BrokenBarrierException,表示該循環(huán)柵欄已破損,防止其他線程進(jìn)行無所謂的長久等待
線程阻塞工具LockSupport
LockSupport是一個(gè)非常實(shí)用的線程阻塞工具,不需要獲取某個(gè)對(duì)象的鎖(如wait),也不會(huì)拋出interruptedException異常
public static void park() //掛起當(dāng)前線程, public static void park(Object blocker) //掛起當(dāng)前線程,顯示阻塞對(duì)象,parking to wait for <地址值>
public class LockSupportDemo { public static Object u = new Object(); static ChangeObjectThread t1 = new ChangeObjectThread("t1"); static ChangeObjectThread t2 = new ChangeObjectThread("t2"); public static class ChangeObjectThread extends Thread { public ChangeObjectThread(String name){ super.setName(name); } @Override public void run() { synchronized (u) { System.out.println("in "+getName()); LockSupport.park(this); } } } public static void main(String[] args) throws InterruptedException { t1.start(); Thread.sleep(100); t2.start(); LockSupport.unpark(t1); LockSupport.unpark(t2); //即使unpark發(fā)生在park前,也可以使程序正常結(jié)束 t1.join(); t2.join(); } }
LockSupport使用了類似信號(hào)量的機(jī)制,它為每個(gè)線程準(zhǔn)備一個(gè)許可,如果許可可用,park立即返回,并且消費(fèi)這個(gè)許可(轉(zhuǎn)為不可用),如果許可不可用,就會(huì)阻塞,而unpark方法就是使一個(gè)許可變?yōu)榭捎?locksupport.park()可以相應(yīng)中斷,但是不會(huì)拋出interruptedException,我們可以用Thread.interrupted等方法中獲取中斷標(biāo)記.
public class LockSupportIntDemo { public static Object u = new Object(); static ChangeObjectThread t1 = new ChangeObjectThread("t1"); static ChangeObjectThread t2 = new ChangeObjectThread("t2"); public static class ChangeObjectThread extends Thread { public ChangeObjectThread(String name){ super.setName(name); } @Override public void run() { synchronized (u) { System.out.println("in "+getName()); LockSupport.park(); if(Thread.interrupted()){ //檢測到中斷位,并清除中斷狀態(tài) System.out.println(getName()+" 被中斷了"); } if (Thread.currentThread().isInterrupted()){ //中斷狀態(tài)已被清除,無法檢測到 System.out.println(1); } } System.out.println(getName()+"執(zhí)行結(jié)束"); } } public static void main(String[] args) throws InterruptedException { t1.start(); Thread.sleep(100); t2.start(); t1.interrupt(); LockSupport.unpark(t2); } } //輸出: //in t1 //t1 被中斷了 //t1執(zhí)行結(jié)束 //in t2 //t2執(zhí)行結(jié)束
Guava和Limiter限流
限流算法一般有兩種:漏桶算法和令牌桶算法
漏桶算法: 利用緩存區(qū),所有請求進(jìn)入系統(tǒng),都在緩存區(qū)中保存,然后以固定的流速流出緩存區(qū)進(jìn)行處理.
令牌桶算法: 桶中存放令牌,每個(gè)請求拿到令牌后才能進(jìn)行處理,如果沒有令牌,請求要么等待,要么丟棄.RateLimiter就是采用這種算法
public class RateLimiterDemo { static RateLimiter limiter = RateLimiter.create(2); //每秒處理2個(gè)請求 public static class Task implements Runnable { @Override public void run() { System.out.println(System.currentTimeMillis()); } } public static void main(String args[]) throws InterruptedException { for (int i = 0; i < 50; i++) { limiter.acquire(); //過剩流量會(huì)等待 new Thread(new Task()).start(); } } } // 某些場景傾向于丟棄過剩流量,tryAcquire則是立即返回,不會(huì)阻塞 // for (int i = 0; i < 50; i++) { // if(!limiter.tryAcquire()) { // continue; // } // new Thread(new Task()).start(); // }2. 線程池
Executors框架
Executor框架提供了各種類型的線程池,主要有以下工廠方法:
//固定線程數(shù)量,當(dāng)有新任務(wù)提交時(shí),若池中有空閑線程則立即執(zhí)行,若沒有空閑線程,任務(wù)會(huì)被暫存在一個(gè)任務(wù)隊(duì)列中,直到有空閑線程 public static ExecutorService newFixedThreadPool(int nThreads) //返回只有一個(gè)線程的線程池,多余任務(wù)被保存到一個(gè)任務(wù)隊(duì)列中,線程空閑時(shí),按先入先出的順序執(zhí)行隊(duì)列中的任務(wù) public static ExecutorService newSingleThreadPoolExecutor() //線程數(shù)量不固定,優(yōu)先使用空閑線程,多余任務(wù)會(huì)創(chuàng)建新線程 public static ExecutorService newCachedThreadPool() //線程數(shù)量為1,給定時(shí)間執(zhí)行某任務(wù),或周期性執(zhí)行任務(wù) public static ScheduledExecutorService newSingleThreadScheduledExecutor() //線程數(shù)量可以指定,定時(shí)或周期性執(zhí)行任務(wù) public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
計(jì)劃任務(wù):newScheduledThreadPool主要方法
//給定時(shí)間,對(duì)任務(wù)進(jìn)行一次調(diào)度 public ScheduledFuture> schedule(Runnable command,long delay, TimeUnit unit); //周期調(diào)度,以任務(wù)完成后間隔固定時(shí)間調(diào)度下一個(gè)任務(wù),(兩者相加) public ScheduledFuture> scheduleWithFixedDelay(Runnable command,long initialDelay, long delay, TimeUnit unit); //周期調(diào)度,兩個(gè)任務(wù)開始的時(shí)間差為固定間隔,如果任務(wù)時(shí)間大于間隔時(shí)間則以任務(wù)時(shí)間為準(zhǔn)(兩者取其大者) public ScheduledFuture> scheduleAtFixedRate(Runnable command,long initialDelay, long period, TimeUnit unit);
注意: 任務(wù)異常時(shí)后續(xù)所有任務(wù)都將停止調(diào)度,因此必須保證所有任務(wù)異常均被正常處理.
核心線程池 ThreadPoolExecutor
ThreadPoolExecutor構(gòu)造函數(shù):
public ThreadPoolExecutor(int corePoolSize, //核心線程池大小 int maximumPoolSize, //最大線程池大小 long keepAliveTime, //線程池中超過corePoolSize數(shù)目的空閑線程最大存活時(shí)間;可以allowCoreThreadTimeOut(true)使得核心線程有效時(shí)間 TimeUnit unit, //keepAliveTime時(shí)間單位 BlockingQueueworkQueue, //阻塞任務(wù)隊(duì)列 ThreadFactory threadFactory, //新建線程工廠 RejectedExecutionHandler handler ) //當(dāng)提交任務(wù)數(shù)超過maxmumPoolSize+workQueue之和時(shí),任務(wù)會(huì)交給RejectedExecutionHandler來處理
workQueue指被提交但是未執(zhí)行的任務(wù)隊(duì)列,是BlockingQueue接口的對(duì)象
1.直接提交隊(duì)列:SynchronousQueue,該隊(duì)列沒有容量,每個(gè)插入操作對(duì)應(yīng)一個(gè)刪除操作,即提交的任務(wù)總是會(huì)交給線程執(zhí)行,如果沒有空閑進(jìn)程,則創(chuàng)建新線程,數(shù)量達(dá)最大則執(zhí)行拒絕策略,一般需要設(shè)置很大的maximumPoolSize
2.有界任務(wù)隊(duì)列:ArrayBlockingQueue,有新任務(wù)時(shí),若線程池的實(shí)際線程數(shù)小于corePoolSize,優(yōu)先創(chuàng)建新線程,若大于corePoolSize,加入到等待隊(duì)列,若隊(duì)列已滿,不大于maximumPoolSize前提下,創(chuàng)建新線程執(zhí)行;當(dāng)且僅當(dāng)?shù)却?duì)列滿時(shí)才會(huì)創(chuàng)建新線程,否則數(shù)量一直維持在corePoolSize
3.無界任務(wù)隊(duì)列:LinkedBlockingQueue,小于corePoolSize時(shí)創(chuàng)建線程,達(dá)到corePoolSize則加入隊(duì)列直到資源消耗殆盡
4.優(yōu)先任務(wù)隊(duì)列:PriorityBlockingQueue,特殊無界隊(duì)列,總是保證高優(yōu)先級(jí)的任務(wù)先執(zhí)行.
Executors分析
newFixedThreadPool: corePoolSize=maximumPoolSize,線程不會(huì)超過corePoolSize,使用LinkedBlockingQueue
newSingleThreadPoolExecutor: newFixedThreadPool的弱化版,corePoolSize只有1
newCachedThreadPool: corePoolSize=0,maximumPoolSize為無窮大,空閑線程60秒回收,使用SynchronousQueue隊(duì)列
ThreadPoolExecutor的execute()方法執(zhí)行邏輯
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn"t, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //檢查是否小于corePoolSize if (addWorker(command, true)) //添加線程,執(zhí)行任務(wù) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { //添加進(jìn)隊(duì)列 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) //雙重校驗(yàn) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) //提交線程池失敗 reject(command); //拒絕執(zhí)行 }
拒絕策略
AbortPolicy:該策略會(huì)直接拋出異常,阻止系統(tǒng)正常工作。
CallerRunsPolicy:只要線程池未關(guān)閉,該策略直接在調(diào)用者線程中,運(yùn)行當(dāng)前被丟棄的任務(wù)。顯然這樣做不會(huì)真的丟棄任務(wù),但是,任務(wù)提交線程的性能極有可 能會(huì)急劇下降。 任務(wù),并嘗試再次提交當(dāng)前任務(wù)。
DiscardOldestPolicy:該策略將丟棄最老的一個(gè)請求,也就是即將被執(zhí)行的一個(gè)
DiscardPolicy:該策略默默地丟棄無法處理的任務(wù),不予任何處理。如果允許任務(wù)丟失,我覺得這可能是最好的一種方案了吧!
自定義ThreadFactory
public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue(), new ThreadFactory(){ @Override public Thread newThread(Runnable r) { //自定義創(chuàng)建線程的方法 Thread t= new Thread(r); t.setDaemon(true); System.out.println("create "+t); return t; } } ); for (int i = 0; i < 5; i++) { es.submit(task); } Thread.sleep(2000); }
擴(kuò)展線程池
public class ExtThreadPool { public static class MyTask implements Runnable { public String name; public MyTask(String name) { this.name = name; } @Override public void run() { System.out.println("正在執(zhí)行" + ":Thread ID:" + Thread.currentThread().getId() + ",Task Name=" + name); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()) { @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("準(zhǔn)備執(zhí)行:" + ((MyTask) r).name); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("執(zhí)行完成:" + ((MyTask) r).name); } @Override protected void terminated() { System.out.println("線程池退出"); } }; for (int i = 0; i < 5; i++) { MyTask task = new MyTask("TASK-GEYM-" + i); es.execute(task); Thread.sleep(10); } es.shutdown(); //等待所有任務(wù)執(zhí)行完畢后再關(guān)閉 } }
異常堆棧消息
線程池中的異常堆棧可能不會(huì)拋出,需要我們自己去包裝
public class TraceThreadPoolExecutor extends ThreadPoolExecutor { public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override public void execute(Runnable task) { super.execute(wrap(task, clientTrace(), Thread.currentThread() .getName())); } @Override public Future> submit(Runnable task) { return super.submit(wrap(task, clientTrace(), Thread.currentThread() .getName())); } private Exception clientTrace() { return new Exception("Client stack trace"); } private Runnable wrap(final Runnable task, final Exception clientStack, String clientThreadName) { return new Runnable() { @Override public void run() { try { task.run(); //外層包裹trycatch,即可打印出異常 } catch (Exception e) { clientStack.printStackTrace(); throw e; } } }; } }
Fork/Join框架
類似于mapreduce,用于大數(shù)據(jù)量,fork()創(chuàng)造子線程,join表示等待,
public class CountTask extends RecursiveTask{ private static final int THRESHOLD = 10000; //任務(wù)分解規(guī)模 private long start; private long end; public CountTask(long start,long end){ this.start=start; this.end=end; } @Override public Long compute(){ long sum=0; boolean canCompute = (end-start) subTasks=new ArrayList (); long pos=start; for(int i=0;i<100;i++){ long lastOne=pos+step; if(lastOne>end)lastOne=end; //最后一個(gè)任務(wù)可能小于step,故需要此步 CountTask subTask=new CountTask(pos,lastOne); //子任務(wù) pos+=step+1; //調(diào)整下一個(gè)任務(wù) subTasks.add(subTask); subTask.fork(); //fork子任務(wù) } for(CountTask t:subTasks){ sum+=t.join(); //聚合任務(wù) } } return sum; } public static void main(String[]args){ ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask task = new CountTask(0,200000000000L); ForkJoinTask result = forkJoinPool.submit(task); try{ long res = result.get(); System.out.println("sum="+res); }catch(InterruptedException e){ e.printStackTrace(); }catch(ExecutionException e){ e.printStackTrace(); } } }
注意: 如果任務(wù)的劃分層次很多,一直得不到返回,可能有兩種原因: 1.系統(tǒng)內(nèi)線程數(shù)量越積越多,導(dǎo)致性能嚴(yán)重下降 2.函數(shù)調(diào)用層次變多,導(dǎo)致棧溢出
Guava對(duì)線程池的拓展
1.特殊的DirectExecutor線程池
Executor executor=MoreExecutors.directExecutor(); // 僅在當(dāng)前線程運(yùn)行,用于抽象
2.Daemon線程池
提供將普通線程轉(zhuǎn)換為Daemon線程.很多情況下,我們不希望后臺(tái)線程池阻止程序的退出
public class MoreExecutorsDemo2 { public static void main(String[] args) { ThreadPoolExecutor exceutor = (ThreadPoolExecutor)Executors.newFixedThreadPool(2); MoreExecutors.getExitingExecutorService(exceutor); exceutor.execute(() -> System.out.println("I am running in " + Thread.currentThread().getName())); } }
3.future模式擴(kuò)展
待續(xù)....
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/75977.html
摘要:的并發(fā)容器并發(fā)集合這是一個(gè)高效的并發(fā)你可以把它理解為一個(gè)線程安全的。可以看作一個(gè)線程安全的這是一個(gè)接口,內(nèi)部通過鏈表數(shù)組等方式實(shí)現(xiàn)了這個(gè)接口。 3. JDK的并發(fā)容器 并發(fā)集合 ConcurrentHashMap:這是一個(gè)高效的并發(fā)HashMap.你可以把它理解為一個(gè)線程安全的HashMap。 CopyOnWriteArrayList:這是一個(gè)List,從名字看就知道它和Ar...
摘要:大家好,我是冰河有句話叫做投資啥都不如投資自己的回報(bào)率高。馬上就十一國慶假期了,給小伙伴們分享下,從小白程序員到大廠高級(jí)技術(shù)專家我看過哪些技術(shù)類書籍。 大家好,我是...
摘要:有時(shí)候,由于初期考慮不周,或者后期的需求變化,一些普通變量可能也會(huì)有線程安全的需求。它可以讓你在不改動(dòng)或者極少改動(dòng)原有代碼的基礎(chǔ)上,讓普通的變量也享受操作帶來的線程安全性,這樣你可以修改極少的代碼,來獲得線程安全的保證。 有時(shí)候,由于初期考慮不周,或者后期的需求變化,一些普通變量可能也會(huì)有線程安全的需求。如果改動(dòng)不大,我們可以簡單地修改程序中每一個(gè)使用或者讀取這個(gè)變量的地方。但顯然,這...
摘要:實(shí)戰(zhàn)高并發(fā)程序設(shè)計(jì)推薦豆瓣評(píng)分書的質(zhì)量沒的說,推薦大家好好看一下。推薦,豆瓣評(píng)分,人評(píng)價(jià)本書介紹了在編程中條極具實(shí)用價(jià)值的經(jīng)驗(yàn)規(guī)則,這些經(jīng)驗(yàn)規(guī)則涵蓋了大多數(shù)開發(fā)人員每天所面臨的問題的解決方案。 很早就想把JavaGuide的書單更新一下了,昨晚加今天早上花了幾個(gè)時(shí)間對(duì)之前的書單進(jìn)行了分類和補(bǔ)充完善。雖是終極版,但一定還有很多不錯(cuò)的 Java 書籍我沒有添加進(jìn)去,會(huì)繼續(xù)完善下去。希望這篇...
閱讀 3123·2023-04-25 15:02
閱讀 2827·2021-11-23 09:51
閱讀 2039·2021-09-27 13:47
閱讀 1994·2021-09-13 10:33
閱讀 982·2019-08-30 15:54
閱讀 2648·2019-08-30 15:53
閱讀 2864·2019-08-29 13:58
閱讀 898·2019-08-29 13:54