Flink系列之:Flink CDC深入了解MySQL CDC连接器

article/2025/8/22 7:53:45

Flink系列之:Flink CDC深入了解MySQL CDC连接器

  • 一、增量快照特性
    • 1.增量快照读取
    • 2.并发读取
    • 3.全量阶段支持 checkpoint
    • 4.无锁算法
    • 5.MySQL高可用性支持
  • 二、增量快照读取的工作原理
  • 三、全量阶段分片算法
  • 四、Chunk 读取算法
  • 五、Exactly-Once 处理
  • 六、MySQL心跳事件支持
  • 七、启动模式
  • 八、DataStream Source
  • 九、动态加表
  • 十、数据类型映射

一、增量快照特性

1.增量快照读取

增量快照读取是一种读取表快照的新机制。与旧的快照机制相比,增量快照具有许多优点,包括:

  • (1)在快照读取期间,Source 支持并发读取

  • (2)在快照读取期间,Source 支持进行 chunk 粒度的 checkpoint

  • (3)在快照读取之前,Source 不需要数据库锁权限。

  • 如果希望 source 并行运行,则每个并行 reader 都应该具有唯一的 server id,因此server id的范围必须类似于 5400-6400, 且范围必须大于并行度。

  • 在增量快照读取过程中,MySQL CDC Source 首先通过表的主键将表划分成多个块(chunk), 然后 MySQL CDC Source 将多个块分配给多个 reader 以并行读取表的数据。

  • 为每个 Reader 设置不同的 Server id

  • 每个用于读取 binlog 的 MySQL 数据库客户端都应该有一个唯一的 id,称为 Server id。

  • MySQL 服务器将使用此 id 来维护网络连接和 binlog 位置。 因此,如果不同的作业共享相同的 Server id, 则可能导致从错误的 binlog 位置读取数据。

  • 因此,建议通过为每个 Reader 设置不同的 Server id SQL Hints, 假设 Source 并行度为 4, 我们可以使用 SELECT * FROM source_table /*+ OPTIONS(‘server-id’=‘5401-5404’) */ ; 来为 4 个 Source readers 中的每一个分配唯一的 Server id。

2.并发读取

增量快照读取提供了并行读取快照数据的能力。 你可以通过设置作业并行度的方式来控制 Source 的并行度 parallelism.default。

Flink SQL> SET 'parallelism.default' = 8;

3.全量阶段支持 checkpoint

增量快照读取提供了在区块级别执行检查点的能力。它使用新的快照读取机制解决了以前版本中的检查点超时问题。

4.无锁算法

MySQL CDC source 使用 增量快照算法, 避免了数据库锁的使用,因此不需要 “RELOAD” 权限。

5.MySQL高可用性支持

mysql cdc 连接器通过使用 GTID 提供 MySQL 高可用集群的高可用性信息。为了获得高可用性, MySQL集群需要启用 GTID 模式,MySQL 配置文件中的 GTID 模式应该包含以下设置

gtid_mode = on
enforce_gtid_consistency = on

如果监控的MySQL服务器地址包含从实例,则需要对MySQL配置文件设置以下设置。设置 log slave updates=1 允许从实例也将从主实例同步的数据写入其binlog, 这确保了mysql cdc连接器可以使用从实例中的全部数据。

gtid_mode = on
enforce_gtid_consistency = on
log-slave-updates = 1

MySQL 集群中你监控的服务器出现故障后, 你只需将受监视的服务器地址更改为其他可用服务器,然后从最新的检查点/保存点重新启动作业, 作业将从 checkpoint/savepoint 恢复,不会丢失任何记录。

建议为 MySQL 集群配置 DNS(域名服务)或 VIP(虚拟 IP 地址), 使用mysql cdc连接器的 DNS 或 VIP 地址, DNS或VIP将自动将网络请求路由到活动MySQL服务器。 这样,你就不再需要修改地址和重新启动管道。

