jw-pkg/src/python/jw/pkg/App.py

449 lines
16 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
#
# This source code file is a merge of various build tools and a horrible mess.
#
import os, sys, argparse, pwd, re
from .lib.App import App as Base
from .lib.log import *
# meaning of pkg.requires.xxx variables
# build: needs to be built and installed before this can be built
# devel: needs to be installed before this-devel can be installed, i.e. before _other_ packages can be built against this
# run: needs to be installed before this-run can be installed, i.e. before this and other packages can run with this
# --------------------------------------------------------------------- helpers
class ResultCache(object):
    """Memoize function results in a tree of nested dictionaries.

    A call is identified by the function's ``__name__`` followed by the
    string form of every argument. Each of these strings selects one level
    of the tree; the last one maps to the cached result.
    """

    def __init__(self):
        # Root of the lookup tree
        self.__cache = {}

    def run(self, func, args):
        """Return ``func(*args)``, invoking ``func`` only on a cache miss.

        ``args`` must be a list, since it is appended to a list to build the
        lookup path.
        """
        lookup_path = [str(part) for part in [func.__name__] + args]
        node = self.__cache
        # Descend to the dictionary holding the result, creating levels as needed
        for part in lookup_path[:-1]:
            if part not in node:
                node[part] = {}
            node = node[part]
        leaf = lookup_path[-1]
        if leaf not in node:
            node[leaf] = func(*args)
        return node[leaf]
# ----------------------------------------------------------------- class App
class App(Base):
def __format_topdir(self, topdir: None|str, fmt: str) -> str:
if topdir is None:
return None
match fmt:
case 'unaltered':
return topdir
case None | 'absolute':
return os.path.abspath(self.topdir)
case _:
m = re.search(r'^make:(\S+)$', fmt)
if m is None:
raise Exception(f'Can\'t interpret "{fmt}" as valid topdir ' +
'reference, expecting "unaltered", "absolute", or "make:<variable-name>"')
return '$(' + m.group(1) + ')'
def __proj_dir(self, name: str, pretty) -> str:
if name == self.top_name:
if pretty:
return self.__pretty_topdir
return self.topdir
for d in [ self.projs_root, '/opt' ]:
ret = d + '/' + name
if os.path.exists(ret):
return ret
if os.path.exists(f'/usr/share/doc/packages/{name}/VERSION'):
# The package exists but does not have a dedicated project directory
return None
raise Exception('No project path found for module "{}"'.format(name))
def __find_dir(self, name: str, search_subdirs: list[str]=[], search_absdirs: list[str]=[], pretty: bool=True):
def format_pd(name: str, pd: str, pretty: bool):
if not pretty:
return pd
if self.__topdir_fmt == 'absolute':
return os.path.abspath(pd)
if self.__topdir_fmt == 'unaltered':
return pd
if name == self.top_name:
return self.__pretty_topdir
raise NotImplementedError(f'Tried to pretty-format directory {pd}, not implemented')
pd = self.__proj_dir(name, False)
if pd is None:
return None
if not search_subdirs and not search_absdirs:
return format_pd(name, pd, pretty)
for sd in search_subdirs:
path = pd + '/' + sd
if os.path.isdir(path):
ret = format_pd(name, pd, pretty)
if sd and sd[0] != '/':
ret += '/'
ret += sd
return ret
for ret in search_absdirs:
if os.path.isdir(ret):
return ret
return None
def __init__(self):
    """Initialize the application with its defaults.

    The option-dependent attributes start out unset and are filled in by
    _run() from the parsed command line.
    """
    super().__init__("jw-pkg swiss army knife", modules=["jw.pkg.cmds"])

    # -- caches and bookkeeping
    self.res_cache = ResultCache()
    self.dep_cache = {}
    self.glob_os_cascade = None
    self.global_args = []
    self.my_dir = os.path.dirname(os.path.realpath(__file__))

    # -- set from command line options in _run()
    self.opt_os = None
    self.top_name = None
    self.topdir = None
    self.__topdir_fmt = 'absolute'
    self.__pretty_topdir = None

    # -- defaults
    home_dir = pwd.getpwuid(os.getuid()).pw_dir
    self.projs_root = home_dir + "/local/src/jw.dev/proj"
    self.__pretty_projs_root = None
