项目实训第四天

本文最后更新于:2021年7月23日 晚上


安装Flume

  1. apache-flume-1.9.0-bin.tar.gz上传至虚拟机的/opt文件夹下并解压:

    1
    tar -xvf apache-flume-1.9.0-bin.tar.gz
  2. 修改启动脚本参数

    进入apache-flume-1.9.0-bin/conf

    1
    2
    3
    4
    5
    6
    cp flume-env.sh.template flume-env.sh

    vim flume-env.sh

    # 添加
    export $JAVA_HOME=/opt/jdk1.8.0_181
  3. 配置环境变量

    1
    2
    3
    4
    5
    6
    7
    vim /etc/profile

    # 添加
    export FLUME_HOME=/opt/apache-flume-1.9.0-bin
    export PATH=$PATH:$FLUME_HOME/bin

    source /etc/profile
  4. 检测是否安装成功

    1
    flume-ng version

    输出如下:

    1
    2
    3
    4
    5
    Flume 1.9.0
    Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
    Revision: d4fcab4f501d41597bc616921329a4339f73585e
    Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
    From source with checksum 35db629a3bda49d23e9b3690c80737f9

Flume

Source

AVRO Source

  1. AVRO是Apache提供的一套序列化反序列化机制,AVRO的序列化机制能够跨平台跨语言

  2. AVRO Source实际上是用于接收被AVRO序列化之后的数据,结合AVRO Sink可以实现多级、扇入以及扇出流动

  3. 配置案例

    1. 格式文件

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      #配置一个agent,agent的名称可以自定义(如a1)
      #指定agent的sources(如s1)、sinks(如k1)、channels(如c1)
      #分别指定agent的sources,sinks,channels的名称 名称可以自定义
      a1.sources = s1
      a1.sinks = k1
      a1.channels = c1

      #配置source
      a1.sources.s1.channels = c1
      a1.sources.s1.type = avro
      a1.sources.s1.bind = 172.16.132.6
      a1.sources.s1.port = 6666

      #配置channels
      a1.channels.c1.type = memory

      #配置sinks
      a1.sinks.k1.channel = c1
      a1.sinks.k1.type = logger

      #为sources和sinks绑定channels
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
    2. 启动命令

      1
      flume-ng agent -n a1 -c $FLUME_HOME/conf -f avrosource.conf -Dflume.root.logger=INFO,console
    3. 利用AVRO客户端将文件序列化之后发送给Flume

      1
      flume-ng avro-client -H hadoop -p 6666 -F a.txtou

Exec Source

  1. Exec Source会监听指定的命令,会将这个命令的执行结果作为日志来进行收集

  2. 案例:监听文件,如果文件中新添了数据,自动收集文件中新添的数据

    1. 格式文件

      1
      a1.sources = s1a1.channels = c1a1.sinks = k1# 配置Exec Sourcea1.sources.s1.type = exec# 指定监听的命令a1.sources.s1.command = tail -F /opt/a.txt# 指定命令类型a1.sources.s1.shell = /bin/bash -ca1.channels.c1.type = memorya1.sinks.k1.type = loggera1.sources.s1.channels = c1a1.sinks.k1.channel = c1
    2. 启动Flume

      1
      flume-ng agent -n a1 -c $FLUME_HOME/conf -f execsource.conf -Dflume.root.logger=INFO,console

Sequence Generator Source

  1. Sequence Generator Source是一个序列产生器,会从0开始递增,递增到totalEvents

  2. totalEvents如果不指定,则默认是Long.MAX_VALUE,即2^63^-1

  3. 案例

    1. 格式文件

      1
      a1.sources = s1a1.channels = c1a1.sinks = k1# 配置Sequence Generator Sourcea1.sources.s1.type = seq# 指定递增的最大值a1.sources.s1.totalEvents = 1000a1.channels.c1.type = memorya1.sinks.k1.type = loggera1.sources.s1.channels = c1a1.sinks.k1.channel = c1
    2. 启动命令

      1
      flume-ng agent -n a1 -c $FLUME_HOME/conf -f seqsource.conf -Dflume.root.logger=INFO,console

