Apache NiFi系列文章
1、nifi-1.9.2介绍、单机部署及简单验证
2、NIFI应用示例-GetFile和PutFile应用
3、NIFI处理器介绍、FlowFlie常见属性、模板介绍和运行情况信息查看
4、集群部署及验证、监控及节点管理
5、NiFi FileFlow示例和NIFI模板示例
6、NIFI应用场景-离线同步Mysql数据到HDFS中
7、NIFI综合应用场景-将mysql查询出的json数据转换成txt后存储至HDFS中
8、NIFI综合应用场景-NiFi监控MySQL binlog进行实时同步到hive
9、NIFI综合应用场景-通过NIFI配置kafka的数据同步
文章目录
- Apache NiFi系列文章
- 一、处理器说明
- 1、处理器说明
- 1)、PublishKafka_0_10
- 1、描述
- 2、属性配置
- 2)、ConsumeKafka_0_10
- 1、描述
- 2、属性配置
- 二、Producer生产
- 1、创建并配置处理器GenerateFlowFile
- 2、创建并配置处理器PublishKafka_0_10
- 3、配置GenerateFlowFile和PublishKafka_0_10连接
- 4、负载均衡并发
- 5、验证
- 三、Consumer消费
- 1、创建并配置ConsumeKafka_0_10处理器并连接
- 2、验证
本文旨在介绍nifi与kafka的交互过程,即生产数据到kafka中,然后通过nifi消费kafka中的数据。
本文前提是nifi、kafka环境正常。
本文分为三个部分,即处理器说明、生产数据到kafka中以及消费kafka中的数据。
一、处理器说明
1、处理器说明
1)、PublishKafka_0_10
1、描述
使用Kafka 0.10.x Producer API将FlowFile的内容作为消息发送到Apache Kafka。要发送的消息可以是单独的FlowFiles,也可以使用用户指定的定界符(例如换行符)进行定界。用于获取消息的辅助NiFi处理器是ConsumeKafka_0_10。
2、属性配置
2)、ConsumeKafka_0_10
1、描述
消耗来自专门针对Kafka 0.10.x Consumer API构建的Apache Kafka的消息。用于发送消息的辅助NiFi处理器是PublishKafka_0_10。
2、属性配置
在下面的列表中,列出属性及其默认值,以及属性是否支持NiFi表达式语言
二、Producer生产
1、创建并配置处理器GenerateFlowFile
创建处理器组kafka,进入组后创建GenerateFlowFile处理器。
每1秒生产一次数据。
- 文件大小100b
- 每次生成10个相同文件
- 每次生成的流文件内容唯一
2、创建并配置处理器PublishKafka_0_10
创建处理器组kafka,进入组后创建PublishKafka_0_10处理器。
- Brokers设置为192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092。图为示例。
- topic设置为nifi-topic,如果topic不存在,会自动创建
- Delivery Guarantee,对应kafka的acks机制,选择最为安去的Guarantee Replicated Delivery,相当于acks=all
3、配置GenerateFlowFile和PublishKafka_0_10连接
连接GenerateFlowFile和PublishKafka_0_10
4、负载均衡并发
5、验证
启动并查看监听kafka消费数据,也可以通过http://server1:8048/topic/meta/nifi-kafka/工具查看生产的数据
在kafka所在服务器执行监听命令:
/usr/local/bigdata/kafka_2.11-0.10.2.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.41:9092 --topic nifi-topic以下为示例数据
[root@server1 bin]# kafka-console-consumer.sh --bootstrap-server server1:9092 --topic nifi-kafka
B,,tqZu@a90txGGdx^M7eM*$MXiHL>RX5H*3i1YAJiVq,qy_?Bp'1VV)RdquZOdq>xg6^l#5^iinGAf0Z@Zb+=0i9ZY-.gGN!F
6
B,,tqZu@a90txGGdx^M7eM*$MXiHL>RX5H*3i1YAJiVq,qy_?Bp'1VV)RdquZOdq>xg6^l#5^iinGAf0Z@Zb+=0i9ZY-.gGN!F
6
B,,tqZu@a90txGGdx^M7eM*$MXiHL>RX5H*3i1YAJiVq,qy_?Bp'1VV)RdquZOdq>xg6^l#5^iinGAf0Z@Zb+=0i9ZY-.gGN!F
6
B,,tqZu@a90txGGdx^M7eM*$MXiHL>RX5H*3i1YAJiVq,qy_?Bp'1VV)RdquZOdq>xg6^l#5^iinGAf0Z@Zb+=0i9ZY-.gGN!F
6
cHug@pl13g w/,9%kw=,=n!mp.CH_W3Qt1#Ax^M/sgqu:d)vp#&+yxz3^?(wWK@VpB3'kzc1w0A6b,J$vQDb)9uTrJleh*F+9 ;?
cHug@pl13g w/,9%kw=,=n!mp.CH_W3Qt1#Ax^M/sgqu:d)vp#&+yxz3^?(wWK@VpB3'kzc1w0A6b,J$vQDb)9uTrJleh*F+9 ;?
cHug@pl13g w/,9%kw=,=n!mp.CH_W3Qt1#Ax^M/sgqu:d)vp#&+yxz3^?(wWK@VpB3'kzc1w0A6b,J$vQDb)9uTrJleh*F+9 ;?
cHug@pl13g w/,9%kw=,=n!mp.CH_W3Qt1#Ax^M/sgqu:d)vp#&+yxz3^?(wWK@VpB3'kzc1w0A6b,J$vQDb)9uTrJleh*F+9 ;?
三、Consumer消费
1、创建并配置ConsumeKafka_0_10处理器并连接
- Brokers地址要和Producer的设置一样:Brokers设置为192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092。图为示例。
- Topic设置和Producer一致:nifi-topic
- GroupId随意设置:nifi
- Offset Reset设置为:latest,从最新的消息开始消费
2、验证
启动生产者、消费者,验证nifi是否将数据写入kafka、并且kafka的数据是否被消费。
以下为模板界面。
以上完成了nifi读取kafka中的数据(消费)。类似的也可以通过nifi将数据写入到nifi中,此处不再赘述。