Mydumper一致性数据dump

背景

我司购买了一个超大数据库:

SELECT
    table_schema AS database_name,
    table_name,
    table_rows AS approx_rows
FROM information_schema.tables
WHERE table_schema = 'xx'   -- 替换为你的数据库名
  AND engine = 'InnoDB'            -- 可选:只查 InnoDB 表
ORDER BY table_rows DESC;

在这里插入图片描述

为了能够顺利将mysql 数据dump到clickhouse中进行分析,我计划按照如下的步骤去进行:
(1)调查mysql 到clickhouse的方式。
(2)导入数据到clickhouse。
(3)增量数据导入。

mysql到clickhouse的方式

方式1 clickhouse mysql engine

在 https://clickhouse.com/docs/engines/database-engines/mysql 这里可以通过如下语句:

CREATE DATABASE mysql_db ENGINE = MySQL('localhost:3306', 'test', 'my_user', 'user_password') SETTINGS read_write_timeout=10000, connect_timeout=100;

但是这个本质还是将所有读写发到远端执行。 无法达到快速分析和你用CLICKHOUSE完整OLAP能力的要求。只支持UPDATE和SELECT。

方式2 clickhouse materialized view

之前clickhouse在22版本的时候有通过MaterializedMySQL来讲数据直接复制到clickhouse中:参考链接但是在新版本中被移除了PR

22版可以用如下方式:

set allow_experimental_database_materialized_mysql = 1;
CREATE DATABASE tableXXX on cluster 'all-nodes' ENGINE = MaterializedMySQL(
'mysql:3306', 'dbxxxx', 'username', 'pass')
settings 
materialized_mysql_tables_list = 'interface_access_log,enterprise_info'
TABLE OVERRIDE interface_access_log (
    PARTITION BY  toYYYYMM(gmt_create)
    ORDER BY (gmt_create, id)
)

该方式依赖每个表必须有明确主键。

方式3 将数据导出后 恢复到clickhouse并增量同步

本文计划采用的方式。在调查mysql全量导出的过程中,我也看过相关的导出工具 发现,mysqldump还是太慢了且文件太大,对于上面的数据量。mysqldump的示例:

mysqldump \
  --single-transaction \
  --master-data=2 \
  --routines \
  --triggers \
  --events \
  --hex-blob \
  --default-character-set=utf8mb4 \
  --host=127.0.0.1 \
  --port=3306 \
  --user=backup_user \
  --password='your_password' \
  your_database_name \
  > /backup/your_database_$(date +%Y%m%d).sql

--single-transaction保证备份期间视图一致性,且不阻塞正常的CRUD,但是会阻塞DDL(alter table 之类的)。--master-data=2记录dump是的binlog和pos。
这个缺点就是太慢了,且生成文件巨大没法简单分析和导入,所以调查了下mydumper这个工具。

mydumper

mydumper项目地址:https://github.com/mydumper/mydumper。 我主要关注他如何高性能且一致性的导出备份。 所以查看了相关实现,发现他主要通过如下方式实现一致性:
mydumper 在 FTWRL 下保证一致性的典型流程(核心机制)

主控制连接获取全局读锁(FTWRL)
    让所有表进入“读锁”状态:阻止新的写入,并等待正在进行的写入结束(达到一个全库静止点)。
    这一刻可以认为数据库处于一个确定的、可描述的时间点。

记录复制/增量所需的位置点(metadata)
在读锁还持有时,mydumper 会读取并写出 metadata(常见包含):
    binlog file/position(以及可能的 GTID) 这保证“这份 dump 对应主库的哪个位置点”是准确的。

所有 worker 线程在屏障(barrier)下建立“同一时间点快照”
    每个线程通常用独立连接去读各自负责的表/分片。
    在 FTWRL 仍然持有时,mydumper 会让这些连接几乎同时执行一致性读相关设置并开启事务快照(典型是 REPEATABLE READ + 一致性快照 语义)。
    因为此时写入被阻塞,所以这些事务拿到的 read view 等价于同一个时间点。

释放 FTWRL,全库恢复可写;dump 线程继续并发读取
    锁释放后,业务写入可以继续。
    但每个线程都在自己的事务快照里读数据:
    InnoDB 的 MVCC 保证它们看到的仍是“锁释放那一刻”的版本(后续提交的新版本对这些事务不可见)。

java伪代码:

Java 伪代码:FTWRL + 多线程一致性快照(barrier 同步)

// Pseudo-code (Java-like), illustrating mydumper's idea:
// 1) Hold FTWRL briefly to freeze writes
// 2) Record binlog/gtid position under lock
// 3) Let all worker connections START consistent snapshots at the same point
// 4) Release FTWRL, workers dump concurrently using their own snapshot
 
class DumpCoordinator {
 
  String host;
  int port;
  String user;
  String password;
  String database;
  int threads;
  long chunkSizeBytes;
 
