mirror of
ssh://git.janware.com/janware/proj/jw-pkg
synced 2026-04-24 09:13:37 +02:00
lib.ec.ssh.AsyncSSH: Revert "Reuse connection"
This reverts commit04fef1e67a. Reusing AsyncSSH's connection is fine and fast, but only if it's not combined with the AsyncRunner. See commit67e51cf0why it was introduced in the first place, along with a reasoning why it may be a bad idea. Looks like we're now reaping what we sowed. The current plan to get this to fly is to sprinkle async / await all over the code paths to App.os_release(). That is a lot of churn, so postpone and revert for now to keep CI working. File "~/local/src/jw.dev/proj/jw-pkg/scripts/jw/pkg/lib/ec/ssh/AsyncSSH.py", line 463, in _run_ssh return await self._run_on_conn( ^^^^^^^^^^^^^^^^^^^^^^^^ ...<7 lines>... ) ^ File "~/local/src/jw.dev/proj/jw-pkg/scripts/jw/pkg/lib/ec/ssh/AsyncSSH.py", line 403, in _run_on_conn proc = await conn.create_process( ^^^^^^^^^^^^^^^^^^^^^^^^^^ ...<7 lines>... ) ^ File "/usr/lib/python3.13/site-packages/asyncssh/connection.py", line 4492, in create_process chan, process = await self.create_session( ^^^^^^^^^^^^^^^^^^^^^^^^^^ SSHClientProcess, *args, **kwargs) # type: ignore ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/lib/python3.13/site-packages/asyncssh/connection.py", line 4385, in create_session session = await chan.create(session_factory, command, subsystem, ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ...<4 lines>... bool(self._agent_forward_path)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/lib/python3.13/site-packages/asyncssh/channel.py", line 1149, in create packet = await self._open(b'session') ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/lib/python3.13/site-packages/asyncssh/channel.py", line 717, in _open return await self._open_waiter ^^^^^^^^^^^^^^^^^^^^^^^ RuntimeError: Task <Task pending name='Task-1' coro=<App.__run() running at ~/local/src/jw.dev/proj/jw-pkg/scripts/jw/pkg/lib/App.py:137> cb=[_run_until_complete_cb() at /usr/lib64/python3.13/asyncio/base_events.py:181]> got Future <Future pending> attached to a different loop Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
parent
7478206c38
commit
a19679fecc
1 changed files with 38 additions and 48 deletions
|
|
@ -33,7 +33,6 @@ class AsyncSSH(Base):
|
||||||
self.known_hosts = known_hosts
|
self.known_hosts = known_hosts
|
||||||
self.term_type = term_type or os.environ.get("TERM", "xterm")
|
self.term_type = term_type or os.environ.get("TERM", "xterm")
|
||||||
self.connect_timeout = connect_timeout
|
self.connect_timeout = connect_timeout
|
||||||
self.__conn: asyncssh.SSHClientConnection|None = None
|
|
||||||
|
|
||||||
def _connect_kwargs(self) -> dict:
|
def _connect_kwargs(self) -> dict:
|
||||||
kwargs: dict = {
|
kwargs: dict = {
|
||||||
|
|
@ -96,31 +95,20 @@ class AsyncSSH(Base):
|
||||||
def _get_local_term_size() -> tuple[int, int, int, int]:
|
def _get_local_term_size() -> tuple[int, int, int, int]:
|
||||||
cols, rows = shutil.get_terminal_size(fallback=(80, 24))
|
cols, rows = shutil.get_terminal_size(fallback=(80, 24))
|
||||||
xpixel = ypixel = 0
|
xpixel = ypixel = 0
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import fcntl, termios, struct
|
import fcntl, termios, struct
|
||||||
|
|
||||||
packed = fcntl.ioctl(sys.stdout.fileno(), termios.TIOCGWINSZ, b"\0" * 8)
|
packed = fcntl.ioctl(sys.stdout.fileno(), termios.TIOCGWINSZ, b"\0" * 8)
|
||||||
rows2, cols2, xpixel, ypixel = struct.unpack("HHHH", packed)
|
rows2, cols2, xpixel, ypixel = struct.unpack("HHHH", packed)
|
||||||
|
|
||||||
if cols2 > 0 and rows2 > 0:
|
if cols2 > 0 and rows2 > 0:
|
||||||
cols, rows = cols2, rows2
|
cols, rows = cols2, rows2
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
return (cols, rows, xpixel, ypixel)
|
return (cols, rows, xpixel, ypixel)
|
||||||
|
|
||||||
@property
|
|
||||||
async def _conn(self) -> asyncssh.SSHClientConnection:
|
|
||||||
if self.__conn is None:
|
|
||||||
self.__conn = await asyncssh.connect(**self._connect_kwargs())
|
|
||||||
return self.__conn
|
|
||||||
|
|
||||||
async def _close(self) -> None:
|
|
||||||
if self.__conn is not None:
|
|
||||||
try:
|
|
||||||
self.__conn.close()
|
|
||||||
await self.__conn.wait_closed()
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
self.__conn = None
|
|
||||||
|
|
||||||
async def _read_stream(
|
async def _read_stream(
|
||||||
self,
|
self,
|
||||||
stream,
|
stream,
|
||||||
|
|
@ -137,24 +125,26 @@ class AsyncSSH(Base):
|
||||||
chunk = await stream.read(4096)
|
chunk = await stream.read(4096)
|
||||||
if not chunk:
|
if not chunk:
|
||||||
break
|
break
|
||||||
|
|
||||||
collector.append(chunk)
|
collector.append(chunk)
|
||||||
|
|
||||||
if verbose:
|
if verbose:
|
||||||
buf += chunk
|
buf += chunk
|
||||||
while b"\n" in buf:
|
while b"\n" in buf:
|
||||||
line, buf = buf.split(b"\n", 1)
|
line, buf = buf.split(b"\n", 1)
|
||||||
log(prio, log_prefix, line.decode(log_enc, errors="replace"))
|
log(prio, log_prefix, line.decode(log_enc, errors="replace"))
|
||||||
|
|
||||||
if verbose and buf:
|
if verbose and buf:
|
||||||
log(prio, log_prefix, buf.decode(log_enc, errors="replace"))
|
log(prio, log_prefix, buf.decode(log_enc, errors="replace"))
|
||||||
|
|
||||||
async def _run_interactive_on_conn(
|
async def _run_interactive_on_conn(
|
||||||
self,
|
self,
|
||||||
|
conn: asyncssh.SSHClientConnection,
|
||||||
cmd: list[str],
|
cmd: list[str],
|
||||||
wd: str | None,
|
wd: str | None,
|
||||||
cmd_input: bytes | None,
|
cmd_input: bytes | None,
|
||||||
env: dict[str, str] | None,
|
env: dict[str, str] | None,
|
||||||
) -> Result:
|
) -> Result:
|
||||||
|
|
||||||
conn = await self._conn
|
|
||||||
command = self._build_remote_command(cmd, wd)
|
command = self._build_remote_command(cmd, wd)
|
||||||
stdout_parts: list[bytes] = []
|
stdout_parts: list[bytes] = []
|
||||||
|
|
||||||
|
|
@ -306,6 +296,7 @@ class AsyncSSH(Base):
|
||||||
|
|
||||||
async def _run_captured_pty_on_conn(
|
async def _run_captured_pty_on_conn(
|
||||||
self,
|
self,
|
||||||
|
conn: asyncssh.SSHClientConnection,
|
||||||
cmd: list[str],
|
cmd: list[str],
|
||||||
wd: str | None,
|
wd: str | None,
|
||||||
verbose: bool,
|
verbose: bool,
|
||||||
|
|
@ -313,8 +304,6 @@ class AsyncSSH(Base):
|
||||||
env: dict[str, str] | None,
|
env: dict[str, str] | None,
|
||||||
log_prefix: str,
|
log_prefix: str,
|
||||||
) -> Result:
|
) -> Result:
|
||||||
|
|
||||||
conn = await self._conn
|
|
||||||
command = self._build_remote_command(cmd, wd)
|
command = self._build_remote_command(cmd, wd)
|
||||||
|
|
||||||
stdout_parts: list[bytes] = []
|
stdout_parts: list[bytes] = []
|
||||||
|
|
@ -359,6 +348,7 @@ class AsyncSSH(Base):
|
||||||
|
|
||||||
async def _run_on_conn(
|
async def _run_on_conn(
|
||||||
self,
|
self,
|
||||||
|
conn: asyncssh.SSHClientConnection,
|
||||||
cmd: list[str],
|
cmd: list[str],
|
||||||
wd: str | None,
|
wd: str | None,
|
||||||
verbose: bool,
|
verbose: bool,
|
||||||
|
|
@ -367,12 +357,10 @@ class AsyncSSH(Base):
|
||||||
interactive: bool,
|
interactive: bool,
|
||||||
log_prefix: str,
|
log_prefix: str,
|
||||||
) -> Result:
|
) -> Result:
|
||||||
|
|
||||||
conn = await self._conn
|
|
||||||
|
|
||||||
if interactive:
|
if interactive:
|
||||||
if self._has_local_tty():
|
if self._has_local_tty():
|
||||||
return await self._run_interactive_on_conn(
|
return await self._run_interactive_on_conn(
|
||||||
|
conn=conn,
|
||||||
cmd=cmd,
|
cmd=cmd,
|
||||||
wd=wd,
|
wd=wd,
|
||||||
cmd_input=cmd_input,
|
cmd_input=cmd_input,
|
||||||
|
|
@ -380,6 +368,7 @@ class AsyncSSH(Base):
|
||||||
)
|
)
|
||||||
|
|
||||||
return await self._run_captured_pty_on_conn(
|
return await self._run_captured_pty_on_conn(
|
||||||
|
conn=conn,
|
||||||
cmd=cmd,
|
cmd=cmd,
|
||||||
wd=wd,
|
wd=wd,
|
||||||
verbose=verbose,
|
verbose=verbose,
|
||||||
|
|
@ -458,15 +447,17 @@ class AsyncSSH(Base):
|
||||||
interactive: bool,
|
interactive: bool,
|
||||||
log_prefix: str,
|
log_prefix: str,
|
||||||
) -> Result:
|
) -> Result:
|
||||||
return await self._run_on_conn(
|
async with asyncssh.connect(**self._connect_kwargs()) as conn:
|
||||||
cmd,
|
return await self._run_on_conn(
|
||||||
wd,
|
conn=conn,
|
||||||
verbose,
|
cmd=cmd,
|
||||||
cmd_input,
|
wd=wd,
|
||||||
env,
|
verbose=verbose,
|
||||||
interactive,
|
cmd_input=cmd_input,
|
||||||
log_prefix,
|
env=env,
|
||||||
)
|
interactive=interactive,
|
||||||
|
log_prefix=log_prefix,
|
||||||
|
)
|
||||||
|
|
||||||
async def _sudo(
|
async def _sudo(
|
||||||
self,
|
self,
|
||||||
|
|
@ -476,25 +467,24 @@ class AsyncSSH(Base):
|
||||||
*args,
|
*args,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
) -> Result:
|
) -> Result:
|
||||||
|
|
||||||
args, kwargs = self._merge_env_into_forwarded_args(args, kwargs, mod_env)
|
args, kwargs = self._merge_env_into_forwarded_args(args, kwargs, mod_env)
|
||||||
|
|
||||||
conn = await self._conn
|
async with asyncssh.connect(**self._connect_kwargs()) as conn:
|
||||||
uid_result = conn.run("id -u", check=False)
|
uid_result = await conn.run("id -u", check=False)
|
||||||
is_root = (
|
is_root = (
|
||||||
uid_result.exit_status == 0
|
uid_result.exit_status == 0
|
||||||
and isinstance(uid_result.stdout, str)
|
and isinstance(uid_result.stdout, str)
|
||||||
and uid_result.stdout.strip() == "0"
|
and uid_result.stdout.strip() == "0"
|
||||||
)
|
)
|
||||||
|
|
||||||
cmdline: list[str] = []
|
cmdline: list[str] = []
|
||||||
|
|
||||||
if not is_root:
|
if not is_root:
|
||||||
cmdline.append("/usr/bin/sudo")
|
cmdline.append("/usr/bin/sudo")
|
||||||
if mod_env:
|
if mod_env:
|
||||||
cmdline.append("--preserve-env=" + ",".join(mod_env.keys()))
|
cmdline.append("--preserve-env=" + ",".join(mod_env.keys()))
|
||||||
cmdline.extend(opts)
|
cmdline.extend(opts)
|
||||||
|
|
||||||
cmdline.extend(cmd)
|
cmdline.extend(cmd)
|
||||||
|
|
||||||
return await self._run_on_conn(cmdline, *args, **kwargs)
|
return await self._run_on_conn(conn, cmdline, *args, **kwargs)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue