# -*- coding: utf-8 -*- # # This source code file is a merge of various build tools and a horrible mess. # import os, sys, argparse, pwd, re from functools import lru_cache from .lib.App import App as Base from .lib.log import * # meaning of pkg.requires.xxx variables # build: needs to be built and installed before this can be built # devel: needs to be installed before this-devel can be installed, i.e. before _other_ packages can be built against this # run: needs to be installed before this-run can be installed, i.e. before this and other packages can run with this # --------------------------------------------------------------------- helpers class ResultCache(object): def __init__(self): self.__cache = {} def run(self, func, args): d = self.__cache depth = 0 keys = [ func.__name__ ] + args l = len(keys) for k in keys: if k is None: k = 'None' else: k = str(k) depth += 1 #log(DEBUG, 'depth = ', depth, 'key = ', k, 'd = ', str(d)) if k in d: if l == depth: return d[k] d = d[k] continue if l == depth: r = func(*args) d[k] = r return r d = d[k] = {} #d = d[k] raise Exception("cache algorithm failed for function", func.__name__, "in depth", depth) # ----------------------------------------------------------------- class App class App(Base): def __format_topdir(self, topdir: None|str, fmt: str) -> str: if topdir is None: return None match fmt: case 'unaltered': return topdir case None | 'absolute': return os.path.abspath(self.__topdir) case _: m = re.search(r'^make:(\S+)$', fmt) if m is None: raise Exception(f'Can\'t interpret "{fmt}" as valid topdir ' + 'reference, expecting "unaltered", "absolute", or "make:"') return '$(' + m.group(1) + ')' def __proj_dir(self, name: str, pretty) -> str: if name == self.__top_name: if pretty: return self.__pretty_topdir return self.__topdir for d in [ self.__projs_root, '/opt' ]: ret = d + '/' + name if os.path.exists(ret): return ret if os.path.exists(f'/usr/share/doc/packages/{name}/VERSION'): # The package exists but does not have a dedicated project directory return None raise Exception('No project path found for module "{}"'.format(name)) def __find_dir(self, name: str, search_subdirs: list[str]=[], search_absdirs: list[str]=[], pretty: bool=True): def format_pd(name: str, pd: str, pretty: bool): if not pretty: return pd if self.__topdir_fmt == 'absolute': return os.path.abspath(pd) if self.__topdir_fmt == 'unaltered': return pd if name == self.__top_name: return self.__pretty_topdir raise NotImplementedError(f'Tried to pretty-format directory {pd}, not implemented') pd = self.__proj_dir(name, False) if pd is None: return None if not search_subdirs and not search_absdirs: return format_pd(name, pd, pretty) for sd in search_subdirs: path = pd + '/' + sd if os.path.isdir(path): ret = format_pd(name, pd, pretty) if sd and sd[0] != '/': ret += '/' ret += sd return ret for ret in search_absdirs: if os.path.isdir(ret): return ret return None def __init__(self): super().__init__("jw-pkg swiss army knife", modules=["jw.pkg.cmds"]) # -- Members without default values self.__opt_os: str|None = None self.__top_name: str|None = None self.__os_cascade: list[str]|None = None self.__res_cache = ResultCache() self.__topdir: str|None = None self.__pretty_topdir: str|None = None # -- Members with default values self.__topdir_fmt = 'absolute' self.__projs_root = pwd.getpwuid(os.getuid()).pw_dir + "/local/src/jw.dev/proj" self.__pretty_projs_root = None def _add_arguments(self, parser): super()._add_arguments(parser) parser.add_argument('-t', '--topdir', default = None, help='Project Path') parser.add_argument('--topdir-format', default = 'absolute', help='Output references to topdir as ' + 'one of "make:", "unaltered", "absolute". Absolute topdir by default') parser.add_argument('-p', '--prefix', default = None, help='Parent directory of project source directories') parser.add_argument('-O', '--os', default = None, help='Target operating system') async def _run(self, args: argparse.Namespace) -> None: self.__opt_os = args.os self.__topdir = args.topdir self.__pretty_topdir = self.__format_topdir(self.__topdir, args.topdir_format) self.__topdir_fmt = args.topdir_format if self.__topdir is not None: self.__top_name = self.read_value(self.__topdir + '/make/project.conf', 'build', 'name') if not self.__top_name: self.__top_name = re.sub('-[0-9.-]*$', '', os.path.basename(os.path.realpath(self.__topdir))) if args.prefix is not None: self.__projs_root = args.prefix self.__pretty_projs_root = args.prefix return await super()._run(args) @property def top_name(self): return self.__top_name def find_dir(self, name: str, search_subdirs: list[str]=[], search_absdirs: list[str]=[], pretty: bool=True): return self.__find_dir(name, search_subdirs, search_absdirs, pretty) def re_section(self, name): return re.compile('[' + name + ']' '.*?' '(?=[)', re.DOTALL) def remove_duplicates(self, seq): seen = set() seen_add = seen.add return [x for x in seq if not (x in seen or seen_add(x))] @lru_cache(maxsize=None) def get_os(self, args = ""): import subprocess for d in [ self.__projs_root + '/jw-pkg/scripts', '/opt/jw-pkg/bin' ]: script = d + '/get-os.sh' if os.path.isfile(script): cmd = '/bin/bash ' + script if args: cmd = cmd + ' ' + args p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE) (out, rr) = p.communicate() if rr: log(ERR, "failed to run ", cmd) continue out = re.sub('\n', '', out.decode('utf-8')) return out return "linux" # TODO: add support for customizing this in project.conf def htdocs_dir(self, name: str) -> str: return self.find_dir(name,["/src/html/htdocs", "/tools/html/htdocs", "/htdocs"], ["/srv/www/proj/" + name]) # TODO: add support for customizing this in project.conf def tmpl_dir(self, name: str) -> str: return self.find_dir(name, ["/tmpl"], ["/opt/" + name + "/share/tmpl"]) def os_cascade(self): import platform if self.__os_cascade is not None: return self.__os_cascade.copy() r = [ 'os', platform.system().lower() ] os = self.__opt_os if self.__opt_os is not None else self.get_os() name = re.sub('-.*', '', os) series = os while True: n = re.sub(r'\.[0-9]+$', '', series) if n == series: break r.append(n) series = n if not name in r: r.append(name) if not os in r: r.append(os) # e.g. os, linux, suse, suse-tumbleweed #return [ 'os', platform.system().lower(), name, os ] self.__os_cascade = r return r def strip_module_from_spec(self, mod): return re.sub(r'-dev$|-devel$|-run$', '', re.split('([=><]+)', mod)[0].strip()) def get_section(self, path, section): r = '' file = open(path) pat = '[' + section + ']' in_section = False for line in file: if (line.rstrip() == pat): in_section = True continue if in_section: if len(line) and line[0] == '[': break r = r + line file.close() return r.rstrip() @lru_cache(maxsize=None) def read_value(self, path, section, key): def scan_section(f, key): if key is None: r = '' for line in f: if len(line) and line[0] == '[': break r += line return r if len(r) else None lines = [] cont_line = '' for line in f: if len(line) and line[0] == '[': break cont_line += line.rstrip() if len(cont_line) and cont_line[-1] == '\\': cont_line = cont_line[0:-1] continue lines.append(cont_line) cont_line = '' for line in lines: #log(DEBUG, " looking for >%s< in line=>%s<" % (key, line)) rr = re.findall('^ *' + key + ' *= *(.*)', line) if len(rr) > 0: return rr[0] return None def scan_section_debug(f, key): rr = scan_section(f, key) #log(DEBUG, " returning", rr) return rr try: #log(DEBUG, "looking for {}::[{}].{}".format(path, section, key)) with open(path, 'r') as f: if not len(section): rr = scan_section(f, key) pat = '[' + section + ']' for line in f: if line.rstrip() == pat: return scan_section(f, key) return None except: log(DEBUG, path, "not found") # TODO: handle this special case cleaner somewhere up the stack if section == 'build' and key == 'libname': return 'none' return None def get_value(self, name, section, key): log(DEBUG, "getting value [%s].%s for project %s (%s)" %(section, key, name, self.__top_name)) if self.__top_name and name == self.__top_name: proj_root = self.__topdir else: proj_root = self.__projs_root + '/' + name log(DEBUG, "proj_root = " + proj_root) if section == 'version': proj_version_dirs = [ proj_root ] if proj_root != self.__topdir: proj_version_dirs.append('/usr/share/doc/packages/' + name) for d in proj_version_dirs: version_path = d + '/VERSION' try: with open(version_path) as fd: r = fd.read().replace('\n', '').replace('-dev', '') fd.close() return r except EnvironmentError: log(DEBUG, "ignoring unreadable file " + version_path) continue raise Exception("No version file found for project \"" + name + "\"") path = proj_root + '/make/project.conf' #print('path = ', path, 'self.__top_name = ', self.__top_name, 'name = ', name) return self.read_value(path, section, key) def collect_values(self, names, section, key): r = "" for n in names: val = self.get_value(n, section, key) if val: r = r + " " + val return self.remove_duplicates([x.strip() for x in r.split(",")]) # scope 0: no children # scope 1: children # scope 2: recursive def add_modules_from_project_txt_cached(self, buf, visited, spec, section, key, add_self, scope, names_only): return self.__res_cache.run(self.add_modules_from_project_txt, [buf, visited, spec, section, key, add_self, scope, names_only]) def add_modules_from_project_txt(self, buf, visited, spec, section, key, add_self, scope, names_only): name = self.strip_module_from_spec(spec) if names_only: spec = name if spec in buf: return if spec in visited: if add_self: buf.append(spec) return visited.add(spec) deps = self.get_value(name, section, key) log(DEBUG, "name = ", name, "section = ", section, "key = ", key, "deps = ", deps, "scope = ", scope, "visited = ", visited) if deps and scope > 0: if scope == 1: subscope = 0 else: subscope = 2 deps = deps.split(',') for dep in deps: dep = dep.strip() if not(len(dep)): continue self.add_modules_from_project_txt_cached(buf, visited, dep, section, key, add_self=True, scope=subscope, names_only=names_only) if add_self: buf.append(spec) def get_modules_from_project_txt(self, names, sections, keys, add_self, scope, names_only = True): if isinstance(keys, str): keys = [ keys ] #r = set() r = [] for section in sections: for key in keys: visited = set() for name in names: rr = [] self.add_modules_from_project_txt_cached(rr, visited, name, section, key, add_self, scope, names_only) # TODO: this looks like a performance hogger for m in rr: if not m in r: r.append(m) return r def get_libname(self, names): vals = self.get_modules_from_project_txt(names, ['build'], 'libname', scope = 1, add_self=False, names_only=True) if not vals: return ' '.join(names) if 'none' in vals: vals.remove('none') return ' '.join(reversed(vals)) def is_excluded_from_build(self, module): log(DEBUG, "checking if module " + module + " is excluded from build") exclude = self.get_modules_from_project_txt([ module ], ['build'], 'exclude', scope = 1, add_self=False, names_only=True) cascade = self.os_cascade() + [ 'all' ] for p1 in exclude: for p2 in cascade: if p1 == p2: return p1 return None def contains(self, small, big): for i in xrange(len(big)-len(small)+1): for j in xrange(len(small)): if big[i+j] != small[j]: break else: return i, i+len(small) return False def read_dep_graph(self, modules, section, graph): for m in modules: if m in graph: continue deps = self.get_modules_from_project_txt([ m ], ['pkg.requires.jw'], section, scope = 1, add_self=False, names_only=True) if not deps is None: graph[m] = deps for d in deps: self.read_dep_graph([ d ], section, graph) def flip_graph(self, graph): r = {} for m, deps in graph.items(): for d in deps: if not d in r: r[d] = set() r[d].add(m) return r def check_circular_deps(self, module, section, graph, unvisited, temp, path): if module in temp: log(DEBUG, 'found circular dependency at module', module) return module if not module in unvisited: return None temp.add(module) if module in graph: for m in graph[module]: last = self.check_circular_deps(m, section, graph, unvisited, temp, path) if last is not None: path.insert(0, m) return last unvisited.remove(module) temp.remove(module)