# -*- coding: utf-8 -*- # # This source code file is a merge of various build tools and a horrible mess. # from __future__ import annotations from typing import TYPE_CHECKING if TYPE_CHECKING: from typing import TypeAlias import os, sys, pwd, re import os, sys, argparse, pwd, re from functools import lru_cache from enum import Enum, auto from .lib.App import App as Base from .lib.log import * from .lib.Distro import Distro # Meaning of pkg.requires.xxx variables # build: needs to be built and installed before this can be built # devel: needs to be installed before this-devel can be installed, i.e. before _other_ packages can be built against this # run: needs to be installed before this-run can be installed, i.e. before this and other packages can run with this # --------------------------------------------------------------------- Helpers class ResultCache(object): def __init__(self): self.__cache = {} def run(self, func, args): d = self.__cache depth = 0 keys = [ func.__name__ ] + args l = len(keys) for k in keys: if k is None: k = 'None' else: k = str(k) depth += 1 #log(DEBUG, 'depth = ', depth, 'key = ', k, 'd = ', str(d)) if k in d: if l == depth: return d[k] d = d[k] continue if l == depth: r = func(*args) d[k] = r return r d = d[k] = {} #d = d[k] raise Exception("cache algorithm failed for function", func.__name__, "in depth", depth) class Scope(Enum): Self = auto() One = auto() Subtree = auto() Graph: TypeAlias = dict[str, set[str]] # ----------------------------------------------------------------- class App class App(Base): def __format_topdir(self, topdir: None|str, fmt: str) -> str: if topdir is None: return None match fmt: case 'unaltered': return topdir case None | 'absolute': return os.path.abspath(self.__topdir) case _: m = re.search(r'^make:(\S+)$', fmt) if m is None: raise Exception(f'Can\'t interpret "{fmt}" as valid topdir ' + 'reference, expecting "unaltered", "absolute", or "make:"') return '$(' + m.group(1) + ')' def __proj_dir(self, name: str, pretty) -> str: if name == self.__top_name: if pretty: return self.__pretty_topdir return self.__topdir for d in [ self.__projs_root, '/opt' ]: ret = d + '/' + name if os.path.exists(ret): return ret if os.path.exists(f'/usr/share/doc/packages/{name}/VERSION'): # The package exists but does not have a dedicated project directory return None raise Exception('No project path found for module "{}"'.format(name)) def __find_dir(self, name: str, search_subdirs: list[str]=[], search_absdirs: list[str]=[], pretty: bool=True) -> str|None: def format_pd(name: str, pd: str, pretty: bool): if not pretty: return pd if self.__topdir_fmt == 'absolute': return os.path.abspath(pd) if self.__topdir_fmt == 'unaltered': return pd if name == self.__top_name: return self.__pretty_topdir raise NotImplementedError(f'Tried to pretty-format directory {pd}, not implemented') pd = self.__proj_dir(name, False) if pd is None: return None if not search_subdirs and not search_absdirs: return format_pd(name, pd, pretty) for sd in search_subdirs: path = pd + '/' + sd if os.path.isdir(path): ret = format_pd(name, pd, pretty) if sd and sd[0] != '/': ret += '/' ret += sd return ret for ret in search_absdirs: if os.path.isdir(ret): return ret return None def __get_project_refs_cached(self, buf, visited, spec, section, key, add_self, scope, names_only): return self.__res_cache.run(self.__get_project_refs, [buf, visited, spec, section, key, add_self, scope, names_only]) def __get_project_refs(self, buf: list[str], visited: set[str], spec: str, section: str, key: str, add_self: bool, scope: Scope, names_only: bool) -> None: name = self.strip_module_from_spec(spec) if names_only: spec = name if spec in buf: return if spec in visited: if add_self: buf.append(spec) return visited.add(spec) deps = self.get_value(name, section, key) log(DEBUG, "name = ", name, "section = ", section, "key = ", key, "deps = ", deps, "scope = ", scope.name, "visited = ", visited) if deps and scope != Scope.Self: if scope == Scope.One: subscope = Scope.Self else: subscope = Scope.Subtree deps = deps.split(',') for dep in deps: dep = dep.strip() if not(len(dep)): continue self.__get_project_refs_cached(buf, visited, dep, section, key, add_self=True, scope=subscope, names_only=names_only) if add_self: buf.append(spec) def __read_dep_graph(self, projects: list[str], section: str, graph: Graph) -> None: for project in projects: if project in graph: continue deps = self.get_project_refs([ project ], ['pkg.requires.jw'], section, scope = Scope.One, add_self=False, names_only=True) if not deps is None: graph[project] = set(deps) for dep in deps: self.__read_dep_graph([ dep ], section, graph) def __flip_dep_graph(self, graph: Graph): ret: Graph = {} for project, deps in graph.items(): for d in deps: if not d in ret: ret[d] = set() ret[d].add(project) return ret def __find_circular_deps_recursive(self, project: str, graph: Graph, unvisited: list[str], temp: set[str], path: str) -> str|None: if project in temp: log(DEBUG, 'found circular dependency at project', project) return project if not project in unvisited: return None temp.add(project) if project in graph: for dep in graph[project]: last = self.__find_circular_deps_recursive(dep, graph, unvisited, temp, path) if last is not None: path.insert(0, dep) return last unvisited.remove(project) temp.remove(project) def __find_circular_deps(self, projects: list[str], flavours: list[str]) -> bool: graph: Graph = {} ret: list[str] = [] self.__read_dep_graph(projects, flavours, graph) unvisited = list(graph.keys()) temp: set[str] = set() while unvisited: project = unvisited[0] log(DEBUG, 'Checking circular dependency of', project) last = self.__find_circular_deps_recursive(project, self.__flip_dep_graph(graph), unvisited, temp, ret) if last is not None: log(DEBUG, f'Found circular dependency below {project}, last is {last}') return True return False def __init__(self) -> None: super().__init__("jw-pkg swiss army knife", modules=["jw.pkg.cmds"]) # -- Members without default values self.__opt_os: str|None = None self.__opt_interactive: bool|None = None self.__opt_verbose: bool|None = None self.__top_name: str|None = None self.__os_release: str|None = None self.__distro_id: str|None = None self.__distro_name: str|None = None self.__distro_codename: str|None = None self.__distro: Distro|None = None self.__os_cascade: list[str]|None = None self.__res_cache = ResultCache() self.__topdir: str|None = None self.__pretty_topdir: str|None = None self.__exec_context: ExecContext|None = None # -- Members with default values self.__topdir_fmt = 'absolute' self.__projs_root = pwd.getpwuid(os.getuid()).pw_dir + "/local/src/jw.dev/proj" self.__pretty_projs_root = None def _add_arguments(self, parser) -> None: super()._add_arguments(parser) parser.add_argument('-t', '--topdir', default = None, help='Project Path') parser.add_argument('--topdir-format', default = 'absolute', help='Output references to topdir as ' + 'one of "make:", "unaltered", "absolute". Absolute topdir by default') parser.add_argument('-p', '--prefix', default = None, help='Parent directory of project source directories') parser.add_argument('-O', '--os', default = None, help='Target operating system') parser.add_argument('--distro-id', default=None, help='Distribution ID (default is taken from /etc/os-release)') parser.add_argument('--interactive', choices=['true', 'false', 'auto'], default='true', help="Wait for user input or try to proceed unattended") parser.add_argument('--verbose', action='store_true', default=False, help="Be verbose on stderr about what's being done on the distro level") parser.add_argument('--uri', default='local', help="Run commands on this host") async def _run(self, args: argparse.Namespace) -> None: self.__opt_os = args.os self.__topdir = args.topdir self.__pretty_topdir = self.__format_topdir(self.__topdir, args.topdir_format) self.__topdir_fmt = args.topdir_format if self.__topdir is not None: self.__top_name = self.read_value(self.__topdir + '/make/project.conf', 'build', 'name') if not self.__top_name: self.__top_name = re.sub('-[0-9.-]*$', '', os.path.basename(os.path.realpath(self.__topdir))) if args.prefix is not None: self.__projs_root = args.prefix self.__pretty_projs_root = args.prefix return await super()._run(args) @property def interactive(self) -> bool: if self.__opt_interactive is None: match self.args.interactive: case 'true': self.__opt_interactive = True case 'false': self.__opt_interactive = False case 'auto': self.__opt_interactive = sys.stdin.isatty() return self.__opt_interactive @property def verbose(self) -> bool: if self.__opt_verbose is None: self.__opt_verbose = self.args.verbose return self.__opt_verbose @property def exec_context(self) -> str: if self.__exec_context is None: from .lib.ExecContext import ExecContext self.__exec_context = ExecContext.create(self.args.uri, interactive=self.interactive, verbose_default=self.verbose) return self.__exec_context @property def top_name(self): return self.__top_name @property def projs_root(self): return self.__projs_root @property def os_release(self) -> str: if self.__os_release is None: os_release = '/etc/os-release' with open(os_release, 'r') as file: self.__os_release = file.read() return self.__os_release def os_release_field(self, key: str, throw: bool=False) -> str: m = re.search(r'^\s*' + key + r'\s*=\s*("?)([^"\n]+)\1\s*$', self.os_release, re.MULTILINE) if m is None: if throw: raise Exception(f'Could not read "{key}=" from /etc/os-release') return None return m.group(2) @property def distro_name(self) -> str: if self.__distro_name is None: self.__distro_name = self.os_release_field('NAME', throw=True) return self.__distro_name @property def distro_id(self) -> str: if self.__distro_id is None: ret = self.args.distro_id # The distribution ID requested by the command line if ret is None: # The ID of the distribution we run on ret = self.os_release_field('ID', throw=True) match ret: case 'opensuse-tumbleweed': ret = 'suse' case 'kali': ret = 'kali' self.__distro_id = ret return self.__distro_id @property def distro_codename(self) -> str: match self.distro_id: case 'suse': self.__distro_codename = \ self.os_release_field('ID', throw=True).split('-')[1] case 'kali': self.__distro_codename = \ self.os_release_field('VERSION_CODENAME', throw=True).split('-')[1] case _: self.__distro_codename = \ self.os_release_field('VERSION_CODENAME', throw=True) return self.__distro_codename @property def distro_cascade(self) -> str: return ' '.join(self.os_cascade()) @property def distro_gnu_triplet(self) -> str: import sysconfig import shutil import subprocess # Best: GNU host triplet Python was built for for key in ("HOST_GNU_TYPE", "BUILD_GNU_TYPE"): # BUILD_GNU_TYPE can exist too ret = sysconfig.get_config_var(key) if isinstance(ret, str) and ret: return ret # Common on Debian/Ubuntu: multiarch component (often looks like a triplet) ret = sysconfig.get_config_var("MULTIARCH") if isinstance(ret, str) and ret: return ret # Sometimes exposed (privately) by CPython ret = getattr(sys.implementation, "_multiarch", None) if isinstance(ret, str) and ret: return ret # Last resort: ask the system compiler for cc in ("gcc", "cc", "clang"): path = shutil.which(cc) if not path: continue try: ret = subprocess.check_output([path, "-dumpmachine"], text=True, stderr=subprocess.DEVNULL).strip() if ret: return ret except Exception: pass raise RuntimeError('Failed to get GNU triplet from running machine') @property def distro(self) -> Distro: if self.__distro is None: ret = Distro.instantiate(self.distro_id, ec=self.exec_context) self.__distro = ret return self.__distro def find_dir(self, name: str, search_subdirs: list[str]=[], search_absdirs: list[str]=[], pretty: bool=True): return self.__find_dir(name, search_subdirs, search_absdirs, pretty) @lru_cache(maxsize=None) def get_os(self) -> str: return self.distro_id + '-' + self.distro_codename # TODO: add support for customizing this in project.conf def htdocs_dir(self, project: str) -> str: return self.find_dir(project, ["/src/html/htdocs", "/tools/html/htdocs", "/htdocs"], ["/srv/www/proj/" + project]) # TODO: add support for customizing this in project.conf def tmpl_dir(self, name: str) -> str: return self.find_dir(name, ["/tmpl"], ["/opt/" + name + "/share/tmpl"]) def os_cascade(self) -> list[str]: def __append(entry: str): if not entry in ret: ret.append(entry) import platform if self.__os_cascade is None: ret = [ 'os', platform.system().lower() ] match self.distro_id: case 'centos': __append('pkg-rpm') __append('pm-yum') __append('redhat') __append('rhel') case 'fedora' | 'rhel': __append('pkg-rpm') __append('pm-yum') __append('redhat') case 'suse': __append('pkg-rpm') __append('pm-zypper') case 'kali' | 'raspbian': __append('pkg-debian') __append('pm-apt') __append('debian') case 'ubuntu': __append('pkg-debian') __append('pm-apt') case 'archlinux': __append('pkg-pm') __append('pm-pacman') os = self.__opt_os if self.__opt_os is not None else self.get_os() name = re.sub(r'-.*', '', os) series = os rx = re.compile(r'\.[0-9]+$') while True: n = re.sub(rx, '', series) if n == series: break ret.append(n) series = n __append(name) __append(os) __append(self.distro_id) # e.g. os, linux, suse, suse-tumbleweed self.__os_cascade = ret return self.__os_cascade def strip_module_from_spec(self, mod): return re.sub(r'-dev$|-devel$|-run$', '', re.split('([=><]+)', mod)[0].strip()) @lru_cache(maxsize=None) def get_section(self, path: str, section: str) -> str: ret = '' pat = '[' + section + ']' in_section = False file = open(path) for line in file: if (line.rstrip() == pat): in_section = True continue if in_section: if len(line) and line[0] == '[': break ret += line file.close() return ret.rstrip() @lru_cache(maxsize=None) def read_value(self, path: str, section: str, key: str) -> str|None: def scan_section(f, key: str) -> str|None: if key is None: ret = '' for line in f: if len(line) and line[0] == '[': break ret += line return ret if len(ret) else None lines: list[str] = [] cont_line = '' for line in f: if len(line) and line[0] == '[': break cont_line += line.rstrip() if len(cont_line) and cont_line[-1] == '\\': cont_line = cont_line[0:-1] continue lines.append(cont_line) cont_line = '' rx = re.compile(r'^\s*' + key + r'\s*=\s*(.*)\s*$') for line in lines: #log(DEBUG, " looking for >%s< in line=>%s<" % (key, line)) m = re.search(rx, line) if m is not None: return m.group(1) return None def scan_section_debug(f, key: str) -> str|None: ret = scan_section(f, key) #log(DEBUG, " returning", rr) return ret try: #log(DEBUG, "looking for {}::[{}].{}".format(path, section, key)) with open(path, 'r') as f: if not len(section): rr = scan_section(f, key) pat = '[' + section + ']' for line in f: if line.rstrip() == pat: return scan_section(f, key) return None except: log(DEBUG, path, "not found") # TODO: handle this special case cleaner somewhere up the stack if section == 'build' and key == 'libname': return 'none' return None @lru_cache(maxsize=None) def get_value(self, project: str, section: str, key: str) -> str: if self.__top_name and project == self.__top_name: proj_root = self.__topdir else: proj_root = self.__projs_root + '/' + project if section == 'version': proj_version_dirs = [ proj_root ] if proj_root != self.__topdir: proj_version_dirs.append('/usr/share/doc/packages/' + project) for d in proj_version_dirs: version_path = d + '/VERSION' try: with open(version_path) as fd: ret = fd.read().replace('\n', '').replace('-dev', '') fd.close() return ret except EnvironmentError: log(DEBUG, f'"Ignoring unreadable file "{version_path}"') continue raise Exception(f'No version file found for project "{project}"') path = proj_root + '/make/project.conf' ret = self.read_value(path, section, key) log(DEBUG, "Lookup %s -> %s / %s%s -> \"%s\"" % (self.__top_name, project, section, '.' + key if key else '', ret)) return ret def get_values(self, projects: list[str], sections: list[str], keys: list[str]) -> list[str]: """ Collect a list of values from a list of given projects, sections and keys, maintaining order """ ret: list[str] = [] for p in projects: for section in sections: for key in keys: vals = self.get_value(p, section, key) if vals: ret += [val.strip() for val in vals.split(",")] return list(dict.fromkeys(ret)) # Remove duplicates, keep ordering def get_project_refs(self, projects: list[str], sections: list[str], keys: str|list[str], add_self: bool, scope: Scope, names_only=True) -> list[str]: if isinstance(keys, str): keys = [ keys ] ret: list[str] = [] for section in sections: for key in keys: visited = set() for name in projects: rr: list[str] = [] self.__get_project_refs_cached(rr, visited, name, section, key, add_self, scope, names_only) # TODO: this looks like a performance hogger for m in rr: if not m in ret: ret.append(m) return ret def get_libname(self, projects) -> str: vals = self.get_project_refs(projects, ['build'], 'libname', scope = Scope.One, add_self=False, names_only=True) if not vals: return ' '.join(projects) if 'none' in vals: vals.remove('none') return ' '.join(reversed(vals)) def is_excluded_from_build(self, project: str) -> str|None: log(DEBUG, "checking if project " + project + " is excluded from build") exclude = self.get_project_refs([ project ], ['build'], 'exclude', scope = Scope.One, add_self=False, names_only=True) cascade = self.os_cascade() + [ 'all' ] for p1 in exclude: for p2 in cascade: if p1 == p2: return p1 return None def find_circular_deps(self, projects: list[str], flavours: list[str]) -> bool: return self.__find_circular_deps(projects, flavours)