进击大数据系列(十三)Hadoop 分布式日志采集系统 Flume

前面介绍了前面介绍了 Hadoop 基本概念与生态、安装(HDFS+YARN+MapReduce)实战操作、常用命令、架构基石 HDFS、统一资源管理和调度平台 YARN、分布式计算框架 MapReduce 、数据仓库 Hive、计...

前面介绍了前面介绍了 Hadoop 基本概念与生态安装(HDFS+YARN+MapReduce)实战操作常用命令架构基石 HDFS统一资源管理和调度平台 YARN分布式计算框架 MapReduce 、数据仓库 Hive计算引擎 Spark实时计算流计算引擎 Flink 数据库 HbaseHadoop 任务调度框架 Oozie数据同步工具 Sqoop等相关的知识点,今天我将详细的为大家介绍 大数据 Hadoop 分布式日志采集系统 Flume 相关知识。

Hadoop 业务开发流程

attachments-2023-10-wyAkwrT8651a84a79a785,png

大数据需要解决的三个问题:采集、存储、计算。

Flume概述

flume 是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据。

attachments-2023-10-MGZLol3E651a84c9715de,png

同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力。flume的数据流由事件(Event)贯穿始终。

事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把event推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。

attachments-2023-10-rMDwnqKT651a84e0ccc81,png

用一句话总结:Flume不生产数据,它只是数据的搬运工。

flume 最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS。

flume 基础框架

attachments-2023-10-qKiCbfmY651a84ff8b878,png

flume 之所以这么神奇,是源于它自身的一个设计,这个设计就是 agent,agent 本身是一个 java 进程,运行在日志收集节点—所谓日志收集节点就是服务器节点。

agent

agent里面包含3个核心的组件:source—>channel—>sink,类似生产者、仓库、消费者的架构。

  • source:source 组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。
  • channel:source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。
  • sink:sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义。

Event

传输单元,Flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。Event由Header和Body两部分组成,Header用来存放该event的一些属性,为K-V结构,Body用来存放该条数据,形式为字节数组。

attachments-2023-10-UHSJgtrI651a8517dc776,png

Flume运行机制

Flume 的核心是把数据从数据源收集过来,再送到目的地。为了保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,删除自己缓存的数据

Flume 传输的数据的基本单位是 Event,如果是文本文件,通常是一行记录,这也是事务的基本单位。Event 从 Source,流向 Channel,再到 Sink,本身为一个 byte 数组,并可携带 headers 信息。Event 代表着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去。

值得注意的是,Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。

比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,

也就是说,多个agent可以协同工作。

Flume可靠性

Flume 使用事务性的方式保证传送Event整个过程的可靠性。Sink 必须在Event 已经被传达到下一站agent里,又或者,已经被存入外部数据目的地之后,才能把 Event 从 Channel 中 remove 掉。这样数据流里的 event 无论是在一个 agent 里还是多个 agent 之间流转,都能保证可靠,因为以上的事务保证了 event 会被成功存储起来。比如 Flume支持在本地保存一份channel文件作为备份,而memory channel 将event存在内存 queue 里,速度快,但丢失的话无法恢复。

flume的广义用法(多个agent顺序连接)

可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的 Agent 的数量,因为数据流经的路径变长了,如果不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。

attachments-2023-10-9FE0Qbgx651a85337cbe1,png

Flume 安装

解压文件

#解压文件到/opt/soft中
[root@hadoop02 install]# tar -zxf ./apache-flume-1.9.0-bin.tar.gz -C ../soft/ 
#在/opt/soft目录下将apache-flume-1.9.0-bin 改名为flume190
[root@hadoop02 soft]# mv apache-flume-1.9.0-bin/ flume190

修改配置文件

#/opt/soft/flume190/conf目录中将临时配置文件flume-env.sh.template拷贝为配置文件flume-env.sh
[root@hadoop02 conf]# cp flume-env.sh.template flume-env.sh 

#配置flume-env.sh文件 (修改22行和25行)
[root@hadoop02 conf]#  vim flume-env.sh 
22 export JAVA_HOME=/opt/soft/jdk180
25 export JAVA_OPTS="-Xms1000m -Xmx2000m -Dcom.sun.management.jmxremote"

attachments-2023-10-iBhc6wea651a85da4bfbc,png

attachments-2023-10-HZpsM7jc651a85e5e8480,png

-Xms:初始Heap大小,使用的最小内存,cpu性能高时此值应设的大一些。  
-Xmx:java heap最大值,使用的最大内存。

