mirror of
ssh://git.janware.com/srv/git/janware/proj/jw-python
synced 2026-01-15 01:52:56 +01:00
argcomplete takes the arguments added to argparse, and builds bash completion with it. Add it to all Cmds based executables. Signed-off-by: Jan Lindemann <jan@janware.com>
152 lines
5.9 KiB
Python
152 lines
5.9 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from typing import Optional
|
|
import os, sys, argcomplete, argparse, importlib, inspect, re, pickle, asyncio, cProfile
|
|
from argparse import ArgumentParser
|
|
|
|
import jwutils
|
|
from jwutils.log import *
|
|
|
|
class Cmds: # export
    """Command-line application built from automatically discovered command classes.

    On construction, the classes of the given modules are scanned; every
    non-abstract class whose name matches ``filter`` is instantiated and
    registered as a (sub-)command of one shared ``argparse`` parser.
    ``run()`` then parses the command line and awaits the selected
    command's handler on the event loop.
    """

    def __instantiate(self, cls):
        """Create a command object from ``cls`` and link it back to this instance.

        Instantiation failures are logged and re-raised unchanged.
        """
        try:
            r = cls()
        except Exception as e:
            slog(ERR, f'Failed to instantiate command of type {cls}: {e}')
            raise
        r.cmds = self # TODO: Rename Cmds class to App, "Cmds" isn't very self-explanatory
        r.app = self
        return r

    def __add_cmd_to_parser(self, cmd, parsers):
        """Register ``cmd`` and, recursively, its child commands with ``parsers``.

        ``parsers`` is the sub-parsers object the command adds its own parser to.
        """
        parser = cmd.add_parser(parsers)
        cmd.add_arguments(parser)
        if len(cmd.child_classes) > len(cmd.children):
            # Only instantiate child classes which have no instance yet, so
            # that a partially populated children list doesn't end up with
            # duplicate sub-commands.
            existing = {type(child) for child in cmd.children}
            for c in cmd.child_classes:
                if c not in existing:
                    cmd.children.append(self.__instantiate(c))
        if len(cmd.children) > 0:
            subparsers = parser.add_subparsers(title='Available subcommands of ' + cmd.name, metavar='')
            for sub_cmd in cmd.children:
                self.__add_cmd_to_parser(sub_cmd, subparsers)

    @staticmethod
    def _preparse_log_args(argv, log_level, log_flags):
        """Pick --log-level / --log-flags out of ``argv`` before argparse runs.

        Logging has to be configured before the complete command-line
        definition exists, hence this minimal scan. Both ``--opt value`` and
        ``--opt=value`` spellings are recognized; ``argv[0]`` is skipped. An
        option lacking its value is left alone, argparse reports it later.

        Returns the tuple ``(log_level, log_flags)``, where each element is
        the passed-in default unless overridden by ``argv``.
        """
        i = 1
        while i < len(argv):
            name, sep, value = argv[i].partition('=')
            if name in ('--log-level', '--log-flags'):
                if not sep:
                    if i + 1 >= len(argv):
                        break
                    # Consume the value so it isn't mistaken for an option
                    i += 1
                    value = argv[i]
                if name == '--log-level':
                    log_level = value
                else:
                    log_flags = value
            i += 1
        return log_level, log_flags

    def __init__(self, description: str = '', filter: str = '^Cmd.*', modules: Optional[list] = None, eloop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        """Set up logging, build the argument parser and discover commands.

        description -- text shown in the top-level help output
        filter      -- regular expression, matched with re.match() against
                       class names to select command classes
        modules     -- names of the modules to search for command classes,
                       defaults to [ '__main__' ]
        eloop       -- event loop to run commands on; if None, a loop is
                       acquired here and closed again in __del__()
        """
        self.__description = description
        self.__filter = filter
        self.__modules = modules
        self.__cmds = []
        self.eloop = eloop
        self.__own_eloop = False
        if eloop is None:
            try:
                self.eloop = asyncio.get_event_loop()
            except RuntimeError:
                # Newer Python versions refuse to implicitly create a loop
                self.eloop = asyncio.new_event_loop()
                asyncio.set_event_loop(self.eloop)
            self.__own_eloop = True

        # poor man's parsing in the absence of a complete command-line definition
        log_level, log_flags = self._preparse_log_args(sys.argv, "notice", 'stderr,position,prio,color')
        set_flags(log_flags)
        set_level(log_level)
        slog(DEBUG, "set log level to {}".format(log_level))

        self.__parser = argparse.ArgumentParser(usage=os.path.basename(sys.argv[0]) + ' [options]',
            formatter_class=argparse.ArgumentDefaultsHelpFormatter, description=self.__description)
        self.__parser.add_argument('--log-flags', help='Log flags', default=log_flags)
        self.__parser.add_argument('--log-level', help='Log level', default=log_level)
        self.__parser.add_argument('--backtrace', help='Show exception backtraces', action='store_true', default=False)
        self.__parser.add_argument('--write-profile', help='Profile code and store output to file', default=None)

        if self.__modules is None:
            self.__modules = [ '__main__' ]

        # Classes listed as children of some command; these must not show up
        # as top-level commands
        subcmds = set()
        slog(DEBUG, '-- searching for commands')
        for m in self.__modules:
            if m != '__main__':
                importlib.import_module(m)
            for name, c in inspect.getmembers(sys.modules[m], inspect.isclass):
                if not re.match(self.__filter, name):
                    slog(DEBUG, 'o "{}.{}" has wrong name'.format(m, name))
                    continue
                if inspect.isabstract(c):
                    slog(DEBUG, 'o "{}.{}" is abstract'.format(m, name))
                    continue
                slog(DEBUG, 'o "{}.{}" is fine, instantiating'.format(m, name))
                cmd = self.__instantiate(c)
                self.__cmds.append(cmd)
                subcmds.update(cmd.child_classes)

        cmds = [cmd for cmd in self.__cmds if type(cmd) not in subcmds]
        subparsers = self.__parser.add_subparsers(title='Available commands', metavar='')
        for cmd in cmds:
            slog(DEBUG, f'Adding top-level command {cmd} to parser')
            self.__add_cmd_to_parser(cmd, subparsers)

    async def __run(self, argv=None):
        """Parse ``argv`` and await the selected command's handler.

        Prints the help text and returns if no command was selected. An
        exception raised by the handler is logged; it is re-raised if
        --backtrace was given, otherwise the process exits with status 1.
        With --write-profile, cProfile statistics of the handler run are
        dumped to the given file.
        """
        argcomplete.autocomplete(self.__parser)
        self.args = self.__parser.parse_args(args=argv)
        set_flags(self.args.log_flags)
        set_level(self.args.log_level)
        self.__back_trace = self.args.backtrace
        exit_status = 0

        # This is the toplevel parser, i.e. no func member has been added to the args via
        #
        #   Cmds.__init__()
        #     Cmds.__add_cmd_to_parser(cmd, subparsers)
        #       CmdXXX.add_parser(parsers)
        #         super().add_parser(parsers)
        #           Cmd.__parser.set_defaults(func=self.run)
        #
        if not hasattr(self.args, 'func'):
            self.__parser.print_help()
            return None

        pr = None if self.args.write_profile is None else cProfile.Profile()
        if pr is not None:
            pr.enable()

        try:
            await self.args.func(self.args)
        except Exception as e:
            if hasattr(e, 'message'):
                slog(ERR, e.message)
            else:
                slog(ERR, f'Exception: {type(e)}: {e}')
            exit_status = 1
            if self.__back_trace:
                raise
        finally:
            # Write the profile regardless of whether the command succeeded
            if pr is not None:
                pr.disable()
                slog(NOTICE, f'Writing profile statistics to {self.args.write_profile}')
                pr.dump_stats(self.args.write_profile)

        if exit_status:
            sys.exit(exit_status)

    def __del__(self):
        """Close the event loop, but only if it was acquired by __init__()."""
        if self.__own_eloop:
            if self.eloop is not None:
                self.eloop.close()
                self.eloop = None
            self.__own_eloop = False

    def parser(self) -> ArgumentParser:
        """Return the top-level argument parser."""
        return self.__parser

    def run(self, argv=None) -> None:
        """Parse ``argv`` and run the selected command to completion on the event loop.

        ``argv`` is handed to ArgumentParser.parse_args(), i.e. None selects
        the process's own command line.
        """
        return self.eloop.run_until_complete(self.__run(argv)) # type: ignore
|
|
|
|
def run_sub_commands(description = '', filter = '^Cmd.*', modules=None, argv=None): # export
    """Build a Cmds application from the given settings and run it once.

    description, filter and modules are passed on to the Cmds constructor,
    argv to Cmds.run(). Returns whatever Cmds.run() returns.
    """
    app = Cmds(description=description, filter=filter, modules=modules)
    result = app.run(argv=argv)
    return result
|