数仓实时数据同步 debezium

article/2025/8/28 20:43:12

数仓实时数据同步 debezium

  • 背景
  • debezium 简介
    • 架构
    • 基本概念
  • 例子
  • Router
  • 目前遇到的问题

背景

数据湖将源库的数据同步到hive数仓ods层,或直接在kafka中用于后面计算。源库包括mysql、postgresql、sqlserver、oracle,大部分是mysql数据库。当前采用的sqoop T+1全量或增量抽取的方式,时效性低,delete的数据可能无法被正确处理。

选择debezium的原因:数据源支持众多,使用的组件仅仅是kafka,需要进行的开发少;debezium使用kafka-connect,而且kafka 2.3版本以后 增加或修改一个任务、整个kafka-connect集群都会rebalance的情况得到优化;类似binlog的位点存储在kafka中,不再需要引入额外的存储也不需要关心位点;能保证at-least-once。

debezium 简介

架构

在这里插入图片描述

debezium 主要是一个kafka-connect的各种数据源同步的一种source实现。
数据存储在kafka中

基本概念

kafka-connect 独立于kafka的服务,本项目中采用集群的部署方式,依赖kafka实现协调。

connector 针对一个连接实例,模仿从库从主库获取实时binlog。 可支持mysql postgresql oracle sqlserver mongoDb 等多种数据源。connector运行在kafka-connect中

task 每个同步源数据的connector,只采用一个task(其他任务可以采用多可来保证高可用和提高并发),task是connector任务执行的最小单位。运行在kafka-connect中,作为一个线程。

例子

同步mysql binlog,检查源库的binlog设置,

--是否开启binlog
SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
FROM information_schema.global_variables WHERE variable_name='log_bin';
--是否开启行模式  row
SELECT variable_value as "binlog_format"
FROM information_schema.global_variables WHERE variable_name='binlog_format';
--是否补全所有字段  full
SELECT variable_value as "binlog_row_image"
FROM information_schema.global_variables WHERE variable_name='binlog_row_image';
#新建一个mysql的同步任务
#${connectIp}
#${name}         connector名称 
#${ip}           mysql实例
#${port}
#${user}
#${password}     
#${serverId}     模拟从库的servcer.id
#${serverName}   此名称会出现在topic中  ${serverName}.${schema}.${table}
#${tableList}    db1.table1,db2.table1,db2.table2
#${kafka}        ip:9092,ip:9092 如果服务器较多随机写三个足够
#${historyTopic} 存储ddl的topic名称,debezium内部使用curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" ${connectIp}:8083/connectors/ \
-d '{ "name": "${name}", "config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"${ip}","database.port":"${port}","database.user":"${user}","database.password":"${password}","database.server.id":"${serverId}","database.server.name":"${serverName}","table.whitelist":"${tableList}","database.history.kafka.bootstrap.servers":"${kafka}","snapshot.mode":"schema_only","tombstones.on.delete":"false","database.history.kafka.topic":"${historyTopic}","database.history.skip.unparseable.ddl":"true","key.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"false","value.converter.schemas.enable":"false","transforms":"unwrap","transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.drop.tombstones":"false","transforms.unwrap.delete.handling.mode":"rewrite","transforms.unwrap.add.fields":"source.ts_ms"} }'

主要参数,还有很多其他的参数未列出

	"name": "${name}","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","tasks.max": "1","database.hostname": "${ip}","database.port": "${port}","database.user": "${user}","database.password": "${password}","database.server.id": "${serverId}","database.server.name": "${serverName}","table.whitelist": "${tableList}","database.history.kafka.bootstrap.servers": "${kafka}","snapshot.mode": "schema_only",      //选择schema_only,从当前最新的位点开始同步,不需要冷数据,冷数据用其他方式抽取"tombstones.on.delete": "false","database.history.kafka.topic": "${historyTopic}","database.history.skip.unparseable.ddl": "true","key.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter": "org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable": "false","value.converter.schemas.enable": "false","transforms": "unwrap","transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",        //简化数据结构"transforms.unwrap.drop.tombstones": "false","transforms.unwrap.delete.handling.mode": "rewrite","transforms.unwrap.add.fields": "source.ts_ms"                                                         //增加时间戳字段用于}
}

kafka中的数据

