Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比

07-13 1684阅读

章节内容

上一节完成了如下的内容:

  • 编写Agent Conf配置文件
  • 收集Hive数据
  • 汇聚到HDFS中
  • 测试效果

    背景介绍

    这里是三台公网云服务器,每台 2C4G,搭建一个Hadoop的学习环境,供我学习。

    之前已经在 VM 虚拟机上搭建过一次,但是没留下笔记,这次趁着前几天薅羊毛的3台机器,赶紧尝试在公网上搭建体验一下。

    • 2C4G 编号 h121
    • 2C4G 编号 h122
    • 2C2G 编号 h123

      Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比

      文档推荐

      除了官方文档以外,这里有一个写的很好的中文文档:

      https://flume.liyifeng.org/

      监控目录

      业务需求

      • 想要监控指定目录 收集信息并上传到HDFS中

        Source

        选择 spooldir,因为 spooldir 能够保证数据不丢失,且能够进行断点续传,但是延迟较高,不能实时监控。

        Channel

        选择 memory

        Sink

        选择 HDFS

        需要注意

        • 拷贝到 spool 目录下的文件 不可以再打开编辑
        • 无法监控子目录的文件夹变动
        • 被监控文件夹每500毫秒 扫描一次文件变动
        • 适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步

          配置文件

          cd /opt/wzk/flume_test
          vim flume_spooldir-hdfs.conf
          

          我们需要写入如下内容

          # Name the components on this agent
          a3.sources = r3
          a3.channels = c3
          a3.sinks = k3
          # Describe/configure the source
          a3.sources.r3.type = spooldir
          # 注意这里的文件夹 换成自己的!!!
          a3.sources.r3.spoolDir = /opt/wzk/upload
          a3.sources.r3.fileSuffix = .COMPLETED
          a3.sources.r3.fileHeader = true
          # 忽略以.tmp结尾的文件,不上传
          a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
          # Use a channel which buffers events in memory
          a3.channels.c3.type = memory
          a3.channels.c3.capacity = 10000
          a3.channels.c3.transactionCapacity = 500
          # Describe the sink
          a3.sinks.k3.type = hdfs
          # 注意修改成你自己的IP!!!
          a3.sinks.k3.hdfs.path = hdfs://h121.wzk.icu:9000/flume/upload/%Y%m%d/%H%M
          # 上传文件的前缀
          a3.sinks.k3.hdfs.filePrefix = upload-
          # 是否使用本地时间戳
          a3.sinks.k3.hdfs.useLocalTimeStamp = true
          # 积攒500个Event,flush到HDFS一次
          a3.sinks.k3.hdfs.batchSize = 500
          # 设置文件类型
          a3.sinks.k3.hdfs.fileType = DataStream
          # 60秒滚动一次
          a3.sinks.k3.hdfs.rollInterval = 60
          # 128M滚动一次
          a3.sinks.k3.hdfs.rollSize = 134217700
          # 文件滚动与event数量无关
          a3.sinks.k3.hdfs.rollCount = 0
          # 最小冗余数
          a3.sinks.k3.hdfs.minBlockReplicas = 1
          # Bind the source and sink to the channel
          a3.sources.r3.channels = c3
          a3.sinks.k3.channel = c3
          

          启动Agent

          $FLUME_HOME/bin/flume-ng agent --name a3 \
          --conf-file flume-spooldir-hdfs.conf \
          -Dflume.root.logger=INFO,console
          

          Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比

          测试效果

          Flume

          cd /opt/wzk/upload
          vim 1.txt
          

          随便向其中写入一些内容,并保存,可以看到Flume已经有反应了。

          Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比

          HDFS

          查看HDFS,也已经有内容了

          Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比

          采集双写

          这里业务上需要:

          • Flume将数据写入本地
          • Flume将数据写入HDFS

            分析实现

            • 需要多个Agent级联实现
            • Source选择taildir
            • Channel选择memory
            • 最终的Sink分别选择HDFS,file_roll

              Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比

              配置文件1

              配置文件包含如下内容:

              • 1个 taildir source
              • 2个 memory channel
              • 2个 avro sink

                新建文件

                vim flume-taildir-avro.conf
                

                写入如下内容

                # Name the components on this agent
                a1.sources = r1
                a1.sinks = k1 k2
                a1.channels = c1 c2
                # 将数据流复制给所有channel
                a1.sources.r1.selector.type = replicating
                # source
                a1.sources.r1.type = taildir
                # 记录每个文件最新消费位置
                a1.sources.r1.positionFile = /root/flume/taildir_position.json
                a1.sources.r1.filegroups = f1
                # 备注:.*log 是正则表达式;这里写成 *.log 是错误的
                a1.sources.r1.filegroups.f1 = /tmp/root/.*log
                # sink
                a1.sinks.k1.type = avro
                a1.sinks.k1.hostname = linux123
                a1.sinks.k1.port = 9091
                a1.sinks.k2.type = avro
                a1.sinks.k2.hostname = linux123
                a1.sinks.k2.port = 9092
                # channel
                a1.channels.c1.type = memory
                a1.channels.c1.capacity = 10000
                a1.channels.c1.transactionCapacity = 500
                a1.channels.c2.type = memory
                a1.channels.c2.capacity = 10000
                a1.channels.c2.transactionCapacity = 500
                # Bind the source and sink to the channel
                a1.sources.r1.channels = c1 c2
                a1.sinks.k1.channel = c1
                a1.sinks.k2.channel = c2
                

                配置文件2

                配置文件包含如下内容:

                • 1个 avro source
                • 1个 memory channel
                • 1个 hdfs sink

                  新建配置文件

                  vim flume-avro-hdfs.conf
                  

                  写入如下的内容:

                  # Name the components on this agent
                  a2.sources = r1
                  a2.sinks = k1
                  a2.channels = c1
                  # Describe/configure the source
                  a2.sources.r1.type = avro
                  a2.sources.r1.bind = linux123
                  a2.sources.r1.port = 9091
                  # Describe the channel
                  a2.channels.c1.type = memory
                  a2.channels.c1.capacity = 10000
                  a2.channels.c1.transactionCapacity = 500
                  # Describe the sink
                  a2.sinks.k1.type = hdfs
                  a2.sinks.k1.hdfs.path = hdfs://linux121:8020/flume2/%Y%m%d/%H
                  # 上传文件的前缀
                  a2.sinks.k1.hdfs.filePrefix = flume2-
                  # 是否使用本地时间戳
                  a2.sinks.k1.hdfs.useLocalTimeStamp = true
                  # 500个Event才flush到HDFS一次
                  a2.sinks.k1.hdfs.batchSize = 500
                  # 设置文件类型,可支持压缩
                  a2.sinks.k1.hdfs.fileType = DataStream
                  # 60秒生成一个新的文件
                  a2.sinks.k1.hdfs.rollInterval = 60
                  a2.sinks.k1.hdfs.rollSize = 0
                  a2.sinks.k1.hdfs.rollCount = 0
                  a2.sinks.k1.hdfs.minBlockReplicas = 1
                  # Bind the source and sink to the channel
                  a2.sources.r1.channels = c1
                  a2.sinks.k1.channel = c1
                  

                  配置文件3

                  配置文件包含如下内容:

                  • 1个 avro source
                  • 1个 memory channel
                  • 1个 file_roll sink

                    新建配置文件

                    vim flume-avro-file.conf
                    

                    写入如下的内容

                    # Name the components on this agent
                    a3.sources = r1
                    a3.sinks = k1
                    a3.channels = c2
                    # Describe/configure the source
                    a3.sources.r1.type = avro
                    a3.sources.r1.bind = linux123
                    a3.sources.r1.port = 9092
                    # Describe the sink
                    a3.sinks.k1.type = file_roll
                    # 目录需要提前创建好
                    a3.sinks.k1.sink.directory = /root/flume/output
                    # Describe the channel
                    a3.channels.c2.type = memory
                    a3.channels.c2.capacity = 10000
                    a3.channels.c2.transactionCapacity = 500
                    # Bind the source and sink to the channel
                    a3.sources.r1.channels = c2
                    a3.sinks.k1.channel = c2
                    

                    启动Agent1

                    $FLUME_HOME/bin/flume-ng agent --name a3 \
                    --conf-file ~/conf/flume-avro-file.conf \
                    -Dflume.root.logger=INFO,console &
                    

                    启动Agent2

                    $FLUME_HOME/bin/flume-ng agent --name a2 \
                    --conf-file ~/conf/flume-avro-hdfs.conf \
                    -Dflume.root.logger=INFO,console &
                    

                    启动Agent3

                    $FLUME_HOME/bin/flume-ng agent --name a1 \
                    --conf-file ~/conf/flume-taildir-avro.conf \
                    -Dflume.root.logger=INFO,console &
                    

                    Hive测试

                    hive -e "show databases;"
                    
VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]