from __future__ import annotations from typing import TYPE_CHECKING from ...base import InputMode from ...util import run_cmd from ..SSHClient import SSHClient as Base from .util import join_cmd if TYPE_CHECKING: from ...base import Result class Exec(Base): def __init__(self, uri, *args, **kwargs) -> None: self.__askpass: str|None = None self.__askpass_orig: dict[str, str|None] = dict() super().__init__( uri = uri, caps = self.Caps.Env, **kwargs ) def __del__(self): for key, val in self.__askpass_orig.items(): if val is None: del os.environ[key] else: os.environ[key] = val if self.__askpass is not None: os.remove(self.__askpass) def __init_askpass(self): if self.__askpass is None and self.password is not None: import sys, tempfile prefix = os.path.basename(sys.argv[0]) + '-' f = tempfile.NamedTemporaryFile(mode='w+t', prefix=prefix, delete=False) os.chmod(f.name, 0o0700) self.__askpass = f.name f.write(f'#!/bin/bash\n\necho -n "{self.password}\n"') f.close() for key, val in {'SSH_ASKPASS': self.__askpass, 'SSH_ASKPASS_REQUIRE': 'force'}.items(): self.__askpass_orig[key] = os.getenv(key) os.environ[key] = val async def _run_ssh( self, cmd: list[str], wd: str|None, verbose: bool, cmd_input: bytes|None, env: dict[str, str]|None, interactive: bool, log_prefix: str ) -> Result: self.__init_askpass() if cmd_input is None: cmd_input = InputMode.Interactive if self.interactive else InputMode.NonInteractive opts: dict[str, str] = [] if env: for key, val in env.items(): opts.extend(['-o', f'SetEnv {key}="{val}"']) return await run_cmd(['ssh', *opts, self.hostname, join_cmd(cmd)], cmd_input=cmd_input, throw=False)