摘要:前言當(dāng)遇到大量數(shù)據(jù)導(dǎo)入時(shí),為了提高處理的速度,可以選擇使用多線程來批量處理這些處理。設(shè)計(jì)思路由于場景的特點(diǎn)是讀取快,寫入慢,如果是使用多線程處理,建議是數(shù)據(jù)寫入部分改造為多線程。
前言
當(dāng)遇到大量數(shù)據(jù)導(dǎo)入時(shí),為了提高處理的速度,可以選擇使用多線程來批量處理這些處理。常見的場景有:
大文件導(dǎo)入數(shù)據(jù)庫(這個(gè)文件不一定是標(biāo)準(zhǔn)的CSV可導(dǎo)入文件或者需要在內(nèi)存中經(jīng)過一定的處理)
數(shù)據(jù)同步(從第三方接口拉取數(shù)據(jù)處理后寫入自己的數(shù)據(jù)庫)
以上的場景有一個(gè)共性,這類數(shù)據(jù)導(dǎo)入的場景簡單來說就是將數(shù)據(jù)從一個(gè)數(shù)據(jù)源移動(dòng)到另外一個(gè)數(shù)據(jù)源,而其中必定可以分為兩步
數(shù)據(jù)讀取:從數(shù)據(jù)源讀取數(shù)據(jù)到內(nèi)存
數(shù)據(jù)寫入:將內(nèi)存中的數(shù)據(jù)寫入到另外一個(gè)數(shù)據(jù)源,可能存在數(shù)據(jù)處理
而且數(shù)據(jù)讀取的速度一般會(huì)比數(shù)據(jù)寫入的速度快很多,即讀取快,寫入慢。
設(shè)計(jì)思路由于場景的特點(diǎn)是讀取快,寫入慢,如果是使用多線程處理,建議是數(shù)據(jù)寫入部分改造為多線程。而數(shù)據(jù)讀取可以改造成批量讀取數(shù)據(jù)。簡單來說就是兩個(gè)要點(diǎn):
批量讀取數(shù)據(jù)
多線程寫入數(shù)據(jù)
示例多線程批量處理最簡單的方案是使用線程池來進(jìn)行處理,下面會(huì)通過一個(gè)模擬批量讀取和寫入的服務(wù),以及對這個(gè)服務(wù)的多線程寫入調(diào)用作為示例,展示如何多線程批量數(shù)據(jù)導(dǎo)入。
模擬服務(wù)import java.util.concurrent.atomic.AtomicLong; /** * 數(shù)據(jù)批量寫入用的模擬服務(wù) * * @author RJH * create at 2019-04-01 */ public class MockService { /** * 可讀取總數(shù) */ private long canReadTotal; /** * 寫入總數(shù) */ private AtomicLong writeTotal=new AtomicLong(0); /** * 寫入休眠時(shí)間(單位:毫秒) */ private final long sleepTime; /** * 構(gòu)造方法 * * @param canReadTotal * @param sleepTime */ public MockService(long canReadTotal, long sleepTime) { this.canReadTotal = canReadTotal; this.sleepTime = sleepTime; } /** * 批量讀取數(shù)據(jù)接口 * * @param num * @return */ public synchronized long readData(int num) { long readNum; if (canReadTotal >= num) { canReadTotal -= num; readNum = num; } else { readNum = canReadTotal; canReadTotal = 0; } //System.out.println("read data size:" + readNum); return readNum; } /** * 寫入數(shù)據(jù)接口 */ public void writeData() { try { // 休眠一定時(shí)間模擬寫入速度慢 Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } // 寫入總數(shù)自增 System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet()); } /** * 獲取寫入的總數(shù) * * @return */ public long getWriteTotal() { return writeTotal.get(); } }批量數(shù)據(jù)處理器
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 基于線程池的多線程批量寫入處理器 * @author RJH * create at 2019-04-01 */ public class SimpleBatchHandler { private ExecutorService executorService; private MockService service; /** * 每次批量讀取的數(shù)據(jù)量 */ private int batch; /** * 線程個(gè)數(shù) */ private int threadNum; public SimpleBatchHandler(MockService service, int batch,int threadNum) { this.service = service; this.batch = batch; //使用固定數(shù)目的線程池 this.executorService = Executors.newFixedThreadPool(threadNum); } /** * 開始處理 */ public void startHandle() { // 開始處理的時(shí)間 long startTime = System.currentTimeMillis(); System.out.println("start handle time:" + startTime); long readData; while ((readData = service.readData(batch)) != 0) {// 批量讀取數(shù)據(jù),知道讀取不到數(shù)據(jù)才停止 for (long i = 0; i < readData; i++) { executorService.execute(() -> service.writeData()); } } // 關(guān)閉線程池 executorService.shutdown(); while (!executorService.isTerminated()) {//等待線程池中的線程執(zhí)行完 } // 結(jié)束時(shí)間 long endTime = System.currentTimeMillis(); System.out.println("end handle time:" + endTime); // 總耗時(shí) System.out.println("total handle time:" + (endTime - startTime) + "ms"); // 寫入總數(shù) System.out.println("total write num:" + service.getWriteTotal()); } }測試類
/** * SimpleBatchHandler的測試類 * @author RJH * create at 2019-04-01 */ public class SimpleBatchHandlerTest { public static void main(String[] args) { // 總數(shù) long total=100000; // 休眠時(shí)間 long sleepTime=100; // 每次拉取的數(shù)量 int batch=100; // 線程個(gè)數(shù) int threadNum=16; MockService mockService=new MockService(total,sleepTime); SimpleBatchHandler handler=new SimpleBatchHandler(mockService,batch,threadNum); handler.startHandle(); } }運(yùn)行結(jié)果
start handle time:1554298681755 thread:Thread[pool-1-thread-2,5,main] write data:1 thread:Thread[pool-1-thread-1,5,main] write data:2 ...省略部分輸出 thread:Thread[pool-1-thread-4,5,main] write data:100000 end handle time:1554299330202 total handle time:648447ms total write num:100000分析
在單線程情況下的執(zhí)行時(shí)間應(yīng)該為total*sleepTime,即10000000ms,而改造為多線程后執(zhí)行時(shí)間為648447ms。
示例問題本示例存在一些問題,會(huì)在后續(xù)的博客中對本示例進(jìn)行優(yōu)化,同時(shí)分享給大家如何解決這些問題。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/74062.html
摘要:哪吒社區(qū)技能樹打卡打卡貼函數(shù)式接口簡介領(lǐng)域優(yōu)質(zhì)創(chuàng)作者哪吒公眾號作者架構(gòu)師奮斗者掃描主頁左側(cè)二維碼,加入群聊,一起學(xué)習(xí)一起進(jìn)步歡迎點(diǎn)贊收藏留言前情提要無意間聽到領(lǐng)導(dǎo)們的談話,現(xiàn)在公司的現(xiàn)狀是碼農(nóng)太多,但能獨(dú)立帶隊(duì)的人太少,簡而言之,不缺干 ? 哪吒社區(qū)Java技能樹打卡?【打卡貼 day2...
摘要:陳建平說訓(xùn)練是十分重要的,尤其是對關(guān)注算法本身的研究者。代碼生成其實(shí)在中也十分簡單,陳建平不僅利用車道線識別模型向我們演示了如何使用生成高效的代碼,同時(shí)還展示了在脫離環(huán)境下運(yùn)行代碼進(jìn)行推斷的效果。 近日,Mathworks 推出了包含 MATLAB 和 Simulink 產(chǎn)品系列的 Release 2017b(R2017b),該版本大大加強(qiáng)了 MATLAB 對深度學(xué)習(xí)的支持,并簡化了工程師、...
閱讀 3366·2021-10-13 09:40
閱讀 2602·2021-10-08 10:17
閱讀 4007·2021-09-28 09:45
閱讀 939·2021-09-28 09:35
閱讀 1820·2019-08-30 10:51
閱讀 2912·2019-08-26 12:11
閱讀 1658·2019-08-26 10:41
閱讀 3104·2019-08-23 17:10