How to configure the Beam Python SDK with Spark in a Kubernetes environment

3/5/2021

TL;DR

How do you configure Apache Beam pipeline options with "environment_type" = EXTERNAL or PROCESS?

Description

Currently, we have a standalone Spark cluster inside Kubernetes. Following this solution (and its setup), we launch a Beam pipeline that creates an embedded Spark job server on the Spark worker, which in turn needs to run the Python SDK harness alongside it. Apache Beam allows running the Python SDK in four different ways:

  • "DOCKER" - Default and not possible inside a Kubernetes cluster (would use "container inside container")
  • "LOOPBACK" - Only for testing, not possible with more than 1 worker pod
  • "EXTERNAL" - Ideal setup, "just" create a sidecar container to run in the same pod as the spark workers
  • "PROCESS" - Execute a process in the spark worker, not ideal but could be too.

Development

1) Using "External" - Implementing the spark worker with the python sdk on the same pod:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: spark-worker
  labels:
    app: spark-worker
spec:
  selector:
    matchLabels:
      app: spark-worker
  template:
    metadata:
      labels:
        app: spark-worker
    spec:
      containers:
      - name: spark-worker
        image: spark-py-custom:latest
        imagePullPolicy: Never
        ports:
        - containerPort: 8081
          protocol: TCP
        command: ["/bin/bash", "-c", "--"]
        args: ["/start-worker.sh"]
        resources:
          requests:
            cpu: 4
            memory: "5Gi"
          limits:
            cpu: 4
            memory: "5Gi"
        volumeMounts:
          - name: spark-jars
            mountPath: "/tmp"
      - name: python-beam-sdk
        image: apachebeam/python3.7_sdk:latest
        command: ["/opt/apache/beam/boot", "--worker_pool"]
        ports:
        - containerPort: 50000
        resources:
          limits:
            cpu: "1"
            memory: "1Gi"
      volumes:
        - name: spark-jars
          persistentVolumeClaim:
            claimName: spark-jars
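
Before submitting a job, it is worth confirming that the sidecar's worker pool is actually listening. A hypothetical probe (not part of the original setup), run from inside the spark-worker container, e.g. via kubectl exec:

import socket

# The EXTERNAL environment_config used below is localhost:50000, so the
# worker pool sidecar must be reachable on that address from this container.
with socket.create_connection(("localhost", 50000), timeout=5):
    print("python-beam-sdk worker pool is listening on localhost:50000")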

And then, if we execute the command:

python3 wordcount.py \
--output ./data_test/counts \
--runner=SparkRunner \
--spark_submit_uber_jar \
--spark_job_server_jar=beam-runners-spark-job-server-2.28.0.jar \
--spark_master_url=spark://spark-master:7077 \
--spark_rest_url=http://spark-master:6066 \
--environment_type=EXTERNAL \
--environment_config=localhost:50000

The terminal gets stuck in the "RUNNING" state:

INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.client:Timeout attempting to reach GCE metadata service.
WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to use: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
Connecting anonymously.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.28.0
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x7fc360c0b8c8> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x7fc360c0f048> ====================
INFO:apache_beam.runners.portability.abstract_job_service:Artifact server started on port 36369
INFO:apache_beam.runners.portability.abstract_job_service:Running job 'job-2448721e-e686-41d4-b924-5f8c5ae73ac2'
INFO:apache_beam.runners.portability.spark_uber_jar_job_server:Submitted Spark job with ID driver-20210305172421-0000
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING

And in the Spark worker log:

21/03/05 17:24:25 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=45203" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@spark-worker-64fd4ddd6-tqdrs:45203" "--executor-id" "0" "--hostname" "172.18.0.20" "--cores" "3" "--app-id" "app-20210305172425-0000" "--worker-url" "spark://Worker@172.18.0.20:44365"

And in the Python SDK container:

2021/03/05 17:19:52 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=50000 --container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=', '--artifact_endpoint=', '--provision_endpoint=', '--control_endpoint=']
2021/03/05 17:24:32 No logging endpoint provided.

Checking the Spark worker stderr (via the web UI on localhost:8081):

Spark Executor Command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=45203" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@spark-worker-64fd4ddd6-tqdrs:45203" "--executor-id" "0" "--hostname" "172.18.0.20" "--cores" "3" "--app-id" "app-20210305172425-0000" "--worker-url" "spark://Worker@172.18.0.20:44365"
========================================

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/03/05 17:24:26 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 230@spark-worker-64fd4ddd6-tqdrs
21/03/05 17:24:26 INFO SignalUtils: Registered signal handler for TERM
21/03/05 17:24:26 INFO SignalUtils: Registered signal handler for HUP
21/03/05 17:24:26 INFO SignalUtils: Registered signal handler for INT
21/03/05 17:24:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/03/05 17:24:27 INFO SecurityManager: Changing view acls to: root
21/03/05 17:24:27 INFO SecurityManager: Changing modify acls to: root
21/03/05 17:24:27 INFO SecurityManager: Changing view acls groups to: 
21/03/05 17:24:27 INFO SecurityManager: Changing modify acls groups to: 
21/03/05 17:24:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/03/05 17:24:27 INFO TransportClientFactory: Successfully created connection to spark-worker-64fd4ddd6-tqdrs/172.18.0.20:45203 after 50 ms (0 ms spent in bootstraps)
21/03/05 17:24:27 INFO SecurityManager: Changing view acls to: root
21/03/05 17:24:27 INFO SecurityManager: Changing modify acls to: root
21/03/05 17:24:27 INFO SecurityManager: Changing view acls groups to: 
21/03/05 17:24:27 INFO SecurityManager: Changing modify acls groups to: 
21/03/05 17:24:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/03/05 17:24:28 INFO TransportClientFactory: Successfully created connection to spark-worker-64fd4ddd6-tqdrs/172.18.0.20:45203 after 1 ms (0 ms spent in bootstraps)
21/03/05 17:24:28 INFO DiskBlockManager: Created local directory at /tmp/spark-bdffc2b3-f57a-42fa-a720-e22274b86b67/executor-f1eff7ca-d2cd-4ff4-b18b-c8d6a520f590/blockmgr-c61fb65f-ea97-4bd5-bf15-e0025845a251
21/03/05 17:24:28 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
21/03/05 17:24:28 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@spark-worker-64fd4ddd6-tqdrs:45203
21/03/05 17:24:28 INFO WorkerWatcher: Connecting to worker spark://Worker@172.18.0.20:44365
21/03/05 17:24:28 INFO TransportClientFactory: Successfully created connection to /172.18.0.20:44365 after 1 ms (0 ms spent in bootstraps)
21/03/05 17:24:28 INFO WorkerWatcher: Successfully connected to spark://Worker@172.18.0.20:44365
21/03/05 17:24:28 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
21/03/05 17:24:28 INFO Executor: Starting executor ID 0 on host 172.18.0.20
21/03/05 17:24:28 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 42561.
21/03/05 17:24:28 INFO NettyBlockTransferService: Server created on 172.18.0.20:42561
21/03/05 17:24:28 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/03/05 17:24:28 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(0, 172.18.0.20, 42561, None)
21/03/05 17:24:28 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(0, 172.18.0.20, 42561, None)
21/03/05 17:24:28 INFO BlockManager: Initialized BlockManager: BlockManagerId(0, 172.18.0.20, 42561, None)

Here it gets stuck forever. Checking the source code of the Python SDK, we can see that "No logging endpoint provided" is fatal, and that it stems from the worker receiving no configuration at all (no logging/artifact/provision/control endpoints). If I add "--artifact_endpoint" to the Python command, I get a gRPC communication failure, because the job server creates its own artifact endpoint. In this setup it would be necessary to configure all of these endpoints (probably as localhost, since the SDK and the worker run in the same pod) with fixed ports, but I can't find a way to do so. Searching Stack Overflow, I found a related question, but in that case the asker gets the Python SDK configuration automatically (maybe a Spark runner issue?).
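
For comparison, in a healthy run the runner's StartWorker request would leave the boot invocation fully populated, along these lines (the flag names come from the log above; the localhost addresses and ports are invented for illustration):

/opt/apache/beam/boot --id=1-1 \
  --logging_endpoint=localhost:40001 \
  --artifact_endpoint=localhost:40002 \
  --provision_endpoint=localhost:40003 \
  --control_endpoint=localhost:40004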

2) Using "PROCESS" - Trying to run the python SDK within a process, I built the python SDK with ./gradlew :sdks:python:container:py37:docker, copied the sdks/python/container/build/target/launcher/linux_amd64/boot executable to /python_sdk/boot inside the spark worker container and used the command:

python3 wordcount.py \
--output ./data_test/counts \
--runner=SparkRunner \
--spark_submit_uber_jar \
--spark_master_url=spark://spark-master:7077 \
--spark_rest_url=http://spark-master:6066 \
--environment_type=PROCESS \
--spark_job_server_jar=beam-runners-spark-job-server-2.28.0.jar \
--environment_config='{"os":"linux","arch":"x84_64","command":"/python_sdk/boot"}'

Resulting in "run time exception" in the terminal:

INFO:apache_beam.runners.portability.portable_runner:Job state changed to FAILED
Traceback (most recent call last):
  File "wordcount.py", line 91, in <module>
    run()
  File "wordcount.py", line 86, in run
    output | "Write" >> WriteToText(known_args.output)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pipeline.py", line 581, in __exit__
    self.result.wait_until_finish()
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 608, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline job-95c13aa5-96ab-4d1d-bc68-7f9d203c8251 failed in state FAILED: unknown error

Checking the Spark worker stderr log again, I can see that the problem is java.lang.IllegalArgumentException: No filesystem found for scheme classpath, whose cause I don't know:

21/03/05 18:33:12 INFO Executor: Adding file:/opt/spark/work/app-20210305183309-0000/0/./javax.servlet-api-3.1.0.jar to class loader
21/03/05 18:33:12 INFO TorrentBroadcast: Started reading broadcast variable 0
21/03/05 18:33:12 INFO TransportClientFactory: Successfully created connection to spark-worker-89c5c4c87-5q45s/172.18.0.20:34783 after 1 ms (0 ms spent in bootstraps)
21/03/05 18:33:12 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 12.3 KB, free 366.3 MB)
21/03/05 18:33:12 INFO TorrentBroadcast: Reading broadcast variable 0 took 63 ms
21/03/05 18:33:12 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.5 KB, free 366.3 MB)
21/03/05 18:33:13 INFO MemoryStore: Block rdd_13_0 stored as values in memory (estimated size 16.0 B, free 366.3 MB)
21/03/05 18:33:13 INFO MemoryStore: Block rdd_17_0 stored as values in memory (estimated size 16.0 B, free 366.3 MB)
21/03/05 18:33:13 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 5427 bytes result sent to driver
21/03/05 18:33:14 ERROR SerializingExecutor: Exception while executing runnable org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@5f917914
java.lang.IllegalArgumentException: No filesystem found for scheme classpath
	at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:467)
	at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:537)
	at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:125)
	at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:99)
	at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
21/03/05 18:33:16 ERROR SerializingExecutor: Exception while executing runnable org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@67fb2b2c
java.lang.IllegalArgumentException: No filesystem found for scheme classpath
	at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:467)
	at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:537)
	at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:125)
	at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:99)
	at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
21/03/05 18:33:19 INFO ProcessEnvironmentFactory: Still waiting for startup of environment '/python_sdk/boot' for worker id 1-1

It is probably missing some configuration parameters.

Note

If I execute the command:

python3 wordcount.py \
--output ./data_test/counts \
--runner=SparkRunner \
--spark_submit_uber_jar \
--spark_job_server_jar=beam-runners-spark-job-server-2.28.0.jar \
--spark_master_url=spark://spark-master:7077 \
--spark_rest_url=http://spark-master:6066 \
--environment_type=LOOPBACK

inside our Spark worker (with only one worker in the Spark cluster), we get a fully working Beam pipeline with these logs.

-- Giovani Merlin
apache-beam
apache-spark
java
kubernetes
sdk

1 Answer

3/11/2021
  1. Using "External" - this definitely seems like a bug in Beam. The worker endpoints are supposed to be set up to use localhost; I don't think it is possible to configure them. I'm not sure why they would be missing; one educated guess is that the servers silently fail to start, leaving the endpoints empty. I filed a bug report (BEAM-11957) for this issue.
  2. Using "Process" - The scheme classpath corresponds to ClassLoaderFileSystem. This file system is usually loaded using AutoService, which depends on ClassLoaderFileSystemRegistrar being present on the classpath (no relation to the name of the file system itself). The classpath of the job jar is based on spark_job_server_jar. Where are you getting your beam-runners-spark-job-server-2.28.0.jar from?
-- ibzib
Source: StackOverflow