Hudi(四)集成Flink(2)

article/2025/8/21 9:37:46

6、读取方式

6.1、流读(Streaming Query)

        当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数 read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支持指定 earliest 从最早消费。

1、WITH参数

名称

Required

默认值

说明

read.streaming.enabled

false

false

设置 true 开启流读模式

read.start-commit

false

最新 commit

指定 'yyyyMMddHHmmss' 格式的起始 commit(闭区间)

read.streaming.skip_compaction

false

false

流读时是否跳过 compaction 的 commits,跳过 compaction 有两个用途:

1)避免 upsert 语义下重复消费 (compaction 的 instant 为重复数据,如果不跳过,有小概率会重复消费)

2) changelog 模式下保证语义正确性

0.11开始,以上两个问题已经通过保留 compaction instant time 修复

clean.retain_commits

false

10

cleaner最多保留的历史commits数,大于此数量的历史commits会被清理掉,changelog模式下,这个参数可以控制changelog的保留时间,例如checkpoint周期为5分钟一次,默认最少保留50分钟的时间。

        注意:当参数read.streaming.skip_compaction打开并且streaming reader消费落后于clean.retain_commits数时,流读可能会丢失数据。从0.11开始,compaction不会再变更record的 instant time,因此理论上数据不会再重复消费,但是还是会重复读取并丢弃,因此额外的开销还是无法避免,对性能有要求的话还是可以开启此参数。 

CREATE TABLE t5(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20)
) WITH ('connector' = 'hudi','path' = '/tmp/hudi_flink/t5','table.type' = 'MERGE_ON_READ','read.streaming.enabled' = 'true','read.streaming.check-interval' = '4'   -- 默认60s
);insert into t5 select * from sourceT;select * from t5;

6.2、增量读取(Incremental Query)

        从 0.10.0 开始支持。

        如果有增量读取 batch 数据的需求,增量读取包含三种场景。

        (1)Stream 增量消费,通过参数 read.start-commit 指定起始消费位置;

        (2)Batch 增量消费,通过参数 read.start-commit 指定起始消费位置,通过参数 read.end-commit 指定结束消费位置,区间为闭区间,即包含起始、结束的 commit

        (3)TimeTravel:Batch 消费某个时间点的数据:通过参数 read.end-commit 指定结束消费位置即可(由于起始位置默认从最新,所以无需重复声明)

        WITH 参数

名称

Required

默认值

说明

read.start-commit

false

默认从最新 commit

支持 earliest 从最早消费

read.end-commit

false

默认到最新 commit

7、限流

        如果将全量数据(百亿数量级)和增量先同步到kafka,再通过flink流式消费的方式将库表数据直接导成hoodie表,因为直接消费全量部分数据:量大(吞吐高)、乱序严重(写入的partition随机),会导致写入性能退化,出现吞吐毛刺,这时候可以开启限速参数,保证流量平稳写入。
        WITH参数

名称

Required

默认值

说明

write.rate.limit

false

0

默认关闭限速

8、写入方式

8.1、CDC数据同步

        CDC数据保存了完整的数据库变更,当前可通过两种途径将数据导入hudi:

        第一种:通过cdc-connector直接对接DB的binlog将数据导入hudi,优点是不依赖消息队列,缺点是对db server造成压力。
        第二种:对接cdc format消费kafka数据导入hudi,优点是可扩展性强,缺点是依赖kafka。
        注意:如果上游数据无法保证顺序,需要指定write.precombine.field字段

8.2、离线批量导入

        如果存量数据来源于其他数据源,可以使用批量导入功能,快速将存量数据导成 Hoodie 表格式。

1、原理

  1. 批量导入省去了avro的序列化以及数据的merge过程,后续不会再有去重操作,数据的唯一性需要自己来保证。
  2. bulk_insert需要在Batch Execuiton Mode下执行更高效,Batch模式默认会按照partition path排序输入消息再写入Hoodie,避免file handle频繁切换导致性能下降。
    1. SET execution.runtime-mode=batch;
    2. SET execution.checkpointing.interval=0;
  3. bulk_insert write task的并发通过参数write.tasks指定,并发的数量会影响到小文件的数量,理论上,bulk_insert write task的并发数就是划分的bucket数,当然每个bucket在写到文件大小上限(parquet 120MB)的时候会rollover到新的文件句柄,所以最后:写文件数量>=bulk_insert write task数。