Spooling Directory Source

  1. Spooling Directory Source会监听指定的目录,如果指定的目录下产生了新的文件,那么会将这个新文件中的内容自动的进行按行收集

  2. 被收集完的文件会自动添加一个后缀.COMPLETED

  3. 案例

    1. 格式文件

      1
      a1.sources = s1a1.channels = c1a1.sinks = k1# 配置Spooling Directory Sourcea1.sources.s1.type = spooldir# 要监听的目录a1.sources.s1.spoolDir = /opt/flumedataa1.channels.c1.type = memorya1.sinks.k1.type = loggera1.sources.s1.channels = c1a1.sinks.k1.channel = c1
    2. 启动命令

      1
      flume-ng agent -n a1 -c $FLUME_HOME/conf -f spooldirsource.conf -Dflume.root.logger=INFO,console

HTTP Source

  1. HTTP Source监听HTTP请求,只能用于监听GET和POST请求。其中对于GET请求的监听只处于实验阶段,所以实际过程中只用这个Source来监听POST请求

  2. 案例

    1. 格式文件

      1
      a1.sources = s1a1.channels = c1a1.sinks = k1# 配置HTTP Sourcea1.sources.s1.type = http# 要监听的端口a1.sources.s1.port = 8090a1.channels.c1.type = memorya1.sinks.k1.type = loggera1.sources.s1.channels = c1a1.sinks.k1.channel = c1
    2. 启动命令

      1
      flume-ng agent -n a1 -c $FLUME_HOME/conf -f httpsource.conf -Dflume.root.logger=INFO,console
    3. 发送POST请求

      1
      curl -X POST -d '[{"headers":{"date":"2021-07-14"},"body":"hello class"}]' http://hadoop:8090

Custom Source

  1. 在Flume中,如果Flume原生提供的Source不能适用指定场景,那么此时就可以考虑自定义Source

  2. 格式文件

    1
    a1.sources = s1a1.channels = c1a1.sinks = k1# 配置自定义Source - 指定全路径名a1.sources.s1.type = cn.tedu.flume.source.AuthSource# 指定步长a1.sources.s1.step = 5# 指定终止范围a1.sources.s1.end = 100a1.channels.c1.type = memorya1.sinks.k1.type = loggera1.sources.s1.channels = c1a1.sinks.k1.channel = c1

Channel

Memory Channel

  1. Memory Channel将Source收集来的数据临时放到内存的队列中,因此这个Channel的读写速度相对较快但是不可靠

  2. 通过capacity属性来定义队列的容量。如果队列被放慢,那么后来的数据就会被阻塞。capactiy如果不指定则默认值为为100,实际开发中,一般会将这个值调节为300000~500000

  3. 属性transactionCapacity表示事务容量,实际上表示了每次发送或者接收的数据量。transactionCapacity默认也是100,实际开发过程中,会考虑将这个值调节为3000~5000

  4. 案例

    1. 格式文件

      1
      a1.sources = s1a1.channels = c1a1.sinks = k1a1.sources.s1.type = netcata1.sources.s1.bind = 0.0.0.0a1.sources.s1.port = 8090# 配置Memory Channela1.channels.c1.type = memory# 指定容量a1.channels.c1.capacity = 100000# 指定数据的批量a1.channels.c1.transactionCapacity = 1000a1.sinks.k1.type = loggera1.sources.s1.channels = c1a1.sinks.k1.channel = c1
    2. 启动命令

      1
      flume-ng agent -n a1 -c $FLUME_HOME/bin -f memory.conf -Dflume.root.logger=INFO,console
    3. 发送数据

      1
      nc hadoop 8090

File Channel

  1. File Channel将Source收集来的数据以文件形式存储到本地磁盘上,所以这个Channel的读写速度慢但是可靠

  2. 在存储的时候,如果不指定,那么会放在~/.flume/file-channel/data路径下

  3. 案例

    1. 格式文件

      1
      a1.sources = s1a1.channels = c1a1.sinks = k1a1.sources.s1.type = netcata1.sources.s1.bind = 0.0.0.0a1.sources.s1.port = 8090# 配置File Channela1.channels.c1.type = file# 指定在磁盘上的临时存储路径a1.channels.c1.dataDirs = /opt/flumedataa1.sinks.k1.type = loggera1.sources.s1.channels = c1a1.sinks.k1.channel = c1
    2. 启动命令

      1
      flume-ng agent -n a1 -c $FLUME_HOME/bin -f filechannel.conf -Dflume.root.logger=INFO,console
    3. 发送数据

      1
      nc hadoop 8090

