Given,
I have a Flink job that reads from an ActiveMQ source and writes to a MySQL database, keyed on an identifier. I have enabled checkpoints for this job every second. The checkpoints point to a Minio instance, and I have verified that they are working with the jobid. I deploy this job on OpenShift (Kubernetes underneath), and I can scale the job up or down as and when required.
Problem
When the job is redeployed (rolling deployment) or goes down due to a bug or error, and there are unconsumed messages in ActiveMQ or messages that Flink has not yet acknowledged (but has already written to the database), the recovered (or newly deployed) job reprocesses messages that were already processed, which inserts duplicate records into the database.
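Question
Shouldn't the checkpoints help the job recover from where it left off?
Should I take a checkpoint before I (rolling) deploy a new job?
What happens if the job quits with an error or the cluster fails?
As the jobid keeps changing on every deployment, how does the recovery happen?
As I cannot expect idempotency from the database, to achieve Exactly-Once can I write a database-specific (upsert) query that updates the record if it is present and inserts it if not?
Answer
The JDBC sink currently supports only at-least-once delivery, meaning you get duplicate messages upon recovery. There is currently a draft to add support for exactly-once, which would probably be released with 1.11.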
Shouldn't the checkpoints help the job recover from where it left off?
Yes, but everything written to the database between the last successful checkpoint and the failure is processed again on recovery, which could produce the observed duplicates. I gave a more detailed answer on a somewhat related topic.
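To make the gap between the last checkpoint and the failure concrete, here is a minimal simulation in plain Python. It is not Flink code: `run_with_failure`, the message names, and the checkpoint interval are all made up for illustration, and the sink is just an append-only list standing in for the database table.

```python
def run_with_failure(messages, checkpoint_every, fail_after):
    """Simulate an at-least-once pipeline that crashes once and recovers.

    messages:         records read in order from the source
    checkpoint_every: a checkpoint saves the source position every N records
    fail_after:       number of records written to the sink before the crash
    """
    sink = []            # stands in for the database table (append-only)
    last_checkpoint = 0  # source position saved by the last checkpoint

    # First attempt: process records until the simulated crash.
    for position, record in enumerate(messages, start=1):
        if position > fail_after:
            break
        sink.append(record)
        if position % checkpoint_every == 0:
            last_checkpoint = position

    # Recovery: the source rewinds to the last checkpointed position,
    # so every record written after that checkpoint is written again.
    for record in messages[last_checkpoint:]:
        sink.append(record)
    return sink


messages = ["m1", "m2", "m3", "m4", "m5", "m6"]
sink = run_with_failure(messages, checkpoint_every=2, fail_after=5)
duplicates = sorted({m for m in sink if sink.count(m) > 1})
print(duplicates)  # ['m5']
```

Here the last checkpoint covers m1 to m4, the crash happens after m5 has been written, and recovery replays from m5, so m5 lands in the sink twice. A shorter checkpoint interval narrows this window but does not close it.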
Should I take a checkpoint before I (rolling) deploy a new job?
Absolutely. You should actually use cancel with savepoint. That is the only reliable way to change the topology. Additionally, cancel with savepoint avoids any duplicates in the data, as it shuts the job down gracefully.
What happens if the job quits with an error or the cluster fails?
It should restart automatically (depending on your restart settings) and use the latest checkpoint for recovery. That would most certainly result in duplicates.
As the jobid keeps changing on every deployment, how does the recovery happen?
You usually point explicitly to the same checkpoint directory (on S3?).
As I cannot expect idempotency from the database, is upsert the only way to achieve Exactly-Once processing?
Currently, I do not see a way around it. This should change with 1.11.
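As a rough sketch of why an upsert makes the replay harmless, the example below writes the same batch twice, as a recovered job would. It uses Python's built-in sqlite3 module only so that it runs without a server; the `events` table, its columns, and `write_batch` are hypothetical names, and SQLite's `ON CONFLICT ... DO UPDATE` (available from SQLite 3.24) stands in for the MySQL statement shown in the comment.

```python
import sqlite3

# SQLite syntax, used here only as a stand-in. On MySQL the equivalent is:
#   INSERT INTO events (id, payload) VALUES (?, ?)
#   ON DUPLICATE KEY UPDATE payload = VALUES(payload)
UPSERT = """
INSERT INTO events (id, payload) VALUES (?, ?)
ON CONFLICT(id) DO UPDATE SET payload = excluded.payload
"""


def write_batch(conn, records):
    """Write records keyed on id; rewriting the same id overwrites the row."""
    conn.executemany(UPSERT, records)
    conn.commit()


conn = sqlite3.connect(":memory:")
# The primary key on id is what lets the database detect the replayed record.
conn.execute("CREATE TABLE events (id TEXT PRIMARY KEY, payload TEXT)")

batch = [("a", "first"), ("b", "second")]
write_batch(conn, batch)
write_batch(conn, batch)  # the same records replayed after recovery

rows = conn.execute("SELECT id, payload FROM events ORDER BY id").fetchall()
print(rows)  # [('a', 'first'), ('b', 'second')]
```

This only works if the key is derived from the message itself (the identifier the job is already keyed on), not generated at write time. Delivery is still at-least-once; the upsert makes the repeated writes idempotent, so the table ends up the same as if each message had been written once.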