项目实训第四天
本文最后更新于:2021年7月23日 晚上
安装Flume
将
apache-flume-1.9.0-bin.tar.gz
上传至虚拟机的/opt文件夹下并解压:1
tar -xvf apache-flume-1.9.0-bin.tar.gz
修改启动脚本参数
进入apache-flume-1.9.0-bin/conf
1
2
3
4
5
6cp flume-env.sh.template flume-env.sh
vim flume-env.sh
# 添加
export $JAVA_HOME=/opt/jdk1.8.0_181配置环境变量
1
2
3
4
5
6
7vim /etc/profile
# 添加
export FLUME_HOME=/opt/apache-flume-1.9.0-bin
export PATH=$PATH:$FLUME_HOME/bin
source /etc/profile检测是否安装成功
1
flume-ng version
输出如下:
1
2
3
4
5Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
Flume
Source
AVRO Source
AVRO是Apache提供的一套序列化反序列化机制,AVRO的序列化机制能够跨平台跨语言
AVRO Source实际上是用于接收被AVRO序列化之后的数据,结合AVRO Sink可以实现多级、扇入以及扇出流动
配置案例
格式文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23#配置一个agent,agent的名称可以自定义(如a1)
#指定agent的sources(如s1)、sinks(如k1)、channels(如c1)
#分别指定agent的sources,sinks,channels的名称 名称可以自定义
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#配置source
a1.sources.s1.channels = c1
a1.sources.s1.type = avro
a1.sources.s1.bind = 172.16.132.6
a1.sources.s1.port = 6666
#配置channels
a1.channels.c1.type = memory
#配置sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger
#为sources和sinks绑定channels
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1启动命令
1
flume-ng agent -n a1 -c $FLUME_HOME/conf -f avrosource.conf -Dflume.root.logger=INFO,console
利用AVRO客户端将文件序列化之后发送给Flume
1
flume-ng avro-client -H hadoop -p 6666 -F a.txtou
Exec Source
Exec Source会监听指定的命令,会将这个命令的执行结果作为日志来进行收集
案例:监听文件,如果文件中新添了数据,自动收集文件中新添的数据
格式文件
1
a1.sources = s1a1.channels = c1a1.sinks = k1# 配置Exec Sourcea1.sources.s1.type = exec# 指定监听的命令a1.sources.s1.command = tail -F /opt/a.txt# 指定命令类型a1.sources.s1.shell = /bin/bash -ca1.channels.c1.type = memorya1.sinks.k1.type = loggera1.sources.s1.channels = c1a1.sinks.k1.channel = c1
启动Flume
1
flume-ng agent -n a1 -c $FLUME_HOME/conf -f execsource.conf -Dflume.root.logger=INFO,console
Sequence Generator Source
Sequence Generator Source是一个序列产生器,会从0开始递增,递增到totalEvents
totalEvents如果不指定,则默认是Long.MAX_VALUE,即2^63^-1
案例
格式文件
1
a1.sources = s1a1.channels = c1a1.sinks = k1# 配置Sequence Generator Sourcea1.sources.s1.type = seq# 指定递增的最大值a1.sources.s1.totalEvents = 1000a1.channels.c1.type = memorya1.sinks.k1.type = loggera1.sources.s1.channels = c1a1.sinks.k1.channel = c1
启动命令
1
flume-ng agent -n a1 -c $FLUME_HOME/conf -f seqsource.conf -Dflume.root.logger=INFO,console
Spooling Directory Source
Spooling Directory Source会监听指定的目录,如果指定的目录下产生了新的文件,那么会将这个新文件中的内容自动的进行按行收集
被收集完的文件会自动添加一个后缀.COMPLETED
案例
格式文件
1
a1.sources = s1a1.channels = c1a1.sinks = k1# 配置Spooling Directory Sourcea1.sources.s1.type = spooldir# 要监听的目录a1.sources.s1.spoolDir = /opt/flumedataa1.channels.c1.type = memorya1.sinks.k1.type = loggera1.sources.s1.channels = c1a1.sinks.k1.channel = c1
启动命令
1
flume-ng agent -n a1 -c $FLUME_HOME/conf -f spooldirsource.conf -Dflume.root.logger=INFO,console
HTTP Source
HTTP Source监听HTTP请求,只能用于监听GET和POST请求。其中对于GET请求的监听只处于实验阶段,所以实际过程中只用这个Source来监听POST请求
案例
格式文件
1
a1.sources = s1a1.channels = c1a1.sinks = k1# 配置HTTP Sourcea1.sources.s1.type = http# 要监听的端口a1.sources.s1.port = 8090a1.channels.c1.type = memorya1.sinks.k1.type = loggera1.sources.s1.channels = c1a1.sinks.k1.channel = c1
启动命令
1
flume-ng agent -n a1 -c $FLUME_HOME/conf -f httpsource.conf -Dflume.root.logger=INFO,console
发送POST请求
1
curl -X POST -d '[{"headers":{"date":"2021-07-14"},"body":"hello class"}]' http://hadoop:8090
Custom Source
在Flume中,如果Flume原生提供的Source不能适用指定场景,那么此时就可以考虑自定义Source
格式文件
1
a1.sources = s1a1.channels = c1a1.sinks = k1# 配置自定义Source - 指定全路径名a1.sources.s1.type = cn.tedu.flume.source.AuthSource# 指定步长a1.sources.s1.step = 5# 指定终止范围a1.sources.s1.end = 100a1.channels.c1.type = memorya1.sinks.k1.type = loggera1.sources.s1.channels = c1a1.sinks.k1.channel = c1
Channel
Memory Channel
Memory Channel将Source收集来的数据临时放到内存的队列中,因此这个Channel的读写速度相对较快但是不可靠
通过capacity属性来定义队列的容量。如果队列被放慢,那么后来的数据就会被阻塞。capactiy如果不指定则默认值为为100,实际开发中,一般会将这个值调节为300000~500000
属性transactionCapacity表示事务容量,实际上表示了每次发送或者接收的数据量。transactionCapacity默认也是100,实际开发过程中,会考虑将这个值调节为3000~5000
案例
格式文件
1
a1.sources = s1a1.channels = c1a1.sinks = k1a1.sources.s1.type = netcata1.sources.s1.bind = 0.0.0.0a1.sources.s1.port = 8090# 配置Memory Channela1.channels.c1.type = memory# 指定容量a1.channels.c1.capacity = 100000# 指定数据的批量a1.channels.c1.transactionCapacity = 1000a1.sinks.k1.type = loggera1.sources.s1.channels = c1a1.sinks.k1.channel = c1
启动命令
1
flume-ng agent -n a1 -c $FLUME_HOME/bin -f memory.conf -Dflume.root.logger=INFO,console
发送数据
1
nc hadoop 8090
File Channel
File Channel将Source收集来的数据以文件形式存储到本地磁盘上,所以这个Channel的读写速度慢但是可靠
在存储的时候,如果不指定,那么会放在
~/.flume/file-channel/data
路径下案例
格式文件
1
a1.sources = s1a1.channels = c1a1.sinks = k1a1.sources.s1.type = netcata1.sources.s1.bind = 0.0.0.0a1.sources.s1.port = 8090# 配置File Channela1.channels.c1.type = file# 指定在磁盘上的临时存储路径a1.channels.c1.dataDirs = /opt/flumedataa1.sinks.k1.type = loggera1.sources.s1.channels = c1a1.sinks.k1.channel = c1
启动命令
1
flume-ng agent -n a1 -c $FLUME_HOME/bin -f filechannel.conf -Dflume.root.logger=INFO,console
发送数据
1
nc hadoop 8090
JDBC Channel
- JDBC Channel是将Source收集的数据临时存储到数据库中,因为数据库存在索引的问题,所以理论上这个JDBC Channel的效率要高于File Channel但是低于Memory Channel
- JDBC Channel到目前为止仅仅支持Derby(微型数据库、单连接)。正因为采用的是Derby库,所以实际开发中不用这个JDBC Channel
Sink
HDFS Sink
HDFS Sink将数据写到HDFS上。在写数据的时候,默认是每隔30s在HDFS上生成一个新的文件,那么这会导致HDFS上生成大量的小文件,所以实际过程中需要改变这个值
HDFS Sink在讲数据写到HDFS上的时候,还需要考虑文件类型。Flume支持三种文件类型:SequenceFil(序列文件)、DataStream(文本文件)、CompressedStream(压缩文件)
案例
启动HDFS
1
start-dfs.sh
格式文件
1
a1.sources = s1a1.channels = c1a1.sinks = k1a1.sources.s1.type = netcata1.sources.s1.bind = 0.0.0.0a1.sources.s1.port = 8090a1.channels.c1.type = memorya1.channels.c1.capacity = 10000a1.channels.c1.transactionCapacity = 100# 配置HDFS Sinka1.sinks.k1.type = hdfs# 指定在HDFS上的存储路径a1.sinks.k1.hdfs.path = hdfs://hadoop:9000/flumedata/test# 指定文件的滚动间隔时间a1.sinks.k1.hdfs.rollInterval = 3600# 指定文件的存储类型a1.sinks.k1.hdfs.fileType = DataStreama1.sources.s1.channels = c1a1.sinks.k1.channel = c1
启动命令
1
flume-ng agent -n a1 -c $FLUME_HOME/bin -f hdfssink.conf -Dflume.root.logger=INFO,console
发送数据
1
nc hadoop 8090
Logger Sink
Logger Sink表示将数据打印到指定位置上,一般是控制台上
在打印的时候,为了防止过多的数据将控制台占满,所以默认打印body部分的数据不超过16个字节,可以通过maxBytesToLog来配置
Logger Sink在打印数据的时候,对中文支持不好
案例
格式文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# 配置Logger Sink
a1.sinks.k1.type = logger
# 指定body部分打印的字节个数
a1.sinks.k1.maxBytesToLog = 20
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1启动命令
1
flume-ng agent -n a1 -c $FLUME_HOME/bin -f loggersink.conf -Dflume.root.logger=INFO,console
发送数据
1
nc hadoop 8090
File Roll Sink
File Roll Sink将数据最终写到执行的目录下。在写的时候同样是每隔30s会生成一个小文件,所以实际过程中需要调节大小
案例
格式文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
# 配置File Roll Sink
a1.sinks.k1.type = file_roll
# 指定数据的存储目录
a1.sinks.k1.sink.directory = /opt/flumedata
# 指定文件滚动的间隔时间
a1.sinks.k1.sink.rollInterval = 3600
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1启动命令
1
flume-ng agent -n a1 -c $FLUME_HOME/bin -f filerollsink.conf -Dflume.root.logger=INFO,console
发送数据
1
nc hadoop 8090
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!