lib + cmds.projects: Use lib.Uri

Remove the feeble attempts at unifying URI handling, and use class Uri from lib.Uri instead.

Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
Jan Lindemann 2026-04-27 09:37:28 +02:00
commit d9746cd20b
Signed by: Jan Lindemann
GPG key ID: 3750640C9E25DD61
8 changed files with 65 additions and 98 deletions

View file

@ -2,9 +2,9 @@
import re, os
from argparse import Namespace, ArgumentParser
from urllib.parse import urlparse
from ...lib.log import *
from ...lib.Uri import Uri
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
@ -39,10 +39,10 @@ class CmdGetAuthInfo(Cmd): # export
for line in remotes.splitlines():
name, url, typ = re.split(r'\s+', line)
if name == 'origin' and typ in ['(pull)', '(fetch)']: # TODO: Use other remotes, too?
parsed = urlparse(url)
parsed = Uri(url)
for key in keys:
result[key] = getattr(parsed, key)
base = parsed.geturl()
base = parsed.to_string
base = re.sub(r'/jw-pkg', '', base)
base = re.sub(r'/proj$', '', base)
base = re.sub(r'/proj$', '', base)

View file

@ -5,6 +5,7 @@ from argparse import Namespace, ArgumentParser
from ...lib.util import get_username, get_password, run_curl
from ...lib.log import *
from ...lib.Uri import Uri
from ..Cmd import Cmd
from ..CmdProjects import CmdProjects
@ -22,22 +23,21 @@ class CmdListRepos(Cmd): # export
async def _run(self, args: Namespace) -> None:
from urllib.parse import urlparse
url = urlparse(args.base_url)
base_url = Uri(args.base_url)
askpass_env=['GIT_ASKPASS', 'SSH_ASKPASS']
username = await get_username(args=args, url=args.base_url, askpass_env=askpass_env)
password = None
if username is not None:
password = await get_password(args=args, url=args.base_url, askpass_env=askpass_env)
match url.scheme:
match base_url.scheme:
case 'ssh':
if re.match(r'ssh://.*git\.janware\.com/', args.base_url):
from jw.pkg.lib.ec.SSHClient import SSHClient, ssh_client
ssh = ssh_client(args.base_url, interactive=self.app.interactive, verbose_default=self.app.verbose)
if username is not None:
ssh.set_username(username)
base_url.set_username(username)
if password is not None:
ssh.set_password(password)
base_url.set_password(password)
ssh = ssh_client(base_url, interactive=self.app.interactive, verbose_default=self.app.verbose)
cmd = ['/opt/jw-pkg/bin/git-srv-admin.sh', '-u', args.from_owner, '-j', 'list-personal-projects']
result = await ssh.run(cmd)
print('\n'.join(result.stdout.decode().splitlines()))

View file

