Attempting to run the word count example on minikube following the Flink Kubernetes instructions here, but the job never completes. The Beam Python SDK worker pool doesn't appear to do any work.
In addition to the instructions for configuring a Flink Kubernetes cluster, I added a Python SDK worker pool to the taskmanager deployment. If I understand correctly, the purpose of the worker pool is to execute the Python portions of the pipeline. See the full k8s deployment below.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink-test
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.10.2-scala_2.11
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      - name: beam-worker-pool
        image: apache/beam_python3.7_sdk:2.24.0
        args: ["--worker_pool"]
        ports:
        - containerPort: 50000
          name: pool
        livenessProbe:
          tcpSocket:
            port: 50000
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
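After applying this, the taskmanager pod should show two running containers. A quick way to check both (using kubectl's deployment shorthand, so the generated pod name isn't needed):

kubectl -n flink-test get pods
kubectl -n flink-test logs deploy/flink-taskmanager -c beam-worker-pool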
I am running the example as follows:
python -m apache_beam.examples.wordcount \
--output /tmp/results/count \
--runner FlinkRunner \
--flink_master=localhost:8081 \
--environment_type=EXTERNAL \
--environment_config=localhost:50000
I used the documentation at https://beam.apache.org/documentation/runtime/sdk-harness-config/ to set the environment_type and environment_config values.
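For localhost:8081 to reach the jobmanager from my machine, I port-forward its REST port; the service name below assumes the flink-jobmanager service from the Flink Kubernetes instructions:

kubectl -n flink-test port-forward service/flink-jobmanager 8081:8081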
The job gets added to the job manager and I can view it in the Flink UI, but the job never completes. I started poking around the container logs and noticed the beam-worker-pool has the following logs:
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:46005', '--artifact_endpoint=localhost:43851', '--provision_endpoint=localhost:37079', '--control_endpoint=localhost:37961']
2020/09/28 16:44:00 Provision info:
pipeline_options: <... serialized pipeline options elided ...> logging_endpoint: <...> artifact_endpoint: <...> control_endpoint: <...> dependencies: <...>
2020/09/28 16:44:00 Initializing python harness: /opt/apache/beam/boot --id=1-1 --logging_endpoint=localhost:46005 --artifact_endpoint=localhost:43851 --provision_endpoint=localhost:37079 --control_endpoint=localhost:37961
2020/09/28 16:44:08 Failed to retrieve staged files: failed to retrieve /tmp/staged in 3 attempts: failed to retrieve chunk for /tmp/staged/pickled_main_session
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
caused by:
rpc error: code = Unknown desc =
Likewise, the taskmanager is logging:
2020-09-28 16:46:00,155 INFO org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory - Still waiting for startup of environment from localhost:50000 for worker id 1-1
Not sure what I am missing. I checked /tmp/staged/pickled_main_session on the worker pool and it is empty.
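For reference, this is how the staging directory can be inspected on the worker pool container from outside the pod:

kubectl -n flink-test exec deploy/flink-taskmanager -c beam-worker-pool -- ls -la /tmp/staged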
Note this issue is similar to these previous questions:
https://stackoverflow.com/questions/57851158/how-do-i-run-beam-python-pipelines-using-flink-deployed-on-kubernetes
https://stackoverflow.com/questions/60411012/running-apache-beam-python-pipelines-in-kubernetes
By default (as of this writing), Beam stages runtime dependencies ("artifacts") to a directory (/tmp/staged by default) that needs to be accessible to both the job server (in your case, the client running on your machine) and the Beam worker. Since the client and the worker pool container don't share a filesystem here, the staged files never reach the worker, which is why /tmp/staged on the worker pool is empty.
You can get around this by setting the --flink_submit_uber_jar pipeline option. When --flink_submit_uber_jar is set, Beam wraps all your dependencies in a jar that is submitted to Flink.
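For example, the wordcount invocation from the question with the flag added (all other options unchanged):

python -m apache_beam.examples.wordcount \
  --output /tmp/results/count \
  --runner FlinkRunner \
  --flink_master=localhost:8081 \
  --environment_type=EXTERNAL \
  --environment_config=localhost:50000 \
  --flink_submit_uber_jar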