安装nc(netcat) /telnet协议

[root@hadoop02 conf]# yum install -y nc 
先查看telnet
[root@hadoop02  ~]# yum list telnet*
安装telnet
[root@hadoop02 ~]# yum install telnet-server
[root@hadoop02 ~]# yum install telnet.*

telnet是teletype network的缩写,现在已成为一个专有名词,表示远程登录协议和方式,分为Telnet客户端和Telnet服务器程序. Telnet可以让用户在本地Telnet客户端上远端登录到远程Telnet服务器上。

监听端口

先开服务端
[root@hadoop02 ~]# nc -lc 9999 
再开客户端
[root@hadoop02 ~]# nc localhost(或输入ip地址) 9999

attachments-2023-10-TXyBpCaq651a865a7f195,png

Flume 应用

监控端口并输出到控制台

  • 1> 通过netcat工具向本机的8888端口发送数据。
  • 2> flume监控本机的8888端口。通过flume的source端读取数据。
  • 3> flume将获取的数据通过sink端写出到控制台。

新建netcat-logger.conf文件

新建netcat-logger.conf文件

[root@hadoop02 conf]# vim ./netcat-logger.conf
# 给agent命名为a1
a1.sources=r1
a1.sinks=k1
a1.channels=c1
 
# source的描述信息/配置
a1.sources.r1.type=netcat
a1.sources.r1.bind=192.168.152.192
a1.sources.r1.port=8888
 
# channel的描述信息/配置
a1.channels.c1.type=memory
-- 事务容量,单次发送最大的事件量。要小于总容量
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
 
a1.sinks.k1.type=logger
 
# 将source端和sink端进行绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动监听

监听端口:
以下命令需要进入到flume/bin目录下运行
[root@hadoop02 ~]# /opt/soft/flume190/bin/flume-ng agent --name a1 --conf /opt/soft/flume190/conf/ --conf-file /opt/soft/flume190/conf/netcat-logger.conf -Dflume.root.logger=INFO,console
 
访问端口:
[root@hadoop02 conf]# netcat 192.168.153.139 8888

attachments-2023-10-JTFiOMHo651a86fb51789,png

flume-ng agent:  
启动命令。  
  
--name(-n):  
agent的名字,这里为a1。  
  
--conf(-c):  
flume的配置文件存储在conf/目录。  
  
--conf-file(-f):  
flume本次启动的配置文件是conf/netcat-logger.conf文件。  
  
-Dflume.root.logger=INFO,console:  
普通运行模式,将运行日志输出到控制台。

-D表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括:log、info、warn、error。

监控单个追加文件

新建flume-file-hdfs.conf文件

--文件保存在/opt/conf中
 
a1.sources=r1
a1.sinks=k1
a1.channels=c1
 
--监听hadoop datanode的log文件
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /opt/data/datas.csv
 
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
 
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://hadoop02:9000/flume/upload/%Y%m%d/%H
 
a1.sources.r1.channels=c1
a1.sinks.k1.channels=c1

启动hadoop

[root@hadoop02 ~]# start-all.sh

启动监听

[root@hadoop02 flume190]# ./bin/flume-ng agent -n a1 -c conf/ -f /opt/conf/flume-file-hdfs.conf 

flume事务

attachments-2023-10-aEyxkEKb651a8714a5540,png

Source 采集数据并包装成Event,并将Event缓存再Channel中,Sink不断地从Channel 获取Event,并解决成数据,最终将数据写入存储或索引系统。

  • Put事务流程:

    • doPut:将批数据先写入临时缓冲区。
    • doCommit:检查channel内存队列是否足够合并。
    • doRollback:channel内存队列空间不足,回滚事务。
  • Take事务:

    • doTake:将数据取到临时缓冲区takeList,并将数据发送到HDFS
    • doCommit:如果数据全部发送成功,则清楚临时缓冲区takeList。
    • doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channel内存队列。

使用案例

在使用之前,提供一个大致思想,使用Flume的过程是确定scource类型,channel类型和sink类型,编写conf文件并开启服务,在数据捕获端进行传入数据流入到目的地。

案例一、从控制台打入数据,在控制台显示

1、确定scource类型,channel类型和sink类型

确定的使用类型分别是,netcat source, memory channel, logger sink.

2、编写conf文件

