datax(20):内置transformer使用

article/2025/10/13 2:50:08

前面看了transformer的原理及源代码,今天实战下,看看他的效果;


一、环境

  1. win10
  2. DataX 3.0(从我的datax分支打包而来)
  3. job.json使用datax的样例json,源文件在xxx\DataX\core\src\main\job\中,打包编译后在xxx\DataX\target\datax\datax\job下。本文略做修改,主要修改2出,是否打印和记录行数
{"job": {"setting": {"speed": {"channel": 1},"errorLimit": {"percentage": 0.02}},"content": [{"reader": {"name": "streamreader","parameter": {"column" : [{"value": "DataX","type": "string"},{"value": 19990704,"type": "long"},{"value": "1999-07-04 00:00:00","type": "date"},{"value": true,"type": "bool"},{"value": "test","type": "bytes"}],"sliceRecordCount": 1 // 原来是100000,修改为1行,看到效果就行}},"writer": {"name": "streamwriter","parameter": {"print": true, // 原来是false,不打印,为了在控制台看到效果,修改为true"encoding": "UTF-8"}}}]}
}

原job.json 解读:从streamreader中读取1行数据,写入streamwriter。streamreader里有5个字段(string,long,date,bool,bytes)对应的值分别为Datax、19990904、“1999-09-04 00:00:00”、true和“test”。job的全局设置是1个channel,容错率是0.02;
先看下不加transformer时候的运行效果
在这里插入图片描述


二、SubstrTransformer

在job中加入dx_substr类型的transformer

"transformer": [{"name": "dx_substr", //通过唯一标识符指定datax内置的transformer"parameter": {"columnIndex": 0, // col列的下标"paras": ["1",  //从 col的哪个下标开始截取"4" //截取后该col 值的长度]}}]

效果如下
在这里插入图片描述

解释:columnIndex为0获取的字段的值是“Datax”,将“Datax”从下标1开始,截取长度4,所以是“atax”


三、ReplaceTransformer

"transformer": [{"name": "dx_replace","parameter":{"columnIndex":0, //对下标为0的字段进行处理"paras":["3","4","****"]  // 将下标3到4的字符内容替换为 ****}}]

效果
在这里插入图片描述
解释:
columnIndex=0的字段的值是“Datax”,“Datax”的3到4位置内容是“ax”,将“ax”替换为****,最终结果是 Dat****


四、PadTransformer

"transformer": [{"name": "dx_pad","parameter":{"columnIndex":0,"paras":["l","9","-"]  // l在头部填充,9是最后数据的长度,-是要填充的内容}}]

效果:
在这里插入图片描述

解释:
columnIndex=0的字段的值是“Datax”,在“Datax”的头部插入“-”,使其最后长度是9,最终结果是 ----Datax


五、FilterTransformer

"transformer": [{"name": "dx_filter","parameter": {"columnIndex": 1, // 取下标为1的字段"paras": [ // 过滤条件 <0 , 注意满足添加的数据会被过滤掉"<","0"]}}]

效果:在这里插入图片描述

解释:
columnIndex=1的数据值是19990704,因为19990704<0 不成立,所以不会被过滤掉;


六、GroovyTransformer

"transformer": [{"name": "dx_groovy","parameter": {"code": "Column column = record.getColumn(0);def str = column.asString();def sb = new StringBuffer(str);def header = sb.insert(0,'AA');def strHeader = header.toString();record.setColumn(1, new StringColumn(strHeader));return record","extraPackage": ["import groovy.json.JsonSlurper;"]}}]

效果:
在这里插入图片描述
解释:

code中的值如下:
Column column = record.getColumn(0); \\获取下标为0的col
def str = column.asString(); \\ 获取col中的值
def sb = new StringBuffer(str); \\转为stringBuffer
def header = sb.insert(0,'AA'); \\在sb的头部插入字符AA
def strHeader = header.toString(); \\sb转为string
record.setColumn(0, new StringColumn(strHeader)); \\ 数据重新写入col
return record" \\返回数据行

dx_groovy

  • 参数。
    • 第一个参数: groovy code
    • 第二个参数(列表或者为空):extraPackage
  • 备注:
    • dx_groovy只能调用一次。不能多次调用。
    • groovy code中支持java.lang, java.util的包,可直接引用的对象有record,以及element下的各种column(BoolColumn.class,BytesColumn.class,DateColumn.class,DoubleColumn.class,LongColumn.class,StringColumn.class)。不支持其他包,如果用户有需要用到其他包,可设置extraPackage,注意extraPackage不支持第三方jar包。
    • groovy code中,返回更新过的Record(比如record.setColumn(columnIndex, new StringColumn(newValue));),或者null。返回null表示过滤此行。
    • 用户可以直接调用静态的Util方式(GroovyTransformerStaticUtil),目前GroovyTransformerStaticUtil的方法列表 (按需补充);

七、总结

  1. 从上面可以看着需要对数据做transformer,只需在job.json里设置对应的参数即可;
  2. 目前datax内置了5种transform,每种都有一个name进行唯一标识,方便用户调用;
  3. json中的transformer节点里面可以设置多个transformer类;
  4. 既然提供了抽象类Transformer,那么肯定可以用户自定义transformer类;(本质和hive中的udf类似,要写hive中的udf,可以看本文)


注:

  1. 对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;

  2. 所有代码都已经上传到github(master分支和dev),可以免费白嫖


http://chatgpt.dhexx.cn/article/a1UecTct.shtml

相关文章

DataX 使用详解

目录 一、Datax 概述 1.1 Datax 1.2 Datax Features 1.3 Datax 环境要求 1.4 Datax 安装 1.5 Datax 演示示例 二、Datax 核心详解 2.1 DataX 3.0概览 2.2 DataX3.0框架设计 2.3 DataX3.0 支持的读写插件 2.4 DataX3.0核心架构 2.5 DataX3.0 六大核心优势 三、Datax 实…

datax介绍 基于datax官网

一、Datax概览 离线数据同步工具/平台&#xff0c;实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。 Features 将不同数据源的同步抽象为从源头数据源读取数据的Rea…

Datax安装及基本使用

文章目录 一、Datax概述1.概述2.DataX插件体系3.DataX核心架构 二、安装2.1下载并解压2.2运行自检脚本 三、基本使用3.1从stream读取数据并打印到控制台1. 查看官方json配置模板2. 根据模板编写json文件3. 运行Job 3.2 Mysql导入数据到HDFS1. 查看官方json配置模板2. 根据模板编…

dataX和dataX-Web使用总结

dataX整合dataX-Web经验总结&#xff1a; 近日公司由于框架升级而涉及到数据的迁移&#xff0c;使用到了dataX的ETL框架&#xff0c;其中遇到了一些坑在此总结一下方便后人使用。 1.环境准备 1.java8 2.python 2.7.18 (官方推荐是2.7.X&#xff0c;具体可自行选择) 3.mave…

datax(24):远程调试datax

一、datax开启远程debug 1、环境 本地&#xff1a; win10&#xff0c;idea专业版2020.3&#xff0c;datax3.0 远程&#xff1a; CentOS6.5,datax3.02、效果 3、步骤 3.1 远程开启debug /apps/datax/bin/datax.py /apps/datax/job/job.json -d即在后面添加-d 即可&#xff0…

【DataX总结】

DataX 一、 ETL工具概述主流ETL工具 二、Datax概述亮点一&#xff1a;异构数据源DataX 设计框架设计 亮点二&#xff1a;稳定高效运行原理 三、DataX的安装安装使用写json文件任务提交 传参 一、 ETL工具 概述 ETL工具是将数据从来源端经过抽取、转换、装载至目的端的过程。 …

大数据技术之DataX (一)DataX插件开发

文章目录 一、背景二、基于java的本地测试datax2.1 github上下载datax的源代码2.2 datax代码导入idea 三、docker安装南大通用数据库GBase和GBase 8a3.1 docker安装Gbase 8a3.2 docker安装Gbase 8s 四、南大通用数据库GBase 8s To GBase 8a4.1 GBase 8s的reader读插件开发&…

【工具】之DataX-Web简单介绍

目录 一、概念 二、架构 三、功能 四、使用说明 1、执行器配置 2、创建项目 3、创建数据源&#xff08;源库和目标库&#xff09; 4、创建任务模版 5、构建JSON脚本 正常流程&#xff08;单库单表&#xff09; 1.构建reader 2.构建writer 3.字段映射 4.构建 批量创建…

datax(21):编写自己的Transformer

前面2篇文章&#xff0c;已经看完学习完transform的内容&#xff0c;今天继续编写一个自己的transformer&#xff1b; 一、环境 win10DataX 3.0(从我的datax分支打包而来)job.json使用datax的样例json&#xff0c;源文件在xxx\DataX\core\src\main\job\中&#xff0c;打包编译…

大数据项目之电商数仓DataX、DataX简介、DataX支持的数据源、DataX架构原理、DataX部署

文章目录 1. DataX简介1.1 DataX概述1.2 DataX支持的数据源 2. DataX架构原理2.1 DataX设计理念2.2 DataX框架设计2.3 DataX运行流程2.4 DataX调度决策思路2.5 DataX与Sqoop对比 3. DataX部署3.1 下载DataX安装包并上传到hadoop102的/opt/software3.2 解压datax.tar.gz到/opt/m…

datax安装+配置+使用文档

1 DataX离线同步工具DataX3.0介绍 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台&#xff0c;实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。 Github…

datax(13):源码解读Column-datax中的数据类型

一、基类Column概述 Column是datax中所有数据类型的基类&#xff0c;里面有3个属性&#xff0c;以及一个构造方法&#xff0c;外加一个枚举类&#xff1b; public abstract class Column {private Type type;private Object rawData;private int byteSize;public Column(fina…

DataX使用说明

DataX使用说明 1.DataX介绍 DataX 是一个异构数据源离线同步工具&#xff0c;致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 也就是数据库的数据同步工具&#xff0c;免费版没有web页面&#xff0…

Datax入门使用

DataX入门使用 一、简介 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台&#xff0c;实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。Datax将不同数据源…

DataX 的使用

一、DataX 的部署 1、上传 datax 压缩包并解压 tar -zxvf datax.tar.gz -C /usr/local/soft/ 2、自检&#xff0c;执行命令&#xff08;在datax目录下&#xff09; [rootmaster datax]# python ./bin/datax.py ./job/job.json 安装成功 二、DataX 的使用 MySQL写入MySQL …

DATAX快速上手非常详细

前言 博主在工作的过程中有一天公司决定将数据迁移的新的项目上去&#xff0c;当我发现数据库中的表大于有4000多张表的时我顿时懵了下&#xff0c;这数据迁移人力物力消耗的也太大了吧(看DataX的设计)。所以我们可以借助阿里云开源的DataX来解决这个问题。 看完这篇掌…

DataX及DataX-Web

大数据Hadoop之——数据同步工具DataX数据采集工具-DataX datax详细介绍及使用 一、概述 DataX 是阿里云DataWorks数据集成的开源版本&#xff0c;在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、…

datax(4): datax.py解读

datax 直接使用py文件进行任务提交&#xff0c;今天读一读它 一、文件位置 原始文件位置在 xx/DataX/core/src/main/bin/下&#xff0c;datax项目打包后会将文件拷贝到 xx/DataX\target\datax\datax\bin 下。 core模块的pom.xml 指定‘拷贝’datax.py文件的方式maven-assembly…

DataX使用指南

简介 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台&#xff0c;实现包括 MySQL、Oracle、HDFS、Hive、OceanBase、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。DataX采用了框架 插件 的模式&#xff0c;目前已开源&#xff0c;代码托管在github。…

DataX

DataX的环境搭建以及简单测试 什么是DataX DataX 是一个异构数据源离线同步工具&#xff0c;致力于实现包括关系型数据库(MySQL、Oracle等)、 HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 &#xff08;这是一个单机多任务的ETL工具&#xff0…