摘要:也是自帶的一個基于線程池設(shè)計的定時任務類。其每個調(diào)度任務都會分配到線程池中的一個線程執(zhí)行,所以其任務是并發(fā)執(zhí)行的,互不影響。
原創(chuàng)不易,如需轉(zhuǎn)載,請注明出處https://www.cnblogs.com/baixianlong/p/10659045.html,否則將追究法律責任!!!
一、在JAVA開發(fā)領(lǐng)域,目前可以通過以下幾種方式進行定時任務 1、單機部署模式Timer:jdk中自帶的一個定時調(diào)度類,可以簡單的實現(xiàn)按某一頻度進行任務執(zhí)行。提供的功能比較單一,無法實現(xiàn)復雜的調(diào)度任務。
ScheduledExecutorService:也是jdk自帶的一個基于線程池設(shè)計的定時任務類。其每個調(diào)度任務都會分配到線程池中的一個線程執(zhí)行,所以其任務是并發(fā)執(zhí)行的,互不影響。
Spring Task:Spring提供的一個任務調(diào)度工具,支持注解和配置文件形式,支持Cron表達式,使用簡單但功能強大。
Quartz:一款功能強大的任務調(diào)度器,可以實現(xiàn)較為復雜的調(diào)度功能,如每月一號執(zhí)行、每天凌晨執(zhí)行、每周五執(zhí)行等等,還支持分布式調(diào)度,就是配置稍顯復雜。
2、分布式集群模式(不多介紹,簡單提一下) 問題:I、如何解決定時任務的多次執(zhí)行? II、如何解決任務的單點問題,實現(xiàn)任務的故障轉(zhuǎn)移?問題I的簡單思考:
1、固定執(zhí)行定時任務的機器(可以有效避免多次執(zhí)行的情況 ,缺點就是單點故障問題)。 2、借助Redis的過期機制和分布式鎖。 3、借助mysql的鎖機制等。成熟的解決方案:
1、Quartz:可以去看看這篇文章[Quartz分布式]( https://www.cnblogs.com/jiafuwei/p/6145280.html)。 2、elastic-job:(https://github.com/elasticjob/elastic-job-lite)當當開發(fā)的彈性分布式任務調(diào)度系統(tǒng),采用zookeeper實現(xiàn)分布式協(xié)調(diào),實現(xiàn)任務高可用以及分片。 3、xxl-job:(https://github.com/xuxueli/xxl-job)是大眾點評員發(fā)布的分布式任務調(diào)度平臺,是一個輕量級分布式任務調(diào)度框架。 4、saturn:(https://github.com/vipshop/Saturn) 是唯品會提供一個分布式、容錯和高可用的作業(yè)調(diào)度服務框架。二、SpringTask實現(xiàn)定時任務(這里是基于springboot) 1、簡單的定時任務實現(xiàn) 使用方式:
使用@EnableScheduling注解開啟對定時任務的支持。 使用@Scheduled 注解即可,基于corn、fixedRate、fixedDelay等一些定時策略來實現(xiàn)定時任務。使用缺點:
1、多個定時任務使用的是同一個調(diào)度線程,所以任務是阻塞執(zhí)行的,執(zhí)行效率不高。 2、其次如果出現(xiàn)任務阻塞,導致一些場景的定時計算沒有實際意義,比如每天12點的一個計算任務被阻塞到1點去執(zhí)行,會導致結(jié)果并非我們想要的。使用優(yōu)點:
1、配置簡單 2、適用于單個后臺線程執(zhí)行周期任務,并且保證順序一致執(zhí)行的場景源碼分析:
//默認使用的調(diào)度器 if(this.taskScheduler == null) { this.localExecutor = Executors.newSingleThreadScheduledExecutor(); this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor); } //可以看到SingleThreadScheduledExecutor指定的核心線程為1,說白了就是單線程執(zhí)行 public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } //利用了DelayedWorkQueue延時隊列作為任務的存放隊列,這樣便可以實現(xiàn)任務延遲執(zhí)行或者定時執(zhí)行 public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
2、實現(xiàn)并發(fā)的定時任務 使用方式:
方式一:由1中我們知道之所以定時任務是阻塞執(zhí)行,是配置的線程池決定的,那就好辦了,換一個不就行了!直接上代碼:
@Configuration public class ScheduledConfig implements SchedulingConfigurer { @Autowired private TaskScheduler myThreadPoolTaskScheduler; @Override public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) { //簡單粗暴的方式直接指定 //scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5)); //也可以自定義的線程池,方便線程的使用與維護,這里不多說了 scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler); } } @Bean(name = "myThreadPoolTaskScheduler") public TaskScheduler getMyThreadPoolTaskScheduler() { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.setPoolSize(10); taskScheduler.setThreadNamePrefix("Haina-Scheduled-"); taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //調(diào)度器shutdown被調(diào)用時等待當前被調(diào)度的任務完成 taskScheduler.setWaitForTasksToCompleteOnShutdown(true); //等待時長 taskScheduler.setAwaitTerminationSeconds(60); return taskScheduler; }
方式二:方式一的本質(zhì)改變了任務調(diào)度器默認使用的線程池,接下來這種是不改變調(diào)度器的默認線程池,而是把當前任務交給一個異步線程池去執(zhí)行
首先使用@EnableAsync 啟用異步任務
然后在定時任務的方法加上@Async即可,默認使用的線程池為SimpleAsyncTaskExecutor(該線程池默認來一個任務創(chuàng)建一個線程,就會不斷創(chuàng)建大量線程,極有可能壓爆服務器內(nèi)存。當然它有自己的限流機制,這里就不多說了,有興趣的自己翻翻源碼~)
項目中為了更好的控制線程的使用,我們可以自定義我們自己的線程池,使用方式@Async("myThreadPool")
廢話太多,直接上代碼: @Scheduled(fixedRate = 1000*10,initialDelay = 1000*20) @Async("myThreadPoolTaskExecutor") //@Async public void scheduledTest02(){ System.out.println(Thread.currentThread().getName()+"--->xxxxx--->"+Thread.currentThread().getId()); } //自定義線程池 @Bean(name = "myThreadPoolTaskExecutor") public TaskExecutor getMyThreadPoolTaskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(20); taskExecutor.setMaxPoolSize(200); taskExecutor.setQueueCapacity(25); taskExecutor.setKeepAliveSeconds(200); taskExecutor.setThreadNamePrefix("Haina-ThreadPool-"); // 線程池對拒絕任務(無線程可用)的處理策略,目前只支持AbortPolicy、CallerRunsPolicy;默認為后者 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //調(diào)度器shutdown被調(diào)用時等待當前被調(diào)度的任務完成 taskExecutor.setWaitForTasksToCompleteOnShutdown(true); //等待時長 taskExecutor.setAwaitTerminationSeconds(60); taskExecutor.initialize(); return taskExecutor; }
線程池的使用心得(后續(xù)有專門文章來探討)
java中提供了ThreadPoolExecutor和ScheduledThreadPoolExecutor,對應與spring中的ThreadPoolTaskExecutor和ThreadPoolTaskScheduler,但是在原有的基礎(chǔ)上增加了新的特性,在spring環(huán)境下更容易使用和控制。
使用自定義的線程池能夠避免一些默認線程池造成的內(nèi)存溢出、阻塞等等問題,更貼合自己的服務特性
使用自定義的線程池便于對項目中線程的管理、維護以及監(jiān)控。
即便在非spring環(huán)境下也不要使用java默認提供的那幾種線程池,坑很多,阿里代碼規(guī)約不說了嗎,得相信大廠!!!
三、動態(tài)定時任務的實現(xiàn) 問題:使用@Scheduled注解來完成設(shè)置定時任務,但是有時候我們往往需要對周期性的時間的設(shè)置會做一些改變,或者要動態(tài)的啟停一個定時任務,那么這個時候使用此注解就不太方便了,原因在于這個注解中配置的cron表達式必須是常量,那么當我們修改定時參數(shù)的時候,就需要停止服務,重新部署。
解決辦法:
方式一:實現(xiàn)SchedulingConfigurer接口,重寫configureTasks方法,重新制定Trigger,核心方法就是addTriggerTask(Runnable task, Trigger trigger) ,不過需要注意的是,此種方式修改了配置值后,需要在下一次調(diào)度結(jié)束后,才會更新調(diào)度器,并不會在修改配置值時實時更新,實時更新需要在修改配置值時額外增加相關(guān)邏輯處理。
@Configuration public class ScheduledConfig implements SchedulingConfigurer { @Autowired private TaskScheduler myThreadPoolTaskScheduler; @Override public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) { //scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5)); scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler); //可以實現(xiàn)動態(tài)調(diào)整定時任務的執(zhí)行頻率 scheduledTaskRegistrar.addTriggerTask( //1.添加任務內(nèi)容(Runnable) () -> System.out.println("cccccccccccccccc--->" + Thread.currentThread().getId()), //2.設(shè)置執(zhí)行周期(Trigger) triggerContext -> { //2.1 從數(shù)據(jù)庫動態(tài)獲取執(zhí)行周期 String cron = "0/2 * * * * ? "; //2.2 合法性校驗. // if (StringUtils.isEmpty(cron)) { // // Omitted Code .. // } //2.3 返回執(zhí)行周期(Date) return new CronTrigger(cron).nextExecutionTime(triggerContext); } ); } }
方式二:使用threadPoolTaskScheduler類可實現(xiàn)動態(tài)添加刪除功能,當然也可實現(xiàn)執(zhí)行頻率的調(diào)整
首先,我們要認識下這個調(diào)度類,它其實是對java中ScheduledThreadPoolExecutor的一個封裝改進后的產(chǎn)物,主要改進有以下幾點: 1、提供默認配置,因為是ScheduledThreadPoolExecutor,所以只有poolSize這一個默認參數(shù)。 2、支持自定義任務,通過傳入Trigger參數(shù)。 3、對任務出錯處理進行優(yōu)化,如果是重復性的任務,不拋出異常,通過日志記錄下來,不影響下次運行,如果是只執(zhí)行一次的任務,將異常往上拋。 順便說下ThreadPoolTaskExecutor相對于ThreadPoolExecutor的改進點: 1、提供默認配置,原生的ThreadPoolExecutor的除了ThreadFactory和RejectedExecutionHandler其他沒有默認配置 2、實現(xiàn)AsyncListenableTaskExecutor接口,支持對FutureTask添加success和fail的回調(diào),任務成功或失敗的時候回執(zhí)行對應回調(diào)方法。 3、因為是spring的工具類,所以拋出的RejectedExecutionException也會被轉(zhuǎn)換為spring框架的TaskRejectedException異常(這個無所謂) 4、提供默認ThreadFactory實現(xiàn),直接通過參數(shù)重載配置
扯了這么多,還是直接上代碼:
@Component public class DynamicTimedTask { private static final Logger logger = LoggerFactory.getLogger(DynamicTimedTask.class); //利用創(chuàng)建好的調(diào)度類統(tǒng)一管理 //@Autowired //@Qualifier("myThreadPoolTaskScheduler") //private ThreadPoolTaskScheduler myThreadPoolTaskScheduler; //接受任務的返回結(jié)果 private ScheduledFuture> future; @Autowired private ThreadPoolTaskScheduler threadPoolTaskScheduler; //實例化一個線程池任務調(diào)度類,可以使用自定義的ThreadPoolTaskScheduler @Bean public ThreadPoolTaskScheduler threadPoolTaskScheduler() { ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler(); return new ThreadPoolTaskScheduler(); } /** * 啟動定時任務 * @return
*/ public boolean startCron() { boolean flag = false; //從數(shù)據(jù)庫動態(tài)獲取執(zhí)行周期 String cron = "0/2 * * * * ? "; future = threadPoolTaskScheduler.schedule(new CheckModelFile(),cron); if (future!=null){ flag = true; logger.info("定時check訓練模型文件,任務啟動成功!!!"); }else { logger.info("定時check訓練模型文件,任務啟動失敗!!!"); } return flag; } /** * 停止定時任務 * @return */ public boolean stopCron() { boolean flag = false; if (future != null) { boolean cancel = future.cancel(true); if (cancel){ flag = true; logger.info("定時check訓練模型文件,任務停止成功!!!"); }else { logger.info("定時check訓練模型文件,任務停止失敗!!!"); } }else { flag = true; logger.info("定時check訓練模型文件,任務已經(jīng)停止!!!"); } return flag; } class CheckModelFile implements Runnable{ @Override public void run() { //編寫你自己的業(yè)務邏輯 System.out.print("模型文件檢查完畢!!!") } } }四、總結(jié)
到此基于springtask下的定時任務的簡單使用算是差不多了,其中不免有些錯誤的地方,或者理解有偏頗的地方歡迎大家提出來!
基于分布式集群下的定時任務使用,后續(xù)有時間再繼續(xù)!!!
個人博客地址:
csdn:https://blog.csdn.net/tiantuo6513
cnblogs:https://www.cnblogs.com/baixianlong
segmentfault:https://segmentfault.com/u/baixianlong
github:https://github.com/xianlongbai
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77485.html
摘要:作為面試官,我是如何甄別應聘者的包裝程度語言和等其他語言的對比分析和主從復制的原理詳解和持久化的原理是什么面試中經(jīng)常被問到的持久化與恢復實現(xiàn)故障恢復自動化詳解哨兵技術(shù)查漏補缺最易錯過的技術(shù)要點大掃盲意外宕機不難解決,但你真的懂數(shù)據(jù)恢復嗎每秒 作為面試官,我是如何甄別應聘者的包裝程度Go語言和Java、python等其他語言的對比分析 Redis和MySQL Redis:主從復制的原理詳...
摘要:作為面試官,我是如何甄別應聘者的包裝程度語言和等其他語言的對比分析和主從復制的原理詳解和持久化的原理是什么面試中經(jīng)常被問到的持久化與恢復實現(xiàn)故障恢復自動化詳解哨兵技術(shù)查漏補缺最易錯過的技術(shù)要點大掃盲意外宕機不難解決,但你真的懂數(shù)據(jù)恢復嗎每秒 作為面試官,我是如何甄別應聘者的包裝程度Go語言和Java、python等其他語言的對比分析 Redis和MySQL Redis:主從復制的原理詳...
摘要:手動創(chuàng)建執(zhí)行線程存在以上問題,而線程池就是用來解決這些問題的。線程池詳解上面我們已經(jīng)知道了線程池的作用,而對于這樣一個好用,重要的工具,當然已經(jīng)為我們提供了實現(xiàn),這也是本篇文章的重點。,線程池一旦空閑超過時間,線程都將被回收。 showImg(https://segmentfault.com/img/remote/1460000018476903); 本文原創(chuàng)地址,我的博客:https...
閱讀 2327·2021-11-25 09:43
閱讀 3465·2021-10-25 09:48
閱讀 1337·2021-09-13 10:24
閱讀 2746·2019-08-29 15:07
閱讀 1285·2019-08-29 13:14
閱讀 3280·2019-08-29 12:22
閱讀 1363·2019-08-29 11:32
閱讀 3254·2019-08-29 11:23