#a代表agent的名称,r1代表source的名称。c1代表channel名称,k1代表的是sink的名称
#声明各个组件
a.sources=r1
a.channels=c1
a.sinks=k1
#定义source类型,这里是试用netcat的类型
a.sources.r1.type=netcat
a.sources.r1.bind=192.168.40.110
a.sources.r1.port=8888
#定义source发送的下游channel
a.sources.r1.channels=c1
#定义channel
a.channels.c1.type=memory
#缓存的数据条数
a.channels.c1.capacity=1000
#事务数据量
a.channels.c1.transactionCapacity=1000
#定义sink的类型,确定上游channel
a.sinks.k1.channel=c1
a.sinks.k1.type=logger

3、开启服务,我们重新开启复制一个客户端进行开启服务**

命令: 注意 -n 后面跟着的是你在conf文件中定义好的,-f 后面跟着的是编写conf文件的路径。

[root@master scrips]# flume-ng agent -n a -c flume-1.9.0/conf -f /usr/local/soft/bigdata17/scrips/netcat.conf -Dflume.root.logger=DEBUG,console

4、在另一个客户端输入命令:

注意:这里的master和8888是在conf文件中设置好的ip地址和端口。

在输入第二个命令的窗口中输入数据,回车,在服务端就会接收到数据。

yum install -y telnet
telnet master 8888

attachments-2023-10-38M3mNBe651a8748c7fea,png

attachments-2023-10-2Fw7aK9b651a8771d7bc3,png


案例二、从本地指定路径中打入数据到HDFS

1、同样,我们需要先确定scource类型,channel类型和sink类型

我们确定使用的类型分别是,spooldir source, memory channle, hdfs sink

2、编写conf文件

a1.sources = r1
a1.sinks = k1
a1.channels = c1
#指定spooldir的属性
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/soft/flumedata
#时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
#指定sink的类型
a1.sinks.k1.type = hdfs
#指定hdfs的集群地址和路径,路径如果没有创建会自动创建
a1.sinks.k1.hdfs.path =hdfs://master:9000/shujia/bigdata17/flumeout/log_s/dt=%Y-%m-%d
#指定hdfs路径下生成的文件的前缀
a1.sinks.k1.hdfs.filePrefix = log_%Y-%m-%d
#手动指定hdfs最小备份
a1.sinks.k1.hdfs.minBlockReplicas=1
#设置数据传输类型
a1.sinks.k1.hdfs.fileType = DataStream
#如果参数为0,不按照条数生成文件。如果参数为n,就是按照n条生成一个文件
a1.sinks.k1.hdfs.rollCount = 100000
#这个参数是hdfs下文件sink的数据size。每sink 32MB的数据,自动生成一个文件
a1.sinks.k1.hdfs.rollSize =0
#每隔n 秒 将临时文件滚动成一个目标文件。如果是0,就不按照时间进行生成目标文件。
a1.sinks.k1.hdfs.rollInterval =0
a1.sinks.k1.hdfs.idleTimeout=0
#指定channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
 #组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3、开启服务

[root@master scrips]# flume-ng agent -n a1 -c ../../flume/conf -f ./linux2hdfs.conf -Dflume.root.logger=DEBUG, console

4、将文件复制到指定的目录下

cp DIANXIN.csv /usr/local/soft/flumedata/

attachments-2023-10-CDa1HixB651a87bacaf21,png

案例三、从java代码中进行捕获打入到HDFS

1、先确定scource类型,channel类型和sink类型

确定的三个组件的类型是,avro source, memory channel, hdfs sink

2、打开maven项目,添加依赖

            <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-core</artifactId>
                <version>1.9.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flume.flume-ng-clients</groupId>
                <artifactId>flume-ng-log4jappender</artifactId>
                <version>1.9.0</version>
            </dependency>

3、设置log4J的内容

log4j.rootLogger=INFO,stdout,flume

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.230.50
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
log4j.appender.flume.layout=org.apache.log4j.PatternLayout 
log4j.appender.flume.layout.ConversionPattern=%m%n

4、编写conf文件

#定义agent名, source、channel、sink的名称
a.sources = r1
a.channels = c1
a.sinks = k1

#具体定义source
a.sources.r1.type = avro
a.sources.r1.bind = 192.168.40.110
a.sources.r1.port = 41414

#具体定义channel
a.channels.c1.type = memory
a.channels.c1.capacity = 10000
a.channels.c1.transactionCapacity = 100

