2025-11-18 12:07:02 +01:00
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
2026-03-06 11:45:15 +01:00
|
|
|
from __future__ import annotations
|
2026-01-27 16:20:57 +01:00
|
|
|
|
2026-03-06 11:45:15 +01:00
|
|
|
from typing import TYPE_CHECKING, Iterable
|
|
|
|
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
|
|
|
from typing import Sequence
|
|
|
|
|
from ExecContext import ExecContext
|
|
|
|
|
|
|
|
|
|
import os, sys, json
|
2026-01-27 16:20:57 +01:00
|
|
|
|
2025-11-18 12:07:02 +01:00
|
|
|
from argparse import Namespace
|
|
|
|
|
from urllib.parse import urlparse
|
2025-11-20 11:29:50 +01:00
|
|
|
from enum import Enum, auto
|
|
|
|
|
|
2026-01-27 16:20:57 +01:00
|
|
|
from .log import *
|
|
|
|
|
|
2025-11-20 11:29:50 +01:00
|
|
|
class AskpassKey(Enum):
    """Kind of credential that run_askpass() requests from an askpass helper."""

    # Request the user / login name
    Username = auto()

    # Request the password
    Password = auto()
|
2025-11-18 12:07:02 +01:00
|
|
|
|
2025-11-20 10:44:14 +01:00
|
|
|
def pretty_cmd(cmd: list[str], wd=None):
    """
    Render a command list as a single human-readable string

    Args:
        cmd: Executable followed by its arguments. May be empty, in which
             case the command part of the result is an empty string.
        wd:  Optional working directory, appended as " in <wd>" if not None

    Returns:
        The tokens joined by single spaces. Arguments containing a space are
        wrapped in double quotes, the executable itself is never quoted.
    """
    # Only arguments get quoted, the first element (executable) is taken verbatim
    quoted_args = [f'"{arg}"' if ' ' in arg else arg for arg in cmd[1:]]
    # cmd[:1] instead of cmd[0], so that an empty list doesn't raise IndexError
    ret = ' '.join([*cmd[:1], *quoted_args])
    if wd is not None:
        ret += f' in {wd}'
    return ret
|
|
|
|
|
|
2026-03-06 11:45:15 +01:00
|
|
|
# See ec.Local.run() for what this function does
|
|
|
|
|
async def run_cmd(*args, ec: ExecContext|None=None, **kwargs) -> tuple[str|bytes|None, str|bytes|None]:
    """
    Run a command through an execution context

    All positional and keyword arguments other than ec are forwarded
    unchanged to ec.run().

    Args:
        ec: Execution context to run the command in. If None, a fresh
            Local() context is created.

    Returns:
        Whatever ec.run() returns.
        NOTE(review): callers in this module unpack three values (stdout,
        stderr, status), while the annotation declares two -- confirm
        against ec.run() and adjust the annotation.
    """
    if ec is None:
        # Function-scope import, same as run_sudo() does
        from .ec.Local import Local
        ec = Local()
    return await ec.run(*args, **kwargs)
|
2026-01-27 16:20:57 +01:00
|
|
|
|
|
|
|
|
async def run_curl(args: list[str], parse_json: bool=True, wd=None, throw=None, verbose=False, cmd_input=None) -> tuple: # export
    """
    Run curl with the given arguments and optionally parse its output as JSON

    Args:
        args:       Arguments appended to the curl command line
        parse_json: If true, the command's standard output is passed through
                    json.loads()
        wd:         Passed through to run_cmd()
        throw:      Passed through to run_cmd()
        verbose:    Passed through to run_cmd(). If false, curl is run with -s.
        cmd_input:  Passed through to run_cmd()

    Returns:
        Tuple (output, stderr, status) as returned by run_cmd(), with output
        replaced by the parsed JSON value if parse_json is true

    Raises:
        Whatever json.loads() raises if parse_json is true and the output
        can't be parsed. A diagnostic is printed to stderr before re-raising.
    """
    cmd = ['curl']
    if not verbose:
        cmd.append('-s')
    cmd.extend(args)
    ret, stderr, status = await run_cmd(cmd, wd=wd, throw=throw, verbose=verbose, cmd_input=cmd_input)
    if parse_json:
        try:
            ret = json.loads(ret)
        except Exception as e:
            # ret still holds the raw output here. len() only fails on
            # objects without a length (e.g. None), so TypeError is the only
            # thing to tolerate.
            try:
                size = len(ret)
            except TypeError:
                size = 'unknown number of'
            print(f'Failed to parse {size} bytes output of command '
                + f'>{pretty_cmd(cmd, wd)}< ({str(e)}): "{ret}"', file=sys.stderr)
            raise
    return ret, stderr, status
