摘要:什么是作為的子項目,是一款基于的企業批處理框架。首先,運行的基本單位是一個,一個就做一件批處理的事情。總結為我們提供了非常實用的功能,對批處理場景進行了完善的抽象,它不僅能實現小數據的遷移,也能應對大企業的大數據實踐應用。
前言
本文將從0到1講解一個Spring Batch是如何搭建并運行起來的。
本教程將講解從一個文本文件讀取數據,然后寫入MySQL。
Spring Batch 作為 Spring 的子項目,是一款基于 Spring 的企業批處理框架。通過它可以構建出健壯的企業批處理應用。Spring Batch 不僅提供了統一的讀寫接口、豐富的任務處理方式、靈活的事務管理及并發處理,同時還支持日志、監控、任務重啟與跳過等特性,大大簡化了批處理應用開發,將開發人員從復雜的任務配置管理過程中解放出來,使他們可以更多地去關注核心的業務處理過程。
更多的介紹可以參考官網:https://spring.io/projects/sp...
環境搭建我是用的Intellij Idea,用gradle構建。
可以使用Spring Initializr 來創建Spring boot應用。地址:https://start.spring.io/
首先選擇Gradle Project,然后選擇Java。填上你的Group和Artifact名字。
最后再搜索你需要用的包,比如Batch是一定要的。另外,由于我寫的Batch項目是使用JPA向MySQL插入數據,所以也添加了JPA和MySQL。其他可以根據自己需要添加。
點擊Generate Project,一個項目就創建好了。
Build.gralde文件大概就長這個樣子:
buildscript { ext { springBootVersion = "2.0.4.RELEASE" } repositories { mavenCentral() } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") } } apply plugin: "java" apply plugin: "idea" apply plugin: "org.springframework.boot" apply plugin: "io.spring.dependency-management" group = "com.demo" version = "0.0.1-SNAPSHOT" sourceCompatibility = 1.8 repositories { mavenCentral() } dependencies { compile("org.springframework.boot:spring-boot-starter-batch") compile("org.springframework.boot:spring-boot-starter-jdbc") compile("org.springframework.boot:spring-boot-starter-data-jpa") compile group: "com.fasterxml.jackson.datatype", name: "jackson-datatype-joda", version: "2.9.4" compile group: "org.jadira.usertype", name: "usertype.core", version: "6.0.1.GA" compile group: "mysql", name: "mysql-connector-java", version: "6.0.6", testCompile("org.springframework.boot:spring-boot-starter-test") testCompile("org.springframework.batch:spring-batch-test") }Spring Batch 結構
網上有很多Spring Batch結構和原理的講解,我就不詳細闡述了,我這里只講一下Spring Batch的一個基本層級結構。
首先,Spring Batch運行的基本單位是一個Job,一個Job就做一件批處理的事情。
一個Job包含很多Step,step就是每個job要執行的單個步驟。
如下圖所示,Step里面,會有Tasklet,Tasklet是一個任務單元,它是屬于可以重復利用的東西。
然后是Chunk,chunk就是數據塊,你需要定義多大的數據量是一個chunk。
Chunk里面就是不斷循環的一個流程,讀數據,處理數據,然后寫數據。Spring Batch會不斷的循環這個流程,直到批處理數據完成。
構建Spring Batch首先,我們需要一個全局的Configuration來配置所有的Job和一些全局配置。
代碼如下:
@Configuration @EnableAutoConfiguration @EnableBatchProcessing(modular = true) public class SpringBatchConfiguration { @Bean public ApplicationContextFactory firstJobContext() { return new GenericApplicationContextFactory(FirstJobConfiguration.class); } @Bean public ApplicationContextFactory secondJobContext() { return new GenericApplicationContextFactory(SecondJobConfiguration.class); } }
@EnableBatchProcessing是打開Batch。如果要實現多Job的情況,需要把EnableBatchProcessing注解的modular設置為true,讓每個Job使用自己的ApplicationConext。
比如上面代碼的就創建了兩個Job。
例子背景本博客的例子是遷移數據,數據源是一個文本文件,數據量是上百萬條,一行就是一條數據。然后我們通過Spring Batch幫我們把文本文件的數據全部遷移到MySQL數據庫對應的表里面。
假設我們遷移的數據是Message,那么我們就需要提前創建一個叫Message的和數據庫映射的數據類。
@Entity @Table(name = "message") public class Message { @Id @Column(name = "object_id", nullable = false) private String objectId; @Column(name = "content") private String content; @Column(name = "last_modified_time") private LocalDateTime lastModifiedTime; @Column(name = "created_time") private LocalDateTime createdTime; }構建Job
首先我們需要一個關于這個Job的Configuration,它將在SpringBatchConfigration里面被加載。
@Configuration @EnableAutoConfiguration @EnableBatchProcessing(modular = true) public class SpringBatchConfiguration { @Bean public ApplicationContextFactory messageMigrationJobContext() { return new GenericApplicationContextFactory(MessageMigrationJobConfiguration.class); } }
下面的關于構建Job的代碼都將寫在這個MessageMigrationJobConfiguration里面。
public class MessageMigrationJobConfiguration { }
我們先定義一個Job的Bean。
@Autowired private JobBuilderFactory jobBuilderFactory; @Bean public Job messageMigrationJob(@Qualifier("messageMigrationStep") Step messageMigrationStep) { return jobBuilderFactory.get("messageMigrationJob") .start(messageMigrationStep) .build(); }
jobBuilderFactory是注入進來的,get里面的就是job的名字。
這個job只有一個step。
接下來就是創建Step。
@Autowired private StepBuilderFactory stepBuilderFactory; @Bean public Step messageMigrationStep(@Qualifier("jsonMessageReader") FlatFileItemReaderjsonMessageReader, @Qualifier("messageItemWriter") JpaItemWriter messageItemWriter, @Qualifier("errorWriter") Writer errorWriter) { return stepBuilderFactory.get("messageMigrationStep") . chunk(CHUNK_SIZE) .reader(jsonMessageReader).faultTolerant().skip(JsonParseException.class).skipLimit(SKIP_LIMIT) .listener(new MessageItemReadListener(errorWriter)) .writer(messageItemWriter).faultTolerant().skip(Exception.class).skipLimit(SKIP_LIMIT) .listener(new MessageWriteListener()) .build(); }
stepBuilderFactory是注入進來的,然后get里面是Step的名字。
我們的Step中可以構建很多東西,比如reader,processer,writer,listener等等。
下面我們就逐個來看看step里面的這些東西是如何使用的。
ChunkSpring batch在配置Step時采用的是基于Chunk的機制,即每次讀取一條數據,再處理一條數據,累積到一定數量后再一次性交給writer進行寫入操作。這樣可以最大化的優化寫入效率,整個事務也是基于Chunk來進行。
比如我們定義chunk size是50,那就意味著,spring batch處理了50條數據后,再統一向數據庫寫入。
這里有個很重要的點,chunk前面需要定義數據輸入類型和輸出類型,由于我們輸入是Message,輸出也是Message,所以兩個都直接寫Message了。
如果不定義這個類型,會報錯。
.Readerchunk(CHUNK_SIZE)
Reader顧名思義就是從數據源讀取數據。
Spring Batch給我們提供了很多好用實用的reader,基本能滿足我們所有需求。比如FlatFileItemReader,JdbcCursorItemReader,JpaPagingItemReader等。也可以自己實現Reader。
本例子里面,數據源是文本文件,所以我們就使用FlatFileItemReader。FlatFileItemReader是從文件里面一行一行的讀取數據。
首先需要設置文件路徑,也就是設置resource。
因為我們需要把一行文本映射為Message類,所以我們需要自己設置并實現LineMapper。
@Bean public FlatFileItemReaderLine MapperjsonMessageReader() { FlatFileItemReader reader = new FlatFileItemReader<>(); reader.setResource(new FileSystemResource(new File(MESSAGE_FILE))); reader.setLineMapper(new MessageLineMapper()); return reader; }
LineMapper的輸入就是獲取一行文本,和行號,然后轉換成Message。
在本例子里面,一行文本就是一個json對象,所以我們使用JsonParser來轉換成Message。
public class MessageLineMapper implements LineMapperProcessor{ private MappingJsonFactory factory = new MappingJsonFactory(); @Override public Message mapLine(String line, int lineNumber) throws Exception { JsonParser parser = factory.createParser(line); Map map = (Map) parser.readValueAs(Map.class); Message message = new Message(); ... // 轉換邏輯 return message; } }
由于本例子里面,數據是一行文本,通過reader變成Message的類,然后writer直接把Message寫入MySQL。所以我們的例子里面就不需要Processor,關于如何寫Processor其實和reader/writer是一樣的道理。
從它的接口可以看出,需要定義輸入和輸出的類型,把輸入I通過某些邏輯處理之后,返回輸出O。
public interface ItemProcessor { O process(I item) throws Exception; }Writer
Writer顧名思義就是把數據寫入到目標數據源里面。
Spring Batch同樣給我們提供很多好用實用的writer。比如JpaItemWriter,FlatFileItemWriter,HibernateItemWriter,JdbcBatchItemWriter等。同樣也可以自定義。
本例子里面,使用的是JpaItemWriter,可以直接把Message對象寫到數據庫里面。但是需要設置一個EntityManagerFactory,可以注入進來。
@Autowired private EntityManagerFactory entityManager; @Bean public JpaItemWritermessageItemWriter() { JpaItemWriter writer = new JpaItemWriter<>(); writer.setEntityManagerFactory(entityManager); return writer; }
另外,你需要配置數據庫的連接等東西。由于我使用的spring,所以直接在Application.properties里面配置如下:
spring.datasource.url=jdbc:mysql://database spring.datasource.username=username spring.datasource.password=password spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver spring.jpa.database-platform=org.hibernate.dialect.MySQLDialect spring.jpa.show-sql=true spring.jpa.properties.jadira.usertype.autoRegisterUserTypes=true spring.jackson.serialization.write-dates-as-timestamps=false spring.batch.initialize-schema=ALWAYS spring.jpa.hibernate.ddl-auto=update
spring.datasource相關的設置都是在配置數據庫的連接。
spring.batch.initialize-schema=always表示讓spring batch在數據庫里面創建默認的數據表。
spring.jpa.show-sql=true表示在控制臺輸出hibernate讀寫數據庫時候的SQL。
spring.jpa.database-platform=org.hibernate.dialect.MySQLDialect是在指定MySQL的方言。
Spring Batch同樣實現了非常完善全面的listener,listener很好理解,就是用來監聽每個步驟的結果。比如可以有監聽step的,有監聽job的,有監聽reader的,有監聽writer的。沒有你找不到的listener,只有你想不到的listener。
在本例子里面,我只關心,read的時候有沒有出錯,和write的時候有沒有出錯,所以,我只實現了ReadListener和WriteListener。
在read出錯的時候,把錯誤結果寫入一個多帶帶的error列表文件中。
public class MessageItemReadListener implements ItemReadListener{ private Writer errorWriter; public MessageItemReadListener(Writer errorWriter) { this.errorWriter = errorWriter; } @Override public void beforeRead() { } @Override public void afterRead(Message item) { } @Override public void onReadError(Exception ex) { errorWriter.write(format("%s%n", ex.getMessage())); } }
在write出錯的時候,也做同樣的事情,把出錯的原因寫入多帶帶的日志中。
public class MessageWriteListener implements ItemWriteListener{ @Autowired private Writer errorWriter; @Override public void beforeWrite(List extends Message> items) { } @Override public void afterWrite(List extends Message> items) { } @Override public void onWriteError(Exception exception, List extends Message> items) { errorWriter.write(format("%s%n", exception.getMessage())); for (Message message : items) { errorWriter.write(format("Failed writing message id: %s", message.getObjectId())); } } }
前面有說chuck機制,所以write的listener傳入參數是一個List,因為它是累積到一定的數量才一起寫入。
SkipSpring Batch提供了skip的機制,也就是說,如果出錯了,可以跳過。如果你不設置skip,那么一條數據出錯了,整個job都會掛掉。
設置skip的時候一定要設置什么Exception才需要跳過,并且跳過多少條數據。如果失敗的數據超過你設置的skip limit,那么job就會失敗。
你可以分別給reader和writer等設置skip機制。
writer(messageItemWriter).faultTolerant().skip(Exception.class).skipLimit(SKIP_LIMIT)Retry
這個和Skip是一樣的原理,就是失敗之后可以重試,你同樣需要設置重試的次數。
同樣可以分別給reader,writer等設置retry機制。
如果同時設置了retry和skip,會先重試所有次數,然后再開始skip。比如retry是10次,skip是20,會先重試10次之后,再開始算第一次skip。
運行Job所有東西都準備好以后,就是如何運行了。
運行就是在main方法里面用JobLauncher去運行你制定的job。
下面是我寫的main方法,main方法的第一個參數是job的名字,這樣我們就可以通過不同的job名字跑不同的job了。
首先我們通過運行起來的Spring application得到jobRegistry,然后通過job的名字找到對應的job。
接著,我們就可以用jobLauncher去運行這個job了,運行的時候會傳一些參數,比如你job里面需要的文件路徑或者文件日期等,就可以通過這個jobParameters傳進去。如果沒有參數,可以默認傳當前時間進去。
public static void main(String[] args) { String jobName = args[0]; try { ConfigurableApplicationContext context = SpringApplication.run(ZuociBatchApplication.class, args); JobRegistry jobRegistry = context.getBean(JobRegistry.class); Job job = jobRegistry.getJob(jobName); JobLauncher jobLauncher = context.getBean(JobLauncher.class); JobExecution jobExecution = jobLauncher.run(job, createJobParams()); if (!jobExecution.getExitStatus().equals(ExitStatus.COMPLETED)) { throw new RuntimeException(format("%s Job execution failed.", jobName)); } } catch (Exception e) { throw new RuntimeException(format("%s Job execution failed.", jobName)); } } private static JobParameters createJobParams() { return new JobParametersBuilder().addDate("date", new Date()).toJobParameters(); }
最后,把jar包編譯出來,在命令行執行下面的命令,就可以運行你的Spring Batch了。
java -jar YOUR_BATCH_NAME.jar YOUR_JOB_NAME調試
調試主要依靠控制臺輸出的log,可以在application.properties里面設置log輸出的級別,比如你希望輸出INFO信息還是DEBUG信息。
基本上,通過查看log都能定位到問題。
logging.path=build/logs logging.file=${logging.path}/batch.log logging.level.com.easystudio=INFO logging.level.root=INFO log4j.logger.org.springframework.jdbc=INFO log4j.logger.org.springframework.batch=INFO logging.level.org.hibernate.SQL=INFOSpring Batch數據表
如果你的batch最終會寫入數據庫,那么Spring Batch會默認在你的數據庫里面創建一些batch相關的表,來記錄所有job/step運行的狀態和結果。
大部分表你都不需要關心,你只需要關心幾張表。
batch_job_instance:這張表能看到每次運行的job名字。
batch_job_execution:這張表能看到每次運行job的開始時間,結束時間,狀態,以及失敗后的錯誤消息是什么。
batch_step_execution:這張表你能看到更多關于step的詳細信息。比如step的開始時間,結束時間,提交次數,讀寫次數,狀態,以及失敗后的錯誤信息等。
Spring Batch為我們提供了非常實用的功能,對批處理場景進行了完善的抽象,它不僅能實現小數據的遷移,也能應對大企業的大數據實踐應用。它讓我們開發批處理應用可以事半功倍。
最后一個tips,搭建Spring Batch的過程中,會遇到各種各樣的問題。只要善用Google,都能找到答案。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/76991.html
摘要:當我們的需求出現變動時,工廠模式會需要進行相應的變化。總結來說,要想成功進行一次阿里巴巴的面試,你需要了解甚至掌握以下內容語言,尤其是線程原理數據庫事務,加鎖,重點分布式設計模式可以說是涉及范圍非常廣了。 showImg(https://segmentfault.com/img/bV8cSY?w=576&h=432); 前言 今天本是一個陽光明媚,鳥語花香的日子。于是我決定在逛街中感受...
摘要:本文是作者自己對中線程的狀態線程間協作相關使用的理解與總結,不對之處,望指出,共勉。當中的的數目而不是已占用的位置數大于集合番一文通版集合番一文通版垃圾回收機制講得很透徹,深入淺出。 一小時搞明白自定義注解 Annotation(注解)就是 Java 提供了一種元程序中的元素關聯任何信息和著任何元數據(metadata)的途徑和方法。Annotion(注解) 是一個接口,程序可以通過...
摘要:結合我自己的經驗,我整理了一份全棧工程師進階路線圖,給大家參考。乾坤大挪移第一層第一層心法,主要都是基本語法,程序設計入門,悟性高者十天半月可成,差一點的到個月也說不準。 技術更新日新月異,對于初入職場的同學來說,經常會困惑該往那個方向發展,這一點松哥是深有體會的。 我剛開始學習 Java 那會,最大的問題就是不知道該學什么,以及學習的順序,我相信這也是很多初學者經常面臨的問題。?我...
摘要:概述庫用的是是我們非常常用的組件。有一個特征是選中之后無法取消。現實中取消的需求是常見且可以理解的。所以看到這個需求之后第一嘗試在組件之上搞一搞,這一搞就入坑了,現在就來理一理我的入坑之路吧。 概述 ui庫用的是iview . radio、radioGroup是我們非常常用的組件。radio有一個特征是選中之后無法取消。現實中取消radio的需求是常見且可以理解的。所以看到這個需求之后...
摘要:為了一探究竟,于是開啟了這次應用性能調優之旅。使用即時編譯器和都能輕輕松松的讓你的應用程序在不用做任何修改的情況下,直接提高或者更高的性能。 這是一份事后的總結。在經歷了調優過程踩的很多坑之后,我們最終完善并實施了初步的性能測試方案,通過真實的測試數據歸納出了 Laravel 開發過程中的一些實踐技巧。 0x00 源起 最近有同事反饋 Laravel 寫的應用程序響應有點慢、20幾個并...
閱讀 649·2021-11-25 09:43
閱讀 1668·2021-11-18 10:02
閱讀 1041·2021-10-15 09:39
閱讀 1890·2021-10-12 10:18
閱讀 2122·2021-09-22 15:43
閱讀 773·2021-09-22 15:10
閱讀 2088·2019-08-30 15:53
閱讀 988·2019-08-30 13:00