# -*- coding: utf-8 -*-

import re
import os
import asyncio
import shlex
import traceback
from operator import itemgetter
from functools import total_ordering

from jwutils.log import *
from jwutils import Options
from devtest.os import *
from devtest.os.test import *
from .. import TestCase


class ListCmd(TestCase): # export
    """Test case comparing the parsed output of a listing command with a reference file.

    The command, the regular expression used to split an output line into
    fields, and the field sets used for comparison and naming are supplied
    by _row_info(), which subclasses are expected to override.
    """

    # ------------------------------------- class Row

    @total_ordering
    class Row:
        """One parsed line: named fields plus optional attributes.

        Attributes are whatever was parsed from the text following '# ' on
        the line (None if the line had no such part). Ordering, equality and
        hashing only consider the parent's decisive ("cmp-fields") fields.
        """

        def __init__(self, parent, line, fields, attribs):
            self.parent = parent
            self.__line = line
            self.__fields = fields
            self.__attribs = attribs

        def field(self, key, default=None, throw=True):
            """Return field `key`.

            If the field is missing, return `default` when it is not None or
            when `throw` is false; otherwise raise KeyError.
            """
            if key in self.__fields:
                return self.__fields[key]
            if default is not None or not throw:
                return default
            raise KeyError('No field "{}" in row "{}"'.format(key, self.__fields))

        def attrib(self, key, default=None, throw=True):
            """Return attribute `key`.

            When the row has attributes, a missing key yields `default`
            without raising, regardless of `throw`. When the row has no
            attributes at all, `default` is returned if it is not None or
            `throw` is false; otherwise KeyError is raised.
            """
            if self.__attribs is not None:
                return self.__attribs.get(key, default)
            if default is not None or not throw:
                return default
            raise KeyError('No attrib "{}" in row "{}"'.format(key, self.__fields))

        @property
        def fields(self):
            return self.__fields

        @property
        def attribs(self):
            return self.__attribs

        @property
        def name(self):
            return '(' + self.to_str(fields=['name-fields', 'cmp-fields'], only_values=True) + ')'

        @property
        def line(self):
            return self.__line

        # "needed": [ "dummyd", "v3.23" ]
        # "key_": [ feature ]
        def check_first_match(self, key_, features):
            """Tell whether the first attribute matching a feature is `key_`.

            Features are tried in the given order. For each feature the
            attributes are scanned: a boolean attribute named `key_` decides
            immediately with its value; the first list-valued attribute
            containing the feature decides, and the result is True only if
            that attribute is `key_`.

            Raises Exception if an attribute value is neither bool nor list.
            """
            if self.__attribs is None:
                return False
            for feature in features:
                for key, val in self.__attribs.items():
                    if isinstance(val, bool):
                        if key_ == key:
                            return val
                        continue
                    if not isinstance(val, list):
                        raise Exception('Found attribute {}="{}" of unexpected value type {}'.format(key, val, type(val)))
                    if feature in val:
                        # First match decides. Returning True unconditionally
                        # here would make every attribute that lists the
                        # feature count as `key_`, e.g. a "needed" row would
                        # be classified as both "ignore" and "bad".
                        return key == key_
            return False

        def check_attrib(self, key, features):
            """Tell whether attribute `key` applies to any of `features`.

            A boolean attribute value is returned as is; a list value is
            checked for containing at least one feature. A missing attribute
            yields False. Raises Exception for any other value type.
            """
            val = self.attrib(key, False)
            if isinstance(val, bool):
                return val
            if not isinstance(val, list):
                raise Exception('Found attribute {}="{}" of unexpected value type {}'.format(key, val, type(val)))
            return any(feature in val for feature in features)

        def cmp(self, other):
            """Three-way compare with `other` over the parent's decisive fields.

            Returns 1, -1 or 0. A field that is set (truthy) on one side only
            makes that side the greater one.
            """
            for field in self.parent.decisive:
                s = self.field(field, throw=False)
                o = other.field(field, throw=False)
                if (s and not o) or (o and not s):
                    # exactly one side has a value
                    return 1 if s else -1
                if s == o:
                    continue
                if s > o:
                    return 1
                if o > s:
                    return -1
            return 0

        def to_str(self, only_values=False, quotes=None, fields=None):
            """Format the row as a comma-separated string.

            `fields` is a list of row-info keys naming field sets; the first
            one the parent knows is used (default: ['fields']). With
            `only_values` just the values are printed, otherwise
            name=value pairs. Values are double-quoted if `quotes` is True.
            Non-empty attributes are appended after " | ".

            Raises Exception if none of the requested field sets exists.
            """
            if fields is None:
                fields = ['fields']
            use_fields = None
            for f_set_name in fields:
                use_fields = self.parent.row_info(f_set_name, throw=False)
                if use_fields is not None:
                    break
            if use_fields is None:
                raise Exception("None of the fields wanted for formatting are available: {}".format(fields))
            q = '"' if quotes == True else ''  # noqa: E712 -- keep original comparison semantics
            if only_values:
                r = ', '.join(['{}{}{}'.format(q, self.field(f), q) for f in use_fields])
            else:
                r = ', '.join(['{}={}{}{}'.format(f, q, self.field(f), q) for f in use_fields])
            if self.__attribs is not None and len(self.__attribs) > 0:
                r += " | " + str(self.__attribs)
            return r

        def dump(self, prio, msg=None, **kwargs):
            """Log the raw line, fields and attributes of this row at `prio`.

            A `caller` keyword argument overrides the logged caller position.
            """
            # get_caller_pos() must be called from this frame to keep the depth
            caller = kwargs.pop('caller') if 'caller' in kwargs else get_caller_pos(1)
            if msg is not None:
                slog(prio, ',----------------------- {}'.format(msg), caller=caller)
            slog(prio, '| line="{}"'.format(self.__line), caller=caller)
            # A "types" section used to be logged here from an attribute that
            # is never assigned, which made every call raise AttributeError.
            if len(self.__fields):
                slog(prio, '| ---------- fields', caller=caller)
                for key, val in self.__fields.items():
                    slog(prio, '| {}="{}"'.format(key, val), caller=caller)
            if self.__attribs is not None and len(self.__attribs):
                slog(prio, '| ---------- attribs', caller=caller)
                for key, val in self.__attribs.items():
                    slog(prio, '| {}="{}"'.format(key, val), caller=caller)
            if msg is not None:
                slog(prio, '`----------------------- {}'.format(msg), caller=caller)

        def __lt__(self, other):
            return self.cmp(other) < 0

        def __le__(self, other):
            return self.cmp(other) <= 0

        def __eq__(self, other):
            return self.cmp(other) == 0

        def __ne__(self, other):
            return self.cmp(other) != 0

        def __gt__(self, other):
            return self.cmp(other) > 0

        def __ge__(self, other):
            return self.cmp(other) >= 0

        def __str__(self):
            return self.to_str()

        def __repr__(self):
            return self.to_str()

        def __format__(self, fmt):
            return self.to_str()

        def __getitem__(self, key):
            return self.field(key)

        def __hash__(self):
            # hash over the same fields cmp() looks at
            decisive = self.parent.decisive
            return hash(tuple([str(self.field(field, '')) for field in decisive]))

    # ------------------------------------- class ListCmd methods

    def __init__(self, refpath, act_timeout=2, total_timeout=None, write_response=None):
        """Set up the test case.

        refpath        -- path of the reference file
        act_timeout    -- passed on to cmd_exec()
        total_timeout  -- passed on to cmd_exec()
        write_response -- whether to write response files next to the
                          reference; if None, the environment variable
                          JW_DEVTEST_WRITE_LIST_CMD_RESPONSE_DEFAULT decides,
                          and if that is unset too, the default is False.

        Raises Exception if the environment variable has an unrecognized value.
        """
        self.refpath = refpath
        self.act_timeout = act_timeout
        self.total_timeout = total_timeout
        self.__decisive = None
        self.__row_info = None
        self.__write_raw_response = True
        if write_response is not None:
            self.__write_response = write_response
            return
        key = "JW_DEVTEST_WRITE_LIST_CMD_RESPONSE_DEFAULT"
        val = os.getenv(key)
        if val is None:
            # Without this default the attribute stayed unset and the first
            # access in _run() / _eval() raised AttributeError.
            self.__write_response = False
            return
        val = val.lower()
        if val in ('true', '1', 'y', 'yes'):
            self.__write_response = True
        elif val in ('false', '0', 'n', 'no'):
            self.__write_response = False
        else:
            raise Exception('Invalid value "{}" of environment variable "{}"'.format(val, key))

    # override this
    def _row_info(self):
        return {
            'cmd': "/bin/ps",
            # PID USER VSZ STAT COMMAND
            # 38 root 0 SW [kjournald]
            # 40 root 2300 S {rc.start} /bin/sh /etc/rc.start
            # 40 root 2300 S {rc.start} /bin/sh /etc/rc.start
            'regex': "^ *([0-9]+) +([a-z_][a-z0-9_-]*) +([0-9]+) +([A-Z]+) +(.+)",
            'fields': [ 'pid', 'user', 'size', 'stat', 'cmd'],
            'cmp-fields': [ 'user', 'cmd'],
            'name-fields': [ 'cmd'],
        }

    def _filter(self, output):
        """Hook to post-process the command output before parsing; identity by default."""
        return output

    def _row_name(self, row):
        return '(' + row.to_str(fields=['name-fields', 'cmp-fields'], only_values=True) + ')'

    def row_info(self, key, default=None, throw=False):
        """Return entry `key` of the (cached) result of _row_info().

        If the entry is missing, return `default` when it is not None or
        `throw` is false; otherwise raise Exception.
        """
        if self.__row_info is None:
            info = self._row_info()
            if isinstance(info, dict):
                self.__row_info = info
            else:
                # be backwards compatible and swallow any iterable container
                keys = ['cmd', 'regex', 'fields', 'cmp-fields', 'name-fields']
                self.__row_info = dict(zip(keys, info))
        if key not in self.__row_info:
            if default is not None or not throw:
                return default
            raise Exception('Required row info "{}" missing'.format(key))
        return self.__row_info[key]

    @property
    def decisive(self):
        """Names of the fields that decide row comparison ("cmp-fields")."""
        if self.__decisive is None:
            # throw must be passed by keyword: the second positional
            # parameter of row_info() is `default`, not `throw`.
            self.__decisive = self.row_info('cmp-fields', throw=True)
        return self.__decisive

    def parse(self, lines):
        """Parse `lines` into a list of Row objects.

        Everything after the first '# ' on a line is handed to Options() as
        the row's attributes. Lines not matching the configured regex are
        logged and skipped.

        Raises Exception if the regex does not compile or required row info
        is missing.
        """

        def parse_line(line, field_names):
            slog(DEBUG, "parsing line >%s<" % line)
            parsed = line.split('# ')
            line = parsed[0].strip()
            match = regex.search(line)
            if not match:
                slog(INFO, "Ignoring unparseable line >%s<" % line)
                return None
            attribs = None if len(parsed) <= 1 else Options(parsed[1])
            values = {}
            # regex group n holds the value of the n-th field name
            for idx, name in enumerate(field_names, start=1):
                slog(DEBUG, " match {} = >{}<".format(idx, match.group(idx)))
                values[name] = match.group(idx)
            return self.Row(self, line=line, fields=values, attribs=attribs)

        fields = self.row_info('fields', throw=True)
        re_str = self.row_info('regex', throw=True)
        try:
            regex = re.compile(re_str)
        except Exception as e:
            raise Exception('Failed to compile regex ({}): >{}<'.format(e, str(re_str))) from e
        rows = [parse_line(l, fields) for l in lines]
        return [row for row in rows if row is not None]

    def _eval(self, output, features, header=None):
        """Compare `output` with the reference file for the given `features`.

        Writes "<refpath>.raw" with the unfiltered output. If writing
        responses is enabled, also writes "<refpath>.last" with the filtered
        output and, on mismatch with features not recorded in the previous
        ".last" file, "<refpath>.bad" with suggested reference lines.

        Returns None if nothing is missing or surplus, otherwise a string
        describing the mismatch.
        """

        def format_rows(rows, quotes=False):
            return [row.to_str(fields=['cmp-fields', 'fields'], only_values=True, quotes=quotes) for row in sorted(rows)]

        if self.__write_response and not os.path.exists(self.refpath):
            # no reference yet: compare against nothing
            ref_lines = []
        else:
            slog(INFO, 'Reading reference from "{}"'.format(self.refpath))
            with open(self.refpath, "r") as f:
                ref_lines = f.readlines()

        if self.__write_raw_response:
            raw_response_path = self.refpath + '.raw'
            with open(raw_response_path, "w") as f:
                slog(INFO, 'Writing raw response to "{}"'.format(raw_response_path))
                if header:
                    f.write(header)
                f.write('\n'.join(output))

        output = self._filter(output)

        last_features = set()
        if self.__write_response:
            response_path = self.refpath + '.last'
            if os.path.exists(response_path):
                # recover the features recorded by the previous run
                with open(response_path, "r") as f:
                    for line in f:
                        payload, matches = re.subn('^ *# *features: *', '', line)
                        if matches > 0:
                            last_features = set(shlex.split(payload))
                            break
            with open(response_path, "w") as f:
                slog(INFO, 'Writing response to "{}"'.format(response_path))
                if header:
                    f.write(header)
                f.write('\n'.join(output))

        reference = set(self.parse(ref_lines))
        actual = set(self.parse(output))

        # -- Attributes:
        # default: error-if-not-present (no matching feature, same as "needed", see below)
        # ignore:  no-error-if-present and no-error-if-not-present
        # bad:     error-if-present
        ignore = {row for row in reference if row.check_first_match('ignore', features)}
        bad = {row for row in reference if row.check_first_match('bad', features)}

        slog_m(DEBUG, "--- reference:\n", format_rows(reference))
        slog_m(DEBUG, "--- ignore:\n", format_rows(ignore))
        slog_m(DEBUG, "--- bad:\n", format_rows(bad))

        # the "needed" attribute is actually just as good as any string: it
        # gets ignored, leading to the line being seen as mandatory
        needed = reference - ignore - bad
        missing = needed - actual
        too_many = actual - needed - ignore
        # Deliberately not `ignore & actual`: the difference keeps the
        # reference rows (which carry the attributes) in the result.
        ignored = ignore - (ignore - actual)

        slog_m(NOTICE, "--- needed:\n", format_rows(needed))
        slog_m(NOTICE, "--- actual:\n", format_rows(actual))
        slog_m(NOTICE, "--- ignored:\n", format_rows(ignored))

        r = []
        if len(missing):
            slog_m(ERR, "--- should be there but are not:\n", format_rows(missing, quotes=True))
            r.append("missing:" + ', '.join([row.name for row in missing]))
        if len(too_many):
            slog_m(ERR, "--- too many:\n", format_rows(too_many, quotes=True))
            r.append("too many: " + ', '.join([row.name for row in too_many]))
        if not len(r):
            return None

        feature_diff = set(features) - last_features
        if self.__write_response and len(feature_diff):
            response_path = self.refpath + '.bad'
            feature_diff_str = ', '.join(['"{}"'.format(f) for f in feature_diff])
            with open(response_path, "w") as f:
                slog(INFO, 'Writing feature diff to "{}"'.format(response_path))
                if header:
                    f.write(header)
                if len(missing):
                    f.write("# --- missing {}\n".format(feature_diff))
                    for row in missing:
                        f.write(row.line + ' # "needed": [{}], "bad": ["default"]\n'.format(feature_diff_str))
                if len(too_many):
                    f.write("# --- too many {}\n".format(feature_diff))
                    for row in too_many:
                        # key and list are separated by a colon, as for "needed" above
                        f.write(row.line + ' # "bad": [{}]\n'.format(feature_diff_str))

        return ' and '.join(r)

    async def _run(self, env, machine, phase):
        """Run the configured command on the console of `env` and evaluate its output.

        Returns an error string on failure; otherwise whatever _eval() returns.
        """
        console = get_console(env)
        cmd = self.row_info('cmd')
        output = await cmd_exec(console, cmd, act_timeout=self.act_timeout,
                total_timeout=self.total_timeout, echo_cmd=False)
        if output is None:
            return "Failed to run command: " + cmd
        if not self.__write_response:
            header = None
        else:
            # the "features:" line is read back by _eval() on the next run
            header = '# ' + cmd + '\n'
            header += '# features: {}\n'.format(' '.join(env.features))
        return self._eval(output, env.features, header=header)

    def dump(self, prio, *args, **kwargs):
        """Log every row parsed from the reference file at priority `prio`."""
        caller = kwargs['caller'] if 'caller' in kwargs else get_caller_pos(1)
        with open(self.refpath, "r") as f:
            reference = self.parse(f.readlines())
        for row in reference:
            slog(prio, "{}".format(row), caller=caller)

    def test(self, output, features):
        """Evaluate already captured `output`; returns the result of _eval()."""
        return self._eval(output, features)