二、增量快照读取的工作原理

  • 当 MySQL CDC Source 启动时,它并行读取表的快照,然后以单并行度的方式读取表的 binlog。
  • 在快照阶段,根据表的主键和表行的大小将快照切割成多个快照块。 快照块被分配给多个快照读取器。每个快照读取器使用 区块读取算法 并将读取的数据发送到下游。 Source 会管理块的进程状态(完成或未完成),因此快照阶段的 Source 可以支持块级别的 checkpoint。 如果发生故障,可以恢复 Source 并继续从最后完成的块中读取块。
  • 所有快照块完成后,Source 将继续在单个任务中读取 binlog。 为了保证快照记录和 binlog 记录的全局数据顺序,binlog reader 将开始读取数据直到快照块完成后并有一个完整的 checkpoint,以确保所有快照数据已被下游消费。 binlog reader 在状态中跟踪所使用的 binlog 位置,因此 binlog 阶段的 Source 可以支持行级别的 checkpoint。
  • Flink 定期为 Source 执行 checkpoint,在故障转移的情况下,作业将重新启动并从最后一个成功的 checkpoint 状态恢复,并保证只执行一次语义

三、全量阶段分片算法

  • 在执行增量快照读取时,MySQL CDC source 需要一个用于分片的的算法。 MySQL CDC Source 使用主键列将表划分为多个分片(chunk)。 默认情况下,MySQL CDC source 会识别表的主键列,并使用主键中的第一列作为用作分片列。 如果表中没有主键, 增量快照读取将失败,你可以禁用 scan.incremental.snapshot.enabled 来回退到旧的快照读取机制。

对于数值和自动增量拆分列,MySQL CDC Source 按固定步长高效地拆分块。 例如,如果你有一个主键列为id的表,它是自动增量 BIGINT 类型,最小值为0,最大值为100, 和表选项 scan.incremental.snapshot.chunk.size 大小 value为25,表将被拆分为以下块:

(-, 25),[25, 50),[50, 75),[75, 100),[100, +)

对于其他主键列类型, MySQL CDC Source 将以下形式执行语句: SELECT MAX(STR_ID) AS chunk_high FROM (SELECT * FROM TestTable WHERE STR_ID > ‘uuid-001’ limit 25) 来获得每个区块的低值和高值, 分割块集如下所示:

(-, 'uuid-001'),
['uuid-001', 'uuid-009'),
['uuid-009', 'uuid-abc'),
['uuid-abc', 'uuid-def'),
[uuid-def, +).

四、Chunk 读取算法

对于上面的示例MyTable,如果 MySQL CDC Source 并行度设置为 4,MySQL CDC Source 将在每一个 executes 运行 4 个 Readers 通过偏移信号算法 获取快照区块的最终一致输出。 偏移信号算法简单描述如下:

  • (1) 将当前 binlog 位置记录为LOW偏移量
  • (2) 通过执行语句读取并缓冲快照区块记录 SELECT * FROM MyTable WHERE id > chunk_low AND id <= chunk_high
  • (3) 将当前 binlog 位置记录为HIGH偏移量
  • (4) 从LOW偏移量到HIGH偏移量读取属于快照区块的 binlog 记录
  • (5) 将读取的 binlog 记录向上插入缓冲区块记录,并发出缓冲区中的所有记录作为快照区块的最终输出(全部作为插入记录)
  • (6) 继续读取并发出属于 单个 binlog reader 中HIGH偏移量之后的区块的 binlog 记录。

注意: 如果主键的实际值在其范围内分布不均匀,则在增量快照读取时可能会导致任务不平衡。

五、Exactly-Once 处理

  • MySQL CDC 连接器是一个 Flink Source 连接器,它将首先读取表快照块,然后继续读取 binlog, 无论是在快照阶段还是读取 binlog 阶段,MySQL CDC 连接器都会在处理时准确读取数据,即使任务出现了故障。

六、MySQL心跳事件支持

  • 如果表不经常更新,则 binlog 文件或 GTID 集可能已在其最后提交的 binlog 位置被清理。 在这种情况下,CDC 作业可能会重新启动失败。因此心跳事件将帮助更新 binlog 位置。 默认情况下,MySQL CDC Source 启用心跳事件,间隔设置为30秒。 可以使用表选项heartbeat指定间隔。或将选项设置为0s以禁用心跳事件。

七、启动模式

配置选项scan.startup.mode指定 MySQL CDC 使用者的启动模式。有效枚举包括:

  • initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。
  • earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取
  • latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
  • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
  • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。

