fluentd 日志分流到不同的kafka

By | 2018年12月1日
版权声明:本文为博主原创文章,未经博主允许随机转载。 https://blog.csdn.net/mtj66/article/details/79130555

说明:正式数据源为 source1(forward 输入),测试数据来自 source2(tail 输入)。
此处根据日志的特征把日志分流到不同的 Kafka topic,当然也可以输出到 ES 或者 file。
一个 match 中的数据可以配置多个 store 进行存储,也就是把同一份数据 copy 到两个地方,不过各个 store 之间可能会相互影响。
# 具体参考 https://docs.fluentd.org/v0.12/articles/out_rewrite_tag_filter?q=store
# source 1: receives events over the forward protocol on port 24225
# NOTE(review): no <security> (shared_key) or <transport tls> section is set here,
# so any host that can reach this port can submit events with a tag of its own
# choosing. Confirm whether the senders support authentication; if they do not,
# limit access to port 24225 at the network layer to the expected log shippers.
<source>
  @type forward
  port 24225
  # NOTE(review): "172.16.**" looks like a masked address; bind to the concrete
  # address of the internal interface rather than a wildcard.
  bind 172.16.**
</source>
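# source 2: tails a local JSON-formatted file and tags its events "sourcetag"
<source>
  # "@type" replaces the deprecated bare "type" so this section matches the
  # "@type" spelling used by every other section in this file.
  @type tail
  path /data/fluentd/docker_loginlog/test.log
  tag sourcetag
  format json
  # NOTE(review): the position file sits in the shared /tmp directory under a fixed
  # name, where other local users may be able to pre-create or replace it and where
  # tmp cleaners can delete it (causing re-reads). Prefer a directory owned by the
  # Fluentd user, such as the one that already holds the buffer files.
  pos_file /tmp/fluentd--1516355902.pos
</source> 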

# Aggregate multi-line error logs: the concat filter joins the lines of one error
# into a single event before parsing.
<filter **>
  @type concat
  # field that holds the raw log line
  key log
  # lines are grouped per container_id value so streams are not interleaved
  stream_identity_key container_id
  # A new event starts at a line containing "<date> <time>.<millis> ERROR".
  # NOTE(review): the "." before \d{3} is unescaped and therefore matches any
  # character; "\." would pin it to the millisecond separator.
  multiline_start_regexp /.*\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3} ERROR/
  flush_interval 1
  # events flushed by the timeout are routed to <label @NORMAL> instead of
  # continuing through the sections below
  timeout_label @NORMAL
</filter>

# Normal path (events not flushed to @NORMAL): extract time and level from "log"
<filter **>
  @type parser
  key_name log
  # keep the original fields next to the parsed ones
  reserve_data true
  <parse>
    @type regexp
    # How the time and the loginlog marker are extracted depends on the actual log
    # format; this could be refined further and is left out here.
    # NOTE(review): [EFINOR]{4,5} and [\-\sloginlog]{7,8} are character classes,
    # so they accept any 4-5 (or 7-8) characters drawn from those sets, not only
    # the literal words INFO / ERROR / loginlog.
    expression /(?<time>\d{4}\-\d{2}\-\d{2}\s\d{2}\:\d{2}\:\d{2}\.\d{3}) (?<level>[EFINOR]{4,5}|[\-\sloginlog]{7,8}).*/
    time_key time
    keep_time_key true
    # NOTE(review): the captured time ends in ".mmm" but this format has no
    # fractional-seconds directive (%L); confirm the parser accepts the value and
    # whether millisecond precision is needed for event ordering.
    time_format %Y-%m-%d %H:%M:%S
  </parse>
</filter>
# for test
#<match **>
#  @type stdout
#</match>

# Output for loginlog events. This section must come before <match **> so that
# re-tagged events are consumed here and do not loop through the rewrite again.
# NOTE(review): routing is by tag only. An event that already arrives with the tag
# "loginlog" is matched here directly, and the forward source accepts whatever tag
# the sender supplies; confirm that every sender is trusted to write to this topic.
<match loginlog>  # log has no tag
  @type kafka_buffered
  # docker
  # list of seed brokers
  # prod brokers 172.16.4.63:9092,172.16.4.64:9092,172.16.4.65:9092
  # uat
  # NOTE(review): no TLS or SASL options are set, so login records travel to the
  # brokers unencrypted and unauthenticated. Confirm the network path is trusted,
  # or enable the plugin's TLS/SASL settings (e.g. ssl_ca_cert).
  brokers 172.16.6.11:9092,172.16.6.12:9092,172.16.6.13:9092
  # buffer settings
  buffer_type file
  buffer_path /data/log/td-agent/buffer/loginlog-log
  flush_interval 3s
  # topic settings
  default_topic testfluentd  # kafka topic name for specify log
  # data type settings
  output_data_type json
  # include_time_key false
  # include_tag_key  false
  # compression_codec snappy
  # producer settings
  # NOTE(review): a single retry and leader-only acknowledgement trade durability
  # for latency; for login/audit records consider more retries and required_acks -1.
  max_send_retries 1
  required_acks 1
</match>
# Output for application INFO and ERROR logs (events tagged "app" by the rewrite
# rules further down).
# NOTE(review): "app*" presumably also matches any other tag that starts with
# "app"; such events are sent to Kafka without passing the rewrite rules.
<match app*>
  @type kafka_buffered
  # docker
  # list of seed brokers
  # prod brokers 172.16.4.63:9092,172.16.4.64:9092,172.16.4.65:9092
  # uat
  # NOTE(review): no TLS or SASL options are set for the broker connection;
  # confirm the network path to the brokers is trusted.
  brokers 172.16.6.11:9092,172.16.6.12:9092,172.16.6.13:9092
  # buffer settings
  buffer_type file
  buffer_path /data/log/td-agent/buffer/app-log
  flush_interval 3s
  # topic settings
  default_topic test_fluentd  # TODO: kafka topic name for App log
  # data type settings
  output_data_type json
  # include_time_key false
  # include_tag_key  false
  # compression_codec snappy
  # producer settings
  # one retry, leader-only acknowledgement
  max_send_retries 1
  required_acks 1
</match>

# rewrite_tag_filter assigns a new tag and re-emits the event, so matching starts
# over from the top; the <match loginlog> and <match app*> sections above then
# pick up the re-tagged events.
<match **>
  # https://docs.fluentd.org/v0.12/articles/out_rewrite_tag_filter
  @type rewrite_tag_filter
  <rule>
    key log
    # write the regex to match loginlog log
    # TODO: replace with the real identifier of our loginlog lines
    # NOTE(review): this is a substring match anywhere in the line, so an
    # application line that merely contains "loginlog" (for example inside
    # user-supplied data) is routed as a login log. Anchor the pattern to the
    # fixed position of the marker in the log format.
    pattern ^.*loginlog.*
    tag loginlog
  </rule>
  <rule>
    key log
    # catch-all: every other event is tagged "app"
    pattern ^.*
    tag app
  </rule>
</match> 

# Multi-line error logs flushed by the concat filter's timeout (timeout_label
# @NORMAL) are processed and matched inside this label.
<label @NORMAL> 
# extract time and level
<filter **>
  @type parser
  key_name log
  # keep the original fields next to the parsed ones
  reserve_data true
  <parse>
    @type regexp
    #expression /(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3}) (?<level>[EFINOR]{4,5}+) .*/
    # NOTE(review): [EFINOR]{4,5} and [\-\sloginlog]{7,8} are character classes,
    # not literal words; they accept any 4-5 (or 7-8) characters from those sets.
    expression /(?<time>\d{4}\-\d{2}\-\d{2}\s\d{2}\:\d{2}\:\d{2}\.\d{3}) (?<level>[EFINOR]{4,5}|[\-\sloginlog]{7,8}).*/
    time_key time
    keep_time_key true
    # NOTE(review): no fractional-seconds directive (%L) although the captured
    # time ends in ".mmm"; confirm the parser accepts the value.
    time_format %Y-%m-%d %H:%M:%S
  </parse>
</filter>
 # this section must come before <match **> so it consumes the re-tagged events
 <match app*>
  @type kafka_buffered
  # docker
  # list of seed brokers
  # prod brokers 172.16.4.63:9092,172.16.4.64:9092,172.16.4.65:9092
  # uat
  # NOTE(review): no TLS or SASL options are set for the broker connection;
  # confirm the network path to the brokers is trusted.
  brokers 172.16.6.11:9092,172.16.6.12:9092,172.16.6.13:9092
  # buffer settings
  buffer_type file
  buffer_path /data/log/td-agent/buffer/app-log-error
  flush_interval 3s
  # topic settings
  default_topic test_fluentd  # 
  # data type settings
  output_data_type json
  # include_time_key false
  # include_tag_key  false
  # compression_codec snappy
  # producer settings
  # one retry, leader-only acknowledgement
  max_send_retries 1
  required_acks 1
 </match>

 # multi-line logs are assumed to all end up in this match **
 <match **>
  #@type stdout
  # https://docs.fluentd.org/v0.12/articles/out_rewrite_tag_filter
  @type rewrite_tag_filter
  # assumes loginlog lines never contain INFO or ERROR
  # NOTE(review): this is the only rule. Events whose "log" field matches no
  # rule are presumably dropped (confirm); if so, a login-log line that reaches
  # this label would never be delivered. The pattern also matches any run of
  # 4-5 letters from the set E,F,I,N,O,R, not only the level keywords.
  <rule>
    key log
    pattern ^.*[EFINOR]{4,5}.*
    tag app2
  </rule>
 </match> 
</label>  # error log redirect end 

# for test 
#<match **>
#  @type stdout
#</match>
