国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Vert.x Blueprint 系列教程(三) | Micro-Shop 微服務應用實戰

QiShare / 3330人閱讀

摘要:本教程是藍圖系列的第三篇教程,對應的版本為。提供了一個服務發現模塊用于發布和獲取服務記錄。前端此微服務的前端部分,目前已整合至組件中。監視儀表板用于監視微服務系統的狀態以及日志統計數據的查看。而服務則負責發布其它服務如服務或消息源并且部署。

本文章是 Vert.x 藍圖系列 的第三篇教程。全系列:

Vert.x Blueprint 系列教程(一) | 待辦事項服務開發教程

Vert.x Blueprint 系列教程(二) | 開發基于消息的應用 - Vert.x Kue 教程

Vert.x Blueprint 系列教程(三) | Micro-Shop 微服務應用實戰

本系列已發布至Vert.x官網:Vert.x Blueprint Tutorials

前言

歡迎回到Vert.x 藍圖系列!當今,微服務架構 變得越來越流行,開發者們都想嘗試一下微服務應用的開發和架構設計。令人激動的是,Vert.x給我們提供了一系列用于微服務開發的組件,包括 Service Discovery (服務發現)、Circuit Breaker (斷路器) 以及其它的一些組件。有了Vert.x微服務組件的幫助,我們就可以快速利用Vert.x搭建我們的微服務應用。在這篇藍圖教程中,我們一起來探索一個利用Vert.x的各個組件開發的 Micro-Shop 微服務應用~

通過本教程,你將會學習到以下內容:

微服務架構

如何利用Vert.x來開發微服務應用

異步開發模式

響應式、函數式編程

事件溯源 (Event Sourcing)

通過分布式 Event Bus 進行異步RPC調用

各種各樣的服務類型(例如REST、數據源、Event Bus服務等)

如何使用服務發現模塊 (Vert.x Service Discovery)

如何使用斷路器模塊 (Vert.x Circuit Breaker)

如何利用Vert.x實現API Gateway

如何進行權限認證 (OAuth 2 + Keycloak)

如何配置及使用 SockJS - Event Bus Bridge

以及其它的一些東西。。。

本教程是 Vert.x 藍圖系列 的第三篇教程,對應的Vert.x版本為 3.3.2 。本教程中的完整代碼已托管至 GitHub。

踏入微服務之門

哈~你一定對“微服務”這個詞很熟悉——至少聽起來很熟悉~越來越多的開發者開始擁抱微服務架構,那么微服務究竟是什么呢?一句話總結一下:

Microservices are small, autonomous services that work together.

我們來深入一下微服務的各種特性,來看看微服務為何如此出色:

首先,微服務的重要的一點是“微”。每個微服務都是獨立的,每個多帶帶的微服務組件都注重某一特定的邏輯。在微服務架構中,我們將傳統的單體應用拆分成許多互相獨立的組件。每個組件都由其特定的“邏輯邊界”,因此組件不會過于龐大。不過話又說回來了,每個組件應該有多小呢?這個問題可不好回答,它通常取決與我們的業務與負載。正如Sam Newman在其《Building
Microservices》書中所講的那樣:

We seem to have a very good sense of what is too big, and so it could be argued that once a piece of code no longer feels too big, it’s probably small enough.

因此,當我們覺得每個組件不是特別大的時候,組件的大小可能就剛剛好。

在微服務架構中,組件之間可以通過任意協議進行通信,比如 HTTPAMQP

每個組件是獨立的,因此我們可以在不同的組件中使用不同的編程語言,不同的技術 —— 這就是所謂的 polyglot support (不錯,Vert.x也是支持多語言的!)

每個組件都是獨立開發、部署以及發布的,所以這減少了部署及發布的難度。

微服務架構通常與分布式系統形影不離,所以我們還需要考慮分布式系統中的方方面面,包括可用性、彈性以及可擴展性。

微服務架構通常被設計成為 面向失敗的,因為在分布式系統中失敗的場景非常復雜,我們需要有效地處理失敗的手段。

雖然微服務有如此多的優點,但是不要忘了,微服務可不是銀彈,因為它引入了分布式系統中所帶來的各種問題,因此設計架構時我們都要考慮這些情況。

服務發現

在微服務架構中,每個組件都是獨立的,它們都不知道其他組件的位置,但是組件之間又需要通信,因此我們必須知道各個組件的位置。然而,把位置信息寫死在代碼中顯然不好,因此我們需要一種機制可以動態地記錄每個組件的位置 —— 這就是 服務發現。有了服務發現模塊,我們就可以將服務位置發布至服務發現模塊中,其它服務就可以從服務發現模塊中獲取想要調用的服務的位置并進行調用。在調用服務的過程中,我們不需要知道對應服務的位置,所以當服務位置或環境變動時,服務調用可以不受影響,這使得我們的架構更加靈活。

Vert.x提供了一個服務發現模塊用于發布和獲取服務記錄。在Vert.x 服務發現模塊,每個服務都被抽象成一個Record(服務記錄)。服務提供者可以向服務發現模塊中發布服務,此時Record會根據底層ServiceDiscoveryBackend的配置存儲在本地Map、分布式Map或Redis中。服務消費者可以從服務發現模塊中獲取服務記錄,并且通過服務記錄獲取對應的服務實例然后進行服務調用。目前Vert.x原生支持好幾種服務類型,比如 Event Bus 服務(即服務代理)、HTTP 端點消息源 以及 數據源。當然我們也可以實現自己的服務類型,可以參考相關的文檔。在后面我們還會詳細講述如何使用服務發現模塊,這里先簡單做個了解。

異步的、響應式的Vert.x

異步與響應式風格都很適合微服務架構,而Vert.x兼具這兩種風格!異步開發模式相信大家已經了然于胸了,而如果大家讀過前幾篇藍圖教程的話,響應式風格大家一定不會陌生。有了基于Future以及基于RxJava的異步開發模式,我們可以隨心所欲地對異步過程進行組合和變換,這樣代碼可以非常簡潔,非常優美!在本藍圖教程中,我們會見到大量基于Future和RxJava的異步方法。

Mirco Shop 微服務應用

好啦,現在大家應該對微服務架構有了一個大致的了解了,下面我們來講一下本藍圖中的微服務應用。這是一個簡單的 Micro-Shop 微服務應用 (目前只完成了基本功能),人們可以進行網上購物以及交易。。。當前版本的微服務應用包含下列組件:

賬戶服務:提供用戶賬戶的操作服務,使用MySQL作為后端存儲。

商品服務:提供商品的操作服務,使用MySQL作為后端存儲。

庫存服務:提供商品庫存的操作服務,如查詢庫存、增加庫存即減少庫存。使用Redis作為后端存儲。

網店服務:提供網店的操作即管理服務,使用MongoDB作為后端存儲。

購物車服務:提供購物車事件的生成以及購物車操作(添加、刪除商品以及結算)服務。我們通過此服務來講述 事件溯源

訂單服務:訂單服務從Event Bus接收購物車服務發送的訂單請求,接著處理訂單并將訂單發送至下層服務(本例中僅僅簡單地存儲至數據庫中)。

Micro Shop 前端:此微服務的前端部分(SPA),目前已整合至API Gateway組件中。

監視儀表板:用于監視微服務系統的狀態以及日志、統計數據的查看。

API Gateway:整個微服務的入口,它負責將收到的請求按照一定的規則分發至對應的組件的REST端點中(相當于反向代理)。它也負責權限認證與管理,負載均衡,心跳檢測以及失敗處理(使用Vert.x Circuit Breaker)。

