mirror of
ssh://git.janware.com/srv/git/janware/proj/jw-python
synced 2026-01-15 09:53:32 +01:00
The parsed args container is passed to the run() function and friends for convenient use. Sometimes, though, the base classes or umbrella commands need to make use of it, too, even more so as they may define command line arguments via add_arguments(). However, neither run() nor _run() nor any other callback is ever called on them, so make args available to them as a member variable. Signed-off-by: Jan Lindemann <jan@janware.com>
126 lines
4.7 KiB
Python
126 lines
4.7 KiB
Python
import os
|
|
import sys
|
|
import argparse
|
|
import importlib
|
|
import inspect
|
|
import re
|
|
import pickle
|
|
import asyncio
|
|
from argparse import ArgumentParser
|
|
from typing import Optional
|
|
|
|
import jwutils
|
|
from jwutils.log import *
|
|
|
|
class Cmds: # export
    """Command-line application assembled from auto-discovered command classes.

    On construction, the configured modules are scanned for classes whose
    name matches a regular expression. Every non-abstract match is
    instantiated and hooked into an argparse (sub-)parser tree. ``run()``
    parses the command line and awaits the handler selected by it on the
    event loop.
    """

    @staticmethod
    def _peek_option(argv, name, default):
        """Return the value following option ``name`` in ``argv``.

        This is a minimal look-ahead used before the real argparse
        definition is complete. Only the ``name value`` form is recognised
        (two separate arguments). ``argv[0]`` is skipped, and an option in
        the very last slot is ignored because it has no value. If the
        option occurs more than once, the last occurrence wins.

        :param argv:    full argument vector, program name first
        :param name:    option to look for, e.g. ``'--log-level'``
        :param default: value returned if the option is not present
        """
        value = default
        for pos in range(1, len(argv) - 1):
            if argv[pos] == name:
                value = argv[pos + 1]
        return value

    def __instantiate(self, cls):
        """Create an instance of ``cls`` and hand it a back-reference to this object."""
        r = cls()
        r.cmds = self # TODO: Rename Cmds class to App, "Cmds" isn't very self-explanatory
        r.app = self
        return r

    def __add_cmd_to_parser(self, cmd, parsers):
        """Register ``cmd`` with ``parsers`` and recurse into its sub-commands.

        :param cmd:     command instance providing add_parser(),
                        add_arguments(), child_classes, children and name
        :param parsers: subparsers object the command adds its parser to
        """
        parser = cmd.add_parser(parsers)
        cmd.add_arguments(parser)
        # Instantiate child commands if that apparently hasn't happened yet
        if len(cmd.child_classes) > len(cmd.children):
            for c in cmd.child_classes:
                cmd.children.append(self.__instantiate(c))
        if len(cmd.children) > 0:
            subparsers = parser.add_subparsers(title='Available subcommands of ' + cmd.name, metavar='')
            for sub_cmd in cmd.children:
                self.__add_cmd_to_parser(sub_cmd, subparsers)

    def __init__(self, description: str = '', filter: str = '^Cmd.*', modules: Optional[list] = None, eloop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        """Discover commands and build the argument parser.

        :param description: text shown in the top-level help
        :param filter:      regular expression a class name has to match
                            (via re.match) to be considered a command
        :param modules:     names of the modules to search for command
                            classes; defaults to ``['__main__']``
        :param eloop:       event loop to run commands on. If omitted, the
                            current event loop is used (or a new one is
                            created), and it is closed in ``__del__()``.
        """
        self.__description = description
        self.__filter = filter
        self.__modules = modules
        self.__cmds = []
        self.eloop = eloop
        self.__own_eloop = False
        if eloop is None:
            try:
                self.eloop = asyncio.get_event_loop()
            except RuntimeError:
                # Newer Python versions refuse to implicitly create a loop;
                # create one explicitly and install it as the current loop.
                self.eloop = asyncio.new_event_loop()
                asyncio.set_event_loop(self.eloop)
            self.__own_eloop = True

        # Poor man's parsing in the absence of a complete command-line
        # definition: logging has to be configured before the commands are
        # searched and instantiated below.
        log_level = self._peek_option(sys.argv, '--log-level', "notice")
        log_flags = self._peek_option(sys.argv, '--log-flags', 'stderr,position,prio,color')
        set_flags(log_flags)
        set_level(log_level)
        slog(DEBUG, "set log level to {}".format(log_level))

        self.__parser = argparse.ArgumentParser(usage=os.path.basename(sys.argv[0]) + ' [options]',
                formatter_class=argparse.ArgumentDefaultsHelpFormatter, description=self.__description)
        self.__parser.add_argument('--log-flags', help='Log flags', default=log_flags)
        self.__parser.add_argument('--log-level', help='Log level', default=log_level)

        if self.__modules is None:
            self.__modules = [ '__main__' ]

        # Classes listed as child of some command; they are not added as
        # toplevel commands below.
        subcmds = set()
        slog(DEBUG, '-- searching for commands')
        for m in self.__modules: # type: ignore
            if m != '__main__':
                importlib.import_module(m)
            for name, c in inspect.getmembers(sys.modules[m], inspect.isclass):
                if not re.match(self.__filter, name):
                    slog(DEBUG, 'o "{}.{}" has wrong name'.format(m, name))
                    continue
                if inspect.isabstract(c):
                    slog(DEBUG, 'o "{}.{}" is abstract'.format(m, name))
                    continue
                slog(DEBUG, 'o "{}.{}" is fine, instantiating'.format(m, name))
                cmd = self.__instantiate(c)
                self.__cmds.append(cmd)
                subcmds.update(cmd.child_classes)

        cmds = [cmd for cmd in self.__cmds if type(cmd) not in subcmds]
        subparsers = self.__parser.add_subparsers(title='Available commands', metavar='')
        for cmd in cmds:
            self.__add_cmd_to_parser(cmd, subparsers)

    async def __run(self, argv=None):
        """Parse ``argv`` and await the selected command's handler.

        The parsed arguments are stored in ``self.args``. Returns the
        handler's result, or None if no command was selected and the help
        text was printed instead.
        """
        self.args = self.__parser.parse_args(args=argv)
        set_flags(self.args.log_flags)
        set_level(self.args.log_level)

        # Without "func" this is the toplevel parser, i.e. no func member
        # has been added to the args via
        #
        #   Cmds.__init__()
        #     Cmds.__add_cmd_to_parser(cmd, subparsers)
        #       CmdXXX.add_parser(parsers)
        #         super().add_parser(parsers)
        #           Cmd.__parser.set_defaults(func=self.run)
        #
        if not hasattr(self.args, 'func'):
            self.__parser.print_help()
            return None

        return await self.args.func(self.args)

    def __del__(self):
        """Close the event loop, but only if it was not passed in by the caller."""
        if self.__own_eloop:
            if self.eloop is not None:
                self.eloop.close()
                self.eloop = None
            self.__own_eloop = False

    def parser(self) -> ArgumentParser:
        """Return the top-level argument parser."""
        return self.__parser

    def run(self, argv=None) -> None:
        """Parse ``argv`` and run the selected command to completion.

        :param argv: argument list passed to argparse's parse_args(); with
                     None, argparse falls back to ``sys.argv[1:]``
        :return:     whatever the event loop yields for the command
                     coroutine, i.e. the handler's result, or None if only
                     the help text was printed
        """
        return self.eloop.run_until_complete(self.__run(argv)) # type: ignore
|
|
|
|
def run_sub_commands(description = '', filter = '^Cmd.*', modules=None, argv=None): # export
    """Convenience entry point: build a Cmds object and run it once.

    :param description: passed to Cmds as description
    :param filter:      passed to Cmds as class name filter
    :param modules:     passed to Cmds as list of modules to search
    :param argv:        argument list handed to Cmds.run()
    :return:            the result of Cmds.run()
    """
    app = Cmds(description, filter, modules)
    result = app.run(argv=argv)
    return result
|