jw-pkg/src/python/jw/pkg/lib/SSHClient.py
Jan Lindemann 565946643b jw.pkg.*.run_xxx(): Return exit status
Most run_xxx() return stdout and stderr. There's no way, really, for
the caller to get hold of the exit code of the spawned executable. It
can pass throw=true, catch, and assume a non-zero exit status. But
that's not semantically clean, since the spawned function can well be
a test function which is expected to return a non-zero status code,
and the caller might be interested in what code that was, exactly.

The clearest way to solve this is to return the exit code as well.
This commit does that.

Signed-off-by: Jan Lindemann <jan@janware.com>
2026-03-03 11:23:30 +01:00

101 lines
3.1 KiB
Python

# -*- coding: utf-8 -*-
import os, abc
from .util import run_cmd
class SSHClient(abc.ABC):
def __init__(self, hostname: str) -> None:
self.___ssh = None
self.__hostname = hostname
self.__password: str|None = None
@property
def hostname(self):
return self.__hostname
def set_password(self, password: str) -> None:
assert password != 'jan'
self.__password = password
@property
def password(self) -> str:
assert self.__password != 'jan'
return self.__password
def set_username(self, username: str) -> None:
self.__username = username
@property
def username(self) -> str:
return self.__username
@abc.abstractmethod
async def run_cmd(self, cmd: str):
pass
class SSHClientInternal(SSHClient): # export
    """SSH client that talks to the host through an in-process paramiko session.

    The connection is opened lazily on first use and cached afterwards.
    """

    def __init__(self, hostname: str) -> None:
        """Create a client for *hostname*; no connection is opened yet."""
        super().__init__(hostname=hostname)
        # Private names are mangled per class, so the connection cache has to
        # be created in this class to be reachable from the methods below.
        self.___ssh = None

    def __ssh_connect(self):
        """Open and return a connected paramiko.SSHClient for self.hostname."""
        import paramiko # type: ignore # error: Library stubs not installed for "paramiko"
        ret = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification -- confirm this is acceptable for the deployment.
        ret.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # Fall back to the platform's notion of the home directory when HOME
        # is not set instead of failing with KeyError.
        home = os.environ.get('HOME') or os.path.expanduser('~')
        path_to_key = os.path.join(home, '.ssh', 'id_rsa')
        # The base class' private __hostname is not reachable from here
        # (name mangling), so go through the public property.
        ret.connect(self.hostname, key_filename=path_to_key, allow_agent=True)
        s = ret.get_transport().open_session()
        # set up the agent request handler to handle agent requests from the server
        paramiko.agent.AgentRequestHandler(s)
        return ret

    @property
    def __ssh(self):
        """Cached connection, established on first access."""
        if self.___ssh is None:
            self.___ssh = self.__ssh_connect()
        return self.___ssh

    @property
    def __scp(self):
        """SCP client on top of the cached connection's transport."""
        # NOTE(review): SCPClient is not imported anywhere in the visible
        # file (presumably it is meant to come from the `scp` package), so
        # accessing this property raises NameError -- confirm and import.
        return SCPClient(self.__ssh.get_transport())

    async def run_cmd(self, cmd: str):
        """Execute *cmd* on the remote host.

        Returns whatever the connection's exec_command() returns.
        """
        return self.__ssh.exec_command(cmd)
class SSHClientCmd(SSHClient): # export
    """SSH client that spawns the external ``ssh`` executable.

    If a password has been set, a temporary askpass helper script is written
    and announced through SSH_ASKPASS / SSH_ASKPASS_REQUIRE in os.environ.
    The previous values of these variables are restored and the script is
    removed when the object is finalised.
    """

    def __init__(self, hostname: str) -> None:
        """Create a client for *hostname*; nothing is spawned or written yet."""
        # Set before calling the base constructor so that __del__ finds them
        # even if the base constructor raises.
        self.__askpass: str|None = None
        self.__askpass_orig: dict[str, str|None] = dict()
        super().__init__(hostname=hostname)

    def __del__(self):
        """Restore the askpass environment variables and delete the helper."""
        for key, val in self.__askpass_orig.items():
            if val is None:
                # The variable may have been removed by someone else already.
                os.environ.pop(key, None)
            else:
                os.environ[key] = val
        self.__askpass_orig.clear()
        if self.__askpass is not None:
            try:
                os.remove(self.__askpass)
            except FileNotFoundError:
                # Already gone; nothing left to clean up.
                pass
            self.__askpass = None

    def __init_askpass(self):
        """Write the askpass helper and export it, once, if a password is set."""
        if self.__askpass is not None or self.password is None:
            return
        import shlex, sys, tempfile
        prefix = os.path.basename(sys.argv[0]) + '-'
        with tempfile.NamedTemporaryFile(mode='w+t', prefix=prefix, delete=False) as f:
            # Record the path first so __del__ removes the file even if
            # writing it fails.
            self.__askpass = f.name
            os.chmod(f.name, 0o0700)
            # Emit the password followed by a newline. The password is
            # shell-quoted so that quotes, '$', backticks or backslashes in
            # it can neither break the script nor be interpreted by the shell.
            f.write("#!/bin/bash\n\nprintf '%s\\n' " + shlex.quote(self.password) + "\n")
        for key, val in {'SSH_ASKPASS': self.__askpass, 'SSH_ASKPASS_REQUIRE': 'force'}.items():
            # Remember the previous value (None if unset) for __del__.
            self.__askpass_orig[key] = os.getenv(key)
            os.environ[key] = val

    async def run_cmd(self, cmd: str):
        """Run *cmd* on the remote host via ``ssh`` and return its stdout.

        stderr and the exit status reported by the module-level run_cmd()
        helper are discarded.
        """
        # NOTE(review): a user name set via set_username() is not passed to
        # ssh here -- confirm whether that is intended.
        self.__init_askpass()
        stdout, _stderr, _status = await run_cmd(['ssh', self.hostname, cmd])
        return stdout