I'm trying to run a Flink job cluster (1.8.1) in a Kubernetes environment. I built a Docker image containing my job jar by following this doc, and I'm using the following kube files to create the job, job manager, and task manager. The issue is that the task manager cannot connect to the job manager and keeps crashing.
Looking at the job manager logs, jobmanager.rpc.address is bound to "localhost", even though I passed the args in the kube files as per this doc.
I also tried setting jobmanager.rpc.address through the FLINK_ENV_JAVA_OPTS environment variable:
env:
  - name: FLINK_ENV_JAVA_OPTS
    value: "-Djobmanager.rpc.address=flink-job-cluster"
Job manager console log:
Starting the job-cluster
Starting standalonejob as a console application on host flink-job-cluster-bbxrn.
2019-07-16 17:31:10,759 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --------------------------------------------------------------------------------
2019-07-16 17:31:10,760 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting StandaloneJobClusterEntryPoint (Version: <unknown>, Rev:4caec0d, Date:03.04.2019 @ 13:25:54 PDT)
2019-07-16 17:31:10,760 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - OS current user: flink
2019-07-16 17:31:10,761 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Current Hadoop/Kerberos user: <no hadoop dependency found>
2019-07-16 17:31:10,761 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM: OpenJDK 64-Bit Server VM - IcedTea - 1.8/25.212-b04
2019-07-16 17:31:10,761 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Maximum heap size: 989 MiBytes
2019-07-16 17:31:10,761 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JAVA_HOME: /usr/lib/jvm/java-1.8-openjdk/jre
2019-07-16 17:31:10,761 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - No Hadoop Dependency available
2019-07-16 17:31:10,761 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM Options:
2019-07-16 17:31:10,761 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Xms1024m
2019-07-16 17:31:10,761 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Xmx1024m
2019-07-16 17:31:10,762 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Djobmanager.rpc.address=flink-job-cluster
2019-07-16 17:31:10,762 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Dlog4j.configuration=file:/opt/flink-1.8.1/conf/log4j-console.properties
2019-07-16 17:31:10,762 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Dlogback.configurationFile=file:/opt/flink-1.8.1/conf/logback-console.xml
2019-07-16 17:31:10,762 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Program Arguments:
2019-07-16 17:31:10,762 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --configDir
2019-07-16 17:31:10,762 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - /opt/flink-1.8.1/conf
2019-07-16 17:31:10,762 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --job-classname
2019-07-16 17:31:10,762 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - wikiedits.WikipediaAnalysis
2019-07-16 17:31:10,762 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --host
2019-07-16 17:31:10,762 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - flink-job-cluster
2019-07-16 17:31:10,762 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Djobmanager.rpc.address=flink-job-cluster
2019-07-16 17:31:10,763 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Dparallelism.default=2
2019-07-16 17:31:10,763 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Dblob.server.port=6124
2019-07-16 17:31:10,763 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Dqueryable-state.server.ports=6125
2019-07-16 17:31:10,763 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Classpath: /opt/flink-1.8.1/lib/log4j-1.2.17.jar:/opt/flink-1.8.1/lib/slf4j-log4j12-1.7.15.jar:/opt/flink-1.8.1/lib/wiki-edits-0.1.jar:/opt/flink-1.8.1/lib/flink-dist_2.11-1.8.1.jar:::
2019-07-16 17:31:10,763 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --------------------------------------------------------------------------------
2019-07-16 17:31:10,764 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Registered UNIX signal handlers for [TERM, HUP, INT]
2019-07-16 17:31:10,850 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2019-07-16 17:31:10,851 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2019-07-16 17:31:10,851 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2019-07-16 17:31:10,851 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2019-07-16 17:31:10,851 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2019-07-16 17:31:10,851 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
The logs above show that jobmanager.rpc.address is bound to localhost instead of flink-job-cluster.
I assume the task manager's messages are being dropped by Akka RPC because the job manager binds to localhost:6123:
2019-07-16 17:31:12,546 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 38190f2570cd5f0a0a47f65ddf7aae1f with allocation id 97af00eae7e3dfb31a79232077ea7ee6.
2019-07-16 17:31:14,043 ERROR akka.remote.EndpointWriter - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@flink-job-cluster:6123/]] arriving at [akka.tcp://flink@flink-job-cluster:6123] inbound addresses are [akka.tcp://flink@localhost:6123]
2019-07-16 17:31:26,564 ERROR akka.remote.EndpointWriter - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@flink-job-cluster:6123/]] arriving at [akka.tcp://flink@flink-job-cluster:6123] inbound addresses are [akka.tcp://flink@localhost:6123]
I'm not sure why the job manager binds to localhost.
PS: The task manager pod can resolve the flink-job-cluster host; the hostname resolves to the service IP address.
The root cause is that the jobmanager.rpc.address arg value was not applied. For some reason, args passed in-line were not set in the Flink global configuration properly, but args passed as a multi-line list work fine.
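For illustration, the multi-line list form might look like the fragment below. This is only a sketch: the argument values are copied from the "Program Arguments" section of the log above, while the container name, the image name, and the leading "job-cluster" argument are assumptions, since the original kube files are not shown here.

```yaml
# Hypothetical fragment of the job manager pod spec.
# The container name and image are placeholders, not taken from the question.
containers:
  - name: flink-job-cluster
    image: my-flink-job:latest   # placeholder image name
    # One argument per list item, rather than a single in-line args value.
    args:
      - "job-cluster"            # assumed entrypoint command
      - "--job-classname"
      - "wikiedits.WikipediaAnalysis"
      - "-Djobmanager.rpc.address=flink-job-cluster"
      - "-Dparallelism.default=2"
      - "-Dblob.server.port=6124"
      - "-Dqueryable-state.server.ports=6125"
```

If the setting is picked up, the "inbound addresses are [akka.tcp://flink@localhost:6123]" part of the Akka error should presumably no longer mention localhost, which gives a quick way to check the job manager log after redeploying.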