Flink Cluster Kubernetes Heartbeat failure between TaskManager and JobManager

9/9/2021

I am trying to run a Flink cluster with native Kubernetes.

Below is the Flink configuration:

jobmanager.rpc.port: 6123
blob.server.port: 6124
taskmanager.rpc.port: 6122
taskmanager.numberOfTaskSlots: 10
jobmanager.memory.process.size: 5120m
jobmanager.memory.jvm-overhead.max: 512m
taskmanager.memory.process.size: 6656m
taskmanager.memory.framework.heap.size: 435m
taskmanager.memory.framework.off-heap.size: 217m
taskmanager.memory.jvm-overhead.max: 435m
kubernetes.jobmanager.cpu: 1
kubernetes.taskmanager.cpu: 1
# akka settings
akka.ask.timeout: 300s
akka.tcp.timeout: 1200s


# JVM configurations
env.java.opts: "-XX:+HeapDumpOnOutOfMemoryError -XX:NativeMemoryTracking=summary -XX:+UseG1GC -Dkubernetes.websocket.ping.interval=300000"


# checkpoint config
execution.checkpointing.interval: 2min
execution.checkpointing.timeout: 30min # savepoint usually takes longer
execution.checkpointing.min-pause: 110s
execution.checkpointing.externalized-checkpoint-retention: DELETE_ON_CANCELLATION
execution.checkpointing.tolerable-failed-checkpoints: 2
execution.checkpointing.snapshot-compression: true
execution.savepoint.ignore-unclaimed-state: true
#execution.checkpointing.unaligned: true

# heartbeat settings
cluster.registration.initial-timeout: 1000
cluster.registration.max-timeout: 300000
cluster.services.shutdown-timeout: 300000
heartbeat.timeout: 120000
heartbeat.interval: 60000
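
For reference, the two heartbeat values above can be compared with a quick calculation. This is only a rough sketch: the helper name `intervals_per_timeout` is made up for illustration, and the 10000 ms / 50000 ms pair is what I understand Flink's default `heartbeat.interval` and `heartbeat.timeout` to be, so treat that as an assumption. It shows how many heartbeat intervals fit into one timeout window, which is a crude measure of how much slack a delayed heartbeat has.

```python
def intervals_per_timeout(interval_ms: int, timeout_ms: int) -> int:
    """Return how many whole heartbeat intervals fit into one timeout window."""
    if interval_ms <= 0 or timeout_ms <= 0:
        raise ValueError("interval and timeout must be positive")
    return timeout_ms // interval_ms


# Values from the configuration in this question.
configured = intervals_per_timeout(interval_ms=60000, timeout_ms=120000)

# Assumed Flink defaults (heartbeat.interval=10000, heartbeat.timeout=50000).
assumed_default = intervals_per_timeout(interval_ms=10000, timeout_ms=50000)

print(f"configured: {configured} intervals per timeout")
print(f"assumed default: {assumed_default} intervals per timeout")
```

With the values in this question only two intervals fit into the timeout, so a single lost heartbeat followed by a slightly late one could already be enough to trip it.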

At first the JobManager and TaskManager come up successfully and the TM processes a few events. But after the timeout interval, the JM throws the heartbeat error shown below and starts new TaskManagers. The old TaskManagers keep running while the new ones start up in the same namespace.

I can't explain this behaviour. I checked the memory on the TaskManagers and the JobManager, and there are no memory issues. Moreover, the TM and JM run without crashing; it is only that after the timeout interval the TM is no longer recognized and new TMs are started.

2021-09-09 03:23:32,886 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Co-Process-Broadcast (10/10) (14679f59d1accdc6f925e2637eede0c9) switched from RUNNING to FAILED on flink-taskmanager-1-4 @ 100.114.72.234 (dataPort=41845).
java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id flink-taskmanager-1-4  timed out.
	at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1442) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
	at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
	at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [akka-actor_2.12-2.5.21.jar:2.5.21]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [akka-actor_2.12-2.5.21.jar:2.5.21]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [scala-library-2.12.7.jar:?]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [scala-library-2.12.7.jar:?]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [akka-actor_2.12-2.5.21.jar:2.5.21]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.12.7.jar:?]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?]
	at akka.actor.Actor.aroundReceive(Actor.scala:517) [akka-actor_2.12-2.5.21.jar:2.5.21]
	at akka.actor.Actor.aroundReceive$(Actor.scala:515) [akka-actor_2.12-2.5.21.jar:2.5.21]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [akka-actor_2.12-2.5.21.jar:2.5.21]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [akka-actor_2.12-2.5.21.jar:2.5.21]
	at akka.actor.ActorCell.invoke(ActorCell.scala:561) [akka-actor_2.12-2.5.21.jar:2.5.21]
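
To see which TaskManagers are affected and how often, the JobManager log can be scanned for this message. The snippet below is a minimal sketch, assuming the log lines keep the exact wording of the exception above; the function name `timed_out_taskmanagers` and the `sample` list are only for illustration.

```python
import re
from collections import Counter

# Matches the message text from the TimeoutException shown above.
HEARTBEAT_TIMEOUT = re.compile(
    r"The heartbeat of TaskManager with id (\S+)\s+timed out"
)


def timed_out_taskmanagers(log_lines):
    """Count heartbeat-timeout messages per TaskManager id."""
    counts = Counter()
    for line in log_lines:
        match = HEARTBEAT_TIMEOUT.search(line)
        if match:
            counts[match.group(1)] += 1
    return counts


# Illustrative input: the exception line from this question plus one stack frame.
sample = [
    "java.util.concurrent.TimeoutException: The heartbeat of TaskManager "
    "with id flink-taskmanager-1-4  timed out.",
    "    at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run"
    "(HeartbeatMonitorImpl.java:111)",
]

print(dict(timed_out_taskmanagers(sample)))
```

In practice the lines would come from the JobManager log file instead of the `sample` list.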

I also checked the /etc/hosts file:

# Kubernetes-managed hosts file.
127.0.0.1       localhost
::1     localhost ip6-localhost ip6-loopback
fe00::0 ip6-localnet
fe00::0 ip6-mcastprefix
fe00::1 ip6-allnodes
fe00::2 ip6-allrouters
100.114.126.100 flink-5fd769dbcf-k4cnb
2620:149:106a:220d::7418        flink-5fd769dbcf-k4cnb

Any idea what could be wrong here? Thank you.

-- Praneeth Ramesh
apache-flink
flink-streaming
kubernetes

1 Answer

9/9/2021

It may be worth confirming the JM's configuration (CPU cores) and the number of TMs. Is it possible that the JM is under high load, or that the JM executes some user logic?

-- ChangLi
Source: StackOverflow