一、背景簡介

分布式系統中存在很多拆分的服務,在不斷迭代升級的過程中,會出現如下常見的棘手情況:

某個技術組件版本升級,依賴包升級導致部分語法或者API過期,或者組件修復緊急的問題,從而會導致分布式系統下各個服務被動的升級迭代,很容易引發意外的問題;不同的服務中對組件的依賴和版本各不相同,從而導致不兼容問題的出現,很難對版本做統一的管理和維護,一旦出現問題很容易手忙腳亂,引發蝴蝶效應;

所以在復雜的系統中,對于依賴的框架和組件進行統一管理和二次淺封裝,可以較大程度降低上述問題的處理成本與風險,同時可以更好的管理和控制技術棧。

二、框架淺封裝

1、淺封裝作用

為什么淺封裝,核心目的在于統一管理和協調組件的依賴與升級,并對常用方法做一層包裝,實際上很多組件使用到的功能點并不多,只是在業務中的使用點很多,這樣給組件本身的迭代升級帶來了一定的難度:

例如某個組件常用的API中存在巨大風險,或者替換掉過期的用法,需要對整個系統中涉及的地方做升級,這種操作的成本是非常高的;

如果是對這種常用的組件方法進行二次包裝,作為處理業務的工具方法,那么解決上面的問題就相對輕松許多,只要對封裝的工具方法升級,服務的依賴升級即可,降低時間成本和風險。

通過淺封裝的手段,可以實現兩個方面的解耦:

業務與技術

技術棧中常用的方法進行二次淺封裝,這樣可以較大程度的降低業務與技術的耦合,如此可以獨立的升級技術棧,擴展功能而不影響業務服務的迭代。

框架與組件

不同的框架與組件都需要一定程度的自定義配置,同時分模塊管理,在不同的服務中引入特定的依賴,也可以在基礎包中做統一依賴,以此實現技術棧的快速組合搭配。

這里說的淺封裝,是指包裝常規常用的語法,組件本身就是技術層面的深度封裝,所以也不可能完全隔開技術棧原生用法。

2、統一版本控制

例如微服務架構下,不同的研發組負責不同的業務模塊,然而受到開發人員的經驗和能力影響,很容易出現不同的服務組件選型不一致,或者相同的組件依賴版本不同,這樣很難對系統架構做標準的統一管理。

對于二次封裝的方式,可以嚴格的控制技術棧的迭代擴展,以及版本沖突的問題,通過對二次封裝層的統一升級,可以快速實現業務服務的升級,解決不同服務的依賴差異問題。

三、實踐案例

1、案例簡介

Java分布式系統中,微服務基礎組件(Nacos、Feign、Gateway、Seata)等,系統中間件(Quartz、Redis、Kafka、ElasticSearch,Logstash)等,對常用功能、配置、API等,進行二次淺封裝并統一集成管理,以滿足日常開發中基礎環境搭建與臨時工具的快速實現。

  • butte-flyer 組件封裝的應用案例;
  • butte-frame 常用技術組件二次封裝;

2、分層架構

整體劃分五層:網關層、應用層、業務層、中間件層、基礎層,組合成一套分布式系統。

服務總覽

服務名 分層 端口 緩存庫 數據庫 描述
flyer-gateway 網關層 8010 db1 nacos 路由控制
flyer-facade 應用層 8082 db2 facade 門面服務
flyer-admin 應用層 8083 db3 admin 后端管理
flyer-account 業務層 8084 db4 account 賬戶管理
flyer-quartz 業務層 8085 db5 quartz 定時任務
kafka 中間件 9092 --- ------ 消息隊列
elasticsearch 中間件 9200 --- ------ 搜索引擎
redis 中間件 6379 --- ------ 緩存中心
logstash 中間件 5044 --- es6.8.6 日志采集
nacos 基礎層 8848 --- nacos 注冊配置
seata 基礎層 8091 --- seata 分布事務
mysql 基礎層 3306 --- ------ 數據存儲

3、目錄結構

butte-frame中對各個技術棧進行二次封裝管理,在butte-flyer中進行依賴引用。

