前面介绍了前面介绍了 Hadoop 基本概念与生态、安装(HDFS+YARN+MapReduce)实战操作、常用命令、架构基石 HDFS、统一资源管理和调度平台 YARN、分布式计算框架 MapReduce 、数据仓库 Hive、计算引擎 Spark、实时计算流计算引擎 Flink、 数据库 Hbase、Hadoop 任务调度框架 Oozie、数据同步工具 Sqoop等相关的知识点,今天我将详细的为大家介绍 大数据 Hadoop 分布式日志采集系统 Flume 相关知识。
大数据需要解决的三个问题:采集、存储、计算。
flume 是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据。
同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力。flume的数据流由事件(Event)贯穿始终。
事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把event推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。
用一句话总结:Flume不生产数据,它只是数据的搬运工。
flume 最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS。
flume 之所以这么神奇,是源于它自身的一个设计,这个设计就是 agent,agent 本身是一个 java 进程,运行在日志收集节点—所谓日志收集节点就是服务器节点。
agent里面包含3个核心的组件:source—>channel—>sink,类似生产者、仓库、消费者的架构。
传输单元,Flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。Event由Header和Body两部分组成,Header用来存放该event的一些属性,为K-V结构,Body用来存放该条数据,形式为字节数组。
Flume 的核心是把数据从数据源收集过来,再送到目的地。为了保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,删除自己缓存的数据
Flume 传输的数据的基本单位是 Event,如果是文本文件,通常是一行记录,这也是事务的基本单位。Event 从 Source,流向 Channel,再到 Sink,本身为一个 byte 数组,并可携带 headers 信息。Event 代表着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去。
值得注意的是,Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。
比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,
也就是说,多个agent可以协同工作。
Flume 使用事务性的方式保证传送Event整个过程的可靠性。Sink 必须在Event 已经被传达到下一站agent里,又或者,已经被存入外部数据目的地之后,才能把 Event 从 Channel 中 remove 掉。这样数据流里的 event 无论是在一个 agent 里还是多个 agent 之间流转,都能保证可靠,因为以上的事务保证了 event 会被成功存储起来。比如 Flume支持在本地保存一份channel文件作为备份,而memory channel 将event存在内存 queue 里,速度快,但丢失的话无法恢复。
可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的 Agent 的数量,因为数据流经的路径变长了,如果不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。
#解压文件到/opt/soft中
[root@hadoop02 install]# tar -zxf ./apache-flume-1.9.0-bin.tar.gz -C ../soft/
#在/opt/soft目录下将apache-flume-1.9.0-bin 改名为flume190
[root@hadoop02 soft]# mv apache-flume-1.9.0-bin/ flume190
#/opt/soft/flume190/conf目录中将临时配置文件flume-env.sh.template拷贝为配置文件flume-env.sh
[root@hadoop02 conf]# cp flume-env.sh.template flume-env.sh
#配置flume-env.sh文件 (修改22行和25行)
[root@hadoop02 conf]# vim flume-env.sh
22 export JAVA_HOME=/opt/soft/jdk180
25 export JAVA_OPTS="-Xms1000m -Xmx2000m -Dcom.sun.management.jmxremote"
-Xms:初始Heap大小,使用的最小内存,cpu性能高时此值应设的大一些。
-Xmx:java heap最大值,使用的最大内存。
[root@hadoop02 conf]# yum install -y nc
先查看telnet
[root@hadoop02 ~]# yum list telnet*
安装telnet
[root@hadoop02 ~]# yum install telnet-server
[root@hadoop02 ~]# yum install telnet.*
telnet是teletype network的缩写,现在已成为一个专有名词,表示远程登录协议和方式,分为Telnet客户端和Telnet服务器程序. Telnet可以让用户在本地Telnet客户端上远端登录到远程Telnet服务器上。
先开服务端
[root@hadoop02 ~]# nc -lc 9999
再开客户端
[root@hadoop02 ~]# nc localhost(或输入ip地址) 9999
新建netcat-logger.conf文件
新建netcat-logger.conf文件
[root@hadoop02 conf]# vim ./netcat-logger.conf
# 给agent命名为a1
a1.sources=r1
a1.sinks=k1
a1.channels=c1
# source的描述信息/配置
a1.sources.r1.type=netcat
a1.sources.r1.bind=192.168.152.192
a1.sources.r1.port=8888
# channel的描述信息/配置
a1.channels.c1.type=memory
-- 事务容量,单次发送最大的事件量。要小于总容量
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type=logger
# 将source端和sink端进行绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
启动监听
监听端口:
以下命令需要进入到flume/bin目录下运行
[root@hadoop02 ~]# /opt/soft/flume190/bin/flume-ng agent --name a1 --conf /opt/soft/flume190/conf/ --conf-file /opt/soft/flume190/conf/netcat-logger.conf -Dflume.root.logger=INFO,console
访问端口:
[root@hadoop02 conf]# netcat 192.168.153.139 8888
flume-ng agent:
启动命令。
--name(-n):
agent的名字,这里为a1。
--conf(-c):
flume的配置文件存储在conf/目录。
--conf-file(-f):
flume本次启动的配置文件是conf/netcat-logger.conf文件。
-Dflume.root.logger=INFO,console:
普通运行模式,将运行日志输出到控制台。
-D表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括:log、info、warn、error。
新建flume-file-hdfs.conf文件
--文件保存在/opt/conf中
a1.sources=r1
a1.sinks=k1
a1.channels=c1
--监听hadoop datanode的log文件
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /opt/data/datas.csv
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://hadoop02:9000/flume/upload/%Y%m%d/%H
a1.sources.r1.channels=c1
a1.sinks.k1.channels=c1
启动hadoop
[root@hadoop02 ~]# start-all.sh
启动监听
[root@hadoop02 flume190]# ./bin/flume-ng agent -n a1 -c conf/ -f /opt/conf/flume-file-hdfs.conf
Source 采集数据并包装成Event,并将Event缓存再Channel中,Sink不断地从Channel 获取Event,并解决成数据,最终将数据写入存储或索引系统。
Put事务流程:
Take事务:
在使用之前,提供一个大致思想,使用Flume的过程是确定scource类型,channel类型和sink类型,编写conf文件并开启服务,在数据捕获端进行传入数据流入到目的地。
1、确定scource类型,channel类型和sink类型
确定的使用类型分别是,netcat source, memory channel, logger sink.
2、编写conf文件
#a代表agent的名称,r1代表source的名称。c1代表channel名称,k1代表的是sink的名称
#声明各个组件
a.sources=r1
a.channels=c1
a.sinks=k1
#定义source类型,这里是试用netcat的类型
a.sources.r1.type=netcat
a.sources.r1.bind=192.168.40.110
a.sources.r1.port=8888
#定义source发送的下游channel
a.sources.r1.channels=c1
#定义channel
a.channels.c1.type=memory
#缓存的数据条数
a.channels.c1.capacity=1000
#事务数据量
a.channels.c1.transactionCapacity=1000
#定义sink的类型,确定上游channel
a.sinks.k1.channel=c1
a.sinks.k1.type=logger
3、开启服务,我们重新开启复制一个客户端进行开启服务**
命令: 注意 -n 后面跟着的是你在conf文件中定义好的,-f 后面跟着的是编写conf文件的路径。
[root@master scrips]# flume-ng agent -n a -c flume-1.9.0/conf -f /usr/local/soft/bigdata17/scrips/netcat.conf -Dflume.root.logger=DEBUG,console
4、在另一个客户端输入命令:
注意:这里的master和8888是在conf文件中设置好的ip地址和端口。
在输入第二个命令的窗口中输入数据,回车,在服务端就会接收到数据。
yum install -y telnet
telnet master 8888
1、同样,我们需要先确定scource类型,channel类型和sink类型
我们确定使用的类型分别是,spooldir source, memory channle, hdfs sink
2、编写conf文件
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#指定spooldir的属性
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/soft/flumedata
#时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
#指定sink的类型
a1.sinks.k1.type = hdfs
#指定hdfs的集群地址和路径,路径如果没有创建会自动创建
a1.sinks.k1.hdfs.path =hdfs://master:9000/shujia/bigdata17/flumeout/log_s/dt=%Y-%m-%d
#指定hdfs路径下生成的文件的前缀
a1.sinks.k1.hdfs.filePrefix = log_%Y-%m-%d
#手动指定hdfs最小备份
a1.sinks.k1.hdfs.minBlockReplicas=1
#设置数据传输类型
a1.sinks.k1.hdfs.fileType = DataStream
#如果参数为0,不按照条数生成文件。如果参数为n,就是按照n条生成一个文件
a1.sinks.k1.hdfs.rollCount = 100000
#这个参数是hdfs下文件sink的数据size。每sink 32MB的数据,自动生成一个文件
a1.sinks.k1.hdfs.rollSize =0
#每隔n 秒 将临时文件滚动成一个目标文件。如果是0,就不按照时间进行生成目标文件。
a1.sinks.k1.hdfs.rollInterval =0
a1.sinks.k1.hdfs.idleTimeout=0
#指定channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3、开启服务
[root@master scrips]# flume-ng agent -n a1 -c ../../flume/conf -f ./linux2hdfs.conf -Dflume.root.logger=DEBUG, console
4、将文件复制到指定的目录下
cp DIANXIN.csv /usr/local/soft/flumedata/
1、先确定scource类型,channel类型和sink类型
确定的三个组件的类型是,avro source, memory channel, hdfs sink
2、打开maven项目,添加依赖
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-clients</groupId>
<artifactId>flume-ng-log4jappender</artifactId>
<version>1.9.0</version>
</dependency>
3、设置log4J的内容
log4j.rootLogger=INFO,stdout,flume
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.230.50
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%m%n
4、编写conf文件
#定义agent名, source、channel、sink的名称
a.sources = r1
a.channels = c1
a.sinks = k1
#具体定义source
a.sources.r1.type = avro
a.sources.r1.bind = 192.168.40.110
a.sources.r1.port = 41414
#具体定义channel
a.channels.c1.type = memory
a.channels.c1.capacity = 10000
a.channels.c1.transactionCapacity = 100
#具体定义sink
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path =hdfs://master:9000/shujia/bigdata17/flumeout2/flume_hdfs_avro2
a.sinks.k1.hdfs.filePrefix = events-
a.sinks.k1.hdfs.minBlockReplicas=1
a.sinks.k1.hdfs.fileType = DataStream
#a.sinks.k1.hdfs.fileType = CompressedStream
#a.sinks.k1.hdfs.codeC = gzip
#不按照条数生成文件
a.sinks.k1.hdfs.rollCount = 1000
a.sinks.k1.hdfs.rollSize =0
#每隔N s将临时文件滚动成一个目标文件
a.sinks.k1.hdfs.rollInterval =0
a.sinks.k1.hdfs.idleTimeout=0
#组装source、channel、sink
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
5、开启服务,命令
flume-ng agent -n a -c ../conf -f ./avro2hdfs2.conf -Dflume.root.logger=DEBUG,console
6、运行Java代码
7、查看HDFS
提前建好表
create 'log','cf1'
编写conf文件 hbaselog2hdfs.conf
# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1
# 给channel组件命名为c1
a.channels = c1
#指定spooldir的属性
a.sources.r1.type = exec
a.sources.r1.command = cat /usr/local/soft/hbase-1.4.6/logs/hbase-root-master-master.log
#指定sink的类型
a.sinks.k1.type = hbase
a.sinks.k1.table = log
a.sinks.k1.columnFamily = cf1
#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 100000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
# 组装
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
运行
flume-ng agent -n a -c ../conf -f ./ hbaselog2hdfs.conf -Dflume.root.logger=DEBUG,console监控自定义的文件
确保test_idoall_org表在hbase中已经存在
hbase(main):002:0> create 'test_idoall_org','uid','name'创建配置文件
0 row(s) in 0.6730 seconds
=> Hbase::Table - test_idoall_org
hbase(main):003:0> put 'test_idoall_org','10086','name:idoall','idoallvalue'
0 row(s) in 0.0960 seconds
a1.sources = r1启动flume agent
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/soft/flumedata/data.txt
a1.sources.r1.port = 44444
a1.sources.r1.host = 192.168.40.110
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.type = hbase
a1.sinks.k1.table = test_idoall_org
a1.sinks.k1.columnFamily = name
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = memoryChannel
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flume-ng agent -n a1 -c ../../flume/conf -f ./flie2hbase.conf -Dflume.root.logger=DEBUG, console产生数据
echo "hello idoall.org from flume" >> data.txt
确定的三个组件的类型是,http source, memory channel, logger sink
2、编写conf文件a1.sources=r13、启动服务
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=http
a1.sources.r1.port=50000
a1.sources.r1.channels=c1
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
# 表示sink每次会从channel里取多少数据
a1.channels.c1.transactionCapacity=100
flume-ng agent -n a1 -f ./httpToLogger.conf -Dflume.root.logger=DEBUG,console4、复制一个窗口进行打数据
curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello bigdata"}]' http://192.168.40.110:500003、启动服务
=50000
a1.sources.r1.channels=c1
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
# 表示sink每次会从channel里取多少数据
a1.channels.c1.transactionCapacity=100
flume-ng agent -n a1 -f ./httpToLogger.conf -Dflume.root.logger=DEBUG,console4、复制一个窗口进行打数据
curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello bigdata"}]' http://192.168.40.110:50000
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!