背景
我司购买了一个超大数据库:
SELECT
table_schema AS database_name,
table_name,
table_rows AS approx_rows
FROM information_schema.tables
WHERE table_schema = 'xx' -- 替换为你的数据库名
AND engine = 'InnoDB' -- 可选:只查 InnoDB 表
ORDER BY table_rows DESC;

为了能够顺利将mysql 数据dump到clickhouse中进行分析,我计划按照如下的步骤去进行:
(1)调查mysql 到clickhouse的方式。
(2)导入数据到clickhouse。
(3)增量数据导入。
mysql到clickhouse的方式
方式1 clickhouse mysql engine
在 https://clickhouse.com/docs/engines/database-engines/mysql 这里可以通过如下语句:
CREATE DATABASE mysql_db ENGINE = MySQL('localhost:3306', 'test', 'my_user', 'user_password') SETTINGS read_write_timeout=10000, connect_timeout=100;
但是这个本质还是将所有读写发到远端执行。 无法达到快速分析和你用CLICKHOUSE完整OLAP能力的要求。只支持UPDATE和SELECT。
方式2 clickhouse materialized view
之前clickhouse在22版本的时候有通过MaterializedMySQL来讲数据直接复制到clickhouse中:参考链接但是在新版本中被移除了PR:
22版可以用如下方式:
set allow_experimental_database_materialized_mysql = 1;
CREATE DATABASE tableXXX on cluster 'all-nodes' ENGINE = MaterializedMySQL(
'mysql:3306', 'dbxxxx', 'username', 'pass')
settings
materialized_mysql_tables_list = 'interface_access_log,enterprise_info'
TABLE OVERRIDE interface_access_log (
PARTITION BY toYYYYMM(gmt_create)
ORDER BY (gmt_create, id)
)
该方式依赖每个表必须有明确主键。
方式3 将数据导出后 恢复到clickhouse并增量同步
本文计划采用的方式。在调查mysql全量导出的过程中,我也看过相关的导出工具 发现,mysqldump还是太慢了且文件太大,对于上面的数据量。mysqldump的示例:
mysqldump \
--single-transaction \
--master-data=2 \
--routines \
--triggers \
--events \
--hex-blob \
--default-character-set=utf8mb4 \
--host=127.0.0.1 \
--port=3306 \
--user=backup_user \
--password='your_password' \
your_database_name \
> /backup/your_database_$(date +%Y%m%d).sql
--single-transaction保证备份期间视图一致性,且不阻塞正常的CRUD,但是会阻塞DDL(alter table 之类的)。--master-data=2记录dump是的binlog和pos。
这个缺点就是太慢了,且生成文件巨大没法简单分析和导入,所以调查了下mydumper这个工具。
mydumper
mydumper项目地址:https://github.com/mydumper/mydumper。 我主要关注他如何高性能且一致性的导出备份。 所以查看了相关实现,发现他主要通过如下方式实现一致性:
mydumper 在 FTWRL 下保证一致性的典型流程(核心机制)
主控制连接获取全局读锁(FTWRL)
让所有表进入“读锁”状态:阻止新的写入,并等待正在进行的写入结束(达到一个全库静止点)。
这一刻可以认为数据库处于一个确定的、可描述的时间点。
记录复制/增量所需的位置点(metadata)
在读锁还持有时,mydumper 会读取并写出 metadata(常见包含):
binlog file/position(以及可能的 GTID) 这保证“这份 dump 对应主库的哪个位置点”是准确的。
所有 worker 线程在屏障(barrier)下建立“同一时间点快照”
每个线程通常用独立连接去读各自负责的表/分片。
在 FTWRL 仍然持有时,mydumper 会让这些连接几乎同时执行一致性读相关设置并开启事务快照(典型是 REPEATABLE READ + 一致性快照 语义)。
因为此时写入被阻塞,所以这些事务拿到的 read view 等价于同一个时间点。
释放 FTWRL,全库恢复可写;dump 线程继续并发读取
锁释放后,业务写入可以继续。
但每个线程都在自己的事务快照里读数据:
InnoDB 的 MVCC 保证它们看到的仍是“锁释放那一刻”的版本(后续提交的新版本对这些事务不可见)。
java伪代码:
Java 伪代码:FTWRL + 多线程一致性快照(barrier 同步)
// Pseudo-code (Java-like), illustrating mydumper's idea:
// 1) Hold FTWRL briefly to freeze writes
// 2) Record binlog/gtid position under lock
// 3) Let all worker connections START consistent snapshots at the same point
// 4) Release FTWRL, workers dump concurrently using their own snapshot
class DumpCoordinator {
String host;
int port;
String user;
String password;
String database;
int threads;
long chunkSizeBytes;
void runDump() throws Exception {
Connection ctrl = openConnection(); // control connection
ctrl.setAutoCommit(true);
// Barrier to ensure all workers have created snapshot before unlocking
CyclicBarrier snapshotBarrier = new CyclicBarrier(threads + 1);
ExecutorService pool = Executors.newFixedThreadPool(threads);
List<TableTask> tasks = planTableAndChunkTasks(database, chunkSizeBytes);
// Start workers first (they will wait until coordinator says "snapshot now")
for (int i = 0; i < threads; i++) {
pool.submit(new DumpWorker(i, snapshotBarrier, tasks));
}
// 1) Acquire global read lock (FTWRL)
exec(ctrl, "FLUSH TABLES WITH READ LOCK");
// 2) Read metadata (binlog pos / gtid) while lock is held
BinlogPoint p = readBinlogPoint(ctrl); // e.g. SHOW MASTER STATUS / SHOW BINARY LOG STATUS
writeMetadataFile(p);
// 3) Tell workers to create consistent snapshot NOW (while FTWRL is still held)
// coordinator arrives at barrier; workers also arrive after START TRANSACTION WITH CONSISTENT SNAPSHOT
snapshotBarrier.await(); // releases only when all workers + coordinator reach it
// 4) Release lock quickly so production can continue
exec(ctrl, "UNLOCK TABLES");
pool.shutdown();
pool.awaitTermination(24, TimeUnit.HOURS);
ctrl.close();
}
Connection openConnection() { /* DriverManager.getConnection(...) */ return null; }
void exec(Connection c, String sql) { /* execute sql */ }
BinlogPoint readBinlogPoint(Connection c) { /* query master status */ return null; }
void writeMetadataFile(BinlogPoint p) { /* write metadata */ }
List<TableTask> planTableAndChunkTasks(String db, long chunkBytes) {
// Inspect table sizes / PK ranges, split into chunks:
// - small tables: one task
// - large tables: multiple chunk tasks (pk ranges)
return new ArrayList<>();
}
}
class DumpWorker implements Runnable {
int workerId;
CyclicBarrier snapshotBarrier;
List<TableTask> sharedTasks;
DumpWorker(int workerId, CyclicBarrier barrier, List<TableTask> tasks) {
this.workerId = workerId;
this.snapshotBarrier = barrier;
this.sharedTasks = tasks;
}
@Override
public void run() {
Connection conn = openConnection();
conn.setAutoCommit(false);
// Ensure snapshot semantics (InnoDB MVCC)
exec(conn, "SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ");
// Create a consistent snapshot while FTWRL is held
exec(conn, "START TRANSACTION WITH CONSISTENT SNAPSHOT");
// Signal "snapshot ready"
await(snapshotBarrier);
// After this point coordinator may UNLOCK TABLES;
// this worker keeps reading the SAME snapshot via MVCC.
while (true) {
TableTask task = pollNextTask(sharedTasks);
if (task == null) break;
dumpTableOrChunk(conn, task);
}
exec(conn, "COMMIT");
closeQuietly(conn);
}
Connection openConnection() { return null; }
void exec(Connection c, String sql) { /* execute sql */ }
void await(CyclicBarrier b) { /* b.await() */ }
TableTask pollNextTask(List<TableTask> tasks) {
// synchronized(tasks) { pop next }
return null;
}
void dumpTableOrChunk(Connection conn, TableTask task) {
// Example chunk query:
// SELECT * FROM db.table WHERE pk >= ? AND pk < ? ORDER BY pk;
// Stream rows -> write file part
}
void closeQuietly(Connection c) {}
}
class TableTask {
String table;
boolean isChunk;
long pkStartInclusive;
long pkEndExclusive;
}
class BinlogPoint {
String binlogFile;
long binlogPos;
String gtidSet; // optional
}
设计还是非常巧妙的,总结起来就是:结合了MVCC + 任务窃取 + Cyclicbarrier 提前连接降低持有锁时长。
这是使用的示例命令:
--compress 可以压缩生成zst文件
mydumper --sync-thread-lock-mode=FTWRL --port=3306 --host=192.168.102. --user=root --password='xx' --database=xx --outputdir=/root/testdump --threads=8 --chunk-filesize=128 --verbose=3
生成的文件中:metadata记录了相关表的文件名—》数据文件的映射 和相关binlog信息:
# Started dump at: 2026-01-09 14:17:57
[config]
quote-character = BACKTICK
[myloader_session_variables]
SQL_MODE='NO_AUTO_VALUE_ON_ZERO,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION' /*!40101
[source]
# Channel_Name = '' # It can be use to setup replication FOR CHANNEL
# executed_gtid_set = "0-1099-53297657"
# SOURCE_LOG_FILE = "master-bin.000080"
# SOURCE_LOG_POS = 761443754
[`dbxxx`.`admin_punish_credit`]
real_table_name=admin_punish_credit
rows = 26
[`dbxxx`.`admin_punish`]
real_table_name=admin_punish
rows = 23
....
[`dbxxx`.`mydumper_7`]
real_table_name=费用信息
rows = 10000
[config]
max-statement-size = 999998
num-sequences = 0
# Finished dump at: 2026-01-09 14:18:05
下一步
下一步就是:解析生成相关的Clickhouse table 和 基于Canal的增量复制。