lib.Distro, ExecContext: Add classes, refactor lib.distro

The code below lib.distro, as left behind by the previous commit, is
geared towards being directly used as a command-line API. This commit
introduces the abstract base class Distro, a proxy for
distribution-specific interactions. The proxy abstracts distro
specifics into an API with proper method prototypes, not
argparse.Namespace contents, and can thus be more easily driven by
arbitrary code.

The Distro class is initialized with a member variable of type
ExecContext, another new class introduced by this commit. It is
designed to abstract the communication channel to the distribution
instance.  Currently only one specialization exists, Local, which
interacts with the distribution and root file system it is running
in, but is planned to be subclassed to support interaction via SSH,
serial, chroot, or chains thereof.

Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
Jan Lindemann 2026-03-05 17:33:52 +01:00
commit 3e897f4df8
55 changed files with 426 additions and 720 deletions

View file

@ -116,7 +116,7 @@ ifeq ($(TIME),)
endif
JW_PKG_PY_PROJECTS = $(TIME) $(JW_PKG_PY) projects
JW_PKG_PY_BUILD = $(JW_PKG_PY_PROJECTS) build $(JW_PKG_PY_EXTRA_BUILD_OPTS)
PKG_MANAGER ?= $(TIME) $(JW_PKG_PY) distro --interactive=$(INTERACTIVE)
PKG_MANAGER ?= $(TIME) $(JW_PKG_PY) --interactive=$(INTERACTIVE) distro
ifneq ($(origin PROJECTS_DIR_REMOTE_BASE),undefined)
PGIT_SH += --remote-base $(PROJECTS_DIR_REMOTE_BASE)

View file

@ -22,7 +22,7 @@ GIT_MAIN_BRANCH ?= master
OPT_JANWARE_PROJECT ?= -j
INTERACTIVE ?= auto
PKG_MANAGER ?= $(JW_PKG_PY) distro --interactive=$(INTERACTIVE)
PKG_MANAGER ?= $(JW_PKG_PY) --interactive=$(INTERACTIVE) distro
ifeq ($(OPT_JANWARE_PROJECT),-j)
REMOTE_GIT_FLAVOUR ?= proj

View file

@ -265,6 +265,13 @@ class App(Base):
self.__opt_interactive = sys.stdin.isatty()
return self.__opt_interactive
@property
def exec_context(self) -> str:
if self.__exec_context is None:
from .lib.ec.Local import Local
self.__exec_context = Local(interactive=self.interactive)
return self.__exec_context
@property
def top_name(self):
return self.__top_name

View file

@ -3,6 +3,7 @@
import re, sys
from argparse import ArgumentParser
from ..lib.Distro import Distro
from ..App import App
from .Cmd import Cmd as Base
@ -12,7 +13,16 @@ class DistroBase(Base): # export
self.__id = None
super().__init__(parent, name, help)
self._add_subcommands()
self.__interactive: bool|None = None
self.__distro: Distro|None = None
@property
def distro(self) -> Distro:
if self.__distro is None:
ret = Distro.instantiate(self.distro_id, ec=self.app.exec_context)
self.__distro = ret
return self.__distro
# --------------- legacy methods
@property
def distro_id(self):
@ -28,19 +38,6 @@ class DistroBase(Base): # export
def add_arguments(self, p: ArgumentParser) -> None:
super().add_arguments(p)
p.add_argument('--id', default=None, help='Distribution ID (default is taken from /etc/os-release)')
p.add_argument('--interactive', choices=['true', 'false', 'auto'], default='true', help="Wait for user input or try to proceed unattended")
@property
def interactive(self) -> bool:
if self.__interactive is None:
match self.app.args.interactive:
case 'true':
self.__interactive = True
case 'false':
self.__interactive = False
case 'auto':
self.__interactive = sys.stdin.isatty()
return self.__interactive
async def _run(self, args):
# Missing subcommand

View file

