I am running a POC to send large (5 Megabyte) Kafka messages.
Sending large messages fails, and I don't know whether the problem is in the broker configuration or in the client.
I'm using Kubernetes on docker-desktop with this Helm chart: https://github.com/helm/charts/tree/master/incubator/kafka
I'm using this init command:
helm install --name wielder-kafka --namespace kafka incubator/kafka \
    --set configurationOverride="{"replica.fetch.max.bytes":15048576,"message.max.bytes":15048576}"
I'm not sure the configuration was actually applied. Is there any way to check?
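One thing that can be checked without touching the cluster is what the shell actually passes to helm for that --set value. This is a minimal sketch, assuming POSIX shell quoting rules (which Python's shlex.split follows). It says nothing about how helm then interprets the value.

```python
import shlex

# The --set argument exactly as typed in the helm command above.
typed = '--set configurationOverride="{"replica.fetch.max.bytes":15048576,"message.max.bytes":15048576}"'

# shlex.split applies POSIX quoting rules, so the result approximates
# the argument list the shell hands to helm.
args = shlex.split(typed)
for arg in args:
    print(arg)
```

The inner double quotes close and reopen the outer ones, so they never reach helm. Wrapping the whole value in single quotes would preserve them.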
This is the Python script, which runs from a pod:
#!/usr/bin/env python
import json
import traceback
from time import sleep

from kafka import KafkaProducer
from kafka.errors import KafkaError

a = '/storage/stam/ENERGY.json'

with open(a) as json_file:
    data = json.load(json_file)
    # print(data["1"])

producer = KafkaProducer(
    bootstrap_servers="wielder-kafka.kafka.svc.cluster.local:9092",
    max_request_size=15048576
)

j = json.dumps(data)
print(type(j))
# en = f'fool {str("batata")}'.encode('utf-8')
en = j.encode('utf-8')
future = producer.send(topic="grid_1", value=en)

try:
    record_metadata = future.get(timeout=1000)
except KafkaError:
    # Decide what to do if produce request failed...
    print(traceback.format_exc())
    result = 'Fail'
finally:
    producer.close()
It works well with small files, but with large JSON files I get the error below:
root@pep-55c4dd9ff5-rnqjc:/storage/cluster# python3.6 json_to_kafka.py
<class 'str'>
Traceback (most recent call last):
  File "json_to_kafka.py", line 34, in <module>
    record_metadata = future.get(timeout=1000)
  File "/usr/local/lib/python3.6/site-packages/kafka/producer/future.py", line 65, in get
    raise self.exception # pylint: disable-msg=raising-bad-type
kafka.errors.MessageSizeTooLargeError: [Error 10] MessageSizeTooLargeError
root@pep-55c4dd9ff5-rnqjc:/storage/cluster#
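Since the error is about size, it helps to know how large the encoded payload really is before comparing it with any limit. This is a minimal sketch: `encoded_size`, `report`, and the sample data are hypothetical names for illustration. The 1048576 figure is kafka-python's default for max_request_size. The broker's own default for message.max.bytes is assumed here to be of similar magnitude (about 1 MB).

```python
import json

CONFIGURED_LIMIT = 15048576      # value used for max_request_size / message.max.bytes above
CLIENT_DEFAULT_LIMIT = 1048576   # kafka-python default for max_request_size


def encoded_size(obj) -> int:
    """Size in bytes of obj serialized the same way as the script: JSON, then UTF-8."""
    return len(json.dumps(obj).encode("utf-8"))


def report(size_bytes: int) -> dict:
    """Compare a payload size with both limits (per-record overhead is ignored)."""
    return {
        "size_bytes": size_bytes,
        "over_default": size_bytes > CLIENT_DEFAULT_LIMIT,
        "over_configured": size_bytes > CONFIGURED_LIMIT,
    }


# Stand-in for ENERGY.json (an assumption): a dict whose JSON form is about 5 MB.
sample = {"1": "x" * 5_000_000}
print(report(encoded_size(sample)))
```

If a payload is over the default but under the configured limit, as with this sample, a rejection may indicate that some limit along the path is still at its default value.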
Here is the Kubernetes storage information:
➜ ~ kubectl get pv -n kafka -o wide
NAME                                       CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS   CLAIM                           STORAGECLASS   REASON   AGE
pep-pv                                     200Mi      RWO            Retain           Bound    wielder-services/pep-pvc        pep-storage             5d18h
pvc-1069afcd-0c5d-11ea-8ffb-025000000001   1Gi        RWO            Delete           Bound    kafka/datadir-wielder-kafka-0   hostpath                5d18h
pvc-5ad5e6c5-0c5d-11ea-8ffb-025000000001   1Gi        RWO            Delete           Bound    kafka/datadir-wielder-kafka-1   hostpath                5d18h
pvc-6eaa98ae-0c5d-11ea-8ffb-025000000001   1Gi        RWO            Delete           Bound    kafka/datadir-wielder-kafka-2   hostpath                5d18h
How can I produce large Kafka messages from Python?