|
2025-11-18 12:07:02 +01:00
|
|
|
|
2026-01-27 16:20:57 +01:00
|
|
|
async def run_askpass(askpass_env: list[str], key: AskpassKey, host: str|None=None):
    """
    Query askpass helper programs for a credential

    The environment variables named in askpass_env are tried in order. Each
    one that is set is taken as the path of a helper executable, which is run
    with a single argument describing the requested credential.

    Args:
        askpass_env: Names of environment variables pointing to askpass
                     helpers, e.g. GIT_ASKPASS or SSH_ASKPASS
        key:         Which credential to ask for
        host:        Must be None, currently unsupported

    Returns:
        Output of the first helper that yields something other than None, or
        None if no helper does
    """
    assert host is None # Currently unsupported
    for var_name in askpass_env:
        helper = os.getenv(var_name)
        if helper is None:
            continue
        # Stays empty for variable names / keys not handled below
        prompt = ''
        if var_name == 'GIT_ASKPASS':
            if key == AskpassKey.Username:
                prompt += 'Username'
            elif key == AskpassKey.Password:
                prompt += 'Password'
        elif var_name == 'SSH_ASKPASS':
            if key == AskpassKey.Username:
                continue # Can't get user name from SSH_ASKPASS
            if key == AskpassKey.Password:
                prompt += 'Password'
        answer, stderr, status = await run_cmd([helper, prompt], throw=False)
        if answer is not None:
            return answer
    return None
|
|
|
|
|
|
2026-02-20 18:36:32 +01:00
|
|
|
async def run_sudo(cmd: list[str], mod_env: dict[str, str]|None=None, opts: list[str]|None=None, interactive: bool=True, verbose=True):
    """
    Run a command via sudo in a local execution context

    Thin wrapper which creates a Local() execution context and forwards all
    arguments to its sudo() method.

    Args:
        cmd:         Command and arguments
        mod_env:     Passed through to Local.sudo(), defaults to an empty dict
        opts:        Passed through to Local.sudo(), defaults to an empty list
        interactive: Passed through to Local.sudo()
        verbose:     Passed through to Local.sudo()

    Returns:
        Whatever Local.sudo() returns
    """
    from .ec.Local import Local
    # Create the default containers per call: literal {} / [] defaults would
    # be shared between all calls, so a callee modifying them would leak
    # state into later invocations.
    if mod_env is None:
        mod_env = {}
    if opts is None:
        opts = []
    ec = Local()
    return await ec.sudo(cmd, mod_env, opts, interactive, verbose)
