Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints

3/20/2020

I am working on a Flink application that sinks to Kafka. I created a Kafka producer with the default pool size of 5. I have enabled checkpointing with the following config:

    env.enableCheckpointing(1800000);//checkpointing for every 30 minutes.

    // set mode to exactly-once (this is the default)
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

    // make sure 5 seconds of progress happen between checkpoints
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);

    // checkpoints have to complete within one minute, or are discarded
    env.getCheckpointConfig().setCheckpointTimeout(60000);

    // allow only one checkpoint to be in progress at the same time
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

The app sometimes keeps crashing with the following exception. Is this an issue with the Kafka producer pool size or with the checkpoint configuration?

2020-03-20 22:31:23,859 INFO  org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - FlinkKafkaProducer011 0/1 aborted recovered transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=FileSplitReader -> metrics-map -> Sink: components-topic-sink-4ab008489d4c8ed0fe577883438cc1ff-1, producerId=21, epoch=3], transactionStartTime=1584742933826}
2020-03-20 22:31:23,860 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during disposal of stream operator.
java.lang.NullPointerException
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.dispose(ContinuousFileReaderOperator.java:164)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
2020-03-20 22:31:23,861 INFO  org.apache.flink.runtime.taskmanager.Task                     - FileSplitReader -> metrics-map -> Sink: components-topic-sink (1/1) (92b7f3ed8f6362fe0087efd40eb94016) switched from RUNNING to FAILED.
org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createTransactionalProducer(FlinkKafkaProducer011.java:934)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:701)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:97)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:394)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:385)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:862)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
-- VSK
apache-flink
apache-kafka
checkpointing
java
kubernetes

2 Answers

3/21/2020

I recommend you upgrade to the latest Flink/Kafka connector -- it looks like you're running FlinkKafkaProducer011, which is intended for Kafka 0.11.

You should be using FlinkKafkaProducer from the universal Kafka connector: flink-connector-kafka. Since Flink 1.9, this uses the Kafka 2.2.0 client.

With Maven, you want to specify:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>1.10.0</version>
    </dependency>

Or replace 2.11 with 2.12 if you are using Scala 2.12.
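
For reference, here is a minimal sketch of what the upgraded sink could look like; the broker address, topic name, and string schema are placeholders for whatever your job actually uses, and KeyedSerializationSchemaWrapper is just one way to adapt a simple schema:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

    public class KafkaSinkExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(1800000); // checkpoint every 30 minutes, as in the question

            // stand-in source for the real FileSplitReader -> metrics-map pipeline
            DataStream<String> stream = env.fromElements("a", "b", "c");

            Properties producerProps = new Properties();
            producerProps.setProperty("bootstrap.servers", "kafka:9092"); // placeholder broker address
            // keep this at or below the broker's transaction.max.timeout.ms for EXACTLY_ONCE
            producerProps.setProperty("transaction.timeout.ms", "900000");

            FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>(
                    "components-topic",                                   // placeholder topic
                    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                    producerProps,
                    FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

            stream.addSink(sink);
            env.execute("kafka-sink-example");
        }
    }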

-- David Anderson
Source: StackOverflow

3/21/2020

It's hard to tell without access to the environment.

It may be related to the specific code you are running. You are basically hitting this exception.
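
That exception is thrown from FlinkKafkaProducer011.createTransactionalProducer (visible in your stack trace) when every producer in the sink's fixed pool of transactional producers is still tied to a pending checkpoint. If the pool size does turn out to be the limit, here is a minimal sketch of how it could be raised when constructing the sink; the broker address, topic name, and the value 10 are placeholders:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

    public class PoolSizeExample {
        // Builds the 0.11 sink with a larger transactional-producer pool than the default of 5.
        static FlinkKafkaProducer011<String> buildSink() {
            Properties producerProps = new Properties();
            producerProps.setProperty("bootstrap.servers", "kafka:9092"); // placeholder broker address

            return new FlinkKafkaProducer011<>(
                    "components-topic",                                   // placeholder topic
                    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                    producerProps,
                    FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
                    10); // kafkaProducersPoolSize: more headroom than the default of 5
        }
    }

Note that with setMaxConcurrentCheckpoints(1) the default pool of 5 is normally sufficient, so pool exhaustion here may be a symptom of repeated restarts rather than the root cause.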

A couple of things:

  1. Here is a similar issue that turned out to be related to an array in the code: Interrupted while joining ioThread / Error during disposal of stream operator in flink application

  2. It sounds like you are running on Kubernetes, and if you look at this you can see that the problem could be related to a failed teardown or a lack of connectivity between the job and task managers. You may want to check the networking in your Kubernetes cluster and make sure that all your Flink pods can communicate with each other.

Hope it helps!

-- Rico
Source: StackOverflow