前面2篇文章,已经看完+学习完transform的内容,今天继续编写一个自己的transformer;
一、环境
- win10
- DataX 3.0(从我的datax分支打包而来)
- job.json使用datax的样例json,源文件在xxx\DataX\core\src\main\job\中,打包编译后在xxx\DataX\target\datax\datax\job下。本文略做修改,主要修改2出,是否打印和记录行数
{"job": {"setting": {"speed": {"channel": 1},"errorLimit": {"percentage": 0.02}},"content": [{"reader": {"name": "streamreader","parameter": {"column" : [{"value": "DataX","type": "string"},{"value": 59880504, "type": "long"},{"value": "2010-05-04 00:00:00","type": "string" // 原来是date},{"value": true,"type": "bool"},{"value": "test","type": "bytes"}],"sliceRecordCount": 1 // 原来是100000,修改为1行,看到效果就行}},"writer": {"name": "streamwriter","parameter": {"print": true, //原来是false,不打印,为了在控制台看到效果,修改为true"encoding": "UTF-8"}}}]}
}
二、效果
先看下不加transformer时候的运行效果
加上自定义的DateTransformer(主要实现时间类型字符串格式转换)
配置后运行效果:
三、实现步骤
- 写一个类DateTransformer继承Transformer;
- 里面构造方法设置一个唯一标识符,用来代表本transformer;
- 重写evaluate()方法,根据自己实现情况定义paras参数个数及含义
- 将DateTransformer注册到TransformerRegistry中;
四、具体代码
编写DateTransformer类,写构造方法,写evaluate方法
package com.alibaba.datax.core.transport.transformer;import static com.alibaba.datax.core.transport.transformer.TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER;
import static com.alibaba.datax.core.transport.transformer.TransformerErrorCode.TRANSFORMER_RUN_EXCEPTION;import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.transformer.Transformer;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;/*** @author water* @desc 自己定义一个时间类型转换的类*/
public class DateTransformer extends Transformer {/*** 通过设置name,标注出唯一的标识符,代表被类*/public DateTransformer() {setTransformerName("dx_date");}/*** @param record Record* @param paras Object 第一个参数是 colIndex。 第二个参数是原时间格式,第三个是目标时间格式* @return Record*/@Overridepublic Record evaluate(Record record, Object... paras) {int colIndex;String oldPattern;String newPattern;try {if (paras.length != 3) {throw new RuntimeException("dx_date paras must be 3");}colIndex = (Integer) paras[0];oldPattern = (String) paras[1];newPattern = (String) paras[2];} catch (Exception e) {throw DataXException.asDataXException(TRANSFORMER_ILLEGAL_PARAMETER,"paras:" + Arrays.asList(paras).toString() + " => " + e.getMessage());}Column column = record.getColumn(colIndex);try {String oriValue = column.asString();//如果字段为空,跳过处理if (oriValue == null) {return record;}SimpleDateFormat oldSdf = new SimpleDateFormat(oldPattern);Date date = oldSdf.parse(oriValue);SimpleDateFormat newSdf = new SimpleDateFormat(newPattern);String newValue = newSdf.format(date);record.setColumn(colIndex, new StringColumn(newValue));} catch (Exception e) {throw DataXException.asDataXException(TRANSFORMER_RUN_EXCEPTION, e.getMessage(), e);}return record;}
}
将DateTransformer注册到TransformerRegistry中
TransformerRegistry类中static {/*** add native transformer* local storage and from server will be delay load.* 官方默认注册了 5 个方法,分别是截取字符串、填补、替换、过滤、groovy 代码段(后面会详细介绍)*/registTransformer(new SubstrTransformer());registTransformer(new PadTransformer());registTransformer(new ReplaceTransformer());registTransformer(new FilterTransformer());registTransformer(new GroovyTransformer());//将自己写的transformer注册进来registTransformer(new DateTransformer());}
so easy,哪里需要写哪里~
注:
-
对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;
-
所有代码都已经上传到github(master分支和dev),可以免费白嫖