{"id":1004,"first_name":"10","last_name":"1","email":"1","__source_ts_ms":1596450910000,"__deleted":"false"}  //key是 {"id":1004}
{"id":1004,"first_name":"11","last_name":"1","email":"1","__source_ts_ms":1596451124000,"__deleted":"false"}  //key是 {"id":1004}
{"id":1004,"first_name":"101","last_name":"1","email":"1","__source_ts_ms":1596606837000,"__deleted":"false"} //key是 {"id":1004}
{"id":1004,"first_name":"102","last_name":"1","email":"1","__source_ts_ms":1596606992000,"__deleted":"false"} //key是 {"id":1004}

__source_ts_ms 和__deleted是配置产生的字段

Router

分表的合并,将分表数据写到一个topic

"transforms":"Reroute,unwrap",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones":"false",
"transforms.unwrap.delete.handling.mode":"rewrite","transforms.unwrap.add.fields":"source.ts_ms",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter", "transforms.Reroute.topic.regex": "dbserver1\\..*",
"transforms.Reroute.topic.replacement": "dbserver1.all"

结果

key的值{"id":1004,"__dbz__physicalTableIdentifier":"dbserver1.inventory.customers"}===============value的值{"id":1004,"first_name":"204","last_name":"1","email":"1","__source_ts_ms":1596773158000,"__deleted":"false"}

__dbz__physicalTableIdentifier是自动增加的一个key字段来区别表,字段名称可以改,也可以从topic名称中匹配获取

目前遇到的问题

  • 需要同步的表比较多,kafka topic多,对性能影响比较大。
  • kakfa 新版本kakfa-manager等基本都不能很好支持,需要自己开发来监控和管理kafka,connect集群也要自己开发监控。
  • 数据同步后如何到hive数仓更好,hbase kudu hudi 或者直接hdfs。

http://chatgpt.dhexx.cn/article/g2GgsIqB.shtml

相关文章

基于日志的同步数据一致性和实时抽取

宜信技术研发中心架构师 目前就职于宜信技术研发中心,任架构师,负责流式计算和大数据业务产品解决方案。曾任职于Naver china(韩国最大搜索引擎公司)中国研发中心资深工程师,多年从事CUBRID分布式数据库集群开发和CUB…

解析 TiDB 在线数据同步工具 Syncer

TiDB 是一个完全分布式的关系型数据库,从诞生的第一天起,我们就想让它来兼容 MySQL 语法,希望让原有的 MySQL 用户 (不管是单机的 MySQL,还是多机的 MySQL Sharding) 都可以在基本不修改代码的情况下,除了可以保留原有…

服务器与客户端的数据同步

2019独角兽企业重金招聘Python工程师标准>>> 问题 从一个例子说起,我们的客户端从服务器获取数据,这里假定获取文章。第一次使用,我们获取服务器端最新发表的几篇文章。 我们可以每次都重新获取,但这样费时又费流量。好…

数据实时同步或抽取上收的技术分析

原文:http://blog.csdn.net/dsg_gulibin/article/details/1696365 1 实现数据集中的技术手段分析比较 根据业界提供数据同步或抽取的解决方案来看,主要包括以下几大类: l 存储复制技术 l 数据库复制技术 l …

时间同步/集群时间同步/在线/离线

目录 一、能够连接外网 二、集群不能连接外网--同步其它服务器时间 一、能够连接外网 1.介绍ntp时间协议 NTP(Network Time Protocol)网络时间协议,是用来使计算机时间同步的一种协议,它可以使计算机对其服务器或时钟源做同步…

如何实时同步数据到StarRocks

我们知道,是StarRocks基于Doris开发的,它在多表连接查询的性能方面引领OLAP市场,是一个很好用的结构化数据仓库。但是一直没有很好的工具能够实现业务数据库的数据实时同步到StarRocks分布式数据仓库集群中,本文将带领读者一起来通…

文件实时同步

rsync remote sync 远程同步,同步是把数据从缓冲区同步到磁盘上去的、数据在内存缓存区完成之后还没有写入到磁盘中去、所以有时候要同步到磁盘中去的,而rsync说白了跟复制差不多、能将一个文件从一个地方复制到另外一个地方的、但是他也可以实现跨主机复…

系统间通讯实现数据信息实时同步解决方案

