jw-python/tools/python/jwutils/Cmds.py
Jan Lindemann d35a5588cd Cmds.__run(): Don't exit(0)
WSGI doesn't like sys.exit() being called, so avoid it. Two cases
need to be taken into consideration:

  1. No exception thrown by self.args.func()

     The variable exit_status is zero, we can check that and _not_
     call sys.exit()

  2. Exception thrown by self.args.func()

     In that case, the exception should be raised, to be caught by
     WSGI / ASGI whatever to do what they want with it. The code
     calling sys.exit() is never reached. Hence, we need to add
     --backtrace to the invocation options.

Signed-off-by: Jan Lindemann <jan@janware.com>
2025-07-05 12:27:11 +02:00

158 lines
5.9 KiB
Python

import os
import sys
import argparse
import importlib
import inspect
import re
import pickle
import asyncio
from argparse import ArgumentParser
from typing import Optional
import cProfile
import jwutils
from jwutils.log import *
class Cmds: # export
def __instantiate(self, cls):
try:
r = cls()
except Exception as e:
slog(ERR, f'Failed to instantiate command of type {cls}: {e}')
raise
r.cmds = self # TODO: Rename Cmds class to App, "Cmds" isn't very self-explanatory
r.app = self
return r
def __add_cmd_to_parser(self, cmd, parsers):
parser = cmd.add_parser(parsers)
cmd.add_arguments(parser)
if len(cmd.child_classes) > len(cmd.children):
for c in cmd.child_classes:
cmd.children.append(self.__instantiate(c))
if len(cmd.children) > 0:
subparsers = parser.add_subparsers(title='Available subcommands of ' + cmd.name, metavar='')
for sub_cmd in cmd.children:
self.__add_cmd_to_parser(sub_cmd, subparsers)
def __init__(self, description: str = '', filter: str = '^Cmd.*', modules: None=None, eloop: None=None) -> None:
self.__description = description
self.__filter = filter
self.__modules = modules
self.__cmds = []
self.eloop = eloop
self.__own_eloop = False
if eloop is None:
self.eloop = asyncio.get_event_loop()
self.__own_eloop = True
log_level = "notice"
log_flags = 'stderr,position,prio,color'
# poor man's parsing in the absence of a complete command-line definition
for i in range(1, len(sys.argv)):
if i >= len(sys.argv) - 1:
break
arg = sys.argv[i]
if arg == '--log-level':
i += 1
log_level = sys.argv[i]
continue
if arg == '--log-flags':
log_flags = sys.argv[i]
continue
set_flags(log_flags)
set_level(log_level)
slog(DEBUG, "set log level to {}".format(log_level))
self.__parser = argparse.ArgumentParser(usage=os.path.basename(sys.argv[0]) + ' [options]',
formatter_class=argparse.ArgumentDefaultsHelpFormatter, description=self.__description)
self.__parser.add_argument('--log-flags', help='Log flags', default=log_flags)
self.__parser.add_argument('--log-level', help='Log level', default=log_level)
self.__parser.add_argument('--backtrace', help='Show exception backtraces', action='store_true', default=False)
self.__parser.add_argument('--write-profile', help='Profile code and store output to file', default=None)
if self.__modules == None:
self.__modules = [ '__main__' ]
subcmds = set()
slog(DEBUG, '-- searching for commands')
for m in self.__modules: # type: ignore
if m != '__main__':
importlib.import_module(m)
for name, c in inspect.getmembers(sys.modules[m], inspect.isclass):
if not re.match(self.__filter, name):
slog(DEBUG, 'o "{}.{}" has wrong name'.format(m, name))
continue
if inspect.isabstract(c):
slog(DEBUG, 'o "{}.{}" is abstract'.format(m, name))
continue
slog(DEBUG, 'o "{}.{}" is fine, instantiating'.format(m, name))
cmd = self.__instantiate(c)
#cmd.add_parser(subparsers)
self.__cmds.append(cmd)
subcmds.update(cmd.child_classes)
cmds = [cmd for cmd in self.__cmds if type(cmd) not in subcmds]
subparsers = self.__parser.add_subparsers(title='Available commands', metavar='')
for cmd in cmds:
slog(DEBUG, f'Adding top-level command {cmd} to parser')
self.__add_cmd_to_parser(cmd, subparsers)
async def __run(self, argv=None):
self.args = self.__parser.parse_args(args=argv)
set_flags(self.args.log_flags)
set_level(self.args.log_level)
self.__back_trace = self.args.backtrace
exit_status = 0
# This is the toplevel parser, i.e. no func member has been added to the args via
#
# Cmds.__init__()
# Cmds.__add_cmd_to_parser(cmd, subparsers)
# CmdXXX.add_parser(parsers)
# super().add_parser(parsers)
# Cmd.__parser.set_defaults(func=self.run)
#
if not hasattr(self.args, 'func'):
self.__parser.print_help()
return None
pr = None if self.args.write_profile is None else cProfile.Profile()
if pr is not None:
pr.enable()
try:
ret = await self.args.func(self.args)
except Exception as e:
if hasattr(e, 'message'):
slog(ERR, e.message)
else:
slog(ERR, f'Exception: {type(e)}: {e}')
exit_status = 1
if self.__back_trace:
raise
finally:
if pr is not None:
pr.disable()
slog(NOTICE, f'Writing profile statistics to {self.args.write_profile}')
pr.dump_stats(self.args.write_profile)
if exit_status:
sys.exit(exit_status)
def __del__(self):
if self.__own_eloop:
if self.eloop is not None:
self.eloop.close()
self.eloop = None
self.__own_eloop = False
def parser(self) -> ArgumentParser:
return self.__parser
def run(self, argv=None) -> None:
#return self.__run()
return self.eloop.run_until_complete(self.__run(argv)) # type: ignore
def run_sub_commands(description = '', filter = '^Cmd.*', modules=None, argv=None): # export
cmds = Cmds(description, filter, modules)
return cmds.run(argv=argv)