Micro Shop 微服務架構

我們來看一下Micro Shop微服務應用的架構:

用戶請求首先經過API Gateway,再經其處理并分發至對應的業務端點。

我們再來看一下每個基礎組件內部的結構(基礎組件即圖中最下面的各個業務組件)。

組件結構

每個基礎組件至少有兩個Verticle:服務Verticle以及REST Verticle。REST Vertice提供了服務對應的REST端點,并且也負責將此端點發布至服務發現層。而服務Verticle則負責發布其它服務(如Event Bus服務或消息源)并且部署REST Verticle。

每個基礎組件中都包含對應的服務接口,如商品組件中包含ProductService接口。這些服務接口都是Event Bus 服務,由@ProxyGen注解修飾。上篇藍圖教程中我們講過,Vert.x Service Proxy可以自動為@ProxyGen注解修飾的接口生成服務代理類,因此我們可以很方便地在Event Bus上進行異步RPC調用而不用寫額外的代碼。很酷吧!并且有了服務發現組件以后,我們可以非常方便地將Event Bus服務發布至服務發現層,這樣其它組件可以更方便地調用服務。

組件之間的通信

我們先來看一下我們的微服務應用中用到的服務類型:

HTTP端點 (e.g. REST 端點以及API Gateway) - 此服務的位置用URL描述

Event Bus服務 - 此服務的位置用Event Bus上的一個特定地址描述

事件源 - 事件源服務對應Event Bus上某個地址的事件消費者。此服務的位置用Event Bus上的一個特定地址描述

因此,我們各個組件之間可以通過HTTP以及Event Bus(本質是TCP)進行通信,例如:

API Gateway與其它組件通過HTTP進行通信。

讓我們開始吧!

好啦,現在開始我們的微服務藍圖旅程吧!首先我們從GitHub上clone項目:

git clone https://github.com/sczyh30/vertx-blueprint-microservice.git

在本藍圖教程中,我們使用 Maven 作為構建工具。我們首先來看一下pom.xml配置文件。我們可以看到,我們的藍圖應用由許多模塊構成:


  microservice-blueprint-common
  account-microservice
  product-microservice
  inventory-microservice
  store-microservice
  shopping-cart-microservice
  order-microservice
  api-gateway
  cache-infrastructure
  monitor-dashboard

每個模塊代表一個組件。看著配置文件,似乎有不少組件呢!不要擔心,我們將會一一探究這些組件。下面我們先來看一下所有組件的基礎模塊 - microservice-blueprint-common

微服務基礎模塊

microservice-blueprint-common模塊提供了一些微服務功能相關的輔助類以及輔助Verticle。我們先來看一下兩個base verticles - BaseMicroserviceVerticleRestAPIVerticle

Base Microservice Verticle

BaseMicroserviceVerticle提供了與微服務相關的初始化函數以及各種各樣的輔助函數。其它每一個Verticle都會繼承此Verticle,因此這個基礎Verticle非常重要。

首先我們來看一下其中的成員變量:

protected ServiceDiscovery discovery;
protected CircuitBreaker circuitBreaker;
protected Set registeredRecords = new ConcurrentHashSet<>();

discovery以及circuitBreaker分別代表服務發現實例以及斷路器實例,而registeredRecords代表當前已發布的服務記錄的集合,用于在結束Verticle時注銷服務。

start函數中主要是對服務發現實例和斷路器實例進行初始化,配置文件從config()中獲取。它的實現非常簡單:

@Override
public void start() throws Exception {
  // init service discovery instance
  discovery = ServiceDiscovery.create(vertx, new ServiceDiscoveryOptions().setBackendConfiguration(config()));

  // init circuit breaker instance
  JsonObject cbOptions = config().getJsonObject("circuit-breaker") != null ?
    config().getJsonObject("circuit-breaker") : new JsonObject();
  circuitBreaker = CircuitBreaker.create(cbOptions.getString("name", "circuit-breaker"), vertx,
    new CircuitBreakerOptions()
      .setMaxFailures(cbOptions.getInteger("max-failures", 5))
      .setTimeout(cbOptions.getLong("timeout", 10000L))
      .setFallbackOnFailure(true)
      .setResetTimeout(cbOptions.getLong("reset-timeout", 30000L))
  );
}

下面我們還提供了幾個輔助函數用于發布各種各樣的服務。這些函數都是異步的,并且基于Future:

protected Future publishHttpEndpoint(String name, String host, int port) {
  Record record = HttpEndpoint.createRecord(name, host, port, "/",
    new JsonObject().put("api.name", config().getString("api.name", ""))
  );
  return publish(record);
}

protected Future publishMessageSource(String name, String address) {
  Record record = MessageSource.createRecord(name, address);
  return publish(record);
}

protected Future publishJDBCDataSource(String name, JsonObject location) {
  Record record = JDBCDataSource.createRecord(name, location, new JsonObject());
  return publish(record);
}

protected Future publishEventBusService(String name, String address, Class serviceClass) {
  Record record = EventBusService.createRecord(name, address, serviceClass);
  return publish(record);
}

之前我們提到過,每個服務記錄Record代表一個服務,其中服務類型由記錄中的type字段標識。Vert.x原生支持的各種服務接口中都包含著好幾個createRecord方法因此我們可以利用這些方法來方便地創建服務記錄。通常情況下我們需要給每個服務都指定一個name,這樣之后我們就可以通過名稱來獲取服務了。我們還可以通過setMetadata方法來給服務記錄添加額外的元數據。

你可能注意到在publishHttpEndpoint方法中我們就提供了含有api-name的元數據,之后我們會了解到,API Gateway在進行反向代理時會用到它。

下面我們來看一下發布服務的通用方法 —— publish方法:

private Future publish(Record record) {
  Future future = Future.future();
  // publish the service
  discovery.publish(record, ar -> {
    if (ar.succeeded()) {
      registeredRecords.add(record);
      logger.info("Service <" + ar.result().getName() + "> published");
      future.complete();
    } else {
      future.fail(ar.cause());
    }
  });
  return future;
}

publish方法中,我們調用了服務發現實例discoverypublish方法來將服務發布至服務發現模塊。它同樣也是一個異步方法,當發布成功時,我們將此服務記錄存儲至registeredRecords中,輸出日志然后通知future操作已完成。最后返回對應的future

注意,在Vert.x Service Discovery當前版本(3.3.2)的設計中,服務發布者需要在必要時手動注銷服務,因此當Verticle結束時,我們需要將注冊的服務都注銷掉:

@Override
public void stop(Future future) throws Exception {
  // In current design, the publisher is responsible for removing the service
  List futures = new ArrayList<>();
  for (Record record : registeredRecords) {
    Future unregistrationFuture = Future.future();
    futures.add(unregistrationFuture);
    discovery.unpublish(record.getRegistration(), unregistrationFuture.completer());
  }

  if (futures.isEmpty()) {
    discovery.close();
    future.complete();
  } else {
    CompositeFuture.all(futures)
      .setHandler(ar -> {
        discovery.close();
        if (ar.failed()) {
          future.fail(ar.cause());
        } else {
          future.complete();
        }
      });
  }
}

stop方法中,我們遍歷registeredRecords集合并且嘗試注銷每一個服務,并將異步結果future添加至futures列表中。之后我們調用CompositeFuture.all(futures)來依次獲取每個異步結果的狀態。all方法返回一個組合的Future,當列表中的所有Future都成功賦值時方為成功狀態,反之只要有一個異步結果失敗,它就為失敗狀態。因此,我們給它綁定一個Handler,當所有服務都被注銷時,服務發現模塊就可以安全地關閉了,否則結束函數會失敗。

