
I'm working on a function that should continue after a response has been given through a websocket. In order to achieve this I use an asyncio event which will be set after the response has been given.

Three functions are involved:

    async def send(self, message):
        await self.channel(message.toJSON())

        if (message.method == 'get' or message.method == 'post'):
            event = asyncio.Event()
            self._queueMessage(message, event)
            await event.wait()
            print('continue')

    def _queueMessage(self, message, event):
        self.queue.append([message, event])

    def _process_response_message(self, message):
        for entry in self.queue:
            if (message['_id'] == entry[0]._id):
                print(entry[1])
                entry[1].set()
                print(entry[1])
                return

Returns:

    <asyncio.locks.Event object at 0x7f3a1ff20da0 [unset,waiters:1]>
    <asyncio.locks.Event object at 0x7f3a1ff20da0 [set,waiters:1]>

In this example, print('continue') is never called, and I do not know why: .set() is actually called, and it works fine if I call it before await event.wait().

Is there something I'm missing?

  • Are you sure control would be returned to asyncio's event loop after _process_response_message finished? Commented Dec 6, 2017 at 13:01
  • @MikhailGerasimov I assume that something like that is going wrong. I assumed that event.set() would communicate the change to asyncio's event loop, but you are saying that is not necessarily the case? Commented Dec 6, 2017 at 13:46
  • @MikhailGerasimov I just added the line asyncio.get_event_loop() to the _process_response_message function and I got the message: There is no current event loop in thread 'Dummy-1'. So I think you are indeed correct with your question. Commented Dec 6, 2017 at 14:00

1 Answer


Based on the message you get, _process_response_message seems to be running in another thread. asyncio.Event is not a thread-safe object, so you should use loop.call_soon_threadsafe to call its methods from another thread. Try changing your code like this:

    async def send(self, message):
        await self.channel(message.toJSON())

        if (message.method == 'get' or message.method == 'post'):
            loop = asyncio.get_event_loop()
            event = asyncio.Event()
            self._queueMessage(message, loop, event)
            await event.wait()
            print('continue')

    def _queueMessage(self, message, loop, event):
        self.queue.append([message, loop, event])

    def _process_response_message(self, message):
        for entry in self.queue:
            qmsg, loop, event = entry
            if (message['_id'] == qmsg._id):
                loop.call_soon_threadsafe(event.set)
                return
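To check the idea in isolation, here is a minimal self-contained sketch. It assumes the response callback runs in a plain threading.Thread, which stands in for whatever thread the websocket library actually uses. The names wait_for_response and respond_from_thread are hypothetical and not part of the original code.

```python
import asyncio
import threading


async def wait_for_response(timeout: float = 2.0) -> bool:
    """Wait for an Event that is set from another thread.

    Returns True if the waiter was woken, False if it timed out.
    """
    loop = asyncio.get_running_loop()
    event = asyncio.Event()

    def respond_from_thread() -> None:
        # Hypothetical stand-in for _process_response_message.
        # This runs outside the loop's thread, so event.set is handed
        # to the loop instead of being called directly.
        loop.call_soon_threadsafe(event.set)

    worker = threading.Thread(target=respond_from_thread)
    worker.start()
    try:
        await asyncio.wait_for(event.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        return False
    finally:
        worker.join()


if __name__ == "__main__":
    print(asyncio.run(wait_for_response()))
```

The timeout is only there so the sketch cannot hang if the event is never set. Whether you need one in real code depends on how you want to handle lost responses.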