DataX使用总结_datax where-程序员宅基地

技术标签: DataX  MySQL  

简介

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、HDFS、Hive、OceanBase、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。DataX采用了框架 + 插件 的模式,目前已开源,代码托管在github,地址:https://github.com/alibaba/DataX。

 

DataX安装部署


1.下载压缩包:

下载页面地址:https://github.com/alibaba/DataX 在页面中【Quick Start】--->【Download DataX下载地址】进行下载。下载后的包名:datax.tar.gz。解压后{datax}目录下有{bin conf job lib log log_perf plugin script tmp}几个目录。
2.安装

将下载后的压缩包直接解压后可用,前提是对应的java及python环境满足要求。
    JDK(1.6以上,推荐1.6)
    Python(推荐Python2.6.X)一定要为python2,因为后面执行datax.py的时候,里面的python的print会执行不了,导致运行不成功,会提示你print语法要加括号,python2中加不加都行 python3中必须要加,否则报语法错

另外要注意, 不要解压到C:\Program Files目录下或其他名字带空格的目录下,因为在cmd执行时会因为路径有空格导致找不到程序主文件。

DataX支持绝大部分种类数据库的数据转移,其数据转移的主要流程有三步:Reader -->transform-->writer

reader 从数据库读取需要转移的数据,transform在数据同步、传输过程中,存在用户对于数据传输进行特殊定制化的需求场景,包括裁剪列、转换列等工作,这些工作在这里进行,writer将读取并处理后的数据写入目标数据库。

DataX配置文件

DataX是通过读取配置文件进行数据转移,配置文件为json格式,这里以MySQL做示范,MySQL 2 MySQL

{
    "job": {
        
        "content": [
                    //reader过程配置信息
            {
                "reader": {   
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",//数据库用户名
                        "password": "root",
                        "column": [
                            "id",        //要读取的列元素 list
                            "name"
                        ],
                         "where":"",//可以添加筛选条件
                        "splitPk": "db_id",//数据分片
                        "connection": [
                            {
                                "table": [
                                    "table"//要读取的表名list,支持多个表的读取
                                ],
                                "querySql":[     //自定义筛选SQL
                                    "select reflect.id as id from user ,reflect where user.id = reflect.user_id",
                                ]
                                "jdbcUrl": [
     "jdbc:mysql://127.0.0.1:3306/database" //要读取的数据库URL 可以加数据库配置信息后缀
                                ]
                            }
                        ]
                    }
                },

                "writer": {  //writer过程配置信息
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                        "id",    //要写入的列list 注意要与读取的列一致
                        "name"
                        ],
						], 
                        "connection": [
                            {                                
                                "jdbcUrl": "jdbc:mysql://120.78.223.211:3306/pshare?characterEncoding=utf-8", 
                                "table": ["reflect"]
                            }
                        ], 
                        "password": "root", 
                        "username": "root"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
            }
        ]
    }
}
  • 数据分片:如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能。官方文档推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点,要注意目前splitPk仅支持整形数据切分,不支持浮点、字符串、日期等其他类型。如果用户指定其他非支持类型,MysqlReader将报错!

  • Where:添加where配置可以对要转移的数据进行筛选,比如可以选择只转移今天的数据,"where":"gmt_create > $bizdate "

MysqlReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据读取

querySql:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table,column这些配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id

类型转换

DataX 内部类型 Mysql 数据类型
Long int, tinyint, smallint, mediumint, int, bigint
Double float, double, decimal
String varchar, char, tinytext, text, mediumtext, longtext, year
Date date, datetime, timestamp, time
Boolean bit, bool
Bytes tinyblob, mediumblob, blob, longblob, varbinary

贴上我的一个测试数据转移配置

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                      
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/pshare?characterEncoding=utf-8"], 
                              
                                "querySql":[
                                    "select reflect.id as id,reflect.user_id as user_id,user.mail as comment from user ,reflect where user.id = reflect.user_id",
                                ]
                            }
                        ], 
                        "password": "xxxx", 
                        "username": "root"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "writeMode":"insert ",//必选 控制写入数据到目标表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句 insert/replace/update  默认insert
                        "column": [
                        "id",
                        "user_id",
                        "comment"
                        ],
                        "preSql":"",   //可选 插入数据前执行的SQL
                        "postSql":"",  //可选 插入数据成功后执行的SQL
						], 
                        "connection": [
                            {                                
                                "jdbcUrl": "jdbc:mysql://xxx.78.223.211:3306/pshare?characterEncoding=utf-8", 
                                "table": ["reflect"]
                            }
                        ], 
                        "password": "xxx", 
                        "username": "root"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

