jw.pkg: Fix "make check" static code check fallout

The previous commits have put rules for linting and formatting via ruff, yapf, mypy and pyright into place. They are checked with the make check target, and this commit adds the fixes for the target to succeed.

It does some refactoring where type checking dug up dirty bits, and also adds lots of churn in the Python code. To a good deal, that's owed to mere formatting changes. It would have been better to seperate those from syntax and refactoring fixes into multiple commits, so that the interesting changes don't drown in the formatting nose. However, that would have been a lot of additional work only to be thrown away by later commits, hence this commit has a big diff in one piece. The size of the diff is regrettable but hopefully a one-off: What it buys is automatic format checking for CI and predictble formats for smaller diffs in the future.

Rules that "make check" enforces are, in the following order

- Syntax checkers:

- ruff check . - mypy . - pyright

- Format check:

- yapf --diff --recursive .

The refactoring includes:

- Turn the Result class into a more elaborate object, capable of doing more heavy lifting around stderr and stdout decoding, summarizing outcome, and matching error strings.
Aside from fixing broken type checks, this also removes lots of boilerplate calling code which is currently used for handling possible call outcome scenarios. Trying to access an inexistent, decoded string should raise a meaningful exception by itself now, which removes lots of code with case distinctions.

- Fix Cmd type hierarchy:

- Add the AbstractCmd class above Cmd. This is necessary because the checker rightfully complains it can't instantiate a Cmd instance where constructor arguments were needed. They never were, but the type used at the instantiating code's location in jw.pkg.App so claims.
- Lots of sub- and sub-subcommands are derived from the base class of the invoking command. That provides some properties shared across the ancestor hierarchy of a command, but is semantically unsound. Fix that by introducing jw.pkg.BaseCmd class as a place to provide basic helpers shared across all commands used in a jw.pkg.App's context, and derive all command classes from that afresh. The parent command is still reachable via a common parent property.

Formatting changes are conforming to PEP-8, mostly, with minor tweaks. All in all they include the following changes.

- Remove # -*- coding: utf-8 -*-

The line was needed by Python 2 which is not supported anylonger. For Python 3, the default encoding is UTF-8, anyway.
- Allow to run "make py-format" without having it produce any changes. It's basically "yapf --in-place --recursive ." with some code style settings, see conf/topdir/pyproject.toml. The settings may be debatable. I've had custom tweaks in place on that target, too, but then again, IDEs would have more hassle to integrate that.

- Introduce a 88 character line length limit

- One import per line, reshuffle them semantically, see [tool.isort] in pyproject.toml.

- Hide imports needed for type-checking only behind

if TYPE_CHECKING
- Spaces around assignments accounts for much churn. Having having no spaces in inline parameter list assignments and default parameter values would arguably be more compact where it's useful. On the other hand, I have not found a code formatter which allows spaces around assignments in parameter lists broken into one per line and that's often better than a wall of text.
- Add two spaces before # export, as this seems to be mandated by PEP-8

- Use single quotes by default

Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
Jan Lindemann 2026-05-27 07:16:05 +02:00
commit 6db73873e7
Signed by: Jan Lindemann
GPG key ID: 3750640C9E25DD61
97 changed files with 3229 additions and 1893 deletions

View file

@ -1,15 +1,16 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from argparse import ArgumentParser
from typing import TYPE_CHECKING
from ..App import App
from ..lib.Cmd import Cmd as Base
from ..CmdBase import CmdBase
class Cmd(Base): # export
if TYPE_CHECKING:
from ..App import App
from ..lib.Distro import Distro
def __init__(self, parent: App|Base, name: str, help: str) -> None:
class Cmd(CmdBase): # export
def __init__(self, parent: App | CmdBase, name: str, help: str) -> None:
super().__init__(parent, name, help)
async def _run(self, args):

View file

@ -1,14 +1,12 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser
from ..App import App
from .Cmd import Cmd as CmdBase
class CmdPkg(CmdBase): # export
class CmdPkg(CmdBase): # export
def __init__(self, parent: App) -> None:
super().__init__(parent, 'pkg', help="System package manager wrapper")
super().__init__(parent, 'pkg', help = 'System package manager wrapper')
self.load_subcommands()
def add_arguments(self, p: ArgumentParser) -> None:

View file

@ -1,14 +1,14 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser
from ..App import App
from .Cmd import Cmd as CmdBase
class CmdPlatform(CmdBase): # export
class CmdPlatform(CmdBase): # export
def __init__(self, parent: App) -> None:
super().__init__(parent, 'platform', help="Miscellaneous platform-related comamnds")
super().__init__(
parent, 'platform', help = 'Miscellaneous platform-related comamnds'
)
self.load_subcommands()
def add_arguments(self, p: ArgumentParser) -> None:

View file

@ -1,14 +1,19 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser
from ..App import App
from .Cmd import Cmd as CmdBase
class CmdPosix(CmdBase): # export
class CmdPosix(CmdBase): # export
def __init__(self, parent: App) -> None:
super().__init__(parent, 'posix', help='Perform various operations on a distro through its POSIX utility interface')
super().__init__(
parent,
'posix',
help = (
'Perform various operations on a distro through its '
'POSIX utility interface'
),
)
self.load_subcommands()
def add_arguments(self, p: ArgumentParser) -> None:

View file

@ -1,15 +1,18 @@
# -*- coding: utf-8 -*-
import sys
from argparse import ArgumentParser
from ..App import App
from .Cmd import Cmd as CmdBase
class CmdProjects(CmdBase): # export
class CmdProjects(CmdBase): # export
def __init__(self, parent: App) -> None:
super().__init__(parent, 'projects', help='Project metadata evaluation for building packages')
super().__init__(
parent,
'projects',
help = 'Project metadata evaluation for building packages'
)
self.load_subcommands()
def add_arguments(self, p: ArgumentParser) -> None:

View file

@ -1,14 +1,12 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser
from ..App import App
from .Cmd import Cmd as CmdBase
class CmdSecrets(CmdBase): # export
class CmdSecrets(CmdBase): # export
def __init__(self, parent: App) -> None:
super().__init__(parent, 'secrets', help="Manage package secrets")
super().__init__(parent, 'secrets', help = 'Manage package secrets')
self.load_subcommands()
def add_arguments(self, p: ArgumentParser) -> None:

View file

@ -4,6 +4,6 @@ __all__ = detect_modules(
package_name = __name__,
package_path = __path__,
namespace = globals(),
prefix = "Cmd",
skip = {"Cmd"},
) # pyright: ignore[reportUnsupportedDunderAll]
prefix = 'Cmd',
skip = {'Cmd'},
) # pyright: ignore[reportUnsupportedDunderAll]

View file

@ -1,16 +1,13 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ...lib.Distro import Distro
from ..CmdPkg import CmdPkg
from ..Cmd import Cmd as Base
class Cmd(Base): # export
class Cmd(Base): # export
def __init__(self, parent: CmdPkg, name: str, help: str) -> None:
super().__init__(parent, name, help)

View file

@ -1,18 +1,18 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser, Namespace
from argparse import Namespace, ArgumentParser
from .Cmd import Cmd
from ..CmdPkg import CmdPkg
from .Cmd import Cmd
class CmdDelete(Cmd): # export
class CmdDelete(Cmd): # export
def __init__(self, parent: CmdPkg) -> None:
super().__init__(parent, 'delete', help="Delete packages by name")
super().__init__(parent, 'delete', help = 'Delete packages by name')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument("names", nargs="*", help="Names of packages to be deleted")
parser.add_argument(
'names', nargs = '*', help = 'Names of packages to be deleted'
)
async def _run(self, args: Namespace) -> None:
return await self.distro.delete(args.names)

View file

@ -1,18 +1,21 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser, Namespace
from argparse import Namespace, ArgumentParser
from .Cmd import Cmd
from ..CmdPkg import CmdPkg
from .Cmd import Cmd
class CmdDup(Cmd): # export
class CmdDup(Cmd): # export
def __init__(self, parent: CmdPkg) -> None:
super().__init__(parent, 'dup', help="Upgrade distribution")
super().__init__(parent, 'dup', help = 'Upgrade distribution')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('--download-only', default=False, action='store_true',
help='Only download packages from the repos, don\'t install them, yet')
parser.add_argument(
'--download-only',
default = False,
action = 'store_true',
help = "Only download packages from the repos, don't install them, yet",
)
async def _run(self, args: Namespace) -> None:
return await self.distro.dup(download_only=args.download_only)
return await self.distro.dup(download_only = args.download_only)

View file

@ -1,22 +1,35 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser, Namespace
from argparse import Namespace, ArgumentParser
from .Cmd import Cmd
from ..CmdPkg import CmdPkg
from .Cmd import Cmd
class CmdInstall(Cmd): # export
class CmdInstall(Cmd): # export
def __init__(self, parent: CmdPkg) -> None:
super().__init__(parent, 'install', help="Install the distribution's notion of available packages")
super().__init__(
parent,
'install',
help = "Install the distribution's notion of available packages",
)
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument("names", nargs="*", help="Packages to be installed")
parser.add_argument('--only-update', default=False, action='store_true', help='Only update the listed packages, don\'t install them')
parser.add_argument('-F', '--fixed-strings', action='store_true',
help='Don\'t expand platform.expand_macros macros in <names>')
parser.add_argument('names', nargs = '*', help = 'Packages to be installed')
parser.add_argument(
'--only-update',
default = False,
action = 'store_true',
help = "Only update the listed packages, don't install them",
)
parser.add_argument(
'-F',
'--fixed-strings',
action = 'store_true',
help = "Don't expand platform.expand_macros macros in <names>",
)
async def _run(self, args: Namespace) -> None:
names = names if args.fixed_strings else self.app.distro.expand_macros(args.names)
return await self.distro.install(names, only_update=args.only_update)
names = (
args.names if args.fixed_strings else self.distro.expand_macros(args.names)
)
return await self.distro.install(names, only_update = args.only_update)

View file

@ -1,18 +1,16 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser, Namespace
from argparse import Namespace, ArgumentParser
from .NamedPkgsCmd import NamedPkgsCmd as Base
from ..CmdPkg import CmdPkg
from .NamedPkgsCmd import NamedPkgsCmd as Base
class CmdLs(Base): # export
class CmdLs(Base): # export
def __init__(self, parent: CmdPkg) -> None:
super().__init__(parent, 'ls', help="List package contents")
super().__init__(parent, 'ls', help = 'List package contents')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
async def _run(self, args: Namespace) -> None:
for name in args.names:
print('\n'.join(await self.parent.distro.pkg_files(name)))
print('\n'.join(await self.distro.pkg_files(name)))

View file

@ -1,21 +1,19 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser, Namespace
from argparse import Namespace, ArgumentParser
from .NamedPkgsCmd import NamedPkgsCmd as Base
from ..CmdPkg import CmdPkg
from .NamedPkgsCmd import NamedPkgsCmd as Base
class CmdMeta(Base): # export
class CmdMeta(Base): # export
def __init__(self, parent: CmdPkg) -> None:
super().__init__(parent, 'meta', help="List package metadata")
super().__init__(parent, 'meta', help = 'List package metadata')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
async def _run(self, args: Namespace) -> None:
packages = await self.distro.select(args.names)
for package in packages:
names = await self.distro.select(args.names)
for name in names:
if len(args.names) > 1:
print(f'-- {name}')
print(package)
print(name)

View file

