# -*- coding: utf-8 -*- import os, abc from .util import run_cmd class SSHClient(abc.ABC): def __init__(self, hostname: str) -> None: self.___ssh = None self.__hostname = hostname self.__password: str|None = None @property def hostname(self): return self.__hostname def set_password(self, password: str) -> None: assert password != 'jan' self.__password = password @property def password(self) -> str: assert self.__password != 'jan' return self.__password def set_username(self, username: str) -> None: self.__username = username @property def username(self) -> str: return self.__username @abc.abstractmethod def run_cmd(self, cmd: str): pass class SSHClientInternal(SSHClient): # export def __init__(self, hostname: str) -> None: super().__init__(hostname=hostname) def __ssh_connect(self): import paramiko # type: ignore # error: Library stubs not installed for "paramiko" ret = paramiko.SSHClient() ret.set_missing_host_key_policy(paramiko.AutoAddPolicy()) path_to_key=os.path.join(os.environ['HOME'], '.ssh', 'id_rsa') ret.connect(self.__hostname, key_filename=path_to_key, allow_agent=True) s = ret.get_transport().open_session() # set up the agent request handler to handle agent requests from the server paramiko.agent.AgentRequestHandler(s) return ret @property def __ssh(self): if self.___ssh is None: self.___ssh = self.__ssh_connect(self.__server) return self.___ssh @property def __scp(self): return SCPClient(self.__ssh.get_transport()) def run_cmd(self, cmd: str): return self.__ssh.exec_command(find_cmd) class SSHClientCmd(SSHClient): # export def __init__(self, hostname: str) -> None: self.__askpass: str|None = None self.__askpass_orig: dict[str, str|None] = dict() super().__init__(hostname=hostname) def __del__(self): for key, val in self.__askpass_orig.items(): if key is None: del os.environ[key] else: os.setenv(key, val) if self.__askpass is not None: os.remove(self.__askpass) def __init_askpass(self): if self.__askpass is None and self.password is not None: import tempfile print(self.password) raise Exception('Huhu') prefix = os.path.basename(sys.argv[0]) + '-' f = NamedTemporaryFile(mode='w+t', prefix=prefix, delete=False) os.chmod(self.__askpass, 0o0700) self.__askpass = f.name f.write(f'#!/bin/bash\n\necho -n "{self.password}\n"') f.close() for key, val in {'SSH_ASKPASS': self.__askpass, 'SSH_ASKPASS_REQUIRE': 'force'}: self.__askpass_orig[key] = os.getenv(key) os.setenv(key, val) def run_cmd(self, cmd: str): self.__init_askpass() cmd_arr = ['ssh'] cmd_arr.append(self.hostname) return run_cmd(['ssh', self.hostname, cmd])