5、NiFi FileFlow示例和NIFI模板示例

article/2025/8/16 5:26:35

Apache NiFi系列文章

1、nifi-1.9.2介绍、单机部署及简单验证
2、NIFI应用示例-GetFile和PutFile应用
3、NIFI处理器介绍、FlowFlie常见属性、模板介绍和运行情况信息查看
4、集群部署及验证、监控及节点管理
5、NiFi FileFlow示例和NIFI模板示例
6、NIFI应用场景-离线同步Mysql数据到HDFS中
7、NIFI综合应用场景-将mysql查询出的json数据转换成txt后存储至HDFS中
8、NIFI综合应用场景-NiFi监控MySQL binlog进行实时同步到hive
9、NIFI综合应用场景-通过NIFI配置kafka的数据同步


文章目录

  • Apache NiFi系列文章
  • 一、FlowFile生成器示例
    • 1、GenerateFlowFile解析
      • 1)、描述
      • 2)、属性配置
      • 3)、应用场景
    • 2、ReplaceText解析
      • 1)、描述
      • 2)、属性配置
      • 3)、应用场景
    • 3、示例
      • 1)、创建GenerateFlowFile并设置大小
      • 2)、配置FlowFile
      • 3)、创建ReplaceText并连接
      • 4)、启动GenerateFile
      • 5)、配置ReplaceText
      • 6)、创建PutFile
      • 7)、查看结果
  • 二、Nifi模板
    • 1、导入模板
      • 1)、导入
      • 2)、查看模板列表
      • 3)、创建模板引用
    • 2、组、导出模板
      • 1)、创建组
      • 2)、移动处理器到组
      • 3)、进入与退出组
      • 4)、创建模板
      • 5)、下载模板
      • 6)、嵌套组
  • 三、FlowFile内容和属性
    • 1、FlowFile属性及操作
      • 1)、FlowFile包含两部分
      • 2)、处理器对FlowFile的操作
    • 2、处理器
      • 1)、ExtractText
      • 2)、UpdateAttribute
    • 3、示例
      • 1)、改为单点造数据
      • 2)、负载均衡消费数据
      • 3)、查看队列数据
      • 4)、通过ExtractText将数据写入属性


本文旨在介绍FlowFile属性和内容、模板和简单介绍一个应用示例。其中模板将是后续文章的主要使用内容。
本分前提是nifi环境正常使用。
本分分为三个部分,即FlowFile生成器示例、模板以及FlowFile的内容与属性。

一、FlowFile生成器示例

FlowFile生成器,GenerateFlowFile和ReplaceText处理器,用于生成数据。

1、GenerateFlowFile解析

1)、描述

该处理器使用随机数据或自定义内容创建流文件。GenerateFlowFile用于负载测试、配置和仿真。

2)、属性配置

在下面的列表中,列出属性默认值(如果有默认值),以及属性是否支持表达式语言。
在这里插入图片描述

3)、应用场景

该处理器多用于测试,配置生成设计人员所需要的特定数据,模拟数据来源或者压力测试、负载测试。

2、ReplaceText解析

1)、描述

使用其他值替换匹配正则表达式的流文件部分内容,从而更新流文件的内容。

2)、属性配置

在下面的列表中,列出属性默认值(如果有默认值),以及属性是否支持表达式语言。
在这里插入图片描述

3)、应用场景

使用正则表达式,来逐行或者全文本替换文件流内容,往往用于业务逻辑处理。

3、示例

1)、创建GenerateFlowFile并设置大小

在这里插入图片描述

2)、配置FlowFile

仅主节点运行,每10s运行一次,每个文件1B,每个批次5个文件
在这里插入图片描述

3)、创建ReplaceText并连接

在这里插入图片描述

4)、启动GenerateFile

启动GenerateFile处理器,看是否自动生成了数据文件
在这里插入图片描述

5)、配置ReplaceText

所有的内容都替换成 hello world
在这里插入图片描述

6)、创建PutFile

停止GenerateFlowFile,然后创建PutFile
在这里插入图片描述

7)、查看结果

查看主节点目录下的文件内容

