datax(4): datax.py解读

article/2025/10/13 5:12:54

datax 直接使用py文件进行任务提交,今天读一读它


一、文件位置

原始文件位置在 xx/DataX/core/src/main/bin/下,datax项目打包后会将文件拷贝到 xx/DataX\target\datax\datax\bin 下。

 core模块的pom.xml 指定‘拷贝’datax.py文件的方式maven-assembly-plugin<plugin><artifactId>maven-assembly-plugin</artifactId><configuration><archive><manifest><mainClass>com.alibaba.datax.core.Engine</mainClass></manifest></archive><finalName>datax</finalName><descriptors><!--指定装配的配置文件--><descriptor>src/main/assembly/package.xml</descriptor></descriptors></configuration><executions><execution><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin>DataX\core\src\main\assembly\package.xml里面是一些打包的细节

二、文件的作用

该py文件主要用来提交datax任务,相当于datax的入口;样例执行datax任务如下

python datax.py {YOUR_JOB.json}

三、文件解读

文件的主入口 if name == “main”:

if __name__ == "__main__":# 1 打印版权信息printCopyright()# 2 获取选项的解析器parser = getOptionParser()# 3 根据入参,使用解析器解析出参数值# 3.1 parse_args方法返回俩参,分别为instance类型的options和list类型的args# 3.2 用sys.argv[1:]来获取命令参数,返回一个list类型的返回值options, args = parser.parse_args(sys.argv[1:])if options.reader is not None and options.writer is not None:# 4 如果解析后,入参的 reader和writer不为空,在从github上构建出一个 json的样例模板generateJobConfigTemplate(options.reader,options.writer)sys.exit(RET_STATE['OK'])if len(args) != 1:parser.print_help()sys.exit(RET_STATE['FAIL'])# 5 根据入参 构建执行脚本startCommand = buildStartCommand(options, args)# print startCommand  该命令可以打印出 用户输入的参数+py文件构建的参数,作为整体形成一个执行脚本。(执行脚本最后调用java类)# 打印出来的startCommand 如下:# java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=D:\idea-workspace\github\DataX\target\datax\datax/log -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=D:\idea-workspace\github\DataX\target\datax\datax/log -Dloglevel=info -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=D:\idea-workspace\github\DataX\target\datax\datax -Dlogback.configurationFile=D:\idea-workspace\github\DataX\target\datax\datax/conf/logback.xml -classpath D:\idea-workspace\github\DataX\target\datax\datax/lib/*  -Dlog.file.name=x\datax\job\job_json com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job D:\idea-workspace\github\DataX\target\datax\datax\job\job.json# 6 创建并返回一个子进程,并在这个进程中执行指定的shell 脚本child_process = subprocess.Popen(startCommand, shell=True)# 7 将执行结果保存在信号量中register_signal()# 8 父子进程进行通信,并将通信结果保存到 stdout, stderr(stdout, stderr) = child_process.communicate()# 9 退出(根据子进程的状态码)sys.exit(child_process.returncode)

文件中的方法和变量,通过变量名或函数名就可以直接猜到含义,本文不在赘述;
在这里插入图片描述
整体的datax.py文件如下

