From: Jakob Cornell Date: Mon, 9 Jan 2023 00:11:40 +0000 (-0600) Subject: Fix concurrency bug in Strikebot's queues X-Git-Tag: strikebot-0.0.8^0 X-Git-Url: https://jcornell.net/gitweb/gitweb.cgi?a=commitdiff_plain;h=bb536756de2e4970699b583a7d1f1e4dd1552c2f;p=counting.git Fix concurrency bug in Strikebot's queues --- diff --git a/strikebot/strikebot/setup.cfg b/strikebot/strikebot/setup.cfg index cdd2e66..573a48a 100644 --- a/strikebot/strikebot/setup.cfg +++ b/strikebot/strikebot/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = strikebot -version = 0.0.7 +version = 0.0.8 [options] package_dir = diff --git a/strikebot/strikebot/src/strikebot/queue.py b/strikebot/strikebot/src/strikebot/queue.py index acf5ef0..eb0fe18 100644 --- a/strikebot/strikebot/src/strikebot/queue.py +++ b/strikebot/strikebot/src/strikebot/queue.py @@ -26,7 +26,12 @@ class Queue: self.push(el) async def pop(self) -> Any: - if not self._deque: + # When we're unparked, a new element is in the queue and "intended" for us, but it's up to us to actually + # consume it. This may create a race for the new element that's decided by the Trio scheduler; we may get the + # element or another task that's ready to run might (coming from a sleep not due to a park on this queue), in + # which case the queue might be empty by the time we resume. One solution is to just keep sleeping until we win + # the race. + while not self._deque: await self._empty_wait.park() return self._deque.popleft() @@ -48,12 +53,12 @@ class MaxHeap: self._heap = [] self._empty_wait = ParkingLot() - def push(self, item): + def push(self, item) -> None: heapq.heappush(self._heap, _ReverseOrdWrapper(item)) self._empty_wait.unpark() - async def pop(self): - if not self._heap: + async def pop(self) -> Any: + while not self._heap: await self._empty_wait.park() return heapq.heappop(self._heap).inner diff --git a/strikebot/strikebot/src/strikebot/tests.py b/strikebot/strikebot/src/strikebot/tests.py index 966bca6..02abb1c 100644 --- a/strikebot/strikebot/src/strikebot/tests.py +++ b/strikebot/strikebot/src/strikebot/tests.py @@ -1,5 +1,9 @@ +from trio import open_nursery +from trio.lowlevel import checkpoint from unittest import TestCase +import trio +from strikebot.queue import MaxHeap from strikebot.updates import parse_update @@ -60,3 +64,40 @@ class UpdateParsingTests(TestCase): pu = parse_update(_build_payload("
123\n456
"), None, "") self.assertEqual(pu.number, 123) + + +class QueueTests(TestCase): + def test_basic(self): + """ + Test the basic operation of the heap queue in conditions where some tasks consuming it don't wait in the parking + lot. This covers a prior concurrency bug in the implementation. + """ + + queue = MaxHeap() + + async def producer(): + while True: + queue.push(0) + await trio.sleep(0.001) + + async def no_park_consumer(): + while True: + if queue._heap: + await queue.pop() + await checkpoint() + + async def parking_consumer(): + while True: + await queue.pop() + + async def main(): + async with open_nursery() as n: + n.start_soon(producer) + for _ in range(2): + n.start_soon(no_park_consumer) + for _ in range(2): + n.start_soon(parking_consumer) + await trio.sleep(0.1) + n.cancel_scope.cancel() + + trio.run(main)