[root@server1 nifioutputtest]# ll
总用量 20
-rw-r--r-- 1 root root 11 27 08:09 2536169b-33eb-4d7e-a22c-0d4ecc9c2496
-rw-r--r-- 1 root root 11 27 08:09 28c09c45-d33b-4381-ae81-87a04e6138c5
-rw-r--r-- 1 root root 11 27 08:09 81303b0e-30f3-4aac-b569-082015a283be
-rw-r--r-- 1 root root 11 27 08:09 852748a5-c6ba-4a73-bf18-8c15425edba0
-rw-r--r-- 1 root root 11 27 08:09 c46d56bf-69d3-455b-9539-07b1a2b5d7ad
[root@server1 nifioutputtest]# cat c46d56bf-69d3-455b-9539-07b1a2b5d7ad
hello world
[root@server1 nifioutputtest]# 

验证正常。

二、Nifi模板

1、导入模板

1)、导入

导入本模板(图文不匹配,本模板是上述例子的模板)

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<template encoding-version="1.2"><description>创建模板试一试</description><groupId>29e21e65-0186-1000-01d5-e5cdf061c513</groupId><name>test_template</name><snippet><connections><id>71591b1b-3df4-310b-0000-000000000000</id><parentGroupId>6cb373b2-097c-3113-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>6cb373b2-097c-3113-0000-000000000000</groupId><id>47f06192-31ad-3186-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression><loadBalancePartitionAttribute></loadBalancePartitionAttribute><loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus><loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy><name>gen_rep</name><selectedRelationships>success</selectedRelationships><source><groupId>6cb373b2-097c-3113-0000-000000000000</groupId><id>49cfc51b-650d-3596-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>daf85806-06eb-347c-0000-000000000000</id><parentGroupId>6cb373b2-097c-3113-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>6cb373b2-097c-3113-0000-000000000000</groupId><id>f4c648ea-d396-3680-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression><loadBalancePartitionAttribute></loadBalancePartitionAttribute><loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus><loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy><name>rep_put</name><selectedRelationships>success</selectedRelationships><source><groupId>6cb373b2-097c-3113-0000-000000000000</groupId><id>47f06192-31ad-3186-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><processors><id>47f06192-31ad-3186-0000-000000000000</id><parentGroupId>6cb373b2-097c-3113-0000-000000000000</parentGroupId><position><x>551.0</x><y>4.0</y></position><bundle><artifact>nifi-standard-nar</artifact><group>org.apache.nifi</group><version>1.9.2</version></bundle><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><descriptors><entry><key>Regular Expression</key><value><name>Regular Expression</name></value></entry><entry><key>Replacement Value</key><value><name>Replacement Value</name></value></entry><entry><key>Character Set</key><value><name>Character Set</name></value></entry><entry><key>Maximum Buffer Size</key><value><name>Maximum Buffer Size</name></value></entry><entry><key>Replacement Strategy</key><value><name>Replacement Strategy</name></value></entry><entry><key>Evaluation Mode</key><value><name>Evaluation Mode</name></value></entry></descriptors><executionNode>ALL</executionNode><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Regular Expression</key><value>(?s)(^.*$)</value></entry><entry><key>Replacement Value</key><value>hello world</value></entry><entry><key>Character Set</key><value>UTF-8</value></entry><entry><key>Maximum Buffer Size</key><value>1 MB</value></entry><entry><key>Replacement Strategy</key><value>Always Replace</value></entry><entry><key>Evaluation Mode</key><value>Entire text</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><executionNodeRestricted>false</executionNodeRestricted><name>ReplaceText_Demo</name><relationships><autoTerminate>true</autoTerminate><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><name>success</name></relationships><state>STOPPED</state><style/><type>org.apache.nifi.processors.standard.ReplaceText</type></processors><processors><id>49cfc51b-650d-3596-0000-000000000000</id><parentGroupId>6cb373b2-097c-3113-0000-000000000000</parentGroupId><position><x>0.0</x><y>0.0</y></position><bundle><artifact>nifi-standard-nar</artifact><group>org.apache.nifi</group><version>1.9.2</version></bundle><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><descriptors><entry><key>File Size</key><value><name>File Size</name></value></entry><entry><key>Batch Size</key><value><name>Batch Size</name></value></entry><entry><key>Data Format</key><value><name>Data Format</name></value></entry><entry><key>Unique FlowFiles</key><value><name>Unique FlowFiles</name></value></entry><entry><key>generate-ff-custom-text</key><value><name>generate-ff-custom-text</name></value></entry><entry><key>character-set</key><value><name>character-set</name></value></entry></descriptors><executionNode>PRIMARY</executionNode><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>File Size</key><value>1B</value></entry><entry><key>Batch Size</key><value>5</value></entry><entry><key>Data Format</key><value>Text</value></entry><entry><key>Unique FlowFiles</key><value>false</value></entry><entry><key>generate-ff-custom-text</key></entry><entry><key>character-set</key><value>UTF-8</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>20 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><executionNodeRestricted>false</executionNodeRestricted><name>GenerateFlowFile_Demo</name><relationships><autoTerminate>false</autoTerminate><name>success</name></relationships><state>STOPPED</state><style/><type>org.apache.nifi.processors.standard.GenerateFlowFile</type></processors><processors><id>f4c648ea-d396-3680-0000-000000000000</id><parentGroupId>6cb373b2-097c-3113-0000-000000000000</parentGroupId><position><x>1101.0</x><y>2.0</y></position><bundle><artifact>nifi-standard-nar</artifact><group>org.apache.nifi</group><version>1.9.2</version></bundle><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><descriptors><entry><key>Directory</key><value><name>Directory</name></value></entry><entry><key>Conflict Resolution Strategy</key><value><name>Conflict Resolution Strategy</name></value></entry><entry><key>Create Missing Directories</key><value><name>Create Missing Directories</name></value></entry><entry><key>Maximum File Count</key><value><name>Maximum File Count</name></value></entry><entry><key>Last Modified Time</key><value><name>Last Modified Time</name></value></entry><entry><key>Permissions</key><value><name>Permissions</name></value></entry><entry><key>Owner</key><value><name>Owner</name></value></entry><entry><key>Group</key><value><name>Group</name></value></entry></descriptors><executionNode>ALL</executionNode><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Directory</key><value>/usr/local/bigdata/testdata/nifioutputtest</value></entry><entry><key>Conflict Resolution Strategy</key><value>fail</value></entry><entry><key>Create Missing Directories</key><value>true</value></entry><entry><key>Maximum File Count</key></entry><entry><key>Last Modified Time</key></entry><entry><key>Permissions</key></entry><entry><key>Owner</key></entry><entry><key>Group</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><executionNodeRestricted>false</executionNodeRestricted><name>PutFile_demo</name><relationships><autoTerminate>true</autoTerminate><name>failure</name></relationships><relationships><autoTerminate>true</autoTerminate><name>success</name></relationships><state>STOPPED</state><style/><type>org.apache.nifi.processors.standard.PutFile</type></processors></snippet><timestamp>02/07/2023 08:15:48 GMT</timestamp>
</template>

