详解DataX及使用

article/2025/10/13 5:53:44

DataX概述

简介

DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

支持数据源

在这里插入图片描述

DataX架构原理

设计理念

为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
在这里插入图片描述

框架设计

DataX本身作为离线数据同步框架,采用Framework + Plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
在这里插入图片描述

  • Reader:数据采集模块,负责采集数据源的数据,将数据发给FrameWork。
  • Writer:数据写入模块,负责不断的向FrameWork取数据,并将数据写入到目的端。
  • FrameWork:用于连接Reader和Writer,作为两者的数据传输通道,并处理缓存,流控,并发,数据转换等核心问题。

运行流程

用一个DataX作业生命周期的时序图说明DataX的运行流程、核心概念以及每个概念之间的关系。
在这里插入图片描述

  • Job:单个作业同步的作业,称为一个Job,一个Job启动一个进程。
  • Task:根据不同数据源切分策略,一个Job会切分为多个Task,Task是DataX作业的最小单元,每个Task负责一部分数据的同步工作。
  • TaskGroup:Scheduler调度模块对Task进行分组,每个Task组叫做一个Task Group,每个Task Group负责以一定的并发度运行其分得得Task,单个Task Group的并发度为5。
  • Reader->Channel->Writer:每个Task启动后,都会固定启动Reader->Channel->Writer的线程来完成同步工作。

调度决策思路

举个栗子
用户提交了一个DataX作业,并且配置了总的并发度为20,目的是对一个有100张分表的mysql数据源进行同步。DataX的调度决策思路是:

  1. DataX Job根据分库分表切分策略,将同步工作分成100个Task。
  2. 根据配置的总的并发度20,以及每个Task Group的并发度5,DataX计算共需要分配4个TaskGroup。
  3. 4个TaskGroup平分100个Task,每一个TaskGroup负责运行25个Task。

DataX与Sqoop对比
在这里插入图片描述

DataX部署

1)下载DataX安装包并上传到hadoop102的/opt/software
下载地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
2)解压datax.tar.gz到/opt/module tar -zxvf datax.tar.gz -C /opt/module\
3)自检,执行如下命令python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json
出现如下内容,则表明安装成功

……
2021-10-12 21:51:12.335 [job-0] INFO JobContainer -
任务启动时刻 : 2021-10-12 21:51:02
任务结束时刻 : 2021-10-12 21:51:12
任务总计耗时 : 10s
任务平均流量 : 253.91KB/s
记录写入速度 : 10000rec/s
读出记录总数 : 100000
读写失败总数 : 0

DataX上手

使用概述

  • 任务提交命令:用户需要根据同步数据的数据源和目的地选择相应的Reader和Writer,并将Reader和Writer的信息配置在一个json文件中,然后执行命令提交数据同步任务即可。python bin/datax.py path/to/your/job.json

配置文件格式

查看DataX配置文件模板 python bin/datax.py -r mysqlreader -w hdfswriter

json最外层是一个job,job包含setting和content两部分,其中setting用于对整个job进行配置,content用户配置数据源和目的地。
在这里插入图片描述
Reader和Writer的具体参数参考官方文档
https://github.com/alibaba/DataX/blob/master/README.md
https://gitee.com/mirrors/DataX/blob/master/README.md
在这里插入图片描述

同步MySQL数据到HDFS

使用一个栗子来完成同步MySQL数据->HDFS的应用
要求:同步gmall数据库中base_province表数据到HDFS的/base_province目录
分析:需选用MySQLReader和HDFSWriter,MySQLReader具有两种模式分别是TableMode和QuerySQLMode,前者使用table,column,where等属性声明需要同步的数据;后者使用一条SQL查询语句声明需要同步的数据。

MySQLReader之TableMode
1)编写配置文件
(1)创建配置文件base_province.json
vim /opt/module/datax/job/base_province.json
(2)配置文件内容如下

  {"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": ["id","name","region_id","area_code","iso_code","iso_3166_2"],"where": "id>=3","connection": [{"jdbcUrl": ["jdbc:mysql://hadoop102:3306/gmall"],"table": ["base_province"]}],"password": "xxxxxx","splitPk": "","username": "root"}},"writer": {"name": "hdfswriter","parameter": {"column": [{"name": "id","type": "bigint"},{"name": "name","type": "string"},{"name": "region_id","type": "string"},{"name": "area_code","type": "string"},{"name": "iso_code","type": "string"},{"name": "iso_3166_2","type": "string"}],"compress": "gzip","defaultFS": "hdfs://hadoop102:8020","fieldDelimiter": "\t","fileName": "base_province","fileType": "text","path": "/base_province","writeMode": "append"}}}],"setting": {"speed": {"channel": 1}}}
}

