I need help configuring Fluentd to filter logs based on severity.
We have two different monitoring systems, Elasticsearch and Splunk. When we enabled log level DEBUG in our application, it started generating a huge volume of logs every day, so we want to filter logs by severity and push them to the two different logging systems.
When a log has severity INFO or ERROR, the container logs should be forwarded to Splunk; everything else (DEBUG, TRACE, WARN, and other levels) should go to Elasticsearch. Please help me figure out how to filter this.
Here is the format of the generated logs:
event.log:{"@severity":"DEBUG","@timestamp":"2019-01-18T00:15:34.416Z","@traceId":
event.log:{"@severity":"INFO","@timestamp":"2019-01-18T00:15:34.397Z","@traceId":
event.log:{"@severity":"WARN","@timestamp":"2019-01-18T00:15:34.920Z","@traceId":
Please find my Fluentd config below.
I added an `<exclude>` section inside the filter, and I also installed the grep plugin and added a grep rule, but it's not working.
Filter added for testing:
<exclude>
  @type grep
  key severity
  pattern DEBUG
</exclude>
I also added:
<filter kubernetes.**>
  @type grep
  exclude1 severity (DEBUG|NOTICE|WARN)
</filter>
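As a side note, a minimal sketch of the v1-style grep syntax would nest the `<exclude>` section inside a `@type grep` filter rather than the other way around. Note also that the sample logs above use the field name `@severity`, not `severity`, so the key would need to match (untested sketch):

```
<filter kubernetes.**>
  @type grep
  # the sample log records carry "@severity", not "severity"
  <exclude>
    key @severity
    pattern /DEBUG/
  </exclude>
</filter>
```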
kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-config
  namespace: logging
  labels:
    k8s-app: fluentd
data:
  fluentd-standalone.conf: |
    <match fluent.**>
      @type null
    </match>
    # include other configs
    @include systemd.conf
    @include kubernetes.conf
  fluentd.conf: |
    @include systemd.conf
    @include kubernetes.conf
  fluentd.conf: |
    # Use the config specified by the FLUENTD_CONFIG environment variable, or
    # default to fluentd-standalone.conf
    @include "#{ENV['FLUENTD_CONFIG'] || 'fluentd-standalone.conf'}"
  kubernetes.conf: |
    <source>
      @type tail
      @log_level debug
      path /var/log/containers/*.log
      pos_file /var/log/kubernetes.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      tag kubernetes.*
      format json
    </source>
    <filter kubernetes.**>
      @type kubernetes_metadata
      verify_ssl false
      <exclude>
        @type grep
        key severity
        pattern DEBUG
      </exclude>
    </filter>
    <filter kubernetes.**>
      @type record_transformer
      enable_ruby
      <record>
        event ${record}
      </record>
      renew_record
      auto_typecast
    </filter>
    <filter kubernetes.**>
      @type grep
      exclude1 severity (DEBUG|NOTICE|WARN)
    </filter>
  kubernetes.conf: |
    <source>
      @type tail
      @log_level debug
      path /var/log/containers/*.log
      pos_file /var/log/kubernetes.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      tag kubernetes.*
      format json
    </source>
    <filter kubernetes.**>
      @type kubernetes_metadata
      verify_ssl false
    </filter>
    <filter kubernetes.**>
      @type record_transformer
      enable_ruby
      <record>
        event ${record}
      </record>
      renew_record
      auto_typecast
    </filter>
    # The `all_items` parameter isn't documented, but it is necessary in order for
    # us to be able to send k8s events to splunk in a useful manner
    <match kubernetes.**>
      @type copy
      <store>
        @type splunk-http-eventcollector
        all_items true
        server localhost:8088
        protocol https
        verify false
      </store>
      <store>
        @type elasticsearch
        host localhost
        port 9200
        scheme http
        ssl_version TLSv1_2
        ssl_verify false
      </store>
    </match>
How about the following? (not tested)
<source>
  @type tail
  @log_level debug
  path /var/log/containers/*.log
  pos_file /var/log/kubernetes.log.pos
  time_format %Y-%m-%dT%H:%M:%S.%NZ
  tag kubernetes.*
  format json
  @label @INPUT
</source>
<label @INPUT>
  <filter kubernetes.**>
    @type kubernetes_metadata
    verify_ssl false
  </filter>
  <filter kubernetes.**>
    @type record_transformer
    enable_ruby
    <record>
      event ${record}
    </record>
    renew_record
    auto_typecast
  </filter>
  <match **>
    @type relabel
    @label @RETAG
  </match>
</label>
<label @RETAG>
  <match **>
    @type rewrite_tag_filter
    <rule>
      key @severity
      pattern /(INFO|ERROR)/
      tag splunk.${tag}
    </rule>
    <rule>
      key @severity
      pattern /(DEBUG|TRACE|WARN)/
      tag elasticsearch.${tag}
    </rule>
    @label @OUTPUT
  </match>
</label>
<label @OUTPUT>
  <match splunk.**>
    @type splunk-http-eventcollector
    # ... snip
  </match>
  <match elasticsearch.**>
    @type elasticsearch
    # ... snip
  </match>
</label>
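If tag rewriting feels heavyweight, another (equally untested) sketch is to keep the tags as-is and fan the stream out with `copy` into two labels, then let a grep filter in each label keep or drop by `@severity`. The label names `@SPLUNK` and `@ES` here are made up for illustration:

```
<match kubernetes.**>
  @type copy
  <store>
    @type relabel
    @label @SPLUNK
  </store>
  <store>
    @type relabel
    @label @ES
  </store>
</match>
<label @SPLUNK>
  # keep only INFO and ERROR events for Splunk
  <filter **>
    @type grep
    <regexp>
      key @severity
      pattern /^(INFO|ERROR)$/
    </regexp>
  </filter>
  <match **>
    @type splunk-http-eventcollector
    # ... snip
  </match>
</label>
<label @ES>
  # everything that is NOT INFO/ERROR goes to Elasticsearch
  <filter **>
    @type grep
    <exclude>
      key @severity
      pattern /^(INFO|ERROR)$/
    </exclude>
  </filter>
  <match **>
    @type elasticsearch
    # ... snip
  </match>
</label>
```

The trade-off: copy+relabel duplicates every event into both pipelines before filtering, while rewrite_tag_filter routes each event exactly once.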