2、WITH参数

名称

Required

默认值

说明

write.operation

true

upsert

配置 bulk_insert 开启该功能

write.tasks

false

4

bulk_insert 写 task 的并发,最后的文件数 >=write.tasks

write.bulk_insert.shuffle_by_partition

write.bulk_insert.shuffle_input

(从 0.11 开始)

false

true

是否将数据按照 partition 字段 shuffle 再通过 write task 写入,开启该参数将减少小文件的数量 但是可能有数据倾斜风险

write.bulk_insert.sort_by_partition

write.bulk_insert.sort_input

(从 0.11 开始)

false

true

是否将数据线按照 partition 字段排序再写入,当一个 write task 写多个 partition,开启可以减少小文件数量

write.sort.memory

128

sort 算子的可用 managed memory(单位 MB)

8.3、全量接增量

        如果已经有全量的离线Hoodie表,需要接上实时写入,并且保证数据不重复,可以开启index bootstrap功能。
        如果觉得流程冗长,可以在写入全量数据的时候资源调大直接走流模式写,全量走完接新数据再将资源调小(或者开启限流功能)。

        WITH参数

名称

Required

默认值

说明

index.bootstrap.enabled

true

false

开启索引加载,会将已存表的最新数据一次性加载到 state 中

index.partition.regex

false

*

设置正则表达式进行分区筛选,默认为加载全部分区

9、写入模式

9.1、Changelog模式

        如果希望Hoodie保留消息的所有变更(I/-U/U/D),之后接上Flink引擎的有状态计算实现全链路近实时数仓生产(增量计算),Hoodie的MOR表通过行存原生支持保留消息的所有变更(format层面的集成),通过流读MOR表可以消费到所有的变更记录。

1、WITH参数

名称

Required

默认值

说明

changelog.enabled

false

false

