NIFI 入门使用

article/2025/8/15 8:04:30

1. Kettle与NIFI差异

Kettle 介绍

  • Kettle是一款国外开源的ETL工具,纯java编写,可以在Window、Linux、Unix上运行,绿色无需安装,数据抽取高效稳定。
  • Kettle 中文名称叫水壶,该项目的主程序员MATT 希望把各种数据放到一个壶里,然后以一种指定的格式流出。
  • Kettle这个ETL工具集,它允许你管理来自不同数据库的数据,通过提供一个图形化的用户环境来描述你想做什么,而不是你想怎么做。
  • Kettle中有两种脚本文件,transformation和job,transformation完成针对数据的基础转换,job则完成整个工作流的控制。
  • Kettle有三个主要组件:Spoon、Kitchen、Pan
    • Spoon:是一个图形化的界面,可以让我们用图形化的方式开发转换和作业。windows选择Spoon.bat;Linux选择Spoon.sh
    • Kitchen:利用Kitchen可以使用命令行调用Job
    • Pan:利用Pan可以用命令行的形式调用Trans
    • Carte:Carte是一个轻量级的Web容器,用于建立专用、远程的ETL Server。

Kettle常用功能

Kettle常用在处理关系型数据库(RDBMS):mysql、oracle、gbase、国产达梦等各种数据库,也可以处理非关系型数据库:elasticsearch、hdfs等数据存储。主要是对数据进行处理操作,常用的功能如下:

(1)全量数据迁移:

就是将某个或多个表或库中的数据进行迁移,可以跨库,也可以同库迁移。速度比较快,性能稳定。

(2)增量数据迁移:

就是对某个表中的数据按照一定的设计思路,根据int的自增主键或datetime的时间戳实现增量数据迁移,并且可以统计增量数据量。速度比较快,性能稳定。

(3)解析xml文件(单个、批量):

可以通过读取本地或远程服务器中的单个、批量xml文件进行解析,高效率的实现xml数据解析入库。

(4)解析JSON数据:

可以零代码通过jsonPath快速完成JSON数据解析,高效率实现JSON解析数据入库。

(5)数据关联比对:

可以将多个数据库根据一定的业务字段进行关联,尤其是针对单表百万、千万级别上的数据比对,普通sql实现困难,可以通过KETTLE方便高效的完成数据关联比对功能。

(6)数据清洗转换:

可以通过KETTLE中设计一定的判断流程,在数据流中逐条对数据进行业务判断和过滤,实现数据清洗转换的功能。

NIFI 介绍

  • Apache NiFi 是一个易于使用、功能强大而且可靠的数据拉取、数据处理和分发系统,用于自动化管理系统间的数据流。
  • 它支持高度可配置的指示图的数据路由、转换和系统中介逻辑,支持从多种数据源动态拉取数据。
  • NiFi原来是NSA(National Security Agency [美国国家安全局])的一个项目,目前已经代码开源,是Apache基金会的顶级项目之一
  • NiFi基于Web方式工作,后台在服务器上进行调度。
  • 用户可以为数据处理定义为一个流程,然后进行处理,后台具有数据处理引擎、任务调度等组件。

NiFi 核心概念

  • Nifi 的设计理念接近于基于流的编程 Flow Based Programming。
  • FlowFile:表示通过系统移动的每个对象,包含数据流的基本属性
  • FlowFile Processor(处理器):负责实际对数据流执行工作
  • Connection(连接线):负责不同处理器之间的连接,是数据的有界缓冲区
  • Flow Controller(流量控制器):管理进程使用的线程及其分配
  • Process Group(过程组):进程组是一组特定的进程及其连接,允许组合其他组件创建新组件

NIFI 特性

  1. 可视化命令与控制
    设计,控制,反馈和监测之间的无缝体验
  2. 高度可配置
    损失容忍vs保证交付
    低延迟vs高吞吐量
    动态优先
    流可以在运行时修改
    数据回压
  3. 数据溯源
    从头到尾跟踪数据流
  4. 为可扩展而设计
    建立自己的处理器和更多
    快速开发和有效的测试
  5. 安全
    SSL,SSH,HTTPS,加密内容等
    多租户授权和内部授权/策略管理

