摘要:作者楊非本文為源碼閱讀系列文章的第四篇,上篇文章介紹了數據同步處理單元實現的功能,數據同步流程的運行邏輯以及數據同步處理單元的設計。庫表黑白名單的實現方式。任務執行完成之后,主線程就會釋放鎖,這樣有助于減少鎖持有的時間。
作者:楊非
本文為 DM 源碼閱讀系列文章的第四篇,上篇文章 介紹了數據同步處理單元實現的功能,數據同步流程的運行邏輯以及數據同步處理單元的 interface 設計。本篇文章在此基礎上展開,詳細介紹 dump 和 load 兩個數據同步處理單元的設計實現,重點關注數據同步處理單元 interface 的實現,數據導入并發模型的設計,以及導入任務在暫停或出現異常后如何恢復。
dump 處理單元dump 處理單元的代碼位于 github.com/pingcap/dm/mydumper 包內,作用是從上游 MySQL 將表結構和數據導出到邏輯 SQL 文件,由于該處理單元總是運行在任務的第一個階段(full 模式和 all 模式),該處理單元每次運行不依賴于其他處理單元的處理結果。另一方面,如果在 dump 運行過程中被強制終止(例如在 dmctl 中執行 pause-task 或者 stop-task),也不會記錄已經 dump 數據的 checkpoint 等信息。不記錄 checkpoint 是因為每次運行 mydumper 從上游導出數據,上游的數據都可能發生變更,為了能得到一致的數據和 metadata 信息,每次恢復任務或重新運行任務時該處理單元會 清理舊的數據目錄,重新開始一次完整的數據 dump。
導出表結構和數據的邏輯并不是在 DM 內部直接實現,而是 通過 os/exec 包調用外部 mydumper 二進制文件 來完成。在 mydumper 內部,我們需要關注以下幾個問題:
數據導出時的并發模型是如何實現的。
no-locks, lock-all-tables, less-locking 等參數有怎樣的功能。
庫表黑白名單的實現方式。
mydumper 的實現細節mydumper 的一次完整的運行流程從主線程開始,主線程按照以下步驟執行:
解析參數。
創建到數據庫的連接。
會根據 no-locks 選項進行一系列的備份安全策略,包括 long query guard 和 lock all tables or FLUSH TABLES WITH READ LOCK。
START TRANSACTION WITH CONSISTENT SNAPSHOT。
記錄 binlog 位點信息。
less locking 處理線程的初始化。
普通導出線程初始化。
如果配置了 trx-consistency-only 選項,執行 UNLOCK TABLES /* trx-only */ 釋放之前獲取的表鎖。注意,如果開啟該選項,是無法保證非 InnoDB 表導出數據的一致性。更多關于一致性讀的細節可以參考 MySQL 官方文檔 Consistent Nonlocking Reads 部分。
根據配置規則(包括 --database, --tables-list 和 --regex 配置)讀取需要導出的 schema 和表信息,并在這個過程中有區分的記錄 innodb_tables 和 non_innodb_table。
為工作子線程創建任務,并將任務 push 到相關的工作隊列。
如果沒有配置 no-locks 和 trx-consistency-only 選項,執行 UNLOCK TABLES / FTWRL / 釋放鎖。
如果開啟 less-locking,等待所有 less locking 子線程退出。
等待所有工作子線程退出。
工作線程的并發控制包括了兩個層面,一層是在不同表級別的并發,另一層是同一張表級別的并發。mydumper 的主線程會將一次同步任務拆分為多個同步子任務,并將每個子任務分發給同一個異步隊列 conf.queue_less_locking/conf.queue,工作子線程從隊列中獲取任務并執行。具體的子任務劃分包括以下策略:
開啟 less-locking 選項的非 InnoDB 表的處理。
先將所有 non_innodb_table 分為 num_threads 組,分組方式是遍歷這些表,依此將遍歷到的表加入到當前數據量最小的分組,盡量保證每個分組內的數據量相近。
上述得到的每個分組內會包含一個或多個非 InnoDB 表,如果配置了 rows-per-file 選項,會對每張表進行 chunks 估算,對于每一張表,如果估算結果包含多個 chunks,會將子任務進一步按照 chunks 進行拆分,分發 chunks 數量個子任務,如果沒有 chunks 劃分,分發為一個獨立的子任務。
注意,在該模式下,子任務會 發送到 queue_less_locking,并在編號為 num_threads ~ 2 * num_threads 的子線程中處理任務。
less_locking_threads 任務執行完成之后,主線程就會 UNLOCK TABLES / FTWRL / 釋放鎖,這樣有助于減少鎖持有的時間。主線程根據 conf.unlock_tables 來判斷非 InnoDB 表是否全部導出,普通工作線程 或者 queue_less_locking 工作線程每次處理完一個非 InnoDB 表任務都會根據 non_innodb_table_counter 和 non_innodb_done 兩個變量判斷是否還有沒有導出結束的非 InnoDB 表,如果都已經導出結束,就會向異步隊列 conf.unlock_tables 中發送一條數據,表示可以解鎖全局鎖。
每個 less_locking_threads 處理非 InnoDB 表任務時,會先 加表鎖,導出數據,最后 解鎖表鎖。
未開啟 less-locking 選項的非 InnoDB 表的處理。
遍歷每一張非 InnoDB 表,同樣對每張表進行 chunks 估算,如果包含多個 chunks,按照 chunks 個數分發同樣的子任務數;如果沒有劃分 chunks,每張表分發一個子任務。所有的任務都分發到 conf->queue 隊列。
InnoDB 表的處理。
與未開啟 less-locking 選項的非 InnoDB 表的處理相同,同樣是 按照表分發子任務,如果有 chunks 子任務會進一步細分。
從上述的并發模型可以看出 mydumper 首先按照表進行同步任務拆分,對于同一張表,如果配置 rows-per-file 參數,會根據該參數和表行數將表劃分為合適的 chunks 數,這即是同一張表內部的并發。具體表行數的估算和 chunks 劃分的實現見 get_chunks_for_table 函數。
需要注意目前 DM 在任務配置中指定的庫表黑白名單功能只應用于 load 和 binlog replication 處理單元。如果在 dump 處理單元內使用庫表黑白名單功能,需要在同步任務配置文件的 dump 處理單元配置提供 extra-args 參數,并指定 mydumper 相關參數,包括 --database, --tables-list 和 --regex。mydumper 使用 regex 過濾庫表的實現參考 check_regex 函數。
load 處理單元load 處理單元的代碼位于 github.com/pingcap/dm/loader 包內,該處理單元在 dump 處理單元運行結束后運行,讀取 dump 處理單元導出的 SQL 文件解析并在下游數據庫執行邏輯 SQL。我們重點分析 Init 和 Process 兩個 interface 的實現。
Init 實現細節該階段進行一些初始化和清理操作,并不會開始同步任務,如果在該階段運行中出現錯誤,會通過 rollback 機制 清理資源,不需要調用 Close 函數。該階段包含的初始化操作包括以下幾點:
創建 checkpoint,checkpoint 用于記錄全量數據的導入進度和 load 處理單元暫停或異常終止后,恢復或重新開始任務時可以從斷點處繼續導入數據。
應用任務配置的數據同步規則,包括以下規則:
初始化黑白名單
初始化表路有規則
初始化列值轉換規則
Process 實現細節該階段的工作流程也很直觀,通過 一個收發數據類型為 *pb.ProcessError 的 channel 接收運行過程中出現的錯誤,出錯后通過 context 的 CancelFunc 強制結束處理單元運行。在核心的 數據導入函數 中,工作模型與 mydumper 類似,即在 主線程中分發任務,有多個工作線程執行具體的數據導入任務。具體的工作細節如下:
主線程會按照庫,表的順序讀取創建庫語句文件
主線程讀取 checkpoint 信息,結合數據文件信息創建 fileJob 隨機分發任務給一個工作子線程,fileJob 任務的結構如下所示 :
type fileJob struct { schema string table string dataFile string offset int64 // 表示讀取文件的起始 offset,如果沒有 checkpoint 斷點信息該值為 0 info *tableInfo // 保存原庫表,目標庫表,列名,insert 語句 column 名字列表等信息 }
在每個工作線程內部,有一個循環不斷從自己 fileJobQueue 獲取任務,每次獲取任務后會對文件進行解析,并將解析后的結果分批次打包為 SQL 語句分發給線程內部的另外一個工作協程,該工作協程負責處理 SQL 語句的執行。工作流程的偽代碼如下所示,完整的代碼參考 func (w *Worker) run():
// worker 工作線程內分發給內部工作協程的任務結構 type dataJob struct { sql string // insert 語句, insert into