大数据技术之DataX (一)DataX插件开发

article/2025/10/13 2:54:12

文章目录

  • 一、背景
  • 二、基于java的本地测试datax
    • 2.1 github上下载datax的源代码
    • 2.2 datax代码导入idea
  • 三、docker安装南大通用数据库GBase和GBase 8a
    • 3.1 docker安装Gbase 8a
    • 3.2 docker安装Gbase 8s
  • 四、南大通用数据库GBase 8s To GBase 8a
    • 4.1 GBase 8s的reader读插件开发(writer同理)
  • 五、南大通用gbase可视化工具
    • 5.1 南大通用GBase 8a的连接配置
    • 5.2 南大通用GBase 8s的连接配置


一、背景

DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。但是随着业务需求的增加,Datax自带的插件逐渐不满足,因此需要进行二次开发新的插件。基于此,本文对datax插件的二次开发步骤进行详细的说明,希望能帮助到大家。

二、基于java的本地测试datax

2.1 github上下载datax的源代码

网址: https://github.com/alibaba/DataX
在这里插入图片描述

2.2 datax代码导入idea

1.在此,把从Git上下载的datax代码,本地解压导入idea,如下图所示:
在这里插入图片描述
2. 基于java的本地测试

利用本地的maven进行编译datax源代码,打包命令如下:

 mvn -U clean package assembly:assembly -Dmaven.test.skip=true

等待打包完成…即生成target目录:
在这里插入图片描述
3. 利用核心类,Core模块下的Engine进行本地测试:
在这里插入图片描述
4 修改Engine.java代码,因为这里是datax的入口。
在这里插入图片描述首先设置系统变量,也就是datax的目录路径:

System.setProperty("datax.home","F:\\gx\\DataX\\target\\datax\\datax");

其次设置job路径和一些参数。

String[] datxArgs = {"-job", "F:\\gx\\DataX\\target\\datax\\datax\\job\\job.json", "-mode", "standalone", "-jobid", "-1"};

(datax目录路径和job路径用的是打包生成的target目录里的datax) 进行本地测试的时候,修改job.json即可。

测试: 直接点击main方法运行。
在这里插入图片描述

三、docker安装南大通用数据库GBase和GBase 8a

3.1 docker安装Gbase 8a

1 第一步 环境准备

确认机器上已经安装了Docker环境

2 第二步 在安装好docker的服务器上进行下载GBase 8a镜像

$ docker pull shihd/gbase8a:1.0 

3 第三步 创建并启动容器

$ docker run --name 容器名 -itd -p5258:5258 shihd/gbase8a:1.0

GBase8a数据库信息

DB: gbase
User: root
Password: root
Port: 5258

3.2 docker安装Gbase 8s

1.查找GBase 8s 镜像版本

[root@localhost ~]# docker search gbase8s

在这里插入图片描述
2. 拉取GBase 8s镜像

[root@localhost ~]# docker pull liaosnet/gbase8s:3.3.0_2_amd64

3 运行容器

docker run --name 容器名 -p 19088:9088 -itd liaosnet/gbase8s:3.3.0_2_amd64

4 安装成功
通过docker ps 命令查看安装是否成功。并且进入容器:docker exec -i -t 容器名 /bin/bash
进入容器,查看数据库状态:
在这里插入图片描述

在这里插入图片描述
在则查看gbase 8s的实例服务:
在这里插入图片描述

四、南大通用数据库GBase 8s To GBase 8a

4.1 GBase 8s的reader读插件开发(writer同理)

1 新建gbase8sreader模块
在这里插入图片描述
2 复制mysqlreader的assembly包和resource包下的plugin.json、plugin_job_template.json
把复制的放入gbase8sreader模块相应的位置。
在这里插入图片描述
3. 修改复制过来的文件。
在这里插入图片描述
package.xml

<assemblyxmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"><id></id><formats><format>dir</format></formats><includeBaseDirectory>false</includeBaseDirectory><fileSets><fileSet><directory>src/main/resources</directory><includes><include>plugin.json</include><include>plugin_job_template.json</include></includes><outputDirectory>plugin/reader/gbase8sreader</outputDirectory></fileSet><fileSet><directory>target/</directory><includes><include>gbase8sreader-0.0.1-SNAPSHOT.jar</include></includes><outputDirectory>plugin/reader/gbase8sreader</outputDirectory></fileSet></fileSets><dependencySets><dependencySet><useProjectArtifact>false</useProjectArtifact><outputDirectory>plugin/reader/gbase8sreader/libs</outputDirectory><scope>runtime</scope></dependencySet></dependencySets>
</assembly>

plugin.json

{"name": "gbase8sreader","class": "com.cetc.datax.plugin.reader.gbase8sreader.Gbase8sReader","description": "useScene: test. mechanism: use datax framework to transport data from txt file. warn: The more you know about the data, the less problems you encounter.","developer": "cetc"
}

注意:“class” 为我们后面需要编写的代码类名
plugin_job_template.json

