Using Helm, I've created a Dask cluster.
NAME READY STATUS RESTARTS AGE
dask01-jupyter-aaa-aaaa 1/1 Running 0 3d19h
dask01-scheduler-bbb-bbbb 1/1 Running 0 3d19h
dask01-worker-ccc-cccc 1/1 Running 0 3d19h
dask01-worker-ddd-dddd 1/1 Running 0 3d19h
dask01-worker-eee-eeee 1/1 Running 0 3d19h
I can run a basic Dask workload.
import dask.array as da
array = da.ones((1000, 1000, 1000), chunks=(100, 100, 10))
Now I would like to connect a client to it somehow:
from dask import distributed
cluster = None # TODO: configure KubeCluster somehow https://kubernetes.dask.org/en/latest/
client = distributed.Client(cluster)
This works if I want to launch a new cluster:
from dask_kubernetes import KubeCluster
cluster = KubeCluster.from_yaml('worker-spec.yml')
But how do I connect to an existing cluster?
If you've already installed the Dask Helm chart, you can use kubectl to retrieve the TCP connection address to pass to distributed.Client. For example, suppose you chose the Helm release name dask-abc in helm install when setting up the cluster. You can then follow the Dask Helm and Kubernetes docs and use kubectl get services with jsonpath to filter the command's output down to the external IP address of the Dask scheduler service, which will be named dask-abc-scheduler.
A similar technique uses kubectl get pods with jsonpath to get the names of pods (see the line starting with pods=$( in that example). You'll also have to wait until the service's IP address becomes available, which you can do with the --watch flag (see the Note section of the Dask Helm docs).
$ export RELEASE_NAME=dask-abc
# wait until the load balancer's EXTERNAL-IP is available (press Ctrl-C once it appears)
$ kubectl get services --watch $RELEASE_NAME-scheduler
# get the Dask scheduler address
$ dask_scheduler=$(kubectl get services \
    $RELEASE_NAME-scheduler \
    --output=jsonpath='{.status.loadBalancer.ingress[0].ip}')
$ echo "$dask_scheduler"
Then use the dask_scheduler address printed above in the URL passed to distributed.Client() in your Python code:
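from dask import distributed
# replace <dask-scheduler> with the address printed by `echo "$dask_scheduler"`
client_connection_url = "tcp://<dask-scheduler>:8786"
client = distributed.Client(client_connection_url)
print(client)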
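If you would rather not copy the address by hand, the kubectl steps can also be scripted from Python. The sketch below is a hypothetical helper, not part of Dask or dask-kubernetes: it shells out to the same `kubectl get services ... --output=jsonpath=...` call and polls until the load balancer reports an IP, instead of using `--watch` (which never exits on its own). It assumes kubectl is on your PATH and configured for the cluster, that the scheduler listens on the default port 8786, and that kubectl prints an empty string while the IP is still pending. On providers whose load balancers report a hostname rather than an IP (for example AWS ELB), swap `.ip` for `.hostname` in the jsonpath.

```python
import subprocess
import time

# Same jsonpath expression as in the shell example.
JSONPATH = "{.status.loadBalancer.ingress[0].ip}"


def kubectl_command(release_name):
    """Build the kubectl call that prints the scheduler service's external IP.

    Assumes the Helm chart names the service "<release>-scheduler".
    """
    return [
        "kubectl", "get", "services", f"{release_name}-scheduler",
        f"--output=jsonpath={JSONPATH}",
    ]


def scheduler_url(ip, port=8786):
    """Format the address string that distributed.Client accepts."""
    return f"tcp://{ip}:{port}"


def wait_for_scheduler_url(release_name, timeout=300, interval=5):
    """Poll kubectl until an external IP appears, then return the tcp:// URL.

    Raises TimeoutError if no IP shows up within `timeout` seconds, and
    subprocess.CalledProcessError if kubectl itself fails (e.g. unknown service).
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        result = subprocess.run(
            kubectl_command(release_name),
            capture_output=True, text=True, check=True,
        )
        ip = result.stdout.strip()
        if ip:  # empty output means the load balancer IP is still pending
            return scheduler_url(ip)
        time.sleep(interval)
    raise TimeoutError(
        f"no external IP for {release_name}-scheduler after {timeout}s"
    )
```

Usage would then be `client = distributed.Client(wait_for_scheduler_url("dask-abc"))`.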
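The Dask Helm Chart and dask-kubernetes are two separate projects that work in different ways, and they are not compatible with each other: you cannot use dask-kubernetes' KubeCluster to connect to a cluster deployed with the Helm Chart.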
If you are using the Jupyter Notebook that is created as part of the Helm Chart, everything is already configured for you, and you can create a Dask client with default options:
from distributed import Client
client = Client()
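Why does `Client()` with no arguments find the remote scheduler? When no address is given, distributed's Client falls back to the `scheduler-address` value in Dask's configuration, which can be set through the `DASK_SCHEDULER_ADDRESS` environment variable; if neither is set, it starts a LocalCluster inside the notebook instead. The sketch below assumes the Helm chart injects that variable into the Jupyter pod, and it is a simplified, hypothetical stand-in for the real lookup (which goes through `dask.config`). It can help you check from a notebook cell whether the default client will really reach the Helm-deployed scheduler.

```python
import os


def default_scheduler_address(environ=None):
    """Simplified mimic of the address lookup Client() does when called with no args.

    Returns the scheduler address from DASK_SCHEDULER_ADDRESS, or None. None means
    Client() would start a LocalCluster in this process rather than connect to the
    cluster deployed by the Helm chart.
    """
    env = os.environ if environ is None else environ
    address = env.get("DASK_SCHEDULER_ADDRESS", "").strip()
    return address or None
```

In the Jupyter pod, `print(default_scheduler_address())` should show the scheduler service address if the assumption above holds.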
If you wish to use a different Python environment, such as the one on your local machine, you must specify the remote address of the scheduler. This will vary depending on how you configured your Helm Chart.
For example, if you exposed your scheduler via a load balancer, you need to point your client at the load balancer's address:
from distributed import Client
client = Client('tcp://<load balancer ip>:8786')
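If the client hangs or times out, a first thing to rule out is basic network reachability between your machine and the load balancer. The helper below is a hypothetical sketch using only the standard library: it parses the same `tcp://host:port` string you would pass to `Client` and attempts a plain TCP connection. It only shows that something is listening on that port; it does not confirm that it is a Dask scheduler, and it assumes port 8786 when the URL omits one.

```python
import socket
from urllib.parse import urlparse


def scheduler_reachable(url, timeout=5.0):
    """Return True if a TCP connection to the host:port in `url` succeeds.

    `url` is a Dask-style address such as "tcp://203.0.113.10:8786".
    """
    parsed = urlparse(url)
    host, port = parsed.hostname, parsed.port or 8786
    try:
        # create_connection raises OSError (refused, unreachable, timeout) on failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, `scheduler_reachable("tcp://<load balancer ip>:8786")` (with the real IP filled in) returning False points to a firewall, security-group, or service-type problem rather than a Dask one.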
If you check the status of your Helm release, the printed notes show how to connect to the scheduler:
helm status <release name>