mirror of
ssh://git.janware.com/srv/git/janware/proj/jw-python
synced 2026-01-15 09:53:32 +01:00
108 lines
3.5 KiB
Python
108 lines
3.5 KiB
Python
|
|
import asyncio
|
||
|
|
from jwutils.log import *
|
||
|
|
|
||
|
|
# FIXME: Derive this from Process, or merge the classes entirely
|
||
|
|
|
||
|
|
class ShellCmd: # export
|
||
|
|
|
||
|
|
class SubprocessProtocol(asyncio.SubprocessProtocol):
|
||
|
|
|
||
|
|
def __init__(self, process, name):
|
||
|
|
self.process = process
|
||
|
|
self.name = name
|
||
|
|
super().__init__()
|
||
|
|
|
||
|
|
def pipe_data_received(self, fd, data):
|
||
|
|
stream = "stdout" if fd == 1 else ("stderr" if fd == 2 else str(fd))
|
||
|
|
tag = stream + '@' + self.name
|
||
|
|
data = data.decode().rstrip('\n')
|
||
|
|
prio = WARNING if fd == 2 else INFO
|
||
|
|
for line in data.split('\n'):
|
||
|
|
slog(prio, "[%s] %s" % (tag, line.rstrip('\r\n')))
|
||
|
|
|
||
|
|
def process_exited(self):
|
||
|
|
slog(DEBUG, "[%s] process exited" % (self.name))
|
||
|
|
super().process_exited()
|
||
|
|
self.process.exited()
|
||
|
|
|
||
|
|
class ShutdownState:
|
||
|
|
Running = 1
|
||
|
|
Triggered = 2
|
||
|
|
Completed = 3
|
||
|
|
Unnecessary = 4
|
||
|
|
|
||
|
|
def __init__(self, cmdline, eloop=None, name=None):
|
||
|
|
if eloop is None:
|
||
|
|
eloop = asyncio.get_running_loop()
|
||
|
|
self.__eloop = eloop
|
||
|
|
self.__cmdline = cmdline
|
||
|
|
self.__name = name if name is not None else cmdline[0]
|
||
|
|
self.__transport = None
|
||
|
|
self.__protocol = None
|
||
|
|
self.__proc = None
|
||
|
|
self.__rc = None
|
||
|
|
self.__task = None
|
||
|
|
self.__shutdown = self.ShutdownState.Unnecessary
|
||
|
|
|
||
|
|
async def __exec(self):
|
||
|
|
|
||
|
|
def format_cmdline(arr):
|
||
|
|
r = ''
|
||
|
|
for tok in arr:
|
||
|
|
if re.search(' ', tok):
|
||
|
|
r += ' "%s"' % tok
|
||
|
|
continue
|
||
|
|
r += ' ' + tok
|
||
|
|
return r[1:]
|
||
|
|
|
||
|
|
try:
|
||
|
|
slog(INFO, "Running shell command [{}]: {}".format(self.__name, format_cmdline(self.__cmdline)))
|
||
|
|
self.__transport, self.__protocol = await self.__eloop.subprocess_exec(
|
||
|
|
lambda: self.SubprocessProtocol(self, self.__name),
|
||
|
|
*self.__cmdline,
|
||
|
|
)
|
||
|
|
self.__proc = self.__transport.get_extra_info('subprocess') # Popen instance
|
||
|
|
except:
|
||
|
|
slog(ERR, "Failed to run process [{}]".format(self.__name))
|
||
|
|
raise
|
||
|
|
|
||
|
|
def __reap(self):
|
||
|
|
if self.__rc is None and self.__transport:
|
||
|
|
self.__transport = None
|
||
|
|
self.__rc = self.__proc.wait()
|
||
|
|
|
||
|
|
# to be called from SubprocessProtocol / SIGCHLD handler
|
||
|
|
def exited(self):
|
||
|
|
slog(DEBUG, "Process {} exited".format(self.__name))
|
||
|
|
self.__reap()
|
||
|
|
|
||
|
|
async def __cleanup(self):
|
||
|
|
pid = self.__reap()
|
||
|
|
sd_fine = self.__shutdown in [ self.ShutdownState.Unnecessary, self.ShutdownState.Completed ]
|
||
|
|
if self.__rc == 0 and sd_fine:
|
||
|
|
slog(INFO, "The shell command [{}], pid {}, has exited cleanly".format(self.__name, self.__proc.pid))
|
||
|
|
self.monitor = self.console = self.__protocol = self.__task = None
|
||
|
|
return 0
|
||
|
|
slog(ERR, "The process ([{}], pid {}) has exited {}with status code {}, aborting".format(
|
||
|
|
self.__name, pid, "" if sd_fine else "prematurely ", self.__rc))
|
||
|
|
exit(1)
|
||
|
|
|
||
|
|
async def init(self):
|
||
|
|
self.__task = await self.__eloop.create_task(self.__exec())
|
||
|
|
|
||
|
|
async def cleanup(self):
|
||
|
|
await self.__cleanup()
|
||
|
|
|
||
|
|
async def run(self):
|
||
|
|
await self.init()
|
||
|
|
await self.cleanup()
|
||
|
|
|
||
|
|
if __name__ == '__main__':
|
||
|
|
import jwutils.log
|
||
|
|
jwutils.log.set_level(INFO)
|
||
|
|
async def run():
|
||
|
|
sp = ShellCmd([ 'echo', 'hello world!' ])
|
||
|
|
await sp.run()
|
||
|
|
|
||
|
|
asyncio.run(run())
|
||
|
|
|