diff --git a/src/python/jw/pkg/lib/App.py b/src/python/jw/pkg/lib/App.py new file mode 100644 index 00000000..a1cb1688 --- /dev/null +++ b/src/python/jw/pkg/lib/App.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- + +from typing import Any + +import os, sys, argcomplete, argparse, re, asyncio, cProfile + +from .log import * +from .Types import LoadTypes + +class App: # export + + def _add_arguments(self, parser): + self.__parser.add_argument('--log-flags', help='Log flags', default=self.__default_log_flags) + self.__parser.add_argument('--log-level', help='Log level', default=self.__default_log_level) + self.__parser.add_argument('--log-file', help='Log file', default=self.__default_log_file) + self.__parser.add_argument('--backtrace', help='Show exception backtraces', action='store_true', default=self.__back_trace) + self.__parser.add_argument('--write-profile', help='Profile code and store output to file', default=None) + + def __init__(self, description: str = '', name_filter: str = '^Cmd.*', modules: None=None, eloop: None=None) -> None: + + def add_cmd_to_parser(cmd, parsers): + parser = parsers.add_parser(cmd.name, help=cmd.help, formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.set_defaults(func=cmd.run) + cmd.add_arguments(parser) + return parser + + def add_cmds_to_parser(parent, parser, cmds, all=False): + if not cmds: + return + class SubCommand: + def __init__(self, cmd: Cmd, parser: Any): + self.cmd = cmd + self.parser = parser + title = 'Available subcommands' + if hasattr(parent, 'name'): + title += ' of ' + getattr(parent, 'name') + subparsers = parser.add_subparsers(title=title, metavar='', dest='command') + scs: dict[str, SubCommand] = {} + for cmd in cmds: + cmd.set_parent(parent) + scs[cmd.name] = SubCommand(cmd, add_cmd_to_parser(cmd, subparsers)) + if all: + for sc in scs.values(): + add_cmds_to_parser(sc.cmd, sc.parser, sc.cmd.children, all=all) + return + args, unknown = self.__parser.parse_known_args() + if args.command in scs: + sc = scs[args.command] + add_cmds_to_parser(sc.cmd, sc.parser, sc.cmd.children, all=all) + + from .Cmd import Cmd + + self.__args: Namespace|None = None + self.__default_log_flags: str = os.getenv('JW_DEFAULT_LOG_FLAGS', default='stderr,position,prio,color') + self.__default_log_level: str|int|None = os.getenv('JW_DEFAULT_LOG_LEVEL', default=NOTICE) + self.__default_log_file: str|None = os.getenv('JW_DEFAULT_LOG_FILE', default=None) + backtrace: str|bool = os.getenv('JW_DEFAULT_SHOW_BACKTRACE', False) + self.__back_trace = True if isinstance(backtrace, str) and backtrace.lower() in ['1', 'true'] else False + set_log_flags(self.__default_log_flags) + set_log_level(self.__default_log_level) + + self.__eloop = eloop + self.__own_eloop = False + if eloop is None: + self.__eloop = asyncio.get_event_loop() + self.__own_eloop = True + + self.__cmds: list[Cmd] = [] + self.__parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, + description=description, add_help=False) + self._add_arguments(self.__parser) + + args, unknown = self.__parser.parse_known_args() + set_log_flags(args.log_flags) + set_log_level(args.log_level) + + log(DEBUG, '-------------- Running: >' + ' '.join(sys.argv) + '<') + + cmd_classes = LoadTypes(modules if modules else ['__main__'], type_name_filter=name_filter, type_filter=[Cmd]) + add_all_parsers = '-h' in sys.argv or '--help' in sys.argv + add_cmds_to_parser(self, self.__parser, [cmd_class() for cmd_class in cmd_classes], all=add_all_parsers) + + # -- Add help only now, wouldn't want to have parse_known_args() exit on --help with subcommands missing + self.__parser.add_argument('-h', '--help', action='help', help='Show this help message and exit') + + def __del__(self): + if self.__own_eloop: + if self.__eloop is not None: + self.__eloop.close() + self.__eloop = None + self.__own_eloop = False + + async def __run(self, argv=None) -> None: + + argcomplete.autocomplete(self.__parser) + + self.__args = self.__parser.parse_args(args=argv) + + set_log_flags(self.__args.log_flags) + set_log_level(self.__args.log_level) + self.__back_trace = self.__args.backtrace + + exit_status = 0 + + if not hasattr(self.__args, 'func'): + self.__parser.print_help() + return None + + pr = None if self.__args.write_profile is None else cProfile.Profile() + if pr is not None: + pr.enable() + + try: + ret = await self._run(self.__args) + except Exception as e: + if hasattr(e, 'message'): + log(ERR, e.message) + else: + log(ERR, f'Exception: {type(e)}: {e}') + exit_status = 1 + if self.__back_trace: + raise + finally: + if pr is not None: + pr.disable() + log(NOTICE, f'Writing profile statistics to {self.__args.write_profile}') + pr.dump_stats(self.__args.write_profile) + + if exit_status: + sys.exit(exit_status) + + # Run sub-command. Overwrite if you want to do anything before or after + async def _run(self, args: argparse.Namespace) -> None: + return await self.__args.func(args) + + @property + def args(self) -> argparse.Namespace: + return self.__args + + @property + def parser(self) -> argparse.ArgumentParser: + return self.__parser + + def run(self, argv=None) -> None: + return self.__eloop.run_until_complete(self.__run(argv)) # type: ignore + +def run_sub_commands(description = '', name_filter = '^Cmd.*', modules=None, argv=None): # export + app = App(description, name_filter, modules) + return app.run(argv=argv) diff --git a/src/python/jw/pkg/lib/Cmd.py b/src/python/jw/pkg/lib/Cmd.py new file mode 100644 index 00000000..49c1e73f --- /dev/null +++ b/src/python/jw/pkg/lib/Cmd.py @@ -0,0 +1,107 @@ +# -*- coding: utf-8 -*- + +from __future__ import annotations + +from typing import Any + +import inspect, sys, re, abc, argparse +from argparse import ArgumentParser, _SubParsersAction + +from .log import * +from .Types import Types + +# full blown example of one level of nested subcommands +# git -C project remote -v show -n myremote + +class Cmd(abc.ABC): # export + + def __init__(self, name: str, help: str) -> None: + from . import App + self.__app: App|None = None + self.__parent: App|Cmd|None = None + self.__name = name + self.__help = help + self.__children: list[Cmd] = [] + self.__child_classes: list[type[Cmd]] = [] + + async def _run(self, args): + pass + + def set_parent(self, parent: Any|Cmd): + self.__parent = parent + + @property + def parent(self) -> App|Cmd: + if self.__parent is None: + raise Exception(f'Tried to access inexistent parent of command {self.name}') + return self.__parent + + @property + def app(self): + from .App import App + if self.__app is None: + parent = self.__parent + while True: + if parent is None: + raise Exception("Can't get application object from command without parent") + if isinstance(parent, App): + self.__app = parent + break + assert parent != parent.__parent + parent = parent.__parent + return self.__app + + @property + def name(self) -> str: + return self.__name + + @property + def help(self) -> str: + return self.__help + + @property + def children(self) -> list[Cmd]: + return tuple(self.__children) + + @property + def child_classes(self) -> list[type[Cmd]]: + return tuple(self.__child_classes) + + @abc.abstractmethod + async def run(self, args): + pass + + def add_subcommands(self, cmds: Cmd|list[Cmds]|Types|list[Types]) -> None: + if isinstance(cmds, Cmd): + assert False + return + if isinstance(cmds, list): + for cmd in cmds: + self.add_subcommands(cmd) + return + if isinstance(cmds, Types): + try: + for cmd_class in cmds: + if cmd_class in self.__child_classes: + continue + self.__child_classes.append(cmd_class) + cmd = cmd_class() + cmd.set_parent(self) + self.__children.append(cmd) + assert len(self.__children) == len(self.__child_classes) + except Exception as e: + cmds.dump(ERR, f"Failed to add subcommands ({e})") + raise + return + raise Exception(f'Tried to add sub-commands of unknown type {type(cmds)}') + + # To be overridden by derived class in case the command does take arguments. + # Will be called from App base class constructor and set up the parser hierarchy + def add_arguments(self, parser: ArgumentParser) -> None: + pass + + def conf_value(self, path, default=None): + ret = None if self.app is None else self.app.conf_value(path, default) + if ret is None and default is not None: + return default + return ret