def _add_arguments(self, parser):
    """Register the options of this class on top of those of the base class."""
    super()._add_arguments(parser)
    parser.add_argument(
        '-t', '--topdir',
        default=None,
        help='Project Path')
    parser.add_argument(
        '--topdir-format',
        default='absolute',
        help='Output references to topdir as '
            'one of "make:<var-name>", "unaltered", "absolute". Absolute topdir by default')
    parser.add_argument(
        '-p', '--prefix',
        default=None,
        help='Parent directory of project source directories')
    parser.add_argument(
        '-O', '--os',
        default=None,
        help='Target operating system')
async def _run(self, args: argparse.Namespace) -> None:
    """Apply the parsed command line options to the instance, then run the base class.

    Sets the target OS, the top directory (raw and pretty-formatted) and
    the topdir format. If a top directory is given, the name of the
    top-level project is read from [build].name in its make/project.conf;
    if that yields nothing, it is derived from the directory's base name
    with a trailing "-<digits, dots, dashes>" suffix removed. --prefix
    overrides the default project root.
    """
    self.opt_os = args.os
    self.topdir = args.topdir
    self.__pretty_topdir = self.__format_topdir(self.topdir, args.topdir_format)
    self.__topdir_fmt = args.topdir_format
    if self.topdir is not None:
        self.top_name = self.res_cache.run(self.read_value, [self.topdir + '/make/project.conf', 'build', 'name'])
        if not self.top_name:
            # Fall back to the directory name, stripped of a trailing version suffix
            self.top_name = re.sub('-[0-9.-]*$', '', os.path.basename(os.path.realpath(self.topdir)))
    if args.prefix is not None:
        self.projs_root = args.prefix
        self.__pretty_projs_root = args.prefix
    #if self.__topdir is not None:
    # self.
    log(DEBUG, "projs_root = ", self.projs_root)
    return await super()._run(args)
def find_dir(self, name: str, search_subdirs: list[str]=[], search_absdirs: list[str]=[], pretty: bool=True):
return self.__find_dir(name, search_subdirs, search_absdirs, pretty)
def re_section(self, name):
    """Return a compiled pattern matching the section ``[name]`` of an ini-style text.

    The match starts at the ``[name]`` header at the beginning of a line
    and extends up to, but not including, the next line starting with
    ``[``, or to the end of the text for the last section.

    The brackets and the section name are escaped so they match literally.
    Unescaped, ``[name]`` is a character class and ``(?=[)`` is an
    unterminated one, which makes re.compile() raise.
    """
    return re.compile(
        r'^\[' + re.escape(name) + r'\]'
        r'.*?'
        r'(?=^\[|\Z)',
        re.DOTALL | re.MULTILINE)
def remove_duplicates(self, seq):
    """Return the items of ``seq`` as a list, keeping only the first occurrence of each.

    Items have to be hashable.
    """
    # Dictionary keys are unique and keep their insertion order
    return list(dict.fromkeys(seq))
def get_os(self, args = ""):
    """Determine the operating system identifier by running get-os.sh.

    The script is looked for in ``<projs_root>/jw-pkg/scripts`` and
    ``/opt/jw-pkg/bin``. ``args`` is an optional string of additional
    arguments, split the way a POSIX shell would split words.

    Returns the script's standard output with all newlines removed, or
    "linux" if no script is found or every script found fails.
    """
    import shlex
    import subprocess
    extra_args = shlex.split(args) if args else []
    for d in [ self.projs_root + '/jw-pkg/scripts', '/opt/jw-pkg/bin' ]:
        script = d + '/get-os.sh'
        if not os.path.isfile(script):
            continue
        # Argument list instead of a shell string: no quoting issues with
        # unusual paths, and nothing in args is interpreted by a shell
        cmd = ['/bin/bash', script] + extra_args
        try:
            p = subprocess.run(cmd, stdout=subprocess.PIPE)
        except OSError:
            log(ERR, "failed to run ", ' '.join(cmd))
            continue
        # Judge failure by the exit status; stderr is not captured, so it
        # can't be used for that
        if p.returncode != 0:
            log(ERR, "failed to run ", ' '.join(cmd))
            continue
        return p.stdout.decode('utf-8').replace('\n', '')
    return "linux"