@ -1,17 +1,19 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser, Namespace
from argparse import Namespace, ArgumentParser
from .Cmd import Cmd
from ..CmdPkg import CmdPkg
from .Cmd import Cmd
class CmdRebootRequired(Cmd): # export
class CmdRebootRequired(Cmd): # export
def __init__(self, parent: CmdPkg) -> None:
super().__init__(parent, 'reboot-required', help="Check whether the machine needs rebooting")
super().__init__(
parent,
'reboot-required',
help = 'Check whether the machine needs rebooting'
)
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
async def _run(self, args: Namespace) -> None:
return await self.distro.reboot_required()
await self.distro.reboot_required()

View file

@ -1,14 +1,16 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser, Namespace
from argparse import Namespace, ArgumentParser
from .Cmd import Cmd
from ..CmdPkg import CmdPkg
from .Cmd import Cmd
class CmdRefresh(Cmd): # export
class CmdRefresh(Cmd): # export
def __init__(self, parent: CmdPkg) -> None:
super().__init__(parent, 'refresh', help="Refresh the distribution's notion of available packages")
super().__init__(
parent,
'refresh',
help = "Refresh the distribution's notion of available packages",
)
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)

View file

@ -1,23 +1,19 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser, Namespace
from argparse import Namespace, ArgumentParser
import re
from ...lib.Package import Package
from ...lib.PackageFilter import PackageFilterString
from ..CmdPkg import CmdPkg
from .Cmd import Cmd
class CmdSelect(Cmd): # export
class CmdSelect(Cmd): # export
def __init__(self, parent: CmdPkg) -> None:
super().__init__(parent, 'select', help="Select packages by filter")
super().__init__(parent, 'select', help = 'Select packages by filter')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument("filter", help="Package filter string")
parser.add_argument('filter', help = 'Package filter string')
async def _run(self, args: Namespace) -> None:
filter = PackageFilterString(args.filter) if args.filter else None
for p in await self.distro.select(filter=filter):
for p in await self.distro.select(filter = filter):
print(p.name)

View file

@ -1,15 +1,13 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser
from argparse import Namespace, ArgumentParser
from .Cmd import Cmd as Base
from ..CmdPkg import CmdPkg as Parent
from .Cmd import Cmd as Base
class NamedPkgsCmd(Base): # export
class NamedPkgsCmd(Base): # export
def __init__(self, parent: Parent, name: str, help: str) -> None:
super().__init__(parent, name, help)
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('names', nargs='*', help='Package names')
parser.add_argument('names', nargs = '*', help = 'Package names')

View file

@ -4,6 +4,6 @@ __all__ = detect_modules(
package_name = __name__,
package_path = __path__,
namespace = globals(),
prefix = "Cmd",
skip = {"Cmd"},
) # pyright: ignore[reportUnsupportedDunderAll]
prefix = 'Cmd',
skip = {'Cmd'},
) # pyright: ignore[reportUnsupportedDunderAll]

View file

@ -1,16 +1,13 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ...lib.Distro import Distro
from ..CmdPlatform import CmdPlatform
from ..Cmd import Cmd as Base
class Cmd(Base): # export
class Cmd(Base): # export
def __init__(self, parent: CmdPlatform, name: str, help: str) -> None:
super().__init__(parent, name, help)

View file

@ -1,21 +1,23 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser, Namespace
from argparse import Namespace, ArgumentParser
from ...lib.log import *
from ...lib.Distro import Distro
from ..Cmd import Cmd
from ..CmdPlatform import CmdPlatform
class CmdInfo(Cmd): # export
class CmdInfo(Cmd): # export
def __init__(self, parent: CmdPlatform) -> None:
super().__init__(parent, 'info', help='Retrieve information about target platform')
super().__init__(
parent, 'info', help = 'Retrieve information about target platform'
)
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('--format', default='%{cascade}',
help=f'Format string, expanding macros {", ".join(Distro.macros())}')
parser.add_argument(
'--format',
default = '%{cascade}',
help = f'Format string, expanding macros {", ".join(Distro.macros())}',
)
async def _run(self, args: Namespace) -> None:
print(self.app.distro.expand_macros(args.format))

View file

@ -4,6 +4,6 @@ __all__ = detect_modules(
package_name = __name__,
package_path = __path__,
namespace = globals(),
prefix = "Cmd",
skip = {"Cmd"},
) # pyright: ignore[reportUnsupportedDunderAll]
prefix = 'Cmd',
skip = {'Cmd'},
) # pyright: ignore[reportUnsupportedDunderAll]

View file

@ -1,17 +1,16 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from argparse import ArgumentParser
from typing import TYPE_CHECKING
from ..Cmd import Cmd as Base
from ...CmdBase import CmdBase
if TYPE_CHECKING:
from ..CmdPosix import CmdPosix
from ..CmdPosix import CmdPosix as Parent
class Cmd(Base): # export
class Cmd(CmdBase): # export
def __init__(self, parent: CmdPosix, name: str, help: str) -> None:
def __init__(self, parent: Parent, name: str, help: str) -> None:
super().__init__(parent, name, help)
def add_arguments(self, parser: ArgumentParser) -> None:

View file

@ -1,34 +1,56 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
from ...lib.util import copy
from .Cmd import Cmd
if TYPE_CHECKING:
from ..CmdPosix import CmdPosix
from argparse import Namespace, ArgumentParser
from argparse import ArgumentParser, Namespace
class CmdCopy(Cmd): # export
from ..CmdPosix import CmdPosix
class CmdCopy(Cmd): # export
def __init__(self, parent: CmdPosix) -> None:
super().__init__(parent, 'copy', help="Copy files")
super().__init__(parent, 'copy', help = 'Copy files')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('src', help='Source file URI')
parser.add_argument('dst', help='Destination file URI')
parser.add_argument('-o', '--owner', default=None, help='Destination file owner')
parser.add_argument('-g', '--group', default=None, help='Destination file group')
parser.add_argument('-m', '--mode', default=None, help='Destination file mode')
parser.add_argument('-F', '--fixed-strings', action='store_true',
help='Don\'t expand platform.expand_macros macros in <src> and <dst>')
parser.add_argument('src', help = 'Source file URI')
parser.add_argument('dst', help = 'Destination file URI')
parser.add_argument(
'-o', '--owner', default = None, help = 'Destination file owner'
)
parser.add_argument(
'-g', '--group', default = None, help = 'Destination file group'
)
parser.add_argument(
'-m', '--mode', default = None, help = 'Destination file mode'
)
parser.add_argument(
'-F',
'--fixed-strings',
action = 'store_true',
help = "Don't expand platform.expand_macros macros in <src> and <dst>",
)
async def _run(self, args: Namespace) -> None:
def __expand(url: str) -> str:
if args.fixed_strings:
return url
return self.app.distro.expand_macros(url)
await copy(__expand(args.src), __expand(args.dst),
owner=args.owner, group=args.group, mode=int(args.mode, 0))
ret = self.app.distro.expand_macros(url)
if not isinstance(ret, str):
raise Exception(
f'Expanding macros in "{url}" returned unexpected ret "{ret}"'
)
return ret
await copy(
__expand(args.src),
__expand(args.dst),
owner = args.owner,
group = args.group,
mode = int(args.mode, 0),
)

View file

@ -1,14 +1,12 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser, Namespace
from argparse import Namespace, ArgumentParser
from ..CmdPosix import CmdPosix as Parent
from .Cmd import Cmd as Base
from .Cmd import Cmd
from ..CmdPosix import CmdPosix
class CmdTar(Base): # export
class CmdTar(Cmd): # export
def __init__(self, parent: CmdPosix) -> None:
super().__init__(parent, 'tar', help='Handle tar archives')
def __init__(self, parent: Parent) -> None:
super().__init__(parent, 'tar', help = 'Handle tar archives')
self.load_subcommands()
def add_arguments(self, parser: ArgumentParser) -> None:

View file

@ -1,27 +1,29 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from argparse import ArgumentParser
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from ..Cmd import Cmd as Base
from ....CmdBase import CmdBase
from ....lib.FileContext import FileContext
from ....lib.ProcFilterGpg import ProcFilterGpg
from ....lib.TarIo import TarIo
from ..CmdTar import CmdTar as Parent
from ....lib.TarIo import TarIo
from ....lib.ProcFilterGpg import ProcFilterGpg
from ....lib.FileContext import FileContext
class Cmd(Base): # export
class Cmd(CmdBase): # export
def __init__(self, parent: Parent, name: str, help: str) -> None:
super().__init__(parent, name, help)
self.__tar_io: None = None
@asynccontextmanager
async def ctx(self, **kwargs) -> TarIo:
async with TarIo.create(src=self.app.args.archive_path, **kwargs) as ret:
ret.src.add_proc_filter(FileContext.Direction.In, ProcFilterGpg(ec=self.app.exec_context))
async def ctx(self, **kwargs) -> AsyncIterator[TarIo]:
async with TarIo.create(src = self.app.args.archive_path, **kwargs) as ret:
ret.src.add_proc_filter(
FileContext.Direction.In, ProcFilterGpg(ec = self.app.exec_context)
)
yield ret
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('-f', '--archive-path', required=True, help='Archive path')
parser.add_argument(
'-f', '--archive-path', required = True, help = 'Archive path'
)

View file

@ -1,27 +1,20 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
from argparse import ArgumentParser, Namespace
from argparse import Namespace, ArgumentParser
from ....lib.log import DEBUG, log
from .Cmd import Cmd, Parent
from .Cmd import Cmd
from ..CmdTar import CmdTar
class CmdExtract(Cmd): # export
from ....lib.FileContext import FileContext
from ....lib.log import *
class CmdExtract(Cmd): # export
def __init__(self, parent: CmdTar) -> None:
super().__init__(parent, 'x', help="Extract a tar archive")
def __init__(self, parent: Parent) -> None:
super().__init__(parent, 'x', help = 'Extract a tar archive')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('dst', help='Destination root URI')
parser.add_argument('dst', help = 'Destination root URI')
async def _run(self, args: Namespace) -> None:
async with self.ctx(dst=args.dst) as ctx:
async with self.ctx(dst = args.dst) as ctx:
paths = await ctx.extract(ctx.dst.root)
log(DEBUG, f'Extracted {len(paths)} files')

View file