默认是关闭状态,即 UPSERT 语义,所有的消息仅保证最后一条合并消息,中间的变更可能会被 merge 掉;改成 true 支持消费所有变更。

        批(快照)读仍然会合并所有的中间结果,不管format是否已存储中间状态。

        开启changelog.enabled参数后,中间的变更也只是Best Effort:异步的压缩任务会将中间变更合并成1条,所以如果流读消费不够及时,被压缩后只能读到最后一条记录。当然,通过调整压缩的buffer时间可以预留一定的时间buffer给reader,比如调整压缩的两个参数:

        compaction.delta_commits:5 

        compaction.delta_seconds: 3600

        说明:
        Changelog 模式开启流读的话,要在 sql-client 里面设置参数:
        set sql-client.execution.result-mode=tableau; 
        或者
        set sql-client.execution.result-mode=changelog;
        否则中间结果在读的时候会被直接合并。(参考:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/#running-sql-queries)

2、流读 changelog

        仅在0.10.0支持,本feature为实验性。

        开启changelog模式后,hudi会保留一段时间的changelog供下游consumer消费,我们可以通过流读ODS层changelog接上ETL逻辑写入到DWD层,如下图的pipeline:

        流读的时候我们要注意changelog有可能会被compaction合并掉,中间记录会消除,可能会影响计算结果,需要关注sql-client的属性(result-mode)同上。

3、案例演示

-- 使用changelog
CREATE TABLE t6(id int PRIMARY KEY NOT ENFORCED,age INT
) WITH ('connector' = 'hudi','path' = '/tmp/hudi_flink/t6','table.type' = 'MERGE_ON_READ','read.streaming.enabled' = 'true','read.streaming.check-interval' = '4','changelog.enabled' = 'true'
);insert into t6 values(1,1);
insert into t6 values(1,2);select * from t6;  可以获取最新的数据,一条
select * from t6/*+ OPTIONS('read.start-commit'='earliest')*/; 可以获取2条-- 不使用changelog
CREATE TABLE t6_v(id int PRIMARY KEY NOT ENFORCED,age INT
) WITH ('connector' = 'hudi','path' = '/tmp/hudi_flink/t6','table.type' = 'MERGE_ON_READ','read.streaming.enabled' = 'true','read.streaming.check-interval' = '4'
);
select * from t6/*+ OPTIONS('read.start-commit'='earliest')*/; 只会获取1条,读时合并

9.2、Append模式

        从0.10开始支持
        对于INSERT模式:

  • MOR默认会apply小文件策略:会追加写avro log文件
  • COW每次直接写新的parquet文件,没有小文件策略

        Hudi支持丰富的Clustering策略,优化INSERT模式下的小文件问题:

1、Inline Clustering

        只有Copy On Write表支持该模式

名称

Required

默认值

说明

write.insert.cluster

false

false

是否在写入时合并小文件,COW 表默认 insert 写不合并小文件,开启该参数后,每次写入会优先合并之前的小文件(不会去重),吞吐会受影响

2、Async Clustering

        从 0.12 开始支持

(1)WITH参数

名称

Required

默认值

说明

clustering.schedule.enabled

false

false

是否在写入时定时异步调度 clustering plan,默认关闭

clustering.delta_commits

false

4

调度 clsutering plan 的间隔 commits,

clustering.schedule.enabled 为 true 时生效

clustering.async.enabled

false

false

是否异步执行 clustering plan,默认关闭

clustering.tasks

false

4

Clustering task 执行并发

clustering.plan.strategy.target.file.max.bytes

false

1024 * 1024 * 1024

Clustering 单文件目标大小,默认 1GB

clustering.plan.strategy.small.file.limit

false

600

小于该大小的文件才会参与 clustering,默认600MB

clustering.plan.strategy.sort.columns

false

N/A

支持指定特殊的排序字段

clustering.plan.partition.filter.mode

false

NONE

支持

NONE:不做限制

RECENT_DAYS:按时间(天)回溯

SELECTED_PARTITIONS:指定固定的 partition

clustering.plan.strategy.daybased.lookback.partitions

false

2

RECENT_DAYS 生效,默认 2 天

(2)Clustering Plan Strategy

        支持定制化的 clustering 策略。

名称

Required

默认值

说明

clustering.plan.partition.filter.mode

false

NONE

支持

  • NONE:不做限制
  • RECENT_DAYS:按时间(天)回溯
  • SELECTED_PARTITIONS:指定固定的 partition

clustering.plan.strategy.daybased.lookback.partitions

false

2

RECENT_DAYS 生效,默认 2 天

clustering.plan.strategy.cluster.begin.partition

false

N/A

SELECTED_PARTITIONS 生效,

指定开始 partition(inclusive)

clustering.plan.strategy.cluster.end.partition

false

N/A

SELECTED_PARTITIONS 生效,

指定结束 partition(incluseve)

clustering.plan.strategy.partition.regex.pattern

false

N/A

正则表达式过滤 partitions

clustering.plan.strategy.partition.selected

false

N/A

显示指定目标 partitions,支持逗号 , 分割多个 partition

10、Bucket索引

        从0.11开始支持

        默认的flink流式写入使用state存储索引信息:primarykey到fileId的映射关系。当数据量比较大的时候,state的存储开销可能成为瓶颈,bucket索引通过固定的hash策略,将相同key的数据分配到同一个fileGroup中,避免了索引的存储和查询开销。

1、WITH参数

名称

Required

默认值

说明

index.type

false

FLINK_STATE

设置BUCKET开启Bucket索引功能

hoodie.bucket.index.hash.field

false

主键

可以设置成主键的子集

hoodie.bucket.index.num.buckets

false

4

默认每个partition的bucket数,当前设置后则不可再变更。

2、和state索引的对比:
(1)bucket index没有state的存储计算开销,性能较好。
(2)bucket index无法扩buckets,state index则可以依据文件的大小动态扩容。
(3)bucket index不支持跨partition的变更(如果输入是cdc流则没有这个限制),state index没有限制。

11、Hudi Catalog

        将表的元数据持久化。从0.12.0开始支持,通过catalog可以管理flink创建的表,避免重复建表操作,另外hms模式的catalog支持自动补全hive同步参数。

        DFS模式Catalog SQL样例:

CREATE CATALOG hoodie_catalogWITH ('type'='hudi','catalog.path' = '${catalog 的默认路径}','mode'='dfs' );

         Hms模式Catalog SQL样例:

CREATE CATALOG hoodie_catalogWITH ('type'='hudi','catalog.path' = '${catalog 的默认路径}','hive.conf.dir' = '${hive-site.xml 所在的目录}','mode'='hms' -- 支持 'dfs' 模式通过文件系统管理表属性);	

1、With参数

名称

Required

默认值

说明

catalog.path

true

--

默认的 catalog 根路径,用作表路径的自动推导,默认的表路径:${catalog.path}/${db_name}/${table_name}

default-database

false

default

默认的 database 名

hive.conf.dir

false

--

hive-site.xml 所在的目录,只在 hms 模式下生效

mode

false

dfs

支持 hms模式通过 hive 管理元数据

table.external

false

false

是否创建外部表,只在 hms 模式下生效

2、使用dfs方式 

1、创建sql-client初始化sql文件

vim /opt/module/flink-1.13.2/conf/sql-client-init.sqlCREATE CATALOG hoodie_catalogWITH ('type'='hudi','catalog.path' = '/tmp/hudi_catalog','mode'='dfs' );USE CATALOG hoodie_catalog;

2、 指定sql-client启动时加载sql文件

hadoop fs -mkdir /tmp/hudi_catalogbin/sql-client.sh embedded -i conf/sql-client-init.sql -s yarn-session

需要先创建库 

3、建库建表插入

create database test;
use test;create table t2(uuid varchar(20),name varchar(10),age int,ts timestamp(3),`partition` varchar(20),
primary key (uuid) not enforced
)
with ('connector' = 'hudi','path' = '/tmp/hudi_catalog/default/t2','table.type' = 'MERGE_ON_READ'
);insert into t2 values('1','zs',18,TIMESTAMP '1970-01-01 00:00:01','a');

4、退出sql-client,重新进入,表信息还在

use test;
show tables;
select * from t2;


http://chatgpt.dhexx.cn/article/a3ltBPHh.shtml

相关文章

Spring Boot锦集(三):Spring Boot整合Kafka | Zookeeper/Kafka的安装和配置 | 总结的很详细

前言 在学习本章节前,务必做好以下准备工作: 1、安装并启动了Zookeeper[官网],如需帮助,点击进入; 2、安装并启动了Kafka[官网],如需帮助,点击进入。 注:zk和kafka的安装与介绍&…

Flink系列之:Flink CDC深入了解MySQL CDC连接器

Flink系列之:Flink CDC深入了解MySQL CDC连接器 一、增量快照特性1.增量快照读取2.并发读取3.全量阶段支持 checkpoint4.无锁算法5.MySQL高可用性支持 二、增量快照读取的工作原理三、全量阶段分片算法四、Chunk 读取算法五、Exactly-Once 处理六、MySQL心跳事件支持…

大数据面试重点之kafka(三)

Kafka如何保证全局有序? 可回答:1)Kafka消费者怎么保证有序性?2)Kafka生产者写入数据怎么保证有序?3)Kafka可以保证 数据的局部有序,如何保证数据的全局有序?4&#xff0…

Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明

