mirror of
ssh://git.janware.com/srv/git/janware/proj/jw-python
synced 2026-01-15 01:52:56 +01:00
- Add coding statement
- Import all modules in one line where possible
- Order: __future__, typing, plain imports, from imports,
janware modules
Signed-off-by: Jan Lindemann <jan@janware.com>
345 lines
10 KiB
Python
345 lines
10 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import re, shlex
|
|
from collections import namedtuple
|
|
|
|
from ..log import *
|
|
|
|
# --- Python 2 / 3 compatibility: Python 3 has no `basestring`, so fall back
# --- to `str` when the name is missing.
try:
    basestring # type: ignore
except NameError:
    basestring = str

# Operator associativity markers.
L, R = 'Left', 'Right'

# Token kinds produced by the tokenizer, plus the two parenthesis tokens.
ARG, KEYW, QUOTED, LPAREN, RPAREN = 'arg', 'kw', 'quoted', '(', ')'
|
|
|
|
class Operator: # export
    """Description of one operator known to the expression evaluator.

    Attributes:
        func:  callable invoked with ``nargs`` positional operands, or None.
        nargs: number of operands taken from the value stack.
        prec:  precedence; compared numerically against other operators.
        assoc: associativity marker (``L`` or ``R``).
    """

    def __init__(self, func=None, nargs=2, precedence=3, assoc=L):
        self.func, self.nargs = func, nargs
        # Stored under the short name `prec`; the constructor keyword stays
        # `precedence`.
        self.prec, self.assoc = precedence, assoc
|
|
|
|
class Stack:
    """Minimal LIFO stack backed by a Python list (``self.items``)."""

    def __init__(self, itemlist=None):
        """Create a stack.

        Args:
            itemlist: optional list used as backing storage (it is used
                directly, not copied). When omitted, a new empty list is
                created for this instance.
        """
        # A literal `[]` default would be created once and shared by every
        # Stack() instance, leaking items between unrelated stacks.
        self.items = [] if itemlist is None else itemlist

    def isEmpty(self):
        """Return True when the stack holds no items."""
        return not self.items

    def peek(self):
        """Return the top item without removing it (IndexError if empty)."""
        return self.items[-1]

    def pop(self):
        """Remove and return the top item (IndexError if empty)."""
        return self.items.pop()

    def push(self, item):
        """Put `item` on top of the stack; always returns 0."""
        self.items.append(item)
        return 0