REST API Verticle

RestAPIVerticle抽象類繼承了BaseMicroserviceVerticle抽象類。從名字上就可以看出,它提供了諸多的用于REST API開發的輔助方法。我們在其中封裝了諸如創建服務端、開啟Cookie和Session支持,開啟心跳檢測支持(通過HTTP),各種各樣的路由處理封裝以及用于權限驗證的路由處理器。在之后的章節中我們將會見到這些方法。

好啦,現在我們已經了解了整個藍圖應用中的兩個基礎Verticle,下面是時候探索各個模塊了!在探索邏輯組件之前,我們先來看一下其中最重要的組件之一 —— API Gateway。

API Gateway

我們把API Gateway的內容多帶帶歸為一篇教程,請見:Vert.x 藍圖 - Micro Shop 微服務實戰 (API Gateway篇)。

Event Bus 服務 - 賬戶、網店及商品服務 在Event Bus上進行異步RPC

在之前的 Vert.x Kue 藍圖教程 中我們已經介紹過Vert.x中的異步RPC(也叫服務代理)了,這里我們再來回顧一下,并且說一說如何利用服務發現模塊更方便地進行異步RPC。

傳統的RPC有一個缺點:消費者需要阻塞等待生產者的回應。這是一種阻塞模型,和Vert.x推崇的異步開發模式不相符。并且,傳統的RPC不是真正面向失敗設計的。還好,Vert.x提供了一種高效的、響應式的RPC —— 異步RPC。我們不需要等待生產者的回應,而只需要傳遞一個Handler>參數給異步方法。這樣當收到生產者結果時,對應的Handler就會被調用,非常方便,這與Vert.x的異步開發模式相符。并且,AsyncResult也是面向失敗設計的。

Vert.x Service Proxy(服務代理組件)可以自動處理含有@ProxyGen注解的服務接口,生成相應的服務代理類。生成的服務代理類可以幫我們將數據封裝好后發送至Event Bus、從Event Bus接收數據,以及對數據進行編碼和解碼,因此我們可以省掉不少代碼。我們需要做的就是遵循@ProxyGen注解的一些限定。

比如,這里有一個Event Bus服務接口:

@ProxyGen
public interface MyService {
  @Fluent
  MyService retrieveData(String id, Handler> resultHandler);
}

我們可以通過Vert.x Service Proxy組件生成對應的代理類。然后我們就可以通過ProxyHelper類的registerService方法將此服務注冊至Event Bus上:

MyService myService = MyService.createService(vertx, config);
ProxyHelper.registerService(MyService.class, vertx, myService, SERVICE_ADDRESS);

有了服務發現組件之后,將服務發布至服務發現層就非常容易了。比如在我們的藍圖應用中我們使用封裝好的方法:

publishEventBusService(SERVICE_NAME, SERVICE_ADDRESS, MyService.class)

OK,現在服務已經成功地發布至服務發現模塊。現在我們就可以通過EventBusService接口的getProxy方法來從服務發現層獲取發布的Event Bus服務,并且像調用普通異步方法那樣進行異步RPC:

EventBusService.getProxy(discovery, new JsonObject().put("name", SERVICE_NAME), ar -> {
  if (ar.succeeded()) {
    MyService myService = ar.result();
    myService.retrieveData(...);
  }
});
幾個服務模塊的通用特性

在我們的Micro Shop微服務應用中,賬戶、網店及商品服務有幾個通用的特性及約定。現在我們來解釋一下。

在這三個模塊中,每個模塊都包含:

一個Event Bus服務接口。此服務接口定義了對實體存儲的各種操作

服務接口的實現

REST API Verticle,用于創建服務端并將其發布至服務發現模塊

Main Verticle,用于部署其它的verticles以及將Event Bus服務和消息源發布至服務發現層

其中,用戶賬戶服務以及商品服務都使用 MySQL 作為后端存儲,而網店服務則以 MongoDB 作為后端存儲。這里我們只挑兩個典型的服務介紹如何通過Vert.x操作不同的數據庫:product-microservicestore-microserviceaccount-microservice的實現與product-microservice非常相似,大家可以查閱 GitHub 上的代碼。

基于MySQL的商品服務

商品微服務模塊提供了商品的操作功能,包括添加、查詢(搜索)、刪除與更新商品等。其中最重要的是ProductService服務接口以及其實現了。我們先來看一下此服務接口的定義:

@VertxGen
@ProxyGen
public interface ProductService {

  /**
   * The name of the event bus service.
   */
  String SERVICE_NAME = "product-eb-service";

  /**
   * The address on which the service is published.
   */
  String SERVICE_ADDRESS = "service.product";

  /**
   * Initialize the persistence.
   */
  @Fluent
  ProductService initializePersistence(Handler> resultHandler);

  /**
   * Add a product to the persistence.
   */
  @Fluent
  ProductService addProduct(Product product, Handler> resultHandler);

  /**
   * Retrieve the product with certain `productId`.
   */
  @Fluent
  ProductService retrieveProduct(String productId, Handler> resultHandler);

  /**
   * Retrieve the product price with certain `productId`.
   */
  @Fluent
  ProductService retrieveProductPrice(String productId, Handler> resultHandler);

  /**
   * Retrieve all products.
   */
  @Fluent
  ProductService retrieveAllProducts(Handler>> resultHandler);

  /**
   * Retrieve products by page.
   */
  @Fluent
  ProductService retrieveProductsByPage(int page, Handler>> resultHandler);

  /**
   * Delete a product from the persistence
   */
  @Fluent
  ProductService deleteProduct(String productId, Handler> resultHandler);

  /**
   * Delete all products from the persistence
   */
  @Fluent
  ProductService deleteAllProducts(Handler> resultHandler);

}

正如我們之前所提到的那樣,這個服務接口是一個Event Bus服務,所以我們需要給它加上@ProxyGen注解。這些方法都是異步的,因此每個方法都需要接受一個Handler>參數。當異步操作完成時,對應的Handler會被調用。注意到我們還給此接口加了@VertxGen注解。上篇藍圖教程中我們提到過,這是為了開啟多語言支持(polyglot language support)。Vert.x Codegen注解處理器會自動處理含有@VertxGen注解的類,并生成支持的其它語言的代碼,如Ruby、JS等。。。這是非常適合微服務架構的,因為不同的組件可以用不同的語言進行開發!

每個方法的含義都在注釋中給出了,這里就不解釋了。

商品服務接口的實現位于ProductServiceImpl類中。商品信息存儲在MySQL中,因此我們可以通過 Vert.x-JDBC 對數據庫進行操作。我們在 第一篇藍圖教程 中已經詳細講述過Vert.x JDBC的使用細節了,因此這里我們就不過多地討論細節了。這里我們只關注如何減少代碼量。因為通常簡單數據庫操作的過程都是千篇一律的,因此做個封裝是很有必要的。

首先來回顧一下每次數據庫操作的過程:

JDBCClient中獲取數據庫連接SQLConnection,這是一個異步過程

執行SQL語句,綁定回調Handler

最后不要忘記關閉數據庫連接以釋放資源

對于正常的CRUD操作來說,它們的實現都很相似,因此我們封裝了一個JdbcRepositoryWrapper類來實現這些通用邏輯。它位于io.vertx.blueprint.microservice.common.service包中:

我們提供了以下的封裝方法:

executeNoResult: 執行含參數的SQL語句 (通過updateWithParams方法)。執行結果是不需要的,因此只需要接受一個 Handler> 類型的參數。此方法通常用于insert之類的操作。

retrieveOne: 執行含參數的SQL語句,用于獲取某一特定實體(通過 queryWithParams方法)。此方法是基于Future的,它返回一個Future>類型的異步結果。如果結果集為空,那么返回一個空的Optional monad。如果結果集不為空,則返回第一個結果并用Optional進行包裝。

retrieveMany: 獲取多個實體,返回Future>作為異步結果。

retrieveByPage: 與retrieveMany 方法相似,但包含分頁邏輯。

retrieveAll: similar to retrieveMany method but does not require query parameters as it simply executes statement such as SELECT * FROM xx_table.

removeOne and removeAll: remove entity from the database.

當然這與Spring JPA相比的不足之處在于SQL語句得自己寫,自己封裝也不是很方便。。。考慮到Vert.x JDBC底層也只是使用了Worker線程池包裝了原生的JDBC(不是真正的異步),我們也可以結合Spring Data的相關組件來簡化開發。另外,Vert.x JDBC使用C3P0作為默認的數據庫連接池,C3P0的性能我想大家應該都懂。。。因此換成性能更優的HikariCP是很有必要的。

回到JdbcRepositoryWrapper中來。這層封裝可以大大地減少代碼量。比如,我們的ProductServiceImpl實現類就可以繼承JdbcRepositoryWrapper類,然后利用這些封裝好的方法。看個例子 —— retrieveProduct方法的實現:

@Override
public ProductService retrieveProduct(String productId, Handler> resultHandler) {
  this.retrieveOne(productId, FETCH_STATEMENT)
    .map(option -> option.map(Product::new).orElse(null))
    .setHandler(resultHandler);
  return this;
}

我們唯一需要做的只是將結果變換成需要的類型。是不是很方便呢?

當然這不是唯一方法。在下面的章節中,我們將會講到一種更Reactive,更Functional的方法 —— 利用Rx版本的Vert.x JDBC。另外,用vertx-sync也是一種不錯的選擇(類似于async/await)。

好啦!看完服務實現,下面輪到REST API了。我們來看看RestProductAPIVerticle的實現:

public class RestProductAPIVerticle extends RestAPIVerticle {

  public static final String SERVICE_NAME = "product-rest-api";

  private static final String API_ADD = "/add";
  private static final String API_RETRIEVE = "/:productId";
  private static final String API_RETRIEVE_BY_PAGE = "/products";
  private static final String API_RETRIEVE_PRICE = "/:productId/price";
  private static final String API_RETRIEVE_ALL = "/products";
  private static final String API_DELETE = "/:productId";
  private static final String API_DELETE_ALL = "/all";

  private final ProductService service;

  public RestProductAPIVerticle(ProductService service) {
    this.service = service;
  }

  @Override
  public void start(Future future) throws Exception {
    super.start();
    final Router router = Router.router(vertx);
    // body handler
    router.route().handler(BodyHandler.create());
    // API route handler
    router.post(API_ADD).handler(this::apiAdd);
    router.get(API_RETRIEVE).handler(this::apiRetrieve);
    router.get(API_RETRIEVE_BY_PAGE).handler(this::apiRetrieveByPage);
    router.get(API_RETRIEVE_PRICE).handler(this::apiRetrievePrice);
    router.get(API_RETRIEVE_ALL).handler(this::apiRetrieveAll);
    router.patch(API_UPDATE).handler(this::apiUpdate);
    router.delete(API_DELETE).handler(this::apiDelete);
    router.delete(API_DELETE_ALL).handler(context -> requireLogin(context, this::apiDeleteAll));

    enableHeartbeatCheck(router, config());

    // get HTTP host and port from configuration, or use default value
    String host = config().getString("product.http.address", "0.0.0.0");
    int port = config().getInteger("product.http.port", 8082);

    // create HTTP server and publish REST service
    createHttpServer(router, host, port)
      .compose(serverCreated -> publishHttpEndpoint(SERVICE_NAME, host, port))
      .setHandler(future.completer());
  }

  private void apiAdd(RoutingContext context) {
    try {
      Product product = new Product(new JsonObject(context.getBodyAsString()));
      service.addProduct(product, resultHandler(context, r -> {
        String result = new JsonObject().put("message", "product_added")
          .put("productId", product.getProductId())
          .encodePrettily();
        context.response().setStatusCode(201)
          .putHeader("content-type", "application/json")
          .end(result);
      }));
    } catch (DecodeException e) {
      badRequest(context, e);
    }
  }

  private void apiRetrieve(RoutingContext context) {
    String productId = context.request().getParam("productId");
    service.retrieveProduct(productId, resultHandlerNonEmpty(context));
  }

  private void apiRetrievePrice(RoutingContext context) {
    String productId = context.request().getParam("productId");
    service.retrieveProductPrice(productId, resultHandlerNonEmpty(context));
  }

  private void apiRetrieveByPage(RoutingContext context) {
    try {
      String p = context.request().getParam("p");
      int page = p == null ? 1 : Integer.parseInt(p);
      service.retrieveProductsByPage(page, resultHandler(context, Json::encodePrettily));
    } catch (Exception ex) {
      badRequest(context, ex);
    }
  }

  private void apiRetrieveAll(RoutingContext context) {
    service.retrieveAllProducts(resultHandler(context, Json::encodePrettily));
  }

  private void apiDelete(RoutingContext context) {
    String productId = context.request().getParam("productId");
    service.deleteProduct(productId, deleteResultHandler(context));
  }

  private void apiDeleteAll(RoutingContext context, JsonObject principle) {
    service.deleteAllProducts(deleteResultHandler(context));
  }

}

此Verticle繼承了RestAPIVerticle,因此我們可以利用其中諸多的輔助方法。首先來看一下啟動過程,即start方法。首先我們先調用super.start()來初始化服務發現組件,然后創建Router,綁定BodyHandler以便操作請求正文,然后創建各個API路由并綁定相應的處理函數。接著我們調用enableHeartbeatCheck方法開啟簡單的心跳檢測支持。最后我們通過封裝好的createHttpServer創建HTTP服務端,并通過publishHttpEndpoint方法將HTTP端點發布至服務發現模塊。

其中createHttpServer方法非常簡單,我們只是把vertx.createHttpServer方法變成了基于Future的:

protected Future createHttpServer(Router router, String host, int port) {
  Future httpServerFuture = Future.future();
  vertx.createHttpServer()
    .requestHandler(router::accept)
    .listen(port, host, httpServerFuture.completer());
  return httpServerFuture.map(r -> null);
}

至于各個路由處理邏輯如何實現,可以參考 Vert.x Blueprint - Todo Backend Tutorial 獲取相信信息。

最后我們打開此微服務模塊中的Main Verticle - ProductVerticle類。正如我們之前所提到的,它負責發布服務以及部署REST Verticle。我們來看一下其start方法:

@Override
public void start(Future future) throws Exception {
  super.start();

  // create the service instance
  ProductService productService = new ProductServiceImpl(vertx, config()); // (1)
  // register the service proxy on event bus
  ProxyHelper.registerService(ProductService.class, vertx, productService, SERVICE_ADDRESS); // (2)
  // publish the service in the discovery infrastructure
  initProductDatabase(productService) // (3)
    .compose(databaseOkay -> publishEventBusService(ProductService.SERVICE_NAME, SERVICE_ADDRESS, ProductService.class)) // (4)
    .compose(servicePublished -> deployRestService(productService)) // (5)
    .setHandler(future.completer()); // (6)
}

