国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

DM 源碼閱讀系列文章(四)dump/load 全量同步的實現

zombieda / 1313人閱讀

摘要:作者楊非本文為源碼閱讀系列文章的第四篇,上篇文章介紹了數據同步處理單元實現的功能,數據同步流程的運行邏輯以及數據同步處理單元的設計。庫表黑白名單的實現方式。任務執行完成之后,主線程就會釋放鎖,這樣有助于減少鎖持有的時間。

作者:楊非

本文為 DM 源碼閱讀系列文章的第四篇,上篇文章 介紹了數據同步處理單元實現的功能,數據同步流程的運行邏輯以及數據同步處理單元的 interface 設計。本篇文章在此基礎上展開,詳細介紹 dump 和 load 兩個數據同步處理單元的設計實現,重點關注數據同步處理單元 interface 的實現,數據導入并發模型的設計,以及導入任務在暫停或出現異常后如何恢復。

dump 處理單元

dump 處理單元的代碼位于 github.com/pingcap/dm/mydumper 包內,作用是從上游 MySQL 將表結構和數據導出到邏輯 SQL 文件,由于該處理單元總是運行在任務的第一個階段(full 模式和 all 模式),該處理單元每次運行不依賴于其他處理單元的處理結果。另一方面,如果在 dump 運行過程中被強制終止(例如在 dmctl 中執行 pause-task 或者 stop-task),也不會記錄已經 dump 數據的 checkpoint 等信息。不記錄 checkpoint 是因為每次運行 mydumper 從上游導出數據,上游的數據都可能發生變更,為了能得到一致的數據和 metadata 信息,每次恢復任務或重新運行任務時該處理單元會 清理舊的數據目錄,重新開始一次完整的數據 dump。

導出表結構和數據的邏輯并不是在 DM 內部直接實現,而是 通過 os/exec 包調用外部 mydumper 二進制文件 來完成。在 mydumper 內部,我們需要關注以下幾個問題:

數據導出時的并發模型是如何實現的。

no-locks, lock-all-tables, less-locking 等參數有怎樣的功能。

庫表黑白名單的實現方式。

mydumper 的實現細節

mydumper 的一次完整的運行流程從主線程開始,主線程按照以下步驟執行:

解析參數。

創建到數據庫的連接。

會根據 no-locks 選項進行一系列的備份安全策略,包括 long query guardlock all tables or FLUSH TABLES WITH READ LOCK

START TRANSACTION WITH CONSISTENT SNAPSHOT

記錄 binlog 位點信息。

less locking 處理線程的初始化。

普通導出線程初始化。

如果配置了 trx-consistency-only 選項,執行 UNLOCK TABLES /* trx-only */ 釋放之前獲取的表鎖。注意,如果開啟該選項,是無法保證非 InnoDB 表導出數據的一致性。更多關于一致性讀的細節可以參考 MySQL 官方文檔 Consistent Nonlocking Reads 部分。

根據配置規則(包括 --database, --tables-list 和 --regex 配置)讀取需要導出的 schema 和表信息,并在這個過程中有區分的記錄 innodb_tables 和 non_innodb_table。

為工作子線程創建任務,并將任務 push 到相關的工作隊列。

如果沒有配置 no-lockstrx-consistency-only 選項,執行 UNLOCK TABLES / FTWRL / 釋放鎖。

如果開啟 less-locking,等待所有 less locking 子線程退出。

等待所有工作子線程退出。

工作線程的并發控制包括了兩個層面,一層是在不同表級別的并發,另一層是同一張表級別的并發。mydumper 的主線程會將一次同步任務拆分為多個同步子任務,并將每個子任務分發給同一個異步隊列 conf.queue_less_locking/conf.queue,工作子線程從隊列中獲取任務并執行。具體的子任務劃分包括以下策略:

開啟 less-locking 選項的非 InnoDB 表的處理。

先將所有 non_innodb_table 分為 num_threads 組,分組方式是遍歷這些表,依此將遍歷到的表加入到當前數據量最小的分組,盡量保證每個分組內的數據量相近。

上述得到的每個分組內會包含一個或多個非 InnoDB 表,如果配置了 rows-per-file 選項,會對每張表進行 chunks 估算,對于每一張表,如果估算結果包含多個 chunks,會將子任務進一步按照 chunks 進行拆分,分發 chunks 數量個子任務,如果沒有 chunks 劃分,分發為一個獨立的子任務。