nifi是将数据转换成一种流的形式在各种处理器之间进行处理转换的etl工具,它通过可视化可操作的用户界面来编辑数据,更加直观有效。

kettle 是C/S 架构 ,NiFi是基于WEB的B/S架构,方便集成。

2. NIFI的优点

  1. 可视化的UI界面,各个模块组件之间高度可配置,且每个流程都有监控,可以通过界面直观的看到各个数据处理模块之间的数据流转情况,分析出程序性能瓶颈。
  2. 数据流可以在UI界面自由拖拽和拓展,各模块之间相互独立,互不影响。
  3. 可以在处理耗时的地方创建多个处理模块并行执行,提升处理速度。类似于代码中加入了多线程,但相对于修改代码,界面配置操作十分简单。
  4. 修改方便,任意模块都可以在数据流转过程中随时启停,任意处理模块都可以实现热插拔。数据流流向随时可变。
  5. NiFi的对处理模块有对应的retry机制和错误分发机制,且可配置性强。
  6. NiFi基于组件的热插拔部署,方便集成自定义组件
  7. NiFi支持缓冲所有排队的数据,以及在这些队列达到指定限制时提供背压的能力,或者在数据达到指定年龄(其值已经消失)时使数据老化
  8. 具有多种现有组件可以提供数据抽取转换流程
  9. NiFi 可以进行集群部署,横向扩展,提高系统吞吐量

3. NIFI的缺点

  1. 各个步骤中间结果落地导致磁盘IO成为Nifi的瓶颈,这个缺点在数据冗余量越大的时候表现的越明显。
  2. 在实现特定业务场景现有组件不能满足或实现复杂,需自定义开发组件

4. 单机部署 NIFI

  1. 上传Apache NIFI包到Linux上,解压安装包;或者将你的本地作为服务器,直接解压zip包。

  2. 在解压的目录下,找到conf目录,编辑bootstrap.conf文件,修改NIFI的内存配置,默认的值比较小,比如这里我改成启动2g,最大10g

    java.arg.2=-Xms2g
    java.arg.3=-Xmx10g
    
  3. 在解压的目录下,找到bin目录,可以看到里面有一些脚本

    dump-nifi.bat  
    nifi-env.bat  
    nifi-env.sh  
    nifi.sh         
    run-nifi.bat  
    status-nifi.bat
    
  4. 在解压的目录下,找到conf目录,编辑nifi.properties文件,修改端口号,默认为8080

    nifi.web.http.port=8080
    

Linux或者Mac,使用nifi.sh start启动NIFI,nifi.sh stop停止NIFI,nifi.sh restart重启NIFI。

Windows下,直接双击run-nifi.bat即可,退出的时候关闭运行窗口就可以了。

5. 集群部署 NIFI

NiFi采用Zero-Master Clustering范例。集群中的每个节点对数据执行相同的任务,但每个节点都在不同的数据集上运行。其中一个节点自动选择(通过Apache ZooKeeper)作为集群协调器。然后,群集中的所有节点都会向此节点发送心跳/状态信息,并且此节点负责断开在一段时间内未报告任何心跳状态的节点。此外,当新节点选择加入群集时,新节点必须首先连接到当前选定的群集协调器,以获取最新流。如果群集协调器确定允许该节点加入(基于其配置的防火墙文件),则将当前流提供给该节点,并且该节点能够加入群集,假设节点的流副本与群集协调器提供的副本匹配。如果节点的流配置版本与群集协调器的版本不同,则该节点将不会加入群集。