butte-frame├── frame-base          基礎代碼塊├── frame-jdbc          數據庫組件├── frame-core          服務基礎依賴├── frame-gateway       路由網關├── frame-nacos         注冊與配置中心├── frame-seata         分布式事務├── frame-feign         服務間調用├── frame-security      安全管理├── frame-search        搜索引擎├── frame-redis         緩存管理├── frame-kafka         消息中間件├── frame-quartz        定時任務├── frame-swagger       接口文檔└── frame-sleuth        鏈路日志butte-flyer├── flyer-gateway       網關服務:路由控制├── flyer-facade        門面服務:功能協作接口├── flyer-account       賬戶服務:用戶賬戶├── flyer-quartz        任務服務:定時任務└── flyer-admin         管理服務:后端管理

4、技術棧組件

系統常用的技術棧:基礎框架、微服務組件、緩存、安全管理、數據庫、定時任務、工具依賴等。

名稱 版本 說明
spring-cloud 2.2.5.RELEASE 微服務框架基礎
spring-boot 2.2.5.RELEASE 服務基礎依賴
gateway 2.2.5.RELEASE 路由網關
nacos 2.2.5.RELEASE 注冊中心與配置管理
seata 2.2.5.RELEASE 分布式事務管理
feign 2.2.5.RELEASE 微服務間請求調用
security 2.2.5.RELEASE 安全管理
sleuth 2.2.5.RELEASE 請求軌跡鏈路
security-jwt 1.0.10.RELEASE JWT加密組件
hikari 3.4.2 數據庫連接池,默認
mybatis-plus 3.4.2 ORM持久層框架
kafka 2.0.1 MQ消息隊列
elasticsearch 6.8.6 搜索引擎
logstash 5.2 日志采集
redis 2.2.5.RELEASE 緩存管理與加鎖控制
quartz 2.3.2 定時任務管理
swagger 2.6.1 接口文檔
apache-common 2.7.0 基礎依賴包
hutool 5.3.1 基礎工具包

四、微服務組件

1、Nacos

Nacos在整個組件體系中,提供兩個核心能力,注冊發現:適配微服務注冊與發現標準,快速實現動態服務注冊發現、元數據管理等,提供微服務組件中最基礎的能力;配置中心:統一管理各個服務配置,集中在Nacos中存儲管理,隔離多環境的不同配置,并且可以規避線上配置放開的風險;

連接管理

spring:  cloud:    nacos:      # 配置讀取      config:        prefix: application        server-addr: 127.0.0.1:8848        file-extension: yml        refresh-enabled: true      # 注冊中心      discovery:        server-addr: 127.0.0.1:8848

配置管理

  • bootstrap.yml :服務中文件,連接和讀取Nacos中配置信息;
  • application.yml :公共基礎配置,這里配置mybatis組件;
  • application-dev.yml :中間件連接配置,用作環境標識隔離;
  • application-def.yml :各個服務的自定義配置,參數加載;

2、Gateway

Gateway網關核心能力,提供統一的API路由管理,作為微服務架構體系下請求唯一入口,還可以在網關層處理所有的非業務功能,例如:安全控制,流量監控限流,等等。

路由控制:各個服務的發現和路由;

@Componentpublic class RouteFactory implements RouteDefinitionRepository {    @Resource    private RouteService routeService ;    /**     * 加載全部路由     * @since 2021-11-14 18:08     */    @Override    public Flux getRouteDefinitions() {        return Flux.fromIterable(routeService.getRouteDefinitions());    }    /**     * 添加路由     * @since 2021-11-14 18:08     */    @Override    public Mono save(Mono routeMono) {        return routeMono.flatMap(routeDefinition -> {            routeService.saveRouter(routeDefinition);            return Mono.empty();        });    }}

全局過濾:作為網關的基礎能力;

@Componentpublic class GatewayFilter implements GlobalFilter {    private static final Logger logger = LoggerFactory.getLogger(GatewayFilter.class);    @Override    public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {        ServerHttpRequest request = exchange.getRequest();        String uri = request.getURI().getPath() ;        String host = String.valueOf(request.getHeaders().getHost()) ;        logger.info("request host : {} , uri : {}",host,uri);        return chain.filter(exchange);    }}

3、Feign

Feign組件是聲明式的WebService客戶端,使微服務之間的調用變得更簡單,Feign通過注解手段,將請求進行模板化和接口化管理,可以更加標準的管理各個服務間的通信交互。

