jw.pkg.build.lib: Move to jw.pkg.lib

In preparation of reorganizing the tree below cmds, move the lib
subdirectory a level up.

Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
Jan Lindemann 2026-01-25 09:55:02 +01:00
commit 2e69639362
5 changed files with 4 additions and 4 deletions

View file

@ -0,0 +1,4 @@
TOPDIR = ../../../../..
include $(TOPDIR)/make/proj.mk
include $(JWBDIR)/make/py-mod.mk

View file

@ -0,0 +1,100 @@
# -*- coding: utf-8 -*-
import os, abc
from .util import run_cmd
class SSHClient(abc.ABC):
def __init__(self, hostname: str) -> None:
self.___ssh = None
self.__hostname = hostname
self.__password: str|None = None
@property
def hostname(self):
return self.__hostname
def set_password(self, password: str) -> None:
assert password != 'jan'
self.__password = password
@property
def password(self) -> str:
assert self.__password != 'jan'
return self.__password
def set_username(self, username: str) -> None:
self.__username = username
@property
def username(self) -> str:
return self.__username
@abc.abstractmethod
def run_cmd(self, cmd: str):
pass
class SSHClientInternal(SSHClient): # export
def __init__(self, hostname: str) -> None:
super().__init__(hostname=hostname)
def __ssh_connect(self):
import paramiko # type: ignore # error: Library stubs not installed for "paramiko"
ret = paramiko.SSHClient()
ret.set_missing_host_key_policy(paramiko.AutoAddPolicy())
path_to_key=os.path.join(os.environ['HOME'], '.ssh', 'id_rsa')
ret.connect(self.__hostname, key_filename=path_to_key, allow_agent=True)
s = ret.get_transport().open_session()
# set up the agent request handler to handle agent requests from the server
paramiko.agent.AgentRequestHandler(s)
return ret
@property
def __ssh(self):
if self.___ssh is None:
self.___ssh = self.__ssh_connect(self.__server)
return self.___ssh
@property
def __scp(self):
return SCPClient(self.__ssh.get_transport())
def run_cmd(self, cmd: str):
return self.__ssh.exec_command(find_cmd)
class SSHClientCmd(SSHClient): # export
def __init__(self, hostname: str) -> None:
self.__askpass: str|None = None
self.__askpass_orig: dict[str, str|None] = dict()
super().__init__(hostname=hostname)
def __del__(self):
for key, val in self.__askpass_orig.items():
if val is None:
del os.environ[key]
else:
os.environ[key] = val
if self.__askpass is not None:
os.remove(self.__askpass)
def __init_askpass(self):
if self.__askpass is None and self.password is not None:
import sys, tempfile
prefix = os.path.basename(sys.argv[0]) + '-'
f = tempfile.NamedTemporaryFile(mode='w+t', prefix=prefix, delete=False)
os.chmod(f.name, 0o0700)
self.__askpass = f.name
f.write(f'#!/bin/bash\n\necho -n "{self.password}\n"')
f.close()
for key, val in {'SSH_ASKPASS': self.__askpass, 'SSH_ASKPASS_REQUIRE': 'force'}.items():
self.__askpass_orig[key] = os.getenv(key)
os.environ[key] = val
def run_cmd(self, cmd: str):
self.__init_askpass()
cmd_arr = ['ssh']
cmd_arr.append(self.hostname)
return run_cmd(['ssh', self.hostname, cmd])

View file

@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
import os, sys, subprocess, json, time
from argparse import Namespace
from urllib.parse import urlparse
from enum import Enum, auto
class AskpassKey(Enum):
Username = auto()
Password = auto()
def pretty_cmd(cmd: list[str], wd=None):
tokens = [cmd[0]]
for token in cmd[1:]:
if token.find(' ') != -1:
token = '"' + token + '"'
tokens.append(token)
ret = ' '.join(tokens)
if wd is not None:
ret += f' in {wd}'
return ret
def run_cmd(cmd: list[str], wd=None, throw=True, verbose=False, cmd_input=None) -> str|None: # export
if verbose:
delim_len = 120
delim = f'---- running {pretty_cmd(cmd, wd)} -'
delim = delim + '-' * (delim_len - len(delim))
print(',' + delim + ' >')
cwd: str|None = None
if wd is not None:
cwd = os.getcwd()
os.chdir(wd)
ret = ''
try:
stdin = None
if cmd_input is not None:
stdin = subprocess.PIPE
p = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=None, close_fds=True, stdin=stdin)
if cmd_input is not None:
ret = p.communicate(input=cmd_input)[0]
else:
for line in iter(p.stdout.readline, b''):
line = line.decode(sys.stdout.encoding)
ret += line
p.wait()
if verbose:
print('`' + delim + ' <')
if p.returncode:
if verbose:
print(' '.join(cmd) + ' failed')
raise Exception(time.strftime('%Y-%m-%d %H:%M') + f': Command returned an error: "{pretty_cmd(cmd, wd)}"')
finally:
if cwd:
os.chdir(cwd)
return ret
def run_curl(args: list[str], parse_json: bool=True, wd=None, throw=None, verbose=False, cmd_input=None) -> dict|str: # export
cmd = ['curl']
if not verbose:
cmd.append('-s')
cmd.extend(args)
ret = run_cmd(cmd, wd=wd, throw=throw, verbose=verbose, cmd_input=cmd_input)
if parse_json:
try:
return json.loads(ret)
except Exception as e:
print(f'Failed to parse {len(ret)} bytes output of command >{pretty_cmd(cmd, wd)}< ({e})', file=sys.stderr)
raise
return ret
def run_askpass(askpass_env: list[str], key: AskpassKey, host: str|None=None):
assert host is None # Currently unsupported
for var in askpass_env:
exe = os.getenv(var)
if exe is None:
continue
exe_arg = ''
match var:
case 'GIT_ASKPASS':
match key:
case AskpassKey.Username:
exe_arg += 'Username'
case AskpassKey.Password:
exe_arg += 'Password'
case 'SSH_ASKPASS':
match key:
case AskpassKey.Username:
continue # Can't get user name from SSH_ASKPASS
case AskpassKey.Password:
exe_arg += 'Password'
ret = run_cmd([exe, exe_arg], throw=False)
if ret is not None:
return ret
return None
def get_username(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export
url_user = None if url is None else urlparse(url).username
if args is not None:
if args.username is not None:
if url_user is not None and url_user != args.username:
raise Exception(f'Username mismatch: called with --username="{args.username}", URL has user name "{url_user}"')
return args.username
if url_user is not None:
return url_user
return run_askpass(askpass_env, AskpassKey.Username)
def get_password(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export
if args is None and url is None and not askpass_env:
raise Exception(f'Neither URL nor command-line arguments nor askpass environment variable available, can\'t get password')
if args is not None and hasattr(args, 'password'): # use getattr(), because we don't necessarily want to have insecure --password among options
ret = getattr(args, 'password')
if ret is not None:
return ret
if url is not None:
parsed = urlparse(url)
if parsed.password is not None:
return parsed.password
return run_askpass(askpass_env, AskpassKey.Password)