DataX的使用与介绍(1)

article/2025/10/13 5:59:57

一、什么是DataX?

DataX是阿里云商用产品DataWorks数据集成的开源版本,它是一个异构数据源的离线数据同步工具/平台(ETL工具)。DataX实现了包括Mysql,Oracle、OceanBase、Sqlserver,Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS等各种异构数据源之间高效的数据同步功能。
Tips:异构即不同类型的应用或者数据源,例如Mysql/Oracle/DB2/MongDB等不同类型的数据源;
Tips:离线数据同步常用Sqoop以及DataX工具。
Tips:ETL(Extract-Transform-Load)工具描述将从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程,目的是将企业中的分散、零乱、标准不统一的数据整合到一起,为企业的决策提供分析依据,其常用在数据仓库,但其对象并不限于数据仓库(DW)。
Github仓库:https://github.com/alibaba/Datax
Gitee国内仓库:https://gitee.com/mirrors/DataX

二.DataX的设计思想

描述:为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。简单的说DataX就像中间商一样为每一个服务对象进行需求供应。
在这里插入图片描述

三.DataX的框架设计

描述:DataX本身作为离线数据同步框架,离线(批量)的数据通道通过定义数据源和去向的数据源和数据集,提供一套抽象化的数据抽取插件(Reader)、数据写入插件(Writer),并基于此框架设计一套简化版的中间数据传输格式,从而实现任意结构化、半结构化数据源之间数据传输。
DataX架构设计流程类似source(数据来源)->channel(数据存储池中转通道)->sink
Reader:数据采集模块,负责采集数据源的数据,将数据发送给FrameWork.
Writer:数据写入模块,负责不断向FrameWork取数据,并将数据写入到目的端。
FrameWork:用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
Reader、Writer与Transformer
在这里插入图片描述
DataX可支持任意数据源到数据源,只要实现了Reader/Writer Plugin,官方已经实现了主流的数据源插件,比如Mysql,Oracle,Sqlserver等
例如:Mysql的数据离线同步到HDFS之中来展示DataX的框架设计结构
在这里插入图片描述

四.DataX调度流程

1.任务切分

计算并发量

并发量,及所需通道数量(needChannelNumber)。
按表数量
配置了每个通道处理1张表,那么同时处理100张表,就需要100个通道;
按记录数
配置了每个通道处理1000条记录,那么同时处理100000条记录数,就需要10个通道

任务切分

将多个job切分成多个Task:根据所需通道数量将job切分成多个Task

五.安装部署方式

直接下载DataX工具包:Datax下载地址
下载后解压至本地某个目录,进入bin目录,即可运行同步作业:

[root@vagary software]# wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

可以看一下datax的目录组成

$  # tree -d -L 2
├── bin   # 可执行的Python脚本
├── conf  # Datax 配置文件
├── job   # 离线同步任务
├── lib   # 依赖库
├── log   # 任务执行过程日志
├── log_perf
├── plugin # 各类数据库读写插件
│   ├── reader
│   └── writer
├── script # 脚本存放
└── tmp    # 临时目录 

运行一个实例,检验是否成功安装

[vagary@vagary job]$ datax.py job.json

出现这个界面就成功了


2022-04-24 18:13:16.329 [job-0] INFO  JobContainer - PerfTrace not enable!
2022-04-24 18:13:16.329 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.027s |  All Task WaitReaderTime 0.041s | Percentage 100.00%
2022-04-24 18:13:16.330 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2022-04-24 18:13:06
任务结束时刻                    : 2022-04-24 18:13:16
任务总计耗时                    :                 10s
任务平均流量                    :          253.91KB/s
记录写入速度                    :          10000rec/s
读出记录总数                    :              100000
读写失败总数                    :                   0

六.DataX任务开发

1.生成导入脚本:

datax.py -r mysqlreader -w hdfswriter

2.生成MySQL -> HFDS 脚本:

{"job": {"content": [{"reader": {                        "name": "mysqlreader",        --name:reader名"parameter": {          "column": [],             -- 字段名"connection": [  {"jdbcUrl": [],    --对数据库的JDBC连接信息"table": []       --表名【"querySql":[]】 -- 支持sql语句,配置它                                                    --后,mysqlreader直接忽略                                                  --table、column、where}], "password": "",        --密码"username": "",        --用户名"where": ""            --筛选条件【"splitPk": ""】       --数据分片字段,一般是主键,                                                --仅支持整型}}, "writer": {"name": "hdfswriter",       --name:writer名"parameter": {   "column": [],     --column:写入数据的字段,其                                               --中name指定字段名,type指定类型"compress": "",   --hdfs文件压缩类型,默认未空"defaultFS": "",  --namenode节点地址"fieldDelimiter": "", --分隔符"fileName": "",       --写入文件名"fileType": "",     --文件的类型,目前只支持用户配                                             --置为"text""orc""path": "",      --hdfs文件系统的路径信息"writeMode": ""  -- hdfswriter写入前数据清理处理模式:
--(1)append:写入前不做任何处理,hdfswriter直接使用filename写入,会重复数据
--(2)nonConflict:如果目录下有fileName前缀的文件,直接报错。}}}], "setting": {"speed": {"channel": ""  --并发数限速(根据自己CPU合理控制并发数)-- channle=2,使用streamwriter打印的时候会打印两遍内容。-- 但是如果是别的writer,比如hdfswriter,mysqlwriter,它只是                 -- 启动两个并发任务,内容只有一次,不会重复两遍。}}}
}

