Hi, I do not seem to be able to correctly size my pod for a Kafka Streams application (running on a Java 11 JRE), and I keep getting OOMKilled containers.
The job consists of an aggregation over quite a lot of concurrent values:
KTable<String, MinuteValue> minuteValuesKtable = builder.table(
        "minuteTopicCompact",
        Materialized.<String, MinuteValue, KeyValueStore<Bytes, byte[]>>with(Serdes.String(), minuteValueSerdes)
                .withLoggingEnabled(new HashMap<>()));

KStream<String, MinuteAggreg> minuteAggByDay = minuteValuesKtable
        // rekey each MinuteValue and group them
        .groupBy(
                (key, minuteValue) -> new KeyValue<>(getAggKey(minuteValue), minuteValue),
                Serialized.with(Serdes.String(), minuteValueSerdes))
        // aggregate to MinuteAggreg
        .aggregate(
                MinuteAggreg::new,
                (String key, MinuteValue value, MinuteAggreg aggregate) -> aggregate.addLine(value),
                (String key, MinuteValue value, MinuteAggreg aggregate) -> aggregate.removeLine(value),
                Materialized.with(Serdes.String(), minuteAggregSerdes))
        .toStream()
        // [...] send to another topic
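For context on the memory footprint: this topology materializes more than one RocksDB state store (the source KTable plus the aggregation), and each store allocates its own off-heap memory. A quick way to see exactly how many stores the job creates is to print the built topology (a diagnostic fragment, assuming the `builder` from the snippet above):

```java
// Each "Stores: [...]" entry in the output is a RocksDB instance with
// its own off-heap memtables and block cache, multiplied per task.
Topology topology = builder.build();
System.out.println(topology.describe());
```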
I tried to tweak these values:
// memory sizing and caches
properties.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 5 * 60 * 1000L);
// Enable record cache of size 8 MB.
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 8 * 1024 * 1024L);
// Set commit interval to 1 second.
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
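One thing I gather from the memory-management guide but have not applied above: the RocksDB state stores allocate memory off-heap, so it is invisible to the JVM heap settings yet still counts against the pod's 2Gi limit. A sketch of bounding it via a `RocksDBConfigSetter` (the interface and config key are from the Kafka Streams API; the sizes here are illustrative assumptions, not recommendations):

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Sketch: caps the per-store RocksDB memory (block cache plus
// memtables), which lives OFF the JVM heap and is therefore not
// limited by -XX:MaxRAMFraction.
public class BoundedRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(16 * 1024 * 1024L);   // 16 MB block cache (assumption)
        options.setTableFormatConfig(tableConfig);
        options.setWriteBufferSize(8 * 1024 * 1024L);       // 8 MB memtable (assumption)
        options.setMaxWriteBufferNumber(2);                 // at most 2 memtables per store
    }
}
```

registered with `properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedRocksDBConfig.class);`. Note the bound applies per state store, so total off-heap usage scales with the number of stores and partitions assigned to the instance.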
My Java 11 application is started with these arguments:
-XX:+UseContainerSupport
-XX:MaxRAMFraction=2
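If my understanding of these flags is right, `-XX:MaxRAMFraction=2` against the 2Gi container limit should yield roughly a 1Gi max heap, leaving the other ~1Gi for RocksDB, thread stacks, and JVM overhead. A minimal check I run inside the pod to verify that split (class name is mine):

```java
// Prints the max heap the JVM actually derived from its flags and
// the container memory limit.
public class HeapCheck {
    public static void main(String[] args) {
        long maxHeapMb = Runtime.getRuntime().maxMemory() / (1024 * 1024);
        System.out.println("Max heap: " + maxHeapMb + " MB");
    }
}
```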
And the pod has these memory limits:
Limits:
cpu: 4
memory: 2Gi
Requests:
cpu: 2
memory: 1Gi
But I still get pod failures: Kubernetes kills the pod with an "OOMKilled" reason.
Could a Kafka Streams expert help me tune these values?
I have read https://docs.confluent.io/current/streams/sizing.html#troubleshooting and https://kafka.apache.org/10/documentation/streams/developer-guide/memory-mgmt.html, but could not find a comprehensive and simple enough answer for tweaking these memory settings.