響應解碼:定義Feign接口響應時解碼邏輯,校驗和控制統一的接口風格;

public class FeignDecode extends ResponseEntityDecoder {    public FeignDecode(Decoder decoder) {        super(decoder);    }    @Override    public Object decode(Response response, Type type) {        if (!type.getTypeName().startsWith(Rep.class.getName())) {            throw new RuntimeException("響應格式異常");        }        try {            return super.decode(response, type);        } catch (IOException e) {            e.printStackTrace();            throw new RuntimeException(e.getMessage());        }    }}

4、Seata

Seata組件是開源的分布式事務解決方案,致力于提供高性能和簡單易用的分布式事務服務,實現AT、TCC、SAGA、XA事務模式,支持一站式的分布式解決方案。

事務配置:基于nacos管理Seata組件的參數定義;

服務注冊:在需要管理分布式事務的服務中連接和使用Seata服務;

seata:  enabled: true  application-id: ${spring.application.name}  tx-service-group: butte-seata-group  config:    type: nacos    nacos:      server-addr: ${spring.cloud.nacos.config.server-addr}      group: DEFAULT_GROUP  registry:    type: nacos    nacos:      server-addr: ${spring.cloud.nacos.config.server-addr}      application: seata-server      group: DEFAULT_GROUP

五、中間件集成

1、Kafka

Kafka是由Apache開源,具有分布式、分區的、多副本的、多訂閱者,基于Zookeeper協調的分布式消息處理平臺,由Scala和Java語言編寫。還常用于搜集用戶在應用服務中產生的日志數據。

消息發送:封裝消息發送的基礎能力;

@Componentpublic class KafkaSendOperate {    @Resource    private KafkaTemplate kafkaTemplate ;    public void send (SendMsgVO entry) {        kafkaTemplate.send(entry.getTopic(),entry.getKey(),entry.getMsgBody()) ;    }}

消息消費:消費監聽時有兩種策略;

  • 消息生產方自己消費,通過Feign接口去執行具體消費服務的邏輯,這樣有利于流程跟蹤排查;
  • 消息消費方直接監聽,減少消息處理的流程節點,當然也可以打造統一的MQ總線服務(文尾);
public class KafkaListen {    private static final Logger logger = LoggerFactory.getLogger(KafkaListen.class);    /**     * Kafka消息監聽     * @since 2021-11-06 16:47     */    @KafkaListener(topics = KafkaTopic.USER_TOPIC)    public void listenUser (ConsumerRecord record, Acknowledgment acknowledgment) {        try {            String key =  String.valueOf(record.key());            String body = record.value();            switch (key){ }        } catch (Exception e){            e.printStackTrace();        } finally {            acknowledgment.acknowledge();        }    }}

2、Redis

Redis是一款開源組件,基于內存的高性能的key-value數據結構存儲系統,它可以用作數據庫、緩存和消息中間件,支持多種類型的數據結構,如字符串、集合等。在實際應用中,通常用來做變動頻率低的熱點數據緩存和加鎖機制。

KV數據緩存:作為Redis最常用的功能,即緩存一個指定有效期的鍵和值,在使用時直接獲??;

@Componentpublic class RedisKvOperate {    @Resource    private StringRedisTemplate stringRedisTemplate ;    /**     * 創建緩存,必須帶緩存時長     * @param key 緩存Key     * @param value 緩存Value     * @param expire 單位秒     * @return boolean     * @since 2021-08-07 21:12     */    public boolean set (String key, String value, long expire) {        try {            stringRedisTemplate.opsForValue().set(key,value,expire, TimeUnit.SECONDS);        } catch (Exception e){            e.printStackTrace();            return Boolean.FALSE ;        }        return Boolean.TRUE ;    }}

Lock加鎖機制:基于spring-integration-redisRedisLockRegistry,實現分布式鎖;