|
|
|
|
class ShuntingYard(object): # export
    """Infix expression evaluator based on the shunting-yard algorithm.

    Operators are registered by their literal token text. Each operator
    carries a callable (``func``), an operand count (``nargs``), a numeric
    precedence (``prec``) and an associativity marker (``assoc``, ``L`` or
    ``R``).

    Grouping only works if ``'('`` and ``')'`` are registered as operators
    too: the tokenizer never produces parentheses as arguments, and
    `infix_to_postfix` recognises them by comparing against ``LPAREN`` /
    ``RPAREN``.
    """

    def __init__(self, operators = None):
        """Create an evaluator.

        Args:
            operators: optional mapping of token text to objects exposing
                ``func``, ``nargs``, ``prec`` and ``assoc``; every entry is
                registered through `add_operator`.
        """
        self.do_debug = False
        self.__ops = {}
        if operators is not None:
            for name, op in operators.items():
                self.add_operator(name, op.func, op.nargs, op.prec, op.assoc)

    def debug(self, *args):
        """Log the space-joined string forms of `args` if debugging is on."""
        if not self.do_debug or not args:
            return
        slog(DEBUG, ' '.join(map(str, args)), caller=get_caller_pos())

    def operator(self, key: str) -> "Operator":
        """Return the operator registered under `key` (KeyError if absent)."""
        return self.__ops[key]

    def token_string(self):
        """Return a comma-separated, double-quoted list of all operator tokens.

        Operators are listed in sorted order. Operators taking exactly one
        operand are followed by the placeholder ``xxx``, separated by a
        space when the operator text ends in an alphanumeric character.
        """
        parts = []
        for name in sorted(self.__ops):
            text = name
            if self.__ops[name].nargs == 1:
                if name[-1].isalnum():
                    text += ' '
                text += 'xxx'
            parts.append('"' + text + '"')
        return ', '.join(parts)

    def tokenize(self, spec):
        """Split `spec` into a list of ``(kind, text)`` tuples.

        ``kind`` is ``KEYW`` for registered operators, ``QUOTED`` for single-
        or double-quoted strings (quotes stripped) and ``ARG`` for any other
        run of characters that are neither whitespace nor parentheses.
        Whitespace is skipped.

        An operator is only recognised where a token starts: an ``ARG`` run
        swallows everything up to the next whitespace or parenthesis, so
        ``2*3`` is a single argument while ``2 * 3`` is three tokens.

        Raises:
            Exception: if part of `spec` cannot be tokenized.
        """
        lexicon = []
        if self.__ops:
            # Longest operator first, so that e.g. "<=" is not split into
            # "<" followed by "=". Without any operator the keyword rule is
            # left out entirely: an empty pattern would match the empty
            # string and make the scanner stop immediately.
            keywords = sorted(self.__ops, key=len, reverse=True)
            lexicon.append((
                "|".join(re.escape(k) for k in keywords),
                lambda scanner, token: (KEYW, token),
            ))
        lexicon.extend([
            (r"\"[^\"]*\"|'[^']*'", lambda scanner, token: (QUOTED, token[1:-1])),
            (r"[^\s()]+", lambda scanner, token: (ARG, token)),
            (r"\s+", None), # None == skip token.
        ])

        tokens, remainder = re.Scanner(lexicon).scan(spec)
        if remainder:
            raise Exception("Failed to tokenize " + spec + ", remaining bit is " + remainder)
        return tokens

    def add_operator(self, name, func, nargs, precedence, assoc):
        """Register (or replace) the operator spelled `name`."""
        self.__ops[name] = Operator(func, nargs, precedence, assoc)

    def infix_to_postfix(self, infix):
        """Convert the infix expression string `infix` to a postfix token list.

        Returns:
            A list of token strings in postfix (RPN) order; an empty list
            for an expression without tokens.

        When ``do_debug`` is set, a step-by-step table of the conversion is
        logged.
        """
        tokenized = self.tokenize(infix)
        self.debug("tokenized = ", tokenized)
        outq, stack = [], []
        # The trace table is only built when debugging, and holds string
        # snapshots so that every row shows the state at the time it was
        # recorded.
        table = [tuple('TOKEN,ACTION,RPN OUTPUT,OP STACK,NOTES'.split(','))]

        def trace(shown, action, note):
            if self.do_debug:
                table.append((
                    str(shown),
                    action,
                    ' '.join(map(str, outq)),
                    ' '.join(str(entry[0]) for entry in stack),
                    note,
                ))

        for toktype, token in tokenized:
            self.debug("Checking token", token)
            if toktype in (ARG, QUOTED):
                outq.append(token)
                trace(token, 'Add arg to output', '')
                continue
            if toktype != KEYW:
                continue

            op1 = self.__ops[token]
            shown, note = token, 'Pop ops from stack to output'
            while stack:
                t2, op2 = stack[-1]
                yields = ((op1.assoc == L and op1.prec <= op2.prec)
                          or (op1.assoc == R and op1.prec < op2.prec))
                if not yields:
                    break
                if t2 == LPAREN:
                    # Only a closing parenthesis may remove an opening one;
                    # any other operator stops here and stays inside the
                    # group.
                    if token == RPAREN:
                        stack.pop()
                        trace(shown, '(Pop & discard "(")', note)
                    break
                stack.pop()
                outq.append(t2)
                trace(shown, '(Pop op)', note)
                shown = note = ''

            if token != RPAREN:
                stack.append((token, op1))
                trace(shown, 'Push op token to stack', '')
            else:
                trace(shown, 'Discard ")"', '')

        note = 'Drain stack to output'
        while stack:
            t2, _ = stack.pop()
            outq.append(t2)
            trace('', '(Pop op)', note)
            note = ''

        if self.do_debug:
            widths = [max(len(cell) for cell in column) for column in zip(*table)]
            slog(DEBUG, ' '.join('{cell:^{width}}'.format(width=width, cell=cell) for (width, cell) in zip(widths, table[0])))
            for row in table[1:]:
                slog(DEBUG, ' '.join('{cell:<{width}}'.format(width=width, cell=cell) for (width, cell) in zip(widths, row)))

        # Return the output queue itself; indexing the trace table would
        # yield the header cell for an expression without tokens.
        return outq

    def infix_to_postfix_orig(self, infix):
        """Earlier, simpler infix-to-postfix conversion.

        Compares precedences only (associativity is ignored) and recognises
        the literal tokens ``'('`` and ``')'`` for grouping.

        Returns:
            A list of token strings in postfix order.

        Raises:
            IndexError: on a ``')'`` without a matching ``'('``.
        """
        pending = []    # operator stack, local to this call
        output = []

        for tokinfo in self.tokenize(infix):
            self.debug(tokinfo)
            token = tokinfo[1]
            self.debug("Checking token ", token)

            if token not in self.__ops:
                output.append(token)
                continue

            if token == '(':
                pending.append(token)
                continue

            if token == ')':
                top = pending.pop()
                while top != '(':
                    output.append(top)
                    top = pending.pop()
                continue

            prec = self.__ops[token].prec
            # An opening parenthesis is a barrier regardless of the
            # precedence it was registered with; it is only removed by its
            # closing counterpart.
            while pending and pending[-1] != '(' and self.__ops[pending[-1]].prec >= prec:
                output.append(pending.pop())

            pending.append(token)
            self.debug(token)

        output.extend(reversed(pending))
        return output

    def eval_postfix(self, postfixexpr):
        """Evaluate a postfix token sequence and return the resulting value.

        Tokens that are not registered operators are pushed as values. For
        an operator, ``nargs`` values are popped and passed to its ``func``
        in their original (left-to-right) order; the result is pushed back.

        NOTE: tokens carry no type information here, so a value whose text
        equals an operator name is treated as that operator.

        Raises:
            IndexError: if an operator finds too few values, or if the
                expression leaves no value to return.
        """
        self.debug("postfix = ", postfixexpr)
        vals = []   # value stack, local to this call

        for token in postfixexpr:
            self.debug("Evaluating token", token)
            if token not in self.__ops:
                vals.append(token)
                continue

            op = self.__ops[token]
            self.debug("Adding", op.nargs, "arguments")
            args = []
            for i in range(op.nargs):
                self.debug("Adding argument", i)
                args.append(vals.pop())
            args.reverse()  # popped last-first; restore left-to-right order
            val = op.func(*args)
            if self.do_debug:
                self.debug("%s(%s) = %s" % (token, ', '.join(map(str, args)), val))
            vals.append(val)

        if not vals:
            raise IndexError("postfix expression left no value on the stack")
        return vals.pop()

    def eval(self, infix):
        """Evaluate the infix expression string `infix`.

        Anything that is not a string is returned unchanged.
        """
        if not isinstance(infix, basestring):
            return infix
        postfix = self.infix_to_postfix(infix)
        self.debug(infix, "-->", postfix)
        for token in postfix:
            self.debug("Token is", token)
        return self.eval_postfix(postfix)