#!/usr/bin/env python
# -*- coding:utf-8 -*-import sys
import os
import signal
import subprocess
import time
import re
import socket
import json
from optparse import OptionParser
from optparse import OptionGroup
from string import Template
import codecs
import platformdef isWindows():return platform.system() == 'Windows'DATAX_HOME = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))DATAX_VERSION = 'DATAX-OPENSOURCE-3.0'
if isWindows():codecs.register(lambda name: name == 'cp65001' and codecs.lookup('utf-8') or None)CLASS_PATH = ("%s/lib/*") % (DATAX_HOME)
else:CLASS_PATH = ("%s/lib/*:.") % (DATAX_HOME)
LOGBACK_FILE = ("%s/conf/logback.xml") % (DATAX_HOME)
DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s" % (DATAX_HOME, LOGBACK_FILE)
ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s  ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (DEFAULT_PROPERTY_CONF, CLASS_PATH)
REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"RET_STATE = {"KILL": 143,"FAIL": -1,"OK": 0,"RUN": 1,"RETRY": 2
}# 获取本地ip
def getLocalIp():try:return socket.gethostbyname(socket.getfqdn(socket.gethostname()))except:return "Unknown"# 根据信号值,结束本 子进程
def suicide(signum):global child_processprint >> sys.stderr, "[Error] DataX receive unexpected signal %d, starts to suicide." % (signum)if child_process:child_process.send_signal(signal.SIGQUIT)time.sleep(1)child_process.kill()print >> sys.stderr, "DataX Process was killed ! you did ?"sys.exit(RET_STATE["KILL"])# 
def register_signal():if not isWindows():global child_processsignal.signal(2, suicide)signal.signal(3, suicide)signal.signal(15, suicide)# 构造解析器
def getOptionParser():usage = "usage: %prog [options] job-url-or-path"parser = OptionParser(usage=usage)prodEnvOptionGroup = OptionGroup(parser, "Product Env Options","Normal user use these options to set jvm parameters, job runtime mode etc. ""Make sure these options can be used in Product Env.")prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",default=DEFAULT_JVM, help="Set jvm parameters if necessary.")prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",help="Set job unique id when running by Distribute/Local Mode.")prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",action="store", default="standalone",help="Set job runtime mode such as: standalone, local, distribute. ""Default mode is standalone.")prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",action="store", dest="params",help='Set job parameter, eg: the source tableName you want to set it by command, ''then you can use like this: -p"-DtableName=your-table-name", ''if you have mutiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name".''Note: you should config in you job tableName with ${tableName}.')prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",action="store", dest="reader",type="string",help='View job config[reader] template, eg: mysqlreader,streamreader')prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",action="store", dest="writer",type="string",help='View job config[writer] template, eg: mysqlwriter,streamwriter')parser.add_option_group(prodEnvOptionGroup)devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options","Developer use these options to trace more details of DataX.")devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",help="Set to remote debug mode.")devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",default="info", help="Set log level such as: debug, info, all etc.")parser.add_option_group(devEnvOptionGroup)return parser# 根据writer和reader名, 从github拉取对应的模板,最终创建出 json任务的模板
def generateJobConfigTemplate(reader, writer):readerRef = "Please refer to the %s document:\n     https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n" % (reader,reader,reader)writerRef = "Please refer to the %s document:\n     https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n " % (writer,writer,writer)print readerRefprint writerRefjobGuid = 'Please save the following configuration as a json file and  use\n     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json \nto run the job.\n'print jobGuidjobTemplate={"job": {"setting": {"speed": {"channel": ""}},"content": [{"reader": {},"writer": {}}]}}readerTemplatePath = "%s/plugin/reader/%s/plugin_job_template.json" % (DATAX_HOME,reader)writerTemplatePath = "%s/plugin/writer/%s/plugin_job_template.json" % (DATAX_HOME,writer)try:readerPar = readPluginTemplate(readerTemplatePath);except Exception, e:print "Read reader[%s] template error: can\'t find file %s" % (reader,readerTemplatePath)try:writerPar = readPluginTemplate(writerTemplatePath);except Exception, e:print "Read writer[%s] template error: : can\'t find file %s" % (writer,writerTemplatePath)jobTemplate['job']['content'][0]['reader'] = readerPar;jobTemplate['job']['content'][0]['writer'] = writerPar;print json.dumps(jobTemplate, indent=4, sort_keys=True)# 根据插件名读取插件模板
def readPluginTemplate(plugin):with open(plugin, 'r') as f:return json.load(f)# 判断入参是否为一个 url
def isUrl(path):if not path:return Falseassert (isinstance(path, str))m = re.match(r"^http[s]?://\S+\w*", path.lower())if m:return Trueelse:return False# 通过各类 if else 构建启动命令。启动命令中包含2部分 JVM参数+环境变量
def buildStartCommand(options, args):commandMap = {}tempJVMCommand = DEFAULT_JVMif options.jvmParameters:tempJVMCommand = tempJVMCommand + " " + options.jvmParametersif options.remoteDebug:tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIGprint 'local ip: ', getLocalIp()if options.loglevel:tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))if options.mode:commandMap["mode"] = options.mode# jobResource 可能是 URL,也可能是本地文件路径(相对,绝对)jobResource = args[0]if not isUrl(jobResource):jobResource = os.path.abspath(jobResource)if jobResource.lower().startswith("file://"):jobResource = jobResource[len("file://"):]jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))if options.params:jobParams = jobParams + " " + options.paramsif options.jobid:commandMap["jobid"] = options.jobidcommandMap["jvm"] = tempJVMCommandcommandMap["params"] = jobParamscommandMap["job"] = jobResourcereturn Template(ENGINE_COMMAND).substitute(**commandMap)# 打印版权信息
def printCopyright():print '''
DataX (%s), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.''' % DATAX_VERSIONsys.stdout.flush()# 程序主入口
if __name__ == "__main__":# 1 打印版权信息printCopyright()# 2 获取选项的解析器parser = getOptionParser()# 3 根据入参,使用解析器解析出参数值# 3.1 parse_args方法返回俩参,分别为instance类型的options和list类型的args# 3.2 用sys.argv[1:]来获取命令参数,返回一个list类型的返回值options, args = parser.parse_args(sys.argv[1:])if options.reader is not None and options.writer is not None:# 4 如果解析后,入参的 reader和writer不为空,在从github上构建出一个 json的样例模板generateJobConfigTemplate(options.reader,options.writer)sys.exit(RET_STATE['OK'])if len(args) != 1:parser.print_help()sys.exit(RET_STATE['FAIL'])# 5 根据入参 构建执行脚本startCommand = buildStartCommand(options, args)# print startCommand  该命令可以打印出 用户输入的参数+py文件构建的参数,作为整体形成一个执行脚本。(执行脚本最后调用java类)# 6 创建并返回一个子进程,并在这个进程中执行指定的shell 脚本child_process = subprocess.Popen(startCommand, shell=True)# 7 将执行结果保存在信号量中register_signal()# 8 父子进程进行通信,并将通信结果保存到 stdout, stderr(stdout, stderr) = child_process.communicate()# 9 退出(根据子进程的状态码)sys.exit(child_process.returncode)


