I have setup a Python program that relies on asyncio
and socketio
to transfer messages between chat participants using two different chat systems (one interface being a CRM backend and the other being a javascript chat widget on a website).
The relevant part of the code is below. I need to open a separate thread for each independent chat. Therefore, in order to open a channel to the backend, I create a separate thread and run a "long_polling" function in it, which constantly checks for new messages from the backend via pull_messages()
and sends them to the widget asynchronously via socketio AsyncServer
(forwarding messages from widget to backend works fine and I leave it out here):
thread = threading.Thread(target=long_poll, args=())
thread.daemon = True
thread.start()
with the long_poll()
function being defined as
self.server = AsyncServer(async_mode="sanic")
async def send_aio(self, msg):
await self.server.emit(msg)
def long_poll(self):
while chat:
response = self.pull_messages(...)
if response == "/end":
chat = False
asyncio.set_event_loop(asyncio.new_event_loop())
loop = asyncio.get_event_loop()
loop.run_until_complete(self.send_aio(response))
loop.close()
When the long_poll function is executed, just after the human response is fetched, I get the following error:
ERROR asyncio - Task exception was never retrieved
future: <Task finished coro=<AsyncServer._emit_internal() done, defined at /usr/local/lib/python3.6/site-packages/socketio/asyncio_server.py:344> exception=RuntimeError('Non-thread-safe operation invoked on an event loop other than the current one',)>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/socketio/asyncio_server.py", line 354, in _emit_internal
binary=None))
File "/usr/local/lib/python3.6/site-packages/socketio/asyncio_server.py", line 365, in _send_packet
await self.eio.send(sid, encoded_packet, binary=False)
File "/usr/local/lib/python3.6/site-packages/engineio/asyncio_server.py", line 89, in send
binary=binary))
File "/usr/local/lib/python3.6/site-packages/engineio/asyncio_socket.py", line 74, in send
await self.queue.put(pkt)
File "/usr/local/lib/python3.6/asyncio/queues.py", line 141, in put
return self.put_nowait(item)
File "/usr/local/lib/python3.6/asyncio/queues.py", line 153, in put_nowait
self._wakeup_next(self._getters)
File "/usr/local/lib/python3.6/asyncio/queues.py", line 74, in _wakeup_next
waiter.set_result(None)
File "uvloop/loop.pyx", line 1251, in uvloop.loop.Loop.call_soon
File "uvloop/loop.pyx", line 644, in uvloop.loop.Loop._check_thread
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one
Strange thing is, when I test locally via docker, this error does not happen, only once I migrate to Kubernetes (images and all other settings are the same).
What I have tried so far to analyze the situation: Put logger.info("Using thread: {}".format(threading.current_thread().name))
into the long_poll()
function, just before the error occurs. It tells me code is run in Thread-n
(n is always 1 locally and usually some higher integer in the Kubernetes Pod), which is different from MainThread
where other parts of my code with asyncio run, therefore I thought I am safe (I am aware the above asyncio code is not thread-safe). In case you have any suggestions, let me know.
UPDATE:
As user4815162342 suggested, instead of creating and destroying the event loops within the while loop, I created a single event loop in a dedicated thread and then pass the coroutine I need to run to the loop in the thread via asyncio.run_coroutine_threadsafe()
. Not sure whether I did everything correct here, but now this part of the code is blocking the rest of the program (e.g. no more messages from user to backend are possible...)
self.server = AsyncServer(async_mode="sanic")
async def send_aio(self, msg):
await self.server.emit(msg)
def long_poll(self, loop):
while chat:
response = self.pull_messages(...)
if response == "/end":
chat = False
future = asyncio.run_coroutine_threadsafe(self.send_io(response), loop)
_ = future.result()
In the main program:
...
loop = asyncio.new_event_loop()
lpt = LongPollingObject()
threading.Thread(target=lpt.long_poll, args=(loop,), daemon=True).start()
...