在这里插入图片描述

2)、查看模板列表

在这里插入图片描述

3)、创建模板引用

在这里插入图片描述

2、组、导出模板

1)、创建组

在这里插入图片描述

2)、移动处理器到组

在这里插入图片描述

3)、进入与退出组

在这里插入图片描述

4)、创建模板

创建模板就是针对当前画布创建的模板,也即该模板包含当前画布的所有操作。
在这里插入图片描述

5)、下载模板

在这里插入图片描述

6)、嵌套组

在这里插入图片描述

三、FlowFile内容和属性

1、FlowFile属性及操作

1)、FlowFile包含两部分

  • 属性
    FlowFile的元数据.保存了关于内容的信息,比如:什么时候创建的,从哪里来,代表什么等等
  • 内容
    FlowFile的实际内容。比如:使用getfile读取数据时,读取到的实际内容文本。

2)、处理器对FlowFile的操作

更新,添加,或者删除FlowFile属性。改变FlowFile内容。

2、处理器

1)、ExtractText

  • 描述
    该处理器使用正则表达式,匹配流文件中的内容,并将匹配成功的内容输出到属性中;如果正则匹配到多个结果,默认只取第一个结果;匹配成功则流文件路由matched,没有匹配则到unmatched;
  • 属性配置
    在下面的列表中,列出属性默认值(如果有默认值),以及属性是否支持表达式语言。
    在这里插入图片描述
  • 应用场景
    提取content中的内容,输出到流属性当中

2)、UpdateAttribute

  • 描述
    该处理器使用属性表达式语言更新流文件的属性,并且/或则基于正则表达式删除属性
  • 属性配置
    在下面的列表中,列出属性默认值(如果有默认值),以及属性是否支持表达式语言
    在这里插入图片描述
  • 应用场景
    该处理器基本用法最为常用,增加,修改或删除流属性;
    此处理器使用用户添加的属性或规则更新FlowFile的属性。有三种方法可以使用此处理器添加或修改属性。
    1、基本用法: 默认更改通过处理器的每个FlowFile的匹配的属性
    2、高级用法:可以进行条件属性更改,只有在满足特定条件时才会影响FlowFile。可以在同一处理器中同时使用这两种方法
    3、删除属性表达式: 允许提供正则表达式,并且将删除匹配的任何属性

