self.push(el)
async def pop(self) -> Any:
- if not self._deque:
+ # When we're unparked, a new element is in the queue and "intended" for us, but it's up to us to actually
+ # consume it. This may create a race for the new element that's decided by the Trio scheduler; we may get the
+ # element or another task that's ready to run might (coming from a sleep not due to a park on this queue), in
+ # which case the queue might be empty by the time we resume. One solution is to just keep sleeping until we win
+ # the race.
+ while not self._deque:
await self._empty_wait.park()
return self._deque.popleft()
self._heap = []
self._empty_wait = ParkingLot()
- def push(self, item):
+ def push(self, item) -> None:
heapq.heappush(self._heap, _ReverseOrdWrapper(item))
self._empty_wait.unpark()
- async def pop(self):
- if not self._heap:
+ async def pop(self) -> Any:
+ while not self._heap:
await self._empty_wait.park()
return heapq.heappop(self._heap).inner
+from trio import open_nursery
+from trio.lowlevel import checkpoint
from unittest import TestCase
+import trio
+from strikebot.queue import MaxHeap
from strikebot.updates import parse_update
pu = parse_update(_build_payload("<pre>123\n456</pre>"), None, "")
self.assertEqual(pu.number, 123)
+
+
+class QueueTests(TestCase):
+ def test_basic(self):
+ """
+ Test the basic operation of the heap queue in conditions where some tasks consuming it don't wait in the parking
+ lot. This covers a prior concurrency bug in the implementation.
+ """
+
+ queue = MaxHeap()
+
+ async def producer():
+ while True:
+ queue.push(0)
+ await trio.sleep(0.001)
+
+ async def no_park_consumer():
+ while True:
+ if queue._heap:
+ await queue.pop()
+ await checkpoint()
+
+ async def parking_consumer():
+ while True:
+ await queue.pop()
+
+ async def main():
+ async with open_nursery() as n:
+ n.start_soon(producer)
+ for _ in range(2):
+ n.start_soon(no_park_consumer)
+ for _ in range(2):
+ n.start_soon(parking_consumer)
+ await trio.sleep(0.1)
+ n.cancel_scope.cancel()
+
+ trio.run(main)