jw-pkg/src/python/jw/pkg/App.py
Jan Lindemann c538447cc5
App.__get_project_refs(): Code beautification

In __get_project_refs():

- Rename the variables dep and deps to val and vals, respectively, because that is closer to what they are: values of key-value pairs. In some cases these values represent dependencies, in other cases other things.
- Make a scope case distinction a little clearer by mentioning all possible cases in a match / case block
Signed-off-by: Jan Lindemann <jan@janware.com>
2026-06-08 20:41:37 +02:00

670 lines
22 KiB
Python

#
# This source code file is a merge of various build tools and a horrible mess.
#
from __future__ import annotations
import os
import pwd
import re
import sys
from enum import Enum, auto
from functools import lru_cache
from typing import TYPE_CHECKING
from .lib.App import App as Base
from .lib.Distro import Distro
from .lib.log import DEBUG, ERR, log
if TYPE_CHECKING:
import argparse
from typing import TypeAlias
from .lib.ExecContext import ExecContext
from .lib.PackageFilter import PackageFilter
# Meaning of pkg.requires.xxx variables
# build: needs to be built and installed before this can be built
# devel: needs to be installed before this-devel can be installed,
# i.e. before _other_ packages can be built against this
# run: needs to be installed before this-run can be installed,
# i.e. before this and other packages can run with this
# --------------------------------------------------------------------- Helpers
class ResultCache(object):
    """
    Result store organised as a tree of nested dictionaries.

    The path into the tree is the function name followed by the str() form of
    every argument; the last path element is meant to hold the result.
    """

    def __init__(self):
        self.__cache = {}

    def run(self, func, args):
        """
        Return func(*args), looking for a stored result first.

        args has to be a list, because it is concatenated to a list holding
        the function name. Arguments are identified by their str()
        representation only.
        """
        path = [func.__name__] + args
        leaf_level = len(path)
        node = self.__cache
        level = 0
        for part in path:
            label = str(part)  # str(None) is 'None', same as the explicit case
            level += 1
            at_leaf = level == leaf_level
            if label not in node:
                if at_leaf:
                    result = func(*args)
                    node[label] = result
                    return result
                # NOTE(review): targets of a chained assignment are assigned
                # left to right, so "node" is rebound to the new dict first
                # and the new dict then receives itself under "label". The
                # parent never gets linked to it, hence results below the
                # first level are not found again by later calls. Kept as is
                # on purpose: App.__get_project_refs() fills an output list
                # and returns None, so a real cache hit would skip that work.
                node = node[label] = {}
                continue
            if at_leaf:
                return node[label]
            node = node[label]
        raise Exception(
            'cache algorithm failed for function', func.__name__, 'in depth', level
        )
class Scope(Enum):
    """How far App.__get_project_refs() follows the values it finds."""
    # The values of the requested key are added as they are, nothing is followed
    Self = auto()
    # Every value is visited once more, with scope Self
    One = auto()
    # Every value is visited recursively, keeping scope Subtree
    Subtree = auto()
