App.distro_xxx: Move properties to Distro.xxx

Commit a19679fec reverted the first attempt to make AsyncSSH reuse
one connection during an instance lifetime. That failed because a lot
of distribution-specific properties were filled in a new event loop
thread started by AsyncRunner, and AsyncSSH didn't like that.

This commit is the first part of the solution: Move those properties
from the App class to the Distro class, and load the Distro class
in an async loader. As soon as it's instantiated, it can provide all
its properties without cluttering the code with async keywords.

Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
Jan Lindemann 2026-04-18 12:08:00 +02:00
commit aa7275f426
7 changed files with 241 additions and 231 deletions

View file

@ -211,21 +211,15 @@ class App(Base):
return True
return False
def __init__(self) -> None:
def __init__(self, distro: Distro|None=None) -> None:
super().__init__("jw-pkg swiss army knife", modules=["jw.pkg.cmds"])
# -- Members without default values
self.__opt_os: str|None = None
self.__opt_interactive: bool|None = None
self.__opt_verbose: bool|None = None
self.__top_name: str|None = None
self.__os_release: str|None = None
self.__distro_id: str|None = None
self.__distro_name: str|None = None
self.__distro_codename: str|None = None
self.__distro: Distro|None = None
self.__os_cascade: list[str]|None = None
self.__distro = distro
self.__res_cache = ResultCache()
self.__topdir: str|None = None
self.__pretty_topdir: str|None = None
@ -236,6 +230,10 @@ class App(Base):
self.__projs_root = pwd.getpwuid(os.getuid()).pw_dir + "/local/src/jw.dev/proj"
self.__pretty_projs_root = None
async def __init_async(self) -> None:
if self.__distro is None:
self.__distro = await Distro.instantiate(ec=self.exec_context, id=self.args.distro_id)
async def __aexit__(self, exc_type, exc, tb) -> None:
if self.__exec_context is not None:
await self.__exec_context.close()
@ -249,14 +247,12 @@ class App(Base):
+ 'one of "make:<var-name>", "unaltered", "absolute". Absolute topdir by default')
parser.add_argument('-p', '--prefix', default = None,
help='Parent directory of project source directories')
parser.add_argument('-O', '--os', default = None, help='Target operating system')
parser.add_argument('--distro-id', default=None, help='Distribution ID (default is taken from /etc/os-release)')
parser.add_argument('--interactive', choices=['true', 'false', 'auto'], default='true', help="Wait for user input or try to proceed unattended")
parser.add_argument('--verbose', action='store_true', default=False, help="Be verbose on stderr about what's being done on the distro level")
parser.add_argument('--uri', default='local', help="Run commands on this host")
async def _run(self, args: argparse.Namespace) -> None:
self.__opt_os = args.os
self.__topdir = args.topdir
self.__pretty_topdir = self.__format_topdir(self.__topdir, args.topdir_format)
self.__topdir_fmt = args.topdir_format
@ -267,6 +263,7 @@ class App(Base):
if args.prefix is not None:
self.__projs_root = args.prefix
self.__pretty_projs_root = args.prefix
await self.__init_async()
return await super()._run(args)
@property
@ -303,158 +300,15 @@ class App(Base):
def projs_root(self):
return self.__projs_root
@property
def os_release(self) -> str:
if self.__os_release is None:
release_file = '/etc/os-release'
try:
result = self.call_async(self.exec_context.get(release_file, throw=True))
self.__os_release = result.stdout.decode().strip()
except Exception as e:
log(INFO, f'Failed to read {release_file} ({str(e)}), falling back to uname')
result = self.call_async(
self.exec_context.run(
['uname', '-s'],
throw=False,
cmd_input=InputMode.NonInteractive
)
)
if result.status != 0:
log(ERR, f'/etc/os-release and uname both failed, the latter with exit status {result.status}')
raise
uname = result.decode().stdout.strip().lower()
self.__os_release = f'ID={uname}\nVERSION_CODENAME=unknown'
return self.__os_release
def os_release_field(self, key: str, throw: bool=False) -> str:
m = re.search(r'^\s*' + key + r'\s*=\s*("?)([^"\n]+)\1\s*$', self.os_release, re.MULTILINE)
if m is None:
if throw:
raise Exception(f'Could not read "{key}=" from /etc/os-release')
return None
return m.group(2)
@property
def distro_name(self) -> str:
if self.__distro_name is None:
self.__distro_name = self.os_release_field('NAME', throw=True)
return self.__distro_name
@property
def distro_id(self) -> str:
if self.__distro_id is None:
ret = self.args.distro_id # The distribution ID requested by the command line
if ret is None:
# The ID of the distribution we run on
ret = self.os_release_field('ID', throw=True)
match ret:
case 'opensuse-tumbleweed':
ret = 'suse'
case 'kali':
ret = 'kali'
self.__distro_id = ret
return self.__distro_id
@property
def distro_codename(self) -> str:
match self.distro_id:
case 'suse':
self.__distro_codename = \
self.os_release_field('ID', throw=True).split('-')[1]
case 'kali':
self.__distro_codename = \
self.os_release_field('VERSION_CODENAME', throw=True).split('-')[1]
case _:
self.__distro_codename = \
self.os_release_field('VERSION_CODENAME', throw=True)
return self.__distro_codename
@property
def distro_cascade(self) -> str:
return ' '.join(self.os_cascade())
@cached_property
def distro_pkg_ext(self) -> str:
for entry in self.os_cascade():
ret = entry.replace('pkg-', '')
if ret != entry:
return ret
raise RuntimeError(f'No package extension in found in {self.distro_cascade}')
@property
def distro_gnu_triplet(self) -> str:
import sysconfig
import shutil
import subprocess
# Best: GNU host triplet Python was built for
for key in ("HOST_GNU_TYPE", "BUILD_GNU_TYPE"): # BUILD_GNU_TYPE can exist too
ret = sysconfig.get_config_var(key)
if isinstance(ret, str) and ret:
return ret
# Common on Debian/Ubuntu: multiarch component (often looks like a triplet)
ret = sysconfig.get_config_var("MULTIARCH")
if isinstance(ret, str) and ret:
return ret
# Sometimes exposed (privately) by CPython
ret = getattr(sys.implementation, "_multiarch", None)
if isinstance(ret, str) and ret:
return ret
# Last resort: ask the system compiler
for cc in ("gcc", "cc", "clang"):
path = shutil.which(cc)
if not path:
continue
try:
ret = subprocess.check_output([path, "-dumpmachine"], text=True, stderr=subprocess.DEVNULL).strip()
if ret:
return ret
except Exception:
pass
raise RuntimeError('Failed to get GNU triplet from running machine')
@property
def distro(self) -> Distro:
if self.__distro is None:
ret = Distro.instantiate(self.distro_id, ec=self.exec_context)
self.__distro = ret
raise Exception('No distro object')
return self.__distro
def distro_info(self, fmt: str|Iterable) -> str|Iterable:
if not isinstance(fmt, str):
ret: list[str] = []
for entry in fmt:
ret.append(self.distro_info(entry))
return ret
ret = fmt
for macro in re.findall("%{([A-Za-z_-]+)}", fmt):
try:
name = 'distro_' + macro.replace('-', '_')
if name == 'distro_info':
continue
val = getattr(self, name)
patt = r'%{' + macro + r'}'
if ret.find(patt) == -1:
continue
ret = ret.replace(patt, val)
except Exception as e:
log(ERR, f'Failed to expand macro "{macro}" inside "{fmt}": {str(e)}')
raise
return ret
def find_dir(self, name: str, search_subdirs: list[str]=[], search_absdirs: list[str]=[], pretty: bool=True):
return self.__find_dir(name, search_subdirs, search_absdirs, pretty)
@lru_cache(maxsize=None)
def get_os(self) -> str:
return self.distro_id + '-' + self.distro_codename
# TODO: add support for customizing this in project.conf
def htdocs_dir(self, project: str) -> str:
return self.find_dir(project, ["/src/html/htdocs", "/tools/html/htdocs", "/htdocs"],
@ -464,59 +318,6 @@ class App(Base):
def tmpl_dir(self, name: str) -> str:
return self.find_dir(name, ["/tmpl"], ["/opt/" + name + "/share/tmpl"])
def os_cascade(self) -> list[str]:
def __append(entry: str):
if not entry in ret:
ret.append(entry)
if self.__os_cascade is None:
ret = [ 'os' ]
match self.distro_id:
case 'centos':
__append('linux')
__append('pkg-rpm')
__append('pm-yum')
__append('redhat')
__append('rhel')
case 'fedora' | 'rhel':
__append('linux')
__append('pkg-rpm')
__append('pm-yum')
__append('redhat')
case 'suse':
__append('linux')
__append('pkg-rpm')
__append('pm-zypper')
case 'kali' | 'raspbian':
__append('linux')
__append('pkg-debian')
__append('pm-apt')
__append('debian')
case 'ubuntu':
__append('linux')
__append('pkg-debian')
__append('pm-apt')
case 'archlinux':
__append('linux')
__append('pkg-pm')
__append('pm-pacman')
os = self.__opt_os if self.__opt_os is not None else self.get_os()
name = re.sub(r'-.*', '', os)
series = os
rx = re.compile(r'\.[0-9]+$')
while True:
n = re.sub(rx, '', series)
if n == series:
break
ret.append(n)
series = n
__append(name)
__append(os)
__append(self.distro_id)
# e.g. os, linux, suse, suse-tumbleweed
self.__os_cascade = ret
return self.__os_cascade
def strip_module_from_spec(self, mod):
return re.sub(r'-dev$|-devel$|-run$', '', re.split('([=><]+)', mod)[0].strip())
@ -660,7 +461,7 @@ class App(Base):
log(DEBUG, "checking if project " + project + " is excluded from build")
exclude = self.get_project_refs([ project ], ['build'], 'exclude',
scope = Scope.One, add_self=False, names_only=True)
cascade = self.os_cascade() + [ 'all' ]
cascade = self.distro.os_cascade + [ 'all' ]
for p1 in exclude:
for p2 in cascade:
if p1 == p2:

View file

@ -3,20 +3,19 @@
from argparse import Namespace, ArgumentParser
from ...lib.log import *
from ...lib.Distro import Distro
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
class CmdInfo(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'info', help='Print information about target platform')
super().__init__(parent, 'info', help='Retrieve information about target platform')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
macro_names = ', '.join(['%%{' + name.replace('distro_', '').replace('_', '-') + '}'
for name in dir(self.app) if name.startswith('distro_')])
parser.add_argument('--format', default='%{cascade}',
help=f'Format string, expanding macros {macro_names}')
help=f'Format string, expanding macros {", ".join(Distro.macros())}')
async def _run(self, args: Namespace) -> None:
print(self.app.distro_info(args.format))
print(self.app.distro.expand_macros(args.format))

