mirror of
ssh://git.janware.com/janware/proj/jw-pkg
synced 2026-04-25 17:45:55 +02:00
lib.App, .Cmd: Add modules
Add App and Cmd as generic base classes for multi-command applications. The code is taken from jw-python: The exising jw.pkg.App is very similar to the more capable jwutils.Cmds class, so, to avoid code duplication, add it here to allow for jwutils.Cmds and jw.pkg.App to derive from it at some point in the future. Both had to be slightly modified to work within jw-pkg's less equipped context, and will need futher code cleanup. Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
parent
18467a6500
commit
0be02c7154
2 changed files with 256 additions and 0 deletions
149
src/python/jw/pkg/lib/App.py
Normal file
149
src/python/jw/pkg/lib/App.py
Normal file
|
|
@ -0,0 +1,149 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
from typing import Any
|
||||
|
||||
import os, sys, argcomplete, argparse, re, asyncio, cProfile
|
||||
|
||||
from .log import *
|
||||
from .Types import LoadTypes
|
||||
|
||||
class App: # export
|
||||
|
||||
def _add_arguments(self, parser):
|
||||
self.__parser.add_argument('--log-flags', help='Log flags', default=self.__default_log_flags)
|
||||
self.__parser.add_argument('--log-level', help='Log level', default=self.__default_log_level)
|
||||
self.__parser.add_argument('--log-file', help='Log file', default=self.__default_log_file)
|
||||
self.__parser.add_argument('--backtrace', help='Show exception backtraces', action='store_true', default=self.__back_trace)
|
||||
self.__parser.add_argument('--write-profile', help='Profile code and store output to file', default=None)
|
||||
|
||||
def __init__(self, description: str = '', name_filter: str = '^Cmd.*', modules: None=None, eloop: None=None) -> None:
|
||||
|
||||
def add_cmd_to_parser(cmd, parsers):
|
||||
parser = parsers.add_parser(cmd.name, help=cmd.help, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
parser.set_defaults(func=cmd.run)
|
||||
cmd.add_arguments(parser)
|
||||
return parser
|
||||
|
||||
def add_cmds_to_parser(parent, parser, cmds, all=False):
|
||||
if not cmds:
|
||||
return
|
||||
class SubCommand:
|
||||
def __init__(self, cmd: Cmd, parser: Any):
|
||||
self.cmd = cmd
|
||||
self.parser = parser
|
||||
title = 'Available subcommands'
|
||||
if hasattr(parent, 'name'):
|
||||
title += ' of ' + getattr(parent, 'name')
|
||||
subparsers = parser.add_subparsers(title=title, metavar='', dest='command')
|
||||
scs: dict[str, SubCommand] = {}
|
||||
for cmd in cmds:
|
||||
cmd.set_parent(parent)
|
||||
scs[cmd.name] = SubCommand(cmd, add_cmd_to_parser(cmd, subparsers))
|
||||
if all:
|
||||
for sc in scs.values():
|
||||
add_cmds_to_parser(sc.cmd, sc.parser, sc.cmd.children, all=all)
|
||||
return
|
||||
args, unknown = self.__parser.parse_known_args()
|
||||
if args.command in scs:
|
||||
sc = scs[args.command]
|
||||
add_cmds_to_parser(sc.cmd, sc.parser, sc.cmd.children, all=all)
|
||||
|
||||
from .Cmd import Cmd
|
||||
|
||||
self.__args: Namespace|None = None
|
||||
self.__default_log_flags: str = os.getenv('JW_DEFAULT_LOG_FLAGS', default='stderr,position,prio,color')
|
||||
self.__default_log_level: str|int|None = os.getenv('JW_DEFAULT_LOG_LEVEL', default=NOTICE)
|
||||
self.__default_log_file: str|None = os.getenv('JW_DEFAULT_LOG_FILE', default=None)
|
||||
backtrace: str|bool = os.getenv('JW_DEFAULT_SHOW_BACKTRACE', False)
|
||||
self.__back_trace = True if isinstance(backtrace, str) and backtrace.lower() in ['1', 'true'] else False
|
||||
set_log_flags(self.__default_log_flags)
|
||||
set_log_level(self.__default_log_level)
|
||||
|
||||
self.__eloop = eloop
|
||||
self.__own_eloop = False
|
||||
if eloop is None:
|
||||
self.__eloop = asyncio.get_event_loop()
|
||||
self.__own_eloop = True
|
||||
|
||||
self.__cmds: list[Cmd] = []
|
||||
self.__parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
||||
description=description, add_help=False)
|
||||
self._add_arguments(self.__parser)
|
||||
|
||||
args, unknown = self.__parser.parse_known_args()
|
||||
set_log_flags(args.log_flags)
|
||||
set_log_level(args.log_level)
|
||||
|
||||
log(DEBUG, '-------------- Running: >' + ' '.join(sys.argv) + '<')
|
||||
|
||||
cmd_classes = LoadTypes(modules if modules else ['__main__'], type_name_filter=name_filter, type_filter=[Cmd])
|
||||
add_all_parsers = '-h' in sys.argv or '--help' in sys.argv
|
||||
add_cmds_to_parser(self, self.__parser, [cmd_class() for cmd_class in cmd_classes], all=add_all_parsers)
|
||||
|
||||
# -- Add help only now, wouldn't want to have parse_known_args() exit on --help with subcommands missing
|
||||
self.__parser.add_argument('-h', '--help', action='help', help='Show this help message and exit')
|
||||
|
||||
def __del__(self):
|
||||
if self.__own_eloop:
|
||||
if self.__eloop is not None:
|
||||
self.__eloop.close()
|
||||
self.__eloop = None
|
||||
self.__own_eloop = False
|
||||
|
||||
async def __run(self, argv=None) -> None:
|
||||
|
||||
argcomplete.autocomplete(self.__parser)
|
||||
|
||||
self.__args = self.__parser.parse_args(args=argv)
|
||||
|
||||
set_log_flags(self.__args.log_flags)
|
||||
set_log_level(self.__args.log_level)
|
||||
self.__back_trace = self.__args.backtrace
|
||||
|
||||
exit_status = 0
|
||||
|
||||
if not hasattr(self.__args, 'func'):
|
||||
self.__parser.print_help()
|
||||
return None
|
||||
|
||||
pr = None if self.__args.write_profile is None else cProfile.Profile()
|
||||
if pr is not None:
|
||||
pr.enable()
|
||||
|
||||
try:
|
||||
ret = await self._run(self.__args)
|
||||
except Exception as e:
|
||||
if hasattr(e, 'message'):
|
||||
log(ERR, e.message)
|
||||
else:
|
||||
log(ERR, f'Exception: {type(e)}: {e}')
|
||||
exit_status = 1
|
||||
if self.__back_trace:
|
||||
raise
|
||||
finally:
|
||||
if pr is not None:
|
||||
pr.disable()
|
||||
log(NOTICE, f'Writing profile statistics to {self.__args.write_profile}')
|
||||
pr.dump_stats(self.__args.write_profile)
|
||||
|
||||
if exit_status:
|
||||
sys.exit(exit_status)
|
||||
|
||||
# Run sub-command. Overwrite if you want to do anything before or after
|
||||
async def _run(self, args: argparse.Namespace) -> None:
|
||||
return await self.__args.func(args)
|
||||
|
||||
@property
|
||||
def args(self) -> argparse.Namespace:
|
||||
return self.__args
|
||||
|
||||
@property
|
||||
def parser(self) -> argparse.ArgumentParser:
|
||||
return self.__parser
|
||||
|
||||
def run(self, argv=None) -> None:
|
||||
return self.__eloop.run_until_complete(self.__run(argv)) # type: ignore
|
||||
|
||||
def run_sub_commands(description = '', name_filter = '^Cmd.*', modules=None, argv=None): # export
|
||||
app = App(description, name_filter, modules)
|
||||
return app.run(argv=argv)
|
||||
107
src/python/jw/pkg/lib/Cmd.py
Normal file
107
src/python/jw/pkg/lib/Cmd.py
Normal file
|
|
@ -0,0 +1,107 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
import inspect, sys, re, abc, argparse
|
||||
from argparse import ArgumentParser, _SubParsersAction
|
||||
|
||||
from .log import *
|
||||
from .Types import Types
|
||||
|
||||
# full blown example of one level of nested subcommands
|
||||
# git -C project remote -v show -n myremote
|
||||
|
||||
class Cmd(abc.ABC): # export
|
||||
|
||||
def __init__(self, name: str, help: str) -> None:
|
||||
from . import App
|
||||
self.__app: App|None = None
|
||||
self.__parent: App|Cmd|None = None
|
||||
self.__name = name
|
||||
self.__help = help
|
||||
self.__children: list[Cmd] = []
|
||||
self.__child_classes: list[type[Cmd]] = []
|
||||
|
||||
async def _run(self, args):
|
||||
pass
|
||||
|
||||
def set_parent(self, parent: Any|Cmd):
|
||||
self.__parent = parent
|
||||
|
||||
@property
|
||||
def parent(self) -> App|Cmd:
|
||||
if self.__parent is None:
|
||||
raise Exception(f'Tried to access inexistent parent of command {self.name}')
|
||||
return self.__parent
|
||||
|
||||
@property
|
||||
def app(self):
|
||||
from .App import App
|
||||
if self.__app is None:
|
||||
parent = self.__parent
|
||||
while True:
|
||||
if parent is None:
|
||||
raise Exception("Can't get application object from command without parent")
|
||||
if isinstance(parent, App):
|
||||
self.__app = parent
|
||||
break
|
||||
assert parent != parent.__parent
|
||||
parent = parent.__parent
|
||||
return self.__app
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return self.__name
|
||||
|
||||
@property
|
||||
def help(self) -> str:
|
||||
return self.__help
|
||||
|
||||
@property
|
||||
def children(self) -> list[Cmd]:
|
||||
return tuple(self.__children)
|
||||
|
||||
@property
|
||||
def child_classes(self) -> list[type[Cmd]]:
|
||||
return tuple(self.__child_classes)
|
||||
|
||||
@abc.abstractmethod
|
||||
async def run(self, args):
|
||||
pass
|
||||
|
||||
def add_subcommands(self, cmds: Cmd|list[Cmds]|Types|list[Types]) -> None:
|
||||
if isinstance(cmds, Cmd):
|
||||
assert False
|
||||
return
|
||||
if isinstance(cmds, list):
|
||||
for cmd in cmds:
|
||||
self.add_subcommands(cmd)
|
||||
return
|
||||
if isinstance(cmds, Types):
|
||||
try:
|
||||
for cmd_class in cmds:
|
||||
if cmd_class in self.__child_classes:
|
||||
continue
|
||||
self.__child_classes.append(cmd_class)
|
||||
cmd = cmd_class()
|
||||
cmd.set_parent(self)
|
||||
self.__children.append(cmd)
|
||||
assert len(self.__children) == len(self.__child_classes)
|
||||
except Exception as e:
|
||||
cmds.dump(ERR, f"Failed to add subcommands ({e})")
|
||||
raise
|
||||
return
|
||||
raise Exception(f'Tried to add sub-commands of unknown type {type(cmds)}')
|
||||
|
||||
# To be overridden by derived class in case the command does take arguments.
|
||||
# Will be called from App base class constructor and set up the parser hierarchy
|
||||
def add_arguments(self, parser: ArgumentParser) -> None:
|
||||
pass
|
||||
|
||||
def conf_value(self, path, default=None):
|
||||
ret = None if self.app is None else self.app.conf_value(path, default)
|
||||
if ret is None and default is not None:
|
||||
return default
|
||||
return ret
|
||||
Loading…
Add table
Add a link
Reference in a new issue