Python asyncio: "RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one"

8/2/2019

I have set up a Python program that relies on asyncio and socketio to relay messages between chat participants on two different chat systems (one interface is a CRM backend, the other is a JavaScript chat widget on a website).

The relevant part of the code is below. Each independent chat needs its own thread. To open a channel to the backend, I therefore create a separate thread and run a long_poll() function in it, which continuously checks for new messages from the backend via pull_messages() and sends them to the widget asynchronously via a socketio AsyncServer. (Forwarding messages from the widget to the backend works fine, so I leave it out here.)

thread = threading.Thread(target=long_poll, args=())
thread.daemon = True
thread.start()

with the long_poll() function being defined as

self.server = AsyncServer(async_mode="sanic")

async def send_aio(self, msg):
    await self.server.emit(msg)

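def long_poll(self):
    chat = True  # must be initialised before the loop reads it
    while chat:
        response = self.pull_messages(...)
        if response == "/end":
            chat = False

        # A new event loop is created, used once, and closed on every iteration.
        asyncio.set_event_loop(asyncio.new_event_loop())
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.send_aio(response))
        loop.close()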

When the long_poll function is executed, just after the human response is fetched, I get the following error:

ERROR    asyncio  - Task exception was never retrieved
future: <Task finished coro=<AsyncServer._emit_internal() done, defined at /usr/local/lib/python3.6/site-packages/socketio/asyncio_server.py:344> exception=RuntimeError('Non-thread-safe operation invoked on an event loop other than the current one',)>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/socketio/asyncio_server.py", line 354, in _emit_internal
    binary=None))
  File "/usr/local/lib/python3.6/site-packages/socketio/asyncio_server.py", line 365, in _send_packet
    await self.eio.send(sid, encoded_packet, binary=False)
  File "/usr/local/lib/python3.6/site-packages/engineio/asyncio_server.py", line 89, in send
    binary=binary))
  File "/usr/local/lib/python3.6/site-packages/engineio/asyncio_socket.py", line 74, in send
    await self.queue.put(pkt)
  File "/usr/local/lib/python3.6/asyncio/queues.py", line 141, in put
    return self.put_nowait(item)
  File "/usr/local/lib/python3.6/asyncio/queues.py", line 153, in put_nowait
    self._wakeup_next(self._getters)
  File "/usr/local/lib/python3.6/asyncio/queues.py", line 74, in _wakeup_next
    waiter.set_result(None)
  File "uvloop/loop.pyx", line 1251, in uvloop.loop.Loop.call_soon
  File "uvloop/loop.pyx", line 644, in uvloop.loop.Loop._check_thread
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one

Strangely, the error does not occur when I test locally with Docker; it only appears once I deploy to Kubernetes (the images and all other settings are the same).

What I have tried so far to analyze the situation: I put logger.info("Using thread: {}".format(threading.current_thread().name)) into the long_poll() function, just before the point where the error occurs. It tells me the code runs in Thread-n (n is always 1 locally and usually some higher integer in the Kubernetes Pod), which is different from MainThread, where the other asyncio parts of my code run. I therefore thought I was safe (I am aware that the asyncio code above is not thread-safe). If you have any suggestions, let me know.
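
For reference, the check behind this message can be reproduced with plain asyncio, without socketio or uvloop. The sketch below is only an illustration under stated assumptions: demo_cross_thread_call is a hypothetical helper, the loop runs in a background thread, and debug mode is switched on because stock asyncio performs the thread check only in debug mode. It calls loop.call_soon() from a thread that does not own the loop, which is what the asyncio queue in the traceback ends up doing, and then repeats the call with call_soon_threadsafe().

```python
import asyncio
import threading


def demo_cross_thread_call():
    """Call a running loop from a foreign thread, unsafely and then safely."""
    loop = asyncio.new_event_loop()
    loop.set_debug(True)  # stock asyncio only checks the calling thread in debug mode

    # Run the loop in a background thread and wait until it is really running.
    worker = threading.Thread(target=loop.run_forever, daemon=True)
    worker.start()
    started = threading.Event()
    loop.call_soon_threadsafe(started.set)
    started.wait(timeout=5)

    # Unsafe: call_soon() from a thread other than the one running the loop.
    try:
        loop.call_soon(lambda: None)
        error = None
    except RuntimeError as exc:
        error = str(exc)

    # Safe: call_soon_threadsafe() is meant for exactly this situation.
    done = threading.Event()
    loop.call_soon_threadsafe(done.set)
    safe_ok = done.wait(timeout=5)

    # Shut the loop down cleanly.
    loop.call_soon_threadsafe(loop.stop)
    worker.join(timeout=5)
    loop.close()
    return error, safe_ok


if __name__ == "__main__":
    print(demo_cross_thread_call())
```

Objects such as asyncio.Queue wake their waiters through the loop those waiters belong to, so awaiting queue.put() on a second loop in another thread can reach the first loop's call_soon() in the same way.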


UPDATE:

As user4815162342 suggested, instead of creating and destroying event loops inside the while loop, I created a single event loop and now pass the coroutine I need to run to that loop from the polling thread via asyncio.run_coroutine_threadsafe(). I am not sure whether I did everything correctly here, but this part of the code now blocks the rest of the program (e.g. no more messages can be sent from the user to the backend).

self.server = AsyncServer(async_mode="sanic")

async def send_aio(self, msg):
    await self.server.emit(msg)

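def long_poll(self, loop):
    chat = True  # must be initialised before the loop reads it
    while chat:
        response = self.pull_messages(...)
        if response == "/end":
            chat = False

        # Hand the coroutine to the loop passed in from the main program.
        future = asyncio.run_coroutine_threadsafe(self.send_aio(response), loop)
        _ = future.result()  # blocks this thread until the coroutine has finished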

In the main program:
...
                loop = asyncio.new_event_loop()
                lpt = LongPollingObject()
                threading.Thread(target=lpt.long_poll, args=(loop,), daemon=True).start()
...
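
A minimal sketch of the run_coroutine_threadsafe() pattern, using only the standard library. The names run_demo, messages and sent are hypothetical, and send_aio here is a stand-in for the real self.server.emit() call. The point it illustrates is that the target loop has to be running in some thread: future.result() only returns once that loop has executed the coroutine, so a loop that is created but never started would leave the polling thread waiting indefinitely. In the real program the loop to target would presumably be the one the AsyncServer already runs on, which is an assumption and not something the excerpt confirms.

```python
import asyncio
import threading


async def send_aio(msg, sent):
    # Stand-in for `await self.server.emit(msg)`; records the message instead.
    await asyncio.sleep(0)
    sent.append(msg)


def long_poll(loop, messages, sent):
    # Runs in a worker thread and hands each message to the running loop.
    for response in messages:
        future = asyncio.run_coroutine_threadsafe(send_aio(response, sent), loop)
        future.result(timeout=5)  # returns only because the loop is running
        if response == "/end":
            break


def run_demo():
    loop = asyncio.new_event_loop()
    sent = []

    # The loop must be running somewhere; here a background thread runs it.
    loop_thread = threading.Thread(target=loop.run_forever, daemon=True)
    loop_thread.start()

    poller = threading.Thread(
        target=long_poll, args=(loop, ["hello", "/end"], sent), daemon=True
    )
    poller.start()
    poller.join(timeout=10)

    # Stop the loop from outside its thread, then clean up.
    loop.call_soon_threadsafe(loop.stop)
    loop_thread.join(timeout=5)
    loop.close()
    return sent


if __name__ == "__main__":
    print(run_demo())
```

Passing a timeout to future.result() keeps the polling thread from hanging silently if the loop is not running; it raises a timeout error instead.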
-- martin_m
docker
kubernetes
python
python-asyncio