@ -1,64 +1,28 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
import os, importlib
from ...lib.log import *
from ...lib.distros.Util import Util
if TYPE_CHECKING:
from ...lib.Distro import Distro
from ..Cmd import Cmd as Base
from ..CmdDistro import CmdDistro
class Cmd(Base): # export
from ...lib.distros.Backend import Backend
def __init__(self, parent: CmdDistro, name: str, help: str) -> None:
super().__init__(parent, name, help)
self.__backend_path: str|None = None
self.__util: Util|None = None
self.__backend: Backend|None = None
@property
def distro(self) -> Distro:
return self.parent.distro
@property
def distro_id(self) -> str:
return self.parent.distro_id
@property
def interactive(self) -> bool:
return self.parent.interactive
@property
def _backend_path(self):
if self.__backend_path is None:
backend_id = self.parent.distro_id.lower().replace('-', '_')
match backend_id:
case 'ubuntu' | 'raspbian' | 'kali':
backend_id = 'debian'
case 'centos':
backend_id = 'redhat'
case 'opensuse' | 'suse':
backend_id = 'suse'
self.__backend_path = 'jw.pkg.lib.distros.' + backend_id + '.'
return self.__backend_path
def _instantiate(self, name: str, *args, **kwargs):
module_path = self._backend_path + name
try:
module = importlib.import_module(module_path)
except Exception as e:
log(ERR, f'Failed to import module {module_path} ({str(e)})')
raise
cls = getattr(module, name)
return cls(self, *args, **kwargs)
@property
def util(self) -> Util:
if self.__util is None:
self.__util = self._instantiate('Util')
return self.__util
@property
def _backend(self) -> Backend:
if self.__backend is None:
name = self.__class__.__name__[3:] # Get rid of "Cmd"
self.__backend = self._instantiate(name)
return self.__backend

View file

@ -15,4 +15,4 @@ class CmdDelete(Cmd): # export
parser.add_argument("names", nargs="*", help="Names of packages to be deleted")
async def _run(self, args: Namespace) -> None:
return await self._backend.run(args)
return await self.distro.delete(args.names)

View file

@ -14,6 +14,5 @@ class CmdDup(Cmd): # export
super().add_arguments(parser)
parser.add_argument('--download-only', default=False, action='store_true',
help='Only download packages from the repos, don\'t install them, yet')
async def _run(self, args: Namespace) -> None:
return await self._backend.run(args)
return await self.distro.dup(download_only=args.download_only)

View file

@ -12,8 +12,8 @@ class CmdInstall(Cmd): # export
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument("packages", nargs="*", help="Packages to be installed")
parser.add_argument("names", nargs="*", help="Packages to be installed")
parser.add_argument('--only-update', default=False, action='store_true', help='Only update the listed packages, don\'t install them')
async def _run(self, args: Namespace) -> None:
return await self._backend.run(args)
return await self.distro.install(args.names, only_update=args.only_update)

View file

@ -15,4 +15,4 @@ class CmdRebootRequired(Cmd): # export
parser.add_argument('--verbose', default=False, action='store_true', help='Be chatty about the check')
async def _run(self, args: Namespace) -> None:
return await self._backend.run(args)
return await self.distro.reboot_required(verbose=args.verbose)

View file

@ -14,4 +14,4 @@ class CmdRefresh(Cmd): # export
super().add_arguments(parser)
async def _run(self, args: Namespace) -> None:
return await self._backend.run(args)
return await self.distro.ref()

View file

@ -4,7 +4,6 @@ from argparse import Namespace, ArgumentParser
import re
from ...lib.Package import Package
from ...lib.distros.BeSelect import BeSelect
from ..CmdDistro import CmdDistro
from .Cmd import Cmd
@ -27,5 +26,5 @@ class CmdSelect(Cmd): # export
async def _run(self, args: Namespace) -> None:
# TODO: Semantics probably change heavily in the future
for p in self.filter_packages(args.filter, await self._backend.all_installed_packages):
for p in self.filter_packages(args.filter, await self.distro.select()):
print(p.name)

View file

@ -7,15 +7,9 @@ from ..CmdPkg import CmdPkg as Parent
class Cmd(Base): # export
from ....lib.distros.Backend import Backend
def __init__(self, parent: Parent, name: str, help: str) -> None:
super().__init__(parent, name, help)
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('names', nargs='*', help='Package names')
@property
def _backend(self) -> Backend:
return self.parent._backend

View file

@ -15,4 +15,4 @@ class CmdLs(Cmd): # export
async def _run(self, args: Namespace) -> None:
for name in args.names:
print('\n'.join(await self._backend.files(name)))
print('\n'.join(await self.parent.distro.pkg_files(name)))

View file

