I have a problem with communication between two fluentd instances using the http output (out_http) and http input (in_http) plugins. I have two Kubernetes clusters, and I want fluentd-A in the first cluster to send its logs to fluentd-B in the second cluster. But when fluentd-A tries to send the logs, it receives:
#0 got unrecoverable error in primary and no secondary error_class=Fluent::UnrecoverableError error="400 Bad Request 400 Bad Request\n'json' or 'msgpack' parameter is required\n"
#0 bad chunk is moved to /tmp/fluent/backup/worker0/object_3ff8a73edae8/5a71a08ca19b1b343c8dce1b74c9a963.log
When I cat this file, the logs in it have the following pattern:
{log: ...., ... other json fields... }
{log: ...., ... other json fields... }
{log: ...., ... other json fields... }
I think the problem is that a concatenation of JSON objects, one per line, is not itself a valid JSON document, and that is why fluentd-B responds with 400 Bad Request.
I have tested fluentd-B with curl using the following requests, and both work fine:
curl -X POST -d '{"foo":"bar"}' -H 'Content-Type: application/json' http://<fluentd-B-ip>:9880/app.log
curl -X POST -d 'json=[{"foo":"bar"},{"abc":"def"},{"xyz":"123"}]' http://<fluentd-B-ip>:9880/app.log
Could someone tell me how to pack the logs in fluentd-A into valid JSON? (The format doesn't have to be JSON.)
This is the config of fluentd-A:
<match **>
  @type http
  endpoint http://<Fluentd-B-IP>:9880/api.log
  open_timeout 2
  <format>
    @type json
  </format>
</match>
<match fluent.**>
  @type null
</match>
<source>
  @type tail
  @id in_tail_container_logs
  path /var/log/containers/*.log
  pos_file /var/log/fluentd-containers.log.pos
  tag "#{ENV['FLUENT_CONTAINER_TAIL_TAG'] || 'kubernetes.*'}"
  exclude_path "#{ENV['FLUENT_CONTAINER_TAIL_EXCLUDE_PATH'] || use_default}"
  read_from_head true
  <parse>
    @type "#{ENV['FLUENT_CONTAINER_TAIL_PARSER_TYPE'] || 'json'}"
    time_format %Y-%m-%dT%H:%M:%S.%NZ
  </parse>
</source>
<filter kubernetes.**>
  @type kubernetes_metadata
  @id filter_kube_metadata
  kubernetes_url "#{ENV['FLUENT_FILTER_KUBERNETES_URL'] || 'https://' + ENV.fetch('KUBERNETES_SERVICE_HOST') + ':' + ENV.fetch('KUBERNETES_SERVICE_PORT') + '/api'}"
  verify_ssl "#{ENV['KUBERNETES_VERIFY_SSL'] || true}"
  ca_file "#{ENV['KUBERNETES_CA_FILE']}"
</filter>
<filter kubernetes.**>
  @type grep
  <regexp>
    key $.kubernetes.namespace_name
    pattern /test/
  </regexp>
</filter>
This is the config of fluentd-B:
<source>
  @type http
  port 9880
  bind 0.0.0.0
  body_size_limit 32m
  keepalive_timeout 10s
  <parse>
    @type json
  </parse>
</source>
I hope someone can help with this. I thought that such a basic setup would be easy, but maybe I missed something important...
I asked the same question on the fluentd GitHub page, and it turns out that communication between the http input (in_http) and http output (out_http) plugins is not supported. They are meant for communication with other types of components; the forward input and output plugins (in_forward and out_forward) are recommended for communication between fluentd instances.
This is the link to my question.
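For anyone landing here, this is a minimal sketch of what the forward-based setup might look like. It is not taken from the GitHub discussion. The port 24224 (the forward plugins' default) and the flush interval are my assumptions, and `<Fluentd-B-IP>` is the same placeholder as in the question. Network reachability between the clusters, TLS, and authentication are left out and would need to be added for a real deployment.

```
# fluentd-A (sender): replaces the <match **> block that used @type http
<match **>
  @type forward
  <server>
    # Address of fluentd-B; 24224 is the forward default, assumed here
    host <Fluentd-B-IP>
    port 24224
  </server>
  <buffer>
    # Assumed value; tune to your log volume
    flush_interval 5s
  </buffer>
</match>

# fluentd-B (receiver): replaces the <source> block that used @type http
<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>
```

With this arrangement the sender ships buffered chunks in fluentd's own forward protocol, so no `<format>` or `<parse>` section is needed on either side, and the original tags should arrive intact on fluentd-B.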