How to run Apache Beam Python pipelines on a Flink cluster in Kubernetes?

9/28/2020

I am attempting to run the word count example on minikube following the Flink Kubernetes instructions here, but the job never completes. The Beam Python SDK worker pool doesn't appear to do any work.

In addition to the instructions for configuring a Flink Kubernetes cluster, I added a Python SDK worker pool to the taskmanager deployment. If I understand correctly, the purpose of the worker pool is to execute the Python portions of the pipeline. See the full k8s deployment below:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink-test
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.10.2-scala_2.11
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      - name: beam-worker-pool
        image: apache/beam_python3.7_sdk:2.24.0
        args: ["--worker_pool"]
        ports:
        - containerPort: 50000
          name: pool
        livenessProbe:
          tcpSocket:
            port: 50000
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties

I am running the example as follows:

python -m apache_beam.examples.wordcount \
--output /tmp/results/count \
--runner FlinkRunner \
--flink_master=localhost:8081 \
--environment_type=EXTERNAL \
--environment_config=localhost:50000

I used the documentation at https://beam.apache.org/documentation/runtime/sdk-harness-config/ to set the environment_type and environment_config values.
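
(Aside: the --flink_master address of localhost:8081 assumes the jobmanager REST port has been port-forwarded out of minikube, e.g. with something like the command below; the service name follows the Flink Kubernetes guide and may differ in other setups.)

kubectl -n flink-test port-forward service/flink-jobmanager 8081:8081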

The job gets submitted to the job manager and I can view it in the Flink UI, but it never completes. I started poking around the container logs and noticed the beam-worker-pool container has the following logs:

Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:46005', '--artifact_endpoint=localhost:43851', '--provision_endpoint=localhost:37079', '--control_endpoint=localhost:37961']
2020/09/28 16:44:00 Provision info:
pipeline_options: <elided>  logging_endpoint: <elided>  artifact_endpoint: <elided>  control_endpoint: <elided>  dependencies: <elided>
2020/09/28 16:44:00 Initializing python harness: /opt/apache/beam/boot --id=1-1 --logging_endpoint=localhost:46005 --artifact_endpoint=localhost:43851 --provision_endpoint=localhost:37079 --control_endpoint=localhost:37961
2020/09/28 16:44:08 Failed to retrieve staged files: failed to retrieve /tmp/staged in 3 attempts: failed to retrieve chunk for /tmp/staged/pickled_main_session
	caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
	caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
	caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
	caused by:
rpc error: code = Unknown desc = 

Likewise, the taskmanager is logging:

2020-09-28 16:46:00,155 INFO  org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory  - Still waiting for startup of environment from localhost:50000 for worker id 1-1

Not sure what I am missing. I checked the /tmp/staged directory on the worker pool container and it is empty.
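
(To check that, one can exec into the worker pool sidecar, using the namespace and container names from the deployment above; something like:)

kubectl -n flink-test exec deploy/flink-taskmanager -c beam-worker-pool -- ls -la /tmp/staged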

Note this issue is similar to these previous questions: https://stackoverflow.com/questions/57851158/how-do-i-run-beam-python-pipelines-using-flink-deployed-on-kubernetes and https://stackoverflow.com/questions/60411012/running-apache-beam-python-pipelines-in-kubernetes

-- Nathan B
apache-beam
kubernetes
python

1 Answer

9/28/2020

As of this writing, Beam stages runtime dependencies ("artifacts") to a directory on the local filesystem (/tmp/staged by default) that needs to be accessible to both the job server (in your case, the client) and the Beam worker.

You can get around this by setting the --flink_submit_uber_jar pipeline option. When --flink_submit_uber_jar is set, Beam wraps all your dependencies in a jar that is submitted to Flink.
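
For example, the command from the question would become something like this (same options as before, plus the flag):

python -m apache_beam.examples.wordcount \
--output /tmp/results/count \
--runner FlinkRunner \
--flink_master=localhost:8081 \
--environment_type=EXTERNAL \
--environment_config=localhost:50000 \
--flink_submit_uber_jar

With the uber jar, the dependencies travel inside the jar submitted to Flink, so nothing has to be staged to a filesystem shared between the client and the workers.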

-- ibzib
Source: StackOverflow