mirror of
ssh://git.janware.com/janware/proj/jw-pkg
synced 2026-04-24 09:13:37 +02:00
lib.ProcFilter: Add module
Add the ProcFilter module, containing the following classes:
- ProcFilter:
Abstract base class defining a run(data: bytes) -> Result method,
allowing to do arbitrary data manipulation in subclasses.
- ProcFilterIdenty
A ProcFilter specialization which passes through the input
unchanged.
- ProcPipeline
A container for multiple chained ProcFilter classes
Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
parent
9eda1c890e
commit
411bfd41ca
1 changed files with 44 additions and 0 deletions
44
src/python/jw/pkg/lib/ProcFilter.py
Normal file
44
src/python/jw/pkg/lib/ProcFilter.py
Normal file
|
|
@ -0,0 +1,44 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
import abc
|
||||||
|
|
||||||
|
from .base import Result
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from typing import Iterable
|
||||||
|
|
||||||
|
class ProcFilter(abc.ABC):
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
async def _run(self, data: bytes) -> Result:
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
async def run(self, data: bytes) -> Result:
|
||||||
|
return await self._run(data)
|
||||||
|
|
||||||
|
class ProcFilterIdentity(ProcFilter):
|
||||||
|
|
||||||
|
async def _run(self, data: bytes) -> Result:
|
||||||
|
return Result(data, None, 0)
|
||||||
|
|
||||||
|
class ProcPipeline:
|
||||||
|
|
||||||
|
def __init__(self, f: Iterable[ProcFilter]|ProcFilter = []) -> None:
|
||||||
|
self.__filters: list[ProcFilter] = []
|
||||||
|
self.append(f)
|
||||||
|
|
||||||
|
def append(self, f: ProcFilter|Iterable[ProcFilter]) -> None:
|
||||||
|
if not isinstance(f, ProcFilter):
|
||||||
|
for e in f:
|
||||||
|
self.append(e)
|
||||||
|
return
|
||||||
|
self.__filters.append(f)
|
||||||
|
|
||||||
|
async def run(self, data: bytes|Result) -> Result:
|
||||||
|
ret = data if isinstance(data, Result) else Result(data, None, 0)
|
||||||
|
for f in self.__filters:
|
||||||
|
ret = await f.run(ret.stdout)
|
||||||
|
return ret
|
||||||
Loading…
Add table
Add a link
Reference in a new issue