Add type annotations from monkeytype + jw-devops/test

Add type annotations as generated by monkeytype and jw-devops/test, plus some
hand editing to satisfy both monkeytype and mypy.

Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
Jan Lindemann 2020-04-10 17:55:36 +02:00
commit 77d43aebad
7 changed files with 51 additions and 42 deletions

View file

@ -1,6 +1,9 @@
from __future__ import annotations
import abc import abc
import argparse import argparse
from abc import ABC from abc import ABC
from argparse import ArgumentParser, _SubParsersAction
from typing import List, Type, Union, TypeVar
from jwutils import log from jwutils import log
@ -13,28 +16,28 @@ class Cmd(ABC): # export
async def run(self, args): async def run(self, args):
pass pass
def __init__(self, name, help): def __init__(self, name: str, help: str) -> None:
self.name = name self.name = name
self.help = help self.help = help
self.parent = None self.parent = None
self.children = [] self.children: List[Cmd] = []
self.child_classes = [] self.child_classes: List[Type[Cmd]] = []
async def _run(self, args): async def _run(self, args):
pass pass
def add_parser(self, parsers): def add_parser(self, parsers) -> ArgumentParser:
r = parsers.add_parser(self.name, help=self.help, r = parsers.add_parser(self.name, help=self.help,
formatter_class=argparse.ArgumentDefaultsHelpFormatter) formatter_class=argparse.ArgumentDefaultsHelpFormatter)
r.set_defaults(func=self.run) r.set_defaults(func=self.run)
return r return r
def add_subcommands(self, cmd): def add_subcommands(self, cmd: Union[Type[Cmd], List[Type[Cmd]]]) -> None:
if isinstance(cmd, list): if isinstance(cmd, list):
for c in cmd: for c in cmd:
self.add_subcommands(c) self.add_subcommands(c)
return return
self.child_classes.append(cmd) self.child_classes.append(cmd)
def add_arguments(self, parser): def add_arguments(self, parser: ArgumentParser) -> None:
pass pass

View file

@ -6,6 +6,8 @@ import inspect
import re import re
import pickle import pickle
import asyncio import asyncio
from argparse import ArgumentParser
from typing import Optional
import jwutils import jwutils
from jwutils.log import * from jwutils.log import *
@ -29,7 +31,7 @@ class Cmds: # export
for sub_cmd in cmd.children: for sub_cmd in cmd.children:
self.__add_cmd_to_parser(sub_cmd, subparsers) self.__add_cmd_to_parser(sub_cmd, subparsers)
def __init__(self, description = '', filter = '^Cmd.*', modules=None, eloop=None): def __init__(self, description: str = '', filter: str = '^Cmd.*', modules: None=None, eloop: None=None) -> None:
self.__description = description self.__description = description
self.__filter = filter self.__filter = filter
self.__modules = modules self.__modules = modules
@ -40,7 +42,7 @@ class Cmds: # export
self.eloop = asyncio.get_event_loop() self.eloop = asyncio.get_event_loop()
self.__own_eloop = True self.__own_eloop = True
log_level = NOTICE log_level = "notice"
log_flags = 'stderr,position,prio,color' log_flags = 'stderr,position,prio,color'
# poor man's parsing in the absence of a complete command-line definition # poor man's parsing in the absence of a complete command-line definition
for i in range(1, len(sys.argv)): for i in range(1, len(sys.argv)):
@ -65,7 +67,7 @@ class Cmds: # export
self.__modules = [ '__main__' ] self.__modules = [ '__main__' ]
subcmds = set() subcmds = set()
slog(DEBUG, '-- searching for commands') slog(DEBUG, '-- searching for commands')
for m in self.__modules: for m in self.__modules: # type: ignore
if m != '__main__': if m != '__main__':
importlib.import_module(m) importlib.import_module(m)
for name, c in inspect.getmembers(sys.modules[m], inspect.isclass): for name, c in inspect.getmembers(sys.modules[m], inspect.isclass):
@ -99,12 +101,12 @@ class Cmds: # export
self.eloop = None self.eloop = None
self.__own_eloop = False self.__own_eloop = False
def parser(self): def parser(self) -> ArgumentParser:
return self.__parser return self.__parser
def run(self): def run(self) -> None:
#return self.__run() #return self.__run()
return self.eloop.run_until_complete(self.__run()) return self.eloop.run_until_complete(self.__run()) # type: ignore
def run_sub_commands(description = '', filter = '^Cmd.*', modules=None): # export def run_sub_commands(description = '', filter = '^Cmd.*', modules=None): # export
cmds = Cmds(description, filter, modules) cmds = Cmds(description, filter, modules)