3.编写配置文件(字段模式)

{"job": {"content": [{"reader": {"name": "mysqlreader", "parameter": {"column": ["bus_id","bus_name","telephone","pic_url","bus_user_name","province","city","dist","addr","zipcode","is_open","open_time","closed_time"], "connection": [{"jdbcUrl": ["jdbc:mysql://hadoop04:3306/prod"], "table": ["business_info"]}], "password": "shumeng_123", "username": "shumeng", "where": "1=1"}}, "writer": {"name": "hdfswriter", "parameter": {"column": [{"name":"bus_id","type":"string"},{"name":"bus_name","type":"string"},{"name":"telephone","type":"string"},{"name":"pic_url","type":"string"},                    	{"name":"bus_user_name","type":"string" },{"name":"province","type":"string"},{"name":"city","type":"string"},{"name":"dist","type":"string"},                    	{"name":"addr","type":"string" },{"name":"zipcode","type":"string"},{"name":"is_open","type":"string"},                    	{"name":"open_time","type":"string" }, {"name":"closed_time","type":"string" }], "compress": "snappy", "defaultFS": "hdfs://hadoop04:8020", "fieldDelimiter": ",", "fileName": "ods_business_info_full.txt", "fileType": "orc", "path": "/user/hive/warehouse/ods.db/ods_business_info_full/pt=2022-01-01", "writeMode": "append"}}}], "setting": {"speed": {"channel": "1"}}}
}

4.编写配置文件(SQL模式)

{"job": {"content": [{"reader": {"name": "mysqlreader", "parameter": {"connection": [{"jdbcUrl": ["jdbc:mysql://hadoop04:3306/prod"], "querySql":["select bus_id,bus_name,telephone,pic_url,bus_user_name,province,city,dist,addr,zipcode,is_open,open_time,close_time from prod.business_info "]}], "password": "shumeng_123", "username": "shumeng", "where": "1=1"}}, "writer": {"name": "hdfswriter", "parameter": {"column": [{"name":"bus_id","type":"string"},{"name":"bus_name","type":"string"},{"name":"telephone","type":"string"},{"name":"pic_url","type":"string"},                    	{"name":"bus_user_name","type":"string" },{"name":"province","type":"string"},{"name":"city","type":"string"},{"name":"dist","type":"string"},                    	{"name":"addr","type":"string" },{"name":"zipcode","type":"string"},{"name":"is_open","type":"string"},                    	{"name":"open_time","type":"string" }, {"name":"closed_time","type":"string" }], "compress": "snappy", "defaultFS": "hdfs://hadoop04:8020", "fieldDelimiter": ",", "fileName": "ods_business_info_full_sql.txt", "fileType": "orc", "path": "/user/hive/warehouse/ods.db/ods_business_info_full/pt=2022-01-01", "writeMode": "append"}}}], "setting": {"speed": {"channel": "1"}}}
}

PS:DataX的HDFS writer会将null值存储为空字符串’’ ,Hive默认null格式为\N。

alter table ods_business_info_full set serdeproperties('serialization.null.format' = '');

http://chatgpt.dhexx.cn/article/nCLNgFqa.shtml

相关文章

DataX介绍

DataX 是阿里开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 DataX设计理念 DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头…

详解DataX及使用

DataX概述 简介 DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 支持数据源 DataX架构原理 设计理念 为了解决异构数据源同步问…

使用 DataX 实现数据同步(高效的同步工具)

DataX 使用介绍 前言一、DataX 简介1.DataX3.0 框架设计2.DataX3.0 核心架构 二、使用 DataX 实现数据同步1.Linux 上安装 DataX 软件2.DataX 基本使用3.安装 MySQL 数据库4.通过 DataX 实 MySQL 数据同步5.使用 DataX 进行增量同步 前言 我们公司有个项目的数据量高达五千万&…

Transpose函数的用法

Transpose函数的用法 在CNN机器学习中,经常要用到transpose函数对多维数组进行转置操作,下面是我对函数的理解过程。 1.二维数组的转换 二维数组中,原数组的第0轴的行,转换成新数组第1轴的列; 2.三维数组 三维数组较…

转置算子(transpose)的一种实现

transpose算子也叫做permute算子,根据白嫖有道英汉大词典的结果,他俩都是转置,改变排列顺序的意思。 算法逻辑是: 通过当前输出的一维偏移量(offset)计算输入矩阵对应的高维索引 然后根据参数pos重新排列输出索引,进…

