From a19679fecc2d2cb8410eab1a683e93ce323bc668 Mon Sep 17 00:00:00 2001 From: Jan Lindemann Date: Thu, 16 Apr 2026 15:42:20 +0200 Subject: [PATCH] lib.ec.ssh.AsyncSSH: Revert "Reuse connection" This reverts commit 04fef1e67a3be98192cc75e8c364cab700b3f9fb. Reusing AsyncSSH's connection is fine and fast, but only if it's not combined with the AsyncRunner. See commit 67e51cf0 why it was introduced in the first place, along with a reasoning why it may be a bad idea. Looks like we're now reaping what we sowed. The current plan to get this to fly is to sprinkle async / await all over the code paths to App.os_release(). That is a lot of churn, so postpone and revert for now to keep CI working. File "~/local/src/jw.dev/proj/jw-pkg/scripts/jw/pkg/lib/ec/ssh/AsyncSSH.py", line 463, in _run_ssh return await self._run_on_conn( ^^^^^^^^^^^^^^^^^^^^^^^^ ...<7 lines>... ) ^ File "~/local/src/jw.dev/proj/jw-pkg/scripts/jw/pkg/lib/ec/ssh/AsyncSSH.py", line 403, in _run_on_conn proc = await conn.create_process( ^^^^^^^^^^^^^^^^^^^^^^^^^^ ...<7 lines>... ) ^ File "/usr/lib/python3.13/site-packages/asyncssh/connection.py", line 4492, in create_process chan, process = await self.create_session( ^^^^^^^^^^^^^^^^^^^^^^^^^^ SSHClientProcess, *args, **kwargs) # type: ignore ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/lib/python3.13/site-packages/asyncssh/connection.py", line 4385, in create_session session = await chan.create(session_factory, command, subsystem, ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ...<4 lines>... bool(self._agent_forward_path)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/lib/python3.13/site-packages/asyncssh/channel.py", line 1149, in create packet = await self._open(b'session') ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/lib/python3.13/site-packages/asyncssh/channel.py", line 717, in _open return await self._open_waiter ^^^^^^^^^^^^^^^^^^^^^^^ RuntimeError: Task cb=[_run_until_complete_cb() at /usr/lib64/python3.13/asyncio/base_events.py:181]> got Future attached to a different loop Signed-off-by: Jan Lindemann --- src/python/jw/pkg/lib/ec/ssh/AsyncSSH.py | 86 +++++++++++------------- 1 file changed, 38 insertions(+), 48 deletions(-) diff --git a/src/python/jw/pkg/lib/ec/ssh/AsyncSSH.py b/src/python/jw/pkg/lib/ec/ssh/AsyncSSH.py index f3c724e2..7177548e 100644 --- a/src/python/jw/pkg/lib/ec/ssh/AsyncSSH.py +++ b/src/python/jw/pkg/lib/ec/ssh/AsyncSSH.py @@ -33,7 +33,6 @@ class AsyncSSH(Base): self.known_hosts = known_hosts self.term_type = term_type or os.environ.get("TERM", "xterm") self.connect_timeout = connect_timeout - self.__conn: asyncssh.SSHClientConnection|None = None def _connect_kwargs(self) -> dict: kwargs: dict = { @@ -96,31 +95,20 @@ class AsyncSSH(Base): def _get_local_term_size() -> tuple[int, int, int, int]: cols, rows = shutil.get_terminal_size(fallback=(80, 24)) xpixel = ypixel = 0 + try: import fcntl, termios, struct + packed = fcntl.ioctl(sys.stdout.fileno(), termios.TIOCGWINSZ, b"\0" * 8) rows2, cols2, xpixel, ypixel = struct.unpack("HHHH", packed) + if cols2 > 0 and rows2 > 0: cols, rows = cols2, rows2 except Exception: pass + return (cols, rows, xpixel, ypixel) - @property - async def _conn(self) -> asyncssh.SSHClientConnection: - if self.__conn is None: - self.__conn = await asyncssh.connect(**self._connect_kwargs()) - return self.__conn - - async def _close(self) -> None: - if self.__conn is not None: - try: - self.__conn.close() - await self.__conn.wait_closed() - except: - pass - self.__conn = None - async def _read_stream( self, stream, @@ -137,24 +125,26 @@ class AsyncSSH(Base): chunk = await stream.read(4096) if not chunk: break + collector.append(chunk) + if verbose: buf += chunk while b"\n" in buf: line, buf = buf.split(b"\n", 1) log(prio, log_prefix, line.decode(log_enc, errors="replace")) + if verbose and buf: log(prio, log_prefix, buf.decode(log_enc, errors="replace")) async def _run_interactive_on_conn( self, + conn: asyncssh.SSHClientConnection, cmd: list[str], wd: str | None, cmd_input: bytes | None, env: dict[str, str] | None, ) -> Result: - - conn = await self._conn command = self._build_remote_command(cmd, wd) stdout_parts: list[bytes] = [] @@ -306,6 +296,7 @@ class AsyncSSH(Base): async def _run_captured_pty_on_conn( self, + conn: asyncssh.SSHClientConnection, cmd: list[str], wd: str | None, verbose: bool, @@ -313,8 +304,6 @@ class AsyncSSH(Base): env: dict[str, str] | None, log_prefix: str, ) -> Result: - - conn = await self._conn command = self._build_remote_command(cmd, wd) stdout_parts: list[bytes] = [] @@ -359,6 +348,7 @@ class AsyncSSH(Base): async def _run_on_conn( self, + conn: asyncssh.SSHClientConnection, cmd: list[str], wd: str | None, verbose: bool, @@ -367,12 +357,10 @@ class AsyncSSH(Base): interactive: bool, log_prefix: str, ) -> Result: - - conn = await self._conn - if interactive: if self._has_local_tty(): return await self._run_interactive_on_conn( + conn=conn, cmd=cmd, wd=wd, cmd_input=cmd_input, @@ -380,6 +368,7 @@ class AsyncSSH(Base): ) return await self._run_captured_pty_on_conn( + conn=conn, cmd=cmd, wd=wd, verbose=verbose, @@ -458,15 +447,17 @@ class AsyncSSH(Base): interactive: bool, log_prefix: str, ) -> Result: - return await self._run_on_conn( - cmd, - wd, - verbose, - cmd_input, - env, - interactive, - log_prefix, - ) + async with asyncssh.connect(**self._connect_kwargs()) as conn: + return await self._run_on_conn( + conn=conn, + cmd=cmd, + wd=wd, + verbose=verbose, + cmd_input=cmd_input, + env=env, + interactive=interactive, + log_prefix=log_prefix, + ) async def _sudo( self, @@ -476,25 +467,24 @@ class AsyncSSH(Base): *args, **kwargs, ) -> Result: - args, kwargs = self._merge_env_into_forwarded_args(args, kwargs, mod_env) - conn = await self._conn - uid_result = conn.run("id -u", check=False) - is_root = ( - uid_result.exit_status == 0 - and isinstance(uid_result.stdout, str) - and uid_result.stdout.strip() == "0" - ) + async with asyncssh.connect(**self._connect_kwargs()) as conn: + uid_result = await conn.run("id -u", check=False) + is_root = ( + uid_result.exit_status == 0 + and isinstance(uid_result.stdout, str) + and uid_result.stdout.strip() == "0" + ) - cmdline: list[str] = [] + cmdline: list[str] = [] - if not is_root: - cmdline.append("/usr/bin/sudo") - if mod_env: - cmdline.append("--preserve-env=" + ",".join(mod_env.keys())) - cmdline.extend(opts) + if not is_root: + cmdline.append("/usr/bin/sudo") + if mod_env: + cmdline.append("--preserve-env=" + ",".join(mod_env.keys())) + cmdline.extend(opts) - cmdline.extend(cmd) + cmdline.extend(cmd) - return await self._run_on_conn(cmdline, *args, **kwargs) + return await self._run_on_conn(conn, cmdline, *args, **kwargs)