9、NIFI综合应用场景-通过NIFI配置kafka的数据同步

article/2025/8/15 19:46:45

Apache NiFi系列文章

1、nifi-1.9.2介绍、单机部署及简单验证
2、NIFI应用示例-GetFile和PutFile应用
3、NIFI处理器介绍、FlowFlie常见属性、模板介绍和运行情况信息查看
4、集群部署及验证、监控及节点管理
5、NiFi FileFlow示例和NIFI模板示例
6、NIFI应用场景-离线同步Mysql数据到HDFS中
7、NIFI综合应用场景-将mysql查询出的json数据转换成txt后存储至HDFS中
8、NIFI综合应用场景-NiFi监控MySQL binlog进行实时同步到hive
9、NIFI综合应用场景-通过NIFI配置kafka的数据同步


文章目录

  • Apache NiFi系列文章
  • 一、处理器说明
    • 1、处理器说明
      • 1)、PublishKafka_0_10
        • 1、描述
        • 2、属性配置
      • 2)、ConsumeKafka_0_10
        • 1、描述
        • 2、属性配置
  • 二、Producer生产
    • 1、创建并配置处理器GenerateFlowFile
    • 2、创建并配置处理器PublishKafka_0_10
    • 3、配置GenerateFlowFile和PublishKafka_0_10连接
    • 4、负载均衡并发
    • 5、验证
  • 三、Consumer消费
    • 1、创建并配置ConsumeKafka_0_10处理器并连接
    • 2、验证


本文旨在介绍nifi与kafka的交互过程,即生产数据到kafka中,然后通过nifi消费kafka中的数据。
本文前提是nifi、kafka环境正常。
本文分为三个部分,即处理器说明、生产数据到kafka中以及消费kafka中的数据。

一、处理器说明

1、处理器说明

1)、PublishKafka_0_10

1、描述

使用Kafka 0.10.x Producer API将FlowFile的内容作为消息发送到Apache Kafka。要发送的消息可以是单独的FlowFiles,也可以使用用户指定的定界符(例如换行符)进行定界。用于获取消息的辅助NiFi处理器是ConsumeKafka_0_10。

2、属性配置

在这里插入图片描述

2)、ConsumeKafka_0_10

1、描述

消耗来自专门针对Kafka 0.10.x Consumer API构建的Apache Kafka的消息。用于发送消息的辅助NiFi处理器是PublishKafka_0_10。

2、属性配置

在下面的列表中,列出属性及其默认值,以及属性是否支持NiFi表达式语言
在这里插入图片描述

二、Producer生产

1、创建并配置处理器GenerateFlowFile

创建处理器组kafka,进入组后创建GenerateFlowFile处理器。
每1秒生产一次数据。
在这里插入图片描述

  • 文件大小100b
  • 每次生成10个相同文件
  • 每次生成的流文件内容唯一
    在这里插入图片描述

2、创建并配置处理器PublishKafka_0_10

创建处理器组kafka,进入组后创建PublishKafka_0_10处理器。

  • Brokers设置为192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092。图为示例。
  • topic设置为nifi-topic,如果topic不存在,会自动创建
  • Delivery Guarantee,对应kafka的acks机制,选择最为安去的Guarantee Replicated Delivery,相当于acks=all
    在这里插入图片描述
    在这里插入图片描述

3、配置GenerateFlowFile和PublishKafka_0_10连接

连接GenerateFlowFile和PublishKafka_0_10
在这里插入图片描述

4、负载均衡并发

在这里插入图片描述

5、验证

启动并查看监听kafka消费数据,也可以通过http://server1:8048/topic/meta/nifi-kafka/工具查看生产的数据
在kafka所在服务器执行监听命令:

