摘要:我們就可以將這些請求合并,達到一定數量我們統一提交??偨Y一個比較生動的例子給大家講解了一些多線程的具體運用。學習多線程應該多思考多動手,才會有比較好的效果。地址徒手擼框架系列文章地址徒手擼框架實現徒手擼框架實現
原文地址:https://www.xilidou.com/2018/01/22/merge-request/
在高并發系統中,我們經常遇到這樣的需求:系統產生大量的請求,但是這些請求實時性要求不高。我們就可以將這些請求合并,達到一定數量我們統一提交。最大化的利用系統性IO,提升系統的吞吐性能。
所以請求合并框架需要考慮以下兩個需求:
當請求收集到一定數量時提交數據
一段時間后如果請求沒有達到指定的數量也進行提交
我們就聊聊一如何實現這樣一個需求。
閱讀這篇文章你將會了解到:
ScheduledThreadPoolExecutor
阻塞隊列
線程安全的參數
LockSuppor的使用
設計思路和實現我們就聊一聊實現這個東西的具體思路是什么。希望大家能夠學習到分析問題,設計模塊的一些套路。
底層使用什么數據結構來持有需要合并的請求?
既然我們的系統是在高并發的環境下使用,那我們肯定不能使用,普通的ArrayList來持有。我們可以使用阻塞隊列來持有需要合并的請求。
我們的數據結構需要提供一個 add() 的方法給外部,用于提交數據。當外部add數據以后,需要檢查隊列里面的數據的個數是否達到我們限額?達到數量提交數據,不達到繼續等待。
數據結構還需要提供一個timeOut()的方法,外部有一個計時器定時調用這個timeOut方法,如果方法被調用,則直接向遠程提交數據。
條件滿足的時候線程執行提交動作,條件不滿足的時候線程應當暫停,等待隊列達到提交數據的條件。所以我們可以考慮使用 LockSuppor.park()和LockSuppor.unpark 來暫停和激活操作線程。
經過上面的分析,我們就有了這樣一個數據結構:
private static class FlushThread- implements Runnable{ private final String name; //隊列大小 private final int bufferSize; //操作間隔 private int flushInterval; //上一次提交的時間。 private volatile long lastFlushTime; private volatile Thread writer; //持有數據的阻塞隊列 private final BlockingQueue
- queue; //達成條件后具體執行的方法 private final Processor
- processor; //構造函數 public FlushThread(String name, int bufferSize, int flushInterval,int queueSize,Processor
- processor) { this.name = name; this.bufferSize = bufferSize; this.flushInterval = flushInterval; this.lastFlushTime = System.currentTimeMillis(); this.processor = processor; this.queue = new ArrayBlockingQueue<>(queueSize); } //外部提交數據的方法 public boolean add(Item item){ boolean result = queue.offer(item); flushOnDemand(); return result; } //提供給外部的超時方法 public void timeOut(){ //超過兩次提交超過時間間隔 if(System.currentTimeMillis() - lastFlushTime >= flushInterval){ start(); } } //解除線程的阻塞 private void start(){ LockSupport.unpark(writer); } //當前的數據是否大于提交的條件 private void flushOnDemand(){ if(queue.size() >= bufferSize){ start(); } } //執行提交數據的方法 public void flush(){ lastFlushTime = System.currentTimeMillis(); List
- temp = new ArrayList<>(bufferSize); int size = queue.drainTo(temp,bufferSize); if(size > 0){ try { processor.process(temp); }catch (Throwable e){ log.error("process error",e); } } } //根據數據的尺寸和時間間隔判斷是否提交 private boolean canFlush(){ return queue.size() > bufferSize || System.currentTimeMillis() - lastFlushTime > flushInterval; } @Override public void run() { writer = Thread.currentThread(); writer.setName(name); while (!writer.isInterrupted()){ while (!canFlush()){ //如果線程沒有被打斷,且不達到執行的條件,則阻塞線程 LockSupport.park(this); } flush(); } } }
如何實現定時提交呢?
通常我們遇到定時相關的需求,首先想到的應該是使用 ScheduledThreadPoolExecutor定時來調用FlushThread 的 timeOut 方法,如果你想到的是 Thread.sleep()...那需要再努力學習,多看源碼了。
怎樣進一步的提升系統的吞吐量?
我們使用的FlushThread 實現了 Runnable 所以我們可以考慮使用線程池來持有多個FlushThread。
所以我們就有這樣的代碼:
public class Flusher- { private final FlushThread
- [] flushThreads; private AtomicInteger index; //防止多個線程同時執行。增加一個隨機數間隔 private static final Random r = new Random(); private static final int delta = 50; private static ScheduledExecutorService TIMER = new ScheduledThreadPoolExecutor(1); private static ExecutorService POOL = Executors.newCachedThreadPool(); public Flusher(String name,int bufferSiz,int flushInterval,int queueSize,int threads,Processor
- processor) { this.flushThreads = new FlushThread[threads]; if(threads > 1){ index = new AtomicInteger(); } for (int i = 0; i < threads; i++) { final FlushThread
- flushThread = new FlushThread
- (name+ "-" + i,bufferSiz,flushInterval,queueSize,processor); flushThreads[i] = flushThread; POOL.submit(flushThread); //定時調用 timeOut()方法。 TIMER.scheduleAtFixedRate(flushThread::timeOut, r.nextInt(delta), flushInterval, TimeUnit.MILLISECONDS); } } // 對 index 取模,保證多線程都能被add public boolean add(Item item){ int len = flushThreads.length; if(len == 1){ return flushThreads[0].add(item); } int mod = index.incrementAndGet() % len; return flushThreads[mod].add(item); } //上文已經描述 private static class FlushThread
- implements Runnable{ ...省略 } }
面向接口編程,提升系統擴展性:
public interface Processor使用{ void process(List list); }
我們寫個測試方法測試一下:
//實現 Processor 將 String 全部輸出 public class PrintOutProcessor implements Processor{ @Override public void process(List list) { System.out.println("start flush"); list.forEach(System.out::println); System.out.println("end flush"); } }
public class Test { public static void main(String[] args) throws InterruptedException { FlusherstringFlusher = new Flusher<>("test",5,1000,30,1,new PrintOutProcessor()); int index = 1; while (true){ stringFlusher.add(String.valueOf(index++)); Thread.sleep(1000); } } }
執行的結果:
start flush 1 2 3 end flush start flush 4 5 6 7 end flush
我們發現并沒有達到10個數字就觸發了flush。因為出發了超時提交,雖然還沒有達到規定的5
個數據,但還是執行了 flush。
如果我們去除 Thread.sleep(1000); 再看看結果:
start flush 1 2 3 4 5 end flush start flush 6 7 8 9 10 end flush
每5個數一次提交。完美。。。。
總結一個比較生動的例子給大家講解了一些多線程的具體運用。學習多線程應該多思考多動手,才會有比較好的效果。希望這篇文章大家讀完以后有所收獲,歡迎交流。
github地址:https://github.com/diaozxin007/framework
徒手擼框架系列文章地址:
徒手擼框架--實現IoC
徒手擼框架--實現Aop
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/68354.html
摘要:我們繼續看代碼的意思是這個是一段內嵌匯編代碼。也就是在語言中使用匯編代碼。就是匯編版的比較并交換。就是保證在多線程情況下,不阻塞線程的填充和消費。微觀上看匯編的是實現操作系統級別的原子操作的基石。 原文地址:https://www.xilidou.com/2018/02/01/java-cas/ CAS 是現代操作系統,解決并發問題的一個重要手段,最近在看 eureka 的源碼的時候。...
摘要:徒手擼一個簡單的框架之前在牛逼哄哄的框架,底層到底什么原理得知了遠程過程調用簡單來說就是調用遠程的服務就像調用本地方法一樣,其中用到的知識有序列化和反序列化動態代理網絡傳輸動態加載反射這些知識點。 徒手擼一個簡單的RPC框架 之前在牛逼哄哄的 RPC 框架,底層到底什么原理得知了RPC(遠程過程調用)簡單來說就是調用遠程的服務就像調用本地方法一樣,其中用到的知識有序列化和反序列化、動態...
摘要:先來看代碼吧,一會松哥再慢慢解釋關于這一段自動配置,解釋如下首先注解表明這是一個配置類。本文的案例,松哥已經上傳到上了,地址。我們使用 Spring Boot,基本上都是沉醉在它 Stater 的方便之中。Starter 為我們帶來了眾多的自動化配置,有了這些自動化配置,我們可以不費吹灰之力就能搭建一個生產級開發環境,有的小伙伴會覺得這個 Starter 好神奇呀!其實 Starter 也都...
摘要:是一個組件庫目前擁有的組件語法編寫,無依賴原生模塊化,以上支持,請開啟靜態服務器預覽效果,靜態服務器傳送門采用變量配置樣式辛苦造輪子,歡迎來倉庫四月份找工作,求內推,坐標深圳寫在前面去年年底項目中嘗試著寫過一個分頁的組件,然后就有了寫的想法 QingUI是一個UI組件庫目前擁有的組件:DatePicker, TimePicker, Paginator, Tree, Cascader, ...
摘要:從而能夠進一步深入了解框架。至此我們框架開發完成。雖然說閱讀源碼是了解框架的最終手段。但是框架作為一個生產框架,為了保證通用和穩定,源碼必定是高度抽象,且處理大量細節。下一篇文章應該會是徒手擼框架實現。 原文地址:https://www.xilidou.com/2018/... Spring 作為 J2ee 開發事實上的標準,是每個Java開發人員都需要了解的框架。但是Spring 的...
閱讀 901·2021-09-22 15:17
閱讀 1924·2021-09-22 15:06
閱讀 2222·2021-09-08 09:35
閱讀 5109·2021-09-01 11:43
閱讀 3483·2019-08-30 15:55
閱讀 2156·2019-08-30 12:48
閱讀 3157·2019-08-30 12:45
閱讀 1787·2019-08-29 17:31