@Componentpublic class RedisLockOperate {    @Resource    protected RedisLockRegistry redisLockRegistry;    /**     * 嘗試一次加鎖,采用默認時間     * @param lockKey 加鎖Key     * @return java.lang.Boolean     * @since 2021-09-12 13:14     */    @SneakyThrows    public  Boolean tryLock(T lockKey) {        return redisLockRegistry.obtain(lockKey).tryLock(time, TimeUnit.MILLISECONDS);    }    /**     * 釋放鎖     * @param lockKey 解鎖Key     * @since 2021-09-12 13:32     */    public  void unlock(T lockKey) {        redisLockRegistry.obtain(lockKey).unlock();    }}

3、ElasticSearch

ElasticSearch是一個基于Lucene的搜索服務器,它提供了一個分布式多用戶能力的全文搜索引擎,基于RESTful web接口,Elasticsearch是用Java開發的,是當前流行的企業級搜索引擎。

索引管理:索引的創建和刪除,結構添加和查詢;

基于ElasticsearchRestTemplate的模板方法操作;

@Componentpublic class TemplateOperate {    @Resource    private ElasticsearchRestTemplate template ;    /**     * 創建索引和結構     * @param clazz 基于注解類實體     * @return java.lang.Boolean     * @since 2021-08-15 19:25     */    public  Boolean createPut (Class clazz){        boolean createIf = template.createIndex(clazz) ;        if (createIf){            return template.putMapping(clazz) ;        }        return Boolean.FALSE ;    }}

基于RestHighLevelClient原生API操作;

@Componentpublic class IndexOperate {    @Resource    private RestHighLevelClient client ;    /**     * 判斷索引是否存在     * @return boolean     * @since 2021-08-07 18:57     */    public boolean exists (IndexVO entry) {        GetIndexRequest getReq = new GetIndexRequest (entry.getIndexName()) ;        try {            return client.indices().exists(getReq, entry.getOptions());        } catch (Exception e) {            e.printStackTrace();        }        return Boolean.FALSE ;    }}

數據管理:數據新增、主鍵查詢、修改、批量操作,業務性質的搜索封裝復雜度很高;

數據的增刪改方法;

@Componentpublic class DataOperate {    @Resource    private RestHighLevelClient client ;    /**     * 批量更新數據     * @param entry 對象主體     * @since 2021-08-07 18:16     */    public void bulkUpdate (DataVO entry){        if (CollUtil.isEmpty(entry.getDataList())){            return ;        }        // 請求條件        BulkRequest bulkUpdate = new BulkRequest(entry.getIndexName(),entry.getType()) ;        bulkUpdate.setRefreshPolicy(entry.getRefresh()) ;        entry.getDataList().forEach(dataMap -> {            UpdateRequest updateReq = new UpdateRequest() ;            updateReq.id(String.valueOf(dataMap.get("id"))) ;            updateReq.doc(dataMap) ;            bulkUpdate.add(updateReq) ;        });        try {            // 執行請求            client.bulk(bulkUpdate, entry.getOptions());        } catch (IOException e) {            e.printStackTrace();        }    }}

索引主鍵查詢,分組查詢方法;

@Componentpublic class QueryOperate {    @Resource    private RestHighLevelClient client ;    /**     * 指定字段分組查詢     * @since 2021-10-07 19:00     */    public Map groupByField (QueryVO entry){        Map groupMap = new HashMap<>() ;        // 分組API        String groupName = entry.getGroupField()+"_group" ;        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();        sourceBuilder.size(0) ;        TermsAggregationBuilder termAgg = AggregationBuilders.terms(groupName)                                                             .field(entry.getGroupField()) ;        sourceBuilder.aggregation(termAgg);        // 查詢API        SearchRequest searchRequest = new SearchRequest(entry.getIndexName());        searchRequest.source(sourceBuilder) ;        try {            // 執行API            SearchResponse response = client.search(searchRequest, entry.getOptions());            // 響應結果            Terms groupTerm = response.getAggregations().get(groupName) ;            if (CollUtil.isNotEmpty(groupTerm.getBuckets())){                for (Terms.Bucket bucket:groupTerm.getBuckets()){                    groupMap.put(bucket.getKeyAsString(),bucket.getDocCount()) ;                }            }        } catch (IOException e) {            e.printStackTrace();        }        return groupMap ;    }}

4、Logstash

Logstash是一款開源的數據采集組件,具有實時管道功能。Logstash能夠動態的從多個來源采集數據,進行標準化轉換數據,并將數據傳輸到所選擇的存儲容器。