zookeeper:NIFI内置zookeeper

  1. 编辑实例中,conf/nifi.properties文件,不同节点改成对应内容,内容如下:

    nifi.state.management.configuration.file=./conf/state-management.xml                   
    nifi.state.management.provider.local=local-provider  
    nifi.state.management.provider.cluster=zk-provider
    #  指定此NiFi实例是否应运行嵌入式ZooKeeper服务器,默认是false                          
    nifi.state.management.embedded.zookeeper.start=true                                                                
    nifi.state.management.embedded.zookeeper.properties=./conf/zookeeper.properties # 3个节点分别是8081 8082 8083
    nifi.web.http.port=8081# 如果实例是群集中的节点,请将此设置为true。默认值为false
    nifi.cluster.is.node=true
    # 3个节点分别是9081 9082 9083
    nifi.cluster.node.protocol.port=9081# 3个节点分别是6341 6342 6343
    nifi.cluster.load.balance.port=6341# 连接到Apache ZooKeeper所需的连接字符串。这是一个以逗号分隔的hostname:port对列表
    nifi.zookeeper.connect.string=localhost:2181,localhost:2182,localhost:2183
    
  2. 修改zookeeper.properties

    # 3个节点都一样
    server.1=localhost:2111:3111;2181
    server.2=localhost:2222:3222;2182
    server.3=localhost:2333:3333;2183
    
  3. 修改state-management.xml(3个节点都一样)

    <cluster-provider><id>zk-provider</id><class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class><property name="Connect String">localhost:2181,localhost:2182,localhost:2183</property><property name="Root Node">/nifi</property><property name="Session Timeout">10 seconds</property><property name="Access Control">Open</property>
    </cluster-provider>
    
  4. 在3个节点的NIFI目录下(bin目录同级),新建state/zookeeper,zookeeper文件夹里新建文件myid,3个节点分别写入1,2,3

    #3个节点分别写入 1 2 3
    echo 1 > myid
    
  5. 分别启动所有节点

6. 数据同步(表字段相同)

整体流程如下:

GenerateTableFetch --> ExecuteSQLRecord --> PutDatabaseRecord --> LogAttribute

在这里插入图片描述

GenerateTableFetch组件:从源表中生成获取行的“页”的SQL select查询。分区大小属性以及表的行数决定页面和生成的流文件的大小和数量。此外,可以通过设置最大值列来实现增量获取,这将导致处理器跟踪列的最大值,从而只获取那些列的值超过观察到的最大值的行

在这里插入图片描述

ExecuteSQLRecord组件:执行提供的SQL选择查询。查询结果将转换为所指定格式输出。使用流,因此支持任意大的结果集。

在这里插入图片描述

PutDatabaseRecord组件:使用指定的记录器从传入流文件输入(可能是多个)记录。这些记录被转换为SQL语句,并作为单个批处理执行

在这里插入图片描述

连接池配置DBCPConnectionPool

在这里插入图片描述

7. 数据同步(表字段不相同)

整体流程如下:

QueryDatabaseTable --> ConvertAvroToJSON --> SplitJson --> EvaluateJsonPath --> ReplaceText --> PutSQL

在这里插入图片描述

QueryDatabaseTable组件:生成一个SQL select查询,或使用提供的语句,并执行它来获取指定最大值列中值大于之前看到的最大值的所有行。查询结果将转换为Avro格式。

在这里插入图片描述

ConvertAvroToJSON组件:将二进制Avro记录转换为JSON对象。这个处理器提供了Avro字段到JSON字段的直接映射,这样得到的JSON将具有与Avro文档相同的层次结构

在这里插入图片描述

SplitJson组件:对于由JsonPath表达式指定的数组元素,将一个JSON文件拆分为多个单独的流文件。每个生成的FlowFile由指定数组的一个元素组成,并传输到关系“split”,同时将原始文件传输到关系“original”。如果没有找到指定的JsonPath,或者没有对数组元素求值,原始文件将被路由到“failure”,并且不生成任何文件。

在这里插入图片描述

EvaluateJsonPath组件:根据FlowFile的内容评估一个或多个JsonPath表达式。这些表达式的结果将分配给FlowFile属性,或者写入FlowFile本身的内容,具体取决于处理器的配置。