2)配置文件说明
(1)Reader参数说明
在这里插入图片描述
(2)Writer参数说明
在这里插入图片描述
注意事项:

HFDS Writer并未提供nullFormat参数:也就是用户并不能自定义null值写到HFDS文件中的存储格式。默认情况下,HFDS Writer会将null值存储为空字符串(‘’),而Hive默认的null值存储格式为\N。所以后期将DataX同步的文件导入Hive表就会出现问题。

解决方案:
一是修改DataX HDFS Writer的源码,增加自定义null值存储格式的逻辑,参考https://blog.csdn.net/u010834071/article/details/105506580。
二是在Hive中建表时指定null值存储格式为空字符串(‘’),例如:

DROP TABLE IF EXISTS base_province;
CREATE EXTERNAL TABLE base_province
(`id`         STRING COMMENT '编号',`name`       STRING COMMENT '省份名称',`region_id`  STRING COMMENT '地区ID',`area_code`  STRING COMMENT '地区编码',`iso_code`   STRING COMMENT '旧版ISO-3166-2编码,供可视化使用',`iso_3166_2` STRING COMMENT '新版IOS-3166-2编码,供可视化使用'
) COMMENT '省份表'ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'NULL DEFINED AS ''LOCATION '/base_province/';

(3)Setting参数说明
在这里插入图片描述
3)提交任务
(1)在HDFS创建/base_province目录
使用DataX向HDFS同步数据时,需确保目标路径已存在
(2)进入DataX根目录
(3)执行命令python bin/datax.py job/base_province.json
4)查看结果
(1)DataX打印日志

(2)查看HDFS文件

MySQLReader之QuerySQLMode

1)编写配置文件
(1)修改配置文件base_province.json
(2)配置文件内容

{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"connection": [{"jdbcUrl": ["jdbc:mysql://hadoop102:3306/gmall"],"querySql": ["select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"]}],"password": "xxxxxx","username": "root"}},"writer": {"name": "hdfswriter","parameter": {"column": [{"name": "id","type": "bigint"},{"name": "name","type": "string"},{"name": "region_id","type": "string"},{"name": "area_code","type": "string"},{"name": "iso_code","type": "string"},{"name": "iso_3166_2","type": "string"}],"compress": "gzip","defaultFS": "hdfs://hadoop102:8020","fieldDelimiter": "\t","fileName": "base_province","fileType": "text","path": "/base_province","writeMode": "append"}}}],"setting": {"speed": {"channel": 1}}}
}

2)配置文件说明
(1)Reader参数说明
在这里插入图片描述
3)提交任务
(1)清空历史数据
(2)进入DataX根目录
(3)执行命令 python bin/datax.py job/base_province.json
4)查看结果
(1)DataX打印日志
(2)查看HDFS文件

DataX传参
通常情况下,离线数据同步任务需要每日定时重复执行,故HDFS上的目标路径通常会包含一层日期,以对每日同步的数据加以区分,也就是说每日同步数据的目标路径不是固定不变的,因此DataX配置文件中HDFS Writer的path参数的值应该是动态的。为实现这一效果,就需要使用DataX传参的功能。
在JSON配置文件中使用${param}引用参数,在提交任务时使用-p"-Dparam=value"传入参数
举个栗子:
1)编写配置文件
(1)修改配置文件base_province.json
(2)配置文件内容

    "job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"connection": [{"jdbcUrl": ["jdbc:mysql://hadoop102:3306/gmall"],"querySql": ["select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"]}],"password": "xxxxxx","username": "root"}},"writer": {"name": "hdfswriter","parameter": {"column": [{"name": "id","type": "bigint"},{"name": "name","type": "string"},{"name": "region_id","type": "string"},{"name": "area_code","type": "string"},{"name": "iso_code","type": "string"},{"name": "iso_3166_2","type": "string"}],"compress": "gzip","defaultFS": "hdfs://hadoop102:8020","fieldDelimiter": "\t","fileName": "base_province","fileType": "text","path": "/base_province/${dt}","writeMode": "append"}}}],"setting": {"speed": {"channel": 1}}}
}

2)提交任务
(1)创建目标路径
(2)进入DataX根目录
(3)执行命令python bin/datax.py -p"-Ddt=2020-06-14" job/base_province.json
3)查看结果

同步HDFS数据到MySQL

