国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

易用的 canal java 客戶端 canal-client

aboutU / 1239人閱讀

摘要:易用的客戶端自身提供了簡單的客戶端,數據格式較為復雜,處理消費數據也不太方便,為了方便給業務使用,提供一種直接能獲取實體對象的方式來進行消費才更方便。

易用的canaljava 客戶端

canal 自身提供了簡單的客戶端,數據格式較為復雜,處理消費數據也不太方便,為了方便給業務使用,提供一種直接能獲取實體對象的方式來進行消費才更方便。
先說一下實現的思路,首先canal 客戶端的消息對象有兩種,message 和 flatMessage,分別是普通的消息(protobuf格式)和消息隊列的扁平消息(json格式),現在將這兩種消息轉化為我們直接使用的 model 對象,根據消息中的數據庫表名稱找到對應的的實體對象,那么如何根據數據庫表名找到實體對象呢?
第一種方式,如果我們的實體對象都使用JPA 的 @Table注解來標識表和實體的對應關系,可以使用該注解來找到實體對象和表名的關系
第二種方式,可以使用自定義注解的來標注實體和表名的關系,為解耦各個表的處理,我們使用策略模式來封裝各個表的增刪改操作

canal 主要客戶端類 ClientIdentity

canal client和server交互之間的身份標識,目前clientId寫死為1001. (目前canal server上的一個instance只能有一個client消費,clientId的設計是為1個instance多client消費模式而預留的)

CanalConnector

SimpleCanalConnector/ClusterCanalConnector : 兩種connector的實現,simple針對的是簡單的ip直連模式,cluster針對多ip的模式,可依賴CanalNodeAccessStrategy進行failover控制

CanalNodeAccessStrategy

SimpleNodeAccessStrategy/ClusterNodeAccessStrategy:兩種failover的實現,simple針對給定的初始ip列表進行failover選擇,cluster基于zookeeper上的cluster節點動態選擇正在運行的canal server.

ClientRunningMonitor/ClientRunningListener/ClientRunningData

client running相關控制,主要為解決client自身的failover機制。canal client允許同時啟動多個canal client,通過running機制,可保證只有一個client在工作,其他client做為冷備. 當運行中的client掛了,running會控制讓冷備中的client轉為工作模式,這樣就可以確保canal client也不會是單點. 保證整個系統的高可用性.

Canal 客戶端類型

canal 客戶端可以主要分以下幾種類型

單一ip 直連模式

這種方式下,可以啟動多個客戶端,連接同一個canal 服務端,多個客戶端只有一個client 工作,其他的可以作為冷備,當一個client的掛了,其他的客戶端會有一個進入工作模式
缺點:連接同一個服務端,如果服務端掛了將導致不可用

多ip 模式

這種方式下,客戶端連接多個canal服務端,一個客戶端隨機選擇一個canal server 消費,當這個server 掛了,會選擇另外一個進行消費
缺點:不支持訂閱消費

zookeeper 模式

使用zookeeper來server,client 的狀態,當兩個canal server 連接zookeeper 后,
優先連接的節點作為 活躍節點,client從活躍節點消費,當server掛了以后,從另外一個節點消費
缺點:不支持訂閱消費

消息 隊列模式

canal 支持消息直接發送到消息隊列,從消息隊列消費,目前支持的有kafka 和rocketMq,這種方式支持訂閱消費

canal 客戶端實現 EntryHandler 實體消息處理器

首先定義一個策略接口,定義增加,更新,刪除功能,使用java 8聲明方法為default,讓客戶端選擇實現其中的方法,提高靈活性,客戶端實現EntryHandler接口后,會返回基于handler中的泛型的實例對象,在對應的方法中實現自定義邏輯

public interface EntryHandler {

    default void insert(T t) {

    }


    default void update(T before, T after) {

    }


    default void delete(T t) {

    }
}

定義一個canalClient 的抽象類,封裝canal 的鏈接開啟關閉操作,啟動一個線程不斷去消費canal 數據,依賴一個 messageHandler 封裝消息處理的邏輯

public abstract class AbstractCanalClient implements CanalClient {



    @Override
    public void start() {
        log.info("start canal client");
        workThread = new Thread(this::process);
        workThread.setName("canal-client-thread");
        flag = true;
        workThread.start();
    }

    @Override
    public void stop() {
        log.info("stop canal client");
        flag = false;
        if (null != workThread) {
            workThread.interrupt();
        }

    }

    @Override
    public void process() {
        if (flag) {
            try {
                connector.connect();
                connector.subscribe(filter);
                while (flag) {
                    Message message = connector.getWithoutAck(batchSize, timeout, unit);
                    log.info("獲取消息 {}", message);
                    long batchId = message.getId();
                    if (message.getId() != -1 && message.getEntries().size() != 0) {
                        messageHandler.handleMessage(message);
                    }
                    connector.ack(batchId);
                }
            } catch (Exception e) {
                log.error("canal client 異常", e);
            } finally {
                connector.disconnect();
            }
        }
    }

}

基于該抽象類,分別提供各種客戶端的實現

SimpleCanalClient

ClusterCanalClient

ZookeeperCanalClient

KafkaCanalClient

消息處理器 messageHandler

消息處理器 messageHandler 封裝了消息處理邏輯,其中定義了一個消息處理方法

public interface MessageHandler {

     void handleMessage(T t);

}

消息處理器可能要適配4種情況,分別是消費message,flatMessage和兩種消息的同步與異步消費
消息處理的工作主要有兩個

