1、基本介绍
DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、SQL Server、Oracle、PostgreSQL、HDFS、Hive、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。
1.1、设计理念
为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
1.2、当前使用现状
DataX在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已持续稳定运行了6年之久。目前每天完成同步8w多道作业,每日传输数据量超过300TB。
此前已经开源DataX1.0版本,此次介绍为阿里巴巴开源全新版本DataX3.0,有了更多更强大的功能和更好的使用体验。Github主页地址:https://github.com/alibaba/DataX
2、datax基本组件介绍
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
3、datax主流数据库支持情况
DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图,详情请点击:DataX数据源参考指南
4、系统要求
jdk1.8+
python运行环境(推荐python2.6.x)
5、Datax 安装
5.1、第一步:下载安装并解压
下载datax的安装包,并上传到node3服务器的/export/softwares,然后进行解压。
下载安装包地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
node3服务器执行以下命令进行解压
cd /opt/software/
tar -zxf datax.tar.gz
mv datax /usr/local
5.2、第二步:验证datax是否安装成功
进入datax的安装目录的bin路径下,然后执行以下命令验证datax是否安装成功
node3执行以下命令进入datax的bin目录
cd /usr/local/datax/bin
然后执行以下命令
python datax.py -r streamreader -w streamwriter
出现以下结果,证明安装成功
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
Please refer to the streamreader document:
https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md
Please refer to the streamwriter document:
https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md
Please save the following configuration as a json file and use
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [],
"sliceRecordCount": ""
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
6、Datax实战
6.1、案例一:使用datax实现stream2stream数据读取
使用datax实现读取字符串,然后打印到控制台当中来
1)第一步:查看帮助文档
cd /opt/mnt/datax
python bin/datax.py -w streamwriter -r streamreader
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
Please refer to the streamreader document:
https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md
Please refer to the streamwriter document:
https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md
Please save the following configuration as a json file and use
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [],
"sliceRecordCount": ""
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
2)第二步:开发datax配置文件
node3服务器开发stream2stream的配置文件
cd /usr/local/datax/job/
vim stream2stream.json
{
"job": {
"setting": {
"speed": {
"byte":10485760
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column" : [
{
"value": "DataX",
"type": "string"
},
{
"value": 19890604,
"type": "long"
},
{
"value": "1989-06-04 00:00:00",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
}
],
"sliceRecordCount": 10
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true,
"encoding": "UTF-8"
}
}
}
]
}
}
3)第三步:启动datax实现数据的打印
执行以下命令启动datax
cd /usr/local/datax/
python bin/datax.py job/stream2stream.json
6.2、案例二:使用datax实现mysql2stream
使用datax实现将mysql一张表的指定字段的数据抽取出来,并打印出来
1)第一步:创建mysql数据库以及向mysql当中插入数据
略
2)第二步:开发datax的配置文件
node3执行以下命令查看帮助文档
cd /usr/local/datax/
python bin/datax.py -r mysqlreader -w streamwriter
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
Please refer to the mysqlreader document:
https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
Please refer to the streamwriter document:
https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md
Please save the following configuration as a json file and use
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [],
"connection": [
{
"jdbcUrl": [],
"table": []
}
],
"password": "",
"username": "",
"where": ""
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
node3执行以下命令开发datax配置文件
cd /usr/local/datax/job/
vim mysql2stream.json
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"column": [
"`cluster`",
"`broker`",
"type",
"`key`",
"`value`",
"`timespan`",
"`tm`"
],
"connection": [
{
"table": [
"ke_metrics"
],
"jdbcUrl": [
"jdbc:mysql://node3:3306/ke"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding":"GBK",
"print":true
}
}
}
]
}
}
3)第三步:启动datax实现数据同步
node3执行以下命令实现datax数据同步
cd /usr/local/datax/
python bin/datax.py job/mysql2stream.json
6.3、案例三:使用datax实现增量数据同步
使用datax实现增量数据同步打印到控制台
1)第一步:开发datax的配置文件
node3执行以下命令开发datax配置文件
cd /usr/local/datax/job/
vim mysql2streamadd.json
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"column": [
"id",
"name"
],
"where": "create_time > '${start_time}' and create_time < '${end_time}'",
"connection": [
{
"table": [
"t_user"
],
"jdbcUrl": [
"jdbc:mysql://node3:3306/db_datax_test"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding":"GBK",
"print":true
}
}
}
]
}
}
2)第二步:启动datax实现数据同步
node3执行以下命令实现datax数据同步
cd /usr/local/datax/
bin/datax.py job/mysql2streamadd.json -p "-Dstart_time='2023-04-20 00:00:00' -Dend_time='2023-04-20 23:59:59'"
6.4、案例四:使用datax实现mysql2mysql
使用datax实现将数据从mysql当中读取,并且通过sql语句实现数据的过滤,并且将数据写入到mysql另外一张表当中去
1)第一步:创建mysql另外一张表
略
2)第二步:开发datax的配置文件
查看帮助文档
cd /usr/local/datax/
python bin/datax.py -r mysqlreader -w mysqlwriter
node3执行以下命令开发datax配置文件
cd /usr/local/datax/job/
vim mysql2mysql.json
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"connection": [
{
"querySql": [
"select id,name,create_time from t_user where id < 1208;"
],
"jdbcUrl": [
"jdbc:mysql://node3:3306/db_datax_test"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "root",
"column": [
"id",
"name",
"create_time"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from t_user2"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://node3:3306/db_datax_test2?useUnicode=true&characterEncoding=utf-8",
"table": [
"t_user2"
]
}
]
}
}
}
]
}
}
3)第三步:启动datax实现数据同步
node3执行以下命令实现datax数据同步
cd /usr/local/datax/
python bin/datax.py job/mysql2mysql.json
6.5、案例五:使用datax实现将mysql数据导入到hdfs
需求:将mysql db_datax_test库中表t_user当中的数据导入到hdfs的/datax/mysql2hdfs/路径下面去
1)第一步:创建hdfs文件夹
node3执行以下命令创建hdfs文件夹
hadoop fs -mkdir -p /datax/mysql2hdfs/
2)第二步:开发datax的配置文件
node3执行以下命令开发datax的配置文件
cd /usr/local/datax/job/
vim mysql2hdfs.json
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"connection": [
{
"querySql": [
"select id,name,create_time from t_user;"
],
"jdbcUrl": [
"jdbc:mysql://node3:3306/db_datax_test"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://node3:8020",
"fileType": "text",
"path": "/datax/mysql2hdfs/",
"fileName": "t_user.txt",
"column": [
{
"name": "id",
"type": "INT"
},
{
"name": "name",
"type": "STRING"
},
{
"name": "create_time",
"type": "DATETIME"
}
],
"writeMode": "append",
"fieldDelimiter": "\t",
"compress":"GZIP"
}
}
}
]
}
}
3)第三步:启动datax实现数据同步到hdfs
node3执行以下命令,实现数据同步到hdfs上面去
cd /usr/local/datax/
python bin/datax.py job/mysql2hdfs.json
6.6、案例六:使用datax实现将mysql数据同步到hive表
使用datax将mysql表t_user当中的数据,全部同步到hive表当中去
1)第一步:进入hive客户端,创建hive外部表
进入hive客户端并创建数据库和表
略
2)第二步:开发datax配置文件
node3执行以下命令开发datax配置文件
cd /usr/local/datax/job/
vim mysql2hive.json
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"connection": [
{
"querySql": [
"select * from t_user where id < 1208;"
],
"jdbcUrl": [
"jdbc:mysql://node3:3306/db_datax_test"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://node3:8020",
"fileType": "text",
"path": "/user/hive/warehouse/db_datax.db/t_user",
"fileName": "t_user",
"column": [
{
"name": "id",
"type": "INT"
},
{
"name": "name",
"type": "STRING"
},
{
"name": "create_time",
"type": "STRING"
}
],
"writeMode": "append",
"fieldDelimiter": ","
}
}
}
]
}
}
特别注意:如果hive存在hdfs上的数据默认是以'\001'分隔的,用vim打开文件会看到是以^A分隔,但分隔符要用"fieldDelimiter": "\u0001"!!!
3)第三步:启动datax实现数据同步
node3执行以下命令启动datax实现数据同步
cd /usr/local/datax/
python bin/datax.py job/mysql2hive.json
4)第四步:hive当中查询数据
进行hive客户端,然后执行以下命令查询数据
create database if not exists db_datax;
drop database if exists db_datax;
-- 切换数据库
use db_datax;
-- 创建表
CREATE TABLE if not exists t_user
(
id string comment "用户id",
name string comment "姓名",
create_time string comment "时间"
) row format delimited
fields terminated by ',';
drop table if exists t_user;
select *
from t_user;