Big Data Ingestion Tools: DataX

詹学伟 · Published on 2024-04-21

1. Introduction

DataX is an offline data synchronization tool/platform widely used inside Alibaba Group. It implements efficient data synchronization between heterogeneous data sources, including MySQL, SQL Server, Oracle, PostgreSQL, HDFS, Hive, HBase, OTS, ODPS, and more.

1.1 Design Philosophy

To solve the heterogeneous-data-source synchronization problem, DataX turns the complex mesh of point-to-point sync links into a star topology, with DataX acting as the central transport hub that connects every data source. Adding a new data source only requires connecting it to DataX, after which it can exchange data seamlessly with every source already supported.

1.2 Adoption at Alibaba

DataX is used extensively across Alibaba Group, where it carries all offline big-data synchronization workloads and has run stably for over six years. It currently completes more than 80,000 sync jobs per day, moving over 300 TB of data daily.

DataX 1.0 was open-sourced earlier; this article covers DataX 3.0, Alibaba's new open-source release, which brings more powerful features and a better user experience. GitHub: https://github.com/alibaba/DataX

2. DataX Core Components

As an offline data synchronization framework, DataX is built on a Framework + plugin architecture: reading from and writing to data sources are abstracted as Reader and Writer plugins that plug into the synchronization framework.

Reader: the data collection module. It reads data from the source and hands it to the Framework.

Writer: the data writing module. It continuously pulls data from the Framework and writes it to the destination.

Framework: connects Readers and Writers, serves as the data transfer channel between them, and handles the core concerns of buffering, flow control, concurrency, and data conversion.

3. Supported Data Sources

DataX already has a fairly complete plugin ecosystem covering mainstream relational databases, NoSQL stores, and big-data compute systems. The currently supported data sources are listed in the table below; for details, see the DataX data source reference guide linked from the GitHub page.

| Type | Data Source | Reader | Writer |
|---|---|---|---|
| RDBMS (relational databases) | MySQL | √ | √ |
| | Oracle | √ | √ |
| | SQLServer | √ | √ |
| | PostgreSQL | √ | √ |
| | DRDS | √ | √ |
| | Generic RDBMS (any relational database) | √ | √ |
| Alibaba Cloud data warehouse storage | ODPS | √ | √ |
| | ADS | | √ |
| | OSS | √ | √ |
| | OCS | √ | √ |
| NoSQL data stores | OTS | √ | √ |
| | Hbase0.94 | √ | √ |
| | Hbase1.1 | √ | √ |
| | Phoenix4.x | √ | √ |
| | MongoDB | √ | √ |
| | Hive | √ | √ |
| Unstructured data storage | TxtFile | √ | √ |
| | FTP | √ | √ |
| | HDFS | √ | √ |
| | Elasticsearch | | √ |

4. System Requirements

JDK 1.8+

A Python runtime (Python 2.6.x is recommended; the bundled datax.py launcher is written for Python 2)
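
You can quickly confirm both prerequisites on the machine that will run DataX (exact version strings will vary by installation):

java -version     # should report 1.8 or later
python -V         # should report a 2.x interpreter, e.g. Python 2.7.x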

5. Installing DataX

5.1 Step 1: Download and Extract

Download the DataX package, upload it to /opt/software on the node3 server, and extract it.

Download URL: http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

Run the following commands on node3 to extract it:

cd /opt/software/

tar -zxf datax.tar.gz

mv datax /usr/local

5.2 Step 2: Verify the Installation

Enter the bin directory under the DataX installation and run the following self-check to verify that DataX is installed correctly.

On node3, enter DataX's bin directory:

cd /usr/local/datax/bin

Then run:

python datax.py -r streamreader -w streamwriter

If you see output like the following, the installation succeeded:

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the streamreader document:
     https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md 

Please refer to the streamwriter document:
     https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md 
 
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [], 
                        "sliceRecordCount": ""
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

6. DataX in Practice

6.1 Case 1: stream2stream

Use DataX to generate string records in memory and print them to the console.

1) Step 1: View the help template

cd /usr/local/datax

python bin/datax.py -w streamwriter -r streamreader
(This prints the same streamreader/streamwriter job template shown in section 5.2 above.)

2) Step 2: Write the DataX job file

Create the stream2stream job file on node3:

cd /usr/local/datax/job/

vim stream2stream.json 
{
    "job": {
        "setting": {
            "speed": {
                "byte":10485760
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column" : [
                            {
                                "value": "DataX",
                                "type": "string"
                            },
                            {
                                "value": 19890604,
                                "type": "long"
                            },
                            {
                                "value": "1989-06-04 00:00:00",
                                "type": "date"
                            },
                            {
                                "value": true,
                                "type": "bool"
                            },
                            {
                                "value": "test",
                                "type": "bytes"
                            }
                        ],
                        "sliceRecordCount": 10
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": true,
                        "encoding": "UTF-8"
                    }
                }
            }
        ]
    }
}

3) Step 3: Run the job

Run the following to start DataX:

cd /usr/local/datax/

python bin/datax.py job/stream2stream.json
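
With "print": true, the streamwriter echoes every record it receives to stdout, so you should see the five configured column values repeated 10 times (sliceRecordCount), roughly like this (tab-separated; the exact framing varies by version), followed by DataX's job summary of record counts and throughput:

DataX	19890604	1989-06-04 00:00:00	true	test
DataX	19890604	1989-06-04 00:00:00	true	test
...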

6.2 Case 2: mysql2stream

Use DataX to extract the specified columns of a MySQL table and print them to the console.

1) Step 1: Create the MySQL database and insert some test data
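
The original post omits this step's SQL; here is a minimal sketch matching the job file below (database ke, table ke_metrics; the column types and sample rows are assumptions):

CREATE DATABASE IF NOT EXISTS ke;
USE ke;

-- column names match the reader's "column" list below; types are illustrative
CREATE TABLE IF NOT EXISTS ke_metrics (
    `cluster`  VARCHAR(64),
    `broker`   VARCHAR(64),
    `type`     VARCHAR(32),
    `key`      VARCHAR(64),
    `value`    VARCHAR(128),
    `timespan` BIGINT,
    `tm`       VARCHAR(16)
);

-- a couple of sample rows so the job has something to print
INSERT INTO ke_metrics VALUES
    ('cluster1', 'broker1', 'kafka', 'bytes_in',  '1024', 1682006400000, '20230421'),
    ('cluster1', 'broker2', 'kafka', 'bytes_out', '2048', 1682006460000, '20230421');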

2) Step 2: Write the DataX job file

Run the following on node3 to view the help template:

cd /usr/local/datax/

python bin/datax.py  -r mysqlreader -w streamwriter
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the mysqlreader document:
     https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md 

Please refer to the streamwriter document:
     https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md 
 
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [], 
                        "connection": [
                            {
                                "jdbcUrl": [], 
                                "table": []
                            }
                        ], 
                        "password": "", 
                        "username": "", 
                        "where": ""
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

Then run the following on node3 to create the job file:

cd /usr/local/datax/job/

vim mysql2stream.json
{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "column": [
                            "`cluster`",
                            "`broker`",
                            "type",
                            "`key`",
                            "`value`",
                            "`timespan`",
                            "`tm`"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "ke_metrics"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://node3:3306/ke"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "streamwriter",
                    "parameter": {
                         "encoding":"GBK",
                        "print":true
                    }
                }
            }
        ]
    }
}

3) Step 3: Run the job

Run the following on node3 to start the sync:

cd /usr/local/datax/

python bin/datax.py  job/mysql2stream.json

6.3 Case 3: Incremental Synchronization

Use DataX to sync only incremental data (rows within a given time window) and print it to the console.

1) Step 1: Write the DataX job file

Run the following on node3 to create the job file:

cd /usr/local/datax/job/

vim mysql2streamadd.json
{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "column": [
                            "id",
                            "name"
                        ],
                                                "where": "create_time > '${start_time}' and create_time < '${end_time}'",
                        "connection": [
                            {
                                "table": [
                                    "t_user"
                                ],
                                "jdbcUrl": [
                                                                        "jdbc:mysql://node3:3306/db_datax_test"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "streamwriter",
                    "parameter": {
                         "encoding":"GBK",
                        "print":true
                    }
                }
            }
        ]
    }
}

2) Step 2: Run the job

Run the following on node3, passing the time window via the -p option:

cd /usr/local/datax/

python bin/datax.py job/mysql2streamadd.json -p "-Dstart_time='2023-04-20 00:00:00' -Dend_time='2023-04-20 23:59:59'"
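
In a real pipeline the time window is usually computed rather than hard-coded. A minimal sketch of a daily wrapper using GNU date (the script name and schedule are assumptions, not part of the original setup):

#!/bin/bash
# sync_yesterday.sh: hypothetical wrapper that runs the incremental job
# over yesterday's records; schedule it from cron once a day.
START_TIME=$(date -d "yesterday" +"%Y-%m-%d 00:00:00")
END_TIME=$(date -d "yesterday" +"%Y-%m-%d 23:59:59")

cd /usr/local/datax
python bin/datax.py job/mysql2streamadd.json \
    -p "-Dstart_time='${START_TIME}' -Dend_time='${END_TIME}'"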

6.4 Case 4: mysql2mysql

Use DataX to read data from MySQL, filter it with a SQL query, and write the result into another MySQL table.

1) Step 1: Create the target MySQL table
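
The original post omits this DDL; here is a minimal sketch matching the writer config below (database db_datax_test2, table t_user2; the column types are assumptions that mirror the source t_user table):

CREATE DATABASE IF NOT EXISTS db_datax_test2;
USE db_datax_test2;

-- columns match the "column" list in the writer config below;
-- types are illustrative
CREATE TABLE IF NOT EXISTS t_user2 (
    id          INT PRIMARY KEY,
    name        VARCHAR(64),
    create_time DATETIME
);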

2) Step 2: Write the DataX job file

View the help template:

cd /usr/local/datax/

python bin/datax.py  -r mysqlreader -w mysqlwriter

Then run the following on node3 to create the job file:

cd /usr/local/datax/job/

vim mysql2mysql.json
{
    "job": {
        "setting": {
            "speed": {
                 "channel":1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "connection": [
                            {
                                "querySql": [
                                    "select id,name,create_time from t_user where id < 1208;"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://node3:3306/db_datax_test"
                                ]
                            }
                        ]
                    }
                },
                  "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "root",
                        "column": [
                            "id",
                            "name",
                            "create_time"
                        ],
                        "session": [
                                "set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            "delete from t_user2"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://node3:3306/db_datax_test2?useUnicode=true&characterEncoding=utf-8",
                                "table": [
                                    "t_user2"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

3) Step 3: Run the job

Run the following on node3 to start the sync:

cd /usr/local/datax/

python bin/datax.py job/mysql2mysql.json
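
Afterwards you can spot-check the target table, for example:

USE db_datax_test2;
-- the row count should match the number of source rows with id < 1208
SELECT COUNT(*) FROM t_user2;
SELECT * FROM t_user2 ORDER BY id LIMIT 5;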

6.5 Case 5: Importing MySQL Data into HDFS

Requirement: import the data of table t_user in the MySQL database db_datax_test into the HDFS path /datax/mysql2hdfs/.

1) Step 1: Create the HDFS directory

Run the following on node3 to create the HDFS directory:

hadoop fs -mkdir -p /datax/mysql2hdfs/

2) Step 2: Write the DataX job file

Run the following on node3 to create the job file:

cd /usr/local/datax/job/

vim mysql2hdfs.json
{
    "job": {
        "setting": {
            "speed": {
                 "channel":1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "connection": [
                            {
                                "querySql": [
                                    "select id,name,create_time from t_user;"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://node3:3306/db_datax_test"
                                ]
                            }
                        ]
                    }
                },
                     "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://node3:8020",
                        "fileType": "text",
                        "path": "/datax/mysql2hdfs/",
                        "fileName": "t_user.txt",
                        "column": [
                            {
                                "name": "id",
                                "type": "INT"
                            },
                            {
                                "name": "name",
                                "type": "STRING"
                            },
                            {
                                "name": "create_time",
                                "type": "DATETIME"
                            } 
                        ],
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress":"GZIP"
                    }
                }
            }
        ]
    }
}

3) Step 3: Run the job

Run the following on node3 to sync the data to HDFS:

cd /usr/local/datax/

python bin/datax.py job/mysql2hdfs.json
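
Because hdfswriter appends a random suffix to fileName and this job enables GZIP compression, the actual file name will not be exactly t_user.txt. A rough way to verify (file names will differ on your cluster):

hadoop fs -ls /datax/mysql2hdfs/
# e.g. /datax/mysql2hdfs/t_user.txt__a1b2c3d4.gz
hadoop fs -text '/datax/mysql2hdfs/t_user.txt__*' | head    # -text decompresses gzip output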

6.6 Case 6: Syncing MySQL Data to a Hive Table

Use DataX to sync all rows of the MySQL table t_user into a Hive table.

1) Step 1: Enter the Hive client and create the Hive database and table

Enter the Hive client and create the database and table:
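
create database if not exists db_datax;

use db_datax;

-- the comma delimiter must match "fieldDelimiter": "," in the job file below
CREATE TABLE if not exists t_user
(
    id          string comment "user id",
    name        string comment "name",
    create_time string comment "create time"
) row format delimited
    fields terminated by ',';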

2) Step 2: Write the DataX job file

Run the following on node3 to create the job file:

cd /usr/local/datax/job/

vim mysql2hive.json
{
    "job": {
        "setting": {
            "speed": {
                 "channel":1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "connection": [
                            {
                                "querySql": [
                                    "select * from t_user where id < 1208;"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://node3:3306/db_datax_test"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://node3:8020",
                        "fileType": "text",
                        "path": "/user/hive/warehouse/db_datax.db/t_user",
                        "fileName": "t_user",
                        "column": [
                            {
                                "name": "id",
                                "type": "INT"
                            },
                            {
                                "name": "name",
                                "type": "STRING"
                            },
                            {
                                "name": "create_time",
                                "type": "STRING"
                            }
                        ],
                        "writeMode": "append",
                        "fieldDelimiter": ","
                    }
                }
            }
        ]
    }
}

Important: data that Hive stores on HDFS is delimited by '\001' by default (it shows up as ^A when the file is opened in vim). If your table uses that default, the job must set "fieldDelimiter": "\u0001"!

3) Step 3: Run the job

Run the following on node3 to start the sync:

cd /usr/local/datax/

python bin/datax.py  job/mysql2hive.json

4) Step 4: Query the data in Hive

Enter the Hive client and run the following query:

use db_datax;

select *
from t_user;

Original article: https://zhuanlan.zhihu.com/p/515541286