@ -243,14 +243,6 @@ class ExecContext(Base):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
@abc.abstractmethod
def _username(self) -> str:
pass
@property
def username(self) -> str:
return self._username()
async def run(
self,
cmd: list[str],

View file

@ -12,6 +12,7 @@ if TYPE_CHECKING:
from .log import *
from .base import Input, InputMode, Result, StatResult
from .Uri import Uri
from .ProcFilter import ProcPipeline
class FileContext(abc.ABC):
@ -22,15 +23,14 @@ class FileContext(abc.ABC):
def __init__(
self,
uri: str,
uri: str|Uri,
interactive: bool|None = None,
verbose_default = False,
chroot: bool = False,
in_pipe: ProcPipeline|None = None,
out_pipe: ProcPipeline|None = None,
):
self.__uri = uri
self.__id, self.__root = self.split_uri(uri)
self.__uri = Uri.pimp(uri)
self.__chroot = chroot
self.__interactive = interactive
self.__verbose_default = verbose_default
@ -48,6 +48,9 @@ class FileContext(abc.ABC):
async def __aexit__(self, exc_type, exc, tb):
await self.close()
def __repr__(self) -> str:
return self.__uri.id
def __pipe(self, d: Direction):
match d:
case self.Direction.In:
@ -65,10 +68,10 @@ class FileContext(abc.ABC):
if not self.__chroot:
return path
if not len(path):
return self.__root
return self.root
if path[-1] == '/':
return self.__root + path
return self.__root + '/' + path
return self.root + path
return self.root + '/' + path
def add_proc_filter(self, d: Direction, proc_filter: ProcFilter):
self.__pipe(d).append(proc_filter)
@ -88,47 +91,27 @@ class FileContext(abc.ABC):
if self.__open_count == 1:
await self._close()
self.__open_count -= 1
assert self.__open_count >= 0, f'Closed file context "{self.__uri}" more often than opened'
@classmethod
def schema_from_uri(cls, uri: str) -> str:
tokens = re.split(r'://', uri)
return tokens[0] if tokens[0] != uri else 'file'
@classmethod
@cache
def split_uri(cls, uri: str) -> tuple[str, str]:
from urllib.parse import urlparse
p = urlparse(uri)
netloc = p.netloc if p.netloc else ''
return f'{cls.schema_from_uri(uri)}://{netloc}', p.path
assert self.__open_count >= 0, f'Closed file context "{self}" more often than opened'
@property
def uri(self) -> str:
def uri(self) -> Uri:
return self.__uri
@property
def id(self) -> str:
return self.__id
return self.__uri.id
@cached_property
def root(self) -> str:
return self.__uri.path
@property
def root(self) -> str:
return self.__root
def username(self) -> str|None:
return self.__uri.username
@property
def log_name(self) -> str:
if self.__log_name is None:
self.__log_name = self.__class__.__name__.lower()
from urllib.parse import urlparse
parsed = urlparse(self.__uri)
uri: list[str] = []
if parsed.scheme:
uri.append(parsed.scheme)
if parsed.hostname:
uri.append(parsed.hostname)
if uri:
self.__log_name += ' ' + '://'.join(uri)
return self.__log_name
return self.__uri.id
@property
def interactive(self) -> bool|None:
@ -295,8 +278,9 @@ class FileContext(abc.ABC):
return await self._is_dir(self._chroot(path), follow_symlinks=follow_symlinks)
@classmethod
def create(cls, uri: str, *args, **kwargs) -> Self:
match cls.schema_from_uri(uri):
def create(cls, uri: str|Uri, *args, **kwargs) -> Self:
uri = Uri.pimp(uri)
match uri.protocol:
case 'local' | 'file':
from .ec.Local import Local
return Local(uri, *args, **kwargs)
@ -308,4 +292,4 @@ class FileContext(abc.ABC):
return Curl(uri, *args, **kwargs)
case _:
pass
raise Exception(f'Can\'t create file transfer instance for {uri} with unknown schema "{cls.schema_from_uri(uri)}"')
raise Exception(f'Can\'t create file context instance for "{uri}" with unsupported protocol "{uri.protocol}"')

View file

@ -3,26 +3,22 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from urllib.parse import urlparse
from ..FileContext import FileContext as Base
from ..base import Result
if TYPE_CHECKING:
from ..ExecContext import ExecContext
from ..Uri import Uri
class Curl(Base):
def __init__(self, uri: str, *args, ec: ExecContext|None=None, **kwargs) -> None:
def __init__(self, uri: str|Uri, *args, ec: ExecContext|None=None, **kwargs) -> None:
super().__init__(uri=uri, *args, **kwargs)
self.__ec: ExecContext|None = ec
if ec is None:
from .Local import Local
self.__ec = Local(interactive=False, *args, **kwargs)
p = urlparse(uri)
self.__base_url = f'{p.scheme}://{p.hostname}'
if p.port is not None:
self.__base_url += f':{p.port}'
async def _get(
self,
@ -42,5 +38,5 @@ class Curl(Base):
path = wd + '/' + path
if not len(path) or path[0] != '/':
path = '/' + path
cmd.append(self.__base_url + path)
cmd.append(self.url.to_string + self._chroot(path))
return await self.__ec.run(cmd, throw=throw, verbose=verbose)

View file

@ -1,5 +1,8 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
import os, sys, subprocess, asyncio, pwd, grp, stat
from functools import cache
@ -10,9 +13,12 @@ from ..base import Result, StatResult
from ..log import *
from ..util import pretty_cmd
if TYPE_CHECKING:
from ..Uri import Uri
class Local(Base):
def __init__(self, uri='local', *args, **kwargs) -> None:
def __init__(self, uri: str|Uri='local', *args, **kwargs) -> None:
super().__init__(uri, *args, **kwargs)
@cache

View file

@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
from typing import Any
from __future__ import annotations
from typing import Any, TYPE_CHECKING
import os, abc, sys, pwd
from enum import Flag, auto
@ -9,7 +11,10 @@ from ..util import pretty_cmd
from ..log import *
from ..base import Result
from ..ExecContext import ExecContext
from urllib.parse import urlparse
from ..Uri import Uri
if TYPE_CHECKING:
from ..Uri import Uri
class SSHClient(ExecContext):
@ -19,21 +24,12 @@ class SSHClient(ExecContext):
ModEnv = auto()
Wd = auto()
def __init__(self, uri: str, caps: Caps=Caps(0), *args, **kwargs) -> None:
def __init__(self, uri: Uri|str, caps: Caps=Caps(0), *args, **kwargs) -> None:
uri = Uri.pimp(uri)
if uri.username is None:
uri.set_username(pwd.getpwuid(os.getuid()).pw_name)
super().__init__(uri=uri, *args, **kwargs)
self.__caps = caps
try:
parsed = urlparse(uri)
except Exception as e:
log(ERR, f'Failed to parse SSH URI "{uri}"')
raise
self.__hostname = parsed.hostname
if self.__hostname is None:
raise Exception(f'Can\'t parse host name from SSH URI "{uri}"')
self.__port = parsed.port
self.__password = parsed.password
self.__username = parsed.username
@abc.abstractmethod
async def _run_ssh(
@ -105,26 +101,19 @@ class SSHClient(ExecContext):
@property
def hostname(self) -> str|None:
return self.__hostname
return self.uri.hostname
@property
def port(self) -> int|None:
return self.__port
return self.uri.port
def set_password(self, password: str) -> None:
self.__password = password
@property
def username(self) -> str|None:
return self.uri.username
@property
def password(self) -> str|None:
return self.__password
def set_username(self, username: str) -> None:
self.__username = username
def _username(self) -> str:
if self.__username is None:
return pwd.getpwuid(os.getuid()).pw_name
return self.__username
return self.uri.password
def ssh_client(*args, type: str|list[str]|None=None, **kwargs) -> SSHClient: # export
from importlib import import_module
@ -146,7 +135,7 @@ def ssh_client(*args, type: str|list[str]|None=None, **kwargs) -> SSHClient: # e
msg = f'Can\'t instantiate SSH client class {name} ({str(e)})'
errors.append(msg)
log(DEBUG, f'{msg}, trying next')
msg = f'No working SSH clients for {" ".join(args)}'
msg = f'No working SSH clients for {" ".join([str(arg) for arg in args])}'
log(ERR, f'----- {msg}')
for error in errors:
log(ERR, error)

View file

@ -11,11 +11,11 @@ if TYPE_CHECKING:
import os, sys, json
from argparse import Namespace
from urllib.parse import urlparse
from enum import Enum, auto
from .log import *
from .base import InputMode
from .Uri import Uri
class AskpassKey(Enum):
Username = auto()
@ -126,7 +126,7 @@ async def copy(src_uri: str, dst_uri: str, owner: str|None=None, group: str|None
return dst_path
async def get_username(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[], ec: ExecContext|None=None) -> str: # export
url_user = None if url is None else urlparse(url).username
url_user = None if url is None else Uri(url).username
if args is not None:
if args.username is not None:
if url_user is not None and url_user != args.username:
@ -144,9 +144,9 @@ async def get_password(args: Namespace|None=None, url: str|None=None, askpass_en
if ret is not None:
return ret
if url is not None:
parsed = urlparse(url)
if parsed.password is not None:
return parsed.password
ret = Uri(url).password
if ret is not None:
return ret
return await run_askpass(askpass_env, AskpassKey.Password, ec=ec)
async def get_profile_env(throw: bool=True, keep: Iterable[str]|bool=False, ec: ExecContext|None=None) -> dict[str, str]: # export