# Maps a project name to a set of project names it is connected to (filled
# with dependencies by App.__read_dep_graph(), reversed by App.__flip_dep_graph())
Graph: TypeAlias = dict[str, set[str]]
# ----------------------------------------------------------------- class App
class App(Base):
def __format_topdir(self, path: None | str, fmt: str) -> str | None:
if path is None:
return None
match fmt:
case 'unaltered':
return path
case None | 'absolute':
return os.path.abspath(path)
case _:
m = re.search(r'^make:(\S+)$', fmt)
if m is None:
raise Exception(
f'Can\'t interpret "{fmt}" as valid topdir reference, '
'expecting "unaltered", "absolute", or "make:<variable-name>"'
)
return '$(' + m.group(1) + ')'
@property
def __topdir(self) -> str:
if self.___topdir is None:
raise Exception('Tried to access undefined top directory')
return self.___topdir
@property
def __pretty_topdir(self) -> str:
if self.___pretty_topdir is None:
raise Exception('Tried to access undefined pretty top directory')
return self.___pretty_topdir
def __proj_dir(self, name: str, pretty: bool) -> str | None:
if name == self.__top_name:
if pretty:
return self.__pretty_topdir
return self.__topdir
for d in [self.__projs_root, '/opt']:
ret = d + '/' + name
if os.path.exists(ret):
return ret
if os.path.exists(f'/usr/share/doc/packages/{name}/VERSION'):
# The package exists but does not have a dedicated project directory
return None
raise Exception('No project path found for module "{}"'.format(name))
def __find_dir(
self,
name: str,
search_subdirs: list[str] = [],
search_absdirs: list[str] = [],
pretty: bool = True,
) -> str | None:
def format_pd(name: str, pd: str, pretty: bool):
if not pretty:
return pd
if self.__topdir_fmt == 'absolute':
return os.path.abspath(pd)
if self.__topdir_fmt == 'unaltered':
return pd
if name == self.__top_name:
return self.__pretty_topdir
raise NotImplementedError(
f'Tried to pretty-format directory {pd}, not implemented'
)
pd = self.__proj_dir(name, False)
if pd is None:
return None
if not search_subdirs and not search_absdirs:
return format_pd(name, pd, pretty)
for sd in search_subdirs:
path = pd + '/' + sd
if os.path.isdir(path):
ret = format_pd(name, pd, pretty)
if sd and sd[0] != '/':
ret += '/'
ret += sd
return ret
for ret in search_absdirs:
if os.path.isdir(ret):
return ret
return None
def __get_project_refs_cached(
    self, buf, visited, spec, section, key, add_self, scope, names_only
):
    """
    Run __get_project_refs() with the given arguments through the result
    cache and return whatever the cache returns.
    """
    call_args = [buf, visited, spec, section, key, add_self, scope, names_only]
    return self.__res_cache.run(self.__get_project_refs, call_args)
def __get_project_refs(
    self,
    buf: list[str],
    visited: set[str],
    spec: str,
    section: str,
    key: str,
    add_self: bool,
    scope: Scope,
    names_only: bool,
) -> None:
    """
    Append the references found under [section] key of a project to buf.

    spec names the project, possibly with a flavour suffix and a version
    constraint; with names_only set it is reduced to the bare name. Nothing
    happens if spec is already in buf. A spec seen before (in visited) is
    only appended to buf if add_self is set. Otherwise the comma-separated
    value of the key is read and handled according to scope:

      - Scope.Self: the values are appended to buf as they are
      - Scope.One: every non-empty, stripped value is processed with Scope.Self
      - Scope.Subtree: every non-empty, stripped value is processed recursively

    Finally spec itself is appended if add_self is set. buf and visited are
    modified in place.
    """
    name = self.strip_module_from_spec(spec)
    if names_only:
        spec = name
    if spec in buf:
        return
    if spec in visited:
        if add_self:
            buf.append(spec)
        return
    visited.add(spec)
    vals = self.get_value(name, section, key)
    log(
        DEBUG,
        (
            f'name={name}, section={section}, key={key}, deps={vals}, '
            f'scope={scope.name}, visited={visited}'
        ),
    )
    vals_list = vals.split(',') if vals else []
    if scope == Scope.Self:
        buf.extend(vals_list)
    elif scope in (Scope.One, Scope.Subtree):
        # One level down means: look at the values themselves, no further
        subscope = Scope.Self if scope == Scope.One else scope
        for raw in vals_list:
            val = raw.strip()
            if not val:
                continue
            self.__get_project_refs_cached(
                buf,
                visited,
                val,
                section,
                key,
                add_self = True,
                scope = subscope,
                names_only = names_only,
            )
    if add_self:
        buf.append(spec)