首先我們創建一個ProductService服務實例(1),然后通過registerService方法將服務注冊至Event Bus(2)。接著我們初始化數據庫表(3),將商品服務發布至服務發現層(4)然后部署REST Verticle(5)。這是一系列的異步方法的組合操作,很溜吧!最后我們將future.completer()綁定至組合后的Future上,這樣當所有異步操作都OK的時候,Future會自動完成。

當然,不要忘記在配置里指定api.name。之前我們在 API Gateway章節 提到過,API Gateway的反向代理部分就是通過對應REST服務的 api.name 來進行請求分發的。默認情況下api.nameproduct:

{
  "api.name": "product"
}
基于MongoDB的網店服務

網店服務用于網店的操作,如開店、關閉、更新數據。正常情況下,開店都需要人工申請,不過在本藍圖教程中,我們把這一步簡化掉了。網店服務模塊的結構和商品服務模塊非常相似,所以我們就不細說了。我們這里僅僅瞅一瞅如何使用Vert.x Mongo Client。

使用Vert.x Mongo Client非常簡單,首先我們需要創建一個MongoClient實例,過程類似于JDBCClient

private final MongoClient client;

public StoreCRUDServiceImpl(Vertx vertx, JsonObject config) {
  this.client = MongoClient.createNonShared(vertx, config);
}

然后我們就可以通過它來操作Mongo了。比如我們想執行存儲(save)操作,我們可以這樣寫:

@Override
public void saveStore(Store store, Handler> resultHandler) {
  client.save(COLLECTION, new JsonObject().put("_id", store.getSellerId())
      .put("name", store.getName())
      .put("description", store.getDescription())
      .put("openTime", store.getOpenTime()),
    ar -> {
      if (ar.succeeded()) {
        resultHandler.handle(Future.succeededFuture());
      } else {
        resultHandler.handle(Future.failedFuture(ar.cause()));
      }
    }
  );
}

這些操作都是異步的,因此你一定非常熟悉這種模式!當然如果不喜歡基于回調的異步模式的話,你也可以選擇Rx版本的API~

更多關于Vert.x Mongo Client的使用細節,請參考官方文檔。

基于Redis的商品庫存服務

商品庫存服務負責操作商品的庫存數量,比如添加庫存、減少庫存以及獲取當前庫存數量。庫存使用Redis來存儲。

與之前的Event Bus服務不同,我們這里的商品庫存服務是基于Future的,而不是基于回調的。由于服務代理模塊不支持處理基于Future的服務接口,因此這里我們就不用異步RPC了,只發布一個REST API端點,所有的調用都通過REST進行。

首先來看一下InventoryService服務接口:

public interface InventoryService {

  /**
   * Create a new inventory service instance.
   *
   * @param vertx  Vertx instance
   * @param config configuration object
   * @return a new inventory service instance
   */
  static InventoryService createService(Vertx vertx, JsonObject config) {
    return new InventoryServiceImpl(vertx, config);
  }

  /**
   * Increase the inventory amount of a certain product.
   *
   * @param productId the id of the product
   * @param increase  increase amount
   * @return the asynchronous result of current amount
   */
  Future increase(String productId, int increase);

  /**
   * Decrease the inventory amount of a certain product.
   *
   * @param productId the id of the product
   * @param decrease  decrease amount
   * @return the asynchronous result of current amount
   */
  Future decrease(String productId, int decrease);

  /**
   * Retrieve the inventory amount of a certain product.
   *
   * @param productId the id of the product
   * @return the asynchronous result of current amount
   */
  Future retrieveInventoryForProduct(String productId);

}

接口定義非常簡單,含義都在注釋中給出了。接著我們再看一下服務的實現類InventoryServiceImpl類。在Redis中,所有的庫存數量都被存儲在inventory:v1命名空間中,并以商品號productId作為標識。比如商品A123456會被存儲至inventory:v1:A123456鍵值對中。

Vert.x Redis提供了incrbydecrby命令,可以很方便地實現庫存增加和減少功能,代碼類似。這里我們只看庫存增加功能:

@Override
public Future increase(String productId, int increase) {
  Future future = Future.future();
  client.incrby(PREFIX + productId, increase, future.completer());
  return future.map(Long::intValue);
}

由于庫存數量不會非常大,Integer就足夠了,因此我們需要通過Long::intValue方法引用來將Long結果變換成Integer類型的。

retrieveInventoryForProduct方法的實現也非常短小精悍:

@Override
public Future retrieveInventoryForProduct(String productId) {
  Future future = Future.future();
  client.get(PREFIX + productId, future.completer());
  return future.map(r -> r == null ? "0" : r)
    .map(Integer::valueOf);
}

我們通過get命令來獲取值。由于結果是String類型的,因此我們需要自行將其轉換為Integer類型。如果結果為空,我們就認為商品沒有庫存,返回0

至于REST Verticle(在此模塊中也為Main Verticle),其實現模式與前面的大同小異,這里就不展開說了。不要忘記在config.json中指定api.name:

{
  "api.name": "inventory",
  "redis.host": "redis",
  "inventory.http.address": "inventory-microservice",
  "inventory.http.port": 8086
}
事件溯源 - 購物車服務

好了,現在我們與基礎服務模塊告一段落了。下面我們來到了另一個重要的服務模塊 —— 購物車微服務。此模塊負責購物車的獲取、購物車事件的添加以及結算功能。與傳統的實現不同,這里我們要介紹一種不同的開發模式 —— 事件溯源(Event Sourcing)。

解道Event Sourcing

在傳統的數據存儲模式中,我們通常直接將數據本身的狀態存儲至數據庫中。這在一般場景中是沒有問題的,但有些時候,我們不僅想獲取到數據,還想獲取數據操作的過程(即此數據是經過怎樣的操作生成的),這時候我們就可以利用事件溯源(Event Sourcing)來解決這個問題。

事件溯源保證了數據狀態的變換都以一系列的事件的形式存儲在數據庫中。所以,我們不僅可以獲取每個變換的事件,而且可以通過過去的事件來組合出過去任意時刻的數據狀態!這真是極好的~注意,有一點很重要,我們不能更改已經保存的事件以及它們的序列 —— 也就是說,事件存儲是只能添加而不能刪除的,并且需要不可變。是不是感覺和數據庫事務日志的原理差不多呢?

在微服務架構中,事件溯源模式可以帶來以下的好處:

我們可以從過去的事件序列中組建出任意時刻的數據狀態

每個過去的事件都得以保存,因此這使得補償事務成為可能

我們可以從事件存儲中獲取事件流,并且以異步、響應式風格對其進行變換和處理

事件存儲同樣可以當作為數據日志

事件存儲的選擇也需要好好考慮。Apache Kafka非常適合這種場景,在此版本的Micro Shop微服務中,為了簡化其實現,我們簡單地使用了MySQL作為事件存儲。下個版本我們將把Kafka整合進來。

注:在實際生產環境中,購物車通常被存儲于Session或緩存內。本章節僅為介紹事件溯源而使用事件存儲模式。

購物車事件

我們來看一下代表購物車事件的CartEvent數據對象:

@DataObject(generateConverter = true)
public class CartEvent {

  private Long id;
  private CartEventType cartEventType;
  private String userId;
  private String productId;
  private Integer amount;

  private long createdAt;

  public CartEvent() {
    this.createdAt = System.currentTimeMillis();
  }