|
2026-02-17 10:19:57 +01:00
|
|
|
|
2026-01-27 16:20:57 +01:00
|
|
|
async def get_username(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export
    """
    Determine the user name to authenticate with

    Sources are consulted in this order: args.username, the user name
    embedded in url, and finally the askpass helpers named by askpass_env.

    Args:
        args:        Parsed command-line arguments, expected to carry a
                     username attribute
        url:         URL which may contain a user name
        askpass_env: Names of environment variables pointing to askpass
                     helpers, passed to run_askpass()

    Returns:
        The user name. The result of run_askpass() is returned as is if
        neither args nor url provide one, which may be None.

    Raises:
        Exception: args.username and the user name in url are both present
                   but differ
    """
    url_user = urlparse(url).username if url is not None else None
    if args is not None and args.username is not None:
        mismatch = url_user is not None and url_user != args.username
        if mismatch:
            raise Exception(f'Username mismatch: called with --username="{args.username}", URL has user name "{url_user}"')
        return args.username
    if url_user is None:
        # Neither command line nor URL know, ask the helpers
        return await run_askpass(askpass_env, AskpassKey.Username)
    return url_user
|
2025-11-18 12:07:02 +01:00
|
|
|
|
2026-01-27 16:20:57 +01:00
|
|
|
async def get_password(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export
    """
    Determine the password to authenticate with

    Sources are consulted in this order: args.password (if that attribute
    exists), the password embedded in url, and finally the askpass helpers
    named by askpass_env.

    Args:
        args:        Parsed command-line arguments, may or may not carry a
                     password attribute
        url:         URL which may contain a password
        askpass_env: Names of environment variables pointing to askpass
                     helpers, passed to run_askpass()

    Returns:
        The password. The result of run_askpass() is returned as is if
        neither args nor url provide one, which may be None.

    Raises:
        Exception: None of args, url and askpass_env was supplied
    """
    no_source = args is None and url is None and not askpass_env
    if no_source:
        raise Exception("Neither URL nor command-line arguments nor askpass environment variable available, can't get password")
    # Look the attribute up dynamically, because we don't necessarily want to
    # have insecure --password among options
    from_args = None if args is None else getattr(args, 'password', None)
    if from_args is not None:
        return from_args
    from_url = None if url is None else urlparse(url).password
    if from_url is not None:
        return from_url
    return await run_askpass(askpass_env, AskpassKey.Password)
|
2026-02-18 14:18:26 +01:00
|
|
|
|
2026-02-19 07:33:59 +01:00
|
|
|
async def get_profile_env(throw: bool=True, keep: Iterable[str]|bool=False) -> dict[str, str]: # export
    """
    Get a fresh environment from /etc/profile

    Args:
        throw: Passed through to run_cmd()
        keep:
            - False           -> Don't keep anything
            - True            -> Keep what's in the current environment
            - List of strings -> Keep those variables
            - Single string   -> Keep that one variable

    Returns:
        Dictionary with fresh environment. If the command yields no output,
        it only contains the kept variables.
    """
    if isinstance(keep, str):
        # A str is an Iterable of its characters, which is never what's meant
        # here. Treat it as a single variable name.
        keep = [keep]
    keep_names = isinstance(keep, Iterable)

    env: dict[str,str]|None = None
    if keep is False or keep_names:
        env = {
            'HOME': os.environ.get('HOME', '/'),
            'USER': os.environ.get('USER', ''),
            'PATH': '/usr/bin:/bin',
        }

    # Run bash as a login shell, which sources /etc/profile, then print environment as NUL-separated key=value pairs
    # NOTE(review): "env -i" starts bash with an empty environment, so the
    # env mapping passed to run_cmd() presumably only affects the env process
    # itself, and keep=True does not merge the current environment into the
    # result -- confirm that this is what the docstring intends.
    cmd = ['/usr/bin/env', '-i', '/bin/bash', '-lc', 'env -0']
    stdout, stderr, status = await run_cmd(cmd, throw=throw, output_encoding="bytes", verbose=True, env=env)

    # run_cmd() is annotated as possibly returning None for stdout
    # (presumably if throw is false and the command fails), don't crash then
    if stdout is None:
        stdout = b""

    ret: dict[str, str] = {}
    for entry in stdout.split(b"\0"):
        name, sep, val = entry.partition(b"=")
        if not sep:
            # Empty chunk after the trailing NUL, or something without "="
            continue
        ret[name.decode()] = val.decode()

    if keep_names:
        # Kept variables take precedence over what the profile produced
        for key in keep:
            val = os.getenv(key)
            if val is not None:
                ret[key] = val
    return ret
|