Flink实战 - Binlog日志并对接Kafka实战

article/2025/10/7 4:16:32

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

大数据真好玩

点击右侧关注,大数据真好玩!

对于 Flink 数据流的处理,一般都是去直接监控 xxx.log 日志的数据,至于如何实现关系型数据库数据的同步的话网上基本没啥多少可用性的文章,基于项目的需求,经过一段时间的研究终于还是弄出来了,写这篇文章主要是以中介的方式记录下来,也希望能帮助到在做关系型数据库的实时计算处理流的初学者。

一、设计流程图

二、MySQL 的 Binlog 日志的设置

找到 MySQL 的配置文件并编辑:

[root@localhost etc]# vim /etc/my.cnf
[mysqld]
# 其它配置省略。。。。。。lower_case_table_names=1
## Replication
server_id                       =2020041006     # 唯一
log_bin                         =mysql-bin-1       # 唯一
relay_log_recovery              =1
binlog_format                   =row   # 格式必须是 row,否则 ogg 监控有问题
master_info_repository          =TABLE
relay_log_info_repository       =TABLE
#rpl_semi_sync_master_enabled    =1
rpl_semi_sync_master_timeout    =1000
rpl_semi_sync_slave_enabled     =1
binlog-do-db                    =dsout    # 要生成binlog 的数据库
sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES

这里注意的是配置完 my.cnf 文件之后要重启 MySQL 服务器才能生效。查看配置的 状态 和 serverid 命令请参见这篇文章:

三、下载 OGG 并安装部署

下载地址:https://www.oracle.com/middleware/technologies/goldengate-downloads.html

1.下载下来的压缩包解压并放入指定的文件夹中去

mkdir -p /opt/module/ogg/oggservice
tar -xvf ggs_Linux_x64_MySQL_64bit.tar -C /opt/module/ogg/oggservice/
chown -R root:root oggservice/     # 授权成指定的用户及用户组

2.进入ogg并启动

cd oggservice/
./ggsci

3.源系统的操作步骤及配置信息如下:

GGSCI (localhost.localdomain) 1> create subdirs   # 创建目录
GGSCI (localhost.localdomain) 3> dblogin sourcedb dsout@192.168.x.xxx:3306,userid 用户,password 密码;   # 监控日志
GGSCI (localhost.localdomain) 3> edit params mgr
port 7015
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3GGSCI (localhost.localdomain) 4> edit params ext1   # 抽取进程
EXTRACT ext1
setenv (MYSQL_HOME="/usr/local/mysql")
dboptions host 192.168.x.xx:3306, connectionport 3306
tranlogoptions altlogdest /usr/local/mysql/data/mysql-bin-1.index
SOURCEDB dsout@192.168.x.xx:3306,userid 用户,password 密码
EXTTRAIL ./dirdat/et
dynamicresolution
GETUPDATEBEFORES
NOCOMPRESSDELETES
NOCOMPRESSUPDATES
table dsout.employees;
table dsout.departments;GGSCI (localhost.localdomain) 5> edit params pump1  # 推送进程
EXTRACT pump1
SOURCEDB dsout@192.168.x.xx:3306,userid 用户,password 密码
RMTHOST 目标服务器的IP地址, MGRPORT 2021
RMTTRAIL ./dirdat/xd
table dsout.*;    # 要推送的表#为数据库的binlog添加监控和推送进程
GGSCI (localhost.localdomain DBLOGIN as dsout) 8> add extract ext1, tranlog,begin now
GGSCI (localhost.localdomain DBLOGIN as dsout) 9> add exttrail ./dirdat/et, extract ext1
GGSCI (localhost.localdomain DBLOGIN as dsout) 10> add extract pump1, exttrailsource ./dirdat/et
GGSCI (localhost.localdomain DBLOGIN as dsout) 11> add rmttrail ./dirdat/rt,extract pump1# 配置 defgen 进程
GGSCI (localhost.localdomain) 6> edit param defgen
defsfile ./dirdef/defgen.def
sourcedb dsout@192.168.x.xx:3306,userid 用户,password 密码
table dsout.*;# 生成 defgen.prm 文件
[mysql@localhost oggformysql]$ ./defgen paramfile ./dirprm/defgen.prm

4.进入 ogg 查看各个配置的服务进程:

GGSCI (localhost.localdomain) 5> info all

效果图如下:

到此为止源系统的ogg已经配置完成,接下来我们要在目标端配置接收到的数据将其以 json 的形式发送到 kafka。

5.解压并授权

mkdir -p /opt/module/ogg/oggservice
tar -xvf OGG_BigData_Linux_x64_19.1.0.0.1.tar -C /opt/module/ogg/oggservice/
chown -R root:root oggservice/

6.配置依赖包

find / -name libjvm.sovim ~/.bash_profile
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b03-1.el7.x86_64/jre/lib/amd64/server/source ~/.bash_profile

7.启动并配置相关进程

