【实时数仓】CDC简介、实现DWD层业务数据的处理(主要任务、接收kafka数据、动态分流*****)

article/2025/9/14 15:12:38

文章目录

  • 一 CDC简介
    • 1 什么是CDC
    • 2 CDC的种类
    • 3 Flink-CDC
  • 二 准备业务数据-DWD层
    • 1 主要任务
      • (1)接收Kafka数据,过滤空值数据
      • (2)实现动态分流功能
      • (3)把分好的流保存到对应表、主题中
    • 2 接收Kafka数据,过滤空值数据
      • (1)代码
      • (2)测试
    • 3 根据MySQL的配置表,进行动态分流
      • (1)准备工作
        • a 引入pom.xml 依赖
        • b 在Mysql中创建数据库
        • c 在gmall2022_realtime库中创建配置表table_process
        • d 创建配置表实体类
        • e 在MySQL Binlog添加对配置数据库的监听,并**重启**MySQL
      • (2)FlinkCDC的使用 -- DataStream
        • a 导入依赖
        • b 代码编写
        • c 测试
        • d 端点续传案例测试
      • (3)FlinkCDC的使用 -- FlinkSQL
        • a 导入依赖
        • b 基础信息配置
        • c 代码编写

一 CDC简介

1 什么是CDC

CDC是Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

2 CDC的种类

CDC主要分为基于查询和基于Binlog两种方式,主要了解一下这两种之间的区别:

基于查询的CDC基于Binlog的CDC
开源产品Sqoop、Kafka JDBC SourceCanal、Maxwell、Debezium
执行模式BatchStreaming
是否可以捕获所有数据变化
延迟性高延迟低延迟
是否增加数据库压力

3 Flink-CDC

Flink社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。目前也已开源,参考网址。

二 准备业务数据-DWD层

业务数据的变化,可以通过Maxwell采集到,但是MaxWell是把全部数据统一写入一个Topic中, 这些数据包括业务数据,也包含维度数据,这样显然不利于日后的数据处理,所以这个功能是从Kafka的业务数据ODS层读取数据,经过处理后,将维度数据保存到Hbase,将事实数据写回Kafka作为业务数据的DWD层。

1 主要任务

(1)接收Kafka数据,过滤空值数据

对Maxwell抓取数据进行ETL,有用的部分保留,没用的过滤掉。

(2)实现动态分流功能

由于MaxWell是把全部数据统一写入一个Topic中,这样显然不利于日后的数据处理。所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表,有些表是事实表,有的表在某种情况下既是事实表也是维度表。

在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL等。一般把事实数据写入流中,进行进一步处理,最终形成宽表。但是作为Flink实时计算任务,如何得知哪些表是维度表,哪些是事实表呢?而这些表又应该采集哪些字段呢?

可以将上面的内容放到某一个地方,集中配置。这样的配置不适合写在配置文件中,因为业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。

这种可以有两个方案实现

  • 一种是用Zookeeper存储,通过Watch感知数据变化。
  • 另一种是用mysql数据库存储,周期性的同步,使用FlinkCDC读取。

这里选择第二种方案,主要是mysql对于配置数据初始化和维护管理,用sql都比较方便。

所以就有了如下图:

在这里插入图片描述

配置表字段说明:

  • sourceTable:原表名。
  • sinkType:输出的类型。
  • sinkTable:写出到哪个表。
  • sinkpk:主键。
  • sinkcolum:保留哪些字段。
  • ext:建表语句的扩展,如引擎,主键增长方式,编码方式等。
  • operateType:操作类型,不记录数据的删除操作。

(3)把分好的流保存到对应表、主题中

业务数据保存到Kafka的主题中。

维度数据保存到Hbase的表中。

2 接收Kafka数据,过滤空值数据

整体工作流程:

在这里插入图片描述

(1)代码

