Shipping logs to Hive with Flume

By | November 27, 2018

A quick primer: Flume is an Apache log-collection system built from source + channel + sink components. The source is where log data enters (this example uses an exec source); the channel is the transit leg between them, and can be file-backed or memory-backed; the sink is the output, commonly a hive, hbase, hdfs, or avro sink. A single machine can of course run multiple source + channel + sink pipelines.
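To make the wiring concrete, here is a minimal, self-contained agent definition (agent name, component names, and the file path are illustrative only, not part of this setup): an exec source tailing a file, a memory channel, and a logger sink that just prints events.

a1.sources = s1
a1.channels = c1
a1.sinks = k1

# exec source: run a shell command and turn each output line into an event
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /var/log/test.log
a1.sources.s1.channels = c1

# memory channel: buffer events in RAM between source and sink
a1.channels.c1.type = memory

# logger sink: print every event to the agent's log (useful for debugging)
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1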

Environment:

     172.16.6.152  node1   runs Flume + DataNode

     172.16.6.151  master  runs Flume + Hive + NameNode

Plan:

1. On node1, an exec source runs tail -F /*/*.log to pick up the log, a memory channel buffers the events, and an avro sink ships them to master.

2. On master, the Flume source is avro, receiving the data sent over from node1; it passes through a memory channel and an hdfs sink finally writes it into HDFS.

3. My skills being what they are, I had wanted to use a hive sink directly on master, but it kept failing with a "hive class not found" error. So I resorted to a workaround: Hive ingests data happily if you simply copy files into the directory behind a table's location, and that is exactly where the hdfs sink can write (see the example below).
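To see why the workaround is sound: Hive reads whatever files sit under a table's (or partition's) location directory, so anything copied there is immediately queryable. Assuming the partition directory already exists, a manual copy would look like this (sample.log is a hypothetical file name):

hdfs dfs -put sample.log /zhonghui/flume/logs/type=hadoop/host=172.16.6.152/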

The detailed setup:

1. Install Flume: unpack the distribution, then edit conf/flume-env.sh to set the environment variables.
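For reference, a minimal conf/flume-env.sh can contain little more than the lines below; the JAVA_HOME path here is an assumption, so point it at your own JDK:

# assumption: adjust to wherever your JDK actually lives
export JAVA_HOME=/usr/java/jdk1.7.0_67
# optional: give the agent a bit more heap
export JAVA_OPTS="-Xms256m -Xmx512m"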

2. On node1, create example4.conf in the conf directory with the following contents:

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 100000
agent1.channels.ch1.transactionCapacity = 100000
agent1.channels.ch1.keep-alive = 30
 
# Define an exec source called avro-source1 on agent1 that tails the
# log file, and connect it to channel ch1. (The component keeps the
# avro-source1 name from the template even though its type is exec.)
agent1.sources.avro-source1.type = exec
agent1.sources.avro-source1.shell = /bin/bash -c
agent1.sources.avro-source1.command =  tail -F /opt/cdh5.3.0/hadoop/logs/hadoop-hadoop-namenode-node1.log
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.threads = 5
 
# Define an avro sink that forwards every event it receives
# to the collector agent listening on master:41415.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = avro
agent1.sinks.log-sink1.hostname=master
agent1.sinks.log-sink1.port=41415

 
# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1
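
Before wiring this sink to master, you can smoke-test the exec source locally by temporarily swapping the avro sink section above for a logger sink (a debugging sketch only; revert it afterwards):

agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger

Start the agent with -Dflume.root.logger=INFO,console and every new log line should be printed to the console as an event.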

3. Flume setup on master: create example6.conf in the conf directory with the following contents:

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 100000
agent1.channels.ch1.transactionCapacity = 100000
agent1.channels.ch1.keep-alive = 30
 
# Define an avro source called avro-source1 on agent1 and tell it
# to bind to master:41415. Connect it to channel ch1.
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = master
agent1.sources.avro-source1.port =  41415
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.threads = 5
 
# Define an hdfs sink that writes every event it receives into HDFS,
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = hdfs
agent1.sinks.log-sink1.hdfs.path = hdfs://master:8020/zhonghui/flume/logs/type=hadoop/host=172.16.6.152/
agent1.sinks.log-sink1.hdfs.useLocalTimeStamp=true
agent1.sinks.log-sink1.hdfs.writeFormat = Text
agent1.sinks.log-sink1.hdfs.fileType = DataStream
agent1.sinks.log-sink1.hdfs.rollInterval = 0
agent1.sinks.log-sink1.hdfs.rollSize = 1000000
agent1.sinks.log-sink1.hdfs.rollCount = 0
agent1.sinks.log-sink1.hdfs.txnEventMax = 1000
agent1.sinks.log-sink1.hdfs.callTimeout = 60000
agent1.sinks.log-sink1.hdfs.appendTimeout = 60000
agent1.sinks.log-sink1.hdfs.filePrefix = log
agent1.sinks.log-sink1.hdfs.batchSize = 2

 
# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1

4. Start Flume on master and on node1:

master: bin/flume-ng agent --conf conf --conf-file conf/example6.conf --name agent1 -Dflume.root.logger=INFO,console

node1: bin/flume-ng agent --conf conf --conf-file conf/example4.conf --name agent1 -Dflume.root.logger=INFO,console
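Once both agents are up, you can confirm events are landing by listing the sink directory; files prefixed with log (carrying a temporary suffix while still open) should begin to appear:

hdfs dfs -ls /zhonghui/flume/logs/type=hadoop/host=172.16.6.152/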

5. After a few minutes (ten at most) files should start to appear; the delay is configurable, although I have not yet tracked down the exact setting (see the note below). If files show up, you are halfway there; if not, go over both config files carefully. Then stop the agents on node1 and master and delete the directory that was just generated, files included.
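A note on that wait: with rollInterval = 0 and rollCount = 0 as configured above, the hdfs sink only closes a file once it reaches rollSize bytes, and data in a file that has not been closed may not be fully visible yet. If you want finished files sooner, one option (not used here) is an idle timeout, which closes a file after the given number of seconds without new events:

agent1.sinks.log-sink1.hdfs.idleTimeout = 60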

6. I will not walk through setting up the Hive environment here; plenty of guides cover it.

create table logs(
`create` string,
content string)
PARTITIONED BY (type string, host string)
row format delimited fields terminated by ',' location '/zhonghui/flume/logs/';

alter table logs add partition (type='hadoop', host='172.16.6.152');

That gives us a Hive table plus one partition.
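Before restarting the agents, you can check that the partition was registered and that its location matches the HDFS path Flume writes to:

show partitions logs;
describe formatted logs partition (type='hadoop', host='172.16.6.152');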

7. Start the Flume agents on node1 and master again, then sit back and wait for the data:

hive> select * from logs;
OK
2015-09-15 17:49:15 800 INFO org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor: Rescanning after 30000 milliseconds
hadoop 172.16.6.152
2015-09-15 17:49:15 801 INFO org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor: Scanned 0 directive(s) and 0 block(s) in 1 millisecond(s).
hadoop 172.16.6.152
2015-09-15 17:49:37 825 INFO org.apache.hadoop.hdfs.server.namenode.FSEditLog: Number of transactions: 2 Total time for transactions(ms): 1 Number of transactions batched in Syncs: 0 Number of syncs: 1 SyncTimes(ms): 10
50 hadoop
172.16.6.152
2015-09-15 17:49:37 853 INFO BlockStateChange: BLOCK* addToInvalidates: blk_1073860942_120256 172.16.6.153:50010 172.16.6.152:50010
hadoop
172.16.6.152
2015-09-15 17:49:37 870 INFO BlockStateChange: BLOCK* addToInvalidates: blk_1073860943_120257 172.16.6.152:50010 172.16.6.151:50010
hadoop
172.16.6.152
2015-09-15 17:49:38 801 INFO org.apache.hadoop.hdfs.StateChange: BLOCK* BlockManager: ask 172.16.6.151:50010 to delete [blk_1073860943_120257]
hadoop 172.16.6.152
2015-09-15 17:49:41 801 INFO org.apache.hadoop.hdfs.StateChange: BLOCK* BlockManager: ask 172.16.6.153:50010 to delete [blk_1073860942_120256]
hadoop 172.16.6.152
2015-09-15 17:49:44 802 INFO org.apache.hadoop.hdfs.StateChange: BLOCK* BlockManager: ask 172.16.6.152:50010 to delete [blk_1073860942_120256
hadoop 172.16.6.152
2015-09-15 17:49:45 800 INFO org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor: Rescanning after 30000 milliseconds
hadoop 172.16.6.152
2015-09-15 17:49:45 801 INFO org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor: Scanned 0 directive(s) and 0 block(s) in 1 millisecond(s).
hadoop 172.16.6.152
2015-09-15 17:50:15 800 INFO org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor: Rescanning after 30000 milliseconds
hadoop 172.16.6.152
2015-09-15 17:50:15 800 INFO org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor: Scanned 0 directive(s) and 0 block(s) in 0 millisecond(s).
hadoop 172.16.6.152
2015-09-15 17:50:22 104 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Roll Edit Log from 172.16.6.150
hadoop 172.16.6.152
2015-09-15 17:50:22 104 INFO org.apache.hadoop.hdfs.server.namenode.FSEditLog: Rolling edit logs
hadoop 172.16.6.152
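A note on what you are seeing: log4j timestamps have the form 2015-09-15 17:49:15,800, so the comma delimiter splits each line at that first comma, putting the date and time into the first column (`create`) and the millisecond counter plus message into content; the trailing hadoop and 172.16.6.152 are the partition columns. Be aware that if a message body itself contains commas, everything past the second field is silently dropped by the two-column schema. If whole lines matter more than the timestamp split, a single-column variant avoids that (a sketch only; logs_raw is a hypothetical name, not the table created above):

create table logs_raw(
line string)
PARTITIONED BY (type string, host string)
location '/zhonghui/flume/logs/';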

See also: https://flume.apache.org/FlumeUserGuide.html#hdfs-sink, the fairly detailed reference on the official site.