注意,在該模式下,子任務會 發送到 queue_less_locking,并在編號為 num_threads ~ 2 * num_threads 的子線程中處理任務。

less_locking_threads 任務執行完成之后,主線程就會 UNLOCK TABLES / FTWRL / 釋放鎖,這樣有助于減少鎖持有的時間。主線程根據 conf.unlock_tables 來判斷非 InnoDB 表是否全部導出,普通工作線程 或者 queue_less_locking 工作線程每次處理完一個非 InnoDB 表任務都會根據 non_innodb_table_counternon_innodb_done 兩個變量判斷是否還有沒有導出結束的非 InnoDB 表,如果都已經導出結束,就會向異步隊列 conf.unlock_tables 中發送一條數據,表示可以解鎖全局鎖。

每個 less_locking_threads 處理非 InnoDB 表任務時,會先 加表鎖,導出數據,最后 解鎖表鎖。

未開啟 less-locking 選項的非 InnoDB 表的處理。

遍歷每一張非 InnoDB 表,同樣對每張表進行 chunks 估算,如果包含多個 chunks,按照 chunks 個數分發同樣的子任務數;如果沒有劃分 chunks,每張表分發一個子任務。所有的任務都分發到 conf->queue 隊列。

InnoDB 表的處理。

與未開啟 less-locking 選項的非 InnoDB 表的處理相同,同樣是 按照表分發子任務,如果有 chunks 子任務會進一步細分。

從上述的并發模型可以看出 mydumper 首先按照表進行同步任務拆分,對于同一張表,如果配置 rows-per-file 參數,會根據該參數和表行數將表劃分為合適的 chunks 數,這即是同一張表內部的并發。具體表行數的估算和 chunks 劃分的實現見 get_chunks_for_table 函數。

需要注意目前 DM 在任務配置中指定的庫表黑白名單功能只應用于 load 和 binlog replication 處理單元。如果在 dump 處理單元內使用庫表黑白名單功能,需要在同步任務配置文件的 dump 處理單元配置提供 extra-args 參數,并指定 mydumper 相關參數,包括 --database, --tables-list 和 --regex。mydumper 使用 regex 過濾庫表的實現參考 check_regex 函數。

load 處理單元

load 處理單元的代碼位于 github.com/pingcap/dm/loader 包內,該處理單元在 dump 處理單元運行結束后運行,讀取 dump 處理單元導出的 SQL 文件解析并在下游數據庫執行邏輯 SQL。我們重點分析 InitProcess 兩個 interface 的實現。

Init 實現細節

該階段進行一些初始化和清理操作,并不會開始同步任務,如果在該階段運行中出現錯誤,會通過 rollback 機制 清理資源,不需要調用 Close 函數。該階段包含的初始化操作包括以下幾點:

創建 checkpointcheckpoint 用于記錄全量數據的導入進度和 load 處理單元暫停或異常終止后,恢復或重新開始任務時可以從斷點處繼續導入數據。

應用任務配置的數據同步規則,包括以下規則:

初始化黑白名單

初始化表路有規則

初始化列值轉換規則

Process 實現細節

該階段的工作流程也很直觀,通過 一個收發數據類型為 *pb.ProcessErrorchannel 接收運行過程中出現的錯誤,出錯后通過 context 的 CancelFunc 強制結束處理單元運行。在核心的 數據導入函數 中,工作模型與 mydumper 類似,即在 主線程中分發任務,有多個工作線程執行具體的數據導入任務。具體的工作細節如下:

主線程會按照庫,表的順序讀取創建庫語句文件 -schema-create.sql 和建表語句文件 .-schema-create.sql,并在下游執行 SQL 創建相對應的庫和表。

主線程讀取 checkpoint 信息,結合數據文件信息創建 fileJob 隨機分發任務給一個工作子線程,fileJob 任務的結構如下所示 :

type fileJob struct {
   schema    string
   table     string
   dataFile  string
   offset    int64 // 表示讀取文件的起始 offset,如果沒有 checkpoint 斷點信息該值為 0
   info      *tableInfo // 保存原庫表,目標庫表,列名,insert 語句 column 名字列表等信息
}