def __read_dep_graph(
    self,
    projects: list[str],
    sections: str | list[str],
    graph: Graph,
) -> None:
    """
    Recursively add the dependencies of the given projects to graph.

    For every project not yet in graph, the names found in section
    "pkg.requires.jw" under the keys passed as "sections" are looked up with
    Scope.One and stored as graph[project]. The same is then done for each
    of those names. graph is modified in place.
    """
    if isinstance(sections, str):
        sections = [sections]
    if not sections:
        # Nothing to look up, leave the graph untouched
        return
    for project in projects:
        if project in graph:
            continue
        # One lookup covers all keys at once. Repeating it per key, as a
        # loop over "sections" would, only yields the identical result again.
        deps = self.get_project_refs(
            [project],
            ['pkg.requires.jw'],
            sections,
            scope = Scope.One,
            add_self = False,
            names_only = True,
        )
        # Register the project before descending, so that circular
        # dependencies terminate the recursion
        graph[project] = set(deps)
        for dep in deps:
            self.__read_dep_graph([dep], sections, graph)
def __flip_dep_graph(self, graph: Graph):
ret: Graph = {}
for project, deps in graph.items():
for d in deps:
if d not in ret:
ret[d] = set()
ret[d].add(project)
return ret
def __find_circular_deps_recursive(
self,
project: str,
graph: Graph,
unvisited: list[str],
temp: set[str],
path: list[str],
) -> str | None:
if project in temp:
log(DEBUG, 'found circular dependency at project', project)
return project
if project not in unvisited:
return None
temp.add(project)
if project in graph:
for dep in graph[project]:
last = self.__find_circular_deps_recursive(
dep, graph, unvisited, temp, path
)
if last is not None:
path.insert(0, dep)
return last
unvisited.remove(project)
temp.remove(project)
return None
def __find_circular_deps(self, projects: list[str], flavours: list[str]) -> bool:
    """
    Tell whether the dependency graph of projects contains a cycle.

    The graph is read for the given flavours, reversed, and then searched
    depth-first from every project not yet fully processed. Returns True as
    soon as a cycle is found, False otherwise.
    """
    graph: Graph = {}
    cycle_path: list[str] = []
    self.__read_dep_graph(projects, flavours, graph)
    # The graph doesn't change while searching, so flip it only once
    flipped = self.__flip_dep_graph(graph)
    unvisited = list(graph.keys())
    temp: set[str] = set()
    while unvisited:
        # The recursive search removes every project it finishes from
        # unvisited, including this one, so the loop makes progress
        project = unvisited[0]
        log(DEBUG, 'Checking circular dependency of', project)
        last = self.__find_circular_deps_recursive(
            project, flipped, unvisited, temp, cycle_path
        )
        if last is not None:
            log(DEBUG, f'Found circular dependency below {project}, last is {last}')
            return True
    return False
def __init__(self, distro: Distro | None = None) -> None:
    """
    Set up the application object. All state depending on command line
    arguments is left undefined here and filled in later.
    """
    super().__init__('jw-pkg swiss army knife', modules = ['jw.pkg.cmds'])

    # -- Members without default values
    self.__opt_interactive: bool | None = None
    self.__opt_verbose: bool | None = None
    self.__top_name: str | None = None
    self.__distro = distro
    self.__res_cache = ResultCache()
    self.___topdir: str | None = None
    self.___pretty_topdir: str | None = None
    self.__exec_context: ExecContext | None = None

    # -- Members with default values
    self.__topdir_fmt = 'absolute'
    # Projects are expected below the home directory of the current user
    home_dir = pwd.getpwuid(os.getuid()).pw_dir
    self.__projs_root = home_dir + '/local/src/jw.dev/proj'
    self.__pretty_projs_root = None