要求:同步HDFS上的/base_province目录下的数据到MySQL gmall 数据库下的test_province表。
分析:要实现该功能,需选用HDFSReader和MySQLWriter。
1)编写配置文件
(1)创建配置文件test_province.json
(2)配置文件内容如下

{"job": {"content": [{"reader": {"name": "hdfsreader","parameter": {"defaultFS": "hdfs://hadoop102:8020","path": "/base_province","column": ["*"],"fileType": "text","compress": "gzip","encoding": "UTF-8","nullFormat": "\\N","fieldDelimiter": "\t",}},"writer": {"name": "mysqlwriter","parameter": {"username": "root","password": "xxxxxx","connection": [{"table": ["test_province"],"jdbcUrl": "jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&characterEncoding=utf-8"}],"column": ["id","name","region_id","area_code","iso_code","iso_3166_2"],"writeMode": "replace"}}}],"setting": {"speed": {"channel": 1}}}
}

2)配置文件说明
(1)Reader参数说明
在这里插入图片描述
(2)Writer参数说明
在这里插入图片描述
3)提交任务
(1)在MySQL中创建gmall.test_province表

DROP TABLE IF EXISTS `test_province`;
CREATE TABLE `test_province`  (`id` bigint(20) NOT NULL,`name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`region_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`area_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`iso_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`iso_3166_2` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

(2)进入DataX根目录
(3)执行命令python bin/datax.py job/test_province.json
4)查看结果
(1)DataX打印日志
(2)查看MySQL目标表数据

DataX优化

速度控制

DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制作业速度,让作业在数据库可以承受的范围内达到最佳的同步速度。
关键优化参数:
在这里插入图片描述
注意事项:

1.若配置了总record限速,则必须配置单个channel的record限速
2.若配置了总byte限速,则必须配置单个channe的byte限速
3.若配置了总record限速和总byte限速,channel并发数参数就会失效。因为配置了总record限速和总byte限速之后,实际channel并发数是通过计算得到的:
计算公式为:min(总byte限速/单个channle的byte限速,总record限速/单个channel的record限速)

内存调整

当提升DataX Job内Channel并发数时,内存的占用会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。
打个比方来说:Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,需调大JVM的堆内存。
建议将内存设置为4G或者8G,也可以根据实际情况来调整。
调整JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,加上对应的参数:

python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" /path/to/your/job.json

欢迎有问题留言或关注公众号。
在这里插入图片描述


http://chatgpt.dhexx.cn/article/X77PxAsS.shtml

相关文章

使用 DataX 实现数据同步(高效的同步工具)

DataX 使用介绍 前言一、DataX 简介1.DataX3.0 框架设计2.DataX3.0 核心架构 二、使用 DataX 实现数据同步1.Linux 上安装 DataX 软件2.DataX 基本使用3.安装 MySQL 数据库4.通过 DataX 实 MySQL 数据同步5.使用 DataX 进行增量同步 前言 我们公司有个项目的数据量高达五千万&…

Transpose函数的用法

Transpose函数的用法 在CNN机器学习中,经常要用到transpose函数对多维数组进行转置操作,下面是我对函数的理解过程。 1.二维数组的转换 二维数组中,原数组的第0轴的行,转换成新数组第1轴的列; 2.三维数组 三维数组较…

转置算子(transpose)的一种实现

transpose算子也叫做permute算子,根据白嫖有道英汉大词典的结果,他俩都是转置,改变排列顺序的意思。 算法逻辑是: 通过当前输出的一维偏移量(offset)计算输入矩阵对应的高维索引 然后根据参数pos重新排列输出索引,进…

论文笔记——TransPose

目录 摘要 一、前言 二、相关工作 2.1 人体姿态估计 2.2 可解释性 三、TransPose 3.1 网络结构 3.2 分辨率设置 3.3 attentions是定位关键点的依赖 四、实验 4.1 COCO实验数据对比 4.2 迁移到MPII数据对比 4.3 消融实验​编辑 4.4 量化分析 五、总结 摘要 虽然基…

numpy中的transpose函数使用

二维矩阵的transpose函数 : transpose()简单来说,就相当于数学中的转置,在矩阵中,转置就是把行与列相互调换位置; 例如:随机生成一个三行五列的二维矩阵: arr np.aran…

transpose()函数的理解

图1 输入如图1所示语句,输出如下: 图2 由以上两图说明transpose()函数的作用: 假设shape(z,x,y),在RGB图像中可以理解为z代表通道数,x代表图像的第几行,y代表图像的第几列,x和y组合而成所代表的像素构成…

详解Python的transpose函数

