from __future__ import annotations from typing import TYPE_CHECKING import paramiko # type: ignore # error: Library stubs not installed for "paramiko" from ...log import * from ...ExecContext import Result from ..SSHClient import SSHClient as Base from .util import join_cmd class Paramiko(Base): def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.__timeout: float|None = None # Untested self.___ssh: Any|None = None def __ssh_connect(self): ret = paramiko.SSHClient() ret.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: ret.connect( hostname=self.hostname, username=self.username, allow_agent=True ) except Exception as e: log(ERR, f'Failed to connect to {self.hostname} ({str(e)})') raise s = ret.get_transport().open_session() # set up the agent request handler to handle agent requests from the server paramiko.agent.AgentRequestHandler(s) return ret @property def __ssh(self): if self.___ssh is None: self.___ssh = self.__ssh_connect() return self.___ssh @property def __scp(self): return SCPClient(self.__ssh.get_transport()) async def _run_ssh(self, cmd: list[str], cmd_input: str|None, *args, **kwargs) -> Result: try: stdin, stdout, stderr = self.__ssh.exec_command(join_cmd(cmd), timeout=self.__timeout) except Exception as e: log(ERR, f'Command failed for {self.uri}: "{join_cmd(cmd)}"') raise if cmd_input is not None: stdin.write(cmd_input) exit_status = stdout.channel.recv_exit_status() return Result(stdout.read(), stderr.read(), exit_status)