async def __init_async(self) -> None:
if self.__distro is None:
pkg_filter_str = self.args.pkg_filter
if pkg_filter_str is None:
pkg_filter_str = os.getenv('JW_DEFAULT_PKG_FILTER')
pkg_filter: PackageFilter | None = None
if pkg_filter_str is not None:
from .lib.PackageFilter import PackageFilterString
pkg_filter = PackageFilterString(pkg_filter_str)
self.__distro = await Distro.instantiate(
ec = self.exec_context,
id = self.args.distro_id,
default_pkg_filter = pkg_filter,
)
async def __aexit__(self, exc_type, exc, tb) -> None:
if self.__exec_context is not None:
await self.__exec_context.close()
self.__exec_context = None
def _add_arguments(self, parser) -> None:
    """Add the application-wide command line options to parser."""
    super()._add_arguments(parser)
    # (option strings, keyword arguments to add_argument()), in display order
    option_table = [
        (('-t', '--topdir'), {'default': None, 'help': 'Project Path'}),
        (
            ('--topdir-format',),
            {
                'default': 'absolute',
                'help': (
                    'Output references to topdir as one of "make:<var-name>", '
                    '"unaltered", "absolute". Absolute topdir by default'
                ),
            },
        ),
        (
            ('-p', '--prefix'),
            {
                'default': None,
                'help': 'Parent directory of project source directories',
            },
        ),
        (
            ('--distro-id',),
            {
                'default': None,
                'help': 'Distribution ID (default is taken from /etc/os-release)',
            },
        ),
        (
            ('--interactive',),
            {
                'choices': ['true', 'false', 'auto'],
                'default': 'true',
                'help': 'Wait for user input or try to proceed unattended',
            },
        ),
        (
            ('--verbose',),
            {
                'action': 'store_true',
                'default': False,
                'help': "Be verbose on stderr about what's being done on the distro level",
            },
        ),
        (
            ('--target',),
            {'default': 'local', 'help': 'Run commands on this host'},
        ),
        (
            ('--pkg-filter',),
            {'help': 'Default filter for all distribution package-related operations'},
        ),
    ]
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
async def _run(self, args: argparse.Namespace) -> None:
    """
    Take over the directory-related command line arguments, determine the
    name of the top-level project, initialise the distro object, and hand
    over to the base class.
    """
    topdir = args.topdir
    self.___topdir = topdir
    self.___pretty_topdir = self.__format_topdir(topdir, args.topdir_format)
    self.__topdir_fmt = args.topdir_format
    if topdir is not None:
        conf_path = topdir + '/make/project.conf'
        self.__top_name = self.read_value(conf_path, 'build', 'name')
        if not self.__top_name:
            # No configured name: derive it from the directory name, minus a
            # trailing version-like suffix
            dir_name = os.path.basename(os.path.realpath(topdir))
            self.__top_name = re.sub('-[0-9.-]*$', '', dir_name)
    prefix = args.prefix
    if prefix is not None:
        self.__projs_root = prefix
        self.__pretty_projs_root = prefix
    await self.__init_async()
    await super()._run(args)
@property
def interactive(self) -> bool:
if self.__opt_interactive is None:
match self.args.interactive:
case 'true':
self.__opt_interactive = True
case 'false':
self.__opt_interactive = False
case 'auto':
self.__opt_interactive = sys.stdin.isatty()
case _:
raise ValueError(
f'Unknown --interactive value: {self.args.interactive}'
)
# Not logically possible to fail, but this keeps pyright happy
assert self.__opt_interactive is not None
return self.__opt_interactive
@property
def verbose(self) -> bool:
if self.__opt_verbose is None:
self.__opt_verbose = self.args.verbose
# Not logically possible to fail, but this keeps pyright happy
assert self.__opt_verbose is not None
return self.__opt_verbose
@property
def exec_context(self) -> ExecContext:
if self.__exec_context is None:
from .lib.ExecContext import ExecContext
self.__exec_context = ExecContext.create(
self.args.target,
interactive = self.interactive,
verbose_default = self.verbose,
)
return self.__exec_context
@property
def top_name(self) -> str | None:
    """Name of the top-level project, None as long as it is undetermined."""
    return self.__top_name
@property
def projs_root(self) -> str:
    """Directory below which project directories are searched."""
    return self.__projs_root
