摘要:易用的客戶端自身提供了簡單的客戶端,數據格式較為復雜,處理消費數據也不太方便,為了方便給業務使用,提供一種直接能獲取實體對象的方式來進行消費才更方便。
易用的canaljava 客戶端
canal 自身提供了簡單的客戶端,數據格式較為復雜,處理消費數據也不太方便,為了方便給業務使用,提供一種直接能獲取實體對象的方式來進行消費才更方便。
先說一下實現的思路,首先canal 客戶端的消息對象有兩種,message 和 flatMessage,分別是普通的消息(protobuf格式)和消息隊列的扁平消息(json格式),現在將這兩種消息轉化為我們直接使用的 model 對象,根據消息中的數據庫表名稱找到對應的的實體對象,那么如何根據數據庫表名找到實體對象呢?
第一種方式,如果我們的實體對象都使用JPA 的 @Table注解來標識表和實體的對應關系,可以使用該注解來找到實體對象和表名的關系
第二種方式,可以使用自定義注解的來標注實體和表名的關系,為解耦各個表的處理,我們使用策略模式來封裝各個表的增刪改操作
canal client和server交互之間的身份標識,目前clientId寫死為1001. (目前canal server上的一個instance只能有一個client消費,clientId的設計是為1個instance多client消費模式而預留的)
CanalConnectorSimpleCanalConnector/ClusterCanalConnector : 兩種connector的實現,simple針對的是簡單的ip直連模式,cluster針對多ip的模式,可依賴CanalNodeAccessStrategy進行failover控制
CanalNodeAccessStrategySimpleNodeAccessStrategy/ClusterNodeAccessStrategy:兩種failover的實現,simple針對給定的初始ip列表進行failover選擇,cluster基于zookeeper上的cluster節點動態選擇正在運行的canal server.
ClientRunningMonitor/ClientRunningListener/ClientRunningDataclient running相關控制,主要為解決client自身的failover機制。canal client允許同時啟動多個canal client,通過running機制,可保證只有一個client在工作,其他client做為冷備. 當運行中的client掛了,running會控制讓冷備中的client轉為工作模式,這樣就可以確保canal client也不會是單點. 保證整個系統的高可用性.
Canal 客戶端類型canal 客戶端可以主要分以下幾種類型
單一ip 直連模式這種方式下,可以啟動多個客戶端,連接同一個canal 服務端,多個客戶端只有一個client 工作,其他的可以作為冷備,當一個client的掛了,其他的客戶端會有一個進入工作模式
缺點:連接同一個服務端,如果服務端掛了將導致不可用
這種方式下,客戶端連接多個canal服務端,一個客戶端隨機選擇一個canal server 消費,當這個server 掛了,會選擇另外一個進行消費
缺點:不支持訂閱消費
使用zookeeper來server,client 的狀態,當兩個canal server 連接zookeeper 后,
優先連接的節點作為 活躍節點,client從活躍節點消費,當server掛了以后,從另外一個節點消費
缺點:不支持訂閱消費
canal 支持消息直接發送到消息隊列,從消息隊列消費,目前支持的有kafka 和rocketMq,這種方式支持訂閱消費
canal 客戶端實現 EntryHandler 實體消息處理器首先定義一個策略接口,定義增加,更新,刪除功能,使用java 8聲明方法為default,讓客戶端選擇實現其中的方法,提高靈活性,客戶端實現EntryHandler接口后,會返回基于handler中的泛型的實例對象,在對應的方法中實現自定義邏輯
public interface EntryHandler{ default void insert(T t) { } default void update(T before, T after) { } default void delete(T t) { } }
定義一個canalClient 的抽象類,封裝canal 的鏈接開啟關閉操作,啟動一個線程不斷去消費canal 數據,依賴一個 messageHandler 封裝消息處理的邏輯
public abstract class AbstractCanalClient implements CanalClient { @Override public void start() { log.info("start canal client"); workThread = new Thread(this::process); workThread.setName("canal-client-thread"); flag = true; workThread.start(); } @Override public void stop() { log.info("stop canal client"); flag = false; if (null != workThread) { workThread.interrupt(); } } @Override public void process() { if (flag) { try { connector.connect(); connector.subscribe(filter); while (flag) { Message message = connector.getWithoutAck(batchSize, timeout, unit); log.info("獲取消息 {}", message); long batchId = message.getId(); if (message.getId() != -1 && message.getEntries().size() != 0) { messageHandler.handleMessage(message); } connector.ack(batchId); } } catch (Exception e) { log.error("canal client 異常", e); } finally { connector.disconnect(); } } } }
基于該抽象類,分別提供各種客戶端的實現
SimpleCanalClient
ClusterCanalClient
ZookeeperCanalClient
KafkaCanalClient
消息處理器 messageHandler消息處理器 messageHandler 封裝了消息處理邏輯,其中定義了一個消息處理方法
public interface MessageHandler{ void handleMessage(T t); }
消息處理器可能要適配4種情況,分別是消費message,flatMessage和兩種消息的同步與異步消費
消息處理的工作主要有兩個
獲取增刪改的行數據,交給行處理器繼續處理
在上下文對象中保存其他的數據,例如庫名,表名,binlog 時間戳等等數據
首先我們封裝一個抽象的 message 消息處理器,實現MessageHandler接口
public abstract class AbstractMessageHandler implements MessageHandler{ @Override public void handleMessage(Message message) { List entries = message.getEntries(); for (CanalEntry.Entry entry : entries) { if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) { try { EntryHandler entryHandler = HandlerUtil.getEntryHandler(entryHandlers, entry.getHeader().getTableName()); if(entryHandler!=null){ CanalModel model = CanalModel.Builder.builder().id(message.getId()).table(entry.getHeader().getTableName()) .executeTime(entry.getHeader().getExecuteTime()).database(entry.getHeader().getSchemaName()).build(); CanalContext.setModel(model); CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List rowDataList = rowChange.getRowDatasList(); CanalEntry.EventType eventType = rowChange.getEventType(); for (CanalEntry.RowData rowData : rowDataList) { rowDataHandler.handlerRowData(rowData,entryHandler,eventType); } } } catch (Exception e) { throw new RuntimeException("parse event has an error , data:" + entry.toString(), e); }finally { CanalContext.removeModel(); } } } } }
分別定義兩個實現類,同步與異步實現類,繼承AbstractMessageHandler抽象類
public class SyncMessageHandlerImpl extends AbstractMessageHandler { public SyncMessageHandlerImpl(List extends EntryHandler> entryHandlers, RowDataHandlerrowDataHandler) { super(entryHandlers, rowDataHandler); } @Override public void handleMessage(Message message) { super.handleMessage(message); } }
public class AsyncMessageHandlerImpl extends AbstractMessageHandler { private ExecutorService executor; public AsyncMessageHandlerImpl(List extends EntryHandler> entryHandlers, RowDataHandlerRowDataHandler 行消息處理器rowDataHandler, ExecutorService executor) { super(entryHandlers, rowDataHandler); this.executor = executor; } @Override public void handleMessage(Message message) { executor.execute(() -> super.handleMessage(message)); } }
消息處理器依賴的行消息處理器主要是將原始的column list 轉為 實體對象,并將相應的增刪改消息交給相應的hangler對象方法,行消息處理器分別需要處理兩種對象,一個是 message的行數據 和 flatMessage 的行數據
public interface RowDataHandler{ void handlerRowData(T t, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception; }
兩個行處理器的實現為
public class RowDataHandlerImpl implements RowDataHandler{ private IModelFactory > modelFactory; public RowDataHandlerImpl(IModelFactory modelFactory) { this.modelFactory = modelFactory; } @Override public void handlerRowData(CanalEntry.RowData rowData, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception { if (entryHandler != null) { switch (eventType) { case INSERT: Object object = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList()); entryHandler.insert(object); break; case UPDATE: Set
updateColumnSet = rowData.getAfterColumnsList().stream().filter(CanalEntry.Column::getUpdated) .map(CanalEntry.Column::getName).collect(Collectors.toSet()); Object before = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList(),updateColumnSet); Object after = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList()); entryHandler.update(before, after); break; case DELETE: Object o = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList()); entryHandler.delete(o); break; default: break; } } } }
public class MapRowDataHandlerImpl implements RowDataHandlerIModelFactory bean實例創建工廠>> { private IModelFactory
行消息處理的依賴的工廠 主要是是通過反射創建與表名稱對應的bean實例
public interface IModelFactoryCanalContext canal 消息上下文{ Object newInstance(EntryHandler entryHandler, T t) throws Exception; default Object newInstance(EntryHandler entryHandler, T t, Set updateColumn) throws Exception { return null; } }
目前主要用于保存bean實例以外的其他數據,使用threadLocal實現
代碼已在github開源canal-client
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/74866.html
摘要:比如,服務數據庫的數據來源于服務的數據庫服務的數據有變更操作時,需要同步到服務中。第二種解決方案通過數據庫的進行同步。并且,我們還用這套架構進行緩存失效的同步。目前這套同步架構正常運行中,后續有遇到問題再繼續更新。在微服務拆分的架構中,各服務擁有自己的數據庫,所以常常會遇到服務之間數據通信的問題。比如,B服務數據庫的數據來源于A服務的數據庫;A服務的數據有變更操作時,需要同步到B服務中。第一...
閱讀 2566·2021-09-02 15:40
閱讀 1572·2019-08-30 15:54
閱讀 1086·2019-08-30 12:48
閱讀 3406·2019-08-29 17:23
閱讀 1053·2019-08-28 18:04
閱讀 3670·2019-08-26 13:54
閱讀 611·2019-08-26 11:40
閱讀 2401·2019-08-26 10:15