mirror of
ssh://git.janware.com/janware/proj/jw-pkg
synced 2026-04-25 17:45:55 +02:00
During __init__(), commands have no idea of their parent. This is not a problem as of now, but is easy to fix, and it's architecturally desirable to be prepared just in case, so add the parent argument to the ctor before more commands are added. Signed-off-by: Jan Lindemann <jan@janware.com>
148 lines
5.9 KiB
Python
148 lines
5.9 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from typing import Any
|
|
|
|
import os, sys, argcomplete, argparse, re, asyncio, cProfile
|
|
|
|
from .log import *
|
|
from .Types import LoadTypes
|
|
|
|
class App: # export
|
|
|
|
def _add_arguments(self, parser):
|
|
self.__parser.add_argument('--log-flags', help='Log flags', default=self.__default_log_flags)
|
|
self.__parser.add_argument('--log-level', help='Log level', default=self.__default_log_level)
|
|
self.__parser.add_argument('--log-file', help='Log file', default=self.__default_log_file)
|
|
self.__parser.add_argument('--backtrace', help='Show exception backtraces', action='store_true', default=self.__back_trace)
|
|
self.__parser.add_argument('--write-profile', help='Profile code and store output to file', default=None)
|
|
|
|
def __init__(self, description: str = '', name_filter: str = '^Cmd.*', modules: None=None, eloop: None=None) -> None:
|
|
|
|
def add_cmd_to_parser(cmd, parsers):
|
|
parser = parsers.add_parser(cmd.name, help=cmd.help, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
|
parser.set_defaults(func=cmd.run)
|
|
cmd.add_arguments(parser)
|
|
return parser
|
|
|
|
def add_cmds_to_parser(parent, parser, cmds, all=False):
|
|
if not cmds:
|
|
return
|
|
class SubCommand:
|
|
def __init__(self, cmd: Cmd, parser: Any):
|
|
self.cmd = cmd
|
|
self.parser = parser
|
|
title = 'Available subcommands'
|
|
if hasattr(parent, 'name'):
|
|
title += ' of ' + getattr(parent, 'name')
|
|
subparsers = parser.add_subparsers(title=title, metavar='', dest='command')
|
|
scs: dict[str, SubCommand] = {}
|
|
for cmd in cmds:
|
|
cmd.set_parent(parent)
|
|
scs[cmd.name] = SubCommand(cmd, add_cmd_to_parser(cmd, subparsers))
|
|
if all:
|
|
for sc in scs.values():
|
|
add_cmds_to_parser(sc.cmd, sc.parser, sc.cmd.children, all=all)
|
|
return
|
|
args, unknown = self.__parser.parse_known_args()
|
|
if args.command in scs:
|
|
sc = scs[args.command]
|
|
add_cmds_to_parser(sc.cmd, sc.parser, sc.cmd.children, all=all)
|
|
|
|
from .Cmd import Cmd
|
|
|
|
self.__args: Namespace|None = None
|
|
self.__default_log_flags: str = os.getenv('JW_DEFAULT_LOG_FLAGS', default='stderr,position,prio,color')
|
|
self.__default_log_level: str|int|None = os.getenv('JW_DEFAULT_LOG_LEVEL', default=NOTICE)
|
|
self.__default_log_file: str|None = os.getenv('JW_DEFAULT_LOG_FILE', default=None)
|
|
backtrace: str|bool = os.getenv('JW_DEFAULT_SHOW_BACKTRACE', False)
|
|
self.__back_trace = True if isinstance(backtrace, str) and backtrace.lower() in ['1', 'true'] else False
|
|
set_log_flags(self.__default_log_flags)
|
|
set_log_level(self.__default_log_level)
|
|
|
|
self.__eloop = eloop
|
|
self.__own_eloop = False
|
|
if eloop is None:
|
|
self.__eloop = asyncio.get_event_loop()
|
|
self.__own_eloop = True
|
|
|
|
self.__parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
|
description=description, add_help=False)
|
|
self._add_arguments(self.__parser)
|
|
|
|
args, unknown = self.__parser.parse_known_args()
|
|
set_log_flags(args.log_flags)
|
|
set_log_level(args.log_level)
|
|
|
|
log(DEBUG, '-------------- Running: >' + ' '.join(sys.argv) + '<')
|
|
|
|
cmd_classes = LoadTypes(modules if modules else ['__main__'], type_name_filter=name_filter, type_filter=[Cmd])
|
|
add_all_parsers = '-h' in sys.argv or '--help' in sys.argv
|
|
add_cmds_to_parser(self, self.__parser, [cmd_class(self) for cmd_class in cmd_classes], all=add_all_parsers)
|
|
|
|
# -- Add help only now, wouldn't want to have parse_known_args() exit on --help with subcommands missing
|
|
self.__parser.add_argument('-h', '--help', action='help', help='Show this help message and exit')
|
|
|
|
def __del__(self):
|
|
if self.__own_eloop:
|
|
if self.__eloop is not None:
|
|
self.__eloop.close()
|
|
self.__eloop = None
|
|
self.__own_eloop = False
|
|
|
|
async def __run(self, argv=None) -> None:
|
|
|
|
argcomplete.autocomplete(self.__parser)
|
|
|
|
self.__args = self.__parser.parse_args(args=argv)
|
|
|
|
set_log_flags(self.__args.log_flags)
|
|
set_log_level(self.__args.log_level)
|
|
self.__back_trace = self.__args.backtrace
|
|
|
|
exit_status = 0
|
|
|
|
if not hasattr(self.__args, 'func'):
|
|
self.__parser.print_help()
|
|
return None
|
|
|
|
pr = None if self.__args.write_profile is None else cProfile.Profile()
|
|
if pr is not None:
|
|
pr.enable()
|
|
|
|
try:
|
|
ret = await self._run(self.__args)
|
|
except Exception as e:
|
|
if hasattr(e, 'message'):
|
|
log(ERR, e.message)
|
|
else:
|
|
log(ERR, f'Exception: {type(e)}: {e}')
|
|
exit_status = 1
|
|
if self.__back_trace:
|
|
raise
|
|
finally:
|
|
if pr is not None:
|
|
pr.disable()
|
|
log(NOTICE, f'Writing profile statistics to {self.__args.write_profile}')
|
|
pr.dump_stats(self.__args.write_profile)
|
|
|
|
if exit_status:
|
|
sys.exit(exit_status)
|
|
|
|
# Run sub-command. Overwrite if you want to do anything before or after
|
|
async def _run(self, args: argparse.Namespace) -> None:
|
|
return await self.args.func(args)
|
|
|
|
@property
|
|
def args(self) -> argparse.Namespace:
|
|
return self.__args
|
|
|
|
@property
|
|
def parser(self) -> argparse.ArgumentParser:
|
|
return self.__parser
|
|
|
|
def run(self, argv=None) -> None:
|
|
return self.__eloop.run_until_complete(self.__run(argv)) # type: ignore
|
|
|
|
def run_sub_commands(description = '', name_filter = '^Cmd.*', modules=None, argv=None): # export
|
|
app = App(description, name_filter, modules)
|
|
return app.run(argv=argv)
|