# -*- coding: utf-8 -*- from typing import Any import os, abc, sys from enum import Flag, auto from ..util import pretty_cmd from ..log import * from ..base import Result from ..ExecContext import ExecContext from urllib.parse import urlparse class SSHClient(ExecContext): class Caps(Flag): LogOutput = auto() Interactive = auto() Env = auto() Wd = auto() def __init__(self, uri: str, caps: Caps=Caps(0), *args, **kwargs) -> None: super().__init__(uri=uri, *args, **kwargs) self.__caps = caps try: parsed = urlparse(uri) except Exception as e: log(ERR, f'Failed to parse SSH URI "{uri}"') raise self.__hostname = parsed.hostname if self.__hostname is None: raise Exception(f'Can\'t parse host name from SSH URI "{uri}"') self.__port = parsed.port self.__password = parsed.password self.__username = parsed.username @abc.abstractmethod async def _run_ssh( cmd: list[str], wd: str|None, verbose: bool, cmd_input: bytes|None, env: dict[str, str]|None, interactive: bool, log_prefix: str ) -> Result: pass async def _run( self, cmd: list[str], wd: str|None, verbose: bool, cmd_input: bytes|None, env: dict[str, str]|None, interactive: bool, log_prefix: str ) -> Result: def __log(prio: int, *args): log(prio, log_prefix, *args) def __log_block(prio: int, title: str, block: str): if self.__caps & self.Caps.LogOutput: return encoding = sys.stdout.encoding or 'utf-8' block = block.decode(encoding).strip() if not block: return delim = f'---- {title} ----' __log(prio, f',{delim}') for line in block.splitlines(): __log(prio, '|', line) __log(prio, f'`{delim}') if wd is not None and not self.__caps & self.Caps.Wd: cmd = ['cd', wd, '&&', *cmd] if interactive and not self.__caps & self.Caps.Interactive: raise NotImplementedError('Interactive SSH is not yet implemented') if env is not None and not self.__caps & self.Caps.Env: raise NotImplementedError('Passing an environment to SSH commands is not yet implemented') ret = await self._run_ssh( cmd=cmd, wd=wd, verbose=verbose, cmd_input=cmd_input, env=env, interactive=interactive, log_prefix=log_prefix ) if verbose: __log_block(NOTICE, 'stdout', ret.stdout) __log_block(NOTICE, 'stderr', ret.stderr) if ret.status != 0: __log(WARNING, f'Exit code {ret.status}') return ret async def _sudo(self, cmd: list[str], mod_env: dict[str, str], opts: list[str], *args, **kwargs) -> Result: if self.username != 'root': cmd = ['sudo', *opts, *cmd] if mod_env: log(WARNING, f'Modifying environment over SSH is not implemented, ignored') return await self._run(cmd, *args, **kwargs) @property def hostname(self): return self.__hostname @property def port(self): return self.__port def set_password(self, password: str) -> None: self.__password = password @property def password(self) -> str: return self.__password def set_username(self, username: str) -> None: self.__username = username @property def username(self) -> str: return self.__username def ssh_client(*args, type=['AsyncSSH', 'Paramiko', 'Exec'], **kwargs) -> SSHClient: # export from importlib import import_module errors: list[str] = [] if isinstance(type, str): type = [type] for name in type: try: ret = getattr(import_module(f'jw.pkg.lib.ec.ssh.{name}'), name)(*args, **kwargs) log(INFO, f'Using SSH-client "{name}"') return ret except Exception as e: msg = f'Can\'t instantiate SSH client class {name} ({str(e)})' errors.append(msg) log(DEBUG, f'{msg}, trying next') msg = f'No working SSH clients for {" ".join(args)}' log(ERR, f'----- {msg}') for error in errors: log(ERR, error) raise Exception(msg)