Fix concurrency bug in Strikebot's queues strikebot-0.0.8
author Jakob Cornell <jakob+gpg@jcornell.net>
Mon, 9 Jan 2023 00:11:40 +0000 (18:11 -0600)
committer Jakob Cornell <jakob+gpg@jcornell.net>
Mon, 9 Jan 2023 03:54:20 +0000 (21:54 -0600)
strikebot/strikebot/setup.cfg
strikebot/strikebot/src/strikebot/queue.py
strikebot/strikebot/src/strikebot/tests.py

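diff --git a/strikebot/strikebot/setup.cfg b/strikebot/strikebot/setup.cfg
index cdd2e66e0bb226142d7e147936b85927adb2f89b..573a48ad99f97e890a4c3f943073c5a023e39491 100644 (file)
--- a/strikebot/strikebot/setup.cfg
+++ b/strikebot/strikebot/setup.cfg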
@@ -1,6 +1,6 @@
 [metadata]
 name = strikebot
-version = 0.0.7
+version = 0.0.8
 
 [options]
 package_dir =
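diff --git a/strikebot/strikebot/src/strikebot/queue.py b/strikebot/strikebot/src/strikebot/queue.py
index acf5ef00cef06a45bbfe01815d37877d16e60143..eb0fe18fd06163200fe0100321deae45c6826825 100644 (file)
--- a/strikebot/strikebot/src/strikebot/queue.py
+++ b/strikebot/strikebot/src/strikebot/queue.py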
@@ -26,7 +26,12 @@ class Queue:
                        self.push(el)
 
        async def pop(self) -> Any:
-               if not self._deque:
+               # When we're unparked, a new element is in the queue and "intended" for us, but it's up to us to actually
+               # consume it. This may create a race for the new element that's decided by the Trio scheduler; we may get the
+               # element or another task that's ready to run might (coming from a sleep not due to a park on this queue), in
+               # which case the queue might be empty by the time we resume. One solution is to just keep sleeping until we win
+               # the race.
+               while not self._deque:
                        await self._empty_wait.park()
                return self._deque.popleft()
 
@@ -48,12 +53,12 @@ class MaxHeap:
                self._heap = []
                self._empty_wait = ParkingLot()
 
-       def push(self, item):
+       def push(self, item) -> None:
                heapq.heappush(self._heap, _ReverseOrdWrapper(item))
                self._empty_wait.unpark()
 
-       async def pop(self):
-               if not self._heap:
+       async def pop(self) -> Any:
+               while not self._heap:
                        await self._empty_wait.park()
                return heapq.heappop(self._heap).inner
 
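diff --git a/strikebot/strikebot/src/strikebot/tests.py b/strikebot/strikebot/src/strikebot/tests.py
index 966bca64c691e1b75c851d924d046aac83cb10f9..02abb1c219c5df74303e330008a1e1c7a14335c0 100644 (file)
--- a/strikebot/strikebot/src/strikebot/tests.py
+++ b/strikebot/strikebot/src/strikebot/tests.py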
@@ -1,5 +1,9 @@
+from trio import open_nursery
+from trio.lowlevel import checkpoint
 from unittest import TestCase
+import trio
 
+from strikebot.queue import MaxHeap
 from strikebot.updates import parse_update
 
 
@@ -60,3 +64,40 @@ class UpdateParsingTests(TestCase):
 
                pu = parse_update(_build_payload("<pre>123\n456</pre>"), None, "")
                self.assertEqual(pu.number, 123)
+
+
+class QueueTests(TestCase):
+       def test_basic(self):
+               """
+               Test the basic operation of the heap queue in conditions where some tasks consuming it don't wait in the parking
+               lot. This covers a prior concurrency bug in the implementation.
+               """
+
+               queue = MaxHeap()
+
+               async def producer():
+                       while True:
+                               queue.push(0)
+                               await trio.sleep(0.001)
+
+               async def no_park_consumer():
+                       while True:
+                               if queue._heap:
+                                       await queue.pop()
+                               await checkpoint()
+
+               async def parking_consumer():
+                       while True:
+                               await queue.pop()
+
+               async def main():
+                       async with open_nursery() as n:
+                               n.start_soon(producer)
+                               for _ in range(2):
+                                       n.start_soon(no_park_consumer)
+                               for _ in range(2):
+                                       n.start_soon(parking_consumer)
+                               await trio.sleep(0.1)
+                               n.cancel_scope.cancel()
+
+               trio.run(main)