摘要:序本文主要來展示一下簡版的線程池的實現。默認提供了幾個工廠方法思路主要用到的是雙端隊列,不過這里我們粗糙的實現的話,也可以不用到。測試實例輸出從數據來看,還是相對均勻的。
序
本文主要來展示一下簡版的work stealing線程池的實現。
ExecutorsExecutors默認提供了幾個工廠方法
/** * Creates a thread pool that maintains enough threads to support * the given parallelism level, and may use multiple queues to * reduce contention. The parallelism level corresponds to the * maximum number of threads actively engaged in, or available to * engage in, task processing. The actual number of threads may * grow and shrink dynamically. A work-stealing pool makes no * guarantees about the order in which submitted tasks are * executed. * * @param parallelism the targeted parallelism level * @return the newly created thread pool * @throws IllegalArgumentException if {@code parallelism <= 0} * @since 1.8 */ public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } /** * Creates a work-stealing thread pool using all * {@link Runtime#availableProcessors available processors} * as its target parallelism level. * @return the newly created thread pool * @see #newWorkStealingPool(int) * @since 1.8 */ public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }思路
ForkJoinPool主要用到的是雙端隊列,不過這里我們粗糙的實現的話,也可以不用到deque。
public class WorkStealingChannel{ private static final Logger LOGGER = LoggerFactory.getLogger(WorkStealingChannel.class); BlockingDeque [] managedQueues; AtomicLongMap stat = AtomicLongMap.create(); public WorkStealingChannel() { int nCPU = Runtime.getRuntime().availableProcessors(); int queueCount = nCPU / 2 + 1; managedQueues = new LinkedBlockingDeque[queueCount]; for(int i=0;i (); } } public void put(T item) throws InterruptedException { int targetIndex = Math.abs(item.hashCode() % managedQueues.length); BlockingQueue targetQueue = managedQueues[targetIndex]; targetQueue.put(item); } public T take() throws InterruptedException { int rdnIdx = ThreadLocalRandom.current().nextInt(managedQueues.length); int idx = rdnIdx; while (true){ idx = idx % managedQueues.length; T item = null; if(idx == rdnIdx){ item = managedQueues[idx].poll(); }else{ item = managedQueues[idx].pollLast(); } if(item != null){ LOGGER.info("take ele from queue {}",idx); stat.addAndGet(idx,1); return item; } idx++; if(idx == rdnIdx){ break; } } //走完一輪沒有,則隨機取一個等待 LOGGER.info("wait for queue:{}",rdnIdx); stat.addAndGet(rdnIdx,1); return managedQueues[rdnIdx].take(); } public AtomicLongMap getStat() { return stat; } }
這里根據cpu的數量建立了幾個deque,然后每次put的時候,根據hashcode取模放到對應的隊列。然后獲取的時候,先從隨機一個隊列取,沒有的話,再robbin round取其他隊列的,還沒有的話,則阻塞等待指定隊列的元素。
測試實例
public class WorkStealingDemo { static final WorkStealingChannelchannel = new WorkStealingChannel<>(); static volatile boolean running = true; static class Producer extends Thread{ @Override public void run() { while(running){ try { channel.put(UUID.randomUUID().toString()); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class Consumer extends Thread{ @Override public void run() { while(running){ try { String value = channel.take(); System.out.println(value); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void stop(){ running = false; System.out.println(channel.getStat()); } public static void main(String[] args) throws InterruptedException { int nCPU = Runtime.getRuntime().availableProcessors(); int consumerCount = nCPU / 2 + 1; for (int i = 0; i < nCPU; i++) { new Producer().start(); } for (int i = 0; i < consumerCount; i++) { new Consumer().start(); } Thread.sleep(30*1000); stop(); } }
輸出
{0=660972, 1=660613, 2=661537, 3=659846, 4=659918}
從數據來看,還是相對均勻的。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/70405.html
執行器 在前面的所有示例中,由新的線程(由其Runnable對象定義)和線程本身(由Thread對象定義)完成的任務之間存在緊密的聯系,這適用于小型應用程序,但在大型應用程序中,將線程管理和創建與應用程序的其余部分分開是有意義的,封裝這些函數的對象稱為執行器,以下小節詳細描述了執行器。 執行器接口定義三個執行器對象類型。 線程池是最常見的執行器實現類型。 Fork/Join是一個利用多個處理器的...
摘要:大多數待遇豐厚的開發職位都要求開發者精通多線程技術并且有豐富的程序開發調試優化經驗,所以線程相關的問題在面試中經常會被提到。掌握了這些技巧,你就可以輕松應對多線程和并發面試了。進入等待通行準許時,所提供的對象。 最近看到網上流傳著,各種面試經驗及面試題,往往都是一大堆技術題目貼上去,而沒有答案。 不管你是新程序員還是老手,你一定在面試中遇到過有關線程的問題。Java語言一個重要的特點就...
摘要:同時,它會通過的方法將自己注冊到線程池中。線程池中的每個工作線程都有一個自己的任務隊列,工作線程優先處理自身隊列中的任務或順序,由線程池構造時的參數決定,自身隊列為空時,以的順序隨機竊取其它隊列中的任務。 showImg(https://segmentfault.com/img/bVbizJb?w=1802&h=762); 本文首發于一世流云的專欄:https://segmentfau...
摘要:方法接受對象數組作為參數,目標是對數組進行升序排序。創建一個對象,并調用方法將它提交給線程池。此排序算法不直接返回結果給調用方,因此基于類。 分支/合并框架 說明 重點是那個浮點數數組排序的例子,從主函數展開,根據序號看 1、GitHub代碼歡迎star。你們輕輕的一點,對我鼓勵特大,我有一個習慣,看完別人的文章是會點贊的。 2、個人認為學習語言最好的方式就是模仿、思考別人為什么這么寫...
摘要:并不會為每個任務都創建工作線程,而是根據實際情況構造線程池時的參數確定是喚醒已有空閑工作線程,還是新建工作線程。 showImg(https://segmentfault.com/img/bVbiYSP?w=1071&h=707); 本文首發于一世流云的專欄:https://segmentfault.com/blog... 一、引言 前一章——Fork/Join框架(1) 原理,我們...
閱讀 1587·2021-10-18 13:35
閱讀 2365·2021-10-09 09:44
閱讀 819·2021-10-08 10:05
閱讀 2719·2021-09-26 09:47
閱讀 3571·2021-09-22 15:22
閱讀 435·2019-08-29 12:24
閱讀 2002·2019-08-29 11:06
閱讀 2860·2019-08-26 12:23