View file

@ -15,8 +15,8 @@ class CmdInstall(Cmd): # export
parser.add_argument("names", nargs="*", help="Packages to be installed")
parser.add_argument('--only-update', default=False, action='store_true', help='Only update the listed packages, don\'t install them')
parser.add_argument('-F', '--fixed-strings', action='store_true',
help='Don\'t expand platform info macros in <names>')
help='Don\'t expand platform.expand_macros macros in <names>')
async def _run(self, args: Namespace) -> None:
names = names if args.fixed_strings else self.app.distro_info(args.names)
names = names if args.fixed_strings else self.app.distro.expand_macros(args.names)
return await self.distro.install(names, only_update=args.only_update)

View file

@ -23,12 +23,12 @@ class CmdCopy(Cmd): # export
parser.add_argument('-g', '--group', default=None, help='Destination file group')
parser.add_argument('-m', '--mode', default=None, help='Destination file mode')
parser.add_argument('-F', '--fixed-strings', action='store_true',
help='Don\'t expand platform info macros in <src> and <dst>')
help='Don\'t expand platform.expand_macros macros in <src> and <dst>')
async def _run(self, args: Namespace) -> None:
def __expand(url: str) -> str:
if args.fixed_strings:
return url
return self.app.distro_info(url)
return self.app.distro.expand_macros(url)
await copy(__expand(args.src), __expand(args.dst),
owner=args.owner, group=args.group, mode=int(args.mode, 0))

