# -*- coding: utf-8 -*-

from __future__ import annotations

from typing import Any, TYPE_CHECKING

import os
import sys
import argparse
import re
import shlex
import asyncio
import cProfile

from .AsyncRunner import AsyncRunner
from .log import *
from .Types import LoadTypes

if TYPE_CHECKING:
    from typing import TypeVar
    from collections.abc import Awaitable
    T = TypeVar("T")


class App: # export
    """Command-line application that dispatches to dynamically loaded sub-commands.

    Command classes are discovered with ``LoadTypes`` (filtered by name and by
    being a ``Cmd``), instantiated with this ``App`` as their only constructor
    argument, and registered as argparse sub-parsers. ``run()`` parses the
    command line and awaits the selected command's ``run`` coroutine on the
    event loop.

    Defaults for the logging options are read from the environment variables
    ``JW_DEFAULT_LOG_FLAGS``, ``JW_DEFAULT_LOG_LEVEL``, ``JW_DEFAULT_LOG_FILE``
    and ``JW_DEFAULT_SHOW_BACKTRACE``.
    """

    def _add_arguments(self, parser):
        """Add the global (non-sub-command) options to *parser*.

        Subclasses may override this to add options of their own; they should
        call the base implementation to keep the logging and profiling
        options.
        """
        # Use the parser that was handed in rather than reaching for the
        # private attribute, so the method honours its own parameter.
        parser.add_argument('--log-flags', help='Log flags', default=self.__default_log_flags)
        parser.add_argument('--log-level', help='Log level', default=self.__default_log_level)
        parser.add_argument('--log-file', help='Log file', default=self.__default_log_file)
        parser.add_argument('--backtrace', help='Show exception backtraces', action='store_true',
                            default=self.__back_trace)
        parser.add_argument('--write-profile', help='Profile code and store output to file', default=None)

    def __init__(self, description: str = '', name_filter: str = '^Cmd.*',
                 modules: list[str]|None = None,
                 eloop: asyncio.AbstractEventLoop|None = None) -> None:
        """Build the argument parser and register all discovered commands.

        Args:
            description: Text shown at the top of the ``--help`` output.
            name_filter: Passed to ``LoadTypes`` as ``type_name_filter`` to
                select command classes by name.
            modules: Passed to ``LoadTypes`` as the modules to search;
                defaults to ``['__main__']`` when empty or ``None``.
            eloop: Event loop to run on. When ``None``, the current event
                loop is used (or a new one is created) and closed again when
                this object is finalized.
        """

        class SubCommand:
            """Pairs a command instance with the argparse parser created for it."""

            def __init__(self, cmd: Cmd, parser: Any):
                self.cmd = cmd
                self.parser = parser

        def add_cmd_to_parser(cmd, parsers):
            """Create a sub-parser for *cmd* and let the command populate it."""
            parser = parsers.add_parser(cmd.name, help=cmd.help,
                                        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
            parser.set_defaults(func=cmd.run)
            cmd.add_arguments(parser)
            cmd.set_parser(parser)
            return parser

        def add_cmds_to_parser(parent, parser, cmds, add_all=False):
            """Register *cmds* below *parser*, then descend into their children.

            With *add_all* every child command is registered recursively
            (needed for help output and completion). Otherwise only the
            branch actually selected on the command line is expanded.
            """
            if not cmds:
                return
            title = 'Available subcommands'
            if hasattr(parent, 'name'):
                title += ' of ' + getattr(parent, 'name')
            subparsers = parser.add_subparsers(title=title, metavar='', dest='command')
            scs: dict[str, SubCommand] = {}
            for cmd in cmds:
                cmd.set_parent(parent)
                scs[cmd.name] = SubCommand(cmd, add_cmd_to_parser(cmd, subparsers))
            if add_all:
                for sc in scs.values():
                    add_cmds_to_parser(sc.cmd, sc.parser, sc.cmd.children, add_all=add_all)
                return
            # Re-parse with the sub-parsers added so far to learn which
            # command was selected at this level, and expand only that one.
            args, _ = self.__parser.parse_known_args()
            if args.command in scs:
                sc = scs[args.command]
                add_cmds_to_parser(sc.cmd, sc.parser, sc.cmd.children, add_all=add_all)

        # Imported here, not at module level, as in the original layout
        # (presumably to avoid an import cycle with .Cmd -- TODO confirm).
        from .Cmd import Cmd

        # Set the event-loop state first so that __del__ always finds it,
        # even if anything below raises.
        self.__eloop: asyncio.AbstractEventLoop|None = eloop
        self.__own_eloop = False
        self.__async_runner: AsyncRunner|None = None

        self.__args: argparse.Namespace|None = None
        self.__cmdline: str|None = None

        self.__default_log_flags: str = os.getenv('JW_DEFAULT_LOG_FLAGS', default='stderr,position,prio,color')
        self.__default_log_level: str|int|None = os.getenv('JW_DEFAULT_LOG_LEVEL', default=NOTICE)
        self.__default_log_file: str|None = os.getenv('JW_DEFAULT_LOG_FILE', default=None)
        backtrace: str|bool = os.getenv('JW_DEFAULT_SHOW_BACKTRACE', False)
        self.__back_trace = isinstance(backtrace, str) and backtrace.lower() in ['1', 'true']
        set_log_flags(self.__default_log_flags)
        set_log_level(self.__default_log_level)

        if eloop is None:
            try:
                self.__eloop = asyncio.get_event_loop()
            except RuntimeError:
                # Newer Python versions no longer create a loop implicitly
                # when none is current.
                self.__eloop = asyncio.new_event_loop()
                asyncio.set_event_loop(self.__eloop)
            self.__own_eloop = True

        self.__parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter,
                                                description=description, add_help=False)
        self._add_arguments(self.__parser)

        # First pass: only the global options are known yet, which is enough
        # to configure logging before the commands are loaded.
        args, _ = self.__parser.parse_known_args()
        set_log_flags(args.log_flags)
        set_log_level(args.log_level)

        log(DEBUG, '-------------- Running: >' + ' '.join(sys.argv) + '<')

        cmd_classes = LoadTypes(modules if modules else ['__main__'], type_name_filter=name_filter,
                                type_filter=[Cmd])
        add_all_parsers = '-h' in sys.argv or '--help' in sys.argv or '_ARGCOMPLETE' in os.environ
        add_cmds_to_parser(self, self.__parser, [cmd_class(self) for cmd_class in cmd_classes],
                           add_all=add_all_parsers)

        # -- Add help only now, wouldn't want to have parse_known_args() exit on --help with subcommands missing
        self.__parser.add_argument('-h', '--help', action='help', help='Show this help message and exit')

    def __del__(self):
        """Close the event loop if this object obtained it itself."""
        # getattr() with the mangled names: __init__ may have failed before
        # the attributes were assigned.
        if not getattr(self, '_App__own_eloop', False):
            return
        eloop = getattr(self, '_App__eloop', None)
        if eloop is not None:
            eloop.close()
        self.__eloop = None
        self.__own_eloop = False

    async def __run(self, argv=None) -> None:
        """Parse *argv*, run the selected command and exit on failure.

        Prints the help text and returns when no sub-command was selected.
        Calls ``sys.exit()`` with a non-zero status when the command returns
        an int in 1..255 or raises; with ``--backtrace`` the exception is
        re-raised instead.
        """
        try:
            class NoopCompleter:
                def __call__(self, **kwargs):
                    return ()
            import argcomplete # Don't require it to be compatible with minimal environments
            argcomplete.autocomplete(self.__parser, default_completer=NoopCompleter())
        except ImportError:
            pass
        except Exception as e:
            # Completion is best effort, but don't swallow KeyboardInterrupt
            # or SystemExit along with real errors.
            log(DEBUG, f'Shell completion not available: {e!r}')

        self.__args = self.__parser.parse_args(args=argv)
        set_log_flags(self.__args.log_flags)
        set_log_level(self.__args.log_level)
        self.__back_trace = self.__args.backtrace

        if not hasattr(self.__args, 'func'):
            self.__parser.print_help()
            return None

        exit_status = 0
        pr = None if self.__args.write_profile is None else cProfile.Profile()
        if pr is not None:
            pr.enable()
        try:
            ret = await self._run(self.__args)
            if isinstance(ret, int) and 0 <= ret <= 0xFF:
                exit_status = ret
        except Exception as e:
            log(ERR, repr(e) if self.__back_trace else str(e))
            exit_status = 1
            if self.__back_trace:
                raise
        finally:
            if pr is not None:
                pr.disable()
                log(NOTICE, f'Writing profile statistics to {self.__args.write_profile}')
                pr.dump_stats(self.__args.write_profile)

        if exit_status:
            sys.exit(exit_status)

    # Run sub-command. Overwrite if you want to do anything before or after
    async def _run(self, args: argparse.Namespace) -> Any:
        """Await the selected command's ``func`` and return its result."""
        return await self.args.func(args)

    def call_async(self, awaitable: Awaitable[T], timeout: float | None = None) -> T:
        """Run *awaitable* through the lazily created ``AsyncRunner``."""
        return self.async_runner.call(awaitable, timeout)

    @property
    def eloop(self) -> asyncio.AbstractEventLoop:
        """The event loop this application runs on."""
        return self.__eloop

    @property
    def async_runner(self) -> AsyncRunner:
        """The ``AsyncRunner`` instance, created on first access."""
        if self.__async_runner is None:
            self.__async_runner = AsyncRunner()
        return self.__async_runner

    @property
    def cmdline(self) -> str:
        """The shell-quoted command line of this process, computed once.

        Read from ``/proc/self/cmdline``; where that file is unavailable the
        interpreter's own argument vector is used instead.
        """
        if self.__cmdline is None:
            try:
                with open('/proc/self/cmdline', 'rb') as f:
                    raw = f.read().split(b'\0')[:-1]
                # fsdecode() copes with arguments that are not valid UTF-8.
                words = [os.fsdecode(arg) for arg in raw]
            except OSError:
                words = list(getattr(sys, 'orig_argv', sys.argv))
            self.__cmdline = ' '.join(shlex.quote(word) for word in words)
        return self.__cmdline

    @property
    def args(self) -> argparse.Namespace:
        """The parsed arguments; ``None`` until the command line was parsed by ``run()``."""
        return self.__args

    @property
    def parser(self) -> argparse.ArgumentParser:
        """The top-level argument parser."""
        return self.__parser

    def run(self, argv=None) -> Any:
        """Parse *argv* (``sys.argv[1:]`` when ``None``) and run the selected command.

        Blocks until the command finished. The ``AsyncRunner``, if one was
        created, is closed afterwards in any case.
        """
        try:
            ret = self.__eloop.run_until_complete(self.__run(argv)) # type: ignore
        finally:
            if self.__async_runner:
                self.__async_runner.close()
                self.__async_runner = None
        return ret


def run_sub_commands(description = '', name_filter = '^Cmd.*', modules=None, argv=None): # export
    """Create an ``App`` with the given settings and run it on *argv*."""
    app = App(description, name_filter, modules)
    return app.run(argv=argv)