摘要:本文章是藍圖系列的第二篇教程。這就是請求回應(yīng)模式。好多屬性我們一個一個地解釋一個序列,作為的地址任務(wù)的編號任務(wù)的類型任務(wù)攜帶的數(shù)據(jù),以類型表示任務(wù)優(yōu)先級,以枚舉類型表示。默認優(yōu)先級為正常任務(wù)的延遲時間,默認是任務(wù)狀態(tài),以枚舉類型表示。
本文章是 Vert.x 藍圖系列 的第二篇教程。全系列:
Vert.x Blueprint 系列教程(一) | 待辦事項服務(wù)開發(fā)教程
Vert.x Blueprint 系列教程(二) | 開發(fā)基于消息的應(yīng)用 - Vert.x Kue 教程
Vert.x Blueprint 系列教程(三) | Micro-Shop 微服務(wù)應(yīng)用實戰(zhàn)
本系列已發(fā)布至Vert.x官網(wǎng):Vert.x Blueprint Tutorials
前言歡迎回到Vert.x 藍圖系列~在本教程中,我們將利用Vert.x開發(fā)一個基于消息的應(yīng)用 - Vert.x Kue,它是一個使用Vert.x開發(fā)的優(yōu)先級工作隊列,數(shù)據(jù)存儲使用的是 Redis。Vert.x Kue是 Automattic/kue 的Vert.x實現(xiàn)版本。我們可以使用Vert.x Kue來處理各種各樣的任務(wù),比如文件轉(zhuǎn)換、訂單處理等等。
通過本教程,你將會學習到以下內(nèi)容:
消息、消息系統(tǒng)以及事件驅(qū)動的運用
Vert.x Event Bus 的幾種事件機制(發(fā)布/訂閱、點對點模式)
設(shè)計 分布式 的Vert.x應(yīng)用
工作隊列的設(shè)計
Vert.x Service Proxy(服務(wù)代理,即異步RPC)的運用
更深層次的Redis運用
本教程是Vert.x 藍圖系列的第二篇教程,對應(yīng)的Vert.x版本為3.3.2。本教程中的完整代碼已托管至GitHub。
Vert.x的消息系統(tǒng)既然我們要用Vert.x開發(fā)一個基于消息的應(yīng)用,那么我們先來瞅一瞅Vert.x的消息系統(tǒng)吧~在Vert.x中,我們可以通過 Event Bus 來發(fā)送和接收各種各樣的消息,這些消息可以來自不同的Vertx實例。怎么樣,很酷吧?我們都將消息發(fā)送至Event Bus上的某個地址上,這個地址可以是任意的字符串。
Event Bus支持三種消息機制:發(fā)布/訂閱(Publish/Subscribe)、點對點(Point to point)以及請求/回應(yīng)(Request-Response)模式。下面我們就來看一看這幾種機制。
發(fā)布/訂閱模式在發(fā)布/訂閱模式中,消息被發(fā)布到Event Bus的某一個地址上,所有訂閱此地址的Handler都會接收到該消息并且調(diào)用相應(yīng)的處理邏輯。我們來看一看示例代碼:
EventBus eventBus = vertx.eventBus(); eventBus.consumer("foo.bar.baz", r -> { // subscribe to `foo.bar.baz` address System.out.println("1: " + r.body()); }); eventBus.consumer("foo.bar.baz", r -> { // subscribe to `foo.bar.baz` address System.out.println("2: " + r.body()); }); eventBus.publish("foo.bar.baz", "+1s"); // 向此地址發(fā)送消息
我們可以通過vertx.eventBus()方法獲取EventBus的引用,然后我們就可以通過consume方法訂閱某個地址的消息并且綁定一個Handler。接著我們通過publish向此地址發(fā)送消息。如果運行上面的例子,我們會得到一下結(jié)果:
2: +1s 1: +1s點對點模式
如果我們把上面的示例中的publish方法替代成send方法,上面的實例就變成點對點模式了。在點對點模式中,消息被發(fā)布到Event Bus的某一個地址上。Vert.x會將此消息傳遞給其中監(jiān)聽此地址的Handler之一。如果有多個Handler綁定到此地址,那么就使用輪詢算法隨機挑一個Handler傳遞消息。比如在此示例中,程序只會打印2: +1s或者1: +1s之中的一個。
請求/回應(yīng)模式當我們綁定的Handler接收到消息的時候,我們可不可以給消息的發(fā)送者回復呢?當然了!當我們通過send方法發(fā)送消息的時候,我們可以同時指定一個回復處理函數(shù)(reply handler)。然后當某個消息的訂閱者接收到消息的時候,它就可以給發(fā)送者回復消息;如果發(fā)送者接收到了回復,發(fā)送者綁定的回復處理函數(shù)就會被調(diào)用。這就是請求/回應(yīng)模式。
好啦,現(xiàn)在我們已經(jīng)粗略了解了Vert.x中的消息系統(tǒng) - Event Bus的基本使用,下面我們就看看Vert.x Kue的基本設(shè)計。有關(guān)更多關(guān)于Event Bus的信息請參考Vert.x Core Manual - Event Bus。
Vert.x Kue 架構(gòu)設(shè)計 Vert.x Kue 組件劃分在我們的項目中,我們將Vert.x Kue劃分為兩個模塊:
kue-core: 核心組件,提供優(yōu)先級隊列的功能
kue-http: Web組件,提供Web UI以及REST API
另外我們還提供一個示例模塊kue-example用于演示以及闡述如何使用Vert.x Kue。
既然我們的項目有兩個模塊,那么你一定會好奇:兩個模塊之間是如何進行通信的?并且如果我們寫自己的Kue應(yīng)用的話,我們該怎樣去調(diào)用Kue Core中的服務(wù)呢?不要著急,謎底將在后邊的章節(jié)中揭曉:-)
Vert.x Kue 核心模塊回顧一下Vert.x Kue的作用 - 優(yōu)先級工作隊列,所以在Vert.x Kue的核心模塊中我們設(shè)計了以下的類:
Job - 任務(wù)(作業(yè))數(shù)據(jù)實體
JobService - 異步服務(wù)接口,提供操作任務(wù)以及獲取數(shù)據(jù)的相關(guān)邏輯
KueWorker - 用于處理任務(wù)的Verticle
Kue - 工作隊列
前邊我們提到過,我們的兩個組件之間需要一種通信機制可以互相通信 - 這里我們使用Vert.x的集群模式,即以clustered的模式來部署Verticle。這樣的環(huán)境下的Event Bus同樣也是集群模式的,因此各個組件可以通過集群模式下的Event Bus進行通信。很不錯吧?在Vert.x的集群模式下,我們需要指定一個集群管理器ClusterManager。這里我們使用默認的HazelcastClusterManager,使用 Hazelcast 作為集群管理。
在Vert.x Kue中,我們將JobService服務(wù)發(fā)布至分布式的Event Bus上,這樣其它的組件就可以通過Event Bus調(diào)用該服務(wù)了。我們設(shè)計了一個KueVerticle用于注冊服務(wù)。Vert.x提供了Vert.x Service Proxy(服務(wù)代理組件),可以很方便地將服務(wù)注冊至Event Bus上,然后在其它地方獲取此服務(wù)的代理并調(diào)用。我們將在下面的章節(jié)中詳細介紹Vert.x Service Proxy。
基于Future的異步模式在我們的Vert.x Kue中,大多數(shù)的異步方法都是基于Future的。如果您看過藍圖系列的第一篇文章的話,您一定不會對這種模式很陌生。在Vert.x 3.3中,我們的Future支持基本的響應(yīng)式的操作,比如map和compose。它們用起來非常方便,因為我們可以將多個Future以響應(yīng)式的方式組合起來而不用擔心陷入回調(diào)地獄中。
Vert.x Kue中的事件正如我們在Vert.x Kue 特性介紹中提到的那樣,Vert.x Kue支持兩種級別的事件:任務(wù)事件(job events) 以及 隊列事件(queue events)。在Vert.x Kue中,我們設(shè)計了三種事件地址:
vertx.kue.handler.job.{handlerType}.{addressId}.{jobType}: 某個特定任務(wù)的任務(wù)事件地址
vertx.kue.handler.workers.{eventType}: (全局)隊列事件地址
vertx.kue.handler.workers.{eventType}.{addressId}: 某個特定任務(wù)的內(nèi)部事件地址
在特性介紹文檔中,我們提到了以下幾種任務(wù)事件:
start 開始處理一個任務(wù) (onStart)
promotion 一個延期的任務(wù)時間已到,提升至工作隊列中 (onPromotion)
progress 任務(wù)的進度變化 (onProgress)
failed_attempt 任務(wù)處理失敗,但是還可以重試 (onFailureAttempt)
failed 任務(wù)處理失敗并且不能重試 (onFailure)
complete 任務(wù)完成 (onComplete)
remove 任務(wù)從后端存儲中移除 (onRemove)
隊列事件也相似,只不過需要加前綴job_。這些事件都會通過send方法發(fā)送至Event Bus上。每一個任務(wù)都有對應(yīng)的任務(wù)事件地址,因此它們能夠正確地接收到對應(yīng)的事件并進行相應(yīng)的處理邏輯。
特別地,我們還有兩個內(nèi)部事件:done和done_fail。done事件對應(yīng)一個任務(wù)在底層的處理已經(jīng)完成,而done_fail事件對應(yīng)一個任務(wù)在底層的處理失敗。這兩個事件使用第三種地址進行傳遞。
任務(wù)狀態(tài)在Vert.x Kue中,任務(wù)共有五種狀態(tài):
INACTIVE: 任務(wù)還未開始處理,在工作隊列中等待處理
ACTIVE: 任務(wù)正在處理中
COMPLETE: 任務(wù)處理完成
FAILED: 任務(wù)處理失敗
DELAYED: 任務(wù)延時處理,正在等待計時器時間到并提升至工作隊列中
我們使用狀態(tài)圖來描述任務(wù)狀態(tài)的變化:
以及任務(wù)狀態(tài)的變化伴隨的事件:
整體設(shè)計為了讓大家對Vert.x Kue的架構(gòu)有大致的了解,我用一幅圖來簡略描述整個Vert.x Kue的設(shè)計:
現(xiàn)在我們對Vert.x Kue的設(shè)計有了大致的了解了,下面我們就來看一看Vert.x Kue的代碼實現(xiàn)了~
項目結(jié)構(gòu)我們來開始探索Vert.x Kue的旅程吧!首先我們先從GitHub上clone源代碼:
git clone https://github.com/sczyh30/vertx-blueprint-job-queue.git
然后你可以把項目作為Gradle項目導入你的IDE中。(如何導入請參考相關(guān)IDE幫助文檔)
正如我們之前所提到的,我們的Vert.x Kue中有兩個功能模塊和一個實例模塊,因此我們需要在Gradle工程文件中定義三個子工程。我們來看一下本項目中的build.gradle文件:
configure(allprojects) { project -> ext { vertxVersion = "3.3.2" } apply plugin: "java" repositories { jcenter() } dependencies { compile("io.vertx:vertx-core:${vertxVersion}") compile("io.vertx:vertx-codegen:${vertxVersion}") compile("io.vertx:vertx-rx-java:${vertxVersion}") compile("io.vertx:vertx-hazelcast:${vertxVersion}") compile("io.vertx:vertx-lang-ruby:${vertxVersion}") testCompile("io.vertx:vertx-unit:${vertxVersion}") testCompile group: "junit", name: "junit", version: "4.12" } sourceSets { main { java { srcDirs += "src/main/generated" } } } compileJava { targetCompatibility = 1.8 sourceCompatibility = 1.8 } } project("kue-core") { dependencies { compile("io.vertx:vertx-redis-client:${vertxVersion}") compile("io.vertx:vertx-service-proxy:${vertxVersion}") } jar { archiveName = "vertx-blueprint-kue-core.jar" from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } } manifest { attributes "Main-Class": "io.vertx.core.Launcher" attributes "Main-Verticle": "io.vertx.blueprint.kue.queue.KueVerticle" } } task annotationProcessing(type: JavaCompile, group: "build") { // codegen source = sourceSets.main.java classpath = configurations.compile destinationDir = project.file("src/main/generated") options.compilerArgs = [ "-proc:only", "-processor", "io.vertx.codegen.CodeGenProcessor", "-AoutputDirectory=${project.projectDir}/src/main" ] } compileJava { targetCompatibility = 1.8 sourceCompatibility = 1.8 dependsOn annotationProcessing } } project("kue-http") { dependencies { compile(project(":kue-core")) compile("io.vertx:vertx-web:${vertxVersion}") compile("io.vertx:vertx-web-templ-jade:${vertxVersion}") } jar { archiveName = "vertx-blueprint-kue-http.jar" from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } } manifest { attributes "Main-Class": "io.vertx.core.Launcher" attributes "Main-Verticle": "io.vertx.blueprint.kue.http.KueHttpVerticle" } } } project("kue-example") { dependencies { compile(project(":kue-core")) } jar { archiveName = "vertx-blueprint-kue-example.jar" from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } } manifest { attributes "Main-Class": "io.vertx.core.Launcher" attributes "Main-Verticle": "io.vertx.blueprint.kue.example.LearningVertxVerticle" } } } task wrapper(type: Wrapper) { gradleVersion = "2.12" }
(⊙o⊙)…比之前的待辦事項服務(wù)項目中的長不少誒。。。我們來解釋一下:
在configure(allprojects)作用域中,我們配置了一些全局信息(對所有子工程都適用)。
我們定義了三個子工程:kue-core、kue-http以及kue-example。這里我們來解釋一下里面用到的依賴。在kue-core中,vertx-redis-client用于Redis通信,vertx-service-proxy用于Event Bus上的服務(wù)代理。在kue-http中,我們將kue-core子工程作為它的一個依賴。vertx-web和vertx-web-templ-jade用于Kue Web端的開發(fā)。
任務(wù)annotationProcessing用于注解處理(Vert.x Codegen)。我們已經(jīng)在上一篇教程中介紹過了,這里就不展開講了。
我們還需要在 settings.gradle 中配置工程:
rootProject.name = "vertx-blueprint-job-queue" include "kue-core" include "kue-http" include "kue-example"
看完了配置文件以后,我們再來瀏覽一下我們的項目目錄結(jié)構(gòu):
. ├── build.gradle ├── kue-core │?? └── src │?? ├── main │?? │?? ├── java │?? │?? └── resources │?? └── test │?? ├── java │?? └── resources ├── kue-example │?? └── src │?? ├── main │?? │?? ├── java │?? │?? └── resources │?? └── test │?? ├── java │?? └── resources ├── kue-http │?? └── src │?? ├── main │?? │?? ├── java │?? │?? └── resources │?? └── test │?? ├── java │?? └── resources └── settings.gradle
在Gradle中,項目的源碼都位于{projectName}/src/main/java目錄內(nèi)。這篇教程是圍繞Vert.x Kue Core的,所以我們的代碼都在kue-core目錄中。
好啦!現(xiàn)在我們已經(jīng)對Vert.x Kue項目的整體結(jié)構(gòu)有了大致的了解了,下面我們開始源碼探索之旅!
任務(wù)實體 - 不僅僅是一個數(shù)據(jù)對象Vert.x Kue是用來處理任務(wù)的,因此我們先來看一下代表任務(wù)實體的Job類。Job類位于io.vertx.blueprint.kue.queue包下。代碼可能有點長,不要擔心,我們把它分成幾部分,分別來解析。
任務(wù)成員屬性我們先來看一下Job類中的成員屬性:
@DataObject(generateConverter = true) public class Job { // job properties private final String address_id; private long id = -1; private String zid; private String type; private JsonObject data; private Priority priority = Priority.NORMAL; private JobState state = JobState.INACTIVE; private long delay = 0; private int max_attempts = 1; private boolean removeOnComplete = false; private int ttl = 0; private JsonObject backoff; private int attempts = 0; private int progress = 0; private JsonObject result; // job metrics private long created_at; private long promote_at; private long updated_at; private long failed_at; private long started_at; private long duration; // ... }
我去。。。好多屬性!我們一個一個地解釋:
address_id: 一個UUID序列,作為Event Bus的地址
id: 任務(wù)的編號(id)
type: 任務(wù)的類型
data: 任務(wù)攜帶的數(shù)據(jù),以 JsonObject 類型表示
priority: 任務(wù)優(yōu)先級,以 Priority 枚舉類型表示。默認優(yōu)先級為正常(NORMAL)
delay: 任務(wù)的延遲時間,默認是 0
state: 任務(wù)狀態(tài),以 JobState 枚舉類型表示。默認狀態(tài)為等待(INACTIVE)
attempts: 任務(wù)已經(jīng)嘗試執(zhí)行的次數(shù)
max_attempts: 任務(wù)嘗試執(zhí)行次數(shù)的最大閾值
removeOnComplete: 代表任務(wù)完成時是否自動從后臺移除
zid: zset操作對應(yīng)的編號(zid),保持先進先出順序
ttl: TTL(Time to live)
backoff: 任務(wù)重試配置,以 JsonObject 類型表示
progress: 任務(wù)執(zhí)行的進度
result: 任務(wù)執(zhí)行的結(jié)果,以 JsonObject 類型表示
還有這些統(tǒng)計數(shù)據(jù):
created_at: 代表此任務(wù)創(chuàng)建的時間
promote_at: 代表此任務(wù)從延時狀態(tài)被提升至等待狀態(tài)時的時間
updated_at: 代表任務(wù)更新的時間
failed_at: 代表任務(wù)失敗的時間
started_at: 代表任務(wù)開始的時間
duration: 代表處理任務(wù)花費的時間,單位為毫秒(ms)
你可能注意到在 Job 類中還存在著幾個靜態(tài)成員變量:
private static Logger logger = LoggerFactory.getLogger(Job.class); private static Vertx vertx; private static RedisClient client; private static EventBus eventBus; public static void setVertx(Vertx v, RedisClient redisClient) { vertx = v; client = redisClient; eventBus = vertx.eventBus(); }
對于 logger 對象,我想大家應(yīng)該都很熟悉,它代表一個Vert.x Logger實例用于日志記錄。但是你一定想問為什么 Job 類中存在著一個Vertx類型的靜態(tài)成員。Job類不應(yīng)該是一個數(shù)據(jù)對象嗎?當然咯!Job類代表一個數(shù)據(jù)對象,但不僅僅是一個數(shù)據(jù)對象。這里我模仿了一些Automattic/kue的風格,把一些任務(wù)相關(guān)邏輯方法放到了Job類里,它們大多都是基于Future的異步方法,因此可以很方便地去調(diào)用以及進行組合變換。比如:
job.save() .compose(Job::updateNow) .compose(j -> j.log("good!"));
由于我們不能在Job類被JVM加載的時候就獲取Vertx實例,我們必須手動給Job類中的靜態(tài)Vertx成員賦值。這里我們是在Kue類中對其進行賦值的。當我們創(chuàng)建一個工作隊列的時候,Job類中的靜態(tài)成員變量會被初始化。同時為了保證程序的正確性,我們需要一個方法來檢測靜態(tài)成員變量是否初始化。當我們在創(chuàng)建一個任務(wù)的時候,如果靜態(tài)成員此時未被初始化,那么日志會給出警告:
private void _checkStatic() { if (vertx == null) { logger.warn("static Vertx instance in Job class is not initialized!"); } }
我們還注意到 Job 類也是由@DataObject注解修飾的。Vert.x Codegen可以處理含有@DataObject注解的類并生成對應(yīng)的JSON轉(zhuǎn)換器,并且Vert.x Service Proxy也需要數(shù)據(jù)對象。
在Job類中我們有四個構(gòu)造函數(shù)。其中address_id成員必須在一個任務(wù)被創(chuàng)建時就被賦值,默認情況下此地址用一個唯一的UUID字符串表示。每一個構(gòu)造函數(shù)中我們都要調(diào)用_checkStatic函數(shù)來檢測靜態(tài)成員變量是否被初始化。
任務(wù)事件輔助函數(shù)正如我們之前所提到的那樣,我們通過一個特定的地址vertx.kue.handler.job.{handlerType}.{addressId}.{jobType}在分布式的Event Bus上發(fā)送和接收任務(wù)事件(job events)。所以我們提供了兩個用于發(fā)送和接收事件的輔助函數(shù)emit和on(類似于Node.js中的EventEmitter):
@Fluent publicJob on(String event, Handler > handler) { logger.debug("[LOG] On: " + Kue.getCertainJobAddress(event, this)); eventBus.consumer(Kue.getCertainJobAddress(event, this), handler); return this; } @Fluent public Job emit(String event, Object msg) { logger.debug("[LOG] Emit: " + Kue.getCertainJobAddress(event, this)); eventBus.send(Kue.getCertainJobAddress(event, this), msg); return this; }
在后面的代碼中,我們將頻繁使用這兩個輔助函數(shù)。
Redis中的存儲形式在我們探索相關(guān)的邏輯函數(shù)之前,我們先來描述一下Vert.x Kue的數(shù)據(jù)在Redis中是以什么樣的形式存儲的:
所有的key都在vertx_kue命名空間下(以vertx_kue:作為前綴)
vertx:kue:job:{id}: 存儲任務(wù)實體的map
vertx:kue:ids: 計數(shù)器,指示當前最大的任務(wù)ID
vertx:kue:job:types: 存儲所有任務(wù)類型的列表
vertx:kue:{type}:jobs: 指示所有等待狀態(tài)下的某種類型任務(wù)的列表
vertx_kue:jobs: 存儲所有任務(wù)zid的有序集合
vertx_kue:job:{state}: 存儲所有指定狀態(tài)的任務(wù)zid的有序集合
vertx_kue:jobs:{type}:{state}: 存儲所有指定狀態(tài)和類型的任務(wù)zid的有序集合
vertx:kue:job:{id}:log: 存儲指定id的任務(wù)對應(yīng)日志的列表
OK,下面我們就來看看Job類中重要的邏輯函數(shù)。
改變?nèi)蝿?wù)狀態(tài)我們之前提到過,Vert.x Kue中的任務(wù)一共有五種狀態(tài)。所有的任務(wù)相關(guān)的操作都伴隨著任務(wù)狀態(tài)的變換,因此我們先來看一下state方法的實現(xiàn),它用于改變?nèi)蝿?wù)的狀態(tài):
public Futurestate(JobState newState) { Future future = Future.future(); RedisClient client = RedisHelper.client(vertx, new JsonObject()); // use a new client to keep transaction JobState oldState = this.state; client.transaction().multi(r0 -> { // (1) if (r0.succeeded()) { if (oldState != null && !oldState.equals(newState)) { // (2) client.transaction().zrem(RedisHelper.getStateKey(oldState), this.zid, _failure()) .zrem(RedisHelper.getKey("jobs:" + this.type + ":" + oldState.name()), this.zid, _failure()); } client.transaction().hset(RedisHelper.getKey("job:" + this.id), "state", newState.name(), _failure()) // (3) .zadd(RedisHelper.getKey("jobs:" + newState.name()), this.priority.getValue(), this.zid, _failure()) .zadd(RedisHelper.getKey("jobs:" + this.type + ":" + newState.name()), this.priority.getValue(), this.zid, _failure()); switch (newState) { // dispatch different state case ACTIVE: // (4) client.transaction().zadd(RedisHelper.getKey("jobs:" + newState.name()), this.priority.getValue() < 0 ? this.priority.getValue() : -this.priority.getValue(), this.zid, _failure()); break; case DELAYED: // (5) client.transaction().zadd(RedisHelper.getKey("jobs:" + newState.name()), this.promote_at, this.zid, _failure()); break; case INACTIVE: // (6) client.transaction().lpush(RedisHelper.getKey(this.type + ":jobs"), "1", _failure()); break; default: } this.state = newState; client.transaction().exec(r -> { // (7) if (r.succeeded()) { future.complete(this); } else { future.fail(r.cause()); } }); } else { future.fail(r0.cause()); } }); return future.compose(Job::updateNow); }
首先我們先創(chuàng)建了一個Future對象。然后我們調(diào)用了 client.transaction().multi(handler) 函數(shù)開始一次Redis事務(wù) (1)。在Vert.x 3.3.2中,所有的Redis事務(wù)操作都移至RedisTransaction類中,所以我們需要先調(diào)用client.transaction()方法去獲取一個事務(wù)實例,然后調(diào)用multi代表事務(wù)塊的開始。
在multi函數(shù)傳入的Handler中,我們先判定當前的任務(wù)狀態(tài)。如果當前任務(wù)狀態(tài)不為空并且不等于新的任務(wù)狀態(tài),我們就將Redis中存儲的舊的狀態(tài)信息移除 (2)。為了方便起見,我們提供了一個RedisHelper輔助類,里面提供了一些生成特定地址以及編碼解碼zid的方法:
package io.vertx.blueprint.kue.util; import io.vertx.blueprint.kue.queue.JobState; import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; import io.vertx.redis.RedisClient; import io.vertx.redis.RedisOptions; public final class RedisHelper { private static final String VERTX_KUE_REDIS_PREFIX = "vertx_kue"; private RedisHelper() { } public static RedisClient client(Vertx vertx, JsonObject config) { return RedisClient.create(vertx, options(config)); } public static RedisOptions options(JsonObject config) { return new RedisOptions() .setHost(config.getString("redis.host", "127.0.0.1")) .setPort(config.getInteger("redis.port", 6379)); } public static String getKey(String key) { return VERTX_KUE_REDIS_PREFIX + ":" + key; } public static String getStateKey(JobState state) { return VERTX_KUE_REDIS_PREFIX + ":jobs:" + state.name(); } public static String createFIFO(long id) { String idLen = "" + ("" + id).length(); int len = 2 - idLen.length(); while (len-- > 0) idLen = "0" + idLen; return idLen + "|" + id; } public static String stripFIFO(String zid) { return zid.substring(zid.indexOf("|") + 1); } public static long numStripFIFO(String zid) { return Long.parseLong(zid.substring(zid.indexOf("|") + 1)); } }
所有的key都必須在vertx_kue命名空間下,因此我們封裝了一個getKey方法。我們還實現(xiàn)了createFIFO和stripFIFO方法用于生成zid以及解碼zid。zid的格式使用了Automattic/Kue中的格式。
回到state方法來。我們使用zrem(String key, String member, Handler
接下來我們使用hset方法來變更新的狀態(tài) (3),然后用zadd方法往vertx_kue:job:{state} 和 vertx_kue:jobs:{type}:{state}兩個有序集合中添加此任務(wù)的zid,同時傳遞一個權(quán)重(score)。這個非常重要,我們就是通過這個實現(xiàn)優(yōu)先級隊列的。我們直接使用priority對應(yīng)的值作為score。這樣,當我們需要從Redis中獲取任務(wù)的時候,我們就可以通過zpop方法獲取優(yōu)先級最高的任務(wù)。我們會在后面詳細講述。
不同的新狀態(tài)需要不同的操作。對于ACTIVE狀態(tài),我們通過zadd命令將zid添加至vertx_kue:jobs:ACTIVE有序集合中并賦予優(yōu)先級權(quán)值 (4)。對于DELAYED狀態(tài),我們通過zadd命令將zid添加至vertx_kue:jobs:DELAYED有序集合中并賦予提升時間(promote_at)權(quán)值 (5)。對于INACTIVE狀態(tài),我們向vertx:kue:{type}:jobs列表中添加一個元素 (6)。這些操作都是在Redis事務(wù)塊內(nèi)完成的。最后我們通過exec方法一并執(zhí)行這些事務(wù)操作 (7)。如果執(zhí)行成功,我們給future賦值(當前任務(wù))。最后我們返回future并且與updateNow方法相組合。
updateNow方法非常簡單,就是把updated_at的值設(shè)為當前時間,然后存到Redis中:
Future保存任務(wù)updateNow() { this.updated_at = System.currentTimeMillis(); return this.set("updated_at", String.valueOf(updated_at)); }
這里我們來看一下整個Job類中最重要的方法之一 - save方法,它的作用是保存任務(wù)至Redis中。
public Futuresave() { // check Objects.requireNonNull(this.type, "Job type cannot be null"); // (1) if (this.id > 0) return update(); // (2) Future future = Future.future(); // 生成id client.incr(RedisHelper.getKey("ids"), res -> { // (3) if (res.succeeded()) { this.id = res.result(); this.zid = RedisHelper.createFIFO(id); // (4) String key = RedisHelper.getKey("job:" + this.id); if (this.delay > 0) { this.state = JobState.DELAYED; } client.sadd(RedisHelper.getKey("job:types"), this.type, _failure()); // (5) this.created_at = System.currentTimeMillis(); this.promote_at = this.created_at + this.delay; // 保存任務(wù) client.hmset(key, this.toJson(), _completer(future, this)); // (6) } else { future.fail(res.cause()); } }); return future.compose(Job::update); // (7) }
首先,任務(wù)類型不能為空所以我們要檢查type是否為空 (1)。接著,如果當前任務(wù)的id大于0,則代表此任務(wù)已經(jīng)存儲過(因為id是存儲時分配),此時只需執(zhí)行更新操作(update)即可 (2)。然后我們創(chuàng)建一個Future對象,然后使用incr方法從vertx_kue:ids字段獲取一個新的id (3)。同時我們使用RedisHelper.createFIFO(id)方法來生成新的zid (4)。接著我們來判斷任務(wù)延時是否大于0,若大于0則將當前任務(wù)狀態(tài)設(shè)置為DELAYED。然后我們通過sadd方法將當前任務(wù)類型添加至vertx:kue:job:types列表中 (5) 并且保存任務(wù)創(chuàng)建時間(created_at)以及任務(wù)提升時間(promote_at)。經(jīng)過這一系列的操作后,所有的屬性都已準備好,所以我們可以利用hmset方法將此任務(wù)實體存儲至vertx:kue:job:{id}哈希表中 (6)。如果存儲操作成功,那么將當前任務(wù)實體賦給future,否則記錄錯誤。最后我們返回此future并且將其與update方法進行組合。
update方法進行一些更新操作,它的邏輯比較簡單:
Futureupdate() { Future future = Future.future(); this.updated_at = System.currentTimeMillis(); client.transaction().multi(_failure()) .hset(RedisHelper.getKey("job:" + this.id), "updated_at", String.valueOf(this.updated_at), _failure()) .zadd(RedisHelper.getKey("jobs"), this.priority.getValue(), this.zid, _failure()) .exec(_completer(future, this)); return future.compose(r -> this.state(this.state)); }
可以看到update方法只做了三件微小的工作:存儲任務(wù)更新時間、存儲zid以及更改當前任務(wù)狀態(tài)(組合state方法)。
最后總結(jié)一下將一個任務(wù)存儲到Redis中經(jīng)過的步驟:save -> update -> state :-)
移除任務(wù)移除任務(wù)非常簡單,借助zrem和del方法即可。我們來看一下其實現(xiàn):
public Futureremove() { Future future = Future.future(); client.transaction().multi(_failure()) .zrem(RedisHelper.getKey("jobs:" + this.stateName()), this.zid, _failure()) .zrem(RedisHelper.getKey("jobs:" + this.type + ":" + this.stateName()), this.zid, _failure()) .zrem(RedisHelper.getKey("jobs"), this.zid, _failure()) .del(RedisHelper.getKey("job:" + this.id + ":log"), _failure()) .del(RedisHelper.getKey("job:" + this.id), _failure()) .exec(r -> { if (r.succeeded()) { this.emit("remove", new JsonObject().put("id", this.id)); future.complete(); } else { future.fail(r.cause()); } }); return future; }
注意到成功移除任務(wù)時,我們會向Event Bus上的特定地址發(fā)送remove任務(wù)事件。此事件包含著被移除任務(wù)的id。
監(jiān)聽任務(wù)事件我們可以通過幾種 onXXX 方法來監(jiān)聽任務(wù)事件:
@Fluent public Job onComplete(HandlercompleteHandler) { this.on("complete", message -> { completeHandler.handle(new Job((JsonObject) message.body())); }); return this; } @Fluent public Job onFailure(Handler failureHandler) { this.on("failed", message -> { failureHandler.handle((JsonObject) message.body()); }); return this; } @Fluent public Job onFailureAttempt(Handler failureHandler) { this.on("failed_attempt", message -> { failureHandler.handle((JsonObject) message.body()); }); return this; } @Fluent public Job onPromotion(Handler handler) { this.on("promotion", message -> { handler.handle(new Job((JsonObject) message.body())); }); return this; } @Fluent public Job onStart(Handler handler) { this.on("start", message -> { handler.handle(new Job((JsonObject) message.body())); }); return this; } @Fluent public Job onRemove(Handler removeHandler) { this.on("start", message -> { removeHandler.handle((JsonObject) message.body()); }); return this; } @Fluent public Job onProgress(Handler progressHandler) { this.on("progress", message -> { progressHandler.handle((Integer) message.body()); }); return this; }
注意到不同的事件,對應(yīng)接收的數(shù)據(jù)類型也有差異。我們來說明一下:
onComplete、onPromotion 以及 onStart: 發(fā)送的數(shù)據(jù)是對應(yīng)的Job對象
onFailure and onFailureAttempt: 發(fā)送的數(shù)據(jù)是JsonObject類型的,其格式類似于:
{ "job": {}, "extra": { "message": "some_error" } }
onProgress: 發(fā)送的數(shù)據(jù)是當前任務(wù)進度
onRemove: 發(fā)送的數(shù)據(jù)是JsonObject類型的,其中id代表被移除任務(wù)的編號
更新任務(wù)進度我們可以通過progress方法來更新任務(wù)進度。看一下其實現(xiàn):
public Futureprogress(int complete, int total) { int n = Math.min(100, complete * 100 / total); // (1) this.emit("progress", n); // (2) return this.setProgress(n) // (3) .set("progress", String.valueOf(n)) .compose(Job::updateNow); }
progress方法接受兩個參數(shù):第一個是當前完成的進度值,第二個是完成狀態(tài)需要的進度值。我們首先計算出當前的進度 (1),然后向特定地址發(fā)送progress事件 (2)。最后我們將進度存儲至Redis中并更新時間,返回Future (3)。
任務(wù)失敗以及重試機制當一個任務(wù)處理失敗時,如果它有剩余的重試次數(shù),Vert.x Kue會自動調(diào)用failAttempt方法進行重試。我們來看一下failAttempt方法的實現(xiàn):
FuturefailedAttempt(Throwable err) { return this.error(err) .compose(Job::failed) .compose(Job::attemptInternal); }
(⊙o⊙)非常簡短吧~實際上,failAttempt方法是三個異步方法的組合:error、failed以及attemptInternal。當一個任務(wù)需要進行重試的時候,我們首先向Event Bus發(fā)布 error 隊列事件并且在Redis中記錄日志,然后將當前的任務(wù)狀態(tài)置為FAILED,最后重新處理此任務(wù)。
我們先來看一下error方法:
public Futureerror(Throwable ex) { return this.emitError(ex) .set("error", ex.getMessage()) .compose(j -> j.log("error | " + ex.getMessage())); }
它的邏輯很簡單:首先我們向Event Bus發(fā)布 錯誤 事件,然后記錄錯誤日志即可。這里我們封裝了一個發(fā)布錯誤的函數(shù)emitError:
@Fluent public Job emitError(Throwable ex) { JsonObject errorMessage = new JsonObject().put("id", this.id) .put("message", ex.getMessage()); eventBus.publish(Kue.workerAddress("error"), errorMessage); eventBus.send(Kue.getCertainJobAddress("error", this), errorMessage); return this; }
其中發(fā)送的錯誤信息格式類似于下面的樣子:
{ "id": 2052, "message": "some error" }
接下來我們再來看一下failed方法的實現(xiàn):
public Futurefailed() { this.failed_at = System.currentTimeMillis(); return this.updateNow() .compose(j -> j.set("failed_at", String.valueOf(j.failed_at))) .compose(j -> j.state(JobState.FAILED)); }
非常簡單,首先我們更新任務(wù)的更新時間和失敗時間,然后通過state方法將當前任務(wù)狀態(tài)置為FAILED即可。
任務(wù)重試的核心邏輯在attemptInternal方法中:
private FutureattemptInternal() { int remaining = this.max_attempts - this.attempts; // (1) if (remaining > 0) { // 還有重試次數(shù) return this.attemptAdd() // (2) .compose(Job::reattempt) // (3) .setHandler(r -> { if (r.failed()) { this.emitError(r.cause()); // (4) } }); } else if (remaining == 0) { // (5) return Future.failedFuture("No more attempts"); } else { // (6) return Future.failedFuture(new IllegalStateException("Attempts Exceeded")); } }
在我們的Job數(shù)據(jù)對象中,我們存儲了最大重試次數(shù)max_attempts以及已經(jīng)重試的次數(shù)attempts,所以我們首先根據(jù)這兩個數(shù)據(jù)計算剩余的重試次數(shù)remaining (1)。如果還有剩余次數(shù)的話,我們就先調(diào)用attemptAdd方法增加一次已重試次數(shù)并 (2),然后我們調(diào)用reattempt方法執(zhí)行真正的任務(wù)重試邏輯 (3)。最后返回這兩個異步方法組合的Future。如果其中一個過程出現(xiàn)錯誤,我們就發(fā)布error事件 (4)。如果沒有剩余次數(shù)了或者超出剩余次數(shù)了,我們直接返回錯誤。
在我們解析reattempt方法之前,我們先來回顧一下Vert.x Kue中的任務(wù)失敗恢復機制。Vert.x Kue支持延時重試機制(retry backoff),并且支持不同的策略(如 fixed 以及 exponential)。之前我們提到Job類中有一個backoff成員變量,它用于配置延時重試的策略。它的格式類似于這樣:
{ "type": "fixed", "delay": 5000 }
延時重試機制的實現(xiàn)在getBackoffImpl方法中,它返回一個Function
private FunctiongetBackoffImpl() { String type = this.backoff.getString("type", "fixed"); // (1) long _delay = this.backoff.getLong("delay", this.delay); // (2) switch (type) { case "exponential": // (3) return attempts -> Math.round(_delay * 0.5 * (Math.pow(2, attempts) - 1)); case "fixed": default: // (4) return attempts -> _delay; } }
首先我們從backoff配置中獲取延遲重試策略。目前Vert.x Kue支持兩種策略:fixed 和 exponential。前者采用固定延遲時間,而后者采用指數(shù)增長型延遲時間。默認情況下Vert.x Kue會采用fixed策略 (1)。接下來我們從backoff配置中獲取延遲時間,如果配置中沒有指定,那么就使用任務(wù)對象中的延遲時間delay (2)。接下來就是根據(jù)具體的策略進行計算了。對于指數(shù)型延遲,我們計算[delay * 0.5 * 2^attempts]作為延遲時間 (3);對于固定型延遲策略,我們直接使用獲取到的延遲時間 (4)。
好啦,現(xiàn)在回到“真正的重試”方法 —— reattempt方法來:
private Futurereattempt() { if (this.backoff != null) { long delay = this.getBackoffImpl().apply(attempts); // (1) return this.setDelay(delay) .setPromote_at(System.currentTimeMillis() + delay) .update() // (2) .compose(Job::delayed); // (3) } else { return this.inactive(); // (4) } }
首先我們先檢查backoff配置是否存在,若存在則計算出對應(yīng)的延時時間 (1) 并且設(shè)定delay和promote_at屬性的值然后保存至Redis中 (2)。接著我們通過delayed方法將任務(wù)的狀態(tài)設(shè)為延時(DELAYED) (3)。如果延時重試配置不存在,我們就通過inactive方法直接將此任務(wù)置入工作隊列中 (4)。
這就是整個任務(wù)重試功能的實現(xiàn),也不是很復雜蛤?觀察上面的代碼,我們可以發(fā)現(xiàn)Future組合無處不在。這種響應(yīng)式的組合非常方便。想一想如果我們用回調(diào)的異步方式來寫代碼的話,我們很容易陷入回調(diào)地獄中(⊙o⊙)。。。幾個回調(diào)嵌套起來總顯得不是那么優(yōu)美和簡潔,而用響應(yīng)式的、可組合的Future就可以有效地避免這個問題。
不錯!到現(xiàn)在為止我們已經(jīng)探索完Job類的源碼了~下面我們來看一下JobService類。
Event Bus 服務(wù) - JobService在本章節(jié)中我們來探索一下JobService接口及其實現(xiàn) —— 它包含著各種普通的操作和統(tǒng)計Job的邏輯。
異步RPC我們的JobService是一個通用邏輯接口,因此我們希望應(yīng)用中的每一個組件都能訪問此服務(wù),即進行RPC。在Vert.x中,我們可以將服務(wù)注冊至Event Bus上,然后其它組件就可以通過Event Bus來遠程調(diào)用注冊的服務(wù)了。
傳統(tǒng)的RPC有一個缺點:消費者需要阻塞等待生產(chǎn)者的回應(yīng)。你可能想說:這是一種阻塞模型,和Vert.x推崇的異步開發(fā)模式不相符。沒錯!而且,傳統(tǒng)的RPC不是真正面向失敗設(shè)計的。
還好,Vert.x提供了一種高效的、響應(yīng)式的RPC —— 異步RPC。我們不需要等待生產(chǎn)者的回應(yīng),而只需要傳遞一個Handler
所以講到這里,你可能想問:到底怎么在Event Bus上注冊服務(wù)呢?我們是不是需要寫一大堆的邏輯去包裝和發(fā)送信息,然后在另一端解碼信息并進行調(diào)用呢?不,這太麻煩了!有了Vert.x 服務(wù)代理,我們不需要這么做!Vert.x提供了一個組件 Vert.x Service Proxy 來自動生成服務(wù)代理。有了它的幫助,我們就只需要按照規(guī)范設(shè)計我們的異步服務(wù)接口,然后用@ProxyGen注解修飾即可。
異步服務(wù)接口@ProxyGen注解的限制
@ProxyGen注解的使用有諸多限制。比如,所有的異步方法都必須是基于回調(diào)的,也就是說每個方法都要接受一個Handler> 類型的參數(shù)。并且,類型R也是有限制的 —— 只允許基本類型以及數(shù)據(jù)對象類型。詳情請參考官方文檔。
我們來看一下JobService的源碼:
@ProxyGen @VertxGen public interface JobService { static JobService create(Vertx vertx, JsonObject config) { return new JobServiceImpl(vertx, config); } static JobService createProxy(Vertx vertx, String address) { return ProxyHelper.createProxy(JobService.class, vertx, address); } /** * 獲取任務(wù),按照優(yōu)先級順序 * * @param id job id * @param handler async result handler */ @Fluent JobService getJob(long id, Handler> handler); /** * 刪除任務(wù) * * @param id job id * @param handler async result handler */ @Fluent JobService removeJob(long id, Handler > handler); /** * 判斷任務(wù)是否存在 * * @param id job id * @param handler async result handler */ @Fluent JobService existsJob(long id, Handler > handler); /** * 獲取任務(wù)日志 * * @param id job id * @param handler async result handler */ @Fluent JobService getJobLog(long id, Handler > handler); /** * 獲取某一范圍內(nèi)某個指定狀態(tài)下的任務(wù)列表 * * @param state expected job state * @param from from * @param to to * @param order range order * @param handler async result handler */ @Fluent JobService jobRangeByState(String state, long from, long to, String order, Handler >> handler); /** * 獲取某一范圍內(nèi)某個指定狀態(tài)和類型下的任務(wù)列表 * * @param type expected job type * @param state expected job state * @param from from * @param to to * @param order range order * @param handler async result handler */ @Fluent JobService jobRangeByType(String type, String state, long from, long to, String order, Handler >> handler); /** * 獲取某一范圍內(nèi)的任務(wù)列表(按照順序或倒序) * * @param from from * @param to to * @param order range order * @param handler async result handler */ @Fluent JobService jobRange(long from, long to, String order, Handler >> handler); // 統(tǒng)計函數(shù) /** * 獲取指定狀態(tài)和類型下的任務(wù)的數(shù)量 * * @param type job type * @param state job state * @param handler async result handler */ @Fluent JobService cardByType(String type, JobState state, Handler > handler); /** * 獲取某個狀態(tài)下的任務(wù)的數(shù)量 * * @param state job state * @param handler async result handler */ @Fluent JobService card(JobState state, Handler > handler); /** * 獲取COMPLETE狀態(tài)任務(wù)的數(shù)量 * * @param type job type; if null, then return global metrics * @param handler async result handler */ @Fluent JobService completeCount(String type, Handler > handler); /** * 獲取FAILED狀態(tài)任務(wù)的數(shù)量 * * @param type job type; if null, then return global metrics */ @Fluent JobService failedCount(String type, Handler > handler); /** * 獲取INACTIVE狀態(tài)任務(wù)的數(shù)量 * * @param type job type; if null, then return global metrics */ @Fluent JobService inactiveCount(String type, Handler > handler); /** * 獲取ACTIVE狀態(tài)任務(wù)的數(shù)量 * * @param type job type; if null, then return global metrics */ @Fluent JobService activeCount(String type, Handler > handler); /** * 獲取DELAYED狀態(tài)任務(wù)的數(shù)量 * * @param type job type; if null, then return global metrics */ @Fluent JobService delayedCount(String type, Handler > handler); /** * 獲取當前存在的所有任務(wù)類型 * * @param handler async result handler */ @Fluent JobService getAllTypes(Handler >> handler); /** * 獲取指定狀態(tài)下的所有任務(wù)的ID * * @param state job state * @param handler async result handler */ @Fluent JobService getIdsByState(JobState state, Handler >> handler); /** * 工作隊列運行時間(ms) * * @param handler async result handler */ @Fluent JobService getWorkTime(Handler > handler); }
可以看到我們還為JobService接口添加了@VertxGen注解,Vert.x Codegen可以處理此注解生成多種語言版本的服務(wù)。
在JobService接口中我們還定義了兩個靜態(tài)方法:create用于創(chuàng)建一個任務(wù)服務(wù)實例,createProxy用于創(chuàng)建一個服務(wù)代理。
JobService接口中包含一些任務(wù)操作和統(tǒng)計的相關(guān)邏輯,每個方法的功能都已經(jīng)在注釋中闡述了,因此我們就直接來看它的實現(xiàn)吧~
任務(wù)服務(wù)的實現(xiàn)JobService接口的實現(xiàn)位于JobServiceImpl類中,代碼非常長,因此這里就不貼代碼了。。。大家可以對照GitHub中的代碼讀下面的內(nèi)容。
getJob: 獲取任務(wù)的方法非常簡單。直接利用hgetall命令從Redis中取出對應(yīng)的任務(wù)即可。
removeJob: 我們可以將此方法看作是getJob和Job#remove兩個方法的組合。
existsJob: 使用exists命令判斷對應(yīng)id的任務(wù)是否存在。
getJobLog: 使用lrange命令從vertx_kue:job:{id}:log列表中取出日志。
rangeGeneral: 使用zrange命令獲取一定范圍內(nèi)的任務(wù),這是一個通用方法。
zrange 操作
zrange 返回某一有序集合中某個特定范圍內(nèi)的元素。詳情請見ZRANGE - Redis。
以下三個方法復用了rangeGeneral方法:
jobRangeByState: 指定狀態(tài),對應(yīng)的key為vertx_kue:jobs:{state}。
jobRangeByType: 指定狀態(tài)和類型,對應(yīng)的key為vertx_kue:jobs:{type}:{state}。
jobRange: 對應(yīng)的key為vertx_kue:jobs。
這兩個通用方法用于任務(wù)數(shù)量的統(tǒng)計:
cardByType: 利用zcard命令獲取某一指定狀態(tài)和類型下任務(wù)的數(shù)量。
card: 利用zcard命令獲取某一指定狀態(tài)下任務(wù)的數(shù)量。
下面五個輔助統(tǒng)計方法復用了上面兩個通用方法:
completeCount
failedCount
delayedCount
inactiveCount
activeCount
接著看:
getAllTypes: 利用smembers命令獲取vertx_kue:job:types集合中存儲的所有的任務(wù)類型。
getIdsByState: 使用zrange獲取某一指定狀態(tài)下所有任務(wù)的ID。
getWorkTime: 使用get命令從vertx_kue:stats:work-time中獲取Vert.x Kue的工作時間。
注冊任務(wù)服務(wù)既然完成了JobService的實現(xiàn),接下來我們來看一下如何利用Service Proxy將服務(wù)注冊至Event Bus上。這里我們還需要一個KueVerticle來創(chuàng)建要注冊的服務(wù)實例,并且將其注冊至Event Bus上。
打開io.vertx.blueprint.kue.queue.KueVerticle類的源碼:
package io.vertx.blueprint.kue.queue; import io.vertx.blueprint.kue.service.JobService; import io.vertx.blueprint.kue.util.RedisHelper; import io.vertx.core.AbstractVerticle; import io.vertx.core.Future; import io.vertx.core.json.JsonObject; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; import io.vertx.redis.RedisClient; import io.vertx.serviceproxy.ProxyHelper; public class KueVerticle extends AbstractVerticle { private static Logger logger = LoggerFactory.getLogger(Job.class); public static final String EB_JOB_SERVICE_ADDRESS = "vertx.kue.service.job.internal"; // (1) private JsonObject config; private JobService jobService; @Override public void start(Futurefuture) throws Exception { this.config = config(); this.jobService = JobService.create(vertx, config); // (2) // create redis client RedisClient redisClient = RedisHelper.client(vertx, config); redisClient.ping(pr -> { // (3) test connection if (pr.succeeded()) { logger.info("Kue Verticle is running..."); // (4) register job service ProxyHelper.registerService(JobService.class, vertx, jobService, EB_JOB_SERVICE_ADDRESS); future.complete(); } else { logger.error("oops!", pr.cause()); future.fail(pr.cause()); } }); } }
首先我們需要定義一個地址用于服務(wù)注冊 (1)。在start方法中,我們創(chuàng)建了一個任務(wù)服務(wù)實例 (2),然后通過ping命令測試Redis連接 (3)。如果連接正常,那么我們就可以通過ProxyHelper類中的registerService輔助方法來將服務(wù)實例注冊至Event Bus上 (4)。
這樣,一旦我們在集群模式下部署KueVerticle,服務(wù)就會被發(fā)布至Event Bus上,然后我們就可以在其他組件中去遠程調(diào)用此服務(wù)了。很奇妙吧!
Kue - 工作隊列Kue類代表著工作隊列。我們來看一下Kue類的實現(xiàn)。首先先看一下其構(gòu)造函數(shù):
public Kue(Vertx vertx, JsonObject config) { this.vertx = vertx; this.config = config; this.jobService = JobService.createProxy(vertx, EB_JOB_SERVICE_ADDRESS); this.client = RedisHelper.client(vertx, config); Job.setVertx(vertx, RedisHelper.client(vertx, config)); // init static vertx instance inner job }
這里我們需要注意兩點:第一點,我們通過createProxy方法來創(chuàng)建一個JobService的服務(wù)代理;第二點,之前提到過,我們需要在這里初始化Job類中的靜態(tài)成員變量。
基于Future的封裝我們的JobService是基于回調(diào)的,這是服務(wù)代理組件所要求的。為了讓Vert.x Kue更加響應(yīng)式,使用起來更加方便,我們在Kue類中以基于Future的異步模式封裝了JobService中的所有異步方法。這很簡單,比如這個方法:
@Fluent JobService getJob(long id, Handler> handler);
可以這么封裝:
public Future> getJob(long id) { Future > future = Future.future(); jobService.getJob(id, r -> { if (r.succeeded()) { future.complete(Optional.ofNullable(r.result())); } else { future.fail(r.cause()); } }); return future; }
其實就是加一層Future。其它的封裝過程也類似所以我們就不細說了。
process和processBlocking方法process和processBlocking方法用于處理任務(wù):
public Kue process(String type, int n, Handlerhandler) { if (n <= 0) { throw new IllegalStateException("The process times must be positive"); } while (n-- > 0) { processInternal(type, handler, false); }f setupTimers(); return this; } public Kue process(String type, Handler handler) { processInternal(type, handler, false); setupTimers(); return this; } public Kue processBlocking(String type, int n, Handler handler) { if (n <= 0) { throw new IllegalStateException("The process times must be positive"); } while (n-- > 0) { processInternal(type, handler, true); } setupTimers(); return this; }
兩個process方法都類似 —— 它們都是使用Event Loop線程處理任務(wù)的,其中第一個方法還可以指定同時處理任務(wù)數(shù)量的閾值。我們來回顧一下使用Event Loop線程的注意事項 —— 我們不能阻塞Event Loop線程。因此如果我們需要在處理任務(wù)時做一些耗時的操作,我們可以使用processBlocking方法。這幾個方法的代碼看起來都差不多,那么區(qū)別在哪呢?之前我們提到過,我們設(shè)計了一種Verticle - KueWorker,用于處理任務(wù)。因此對于process方法來說,KueWorker就是一種普通的Verticle;而對于processBlocking方法來說,KueWorker是一種Worker Verticle。這兩種Verticle有什么不同呢?區(qū)別在于,Worker Verticle會使用Worker線程,因此即使我們執(zhí)行一些耗時的操作,Event Loop線程也不會被阻塞。
創(chuàng)建及部署KueWorker的邏輯在processInternal方法中,這三個方法都使用了processInternal方法:
private void processInternal(String type, Handlerhandler, boolean isWorker) { KueWorker worker = new KueWorker(type, handler, this); // (1) vertx.deployVerticle(worker, new DeploymentOptions().setWorker(isWorker), r0 -> { // (2) if (r0.succeeded()) { this.on("job_complete", msg -> { long dur = new Job(((JsonObject) msg.body()).getJsonObject("job")).getDuration(); client.incrby(RedisHelper.getKey("stats:work-time"), dur, r1 -> { // (3) if (r1.failed()) r1.cause().printStackTrace(); }); }); } }); }
首先我們創(chuàng)建一個KueWorker實例 (1)。我們將在稍后詳細介紹KueWorker的實現(xiàn)。然后我們根據(jù)提供的配置來部署此KueWorker (2)。processInternal方法的第三個參數(shù)代表此KueWorker是否為worker verticle。如果部署成功,我們就監(jiān)聽complete事件。每當接收到complete事件的時候,我們獲取收到的信息(處理任務(wù)消耗的時間),然后用incrby增加對應(yīng)的工作時間 (3)。
再回到前面三個處理方法中。除了部署KueWorker以外,我們還調(diào)用了setupTimers方法,用于設(shè)定定時器以監(jiān)測延時任務(wù)以及監(jiān)測活動任務(wù)TTL。
監(jiān)測延時任務(wù)Vert.x Kue支持延時任務(wù),因此我們需要在任務(wù)延時時間到達時將任務(wù)“提升”至工作隊列中等待處理。這個工作是在checkJobPromotion方法中實現(xiàn)的:
private void checkJobPromotion() { int timeout = config.getInteger("job.promotion.interval", 1000); // (1) int limit = config.getInteger("job.promotion.limit", 1000); // (2) vertx.setPeriodic(timeout, l -> { // (3) client.zrangebyscore(RedisHelper.getKey("jobs:DELAYED"), String.valueOf(0), String.valueOf(System.currentTimeMillis()), new RangeLimitOptions(new JsonObject().put("offset", 0).put("count", limit)), r -> { // (4) if (r.succeeded()) { r.result().forEach(r1 -> { long id = Long.parseLong(RedisHelper.stripFIFO((String) r1)); this.getJob(id).compose(jr -> jr.get().inactive()) // (5) .setHandler(jr -> { if (jr.succeeded()) { jr.result().emit("promotion", jr.result().getId()); // (6) } else { jr.cause().printStackTrace(); } }); }); } else { r.cause().printStackTrace(); } }); }); }
首先我們從配置中獲取監(jiān)測延時任務(wù)的間隔(job.promotion.interval,默認1000ms)以及提升數(shù)量閾值(job.promotion.limit,默認1000)。然后我們使用vertx.setPeriodic方法設(shè)一個周期性的定時器 (3),每隔一段時間就從Redis中獲取需要被提升的任務(wù) (4)。這里我們通過zrangebyscore獲取每個需要被提升任務(wù)的id。我們來看一下zrangebyscore方法的定義:
RedisClient zrangebyscore(String key, String min, String max, RangeLimitOptions options, Handler> handler);
key: 某個有序集合的key,即vertx_kue:jobs:DELAYED
min and max: 最小值以及最大值(按照某種模式)。這里min是0,而max是當前時間戳
我們來回顧一下Job類中的state方法。當我們要把任務(wù)狀態(tài)設(shè)為DELAYED的時候,我們將score設(shè)為promote_at時間:
case DELAYED: client.transaction().zadd(RedisHelper.getKey("jobs:" + newState.name()), this.promote_at, this.zid, _failure());
因此我們將max設(shè)為當前時間(System.currentTimeMillis()),只要當前時間超過需要提升的時間,這就說明此任務(wù)可以被提升了。
options: range和limit配置。這里我們需要指定LIMIT值所以我們用new RangeLimitOptions(new JsonObject().put("offset", 0).put("count", limit)創(chuàng)建了一個配置
zrangebyscore的結(jié)果是一個JsonArray,里面包含著所有等待提升任務(wù)的zid。獲得結(jié)果后我們就將每個zid轉(zhuǎn)換為id,然后分別獲取對應(yīng)的任務(wù)實體,最后對每個任務(wù)調(diào)用inactive方法來將任務(wù)狀態(tài)設(shè)為INACTIVE (5)。如果任務(wù)成功提升至工作隊列,我們就發(fā)送promotion事件 (6)。
CallbackKue - 提供多語言支持我們知道,Vert.x支持多種語言(如JS,Ruby),因此如果能讓我們的Vert.x Kue支持多種語言那當然是極好的!這沒有問題~Vert.x Codegen可以處理含@VertxGen注解的異步接口,生成多語言版本。@VertxGen注解同樣限制異步方法 —— 需要基于回調(diào),因此我們設(shè)計了一個CallbackKue接口用于提供多語言支持。CallbackKue的設(shè)計非常簡單,其實現(xiàn)復用了Kue和jobService的代碼。大家可以直接看源碼,一目了然,這里就不細說了。
注意要生成多語言版本的代碼,需要添加相應(yīng)的依賴。比如要生成Ruby版本的代碼就要向build.gradle中添加compile("io.vertx:vertx-lang-ruby:${vertxVersion}")。
KueWorker - 任務(wù)在此處理好啦,我們已經(jīng)對Vert.x Kue Core的幾個核心部分有了大致的了解了,現(xiàn)在是時候探索一下任務(wù)處理的本源 - KueWorker了~
每一個worker都對應(yīng)一個特定的任務(wù)類型,并且綁定著特定的處理函數(shù)(Handler),所以我們需要在創(chuàng)建的時候指定它們。
prepareAndStart方法在KueWorker中,我們使用prepareAndStart方法來準備要處理的任務(wù)并且開始處理任務(wù)的過程:
private void prepareAndStart() { this.getJobFromBackend().setHandler(jr -> { // (1) if (jr.succeeded()) { if (jr.result().isPresent()) { this.job = jr.result().get(); // (2) process(); // (3) } else { this.emitJobEvent("error", null, new JsonObject().put("message", "job_not_exist")); throw new IllegalStateException("job not exist"); } } else { this.emitJobEvent("error", null, new JsonObject().put("message", jr.cause().getMessage())); jr.cause().printStackTrace(); } }); }
代碼比較直觀。首先我們通過getJobFromBackend方法從Redis中按照優(yōu)先級順序獲取任務(wù) (1)。如果成功獲取任務(wù),我們就把獲取到的任務(wù)保存起來 (2) 然后通過process方法處理任務(wù) (3)。如果中間出現(xiàn)錯誤,我們需要發(fā)送error錯誤事件,其中攜帶錯誤信息。
使用zpop按照優(yōu)先級順序獲取任務(wù)我們來
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/64982.html
摘要:上部分藍圖教程中我們一起探索了如何用開發(fā)一個基于消息的應(yīng)用。對部分來說,如果看過我們之前的藍圖待辦事項服務(wù)開發(fā)教程的話,你應(yīng)該對這一部分非常熟悉了,因此這里我們就不詳細解釋了。有關(guān)使用實現(xiàn)的教程可參考藍圖待辦事項服務(wù)開發(fā)教程。 上部分藍圖教程中我們一起探索了如何用Vert.x開發(fā)一個基于消息的應(yīng)用。在這部分教程中,我們將粗略地探索一下kue-http模塊的實現(xiàn)。 Vert.x Kue ...
摘要:本教程是藍圖系列的第三篇教程,對應(yīng)的版本為。提供了一個服務(wù)發(fā)現(xiàn)模塊用于發(fā)布和獲取服務(wù)記錄。前端此微服務(wù)的前端部分,目前已整合至組件中。監(jiān)視儀表板用于監(jiān)視微服務(wù)系統(tǒng)的狀態(tài)以及日志統(tǒng)計數(shù)據(jù)的查看。而服務(wù)則負責發(fā)布其它服務(wù)如服務(wù)或消息源并且部署。 本文章是 Vert.x 藍圖系列 的第三篇教程。全系列: Vert.x Blueprint 系列教程(一) | 待辦事項服務(wù)開發(fā)教程 Vert....
摘要:本文章是藍圖系列的第一篇教程。是事件驅(qū)動的,同時也是非阻塞的。是一組負責分發(fā)和處理事件的線程。注意,我們絕對不能去阻塞線程,否則事件的處理過程會被阻塞,我們的應(yīng)用就失去了響應(yīng)能力。每個負責處理請求并且寫入回應(yīng)結(jié)果。 本文章是 Vert.x 藍圖系列 的第一篇教程。全系列: Vert.x Blueprint 系列教程(一) | 待辦事項服務(wù)開發(fā)教程 Vert.x Blueprint 系...
摘要:而不是開始,將服務(wù)使用多線程的請求重量級的容器。是啟動多個輕便單線程的服務(wù)器和流量路由到他們。亮點應(yīng)用程序是事件驅(qū)動,異步和單線程的。通過使用事件總線傳遞消息通信。為了建立一個消息系統(tǒng),則需要獲得該事件總線。 摘要 如果你對Node.js感興趣,Vert.x可能是你的下一個大事件:一個建立在JVM上一個類似的架構(gòu)企業(yè)制度。 這一部分介紹Vert.x是通過兩個動手的例子(基于Vert.x...
摘要:二來,給大家新開坑的項目一個參考。因此,本系列以主要以官方文檔為基礎(chǔ),將盡可能多的特性融入本項目,并標注官網(wǎng)原文出處,有興趣的小伙伴可點擊深入了解。可以通過一些特殊協(xié)議例如將消息作為統(tǒng)一消息服務(wù)導出。下載完成后自行修改和。 開坑前言 我給這個專欄的名氣取名叫做小項目,聽名字就知道,這個專題最終的目的是帶領(lǐng)大家完成一個項目。為什么要開這么大一個坑呢,一來,雖然網(wǎng)上講IT知識點的書籍鋪天蓋...
閱讀 5050·2021-07-25 21:37
閱讀 692·2019-08-30 15:53
閱讀 3359·2019-08-29 18:47
閱讀 694·2019-08-29 15:39
閱讀 2139·2019-08-29 13:12
閱讀 1806·2019-08-29 12:43
閱讀 2997·2019-08-26 11:52
閱讀 1896·2019-08-26 10:15