{"name": "gbase8sreader","parameter": {"username": "","password": "","column": [],"connection": [{"jdbcUrl": [],"table": []}],"where": ""}
}

4 新建插件代码包
在这里插入图片描述
Gbase8sReader代码如下所示:

package com.cetc.datax.plugin.reader.gbase8sreader;import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import java.util.List;
import com.alibaba.datax.plugin.rdbms.reader.Constant;
import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;/*** @author : shujuelin* @date : 16:08 2021/10/11*/
public class Gbase8sReader extends Reader {private static final DataBaseType DATABASE_TYPE = DataBaseType.Gbase8s;public static class Job extends Reader.Job {private static final Logger LOG = LoggerFactory.getLogger(Job.class);private Configuration originalConfig = null;private CommonRdbmsReader.Job commonRdbmsReaderJob;@Overridepublic void init() {this.originalConfig = super.getPluginJobConf();Integer userConfigedFetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE);if (userConfigedFetchSize != null) {LOG.warn("对 gbasereader 不需要配置 fetchSize, gbasereader 将会忽略这项配置. 如果您不想再看到此警告,请去除fetchSize 配置.");}this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);this.commonRdbmsReaderJob.init(this.originalConfig);}@Overridepublic void preCheck(){init();this.commonRdbmsReaderJob.preCheck(this.originalConfig,DATABASE_TYPE);}@Overridepublic List<Configuration> split(int adviceNumber) {return this.commonRdbmsReaderJob.split(this.originalConfig, adviceNumber);}@Overridepublic void post() {this.commonRdbmsReaderJob.post(this.originalConfig);}@Overridepublic void destroy() {this.commonRdbmsReaderJob.destroy(this.originalConfig);}}public static class Task extends Reader.Task {private Configuration readerSliceConfig;private CommonRdbmsReader.Task commonRdbmsReaderTask;@Overridepublic void init() {this.readerSliceConfig = super.getPluginJobConf();this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE,super.getTaskGroupId(), super.getTaskId());this.commonRdbmsReaderTask.init(this.readerSliceConfig);}@Overridepublic void startRead(RecordSender recordSender) {int fetchSize = 1000; //this.readerSliceConfig.getInt(Constant.FETCH_SIZE);this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,super.getTaskPluginCollector(), fetchSize);}@Overridepublic void post() {this.commonRdbmsReaderTask.post(this.readerSliceConfig);}@Overridepublic void destroy() {this.commonRdbmsReaderTask.destroy(this.readerSliceConfig);}}
}

注意

代码里的:private static final DataBaseType DATABASE_TYPE = DataBaseType.Gbase8s;
需要在
在这里插入图片描述
进行修改,增加自己的数据源类型。

  Gbase("gbase","com.gbase.jdbc.Driver"),Gbase8s("gbase8s","com.gbasedbt.jdbc.Driver");

在DataBaseType类中针对reader的数据源,要修改appendJDBCSuffixForReader方法:
在这里插入图片描述
同理: 编写writer插件时候也需要在相应的方法里appendJDBCSuffixForWriter进行修改。

5 打包运行
将其他模块注释只留下公共模块和自己的项目模块。最外层的pom
在这里插入图片描述
在最外层的package.xml加上下面这个