在这里插入图片描述

ReplaceText组件:通过对正则表达式(regex)求值并将与正则表达式匹配的内容部分替换为其他值,更新流文件的内容。通过替换成目标表字段的sql语句,数据可以从EvaluateJsonPath组件存放到的attribute属性中获取,获取方式${key},将替换后的sql语句传递到下游PutSql组件中。

在这里插入图片描述

PutSQL组件:执行SQL UPDATE或INSERT命令。传入流文件的内容应该是要执行的SQL命令。

在这里插入图片描述

以上两种数据同步都是基于mysql 到 mysql ,oracle只需要更换数据库连接池配置。

注意:oracle数据同步使用EvaluateJsonPath组件获取属性值时字段名称需要大写

在这里插入图片描述

NIFI 组件之间数据传递时通过队列的方式控制,因此不能控制事务,但如果有一个组件初始化失败时,上游传递下来的队列中的数据是不会被消费,当组件异常修复之后会继续执行队列中的内容。

8. binlog日志采集数据同步

为了不影响业务,可以通过binlog日志对数据库表数据进行同步

整体流程:

CaptureChangeMySQL --> RouteOnAttribute --> JoltTransformJSON --> EvaluateJsonPath --> ReplaceText --> PutSQL --> LogAttribute

在这里插入图片描述在这里插入图片描述
CaptureChangeMySQL组件:从MySQL数据库中检索更改数据捕获(CDC)事件。CDC事件包括插入、更新、删除操作。事件输出为按操作发生时的顺序排列的单个流文件。读取binlog日志路由下游处理

在这里插入图片描述

redis存储客户端配置server服务端

在这里插入图片描述

此时你会发现多了一个RedisConnectionPoolService

在这里插入图片描述

继续配置RedisConnectionPoolService

在这里插入图片描述

​ 最后启动redis服务端和客户端

RouteOnAttribute 组件:根据binlog中含有的类型参数,把binlog记录的日志操作根据类型进行路由处理,提供给不同的下游分支操作

在这里插入图片描述

Routing Strategy:路由策略用默认的Route toProperty name,根据属性名进行路由.添加的自定义属性可以根据业务分发给不同的下游处理器。

JoltTransformJSON组件:对flowfile JSON有效负载应用一系列的Jolt规范。使用转换后的内容创建一个新的FlowFile,并将其路由到“success”关系。如果JSON转换失败,原始的流文件将被路由到“failure”关系。

在这里插入图片描述

点击高级设置会打开如下图Jolt测试界面
上面有红叉子的那个区域Jolt Specification是填写我们的Jolt语句的;
左下方区域JSON Input是输入要被处理前的Json数据.
右下方区域JSON Output是输出Input被jolt语句处理后的结果.

在这里插入图片描述

Jolt Specification区域输入以下内容

[{"operation": "shift","spec": {"columns": {"*": {"@(value)": "@(1,name)"}}}
}]

“operation”: “shift”:实现整理出key,value格式
“operation”: “modify-default-beta”:实现拼接了一个带前缀字符串的新字段apid,以及value是字符串ap_拼接id的value值.

JSON Input输入以下内容

{"type" : "delete","timestamp" : 1592386594000,"binlog_filename" : "mysql-bin.000001","binlog_position" : 229,"database" : "ipaas","table_name" : "target","table_id" : 33,"columns" : [ {"id" : 1,"name" : "id","column_type" : -5,"value" : 50}, {"id" : 2,"name" : "username","column_type" : 12,"value" : "徐朝"}, {"id" : 3,"name" : "userage","column_type" : 4,"value" : 20}, {"id" : 4,"name" : "time","column_type" : 12,"value" : "2020-06-17 10:31:45"} ]
}

最后点击TRANSFORM按钮查看效果

在这里插入图片描述

测试没问题,可以复制我们调试好的Jolt Specification内容,返回刚才Jolt Specification这里,贴进去保存配置