用到了连表查询信息然后作为一个字段值转移到目标数据库,这里要注意查询结果的列名和顺序要和writer里column里一样。

writeMode

  • 描述:控制写入数据到目标表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句

  • 必选:是

  • 所有选项:insert/replace/update

  • 默认值:insert

启动命令

CMD下 进到你解压的dataX/bin目录下

python  datax.py  ..\job\mysql2mysql.json 

Transformer

下面说一下transform的使用,transform用于对读取的数据进行特殊定制化的需求场景,包括裁剪列、转换列等工作,主要使用的是五个对数据进行处理的方法,分别是dx_substr(),dx_pad(),dx_replace(),dx_filter(),dx_groovy(),官方文档对这几个方法的解释如下: 

dx_substr

参数:3个

    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:字段值的开始位置。
    • 第三个参数:目标字段长度。

返回: 从字符串的指定位置(包含)截取指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)

举例:

dx_substr(1,"2","5")  column 1的value为“dataxTest”=>"taxTe"

dx_substr(1,"5","10")  column 1的value为“dataxTest”=>"Test"

dx_pad

参数:4个

    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:"l","r", 指示是在头进行pad,还是尾进行pad。
    • 第三个参数:目标字段长度。
    • 第四个参数:需要pad的字符。

返回: 如果源字符串长度小于目标字段长度,按照位置添加pad字符后返回。如果长于,直接截断(都截右边)。如果字段为空值,转换为空字符串进行pad,即最后的字符串全是需要pad的字符

举例:

         dx_pad(1,"l","4","A"), 如果column 1 的值为 xyz=> Axyz, 值为 xyzzzzz => xyzz

         dx_pad(1,"r","4","A"), 如果column 1 的值为 xyz=> xyzA, 值为 xyzzzzz => xyzz

dx_replace

参数:4个

    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:字段值的开始位置。
    • 第三个参数:需要替换的字段长度。
    • 第四个参数:需要替换的字符串。

返回: 从字符串的指定位置(包含)替换指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)

举例:

dx_replace(1,"2","4","****")  column 1的value为“dataxTest”=>"da****est"

dx_replace(1,"5","10","****")  column 1的value为“dataxTest”=>"data****"

dx_filter (关联filter暂不支持,即多个字段的联合判断,函参太过复杂,用户难以使用。)

参数:

    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:运算符,支持一下运算符:like, not like, >, =, <, >=, !=, <=
    • 第三个参数:正则表达式(java正则表达式)、值。

返回:

    • 如果匹配正则表达式,返回Null,表示过滤该行。不匹配表达式时,表示保留该行。(注意是该行)。对于>=<都是对字段直接compare的结果.
    • like , not like是将字段转换成String,然后和目标正则表达式进行全匹配。
    • , =, <, >=, !=, <= 对于DoubleColumn比较double值,对于LongColumn和DateColumn比较long值,其他StringColumn,BooleanColumn以及ByteColumn均比较的是StringColumn值。
    • 如果目标colunn为空(null),对于 = null的过滤条件,将满足条件,被过滤。!=null的过滤条件,null不满足过滤条件,不被过滤。 like,字段为null不满足条件,不被过滤,和not like,字段为null满足条件,被过滤。

举例:

dx_filter(1,"like","dataTest") 

dx_filter(1,">=","10") 

dx_groovy

参数。

    • 第一个参数: groovy code
    • 第二个参数(列表或者为空):extraPackage

备注:

    • dx_groovy只能调用一次。不能多次调用。
    • groovy code中支持java.lang, java.util的包,可直接引用的对象有record,以及element下的各种column(BoolColumn.class,BytesColumn.class,DateColumn.class,DoubleColumn.class,LongColumn.class,StringColumn.class)。不支持其他包,如果用户有需要用到其他包,可设置extraPackage,注意extraPackage不支持第三方jar包。
    • groovy code中,返回更新过的Record(比如record.setColumn(columnIndex, new StringColumn(newValue));),或者null。返回null表示过滤此行。
    • 用户可以直接调用静态的Util方式(GroovyTransformerStaticUtil),目前GroovyTransformerStaticUtil的方法列表 (按需补充):

 transform job示例:

"transformer": [
                    {
                        "name": "dx_substr",
                        "parameter": 
                            {
                            "columnIndex":5,
                            "paras":["1","3"]
                            }  
                    },
                    {
                        "name": "dx_replace",
                        "parameter": 
                            {
                            "columnIndex":4,
                            "paras":["3","4","****"]
                            }  
                    },
                    {
                        "name": "dx_groovy",
                          "parameter": 
                            {
                               "code": "//groovy code//",  
                               "extraPackage":[
                               "import somePackage1;", 
                               "import somePackage2;"
                               ]                      
                            }  
                    }
                ]

要注意  Reader  Writer 和Transform 的配置都要写在content下面,Reader和writer的配置是用大括号,transform是写在[]里,因为transform可以包括多个数据处理,下面贴上一个包括reader writer transform Job ,

{
    "job": {

        "setting": {
            "speed": {
                "channel": "1"
            },
            "errorLimit": {
                "record": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                      "column": [
                        "id",
                        "user_id",
                        "comment",
                        "tel"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/pshare?characterEncoding=utf-8"], 
                              
                                "table": ["reflect"]
                            }
                        ], 
                        "password": "xxx", 
                        "username": "root"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "writeMode":"insert ",
                        "column": [
                        "id",
                        "user_id",
                        "comment",
                        "tel"
						], 
                        "connection": [
                            {                                
                                "jdbcUrl": "jdbc:mysql://xxx.78.223.211:3306/pshare?characterEncoding=utf-8", 
                                "table": ["reflect"]
                            }
                        ], 
                        "password": "xxx", 
                        "username": "root"
                    }
                },
                "transformer":[
                    {
                        "name":"dx_substr",
                        "parameter": 
                            {
                            "columnIndex":3,
                            "paras":["1","3"]
                            }  
                    },
                     {
                        "name": "dx_replace",
                        "parameter": 
                            {
                            "columnIndex":2,
                            "paras":["1","4","****"]
                            }  
                    }
                ]
            }
        ]

    }
}

打开CMD 执行命令

结果:

可以看到读取到两条符合筛选条件的数据并全部写入,transform两条数据全部成功,然后看一下数据库:

被读取数据库数据:

写入目标数据库结果:

可以看到第三条数据经过了dx_replace()替换,第四条经过了dx_substr()裁剪,数据转移成功。

PS:CMD中文乱码问题  使用HCP 65001即可

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_42050545/article/details/93883887

智能推荐

jquery 获取子 div_jq 子级别div-程序员宅基地

文章浏览阅读908次。获取 dom 对象$("#divId").children("div").get(0);$("#divId").children("div")[0];获取 jquery 对象$("#divId").children("div").eq(0);$($("#divId").children("div").get(0));【Java面试题与答案】整理推荐基础..._jq 子级别div

基于Springboot + vue实现的交通管理在线服务系统-程序员宅基地

文章浏览阅读271次,点赞4次,收藏4次。管理员管理:负责添加、删除、修改管理员账号,并设置相应的权限,确保管理员团队的专业性和高效性。新闻信息管理:发布、编辑和删除交通新闻、政策更新、路况信息等,保持信息的实时性和有效性。驾驶证业务管理:在线提交驾驶证申请、查询、更新、补办等业务,并实时查看办理进度。新闻信息查看:浏览系统发布的交通新闻、政策更新、路况信息等,了解最新的交通动态。机动车业务管理:在线提交车辆注册、年检、转移、报废等业务申请,并获取办理结果。用户管理:管理用户账号,包括用户注册、登录、权限设置等,确保系统的安全性。

打印系统开发(42)——静默打印_静默打印是什么意思-程序员宅基地

文章浏览阅读4.4k次。1.问题描述希望每次打印时,都是用固定的打印机打印并且不希望弹出对话框进行设置,此时便可以设置静默打印。1.1什么是静默打印静默打印即点击打印时不弹出选项窗口和打印机设置窗口直接进行打印。1.2支持静默打印的打印方式零客户端打印、本地打印、服务器端打印支持静默打印。2.静默打印设置方法2.1 零客户端打印设置方法注:只支持 IE点击模板-打印..._静默打印是什么意思

STM32+74HC595:带领你10分钟用对74HC595_74hc595连接stm32-程序员宅基地

文章浏览阅读2.4w次,点赞14次,收藏68次。使用的是STM32CBT8,小模块用起来性价比超级高,资源丰富,移植u/COS及HTTP、MQTT协议等等用起来简直欲罢不能,摇摇欲仙!BUT:IO口资源太少了,我想让你驱动100个LED,你缺告诉我,我的要求太多,你满足不了......还好,找到了74HC595,但是网上很多资源讲的我看了半天才总结、提炼并另辟蹊径出来精髓===============================_74hc595连接stm32

