Apache Flink - duplicate message processing during job deployments, with ActiveMQ as source

3/5/2020

Given,

I have a Flink job that reads from an ActiveMQ source and writes to a MySQL database, keyed on an identifier. I have enabled checkpointing for this job every second, pointing the checkpoints at a Minio instance, and I verified that checkpoints are written under the job id. I deploy this job in OpenShift (Kubernetes underneath) and can scale it up/down as required.

Problem

When the job is redeployed (rolling) or goes down due to a bug/error, and there are unconsumed messages in ActiveMQ or messages unacknowledged by Flink (but already written to the database), the recovered job (or the newly deployed one) processes those already-processed messages again, resulting in duplicate records inserted into the database.

Question

  • Shouldn't the checkpoints help the job recover from where it left off?
  • Should I take a checkpoint before I (rolling-)deploy a new job?
  • What happens if the job quits with an error or the cluster fails?
  • As the jobid changes on every deployment, how does recovery happen?
  • Edit: As I cannot expect idempotency from the database, can I write a database-specific (upsert) query that updates a record if it is present and inserts it if not, to avoid saving duplicates (exactly-once)?
-- Vijay Veeraraghavan
apache-flink
flink-cep
flink-sql
flink-streaming
kubernetes

1 Answer

3/5/2020

The JDBC sink currently only supports at-least-once, meaning you get duplicate messages upon recovery. There is currently a draft to add exactly-once support, which will probably be released with Flink 1.11.

Shouldn't the checkpoints help the job recover from where it left off?

Yes, but anything processed between the last successful checkpoint and the failure is replayed on recovery, which produces the observed duplicates. I gave a more detailed answer on a somewhat related topic.
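The replay window can be illustrated with a toy model (plain Python, not Flink APIs; the names and the checkpoint-every-2-records cadence are made up): any record written to a non-transactional sink after the last checkpoint is written again when the job resumes from that checkpoint.

```python
# Toy model of at-least-once recovery: why replay causes duplicates.
messages = ["m1", "m2", "m3", "m4"]
db = []                      # non-transactional sink (like a plain JDBC insert)
checkpointed_offset = 0      # source position stored in the last checkpoint

def run(from_offset, fail_after=None):
    """Process messages from an offset; optionally crash after one of them."""
    global checkpointed_offset
    for i in range(from_offset, len(messages)):
        db.append(messages[i])           # write to the sink first...
        if i % 2 == 1:
            checkpointed_offset = i + 1  # ...checkpoint only every 2 records
        if fail_after is not None and i == fail_after:
            return                       # crash before the next checkpoint

run(0, fail_after=2)       # fails after writing m3; last checkpoint covers m1, m2
run(checkpointed_offset)   # recovery replays from offset 2 -> m3 is written twice
print(db)                  # ['m1', 'm2', 'm3', 'm3', 'm4']
```

The duplicate m3 is exactly the record that landed in the database between the last checkpoint and the crash.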

Should I take a checkpoint before I (rolling-)deploy a new job?

Absolutely. You should actually use cancel with savepoint, which is the only reliable way to change the topology. Additionally, cancel with savepoint avoids duplicates in the data, as it shuts the job down gracefully.
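With the standard Flink CLI, the flow looks roughly like this (bucket paths, job id, and jar name are placeholders):

```shell
# Trigger a savepoint and cancel the job in one step
flink cancel -s s3://my-bucket/savepoints <job-id>

# Deploy the new version, resuming from the savepoint that was just created
flink run -s s3://my-bucket/savepoints/savepoint-<id> -d my-job.jar
```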

What happens if the job quit with error or cluster failure?

It should restart automatically (depending on your restart strategy) and use the latest checkpoint for recovery. With the current JDBC sink, that would most certainly result in duplicates.
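Restart behavior is controlled by the restart strategy, for example in `flink-conf.yaml` (the values here are purely illustrative):

```yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```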

As the jobid changes on every deployment, how does recovery happen?

You usually point the new job explicitly at the same checkpoint directory (on S3/Minio?).
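Assuming retained (externalized) checkpoints are enabled, the new job, regardless of its new job id, can resume from the old job's latest checkpoint by passing its path explicitly (the path and jar name are placeholders):

```shell
# Resume from a specific retained checkpoint of the previous job
flink run -s s3://my-bucket/checkpoints/<old-job-id>/chk-42 -d my-job.jar
```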

As I cannot expect idempotency from the database, is upsert the only way to achieve Exactly-Once processing?

Currently, I do not see a way around it; upsert is the way to go. That should change with Flink 1.11.
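A minimal sketch of the upsert idea, using Python's stdlib sqlite3 as a stand-in for MySQL (the table and schema are made up; in MySQL the equivalent statement is `INSERT ... ON DUPLICATE KEY UPDATE`): a replayed delivery of the same keyed record updates the existing row instead of inserting a duplicate.

```python
import sqlite3

# In-memory DB standing in for MySQL; 'events' is keyed on the identifier.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id TEXT PRIMARY KEY, payload TEXT)")

def upsert(record_id, payload):
    # SQLite upsert syntax; in MySQL this would be
    # INSERT ... ON DUPLICATE KEY UPDATE payload = VALUES(payload)
    conn.execute(
        "INSERT INTO events (id, payload) VALUES (?, ?) "
        "ON CONFLICT(id) DO UPDATE SET payload = excluded.payload",
        (record_id, payload),
    )

# Simulate a replay after recovery: the same message arrives twice.
upsert("msg-1", "v1")
upsert("msg-1", "v1")  # duplicate delivery, but no duplicate row

rows = conn.execute("SELECT id, payload FROM events").fetchall()
print(rows)  # [('msg-1', 'v1')]
```

This makes the sink effectively idempotent per key, which is what turns at-least-once delivery into exactly-once results in the database.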

-- Arvid Heise
Source: StackOverflow