  public CartEvent(JsonObject json) {
    CartEventConverter.fromJson(json, this);
  }

  public CartEvent(CartEventType cartEventType, String userId, String productId, Integer amount) {
    this.cartEventType = cartEventType;
    this.userId = userId;
    this.productId = productId;
    this.amount = amount;
    this.createdAt = System.currentTimeMillis();
  }

  public static CartEvent createCheckoutEvent(String userId) {
    return new CartEvent(CartEventType.CHECKOUT, userId, "all", 0);
  }

  public static CartEvent createClearEvent(String userId) {
    return new CartEvent(CartEventType.CLEAR_CART, userId, "all", 0);
  }

  public JsonObject toJson() {
    JsonObject json = new JsonObject();
    CartEventConverter.toJson(this, json);
    return json;
  }

  public static boolean isTerminal(CartEventType eventType) {
    return eventType == CartEventType.CLEAR_CART || eventType == CartEventType.CHECKOUT;
  }
}

一個購物車事件存儲著事件的類型、發生的時間、操作用戶、對應的商品ID以及商品數量變動。在我們的藍圖應用中,購物車事件一共有四種,它們用CartEventType枚舉類表示:

public enum CartEventType {
  ADD_ITEM, // 添加商品至購物車
  REMOVE_ITEM, // 從購物車中刪除商品
  CHECKOUT, // 結算并清空
  CLEAR_CART // 清空
}

其中CHECKOUTCLEAR_CART事件是對整個購物車實體進行操作,對應的購物車事件參數類似,因此我們寫了兩個靜態方法來創建這兩種事件。

另外我們還注意到一個靜態方法isTerminal,它用于檢測當前購物車事件是否為一個“終結”事件。所謂的“終結”,指的是到此就對整個購物車進行操作(結算或清空)。在從購物車事件流構建出對應的購物車狀態的時候,此方法非常有用。

購物車實體

看完了購物車事件,我們再來看一下購物車。購物車實體用ShoppingCart數據對象表示,它包含著一個商品列表表示當前購物車中的商品即數量:

private List productItems = new ArrayList<>();

其中ProductTuple數據對象包含著商品號、商品賣家ID、單價以及當前購物車中次商品的數目amount

為了方便,我們還在ShoppingCart類中放了一個amountMap用于暫時存儲商品數量:

private Map amountMap = new HashMap<>();

由于它只是暫時存儲,我們不希望在對應的JSON數據中看到它,所以把它的getter和setter方法都注解上@GenIgnore

在事件溯源模式中,我們要從一系列的購物車事件構建對應的購物車狀態,因此我們需要一個incorporate方法將每個購物車事件“合并”至購物車內以變更對應的商品數目:

public ShoppingCart incorporate(CartEvent cartEvent) {
  // 此事件必須為添加或刪除事件
  boolean ifValid = Stream.of(CartEventType.ADD_ITEM, CartEventType.REMOVE_ITEM)
    .anyMatch(cartEventType ->
      cartEvent.getCartEventType().equals(cartEventType));

  if (ifValid) {
    amountMap.put(cartEvent.getProductId(),
      amountMap.getOrDefault(cartEvent.getProductId(), 0) +
        (cartEvent.getAmount() * (cartEvent.getCartEventType()
          .equals(CartEventType.ADD_ITEM) ? 1 : -1)));
  }

  return this;
}

實現倒是比較簡單,我們首先來檢查要合并的事件是不是添加商品或移除商品事件,如果是的話,我們就根據事件類型以及對應的數量變更來改變當前購物車中該商品的數量(amountMap)。

使用Rx版本的Vert.x JDBC

我們現在已經了解購物車微服務中的實體類了,下面該看看購物車事件存儲服務了。

之前用callback-based API寫Vert.x JDBC操作總感覺心累,還好Vert.x支持與RxJava進行整合,并且幾乎每個Vert.x組件都有對應的Rx版本!是不是瞬間感覺整個人都變得Reactive了呢~(⊙o⊙) 這里我們就來使用Rx版本的Vert.x JDBC來寫我們的購物車事件存儲服務。也就是說,里面所有的異步方法都將是基于Observable的,很有FRP風格!

我們首先定義了一個簡單的CRUD接口SimpleCrudDataSource

public interface SimpleCrudDataSource {

  Observable save(T entity);

  Observable retrieveOne(ID id);

  Observable delete(ID id);

}

接著定義了一個CartEventDataSource接口,定義了購物車事件獲取的相關方法:

public interface CartEventDataSource extends SimpleCrudDataSource {

  Observable streamByUser(String userId);

}

可以看到這個接口只有一個方法 —— streamByUser方法會返回某一用戶對應的購物車事件流,這樣后面我們就可以對其進行流式變換操作了!

下面我們來看一下服務的實現類CartEventDataSourceImpl。首先是save方法,它將一個事件存儲至事件數據庫中:

@Override
public Observable save(CartEvent cartEvent) {
  JsonArray params = new JsonArray().add(cartEvent.getCartEventType().name())
    .add(cartEvent.getUserId())
    .add(cartEvent.getProductId())
    .add(cartEvent.getAmount())
    .add(cartEvent.getCreatedAt() > 0 ? cartEvent.getCreatedAt() : System.currentTimeMillis());
  return client.getConnectionObservable()
    .flatMap(conn -> conn.updateWithParamsObservable(SAVE_STATEMENT, params))
    .map(r -> null);
}

看看我們的代碼,在對比對比普通的callback-based的Vert.x JDBC,是不是更加簡潔,更加Reactive呢?我們可以非常簡單地通過getConnectionObservable方法獲取數據庫連接,然后組合updateWithParamsObservable方法執行對應的含參SQL語句。只需要兩行有木有!而如果用callback-based的風格的話,你只能這么寫:

client.getConnection(ar -> {
  if (ar.succeeded) {
    SQLConnection connection = ar.result();
    connection.updateWithParams(SAVE_STATEMENT, params, ar2 -> {
      // ...
    })
  } else {
    resultHandler.handle(Future.failedFuture(ar.cause()));
  }
})

因此,使用RxJava是非常愉快的一件事!當然vertx-sync也是一個不錯的選擇。

當然,不要忘記返回的Observablecold 的,因此只有在它被subscribe的時候,數據才會被發射。

不過話說回來了,Vert.x JDBC底層本質還是阻塞型的調用,要實現真正的異步數據庫操作,我們可以利用 Vert.x MySQL / PostgreSQL Client 這個組件,底層使用Scala寫的異步數據庫操作庫,不過目前還不是很穩定,大家可以自己嘗嘗鮮。

下面我們再來看一下retrieveOne方法,它從數據存儲中獲取特定ID的事件:

@Override
public Observable retrieveOne(Long id) {
  return client.getConnectionObservable()
    .flatMap(conn -> conn.queryWithParamsObservable(RETRIEVE_STATEMENT, new JsonArray().add(id)))
    .map(ResultSet::getRows)
    .filter(list -> !list.isEmpty())
    .map(res -> res.get(0))
    .map(this::wrapCartEvent);
}

非常簡潔明了,就像之前我們的基于Future的范式相似,因此這里就不再詳細解釋了~

下面我們來看一下里面最重要的方法 —— streamByUser方法:

@Override
public Observable streamByUser(String userId) {
  JsonArray params = new JsonArray().add(userId).add(userId);
  return client.getConnectionObservable()
    .flatMap(conn -> conn.queryWithParamsObservable(STREAM_STATEMENT, params))
    .map(ResultSet::getRows)
    .flatMapIterable(item -> item) // list merge into observable
    .map(this::wrapCartEvent);
}