数组转置和换轴 import numpy as np >>> arr np.arange(16).reshape((2,2,4)) array([[[ 0, 1, 2, 3],[ 4, 5, 6, 7]],[[ 8, 9, 10, 11],[12, 13, 14, 15]]])>>> arr.transpose((1, 0, 2)) array([[[ 0, 1, 2, 3],[ 8, 9, 10, 11]],[[ 4, 5, …

np.transpose

最近看代码的时候,老是出现np.transpose()这个用法,但是对其中的原理还是不甚了解,今天就来总结一下,以及这个用法对图像的结果及效果。 参数 a:输入数组 axis: int类型的列表,这个参数是可选的。默认情况下&#xff…

np.transpose()函数详解

1. 碰见 numpy.transpose 用于高维数组时挺让人费解,通过分析和代码验证,发现 transpose 用法还是很简单的。说白了就是映射坐标轴 2. 举个例子: x np.arange(12).reshape((2,3,2))创建一个2 * 3 * 2的数组: 使用 numpy.trans…

【Python学习】transpose函数

shape:(batch_size * x * y ) 有batch_size个二维矩阵(x * y)相当于(z * x * y) 1. 多维数组的索引 import numpy as np # 创建 x np.arange(12).reshape((2,2,3)) print(x)# 得到三维数组 [[[ 0 1 2][ 3 4 5]][[ 6 7 8][ 9 10 11]]] # 相当于 b…

最简单例子解释python的transpose函数

目录 一,我们要弄清楚transpose的轴是什么意思?二,(x,y,z)的物理含义:三,transpose变换的例子四,代码验证五,关于经过了transpose变换之后,这个三维数组的形状是如何变化确定的? 二维…

转置(transpose)的理解

目录 1 .T,适用于一、二维数组 arr.T #求转置 transpose 的原理其实是根据维度(shape)索引决定的,举个栗子: 2. 高维数组 3. swapaxes 转置可以对数组进行重置,返回的是源数据的视图(不会进行任何复制…

Python numpy.transpose 详解

前言 看Python代码时,碰见 numpy.transpose 用于高维数组时挺让人费解,通过一番画图分析和代码验证,发现 transpose 函数的使用方法还是很简单的。 注:评论中说的三维坐标图中的 0 1 2 3 标反了,已经修正&#xff0c…

2020年最新可用的谷歌镜像站

g.vovososo.com 谷歌镜像入口 不用翻,墙,就可实现访问谷歌搜索 ,也可以通过扫描以下二维码下载APP进行访问

谷歌搜索镜像

分享自己收藏的谷歌搜索镜像 可自由在谷歌查询信息 http://ac.scmor.com/ 转载于:https://www.cnblogs.com/aeip/p/9506344.html

谷歌引擎镜像网址

谷歌引擎镜像网址 当前可用的网址 链接: https://gfsoso.xz95.top/ 若是显示我们的系统检测到您的计算机网络中存在异常流量,可重新搜索或者选择图片进行搜索,然后在点开全部搜索(有时候可能会崩吧,我个人感觉用图片搜索再点全…

谷歌镜像地址分享

谷歌镜像地址分享 谷歌镜像地址分享谷歌应用商店参考文档 谷歌镜像地址分享 使用搜索引擎来查找所需要的资料是一件很普遍的事,而搜索引擎主要有百度、必应、雅虎和谷歌等等。 每个搜索引擎各有利弊,而一般大家公认的是谷歌搜索引擎搜索准确率更高一些&…

分享27个谷歌(Google)镜像

为什么80%的码农都做不了架构师?>>> 多年来搜集的谷歌镜像,大多数都能打开,打不开的也许过一段上一段时间能打开。真诚希望这些神奇的搜索引擎,能助你在前端界有所建树。有空关注下面前端相关公众账号jsdig(及时挖掘&…

谷歌学术镜像_Google镜像站

在国外留学要想毕业顺利, 论文可是其中很重要的一环. 相比于国内, 英国对于论文的要求更为严格, 要有一定的格式和学术要求. 除此之外, 还要有理有据, 要用高质量的引文作为对自己观点的论证. 这时就要查文献了. 要说查文献, 谷歌学术绝对是最佳帮手, 免费还操作简单. 当然全面…

国内最新可用Google谷歌镜像网站入口网站网址

很多同学都需要使用谷歌搜索来查找一些英文的学习资料,但是由于某些原因国内无法使用。办公人导航就和大家分享另外一种在国内可以使用谷歌搜索的方法那就使用使用谷歌镜像网站。 Google谷歌镜像网站可以帮助我们在国内访问和使用谷歌搜索,但这并不是真…