Apache NiFi系列文章
1、nifi-1.9.2介绍、单机部署及简单验证
2、NIFI应用示例-GetFile和PutFile应用
3、NIFI处理器介绍、FlowFlie常见属性、模板介绍和运行情况信息查看
4、集群部署及验证、监控及节点管理
5、NiFi FileFlow示例和NIFI模板示例
6、NIFI应用场景-离线同步Mysql数据到HDFS中
7、NIFI综合应用场景-将mysql查询出的json数据转换成txt后存储至HDFS中
8、NIFI综合应用场景-NiFi监控MySQL binlog进行实时同步到hive
9、NIFI综合应用场景-通过NIFI配置kafka的数据同步
文章目录
- Apache NiFi系列文章
- 一、FlowFile生成器示例
- 1、GenerateFlowFile解析
- 1)、描述
- 2)、属性配置
- 3)、应用场景
- 2、ReplaceText解析
- 1)、描述
- 2)、属性配置
- 3)、应用场景
- 3、示例
- 1)、创建GenerateFlowFile并设置大小
- 2)、配置FlowFile
- 3)、创建ReplaceText并连接
- 4)、启动GenerateFile
- 5)、配置ReplaceText
- 6)、创建PutFile
- 7)、查看结果
- 二、Nifi模板
- 1、导入模板
- 1)、导入
- 2)、查看模板列表
- 3)、创建模板引用
- 2、组、导出模板
- 1)、创建组
- 2)、移动处理器到组
- 3)、进入与退出组
- 4)、创建模板
- 5)、下载模板
- 6)、嵌套组
- 三、FlowFile内容和属性
- 1、FlowFile属性及操作
- 1)、FlowFile包含两部分
- 2)、处理器对FlowFile的操作
- 2、处理器
- 1)、ExtractText
- 2)、UpdateAttribute
- 3、示例
- 1)、改为单点造数据
- 2)、负载均衡消费数据
- 3)、查看队列数据
- 4)、通过ExtractText将数据写入属性
本文旨在介绍FlowFile属性和内容、模板和简单介绍一个应用示例。其中模板将是后续文章的主要使用内容。
本分前提是nifi环境正常使用。
本分分为三个部分,即FlowFile生成器示例、模板以及FlowFile的内容与属性。
一、FlowFile生成器示例
FlowFile生成器,GenerateFlowFile和ReplaceText处理器,用于生成数据。
1、GenerateFlowFile解析
1)、描述
该处理器使用随机数据或自定义内容创建流文件。GenerateFlowFile用于负载测试、配置和仿真。
2)、属性配置
在下面的列表中,列出属性默认值(如果有默认值),以及属性是否支持表达式语言。
3)、应用场景
该处理器多用于测试,配置生成设计人员所需要的特定数据,模拟数据来源或者压力测试、负载测试。
2、ReplaceText解析
1)、描述
使用其他值替换匹配正则表达式的流文件部分内容,从而更新流文件的内容。
2)、属性配置
在下面的列表中,列出属性默认值(如果有默认值),以及属性是否支持表达式语言。
3)、应用场景
使用正则表达式,来逐行或者全文本替换文件流内容,往往用于业务逻辑处理。
3、示例
1)、创建GenerateFlowFile并设置大小
2)、配置FlowFile
仅主节点运行,每10s运行一次,每个文件1B,每个批次5个文件
3)、创建ReplaceText并连接
4)、启动GenerateFile
启动GenerateFile处理器,看是否自动生成了数据文件
5)、配置ReplaceText
所有的内容都替换成 hello world
6)、创建PutFile
停止GenerateFlowFile,然后创建PutFile
7)、查看结果
查看主节点目录下的文件内容
[root@server1 nifioutputtest]# ll
总用量 20
-rw-r--r-- 1 root root 11 2月 7 08:09 2536169b-33eb-4d7e-a22c-0d4ecc9c2496
-rw-r--r-- 1 root root 11 2月 7 08:09 28c09c45-d33b-4381-ae81-87a04e6138c5
-rw-r--r-- 1 root root 11 2月 7 08:09 81303b0e-30f3-4aac-b569-082015a283be
-rw-r--r-- 1 root root 11 2月 7 08:09 852748a5-c6ba-4a73-bf18-8c15425edba0
-rw-r--r-- 1 root root 11 2月 7 08:09 c46d56bf-69d3-455b-9539-07b1a2b5d7ad
[root@server1 nifioutputtest]# cat c46d56bf-69d3-455b-9539-07b1a2b5d7ad
hello world
[root@server1 nifioutputtest]#
验证正常。
二、Nifi模板
1、导入模板
1)、导入
导入本模板(图文不匹配,本模板是上述例子的模板)
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<template encoding-version="1.2"><description>创建模板试一试</description><groupId>29e21e65-0186-1000-01d5-e5cdf061c513</groupId><name>test_template</name><snippet><connections><id>71591b1b-3df4-310b-0000-000000000000</id><parentGroupId>6cb373b2-097c-3113-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>6cb373b2-097c-3113-0000-000000000000</groupId><id>47f06192-31ad-3186-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression><loadBalancePartitionAttribute></loadBalancePartitionAttribute><loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus><loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy><name>gen_rep</name><selectedRelationships>success</selectedRelationships><source><groupId>6cb373b2-097c-3113-0000-000000000000</groupId><id>49cfc51b-650d-3596-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>daf85806-06eb-347c-0000-000000000000</id><parentGroupId>6cb373b2-097c-3113-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>6cb373b2-097c-3113-0000-000000000000</groupId><id>f4c648ea-d396-3680-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression><loadBalancePartitionAttribute></loadBalancePartitionAttribute><loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus><loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy><name>rep_put</name><selectedRelationships>success</selectedRelationships><source><groupId>6cb373b2-097c-3113-0000-000000000000</groupId><id>47f06192-31ad-3186-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><processors><id>47f06192-31ad-3186-0000-000000000000</id><parentGroupId>6cb373b2-097c-3113-0000-000000000000</parentGroupId><position><x>551.0</x><y>4.0</y></position><bundle><artifact>nifi-standard-nar</artifact><group>org.apache.nifi</group><version>1.9.2</version></bundle><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><descriptors><entry><key>Regular Expression</key><value><name>Regular Expression</name></value></entry><entry><key>Replacement Value</key><value><name>Replacement Value</name></value></entry><entry><key>Character Set</key><value><name>Character Set</name></value></entry><entry><key>Maximum Buffer Size</key><value><name>Maximum Buffer Size</name></value></entry><entry><key>Replacement Strategy</key><value><name>Replacement Strategy</name></value></entry><entry><key>Evaluation Mode</key><value><name>Evaluation Mode</name></value></entry></descriptors><executionNode>ALL</executionNode><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Regular Expression</key><value>(?s)(^.*$)</value></entry><entry><key>Replacement Value</key><value>hello world</value></entry><entry><key>Character Set</key><value>UTF-8</value></entry><entry><key>Maximum Buffer Size</key><value>1 MB</value></entry><entry><key>Replacement Strategy</key><value>Always Replace</value></entry><entry><key>Evaluation Mode</key><value>Entire text</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><executionNodeRestricted>false</executionNodeRestricted><name>ReplaceText_Demo</name><relationships><autoTerminate>true</autoTerminate><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><name>success</name></relationships><state>STOPPED</state><style/><type>org.apache.nifi.processors.standard.ReplaceText</type></processors><processors><id>49cfc51b-650d-3596-0000-000000000000</id><parentGroupId>6cb373b2-097c-3113-0000-000000000000</parentGroupId><position><x>0.0</x><y>0.0</y></position><bundle><artifact>nifi-standard-nar</artifact><group>org.apache.nifi</group><version>1.9.2</version></bundle><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><descriptors><entry><key>File Size</key><value><name>File Size</name></value></entry><entry><key>Batch Size</key><value><name>Batch Size</name></value></entry><entry><key>Data Format</key><value><name>Data Format</name></value></entry><entry><key>Unique FlowFiles</key><value><name>Unique FlowFiles</name></value></entry><entry><key>generate-ff-custom-text</key><value><name>generate-ff-custom-text</name></value></entry><entry><key>character-set</key><value><name>character-set</name></value></entry></descriptors><executionNode>PRIMARY</executionNode><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>File Size</key><value>1B</value></entry><entry><key>Batch Size</key><value>5</value></entry><entry><key>Data Format</key><value>Text</value></entry><entry><key>Unique FlowFiles</key><value>false</value></entry><entry><key>generate-ff-custom-text</key></entry><entry><key>character-set</key><value>UTF-8</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>20 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><executionNodeRestricted>false</executionNodeRestricted><name>GenerateFlowFile_Demo</name><relationships><autoTerminate>false</autoTerminate><name>success</name></relationships><state>STOPPED</state><style/><type>org.apache.nifi.processors.standard.GenerateFlowFile</type></processors><processors><id>f4c648ea-d396-3680-0000-000000000000</id><parentGroupId>6cb373b2-097c-3113-0000-000000000000</parentGroupId><position><x>1101.0</x><y>2.0</y></position><bundle><artifact>nifi-standard-nar</artifact><group>org.apache.nifi</group><version>1.9.2</version></bundle><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><descriptors><entry><key>Directory</key><value><name>Directory</name></value></entry><entry><key>Conflict Resolution Strategy</key><value><name>Conflict Resolution Strategy</name></value></entry><entry><key>Create Missing Directories</key><value><name>Create Missing Directories</name></value></entry><entry><key>Maximum File Count</key><value><name>Maximum File Count</name></value></entry><entry><key>Last Modified Time</key><value><name>Last Modified Time</name></value></entry><entry><key>Permissions</key><value><name>Permissions</name></value></entry><entry><key>Owner</key><value><name>Owner</name></value></entry><entry><key>Group</key><value><name>Group</name></value></entry></descriptors><executionNode>ALL</executionNode><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Directory</key><value>/usr/local/bigdata/testdata/nifioutputtest</value></entry><entry><key>Conflict Resolution Strategy</key><value>fail</value></entry><entry><key>Create Missing Directories</key><value>true</value></entry><entry><key>Maximum File Count</key></entry><entry><key>Last Modified Time</key></entry><entry><key>Permissions</key></entry><entry><key>Owner</key></entry><entry><key>Group</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><executionNodeRestricted>false</executionNodeRestricted><name>PutFile_demo</name><relationships><autoTerminate>true</autoTerminate><name>failure</name></relationships><relationships><autoTerminate>true</autoTerminate><name>success</name></relationships><state>STOPPED</state><style/><type>org.apache.nifi.processors.standard.PutFile</type></processors></snippet><timestamp>02/07/2023 08:15:48 GMT</timestamp>
</template>
2)、查看模板列表
3)、创建模板引用
2、组、导出模板
1)、创建组
2)、移动处理器到组
3)、进入与退出组
4)、创建模板
创建模板就是针对当前画布创建的模板,也即该模板包含当前画布的所有操作。
5)、下载模板
6)、嵌套组
三、FlowFile内容和属性
1、FlowFile属性及操作
1)、FlowFile包含两部分
- 属性
FlowFile的元数据.保存了关于内容的信息,比如:什么时候创建的,从哪里来,代表什么等等 - 内容
FlowFile的实际内容。比如:使用getfile读取数据时,读取到的实际内容文本。
2)、处理器对FlowFile的操作
更新,添加,或者删除FlowFile属性。改变FlowFile内容。
2、处理器
1)、ExtractText
- 描述
该处理器使用正则表达式,匹配流文件中的内容,并将匹配成功的内容输出到属性中;如果正则匹配到多个结果,默认只取第一个结果;匹配成功则流文件路由matched,没有匹配则到unmatched; - 属性配置
在下面的列表中,列出属性默认值(如果有默认值),以及属性是否支持表达式语言。
- 应用场景
提取content中的内容,输出到流属性当中
2)、UpdateAttribute
- 描述
该处理器使用属性表达式语言更新流文件的属性,并且/或则基于正则表达式删除属性 - 属性配置
在下面的列表中,列出属性默认值(如果有默认值),以及属性是否支持表达式语言
- 应用场景
该处理器基本用法最为常用,增加,修改或删除流属性;
此处理器使用用户添加的属性或规则更新FlowFile的属性。有三种方法可以使用此处理器添加或修改属性。
1、基本用法: 默认更改通过处理器的每个FlowFile的匹配的属性
2、高级用法:可以进行条件属性更改,只有在满足特定条件时才会影响FlowFile。可以在同一处理器中同时使用这两种方法
3、删除属性表达式: 允许提供正则表达式,并且将删除匹配的任何属性
“删除属性表达式”将取代发生的任何更新。如果现有属性与“删除属性表达式”匹配,则无论是否更新,都将删除该属性。“删除属性表达式”仅适用于输入FlowFile中存在的属性,如果属性是由此处理器添加的,则“删除属性表达式”将不会匹配到它。
3、示例
1)、改为单点造数据
2)、负载均衡消费数据
3)、查看队列数据
4)、通过ExtractText将数据写入属性
查看输出数据属性
以上完成了FlowFile的示例及模板的示例。更多信息请关注该专栏的其他文章。