EFK JSON Keys as Fields

2/24/2020

I am using an EFK (Elasticsearch, Fluentd, Kibana) stack in my Kubernetes cluster for logging. Everything works fine, but the log field that contains the most useful information is displayed as a plain JSON string in Kibana.

(screenshot: the log field rendered as a single raw JSON string in Kibana)

Is there a way to extract those key-values from the log field and display them as separate fields?

Example:

(screenshot: the desired result, with the JSON keys from the log field shown as separate fields in Kibana)
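
For example, if the log field holds a string like {"level":"info","message":"order created","orderId":42} (hypothetical keys, purely for illustration), the goal is for level, message and orderId to show up as individual, searchable fields instead of one opaque JSON string.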

I have already extracted fluentd.conf into a ConfigMap and tried to achieve the result with the filter parser plugin:

    <filter kubernetes.var.log.containers.dealing-**.log>
      @type parser
      key_name log
      <parse>
        @type regexp
        expression  {{tried different regexes without luck}}
      </parse>
    </filter>

At this point I am not sure which of the three (Elasticsearch, Fluentd or Kibana) should be configured to achieve the desired result.

PS: I am fairly new to the stack.

Any help would be much appreciated.

Fluentd config:

    <source>
      @type prometheus
      bind "0.0.0.0"
      port 24231
      metrics_path "/metrics"
    </source>

    <source>
      @type prometheus_output_monitor
    </source>

    <match fluent.**>
      @type null
    </match>

    <source>
      @type tail
      @id in_tail_container_logs
      path "/var/log/containers/*.log"
      pos_file "/var/log/fluentd-containers.log.pos"
      tag "kubernetes.*"
      read_from_head true
      <parse>
        @type "json"
        time_format "%Y-%m-%dT%H:%M:%S.%NZ"
        time_type string
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_minion
      path "/var/log/salt/minion"
      pos_file "/var/log/fluentd-salt.pos"
      tag "salt"
      <parse>
        @type "regexp"
        expression /^(?<time>[^ ]* [^ ,]*)[^\[]*\[[^\]]*\]\[(?<severity>[^ \]]*) *\] (?<message>.*)$/
        time_format "%Y-%m-%d %H:%M:%S"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_startupscript
      path "/var/log/startupscript.log"
      pos_file "/var/log/fluentd-startupscript.log.pos"
      tag "startupscript"
      <parse>
        @type "syslog"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_docker
      path "/var/log/docker.log"
      pos_file "/var/log/fluentd-docker.log.pos"
      tag "docker"
      <parse>
        @type "regexp"
        expression /^time="(?<time>[^)]*)" level=(?<severity>[^ ]*) msg="(?<message>[^"]*)"( err="(?<error>[^"]*)")?( statusCode=(?<status_code>\d+))?/
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_etcd
      path "/var/log/etcd.log"
      pos_file "/var/log/fluentd-etcd.log.pos"
      tag "etcd"
      <parse>
        @type "none"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_kubelet
      multiline_flush_interval 5s
      path "/var/log/kubelet.log"
      pos_file "/var/log/fluentd-kubelet.log.pos"
      tag "kubelet"
      <parse>
        @type "kubernetes"
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_kube_proxy
      multiline_flush_interval 5s
      path "/var/log/kube-proxy.log"
      pos_file "/var/log/fluentd-kube-proxy.log.pos"
      tag "kube-proxy"
      <parse>
        @type "kubernetes"
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_kube_apiserver
      multiline_flush_interval 5s
      path "/var/log/kube-apiserver.log"
      pos_file "/var/log/fluentd-kube-apiserver.log.pos"
      tag "kube-apiserver"
      <parse>
        @type "kubernetes"
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_kube_controller_manager
      multiline_flush_interval 5s
      path "/var/log/kube-controller-manager.log"
      pos_file "/var/log/fluentd-kube-controller-manager.log.pos"
      tag "kube-controller-manager"
      <parse>
        @type "kubernetes"
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_kube_scheduler
      multiline_flush_interval 5s
      path "/var/log/kube-scheduler.log"
      pos_file "/var/log/fluentd-kube-scheduler.log.pos"
      tag "kube-scheduler"
      <parse>
        @type "kubernetes"
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_rescheduler
      multiline_flush_interval 5s
      path "/var/log/rescheduler.log"
      pos_file "/var/log/fluentd-rescheduler.log.pos"
      tag "rescheduler"
      <parse>
        @type "kubernetes"
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_glbc
      multiline_flush_interval 5s
      path "/var/log/glbc.log"
      pos_file "/var/log/fluentd-glbc.log.pos"
      tag "glbc"
      <parse>
        @type "kubernetes"
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_cluster_autoscaler
      multiline_flush_interval 5s
      path "/var/log/cluster-autoscaler.log"
      pos_file "/var/log/fluentd-cluster-autoscaler.log.pos"
      tag "cluster-autoscaler"
      <parse>
        @type "kubernetes"
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_kube_apiserver_audit
      multiline_flush_interval 5s
      path "/var/log/kubernetes/kube-apiserver-audit.log"
      pos_file "/var/log/kube-apiserver-audit.log.pos"
      tag "kube-apiserver-audit"
      <parse>
        @type "multiline"
        format_firstline "/^\\S+\\s+AUDIT:/"
        format1 /^(?<time>\S+) AUDIT:(?: (?:id="(?<id>(?:[^"\\]|\\.)*)"|ip="(?<ip>(?:[^"\\]|\\.)*)"|method="(?<method>(?:[^"\\]|\\.)*)"|user="(?<user>(?:[^"\\]|\\.)*)"|groups="(?<groups>(?:[^"\\]|\\.)*)"|as="(?<as>(?:[^"\\]|\\.)*)"|asgroups="(?<asgroups>(?:[^"\\]|\\.)*)"|namespace="(?<namespace>(?:[^"\\]|\\.)*)"|uri="(?<uri>(?:[^"\\]|\\.)*)"|response="(?<response>(?:[^"\\]|\\.)*)"|\w+="(?:[^"\\]|\\.)*"))*/
        time_format "%Y-%m-%dT%T.%L%Z"
      </parse>
    </source>

    <filter kubernetes.**>
      @type kubernetes_metadata
      @id filter_kube_metadata
      kubernetes_url "https://172.20.0.1:443/api"
      verify_ssl true
      ca_file ""
    </filter>

    <match **>
      @type elasticsearch
      @id out_es
      @log_level "info"
      include_tag_key true
      host "elasticsearch.logging.svc.cluster.local"
      port 9200
      path ""
      scheme http
      ssl_verify true
      ssl_version TLSv1
      user ""
      password xxxxxx
      reload_connections false
      reconnect_on_error true
      reload_on_failure true
      log_es_400_reason false
      logstash_prefix "logstash"
      logstash_format true
      index_name "logstash"
      type_name "fluentd"
      template_name
      template_file
      template_overwrite false
      <buffer>
        flush_thread_count 8
        flush_interval 5s
        chunk_limit_size 2M
        queue_limit_length 32
        retry_max_interval 30
        retry_forever true
      </buffer>
    </match>
-- PPetkov
elasticsearch
fluentd
kibana
kubernetes

2 Answers

2/25/2020

I was able to achieve the wanted result with the following filter:

    <filter kubernetes.**>
      @type parser
      key_name log
      <parse>
        @type json
        json_parser json
      </parse>
      replace_invalid_sequence true
      reserve_data true # keep the original record even if a log line cannot be parsed
      emit_invalid_record_to_error false # don't flood the error log with unparsable lines
      reserve_time true # the time was already parsed in the source; don't overwrite it with the current time
    </filter>
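
If you also want to drop the raw log string once its keys have been extracted, the parser filter has a remove_key_name_field option; a minimal sketch of how that could look (untested in this setup):

    <filter kubernetes.**>
      @type parser
      key_name log
      reserve_data true
      remove_key_name_field true # drop the original "log" field after a successful parse
      <parse>
        @type json
      </parse>
    </filter>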

Thanks @Al-waleed Shihadeh for your time and insights!

-- PPetkov
Source: StackOverflow

2/24/2020

Try adding this filter to the Fluentd config, before the match section:

    <filter kubernetes.**>
      @type parser
      key_name log
      reserve_data true
      <parse>
        @type json
      </parse>
    </filter>
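
If you only need this for specific containers, you can narrow the filter pattern to their tag instead of kubernetes.**, e.g. reusing the tag pattern from the question (adjust it to your container names):

    <filter kubernetes.var.log.containers.dealing-**.log>
      @type parser
      key_name log
      reserve_data true
      <parse>
        @type json
      </parse>
    </filter>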
-- Al-waleed Shihadeh
Source: StackOverflow