Everywhere: Rename package "jw-build" to "jw-pkg"

jw-build doesn't stop at building software, packaging it afterwards
is also a core feature, so this commit gives the package a better
name.

The commit replaces strings s/jw-build/jw-pkg/ in text files and file
names. Fallout to the functionality is fixed, variable names are left
as they are, though. To be adjusted by later commits.

Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
Jan Lindemann 2025-11-26 17:48:01 +01:00
commit 9217d38964
50 changed files with 36 additions and 36 deletions

466
src/python/jw/pkg/App.py Normal file
View file

@ -0,0 +1,466 @@
# -*- coding: utf-8 -*-
#
# This source code file is a merge of various build tools and a horrible mess.
#
import os, sys, argparse, pwd, re
# meaning of pkg.requires.xxx variables
# build: needs to be built and installed before this can be built
# devel: needs to be installed before this-devel can be installed, i.e. before _other_ packages can be built against this
# run: needs to be installed before this-run can be installed, i.e. before this and other packages can run with this
# --------------------------------------------------------------------- helpers
class ResultCache(object):
def __init__(self):
self.__cache = {}
def run(self, func, args):
d = self.__cache
depth = 0
keys = [ func.__name__ ] + args
l = len(keys)
for k in keys:
if k is None:
k = 'None'
else:
k = str(k)
depth += 1
#self.projects.debug('depth = ', depth, 'key = ', k, 'd = ', str(d))
if k in d:
if l == depth:
return d[k]
d = d[k]
continue
if l == depth:
r = func(*args)
d[k] = r
return r
d = d[k] = {}
#d = d[k]
raise Exception("cache algorithm failed for function", func.__name__, "in depth", depth)
# ----------------------------------------------------------------- class App
class App(object):
def __init__(self):
self.global_args = []
self.opt_os = None
self.top_name = None
self.glob_os_cascade = None
self.dep_cache = {}
self.my_dir = os.path.dirname(os.path.realpath(__file__))
self.opt_debug = False
self.res_cache = ResultCache()
self.topdir = None
self.projs_root = os.path.expanduser("~") + '/local/src/jw.dev/proj'
def debug(self, *objs):
if self.opt_debug:
print("DEBUG: ", *objs, file = sys.stderr)
def warn(self, *objs):
print("WARNING: ", *objs, file = sys.stderr)
def err(self, *objs):
print("ERR: ", *objs, file = sys.stderr)
def proj_dir(self, name):
if name == self.top_name:
return self.topdir
for d in [ self.projs_root, '/opt' ]:
r = d + '/' + name
if os.path.exists(r):
return r
if os.path.exists(f'/usr/share/doc/packages/{name}/VERSION'):
# The package exists but does not have a dedicated project directory
return None
raise Exception('No project path found for module "{}"'.format(name))
def re_section(self, name):
return re.compile('[' + name + ']'
'.*?'
'(?=[)',
re.DOTALL)
def remove_duplicates(self, seq):
seen = set()
seen_add = seen.add
return [x for x in seq if not (x in seen or seen_add(x))]
def get_os(self, args = ""):
import subprocess
for d in [ self.projs_root + '/jw-pkg/scripts', '/opt/jw-pkg/bin' ]:
script = d + '/get-os.sh'
if os.path.isfile(script):
cmd = '/bin/bash ' + script
if args:
cmd = cmd + ' ' + args
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
(out, rr) = p.communicate()
if rr:
self.err("failed to run ", cmd)
continue
out = re.sub('\n', '', out.decode('utf-8'))
return out
return "linux"
# TODO: add support for customizing this in project.conf
def htdocs_dir(self, name):
pd = self.proj_dir(name)
if pd is None:
return None
for r in [ pd + "/tools/html/htdocs", pd + "/htdocs", "/srv/www/proj/" + name ]:
if os.path.isdir(r):
return r
return None
def os_cascade(self):
import platform
if self.glob_os_cascade is not None:
return self.glob_os_cascade.copy()
r = [ 'os', platform.system().lower() ]
os = self.opt_os if self.opt_os is not None else self.res_cache.run(self.get_os, [])
name = re.sub('-.*', '', os)
series = os
while True:
n = re.sub(r'\.[0-9]+$', '', series)
if n == series:
break
r.append(n)
series = n
if not name in r:
r.append(name)
if not os in r:
r.append(os)
# e.g. os, linux, suse, suse-tumbleweed
#return [ 'os', platform.system().lower(), name, os ]
self.glob_os_cascade = r
return r
def strip_module_from_spec(self, mod):
return re.sub(r'-dev$|-devel$|-run$', '', re.split('([=><]+)', mod)[0].strip())
def get_section(self, path, section):
r = ''
file = open(path)
pat = '[' + section + ']'
in_section = False
for line in file:
if (line.rstrip() == pat):
in_section = True
continue
if in_section:
if len(line) and line[0] == '[':
break
r = r + line
file.close()
return r.rstrip()
def read_value(self, path, section, key):
def scan_section(f, key):
if key is None:
r = ''
for line in f:
if len(line) and line[0] == '[':
break
r += line
return r if len(r) else None
lines = []
cont_line = ''
for line in f:
if len(line) and line[0] == '[':
break
cont_line += line.rstrip()
if len(cont_line) and cont_line[-1] == '\\':
cont_line = cont_line[0:-1]
continue
lines.append(cont_line)
cont_line = ''
for line in lines:
#self.debug(" looking for >%s< in line=>%s<" % (key, line))
rr = re.findall('^ *' + key + ' *= *(.*)', line)
if len(rr) > 0:
return rr[0]
return None
def scan_section_debug(f, key):
rr = scan_section(f, key)
#self.debug(" returning", rr)
return rr
try:
#self.debug("looking for {}::[{}].{}".format(path, section, key))
with open(path, 'r') as f:
if not len(section):
rr = scan_section(f, key)
pat = '[' + section + ']'
for line in f:
if line.rstrip() == pat:
return scan_section(f, key)
return None
except:
self.debug(path, "not found")
# TODO: handle this special case cleaner somewhere up the stack
if section == 'build' and key == 'libname':
return 'none'
return None
def get_value(self, name, section, key):
self.debug("getting value [%s].%s for project %s (%s)" %(section, key, name, self.top_name))
if self.top_name and name == self.top_name:
proj_root = self.topdir
else:
proj_root = self.projs_root + '/' + name
self.debug("proj_root = " + proj_root)
if section == 'version':
proj_version_dirs = [ proj_root ]
if proj_root != self.topdir:
proj_version_dirs.append('/usr/share/doc/packages/' + name)
for d in proj_version_dirs:
version_path = d + '/VERSION'
try:
with open(version_path) as fd:
r = fd.read().replace('\n', '').replace('-dev', '')
fd.close()
return r
except EnvironmentError:
self.debug("ignoring unreadable file " + version_path)
continue
raise Exception("No version file found for project \"" + name + "\"")
path = proj_root + '/make/project.conf'
#print('path = ', path, 'self.top_name = ', self.top_name, 'name = ', name)
return self.res_cache.run(self.read_value, [path, section, key])
def collect_values(self, names, section, key):
r = ""
for n in names:
val = self.get_value(n, section, key)
if val:
r = r + " " + val
return self.remove_duplicates([x.strip() for x in r.split(",")])
# scope 0: no children
# scope 1: children
# scope 2: recursive
def add_modules_from_project_txt_cached(self, buf, visited, spec, section, key, add_self, scope,
names_only):
return self.res_cache.run(self.add_modules_from_project_txt, [buf, visited, spec, section, key,
add_self, scope, names_only])
def add_modules_from_project_txt(self, buf, visited, spec, section, key, add_self, scope,
names_only):
name = self.strip_module_from_spec(spec)
if names_only:
spec = name
if spec in buf:
return
if spec in visited:
if add_self:
buf.append(spec)
return
visited.add(spec)
deps = self.get_value(name, section, key)
self.debug("name = ", name, "section = ", section, "key = ", key, "deps = ", deps, "scope = ", scope, "visited = ", visited)
if deps and scope > 0:
if scope == 1:
subscope = 0
else:
subscope = 2
deps = deps.split(',')
for dep in deps:
dep = dep.strip()
if not(len(dep)):
continue
self.add_modules_from_project_txt_cached(buf, visited, dep,
section, key, add_self=True, scope=subscope,
names_only=names_only)
if add_self:
buf.append(spec)
def get_modules_from_project_txt(self, names, sections, keys, add_self, scope,
names_only = True):
if isinstance(keys, str):
keys = [ keys ]
#r = set()
r = []
for section in sections:
for key in keys:
visited = set()
for name in names:
rr = []
self.add_modules_from_project_txt_cached(rr, visited, name, section, key, add_self, scope,
names_only)
# TODO: this looks like a performance hogger
for m in rr:
if not m in r:
r.append(m)
return r
def get_libname(self, names):
vals = self.get_modules_from_project_txt(names, ['build'], 'libname',
scope = 1, add_self=False, names_only=True)
if not vals:
return ' '.join(names)
if 'none' in vals:
vals.remove('none')
return ' '.join(reversed(vals))
def is_excluded_from_build(self, module):
self.debug("checking if module " + module + " is excluded from build")
exclude = self.get_modules_from_project_txt([ module ], ['build'], 'exclude',
scope = 1, add_self=False, names_only=True)
cascade = self.os_cascade() + [ 'all' ]
for p1 in exclude:
for p2 in cascade:
if p1 == p2:
return p1
return None
# -L needs to contain more paths than libs linked with -l would require
def get_ldpathflags(self, names, exclude = []):
deps = self.get_modules_from_project_txt(names, ['pkg.requires.jw'], 'build',
scope = 2, add_self=True, names_only=True)
r = ''
for m in deps:
if m in exclude:
continue
libname = self.get_libname([m])
if len(libname):
r = r + ' -L' + self.proj_dir(m) + '/lib'
print(r[1:])
def get_ldflags(self, names, exclude = [], add_self_ = False):
#print(names)
deps = self.get_modules_from_project_txt(names, ['pkg.requires.jw'], 'build',
scope = 1, add_self=add_self_, names_only=True)
self.debug("deps = " + ' '.join(deps))
#print(deps)
r = ''
for m in reversed(deps):
if m in exclude:
continue
libname = self.get_libname([m])
if len(libname):
#r = r + ' -L' + self.proj_dir(m) + '/lib -l' + libname
r = r + ' -l' + libname
if len(r):
ldpathflags = self.get_ldpathflags(names, exclude)
if ldpathflags:
r = ldpathflags + ' ' + r
return r[1::]
return ''
def contains(self, small, big):
for i in xrange(len(big)-len(small)+1):
for j in xrange(len(small)):
if big[i+j] != small[j]:
break
else:
return i, i+len(small)
return False
def read_dep_graph(self, modules, section, graph):
for m in modules:
if m in graph:
continue
deps = self.get_modules_from_project_txt([ m ], ['pkg.requires.jw'], section,
scope = 1, add_self=False, names_only=True)
if not deps is None:
graph[m] = deps
for d in deps:
self.read_dep_graph([ d ], section, graph)
def flip_graph(self, graph):
r = {}
for m, deps in graph.items():
for d in deps:
if not d in r:
r[d] = set()
r[d].add(m)
return r
def check_circular_deps(self, module, section, graph, unvisited, temp, path):
if module in temp:
self.debug('found circular dependency at module', module)
return module
if not module in unvisited:
return None
temp.add(module)
if module in graph:
for m in graph[module]:
last = self.check_circular_deps(m, section, graph, unvisited, temp, path)
if last is not None:
path.insert(0, m)
return last
unvisited.remove(module)
temp.remove(module)
# -------------------------------------------------------------------- here we go
def run(self):
skip = 0
for a in sys.argv[1::]:
self.global_args.append(a)
if a in [ '-p', '--prefix', '-t', '--topdir', '-O', '--os' ]:
skip = 1
continue
if skip > 0:
skip = skip -1
continue
if a[0] != '-':
break
# -- defaults
self.projs_root = pwd.getpwuid(os.getuid()).pw_dir + "/local/src/jw.dev/proj"
parser = argparse.ArgumentParser(description='Project metadata evaluation')
parser.add_argument('-d', '--debug', action='store_true',
default=False, help='Output debug information to stderr')
parser.add_argument('-t', '--topdir', nargs=1, default = [], help='Project Path')
parser.add_argument('-p', '--prefix', nargs=1, default = [ self.projs_root ], help='App Path Prefix')
parser.add_argument('-O', '--os', nargs=1, default = [], help='Target operating system')
parser.add_argument('cmd', default='', help=f'Command, run "{sys.argv[0]} commands" for a list of supported commands')
parser.add_argument('arg', nargs='*', help='Command arguments')
args = parser.parse_args(self.global_args)
self.opt_debug = args.debug
if len(args.os):
self.opt_os = args.os[0]
self.debug("----------------------------------------- running ", ' '.join(sys.argv))
self.projs_root = args.prefix[0]
self.debug("projs_root = ", self.projs_root)
if len(args.topdir):
self.topdir = args.topdir[0]
if self.topdir:
self.top_name = self.res_cache.run(self.read_value, [self.topdir + '/make/project.conf', 'build', 'name'])
if not self.top_name:
self.top_name = re.sub('-[0-9.-]*$', '', os.path.basename(os.path.realpath(self.topdir)))
import importlib
cmd_name = args.cmd.replace('-', '_')
cc_cmd_name = 'Cmd' + ''.join(x.capitalize() for x in cmd_name.lower().split("_"))
module = importlib.import_module('jw.pkg.build.cmds.' + cc_cmd_name)
cmd = getattr(module, cc_cmd_name)()
cmd.app = self
subparser = argparse.ArgumentParser(description=cmd_name)
cmd.add_arguments(subparser)
args = subparser.parse_args(sys.argv[(len(self.global_args) + 1)::])
try:
return cmd.run(args)
except Exception as e:
self.err('Failed to run >{}<: {}'.format(' '.join(sys.argv), e))
raise
sys.exit(1)