JDBC Channel

  1. JDBC Channel是将Source收集的数据临时存储到数据库中,因为数据库存在索引的问题,所以理论上这个JDBC Channel的效率要高于File Channel但是低于Memory Channel
  2. JDBC Channel到目前为止仅仅支持Derby(微型数据库、单连接)。正因为采用的是Derby库,所以实际开发中不用这个JDBC Channel

Sink

HDFS Sink

  1. HDFS Sink将数据写到HDFS上。在写数据的时候,默认是每隔30s在HDFS上生成一个新的文件,那么这会导致HDFS上生成大量的小文件,所以实际过程中需要改变这个值

  2. HDFS Sink在讲数据写到HDFS上的时候,还需要考虑文件类型。Flume支持三种文件类型:SequenceFil(序列文件)、DataStream(文本文件)、CompressedStream(压缩文件)

  3. 案例

    1. 启动HDFS

      1
      start-dfs.sh
    2. 格式文件

      1
      a1.sources = s1a1.channels = c1a1.sinks = k1a1.sources.s1.type = netcata1.sources.s1.bind = 0.0.0.0a1.sources.s1.port = 8090a1.channels.c1.type = memorya1.channels.c1.capacity = 10000a1.channels.c1.transactionCapacity = 100# 配置HDFS Sinka1.sinks.k1.type = hdfs# 指定在HDFS上的存储路径a1.sinks.k1.hdfs.path = hdfs://hadoop:9000/flumedata/test# 指定文件的滚动间隔时间a1.sinks.k1.hdfs.rollInterval = 3600# 指定文件的存储类型a1.sinks.k1.hdfs.fileType = DataStreama1.sources.s1.channels = c1a1.sinks.k1.channel = c1
    3. 启动命令

      1
      flume-ng agent -n a1 -c $FLUME_HOME/bin -f hdfssink.conf -Dflume.root.logger=INFO,console
    4. 发送数据

      1
      nc hadoop 8090

Logger Sink

  1. Logger Sink表示将数据打印到指定位置上,一般是控制台上

  2. 在打印的时候,为了防止过多的数据将控制台占满,所以默认打印body部分的数据不超过16个字节,可以通过maxBytesToLog来配置

  3. Logger Sink在打印数据的时候,对中文支持不好

  4. 案例

    1. 格式文件

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1

      a1.sources.s1.type = netcat
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 8090

      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 10000
      a1.channels.c1.transactionCapacity = 1000

      # 配置Logger Sink
      a1.sinks.k1.type = logger
      # 指定body部分打印的字节个数
      a1.sinks.k1.maxBytesToLog = 20

      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
    2. 启动命令

      1
      flume-ng agent -n a1 -c $FLUME_HOME/bin -f loggersink.conf -Dflume.root.logger=INFO,console
    3. 发送数据

      1
      nc hadoop 8090

File Roll Sink

  1. File Roll Sink将数据最终写到执行的目录下。在写的时候同样是每隔30s会生成一个小文件,所以实际过程中需要调节大小

  2. 案例

    1. 格式文件

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1

      a1.sources.s1.type = netcat
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 8090

      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 10000
      a1.channels.c1.transactionCapacity = 100

      # 配置File Roll Sink
      a1.sinks.k1.type = file_roll
      # 指定数据的存储目录
      a1.sinks.k1.sink.directory = /opt/flumedata
      # 指定文件滚动的间隔时间
      a1.sinks.k1.sink.rollInterval = 3600

      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
    2. 启动命令

      1
      flume-ng agent -n a1 -c $FLUME_HOME/bin -f filerollsink.conf -Dflume.root.logger=INFO,console
    3. 发送数据

      1
      nc hadoop 8090

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!