文章目录 官方说明参数解读CodePOM依赖配置文件生产者消费者单元测试测试earliestlatest(默认)noneexception 源码地址 官方说明 https://kafka.apache.org/documentation/ 选择对应的版本,我这里选的是 2.4.X https://kafka.apache.org/24/documenta…

Kafka之auto.offset.reset值解析

今日在使用kafka时,发现将 auto.offset.reset 设置为earliest、latest、none 都没有达到自己预期的效果。 earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费latest&…

关于EarlyZ

在前向渲染中,ZTest是在Fragement Shader之后进行的,也就是说,被遮挡的部分也要绘制FS,就产生了Over Draw,其实很费,Early Z Culling就解决了这个问题 Early fragment tests, as an optimization, exist t…

【EARLIER/EARLIEST函数】引用不存在的更早的行上下文 报错解决

引用PowerQuery的例子并给予个人理解 X1 SUMX(FILTER(Data,Data[订单日期]>EARLIER(Data[订单日期])),[金额])---WRONG X2CALCULATE(SUM(Data[金额]),FILTER(Data,SUMX(FILTER(Data,Data[订单日期]>EARLIER(Data[订单日期])),[金额])))---RIGHT X1报错原因&#xff1a…

EarlyStop

在训练中,我们希望在中间箭头的位置停止训练。而Early stopping就可以实现该功能,这时获得的模型泛化能力较强,还可以得到一个中等大小的w的弗罗贝尼乌斯范数。其与L2正则化相似,选择参数w范数较小的神经网络。 可以用L2正则化代…

Kafka 使用java api从指定位移消费 (从开头消费/从结尾消费)

一、auto.offset.reset值详解 在 Kafka 中,每当消费者组内的消费者查找不到所记录的消费位移或发生位移越界时,就会根据消费者客户端参数 auto.offset.reset 的配置来决定从何处开始进行消费,这个参数的默认值为 “latest” 。 auto.offset…

