How can I use the Helm Kafka chart with a Python pod to publish large (5 MB) messages?

11/24/2019

I am running a proof of concept (POC) to send large (5 MB) Kafka messages.

I'm failing to send the large messages, and I don't know whether the problem is in the broker configuration or in the client.

I'm using Kubernetes on Docker Desktop with this Helm chart: https://github.com/helm/charts/tree/master/incubator/kafka, and I'm installing it with this command:

helm install --name wielder-kafka --namespace kafka incubator/kafka \
  --set configurationOverride="{"replica.fetch.max.bytes":15048576,"message.max.bytes":15048576}"

I'm not sure the configuration took effect! Is there any way to check?
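
For reference, I assume I could ask a broker for its effective configuration with kafka-python's admin client, along these lines (an untested sketch; the broker id "0" is a guess based on the chart's StatefulSet pod names):

#!/usr/bin/env python
# Sketch: print a broker's effective configuration so I can look for
# message.max.bytes and replica.fetch.max.bytes.

from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType

admin = KafkaAdminClient(
    bootstrap_servers="wielder-kafka.kafka.svc.cluster.local:9092"
)

# Broker id "0" is assumed from the pod name wielder-kafka-0
result = admin.describe_configs([ConfigResource(ConfigResourceType.BROKER, "0")])

# Depending on the kafka-python version this is a single response or a list
# of responses; printing it is enough to eyeball the two size settings.
print(result)

admin.close()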

And I'm using this Python script (running from a pod):

#!/usr/bin/env python

import json
import traceback

from kafka import KafkaProducer
from kafka.errors import KafkaError

# Path to the JSON file whose contents become a single Kafka message
a = '/storage/stam/ENERGY.json'

with open(a) as json_file:
    data = json.load(json_file)

    # print(data["1"])

    # Producer pointed at the chart's cluster service, with the client-side
    # request size raised to match the broker overrides
    producer = KafkaProducer(
        bootstrap_servers="wielder-kafka.kafka.svc.cluster.local:9092",
        max_request_size=15048576
    )

    # Serialize the whole document and encode it as one message value
    j = json.dumps(data)

    print(type(j))

    # en = f'fool {str("batata")}'.encode('utf-8')

    en = j.encode('utf-8')

    # Send asynchronously, then block on the future so errors surface here
    future = producer.send(topic="grid_1", value=en)

    try:
        record_metadata = future.get(timeout=1000)
    except KafkaError:
        # Decide what to do if the produce request failed...
        print(traceback.format_exc())
        result = 'Fail'
    finally:
        producer.close()

It works well with small payloads, but with large JSON files I get the error below.

root@pep-55c4dd9ff5-rnqjc:/storage/cluster# python3.6 json_to_kafka.py
<class 'str'>
Traceback (most recent call last):
  File "json_to_kafka.py", line 34, in <module>
    record_metadata = future.get(timeout=1000)
  File "/usr/local/lib/python3.6/site-packages/kafka/producer/future.py", line 65, in get
    raise self.exception # pylint: disable-msg=raising-bad-type
kafka.errors.MessageSizeTooLargeError: [Error 10] MessageSizeTooLargeError

root@pep-55c4dd9ff5-rnqjc:/storage/cluster#
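
To see how close the payload actually gets to the configured limits, I figure I can measure the serialized size before sending, roughly like this (the path and limit mirror the script above):

#!/usr/bin/env python
# Sketch: measure the serialized payload against the producer's
# max_request_size, to see which limit the large file is hitting.

import json

MAX_REQUEST_SIZE = 15048576  # same value passed to KafkaProducer above

with open('/storage/stam/ENERGY.json') as json_file:
    payload = json.dumps(json.load(json_file)).encode('utf-8')

print('payload bytes:', len(payload))
print('fits in max_request_size:', len(payload) <= MAX_REQUEST_SIZE)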

Here is the Kubernetes storage information:

➜  ~ kubectl get pv -n kafka -o wide
NAME                                       CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS   CLAIM                           STORAGECLASS   REASON   AGE
pep-pv                                     200Mi      RWO            Retain           Bound    wielder-services/pep-pvc        pep-storage             5d18h
pvc-1069afcd-0c5d-11ea-8ffb-025000000001   1Gi        RWO            Delete           Bound    kafka/datadir-wielder-kafka-0   hostpath                5d18h
pvc-5ad5e6c5-0c5d-11ea-8ffb-025000000001   1Gi        RWO            Delete           Bound    kafka/datadir-wielder-kafka-1   hostpath                5d18h
pvc-6eaa98ae-0c5d-11ea-8ffb-025000000001   1Gi        RWO            Delete           Bound    kafka/datadir-wielder-kafka-2   hostpath                5d18h

How can I send large Kafka messages when producing from Python?

-- Rubber Duck
apache-kafka
kubernetes
kubernetes-helm
python-3.x

0 Answers