@ -14,10 +14,8 @@ class CmdMeta(Cmd): # export
super().add_arguments(parser)
async def _run(self, args: Namespace) -> None:
for name in args.names:
packages = await self._backend.meta_data([name])
if packages:
assert len(packages) == 1
if len(args.names) > 1:
print(f'-- {name}')
print(packages[0])
packages = await self.distro.select(args.names)
for package in packages:
if len(args.names) > 1:
print(f'-- {name}')
print(package)

View file

@ -0,0 +1,122 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import Iterable
import abc, importlib
from .ExecContext import ExecContext, Result
from .Package import Package
from .log import *
class Distro(abc.ABC):
def __init__(self, ec: ExecContext):
assert ec is not None
self.__exec_context = ec
# == Load
@classmethod
def instantiate(cls, distro_id: str, *args, **kwargs):
backend_id = distro_id.lower().replace('-', '_')
match backend_id:
case 'ubuntu' | 'raspbian' | 'kali':
backend_id = 'debian'
case 'centos':
backend_id = 'redhat'
case 'opensuse' | 'suse':
backend_id = 'suse'
module_path = 'jw.pkg.lib.distros.' + backend_id + '.Distro'
try:
module = importlib.import_module(module_path)
except Exception as e:
log(ERR, f'Failed to import Distro module {module_path} ({str(e)})')
raise
cls = getattr(module, 'Distro')
return cls(*args, **kwargs)
# == Convenience methods
@property
def ctx(self) -> ExecContext:
return self.__exec_context
def run(self, *args, **kwargs) -> Result:
return self.__exec_context.run(*args, **kwargs)
def sudo(self, *args, **kwargs) -> Result:
return self.__exec_context.sudo(*args, **kwargs)
@property
def interactive(self) -> bool:
return self.__exec_context.interactive
# == Distribution specific methods
# -- ref
@abc.abstractmethod
async def _ref(self) -> None:
pass
async def ref(self) -> None:
return await self._ref()
# -- dup
@abc.abstractmethod
async def _dup(self, download_only: bool) -> None:
pass
async def dup(self, download_only: bool=False) -> None:
return await self._dup(download_only=download_only)
# -- reboot_required
@abc.abstractmethod
async def _reboot_required(self, verbose: bool) -> bool:
pass
async def reboot_required(self, verbose: bool=False) -> bool:
return await self._reboot_required(verbose=verbose)
# -- select
@abc.abstractmethod
async def _select(self, names: Iterable[str]) -> Iterable[Package]:
pass
async def select(self, names: Iterable[str] = []) -> Iterable[Package]:
return await self._select(names)
# -- install
@abc.abstractmethod
async def _install(self, names: Iterable[str], only_update: bool) -> None:
pass
async def install(self, names: Iterable[str], only_update: bool=False) -> None:
return await self._install(names, only_update=only_update)
# -- delete
@abc.abstractmethod
async def _delete(self, names: Iterable[str]) -> None:
pass
async def delete(self, names: Iterable[str]) -> None:
return await self._delete(names)
# -- pkg_files
@abc.abstractmethod
async def _pkg_files(self, name: str) -> Iterable[str]:
pass
async def pkg_files(self, name: str) -> Iterable[str]:
return await self._pkg_files(name)

View file

@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
import abc
from typing import NamedTuple
class Result(NamedTuple):
stdout: str|None
stderr: str|None
status: int|None
class ExecContext(abc.ABC):
def __init__(self, interactive: bool=True):
self.__interactive = interactive
@property
def interactive(self):
return self.__interactive
@abc.abstractmethod
async def _run(self, *args, **kwargs) -> Result:
pass
async def run(self, *args, **kwargs) -> Result:
return await self._run(*args, **kwargs)
@abc.abstractmethod
async def _sudo(self, cmd: list[str], mod_env: dict[str, str] = {}, opts: list[str]=[], verbose=True) -> Result:
pass
async def sudo(self, cmd: list[str], mod_env: dict[str, str] = {}, opts: list[str]=[], verbose=True) -> Result:
return await self._sudo(cmd, mod_env, opts, verbose)

View file

@ -1,29 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
from ..util import run_sudo
if TYPE_CHECKING:
from ..Cmd import Cmd
class Backend:
def __init__(self, parent: Cmd):
self.__parent = parent
async def _sudo(self, *args, **kwargs):
return await run_sudo(*args, interactive=self.interactive, **kwargs)
@property
def util(self):
return self.__parent.util
@property
def parent(self):
return self.__parent
@property
def interactive(self) -> bool:
return self.__parent.interactive

