lib.AsyncRunner: Add class

Add class AsyncRunner. This is a wrapper around the ceremony needed
to spawn an extra event loop in a synchronous function which wants to
call an async function.

Guido van Rossum considers it bad design that such a function exists
in the first place. While that may be true in the long run also for
jw-pkg, at this point I'm unwilling to flag every lazyly initialized
App property as async. It's not clear, yet, which will be async and
which not, and I dread the churn. So I will accept this as a
minimally invasive helper for now. When the API has stabilized, a
design without it may be better.

Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
Jan Lindemann 2026-03-18 16:21:44 +01:00
commit 67e51cf07c

View file

@ -0,0 +1,53 @@
# -*- coding: utf-8 -*-
import abc, asyncio, contextlib, concurrent.futures
from collections.abc import Awaitable, Generator
from typing import TypeVar
T = TypeVar("T")
@contextlib.contextmanager
def loop_in_thread() -> Generator[asyncio.AbstractEventLoop, None, None]:
loop_fut: concurrent.futures.Future[asyncio.AbstractEventLoop] = (
concurrent.futures.Future()
)
stop_event = asyncio.Event()
async def main() -> None:
loop_fut.set_result(asyncio.get_running_loop())
await stop_event.wait()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as tpe:
complete_fut = tpe.submit(asyncio.run, main())
for fut in concurrent.futures.as_completed((loop_fut, complete_fut)):
if fut is loop_fut:
loop = loop_fut.result()
try:
yield loop
finally:
loop.call_soon_threadsafe(stop_event.set)
else:
fut.result()
class AsyncRunner:
def __init__(self) -> None:
self._cm = loop_in_thread()
self._loop = self._cm.__enter__()
def call(self, awaitable: Awaitable[T], timeout: float | None = None) -> T:
fut = asyncio.run_coroutine_threadsafe(awaitable, self._loop)
return fut.result(timeout)
def close(self) -> None:
self._cm.__exit__(None, None, None)
def __enter__(self) -> "AsyncRunner":
return self
def __exit__(self, exc_type, exc, tb) -> None:
self.close()