<fileSet><directory>gbase8sreader/target/datax/</directory><includes><include>**/*.*</include></includes><outputDirectory>datax</outputDirectory></fileSet>

五、南大通用gbase可视化工具

本文使用的客户端连接工具为:
在这里插入图片描述

5.1 南大通用GBase 8a的连接配置

在这里插入图片描述

5.2 南大通用GBase 8s的连接配置

在这里插入图片描述


http://chatgpt.dhexx.cn/article/xW5LYq3u.shtml

相关文章

【工具】之DataX-Web简单介绍

目录 一、概念 二、架构 三、功能 四、使用说明 1、执行器配置 2、创建项目 3、创建数据源&#xff08;源库和目标库&#xff09; 4、创建任务模版 5、构建JSON脚本 正常流程&#xff08;单库单表&#xff09; 1.构建reader 2.构建writer 3.字段映射 4.构建 批量创建…

datax(21):编写自己的Transformer

前面2篇文章&#xff0c;已经看完学习完transform的内容&#xff0c;今天继续编写一个自己的transformer&#xff1b; 一、环境 win10DataX 3.0(从我的datax分支打包而来)job.json使用datax的样例json&#xff0c;源文件在xxx\DataX\core\src\main\job\中&#xff0c;打包编译…

大数据项目之电商数仓DataX、DataX简介、DataX支持的数据源、DataX架构原理、DataX部署

文章目录 1. DataX简介1.1 DataX概述1.2 DataX支持的数据源 2. DataX架构原理2.1 DataX设计理念2.2 DataX框架设计2.3 DataX运行流程2.4 DataX调度决策思路2.5 DataX与Sqoop对比 3. DataX部署3.1 下载DataX安装包并上传到hadoop102的/opt/software3.2 解压datax.tar.gz到/opt/m…

datax安装+配置+使用文档

1 DataX离线同步工具DataX3.0介绍 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台&#xff0c;实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。 Github…

datax(13):源码解读Column-datax中的数据类型

一、基类Column概述 Column是datax中所有数据类型的基类&#xff0c;里面有3个属性&#xff0c;以及一个构造方法&#xff0c;外加一个枚举类&#xff1b; public abstract class Column {private Type type;private Object rawData;private int byteSize;public Column(fina…

DataX使用说明

DataX使用说明 1.DataX介绍 DataX 是一个异构数据源离线同步工具&#xff0c;致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 也就是数据库的数据同步工具&#xff0c;免费版没有web页面&#xff0…

Datax入门使用

DataX入门使用 一、简介 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台&#xff0c;实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。Datax将不同数据源…

DataX 的使用

一、DataX 的部署 1、上传 datax 压缩包并解压 tar -zxvf datax.tar.gz -C /usr/local/soft/ 2、自检&#xff0c;执行命令&#xff08;在datax目录下&#xff09; [rootmaster datax]# python ./bin/datax.py ./job/job.json 安装成功 二、DataX 的使用 MySQL写入MySQL …

DATAX快速上手非常详细

前言 博主在工作的过程中有一天公司决定将数据迁移的新的项目上去&#xff0c;当我发现数据库中的表大于有4000多张表的时我顿时懵了下&#xff0c;这数据迁移人力物力消耗的也太大了吧(看DataX的设计)。所以我们可以借助阿里云开源的DataX来解决这个问题。 看完这篇掌…

DataX及DataX-Web

大数据Hadoop之——数据同步工具DataX数据采集工具-DataX datax详细介绍及使用 一、概述 DataX 是阿里云DataWorks数据集成的开源版本&#xff0c;在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、…

datax(4): datax.py解读

datax 直接使用py文件进行任务提交&#xff0c;今天读一读它 一、文件位置 原始文件位置在 xx/DataX/core/src/main/bin/下&#xff0c;datax项目打包后会将文件拷贝到 xx/DataX\target\datax\datax\bin 下。 core模块的pom.xml 指定‘拷贝’datax.py文件的方式maven-assembly…

DataX使用指南

简介 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台&#xff0c;实现包括 MySQL、Oracle、HDFS、Hive、OceanBase、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。DataX采用了框架 插件 的模式&#xff0c;目前已开源&#xff0c;代码托管在github。…

DataX

DataX的环境搭建以及简单测试 什么是DataX DataX 是一个异构数据源离线同步工具&#xff0c;致力于实现包括关系型数据库(MySQL、Oracle等)、 HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 &#xff08;这是一个单机多任务的ETL工具&#xff0…

DataX 简介及架构原理

DataX 简介及架构原理 概述 DataX是阿里巴巴使用 Java 和 Python 开发的一个异构数据源离线同步工具 异构数据源&#xff1a;不同存储结构的数据源 致力于实现包括关系型数据库 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS…

DataX的使用与介绍(1)

一、什么是DataX&#xff1f; DataX是阿里云商用产品DataWorks数据集成的开源版本&#xff0c;它是一个异构数据源的离线数据同步工具/平台&#xff08;ETL工具&#xff09;。DataX实现了包括Mysql&#xff0c;Oracle、OceanBase、Sqlserver&#xff0c;Postgre、HDFS、Hive、…

DataX介绍

DataX 是阿里开源的一个异构数据源离线同步工具&#xff0c;致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 DataX设计理念 DataX本身作为数据同步框架&#xff0c;将不同数据源的同步抽象为从源头…

详解DataX及使用

DataX概述 简介 DataX 是阿里巴巴开源的一个异构数据源离线同步工具&#xff0c;致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 支持数据源 DataX架构原理 设计理念 为了解决异构数据源同步问…

使用 DataX 实现数据同步(高效的同步工具)

DataX 使用介绍 前言一、DataX 简介1.DataX3.0 框架设计2.DataX3.0 核心架构 二、使用 DataX 实现数据同步1.Linux 上安装 DataX 软件2.DataX 基本使用3.安装 MySQL 数据库4.通过 DataX 实 MySQL 数据同步5.使用 DataX 进行增量同步 前言 我们公司有个项目的数据量高达五千万&…

Transpose函数的用法

Transpose函数的用法 在CNN机器学习中&#xff0c;经常要用到transpose函数对多维数组进行转置操作&#xff0c;下面是我对函数的理解过程。 1.二维数组的转换 二维数组中&#xff0c;原数组的第0轴的行&#xff0c;转换成新数组第1轴的列&#xff1b; 2.三维数组 三维数组较…

转置算子(transpose)的一种实现

transpose算子也叫做permute算子&#xff0c;根据白嫖有道英汉大词典的结果&#xff0c;他俩都是转置&#xff0c;改变排列顺序的意思。 算法逻辑是&#xff1a; 通过当前输出的一维偏移量(offset)计算输入矩阵对应的高维索引 然后根据参数pos重新排列输出索引&#xff0c;进…