View file

@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Type, Union, TypeVar
import inspect, abc, argparse
from argparse import ArgumentParser
class Cmd(abc.ABC): # export
def __init__(self, name: str, help: str) -> None:
from ..App import App
self.name = name
self.help = help
self.parent = None
self.children: list[Cmd] = []
self.child_classes: list[Type[Cmd]] = []
self.app: App|None = None
@abc.abstractmethod
def _run(self, args):
pass
def run(self, args):
return self._run(args)
def add_parser(self, parsers) -> ArgumentParser:
r = parsers.add_parser(self.name, help=self.help,
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
r.set_defaults(func=self.run)
return r
def add_subcommands(self, cmd: Union[str, Type[Cmd], list[Type[Cmd]]]) -> None:
if isinstance(cmd, str):
import sys, re
sc = []
for name, obj in inspect.getmembers(sys.modules[self.__class__.__module__]):
if inspect.isclass(obj):
if re.search(cmd, str(obj)):
sc.append(obj)
log.slog(log.DEBUG, f"Found subcommand {obj}")
self.add_subcommands(sc)
return
if isinstance(cmd, list):
for c in cmd:
self.add_subcommands(c)
return
self.child_classes.append(cmd)
# To be overridden by derived class in case the command does take arguments.
# Will be called from App base class constructor and set up the parser hierarchy
def add_arguments(self, parser: ArgumentParser) -> None:
pass
def conf_value(self, path, default=None):
ret = None if self.app is None else self.app.conf_value(path, default)
if ret is None and default is not None:
return default
return ret

View file

@ -0,0 +1,5 @@
TOPDIR = ../../../../..
PY_UPDATE_INIT_PY = false
include $(TOPDIR)/make/proj.mk
include $(JWBDIR)/make/py-mod.mk

View file

View file

@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
import re
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
class BaseCmdPkgRelations(Cmd):
def pkg_relations(self, rel_type, args):
version_pattern=re.compile("[0-9-.]*")
if args.subsections is None:
subsecs = self.app.os_cascade()
subsecs.append('jw')
else:
subsecs = args.subsections.split(',')
self.app.debug('flavour = ', args.flavour, ', subsecs = ', ' '.join(subsecs))
ignore = args.ignore.split(',')
self.app.debug("ignore = ", ignore)
r = []
flavours = args.flavour.split(',')
for flavour in flavours:
for s in subsecs:
section = 'pkg.' + rel_type + '.' + s
visited = set()
modules = args.module.copy()
while len(modules):
m = modules.pop(0)
if m in visited or m in ignore:
continue
visited.add(m)
value = self.app.get_value(m, section, flavour)
if not value:
continue
deps = value.split(',')
for spec in deps:
dep = re.split('([=><]+)', spec)
if args.no_version:
dep = dep[:1]
dep = list(map(str.strip, dep))
dep_name = re.sub('-dev$|-devel$|-run$', '', dep[0])
if dep_name in ignore or dep[0] in ignore:
continue
if args.no_subpackages:
dep[0] = dep_name
for i, item in enumerate(dep):
dep[i] = item.strip()
if s == 'jw':
if args.recursive and not dep_name in visited and not dep_name in modules:
modules.append(dep_name)
if len(dep) == 3:
if args.dont_expand_version_macros and dep_name in args.module:
version = dep[2]
else:
version = self.app.get_value(dep_name, 'version', '')
if dep[2] == 'VERSION':
if args.dont_strip_revision:
dep[2] = version
else:
dep[2] = version.split('-')[0]
elif dep[2] == 'VERSION-REVISION':
dep[2] = version
elif version_pattern.match(dep[2]):
# dep[2] = dep[2]
pass
else:
raise Exception("Unknown version specifier in " + spec)
cleaned_dep = ' '.join(dep)
if not cleaned_dep in r:
self.app.debug("appending", cleaned_dep)
r.append(cleaned_dep)
return args.delimiter.join(r)
def print_pkg_relations(self, rel_type, args_):
print(self.pkg_relations(rel_type, args_))
def __init__(self, relation: str, help: str) -> None:
super().__init__('pkg-' + relation, help=help)
self.relation = relation
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('-S', '--subsections', nargs='?', default=None, help='Subsections to consider, comma-separated')
parser.add_argument('-d', '--delimiter', nargs='?', default=', ', help='Output words delimiter')
parser.add_argument('flavour', help='Flavour')
parser.add_argument('module', nargs='*', help='Modules')
parser.add_argument('-p', '--no-subpackages', action='store_true',
default=False, help='Cut -run and -devel from package names')
parser.add_argument('--no-version', action='store_true',
default=False, help='Don\'t report version information')
parser.add_argument('--dont-strip-revision', action='store_true',
default=False, help='Always treat VERSION macro as VERSION-REVISION')
parser.add_argument('--recursive', action='store_true',
default=False, help='Find dependencies recursively')
parser.add_argument('--dont-expand-version-macros', action='store_true',
default=False, help='Don\'t expand VERSION and REVISION macros')
parser.add_argument('--ignore', nargs='?', default='', help='Packages that '
'should be ignored together with their dependencies')
def _run(self, args: Namespace) -> None:
return self.print_pkg_relations(self.relation, args)

View file

@ -0,0 +1,174 @@
# -*- coding: utf-8 -*-
import os, re, sys, subprocess, datetime
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
class CmdBuild(Cmd): # export
def __init__(self) -> None:
super().__init__('build', help='janware software project build tool')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('--exclude', default='', help='Space seperated ist of modules to be excluded from build')
parser.add_argument('-n', '--dry-run', action='store_true',
default=False, help='Don\'t build anything, just print what would be done.')
parser.add_argument('-O', '--build-order', action='store_true',
default=False, help='Don\'t build anything, just print the build order.')
parser.add_argument('-I', '--ignore-deps', action='store_true',
default=False, help='Don\'t build dependencies, i.e. build only modules specified on the command line')
parser.add_argument('target', default='all', help='Build target')
parser.add_argument('modules', nargs='+', default='', help='Modules to be built')
def _run(self, args: Namespace) -> None:
def read_deps(cur, prereq_type):
# dep cache doesn't make a difference at all
if prereq_type in self.app.dep_cache:
if cur in self.app.dep_cache[prereq_type]:
return self.app.dep_cache[prereq_type][cur]
else:
self.app.dep_cache[prereq_type] = {}
r = self.app.get_modules_from_project_txt([ cur ], ['pkg.requires.jw'],
prereq_type, scope = 2, add_self=False, names_only=True)
self.app.debug('prerequisites = ' + ' '.join(r))
if cur in r:
r.remove(cur)
self.app.debug('inserting', prereq_type, "prerequisites of", cur, ":", ' '.join(r))
self.app.dep_cache[prereq_type][cur] = r
return r
def read_deps_cached(cur, prereq_type):
return self.app.res_cache.run(read_deps, [ cur, prereq_type ])
def add_dep_tree(cur, prereq_types, tree, all_deps):
self.app.debug("adding prerequisites " + ' '.join(prereq_types) + " of module " + cur)
if cur in all_deps:
self.app.debug('already handled module ' + cur)
return 0
deps = set()
all_deps.add(cur)
for t in prereq_types:
self.app.debug("checking prereqisites of type " + t)
deps.update(read_deps_cached(cur, t))
for d in deps:
add_dep_tree(d, prereq_types, tree, all_deps)
tree[cur] = deps
return len(deps)
def calculate_order(order, modules, prereq_types):
all_deps = set()
dep_tree = {}
for m in modules:
self.app.debug("--- adding dependency tree of module " + m)
add_dep_tree(m, prereq_types, dep_tree, all_deps)
while len(all_deps):
# Find any leaf
for d in all_deps:
if not len(dep_tree[d]): # Dependency d doesn't have dependencies itself
break # found
else: # no Leaf found
print(all_deps)
raise Exception("fatal: the dependencies between these modules are unresolvable")
order.append(d) # do it
# bookkeep it
all_deps.remove(d)
for k in dep_tree.keys():
if d in dep_tree[k]:
dep_tree[k].remove(d)
return 1
def run_make(module, target, cur_project, num_projects):
#make_cmd = "make " + target + " 2>&1"
make_cmd = [ "make", target ]
path = self.app.proj_dir(module)
delim_len = 120
delim = '---- [%d/%d]: running %s in %s -' % (cur_project, num_projects, make_cmd, path)
delim = delim + '-' * (delim_len - len(delim))
print(',' + delim + ' >')
patt = self.app.is_excluded_from_build(module)
if patt is not None:
print('| Configured to skip build on platform >' + patt + '<')
print('`' + delim + ' <')
return
os.chdir(path)
p = subprocess.Popen(make_cmd, shell=False, stdout=subprocess.PIPE, stderr=None, close_fds=True)
for line in iter(p.stdout.readline, b''):
line = line.decode(sys.stdout.encoding)
sys.stdout.write('| ' + line) # avoid extra newlines from print()
sys.stdout.flush()
p.wait()
print('`' + delim + ' <')
if p.returncode:
print(' '.join(make_cmd) + ' failed')
raise Exception(time.strftime("%Y-%m-%d %H:%M") + ": failed to make target " + target + " in module " + module + " below base " + self.app.projs_root)
def run_make_on_modules(modules, order, target):
cur_project = 0
num_projects = len(order)
if target in ["clean", "distclean"]:
for m in reversed(order):
cur_project += 1
run_make(m, target, cur_project, num_projects)
if m in modules:
modules.remove(m)
if not len(modules):
print("all modules cleaned")
return
else:
for m in order:
cur_project += 1
run_make(m, target, cur_project, num_projects)
def run(args):
self.app.debug("----------------------------------------- running ", ' '.join(sys.argv))
modules = args.modules
exclude = args.exclude.split()
target = args.target
env_exclude = os.getenv('BUILD_EXCLUDE', '')
if len(env_exclude):
print("exluding modules from environment: " + env_exclude)
exclude += " " + env_exclude
# -- build
order = []
glob_prereq_types = [ "build" ]
if re.match("pkg-.*", target) is not None:
glob_prereq_types = [ "build", "run", "release", "devel" ]
if target != 'order' and not args.build_order:
print("using prerequisite types " + ' '.join(glob_prereq_types))
print("calculating order for modules ... ")
calculate_order(order, modules, glob_prereq_types)
if args.ignore_deps:
order = [m for m in order if m in args.modules]
order = [m for m in order if m not in exclude]
if target == 'order' or args.build_order:
print(' '.join(order))
exit(0)
cur_project = 0
print("Building target %s in %d projects:" % (target, len(order)))
for m in order:
cur_project += 1
print(" %3d %s" % (cur_project, m))
if args.dry_run:
exit(0)
run_make_on_modules(modules, order, target)
print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
run(args)

View file

@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
class CmdCflags(Cmd): # export
def __init__(self) -> None:
super().__init__('cflags', help='cflags')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
def _run(self, args: Namespace) -> None:
deps = self.app.get_modules_from_project_txt(args.module, ['pkg.requires.jw'], 'build',
scope = 2, add_self=True, names_only=True)
r = ''
for m in reversed(deps):
try:
pd = self.app.proj_dir(m)
if pd is None:
continue
r = r + ' -I' + pd + '/include'
except Exception as e:
self.app.warn(f'No include path for module "{m}", ignoring: {e}')
print(r[1:])

View file

@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
class CmdCheck(Cmd): # export
def __init__(self) -> None:
super().__init__('check', help='Check for circular dependencies between given modules')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
parser.add_argument('-f', '--flavour', nargs='?', default = 'build')
def _run(self, args: Namespace) -> None:
graph = {}
path = []
self.app.read_dep_graph(args.module, args.flavour, graph)
unvisited = list(graph.keys())
temp = set()
while len(unvisited) != 0:
m = unvisited[0]
self.app.debug('Checking circular dependency of', m)
last = self.app.check_circular_deps(m, args.flavour, self.app.flip_graph(graph), unvisited, temp, path)
if last is not None:
self.app.debug('Found circular dependency below', m, ', last is', last)
print('Found circular dependency in flavour', args.flavour, ':', ' -> '.join(path))
exit(1)
print('No circular dependency found for flavour', args.flavour, ' in modules:',
' '.join(args.module))
exit(0)

View file

@ -0,0 +1,23 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
class CmdCommands(Cmd): # export
def __init__(self) -> None:
super().__init__('commands', help='List available commands')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
def _run(self, args: Namespace) -> None:
import sys, re, os, glob
this_dir = os.path.dirname(sys.modules[__name__].__file__)
ret = []
for file_name in glob.glob('Cmd*.py', root_dir=this_dir):
cc_name = re.sub(r'^Cmd|\.py', '', file_name)
name = re.sub(r'(?<!^)(?=[A-Z])', '-', cc_name).lower()
ret.append(name)
print(' '.join(ret))

View file

@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
import textwrap
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
class CmdCreatePkgConfig(Cmd): # export
def __init__(self) -> None:
super().__init__('create-pkg-config', help='Generate a pkg-config file for a module')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('-F', '--project-descr-file', default=None)
parser.add_argument('-d', '--description', default=None)
parser.add_argument('-n', '--name', default=None)
parser.add_argument('-s', '--summary', default=None)
parser.add_argument('-p', '--prefix', default=None)
parser.add_argument('-v', '--version', default=None)
parser.add_argument('-c', '--cflags', default=None)
parser.add_argument('-l', '--libflags', default=None)
parser.add_argument('-r', '--requires_run', default=None)
parser.add_argument('-R', '--requires_build', default=None)
parser.add_argument('-V', '--variables', nargs='*')
def _run(self, args: Namespace) -> None:
project_conf_var_keys = ['description', 'summary', 'requires_run', 'requires_build']
merged: dict[str, str] = {}
for key in project_conf_var_keys:
val = getattr(args, key)
if val is not None and args.project_descr_file:
val = self.app.get_value(args.name, key, None)
merged[key] = val
contents = textwrap.dedent(f"""\
prefix={args.prefix}
exec_prefix={{prefix}}
includedir={{prefix}}/include
libdir={{exec_prefix}}/lib
Name: {args.name}
Description: {merged['summary']}
Version: {args.version}
""")
if args.cflags is not None:
contents += f"Cflags: {args.cflags}\n"
if args.libflags is not None:
contents += f"Libs: {args.libflags}\n"
if merged['requires_run'] is not None:
contents += f"Requires: {cleanup_requires(merged['requires_run'])}"
if merged['requires_build'] is not None:
contents += f"Requires.private: {cleanup_requires(merged['requires_build'])}"
# not sure what to do with requires_devel
print(contents)

View file

@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
class CmdExepath(Cmd): # export
def __init__(self) -> None:
super().__init__('exepath', help='exepath')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
def _run(self, args: Namespace) -> None:
deps = self.app.get_modules_from_project_txt(args.module, ['pkg.requires.jw'], [ 'run', 'build', 'devel' ],
scope = 2, add_self=True, names_only=True)
self.app.debug('deps = ', deps)
r = ''
for m in deps:
pd = self.app.proj_dir(m)
if pd is None:
continue
r = r + ':' + pd + '/bin'
print(r[1:])

View file

@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
import re, os
from argparse import Namespace, ArgumentParser
from urllib.parse import urlparse
from ..Cmd import Cmd
from ..lib.util import run_cmd
class CmdGetAuthInfo(Cmd): # export
def __init__(self) -> None:
super().__init__('get-auth-info', help='Try to retrieve authentication information from the source tree')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('--only-values', help='Don\'t prefix values by "<field-name>="', action='store_true', default=False)
parser.add_argument('--username', help='Show user name', action='store_true', default=False)
parser.add_argument('--password', help='Show password', action='store_true', default=False)
parser.add_argument('--remote-base', help='Show remote base URL', action='store_true', default=False)
def _run(self, args: Namespace) -> None:
keys = ['username', 'password']
# --- Milk jw-pkg repo
jw_build_dir = self.app.proj_dir('jw-pkg')
if not os.path.isdir(jw_build_dir + '/.git'):
self.app.debug(f'jw-pkg directory is not a Git repo: {jw_build_dir}')
return
remotes = run_cmd(['git', '-C', jw_build_dir, 'remote', '-v'])
result: dict[str, str] = {}
for line in remotes.split('\n'):
if re.match(r'^\s*$', line):
continue
name, url, typ = re.split(r'\s+', line)
if name == 'origin' and typ == '(pull)': # TODO: Use other remotes, too?
parsed = urlparse(url)
for key in keys:
result[key] = getattr(parsed, key)
base = parsed.geturl()
base = re.sub(r'/jw-pkg', '', base)
base = re.sub(r'/proj$', '', base)
url['remote-base'] = base
break
# --- Print results
for key, val in result.items():
if getattr(args, key, None) != True:
continue
if val is None:
continue
if args.only_values:
print(val)
continue
print(f'{key}="{val}"')

View file

@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
class CmdGetval(Cmd): # export
def __init__(self) -> None:
super().__init__('getval', help='Get value from project config')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('--project', default = self.app.top_name, help = 'Project name')
parser.add_argument('section', default = '', help = 'Config section')
parser.add_argument('key', default = '', help = 'Config key')
def _run(self, args: Namespace) -> None:
print(self.app.get_value(args.project, args.section, args.key))

View file

@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
class CmdHtdocsDir(Cmd): # export
def __init__(self) -> None:
super().__init__('htdocs-dir', help='Print source directory containing document root of a given module')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
def _run(self, args: Namespace) -> None:
r = []
for m in args.module:
r.append(self.app.htdocs_dir(m))
print(' '.join(r))

View file

@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
class CmdLdflags(Cmd): # export
def __init__(self) -> None:
super().__init__('ldflags', help='ldflags')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
parser.add_argument('--exclude', action='append', help='Exclude Modules', default=[])
parser.add_argument('-s', '--add-self', action='store_true',
default=False, help='Include libflags of specified modules, too, not only their dependencies')
def _run(self, args: Namespace) -> None:
print(self.app.get_ldflags(args.module, args.exclude, args.add_self))

View file

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
class CmdLdlibpath(Cmd): # export
def __init__(self) -> None:
super().__init__('ldlibpath', help='ldlibpath')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
def _run(self, args: Namespace) -> None:
deps = self.app.get_modules_from_project_txt(args.module, ['pkg.requires.jw'], [ 'run', 'build', 'devel' ],
scope = 2, add_self=True, names_only=True)
r = ''
for m in deps:
pd = self.app.proj_dir(m)
if pd is None:
continue
r = r + ':' + pd + '/lib'
print(r[1:])

View file

@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
class CmdLibname(Cmd): # export
def __init__(self) -> None:
super().__init__('libname', help='libname')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
def _run(self, args: Namespace) -> None:
print(self.app.get_libname(args.module))

View file

@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
import re
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
from ..lib.util import get_username, get_password, run_curl
class CmdListRepos(Cmd): # export
def __init__(self) -> None:
super().__init__('list-repos', help='Query a remote GIT server for repositories')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('base_url', help='Base URL of all Git repositories')
parser.add_argument('--name-only', help='Only list names of repos, not URLs')
parser.add_argument('--username', help='Username for SSH or HTTP authentication, don\'t specify for unauthenticated', default=None)
parser.add_argument('--askpass', help='Program to echo password for SSH or HTTP authentication, don\'t specify for unauthenticated', default=None)
parser.add_argument('--from-user', help='List from-user\'s projects', default='janware')
def _run(self, args: Namespace) -> None:
from urllib.parse import urlparse
url = urlparse(args.base_url)
askpass_env=['GIT_ASKPASS', 'SSH_ASKPASS']
username = get_username(args=args, url=args.base_url, askpass_env=askpass_env)
password = None
if username is not None:
password = get_password(args=args, url=args.base_url, askpass_env=askpass_env)
match url.scheme:
case 'ssh':
if re.match(r'ssh://.*git\.janware\.com/', args.base_url):
from jw.pkg.build.lib.SSHClient import SSHClientCmd as SSHClient
ssh = SSHClient(hostname=url.hostname)
if username is not None:
ssh.set_username(username)
if password is not None:
ssh.set_password(password)
cmd = f'/opt/jw-pkg/bin/git-srv-admin.sh -u {args.from_user} -j list-personal-projects'
out = ssh.run_cmd(cmd)
print(out)
return
case 'https':
cmd_input = None
if re.match(r'https://github.com', args.base_url):
curl_args = [
'-f',
'-H', 'Accept: application/vnd.github+json',
'-H', 'X-GitHub-Api-Version: 2022-11-28',
]
if password is not None:
assert username is not None
cmd_input = (f'-u {username}:{password}').encode('utf-8')
curl_args.extend(['-K-'])
curl_args.append(f'https://api.github.com/users/{args.from_user}/repos')
repos = run_curl(curl_args, cmd_input=cmd_input)
for repo in repos:
print(repo['name'])
return
if re.match(r'https://', args.base_url):
# assume Forgejo Backend
curl_args = ['-f']
if re.match(r'https://janware.test', args.base_url):
curl_args.append('--insecure')
if password is not None:
assert username is not None
cmd_input = (f'-u {username}:{password}').encode('utf-8')
curl_args.extend(['-K-'])
curl_args.extend([
f'https://{url.hostname}/code/api/v1/orgs/{args.from_user}/repos'
])
repos = run_curl(curl_args, cmd_input=cmd_input)
for repo in repos:
print(repo['name'])
return
raise Exception(f'Don\'t know how to enumerate Git repos at base url {args.base_url}')

View file

@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
class CmdModules(Cmd): # export
def __init__(self) -> None:
super().__init__('modules', help='Query existing janware packages')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('-F', '--filter', nargs='?', default=None, help='Key-value pairs, seperated by commas, to be searched for in project.conf')
def _run(self, args: Namespace) -> None:
import pathlib
proj_root = self.app.projs_root
self.app.debug("proj_root = " + proj_root)
path = pathlib.Path(self.app.projs_root)
modules = [p.parents[1].name for p in path.glob('*/make/project.conf')]
self.app.debug("modules = ", modules)
out = []
filters = None if args.filter is None else [re.split("=", f) for f in re.split(",", args.filter)]
for m in modules:
if filters:
for f in filters:
path = f[0].rsplit('.')
if len(path) > 1:
sec = path[0]
key = path[1]
else:
sec = None
key = path[0]
val = self.app.get_value(m, sec, key)
self.app.debug('Checking in {} if {}="{}", is "{}"'.format(m, f[0], f[1], val))
if val and val == f[1]:
out.append(m)
break
else:
out.append(m)
print(' '.join(out))

View file

@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
class CmdOsCascade(Cmd): # export
def __init__(self) -> None:
super().__init__('os-cascade', help='Print project.conf\'s OS configuration precedence of machine this script runs on')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
def _run(self, args: Namespace) -> None:
print(' '.join(self.app.os_cascade()))

View file

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
class CmdPath(Cmd): # export
def __init__(self) -> None:
super().__init__('path', help='path')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
def _run(self, args: Namespace) -> None:
deps = self.app.get_modules_from_project_txt(args.module, ['pkg.requires.jw'], 'run',
scope = 2, add_self=True, names_only=True)
r = ''
for m in deps:
pd = self.app.proj_dir(m)
if pd is None:
continue
r = r + ':' + pd + '/bin'
print(r[1:])

View file

@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from .BaseCmdPkgRelations import BaseCmdPkgRelations as Base
class CmdPkgConflicts(Base): # export
def __init__(self) -> None:
super().__init__('conflicts', help='Print packages conflicting with a given package')

View file

@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from .BaseCmdPkgRelations import BaseCmdPkgRelations as Base
class CmdPkgProvides(Base): # export
def __init__(self) -> None:
super().__init__('provides', help='Print packages and capabilities provided by a given package')

View file

@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from .BaseCmdPkgRelations import BaseCmdPkgRelations as Base
class CmdPkgRequires(Base): # export
def __init__(self) -> None:
super().__init__('requires', help='Print packages required for a given package')

View file

@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
# TODO: seems at least partly redundant to CmdPkgRequires / print_pkg_relations
class CmdPrereq(Cmd): # export
def __init__(self) -> None:
super().__init__('prereq', help='path')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('flavour', help='Flavour')
parser.add_argument('module', nargs='*', help='Modules')
def _run(self, args: Namespace) -> None:
deps = self.app.get_modules_from_project_txt(args.module, ['pkg.requires.jw'],
args.flavour, scope = 2, add_self=False, names_only=True)
print(' '.join(deps))

View file

@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
class CmdProjDir(Cmd): # export
def __init__(self) -> None:
super().__init__('proj-dir', help='Print directory of a given package')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
def _run(self, args: Namespace) -> None:
r = []
for m in args.module:
try:
pd = self.app.proj_dir(m)
if pd is None:
continue
r.append(pd)
except Exception as e:
self.app.warn(f'No project directory for module "{m}: {e}')
continue
print(' '.join(r))

View file

@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
class CmdPythonpath(Cmd): # export
def __init__(self) -> None:
super().__init__('pythonpath', help='Generate PYTHONPATH for given modules')
def add_arguments(self, p: ArgumentParser) -> None:
super().add_arguments(p)
p.add_argument('module', help='Modules', nargs='*')
def _run(self, args: Namespace) -> None:
import os
deps = self.app.get_modules_from_project_txt(args.module, ['pkg.requires.jw'], [ 'run', 'build' ],
scope = 2, add_self=True, names_only=True)
r = ''
for m in deps:
pd = self.app.proj_dir(m)
if pd is None:
continue
for subdir in [ 'src/python', 'tools/python' ]:
cand = pd + "/" + subdir
if os.path.isdir(cand):
r = r + ':' + cand
print(r[1:])

View file

@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
class CmdPythonpathOrig(Cmd): # export
def __init__(self) -> None:
super().__init__('pythonpath_orig', help='pythonpath')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
def _run(self, args: Namespace) -> None:
deps = self.app.get_modules_from_project_txt(args.module, ['pkg.requires.jw'], [ 'run', 'build' ],
scope = 2, add_self=True, names_only=True)
r = ''
for m in deps:
pd = self.app.proj_dir(m)
if pd is None:
continue
for subdir in [ 'src/python', 'tools/python' ]:
cand = pd + "/" + subdir
if isdir(cand):
r = r + ':' + cand
print(r[1:])

View file

@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
# TODO: seems at least partly redundant to CmdPkgRequires / print_pkg_relations
class CmdRequiredOsPkg(Cmd): # export
def __init__(self) -> None:
super().__init__('required-os-pkg', help='List distribution packages required for a package')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
parser.add_argument('--flavours', help='Dependency flavours', default='build')
parser.add_argument('--skip-excluded', action='store_true', default=False,
help='Output empty prerequisite list if module is excluded')
def _run(self, args: Namespace) -> None:
modules = args.module
flavours = args.flavours.split()
if 'build' in flavours and not 'run' in flavours:
# TODO: This adds too much. Only the run dependencies of the build dependencies would be needed.
flavours.append('run')
self.app.debug("flavours = " + args.flavours)
deps = self.app.get_modules_from_project_txt(modules, ['pkg.requires.jw'], flavours,
scope = 2, add_self=True, names_only=True)
if args.skip_excluded:
for d in deps:
if self.app.is_excluded_from_build(d) is not None:
deps.remove(d)
subsecs = self.app.os_cascade()
self.app.debug("subsecs = ", subsecs)
requires = []
for s in subsecs:
for f in flavours:
vals = self.app.collect_values(deps, 'pkg.requires.' + s, f)
if vals:
requires = requires + vals
# TODO: add all not in build tree as -devel
r = ''
for m in requires:
r = r + ' ' + m
print(r[1:])

View file

@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
class CmdSummary(Cmd): # export
def __init__(self) -> None:
super().__init__('summary', help='Print summary description of given modules')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('module', nargs='*', help='Modules')
def _run(self, args: Namespace) -> None:
r = []
for m in args.module:
summary = self.app.get_value(m, "summary", None)
if summary is not None:
r.append(summary)
print(' '.join(r))

View file

@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
from argparse import Namespace, ArgumentParser
from ..Cmd import Cmd
class CmdTest(Cmd): # export
def __init__(self) -> None:
super().__init__('test', help='Test')
def add_arguments(self, parser: ArgumentParser) -> None:
super().add_arguments(parser)
parser.add_argument('blah', default='', help='The blah argument')
def _run(self, args: Namespace) -> None:
print("blah = " + args.blah)

View file

@ -0,0 +1,5 @@
TOPDIR = ../../../../../..
PY_UPDATE_INIT_PY = false
include $(TOPDIR)/make/proj.mk
include $(JWBDIR)/make/py-mod.mk

View file

View file

@ -0,0 +1,4 @@
TOPDIR = ../../../../../..
include $(TOPDIR)/make/proj.mk
include $(JWBDIR)/make/py-mod.mk

View file

@ -0,0 +1,100 @@
# -*- coding: utf-8 -*-
import os, abc
from .util import run_cmd
class SSHClient(abc.ABC):
def __init__(self, hostname: str) -> None:
self.___ssh = None
self.__hostname = hostname
self.__password: str|None = None
@property
def hostname(self):
return self.__hostname
def set_password(self, password: str) -> None:
assert password != 'jan'
self.__password = password
@property
def password(self) -> str:
assert self.__password != 'jan'
return self.__password
def set_username(self, username: str) -> None:
self.__username = username
@property
def username(self) -> str:
return self.__username
@abc.abstractmethod
def run_cmd(self, cmd: str):
pass
class SSHClientInternal(SSHClient): # export
def __init__(self, hostname: str) -> None:
super().__init__(hostname=hostname)
def __ssh_connect(self):
import paramiko # type: ignore # error: Library stubs not installed for "paramiko"
ret = paramiko.SSHClient()
ret.set_missing_host_key_policy(paramiko.AutoAddPolicy())
path_to_key=os.path.join(os.environ['HOME'], '.ssh', 'id_rsa')
ret.connect(self.__hostname, key_filename=path_to_key, allow_agent=True)
s = ret.get_transport().open_session()
# set up the agent request handler to handle agent requests from the server
paramiko.agent.AgentRequestHandler(s)
return ret
@property
def __ssh(self):
if self.___ssh is None:
self.___ssh = self.__ssh_connect(self.__server)
return self.___ssh
@property
def __scp(self):
return SCPClient(self.__ssh.get_transport())
def run_cmd(self, cmd: str):
return self.__ssh.exec_command(find_cmd)
class SSHClientCmd(SSHClient): # export
def __init__(self, hostname: str) -> None:
self.__askpass: str|None = None
self.__askpass_orig: dict[str, str|None] = dict()
super().__init__(hostname=hostname)
def __del__(self):
for key, val in self.__askpass_orig.items():
if val is None:
del os.environ[key]
else:
os.environ[key] = val
if self.__askpass is not None:
os.remove(self.__askpass)
def __init_askpass(self):
if self.__askpass is None and self.password is not None:
import sys, tempfile
prefix = os.path.basename(sys.argv[0]) + '-'
f = tempfile.NamedTemporaryFile(mode='w+t', prefix=prefix, delete=False)
os.chmod(f.name, 0o0700)
self.__askpass = f.name
f.write(f'#!/bin/bash\n\necho -n "{self.password}\n"')
f.close()
for key, val in {'SSH_ASKPASS': self.__askpass, 'SSH_ASKPASS_REQUIRE': 'force'}.items():
self.__askpass_orig[key] = os.getenv(key)
os.environ[key] = val
def run_cmd(self, cmd: str):
self.__init_askpass()
cmd_arr = ['ssh']
cmd_arr.append(self.hostname)
return run_cmd(['ssh', self.hostname, cmd])

View file

@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
import os, sys, subprocess, json, time
from argparse import Namespace
from urllib.parse import urlparse
from enum import Enum, auto
class AskpassKey(Enum):
Username = auto()
Password = auto()
def pretty_cmd(cmd: list[str], wd=None):
tokens = [cmd[0]]
for token in cmd[1:]:
if token.find(' ') != -1:
token = '"' + token + '"'
tokens.append(token)
ret = ' '.join(tokens)
if wd is not None:
ret += f' in {wd}'
return ret
def run_cmd(cmd: list[str], wd=None, throw=True, verbose=False, cmd_input=None) -> str|None: # export
if verbose:
delim_len = 120
delim = f'---- running {pretty_cmd(cmd, wd)} -'
delim = delim + '-' * (delim_len - len(delim))
print(',' + delim + ' >')
cwd: str|None = None
if wd is not None:
cwd = os.getcwd()
os.chdir(wd)
ret = ''
try:
stdin = None
if cmd_input is not None:
stdin = subprocess.PIPE
p = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=None, close_fds=True, stdin=stdin)
if cmd_input is not None:
ret = p.communicate(input=cmd_input)[0]
else:
for line in iter(p.stdout.readline, b''):
line = line.decode(sys.stdout.encoding)
ret += line
p.wait()
if verbose:
print('`' + delim + ' <')
if p.returncode:
if verbose:
print(' '.join(cmd) + ' failed')
raise Exception(time.strftime('%Y-%m-%d %H:%M') + f': Command returned an error: "{pretty_cmd(cmd, wd)}"')
finally:
if cwd:
os.chdir(cwd)
return ret
def run_curl(args: list[str], parse_json: bool=True, wd=None, throw=None, verbose=False, cmd_input=None) -> dict|str: # export
cmd = ['curl']
if not verbose:
cmd.append('-s')
cmd.extend(args)
ret = run_cmd(cmd, wd=wd, throw=throw, verbose=verbose, cmd_input=cmd_input)
if parse_json:
try:
return json.loads(ret)
except Exception as e:
print(f'Failed to parse {len(ret)} bytes output of command >{pretty_cmd(cmd, wd)}< ({e})', file=sys.stderr)
raise
return ret
def run_askpass(askpass_env: list[str], key: AskpassKey, host: str|None=None):
assert host is None # Currently unsupported
for var in askpass_env:
exe = os.getenv(var)
if exe is None:
continue
exe_arg = ''
match var:
case 'GIT_ASKPASS':
match key:
case AskpassKey.Username:
exe_arg += 'Username'
case AskpassKey.Password:
exe_arg += 'Password'
case 'SSH_ASKPASS':
match key:
case AskpassKey.Username:
continue # Can't get user name from SSH_ASKPASS
case AskpassKey.Password:
exe_arg += 'Password'
ret = run_cmd([exe, exe_arg], throw=False)
if ret is not None:
return ret
return None
def get_username(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export
url_user = None if url is None else urlparse(url).username
if args is not None:
if args.username is not None:
if url_user is not None and url_user != args.username:
raise Exception(f'Username mismatch: called with --username="{args.username}", URL has user name "{url_user}"')
return args.username
if url_user is not None:
return url_user
return run_askpass(askpass_env, AskpassKey.Username)
def get_password(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export
if args is None and url is None and not askpass_env:
raise Exception(f'Neither URL nor command-line arguments nor askpass environment variable available, can\'t get password')
if args is not None and hasattr(args, 'password'): # use getattr(), because we don't necessarily want to have insecure --password among options
ret = getattr(args, 'password')
if ret is not None:
return ret
if url is not None:
parsed = urlparse(url)
if parsed.password is not None:
return parsed.password
return run_askpass(askpass_env, AskpassKey.Password)