獲取增刪改的行數據,交給行處理器繼續處理

在上下文對象中保存其他的數據,例如庫名,表名,binlog 時間戳等等數據

首先我們封裝一個抽象的 message 消息處理器,實現MessageHandler接口

public abstract class AbstractMessageHandler implements MessageHandler {


    @Override
    public void handleMessage(Message message) {
        List entries = message.getEntries();
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) {
                try {
                    EntryHandler entryHandler = HandlerUtil.getEntryHandler(entryHandlers, entry.getHeader().getTableName());
                    if(entryHandler!=null){
                        CanalModel model = CanalModel.Builder.builder().id(message.getId()).table(entry.getHeader().getTableName())
                                .executeTime(entry.getHeader().getExecuteTime()).database(entry.getHeader().getSchemaName()).build();
                        CanalContext.setModel(model);
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        List rowDataList = rowChange.getRowDatasList();
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        for (CanalEntry.RowData rowData : rowDataList) {
                            rowDataHandler.handlerRowData(rowData,entryHandler,eventType);
                        }
                    }
                } catch (Exception e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }finally {
                   CanalContext.removeModel();
                }

            }
        }
    }
}

分別定義兩個實現類,同步與異步實現類,繼承AbstractMessageHandler抽象類

public class SyncMessageHandlerImpl extends AbstractMessageHandler {


    public SyncMessageHandlerImpl(List entryHandlers, RowDataHandler rowDataHandler) {
        super(entryHandlers, rowDataHandler);
    }

    @Override
    public void handleMessage(Message message) {
        super.handleMessage(message);
    }
}
public class AsyncMessageHandlerImpl extends AbstractMessageHandler {


    private ExecutorService executor;


    public AsyncMessageHandlerImpl(List entryHandlers, RowDataHandler rowDataHandler, ExecutorService executor) {
        super(entryHandlers, rowDataHandler);
        this.executor = executor;
    }

    @Override
    public void handleMessage(Message message) {
        executor.execute(() -> super.handleMessage(message));
    }
}
RowDataHandler 行消息處理器

消息處理器依賴的行消息處理器主要是將原始的column list 轉為 實體對象,并將相應的增刪改消息交給相應的hangler對象方法,行消息處理器分別需要處理兩種對象,一個是 message的行數據 和 flatMessage 的行數據

public interface RowDataHandler {


    void handlerRowData(T t, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception;
}

兩個行處理器的實現為

public class RowDataHandlerImpl implements RowDataHandler {



    private IModelFactory> modelFactory;




    public RowDataHandlerImpl(IModelFactory modelFactory) {
        this.modelFactory = modelFactory;
    }

    @Override
    public void handlerRowData(CanalEntry.RowData rowData, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception {
        if (entryHandler != null) {
            switch (eventType) {
                case INSERT:
                    Object object = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());
                    entryHandler.insert(object);
                    break;
                case UPDATE:
                    Set updateColumnSet = rowData.getAfterColumnsList().stream().filter(CanalEntry.Column::getUpdated)
                            .map(CanalEntry.Column::getName).collect(Collectors.toSet());
                    Object before = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList(),updateColumnSet);
                    Object after = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());
                    entryHandler.update(before, after);
                    break;
                case DELETE:
                    Object o = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList());
                    entryHandler.delete(o);
                    break;
                default:
                    break;
            }
        }
    }
}
public class MapRowDataHandlerImpl implements RowDataHandler>> {



    private IModelFactory> modelFactory;


    public MapRowDataHandlerImpl(IModelFactory> modelFactory) {
        this.modelFactory = modelFactory;
    }

    @Override
    public void handlerRowData(List> list, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception{
        if (entryHandler != null) {
            switch (eventType) {
                case INSERT:
                    Object object = modelFactory.newInstance(entryHandler, list.get(0));
                    entryHandler.insert(object);
                    break;
                case UPDATE:
                    Object before = modelFactory.newInstance(entryHandler, list.get(1));
                    Object after = modelFactory.newInstance(entryHandler, list.get(0));
                    entryHandler.update(before, after);
                    break;
                case DELETE:
                    Object o = modelFactory.newInstance(entryHandler, list.get(0));
                    entryHandler.delete(o);
                    break;
                default:
                    break;
            }
        }
    }
}
IModelFactory bean實例創建工廠

行消息處理的依賴的工廠 主要是是通過反射創建與表名稱對應的bean實例

public interface IModelFactory {


    Object newInstance(EntryHandler entryHandler, T t) throws Exception;


    default Object newInstance(EntryHandler entryHandler, T t, Set updateColumn) throws Exception {
        return null;
    }
}
CanalContext canal 消息上下文

目前主要用于保存bean實例以外的其他數據,使用threadLocal實現

代碼已在github開源canal-client

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/74866.html

相關文章

  • 使用canal+Kafka進行數據庫同步實踐

    摘要:比如,服務數據庫的數據來源于服務的數據庫服務的數據有變更操作時,需要同步到服務中。第二種解決方案通過數據庫的進行同步。并且,我們還用這套架構進行緩存失效的同步。目前這套同步架構正常運行中,后續有遇到問題再繼續更新。在微服務拆分的架構中,各服務擁有自己的數據庫,所以常常會遇到服務之間數據通信的問題。比如,B服務數據庫的數據來源于A服務的數據庫;A服務的數據有變更操作時,需要同步到B服務中。第一...

    Tecode 評論0 收藏0

發表評論

0條評論

aboutU

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<