注:

  1. 对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;

  2. 所有代码都已经上传到github(master分支和dev),可以免费白嫖


http://chatgpt.dhexx.cn/article/yYA963gR.shtml

相关文章

DataX使用指南

简介 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台&#xff0c;实现包括 MySQL、Oracle、HDFS、Hive、OceanBase、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。DataX采用了框架 插件 的模式&#xff0c;目前已开源&#xff0c;代码托管在github。…

DataX

DataX的环境搭建以及简单测试 什么是DataX DataX 是一个异构数据源离线同步工具&#xff0c;致力于实现包括关系型数据库(MySQL、Oracle等)、 HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 &#xff08;这是一个单机多任务的ETL工具&#xff0…

DataX 简介及架构原理

DataX 简介及架构原理 概述 DataX是阿里巴巴使用 Java 和 Python 开发的一个异构数据源离线同步工具 异构数据源&#xff1a;不同存储结构的数据源 致力于实现包括关系型数据库 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS…

DataX的使用与介绍(1)

一、什么是DataX&#xff1f; DataX是阿里云商用产品DataWorks数据集成的开源版本&#xff0c;它是一个异构数据源的离线数据同步工具/平台&#xff08;ETL工具&#xff09;。DataX实现了包括Mysql&#xff0c;Oracle、OceanBase、Sqlserver&#xff0c;Postgre、HDFS、Hive、…

DataX介绍

DataX 是阿里开源的一个异构数据源离线同步工具&#xff0c;致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 DataX设计理念 DataX本身作为数据同步框架&#xff0c;将不同数据源的同步抽象为从源头…

详解DataX及使用

DataX概述 简介 DataX 是阿里巴巴开源的一个异构数据源离线同步工具&#xff0c;致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 支持数据源 DataX架构原理 设计理念 为了解决异构数据源同步问…

使用 DataX 实现数据同步(高效的同步工具)

DataX 使用介绍 前言一、DataX 简介1.DataX3.0 框架设计2.DataX3.0 核心架构 二、使用 DataX 实现数据同步1.Linux 上安装 DataX 软件2.DataX 基本使用3.安装 MySQL 数据库4.通过 DataX 实 MySQL 数据同步5.使用 DataX 进行增量同步 前言 我们公司有个项目的数据量高达五千万&…

Transpose函数的用法

Transpose函数的用法 在CNN机器学习中&#xff0c;经常要用到transpose函数对多维数组进行转置操作&#xff0c;下面是我对函数的理解过程。 1.二维数组的转换 二维数组中&#xff0c;原数组的第0轴的行&#xff0c;转换成新数组第1轴的列&#xff1b; 2.三维数组 三维数组较…

转置算子(transpose)的一种实现