项目开发阶段遇到一个需求,描述大致就是同一个用户在A系统数据库保存的数据信息与在B系统数据库保存的数据信息要保持同步。当A系统用户修改了个人信息,A系统后台在将用户修改后的信息入库的同时也会向B系统发送消息,让B系统后台进行自动数据…

一文带你玩转实时数据同步方案

1、概述 1.1、目标 实时数据同步主要实现从源数据库到目标数据库的实时数据同步。源数据主要支持mysql数据库,目标数据包括mysql数据库和hbase数据库。 下面是实时数据同步的数据流转图,mysql的增量订阅数据经过canal和kafka,数据最终实时…

两台服务器同时写文件 怎么做同步,两台服务器做实时数据同步

两台服务器做实时数据同步 内容精选 换一换 DCS Memcached即将下线,部分Region已暂停售卖,建议使用Redis4.0/5.0。本章节主要描述Memcached主备实例。Memcached主备实例在单机实例基础上,增强服务高可用以及数据高可靠性。Memcached主备实例具…

实时数据同步方案

一.Flume收集各数据库日志,准实时抽取到HDFS 安装HDP,包含Flume 方案优点: 1.配置简单,不用编程:只要在flume.conf文件中配置source、channel及sink的相关属性 2.采用普通SQL轮询的方式实现,具有通用性&…

如何实现数据自动化的实时同步?

企业在日常业务中,比如总分支机构之间、数据中心之间、不同节点之间、跨国业务之间等,都需要将文件及时的传输,以供协同使用。所以,很多企业会选择一些同步工具或软件。 谈到文件同步备份大家使用较多的可能是Rsync、同步盘等一些…

像素是什么,一个像素有多大,像素和分辨率的关系

图片的像素和分辨率 对于像素和分辨率这两个词,主要见于图片和显示设备上。只要你用到手机里的照相功能,你都要接触到这两个概念。只是大多数人都是一知半解,而更多的人却根本就不知道,白白浪费了手机里500万、800万像素的摄影头&…

屏幕尺寸、分辨率、DPI、PPI

屏幕尺寸 下面这张图是华为荣耀7的尺寸图,图上写的是5.2英寸。我们所说的这个5.2英寸是手机屏幕对角线的长度。 我们平时是不用英寸这个单位的,我们用的是毫米,厘米,米这些单位。那么英寸和毫米,厘米之间怎样的换算关…

像素(Pixel)、DPI与PPI一看就明白

像素(Pixel)、DPI与PPI 像素(Pixel)DPI 英文全写是(Dots Per Inch,每英寸点数)PPI 英文全写是(Pixels Per Inch,每英寸像素数)比喻来区分应用(这张图熟悉吗)1.分辨率啥意思&#xff…

关于像素、分辨率、PPI、DPI等概念的分析

关于影像图的比例尺和分辨率:https://blog.csdn.net/liliiii/article/details/40261953 当我们说到 像素、分辨率、DPI、PPI等专业术语的时候,一般会涉及到图像、屏幕、打印机等等。 像素(Pixel)为图像显示的基本单位,…

分辨率 PPI DPI概念定义详解

我们在开发中,涉及到UI显示时,经常会遇到的一些概念,比如分辨率,ppi,dpi等,这些概念,在百度百科中,发现都有对它们的定义,一些博客中,也有对这几个概念的对比…

DPI与PPI的区别

开发中不免会遇到分辨率、DPI、PPI和屏幕尺寸等术语,那就弄弄清楚这些概念的真正含义。 分辨率 分辨率这个词在很多地方都有,比如相机、视频、扫描仪。这里说的就是显示器的分辨率。显示器是由一个个像素点(pixel)所组成的,一般所说的显示器…

传感器尺寸、像素、DPI分辨率、英寸、毫米的关系

虽然网上有很多这种资料,但是太过于复杂,每个人的说法都不一样,看的让人云里雾里的,我总结了一下,不知道对不对! 1. 1英寸25.4mm 2. 传感器尺寸:传感器的尺寸是指传感器的大小,一般…

dpi 、 dip 、分辨率、屏幕尺寸、px、density 关系以及换算

一、基本概念 dip : Density independent pixels ,设备无关像素。dp :就是dippx : 像素dpi :dots per inch , 直接来说就是一英寸多少个像素点。常见取值 120,160&…