View file

@ -37,7 +37,7 @@ class BaseCmdPkgRelations(Cmd):
) -> list[str]:
if subsections is None:
subsections = self.app.os_cascade()
subsections = self.app.distro.os_cascade
subsections.append('jw')
expand_semver_revision_range = expand_semver_revision_range

View file

@ -38,7 +38,7 @@ class CmdRequiredOsPkg(Cmd): # export
for d in deps:
if self.app.is_excluded_from_build(d) is not None:
deps.remove(d)
subsecs = self.app.os_cascade()
subsecs = self.app.distro.os_cascade
log(DEBUG, "subsecs = ", subsecs)
requires: set[str] = set()
for sec in subsecs:

View file

@ -3,32 +3,86 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from functools import cached_property
if TYPE_CHECKING:
import Iterable
import abc, importlib
import abc, importlib, re
from .ExecContext import ExecContext
from .base import Result
from .base import Result, InputMode
from .Package import Package
from .log import *
class Distro(abc.ABC):
def __init__(self, ec: ExecContext):
def __init__(self, ec: ExecContext, id: str|None=None, os_release_str: str|None=None):
assert ec is not None
assert id is not None
self.__exec_context = ec
self.__os_release_str: str|None = os_release_str
self.__id: str|None = None
def __set_id(self, id: str) -> None:
self.__id = id
# Names that can be used by code outside this class to retrieve
# distribution properties by
# getattr(instance, name.replace('-', '_'))
macro_names = [
'os',
'id',
'name',
'codename',
'gnu-triplet',
'os-cascade',
'os-release',
'pkg-ext',
]
# == Load
@classmethod
def instantiate(cls, distro_id: str, *args, **kwargs):
backend_id = distro_id.lower().replace('-', '_')
async def read_os_release_str(cls, ec: ExecContext) -> None:
release_file = '/etc/os-release'
try:
result = await ec.get(release_file, throw=True)
ret = result.stdout.decode().strip()
except Exception as e:
log(INFO, f'Failed to read {release_file} ({str(e)}), falling back to uname')
result = await ec.run(
['uname', '-s'],
throw=False,
cmd_input=InputMode.NonInteractive
)
if result.status != 0:
log(ERR, f'/etc/os-release and uname both failed, the latter with exit status {result.status}')
raise
uname = result.decode().stdout.strip().lower()
ret = f'ID={uname}\nVERSION_CODENAME=unknown'
return ret
@classmethod
def parse_os_release_field(self, key: str, os_release_str: str, throw: bool=False) -> str:
m = re.search(r'^\s*' + key + r'\s*=\s*("?)([^"\n]+)\1\s*$', os_release_str, re.MULTILINE)
if m is None:
if throw:
raise Exception(f'Could not read "{key}=" from /etc/os-release')
return None
return m.group(2)
@classmethod
def parse_os_release_field_id(cls, os_release_str: str, throw: bool=False) -> str:
ret = cls.parse_os_release_field('ID', os_release_str, throw=throw)
match ret:
case 'opensuse-tumbleweed':
return 'suse'
return ret
@classmethod
async def instantiate(cls, ec: ExecContext, *args, id: str|None=None, os_release_str: str|None=None, **kwargs):
if id is None:
os_release_str = await cls.read_os_release_str(ec)
id = cls.parse_os_release_field_id(os_release_str, throw=True)
backend_id = id.lower().replace('-', '_')
match backend_id:
case 'ubuntu' | 'raspbian' | 'kali':
backend_id = 'debian'
@ -43,16 +97,172 @@ class Distro(abc.ABC):
log(ERR, f'Failed to import Distro module {module_path} ({str(e)})')
raise
cls = getattr(module, 'Distro')
ret = cls(*args, **kwargs)
ret.__set_id(backend_id)
ret = cls(ec, *args, id=id, os_release_str=os_release_str, **kwargs)
return ret
def os_release_field(self, key: str, throw: bool=False) -> str:
return self.parse_os_release_field(key, self.os_release_str, throw)
async def cache(self) -> None:
if self.__os_release_str is None:
self.__os_release_str = await self.read_os_release_str(self.__exec_context)
@cached_property
def os_cascade(self) -> list[str]:
def __append(entry: str):
if not entry in ret:
ret.append(entry)
ret = [ 'os' ]
match self.id:
case 'centos':
__append('linux')
__append('pkg-rpm')
__append('pm-yum')
__append('redhat')
__append('rhel')
case 'fedora' | 'rhel':
__append('linux')
__append('pkg-rpm')
__append('pm-yum')
__append('redhat')
case 'suse':
__append('linux')
__append('pkg-rpm')
__append('pm-zypper')
case 'kali' | 'raspbian':
__append('linux')
__append('pkg-debian')
__append('pm-apt')
__append('debian')
case 'ubuntu':
__append('linux')
__append('pkg-debian')
__append('pm-apt')
case 'archlinux':
__append('linux')
__append('pkg-pm')
__append('pm-pacman')
os = self.os
name = re.sub(r'-.*', '', os)
series = os
rx = re.compile(r'\.[0-9]+$')
while True:
n = re.sub(rx, '', series)
if n == series:
break
ret.append(n)
series = n
__append(name)
__append(os)
__append(self.id)
# e.g. os, linux, suse, suse-tumbleweed
return ret
@cached_property
def cascade(self) -> str:
return ' '.join(self.os_cascade)
@property
def os_release_str(self) -> str:
if self.__os_release_str is None:
raise Exception(f'Tried to access OS release from an incompletely loaded Distro instance. Call reacache() before')
return self.__os_release_str
@cached_property
def name(self) -> str:
return self.os_release_field('NAME', throw=True)
@cached_property
def id(self) -> str:
return self.parse_os_release_field_id(self.__os_release_str, throw=True)
@cached_property
def codename(self) -> str:
match self.id:
case 'suse':
return self.os_release_field('ID', throw=True).split('-')[1]
case 'kali':
return self.os_release_field('VERSION_CODENAME', throw=True).split('-')[1]
case _:
return self.os_release_field('VERSION_CODENAME', throw=True)
raise NotImplementedError(f'Can\'t determine code name from distribution ID {self.id}')
@cached_property
def os(self) -> str:
return self.id + '-' + self.codename
@cached_property
def pkg_ext(self) -> str:
for entry in self.os_cascade():
ret = entry.replace('pkg-', '')
if ret != entry:
return ret
raise RuntimeError(f'No package extension in found in {self.os_cascade}')
@cached_property
def gnu_triplet(self) -> str:
import sysconfig
import shutil
import subprocess
# Best: GNU host triplet Python was built for
for key in ("HOST_GNU_TYPE", "BUILD_GNU_TYPE"): # BUILD_GNU_TYPE can exist too
ret = sysconfig.get_config_var(key)
if isinstance(ret, str) and ret:
return ret
# Common on Debian/Ubuntu: multiarch component (often looks like a triplet)
ret = sysconfig.get_config_var("MULTIARCH")
if isinstance(ret, str) and ret:
return ret
# Sometimes exposed (privately) by CPython
ret = getattr(sys.implementation, "_multiarch", None)
if isinstance(ret, str) and ret:
return ret
# Last resort: ask the system compiler
for cc in ("gcc", "cc", "clang"):
path = shutil.which(cc)
if not path:
continue
try:
ret = subprocess.check_output([path, "-dumpmachine"], text=True, stderr=subprocess.DEVNULL).strip()
if ret:
return ret
except Exception:
pass
raise RuntimeError('Failed to get GNU triplet from running machine')
@classmethod
def macros(cls) -> list[str]:
return ['%%{' + name + '}' for name in cls.macro_names]
def expand_macros(self, fmt: str|Iterable) -> str|Iterable:
if not isinstance(fmt, str):
ret: list[str] = []
for entry in fmt:
ret.append(self.expand_macros(entry))
return ret
ret = fmt
for macro in re.findall("%{([A-Za-z_-]+)}", fmt):
try:
name = macro.replace('-', '_')
val = getattr(self, name)
patt = r'%{' + macro + r'}'
if ret.find(patt) == -1:
continue
ret = ret.replace(patt, val)
except Exception as e:
log(ERR, f'Failed to expand macro "{macro}" inside "{fmt}": {str(e)}')
raise
return ret
# == Convenience methods
@property
def id(self) -> str:
return self.__id
@property
def ctx(self) -> ExecContext:
return self.__exec_context
@ -67,7 +277,7 @@ class Distro(abc.ABC):
def interactive(self) -> bool:
return self.__exec_context.interactive
# == Distribution specific methods
# == Distribution abstraction methods
# -- ref