/usr/local/bigdata/kafka_2.11-0.10.2.1/bin/kafka-console-consumer.sh --bootstrap-server  192.168.10.41:9092 --topic nifi-topic以下为示例数据
[root@server1 bin]# kafka-console-consumer.sh --bootstrap-server  server1:9092 --topic nifi-kafka
B,,tqZu@a90txGGdx^M7eM*$MXiHL>RX5H*3i1YAJiVq,qy_?Bp'1VV)RdquZOdq>xg6^l#5^iinGAf0Z@Zb+=0i9ZY-.gGN!F
6
B,,tqZu@a90txGGdx^M7eM*$MXiHL>RX5H*3i1YAJiVq,qy_?Bp'1VV)RdquZOdq>xg6^l#5^iinGAf0Z@Zb+=0i9ZY-.gGN!F
6
B,,tqZu@a90txGGdx^M7eM*$MXiHL>RX5H*3i1YAJiVq,qy_?Bp'1VV)RdquZOdq>xg6^l#5^iinGAf0Z@Zb+=0i9ZY-.gGN!F
6
B,,tqZu@a90txGGdx^M7eM*$MXiHL>RX5H*3i1YAJiVq,qy_?Bp'1VV)RdquZOdq>xg6^l#5^iinGAf0Z@Zb+=0i9ZY-.gGN!F
6
cHug@pl13g      w/,9%kw=,=n!mp.CH_W3Qt1#Ax^M/sgqu:d)vp#&+yxz3^?(wWK@VpB3'kzc1w0A6b,J$vQDb)9uTrJleh*F+9 ;?
cHug@pl13g      w/,9%kw=,=n!mp.CH_W3Qt1#Ax^M/sgqu:d)vp#&+yxz3^?(wWK@VpB3'kzc1w0A6b,J$vQDb)9uTrJleh*F+9 ;?
cHug@pl13g      w/,9%kw=,=n!mp.CH_W3Qt1#Ax^M/sgqu:d)vp#&+yxz3^?(wWK@VpB3'kzc1w0A6b,J$vQDb)9uTrJleh*F+9 ;?
cHug@pl13g      w/,9%kw=,=n!mp.CH_W3Qt1#Ax^M/sgqu:d)vp#&+yxz3^?(wWK@VpB3'kzc1w0A6b,J$vQDb)9uTrJleh*F+9 ;?

三、Consumer消费

1、创建并配置ConsumeKafka_0_10处理器并连接

  • Brokers地址要和Producer的设置一样:Brokers设置为192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092。图为示例。
  • Topic设置和Producer一致:nifi-topic
  • GroupId随意设置:nifi
  • Offset Reset设置为:latest,从最新的消息开始消费
    在这里插入图片描述

2、验证

启动生产者、消费者,验证nifi是否将数据写入kafka、并且kafka的数据是否被消费。
以下为模板界面。
在这里插入图片描述
以上完成了nifi读取kafka中的数据(消费)。类似的也可以通过nifi将数据写入到nifi中,此处不再赘述。


http://chatgpt.dhexx.cn/article/JQngB5Ob.shtml

相关文章

Apache NiFi 入门指南

本指南使用于谁? 本指南适用于从未使用过,在NiFi中有限度接触或仅完成特定任务的用户。本指南不是详尽的说明手册或参考指南。“ 用户指南”提供了大量信息,旨在提供更加详尽的资源,并且作为参考指南非常有用。相比之下&#xff…

2、NIFI应用示例-GetFile和PutFile应用

Apache NiFi系列文章 1、nifi-1.9.2介绍、单机部署及简单验证 2、NIFI应用示例-GetFile和PutFile应用 3、NIFI处理器介绍、FlowFlie常见属性、模板介绍和运行情况信息查看 4、集群部署及验证、监控及节点管理 5、NiFi FileFlow示例和NIFI模板示例 6、NIFI应用场景-离线同步Mys…

大数据NiFi(三):NiFi关键特性

文章目录 NiFi关键特性 一、​​​​​​​​​​​​​​流管理

NiFi学习笔记

目录 NiFi概念 NiFi是什么 Apache NiFi 包括以下功能 NIFI核心概念 NiFi架构 NiFi入门 常用术语 下载安装NiFi 启动和关闭NIFI NIFI处理器 查看处理器 常用处理器 配置处理器 其他组件 应用场景 1.添加和配置第一个处理器GetFile 2.添加第二个处理器PutFile NiF…

NiFi的简介

使用java开发的一个开源项目,数据处理工具 1.简介: NiFi 是一个易于使用、功能强大而且可靠的流式数据处理和分发系统。NiFi 是为数据流设计,支持从多种数据源动态的拉取数据,并基于WEB图形界面,通过拖拽、连接、配置…

Nifi的入门使用

Nifi的使用 1.官方文档2.Nifi简介3.简单使用4.Template 使用nifi前,需要知道ETL在做什么,如果源端和目标端栏位不匹配,就需要用到小帮手, 让你更直观的了解映射关系,才能更好的构建DataFlow 第一步:Nifi开发…

NiFi【部署 01】NiFi最新版本1.18.0下载安装配置启动及问题处理(一篇学会部署NiFi)