动态SQL之 where 标签

动态SQL之 where 标签 where和if一般结合使用: 1.若where标签中的 if 条件都不满足,则where标签没有任何功能,即不会添加where关键字 2.若where标签中的 if 条件满足,则where标签会自动添加where关键字,并将条件最前…

mybatis-动态sql

文章目录 1. 动态sql简述2. 动态sql示例 2.1 if2.2 choose2.3 foreach2.4 sql 及 include2.5 sql中的特殊字符3. 后台分页实现4. 数据版本号处理并发问题 1. 动态sql简述 mybatis的动态sql语句是基于OGNL表达式的。可以方便的在sql语句中实现某些逻辑. 总体说来mybatis动态SQL…

mysql动态sql拼接_动态SQL(拼接)

Q1:什么是动态SQL呢? A1:首先是SQL语句,是根据条件来拼接SQL Q2:为什么要用动态SQL? A2:因为在条件WHERE中出现OR会导致不能使用索引,从而使效率差别巨大。 例如:如图1、2, 图(1) 图(2) Q3:怎么样使用动态SQL? A3: 存储过程Proc_Test是没有采用拼接的:CREATE PROC…

Mybatis学习之动态Sql

目录 1. 什么是动态Sql 2. 动态Sql需要学习什么 3. 动态Sql之《if》 4. 动态Sql之《where》 5. 动态Sql之《foreach》 6. 动态Sql之《sql》 7. PageHelper分页插件的使用 1. 什么是动态Sql 答案:动态Sql指的是,Sql语句是变化的,不是固…

Mybatis 动态SQL

Mybatis 动态SQL 一 .动态SQL 数组 array 使用foreach 标签 <!-- mybatis的集合操作知识点: 如果遇到集合参数传递,需要将集合遍历标签: foreach 循环遍历集合标签属性说明:1.collection 表示遍历的集合类型1.1 数组 关键字 array1.2 List集合 关键字 list1.3 Map集…

Mybatis动态SQL解析

文章目录 1 为什么需要动态SQL&#xff1f;2 动态标签有哪些?3 举例说明ifchoose (when, otherwise)trim (where, set)foreach 1 为什么需要动态SQL&#xff1f; 看一段Oracle存储过程代码&#xff1a; 由于前台传入的查询参数不同&#xff0c;所以写了很多的if else&#x…

Java MyBatis动态SQL

&#x1f9ed;MyBatis学习 &#x1f389; 内容回顾 Java MyBatis的介绍及其执行原理 Java MyBatis配置详解 Java Mybatis中使用Junit进行测试_程序员必备 Java MyBatis的使用 &#x1f4e2;今天我们进行 Java MyBatis动态SQL 的学习&#xff0c;感谢你的阅读&#xff0c;内容若…

mysql动态SQL用法

顾名思义“动态”SQL就是不固定的SQL&#xff0c;根据不同的条件把SQL语句进行拼接&#xff0c;来实现对数据库更加精准的操作。可以通过配置文件或者注解的形式实现&#xff0c;多用于多条件联查。 xml版&#xff08;配置文件&#xff09;&#xff1a; xml版的动态SQL 接…

动态SQL标签

所谓的动态SQL&#xff0c;本职还是SQL语句&#xff0c;只是可以在SQL层面&#xff0c;去执行一个逻辑代码 动态SQL就是在拼接SQL语句&#xff0c;我们只要保证SQL的正确性&#xff0c;按照SQL的格式&#xff0c;去排列组合。 建议&#xff1a; 先在MySQL中写出完整的SQL&am…

MyBatis动态SQL

文章目录 前言一、\<if\>标签二、\<where\>标签三、\<trim\>标签四、\<set\>标签五、\<foreach\>标签五、\<sql\>标签 与 \<include\>标签 前言 动态sql是Mybatis的强大功能特性之一&#xff0c;能够完成不同条件下的sql拼接 以上…

动态SQL

动态SQL 在项目开发中&#xff0c;动态SQL可以解决很多不确定因素导致的SQL语句不同的问题。动态SQL可以简单高效的进行编码。在接下来的案例中进行认识和学习动态SQL。 动态SQL只是在原有的SQL语句中进行细微修改。案例贴合实际&#xff0c;编码简单易懂 文章目录 动态SQL一、…