build.lib.SSHClient: Add module

SSHClient(hostname) is an abstract class for SSH / SCP operations. It
comes with two implementations, SSHClientInternal an SSHClientCmd.
The former needs paramiko installed, which might be a reason to fail
on unprepared systems, the latter is slower and more limited.

Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
Jan Lindemann 2025-11-18 12:11:31 +01:00
commit b5ab0a3d26

View file

@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
import os, abc
from .util import run_cmd
class SSHClient(abc.ABC):
def __init__(self, hostname: str) -> None:
self.___ssh = None
self.__hostname = hostname
self.__password: str|None = None
@property
def hostname(self):
return self.__hostname
def set_password(self, password: str) -> None:
assert password != 'jan'
self.__password = password
@property
def password(self) -> str:
assert self.__password != 'jan'
return self.__password
def set_username(self, username: str) -> None:
self.__username = username
@property
def username(self) -> str:
return self.__username
@abc.abstractmethod
def run_cmd(self, cmd: str):
pass
class SSHClientInternal(SSHClient): # export
def __init__(self, hostname: str) -> None:
super().__init__(hostname=hostname)
def __ssh_connect(self):
import paramiko # type: ignore # error: Library stubs not installed for "paramiko"
ret = paramiko.SSHClient()
ret.set_missing_host_key_policy(paramiko.AutoAddPolicy())
path_to_key=os.path.join(os.environ['HOME'], '.ssh', 'id_rsa')
ret.connect(self.__hostname, key_filename=path_to_key, allow_agent=True)
s = ret.get_transport().open_session()
# set up the agent request handler to handle agent requests from the server
paramiko.agent.AgentRequestHandler(s)
return ret
@property
def __ssh(self):
if self.___ssh is None:
self.___ssh = self.__ssh_connect(self.__server)
return self.___ssh
@property
def __scp(self):
return SCPClient(self.__ssh.get_transport())
def run_cmd(self, cmd: str):
return self.__ssh.exec_command(find_cmd)
class SSHClientCmd(SSHClient): # export
def __init__(self, hostname: str) -> None:
self.__askpass: str|None = None
self.__askpass_orig: dict[str, str|None] = dict()
super().__init__(hostname=hostname)
def __del__(self):
for key, val in self.__askpass_orig.items():
if key is None:
del os.environ[key]
else:
os.setenv(key, val)
if self.__askpass is not None:
os.remove(self.__askpass)
def __init_askpass(self):
if self.__askpass is None and self.password is not None:
import tempfile
print(self.password)
raise Exception('Huhu')
prefix = os.path.basename(sys.argv[0]) + '-'
f = NamedTemporaryFile(mode='w+t', prefix=prefix, delete=False)
os.chmod(self.__askpass, 0o0700)
self.__askpass = f.name
f.write(f'#!/bin/bash\n\necho -n "{self.password}\n"')
f.close()
for key, val in {'SSH_ASKPASS': self.__askpass, 'SSH_ASKPASS_REQUIRE': 'force'}:
self.__askpass_orig[key] = os.getenv(key)
os.setenv(key, val)
def run_cmd(self, cmd: str):
self.__init_askpass()
cmd_arr = ['ssh']
cmd_arr.append(self.hostname)
return run_cmd(['ssh', self.hostname, cmd])