EvaluateJsonPath组件:根据FlowFile的内容评估一个或多个JsonPath表达式。这些表达式的结果将分配给FlowFile属性,或者写入FlowFile本身的内容,具体取决于处理器的配置。

在这里插入图片描述

ReplaceText组件:通过对正则表达式(regex)求值并将与正则表达式匹配的内容部分替换为其他值,更新流文件的内容。通过替换成目标表字段的sql语句,数据可以从EvaluateJsonPath组件存放到的attribute属性中获取,获取方式${key},将替换后的sql语句传递到下游PutSql组件中。

在这里插入图片描述

PutSQL组件:执行上游传递下来的sql语句

在这里插入图片描述

LogAttribute组件:记录执行日志

在这里插入图片描述

输出结果:

在这里插入图片描述

9. 多表分别查询汇总入库(表字段不相同)

完整流程:

在这里插入图片描述

同**数据同步(表字段不同)**分别有多条处理流程将数据查询出来,然后使用funnel组件进行数据合并后统一入库

10. 根据规则字段映射

完整流程:

在这里插入图片描述

从源数据表中查询出所有数据转换为json,然后通过SplitJson切分成多个json对象,在通过EvaluateJsonPath组件将值存放到属性列表中,再通过ExecuteSQL组件根据字段映射条件查询规则表并转换为json,再通过EvaluateJsonPath组件将规则表数据也添加到源表数据的属性列表中,再根据RouteOnAttribute组件判断条件路由需要的数据到下游;然后通过ReplaceText组件从属性列表中获取值拼接sql交由下游处理器PutSQL执行。

ExecuteSQL组件配置如下:

在这里插入图片描述

RouteOnAttribute组件配置如下:

自定义添加过滤条件

在这里插入图片描述

11. 自定义组件Nifi Processor

1. 创建Maven工程

父工程my-processor,子工程nifi-my-processor-nar和nifi-my-processor-processors,这里使用的版本时1.11.4

在这里插入图片描述

my-processor pom文件:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><packaging>pom</packaging><parent><groupId>org.apache.nifi</groupId><artifactId>nifi</artifactId><version>1.11.4</version></parent><groupId>org.apache.nifi</groupId><artifactId>my-processor</artifactId><version>1.11.4</version><name>my-processor</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency></dependencies><modules><module>nifi-my-processor-nar</module><module>nifi-my-processor-processors</module></modules></project>

nifi-my-processor-nar pom文件:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>my-processor</artifactId><groupId>org.apache.nifi</groupId><version>1.11.4</version></parent><modelVersion>4.0.0</modelVersion><artifactId>nifi-my-processor-nar</artifactId><packaging>nar</packaging><name>nifi-my-processor-nar</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><properties><maven.javadoc.skip>true</maven.javadoc.skip><source.skip>true</source.skip></properties><dependencies><dependency><groupId>org.apache.nifi</groupId><artifactId>nifi-standard-services-api-nar</artifactId><version>1.11.4</version><type>nar</type></dependency><dependency><groupId>org.apache.nifi</groupId><artifactId>nifi-my-processor-processors</artifactId><version>1.11.4</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency></dependencies></project>

nifi-my-processor-processors pom文件:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>my-processor</artifactId><groupId>org.apache.nifi</groupId><version>1.11.4</version></parent><modelVersion>4.0.0</modelVersion><artifactId>nifi-my-processor-processors</artifactId><packaging>jar</packaging><name>nifi-my-processor-processors</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><dependencies><dependency><groupId>org.apache.nifi</groupId><artifactId>nifi-api</artifactId><version>1.11.4</version><scope>provided</scope></dependency><dependency><groupId>org.apache.nifi</groupId><artifactId>nifi-utils</artifactId><version>1.11.4</version></dependency><dependency><groupId>org.apache.nifi</groupId><artifactId>nifi-mock</artifactId><version>1.11.4</version><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency></dependencies></project>

