Timeout trying to start flink job master for checkpointed job

6/19/2019

I'm trying to set up Flink to recover from checkpoints. For the most part this seems to work. However, after about a week of running in our staging environment, the job manager has started crash looping because of a timeout when trying to start the "job master" for a job.

I'm using Flink 1.7.2 deployed in high-availability mode with ZooKeeper 3.4.9-1757313, simply to facilitate checkpoint restoration. I've got just a single job manager on Kubernetes, deployed as a stateful set. Something must have caused the server to crash, and on coming back up it appears to be failing in the code that starts up the job masters for (presumably) recovered jobs.

I've seen this once before, and clearing out all the Flink ZooKeeper entries (rmr /flink in the zk cli) and then restarting the Flink cluster "fixes" the issue.

Here's the Flink config:

    blob.server.port: 6124
    blob.storage.directory: s3://...
    high-availability: zookeeper
    high-availability.zookeeper.quorum: zookeeper:2181
    high-availability.zookeeper.path.root: /flink
    high-availability.storageDir: s3://...
    high-availability.jobmanager.port: 6070
    jobmanager.archive.fs.dir: s3://...
    state.backend: rocksdb
    state.backend.fs.checkpointdir: s3://...
    state.checkpoints.dir: s3://...
    state.checkpoints.num-retained: 2
    web.log.path: /var/log/flink.log
    web.upload.dir: /var/flink-recovery/flink-web-upload
    zookeeper.sasl.disable: true
    s3.access-key: __S3_ACCESS_KEY_ID__
    s3.secret-key: __S3_SECRET_KEY__

And here are the container ports on the flink-jobmaster stateful set:

    ports:
    - containerPort: 8081
      name: ui
    - containerPort: 6123
      name: rpc
    - containerPort: 6124
      name: blob
    - containerPort: 6125
      name: query
    - containerPort: 9249
      name: prometheus
    - containerPort: 6070
      name: ha

I'd expect Flink to successfully restore from the checkpoint in S3, but instead the job manager crashes on startup with the following stack trace:

2019-06-18 14:02:05,123 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Fatal error occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: JobMaster for job f13131ca883d6cf92f69a52cff4f1017 failed.
    at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:759)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:339)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
    at akka.actor.UntypedActor$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.util.FlinkException: Could not start the job manager.
    at org.apache.flink.runtime.jobmaster.JobManagerRunner.lambda$verifyJobSchedulingStatusAndStartJobManager$2(JobManagerRunner.java:340)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_2#-806528277]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.UnfencedMessage".
    at akka.pattern.PromiseActorRef$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
    at akka.actor.Scheduler$anon$4.run(Scheduler.scala:126)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
    at akka.actor.LightArrayRevolverScheduler$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
    at akka.actor.LightArrayRevolverScheduler$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
    at akka.actor.LightArrayRevolverScheduler$anon$4.run(LightArrayRevolverScheduler.scala:236)
    ... 1 more

I'm really at a loss here. I don't know much about the inner workings of flink so this exception doesn't mean a lot to me. Any leads would be very much appreciated.

EDIT: I've been poking through the Flink source code. This exception is thrown after a leader has been elected, when it tries to restore its job graphs from the checkpointing information stored in ZooKeeper. Getting to the bottom of exactly where this exception comes from is rather troublesome, as it's all wrapped up in futures and akka. My guess is that it happens after the job manager spins up the JobMaster sub-process in order to schedule the job graph. This is a bit of speculation, but I think the job manager is attempting to get the status of the new job from its JobMaster while the JobMaster thread has deadlocked (it could also have died, though I would expect a stack trace then), and so the ask is timing out. Seems like a real doozy.

Note: The UnfencedMessage that the ask was for is used locally within the job manager (which coincides with the receiving actor being the job manager in the exception), so we can rule out network misconfiguration between the JobMaster and the task managers.
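
Since the interesting part of the trace is buried under the future and akka frames, here is a rough Python sketch that pulls out just the "Caused by" chain and the ask timeout. The helper names (cause_chain, ask_timeout_ms) are made up for illustration, and the sample text is two lines copied from the trace above. As far as I can tell, the 10000 ms matches the default of Flink's akka.ask.timeout setting, but treat that as an assumption.

```python
import re
from typing import List, Optional, Tuple

# Matches "Caused by: <exception class>: <message>" lines in a Java stack trace.
CAUSE_RE = re.compile(r"^Caused by: ([\w.$]+): (.*)$", re.MULTILINE)
# Matches the timeout in an akka ask-timeout message, e.g. "after [10000 ms]".
TIMEOUT_RE = re.compile(r"after \[(\d+) ms\]")


def cause_chain(trace: str) -> List[Tuple[str, str]]:
    """Return (exception class, message) for every 'Caused by' line, in order."""
    return [(m.group(1), m.group(2).strip()) for m in CAUSE_RE.finditer(trace)]


def ask_timeout_ms(trace: str) -> Optional[int]:
    """Return the first ask timeout found in the trace, in milliseconds, or None."""
    m = TIMEOUT_RE.search(trace)
    return int(m.group(1)) if m else None


# Two lines copied from the stack trace in the question.
SAMPLE = """
Caused by: org.apache.flink.util.FlinkException: Could not start the job manager.
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_2#-806528277]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.UnfencedMessage".
"""

if __name__ == "__main__":
    for cls, msg in cause_chain(SAMPLE):
        print(cls, "->", msg)
    print("ask timeout (ms):", ask_timeout_ms(SAMPLE))
```

The last entry in the chain is the root cause, which here is the akka ask timing out rather than anything checkpoint-specific.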

-- Jonathan Poole
apache-flink
flink-streaming
high-availability
kubernetes

1 Answer

6/19/2019

I'm staging jars on Flink before execution using the /jars/upload endpoint. It seems that Flink's performance tanks when it has too many jars uploaded. All the endpoints become unresponsive, including the /jobs/<job_id> endpoint. It was taking 1-2 minutes to load the job graph overview in the Flink UI. I imagine this REST endpoint uses the same akka actor the job manager does. I think I must've hit a tipping point where this started causing timeouts. I've reduced the number of jars from 30-odd to just the 4 latest versions, and Flink is responsive again.
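
A minimal sketch of that cleanup in Python, in case it helps anyone automate it. It assumes the Flink REST API's GET /jars returns a JSON object with a "files" list whose entries carry "id" and "uploaded" (a timestamp) fields, and that DELETE /jars/<jar_id> removes an uploaded jar. The function names and the base URL in the usage note are hypothetical, so check the response shape against your Flink version before relying on it.

```python
import json
import urllib.parse
import urllib.request


def jars_to_delete(files, keep=4):
    """Return the ids of all but the `keep` most recently uploaded jars.

    `files` is assumed to look like the "files" list from GET /jars:
    dicts with at least an "id" and an "uploaded" timestamp.
    """
    newest_first = sorted(files, key=lambda f: f["uploaded"], reverse=True)
    return [f["id"] for f in newest_first[keep:]]


def prune_jars(base_url, keep=4):
    """Delete stale uploaded jars via the REST API and return their ids."""
    with urllib.request.urlopen(f"{base_url}/jars") as resp:
        files = json.load(resp)["files"]
    stale = jars_to_delete(files, keep)
    for jar_id in stale:
        # Quote the id in case the original file name contains unusual characters.
        url = f"{base_url}/jars/{urllib.parse.quote(jar_id, safe='')}"
        req = urllib.request.Request(url, method="DELETE")
        urllib.request.urlopen(req).close()
    return stale
```

Calling prune_jars("http://localhost:8081", keep=4) after each upload would keep only the four newest jars. The selection logic is kept in its own function so it can be checked without a running cluster.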

-- Jonathan Poole
Source: StackOverflow