大数据迁移工具对比:从 Sqoop 到自研,万亿级迁移的选型逻辑
大数据迁移工具对比:从 Sqoop 到自研,万亿级迁移的选型逻辑
一、数据迁移的工程挑战:不是"搬数据"那么简单
大数据迁移的表面需求是将数据从 A 系统搬到 B 系统,但工程上的挑战远不止于此。万亿级数据的迁移需要考虑:全量迁移的窗口期(业务停机时间)、增量同步的延迟(CDC 方案的可靠性)、数据一致性校验(源和目标的行数/校验和匹配)、迁移失败的重试和回滚、迁移期间的双写一致性。
更深层的问题是,不同数据源和目标的迁移需求差异巨大:MySQL → ClickHouse 是 OLTP 到 OLAP 的迁移,HDFS → 对象存储是冷数据归档,Oracle → PostgreSQL 是异构数据库迁移。每种场景需要不同的工具和策略。
二、迁移工具对比:Sqoop、DataX、Flink CDC 与自研
主流迁移工具按架构分为四类:基于 MapReduce 的批处理工具(Sqoop)、基于多线程的批处理工具(DataX)、基于 CDC 的流式工具(Flink CDC)、自研迁移框架。每类工具有不同的适用场景。
flowchart TB A[数据迁移需求] --> B{迁移类型} B -->|全量批迁移| C{数据量} B -->|增量实时同步| D{延迟要求} C -->|< 1TB| E[DataX<br/>多线程并行] C -->|1TB-100TB| F[Sqoop<br/>MapReduce 分布式] C -->|> 100TB| G[Spark 批处理<br/>弹性扩缩容] D -->|秒级| H[Flink CDC<br/>流式同步] D -->|分钟级| I[Canal + MQ<br/>异步同步] D -->|小时级| J[定时批同步<br/>简单可靠] subgraph 自研场景 K[异构数据库<br/>类型映射复杂] L[双写一致性<br/>需要事务协调] M[增量+全量<br/>无缝切换] end K --> N[自研迁移框架] L --> N M --> N选型逻辑:全量迁移优先考虑 DataX(中小规模)和 Spark(大规模),增量同步优先考虑 Flink CDC(低延迟)和 Canal(中等延迟),异构迁移和双写场景考虑自研。
三、生产级代码实现:DataX 配置与 Flink CDC 同步
3.1 DataX 全量迁移配置
{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "${MYSQL_USER}", "password": "${MYSQL_PASSWORD}", "connection": [ { "jdbcUrl": ["jdbc:mysql://source:3306/db"], "query": "SELECT id, name, amount, created_at FROM orders WHERE created_at < '2024-01-01'" } ], "column": ["id", "name", "amount", "created_at"], "splitPk": "id", "fetchSize": 1024 } }, "writer": { "name": "clickhousewriter", "parameter": { "username": "${CH_USER}", "password": "${CH_PASSWORD}", "connection": [ { "jdbcUrl": "jdbc:clickhouse://target:8123/db", "table": ["orders"] } ], "column": ["id", "name", "amount", "created_at"], "batchSize": 50000, "dryRun": false } } } ], "setting": { "speed": { "channel": 8, "record": 100000, "byte": 10485760 }, "errorLimit": { "record": 100, "percentage": 0.01 } } } }3.2 Flink CDC 增量同步
// Flink CDC: MySQL → ClickHouse 实时同步 public class MysqlToClickHouseCDC { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 开启 Checkpoint,保证 Exactly-Once // 为什么开启 Checkpoint:CDC 同步需要保证 // 数据不丢不重;Checkpoint 记录消费位点, // 故障恢复后从上次提交的位点继续消费 env.enableCheckpointing(60000); env.getCheckpointConfig() .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig() .setMinPauseBetweenCheckpoints(30000); // MySQL CDC Source MySqlSource<String> source = MySqlSource.<String>builder() .hostname("mysql-source") .port(3306) .databaseList("db") .tableList("db.orders") .username("${MYSQL_USER}") .password("${MYSQL_PASSWORD}") .serverTimeZone("Asia/Shanghai") .deserializer(new JsonDebeziumDeserializationSchema()) .startupOptions(StartupOptions.initial()) // initial: 先做全量快照,再切换到增量 // 为什么用 initial 而非 latest: // initial 确保不遗漏历史数据; // latest 只消费增量,适合源表已有 // 全量数据的场景 .build(); DataStream<String> stream = env .fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source"); // 解析 CDC 事件并写入 ClickHouse stream.process(new CDCEventParser()) .addSink(new ClickHouseBulkSink()); env.execute("MySQL to ClickHouse CDC"); } } class CDCEventParser extends ProcessFunction<String, CdcRecord> { @Override public void processElement(String value, Context ctx, Collector<CdcRecord> out) { try { JSONObject event = JSON.parseObject(value); String operation = event.getJSONObject("payload") .getString("op"); // c=create, u=update, d=delete, r=snapshot JSONObject after = event.getJSONObject("payload") .getJSONObject("after"); JSONObject before = event.getJSONObject("payload") .getJSONObject("before"); CdcRecord record = new CdcRecord(); record.operation = operation; record.table = event.getJSONObject("payload") .getString("source.table"); switch (operation) { case "c", "r" -> record.data = after; case "u" -> { record.data = after; record.beforeData = before; } case "d" -> record.data = before; } out.collect(record); } catch (Exception e) { // 解析失败的记录写入死信队列 // 为什么不直接丢弃:丢弃会导致数据不一致; // 死信队列保留原始数据,人工排查后重放 ctx.output(DEAD_LETTER_TAG, value); } } }3.3 数据一致性校验
class DataConsistencyChecker: """数据一致性校验器""" def check_row_count(self, source_db, target_db, table: str) -> dict: """校验行数一致性""" source_count = self._get_count(source_db, table) target_count = self._get_count(target_db, table) diff = source_count - target_count diff_rate = diff / source_count if source_count > 0 else 0 return { "table": table, "source_count": source_count, "target_count": target_count, "diff": diff, "diff_rate": f"{diff_rate:.4%}", "consistent": diff == 0, } def check_checksum(self, source_db, target_db, table: str, columns: List[str]) -> dict: """校验数据校验和""" # 对比源和目标的 CRC32 校验和 # 为什么用校验和而非逐行对比:万亿级数据 # 逐行对比不现实;校验和可以在 SQL 层计算, # 只传输一个数值 source_checksum = self._compute_checksum( source_db, table, columns) target_checksum = self._compute_checksum( target_db, table, columns) return { "table": table, "source_checksum": source_checksum, "target_checksum": target_checksum, "consistent": source_checksum == target_checksum, } def _compute_checksum(self, db, table, columns): """计算表的 CRC32 校验和""" col_expr = ", ".join(columns) cursor = db.cursor() cursor.execute( f"SELECT CRC32(GROUP_CONCAT({col_expr} ORDER BY id)) " f"FROM {table}") result = cursor.fetchone() return result[0] if result else None四、数据迁移的架构权衡:速度、一致性与停机时间
全量+增量的无缝切换:全量迁移期间源表持续写入新数据,全量完成后需要同步增量。切换点的一致性是关键——Flink CDC 的initial模式自动处理切换,自研方案需要记录全量开始时的 Binlog 位点,全量完成后从该位点开始消费增量。
双写一致性:迁移期间可能需要双写(源和目标同时写入)。双写的一致性保证需要分布式事务或幂等写入。建议在迁移完成前只读源表,迁移完成后切换到目标表,避免双写。
迁移速度与源库压力:迁移速度越快,停机窗口越短,但对源库的压力越大。建议在业务低峰期执行全量迁移,并限制迁移任务的并发度和带宽。DataX 的channel参数和 Flink 的parallelism参数控制并发度。
回滚方案的设计:迁移失败时需要回滚到源系统。回滚方案取决于迁移阶段——全量迁移失败可以清空目标表重来,增量同步失败需要从断点继续。建议在迁移前备份目标表的现有数据(如果有),并记录迁移的进度位点。
五、总结
大数据迁移的选型应基于数据量、延迟要求和异构程度。中小规模全量迁移用 DataX,大规模用 Spark,低延迟增量用 Flink CDC,异构场景考虑自研。全量+增量的无缝切换是迁移成功的关键,Flink CDC 的 initial 模式是当前最成熟的方案。数据一致性校验必须覆盖行数和校验和两个维度,校验失败时需要定位差异行并修复。