  • Sleuth:管理服務鏈路,提供核心TraceId和SpanId生成;
  • ElasticSearch:基于ES引擎做日志聚合存儲和查詢;
  • Logstash:提供日志采集服務,和數據發送ES的能力;

logback.xml:服務連接Logstash地址,并加載核心配置;

                                ${DES_URI:- }:${DES_PORT:- }                                                        UTC                                                                            {                        "severity": "%level",                        "service": "${APP_NAME:-}",                        "trace": "%X{X-B3-TraceId:-}",                        "span": "%X{X-B3-SpanId:-}",                        "exportable": "%X{X-Span-Export:-}",                        "pid": "${PID:-}",                        "thread": "%thread",                        "class": "%logger{40}",                        "rest": "%message"                        }                                                            

5、Quartz

Quartz是一個完全由java編寫的開源作業調度框架,用來執行各個服務中的定時調度任務,在微服務體系架構下,通常開發一個獨立的Quartz服務,通過Feign接口去觸發各個服務的任務執行。

配置參數:定時任務基礎信息,數據庫表,線程池;

spring:  quartz:    job-store-type: jdbc    properties:      org:        quartz:          scheduler:            instanceName: ButteScheduler            instanceId: AUTO          jobStore:            class: org.quartz.impl.jdbcjobstore.JobStoreTX            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate            tablePrefix: qrtz_            isClustered: true            clusterCheckinInterval: 15000            useProperties: false          threadPool:            class: org.quartz.simpl.SimpleThreadPool            threadPriority: 5            threadCount: 10            threadsInheritContextClassLoaderOfInitializingThread: true

6、Swagger

Swagger是常用的接口文檔管理組件,通過對API接口和對象的簡單注釋,快速生成接口描述信息,并且提供可視化界面可以快速對接口發送請求和調試,該組件在前后端聯調中,極大的提高效率。

配置基本的包掃描能力即可;

@Configurationpublic class SwaggerConfig {    @Bean    public Docket createRestApi() {        return new Docket(DocumentationType.SWAGGER_2)                .apiInfo(apiInfo())                .select()                .apis(RequestHandlerSelectors.basePackage("com.butte"))                .paths(PathSelectors.any())                .build();    }}

訪問:服務:端口/swagger-ui.html即可打開接口文檔;

六、數據庫配置

1、MySQL

微服務架構下,不同的服務對應不同的MySQL庫,基于業務模塊做庫的劃分是當前常用的方式,可以對各自業務下的服務做迭代升級,同時可以避免單點故障導致雪崩效應。

2、HikariCP

HikariCP作為SpringBoot2版本推薦和默認采用的數據庫連接池,具有速度極快、輕量簡單的特點。

spring:  datasource:    type: com.zaxxer.hikari.HikariDataSource    driver-class-name: com.mysql.cj.jdbc.Driver    url: jdbc:mysql://127.0.0.1:3306/${data.name.mysql}?${spring.datasource.db-param}    username: root    password: 123456    db-param: useUnicode=true&characterEncoding=UTF8&zeroDateTimeBehavior=convertToNull&useSSL=false    hikari:      minimumIdle: 5      maximumPoolSize: 10      idleTimeout: 300000      maxLifetime: 500000      connectionTimeout: 30000

連接池的配置根據業務的并發需求量,做適當的調優即可。

3、Mybatis

Mybatis持久層的框架組件,支持定制化SQL、存儲過程以及高級映射,MyBatis-Plus是一個MyBatis的增強工具,在MyBatis的基礎上只做增強不做改變,可以簡化開發、提高效率。

mybatis-plus:  mapper-locations: classpath*:/mapper/**/*.xml  configuration:    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
  • 同系列-架構:┃ 分布式 ┃ 消息中間件 ┃ 事務管理 ┃ 高并發 ┃ 緩存管理 ┃
  • 同系列-組件:┃ Kafka消息 ┃ ElasticSearch搜索 ┃ Redis緩存 ┃ Quartz任務 ┃ Swagger2接口 ┃

七、源代碼地址

應用倉庫:https://gitee.com/cicadasmile/butte-flyer-parent組件封裝:https://gitee.com/cicadasmile/butte-frame-parent