@ -106,8 +106,8 @@ class BaseCmdPkgRelations(Cmd):
default = False,
help = (
"Don't consider or output modules matching the os cascade in their "
"[build].exclude config"
)
'[build].exclude config'
),
)
parser.add_argument(
'--hide-self',

View file

@ -1,65 +1,120 @@
# -*- coding: utf-8 -*-
import datetime
import os
import re
import os, re, sys, subprocess, datetime, time
from argparse import Namespace, ArgumentParser
from argparse import ArgumentParser, Namespace
from functools import lru_cache
from ...lib.util import get_profile_env
from ...lib.log import *
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
from ...App import Scope
from ...lib.log import DEBUG, ERR, NOTICE, log
from ...lib.util import get_profile_env, pretty_cmd
from .Cmd import Cmd, Parent
class CmdBuild(Cmd): # export
class CmdBuild(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'build', help='janware software project build tool')
def __init__(self, parent: Parent) -> None:
super().__init__(parent, 'build', help = 'janware software project build tool')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('--exclude', default='', help='Space seperated ist of modules to be excluded from build')
parser.add_argument('-n', '--dry-run', action='store_true',
default=False, help='Don\'t build anything, just print what would be done.')
parser.add_argument('-O', '--build-order', action='store_true',
default=False, help='Don\'t build anything, just print the build order.')
parser.add_argument('-I', '--ignore-deps', action='store_true',
default=False, help='Don\'t build dependencies, i.e. build only modules specified on the command line')
parser.add_argument('--env-reinit', action='store_true',
default=False, help='Source /etc/profile before each build step. Discard environment unless --env-keep is specified')
parser.add_argument('--env-keep', default='none', help='Comma seperated list of environment variables to keep, '
+ '"all" or "none", only meaningful if --env-reinit is specified')
parser.add_argument('target', default='all', help='Build target')
parser.add_argument('modules', nargs='+', default='', help='Modules to be built')
parser.add_argument(
'--exclude',
default = '',
help = 'Space seperated ist of modules to be excluded from build',
)
parser.add_argument(
'-n',
'--dry-run',
action = 'store_true',
default = False,
help = "Don't build anything, just print what would be done.",
)
parser.add_argument(
'-O',
'--build-order',
action = 'store_true',
default = False,
help = "Don't build anything, just print the build order.",
)
parser.add_argument(
'-I',
'--ignore-deps',
action = 'store_true',
default = False,
help = (
"Don't build dependencies, i.e. build only modules specified "
'on the command line'
),
)
parser.add_argument(
'--env-reinit',
action = 'store_true',
default = False,
help = (
'Source /etc/profile before each build step. Discard environment '
'unless --env-keep is specified'
),
)
parser.add_argument(
'--env-keep',
default = 'none',
help = (
'Comma seperated list of environment variables to keep, '
'"all" or "none", only meaningful if --env-reinit is specified'
),
)
parser.add_argument(
'target',
default = 'all',
help = 'Build target',
)
parser.add_argument(
'modules',
nargs = '+',
default = '',
help = 'Modules to be built',
)
async def _run(self, args: Namespace) -> None:
@lru_cache(maxsize=None)
def read_deps(cur, prereq_type):
@lru_cache(maxsize = None)
def read_deps(cur, prereq_type: str) -> list[str]:
# dep cache doesn't make a difference at all
if prereq_type in dep_cache:
if cur in dep_cache[prereq_type]:
return dep_cache[prereq_type][cur]
else:
dep_cache[prereq_type]: dict[str, str] = {}
dep_cache[prereq_type] = {}
ret = self.app.get_project_refs([ cur ], ['pkg.requires.jw'],
prereq_type, scope = Scope.Subtree, add_self=False, names_only=True)
ret = self.app.get_project_refs(
[cur],
['pkg.requires.jw'],
prereq_type,
scope = Scope.Subtree,
add_self = False,
names_only = True,
)
log(DEBUG, 'prerequisites = ' + ' '.join(ret))
if cur in ret:
ret.remove(cur)
log(DEBUG, 'inserting', prereq_type, "prerequisites of", cur, ":", ' '.join(ret))
log(
DEBUG,
(f'Inserting {prereq_type}, prerequisites of {cur}: {" ".join(ret)}'),
)
dep_cache[prereq_type][cur] = ret
return ret
def add_dep_tree(cur, prereq_types, tree, all_deps):
log(DEBUG, "adding prerequisites " + ' '.join(prereq_types) + " of module " + cur)
log(DEBUG, 'Adding deps "{" ".join(prereq_types)}" of module {cur)')
if cur in all_deps:
log(DEBUG, 'already handled module ' + cur)
log(DEBUG, 'Already handled module "{cur}"')
return 0
deps = set()
all_deps.add(cur)
for t in prereq_types:
log(DEBUG, "checking prereqisites of type " + t)
log(DEBUG, 'Checking deps of type "{t}"')
deps.update(read_deps(cur, t))
for d in deps:
add_dep_tree(d, prereq_types, tree, all_deps)
@ -70,17 +125,20 @@ class CmdBuild(Cmd): # export
all_deps = set()
dep_tree = {}
for m in modules:
log(DEBUG, "--- adding dependency tree of module " + m)
log(DEBUG, '--- Adding dependency tree of module "{m}"')
add_dep_tree(m, prereq_types, dep_tree, all_deps)
while len(all_deps):
# Find any leaf
for d in all_deps:
if not len(dep_tree[d]): # Dependency d doesn't have dependencies itself
break # found
else: # no Leaf found
# Dependency d doesn't have dependencies itself
if not len(dep_tree[d]):
break # found
else: # no Leaf found
print(all_deps)
raise Exception("fatal: the dependencies between these modules are unresolvable")
order.append(d) # do it
raise Exception(
'Fatal: the dependencies between these modules are unresolvable'
)
order.append(d) # do it
# bookkeep it
all_deps.remove(d)
for k in dep_tree.keys():
@ -88,56 +146,68 @@ class CmdBuild(Cmd): # export
dep_tree[k].remove(d)
return 1
async def run_make(module, target, cur_project, num_projects):
async def run_make(module, target, cur_project, num_projects) -> None:
patt = self.app.is_excluded_from_build(module)
if patt is not None:
title = f'---- {module}'
log(NOTICE, f',{title} >')
log(NOTICE, f'| Configured to skip build on platform >{patt}<')
log(NOTICE, f'`{title} <')
return
make_cmd = [ "make", target ]
wd = self.app.find_dir(module, pretty=False)
title = '---- [%d/%d]: Running "%s" in %s -' % (cur_project, num_projects, ' '.join(make_cmd), wd)
make_cmd = ['make', target]
wd = self.app.find_dir(module, pretty = False)
title = '---- [%d/%d]: Running "%s" in %s -' % (
cur_project,
num_projects,
' '.join(make_cmd),
wd,
)
mod_env = None
if args.env_reinit:
keep: bool|list[str] = False
keep: bool | list[str] = False
if args.env_keep is not None:
match args.env_keep:
case 'all':
keep=True
keep = True
case 'none':
keep=False
keep = False
case _:
keep = args.env_keep.split(',')
mod_env = await get_profile_env(keep=keep)
mod_env = await get_profile_env(keep = keep)
try:
await self.app.exec_context.run(
make_cmd,
wd=wd,
throw=True,
verbose=True,
mod_env=mod_env,
title=title
wd = wd,
throw = True,
verbose = True,
mod_env = mod_env,
title = title,
)
except Exception as e:
log(ERR, f'Failed to make target "{target}" in module "{module}" below base {self.app.projs_root}: {str(e)}')
log(
ERR,
(
f'Failed to make target "{target}" in module "{module}" '
f'below base {self.app.projs_root}: {str(e)}'
),
)
raise
async def run_make_on_modules(modules, order, target):
cur_project = 0
num_projects = len(order)
if target in ["clean", "distclean"]:
if target in ['clean', 'distclean']:
for m in reversed(order):
cur_project += 1
await run_make(m, target, cur_project, num_projects)
if m in modules:
modules.remove(m)
if not len(modules):
log(NOTICE, "All modules cleaned")
log(NOTICE, 'All modules cleaned')
return
else:
for m in order:
@ -146,7 +216,7 @@ class CmdBuild(Cmd): # export
async def run(args):
log(DEBUG, "----------------------------------------- running ", ' '.join(sys.argv))
log(DEBUG, f'-------------------------------------- running {pretty_cmd()}')
modules = args.modules
exclude = args.exclude.split()
@ -154,19 +224,19 @@ class CmdBuild(Cmd): # export
env_exclude = os.getenv('BUILD_EXCLUDE', '')
if len(env_exclude):
log(NOTICE, "Exluding modules from environment: " + env_exclude)
exclude += " " + env_exclude
log(NOTICE, 'Exluding modules from environment: ' + env_exclude)
exclude += ' ' + env_exclude
# -- build
order = []
glob_prereq_types = [ "build" ]
if re.match("pkg-.*", target) is not None:
glob_prereq_types = [ "build", "run", "release", "devel" ]
glob_prereq_types = ['build']
if re.match('pkg-.*', target) is not None:
glob_prereq_types = ['build', 'run', 'release', 'devel']
if target != 'order' and not args.build_order:
log(NOTICE, "Using prerequisite types " + ' '.join(glob_prereq_types))
log(NOTICE, "Calculating order for modules ... ")
log(NOTICE, 'Using prerequisite types ' + ' '.join(glob_prereq_types))
log(NOTICE, 'Calculating order for modules ... ')
calculate_order(order, modules, glob_prereq_types)
if args.ignore_deps:
@ -177,18 +247,24 @@ class CmdBuild(Cmd): # export
exit(0)
cur_project = 0
log(NOTICE, "Building target %s in %d projects:" % (target, len(order)))
log(NOTICE, 'Building target %s in %d projects:' % (target, len(order)))
for m in order:
cur_project += 1
log(NOTICE, " %3d %s" % (cur_project, m))
log(NOTICE, ' %3d %s' % (cur_project, m))
if args.dry_run:
exit(0)
await run_make_on_modules(modules, order, target)
log(NOTICE, 'Build done at %s' % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
log(
NOTICE,
(
'Build done at %s' %
(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
),
)
dep_cache: dict[dict[str, str]] = {}
dep_cache: dict[str, dict[str, list[str]]] = {}
await run(args)

View file

@ -1,54 +1,72 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from argparse import ArgumentParser, Namespace
from typing import TYPE_CHECKING
from argparse import Namespace, ArgumentParser
from ...lib.log import *
from ...lib.base import InputMode
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
from ...App import Scope
from ...lib.log import NOTICE, log
from .Cmd import Cmd, Parent
if TYPE_CHECKING:
from ...lib.base import Result
class CmdCanonicalizeRemotes(Cmd): # export
class CmdCanonicalizeRemotes(Cmd): # export
def __rewrite_url(self, url: str) -> str:
return url.replace('/srv/git', '')
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'canonicalize-remotes', help='Streamline janware Git remotes')
def __init__(self, parent: Parent) -> None:
super().__init__(
parent, 'canonicalize-remotes', help = 'Streamline janware Git remotes'
)
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('-n', '--dry-run', default=False, action='store_true', help='Only log what would be done')
parser.add_argument(
'-n',
'--dry-run',
default = False,
action = 'store_true',
help = 'Only log what would be done',
)
async def _run(self, args: Namespace) -> None:
async def git(cmd: list[str], ro=False, throw=True) -> Result:
async def git(cmd: list[str], ro = False, throw = True) -> Result:
cmd = ['/usr/bin/git', *cmd]
log(NOTICE, f'-- {" ".join(cmd)}')
if ro or not args.dry_run:
return await self.app.exec_context.run(cmd, cmd_input=InputMode.NonInteractive, throw=throw)
remotes: dict[str, dict[str, str]] = {}
stdout, stderr, status = await git(['remote', '-v'], ro=True)
for line in stdout.decode().splitlines():
return await self.app.exec_context.run(
cmd, cmd_input = InputMode.NonInteractive, throw = throw
)
return Result(b'', None, 0)
remotes: dict[str, dict[str, str | list[str]]] = {}
result = await git(['remote', '-v'], ro = True)
for line in result.stdout_str.splitlines():
name, url, fp = line.split()
remote = remotes.setdefault(name, {})
key = 'url' if fp == '(fetch)' else 'pushurl'
fpurls = remote.setdefault(key, [])
assert isinstance(fpurls, list)
fpurls.append(url)
for remote, config in remotes.items():
for name, remote in remotes.items():
dirty_keys: set[str] = set()
for key, urls in config.items():
for key, urls in remote.items():
for url in urls:
if url != self.__rewrite_url(url):
dirty_keys.add(key)
for key in dirty_keys:
urls = config[key]
await git(['config', '--unset-all', f'remote.{remote}.{key}'], throw=False)
urls = remote[key]
await git(
['config', '--unset-all', f'remote.{name}.{key}'], throw = False
)
for url in urls:
await git(['config', '--add', f'remote.{remote}.{key}', self.__rewrite_url(url)])
await git(
[
'config',
'--add',
f'remote.{name}.{key}',
self.__rewrite_url(url),
]
)

View file

@ -1,23 +1,26 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser, Namespace
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
from ...App import Scope
from .Cmd import Cmd, Parent
class CmdCflags(Cmd): # export
class CmdCflags(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'cflags', help='cflags')
def __init__(self, parent: Parent) -> None:
super().__init__(parent, 'cflags', help = 'cflags')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
parser.add_argument('module', nargs = '*', help = 'Modules')
async def _run(self, args: Namespace) -> None:
deps = self.app.get_project_refs(args.module, ['pkg.requires.jw'], 'build',
scope = Scope.Subtree, add_self=True, names_only=True)
deps = self.app.get_project_refs(
args.module,
['pkg.requires.jw'],
'build',
scope = Scope.Subtree,
add_self = True,
names_only = True,
)
out = []
for m in reversed(deps):
path = self.app.find_dir(m, ['/include'])

View file

@ -1,24 +1,28 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser, Namespace
from argparse import Namespace, ArgumentParser
from ...lib.log import NOTICE, log
from .Cmd import Cmd, Parent
from ...lib.log import *
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
class CmdCheck(Cmd): # export
class CmdCheck(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'check', help='Check for circular dependencies between given modules')
def __init__(self, parent: Parent) -> None:
super().__init__(
parent,
'check',
help = 'Check for circular dependencies between given modules',
)
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
parser.add_argument('-f', '--flavour', nargs='?', default = 'build')
parser.add_argument('module', nargs = '*', help = 'Modules')
parser.add_argument('-f', '--flavour', nargs = '?', default = 'build')
async def _run(self, args: Namespace) -> None:
path = self.app.find_circular_deps(args.module, args.flavour)
if path:
log(NOTICE, f'Found circular dependency in flavour {args.flavour}:', ' -> '.join(path))
if self.app.find_circular_deps(args.module, args.flavour):
log(NOTICE, f'Found circular dependency in flavour {args.flavour}')
exit(1)
log(NOTICE, f'No circular dependency found for flavour {args.flavour} in modules:', ' '.join(args.module))
log(
NOTICE,
f'No circular dependency found for flavour {args.flavour} in modules:',
' '.join(args.module),
)

View file

@ -1,23 +1,23 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser, Namespace
from argparse import Namespace, ArgumentParser
from .Cmd import Cmd, Parent
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
class CmdCommands(Cmd): # export
class CmdCommands(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'commands', help='List available commands')
def __init__(self, parent: Parent) -> None:
super().__init__(parent, 'commands', help = 'List available commands')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
async def _run(self, args: Namespace) -> None:
import sys, re, os, glob
this_dir = os.path.dirname(sys.modules[__name__].__file__)
import glob
import os
import re
this_dir = os.path.dirname(__file__)
ret = []
for file_name in glob.glob('Cmd*.py', root_dir=this_dir):
for file_name in glob.glob('Cmd*.py', root_dir = this_dir):
cc_name = re.sub(r'^Cmd|\.py', '', file_name)
name = re.sub(r'(?<!^)(?=[A-Z])', '-', cc_name).lower()
ret.append(name)

View file

@ -1,23 +1,26 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from argparse import ArgumentParser, Namespace
from ...App import Scope
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
from .Cmd import Cmd, Parent
class CmdExepath(Cmd): # export
class CmdExepath(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'exepath', help='exepath')
def __init__(self, parent: Parent) -> None:
super().__init__(parent, 'exepath', help = 'exepath')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
parser.add_argument('module', nargs = '*', help = 'Modules')
async def _run(self, args: Namespace) -> None:
deps = self.app.get_project_refs(args.module, ['pkg.requires.jw'], [ 'run', 'build', 'devel' ],
scope = Scope.Subtree, add_self=True, names_only=True)
deps = self.app.get_project_refs(
args.module,
['pkg.requires.jw'],
['run', 'build', 'devel'],
scope = Scope.Subtree,
add_self = True,
names_only = True,
)
out = []
for m in deps:
path = self.app.find_dir(m, ['/bin'])

View file

@ -1,44 +1,72 @@
# -*- coding: utf-8 -*-
import os
import re
import re, os
from argparse import Namespace, ArgumentParser
from argparse import ArgumentParser, Namespace
from ...lib.log import *
from ...lib.log import DEBUG, log
from ...lib.Uri import Uri
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
from .Cmd import Cmd, Parent
class CmdGetAuthInfo(Cmd): # export
class CmdGetAuthInfo(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'get-auth-info', help='Try to retrieve authentication information from the source tree')
def __init__(self, parent: Parent) -> None:
super().__init__(
parent,
'get-auth-info',
help = 'Try to retrieve authentication information from the source tree',
)
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('--only-values', default=False, action='store_true',
help='Don\'t prefix values by "<field-name>="')
parser.add_argument('--username', default=False, action='store_true',
help='Show user name')
parser.add_argument('--password', default=False, action='store_true',
help='Show password')
parser.add_argument('--remote-owner-base', default=False, action='store_true',
help='Show remote base URL for owner jw-pkg was cloned from')
parser.add_argument('--remote-base', default=False, action='store_true',
help='Show remote base URL')
parser.add_argument(
'--only-values',
default = False,
action = 'store_true',
help = 'Don\'t prefix values by "<field-name>="',
)
parser.add_argument(
'--username',
default = False,
action = 'store_true',
help = 'Show user name'
)
parser.add_argument(
'--password',
default = False,
action = 'store_true',
help = 'Show password'
)
parser.add_argument(
'--remote-owner-base',
default = False,
action = 'store_true',
help = 'Show remote base URL for owner jw-pkg was cloned from',
)
parser.add_argument(
'--remote-base',
default = False,
action = 'store_true',
help = 'Show remote base URL',
)
async def _run(self, args: Namespace) -> None:
keys = ['username', 'password']
# --- Milk jw-pkg repo
jw_pkg_dir = self.app.find_dir('jw-pkg', pretty=False)
jw_pkg_dir = self.app.find_dir('jw-pkg', pretty = False)
if not os.path.isdir(jw_pkg_dir + '/.git'):
log(DEBUG, f'jw-pkg directory is not a Git repo: {jw_pkg_dir}')
return
remotes, stderr, status = (await self.app.exec_context.run(['git', '-C', jw_pkg_dir, 'remote', '-v'])).decode()
git_result = await self.app.exec_context.run(
['git', '-C', jw_pkg_dir, 'remote', '-v']
)
result: dict[str, str] = {}
for line in remotes.splitlines():
for line in git_result.stdout_str.splitlines():
name, url, typ = re.split(r'\s+', line)
if name == 'origin' and typ in ['(pull)', '(fetch)']: # TODO: Use other remotes, too?
if name == 'origin' and typ in [
'(pull)',
'(fetch)',
]: # TODO: Use other remotes, too?
parsed = Uri(url)
for key in keys:
result[key] = getattr(parsed, key)
@ -52,7 +80,7 @@ class CmdGetAuthInfo(Cmd): # export
# --- Print results
for key, val in result.items():
if getattr(args, key, None) != True:
if not getattr(args, key, None):
continue
if val is None:
continue

View file

@ -1,18 +1,19 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser, Namespace
from argparse import Namespace, ArgumentParser
from .Cmd import Cmd, Parent
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
class CmdGetval(Cmd): # export
class CmdGetval(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'getval', help='Get value from project config')
def __init__(self, parent: Parent) -> None:
super().__init__(parent, 'getval', help = 'Get value from project config')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('--project', default = None, help = 'Project name, default is name of project\'s topdir')
parser.add_argument(
'--project',
default = None,
help = "Project name, default is name of project's topdir",
)
parser.add_argument('section', default = '', help = 'Config section')
parser.add_argument('key', default = '', help = 'Config key')

View file

@ -1,21 +1,21 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser, Namespace
from argparse import Namespace, ArgumentParser
from .Cmd import Cmd, Parent
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
class CmdHtdocsDir(Cmd): # export
class CmdHtdocsDir(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'htdocs-dir', help='Print source directory containing document root of a given module')
def __init__(self, parent: Parent) -> None:
super().__init__(
parent,
'htdocs-dir',
help = 'Print source directory containing document root of a given module',
)
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
parser.add_argument('module', nargs = '*', help = 'Modules')
async def _run(self, args: Namespace) -> None:
r = []
for m in args.module:
r.append(self.app.htdocs_dir(m))
print(' '.join(r))

View file

@ -1,27 +1,42 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from argparse import ArgumentParser, Namespace
from ...App import Scope
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
from .Cmd import Cmd, Parent
class CmdLdflags(Cmd): # export
class CmdLdflags(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'ldflags', help='ldflags')
def __init__(self, parent: Parent) -> None:
super().__init__(parent, 'ldflags', help = 'ldflags')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
parser.add_argument('--exclude', action='append', help='Exclude Modules', default=[])
parser.add_argument('-s', '--add-self', action='store_true',
default=False, help='Include libflags of specified modules, too, not only their dependencies')
parser.add_argument('module', nargs = '*', help = 'Modules')
parser.add_argument(
'--exclude', action = 'append', help = 'Exclude Modules', default = []
)
parser.add_argument(
'-s',
'--add-self',
action = 'store_true',
default = False,
help = (
'Include libflags of specified modules, too, '
'not only their dependencies'
),
)
# -L needs to contain more paths than libs linked with -l would require
def __get_ldpathflags(self, names: list[str], exclude: list[str] = []) -> str:
deps = self.app.get_project_refs(names, ['pkg.requires.jw'], 'build',
scope = Scope.Subtree, add_self=True, names_only=True)
def __get_ldpathflags(
self, names: list[str], exclude: list[str] = []
) -> str | None:
deps = self.app.get_project_refs(
names,
['pkg.requires.jw'],
'build',
scope = Scope.Subtree,
add_self = True,
names_only = True,
)
ret = []
for m in deps:
if m in exclude:
@ -35,11 +50,17 @@ class CmdLdflags(Cmd): # export
ret.append('-L' + path)
if not ret:
return None
return(' '.join(ret))
return ' '.join(ret)
async def _run(self, args: Namespace) -> None:
deps = self.app.get_project_refs(args.module, ['pkg.requires.jw'], 'build',
scope = Scope.One, add_self=args.add_self, names_only=True)
deps = self.app.get_project_refs(
args.module,
['pkg.requires.jw'],
'build',
scope = Scope.One,
add_self = args.add_self,
names_only = True,
)
out = []
for m in reversed(deps):
if m in args.exclude:

View file

@ -1,23 +1,26 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from argparse import ArgumentParser, Namespace
from ...App import Scope
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
from .Cmd import Cmd, Parent
class CmdLdlibpath(Cmd): # export
class CmdLdlibpath(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'ldlibpath', help='ldlibpath')
def __init__(self, parent: Parent) -> None:
super().__init__(parent, 'ldlibpath', help = 'ldlibpath')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
parser.add_argument('module', nargs = '*', help = 'Modules')
async def _run(self, args: Namespace) -> None:
deps = self.app.get_project_refs(args.module, ['pkg.requires.jw'], [ 'run', 'build', 'devel' ],
scope = Scope.Subtree, add_self=True, names_only=True)
deps = self.app.get_project_refs(
args.module,
['pkg.requires.jw'],
['run', 'build', 'devel'],
scope = Scope.Subtree,
add_self = True,
names_only = True,
)
out = []
for m in deps:
path = self.app.find_dir(m, ['/lib'])

View file

@ -1,18 +1,15 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser, Namespace
from argparse import Namespace, ArgumentParser
from .Cmd import Cmd, Parent
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
class CmdLibname(Cmd): # export
class CmdLibname(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'libname', help='libname')
def __init__(self, parent: Parent) -> None:
super().__init__(parent, 'libname', help = 'libname')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
parser.add_argument('module', nargs = '*', help = 'Modules')
async def _run(self, args: Namespace) -> None:
print(self.app.get_libname(args.module))

View file

@ -1,62 +1,105 @@
# -*- coding: utf-8 -*-
import os
import re
import re, os
from argparse import Namespace, ArgumentParser
from argparse import ArgumentParser, Namespace
from ...lib.util import get_username, get_password, run_curl
from ...lib.log import *
from ...lib.base import Input
from ...lib.log import DEBUG, log
from ...lib.Uri import Uri
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
from ...lib.util import get_password, get_username, run_curl_into
from .Cmd import Cmd, Parent
class CmdListRepos(Cmd): # export
class CmdListRepos(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'list-repos', help='Query a remote GIT server for repositories')
def __init__(self, parent: Parent) -> None:
super().__init__(
parent, 'list-repos', help = 'Query a remote GIT server for repositories'
)
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('base_url', help='Base URL of all Git repositories without user part')
parser.add_argument('--username', help='Username for SSH or HTTP authentication, don\'t specify for unauthenticated', default=None)
parser.add_argument('--askpass', help='Program to echo password for SSH or HTTP authentication, don\'t specify for unauthenticated', default=None)
parser.add_argument('--from-owner', help='List from-owner\'s projects', default='janware')
parser.add_argument(
'base_url', help = 'Base URL of all Git repositories without user part'
)
parser.add_argument(
'--username',
help = (
"Username for SSH or HTTP authentication, don't "
'specify for unauthenticated'
),
default = None,
)
parser.add_argument(
'--askpass',
help = (
'Program to echo password for SSH or HTTP authentication, '
"don't specify for unauthenticated"
),
default = None,
)
parser.add_argument(
'--from-owner', help = "List from-owner's projects", default = 'janware'
)
async def _run(self, args: Namespace) -> None:
base_url = Uri(args.base_url)
askpass_env=['GIT_ASKPASS', 'SSH_ASKPASS']
username = await get_username(args=args, url=args.base_url, askpass_env=askpass_env)
askpass_env = ['GIT_ASKPASS', 'SSH_ASKPASS']
username = await get_username(
args = args, url = args.base_url, askpass_env = askpass_env
)
password = None
if username is not None:
password = await get_password(args=args, url=args.base_url, askpass_env=askpass_env)
password = await get_password(
args = args, url = args.base_url, askpass_env = askpass_env
)
match base_url.scheme:
case 'ssh':
if re.match(r'ssh://.*devgit\.janware\.com/', args.base_url):
from jw.pkg.lib.ec.SSHClient import SSHClient, ssh_client
from jw.pkg.lib.ec.SSHClient import ssh_client
if username is not None:
base_url.set_username(username)
if password is not None:
base_url.set_password(password)
ssh = ssh_client(base_url, interactive=self.app.interactive, verbose_default=self.app.verbose)
cmd = ['/opt/jw-pkg/bin/git-srv-admin.sh', '-u', args.from_owner, '-j', 'list-personal-projects']
ssh = ssh_client(
base_url,
interactive = self.app.interactive,
verbose_default = self.app.verbose,
)
cmd = [
'/opt/jw-pkg/bin/git-srv-admin.sh',
'-u',
args.from_owner,
'-j',
'list-personal-projects',
]
result = await ssh.run(cmd)
print('\n'.join(result.stdout.decode().splitlines()))
print('\n'.join(result.stdout_str.splitlines()))
return
case 'https':
from jw.pkg.lib.base import InputMode
cmd_input = InputMode.NonInteractive
cmd_input: Input = InputMode.NonInteractive
if re.match(r'https://github.com', args.base_url):
curl_args = [
'-f',
'-H', 'Accept: application/vnd.github+json',
'-H', 'X-GitHub-Api-Version: 2022-11-28',
'-H',
'Accept: application/vnd.github+json',
'-H',
'X-GitHub-Api-Version: 2022-11-28',
]
if password is not None:
assert username is not None, f'Assertion failed: username is empty but password isn\'t for "{args.base_url}"'
assert username is not None, (
'Assertion failed: username is empty but password '
'isn\'t for "{args.base_url}"'
)
cmd_input = (f'-u {username}:{password}').encode('utf-8')
curl_args.extend(['-K-'])
curl_args.append(f'https://api.github.com/users/{args.from_owner}/repos')
repos, stderr, status = await run_curl(curl_args, cmd_input=cmd_input, parse_json=True)
curl_args.append(
f'https://api.github.com/users/{args.from_owner}/repos'
)
repos = await run_curl_into(list, curl_args, cmd_input = cmd_input)
for repo in repos:
print(repo['name'])
return
@ -69,33 +112,44 @@ class CmdListRepos(Cmd): # export
cmd_input = (f'-u {username}:{password}').encode('utf-8')
curl_args.extend(['-K-'])
for entities_dir in ['orgs', 'users']:
api_url = f'{args.base_url}/api/v1/{entities_dir}/{args.from_owner}/repos'
api_url = (
f'{args.base_url}/api/v1/{entities_dir}/'
f'{args.from_owner}/repos'
)
try:
tried.append(api_url)
repos, stderr, status = await run_curl(curl_args + [api_url], cmd_input=cmd_input, parse_json=True)
repos = await run_curl_into(
list,
curl_args + [api_url],
cmd_input = cmd_input,
)
for repo in repos:
print(repo['name'])
break
except Exception as e:
msg = 'curl {} failed ({}), trying next'.format(
' '.join(curl_args + [api_url]),
str(e)
' '.join(curl_args + [api_url]), str(e)
)
log(DEBUG, msg)
tried[-1] += ': ' + msg
raise
else:
raise RuntimeError(f'Failed to fetch repository list from assumed Forgejo instance at {args.base_url}, tried {', '.join(tried)}')
raise RuntimeError(
f'Failed to fetch repository list from assumed Forgejo '
f'instance at {args.base_url}, tried {", ".join(tried)}'
)
return
if os.path.isdir(args.base_url):
for subdir in ["." , args.from_owner]:
for subdir in ['.', args.from_owner]:
out = []
for entry in os.scandir(args.base_url + "/" + subdir):
for entry in os.scandir(args.base_url + '/' + subdir):
path = entry.path
if os.path.isdir(path + "/.git") or os.path.exists(path + "/HEAD"):
if os.path.isdir(path + '/.git') or os.path.exists(path + '/HEAD'):
out.append(path)
if out:
print('\n'.join(out))
break
return
raise Exception(f'Don\'t know how to enumerate Git repos at base url {args.base_url}')
raise Exception(
f"Don't know how to enumerate Git repos at base url {args.base_url}"
)

View file

@ -1,44 +1,57 @@
# -*- coding: utf-8 -*-
import re
from argparse import Namespace, ArgumentParser
from argparse import ArgumentParser, Namespace
from ...lib.log import *
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
from ...lib.log import DEBUG, log
from .Cmd import Cmd, Parent
class CmdModules(Cmd): # export
class CmdModules(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'modules', help='Query existing janware packages')
def __init__(self, parent: Parent) -> None:
super().__init__(parent, 'modules', help = 'Query existing janware packages')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('-F', '--filter', nargs='?', default=None, help='Key-value pairs, seperated by commas, to be searched for in project.conf')
parser.add_argument(
'-F',
'--filter',
nargs = '?',
default = None,
help =
'Key-value pairs, seperated by commas, to be searched for in project.conf',
)
async def _run(self, args: Namespace) -> None:
import pathlib
proj_root = self.app.projs_root
log(DEBUG, "proj_root = " + proj_root)
log(DEBUG, 'proj_root = ' + proj_root)
path = pathlib.Path(self.app.projs_root)
modules = [p.parents[1].name for p in path.glob('*/make/project.conf')]
log(DEBUG, "modules = ", modules)
log(DEBUG, 'modules = ', modules)
out = []
filters = None if args.filter is None else [re.split("=", f) for f in re.split(",", args.filter)]
filters = (
None if args.filter is None else
[re.split('=', f) for f in re.split(',', args.filter)]
)
for m in modules:
if filters:
for f in filters:
path = f[0].rsplit('.')
if len(path) > 1:
sec = path[0]
key = path[1]
else:
sec = None
key = path[0]
val = self.app.get_value(m, sec, key)
log(DEBUG, 'Checking in {} if {}="{}", is "{}"'.format(m, f[0], f[1], val))
if val and val == f[1]:
out.append(m)
break
else:
if not filters:
out.append(m)
continue
for f in filters:
path_str = f[0].rsplit('.')
if len(path_str) > 1:
sec = path_str[0]
key = path_str[1]
else:
sec = None
key = path_str[0]
val = self.app.get_value(m, sec, key)
log(
DEBUG,
'Checking in {} if {}="{}", is "{}"'.format(m, f[0], f[1], val),
)
if val and val == f[1]:
out.append(m)
break
print(' '.join(out))

View file

@ -1,26 +1,29 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from argparse import ArgumentParser, Namespace
from ...App import Scope
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
from .Cmd import Cmd, Parent
class CmdPath(Cmd): # export
class CmdPath(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'path', help='path')
def __init__(self, parent: Parent) -> None:
super().__init__(parent, 'path', help = 'path')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
parser.add_argument('module', nargs = '*', help = 'Modules')
async def _run(self, args: Namespace) -> None:
deps = self.app.get_project_refs(args.module, ['pkg.requires.jw'], 'run',
scope = Scope.Subtree, add_self=True, names_only=True)
deps = self.app.get_project_refs(
args.module,
['pkg.requires.jw'],
'run',
scope = Scope.Subtree,
add_self = True,
names_only = True,
)
out = []
for m in deps:
path = self.app.find_dir(m, '/bin')
path = self.app.find_dir(m, ['/bin'])
if path is not None:
out.append(path)
print(':'.join(out))

View file

@ -1,10 +1,11 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from .BaseCmdPkgRelations import BaseCmdPkgRelations as Base
from .Cmd import Parent
class CmdPkgConflicts(Base): # export
class CmdPkgConflicts(Base): # export
def __init__(self, parent: Base) -> None:
super().__init__(parent, 'conflicts', help='Print packages conflicting with a given package')
def __init__(self, parent: Parent) -> None:
super().__init__(
parent,
'conflicts',
help = 'Print packages conflicting with a given package'
)

View file

@ -1,10 +1,11 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from .BaseCmdPkgRelations import BaseCmdPkgRelations as Base
from .Cmd import Parent
class CmdPkgProvides(Base): # export
class CmdPkgProvides(Base): # export
def __init__(self, parent: Base) -> None:
super().__init__(parent, 'provides', help='Print packages and capabilities provided by a given package')
def __init__(self, parent: Parent) -> None:
super().__init__(
parent,
'provides',
help = 'Print packages and capabilities provided by a given package',
)

View file

@ -1,10 +1,9 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from .BaseCmdPkgRelations import BaseCmdPkgRelations as Base
from .Cmd import Parent
class CmdPkgRequires(Base): # export
class CmdPkgRequires(Base): # export
def __init__(self, parent: Base) -> None:
super().__init__(parent, 'requires', help='Print packages required for a given package')
def __init__(self, parent: Parent) -> None:
super().__init__(
parent, 'requires', help = 'Print packages required for a given package'
)

View file

@ -1,19 +1,18 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser, Namespace
from argparse import Namespace, ArgumentParser
from ...lib.log import WARNING, log
from .Cmd import Cmd, Parent
from ...lib.log import *
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
class CmdProjDir(Cmd): # export
class CmdProjDir(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'proj-dir', help='Print directory of a given package')
def __init__(self, parent: Parent) -> None:
super().__init__(
parent, 'proj-dir', help = 'Print directory of a given package'
)
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
parser.add_argument('module', nargs = '*', help = 'Modules')
async def _run(self, args: Namespace) -> None:
out = []

View file

@ -1,23 +1,28 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from argparse import ArgumentParser, Namespace
from ...App import Scope
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
from .Cmd import Cmd, Parent
class CmdPythonpath(Cmd): # export
class CmdPythonpath(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'pythonpath', help='Generate PYTHONPATH for given modules')
def __init__(self, parent: Parent) -> None:
super().__init__(
parent, 'pythonpath', help = 'Generate PYTHONPATH for given modules'
)
def add_arguments(self, p: ArgumentParser) -> None:
super().add_arguments(p)
p.add_argument('module', help='Modules', nargs='*')
p.add_argument('module', help = 'Modules', nargs = '*')
async def _run(self, args: Namespace) -> None:
deps = self.app.get_project_refs(args.module, ['pkg.requires.jw'], [ 'run', 'build' ],
scope = Scope.Subtree, add_self=True, names_only=True)
deps = self.app.get_project_refs(
args.module,
['pkg.requires.jw'],
['run', 'build'],
scope = Scope.Subtree,
add_self = True,
names_only = True,
)
out = []
for m in deps:
path = self.app.find_dir(m, ['src/python', 'tools/python'])

View file

@ -1,30 +1,35 @@
# -*- coding: utf-8 -*-
import os
from argparse import Namespace, ArgumentParser
from argparse import ArgumentParser, Namespace
from ...App import Scope
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
from .Cmd import Cmd, Parent
class CmdPythonpathOrig(Cmd): # export
class CmdPythonpathOrig(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'pythonpath_orig', help='pythonpath')
def __init__(self, parent: Parent) -> None:
super().__init__(parent, 'pythonpath_orig', help = 'pythonpath')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
parser.add_argument('module', nargs = '*', help = 'Modules')
async def _run(self, args: Namespace) -> None:
deps = self.app.get_project_refs(args.module, ['pkg.requires.jw'], [ 'run', 'build' ],
scope = Scope.Subtree, add_self=True, names_only=True)
deps = self.app.get_project_refs(
args.module,
['pkg.requires.jw'],
['run', 'build'],
scope = Scope.Subtree,
add_self = True,
names_only = True,
)
r = ''
for m in deps:
pd = self.app.find_dir(m, pretty=False)
pd = self.app.find_dir(m, pretty = False)
if pd is None:
continue
for subdir in [ 'src/python', 'tools/python' ]:
cand = pd + "/" + subdir
if isdir(cand):
for subdir in ['src/python', 'tools/python']:
cand = pd + '/' + subdir
if os.path.isdir(cand):
r = r + ':' + cand
print(r[1:])

View file

@ -1,45 +1,60 @@
# -*- coding: utf-8 -*-
from typing import Iterable
from argparse import Namespace, ArgumentParser
from argparse import ArgumentParser, Namespace
from ...App import Scope
from ...lib.log import *
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
from ...lib.log import DEBUG, log
from .Cmd import Cmd, Parent
# TODO: seems at least partly redundant to CmdPkgRequires / print_pkg_relations
class CmdRequiredOsPkg(Cmd): # export
class CmdRequiredOsPkg(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'required-os-pkg', help='List distribution packages required for a package')
def __init__(self, parent: Parent) -> None:
super().__init__(
parent,
'required-os-pkg',
help = 'List distribution packages required for a package',
)
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('flavours', help='Dependency flavours', default='build')
parser.add_argument('modules', nargs='*', help='Modules')
parser.add_argument('--skip-excluded', action='store_true', default=False,
help='Output empty prerequisite list for excluded modules')
parser.add_argument('--quote', action='store_true', default=False,
help='Put double quotes around each listed dependency')
parser.add_argument('flavours', help = 'Dependency flavours', default = 'build')
parser.add_argument('modules', nargs = '*', help = 'Modules')
parser.add_argument(
'--skip-excluded',
action = 'store_true',
default = False,
help = 'Output empty prerequisite list for excluded modules',
)
parser.add_argument(
'--quote',
action = 'store_true',
default = False,
help = 'Put double quotes around each listed dependency',
)
async def _run(self, args: Namespace) -> None:
modules = args.modules
flavours = set(args.flavours.split(','))
if 'build' in flavours:
# TODO: This adds too much. Only the run dependencies of the build dependencies would be needed.
# TODO: This adds too much. Only the run dependencies of the build
# dependencies would be needed.
flavours.add('run')
if 'release' in flavours:
flavours |= set(['run', 'devel', 'build'])
log(DEBUG, "flavours = " + args.flavours)
deps = self.app.get_project_refs(modules, ['pkg.requires.jw'], flavours,
scope = Scope.Subtree, add_self=True, names_only=True)
log(DEBUG, 'flavours = ' + args.flavours)
deps = self.app.get_project_refs(
modules,
['pkg.requires.jw'],
list(flavours),
scope = Scope.Subtree,
add_self = True,
names_only = True,
)
if args.skip_excluded:
for d in deps:
if self.app.is_excluded_from_build(d) is not None:
deps.remove(d)
subsecs = self.app.distro.os_cascade
log(DEBUG, "subsecs = ", subsecs)
log(DEBUG, 'subsecs = ', subsecs)
requires: set[str] = set()
for sec in subsecs:
for flavour in flavours:
@ -47,6 +62,6 @@ class CmdRequiredOsPkg(Cmd): # export
if vals:
requires |= set(vals)
if args.quote:
requires = [f'"{dep}"' for dep in requires]
out = [f'"{dep}"' for dep in requires]
# TODO: add all not in build tree as -devel
print(' '.join(requires))
print(' '.join(out))

View file

@ -1,23 +1,22 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser, Namespace
from argparse import Namespace, ArgumentParser
from .Cmd import Cmd, Parent
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
class CmdSummary(Cmd): # export
class CmdSummary(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'summary', help='Print summary description of given modules')
def __init__(self, parent: Parent) -> None:
super().__init__(
parent, 'summary', help = 'Print summary description of given modules'
)
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
parser.add_argument('module', nargs = '*', help = 'Modules')
async def _run(self, args: Namespace) -> None:
r = []
for m in args.module:
summary = self.app.get_value(m, "summary", None)
summary = self.app.get_value(m, 'summary', None)
if summary is not None:
r.append(summary)
print(' '.join(r))

View file

@ -1,18 +1,15 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser, Namespace
from argparse import Namespace, ArgumentParser
from .Cmd import Cmd, Parent
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
class CmdTest(Cmd): # export
class CmdTest(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'test', help='Test')
def __init__(self, parent: Parent) -> None:
super().__init__(parent, 'test', help = 'Test')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('blah', default='', help='The blah argument')
parser.add_argument('blah', default = '', help = 'The blah argument')
async def _run(self, args: Namespace) -> None:
print("blah = " + args.blah)
print('blah = ' + args.blah)

View file

@ -1,18 +1,19 @@
# -*- coding: utf-8 -*-
from argparse import ArgumentParser, Namespace
from argparse import Namespace, ArgumentParser
from .Cmd import Cmd, Parent
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
class CmdTmplDir(Cmd): # export
class CmdTmplDir(Cmd): # export
def __init__(self, parent: CmdProjects) -> None:
super().__init__(parent, 'tmpl-dir', help='Print directory containing templates of a given module')
def __init__(self, parent: Parent) -> None:
super().__init__(
parent,
'tmpl-dir',
help = 'Print directory containing templates of a given module',
)
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
parser.add_argument('module', nargs = '*', help = 'Modules')
async def _run(self, args: Namespace) -> None:
r = []

View file

@ -4,6 +4,6 @@ __all__ = detect_modules(
package_name = __name__,
package_path = __path__,
namespace = globals(),
prefix = "Cmd",
skip = {"Cmd"},
) # pyright: ignore[reportUnsupportedDunderAll]
prefix = 'Cmd',
skip = {'Cmd'},
) # pyright: ignore[reportUnsupportedDunderAll]

View file

@ -1,24 +1,22 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
from argparse import ArgumentParser
from functools import cached_property
from typing import TYPE_CHECKING
from ..Cmd import Cmd as Base
from ...CmdBase import CmdBase
from ..CmdSecrets import CmdSecrets as Parent
if TYPE_CHECKING:
from typing import Iterable
from ...lib.Distro import Distro
from ...lib.ExecContext import ExecContext
from ..CmdSecrets import CmdSecrets
from .lib.base import Attrs
from .lib.DistroContext import DistroContext
class Cmd(Base): # export
class Cmd(CmdBase): # export
@cached_property
def ctx(self) -> ExecContext:
def ctx(self) -> DistroContext:
return DistroContext(self.app.distro)
async def _match_files(self, packages: Iterable[str], pattern: str) -> list[str]:
@ -27,21 +25,27 @@ class Cmd(Base): # export
async def _list_template_files(self, packages: Iterable[str]) -> list[str]:
return await self.ctx.list_template_files(packages)
async def _list_secret_paths(self, packages: Iterable[str], ignore_missing: bool=False) -> list[str]:
async def _list_secret_paths(
self, packages: Iterable[str], ignore_missing: bool = False
) -> list[str]:
return await self.ctx.list_secret_paths(packages, ignore_missing)
async def _list_compilation_targets(self, packages: Iterable[str], ignore_missing: bool=False) -> list[str]:
async def _list_compilation_targets(
self, packages: Iterable[str], ignore_missing: bool = False
) -> list[str]:
return await self.ctx.list_compilation_targets(packages, ignore_missing)
async def _remove_compilation_targets(self, packages: Iterable[str]) -> list[str]:
return await self.ctx.remove_compilation_targets(packages)
async def _compile_template_files(self, packages: Iterable[str], default_attrs: Attrs) -> list[str]:
async def _compile_template_files(
self, packages: Iterable[str], default_attrs: Attrs
) -> list[str]:
return await self.ctx.compile_template_files(packages, default_attrs)
def __init__(self, parent: CmdSecrets, name: str, help: str) -> None:
def __init__(self, parent: Parent, name: str, help: str) -> None:
super().__init__(parent, name, help)
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument("packages", nargs='*', help="Package names")
parser.add_argument('packages', nargs = '*', help = 'Package names')

View file

@ -1,20 +1,21 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
from .Cmd import Cmd
from .lib.base import Attrs
from .Cmd import Cmd
if TYPE_CHECKING:
from ..CmdSecrets import CmdSecrets
from argparse import Namespace, ArgumentParser
from argparse import ArgumentParser, Namespace
class CmdCompileTemplates(Cmd): # export
from ..CmdSecrets import CmdSecrets
class CmdCompileTemplates(Cmd): # export
def __init__(self, parent: CmdSecrets) -> None:
super().__init__(parent, 'compile-templates', help="Compile package template files")
super().__init__(
parent, 'compile-templates', help = 'Compile package template files'
)
async def _run(self, args: Namespace) -> None:
attrs = Attrs(args.mode, args.owner, args.group, None)
@ -22,6 +23,12 @@ class CmdCompileTemplates(Cmd): # export
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('--owner', '-o', default=None, help='Default output file owner')
parser.add_argument('--group', '-g', default=None, help='Default output file group')
parser.add_argument('--mode', '-m', default=None, help='Default output file mode')
parser.add_argument(
'--owner', '-o', default = None, help = 'Default output file owner'
)
parser.add_argument(
'--group', '-g', default = None, help = 'Default output file group'
)
parser.add_argument(
'--mode', '-m', default = None, help = 'Default output file mode'
)

View file

@ -1,22 +1,33 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
from .Cmd import Cmd
if TYPE_CHECKING:
from ..CmdSecrets import CmdSecrets
from argparse import Namespace, ArgumentParser
from argparse import ArgumentParser, Namespace
class CmdInstall(Cmd): # export
from ..CmdSecrets import CmdSecrets
class CmdInstall(Cmd): # export
def __init__(self, parent: CmdSecrets) -> None:
super().__init__(parent, 'install', help='Install secrets from various sources as static secrets onto the target')
super().__init__(
parent,
'install',
help = (
'Install secrets from various sources as static secrets onto the target'
),
)
def add_arguments(self, parser: ArgumentParser) -> None:
parser.add_argument('src', help='URI of secret source')
parser.add_argument('--only-missing', action='store_true', default=False, help='Install only secrets not already on the target')
parser.add_argument('src', help = 'URI of secret source')
parser.add_argument(
'--only-missing',
action = 'store_true',
default = False,
help = 'Install only secrets not already on the target',
)
super().add_arguments(parser)
async def _run(self, args: Namespace) -> None:

View file

@ -1,22 +1,37 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
from .Cmd import Cmd
if TYPE_CHECKING:
from ..CmdSecrets import CmdSecrets
from argparse import Namespace, ArgumentParser
from argparse import ArgumentParser, Namespace
class CmdListCompilationOutput(Cmd): # export
from ..CmdSecrets import CmdSecrets
class CmdListCompilationOutput(Cmd): # export
def __init__(self, parent: CmdSecrets) -> None:
super().__init__(parent, 'list-compilation-output', help="List package compilation output files")
super().__init__(
parent,
'list-compilation-output',
help = 'List package compilation output files',
)
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument("--all", action='store_true', default=False, help="Show all output targets, including non-existent files")
parser.add_argument(
'--all',
action = 'store_true',
default = False,
help = 'Show all output targets, including non-existent files',
)
async def _run(self, args: Namespace) -> None:
print('\n'.join(await self._list_compilation_targets(args.packages, ignore_missing=(not args.all))))
print(
'\n'.join(
await self._list_compilation_targets(
args.packages, ignore_missing = (not args.all)
)
)
)

View file

@ -1,22 +1,32 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
from .Cmd import Cmd
if TYPE_CHECKING:
from ..CmdSecrets import CmdSecrets
from argparse import Namespace, ArgumentParser
from argparse import ArgumentParser, Namespace
class CmdListSecrets(Cmd): # export
from ..CmdSecrets import CmdSecrets
class CmdListSecrets(Cmd): # export
def __init__(self, parent: CmdSecrets) -> None:
super().__init__(parent, 'list-secrets', help="List package secret files")
super().__init__(parent, 'list-secrets', help = 'List package secret files')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument("--all", action='store_true', default=False, help="Show all secret paths, including non-existent files")
parser.add_argument(
'--all',
action = 'store_true',
default = False,
help = 'Show all secret paths, including non-existent files',
)
async def _run(self, args: Namespace) -> None:
print('\n'.join(await self._list_secret_paths(args.packages, ignore_missing=(not args.all))))
print(
'\n'.join(
await
self._list_secret_paths(args.packages, ignore_missing = (not args.all))
)
)

View file

@ -1,18 +1,18 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
from .Cmd import Cmd
if TYPE_CHECKING:
from ..CmdSecrets import CmdSecrets
from argparse import Namespace, ArgumentParser
from argparse import Namespace
class CmdListTemplates(Cmd): # export
from ..CmdSecrets import CmdSecrets
class CmdListTemplates(Cmd): # export
def __init__(self, parent: CmdSecrets) -> None:
super().__init__(parent, 'list-templates', help="List package template files")
super().__init__(parent, 'list-templates', help = 'List package template files')
async def _run(self, args: Namespace) -> None:
print('\n'.join(await self._list_template_files(args.packages)))

View file

@ -1,18 +1,20 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
from .Cmd import Cmd
from .Cmd import Cmd, Parent
if TYPE_CHECKING:
from ..CmdSecrets import CmdSecrets
from argparse import Namespace, ArgumentParser
from argparse import Namespace
class CmdRmCompilationOutput(Cmd): # export
class CmdRmCompilationOutput(Cmd): # export
def __init__(self, parent: CmdSecrets) -> None:
super().__init__(parent, 'rm-compilation-output', help="Remove package compilation output files")
def __init__(self, parent: Parent) -> None:
super().__init__(
parent,
'rm-compilation-output',
help = 'Remove package compilation output files',
)
async def _run(self, args: Namespace) -> None:
await self._remove_compilation_targets(args.packages)

View file

@ -1,24 +1,20 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import re
import sys
from pathlib import Path
from typing import TYPE_CHECKING
from ....lib.log import DEBUG, NOTICE, WARNING, log
from ....lib.ProcFilterGpg import ProcFilterGpg
from .base import Attrs
from .FilesContext import FilesContext
if TYPE_CHECKING:
from typing import Iterable
from ....lib.FileContext import FileContext
from ....lib.log import *
from ....lib.util import run_cmd
from ....lib.TarIo import TarIo
from ....lib.ProcFilterGpg import ProcFilterGpg
from .base import Attrs
from .FilesContext import FilesContext
from ....lib.Distro import Distro
class DistroContext(FilesContext):
@ -37,18 +33,22 @@ class DistroContext(FilesContext):
async def list_template_files(self, pkg_names: Iterable[str]) -> list[str]:
if not pkg_names:
pkg_names = [p.name for p in await self.__distro.select()]
return await self.match_files(pkg_names, pattern=r'.*\.jw-tmpl$')
return await self.match_files(pkg_names, pattern = r'.*\.jw-tmpl$')
async def list_secret_paths(self, pkg_names: Iterable[str], ignore_missing: bool=False) -> list[str]:
async def list_secret_paths(
self, pkg_names: Iterable[str], ignore_missing: bool = False
) -> list[str]:
ret = []
for tmpl in await self.list_template_files(pkg_names):
path = str(Path(tmpl).with_suffix(".jw-secret"))
path = str(Path(tmpl).with_suffix('.jw-secret'))
if ignore_missing and not await self.ctx.file_exists(path):
continue
ret.append(path)
return ret
async def list_compilation_targets(self, pkg_names: Iterable[str], ignore_missing: bool=False) -> list[str]:
async def list_compilation_targets(
self, pkg_names: Iterable[str], ignore_missing: bool = False
) -> list[str]:
ret = []
for tmpl in await self.list_template_files(pkg_names):
path = tmpl.removesuffix('.jw-tmpl')
@ -58,72 +58,119 @@ class DistroContext(FilesContext):
return ret
async def remove_compilation_targets(self, pkg_names: Iterable[str]) -> list[str]:
ret: list[str] = []
for path in await self.list_compilation_targets(pkg_names):
try:
self.ctx.stat(path)
await self.ctx.stat(path)
log(NOTICE, f'Removing {path}')
await self.ctx.unlink(path)
except FileNotFoundError as e:
log(DEBUG, f'Compilation target {path} doesn\'t exist (ignored)')
ret.append(path)
except FileNotFoundError:
log(DEBUG, f"Compilation target {path} doesn't exist (ignored)")
continue
return ret
async def compile_template_files(self, pkg_names: Iterable[str], default_attrs: Attrs) -> list[str]:
async def compile_template_files(
self, pkg_names: Iterable[str], default_attrs: Attrs
) -> list[str]:
ret: list[str] = []
missing = 0
for target in await self.list_compilation_targets(pkg_names):
if not await self.compile_template_file(target, default_attrs):
if await self.compile_template_file(target, default_attrs):
ret.append(target)
else:
missing += 1
if missing > 0:
log(WARNING, f'{missing} missing secrets found. You might want to add them and run sudo {app.cmdline} again')
from ....lib.util import pretty_cmd
async def install(self, src_uri: str, pkg_names: Iterable[str], only_missing: bool=False, verbose: bool=False) -> None:
cmdline = pretty_cmd(sys.argv)
log(
WARNING,
(
f'{missing} missing secrets found. You might want to add them and '
f'run sudo {cmdline} again'
),
)
return ret
async def _read_secret_tar_blob(src_uri: str):
async def install(
self,
src_uri: str,
pkg_names: Iterable[str],
only_missing: bool = False,
verbose: bool = False,
) -> None:
async def _read_secret_tar_blob(src_uri: str) -> bytes:
ec = self.ctx
from ....lib.ec.Local import Local
from ....lib.util import get
if not isinstance(ec, Local):
ec = Local() # Security: Use a local exec context for decrypting and filtering secrets
return (await get(src_uri, content_filter=ProcFilterGpg(ec=ec))).stdout
# Security: Use a local exec context for decrypting and
# filtering secrets
ec = Local()
return (await get(src_uri, content_filter = ProcFilterGpg(ec = ec))).stdout
def _matches_host_prefix(path: str) -> bool:
return re.match(r'^' + root_in_tar, path)
return re.match(r'^' + host_root_in_tar, path) is not None
def _crop_host_prefix(path: str) -> bool:
return re.sub(r'^' + root_in_tar, '', path)
return re.sub(r'^' + host_root_in_tar, '', path) is not None
def _crop_default_prefix(path: str) -> bool:
return re.sub(r'^' + default, '', path)
return re.sub(default_rx, '', path) is not None
def _matches_default_prefix(path: str) -> bool:
return re.match(r'^default', path)
return re.match(r'^default', path) is not None
def _is_needed_secret(path: str) -> bool:
return path in secret_paths
from .tar import filter as tar_filter, rewrite as tar_rewrite, extract as tar_extract, merge as tar_merge
from .tar import extract as tar_extract
from .tar import filter as tar_filter
from .tar import merge as tar_merge
from .tar import rewrite as tar_rewrite
if only_missing:
raise NotImplementedError('--only-missing is not yet implemented')
secret_paths = await self.list_secret_paths(pkg_names)
host_root_in_tar = '/'.join(reversed(self.ctx.hostname.split('.')))
hostname = self.ctx.uri.hostname
if not hostname:
raise Exception('Have no hostname to find secrets in tar file')
host_root_in_tar = '/'.join(reversed(hostname.split('.')))
host_rx = re.compile(r'^' + host_root_in_tar)
default_rx = re.compile(r'^default')
filtered_paths: list[str] = []
extracted_paths: list[str] = []
blob_all = await _read_secret_tar_blob(src_uri)
blob_host_filtered = tar_filter(blob_all, lambda p: re.match(host_rx, p), filtered_paths)
blob_host_transformed = tar_rewrite(blob_host_filtered, lambda p: re.sub(host_rx, '', p))
blob_default_filtered = tar_filter(blob_all, lambda p: re.match(default_rx, p), filtered_paths)
blob_default_transformed = tar_rewrite(blob_default_filtered, lambda p: re.sub(default_rx, '', p))
blob_secret_material = tar_merge([blob_host_transformed, blob_default_transformed], overwrite=False)
blob_needed = tar_filter(blob_secret_material, _is_needed_secret, extracted_paths)
blob_all = await _read_secret_tar_blob(src_uri)
if blob_all is None:
raise Exception(f'Tar blob {src_uri} is empty')
blob_host_filtered = tar_filter(
blob_all, lambda p: re.match(host_rx, p) is not None, filtered_paths
)
blob_host_transformed = tar_rewrite(
blob_host_filtered, lambda p: re.sub(host_rx, '', p)
)
blob_default_filtered = tar_filter(
blob_all, lambda p: re.match(default_rx, p) is not None, filtered_paths
)
blob_default_transformed = tar_rewrite(
blob_default_filtered, lambda p: re.sub(default_rx, '', p)
)
blob_secret_material = tar_merge(
[blob_host_transformed, blob_default_transformed], overwrite = False
)
blob_needed = tar_filter(
blob_secret_material, _is_needed_secret, extracted_paths
)
await tar_extract(self.ctx, blob_needed, root='/', verbose=verbose)
await tar_extract(self.ctx, blob_needed, root = '/', verbose = verbose)
for path in secret_paths:
if not path in extracted_paths:
log(NOTICE, f'not extracted: {path}')
else:
log(NOTICE, f'extracted: {path}')
if path not in extracted_paths:
log(NOTICE, f'not extracted: {path}')
else:
log(NOTICE, f'extracted: {path}')

View file

@ -1,21 +1,17 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import re, stat, copy
import copy
import re
import stat
from contextlib import suppress
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Iterable
from ....lib.FileContext import FileContext
from ....lib.log import *
from ....lib.util import run_cmd
from ....lib.log import DEBUG, NOTICE, WARNING, log
from .base import Attrs
class FilesContext:
@ -27,17 +23,17 @@ class FilesContext:
def ctx(self) -> FileContext:
return self.__ctx
async def _read_key_value_file(self, path: str, throw=False) -> dict[str, str]:
async def _read_key_value_file(self, path: str, throw = False) -> dict[str, str]:
ret: dict[str, str] = {}
try:
result = await self.ctx.get(path)
for line in result.stdout.decode().splitlines():
for line in result.stdout_str.splitlines():
line = line.strip()
if not line or line.startswith("#"):
if not line or line.startswith('#'):
continue
if "=" not in line:
if '=' not in line:
continue
key, val = line.split("=", 1)
key, val = line.split('=', 1)
key = key.strip()
val = val.strip()
if key:
@ -46,39 +42,39 @@ class FilesContext:
log(DEBUG, f'File not found {path}')
return ret
def _parse_attributes(self, content: str) -> Attrs:
def _parse_attributes(self, content: str) -> Attrs | None:
if not content:
return None
first_line = content.splitlines()[0]
if not re.match(r"^\s*#\s*conf\s*:", first_line):
if not re.match(r'^\s*#\s*conf\s*:', first_line):
return None
ret = Attrs()
ret.conf = first_line
m = re.match(r"^\s*#\s*conf\s*:\s*(.*?)\s*$", first_line)
m = re.match(r'^\s*#\s*conf\s*:\s*(.*?)\s*$', first_line)
if not m:
return ret
for part in re.split(r'[; ,]', m.group(1)):
part = part.strip()
if not part or "=" not in part:
if not part or '=' not in part:
continue
key, val = part.split("=", 1)
key, val = part.split('=', 1)
key = key.strip()
val = val.strip()
if key == "owner":
if key == 'owner':
ret.owner = val or None
elif key == "group":
elif key == 'group':
ret.group = val or None
elif key == "mode":
elif key == 'mode':
if val:
try:
if re.fullmatch(r"0[0-7]+", val):
if re.fullmatch(r'0[0-7]+', val):
ret.mode = int(val, 8)
else:
ret.mode = int(val, 0)
@ -87,20 +83,26 @@ class FilesContext:
return ret
async def _read_attributes(self, paths: Iterable[str]) -> Attrs|None:
async def _read_attributes(self, paths: Iterable[str]) -> Attrs | None:
ret = Attrs()
for path in paths:
try:
result = await self.ctx.get(path)
lines = result.stdout.decode().splitlines()
lines = result.stdout_str.splitlines()
if lines:
ret.update(self._parse_attributes(lines[0]))
except FileNotFoundError:
log(DEBUG, f'Can\'t parse "{path}" for attributes, file doesn\'t exist (ignored)')
log(
DEBUG,
(
f'Can\'t parse "{path}" for attributes, '
"file doesn't exist (ignored)"
),
)
return ret
def _format_metadata(self, owner: str, group: str, mode: int) -> str:
return f"{owner}:{group} {mode:o}"
return f'{owner}:{group} {mode:o}'
async def _compile_one_template_file(
self,
@ -110,11 +112,12 @@ class FilesContext:
replace: dict[str, str] = {},
) -> None:
owner = "root"
group = "root"
owner = 'root'
group = 'root'
mode = 0o400
new_content = (await self.ctx.get(src)).stdout.decode()
result = await self.ctx.get(src)
new_content = result.stdout_str
attrs = self._parse_attributes(new_content)
if attrs is None:
@ -133,15 +136,21 @@ class FilesContext:
for key, val in replace.items():
new_content = new_content.replace(key, val)
tmp_path: str|None = None
tmp_path: str | None = None
try:
tmp_path = await self.ctx.mktemp(dst + '.jw-pkg.XXXXX')
await self.ctx.put(tmp_path, new_content.encode('utf-8'), owner=owner, group=group, mode=mode)
await self.ctx.put(
tmp_path,
new_content.encode('utf-8'),
owner = owner,
group = group,
mode = mode,
)
content_changed = True
metadata_changed = True
old_meta = "<missing>"
old_meta = '<missing>'
try:
st = await self.ctx.stat(dst)
@ -150,23 +159,21 @@ class FilesContext:
else:
old_mode = stat.S_IMODE(st.mode)
old_meta = self._format_metadata(st.owner, st.group, old_mode)
old_content = (await self.ctx.get(dst)).stdout.decode()
old_content = (await self.ctx.get(dst)).stdout_str
content_changed = old_content != new_content
metadata_changed = (
st.owner != owner
or st.group != group
or old_mode != mode
st.owner != owner or st.group != group or old_mode != mode
)
changes = []
if content_changed:
changes.append("@content")
changes.append('@content')
if metadata_changed:
changes.append(f"@metadata ({old_meta} -> {new_meta})")
changes.append(f'@metadata ({old_meta} -> {new_meta})')
details = ", ".join(changes) if changes else "no changes"
log(NOTICE, f"Applying macros in {src} to {dst}: {details}")
details = ', '.join(changes) if changes else 'no changes'
log(NOTICE, f'Applying macros in {src} to {dst}: {details}')
if not changes:
await self.ctx.unlink(tmp_path)
@ -181,23 +188,28 @@ class FilesContext:
with suppress(FileNotFoundError):
await self.ctx.unlink(tmp_path)
async def compile_template_file(self, target_path: str, default_attrs: Attrs|None=None) -> bool:
path_tmpl = target_path + '.jw-tmpl'
async def compile_template_file(
self, target_path: str, default_attrs: Attrs | None = None
) -> bool:
path_tmpl = target_path + '.jw-tmpl'
path_secret_file = target_path + '.jw-secret-file'
path_secret = target_path + '.jw-secret'
path_secret = target_path + '.jw-secret'
attrs = copy.deepcopy(default_attrs if default_attrs is not None else Attrs())
attrs.update(await self._read_attributes([
path_tmpl,
path_secret_file,
path_secret
]))
attrs.update(
await self._read_attributes([path_tmpl, path_secret_file, path_secret])
)
replace = await self._read_key_value_file(path_secret)
for src in [ path_secret_file, path_tmpl ]:
for src in [path_secret_file, path_tmpl]:
try:
await self._compile_one_template_file(src=src, dst=target_path, default_attrs=attrs, replace=replace)
await self._compile_one_template_file(
src = src,
dst = target_path,
default_attrs = attrs,
replace = replace
)
return True
except FileNotFoundError as e:
log(DEBUG, f'Compilation source {src} doesn\'t exist (ignored)')
except FileNotFoundError:
log(DEBUG, f"Compilation source {src} doesn't exist (ignored)")
continue
log(WARNING, f'No secret found for target {target_path}, not compiling')

View file

@ -1,18 +1,15 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from dataclasses import dataclass
@dataclass
class Attrs:
mode: int | None = None
owner: str | None = None
group: str | None = None
conf: str | None = None
def update(self, rhs: Args|None) -> Args:
def update(self, rhs: Attrs | None) -> Attrs:
if rhs is not None:
if rhs.mode:
self.mode = rhs.mode
@ -32,4 +29,3 @@ class Attrs:
if self.group is not None:
return False
return True

View file

@ -1,21 +1,27 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import io
import tarfile
from tarfile import TarFile
from typing import TYPE_CHECKING, Callable
import tarfile, io
from tarfile import TarFile
from ....lib.log import *
from ....lib.log import DEBUG, log
if TYPE_CHECKING:
from ....lib.ExecContext import ExecContext
from typing import Iterable
def filter(blob: bytes, path_filter: Callable[[str], bool]|None, matched: list[str]|None=None) -> bytes:
from ....lib.ExecContext import ExecContext
from ....lib.FileContext import FileContext
def filter(
blob: bytes,
path_filter: Callable[[str], bool] | None,
matched: list[str] | None = None,
) -> bytes:
ret = io.BytesIO()
with tarfile.open(fileobj=ret, mode='w') as tf_out:
tf_in = TarFile(fileobj=io.BytesIO(blob))
with tarfile.open(fileobj = ret, mode = 'w') as tf_out:
tf_in = TarFile(fileobj = io.BytesIO(blob))
for info in tf_in.getmembers():
if path_filter is not None and not path_filter(info.name):
continue
@ -28,8 +34,8 @@ def filter(blob: bytes, path_filter: Callable[[str], bool]|None, matched: list[s
def rewrite(blob: bytes, rewrite_filter: Callable[[str], str]) -> bytes:
ret = io.BytesIO()
with tarfile.open(fileobj=ret, mode='w') as tf_out:
tf_in = TarFile(fileobj=io.BytesIO(blob))
with tarfile.open(fileobj = ret, mode = 'w') as tf_out:
tf_in = TarFile(fileobj = io.BytesIO(blob))
for info in tf_in.getmembers():
new_name = rewrite_filter(info.name)
log(DEBUG, f'Rewriting {info.name} -> {new_name}')
@ -38,11 +44,11 @@ def rewrite(blob: bytes, rewrite_filter: Callable[[str], str]) -> bytes:
tf_out.addfile(info, buf)
return ret.getvalue()
def merge(blobs: Iterable[bytes], overwrite: bool=False) -> bytes:
def merge(blobs: Iterable[bytes], overwrite: bool = False) -> bytes:
ret = io.BytesIO()
with tarfile.open(fileobj=ret, mode='w') as tf_out:
with tarfile.open(fileobj = ret, mode = 'w') as tf_out:
for blob in blobs:
tf_in = TarFile(fileobj=io.BytesIO(blob))
tf_in = TarFile(fileobj = io.BytesIO(blob))
existing_names = tf_out.getnames()
for info in tf_in.getmembers():
if not overwrite and info.name in existing_names:
@ -51,11 +57,21 @@ def merge(blobs: Iterable[bytes], overwrite: bool=False) -> bytes:
tf_out.addfile(info, buf)
return ret.getvalue()
async def extract(dst: ExecContext, blob: bytes, root: str|None=None, verbose: bool=False) -> None:
async def extract(
dst: FileContext,
blob: bytes,
root: str | None = None,
verbose: bool = False
) -> None:
cmd = ['tar']
if root is not None:
cmd += ['-C', root]
if verbose:
cmd += '-v'
cmd += ['-x', '-f', '-']
await dst.run(cmd, verbose=verbose, cmd_input=blob)
if not isinstance(dst, ExecContext):
raise NotImplementedError(
'Extracting tar files to a non-executable '
f'context is not yet implemented: {dst}'
)
await dst.run(cmd, verbose = verbose, cmd_input = blob)

View file

@ -1,17 +1,21 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
from ....lib.FileContext import FileContext
from ....lib.ec.Local import Local
from ....lib.FileContext import FileContext
from .FilesContext import FilesContext
if TYPE_CHECKING:
from .base import Attrs
async def compile_template_file(target_path: str, default_attrs: Attrs|None=None, ctx: FileContext|None=None) -> bool:
async def compile_template_file(
target_path: str,
default_attrs: Attrs | None = None,
ctx: FileContext | None = None
) -> bool:
if ctx is None:
ctx = Local()
await FilesContext(ctx).compile_template_file(target_path, default_attrs=default_attrs)
return await FilesContext(ctx).compile_template_file(
target_path, default_attrs = default_attrs
)