阿里云开源离线同步工具DataX3.0,或成未来数据库同步主流工具

DataX3.0简介

DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

本次介绍为阿里云开源全新版本DataX3.0,有了更多更强大的功能和更好的使用体验。

DataX3.0概览

设计理念 与 框架设计

为了解决异构数据源同步问题,DataX将复杂的 状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: 为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
  • 框架设计

    DataX3.0 插件体系

    DataX3.0 目前支持数据如下:

    DataX3.0 的优势

  • 精准的速度控制
  • 强劲的同步性能
  • 极简的使用体验

  • 实现步骤

    先说一下需求:

    我们有一台测试的mysql数据库上有 cms和appdata 两个库,这两个库里面都有user_msg 这个表,现在需要把appdata 下面的user_msg表同步到 cms 库下面的user_msg表中,并且要求每分钟同步一次。

    分析需求:

    1,看到这个需求后,首先排除了mysql主从复制,因为同一个数据库 server_id 相同不能用主从复制。2,要求每分钟同步一次,如果表的数据量很多很大,那很容易阻塞,所以不能每次全量同步,要用增量同步的方式才行,这里就排除一些手写sql的方式了。

    我采用的就是 DataX3.0 ,下面记录了详细步骤:

    1,安装#我的服务器系统是 CentOS7#这个包有点大,注意磁盘容量wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gztar xf datax.tar.gz#看一下目录结构:[root@VM_0_5_centos ~]# tree -L 2 dataxdatax├── bin   ├── datax.py   ├── dxprof.py   └── perftrace.py├── conf   ├── core.json   └── logback.xml├── job   └── job.json├── lib   ├── commons-beanutils-1.9.2.jar   ├── commons-cli-1.2.jar   ├── commons-codec-1.9.jar   ├── commons-collections-3.2.1.jar   ├── commons-configuration-1.10.jar   ├── commons-io-2.4.jar   ├── commons-lang-2.6.jar   ├── commons-lang3-3.3.2.jar   ├── commons-logging-1.1.1.jar   ├── commons-math3-3.1.1.jar   ├── datax-common-0.0.1-SNAPSHOT.jar   ├── datax-core-0.0.1-SNAPSHOT.jar   ├── datax-transformer-0.0.1-SNAPSHOT.jar   ├── fastjson-1.1.46.sec01.jar   ├── fluent-hc-4.4.jar   ├── groovy-all-2.1.9.jar   ├── hamcrest-core-1.3.jar   ├── httpclient-4.4.jar   ├── httpcore-4.4.jar   ├── janino-2.5.16.jar   ├── logback-classic-1.0.13.jar   ├── logback-core-1.0.13.jar   └── slf4j-api-1.7.10.jar├── log   ├── 2020-06-11   ├── 2020-06-12   ├── 2020-06-13   ├── 2020-06-14   ├── 2020-06-15   ├── 2020-06-16   ├── 2020-06-17   ├── 2020-06-18   ├── 2020-06-19   ├── 2020-06-20   ├── 2020-06-21   ├── 2020-06-22   ├── 2020-06-23   ├── 2020-06-24   ├── 2020-06-25   ├── 2020-06-26   └── 2020-06-27├── log_perf   ├── 2020-06-01   ├── 2020-06-02   ├── 2020-06-03   ├── 2020-06-04   ├── 2020-06-05   ├── 2020-06-06   ├── 2020-06-07   ├── 2020-06-08   ├── 2020-06-09   ├── 2020-06-10   ├── 2020-06-11   ├── 2020-06-12   ├── 2020-06-13   ├── 2020-06-14   ├── 2020-06-15   ├── 2020-06-16   ├── 2020-06-17   ├── 2020-06-18   ├── 2020-06-19   ├── 2020-06-20   ├── 2020-06-21   ├── 2020-06-22   ├── 2020-06-23   ├── 2020-06-24   ├── 2020-06-25   ├── 2020-06-26   └── 2020-06-27├── plugin   ├── reader   └── writer├── script   └── Readme.md└── tmp    └── readme.txt

    2,生成模板文件 并配置同步

    同步需要一个配置文件,我们先生成一个模板文件,然后修改。我是从mysql读取,再写到 mysql , 所以我的 命令是下面这样:python datax/bin/datax.py -r mysqlreader -w mysqlwriter再比如,你要把 mongodb 的数据同步到mysql,生成模板文件的命令就变成了下面这样:python datax/bin/datax.py -r mongodbreader -w mysqlwriter#mysqlreader 和 mysqlwriter 分别是mysql的读取和写入插件。#datax3.0 支持非常多的插件,可以到 datax的github主页了解详情。有了配置文件之后要按需求修改一下,我修改后是下面这样的:cat /tmp/test.json{    "job": {        "content": [            {                "reader": {                    "name": "mysqlreader",                     "parameter": {                        "column": ['*'],                         "connection": [                            {                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/appdata"],                                 "table": ["user_msg"]                            }                        ],                         "password": "your-passwd",                         "username": "your-username",                         "where": ""                    }                },                 "writer": {                    "name": "mysqlwriter",                     "parameter": {                        "column": ['*'                          ],                         "connection": [                            {                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/cms",                                 "table": ["user_msg"]                            }                        ],                         "password": "your-passwd",                         "preSql": [],                         "session": [],                         "username": "your-username",                         "writeMode": "update"                    }                }            }        ],         "setting": {            "speed": {                "channel": "1"            }        }    }}#注意:writeMode 我这里是 update,如果不存在会先创建数据,存在就更新然后先执行一次全量同步datax/bin/datax.py /tmp/test.json

    3,增量同步并设置定时

    配置文件中的 where 参数可以设置一些查询条件,比如我的表里有一个字段是 create_timestamp ,这个是一个时间戳,我可以通过这个字段来过滤一个时间段内的新增数据,从而实现增量同步。修改后我的配置文件是下面这样 :{    "job": {        "content": [            {                "reader": {                    "name": "mysqlreader",                     "parameter": {                        "column": ['*'],                         "connection": [                            {                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/appdata"],                                 "table": ["user_msg"]                            }                        ],                         "password": "your-passwd",                         "username": "your-username",                         "where": "end_timestamp > ${start_time} and end_timestamp  < ${end_time}"                    }                },                 "writer": {                    "name": "mysqlwriter",                     "parameter": {                        "column": ['*'                          ],                         "connection": [                            {                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/cms",                                 "table": ["user_msg"]                            }                        ],                         "password": "your-passwd",                         "preSql": [],                         "session": [],                         "username": "your-username",                         "writeMode": "update"                    }                }            }        ],         "setting": {            "speed": {                "channel": "1"            }        }    }}# 注意事项,create_timestamp 我这里是int 类型的 。下面简单写个同步脚本:cat sync.sh#!/bin/bash# 截至时间设置为当前时间戳end_time=$(date +%s)# 开始时间设置为120s前时间戳start_time=$(($end_time - 120))/root/datax/bin/datax.py /tmp/test.json -p "-Dstart_time=$start_time -Dend_time=$end_time"#添加定时任务,实现每分钟同步一次* * * * * sh /root/sync.sh

    声明:本站部分文章及图片源自用户投稿,如本站任何资料有侵权请您尽早请联系jinwei@zod.com.cn进行处理,非常感谢!

    上一篇 2020年6月2日
    下一篇 2020年6月2日

    相关推荐