2. 修改项目编写代码

  1. 删除nifi-my-processor-processors子项目中,src/test中的测试文件(打包可能出现错误)

  2. Nifi的要求是在/src/main/resources/META-INF/services/目录下新建一个文件org.apache.nifi.processor.Processor,这个类似于配置文件,指向该Processor所在的目录,比如我的配置文件内容就是

    org.apache.nifi.processor.MyProcessor

在这里插入图片描述

代码编写,创建MyProcessor类。其中有设置状态,属性,及处理方法(onTrigger)等

在这里插入图片描述

package org.apache.nifi.processor;import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;import java.io.InputStreamReader;
import java.io.StringWriter;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;/*** @Classname MyProcessor* @Description* @Author xuzhaoa* @Date 2020/7/2 9:49*/
@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({@WritesAttribute(attribute = "", description = "")})
public class MyProcessor extends AbstractProcessor {public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor.Builder().name("MY_PROPERTY").displayName("My property").description("Example Property").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();public static final Relationship MY_RELATIONSHIP_SUCCESS = new Relationship.Builder().name("sucess").description("Example relationship Success").build();public static final Relationship MY_RELATIONSHIP_FAILURE = new Relationship.Builder().name("failure").description("Example relationship Failure").build();private List<PropertyDescriptor> descriptors;private Set<Relationship> relationships;@Overrideprotected void init(final ProcessorInitializationContext context) {final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();descriptors.add(MY_PROPERTY);this.descriptors = Collections.unmodifiableList(descriptors);final Set<Relationship> relationships = new HashSet<Relationship>();relationships.add(MY_RELATIONSHIP_SUCCESS);relationships.add(MY_RELATIONSHIP_FAILURE);this.relationships = Collections.unmodifiableSet(relationships);}@Overridepublic Set<Relationship> getRelationships() {return this.relationships;}@Overridepublic final List<PropertyDescriptor> getSupportedPropertyDescriptors() {return descriptors;}@OnScheduledpublic void onScheduled(final ProcessContext context) {}@Overridepublic void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {FlowFile flowFile = session.get();if (flowFile == null) {return;}// TODO implementfinal AtomicReference<String> value = new AtomicReference<>();session.read(flowFile, in -> {try {StringWriter sw = new StringWriter();InputStreamReader inr = new InputStreamReader(in);char[] buffer = new char[1024];int n = 0;while (-1 != (n = inr.read(buffer))) {sw.write(buffer, 0, n);}String str = sw.toString();String result = "处理了:" + str + context.getProperty("MY_PROPERTY").getValue();value.set(result);} catch (Exception ex) {ex.printStackTrace();getLogger().error("Failed to read json string.");}});String results = value.get();if (results != null && !results.isEmpty()) {flowFile = session.putAttribute(flowFile, "match", results);}flowFile = session.write(flowFile, out -> out.write(value.get().getBytes()));session.transfer(flowFile, MY_RELATIONSHIP_SUCCESS);}
}

我们使其extends AbstractProcessor这个抽象类,@Tag标签是为了在web GUI中,能够使用搜索的方式快速找到我们自己定义的这个Processor。CapabilityDescription内的值会暂时在Processor选择的那个页面中,相当于一个备注。
一般来说只需要继承AbstractProcessor就可以了,但是某些复杂的任务可能需要去继承更底层的AbstractSessionFactoryProcessor这个抽象类。

我们通过PropertyDescriptor以及Relationship中的模板方法定义了两个新的关系和属性描述值,这些值会出现在webUI中

该组件只是简单的测试将流中数据替换,功能实现主要通过该类自行实现

整个Processor的核心部分 -> onTrigger 部分, onTrigger方法会在一个flow file被传入处理器时调用。为了读取以及改变传递来的FlowFile,Nifi提供了三个callback接口方法