  void runDump() throws Exception {
    Connection ctrl = openConnection();   // control connection
    ctrl.setAutoCommit(true);
 
    // Barrier to ensure all workers have created snapshot before unlocking
    CyclicBarrier snapshotBarrier = new CyclicBarrier(threads + 1);
 
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<TableTask> tasks = planTableAndChunkTasks(database, chunkSizeBytes);
 
    // Start workers first (they will wait until coordinator says "snapshot now")
    for (int i = 0; i < threads; i++) {
      pool.submit(new DumpWorker(i, snapshotBarrier, tasks));
    }
 
    // 1) Acquire global read lock (FTWRL)
    exec(ctrl, "FLUSH TABLES WITH READ LOCK");
 
    // 2) Read metadata (binlog pos / gtid) while lock is held
    BinlogPoint p = readBinlogPoint(ctrl);     // e.g. SHOW MASTER STATUS / SHOW BINARY LOG STATUS
    writeMetadataFile(p);
 
    // 3) Tell workers to create consistent snapshot NOW (while FTWRL is still held)
    // coordinator arrives at barrier; workers also arrive after START TRANSACTION WITH CONSISTENT SNAPSHOT
    snapshotBarrier.await(); // releases only when all workers + coordinator reach it
 
    // 4) Release lock quickly so production can continue
    exec(ctrl, "UNLOCK TABLES");
 
    pool.shutdown();
    pool.awaitTermination(24, TimeUnit.HOURS);
    ctrl.close();
  }
 
  Connection openConnection() { /* DriverManager.getConnection(...) */ return null; }
 
  void exec(Connection c, String sql) { /* execute sql */ }
 
  BinlogPoint readBinlogPoint(Connection c) { /* query master status */ return null; }
 
  void writeMetadataFile(BinlogPoint p) { /* write metadata */ }
 
  List<TableTask> planTableAndChunkTasks(String db, long chunkBytes) {
    // Inspect table sizes / PK ranges, split into chunks:
    // - small tables: one task
    // - large tables: multiple chunk tasks (pk ranges)
    return new ArrayList<>();
  }
}
 
class DumpWorker implements Runnable {
  int workerId;
  CyclicBarrier snapshotBarrier;
  List<TableTask> sharedTasks;
 
  DumpWorker(int workerId, CyclicBarrier barrier, List<TableTask> tasks) {
    this.workerId = workerId;
    this.snapshotBarrier = barrier;
    this.sharedTasks = tasks;
  }
 
  @Override
  public void run() {
    Connection conn = openConnection();
    conn.setAutoCommit(false);
 
    // Ensure snapshot semantics (InnoDB MVCC)
    exec(conn, "SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ");
 
    // Create a consistent snapshot while FTWRL is held
    exec(conn, "START TRANSACTION WITH CONSISTENT SNAPSHOT");
 
    // Signal "snapshot ready"
    await(snapshotBarrier);
 
    // After this point coordinator may UNLOCK TABLES;
    // this worker keeps reading the SAME snapshot via MVCC.
    while (true) {
      TableTask task = pollNextTask(sharedTasks);
      if (task == null) break;
 
      dumpTableOrChunk(conn, task);
    }
 
    exec(conn, "COMMIT");
    closeQuietly(conn);
  }
 
  Connection openConnection() { return null; }
 
  void exec(Connection c, String sql) { /* execute sql */ }
 
  void await(CyclicBarrier b) { /* b.await() */ }
 
  TableTask pollNextTask(List<TableTask> tasks) {
    // synchronized(tasks) { pop next }
    return null;
  }
 
  void dumpTableOrChunk(Connection conn, TableTask task) {
    // Example chunk query:
    // SELECT * FROM db.table WHERE pk >= ? AND pk < ? ORDER BY pk;
    // Stream rows -> write file part
  }
 
  void closeQuietly(Connection c) {}
}
 
class TableTask {
  String table;
  boolean isChunk;
  long pkStartInclusive;
  long pkEndExclusive;
}
 
class BinlogPoint {
  String binlogFile;
  long binlogPos;
  String gtidSet; // optional
}

设计还是非常巧妙的,总结起来就是:结合了MVCC + 任务窃取 + Cyclicbarrier 提前连接降低持有锁时长。

这是使用的示例命令:

--compress 可以压缩生成zst文件
mydumper --sync-thread-lock-mode=FTWRL --port=3306 --host=192.168.102. --user=root --password='xx' --database=xx --outputdir=/root/testdump --threads=8 --chunk-filesize=128 --verbose=3

生成的文件中:metadata记录了相关表的文件名—》数据文件的映射 和相关binlog信息:

# Started dump at: 2026-01-09 14:17:57
[config]
quote-character = BACKTICK

[myloader_session_variables]
SQL_MODE='NO_AUTO_VALUE_ON_ZERO,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION' /*!40101

[source]
# Channel_Name = '' # It can be use to setup replication FOR CHANNEL
# executed_gtid_set = "0-1099-53297657"
# SOURCE_LOG_FILE = "master-bin.000080"
# SOURCE_LOG_POS = 761443754

[`dbxxx`.`admin_punish_credit`]
real_table_name=admin_punish_credit
rows = 26

[`dbxxx`.`admin_punish`]
real_table_name=admin_punish
rows = 23
....

[`dbxxx`.`mydumper_7`]
real_table_name=费用信息
rows = 10000
[config]
max-statement-size = 999998
num-sequences = 0
# Finished dump at: 2026-01-09 14:18:05

下一步

下一步就是:解析生成相关的Clickhouse table 和 基于Canal的增量复制。