View file

@ -1,16 +0,0 @@
# -*- coding: utf-8 -*-
import abc
from argparse import Namespace
from .Backend import Backend as Base
from ..Cmd import Cmd as Parent
class BeDelete(Base):
def __init__(self, parent: Parent):
super().__init__(parent)
@abc.abstractmethod
async def run(self, args: Namespace) -> None:
pass

View file

@ -1,16 +0,0 @@
# -*- coding: utf-8 -*-
import abc
from argparse import Namespace
from .Backend import Backend as Base
from ..Cmd import Cmd as Parent
class BeDup(Base):
def __init__(self, parent: Parent):
super().__init__(parent)
@abc.abstractmethod
async def run(self, args: Namespace) -> None:
pass

View file

@ -1,16 +0,0 @@
# -*- coding: utf-8 -*-
import abc
from argparse import Namespace
from .Backend import Backend as Base
from ..Cmd import Cmd as Parent
class BeInstall(Base):
def __init__(self, parent: Parent):
super().__init__(parent)
@abc.abstractmethod
async def run(self, args: Namespace) -> None:
pass

View file

@ -1,31 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Iterable, TYPE_CHECKING
import abc
from ..Package import Package
from .Backend import Backend as Base
if TYPE_CHECKING:
from ..Cmd import Cmd as Parent
class BePkg(Base):
def __init__(self, parent: Parent):
super().__init__(parent)
async def files(self, name: str) -> Iterable[str]:
return await self._files(name)
@abc.abstractmethod
async def _files(self, name: str) -> Iterable[str]:
pass
async def meta_data(self, names: Iterable[str]) -> Iterable[Package]:
return await self._meta_data(names)
@abc.abstractmethod
async def _meta_data(self, names: Iterable[str]) -> Iterable[Package]:
pass

View file

@ -1,16 +0,0 @@
# -*- coding: utf-8 -*-
import abc
from argparse import Namespace
from .Backend import Backend as Base
from ..Cmd import Cmd as Parent
class BeRebootRequired(Base):
def __init__(self, parent: Parent):
super().__init__(parent)
@abc.abstractmethod
async def run(self, args: Namespace) -> None:
pass

View file

@ -1,16 +0,0 @@
# -*- coding: utf-8 -*-
import abc
from argparse import Namespace
from .Backend import Backend as Base
from ..Cmd import Cmd as Parent
class BeRefresh(Base):
def __init__(self, parent: Parent):
super().__init__(parent)
@abc.abstractmethod
async def run(self, args: Namespace) -> None:
pass

View file

@ -1,26 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
import abc
from typing import Iterable
from .Backend import Backend as Base
if TYPE_CHECKING:
from ..Cmd import Cmd as Parent
from .Package import Package
class BeSelect(Base):
def __init__(self, parent: Parent):
super().__init__(parent)
@property
async def all_installed_packages(self) -> Iterable[Package]:
return await self._all_installed_packages()
@abc.abstractmethod
async def _all_installed_packages(self) -> Iterable[Package]:
pass

View file

@ -1,25 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
from ..util import run_sudo
if TYPE_CHECKING:
from ..Cmd import Cmd
class Util:
def __init__(self, parent: Cmd):
self.__parent = parent
async def _sudo(self, *args, **kwargs):
return await run_sudo(*args, interactive=self.interactive, **kwargs)
@property
def parent(self):
return self.__parent
@property
def interactive(self) -> bool:
return self.__parent.interactive

View file

@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
from ...Distro import Distro as Base
if TYPE_CHECKING:
from typing import Iterable
from ...ExecContext import Result
from ...Package import Package
class Distro(Base):
async def pacman(self, args: list[str]) -> Result:
cmd = ['/usr/bin/pacman']
if not self.interactive:
cmd.extend(['--noconfirm'])
cmd.extend(args)
return await self.sudo(cmd)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
async def _ref(self) -> None:
raise NotImplementedError('distro refresh is not yet implemented for Arch-like distributions')
async def _dup(self, download_only: bool) -> None:
args = ['-Su']
if args.download_only:
args.append('-w')
return await self.util.pacman(args)
async def _reboot_required(self, verbose: bool) -> bool:
raise NotImplementedError('distro reboot-required is not yet implemented for Arch-like distributions')
async def _select(self, names: Iterable[str]) -> Iterable[Package]:
return await query_packages(names)
async def _install(self, names: Iterable[str], only_update: bool) -> None:
if only_update:
raise NotImplementedError('--only-update is not yet implemented for pacman')
args = ['-S', '--needed']
args.extend(args.packages)
await self.util.pacman(args)
async def _delete(self, names: Iterable[str]) -> None:
raise NotImplementedError('distro delete not yet implemented for Arch-like distributions')
async def _pkg_files(self, name: str) -> Iterable[str]:
raise NotImplementedError('distro pkg ls yet implemented for Arch-like distributions')

