datax(21):编写自己的Transformer

article/2025/10/13 4:23:38

前面2篇文章,已经看完+学习完transform的内容,今天继续编写一个自己的transformer;


一、环境

  1. win10
  2. DataX 3.0(从我的datax分支打包而来)
  3. job.json使用datax的样例json,源文件在xxx\DataX\core\src\main\job\中,打包编译后在xxx\DataX\target\datax\datax\job下。本文略做修改,主要修改2出,是否打印和记录行数
{"job": {"setting": {"speed": {"channel": 1},"errorLimit": {"percentage": 0.02}},"content": [{"reader": {"name": "streamreader","parameter": {"column" : [{"value": "DataX","type": "string"},{"value": 59880504, "type": "long"},{"value": "2010-05-04 00:00:00","type": "string" // 原来是date},{"value": true,"type": "bool"},{"value": "test","type": "bytes"}],"sliceRecordCount": 1 // 原来是100000,修改为1行,看到效果就行}},"writer": {"name": "streamwriter","parameter": {"print": true, //原来是false,不打印,为了在控制台看到效果,修改为true"encoding": "UTF-8"}}}]}
}

二、效果

先看下不加transformer时候的运行效果
在这里插入图片描述

加上自定义的DateTransformer(主要实现时间类型字符串格式转换)配置后运行效果:
在这里插入图片描述


三、实现步骤

  1. 写一个类DateTransformer继承Transformer;
  2. 里面构造方法设置一个唯一标识符,用来代表本transformer;
  3. 重写evaluate()方法,根据自己实现情况定义paras参数个数及含义
  4. 将DateTransformer注册到TransformerRegistry中;

四、具体代码

编写DateTransformer类,写构造方法,写evaluate方法

package com.alibaba.datax.core.transport.transformer;import static com.alibaba.datax.core.transport.transformer.TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER;
import static com.alibaba.datax.core.transport.transformer.TransformerErrorCode.TRANSFORMER_RUN_EXCEPTION;import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.transformer.Transformer;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;/*** @author water* @desc 自己定义一个时间类型转换的类*/
public class DateTransformer extends Transformer {/*** 通过设置name,标注出唯一的标识符,代表被类*/public DateTransformer() {setTransformerName("dx_date");}/*** @param record Record* @param paras  Object  第一个参数是 colIndex。 第二个参数是原时间格式,第三个是目标时间格式* @return Record*/@Overridepublic Record evaluate(Record record, Object... paras) {int colIndex;String oldPattern;String newPattern;try {if (paras.length != 3) {throw new RuntimeException("dx_date paras must be 3");}colIndex = (Integer) paras[0];oldPattern = (String) paras[1];newPattern = (String) paras[2];} catch (Exception e) {throw DataXException.asDataXException(TRANSFORMER_ILLEGAL_PARAMETER,"paras:" + Arrays.asList(paras).toString() + " => " + e.getMessage());}Column column = record.getColumn(colIndex);try {String oriValue = column.asString();//如果字段为空,跳过处理if (oriValue == null) {return record;}SimpleDateFormat oldSdf = new SimpleDateFormat(oldPattern);Date date = oldSdf.parse(oriValue);SimpleDateFormat newSdf = new SimpleDateFormat(newPattern);String newValue = newSdf.format(date);record.setColumn(colIndex, new StringColumn(newValue));} catch (Exception e) {throw DataXException.asDataXException(TRANSFORMER_RUN_EXCEPTION, e.getMessage(), e);}return record;}
}

将DateTransformer注册到TransformerRegistry中

TransformerRegistry类中static {/*** add native transformer* local storage and from server will be delay load.* 官方默认注册了 5 个方法,分别是截取字符串、填补、替换、过滤、groovy 代码段(后面会详细介绍)*/registTransformer(new SubstrTransformer());registTransformer(new PadTransformer());registTransformer(new ReplaceTransformer());registTransformer(new FilterTransformer());registTransformer(new GroovyTransformer());//将自己写的transformer注册进来registTransformer(new DateTransformer());}

so easy,哪里需要写哪里~


注:

  1. 对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;

  2. 所有代码都已经上传到github(master分支和dev),可以免费白嫖


http://chatgpt.dhexx.cn/article/yO44yVoV.shtml

相关文章

大数据项目之电商数仓DataX、DataX简介、DataX支持的数据源、DataX架构原理、DataX部署

文章目录 1. DataX简介1.1 DataX概述1.2 DataX支持的数据源 2. DataX架构原理2.1 DataX设计理念2.2 DataX框架设计2.3 DataX运行流程2.4 DataX调度决策思路2.5 DataX与Sqoop对比 3. DataX部署3.1 下载DataX安装包并上传到hadoop102的/opt/software3.2 解压datax.tar.gz到/opt/m…

datax安装+配置+使用文档

1 DataX离线同步工具DataX3.0介绍 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。 Github…

datax(13):源码解读Column-datax中的数据类型

一、基类Column概述 Column是datax中所有数据类型的基类,里面有3个属性,以及一个构造方法,外加一个枚举类; public abstract class Column {private Type type;private Object rawData;private int byteSize;public Column(fina…

DataX使用说明

DataX使用说明 1.DataX介绍 DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 也就是数据库的数据同步工具,免费版没有web页面&#xff0…

Datax入门使用

DataX入门使用 一、简介 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。Datax将不同数据源…

DataX 的使用

一、DataX 的部署 1、上传 datax 压缩包并解压 tar -zxvf datax.tar.gz -C /usr/local/soft/ 2、自检,执行命令(在datax目录下) [rootmaster datax]# python ./bin/datax.py ./job/job.json 安装成功 二、DataX 的使用 MySQL写入MySQL …

DATAX快速上手非常详细

前言 博主在工作的过程中有一天公司决定将数据迁移的新的项目上去,当我发现数据库中的表大于有4000多张表的时我顿时懵了下,这数据迁移人力物力消耗的也太大了吧(看DataX的设计)。所以我们可以借助阿里云开源的DataX来解决这个问题。 看完这篇掌…

DataX及DataX-Web

大数据Hadoop之——数据同步工具DataX数据采集工具-DataX datax详细介绍及使用 一、概述 DataX 是阿里云DataWorks数据集成的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、…

datax(4): datax.py解读

datax 直接使用py文件进行任务提交,今天读一读它 一、文件位置 原始文件位置在 xx/DataX/core/src/main/bin/下,datax项目打包后会将文件拷贝到 xx/DataX\target\datax\datax\bin 下。 core模块的pom.xml 指定‘拷贝’datax.py文件的方式maven-assembly…

DataX使用指南

简介 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、HDFS、Hive、OceanBase、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。DataX采用了框架 插件 的模式,目前已开源,代码托管在github。…

DataX

DataX的环境搭建以及简单测试 什么是DataX DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、 HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 (这是一个单机多任务的ETL工具&#xff0…

DataX 简介及架构原理

DataX 简介及架构原理 概述 DataX是阿里巴巴使用 Java 和 Python 开发的一个异构数据源离线同步工具 异构数据源:不同存储结构的数据源 致力于实现包括关系型数据库 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS…

DataX的使用与介绍(1)

一、什么是DataX? DataX是阿里云商用产品DataWorks数据集成的开源版本,它是一个异构数据源的离线数据同步工具/平台(ETL工具)。DataX实现了包括Mysql,Oracle、OceanBase、Sqlserver,Postgre、HDFS、Hive、…

DataX介绍

DataX 是阿里开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 DataX设计理念 DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头…

详解DataX及使用

DataX概述 简介 DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 支持数据源 DataX架构原理 设计理念 为了解决异构数据源同步问…

使用 DataX 实现数据同步(高效的同步工具)

DataX 使用介绍 前言一、DataX 简介1.DataX3.0 框架设计2.DataX3.0 核心架构 二、使用 DataX 实现数据同步1.Linux 上安装 DataX 软件2.DataX 基本使用3.安装 MySQL 数据库4.通过 DataX 实 MySQL 数据同步5.使用 DataX 进行增量同步 前言 我们公司有个项目的数据量高达五千万&…

Transpose函数的用法

Transpose函数的用法 在CNN机器学习中,经常要用到transpose函数对多维数组进行转置操作,下面是我对函数的理解过程。 1.二维数组的转换 二维数组中,原数组的第0轴的行,转换成新数组第1轴的列; 2.三维数组 三维数组较…

转置算子(transpose)的一种实现

transpose算子也叫做permute算子,根据白嫖有道英汉大词典的结果,他俩都是转置,改变排列顺序的意思。 算法逻辑是: 通过当前输出的一维偏移量(offset)计算输入矩阵对应的高维索引 然后根据参数pos重新排列输出索引,进…

论文笔记——TransPose

目录 摘要 一、前言 二、相关工作 2.1 人体姿态估计 2.2 可解释性 三、TransPose 3.1 网络结构 3.2 分辨率设置 3.3 attentions是定位关键点的依赖 四、实验 4.1 COCO实验数据对比 4.2 迁移到MPII数据对比 4.3 消融实验​编辑 4.4 量化分析 五、总结 摘要 虽然基…

numpy中的transpose函数使用

二维矩阵的transpose函数 : transpose()简单来说,就相当于数学中的转置,在矩阵中,转置就是把行与列相互调换位置; 例如:随机生成一个三行五列的二维矩阵: arr np.aran…