transpose算子也叫做permute算子&#xff0c;根据白嫖有道英汉大词典的结果&#xff0c;他俩都是转置&#xff0c;改变排列顺序的意思。 算法逻辑是&#xff1a; 通过当前输出的一维偏移量(offset)计算输入矩阵对应的高维索引 然后根据参数pos重新排列输出索引&#xff0c;进…

论文笔记——TransPose

目录 摘要 一、前言 二、相关工作 2.1 人体姿态估计 2.2 可解释性 三、TransPose 3.1 网络结构 3.2 分辨率设置 3.3 attentions是定位关键点的依赖 四、实验 4.1 COCO实验数据对比 4.2 迁移到MPII数据对比 4.3 消融实验​编辑 4.4 量化分析 五、总结 摘要 虽然基…

numpy中的transpose函数使用

二维矩阵的transpose函数 &#xff1a; transpose&#xff08;&#xff09;简单来说&#xff0c;就相当于数学中的转置&#xff0c;在矩阵中&#xff0c;转置就是把行与列相互调换位置&#xff1b; 例如&#xff1a;随机生成一个三行五列的二维矩阵&#xff1a; arr np.aran…

transpose()函数的理解

图1 输入如图1所示语句&#xff0c;输出如下&#xff1a; 图2 由以上两图说明transpose()函数的作用&#xff1a; 假设shape(z,x,y),在RGB图像中可以理解为z代表通道数&#xff0c;x代表图像的第几行&#xff0c;y代表图像的第几列&#xff0c;x和y组合而成所代表的像素构成…

详解Python的transpose函数

数组转置和换轴 import numpy as np >>> arr np.arange(16).reshape((2,2,4)) array([[[ 0, 1, 2, 3],[ 4, 5, 6, 7]],[[ 8, 9, 10, 11],[12, 13, 14, 15]]])>>> arr.transpose((1, 0, 2)) array([[[ 0, 1, 2, 3],[ 8, 9, 10, 11]],[[ 4, 5, …

np.transpose

最近看代码的时候&#xff0c;老是出现np.transpose()这个用法&#xff0c;但是对其中的原理还是不甚了解&#xff0c;今天就来总结一下&#xff0c;以及这个用法对图像的结果及效果。 参数 a:输入数组 axis: int类型的列表&#xff0c;这个参数是可选的。默认情况下&#xff…

np.transpose()函数详解

1. 碰见 numpy.transpose 用于高维数组时挺让人费解&#xff0c;通过分析和代码验证&#xff0c;发现 transpose 用法还是很简单的。说白了就是映射坐标轴 2. 举个例子&#xff1a; x np.arange(12).reshape((2,3,2))创建一个2 * 3 * 2的数组&#xff1a; 使用 numpy.trans…

【Python学习】transpose函数

shape:(batch_size * x * y ) 有batch_size个二维矩阵&#xff08;x * y&#xff09;相当于(z * x * y) 1. 多维数组的索引 import numpy as np # 创建 x np.arange(12).reshape((2,2,3)) print(x)# 得到三维数组 [[[ 0 1 2][ 3 4 5]][[ 6 7 8][ 9 10 11]]] # 相当于 b…

最简单例子解释python的transpose函数

目录 一&#xff0c;我们要弄清楚transpose的轴是什么意思&#xff1f;二&#xff0c;(x,y,z)的物理含义:三&#xff0c;transpose变换的例子四&#xff0c;代码验证五&#xff0c;关于经过了transpose变换之后&#xff0c;这个三维数组的形状是如何变化确定的&#xff1f; 二维…

转置(transpose)的理解

目录 1 .T,适用于一、二维数组 arr.T #求转置 transpose 的原理其实是根据维度&#xff08;shape&#xff09;索引决定的&#xff0c;举个栗子&#xff1a; 2. 高维数组 3. swapaxes 转置可以对数组进行重置&#xff0c;返回的是源数据的视图&#xff08;不会进行任何复制…

Python numpy.transpose 详解

前言 看Python代码时&#xff0c;碰见 numpy.transpose 用于高维数组时挺让人费解&#xff0c;通过一番画图分析和代码验证&#xff0c;发现 transpose 函数的使用方法还是很简单的。 注&#xff1a;评论中说的三维坐标图中的 0 1 2 3 标反了&#xff0c;已经修正&#xff0c…

2020年最新可用的谷歌镜像站

g.vovososo.com 谷歌镜像入口 不用翻&#xff0c;墙&#xff0c;就可实现访问谷歌搜索 &#xff0c;也可以通过扫描以下二维码下载APP进行访问