Apache Flink job cluster: jobmanager.rpc.address binds to localhost on Kubernetes

7/16/2019

I'm trying to run a Flink job cluster (1.8.1) in a Kubernetes environment. I created a Docker image containing my job JAR using this doc.

I used kube files to create the job, the job manager and the task manager. The issue is that the task manager cannot connect to the job manager and keeps crashing.

While debugging the job manager logs, I found that jobmanager.rpc.address is bound to "localhost".

However, I passed the args in the kube files as described in this doc.
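For illustration, this is roughly what such an args line looks like in the in-line (flow-sequence) style used by the doc. It is reconstructed from the "Program Arguments" in the job manager log below, minus the --configDir pair that the start script adds itself; the image name is a hypothetical placeholder, and the leading "job-cluster" argument is an assumption based on the "Starting the job-cluster" line in the log.

```yaml
containers:
- name: flink-job-cluster
  image: my-flink-job:latest   # hypothetical image name
  # In-line args, values taken from the logged Program Arguments
  args: ["job-cluster", "--job-classname", "wikiedits.WikipediaAnalysis",
         "--host", "flink-job-cluster",
         "-Djobmanager.rpc.address=flink-job-cluster", "-Dparallelism.default=2",
         "-Dblob.server.port=6124", "-Dqueryable-state.server.ports=6125"]
```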

I also tried setting jobmanager.rpc.address through the FLINK_ENV_JAVA_OPTS environment variable:

  env:
  - name: FLINK_ENV_JAVA_OPTS
    value: "-Djobmanager.rpc.address=flink-job-cluster"

Job manager console log:

Starting the job-cluster
Starting standalonejob as a console application on host flink-job-cluster-bbxrn.
2019-07-16 17:31:10,759 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - --------------------------------------------------------------------------------
2019-07-16 17:31:10,760 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Starting StandaloneJobClusterEntryPoint (Version: <unknown>, Rev:4caec0d, Date:03.04.2019 @ 13:25:54 PDT)
2019-07-16 17:31:10,760 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  OS current user: flink
2019-07-16 17:31:10,761 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Current Hadoop/Kerberos user: <no hadoop dependency found>
2019-07-16 17:31:10,761 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM: OpenJDK 64-Bit Server VM - IcedTea - 1.8/25.212-b04
2019-07-16 17:31:10,761 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Maximum heap size: 989 MiBytes
2019-07-16 17:31:10,761 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JAVA_HOME: /usr/lib/jvm/java-1.8-openjdk/jre
2019-07-16 17:31:10,761 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  No Hadoop Dependency available
2019-07-16 17:31:10,761 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM Options:
2019-07-16 17:31:10,761 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Xms1024m
2019-07-16 17:31:10,761 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Xmx1024m
2019-07-16 17:31:10,762 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Djobmanager.rpc.address=flink-job-cluster
2019-07-16 17:31:10,762 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlog4j.configuration=file:/opt/flink-1.8.1/conf/log4j-console.properties
2019-07-16 17:31:10,762 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlogback.configurationFile=file:/opt/flink-1.8.1/conf/logback-console.xml
2019-07-16 17:31:10,762 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Program Arguments:
2019-07-16 17:31:10,762 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     --configDir
2019-07-16 17:31:10,762 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     /opt/flink-1.8.1/conf
2019-07-16 17:31:10,762 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     --job-classname
2019-07-16 17:31:10,762 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     wikiedits.WikipediaAnalysis
2019-07-16 17:31:10,762 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     --host
2019-07-16 17:31:10,762 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     flink-job-cluster
2019-07-16 17:31:10,762 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Djobmanager.rpc.address=flink-job-cluster
2019-07-16 17:31:10,763 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dparallelism.default=2
2019-07-16 17:31:10,763 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dblob.server.port=6124
2019-07-16 17:31:10,763 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dqueryable-state.server.ports=6125
2019-07-16 17:31:10,763 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Classpath: /opt/flink-1.8.1/lib/log4j-1.2.17.jar:/opt/flink-1.8.1/lib/slf4j-log4j12-1.7.15.jar:/opt/flink-1.8.1/lib/wiki-edits-0.1.jar:/opt/flink-1.8.1/lib/flink-dist_2.11-1.8.1.jar:::
2019-07-16 17:31:10,763 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - --------------------------------------------------------------------------------
2019-07-16 17:31:10,764 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Registered UNIX signal handlers for [TERM, HUP, INT]
2019-07-16 17:31:10,850 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, localhost
2019-07-16 17:31:10,851 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2019-07-16 17:31:10,851 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2019-07-16 17:31:10,851 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 1024m
2019-07-16 17:31:10,851 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2019-07-16 17:31:10,851 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1

The logs above show jobmanager.rpc.address set to localhost instead of flink-job-cluster.

I assume Akka RPC drops the task manager's messages because the job manager is bound to localhost:6123:

2019-07-16 17:31:12,546 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 38190f2570cd5f0a0a47f65ddf7aae1f with allocation id 97af00eae7e3dfb31a79232077ea7ee6.
2019-07-16 17:31:14,043 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@flink-job-cluster:6123/]] arriving at [akka.tcp://flink@flink-job-cluster:6123] inbound addresses are [akka.tcp://flink@localhost:6123]
2019-07-16 17:31:26,564 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@flink-job-cluster:6123/]] arriving at [akka.tcp://flink@flink-job-cluster:6123] inbound addresses are [akka.tcp://flink@localhost:6123]

I'm not sure why the job manager binds to localhost.

PS: The task manager pod can resolve the flink-job-cluster host; the hostname resolves to the service IP address.
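For context, the flink-job-cluster hostname comes from a Kubernetes Service with that name. A minimal sketch of such a Service, assuming the ports seen in the log (6123 for RPC, 6124 for the blob server, 6125 for queryable state); the selector labels are hypothetical and must match the labels on the job cluster pod.

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-job-cluster   # this name is the hostname the task managers resolve
spec:
  selector:
    app: flink              # hypothetical labels; must match the job pod's labels
    component: job-cluster
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: query
    port: 6125
```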

-- user1083206

1 Answer

7/17/2019

The root cause was that the jobmanager.rpc.address argument value was not applied. For some reason, the in-line args were not set in the Flink global configuration properly, but the same args passed as a multi-line list work fine.
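A sketch of the multi-line (block-sequence) form of the args, reusing the argument values from the question's log. The image name is a hypothetical placeholder, and the leading "job-cluster" argument assumes the image's entrypoint expects it, as the "Starting the job-cluster" log line suggests.

```yaml
containers:
- name: flink-job-cluster
  image: my-flink-job:latest   # hypothetical image name
  # One argument per line instead of a single in-line list
  args:
  - "job-cluster"
  - "--job-classname"
  - "wikiedits.WikipediaAnalysis"
  - "--host"
  - "flink-job-cluster"
  - "-Djobmanager.rpc.address=flink-job-cluster"
  - "-Dparallelism.default=2"
  - "-Dblob.server.port=6124"
  - "-Dqueryable-state.server.ports=6125"
```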

-- user1083206
Source: StackOverflow