Case 1: Ingest data from the console and display it on the console
1. Determine the source, channel, and sink types
Here we use a netcat source, a memory channel, and a logger sink.
2. Write the conf file (netcat.conf)
# a is the agent name; r1, c1, and k1 name the source, channel, and sink
# Declare the components
a.sources=r1
a.channels=c1
a.sinks=k1
# Define the source type; here we use netcat
a.sources.r1.type=netcat
a.sources.r1.bind=192.168.110.23
a.sources.r1.port=8888
# Point the source at its downstream channel
a.sources.r1.channels=c1
# Define the channel
a.channels.c1.type=memory
# Maximum number of events buffered in the channel
a.channels.c1.capacity=1000
# Maximum number of events per transaction
a.channels.c1.transactionCapacity=1000
# Define the sink type and bind it to its upstream channel
a.sinks.k1.channel=c1
a.sinks.k1.type=logger
3. Start the service: open a second session on the server and run the command below.
Note: -n takes the agent name defined in the conf file, and -f takes the path to that conf file.
flume-ng agent -n a -c /usr/local/apache-flume-1.9.0/conf -f ./netcat.conf -Dflume.root.logger=DEBUG,console
4. In another terminal, run the following commands (Hadoop needs to be started first).
Note: node3 and 8888 are the host and port set in the conf file (node3 is assumed to resolve to 192.168.110.23). Type data in this window and press Enter, and the server side will receive it.
yum install -y telnet
yum install -y nc
Option 1:
telnet node3 8888
Option 2:
nc node3 8888
Either command opens a client connection to the netcat source; each line you type is sent as one event.
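A quick session looks roughly like this (the netcat source acknowledges each line with OK by default, and the agent console prints each event via the logger sink; the exact output format can vary by Flume version):
telnet node3 8888
hello
OK
On the agent console you should see something like: Event: { headers:{} body: 68 65 6C 6C 6F hello }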
Case 2: Ingest data from a local directory into HDFS
1. Again, first determine the source, channel, and sink types
Here we use a spooldir source, a memory channel, and an hdfs sink.
2. Write the conf file (dir2hdfs.conf)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Configure the spooldir source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/flumedata
# Timestamp interceptor: adds the timestamp header required by the %Y-%m-%d escapes in hdfs.path
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
# Specify the sink type
a1.sinks.k1.type = hdfs
# HDFS cluster address and path; the path is created automatically if it does not exist
a1.sinks.k1.hdfs.path = hdfs://node3:8020/flumeout/log_s/dt=%Y-%m-%d
# Prefix for the files generated under the HDFS path
a1.sinks.k1.hdfs.filePrefix = log_%Y-%m-%d
# Minimum HDFS block replication for files written by this sink
a1.sinks.k1.hdfs.minBlockReplicas = 1
# File format: DataStream writes the raw event data without compression
a1.sinks.k1.hdfs.fileType = DataStream
# Roll a new file every n events; 0 disables count-based rolling
a1.sinks.k1.hdfs.rollCount = 10000
# Roll a new file after this many bytes; 0 disables size-based rolling
a1.sinks.k1.hdfs.rollSize = 0
# Roll the temporary file into a target file every n seconds; 0 disables time-based rolling
a1.sinks.k1.hdfs.rollInterval = 0
# Close idle files after n seconds of inactivity; 0 disables the idle timeout
a1.sinks.k1.hdfs.idleTimeout = 0
# Configure the channel
a1.channels.c1.type = memory
# Maximum number of events buffered in the channel
a1.channels.c1.capacity = 10000
# Maximum number of events the sink takes from the channel per transaction
a1.channels.c1.transactionCapacity = 1000
# Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Start the service
flume-ng agent -n a1 -c /usr/local/apache-flume-1.9.0/conf -f ./dir2hdfs.conf -Dflume.root.logger=DEBUG,console
4. Copy a file into the watched directory
cp data1.csv /opt/flumedata/
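To confirm the ingest, check both ends (paths follow the conf above; the dt= partition carries the current date):
ls /opt/flumedata            # the spooldir source renames ingested files, e.g. data1.csv.COMPLETED
hdfs dfs -ls hdfs://node3:8020/flumeout/log_s/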
In-class side case: manually push data into a Hive table
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Define the source type; here we use netcat
a1.sources.r1.type=netcat
a1.sources.r1.bind=192.168.110.23
a1.sources.r1.port=8888
# Specify the sink type
a1.sinks.k1.type = hdfs
# HDFS cluster address and path; the path is created automatically if it does not exist
a1.sinks.k1.hdfs.path = hdfs://node3:8020/flumeout2/log_s/test
# Prefix for the files generated under the HDFS path
a1.sinks.k1.hdfs.filePrefix = log_test
# Minimum HDFS block replication for files written by this sink
a1.sinks.k1.hdfs.minBlockReplicas = 1
# File format: DataStream writes the raw event data without compression
a1.sinks.k1.hdfs.fileType = DataStream
# Roll a new file every n events; 0 disables count-based rolling
a1.sinks.k1.hdfs.rollCount = 100
# Roll a new file after this many bytes; 0 disables size-based rolling
a1.sinks.k1.hdfs.rollSize = 0
# Roll the temporary file into a target file every n seconds; 0 disables time-based rolling
a1.sinks.k1.hdfs.rollInterval = 0
# Close idle files after n seconds of inactivity; 0 disables the idle timeout
a1.sinks.k1.hdfs.idleTimeout = 0
# Configure the channel
a1.channels.c1.type = memory
# Maximum number of events buffered in the channel
a1.channels.c1.capacity = 1000
# Maximum number of events the sink takes from the channel per transaction
a1.channels.c1.transactionCapacity = 10
# Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
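The conf above only lands raw lines in HDFS; to query them as a Hive table, point an external table at the sink path. A minimal sketch, assuming the hive CLI is available and using a hypothetical table name flume_netcat_log:
hive -e "CREATE EXTERNAL TABLE IF NOT EXISTS flume_netcat_log (line STRING) LOCATION 'hdfs://node3:8020/flumeout2/log_s/test'"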
Case 3: Capture events from Java code and write them to HDFS
1. First determine the source, channel, and sink types
Here we use an avro source, a memory channel, and an hdfs sink.
2. Open the maven project and add the dependencies
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.9.0</version>
</dependency>
3. Set up the log4j configuration (log4j.properties)
log4j.rootLogger=INFO,stdout,flume
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.110.23
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%m%n
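UnsafeMode = true tells the flume appender not to throw when the Flume agent is unreachable, so the application keeps running; events sent while the agent is down are simply dropped.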
Write the Java code (an example; you can change what the logger prints)
package com;

import org.apache.log4j.Logger;

import java.text.SimpleDateFormat;
import java.util.Date;

public class LoggerToFlume {
    public static void main(String[] args) throws InterruptedException {
        // Create a logger object
        Logger logger = Logger.getLogger(LoggerToFlume.class.getName());
        // Loop forever, logging the current time once per second
        while (true) {
            Date date = new Date();
            // Create a date formatter
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String time = sdf.format(date);
            logger.info("current time: " + time);
            // Sleep for a moment
            Thread.sleep(1000);
        }
    }
}
4. Write the conf file (avro2hdfs2.conf)
# Name the agent and its source, channel, and sink
a.sources = r1
a.channels = c1
a.sinks = k1
# Configure the source
a.sources.r1.type = avro
a.sources.r1.bind = 192.168.110.23
a.sources.r1.port = 41414
# Configure the channel
a.channels.c1.type = memory
a.channels.c1.capacity = 10000
a.channels.c1.transactionCapacity = 1000
# Configure the sink
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = hdfs://node3:8020/flumeout2/flume_hdfs_avro2
a.sinks.k1.hdfs.filePrefix = events-
a.sinks.k1.hdfs.minBlockReplicas = 1
a.sinks.k1.hdfs.fileType = DataStream
# Roll a new file every 1000 events
a.sinks.k1.hdfs.rollCount = 1000
a.sinks.k1.hdfs.rollSize = 0
# Roll the temporary file into a target file every n seconds; 0 disables time-based rolling
a.sinks.k1.hdfs.rollInterval = 0
a.sinks.k1.hdfs.idleTimeout = 0
# Wire the source, channel, and sink together
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
5. Start the service:
flume-ng agent -n a -c /usr/local/apache-flume-1.9.0/conf -f ./avro2hdfs2.conf -Dflume.root.logger=DEBUG,console
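With the Java program and the agent both running, events should accumulate under the sink path; a quick check from a node with the HDFS client:
hdfs dfs -ls hdfs://node3:8020/flumeout2/flume_hdfs_avro2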
Case 4: Monitor the HBase log into an HBase table (any other component's log can be monitored the same way)
1. Monitor the HBase log
Create the target table in advance:
create 'log','cf1'
Write the conf file hbaselog2hdfs.conf:
# Name the agent a
# Name the source component r1
a.sources = r1
# Name the sink component k1
a.sinks = k1
# Name the channel component c1
a.channels = c1
# Configure the exec source: stream the output of a command
a.sources.r1.type = exec
a.sources.r1.command = cat /usr/local/hbase-1.4.6/logs/hbase-root-master-master.log
# Specify the sink type
a.sinks.k1.type = hbase
a.sinks.k1.table = log
a.sinks.k1.columnFamily = cf1
# Configure the channel
a.channels.c1.type = memory
a.channels.c1.capacity = 10000
# How many events the sink takes from the channel per transaction
a.channels.c1.transactionCapacity = 100
# Wire the components together
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
Run:
flume-ng agent -n a -c /usr/local/apache-flume-1.9.0/conf -f ./hbaselog2hdfs.conf -Dflume.root.logger=DEBUG,console
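To verify that the log lines landed in HBase, scan a few rows from the hbase shell (standard shell syntax):
echo "scan 'log', {LIMIT => 5}" | hbase shell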
2. Monitor a custom file
1. Make sure the table test_idoall_org already exists in HBase:
hbase(main):002:0> create 'test_idoall_org','uid','name'
0 row(s) in 0.6730 seconds
=> Hbase::Table - test_idoall_org
hbase(main):003:0> put 'test_idoall_org','10086','name:idoall','idoallvalue'
0 row(s) in 0.0960 seconds
2. Create the configuration file file2hbase.conf:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/flumedata/data.txt
# Note: port and host are not exec-source properties and are ignored by Flume
a1.sources.r1.port = 44444
a1.sources.r1.host = 192.168.110.23
# Describe the sink
a1.sinks.k1.type = hbase
a1.sinks.k1.table = test_idoall_org
a1.sinks.k1.columnFamily = name
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Start the flume agent:
flume-ng agent -n a1 -c /usr/local/apache-flume-1.9.0/conf -f ./file2hbase.conf -Dflume.root.logger=DEBUG,console
4. Generate data by appending to the tailed file:
echo "hello idoall.org from flume" >> /opt/flumedata/data.txt
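The appended line should show up in the table shortly afterwards:
echo "scan 'test_idoall_org'" | hbase shell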
Case 5: Flume with an HTTP source
1. First determine the source, channel, and sink types
Here we use an http source, a memory channel, and a logger sink.
2. Write the conf file
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=http
a1.sources.r1.port=50000
a1.sources.r1.channels=c1
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
# How many events the sink takes from the channel per transaction
a1.channels.c1.transactionCapacity=100
3. Start the service and send a test request
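Assuming the conf above was saved as http.conf (the filename here is an assumption; point -f at your actual file), start the agent as in the earlier cases:
flume-ng agent -n a1 -c /usr/local/apache-flume-1.9.0/conf -f ./http.conf -Dflume.root.logger=DEBUG,console
Then POST a test event; each JSON object in the array becomes one Flume event: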
curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello bigdata"}]' http://192.168.110.23:50000
Case 6: Replicating events to multiple agents (fan-out)
1. Copy flume to node1 and node2:
scp -r /usr/local/apache-flume-1.9.0 node1:`pwd`
scp -r /usr/local/apache-flume-1.9.0 node2:`pwd`
2. In /usr/local/soft/bigdata19/scripts on node1, create a configuration file:
a3.sources = r3
a3.channels = c3
a3.sources.r3.type = avro
a3.sources.r3.channels = c3
a3.sources.r3.bind = node1
a3.sources.r3.port = 4141
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
a3.sinks = k3
a3.sinks.k3.type = logger
a3.sinks.k3.channel = c3
3. In /usr/local/soft/bigdata19/scripts on node2, create a configuration file:
vim netcat-flume-loggers.conf
Add the following:
a4.sources = r4
a4.channels = c4
a4.sources.r4.type = avro
a4.sources.r4.channels = c4
a4.sources.r4.bind = node2
a4.sources.r4.port = 4141
a4.channels.c4.type = memory
a4.channels.c4.capacity = 1000
a4.channels.c4.transactionCapacity = 100
a4.sinks = k4
a4.sinks.k4.type = logger
a4.sinks.k4.channel = c4
4. In /usr/local/soft/bigdata19/scripts on the master node, create a configuration file:
vim netcat-flume-loggers.conf
Add the following:
a2.sources = r1
a2.sinks = k1 k2
a2.channels = c1 c2
# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = master
a2.sources.r1.port = 44444
# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = node1
a2.sinks.k1.port = 4141
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = node2
a2.sinks.k2.port = 4141
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1 c2
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2
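Because r1 lists both channels, the source falls back to Flume's default replicating channel selector, so every event is copied into both c1 and c2. The same behavior can be made explicit with a2.sources.r1.selector.type = replicating.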
With the configuration files in place on all three servers, the flume agents can be started.
First start the logger agents (a3 on node1, a4 on node2):
flume-ng agent -n a3 -c ../../flume-1.9.0/conf -f ./netcat-flume-loggers.conf -Dflume.root.logger=INFO,console
flume-ng agent -n a4 -c ../../flume-1.9.0/conf -f ./netcat-flume-loggers.conf -Dflume.root.logger=INFO,console
Then start the netcat agent on the master node:
flume-ng agent -n a2 -c ../../flume-1.9.0/conf -f ./netcat-flume-loggers.conf -Dflume.root.logger=INFO,console
Once the netcat agent is running, this window is tied up, so open another session on master and connect with telnet:
telnet master 44444
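Lines typed into this telnet session should then appear as events on both the node1 and node2 logger consoles, one copy per channel.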