View file

@ -1,7 +1,7 @@
from __future__ import annotations from __future__ import annotations
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from enum import Enum, Flag, auto from enum import Enum, Flag, auto
from typing import * from typing import List
def _sigchld_handler(signum, process): def _sigchld_handler(signum, process):
if not signum == signal.SIGCHLD: if not signum == signal.SIGCHLD:

View file

@ -4,6 +4,7 @@ import sys
import inspect import inspect
from os.path import basename from os.path import basename
from datetime import datetime from datetime import datetime
from typing import List, Tuple
from . import misc from . import misc
# --- python 2 / 3 compatibility stuff # --- python 2 / 3 compatibility stuff
@ -74,11 +75,11 @@ _prio_colors = {
EMERG : [ CONSOLE_FONT_BOLD + CONSOLE_FONT_MAGENTA, CONSOLE_FONT_OFF ], EMERG : [ CONSOLE_FONT_BOLD + CONSOLE_FONT_MAGENTA, CONSOLE_FONT_OFF ],
} }
def get_caller_pos(up = 1): def get_caller_pos(up: int = 1) -> Tuple[str, int]:
caller = inspect.stack()[up+1] caller = inspect.stack()[up+1]
return (basename(caller.filename), caller.lineno) return (basename(caller.filename), caller.lineno)
def slog_m(prio, *args, **kwargs): # export def slog_m(prio: int, *args, **kwargs) -> None: # export
if prio > _level: if prio > _level:
return return
if len(args): if len(args):
@ -96,7 +97,7 @@ def slog_m(prio, *args, **kwargs): # export
for line in margs[1:].split('\n'): for line in margs[1:].split('\n'):
slog(prio, line, **kwargs, caller=caller) slog(prio, line, **kwargs, caller=caller)
def slog(prio, *args, **kwargs): # export def slog(prio: int, *args, **kwargs) -> None: # export
if prio > _level: if prio > _level:
return return
@ -145,11 +146,11 @@ def slog(prio, *args, **kwargs): # export
for file in files: for file in files:
print(msg, file=file) print(msg, file=file)
def parse_log_prio_str(prio): # export def parse_log_prio_str(prio: str) -> int: # export
try: try:
r = int(prio) r = int(prio)
if r < 0 or r > DEVEL: if r < 0 or r > DEVEL:
raise Exeption("Invalid log priority ", prio) raise Exception("Invalid log priority ", prio)
except ValueError: except ValueError:
map_prio_str_to_val = { map_prio_str_to_val = {
"EMERG" : EMERG, "EMERG" : EMERG,
@ -177,19 +178,19 @@ def parse_log_prio_str(prio): # export
return map_prio_str_to_val[prio] return map_prio_str_to_val[prio]
raise Exception("Unknown priority string \"", prio, "\"") raise Exception("Unknown priority string \"", prio, "\"")
def console_color_chars(prio): # export def console_color_chars(prio: int) -> List[str]: # export
if not sys.stdout.isatty(): if not sys.stdout.isatty():
return [ '', '' ] return [ '', '' ]
return _prio_colors[prio] return _prio_colors[prio]
def set_level(level_): # export def set_level(level_: str) -> None: # export
global _level global _level
if isinstance(level_, basestring): if isinstance(level_, basestring):
_level = parse_log_prio_str(level_) _level = parse_log_prio_str(level_)
return return
_level = level_ _level = level_
def set_flags(flags_): # export def set_flags(flags_: str) -> None: # export
global _flags global _flags
_flags = set(flags_.split(',')) _flags = set(flags_.split(','))
@ -208,20 +209,20 @@ def set_flags(flags_): # export
#pid #pid
#highlight_first_error #highlight_first_error
def append_to_prefix(prefix): # export def append_to_prefix(prefix: str) -> str: # export
global _log_prefix global _log_prefix
r = _log_prefix r = _log_prefix
if prefix: if prefix:
_log_prefix += prefix _log_prefix += prefix
return r return r
def remove_from_prefix(count): # export def remove_from_prefix(count: int) -> str: # export
global _log_prefix global _log_prefix
r = _log_prefix r = _log_prefix
_log_prefix = _log_prefix[:-count] _log_prefix = _log_prefix[:-count]
return r return r
def set_filename_length(l): # export def set_filename_length(l: int) -> int: # export
global _file_name_len global _file_name_len
r = _file_name_len r = _file_name_len
if l: if l:

View file

@ -1,9 +1,10 @@
import os, errno import os, errno
import atexit import atexit
import tempfile import tempfile
import filecmp
import inspect import inspect
from jwutils import log
from typing import Set from typing import Set
from jwutils import log
_tmpfiles: Set[str] = set() _tmpfiles: Set[str] = set()
@ -18,7 +19,7 @@ def silentremove(filename): #export
if e.errno != errno.ENOENT: if e.errno != errno.ENOENT:
raise # re-raise exception if a different error occurred raise # re-raise exception if a different error occurred
def pad(token, total_size, right_align = False): def pad(token: str, total_size: int, right_align: bool = False) -> str:
add = total_size - len(token) add = total_size - len(token)
if add <= 0: if add <= 0:
return token return token
@ -63,7 +64,7 @@ def get_derived_classes(mod, base): # export
r.append(c) r.append(c)
return r return r
def commit_tmpfile(tmp, path): # export def commit_tmpfile(tmp: str, path: str) -> None: # export
caller = log.get_caller_pos() caller = log.get_caller_pos()
if os.path.isfile(path) and filecmp.cmp(tmp, path): if os.path.isfile(path) and filecmp.cmp(tmp, path):
log.slog(log.INFO, "{} is up to date".format(path), caller=caller) log.slog(log.INFO, "{} is up to date".format(path), caller=caller)

View file

@ -1,4 +1,6 @@
from __future__ import annotations
from collections import OrderedDict from collections import OrderedDict
from typing import Optional, Union
from jwutils.log import * from jwutils.log import *
def quote(s): def quote(s):
@ -10,7 +12,7 @@ def quote(s):
return "'" + s + "'" return "'" + s + "'"
return '"' + s + '"' return '"' + s + '"'
def is_quoted(s): def is_quoted(s: str) -> bool:
if isinstance(s, StringTree): if isinstance(s, StringTree):
return False return False
s = s.strip() s = s.strip()
@ -21,7 +23,7 @@ def is_quoted(s):
return True return True
return False return False
def cleanup_string(s): def cleanup_string(s: str) -> str:
if isinstance(s, StringTree): if isinstance(s, StringTree):
return s return s
s = s.strip() s = s.strip()
@ -31,9 +33,9 @@ def cleanup_string(s):
class StringTree: # export class StringTree: # export
def __init__(self, path, content): def __init__(self, path: str, content: str) -> None:
slog(DEBUG, "ctor, path =", path, "content =", content) slog(DEBUG, "ctor, path =", path, "content =", content)
self.children = OrderedDict() self.children: OrderedDict[str, StringTree] = OrderedDict()
self.content = None self.content = None
self.__set(path, content) self.__set(path, content)
assert(hasattr(self, "content")) assert(hasattr(self, "content"))
@ -106,10 +108,10 @@ class StringTree: # export
self.children[nibble].children[gc.content] = gc self.children[nibble].children[gc.content] = gc
return self.children[nibble] return self.children[nibble]
def __str__(self): def __str__(self) -> str:
return 'st:"{}"'.format(self.content) return 'st:"{}"'.format(self.content)
def __getitem__(self, path): def __getitem__(self, path: str) -> str:
r = self.get(path) r = self.get(path)
if r is None: if r is None:
raise KeyError(path) raise KeyError(path)
@ -139,11 +141,11 @@ class StringTree: # export
raise Exception("Tried to set empty content") raise Exception("Tried to set empty content")
self.content = content self.content = content
def add(self, path, content = None, split = True): def add(self, path: str, content: Optional[Union[str, StringTree]] = None, split: bool = True) -> StringTree:
slog(DEBUG, "adding >{}< at >{}< to >{}<".format(content, path, self.content)) slog(DEBUG, "adding >{}< at >{}< to >{}<".format(content, path, self.content))
return self.__set(path, content, split) return self.__set(path, content, split)
def get(self, path_): def get(self, path_: str) -> Optional[StringTree]:
slog(DEBUG, 'looking for "{}" in "{}"'.format(path_, self.content)) slog(DEBUG, 'looking for "{}" in "{}"'.format(path_, self.content))
assert not isinstance(path_, int) assert not isinstance(path_, int)
path = cleanup_string(path_) path = cleanup_string(path_)
@ -168,10 +170,10 @@ class StringTree: # export
relpath = '.'.join(components[1:]) relpath = '.'.join(components[1:])
return self.children[name].get(relpath) return self.children[name].get(relpath)
def value(self): def value(self) -> str:
if len(self.children) == 0: if len(self.children) == 0:
return None raise Exception('tried to get value from leave "{}"'.format(self.content))
return self.children[next(reversed(self.children))].content return self.children[next(reversed(self.children))].content # type: ignore
def child_list(self, depth_first=True): def child_list(self, depth_first=True):
if depth_first == False: if depth_first == False:
@ -181,7 +183,7 @@ class StringTree: # export
r.append(c) r.append(c)
r.extend(c.to_list()) r.extend(c.to_list())
def dump(self, prio, *args, **kwargs): def dump(self, prio: int, *args, **kwargs) -> None:
caller = kwargs['caller'] if 'caller' in kwargs.keys() else get_caller_pos(1) caller = kwargs['caller'] if 'caller' in kwargs.keys() else get_caller_pos(1)
msg = '' msg = ''
if args is not None: if args is not None:

View file

@ -1,7 +1,7 @@
from jwutils.stree.StringTree import * from jwutils.stree.StringTree import *
from jwutils.log import * from jwutils.log import *
def _cleanup_line(line): def _cleanup_line(line: str) -> str:
line = line.strip() line = line.strip()
r = '' r = ''
in_quote = None in_quote = None
@ -20,7 +20,7 @@ def _cleanup_line(line):
return r[1:-1] return r[1:-1]
return r return r
def parse(s, allow_full_lines=True, root_content='root'): # export def parse(s: str, allow_full_lines: bool=True, root_content: str='root') -> StringTree: # export
slog(DEBUG, "parsing", s) slog(DEBUG, "parsing", s)
root = StringTree('', content=root_content) root = StringTree('', content=root_content)
sec = '' sec = ''
@ -66,7 +66,7 @@ def parse(s, allow_full_lines=True, root_content='root'): # export
root.add(sec + '.' + cleanup_string(lhs), cleanup_string(rhs), split=split) root.add(sec + '.' + cleanup_string(lhs), cleanup_string(rhs), split=split)
return root return root
def read(path, root_content='root'): # export def read(path: str, root_content: str='root') -> StringTree: # export
with open(path, 'r') as infile: with open(path, 'r') as infile:
s = infile.read() s = infile.read()
return parse(s, root_content=root_content) return parse(s, root_content=root_content)