# TODO: add support for customizing this in project.conf
def htdocs_dir(self, name: str) -> str:
return self.find_dir(name,["/src/html/htdocs", "/tools/html/htdocs", "/htdocs"],
["/srv/www/proj/" + name])
# TODO: add support for customizing this in project.conf
def tmpl_dir(self, name: str) -> str:
return self.find_dir(name, ["/tmpl"], ["/opt/" + name + "/share/tmpl"])
def os_cascade(self):
    """Return the list of OS identifiers applying to the target system.

    The list starts with 'os' and the lower-cased platform.system() value.
    It continues with the OS identifier stripped of its trailing ".<number>"
    components one at a time, then the identifier's part before the first
    dash, and finally the full identifier. Entries already present are not
    repeated. The identifier is taken from the --os option if given,
    otherwise from get_os().

    The result is computed once and cached. Every call returns a fresh
    copy, so callers may modify the returned list freely.
    """
    import platform
    if self.glob_os_cascade is not None:
        return self.glob_os_cascade.copy()
    cascade = [ 'os', platform.system().lower() ]
    # Named os_id so the local doesn't shadow the os module
    os_id = self.opt_os if self.opt_os is not None else self.res_cache.run(self.get_os, [])
    name = re.sub('-.*', '', os_id)
    series = os_id
    while True:
        shorter = re.sub(r'\.[0-9]+$', '', series)
        if shorter == series:
            break
        cascade.append(shorter)
        series = shorter
    if name not in cascade:
        cascade.append(name)
    if os_id not in cascade:
        cascade.append(os_id)
    # e.g. os, linux, suse, suse-tumbleweed
    self.glob_os_cascade = cascade
    # Hand out a copy on the first call too, to protect the cached list
    return cascade.copy()
def strip_module_from_spec(self, mod):
    """Reduce a dependency spec to its bare module name.

    Everything from the first comparison operator on is dropped, as well
    as a trailing -dev, -devel or -run.
    """
    package = re.split('[=><]+', mod, maxsplit=1)[0].strip()
    return re.sub(r'-dev$|-devel$|-run$', '', package)
def get_section(self, path, section):
    """Return the body of ``[section]`` in the file at ``path``.

    The body consists of the lines after the section header up to the
    next line starting with ``[``, with trailing whitespace removed from
    the result. Returns an empty string if the section is not present.
    Errors from opening or reading the file are not caught.
    """
    header = '[' + section + ']'
    body = []
    in_section = False
    # The context manager closes the file even if reading raises
    with open(path) as f:
        for line in f:
            if line.rstrip() == header:
                in_section = True
                continue
            if in_section:
                if line.startswith('['):
                    break
                body.append(line)
    return ''.join(body).rstrip()
def read_value(self, path, section, key):
    """Read a value from an ini-style configuration file.

    ``section`` names the ``[section]`` to look into; an empty section
    refers to the part of the file before the first section header.
    ``key`` selects a ``key = value`` line within the section. Lines ending
    in a backslash are joined with the following line before the key is
    looked up. If ``key`` is None, the raw text of the section is returned.

    Returns None if the section or the key is not present. If the file
    can't be read, returns 'none' for [build].libname and None for
    everything else.

    NOTE(review): ``key`` goes into the regular expression unescaped, so
    regex metacharacters in it are interpreted -- confirm whether any
    caller relies on that before escaping it.
    """

    def scan_section(f, key):
        # Consumes f up to and including the next section header line
        if key is None:
            raw = ''
            for line in f:
                if line.startswith('['):
                    break
                raw += line
            return raw if raw else None
        lines = []
        cont_line = ''
        for line in f:
            if line.startswith('['):
                break
            cont_line += line.rstrip()
            if cont_line.endswith('\\'):
                # Continuation: drop the backslash and keep collecting
                cont_line = cont_line[:-1]
                continue
            lines.append(cont_line)
            cont_line = ''
        if cont_line:
            # Keep a continuation that was cut short by the end of the section
            lines.append(cont_line)
        pattern = re.compile('^ *' + key + ' *= *(.*)')
        for line in lines:
            m = pattern.match(line)
            if m is not None:
                return m.group(1)
        return None

    try:
        with open(path, 'r') as f:
            if not section:
                # The values before the first header are the result, not
                # something to be skipped
                return scan_section(f, key)
            header = '[' + section + ']'
            for line in f:
                if line.rstrip() == header:
                    return scan_section(f, key)
            return None
    except OSError:
        # Only a missing or unreadable file is expected here; anything else
        # is a real error and propagates
        log(DEBUG, path, "not found")
        # TODO: handle this special case cleaner somewhere up the stack
        if section == 'build' and key == 'libname':
            return 'none'
    return None