public class BaseDBApp {public static void main(String[] args) throws Exception {//TODO 1 基本环境准备//流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(1);//TODO 2 检查点设置//开启检查点env.enableCheckpointing(5000L,CheckpointingMode.EXACTLY_ONCE);// 设置检查点超时时间env.getCheckpointConfig().setCheckpointTimeout(60000L);// 设置重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));// 设置job取消后,检查点是否保留env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 设置状态后端 -- 基于内存 or 文件系统 or RocksDBenv.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/ck/gmall"));// 指定操作HDFS的用户System.setProperty("HADOOP_USER_NAME","hzy");//TODO 3 从kafka中读取数据//声明消费的主题以及消费者组String topic = "ods_base_db_m";String groupId = "base_db_app_group";// 获取消费者对象FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);// 读取数据,封装成流DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);//TODO 4 对数据类型进行转换 String -> JSONObjectSingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);//TODO 5 简单的ETLSingleOutputStreamOperator<JSONObject> filterDS = jsonObjDS.filter(new FilterFunction<JSONObject>() {@Overridepublic boolean filter(JSONObject jsonobj) throws Exception {boolean flag =jsonobj.getString("table") != null &&jsonobj.getString("table").length() > 0 &&jsonobj.getJSONObject("data") != null &&jsonobj.getString("data").length() > 3;return flag;}});filterDS.print("<<<");//TODO 6 动态分流//TODO 7 将维度侧输出流的数据写到Hbase中//TODO 8 将主流数据写回kafka的dwd层env.execute();}
}

(2)测试

业务数据总体流程如下:

在这里插入图片描述

开启zookeeper
开启kafka
开启maxwell
开启nm,等待安全模式关闭
开启主程序
模拟生成业务数据,查看主程序输出内容

3 根据MySQL的配置表,进行动态分流

通过FlinkCDC动态监控配置表的变化,以流的形式将配置表的变化读到程序中,并以广播流的形式向下传递,主流从广播流中获取配置信息。

(1)准备工作

a 引入pom.xml 依赖

<!--lomback插件依赖-->
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.12</version><scope>provided</scope>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version>
</dependency>
<dependency><groupId>com.alibaba.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>1.2.0</version>
</dependency>

b 在Mysql中创建数据库

注意和gmall2022业务库区分开

在这里插入图片描述

c 在gmall2022_realtime库中创建配置表table_process

CREATE TABLE `table_process` (`source_table` varchar(200) NOT NULL COMMENT '来源表',`operate_type` varchar(200) NOT NULL COMMENT '操作类型 insert,update,delete',`sink_type` varchar(200) DEFAULT NULL COMMENT '输出类型 hbase kafka',`sink_table` varchar(200) DEFAULT NULL COMMENT '输出表(主题)',`sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段',`sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段',`sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展',PRIMARY KEY (`source_table`,`operate_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

d 创建配置表实体类

@Data
public class TableProcess {//动态分流Sink常量,改为小写和脚本一致public static final String SINK_TYPE_HBASE = "hbase";public static final String SINK_TYPE_KAFKA = "kafka";public static final String SINK_TYPE_CK = "clickhouse";//来源表String sourceTable;//操作类型 insert,update,deleteString operateType;//输出类型 hbase kafkaString sinkType;//输出表(主题)String sinkTable;//输出字段String sinkColumns;//主键字段String sinkPk;//建表扩展String sinkExtend;
}

e 在MySQL Binlog添加对配置数据库的监听,并重启MySQL

sudo vim /etc/my.cnf
# 添加
binlog-do-db=gmall2022_realtime
# 重启
sudo systemctl restart mysqld

(2)FlinkCDC的使用 – DataStream

新建maven项目gmall2022-cdc。

a 导入依赖

<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.48</version></dependency><dependency><groupId>com.alibaba.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>1.2.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency>
</dependencies>
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>

b 代码编写

/*** 通过FlinkCDC动态读取MySQL表中的数据 -- DataStreamAPI*/
public class FlinkCDC01_DS {public static void main(String[] args) throws Exception {//TODO 1 准备流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//TODO 2 开启检查点   Flink-CDC将读取binlog的位置信息以状态的方式保存在CK,如果想要做到断点续传,// 需要从Checkpoint或者Savepoint启动程序// 开启Checkpoint,每隔5秒钟做一次CK,并指定CK的一致性语义env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);// 设置超时时间为1分钟env.getCheckpointConfig().setCheckpointTimeout(60000);// 指定从CK自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2,2000L));// 设置任务关闭的时候保留最后一次CK数据env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 设置状态后端env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/ck/flinkCDC"));// 设置访问HDFS的用户名System.setProperty("HADOOP_USER_NAME", "hzy");//TODO 3 创建Flink-MySQL-CDC的SourceProperties props = new Properties();props.setProperty("scan.startup.mode","initial");SourceFunction<String> sourceFunction = MySQLSource.<String>builder().hostname("hadoop101").port(3306).username("root").password("123456")// 可配置多个库.databaseList("gmall2022_realtime")///可选配置项,如果不指定该参数,则会读取上一个配置中指定的数据库下的所有表的数据//注意:指定的时候需要使用"db.table"的方式.tableList("gmall2022_realtime.t_user").debeziumProperties(props).deserializer(new StringDebeziumDeserializationSchema()).build();//TODO 4 使用CDC Source从MySQL读取数据DataStreamSource<String> mysqlDS = env.addSource(sourceFunction);//TODO 5 打印输出mysqlDS.print();//TODO 6 执行任务env.execute();}
}

c 测试

在gmall2022_realtime添加表,执行程序,添加数据,可以看到以下信息

在这里插入图片描述

d 端点续传案例测试

# 打包并将带依赖的jar包上传至Linux
# 启动HDFS集群
start-dfs.sh
# 启动Flink集群
bin/start-cluster.sh
# 启动程序
bin/flink run -m hadoop101:8081 -c com.hzy.gmall.cdc.FlinkCDC01_DS ./gmall2022-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar
# 观察taskManager日志,会从头读取表数据
# 给当前的Flink程序创建Savepoint 
bin/flink savepoint JobId hdfs://hadoop101:8020/flink/save
# 在WebUI中cancelJob
# 在MySQL的gmall2022_realtime.t_user表中添加、修改或者删除数据
# 从Savepoint重启程序
bin/flink run -s hdfs://hadoop101:8020/flink/save/JobId -c com.hzy.gmall.cdc.FlinkCDC01_DS ./gmall2022-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar
# 观察taskManager日志,会从检查点读取表数据

(3)FlinkCDC的使用 – FlinkSQL

使用FlinkCDC通过sql的方式从MySQL中获取数据。

a 导入依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.12.0</version>
</dependency>

b 基础信息配置

修改语言级别

在这里插入图片描述

修改编译级别

在这里插入图片描述

c 代码编写

public class FlinkCDC02_SQL {public static void main(String[] args) throws Exception {//TODO 1.准备环境//1.1流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1.2 表执行环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//TODO 2.创建动态表tableEnv.executeSql("CREATE TABLE user_info (" +"  id INT," +"  name STRING," +"  age INT" +") WITH (" +"  'connector' = 'mysql-cdc'," +"  'hostname' = 'hadoop101'," +"  'port' = '3306'," +"  'username' = 'root'," +"  'password' = '123456'," +"  'database-name' = 'gmall2022_realtime'," +"  'table-name' = 't_user'" +")");tableEnv.executeSql("select * from user_info").print();//TODO 6.执行任务env.execute();}}

http://chatgpt.dhexx.cn/article/B7eOBodp.shtml

相关文章

数仓开发之DWD层(二)

目录 三&#xff1a;流量域用户跳出事务事实表 3.1 主要任务 3.2 思路分析 3.3 图解 3.4 代码 四&#xff1a;交易域加购事务事实表 4.1 主要任务 4.2 思路分析 4.3 图解 4.4 代码 三&#xff1a;流量域用户跳出事务事实表 3.1 主要任务 过滤用户跳出明细数据。 3.2 思…

电商数仓(dwd 层)

一、dwd 层介绍 1、对用户行为数据解析。 2、对核心数据进行判空过滤。 3、对业务数据采用维度模型重新建模&#xff0c;即维度退化。 二、dwd 层用户行为数据 2.1 用户行为启动表 dwd_start_log 1、数据来源 ods_start_log -> dwd_start_log 2、表的创建 drop table…

详解数仓中的数据分层:ODS、DWD、DWM、DWS、ADS

何为数仓DW Data warehouse(可简写为DW或者DWH)数据仓库,是在数据库已经大量存在的情况下,它是一整套包括了etl、调度、建模在内的完整的理论体系。 数据仓库的方案建设的目的,是为前端查询和分析作为基础,主要应用于OLAP(on-line Analytical Processing),支持复杂的分析…

万字详解数仓分层设计架构 ODS-DWD-DWS-ADS

一、数仓建模的意义&#xff0c;为什么要对数据仓库分层&#xff1f; 只有数据模型将数据有序的组织和存储起来之后&#xff0c;大数据才能得到高性能、低成本、高效率、高质量的使用。 1、分层意义 1&#xff09;清晰数据结构&#xff1a;每一个数据分层都有它的作用域&#x…

数仓开发之DWD层(四)

目录 十一&#xff1a;工具域优惠券领取事务事实表 11.1 主要任务&#xff1a; 11.2 思路分析&#xff1a; 11.3 图解&#xff1a; 十二&#xff1a;工具域优惠券使用&#xff08;下单&#xff09;事务事实表 12.1 主要任务&#xff1a; 12.2 思路分析&#xff1a; 12.3…

数仓开发之DWD层(三)

&#xff08;附&#xff1a;由于篇幅原因&#xff0c;这里就不在展示代码了&#xff0c;直接告诉大家思路&#xff09; 目录 五&#xff1a;交易域订单预处理表 5.1 主要任务 5.2 思路分析 5.3 图解 六&#xff1a;交易域下单事务事实表 6.1 主要任务&#xff1a; 6.2 …

数仓开发之DWD层(一)

目录 一&#xff1a;流量域未经加工的事务事实表 1.1 主要任务 1.2 思路 1.3 图解 1.4 代码 二&#xff1a;流量域独立访客事务事实表 2.1 主要任务 2.2 思路分析 2.3 图解 2.4 代码 DWD层设计要点&#xff1a; &#xff08;1&#xff09;DWD层的设计依据是维度建模理论&…

数据仓库之DWD层

DWD&#xff08;Data WareHouse Detail&#xff09;数据明细层&#xff0c;主要是将从业务数据库中同步过来的ODS层数据进行清洗和整合成相应的事实表。事实表作为数据仓库维度建模的核心&#xff0c;需要紧紧围绕着业务过程来设计。在拿到业务系统的表结构后&#xff0c;进行大…

数仓建设 | ODS、DWD、DWM等理论实战(好文收藏)

本文目录&#xff1a; 一、数据流向 二、应用示例 三、何为数仓DW 四、为何要分层 五、数据分层 六、数据集市 七、问题总结 导读 数仓在建设过程中&#xff0c;对数据的组织管理上&#xff0c;不仅要根据业务进行纵向的主题域划分&#xff0c;还需要横向的数仓分层规范。本文…

数仓及其维度(分层)建模(ODS DWD DWS DWT ADS)

一. 数仓及其维度 1. 什么是数仓&#xff1f; 数据仓库&#xff0c;简称数仓,&#xff08; Data Warehouse &#xff09;。从逻辑上理解&#xff0c;数据库和数仓没有区别&#xff0c;都是通过数据库软件实现存放数据的地方&#xff0c;只不过从数据量来说&#xff0c;数据仓库…

[数据仓库]分层概念,ODS,DM,DWD,DWS,DIM的概念

目录 前言&#xff1a; 一. 各种名词解释 1.1 ODS是什么&#xff1f; 1.2 数据仓库层DW&#xff1f; 1.2.1 DWD明细层? 1.2.2 DWM 轻度汇总层(MID或DWB, data warehouse basis) 1.2.3 DWS 主题层(DM&#xff0c;data market或DWS, data warehouse service) 1.3 APP&…

详解数据仓库和数据集市:ODS、DW、DWD、DWM、DWS、ADS

一、数据流向 二、应用示例 三、何为数仓DW Data warehouse&#xff08;可简写为DW或者DWH&#xff09;数据仓库&#xff0c;是在数据库已经大量存在的情况下&#xff0c;它是一整套包括了etl、调度、建模在内的完整的理论体系。 数据仓库的方案建设的目的&#xff0c;是为前端…

数据仓库分层DWD、DWB、DWS

DW &#xff1a;data warehouse 翻译成数据仓库 DW数据分层&#xff0c;由下到上为 DWD,DWB,DWS DWD&#xff1a;data warehouse detail 细节数据层&#xff0c;有的也称为 ODS层&#xff0c;是业务层与数据仓库的隔离层 DWB&#xff1a;data warehouse base 基础数据层&#x…

数据分层详解ODS、DWD、DWM、DWS、ADS

详解数仓中的数据分层&#xff1a;ODS、DWD、DWM、DWS、ADS 何为数仓DW Data warehouse&#xff08;可简写为DW或者DWH&#xff09;数据仓库&#xff0c;是在数据库已经大量存在的情况下&#xff0c;它是一整套包括了etl、调度、建模在内的完整的理论体系。 数据仓库的方案建…

简单搞定数仓搭建:数仓模型(DWD)

明细粒度事实层&#xff08;DWD&#xff09; 明细粒度事实层以业务过程驱动建模&#xff0c;基于每个具体的业务过程特点&#xff0c;构建最细粒度的明细层事实表。您可以结合企业的数据使用特点&#xff0c;将明细事实表的某些重要维度属性字段做适当冗余&#xff0c;即宽表化…

数据仓库和数据集市详解:ODS、DW、DWD、DWM、DWS、ADS

数据流向 应用示例 何为数仓DW Data warehouse&#xff08;可简写为DW或者DWH&#xff09;数据仓库&#xff0c;是在数据库已经大量存在的情况下&#xff0c;它是一整套包括了etl、调度、建模在内的完整的理论体系。 数据仓库的方案建设的目的&#xff0c;是为前端查询和分析…

六、Sails中执行存储过程模拟Waterline的Create插入数据

文章目录 创建 baseCreate 存储过程参数设置Prepared StatementsLAST_INSERT_ID和IDENTITY 模拟WaterlinesendNativeQuery规划密钥处理转换字段名称和字段值返回数据处理修改控制器代码datetime bugmysql库中对数据库字段类型定义customToJSON postman自动化测试 清楚Waterline…

oracle cdr是什么,CDRD TALK|全栈架构Sails.js简介

原标题&#xff1a;CDRD TALK|全栈架构Sails.js简介 Sails.js是一个可伸缩的、数据驱动的、面向服务的现代App架构。它致力于构建基于Node.js服务的定制化企业级应用。在Sails.js之前&#xff0c;构建一个实用的产品级Node.js应用的时间成本通常以月为单位计算。但是使用Sails.…

node-sails后台搭建

这个就直接简单搭建最基本的后台了 一、安装 安装sails npm i sails sails -v //检测版本 创建空项目 sails new my-app 安装数据库 cd my-app npm install sails-mysql -save 二、文件配置 Datastores.js 里面的数据库配置url Local.js里面port :1448端口 服务启动的端口 …

三、以user表为例,用Amis+Sails实现增删改查操作

文章目录 CRUD 组件查查询api分页fetcher参数观察统一处理method分页参数提交到后端自定义分页和页面大小&#xff08;pageSize&#xff09; 搜索排序头部工具条列折叠按钮刷新和导出excel自定义内容 删单条删除批量删除 增新增数据headerToolbar 结果分析前端数据格式要求 改数…