莱昂哈德·欧拉生平及其成就简介_欧拉的物理成就-程序员宅基地

文章浏览阅读4.1k次,点赞2次,收藏8次。莱昂哈德·欧拉(Leonhard Euler ,1707年4月15日~1783年9月18日),瑞士数学家、自然科学家。1707年4月15日出生于瑞士的巴塞尔,1783年9月18日于俄国圣彼得堡去世。欧拉出生于牧师家庭,自幼受父亲的影响。13岁时入读巴塞尔大学,15岁大学毕业,16岁获得硕士学位。欧拉是18世纪数学界最杰出的人物之一,他不但为数学界作出贡献,更把整个数学推至物理的领域。他是数学史上最多产的数学家,平均每年写出八百多页的论文,还写了大量的力学、分析学、几何学、变分法等的课本,《无穷小分析引论》、_欧拉的物理成就

Error: PL/SQL: ORA-00980: 同义词转换不再有效_sql数据库中同义词转换不再有效-程序员宅基地

文章浏览阅读1.5w次。今天在写存储过程的时候,碰到一个问题,在执行存储过程的时候总是报错--同义词转换不再有效,发现一个查询语句中的一个表原来使用的是一个同义词,就试着把这个同义词单独拿出来进行查询操作,发现并没有问题。最后,经过一番努力,发现该同义词并不是直接指向一个实体表,而是指向另一个同义词。所以,将改同义词的指向改为直接指向原实体表的指向,问题得到解决。即同义词指向的 object ow_sql数据库中同义词转换不再有效

随便推点

AssertionError: Torch not compiled with CUDA enabled-程序员宅基地

文章浏览阅读1.7w次,点赞17次,收藏101次。解决问题:AssertionError: Torch not compiled with CUDA enabled_assertionerror: torch not compiled with cuda enabled

Silvaco TCAD 2017 在RedHat6.5 Linux系统的安装教程_silvaco的license更新-程序员宅基地

文章浏览阅读1.5w次,点赞4次,收藏37次。Silvaco TCAD 2017 在RedHat6.5 Linux系统的安装教程很多网友问到关于在Linux系统下安装Silvaco TCAD的问题,这里我整理了最近安装Silvaco的安装方法,前前后后共花六个月的时间研究,无数个深夜在重装中度过,希望看到的网友不要重复我的经历首先感谢网络上各路大神提供的安装方法,相信在Windows环境中很多人都能安装上但苦于Windows下无..._silvaco的license更新

html页面点击按钮上传文件,点击按钮实现文件上传及控制文件上传类型-程序员宅基地

文章浏览阅读4.3k次。1.原生js实现文件上传html部分:上传文件js部分:upload(event) { //代替执行上传功能let it = event.target;$(it).next().click();},UploadFile() { //上传文件let msg = new FormData();msg.append('file', $('#uploadBillsInp')[0].files[0..._formdata.append('enctype', 'multipart/form-data');

Android后台源码,Android8.0的后台Service优化源码解析-程序员宅基地

文章浏览阅读245次。今天在用户的错误列表上看到这么个bugjava.lang.RuntimeException: Unable to start receiver com.anysoft.tyyd.appwidget.PlayAppWidgetProvider:java.lang.IllegalStateException: Not allowed to start service Intent { cmp=com...._to start receiver com.mediatek.engineermode.emstartreceiver: java.lang.secur

深刻对比一下阿里云服务器和腾讯云服务器的优劣和区别_腾讯云与阿里云的优劣-程序员宅基地

文章浏览阅读2.5w次,点赞10次,收藏19次。我来简单对比阿里云服务器和腾讯云服务器的优劣和区别腾讯云相比阿里云优势不明显。阿里云比腾讯云开放的时间更早,辅助系统更完善些,功能更多可用性更强。但腾讯云不是单纯卖云服务的,凡是要接入腾讯的生态(比如微信小程序等)必须得用腾讯云服务器,腾讯云迅速发展壮大。腾讯云也在慢慢完善,大多数应用场景也都能满足,但就是对很多新技术的支持总是比阿里云慢一些,高级的配置定制也少一些。服务器结构不是很复杂的话用......_腾讯云与阿里云的优劣

应用C预处理命令_c 添加预处理命令-程序员宅基地

文章浏览阅读1.6k次。********************************LoongEmbedded******************************** 作者:LoongEmbedded(kandi)时间:2011.10.17类别:C基础************_c 添加预处理命令

推荐文章

热门文章

相关标签