I have created a Flink app that takes a DataStream of strings and sinks it to Kafka. The stream is built from a simple list of strings via fromCollection:
List<String> listOfStrings = new ArrayList<>();
listOfStrings.add("testkafka1");
listOfStrings.add("testkafka2");
listOfStrings.add("testkafka3");
listOfStrings.add("testkafka4");
DataStream<String> testStringStream = env.fromCollection(listOfStrings);
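The sink wiring is not shown above; for completeness, a minimal sketch of what the job presumably looks like with the universal Kafka connector that ships with Flink 1.9. The topic name is a placeholder, and the broker list is masked the same way as in the producer config below:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        List<String> listOfStrings = Arrays.asList(
                "testkafka1", "testkafka2", "testkafka3", "testkafka4");
        DataStream<String> testStringStream = env.fromCollection(listOfStrings);

        Properties props = new Properties();
        // Masked, as in the question; replace with the real broker list.
        props.setProperty("bootstrap.servers", "XXXXXXXXXXXXXXXX");

        testStringStream.addSink(new FlinkKafkaProducer<>(
                "test-topic",              // placeholder topic name
                new SimpleStringSchema(),  // strings -> UTF-8 bytes
                props));

        env.execute("kafka-sink-test");
    }
}
```

This is a wiring sketch only (it needs the flink-connector-kafka dependency on the classpath); the real job's sink code may differ, e.g. the `transactional.id` in the logged producer config suggests an exactly-once constructor variant was used.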
Flink runs on Kubernetes with parallelism 1 and one task manager. As soon as the job starts, it fails with the following error:
ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught exception in kafka-producer-network-thread | producer-1:
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:381)
at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
at org.apache.kafka.clients.producer.internals.Sender.awaitLeastLoadedNodeReady(Sender.java:409)
at org.apache.kafka.clients.producer.internals.Sender.maybeSendTransactionalRequest(Sender.java:337)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:204)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
at java.lang.Thread.run(Thread.java:748)
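For context on why a connection problem can surface as a heap error: the failing frame is ByteBuffer.allocate inside NetworkReceive, which reads a 4-byte big-endian size prefix off the socket and then allocates a buffer of that size. If a plaintext client connects to an SSL listener, the broker's TLS handshake bytes land in that size field and produce an enormous allocation. A minimal illustration of the mechanism (the handshake bytes below are assumed for illustration, not captured from a real broker):

```java
import java.nio.ByteBuffer;

public class SizePrefixDemo {
    public static void main(String[] args) {
        // Plausible leading bytes of a TLS handshake record: content type
        // 0x16 (handshake), a protocol version, then a record length.
        byte[] tlsBytes = {0x16, 0x03, 0x01, 0x02, 0x00};

        // Kafka's NetworkReceive reads the first 4 bytes as a big-endian
        // int32 "message size" and then calls ByteBuffer.allocate(size).
        int supposedMessageSize = ByteBuffer.wrap(tlsBytes).getInt();

        // 0x16030102 = 369295618 bytes, roughly 352 MiB. A few such
        // allocations exhaust the ~922 MiB task-manager heap, which is
        // why the failure appears as java.lang.OutOfMemoryError.
        System.out.println(supposedMessageSize);
    }
}
```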
The task manager config I have is (taken from the task manager logs):
Starting Task Manager config file:
jobmanager.rpc.address: component-app-adb71002-tm-5c6f4d58bd-rtblz
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 2
parallelism.default: 1
jobmanager.execution.failover-strategy: region
blob.server.port: 6124
query.server.port: 6125
blob.server.port: 6125
fs.s3a.aws.credentials.provider: org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.DefaultAWSCredentialsProviderChain
jobmanager.heap.size: 524288k
jobmanager.rpc.port: 6123
jobmanager.web.port: 8081
metrics.internal.query-service.port: 50101
metrics.reporter.dghttp.apikey: XXXXXXXXXXXXXXXX ---I masked it intentionally
metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.tags: componentingestion,dev
query.server.port: 6124
taskmanager.heap.size: 1048576k
taskmanager.numberOfTaskSlots: 1
web.upload.dir: /opt/flink
jobmanager.rpc.address: component-app-adb71002
taskmanager.host: 10.42.6.6
Starting taskexecutor as a console application on host component-app-adb71002-tm-5c6f4d58bd-rtblz.
2020-02-11 15:19:20,519 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - --------------------------------------------------------------------------------
2020-02-11 15:19:20,520 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Starting TaskManager (Version: 1.9.2, Rev:c9d2c90, Date:24.01.2020 @ 08:44:30 CST)
2020-02-11 15:19:20,520 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - OS current user: flink
2020-02-11 15:19:20,520 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Current Hadoop/Kerberos user: <no hadoop dependency found>
2020-02-11 15:19:20,520 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.242-b08
2020-02-11 15:19:20,521 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Maximum heap size: 922 MiBytes
2020-02-11 15:19:20,521 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - JAVA_HOME: /usr/local/openjdk-8
2020-02-11 15:19:20,521 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - No Hadoop Dependency available
2020-02-11 15:19:20,521 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - JVM Options:
2020-02-11 15:19:20,521 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - -XX:+UseG1GC
2020-02-11 15:19:20,521 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - -Xms922M
2020-02-11 15:19:20,521 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - -Xmx922M
2020-02-11 15:19:20,521 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - -XX:MaxDirectMemorySize=8388607T
2020-02-11 15:19:20,521 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2020-02-11 15:19:20,522 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2020-02-11 15:19:20,522 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Program Arguments:
2020-02-11 15:19:20,522 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - --configDir
2020-02-11 15:19:20,522 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - /opt/flink/conf
2020-02-11 15:19:20,522 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Classpath: /opt/flink/lib/flink-metrics-datadog-1.9.2.jar:/opt/flink/lib/flink-table-blink_2.12-1.9.2.jar:/opt/flink/lib/flink-table_2.12-1.9.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.12-1.9.2.jar:::
The producer config I have is:
acks = 1
batch.size = 16384
bootstrap.servers = [XXXXXXXXXXXXXXXX] ---I masked it intentionally
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 3
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = Source: Collection Source -> Sink: Unnamed-eb99017e0f9125fa6648bf56123bdcf7-19
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
Most of the producer config is the default. Is there something I am missing here, or something wrong with the config?
As Dominik suggested, the issue is not actually related to the heap.
If the broker is set up with SSL authentication and the client is not configured for SSL, this exception is thrown. That it surfaces as a misleading OutOfMemoryError rather than an authentication or protocol error is a known Kafka client issue.
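If the broker does require SSL, the fix is to set the matching security properties on the producer config before handing it to the Kafka sink. A minimal sketch, where the truststore/keystore paths and passwords are placeholders and the broker list is masked as in the question:

```java
import java.util.Properties;

public class ProducerSslConfig {
    public static Properties sslProducerProperties() {
        Properties props = new Properties();
        // Masked, as in the question; replace with the real broker list.
        props.setProperty("bootstrap.servers", "XXXXXXXXXXXXXXXX");
        // Match the broker's listener: SSL instead of the default PLAINTEXT.
        props.setProperty("security.protocol", "SSL");
        // Truststore used to verify the broker's certificate (placeholder path).
        props.setProperty("ssl.truststore.location", "/path/to/truststore.jks");
        props.setProperty("ssl.truststore.password", "changeit");
        // Keystore is only needed if the broker also requires client auth.
        props.setProperty("ssl.keystore.location", "/path/to/keystore.jks");
        props.setProperty("ssl.keystore.password", "changeit");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(sslProducerProperties().getProperty("security.protocol"));
    }
}
```

With `security.protocol=SSL` the client performs a proper TLS handshake instead of misreading the broker's handshake bytes, so the spurious OutOfMemoryError goes away (or becomes a real, descriptive SSL error if the certificates are wrong).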