Kafka Streams memory management (KTable, RocksDB)

6/14/2019

Hi, I cannot seem to scale my pod correctly for a Kafka Streams application (running on a Java 11 JRE) and keep getting OOMKilled containers.

kafka streams topology

The job consists of an aggregation over a large number of concurrent values.

  • I use a KTable:

    KTable<String, MinuteValue> minuteValuesKtable = builder.table(
        "minuteTopicCompact",
        Materialized.<String, MinuteValue, KeyValueStore<Bytes, byte[]>>with(Serdes.String(), minuteValueSerdes)
            .withLoggingEnabled(new HashMap<>()));

  • And compute an aggregation:

    KStream<String, MinuteAggreg> minuteAggByDay = minuteValuesKtable
        // rekey each MinuteValue and group them
        .groupBy(
            (key, minuteValue) -> new KeyValue<>(getAggKey(minuteValue), minuteValue),
            Serialized.with(Serdes.String(), minuteValueSerdes))
        // aggregate to MinuteAggreg, with an adder and a subtractor (KTable updates can retract)
        .aggregate(
            MinuteAggreg::new,
            (String key, MinuteValue value, MinuteAggreg aggregate) -> aggregate.addLine(value),
            (String key, MinuteValue value, MinuteAggreg aggregate) -> aggregate.removeLine(value),
            Materialized.with(Serdes.String(), minuteAggregSerdes))
        .toStream()
    // [...] send to another topic

kafka streams memory settings

I tried to tweak these values:

    // memory sizing and caches
    properties.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 5 * 60 * 1000L);
    // Enable record cache of size 8 MB.
    properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 8 * 1024 * 1024L);
    // Set commit interval to 1 second.
    properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

My Java 11 application is started with these arguments:

      -XX:+UseContainerSupport
      -XX:MaxRAMFraction=2
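
One thing I noticed while reading: on Java 11, `-XX:MaxRAMFraction` is deprecated in favor of the percentage-based flags. An alternative I am considering (assuming I cap the heap at half the container limit, leaving the other half for RocksDB and other off-heap usage) would be:

```
-XX:+UseContainerSupport
-XX:MaxRAMPercentage=50.0
```

I do not know whether this changes the OOMKilled behavior, since both forms should give roughly a 1Gi heap under a 2Gi limit.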

pod memory settings

And the pod has some memory limits :

    Limits:
      cpu:     4
      memory:  2Gi
    Requests:
      cpu:      2
      memory:   1Gi

But I still get pod failures: Kubernetes kills the pod with an "OOMKilled" status.
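
My back-of-the-envelope accounting of where the 2Gi could be going, assuming the RocksDB defaults I found in the docs (50 MB block cache plus 3 x 16 MB write buffers per store) and a guessed store count of 8 (one per input partition — this number is an assumption, not measured):

```java
public class MemoryBudget {
    public static void main(String[] args) {
        final long containerLimit = 2L * 1024 * 1024 * 1024; // 2Gi pod limit
        final long heap = containerLimit / 2;                // -XX:MaxRAMFraction=2 -> half of RAM
        final long perStoreRocksDb = 50L * 1024 * 1024       // default block cache per store
                                   + 3L * 16 * 1024 * 1024;  // default 3 write buffers x 16 MB
        final int stores = 8;                                // guess: one store per input partition
        final long offHeapNeeded = stores * perStoreRocksDb;
        System.out.println("headroom MB = "
                + (containerLimit - heap - offHeapNeeded) / (1024 * 1024));
        // prints "headroom MB = 240"
    }
}
```

If the store count or the defaults are higher than I assume, that headroom disappears quickly, which would explain the OOMKilled — but I have not been able to confirm the actual per-store usage.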

Could a Kafka Streams expert help me tune these values?

resources read

I have read : https://docs.confluent.io/current/streams/sizing.html#troubleshooting and https://kafka.apache.org/10/documentation/streams/developer-guide/memory-mgmt.html

but could not find a comprehensive and simple enough answer for tuning:

  • the RocksDB limits,
  • the Kafka Streams limits,
  • the JVM limits,
  • and the container's limits.
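
For the RocksDB part specifically, the only handle I found is `StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG`. Below is a sketch of what I am experimenting with to shrink the per-store footprint; the class name and the sizes are my own guesses, not validated values (this is a config fragment that needs the kafka-streams and rocksdbjni jars, so I cannot show it running standalone):

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

public class BoundedRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        // Shrink the per-store block cache (the default is 50 MB per store).
        tableConfig.setBlockCacheSize(16 * 1024 * 1024L);
        // Count index and filter blocks inside the cache instead of on top of it.
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
        // Fewer, smaller memtables: 2 x 8 MB instead of the default 3 x 16 MB.
        options.setMaxWriteBufferNumber(2);
        options.setWriteBufferSize(8 * 1024 * 1024L);
    }
}
```

registered with:

    properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedRocksDBConfig.class);

but I have no guidance on how small these can safely go for my workload.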
-- Antonin
apache-kafka
java
jvm
kubernetes
rocksdb

0 Answers