@property
def distro(self) -> Distro:
if self.__distro is None:
raise Exception('No distro object')
return self.__distro
def find_dir(
self,
name: str,
search_subdirs: list[str] = [],
search_absdirs: list[str] = [],
pretty: bool = True,
throw: bool = False,
) -> str | None:
ret = self.__find_dir(name, search_subdirs, search_absdirs, pretty)
if ret is not None:
return ret
if not throw:
return None
msg = f'Failed to find directory for "{name}":'
log(ERR, msg)
for search_name, search in [
('subdirs', search_subdirs),
('absdirs', search_absdirs),
]:
if search:
log(ERR, f'Searched {search_name}:')
for d in search:
log(ERR, f' - {d}')
raise FileNotFoundError(msg)
# TODO: add support for customizing this in project.conf
def htdocs_dir(self, project: str) -> str | None:
    """
    Find the htdocs directory of a project: the first existing of a fixed
    set of subdirectories of the project directory, else
    /srv/www/proj/<project> if that exists, else None.
    """
    subdir_candidates = ['/src/html/htdocs', '/tools/html/htdocs', '/htdocs']
    installed_location = '/srv/www/proj/' + project
    return self.find_dir(project, subdir_candidates, [installed_location])
# TODO: add support for customizing this in project.conf
def tmpl_dir(self, name: str) -> str | None:
    """
    Find the template directory of a project: /tmpl below the project
    directory, else /opt/<name>/share/tmpl if that exists, else None.
    """
    installed_location = '/opt/' + name + '/share/tmpl'
    return self.find_dir(name, ['/tmpl'], [installed_location])
def strip_module_from_spec(self, mod):
return re.sub(r'-dev$|-devel$|-run$', '', re.split('([=><]+)', mod)[0].strip())
@lru_cache(maxsize = None)
def get_section(self, path: str, section: str) -> str:
ret = ''
pat = '[' + section + ']'
in_section = False
file = open(path)
for line in file:
if line.rstrip() == pat:
in_section = True
continue
if in_section:
if len(line) and line[0] == '[':
break
ret += line
file.close()
return ret.rstrip()
@lru_cache(maxsize = None)
def read_value(self, path: str, section: str, key: str) -> str | None:
def scan_section(f, key: str) -> str | None:
if key is None:
ret = ''
for line in f:
if len(line) and line[0] == '[':
break
ret += line
return ret if len(ret) else None
lines: list[str] = []
cont_line = ''
for line in f:
if len(line) and line[0] == '[':
break
cont_line += line.rstrip()
if len(cont_line) and cont_line[-1] == '\\':
cont_line = cont_line[0:-1]
continue
lines.append(cont_line)
cont_line = ''
rx = re.compile(r'^\s*' + key + r'\s*=\s*(.*)\s*$')
for line in lines:
# log(DEBUG, ' looking for "%s" in line="%s"' % (key, line))
m = re.search(rx, line)
if m is not None:
return m.group(1)
return None
def scan_section_debug(f, key: str) -> str | None:
ret = scan_section(f, key)
# log(DEBUG, ' returning', rr)
return ret
try:
# log(DEBUG, 'looking for {}::[{}].{}'.format(path, section, key))
# TODO: Parse
with open(path, 'r') as f:
if not len(section):
return scan_section(f, key)
pat = '[' + section + ']'
for line in f:
if line.rstrip() == pat:
return scan_section(f, key)
return None
except Exception:
log(DEBUG, f'Not found: {path}')
# TODO: handle this special case cleaner somewhere up the stack
if section == 'build' and key == 'libname':
return 'none'
return None
@lru_cache(maxsize = None)
def get_value(self, project: str, section: str, key: str) -> str | None:
    """
    Look up a configuration value of a project.

    Section "version" is special: the content of the file VERSION is
    returned, with newlines and "-dev" removed. The file is searched in the
    project directory and, unless that is the top directory, in
    /usr/share/doc/packages/<project>. Every other section is read from
    make/project.conf in the project directory.

    Raises Exception if the project has no directory or no readable VERSION
    file is found.
    """
    proj_dir = self.__proj_dir(project, pretty = False)
    if proj_dir is None:
        raise Exception(f"Can't get project directory for {project}")

    if section != 'version':
        conf_path = proj_dir + '/make/project.conf'
        value = self.read_value(conf_path, section, key)
        log(
            DEBUG,
            'Lookup %s -> %s / [%s%s] -> "%s"' %
            (self.__top_name, project, section, '.' + key if key else '', value),
        )
        return value

    version_dirs = [proj_dir]
    if proj_dir != self.___topdir:
        version_dirs.append('/usr/share/doc/packages/' + project)
    for version_dir in version_dirs:
        version_path = version_dir + '/VERSION'
        try:
            with open(version_path) as fd:
                return fd.read().replace('\n', '').replace('-dev', '')
        except EnvironmentError:
            log(DEBUG, f'Ignoring unreadable file "{version_path}"')
    raise Exception(f'No version file found for project "{project}"')
