Flink CDC我吃定了耶稣也留不住他!| Flink CDC线上问题小盘点

article/2025/10/7 4:18:22

点击上方蓝色字体,选择“设为星标”

回复”面试“获取更多惊喜

我在之前的文章中已经详细的介绍过Flink CDC的原理和实践了。

如果你对Flink CDC 还没有什么概念,可以参考这里:Flink CDC 原理及生产实践

在实际生产中相信已经有很多小伙伴尝试过了,我在这里将一些个人遇到的、搜索到的、官方博客中总结的以及在Flink的邮件组中的看到过的一些常见问题进行了总结。供大家参考。

不同的kafka版本依赖冲突

不同的kafka版本依赖冲突会造成cdc报错,参考这个issue:

http://apache-flink.147419.n8.nabble.com/cdc-td8357.html#a8393

2020-11-04 16:39:10.972 [Source: Custom Source -> Sink: Print to Std. Out (1/1)] WARN  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> Sink: Print to Std. Out (1/1) (7c3ccf7686ccfb33254e8cb785cd339d) switched from RUNNING to FAILED.
java.lang.AbstractMethodError: org.apache.kafka.connect.json.JsonSerializer.configure(Ljava/util/Map;Z)V
at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:300)
at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:311)
at io.debezium.embedded.EmbeddedEngine.<init>(EmbeddedEngine.java:583)
at io.debezium.embedded.EmbeddedEngine.<init>(EmbeddedEngine.java:80)
at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:301)
at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:217)
at io.debezium.embedded.ConvertingEngineBuilder.build(ConvertingEngineBuilder.java:139)
at com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:299)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)

源码如下:

public class CdcTest {public static void main(String[] args) throws Exception {SourceFunction<String> sourceFunction = MySQLSource.<String>builder().hostname("localhost").port(3306).databaseList("sohay") // monitor all tables under inventory database.username("root").password("123456").deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(sourceFunction).print().setParallelism(1); // use parallelism 1 for sink to keep message orderingenv.execute();}
}

确实是pom中存在一个Kafka的依赖包,导致冲突,去掉后问题解决。

MySQL CDC源等待超时

在扫描表期间,由于没有可恢复的位置,因此无法执行checkpoints。为了不执行检查点,MySQL CDC源将保持检查点等待超时。超时检查点将被识别为失败的检查点,默认情况下,这将触发Flink作业的故障转移。因此,如果数据库表很大,则建议添加以下Flink配置,以避免由于超时检查点而导致故障转移:

execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647

数据库切换,重新开启binlog,Mysql全局锁无法释放

原因是因为切换了数据库环境,重新开启binlog,所有的作业都重新同步binlog的全量数据,导致了全局锁一直在等待,所有作业都无法执行。解决方法:记录checkpoint的地址,取消作业,然后根据checkpoint重启作业。

使用Flink SQL CDC模式创建维表异常

CREATE TABLE cdc_test
(id  STRING,ip  STRING,url STRING,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc', 'hostname' = '127.0.0.1','port' = '3306','database-name' = 'xx','table-name' = 'xx','username' = 'xx','password' = 'xx'
);

执行查询:

SELECT * FROM cdc_test;

任务无法运行,抛出异常

User does not have the 'LOCK TABLES' privilege required to obtain a consistent snapshot by preventing concurrent writes to tables.

原因是连接MySQL的用户缺乏必要的CDC权限。

Flink SQL CDC基于Debezium实现。当启动MySQL CDC源时,它将获取一个全局读取锁(FLUSH TABLES WITH READ LOCK),该锁将阻止其他数据库的写入,然后读取当前binlog位置以及数据库和表的schema,之后将释放全局读取锁。然后它扫描数据库表并从先前记录的位置读取binlog,Flink将定期执行checkpoints以记录binlog位置。如果发生故障,作业将重新启动并从checkpoint完成的binlog位置恢复,因此它保证了仅一次的语义。

解决办法:创建一个新的MySQL用户并授予其必要的权限。

mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
mysql> FLUSH PRIVILEGES;

Flink作业扫描MySQL全量数据出现fail-over

Flink 作业在扫描 MySQL 全量数据时,checkpoint 超时,出现作业 failover,如下图:

原因:Flink CDC 在 scan 全表数据(我们的实收表有千万级数据)需要小时级的时间(受下游聚合反压影响),而在 scan 全表过程中是没有 offset 可以记录的(意味着没法做 checkpoint),但是 Flink 框架任何时候都会按照固定间隔时间做 checkpoint,所以此处 mysql-cdc source 做了比较取巧的方式,即在 scan 全表的过程中,会让执行中的 checkpoint 一直等待甚至超时。超时的 checkpoint 会被仍未认为是 failed checkpoint,默认配置下,这会触发 Flink 的 failover 机制,而默认的 failover 机制是不重启。所以会造成上面的现象。

解决办法:在 flink-conf.yaml 配置 failed checkpoint 容忍次数,以及失败重启策略,如下:

execution.checkpointing.interval: 10min   # checkpoint间隔时间
execution.checkpointing.tolerable-failed-checkpoints: 100  # checkpoint 失败容忍次数
restart-strategy: fixed-delay  # 重试策略
restart-strategy.fixed-delay.attempts: 2147483647   # 重试次数

作业在运行时 mysql cdc source 报 no viable alternative at input 'alter table std'

原因:因为数据库中别的表做了字段修改,CDC source 同步到了 ALTER DDL 语句,但是解析失败抛出的异常。

解决方法:在 flink-cdc-connectors 最新版本中已经修复该问题(跳过了无法解析的 DDL)。升级 connector jar 包到最新版本 1.1.0:flink-sql-connector-mysql-cdc-1.1.0.jar,替换 flink/lib 下的旧包。

多个作业共用同一张 source table 时,没有修改 server id 导致读取出来的数据有丢失。

原因:MySQL binlog 数据同步的原理是,CDC source 会伪装成 MySQL 集群的一个 slave(使用指定的 server id 作为唯一 id),然后从 MySQL 拉取 binlog 数据。如果一个 MySQL 集群中有多个 slave 有同样的 id,就会导致拉取数据错乱的问题。

解决方法:默认会随机生成一个 server id,容易有碰撞的风险。所以建议使用动态参数(table hint)在 query 中覆盖 server id。如下所示:

FROM bill_info /*+ OPTIONS('server-id'='123456') */ ;

CDC source 扫描 MySQL 表期间,发现无法往该表 insert 数据

原因:由于使用的 MySQL 用户未授权 RELOAD 权限,导致无法获取全局读锁(FLUSH TABLES WITH READ LOCK), CDC source 就会退化成表级读锁,而使用表级读锁需要等到全表 scan 完,才能释放锁,所以会发现持锁时间过长的现象,影响其他业务写入数据。

解决方法:给使用的 MySQL 用户授予 RELOAD 权限即可。所需的权限列表详见文档:

https://github.com/ververica/flink-cdc-connectors/wiki/mysql-cdc-connector#setup-mysql-server

如果出于某些原因无法授予 RELOAD 权限,也可以显式配上 'debezium.snapshot.locking.mode' = 'none'来避免所有锁的获取,但要注意只有当快照期间表的 schema 不会变更才安全。

我们在学习Flink的时候,到底在学习什么?

我们在学习Spark的时候,到底在学习什么?

【面试&个人成长】2021年过半,社招和校招的经验之谈

八千里路云和月 | 从零到大数据专家学习路径指南

大数据方向另一个十年开启 |《硬刚系列》第一版完结

我写过的关于成长/面试/职场进阶的文章

你好,我是王知无,一个大数据领域的硬核原创作者。

做过后端架构、数据中间件、数据平台&架构、算法工程化。

专注大数据领域实时动态&技术提升&个人成长&职场进阶,欢迎关注。


http://chatgpt.dhexx.cn/article/atoJJwFL.shtml

相关文章

MySql的Binlog日志工具分析:Canal、Maxwell、Databus、DTS

点击上方蓝色字体&#xff0c;选择“设为星标” 回复”资源“获取更多资源 大数据技术与架构 点击右侧关注&#xff0c;大数据开发领域最强公众号&#xff01; 暴走大数据 点击右侧关注&#xff0c;暴走大数据&#xff01; Canal 定位&#xff1a;基于数据库增量日志解析&#…

[架构设计]--让你的数据库流动起来 – 利用MySQL Binlog实现流式实时分析架构

感谢原文作者&#xff1a;https://aws.amazon.com/cn/blogs/china/mysql-binlog-architecture/ 数据分析特别是实时数据分析&#xff0c;已经越来越多的成为各行各业的分析要求与标准 – 例如&#xff0c;&#xff08;新&#xff09;零售行业可能希望通过线下POS数据与实时门店…

Flink实战 - Binlog日志并对接Kafka实战

点击上方蓝色字体&#xff0c;选择“设为星标” 回复”资源“获取更多资源 大数据技术与架构 点击右侧关注&#xff0c;大数据开发领域最强公众号&#xff01; 大数据真好玩 点击右侧关注&#xff0c;大数据真好玩&#xff01; 对于 Flink 数据流的处理&#xff0c;一般都是去直…

mysql 无bin_Mysql无Binlog数据恢复

无全量备份、未开启binlog日志&#xff0c;利用percona工具恢复 delete的数据 今天&#xff0c;利用Percona Data Recovery Tool for InnoDB工具(仅支持InnoDB&#xff0c;MyISAM不支持)&#xff0c;可以找回被删除的数据。 原理&#xff1a;在InnoDB引擎&#xff0c;delete删除…

nodejs安装和环境配置

1、node下载 官方下载地址: Node.js 下载node-v16.16.0-x64 2、安装测试 安装一直cmd即可 在主目录下打开cmd node -v 查看node的版本 npm -v 查看npm的版本(新版的node安装自带安装npm) 3、配置全局安装的模块路径和缓存路径 在nodejs文件夹,创建 node_global 和 node_…

Nodejs安装及常见问题

一、安装环境 简单的说 Node.js 就是运行在服务端的 JavaScript。Node.js 是一个基于 Chrome V8 引擎的 JavaScript 运行环境。Node.js 使用了一个事件驱动、非阻塞式 I/O 的模型&#xff0c;使其轻量又高效。Node.js 的包管理器 npm&#xff0c;是全球最大的开源库生态系统。…

nodejs安装的坑后坑

npm改默认位置后报错权限不足 由于不想将npm的模块下载到c盘&#xff0c;虽然某人一直说node才十几兆&#xff0c;但是C盘是真的小&#xff0c;绝对不能放里面了。 本来我的node就是安装在d盘&#xff0c;今天看到了npm改路径的方法&#xff0c;正好就想改一下&#xff0c;没想…

vue安裝及配置 nodejs安装配置

vue安装及配置 vue安装步骤 nodejs安装 安装nodejs环境&#xff1a;https://nodejs.org/en/ 查看node版本&#xff1a;node-v vue3.0需要使用node 8版本以上 npm镜像配置 npm是nodejs内置的资源管理器 npm两个镜像&#xff1a; 淘宝镜像&#xff1a;https://registry.npm.…

win10 Nodejs安装步骤

本人后端 仅供学习参考记录&#xff0c;有不妥之处 望指点。 Nodejs安装步骤 官网 下载安装包 官网地址&#xff1a;https://nodejs.org/zh-cn/ 历史版本地址 Node v14.16.0 (LTS) | Node.js 安装步骤&#xff1a; 双击下载的安装包 安装最新17.2项目有问题 后卸载17.2 …

linux系统宝塔安装nodejs,node安装,nodejs安装,Windows nodejs安装,Linux nodejs安装

node安装&#xff0c;nodejs安装&#xff0c;Windows nodejs安装&#xff0c;Linux nodejs安装 Windows系统&#xff1a; 安装&#xff1a;node-v12.14.0-x86.msi 查看&#xff1a;node -v 返回版本信息&#xff0c;比喻&#xff1a;v0.10.48 Linux系统&#xff1a; 第一种&…

nodejs 安装及环境配置

一、安装nodejs 从nodejs官网找到需版本的nodejs下载。 直接双击下一步安装&#xff0c;建议安装时更换路径&#xff0c;默认使用C盘&#xff0c;我这里更换路径为这个D:\software\nodejs 安装完成之后&#xff0c;检查一下 1.检查node安装版本 命令 node -v2.检查npm版本&a…

Nodejs安装npm

Nodejs安装npm 修改NPM默认安装路径&#xff0c;下载cnpm及设置npm源 修改NPM全局模式的默认安装路径 一般情况下&#xff0c;我们安装 Node.js环境&#xff0c;程序会自动把 NPM全局模块的路径设置在系统盘&#xff08;通常是在 c盘下&#xff09;&#xff0c;我们在项目开…

nodejs安装

记录知识点滴&#xff0c;以供随时查阅&#xff0c;如有发现错漏和需要补充的地方&#xff0c;欢迎留言说明 nodejs安装 1、官网下载最新版本nodejs的windows安装包2、使用安装包安装 win10系统中安装nodejs 1、官网下载最新版本nodejs的windows安装包 http://nodejs.cn/down…

NodeJS安装(npm包管理器)

1、nodejs下载 windows下的NodeJS安装是比较方便的&#xff0c; 只需要登陆官网&#xff08;Node.js&#xff09;&#xff0c;直接点击64-bit下载安装 2、安装过程基本直接“NEXT”&#xff0c;NodeJS已经集成了npm&#xff0c;所以npm也一并安装好了 3、在cmd窗口输入node -…

nodejs安装和环境配置-Windows

0.安装过程中遇到的常见问题 访问&#xff1a;https://blog.csdn.net/weixin_52799373/article/details/125718587?spm1001.2014.3001.5502 1.下载node.js 下载地址: https://nodejs.org/en/ 2.安装 2.1 安装 其实就是无脑下一步&#xff0c;第三步的时候可以选择自定义目…

mysql 更新sql 语句怎么写_sql更新语句怎么写

在SQL数据库中的更新语句要使用UPDATE语句来完成&#xff0c;UPDATE语句的作用是改变数据库中现有的数据&#xff0c;从而达到更新数据的目的&#xff0c;其语法是“update set where...”。 在SQL数据库中的更新语句要使用UPDATE语句来完成&#xff0c;UPDATE语句的作用是改变…

mysql更新语句怎么写

本文摘自由千锋教育高教产品研发部编著的**《MySQL数据库从入门到精通》**&#xff0c;转载请注明来源&#xff0c;谢谢&#xff01; MySQL中update语句用于更新表中的现有数据。亦可用UPDATE语句来更改表中单个行&#xff0c;一组行或所有行的列值。 MySQL中UPDATE语句的语法…

mysql的更新用法_mysql更新语句的详细用法

首先,单个表的UPDATE语句: 更新[LOW_PRIORITY] [IGNORE] tbl_name SET col_name1 = expr1 [,col_name2 = expr2 ...] [WHERE where_definition] [ORDER BY ...] [LIMIT row_count] 第二,多表UPDATE语句: 更新[LOW_PRIORITY] [IGNORE] table_references SET col_name1 = expr…

mysql更新的语句怎么写_mysql更新语句怎么写?

MySQL更新语句也就是MySQL中的update语句&#xff0c;当我们需要更新或者修改表中的数据时&#xff0c;就会使用这个update语句&#xff0c;下面我们就来看一下mysql更新语句的具体写法。 MySQL中update语句用于更新表中的现有数据。亦可用UPDATE语句来更改表中单个行&#xff…

FIDDLER可以用来干啥?!

Fiddler的功能&#xff0c;完全可以用一张图来概括总结&#xff0c;真的是很精辟啊&#xff01;所以开篇就和大家来分享一下&#xff1a;