|
|
|
|
if __name__ == '__main__':

    # ------------- testbed calculator

    from locale import atof

    class Calculator(ShuntingYard):
        """Arithmetic demo: operands and results are passed around as strings."""

        def f_mult(self, a, b):
            return str(atof(a) * atof(b))

        def f_div(self, a, b):
            return str(atof(a) / atof(b))

        def f_add(self, a, b):
            return str(atof(a) + atof(b))

        def f_sub(self, a, b):
            return str(atof(a) - atof(b))

        def f_pow(self, a, b):
            return str(atof(a) ** atof(b))

        def __init__(self):
            Op = Operator
            operators = {
                # '^' used to be registered without a function, so using it
                # in an expression failed with a TypeError.
                '^': Op(self.f_pow, 2, 4, R),
                '*': Op(self.f_mult, 2, 3, L),
                '/': Op(self.f_div, 2, 3, L),
                '+': Op(self.f_add, 2, 2, L),
                '-': Op(self.f_sub, 2, 2, L),
                '(': Op(None, 0, 9, L),
                ')': Op(None, 0, 0, L),
            }
            super(Calculator, self).__init__(operators)

    rr = Calculator().eval("( 2 * 3 + 4 * 5 ) / ( 5 - 3 )")
    print("Result =", rr)

    # ------------- testbed match object

    Object = namedtuple("Object", [ "Name", "Label" ])

    class Matcher(ShuntingYard):
        """Predicate demo: truth values are the strings "True" and "False"."""

        def f_is_name(self, a):
            return str(self.obj.Name == a)

        def f_matches_label(self, a):
            return str(bool(re.match(a, self.obj.Label)))

        def f_is_not(self, a):
            return "False" if a == "True" else "True"

        def f_and(self, a_, b_):
            return str(a_ == "True" and b_ == "True")

        def __init__(self, obj):
            # Keep the object under test on the instance; the predicates
            # must not depend on a module-level variable named `obj`.
            self.obj = obj
            Op = Operator
            operators = {
                '(': Op(None, 2, 9, L),
                ')': Op(None, 2, 0, L),
                'name=': Op(self.f_is_name, 1, 3, R),
                'and': Op(self.f_and, 2, 3, L),
                'label~=': Op(self.f_matches_label, 1, 3, R),
                'False': Op(None, 0, 3, L),
                'True': Op(None, 0, 3, L),
                'not': Op(self.f_is_not, 1, 3, R),
            }
            super(Matcher, self).__init__(operators)

    obj = Object("hans", "wurst")

    r = Matcher(obj).eval("name=hans and (not label~=worst)")
    print("Result =", r)
|