def get_value(self, name, section, key):
    """Return the value ``[section].key`` of project ``name``.

    The project root is the top directory for the top-level project and
    ``<projs_root>/<name>`` for every other one.

    The section 'version' is special: the value is the content of the
    project's VERSION file with newlines and '-dev' removed. For projects
    other than the top-level one, /usr/share/doc/packages/<name> is
    searched too. Raises Exception if no VERSION file is readable.

    Everything else is read from make/project.conf through the result
    cache.
    """
    log(DEBUG, "getting value [%s].%s for project %s (%s)" %(section, key, name, self.top_name))
    is_top_project = self.top_name and name == self.top_name
    proj_root = self.topdir if is_top_project else self.projs_root + '/' + name
    log(DEBUG, "proj_root = " + proj_root)

    if section != 'version':
        conf_path = proj_root + '/make/project.conf'
        return self.res_cache.run(self.read_value, [conf_path, section, key])

    version_dirs = [ proj_root ]
    if proj_root != self.topdir:
        version_dirs.append('/usr/share/doc/packages/' + name)
    for version_dir in version_dirs:
        version_path = version_dir + '/VERSION'
        try:
            with open(version_path) as fd:
                return fd.read().replace('\n', '').replace('-dev', '')
        except EnvironmentError:
            log(DEBUG, "ignoring unreadable file " + version_path)
    raise Exception("No version file found for project \"" + name + "\"")
def collect_values(self, names, section, key):
    """Collect the values ``[section].key`` of all projects in ``names``.

    The non-empty values are concatenated, each preceded by a space. The
    result is then split at commas, stripped and de-duplicated.

    NOTE(review): values of different projects are joined with a space but
    split at commas, so the last item of one project and the first item of
    the next end up in one entry -- confirm whether that is intended.
    """
    values = (self.get_value(n, section, key) for n in names)
    joined = "".join(" " + val for val in values if val)
    return self.remove_duplicates([item.strip() for item in joined.split(",")])
# scope 0: no children
# scope 1: children
# scope 2: recursive
def add_modules_from_project_txt_cached(self, buf, visited, spec, section, key, add_self, scope,
        names_only):
    """Run add_modules_from_project_txt() through the result cache.

    All arguments, including ``buf`` and ``visited``, are passed to the
    cache and thereby become part of the lookup.
    """
    call_args = [buf, visited, spec, section, key, add_self, scope, names_only]
    return self.res_cache.run(self.add_modules_from_project_txt, call_args)
def add_modules_from_project_txt(self, buf, visited, spec, section, key, add_self, scope,
        names_only):
    """Append ``spec`` and, depending on ``scope``, its dependencies to ``buf``.

    buf        -- list collecting the result, modified in place
    visited    -- set of specs already processed, modified in place
    spec       -- module spec; reduced to the bare module name if names_only is set
    section    -- section of project.conf holding the dependency list
    key        -- key within that section, its value being a comma separated list
    add_self   -- whether to append spec itself
    scope      -- 0: no children, 1: children, 2: recursive
    names_only -- whether to collect bare module names instead of full specs

    Dependencies are appended before the module depending on them. Returns
    None.
    """
    name = self.strip_module_from_spec(spec)
    if names_only:
        spec = name
    if spec in buf:
        return
    if spec in visited:
        # Processed before, but not in buf
        if add_self:
            buf.append(spec)
        return
    visited.add(spec)
    deps = self.get_value(name, section, key)
    log(DEBUG, "name = ", name, "section = ", section, "key = ", key, "deps = ", deps, "scope = ", scope, "visited = ", visited)
    if deps and scope > 0:
        # Direct children only: stop descending below them. Any other
        # positive scope descends recursively.
        if scope == 1:
            subscope = 0
        else:
            subscope = 2
        deps = deps.split(',')
        for dep in deps:
            dep = dep.strip()
            if not(len(dep)):
                continue
            self.add_modules_from_project_txt_cached(buf, visited, dep,
                section, key, add_self=True, scope=subscope,
                names_only=names_only)
    if add_self:
        buf.append(spec)