论文笔记——TransPose

目录 摘要 一、前言 二、相关工作 2.1 人体姿态估计 2.2 可解释性 三、TransPose 3.1 网络结构 3.2 分辨率设置 3.3 attentions是定位关键点的依赖 四、实验 4.1 COCO实验数据对比 4.2 迁移到MPII数据对比 4.3 消融实验​编辑 4.4 量化分析 五、总结 摘要 虽然基…

numpy中的transpose函数使用

二维矩阵的transpose函数 : transpose()简单来说,就相当于数学中的转置,在矩阵中,转置就是把行与列相互调换位置; 例如:随机生成一个三行五列的二维矩阵: arr np.aran…

transpose()函数的理解

图1 输入如图1所示语句,输出如下: 图2 由以上两图说明transpose()函数的作用: 假设shape(z,x,y),在RGB图像中可以理解为z代表通道数,x代表图像的第几行,y代表图像的第几列,x和y组合而成所代表的像素构成…

详解Python的transpose函数

数组转置和换轴 import numpy as np >>> arr np.arange(16).reshape((2,2,4)) array([[[ 0, 1, 2, 3],[ 4, 5, 6, 7]],[[ 8, 9, 10, 11],[12, 13, 14, 15]]])>>> arr.transpose((1, 0, 2)) array([[[ 0, 1, 2, 3],[ 8, 9, 10, 11]],[[ 4, 5, …

np.transpose

最近看代码的时候,老是出现np.transpose()这个用法,但是对其中的原理还是不甚了解,今天就来总结一下,以及这个用法对图像的结果及效果。 参数 a:输入数组 axis: int类型的列表,这个参数是可选的。默认情况下&#xff…

np.transpose()函数详解

1. 碰见 numpy.transpose 用于高维数组时挺让人费解,通过分析和代码验证,发现 transpose 用法还是很简单的。说白了就是映射坐标轴 2. 举个例子: x np.arange(12).reshape((2,3,2))创建一个2 * 3 * 2的数组: 使用 numpy.trans…

【Python学习】transpose函数

shape:(batch_size * x * y ) 有batch_size个二维矩阵(x * y)相当于(z * x * y) 1. 多维数组的索引 import numpy as np # 创建 x np.arange(12).reshape((2,2,3)) print(x)# 得到三维数组 [[[ 0 1 2][ 3 4 5]][[ 6 7 8][ 9 10 11]]] # 相当于 b…

最简单例子解释python的transpose函数

目录 一,我们要弄清楚transpose的轴是什么意思?二,(x,y,z)的物理含义:三,transpose变换的例子四,代码验证五,关于经过了transpose变换之后,这个三维数组的形状是如何变化确定的? 二维…

转置(transpose)的理解

目录 1 .T,适用于一、二维数组 arr.T #求转置 transpose 的原理其实是根据维度(shape)索引决定的,举个栗子: 2. 高维数组 3. swapaxes 转置可以对数组进行重置,返回的是源数据的视图(不会进行任何复制…

Python numpy.transpose 详解

前言 看Python代码时,碰见 numpy.transpose 用于高维数组时挺让人费解,通过一番画图分析和代码验证,发现 transpose 函数的使用方法还是很简单的。 注:评论中说的三维坐标图中的 0 1 2 3 标反了,已经修正&#xff0c…

2020年最新可用的谷歌镜像站

g.vovososo.com 谷歌镜像入口 不用翻,墙,就可实现访问谷歌搜索 ,也可以通过扫描以下二维码下载APP进行访问

谷歌搜索镜像

分享自己收藏的谷歌搜索镜像 可自由在谷歌查询信息 http://ac.scmor.com/ 转载于:https://www.cnblogs.com/aeip/p/9506344.html

谷歌引擎镜像网址

谷歌引擎镜像网址 当前可用的网址 链接: https://gfsoso.xz95.top/ 若是显示我们的系统检测到您的计算机网络中存在异常流量,可重新搜索或者选择图片进行搜索,然后在点开全部搜索(有时候可能会崩吧,我个人感觉用图片搜索再点全…

谷歌镜像地址分享

谷歌镜像地址分享 谷歌镜像地址分享谷歌应用商店参考文档 谷歌镜像地址分享 使用搜索引擎来查找所需要的资料是一件很普遍的事,而搜索引擎主要有百度、必应、雅虎和谷歌等等。 每个搜索引擎各有利弊,而一般大家公认的是谷歌搜索引擎搜索准确率更高一些&…

分享27个谷歌(Google)镜像

为什么80%的码农都做不了架构师?>>> 多年来搜集的谷歌镜像,大多数都能打开,打不开的也许过一段上一段时间能打开。真诚希望这些神奇的搜索引擎,能助你在前端界有所建树。有空关注下面前端相关公众账号jsdig(及时挖掘&…