“删除属性表达式”将取代发生的任何更新。如果现有属性与“删除属性表达式”匹配,则无论是否更新,都将删除该属性。“删除属性表达式”仅适用于输入FlowFile中存在的属性,如果属性是由此处理器添加的,则“删除属性表达式”将不会匹配到它。

3、示例

1)、改为单点造数据

在这里插入图片描述

2)、负载均衡消费数据

在这里插入图片描述

3)、查看队列数据

在这里插入图片描述

4)、通过ExtractText将数据写入属性

在这里插入图片描述
查看输出数据属性
在这里插入图片描述
以上完成了FlowFile的示例及模板的示例。更多信息请关注该专栏的其他文章。


http://chatgpt.dhexx.cn/article/rSThfOI2.shtml

相关文章

大数据NiFi(二):NiFi架构

文章目录 NiFi架构 一、​​​​​​​NiFi核心概念

Nifi:nifi的基本使用

Nifi的安装使用 爱购物 www.cqfenfa.com Nifi安装 首先说一下Nifi的安装&#xff0c;这里Nifi可以支持Windows版和Linux&#xff0c;只需要去官网&#xff1a;http://nifi.apache.org/ 根据自己需要的版本&#xff0c;选择下载&#xff0c;然后安装解压就行 各目录及主要文件…

大数据NiFi(一):什么是NiFi

文章目录 什么是NiFi 一、NiFi背景介绍

Nifi介绍、安装、实践案例

第1章NiFi基本概念 1.1 概述 简单地说&#xff0c;NiFi是为了自动化系统之间的数据流而构建的。虽然术语“数据流”在各种环境中使用&#xff0c;但我们在此处使用它来表示系统之间自动化和管理的信息流。这个问题空间一直存在&#xff0c;因为企业有多个系统&#xff0c;其中…

NiFi技术干货

第1章 NiFi概述 1.1 NiFi是什么 简单的说&#xff0c;NiFi就是为了解决不同系统间数据自动流通问题而建立的。虽然dataflow这个术语在各种场景都有被使用&#xff0c;但我们在这里使用它来表示不同系统间的自动化的可管理的信息流。自企业拥有多个系统开始&#xff0c;一些系…

NiFi 基本概念

NiFi基本概念 一. NiFi是什么 Apache NiFi 是一个易于使用, 功能强大且可靠的系统, 用于处理和分发数据。可以自动化管理系统间的数据流。它使用高度可配置的指示图来管理数据路由, 转换和系统中介逻辑, 支持从多种数据源动态拉取数据。NiFi 原来是 NSA(美国国家安全局) 的一…

大数据Nifi简介

目录 1 NIFI简介2 NIFI核心概念3 NIFI构架3.1 网络服务器3.2 流控制器3.3 扩展3.4 FlowFile存储库3.5 内容存储库3.6 源头存储库 1 NIFI简介 Apache NiFi 是一个易于使用&#xff0c;功能强大且可靠的系统&#xff0c;用于处理和分发数据。可以自动化管理系统间的数据流。它支…

springboot error at line 1, column 5. Encountered: “\uff01“ (65281), after : ““

出现这种错误表面mybatis配置文件中sql查询语句中出现了中文字符&#xff0c;需要将对应的中文字符修改为英文。 ERROR 10500 --- [nio-8081-exec-8] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] …

编码GBK的不可映射字符 和 错误: 非法字符: ‘\uff1b‘

题目&#xff1a;解决编码GBK的不可映射字符 和 错误: 非法字符: ‘\uff1b’ 解决编码GBK的不可映射字符 1&#xff09;首先&#xff0c;在d盘所示的目录新建一个文本文件&#xff08;记事本&#xff09;&#xff0c;保存时字符集为utf-8 2&#xff09;输入&#xff08;d:&…

如何处理“错误: 非法字符: ‘\uff1b‘”

解决方法&#xff1a;把原代码中的中文分号改为英文分号。 注意事项&#xff1a;中英文切换时应尤其注意符号问题。

Lexical error at line 1, column 20. Encountered: “\uff01“ (65281), after : ““