@lru_cache(maxsize = None)
def get_version(self, project) -> str:
    """Return the version of a project, raise Exception if there is none."""
    version = self.get_value(project, 'version', '')
    if version is not None:
        return version
    raise Exception(f"Can't get version of project {project}")
def get_values(self, projects: list[str], sections: list[str],
               keys: list[str]) -> list[str]:
    """
    Gather the comma-separated values of all given keys in all given
    sections of all given projects. Every value is stripped of surrounding
    whitespace; duplicates are dropped, the order of first occurrence is kept.
    """
    collected: list[str] = []
    for project in projects:
        for section in sections:
            for key in keys:
                raw = self.get_value(project, section, key)
                if not raw:
                    continue
                collected.extend(item.strip() for item in raw.split(','))
    # A dict keeps insertion order, so this removes duplicates in place
    return list(dict.fromkeys(collected))
def get_project_refs(
    self,
    projects: list[str],
    sections: list[str],
    keys: str | list[str],
    add_self: bool,
    scope: Scope,
    names_only = True,
) -> list[str]:
    """
    Collect the references of the given projects.

    For every combination of section and key, the references of every
    project are gathered as far as scope reaches. With add_self set, the
    projects themselves are part of the result. With names_only set,
    entries are reduced to bare project names.

    Returns the references without duplicates, in the order they were
    first found.
    """
    if isinstance(keys, str):
        keys = [keys]
    ret: list[str] = []
    # Mirrors the content of ret for constant-time duplicate checks
    seen: set[str] = set()
    for section in sections:
        for key in keys:
            visited: set[str] = set()
            for name in projects:
                rr: list[str] = []
                self.__get_project_refs_cached(
                    rr, visited, name, section, key, add_self, scope, names_only
                )
                for m in rr:
                    if m not in seen:
                        seen.add(m)
                        ret.append(m)
    return ret
def get_libname(self, projects) -> str:
    """
    Return the space-separated library names of the given projects, looked
    up as [build] libname with Scope.One and listed in reverse order of
    discovery. An entry "none" is left out. If nothing is found at all, the
    project names themselves are returned.
    """
    libs = self.get_project_refs(
        projects,
        ['build'],
        'libname',
        scope = Scope.One,
        add_self = False,
        names_only = True,
    )
    if not libs:
        return ' '.join(projects)
    if 'none' in libs:
        libs.remove('none')
    return ' '.join(libs[::-1])
def is_excluded_from_build(self, project: str) -> str | None:
    """
    Check [build] exclude of a project against the OS cascade of the distro.

    The value is a list separated by commas and / or spaces. Returns the
    entries of the cascade (plus "all") found in that list, joined by ", "
    in cascade order, or None if the project is not excluded.
    """
    log(DEBUG, 'checking if project ' + project + ' is excluded from build')
    exclude = self.get_value(project, 'build', 'exclude')
    if exclude is None:
        return None
    # Build the lookup set once instead of once per cascade entry
    excluded = set(re.split(r'[, ]+', exclude))
    cascade = self.distro.os_cascade + ['all']
    intersection = [x for x in cascade if x in excluded]
    if intersection:
        return ', '.join(intersection)
    return None
def find_circular_deps(self, projects: list[str], flavours: list[str]) -> bool:
    """
    Return True if the dependency graph of the given projects, read for the
    given flavours, contains a circular dependency.
    """
    return self.__find_circular_deps(projects, flavours)