在每個工作線程內部,有一個循環不斷從自己 fileJobQueue 獲取任務,每次獲取任務后會對文件進行解析,并將解析后的結果分批次打包為 SQL 語句分發給線程內部的另外一個工作協程,該工作協程負責處理 SQL 語句的執行。工作流程的偽代碼如下所示,完整的代碼參考 func (w *Worker) run()

// worker 工作線程內分發給內部工作協程的任務結構
type dataJob struct {
   sql         string // insert 語句, insert into  values (x, y, z), (x2, y2, z2), … (xn, yn, zn);
   schema      string // 目標數據庫
   file        string // SQL 文件名
   offset      int64 // 本次導入數據在 SQL 文件的偏移量
   lastOffset  int64 // 上一次已導入數據對應 SQL 文件偏移量
}

// SQL 語句執行協程
doJob := func() {
   for {
       select {
       case <-ctx.Done():
           return
       case job := <-jobQueue:
           sqls := []string{
               fmt.Sprintf("USE `%s`;", job.schema), // 指定插入數據的 schema
               job.sql,
               checkpoint.GenSQL(job.file, job.offset), // 更新 checkpoint 的 SQL 語句
           }
           executeSQLInOneTransaction(sqls) // 在一個事務中執行上述 3 條 SQL 語句
       }
   }
}
?
// worker 主線程
for {
   select {
   case <-ctx.Done():
       return
   case job := <-fileJobQueue:
       go doJob()
       readDataFileAndDispatchSQLJobs(ctx, dir, job.dataFile, job.offset, job.info)
   }
}

dispatchSQL 函數負責在工作線程內部讀取 SQL 文件和重寫 SQL,該函數會在運行初始階段 創建所操作表的 checkpoint 信息,需要注意在任務中斷恢復之后,如果這個文件的導入還沒有完成,checkpoint.Init 仍然會執行,但是這次運行不會更新該文件的 checkpoint 信息。列值轉換和庫表路由也是在這個階段內完成。

列值轉換:需要對輸入 SQL 進行解析拆分為每一個 field,對需要轉換的 field 進行轉換操作,然后重新拼接起 SQL 語句。詳細重寫流程見 reassemble 函數。

庫表路由:這種場景下只需要 替換源表到目標表 即可。

在工作線程執行一個批次的 SQL 語句之前,會首先根據文件 offset 信息生成一條更新 checkpoint 的語句,加入到打包的 SQL 語句中,具體執行時這些語句會 在一個事務中提交,這樣就保證了斷點信息的準確性,如果導入過程暫停或中斷,恢復任務后從斷點重新同步可以保證數據一致。

小結

本篇詳細介紹 dump 和 load 兩個數據同步處理單元的設計實現,對核心 interface 實現、數據導入并發模型、數據導入暫停或中斷的恢復進行了分析。接下來的文章會繼續介紹 binlog replicationrelay log 兩個數據同步處理單元的實現。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/18003.html

相關文章

  • DM 源碼閱讀系列文章(三)數據同步處理單元介紹

    摘要:實際上中的數據同步處理單元分為兩類全局共享單例。獨享數據同步處理單元使用邏輯相關代碼在。數據同步處理單元運行狀態監控。后續會分三篇文章詳細地介紹數據同步處理單元的實現,包括全量同步實現增量同步實現實現 作者:lan 本文為 DM 源碼閱讀系列文章的第三篇,上篇文章 介紹了 DM 的整體架構,DM 組件 DM-master 和 DM-worker 的入口代碼,以及兩者之間的數據交互模型。...

    Forelax 評論0 收藏0
  • DM 源碼閱讀系列文章(一)序

    摘要:內容概要源碼閱讀系列將會從兩條線進行展開,一條是圍繞的系統架構和重要模塊進行分析,另一條線圍繞內部的同步機制展開分析。更多的代碼閱讀內容會在后面的章節中逐步展開,敬請期待。 作者:楊非 前言 TiDB-DM 是由 PingCAP 開發的一體化數據同步任務管理平臺,支持從 MySQL 或 MariaDB 到 TiDB 的全量數據遷移和增量數據同步,在 TiDB DevCon 2019 正...

    Mr_houzi 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看

            <