How to debug a CommClosedError in Dask Gateway deployed on Kubernetes

10/14/2020

I have deployed dask_gateway 0.8.0 (with dask==2.25.0 and distributed==2.25.0) in a Kubernetes cluster. When I create a new cluster with:

cluster = gateway.new_cluster(public_address=gateway._public_address)

I get this error:

Task exception was never retrieved
future: <Task finished coro=<connect.<locals>._() done, defined at /home/jovyan/.local/lib/python3.6/site-packages/distributed/comm/core.py:288> exception=CommClosedError()>
Traceback (most recent call last):
  File "/home/jovyan/.local/lib/python3.6/site-packages/distributed/comm/core.py", line 297, in _
    handshake = await asyncio.wait_for(comm.read(), 1)
  File "/cvmfs/sft.cern.ch/lcg/releases/Python/3.6.5-f74f0/x86_64-centos7-gcc8-opt/lib/python3.6/asyncio/tasks.py", line 351, in wait_for
    yield from waiter
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/jovyan/.local/lib/python3.6/site-packages/distributed/comm/core.py", line 304, in _
    raise CommClosedError() from e
distributed.comm.core.CommClosedError

Despite the error, the pods show that the cluster was actually created: I can scale it up, and everything looks fine in the dashboard (I can even see the workers).

What I cannot do is get a client:

> client = cluster.get_client()
Task exception was never retrieved
future: <Task finished coro=<connect.<locals>._() done, defined at /home/jovyan/.local/lib/python3.6/site-packages/distributed/comm/core.py:288> exception=CommClosedError()>
Traceback (most recent call last):
  File "/home/jovyan/.local/lib/python3.6/site-packages/distributed/comm/core.py", line 297, in _
    handshake = await asyncio.wait_for(comm.read(), 1)
  File "/cvmfs/sft.cern.ch/lcg/releases/Python/3.6.5-f74f0/x86_64-centos7-gcc8-opt/lib/python3.6/asyncio/tasks.py", line 351, in wait_for
    yield from waiter
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/jovyan/.local/lib/python3.6/site-packages/distributed/comm/core.py", line 304, in _
    raise CommClosedError() from e
distributed.comm.core.CommClosedError
---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
~/.local/lib/python3.6/site-packages/distributed/comm/core.py in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
    321             if not comm:
--> 322                 _raise(error)
    323         except FatalCommClosedError:

~/.local/lib/python3.6/site-packages/distributed/comm/core.py in _raise(error)
    274         )
--> 275         raise IOError(msg)
    276 

OSError: Timed out trying to connect to 'gateway://traefik-dask-gateway:80/jhub.0373ea68815d47fca6a6c489c8f7263a' after 100 s: connect() didn't finish in time

During handling of the above exception, another exception occurred:

OSError                                   Traceback (most recent call last)
<ipython-input-19-affca45186d3> in <module>
----> 1 client = cluster.get_client()

~/.local/lib/python3.6/site-packages/dask_gateway/client.py in get_client(self, set_as_default)
   1066             set_as_default=set_as_default,
   1067             asynchronous=self.asynchronous,
-> 1068             loop=self.loop,
   1069         )
   1070         if not self.asynchronous:

~/.local/lib/python3.6/site-packages/distributed/client.py in __init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
    743             ext(self)
    744 
--> 745         self.start(timeout=timeout)
    746         Client._instances.add(self)
    747 

~/.local/lib/python3.6/site-packages/distributed/client.py in start(self, **kwargs)
    948             self._started = asyncio.ensure_future(self._start(**kwargs))
    949         else:
--> 950             sync(self.loop, self._start, **kwargs)
    951 
    952     def __await__(self):

~/.local/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    337     if error[0]:
    338         typ, exc, tb = error[0]
--> 339         raise exc.with_traceback(tb)
    340     else:
    341         return result[0]

~/.local/lib/python3.6/site-packages/distributed/utils.py in f()
    321             if callback_timeout is not None:
    322                 future = asyncio.wait_for(future, callback_timeout)
--> 323             result[0] = yield future
    324         except Exception as exc:
    325             error[0] = sys.exc_info()

/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

~/.local/lib/python3.6/site-packages/distributed/client.py in _start(self, timeout, **kwargs)
   1045 
   1046         try:
-> 1047             await self._ensure_connected(timeout=timeout)
   1048         except (OSError, ImportError):
   1049             await self._close()

~/.local/lib/python3.6/site-packages/distributed/client.py in _ensure_connected(self, timeout)
   1103         try:
   1104             comm = await connect(
-> 1105                 self.scheduler.address, timeout=timeout, **self.connection_args
   1106             )
   1107             comm.name = "Client->Scheduler"

~/.local/lib/python3.6/site-packages/distributed/comm/core.py in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
    332                 backoff = min(backoff, 1)  # wait at most one second
    333             else:
--> 334                 _raise(error)
    335         else:
    336             break

~/.local/lib/python3.6/site-packages/distributed/comm/core.py in _raise(error)
    273             error,
    274         )
--> 275         raise IOError(msg)
    276 
    277     backoff = 0.01

OSError: Timed out trying to connect to 'gateway://traefik-dask-gateway:80/jhub.0373ea68815d47fca6a6c489c8f7263a' after 100 s: Timed out trying to connect to 'gateway://traefik-dask-gateway:80/jhub.0373ea68815d47fca6a6c489c8f7263a' after 100 s: connect() didn't finish in time

How do I debug this? Any pointer would be greatly appreciated.

I already tried increasing all the timeouts, but nothing changed:

import os

os.environ["DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT"] = "100s"
os.environ["DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP"] = "600s"
os.environ["DASK_DISTRIBUTED__COMM__RETRY__DELAY__MIN"] = "1s"
os.environ["DASK_DISTRIBUTED__COMM__RETRY__DELAY__MAX"] = "60s"
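
To confirm that overrides like these actually took effect, it helps to know which config keys they correspond to. The sketch below mirrors my understanding of how dask maps `DASK_*` environment variables to config keys (strip the prefix, lowercase, turn each double underscore into a dot). The helper name `env_to_config_key` is hypothetical and is not part of dask.

```python
def env_to_config_key(name):
    """Translate a DASK_* environment variable name into a dotted config key."""
    prefix = "DASK_"
    if not name.startswith(prefix):
        raise ValueError(f"{name!r} is not a DASK_* variable")
    # Drop the prefix, lowercase, and turn each double underscore into a dot.
    return name[len(prefix):].lower().replace("__", ".")


# The four variables set in the question.
ENV_VARS = [
    "DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT",
    "DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP",
    "DASK_DISTRIBUTED__COMM__RETRY__DELAY__MIN",
    "DASK_DISTRIBUTED__COMM__RETRY__DELAY__MAX",
]
CONFIG_KEYS = [env_to_config_key(var) for var in ENV_VARS]

if __name__ == "__main__":
    for var, key in zip(ENV_VARS, CONFIG_KEYS):
        print(f"{var} -> {key}")
```

With dask imported, `dask.config.get(key)` on each of these keys should show the value in use. As far as I know, dask reads these variables when it is first imported, so they need to be set before `import dask` (or applied with `dask.config.set`) to have any effect.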

I wrote a tutorial about the steps I took to deploy Dask Gateway; see https://zonca.dev/2020/08/dask-gateway-jupyterhub.html. I am fairly sure this was working a few weeks ago, but I cannot identify what changed.

-- Andrea Zonca
dask
kubernetes
python

1 Answer

11/3/2020

You need to use compatible versions of dask and distributed everywhere (client, scheduler, and workers).

I believe this error is related to a change in the communications protocol in distributed; see https://github.com/dask/dask-gateway/issues/316#issuecomment-702947730

These are the pinned dependency versions for the Docker images as of Nov 10, 2020 (in a format compatible with a conda environment.yml):

  - python=3.7.7
  - dask=2.21.0
  - distributed=2.21.0
  - cloudpickle=1.5.0
  - toolz=0.10.0
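
A minimal sketch of how one might compare the client environment against these pins, assuming each package exposes a `__version__` attribute (dask, distributed, cloudpickle and toolz do, to my knowledge). The helper names `installed_versions` and `find_mismatches` are hypothetical, and the check only covers the environment it runs in, not the scheduler or worker pods.

```python
import importlib

# Pins quoted in the list above.
PINNED = {
    "dask": "2.21.0",
    "distributed": "2.21.0",
    "cloudpickle": "1.5.0",
    "toolz": "0.10.0",
}


def installed_versions(packages):
    """Return {package: version or None} for the current environment."""
    found = {}
    for name in packages:
        try:
            module = importlib.import_module(name)
        except ImportError:
            found[name] = None  # package is not installed here
        else:
            found[name] = getattr(module, "__version__", None)
    return found


def find_mismatches(installed, pinned):
    """Return {package: (installed, pinned)} for every package that differs."""
    return {
        name: (installed.get(name), wanted)
        for name, wanted in pinned.items()
        if installed.get(name) != wanted
    }


if __name__ == "__main__":
    mismatches = find_mismatches(installed_versions(PINNED), PINNED)
    for name, (have, wanted) in mismatches.items():
        print(f"{name}: installed {have}, images pin {wanted}")
```

Running the same script in the notebook environment and inside a scheduler or worker pod would show whether the two sides differ, which is the situation described in the linked issue.
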
-- Dan Gerlanc
Source: StackOverflow