其核心在于它的SQL語句STREAM_STATEMENT

SELECT * FROM cart_event c
WHERE c.user_id = ? AND c.created_at > coalesce(
    (SELECT created_at FROM cart_event
       WHERE user_id = ? AND (`type` = "CHECKOUT" OR `type` = "CLEAR_CART")
     ORDER BY cart_event.created_at DESC
     LIMIT 1),
    0)
ORDER BY c.created_at ASC;

此SQL語句執行時會獲取與當前購物車相關的所有購物車事件。注意到我們有許多用戶,每個用戶可能會有許多購物車事件,它們屬于不同時間的購物車,那么如何來獲取相關的事件呢?方法是 —— 首先我們獲取最近一次“終結”事件發生對應的時間,那么當前購物車相關的購物車事件就是在此終結事件發生后所有的購物車事件。

明白了這一點,我們再回到streamByUser方法中來。既然此方法是從數據庫中獲取一個事件列表,那么為什么此方法返回Observable而不是Observable>呢?我們來看看其中的奧秘 —— flatMapIterable算子,它將一個序列變換為一串數據流。所以,這里的Observable與Vert.x中的Future以及Java 8中的CompletableFuture就有些不同了。CompletableFuture更像是RxJava中的Single,它僅僅發送一個值或一個錯誤信息,而Observable本身則就像是一個數據流,數據源源不斷地從發布者流向訂閱者。之前retrieveOnesave方法中返回的Observable的使用更像是一個Single,但是在streamByUser方法中,Observable是真真正正的事件數據流。我們將會在購物車服務ShoppingCartService中處理事件流。

哇!現在你一定又被Rx這種函數響應式風格所吸引了~在下面的部分中,我們將探索購物車服務及其實現,基于Future,同樣非常Reactive!

根據購物車事件序列構建對應的購物車狀態

我們首先來看一下ShoppingCartService —— 購物車服務接口,它也是一個Event Bus服務:

@VertxGen
@ProxyGen
public interface ShoppingCartService {

  /**
   * The name of the event bus service.
   */
  String SERVICE_NAME = "shopping-cart-eb-service";

  /**
   * The address on which the service is published.
   */
  String SERVICE_ADDRESS = "service.shopping.cart";

  @Fluent
  ShoppingCartService addCartEvent(CartEvent event, Handler> resultHandler);

  @Fluent
  ShoppingCartService getShoppingCart(String userId, Handler> resultHandler);

}

這里我們定義了兩個方法:addCartEvent用于將購物車事件存儲至事件存儲中;getShoppingCart方法用于獲取某個用戶當前購物車的狀態。

下面我們來看一下其實現類 —— ShoppingCartServiceImpl。首先是addCartEvent方法,它非常簡單:

@Override
public ShoppingCartService addCartEvent(CartEvent event, Handler> resultHandler) {
  Future future = Future.future();
  repository.save(event).toSingle().subscribe(future::complete, future::fail);
  future.setHandler(resultHand
  return this;
}

正如之前我們所提到的,這里save方法返回的Observable其實更像個Single,因此我們將其通過toSingle方法變換為Single,然后通過subscribe(future::complete, future::fail)將其轉化為Future以便于給其綁定一個Handler>類型的處理函數。

getShoppingCart方法的邏輯位于aggregateCartEvents方法中,此方法非常重要,并且是基于Future的。我們先來看一下代碼:

private Future aggregateCartEvents(String userId) {
  Future future = Future.future();
  // aggregate cart events into raw shopping cart
  repository.streamByUser(userId) // (1)
    .takeWhile(cartEvent -> !CartEvent.isTerminal(cartEvent.getCartEventType())) // (2)
    .reduce(new ShoppingCart(), ShoppingCart::incorporate) // (3)
    .toSingle()
    .subscribe(future::complete, future::fail); // (4)

  return future.compose(cart ->
    getProductService() // (5)
      .compose(service -> prepareProduct(service, cart)) // (6) prepare product data
      .compose(productList -> generateCurrentCartFromStream(cart, productList)) // (7) prepare product items
  );
}

我們來詳細地解釋一下。首先我們先創建個Future,然后先通過repository.streamByUser(userId)方法獲取事件流(1),然后我們使用takeWhile算子來獲取所有的ADD_ITEMREMOVE_ITEM類型的事件(2)。takeWhile算子在判定條件變為假時停止發射新的數據,因此當事件流遇到一個終結事件時,新的事件就不再往外發送了,之前的事件將會繼續被傳遞。

下面就是產生購物車狀態的過程了!我們通過reduce算子將事件流來“聚合”成購物車實體(3)。這個過程可以總結為以下幾步:首先我們先創建一個空的購物車,然后依次將各個購物車事件“合并”至購物車實體中。最后聚合而成的購物車實體應該包含一個完整的amountMap

現在此Observable已經包含了我們想要的初始狀態的購物車了。我們將其轉化為Single然后通過subscribe(future::complete, future::fail)轉化為Future(4)。

現在我們需要更多的信息以組件一個完整的購物車,所以我們首先組合getProductService異步方法來從服務發現層獲取商品服務(5),然后通過prepareProduct方法來獲取需要的商品數據(6),最后通過generateCurrentCartFromStream異步方法組合出完整的購物車實體(7)。這里面包含了好幾個組合過程,我們來一一解釋。

首先來看getProductService異步方法。它用于從服務發現層獲取商品服務,然后返回其異步結果:

private Future getProductService() {
  Future future = Future.future();
  EventBusService.getProxy(discovery,
    new JsonObject().put("name", ProductService.SERVICE_NAME),
    future.completer());
  return future;
}

現在我們獲取到商品服務了,那么下一步自然是獲取需要的商品數據了。這個過程通過prepareProduct異步方法實現:

private Future> prepareProduct(ProductService service, ShoppingCart cart) {
  List> futures = cart.getAmountMap().keySet() // (1)
    .stream()
    .map(productId -> {
      Future future = Future.future();
      service.retrieveProduct(productId, future.completer());
      return future; // (2)
    })
    .collect(Collectors.toList()); // (3)
  return Functional.sequenceFuture(futures); // (4)
}

在此實現中,首先我們從amountMap中獲取購物車中所有商品的ID(1),然后我們根據每個ID異步調用商品服務的retrieveProduct方法并且以Future包裝(2),然后將此流轉化為List>類型的列表(3)。我們這里想獲得的是所有商品的異步結果,即Future>,那么如何轉換呢?這里我寫了一個輔助函數sequenceFuture來實現這樣的變換,它位于io.vertx.blueprint.microservice.common.functional包下的Functional類中:

public static  Future> sequenceFuture(List> futures) {
  return CompositeFutureImpl.all(futures.toArray(new Future[futures.size()])) // (1)
    .map(v -> futures.stream()
        .map(Future::result) // (2)
        .collect(Collectors.toList()) // (3)
    );
}

此方法對于想將一個Future序列變換成單個Future的情況非常有用。這里我們首先調用CompositeFutureImpl類的all方法(1),它返回一個組合的Future,當且僅當序列中所有的Future都成功完成時,它為成功狀態,否則為失敗狀態。下面我們就對此組合Future做變換:獲取每個Future對應的結果(因為all方法已經強制獲取所有結果),然后歸結成列表(3)。

回到之前的組合中來!現在我們得到了我們需要的商品信息列表List,接下來就根據這些信息來構建完整的購物車實體了!我們來看一下generateCurrentCartFromStream方法的實現:

private Future generateCurrentCartFromStream(ShoppingCart rawCart, List productList) {
  Future future = Future.future();
  // check if any of the product is invalid
  if (productList.stream().anyMatch(e -> e == null)) { // (1)
    future.fail("Error when retrieve products: empty");
    return future;
  }
  // construct the product items
  List currentItems = rawCart.getAmountMap().entrySet() // (2)
    .stream()
    .map(item -> new ProductTuple(getProductFromStream(productList, item.getKey()), // (3)
      item.getValue())) // (4) amount value
    .filter(item -> item.getAmount() > 0) // (5) amount must be greater than zero
    .collect(Collectors.toList());

  ShoppingCart cart = rawCart.setProductItems(currentItems); // (6)
  return Future.succeededFuture(cart); // (7)
}

看起來非常混亂的樣子。。。不要擔心,我們慢慢來~注意這個方法本身不是異步的,但我們需要表示此方法成功或失敗兩種狀態(即AsyncResult),所以此方法仍然返回Future。首先我們創建一個Future,然后通過anyMatch方法檢查商品列表是否合法(1)。若不合法,返回一個失敗的Future;若合法,我們對每個商品依次構建出對應的ProductTuple。在(3)中,我們通過這個構造函數來構建ProductTuple:

public ProductTuple(Product product, Integer amount) {
  this.productId = product.getProductId();
  this.sellerId = product.getSellerId();
  this.price = product.getPrice();
  this.amount = amount;
}

其中第一個參數是對應的商品實體。為了從列表中獲取對應的商品實體,我們寫了一個getProductFromStream方法:

private Product getProductFromStream(List productList, String productId) {
  return productList.stream()
    .filter(product -> product.getProductId().equals(productId))
    .findFirst()
    .get();
}

當每個商品的ProductTuple都構建完畢的時候,我們就可以將列表賦值給對應的購物車實體了(6),并且返回購物車實體結果(7)。現在我們終于整合出一個完整的購物車了!

結算 - 根據購物車產生訂單

現在我們已經選好了自己喜愛的商品,把購物車填的慢慢當當了,下面是時候進行結算了!我們這里同樣定義了一個結算服務接口CheckoutService,它只包含一個特定的方法:checkout

@VertxGen
@ProxyGen
public interface CheckoutService {

  /**
   * The name of the event bus service.
   */
  String SERVICE_NAME = "shopping-checkout-eb-service";

  /**
   * The address on which the service is published.
   */
  String SERVICE_ADDRESS = "service.shopping.cart.checkout";

  /**
   * Order event source address.
   */
  String ORDER_EVENT_ADDRESS = "events.service.shopping.to.order";

  /**
   * Create a shopping checkout service instance
   */
  static CheckoutService createService(Vertx vertx, ServiceDiscovery discovery) {
    return new CheckoutServiceImpl(vertx, discovery);
  }

  void checkout(String userId, Handler> handler);

}

接口非常簡單,下面我們來看其實現 —— CheckoutServiceImpl類。盡管接口只包含一個checkout方法,但我們都知道結算過程可不簡單。。。它包含庫存檢測、付款(這里暫時省掉了)以及生成訂單的邏輯。我們先來看看checkout方法的源碼:

@Override
public void checkout(String userId, Handler> resultHandler) {
  if (userId == null) { // (1)
    resultHandler.handle(Future.failedFuture(new IllegalStateException("Invalid user")));
    return;
  }
  Future cartFuture = getCurrentCart(userId); // (2)
  Future orderFuture = cartFuture.compose(cart ->
    checkAvailableInventory(cart).compose(checkResult -> { // (3)
      if (checkResult.getBoolean("res")) { // (3)
        double totalPrice = calculateTotalPrice(cart); // (4)
        // 創建訂單實體
        Order order = new Order().setBuyerId(userId) // (5)
          .setPayId("TEST")
          .setProducts(cart.getProductItems())
          .setTotalPrice(totalPrice);
        // 設置訂單流水號,然后向訂單組件發送訂單并等待回應
        return retrieveCounter("order") // (6)
          .compose(id -> sendOrderAwaitResult(order.setOrderId(id))) // (7)
          .compose(result -           
               
                                           
                       
                 

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/65105.html

相關文章

  • Vert.x Blueprint 系列教程(二) | 開發基于消息的應用 - Vert.x Kue

    摘要:本文章是藍圖系列的第二篇教程。這就是請求回應模式。好多屬性我們一個一個地解釋一個序列,作為的地址任務的編號任務的類型任務攜帶的數據,以類型表示任務優先級,以枚舉類型表示。默認優先級為正常任務的延遲時間,默認是任務狀態,以枚舉類型表示。 本文章是 Vert.x 藍圖系列 的第二篇教程。全系列: Vert.x Blueprint 系列教程(一) | 待辦事項服務開發教程 Vert.x B...

    elina 評論0 收藏0
  • Vert.x Blueprint 系列教程(一) | 待辦事項服務開發教程

    摘要:本文章是藍圖系列的第一篇教程。是事件驅動的,同時也是非阻塞的。是一組負責分發和處理事件的線程。注意,我們絕對不能去阻塞線程,否則事件的處理過程會被阻塞,我們的應用就失去了響應能力。每個負責處理請求并且寫入回應結果。 本文章是 Vert.x 藍圖系列 的第一篇教程。全系列: Vert.x Blueprint 系列教程(一) | 待辦事項服務開發教程 Vert.x Blueprint 系...

    frank_fun 評論0 收藏0
  • Vert.x Blueprint 系列教程(二) | Vert.x Kue 教程(Web部分)

    摘要:上部分藍圖教程中我們一起探索了如何用開發一個基于消息的應用。對部分來說,如果看過我們之前的藍圖待辦事項服務開發教程的話,你應該對這一部分非常熟悉了,因此這里我們就不詳細解釋了。有關使用實現的教程可參考藍圖待辦事項服務開發教程。 上部分藍圖教程中我們一起探索了如何用Vert.x開發一個基于消息的應用。在這部分教程中,我們將粗略地探索一下kue-http模塊的實現。 Vert.x Kue ...

    Kerr1Gan 評論0 收藏0
  • Vert.x入坑須知(4)

    摘要:主要是避免引入太多的復雜性,并且出于靈活部署的需要。以應用為例,由于實際上是在上執行,若它被阻塞,即導致后續請求全部無法得到處理。因此,最合適的做法就是對于簡單業務,采用異步庫。本系列其他文章入坑須知入坑須知入坑須知 最開始覺得這個系列也就最多3篇了不起了(因為事不過三嘛),沒曾想居然迎來了第四篇! Kotlin 由于最近決定投身到區塊鏈的學習當中的緣故,出于更好的理解它的基本概念,自...

    summerpxy 評論0 收藏0
  • 【小項目】全棧開發培訓手冊 | 后端(1) vert.x框架理解

    摘要:二來,給大家新開坑的項目一個參考。因此,本系列以主要以官方文檔為基礎,將盡可能多的特性融入本項目,并標注官網原文出處,有興趣的小伙伴可點擊深入了解。可以通過一些特殊協議例如將消息作為統一消息服務導出。下載完成后自行修改和。 開坑前言 我給這個專欄的名氣取名叫做小項目,聽名字就知道,這個專題最終的目的是帶領大家完成一個項目。為什么要開這么大一個坑呢,一來,雖然網上講IT知識點的書籍鋪天蓋...

    hightopo 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<