cd oggservice/
./ggsci GGSCI (cdh102) 1> create subdirs GGSCI (cdh102) 1> edit param mgr   # 配置主进程
PORT 2021
ACCESSRULE, PROG *, IPADDR *, ALLOWGGSCI (cdh102) 2> edit param rep2  # 配置复制进程
replicat rep2
sourcedefs ./dirdef/defgen.def
TARGETDB LIBFILE libggjava.so SET property=./dirprm/kafkaxd.props
MAP dsout.*, TARGET dsout.*;# (注意,这里的exttrail必须和源端的dump配置一致)
GGSCI (cdh102) 5> add replicat rep2, exttrail ./dirdat/rt

8.创建对接 kafka的配置文件

cd ./dirprm[root@cdh102 dirprm]# vim kafkaxd.props   # -> 配置文件内容如下gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=xindai_kafka_producer.properties   # kafka 生产者属性文件
#######The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate=xindai-topic   # 主题
############The following selects the message key using the concatenated primary keys
############gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
###########gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.SchemaTopicName=xindai-topic    # 主题
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
gg.handler.kafkahandler.format=json
#########gg.handler.kafkahandler.format.insertOpKey=I
#######gg.handler.kafkahandler.format.updateOpKey=U
#########gg.handler.kafkahandler.format.deleteOpKey=D
#######gg.handler.kafkahandler.format.truncateOpKey=T
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
##########Sample gg.classpath for Apache Kafka  这里一定要指定kafka依赖包的路径
gg.classpath=dirprm/:/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/libs/*
##########Sample gg.classpath for HDP
#########gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/* 
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

9.配置 KafkaProducerConfigFile属性文件

[root@cdh102 dirprm]# vim xindai_kafka_producer.propertiesbootstrap.servers=cdh101:9092,cdh102:9092,cdh103:9092
acks=1
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
######## 100KB per partition
batch.size=16384
linger.ms=0
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

10.启动进程

# 目标端
GGSCI (gpdata) 6> start mgr
GGSCI (gpdata) 7> start rep2# 源端
GGSCI (localhost.localdomain) 1> start mgr
GGSCI (localhost.localdomain) 2> start ext1
GGSCI (localhost.localdomain) 3> start pump1   # (先起目标的 mgr 才不会报错)

效果图如下:

四、验证

1.启动kafka消费者

[root@cdh102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server cdh101:9092,cdh102:9092,cdh103:9092 --topic xindai-topic --from-beginning

2.向库的监控表中对数据进行增、删、改 操作

INSERT INTO employees VALUES('101','changyin',6666.66,'2020-05-05 16:12:20','syy01');
INSERT INTO employees VALUES('102','siling',1234.12,'2020-05-05 16:12:20','syy01');

3.查看Kafka消费者的数据

到此,我们已经成功的配置好了 使用 Ogg 监控 MySQL - Binlog 日志,然后将数据以 Json 的形式传给 Kafka 的消费者的整个流程;这是项目实践中总结出来的,为了方便以后查询,在此做了下记录,希望也能帮到志同道合的同学们。

版权声明:

本文为大数据技术与架构整理,原作者独家授权。未经原作者允许转载追究侵权责任。

微信公众号|import_bigdata

编辑 《大数据技术与架构》

插画 《大数据技术与架构》

文章链接 https://www.jianshu.com/u/b14730fd40bd

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????


http://chatgpt.dhexx.cn/article/owCKxvJC.shtml

相关文章

mysql 无bin_Mysql无Binlog数据恢复

无全量备份、未开启binlog日志,利用percona工具恢复 delete的数据 今天,利用Percona Data Recovery Tool for InnoDB工具(仅支持InnoDB,MyISAM不支持),可以找回被删除的数据。 原理:在InnoDB引擎,delete删除…

nodejs安装和环境配置

1、node下载 官方下载地址: Node.js 下载node-v16.16.0-x64 2、安装测试 安装一直cmd即可 在主目录下打开cmd node -v 查看node的版本 npm -v 查看npm的版本(新版的node安装自带安装npm) 3、配置全局安装的模块路径和缓存路径 在nodejs文件夹,创建 node_global 和 node_…

Nodejs安装及常见问题

一、安装环境 简单的说 Node.js 就是运行在服务端的 JavaScript。Node.js 是一个基于 Chrome V8 引擎的 JavaScript 运行环境。Node.js 使用了一个事件驱动、非阻塞式 I/O 的模型,使其轻量又高效。Node.js 的包管理器 npm,是全球最大的开源库生态系统。…

nodejs安装的坑后坑

npm改默认位置后报错权限不足 由于不想将npm的模块下载到c盘,虽然某人一直说node才十几兆,但是C盘是真的小,绝对不能放里面了。 本来我的node就是安装在d盘,今天看到了npm改路径的方法,正好就想改一下,没想…

vue安裝及配置 nodejs安装配置

vue安装及配置 vue安装步骤 nodejs安装 安装nodejs环境:https://nodejs.org/en/ 查看node版本:node-v vue3.0需要使用node 8版本以上 npm镜像配置 npm是nodejs内置的资源管理器 npm两个镜像: 淘宝镜像:https://registry.npm.…

win10 Nodejs安装步骤

本人后端 仅供学习参考记录,有不妥之处 望指点。 Nodejs安装步骤 官网 下载安装包 官网地址:https://nodejs.org/zh-cn/ 历史版本地址 Node v14.16.0 (LTS) | Node.js 安装步骤: 双击下载的安装包 安装最新17.2项目有问题 后卸载17.2 …

linux系统宝塔安装nodejs,node安装,nodejs安装,Windows nodejs安装,Linux nodejs安装

node安装,nodejs安装,Windows nodejs安装,Linux nodejs安装 Windows系统: 安装:node-v12.14.0-x86.msi 查看:node -v 返回版本信息,比喻:v0.10.48 Linux系统: 第一种&…

nodejs 安装及环境配置

一、安装nodejs 从nodejs官网找到需版本的nodejs下载。 直接双击下一步安装,建议安装时更换路径,默认使用C盘,我这里更换路径为这个D:\software\nodejs 安装完成之后,检查一下 1.检查node安装版本 命令 node -v2.检查npm版本&a…

Nodejs安装npm

Nodejs安装npm 修改NPM默认安装路径,下载cnpm及设置npm源 修改NPM全局模式的默认安装路径 一般情况下,我们安装 Node.js环境,程序会自动把 NPM全局模块的路径设置在系统盘(通常是在 c盘下),我们在项目开…

nodejs安装

记录知识点滴,以供随时查阅,如有发现错漏和需要补充的地方,欢迎留言说明 nodejs安装 1、官网下载最新版本nodejs的windows安装包2、使用安装包安装 win10系统中安装nodejs 1、官网下载最新版本nodejs的windows安装包 http://nodejs.cn/down…

NodeJS安装(npm包管理器)

1、nodejs下载 windows下的NodeJS安装是比较方便的, 只需要登陆官网(Node.js),直接点击64-bit下载安装 2、安装过程基本直接“NEXT”,NodeJS已经集成了npm,所以npm也一并安装好了 3、在cmd窗口输入node -…

nodejs安装和环境配置-Windows

0.安装过程中遇到的常见问题 访问:https://blog.csdn.net/weixin_52799373/article/details/125718587?spm1001.2014.3001.5502 1.下载node.js 下载地址: https://nodejs.org/en/ 2.安装 2.1 安装 其实就是无脑下一步,第三步的时候可以选择自定义目…

mysql 更新sql 语句怎么写_sql更新语句怎么写

在SQL数据库中的更新语句要使用UPDATE语句来完成,UPDATE语句的作用是改变数据库中现有的数据,从而达到更新数据的目的,其语法是“update set where...”。 在SQL数据库中的更新语句要使用UPDATE语句来完成,UPDATE语句的作用是改变…

mysql更新语句怎么写

本文摘自由千锋教育高教产品研发部编著的**《MySQL数据库从入门到精通》**,转载请注明来源,谢谢! MySQL中update语句用于更新表中的现有数据。亦可用UPDATE语句来更改表中单个行,一组行或所有行的列值。 MySQL中UPDATE语句的语法…

mysql的更新用法_mysql更新语句的详细用法

首先,单个表的UPDATE语句: 更新[LOW_PRIORITY] [IGNORE] tbl_name SET col_name1 = expr1 [,col_name2 = expr2 ...] [WHERE where_definition] [ORDER BY ...] [LIMIT row_count] 第二,多表UPDATE语句: 更新[LOW_PRIORITY] [IGNORE] table_references SET col_name1 = expr…

mysql更新的语句怎么写_mysql更新语句怎么写?

MySQL更新语句也就是MySQL中的update语句,当我们需要更新或者修改表中的数据时,就会使用这个update语句,下面我们就来看一下mysql更新语句的具体写法。 MySQL中update语句用于更新表中的现有数据。亦可用UPDATE语句来更改表中单个行&#xff…

FIDDLER可以用来干啥?!

Fiddler的功能,完全可以用一张图来概括总结,真的是很精辟啊!所以开篇就和大家来分享一下:

rabbitmq中的consumerTag和deliveryTag分别是干啥的,有什么用?

2019独角兽企业重金招聘Python工程师标准>>> 同一个会话, consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息1,可以做此消息处理通道的名字。 因此 deliveryTag 可以用来回传告诉 rabbitmq 这个消息处理成功 清…

过年回家,你是否也努力的给别人解释软件开发是干啥滴?

这个年就这样,在喜气洋洋的气氛中,在我们依依不舍的留恋中,从我们身边溜走了。这次回家又碰见了亲戚们不厌其烦的问我,你做什么工作呐?于是就有了我以下生动的解释 目录 打字的 帮助传话,帮助卖东西 皮…

网优测试软件p,网优到底是干啥的?

原标题:网优到底是干啥的? 如果你走在马路上 或者坐在地铁上 看到这么一个奇怪的男人—— 戴着黑框眼镜 穿着深色的外套或格子衬衫 抱着一台破旧的笔记本电脑 还拿着一部更加破旧的手机 电脑屏幕上有奇怪的地图 还有变来变去的奇怪数字 聚精会神 表情时而严肃 时而欢喜 时而绝…