使用 DataStream API:

MySQLSource.builder().startupOptions(StartupOptions.earliest()) // 从最早位点启动.startupOptions(StartupOptions.latest()) // 从最晚位点启动.startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4L) // 从指定 binlog 文件名和位置启动.startupOptions(StartupOptions.specificOffset("24DA167-0C0C-11E8-8442-00059A3C7B00:1-19")) // 从 GTID 集合启动.startupOptions(StartupOptions.timestamp(1667232000000L) // 从时间戳启动....build()

使用 SQL:

CREATE TABLE mysql_source (...) WITH ('connector' = 'mysql-cdc','scan.startup.mode' = 'earliest-offset', -- 从最早位点启动'scan.startup.mode' = 'latest-offset', -- 从最晚位点启动'scan.startup.mode' = 'specific-offset', -- 从特定位点启动'scan.startup.mode' = 'timestamp', -- 从特定位点启动'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- 在特定位点启动模式下指定 binlog 文件名'scan.startup.specific-offset.pos' = '4', -- 在特定位点启动模式下指定 binlog 位置'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- 在特定位点启动模式下指定 GTID 集合'scan.startup.timestamp-millis' = '1667232000000' -- 在时间戳启动模式下指定启动时间戳...
)

注意:

  • MySQL source 会在 checkpoint 时将当前位点以 INFO 级别打印到日志中,日志前缀为 “Binlog offset on checkpoint {checkpoint-id}”。 该日志可以帮助将作业从某个 checkpoint 的位点开始启动的场景。
  • 如果捕获变更的表曾经发生过表结构变化,从最早位点、特定位点或时间戳启动可能会发生错误,因为 Debezium 读取器会在内部保存当前的最新表结构,结构不匹配的早期数据无法被正确解析。

八、DataStream Source

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;public class MySqlSourceExample {public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("yourHostname").port(yourPort).databaseList("yourDatabaseName") // 设置捕获的数据库, 如果需要同步整个数据库,请将 tableList 设置为 ".*"..tableList("yourDatabaseName.yourTableName") // 设置捕获的表.username("yourUsername").password("yourPassword").deserializer(new JsonDebeziumDeserializationSchema()) // 将 SourceRecord 转换为 JSON 字符串.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置 3s 的 checkpoint 间隔env.enableCheckpointing(3000);env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")// 设置 source 节点的并行度为 4.setParallelism(4).print().setParallelism(1); // 设置 sink 节点并行度为 1 env.execute("Print MySQL Snapshot + Binlog");}
}

九、动态加表

扫描新添加的表功能使你可以添加新表到正在运行的作业中,新添加的表将首先读取其快照数据,然后自动读取其变更日志。

想象一下这个场景:一开始, Flink 作业监控表 [product, user, address], 但几天后,我们希望这个作业还可以监控表 [order, custom],这些表包含历史数据,我们需要作业仍然可以复用作业的已有状态,动态加表功能可以优雅地解决此问题。

以下操作显示了如何启用此功能来解决上述场景。 使用现有的 Flink CDC Source 作业,如下:

MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("yourHostname").port(yourPort).scanNewlyAddedTableEnabled(true) // 启用扫描新添加的表功能.databaseList("db") // 设置捕获的数据库.tableList("db.product, db.user, db.address") // 设置捕获的表 [product, user, address].username("yourUsername").password("yourPassword").deserializer(new JsonDebeziumDeserializationSchema()) // 将 SourceRecord 转换为 JSON 字符串.build();// 你的业务代码

如果我们想添加新表 [order, custom] 对于现有的 Flink 作业,只需更新 tableList() 将新增表 [order, custom] 加入并从已有的 savepoint 恢复作业。

Step 1: 使用 savepoint 停止现有的 Flink 作业。

$ ./bin/flink stop $Existing_Flink_JOB_ID
Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab

Step 2: 更新现有 Flink 作业的表列表选项。
更新 tableList() 参数.
编译更新后的作业,示例如下:

MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("yourHostname").port(yourPort).scanNewlyAddedTableEnabled(true) .databaseList("db") .tableList("db.product, db.user, db.address, db.order, db.custom") // 设置捕获的表 [product, user, address ,order, custom].username("yourUsername").password("yourPassword").deserializer(new JsonDebeziumDeserializationSchema()) // 将 SourceRecord 转换为 JSON 字符串.build();// 你的业务代码