  • InputStreamCallback:
    该接口继承细节如下: 将流中的数据读取处理进行替换

在这里插入图片描述

OutputStreamCallback :将内容写入值中

在这里插入图片描述

最后使用transfer()功能传递回这个flowFile以及成功标识。

3. 打包部署

项目打包后将nifi-my-processor-nar工程target目录中的 nifi-my-processor-nar-1.0-SNAPSHOT.nar 文件,拷贝到 nifi\lib 目录中

新建流程使用自定义组件

GenerateFlowFile --> MyProcessor --> PutFile

在这里插入图片描述

GenerateFlowFile 组件配置生成内容

在这里插入图片描述

MyProcessor 组件替换内容

在这里插入图片描述

查看结果:

在这里插入图片描述


http://chatgpt.dhexx.cn/article/tOk8J6gI.shtml

相关文章

《数据同步-NIFI系列》Nifi详细教程入门-06Nifi基础操作

Nifi基础操作 1 主页面 2 组 2.1 创建组 从常用功能模块&#xff0c;拖动组到画布上&#xff0c;自定义组名。可以通过鼠标移动组在画布位置。 2.2 进入、退出组 选中某一个组&#xff0c;单击右键选择enter group或者双击组进入组内&#xff0c;在组内单击右键选择leave g…

nifi-搭建

NIFI 简介 1、NIFI 的概念 1.1 起源&#xff1a;NIFI是为了自动化的处理和管理系统之间的数据流而产生的&#xff0c;基本设计概念与基于流的编程[fbp]的主要思想密切相关 1.2 nifi核心概念 FlowFile&#xff1a;FlowFile表示通过系统移动的每个对象&#xff0c;包含数据流的基…

9、NIFI综合应用场景-通过NIFI配置kafka的数据同步

Apache NiFi系列文章 1、nifi-1.9.2介绍、单机部署及简单验证 2、NIFI应用示例-GetFile和PutFile应用 3、NIFI处理器介绍、FlowFlie常见属性、模板介绍和运行情况信息查看 4、集群部署及验证、监控及节点管理 5、NiFi FileFlow示例和NIFI模板示例 6、NIFI应用场景-离线同步Mys…

Apache NiFi 入门指南

本指南使用于谁&#xff1f; 本指南适用于从未使用过&#xff0c;在NiFi中有限度接触或仅完成特定任务的用户。本指南不是详尽的说明手册或参考指南。“ 用户指南”提供了大量信息&#xff0c;旨在提供更加详尽的资源&#xff0c;并且作为参考指南非常有用。相比之下&#xff…

2、NIFI应用示例-GetFile和PutFile应用

Apache NiFi系列文章 1、nifi-1.9.2介绍、单机部署及简单验证 2、NIFI应用示例-GetFile和PutFile应用 3、NIFI处理器介绍、FlowFlie常见属性、模板介绍和运行情况信息查看 4、集群部署及验证、监控及节点管理 5、NiFi FileFlow示例和NIFI模板示例 6、NIFI应用场景-离线同步Mys…

大数据NiFi(三):NiFi关键特性

文章目录 NiFi关键特性 一、​​​​​​​​​​​​​​流管理

NiFi学习笔记

目录 NiFi概念 NiFi是什么 Apache NiFi 包括以下功能 NIFI核心概念 NiFi架构 NiFi入门 常用术语 下载安装NiFi 启动和关闭NIFI NIFI处理器 查看处理器 常用处理器 配置处理器 其他组件 应用场景 1.添加和配置第一个处理器GetFile 2.添加第二个处理器PutFile NiF…

NiFi的简介

使用java开发的一个开源项目&#xff0c;数据处理工具 1.简介&#xff1a; NiFi 是一个易于使用、功能强大而且可靠的流式数据处理和分发系统。NiFi 是为数据流设计&#xff0c;支持从多种数据源动态的拉取数据&#xff0c;并基于WEB图形界面&#xff0c;通过拖拽、连接、配置…

Nifi的入门使用

Nifi的使用 1.官方文档2.Nifi简介3.简单使用4.Template 使用nifi前&#xff0c;需要知道ETL在做什么&#xff0c;如果源端和目标端栏位不匹配&#xff0c;就需要用到小帮手&#xff0c; 让你更直观的了解映射关系&#xff0c;才能更好的构建DataFlow 第一步&#xff1a;Nifi开发…

NiFi【部署 01】NiFi最新版本1.18.0下载安装配置启动及问题处理(一篇学会部署NiFi)

Apache NIFI中文文档 地址&#xff1a;https://nifichina.github.io/ 1.简介 官网的介绍&#xff1a; An easy to use, powerful, and reliable system to process and distribute data. 一个易用、功能强大、可靠的处理和分发数据的系统。 来自网络的介绍&#xff1a; 2006…

5、NiFi FileFlow示例和NIFI模板示例

Apache NiFi系列文章 1、nifi-1.9.2介绍、单机部署及简单验证 2、NIFI应用示例-GetFile和PutFile应用 3、NIFI处理器介绍、FlowFlie常见属性、模板介绍和运行情况信息查看 4、集群部署及验证、监控及节点管理 5、NiFi FileFlow示例和NIFI模板示例 6、NIFI应用场景-离线同步Mys…

大数据NiFi(二):NiFi架构

文章目录 NiFi架构 一、​​​​​​​NiFi核心概念

Nifi:nifi的基本使用

Nifi的安装使用 爱购物 www.cqfenfa.com Nifi安装 首先说一下Nifi的安装&#xff0c;这里Nifi可以支持Windows版和Linux&#xff0c;只需要去官网&#xff1a;http://nifi.apache.org/ 根据自己需要的版本&#xff0c;选择下载&#xff0c;然后安装解压就行 各目录及主要文件…

大数据NiFi(一):什么是NiFi

文章目录 什么是NiFi 一、NiFi背景介绍

Nifi介绍、安装、实践案例

第1章NiFi基本概念 1.1 概述 简单地说&#xff0c;NiFi是为了自动化系统之间的数据流而构建的。虽然术语“数据流”在各种环境中使用&#xff0c;但我们在此处使用它来表示系统之间自动化和管理的信息流。这个问题空间一直存在&#xff0c;因为企业有多个系统&#xff0c;其中…

NiFi技术干货

第1章 NiFi概述 1.1 NiFi是什么 简单的说&#xff0c;NiFi就是为了解决不同系统间数据自动流通问题而建立的。虽然dataflow这个术语在各种场景都有被使用&#xff0c;但我们在这里使用它来表示不同系统间的自动化的可管理的信息流。自企业拥有多个系统开始&#xff0c;一些系…

NiFi 基本概念

NiFi基本概念 一. NiFi是什么 Apache NiFi 是一个易于使用, 功能强大且可靠的系统, 用于处理和分发数据。可以自动化管理系统间的数据流。它使用高度可配置的指示图来管理数据路由, 转换和系统中介逻辑, 支持从多种数据源动态拉取数据。NiFi 原来是 NSA(美国国家安全局) 的一…

大数据Nifi简介

目录 1 NIFI简介2 NIFI核心概念3 NIFI构架3.1 网络服务器3.2 流控制器3.3 扩展3.4 FlowFile存储库3.5 内容存储库3.6 源头存储库 1 NIFI简介 Apache NiFi 是一个易于使用&#xff0c;功能强大且可靠的系统&#xff0c;用于处理和分发数据。可以自动化管理系统间的数据流。它支…

springboot error at line 1, column 5. Encountered: “\uff01“ (65281), after : ““

出现这种错误表面mybatis配置文件中sql查询语句中出现了中文字符&#xff0c;需要将对应的中文字符修改为英文。 ERROR 10500 --- [nio-8081-exec-8] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] …

编码GBK的不可映射字符 和 错误: 非法字符: ‘\uff1b‘

题目&#xff1a;解决编码GBK的不可映射字符 和 错误: 非法字符: ‘\uff1b’ 解决编码GBK的不可映射字符 1&#xff09;首先&#xff0c;在d盘所示的目录新建一个文本文件&#xff08;记事本&#xff09;&#xff0c;保存时字符集为utf-8 2&#xff09;输入&#xff08;d:&…