#具体定义sink
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path =hdfs://master:9000/shujia/bigdata17/flumeout2/flume_hdfs_avro2
a.sinks.k1.hdfs.filePrefix = events-
a.sinks.k1.hdfs.minBlockReplicas=1
a.sinks.k1.hdfs.fileType = DataStream
#a.sinks.k1.hdfs.fileType = CompressedStream
#a.sinks.k1.hdfs.codeC = gzip
#不按照条数生成文件
a.sinks.k1.hdfs.rollCount = 1000
a.sinks.k1.hdfs.rollSize =0
#每隔N s将临时文件滚动成一个目标文件
a.sinks.k1.hdfs.rollInterval =0
a.sinks.k1.hdfs.idleTimeout=0 
#组装source、channel、sink

a.sources.r1.channels = c1
a.sinks.k1.channel = c1

5、开启服务,命令

flume-ng agent -n a -c ../conf -f ./avro2hdfs2.conf -Dflume.root.logger=DEBUG,console

6、运行Java代码

attachments-2023-10-dPMxhICL651a880bb4957,png

7、查看HDFS

attachments-2023-10-r10Fdti5651a882384631,png
attachments-2023-10-vaAfv0ns651a884e55e3b,png

案例四、监控HBase日志到Hbase表中(这里可以换成其他组件日志监控)

监控日志

提前建好表

 create 'log','cf1'

编写conf文件 hbaselog2hdfs.conf

# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1 
# 给channel组件命名为c1
a.channels = c1
#指定spooldir的属性
a.sources.r1.type = exec 
a.sources.r1.command = cat /usr/local/soft/hbase-1.4.6/logs/hbase-root-master-master.log
#指定sink的类型
a.sinks.k1.type = hbase
a.sinks.k1.table = log
a.sinks.k1.columnFamily = cf1

#指定channel
a.channels.c1.type = memory 
a.channels.c1.capacity = 100000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
# 组装
a.sources.r1.channels = c1 
a.sinks.k1.channel = c1

运行

flume-ng agent -n a -c ../conf -f ./ hbaselog2hdfs.conf -Dflume.root.logger=DEBUG,console
监控自定义的文件

确保test_idoall_org表在hbase中已经存在

hbase(main):002:0> create 'test_idoall_org','uid','name'
0 row(s) in 0.6730 seconds

=> Hbase::Table - test_idoall_org
hbase(main):003:0> put 'test_idoall_org','10086','name:idoall','idoallvalue'
0 row(s) in 0.0960 seconds
创建配置文件
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/soft/flumedata/data.txt
a1.sources.r1.port = 44444
a1.sources.r1.host = 192.168.40.110
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.type = hbase
a1.sinks.k1.table = test_idoall_org
a1.sinks.k1.columnFamily = name
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = memoryChannel

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume agent
flume-ng agent -n a1 -c ../../flume/conf -f ./flie2hbase.conf -Dflume.root.logger=DEBUG, console
产生数据
echo "hello idoall.org from flume" >> data.txt

案例五、flume监控Http source

1、先确定scource类型,channel类型和sink类型

确定的三个组件的类型是,http source, memory channel, logger sink

2、编写conf文件
a1.sources=r1
a1.sinks=k1
a1.channels=c1
 
a1.sources.r1.type=http
a1.sources.r1.port=50000
a1.sources.r1.channels=c1
 
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
 
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
# 表示sink每次会从channel里取多少数据
a1.channels.c1.transactionCapacity=100
3、启动服务
flume-ng agent -n a1 -f ./httpToLogger.conf -Dflume.root.logger=DEBUG,console
4、复制一个窗口进行打数据
curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello bigdata"}]'  http://192.168.40.110:50000

=50000  
a1.sources.r1.channels=c1

a1.sinks.k1.type=logger  
a1.sinks.k1.channel=c1

a1.channels.c1.type=memory  
a1.channels.c1.capacity=10000

# 表示sink每次会从channel里取多少数据

a1.channels.c1.transactionCapacity=100
3、启动服务
flume-ng agent -n a1 -f ./httpToLogger.conf -Dflume.root.logger=DEBUG,console
4、复制一个窗口进行打数据
curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello bigdata"}]'  http://192.168.40.110:50000

attachments-2023-10-Sx9Wb7sb651a887c450fc,png

  • 发表于 2023-10-02 16:51
  • 阅读 ( 470 )

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
石天
石天

437 篇文章

作家榜 »

  1. shitian 662 文章
  2. 石天 437 文章
  3. 每天惠23 33 文章
  4. 小A 29 文章