mirror of
ssh://git.janware.com/janware/proj/jw-pkg
synced 2026-04-24 17:23:36 +02:00
lib.AsyncRunner: Add class
Add class AsyncRunner. This is a wrapper around the ceremony needed to spawn an extra event loop in a synchronous function which wants to call an async function. Guido van Rossum considers it bad design that such a function exists in the first place. While that may be true in the long run also for jw-pkg, at this point I'm unwilling to flag every lazyly initialized App property as async. It's not clear, yet, which will be async and which not, and I dread the churn. So I will accept this as a minimally invasive helper for now. When the API has stabilized, a design without it may be better. Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
parent
95a384bfff
commit
67e51cf07c
1 changed files with 53 additions and 0 deletions
53
src/python/jw/pkg/lib/AsyncRunner.py
Normal file
53
src/python/jw/pkg/lib/AsyncRunner.py
Normal file
|
|
@ -0,0 +1,53 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import abc, asyncio, contextlib, concurrent.futures
|
||||||
|
from collections.abc import Awaitable, Generator
|
||||||
|
from typing import TypeVar
|
||||||
|
|
||||||
|
T = TypeVar("T")
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def loop_in_thread() -> Generator[asyncio.AbstractEventLoop, None, None]:
|
||||||
|
|
||||||
|
loop_fut: concurrent.futures.Future[asyncio.AbstractEventLoop] = (
|
||||||
|
concurrent.futures.Future()
|
||||||
|
)
|
||||||
|
|
||||||
|
stop_event = asyncio.Event()
|
||||||
|
|
||||||
|
async def main() -> None:
|
||||||
|
loop_fut.set_result(asyncio.get_running_loop())
|
||||||
|
await stop_event.wait()
|
||||||
|
|
||||||
|
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as tpe:
|
||||||
|
|
||||||
|
complete_fut = tpe.submit(asyncio.run, main())
|
||||||
|
|
||||||
|
for fut in concurrent.futures.as_completed((loop_fut, complete_fut)):
|
||||||
|
if fut is loop_fut:
|
||||||
|
loop = loop_fut.result()
|
||||||
|
try:
|
||||||
|
yield loop
|
||||||
|
finally:
|
||||||
|
loop.call_soon_threadsafe(stop_event.set)
|
||||||
|
else:
|
||||||
|
fut.result()
|
||||||
|
|
||||||
|
class AsyncRunner:
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self._cm = loop_in_thread()
|
||||||
|
self._loop = self._cm.__enter__()
|
||||||
|
|
||||||
|
def call(self, awaitable: Awaitable[T], timeout: float | None = None) -> T:
|
||||||
|
fut = asyncio.run_coroutine_threadsafe(awaitable, self._loop)
|
||||||
|
return fut.result(timeout)
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
self._cm.__exit__(None, None, None)
|
||||||
|
|
||||||
|
def __enter__(self) -> "AsyncRunner":
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc, tb) -> None:
|
||||||
|
self.close()
|
||||||
Loading…
Add table
Add a link
Reference in a new issue