import logging
from trio import open_memory_channel, open_nursery, open_signal_receiver
-from trio.lowlevel import current_root_task, Task
+
+try:
+ from trio.lowlevel import current_root_task, Task
+except ImportError:
+ from trio.hazmat import current_root_task, Task
+
import trio_asyncio
import triopg
def _channel_sender(method):
async def wrapped(self, resp_channel, *args, **kwargs):
ret = await method(self, *args, **kwargs)
- with resp_channel:
+ async with resp_channel:
await resp_channel.send(ret)
return wrapped
try:
async with conn_ctx as conn:
self._logger.debug("scope up: {}".format(obj_digest(cancel_scope)))
- with refresh_tx:
+ async with refresh_tx:
silent_timeout = False
with cancel_scope, suppress(ConnectionClosed):
while True:
async def conn_refresher_impl(self):
"""Task to monitor and replace WS connections as they disconnect or go silent."""
- with self._refresh_queue_rx:
+ async with self._refresh_queue_rx:
async for old_scope in self._refresh_queue_rx:
await self._pool_event_tx.send(_ConnectionDown(trio.current_time(), old_scope))
await self._spawn_reader()
async def event_reader_impl(self):
"""Drop unused messages, deduplicate useful ones, and communicate with the timeout handler."""
- with self._pool_event_rx, self._timer_poke_tx:
+ async with self._pool_event_rx, self._timer_poke_tx:
async for event in self._pool_event_rx:
if isinstance(event, _ConnectionUp):
# An early add of an active scope could mean it's expected on a message that fired before it opened,
async def timeout_handler_impl(self):
"""When connections have had enough time to reach parity on a message, signal replacement of any slackers."""
- with self._timer_poke_rx:
+ async with self._timer_poke_rx:
while True:
if self._buckets:
now = trio.current_time()