View file

@ -1,17 +0,0 @@
# -*- coding: utf-8 -*-
from argparse import Namespace
from ...Cmd import Cmd
from ..BeDup import BeDup as Base
class Dup(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def run(self, args: Namespace):
pm_args = ['-Su']
if args.download_only:
pm_args.append('-w')
return await self.util.pacman(pm_args)

View file

@ -1,18 +0,0 @@
# -*- coding: utf-8 -*-
from argparse import Namespace
from ...Cmd import Cmd
from ..BeInstall import BeInstall as Base
class Install(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def run(self, args: Namespace):
if args.only_update:
raise NotImplementedError('--only-update is not yet implemented for pacman')
pacman_args = ['-S', '--needed']
pacman_args.extend(args.packages)
await self.util.pacman(*pacman_args)

View file

@ -1,14 +0,0 @@
# -*- coding: utf-8 -*-
from argparse import Namespace
from ...Cmd import Cmd
from ..BeRefresh import BeRefresh as Base
class Refresh(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def run(self, args: Namespace):
raise NotImplementedError('distro refresh is not yet implemented for Arch-like distributions')

View file

@ -1,16 +0,0 @@
# -*- coding: utf-8 -*-
from ...Cmd import Cmd
from ..Util import Util as Base
class Util(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def pacman(self, *args):
cmd = ['/usr/bin/pacman']
if not self.interactive:
cmd.extend(['--noconfirm'])
cmd.extend(args)
return await self._sudo(cmd)

View file

@ -1,14 +0,0 @@
# -*- coding: utf-8 -*-
from argparse import Namespace
from ...Cmd import Cmd
from ..BeDelete import BeDelete as Base
class Delete(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def run(self, args: Namespace):
return await self.util.dpkg(['-P', *args.names], sudo=True)

View file

@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
import os
from ...log import *
from ...Distro import Distro as Base
from ...pm.dpkg import run_dpkg, run_dpkg_query, query_packages, list_files
if TYPE_CHECKING:
from typing import Iterable
from ...ExecContext import Result
from ...Package import Package
class Distro(Base):
async def apt_get(self, args: list[str]):
cmd = ['/usr/bin/apt-get']
mod_env = None
if not self.interactive:
cmd.extend(['--yes', '--quiet'])
mod_env = { 'DEBIAN_FRONTEND': 'noninteractive' }
cmd.extend(args)
return await self.sudo(cmd, mod_env=mod_env)
async def dpkg(self, *args, **kwargs):
return await run_dpkg(*args, **kwargs)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
async def _ref(self) -> None:
return await self.apt_get(['update'])
async def _dup(self, download_only: bool) -> None:
args: list[str] = []
if download_only:
args.append('--download-only')
args.append('upgrade')
return await self.apt_get(args)
async def _reboot_required(self, verbose: bool) -> bool:
reboot_required = '/run/reboot_required'
if os.path.exists(reboot_required):
if verbose:
log(NOTICE, f'Yes. {reboot_required} exists.')
required_pkgs = '/run/reboot-required.pkgs'
if os.path.exists(required_pkgs):
with open(required_pkgs, 'r') as f:
content = f.read()
print(f'-- From {required_pkgs}:')
print(content.strip())
return True
if verbose:
log(NOTICE, f'No. {reboot_required} doesn\'t exist.')
return False
async def _select(self, names: Iterable[str]) -> Iterable[Package]:
return await query_packages(names)
async def _install(self, names: Iterable[str], only_update: bool) -> None:
args = ['install']
if only_update:
args.append('--only-upgrade')
args.append('--no-install-recommends')
args.extend(names)
return await self.apt_get(args)
async def _delete(self, names: Iterable[str]) -> None:
return await self.dpkg(['-P', *names], sudo=True)
async def _pkg_files(self, name: str) -> Iterable[str]:
return await list_files(name)

View file

@ -1,18 +0,0 @@
# -*- coding: utf-8 -*-
from argparse import Namespace
from ...Cmd import Cmd
from ..BeDup import BeDup as Base
class Dup(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def run(self, args: Namespace):
apt_get_args: list[str] = []
if args.download_only:
apt_get_args.append('--download-only')
apt_get_args.append('upgrade')
return await self.util.apt_get(apt_get_args)

View file

@ -1,19 +0,0 @@
# -*- coding: utf-8 -*-
from argparse import Namespace
from ...Cmd import Cmd
from ..BeInstall import BeInstall as Base
class Install(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def run(self, args: Namespace):
apt_get_args = ['install']
if args.only_update:
apt_get_args.append('--only-upgrade')
apt_get_args.append('--no-install-recommends')
apt_get_args.extend(args.packages)
return await self.util.apt_get(apt_get_args)

View file

@ -1,20 +0,0 @@
# -*- coding: utf-8 -*-
from typing import Iterable
from argparse import Namespace
from ...Package import Package
from ...pm.dpkg import list_files, query_packages
from ...Cmd import Cmd
from ..BePkg import BePkg as Base
class Pkg(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def _files(self, name: str) -> Iterable[str]:
return await list_files(name)
async def _meta_data(self, names: Iterable[str]) -> Iterable[Package]:
return await query_packages(names)

View file

@ -1,29 +0,0 @@
# -*- coding: utf-8 -*-
import os
from argparse import Namespace
from ...log import *
from ...Cmd import Cmd
from ..BeRebootRequired import BeRebootRequired as Base
class RebootRequired(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def run(self, args: Namespace):
reboot_required = '/run/reboot_required'
if os.path.exists(reboot_required):
if args.verbose:
log(NOTICE, f'Yes. {reboot_required} exists.')
required_pkgs = '/run/reboot-required.pkgs'
if os.path.exists(required_pkgs):
with open(required_pkgs, 'r') as f:
content = f.read()
print(f'-- From {required_pkgs}:')
print(content.strip())
return 1
if args.verbose:
log(NOTICE, f'No. {reboot_required} doesn\'t exist.')
return 0

View file

@ -1,14 +0,0 @@
# -*- coding: utf-8 -*-
from argparse import Namespace
from ...Cmd import Cmd
from ..BeRefresh import BeRefresh as Base
class Refresh(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def run(self, args: Namespace):
return await self.util.apt_get(['update'])

View file

@ -1,17 +0,0 @@
# -*- coding: utf-8 -*-
from typing import Iterable
from argparse import Namespace
from ...Package import Package
from ...pm.dpkg import query_packages
from ...Cmd import Cmd
from ..BeSelect import BeSelect as Base
class Select(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def _all_installed_packages(self) -> Iterable[Package]:
return await query_packages()

View file

@ -1,22 +0,0 @@
# -*- coding: utf-8 -*-
from ...Cmd import Cmd
from ...pm.dpkg import run_dpkg
from ..Util import Util as Base
class Util(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def apt_get(self, args: list[str]):
cmd = ['/usr/bin/apt-get']
mod_env = None
if not self.interactive:
cmd.extend(['--yes', '--quiet'])
mod_env = { 'DEBIAN_FRONTEND': 'noninteractive' }
cmd.extend(args)
return await self._sudo(cmd, mod_env=mod_env)
async def dpkg(self, *args, **kwargs):
return await run_dpkg(*args, **kwargs)

View file

@ -1,17 +0,0 @@
# -*- coding: utf-8 -*-
from argparse import Namespace
from ...Cmd import Cmd
from ..BeDup import BeDup as Base
class Dup(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def run(self, args: Namespace):
yum_args: list[str] = ['update']
if args.download_only:
yum_args.append('--downloadonly')
return await self.yum(yum_args)

View file

@ -1,18 +0,0 @@
# -*- coding: utf-8 -*-
from argparse import Namespace
from ...Cmd import Cmd
from ..BeInstall import BeInstall as Base
class Install(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def run(self, args: Namespace):
yum_args = ['update' if args.only_update else 'install']
if not self.interactive:
yum_args.append['-y']
yum_args.extend(args.packages)
return await self.util.yum(*yum_args)

View file

@ -1,15 +0,0 @@
# -*- coding: utf-8 -*-
from argparse import Namespace
from ...Cmd import Cmd
from ..BeRefresh import BeRefresh as Base
class Refresh(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def run(self, args: Namespace):
await self.util.yum('clean', 'expire-cache')
await self.util.yum('makecache')

View file

@ -1,14 +0,0 @@
# -*- coding: utf-8 -*-
from ...Cmd import Cmd
from ..Util import Util as Base
class Util(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def yum(self, *args):
cmd = ['/usr/bin/yum']
cmd.extend(args)
return await self._sudo(cmd)

View file

@ -1,14 +0,0 @@
# -*- coding: utf-8 -*-
from argparse import Namespace
from ...Cmd import Cmd
from ..BeDelete import BeDelete as Base
class Delete(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def run(self, args: Namespace):
return await self.util.rpm(['-e', *args.names], sudo=True)

View file

@ -0,0 +1,63 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
from ...Distro import Distro as Base
from ...pm.rpm import run_rpm, query_packages, list_files
if TYPE_CHECKING:
from typing import Iterable
from ...ExecContext import Result
from ...Package import Package
class Distro(Base):
async def zypper(self, args: list[str], verbose: bool=True, sudo: bool=True) -> Result:
cmd = ['/usr/bin/zypper']
if not self.interactive:
cmd.extend(['--non-interactive', '--gpg-auto-import-keys', '--no-gpg-checks'])
cmd.extend(args)
if sudo:
# Run sudo --login in case /etc/profile modifies ZYPP_CONF
return await self.sudo(cmd, opts=['--login'], verbose=verbose)
return await self.run(cmd, verbose=verbose)
async def rpm(self, *args, **kwargs) -> Result:
return await run_rpm(*args, **kwargs)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
async def _ref(self) -> None:
return await self.zypper(['refresh'])
async def _dup(self, download_only: bool) -> None:
args = ['dup', '--force-resolution', '--auto-agree-with-licenses']
if download_only:
args.append('--download-only')
return await self.zypper(args)
async def _reboot_required(self, verbose: bool) -> bool:
opts = []
if not verbose:
pass
#opts.append('--quiet')
opts.append('needs-rebooting')
stdout, stderr, ret = await self.zypper(opts, sudo=False, verbose=verbose)
if ret != 0:
return True
return False
async def _select(self, names: Iterable[str]) -> Iterable[Package]:
return await query_packages(names)
async def _install(self, names: Iterable[str], only_update: bool) -> None:
cmd = 'update' if only_update else 'install'
return await self.zypper([cmd, *names])
async def _delete(self, names: Iterable[str]) -> None:
return await self.rpm(['-e', *names], sudo=True)
async def _pkg_files(self, name: str) -> Iterable[str]:
return await list_files(name)

View file

@ -1,17 +0,0 @@
# -*- coding: utf-8 -*-
from argparse import Namespace
from ...Cmd import Cmd
from ..BeDup import BeDup as Base
class Dup(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def run(self, args: Namespace):
zypper_args = ['dup', '--force-resolution', '--auto-agree-with-licenses']
if args.download_only:
zypper_args.append('--download-only')
return await self.util.zypper(zypper_args)

View file

@ -1,15 +0,0 @@
# -*- coding: utf-8 -*-
from argparse import Namespace
from ...Cmd import Cmd
from ..BeInstall import BeInstall as Base
class Install(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def run(self, args: Namespace):
zypper_cmd = 'update' if args.only_update else 'install'
return await self.util.zypper([zypper_cmd, *args.packages])

View file

@ -1,20 +0,0 @@
# -*- coding: utf-8 -*-
from typing import Iterable
from argparse import Namespace
from ...Package import Package
from ...pm.rpm import list_files, query_packages
from ...Cmd import Cmd
from ..BePkg import BePkg as Base
class Pkg(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def _files(self, name: str) -> Iterable[str]:
return await list_files(name)
async def _meta_data(self, names: Iterable[str]) -> Iterable[Package]:
return await query_packages(names)

View file

@ -1,22 +0,0 @@
# -*- coding: utf-8 -*-
from argparse import Namespace
from ...Cmd import Cmd
from ..BeRebootRequired import BeRebootRequired as Base
class RebootRequired(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def run(self, args: Namespace):
opts = []
if not args.verbose:
pass
#opts.append('--quiet')
opts.append('needs-rebooting')
stdout, stderr, ret = await self.util.zypper(opts, sudo=False, verbose=args.verbose)
if ret != 0:
return 1
return 0

View file

@ -1,14 +0,0 @@
# -*- coding: utf-8 -*-
from argparse import Namespace
from ...Cmd import Cmd
from ..BeRefresh import BeRefresh as Base
class Refresh(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def run(self, args: Namespace):
return await self.util.zypper(['refresh'])

View file

@ -1,17 +0,0 @@
# -*- coding: utf-8 -*-
from typing import Iterable
from argparse import Namespace
from ...Package import Package
from ...pm.rpm import query_packages
from ...Cmd import Cmd
from ..BeSelect import BeSelect as Base
class Select(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def _all_installed_packages(self) -> Iterable[Package]:
return await query_packages()

View file

@ -1,25 +0,0 @@
# -*- coding: utf-8 -*-
from ...util import run_cmd
from ...Cmd import Cmd
from ..Util import Util as Base
from ...pm.rpm import run_rpm
class Util(Base):
def __init__(self, parent: Cmd):
super().__init__(parent)
async def zypper(self, args: list[str], verbose: bool=False, sudo: bool=True):
cmd = ['/usr/bin/zypper']
if not self.interactive:
cmd.extend(['--non-interactive', '--gpg-auto-import-keys', '--no-gpg-checks'])
cmd.extend(args)
if sudo:
# Run sudo --login in case /etc/profile modifies ZYPP_CONF
return await self._sudo(cmd, opts=['--login'], verbose=verbose)
return await run_cmd(cmd, verbose=verbose)
async def rpm(self, *args, **kwargs):
return await run_rpm(*args, **kwargs)

View file

@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
import os
from ..util import run_cmd
from ..ExecContext import ExecContext as Base
from ..ExecContext import Result
class Local(Base):
async def _run(self, *args, **kwargs) -> Result:
return await run_cmd(*args, **kwargs)
async def _sudo(self, cmd: list[str], mod_env: dict[str, str] = {}, opts: list[str]=[], verbose=True) -> Result:
env: dict[str, str]|None = None
cmd_input: str|None = None
if mod_env:
env = os.environ.copy()
env.update(mod_env)
cmdline = []
if os.getuid() != 0:
cmdline.append('/usr/bin/sudo')
if env is not None:
cmdline.append('--preserve-env=' + ','.join(mod_env.keys()))
cmdline.extend(opts)
cmdline.extend(cmd)
if self.interactive:
cmd_input = "mode:interactive"
return await self._run(cmdline, throw=True, verbose=verbose, env=env, cmd_input=cmd_input)

View file

@ -0,0 +1,4 @@
TOPDIR = ../../../../../..
include $(TOPDIR)/make/proj.mk
include $(JWBDIR)/make/py-mod.mk

View file

@ -69,9 +69,9 @@ async def run_cmd(
if code == 0:
return
if (throw or verbose):
msg = f'Command returned error {code}: {pretty_cmd(args, wd)}: '
msg = f'Command returned error {code}: {pretty_cmd(args, wd)}'
if stderr:
msg += stderr.strip()
msg += ': ' + stderr.strip()
if throw:
raise RuntimeError(msg)
@ -279,21 +279,9 @@ async def run_askpass(askpass_env: list[str], key: AskpassKey, host: str|None=No
return None
async def run_sudo(cmd: list[str], mod_env: dict[str, str] = {}, opts: list[str]=[], interactive: bool=True, verbose=True):
env: dict[str, str]|None = None
cmd_input: str|None = None
if mod_env:
env = os.environ.copy()
env.update(mod_env)
cmdline = []
if os.getuid() != 0:
cmdline.append('/usr/bin/sudo')
if env is not None:
cmdline.append('--preserve-env=' + ','.join(mod_env.keys()))
cmdline.extend(opts)
cmdline.extend(cmd)
if interactive:
cmd_input = "mode:interactive"
return await run_cmd(cmdline, throw=True, verbose=verbose, env=env, cmd_input=cmd_input)
from .ec.Local import Local
ec = Local()
return await ec.sudo(cmd, mod_env, opts, interactive, verbose)
async def get_username(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export
url_user = None if url is None else urlparse(url).username