问题&#xff1a; 测试myBatis的动态SQL的时候&#xff0c;控制台提示我出现了&#xff1a;Lexical error at line 1, column 20. Encountered: "\uff01" (65281), after : "" 的错误 原因&#xff1a; 在<if test"sex !null and sex &#xff0…

深度学习之格式转换笔记(三):keras(.hdf5)模型转TensorFlow(.pb) 转TensorRT(.uff)格式

环境&#xff1a; tensorflow1.15&#xff0c;cuda10.0&#xff0c;cudnn7.6.4 将keras训练好保存的.hdf5格式模型转为tensorflow的.pb模型&#xff0c;然后转为tensorrt支持的uff格式。 keras(.hdf5)模型转TensorFlow(.pb) # h5_to_pb.pyfrom keras.models import load_mod…

Jetson 学习笔记(五):pb转uff---pb转onnx转trt----pth转onnx转pb

文章目录 pb转uff具体代码运行结果 pb转onnx转trt具体代码pb转onnx运行结果onnx转化trt方法1:trtexec方法2:onnx-tensorrt工具 推理trt模型 pth转onnxonnx转pb方法1:onnx-tensorflow工具方法2:代码执行 pb转uff具体代码 这里用的是uff自带的一个转换器&#xff0c;直接通过调用…

Tensorflow pb模型转uff模型方法及遇到KeyError20和expected Const问题解决

项目所需要将Tensorflow 生成的pb模型转为uff模型&#xff0c;方法很简单&#xff0c;但是遇到的问题着实很多&#xff0c;这里主要记录下问题及解决方法&#xff0c;总之&#xff0c;最后是成功生成uff格式的模型的&#xff0c;有需要的可以参考。 pb模型转为uff模型的方法及步…

安装TensorRT,然后导入uff库包的时候报错:ImportError: ERROR: Failed to import module(cannot import name ‘GraphDef`)

欢迎大家关注笔者&#xff0c;你的关注是我持续更博的最大动力 原创文章&#xff0c;转载告知&#xff0c;盗版必究 安装TensorRT&#xff0c;然后导入uff库包的时候报错&#xff1a;ImportError: ERROR: Failed to import module&#xff08;cannot import name GraphDef from…

pb模型转uff模型(tensorflow2.x)

大多数的博客只是提到tensorflow1.x系列下的转换。大概步骤就是安装tensorrt&#xff0c;同时安装tensorrt下的几个python的wl文件。可参见博主之前的博客: 1.tensorrt的安装Ubuntu配置TensorRT及验证_竹叶青lvye的博客-CSDN博客 2.tensorrt下几个whl文件的安装TensorRT加速方…

Sony索尼HLG灰片电影调色LUT预设合集 HLG CINEMATIC LUTPACK BUNDLE

Sony索尼HLG灰片电影调色LUT预设合集 HLG CINEMATIC LUTPACK BUNDLE 原文地址&#xff1a;https://www.aeziyuan.com/t-20546.html 包括用于Sony索尼LOG相机的39组LUT预设。是目前为止Sony索尼相机最好的预设之一。 它适用于支持LUT预设效果的大多数软件。例如Premiere Pro&…

两种HDR格式(HLG, HDR10)的理解

1、HLG只在sps的vui中会携带信息&#xff0c;包括colour_primaries(9)&#xff0c;transfer_characteristics(18)&#xff0c;matrix_coefficients(9) 2、HDR10除了SPS的VUI中携带的信息&#xff0c;在key frame会携带相应的SEI&#xff08;mastering display、content light l…

详解HDR的三个标准——HLG/HDR10/Dolby Vision

HDR的三大标准&#xff1a;HLG&#xff08;Hybrid Log Gamma&#xff09;&#xff1b;HDR10&#xff1b;Dolby Vision HLG&#xff1a;HLG的全称是Hybrid Log Gamma&#xff0c;它是由英国BBC和日本NHK电视台联合开发的高动态范围HDR的一个标准。HLG不需要元数据&#xff0c;能…

HLG移动

在平面坐标系内&#xff0c;有两个坐标轴x轴和y轴。&#xff08;x,y&#xff09;表示点的坐标。 有一点处于&#xff08;x1,y1&#xff09;位置上&#xff0c;他可以向相临8个位置移动&#xff08;移动方式见下图&#xff09;。 划定范围&#xff1a;此点只可以在[0<x<3…