def get_modules_from_project_txt(self, names, sections, keys, add_self, scope,
        names_only = True):
    """Collect the modules listed under ``keys`` in ``sections`` for all ``names``.

    names      -- module specs to start from
    sections   -- list of project.conf sections to evaluate
    keys       -- key or list of keys to evaluate in each section
    add_self   -- whether the modules in names are part of the result
    scope      -- 0: no children, 1: children, 2: recursive
    names_only -- whether to return bare module names instead of full specs

    Returns a list without duplicates, in order of first appearance.
    """
    if isinstance(keys, str):
        keys = [ keys ]
    r = []
    # Set of everything in r, for membership tests that don't scan the list
    seen = set()
    for section in sections:
        for key in keys:
            visited = set()
            for name in names:
                rr = []
                self.add_modules_from_project_txt_cached(rr, visited, name, section, key, add_self, scope,
                    names_only)
                for m in rr:
                    if m not in seen:
                        seen.add(m)
                        r.append(m)
    return r
def get_libname(self, names):
    """Return the space separated [build].libname entries collected for ``names``.

    If nothing is collected, the joined ``names`` are returned instead.
    Otherwise the entries are returned in reverse order, with one 'none'
    entry removed if present.
    """
    vals = self.get_modules_from_project_txt(
        names, ['build'], 'libname',
        scope = 1, add_self=False, names_only=True)
    if not vals:
        return ' '.join(names)
    try:
        vals.remove('none')
    except ValueError:
        # No 'none' marker among the values
        pass
    return ' '.join(vals[::-1])
def is_excluded_from_build(self, module):
    """Check whether ``module`` is excluded from the build on the target OS.

    Returns the first entry of the module's [build].exclude list that is
    either 'all' or part of the OS cascade, or None if there is none.
    """
    log(DEBUG, "checking if module " + module + " is excluded from build")
    exclude = self.get_modules_from_project_txt(
        [ module ], ['build'], 'exclude',
        scope = 1, add_self=False, names_only=True)
    cascade = self.os_cascade() + [ 'all' ]
    return next((entry for entry in exclude if entry in cascade), None)
def contains(self, small, big):
    """Find the sequence ``small`` as a contiguous run within the sequence ``big``.

    Returns the tuple (start, end) of the first occurrence, so that
    big[start:end] equals small element by element, or False if there is
    none. An empty ``small`` is found at (0, 0).
    """
    # range instead of xrange, which doesn't exist in Python 3
    for i in range(len(big) - len(small) + 1):
        if all(big[i + j] == item for j, item in enumerate(small)):
            return i, i + len(small)
    return False
def read_dep_graph(self, modules, section, graph):
    """Fill ``graph`` with the dependencies of ``modules``, recursively.

    ``graph`` maps each module to the list of its direct dependencies as
    listed under key ``section`` of [pkg.requires.jw]. Modules already in
    the graph are skipped. The graph is modified in place; the return
    value is None.
    """
    for module in modules:
        if module in graph:
            continue
        deps = self.get_modules_from_project_txt(
            [ module ], ['pkg.requires.jw'], section,
            scope = 1, add_self=False, names_only=True)
        if deps is None:
            continue
        graph[module] = deps
        for dep in deps:
            self.read_dep_graph([ dep ], section, graph)
def flip_graph(self, graph):
    """Reverse the edges of a dependency graph.

    ``graph`` maps modules to iterables of their dependencies. The result
    maps every dependency to the set of modules depending on it.
    """
    flipped = {}
    for module, deps in graph.items():
        for dep in deps:
            flipped.setdefault(dep, set()).add(module)
    return flipped
def check_circular_deps(self, module, section, graph, unvisited, temp, path):
    """Search for a dependency cycle reachable from ``module``, depth first.

    module    -- module to start from
    section   -- only passed on to the recursive calls
    graph     -- maps modules to their direct dependencies
    unvisited -- set of modules not fully processed yet, modified in place
    temp      -- set of modules on the current search path, modified in place
    path      -- list receiving the modules leading into the cycle, modified in place

    Returns the module at which a cycle was detected, or None if there is
    no cycle.
    """
    if module in temp:
        # Reached a module that is still on the current search path
        log(DEBUG, 'found circular dependency at module', module)
        return module
    if not module in unvisited:
        return None
    temp.add(module)
    if module in graph:
        for m in graph[module]:
            last = self.check_circular_deps(m, section, graph, unvisited, temp, path)
            if last is not None:
                # Record the dependency chain while unwinding the recursion
                path.insert(0, m)
                return last
    # No cycle below this module, it's done
    unvisited.remove(module)
    temp.remove(module)