Step 3: 从 savepoint 还原更新后的 Flink 作业。

$ ./bin/flink run \--detached \ --fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \./FlinkCDCExample.jar

十、数据类型映射

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述


http://chatgpt.dhexx.cn/article/imRCBKJA.shtml

相关文章

大数据面试重点之kafka(三)

Kafka如何保证全局有序&#xff1f; 可回答&#xff1a;1&#xff09;Kafka消费者怎么保证有序性&#xff1f;2&#xff09;Kafka生产者写入数据怎么保证有序&#xff1f;3&#xff09;Kafka可以保证 数据的局部有序&#xff0c;如何保证数据的全局有序&#xff1f;4&#xff0…

Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明

文章目录 官方说明参数解读CodePOM依赖配置文件生产者消费者单元测试测试earliestlatest(默认&#xff09;noneexception 源码地址 官方说明 https://kafka.apache.org/documentation/ 选择对应的版本&#xff0c;我这里选的是 2.4.X https://kafka.apache.org/24/documenta…

Kafka之auto.offset.reset值解析

今日在使用kafka时&#xff0c;发现将 auto.offset.reset 设置为earliest、latest、none 都没有达到自己预期的效果。 earliest&#xff1a; 当各分区下有已提交的offset时&#xff0c;从提交的offset开始消费&#xff1b;无提交的offset时&#xff0c;从头开始消费latest&…

关于EarlyZ

在前向渲染中&#xff0c;ZTest是在Fragement Shader之后进行的&#xff0c;也就是说&#xff0c;被遮挡的部分也要绘制FS&#xff0c;就产生了Over Draw&#xff0c;其实很费&#xff0c;Early Z Culling就解决了这个问题 Early fragment tests, as an optimization, exist t…

【EARLIER/EARLIEST函数】引用不存在的更早的行上下文 报错解决

引用PowerQuery的例子并给予个人理解 X1 SUMX(FILTER(Data,Data[订单日期]>EARLIER(Data[订单日期])),[金额])---WRONG X2CALCULATE(SUM(Data[金额]),FILTER(Data,SUMX(FILTER(Data,Data[订单日期]>EARLIER(Data[订单日期])),[金额])))---RIGHT X1报错原因&#xff1a…

EarlyStop

在训练中&#xff0c;我们希望在中间箭头的位置停止训练。而Early stopping就可以实现该功能&#xff0c;这时获得的模型泛化能力较强&#xff0c;还可以得到一个中等大小的w的弗罗贝尼乌斯范数。其与L2正则化相似&#xff0c;选择参数w范数较小的神经网络。 可以用L2正则化代…

Kafka 使用java api从指定位移消费 (从开头消费/从结尾消费)

一、auto.offset.reset值详解 在 Kafka 中&#xff0c;每当消费者组内的消费者查找不到所记录的消费位移或发生位移越界时&#xff0c;就会根据消费者客户端参数 auto.offset.reset 的配置来决定从何处开始进行消费&#xff0c;这个参数的默认值为 “latest” 。 auto.offset…

动态SQL之 where 标签

动态SQL之 where 标签 where和if一般结合使用&#xff1a; 1.若where标签中的 if 条件都不满足&#xff0c;则where标签没有任何功能&#xff0c;即不会添加where关键字 2.若where标签中的 if 条件满足&#xff0c;则where标签会自动添加where关键字&#xff0c;并将条件最前…

mybatis-动态sql

文章目录 1. 动态sql简述2. 动态sql示例 2.1 if2.2 choose2.3 foreach2.4 sql 及 include2.5 sql中的特殊字符3. 后台分页实现4. 数据版本号处理并发问题 1. 动态sql简述 mybatis的动态sql语句是基于OGNL表达式的。可以方便的在sql语句中实现某些逻辑. 总体说来mybatis动态SQL…

mysql动态sql拼接_动态SQL(拼接)

Q1:什么是动态SQL呢? A1:首先是SQL语句,是根据条件来拼接SQL Q2:为什么要用动态SQL? A2:因为在条件WHERE中出现OR会导致不能使用索引,从而使效率差别巨大。 例如:如图1、2, 图(1) 图(2) Q3:怎么样使用动态SQL? A3: 存储过程Proc_Test是没有采用拼接的:CREATE PROC…

Mybatis学习之动态Sql

目录 1. 什么是动态Sql 2. 动态Sql需要学习什么 3. 动态Sql之《if》 4. 动态Sql之《where》 5. 动态Sql之《foreach》 6. 动态Sql之《sql》 7. PageHelper分页插件的使用 1. 什么是动态Sql 答案&#xff1a;动态Sql指的是&#xff0c;Sql语句是变化的&#xff0c;不是固…

Mybatis 动态SQL

Mybatis 动态SQL 一 .动态SQL 数组 array 使用foreach 标签 <!-- mybatis的集合操作知识点: 如果遇到集合参数传递,需要将集合遍历标签: foreach 循环遍历集合标签属性说明:1.collection 表示遍历的集合类型1.1 数组 关键字 array1.2 List集合 关键字 list1.3 Map集…

Mybatis动态SQL解析

文章目录 1 为什么需要动态SQL&#xff1f;2 动态标签有哪些?3 举例说明ifchoose (when, otherwise)trim (where, set)foreach 1 为什么需要动态SQL&#xff1f; 看一段Oracle存储过程代码&#xff1a; 由于前台传入的查询参数不同&#xff0c;所以写了很多的if else&#x…

Java MyBatis动态SQL

&#x1f9ed;MyBatis学习 &#x1f389; 内容回顾 Java MyBatis的介绍及其执行原理 Java MyBatis配置详解 Java Mybatis中使用Junit进行测试_程序员必备 Java MyBatis的使用 &#x1f4e2;今天我们进行 Java MyBatis动态SQL 的学习&#xff0c;感谢你的阅读&#xff0c;内容若…

mysql动态SQL用法

顾名思义“动态”SQL就是不固定的SQL&#xff0c;根据不同的条件把SQL语句进行拼接&#xff0c;来实现对数据库更加精准的操作。可以通过配置文件或者注解的形式实现&#xff0c;多用于多条件联查。 xml版&#xff08;配置文件&#xff09;&#xff1a; xml版的动态SQL 接…

动态SQL标签

所谓的动态SQL&#xff0c;本职还是SQL语句&#xff0c;只是可以在SQL层面&#xff0c;去执行一个逻辑代码 动态SQL就是在拼接SQL语句&#xff0c;我们只要保证SQL的正确性&#xff0c;按照SQL的格式&#xff0c;去排列组合。 建议&#xff1a; 先在MySQL中写出完整的SQL&am…

MyBatis动态SQL

文章目录 前言一、\<if\>标签二、\<where\>标签三、\<trim\>标签四、\<set\>标签五、\<foreach\>标签五、\<sql\>标签 与 \<include\>标签 前言 动态sql是Mybatis的强大功能特性之一&#xff0c;能够完成不同条件下的sql拼接 以上…

动态SQL

动态SQL 在项目开发中&#xff0c;动态SQL可以解决很多不确定因素导致的SQL语句不同的问题。动态SQL可以简单高效的进行编码。在接下来的案例中进行认识和学习动态SQL。 动态SQL只是在原有的SQL语句中进行细微修改。案例贴合实际&#xff0c;编码简单易懂 文章目录 动态SQL一、…

动态 SQL

文章目录 一、学习目的二、动态 SQL 中的元素三、条件查询操作四、更新操作五、复杂查询操作1.foreach 元素中的属性2.foreach 元素迭代数组3.foreach 元素迭代 List4.foreach 元素迭代 Map 一、学习目的 在实际项目的开发中&#xff0c;开发人员在使用 JDBC 或其他持久层框架…

第3章 动态SQL

目录/Contents 第3章 动态SQL学习目标掌握MyBatis中动态SQL元素的使用掌握MyBatis的条件查询操作掌握MyBatis的更新操作掌握MyBatis的复杂查询操作 学习内容1 动态SQL中的元素1.1 使用动态SQL的好处1.2 动态SQL常用元素 2 条件查询操作2.1 \<if>元素2.2 \<choose>、…