Apache NIFI中文文档 地址:https://nifichina.github.io/ 1.简介 官网的介绍: An easy to use, powerful, and reliable system to process and distribute data. 一个易用、功能强大、可靠的处理和分发数据的系统。 来自网络的介绍: 2006…

5、NiFi FileFlow示例和NIFI模板示例

Apache NiFi系列文章 1、nifi-1.9.2介绍、单机部署及简单验证 2、NIFI应用示例-GetFile和PutFile应用 3、NIFI处理器介绍、FlowFlie常见属性、模板介绍和运行情况信息查看 4、集群部署及验证、监控及节点管理 5、NiFi FileFlow示例和NIFI模板示例 6、NIFI应用场景-离线同步Mys…

大数据NiFi(二):NiFi架构

文章目录 NiFi架构 一、​​​​​​​NiFi核心概念

Nifi:nifi的基本使用

Nifi的安装使用 爱购物 www.cqfenfa.com Nifi安装 首先说一下Nifi的安装,这里Nifi可以支持Windows版和Linux,只需要去官网:http://nifi.apache.org/ 根据自己需要的版本,选择下载,然后安装解压就行 各目录及主要文件…

大数据NiFi(一):什么是NiFi

文章目录 什么是NiFi 一、NiFi背景介绍

Nifi介绍、安装、实践案例

第1章NiFi基本概念 1.1 概述 简单地说,NiFi是为了自动化系统之间的数据流而构建的。虽然术语“数据流”在各种环境中使用,但我们在此处使用它来表示系统之间自动化和管理的信息流。这个问题空间一直存在,因为企业有多个系统,其中…

NiFi技术干货

第1章 NiFi概述 1.1 NiFi是什么 简单的说,NiFi就是为了解决不同系统间数据自动流通问题而建立的。虽然dataflow这个术语在各种场景都有被使用,但我们在这里使用它来表示不同系统间的自动化的可管理的信息流。自企业拥有多个系统开始,一些系…

NiFi 基本概念

NiFi基本概念 一. NiFi是什么 Apache NiFi 是一个易于使用, 功能强大且可靠的系统, 用于处理和分发数据。可以自动化管理系统间的数据流。它使用高度可配置的指示图来管理数据路由, 转换和系统中介逻辑, 支持从多种数据源动态拉取数据。NiFi 原来是 NSA(美国国家安全局) 的一…

大数据Nifi简介

目录 1 NIFI简介2 NIFI核心概念3 NIFI构架3.1 网络服务器3.2 流控制器3.3 扩展3.4 FlowFile存储库3.5 内容存储库3.6 源头存储库 1 NIFI简介 Apache NiFi 是一个易于使用,功能强大且可靠的系统,用于处理和分发数据。可以自动化管理系统间的数据流。它支…

springboot error at line 1, column 5. Encountered: “\uff01“ (65281), after : ““

出现这种错误表面mybatis配置文件中sql查询语句中出现了中文字符,需要将对应的中文字符修改为英文。 ERROR 10500 --- [nio-8081-exec-8] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] …

编码GBK的不可映射字符 和 错误: 非法字符: ‘\uff1b‘

题目:解决编码GBK的不可映射字符 和 错误: 非法字符: ‘\uff1b’ 解决编码GBK的不可映射字符 1)首先,在d盘所示的目录新建一个文本文件(记事本),保存时字符集为utf-8 2)输入(d:&…

如何处理“错误: 非法字符: ‘\uff1b‘”

解决方法:把原代码中的中文分号改为英文分号。 注意事项:中英文切换时应尤其注意符号问题。

Lexical error at line 1, column 20. Encountered: “\uff01“ (65281), after : ““

问题&#xff1a; 测试myBatis的动态SQL的时候&#xff0c;控制台提示我出现了&#xff1a;Lexical error at line 1, column 20. Encountered: "\uff01" (65281), after : "" 的错误 原因&#xff1a; 在<if test"sex !null and sex &#xff0…

深度学习之格式转换笔记(三):keras(.hdf5)模型转TensorFlow(.pb) 转TensorRT(.uff)格式

环境&#xff1a; tensorflow1.15&#xff0c;cuda10.0&#xff0c;cudnn7.6.4 将keras训练好保存的.hdf5格式模型转为tensorflow的.pb模型&#xff0c;然后转为tensorrt支持的uff格式。 keras(.hdf5)模型转TensorFlow(.pb) # h5_to_pb.pyfrom keras.models import load_mod…