diff --git a/tools/python/jwutils/asyncio/ShellCmd.py b/tools/python/jwutils/asyncio/ShellCmd.py new file mode 100644 index 0000000..a2889ba --- /dev/null +++ b/tools/python/jwutils/asyncio/ShellCmd.py @@ -0,0 +1,108 @@ +import asyncio +from jwutils.log import * + +# FIXME: Derive this from Process, or merge the classes entirely + +class ShellCmd: # export + + class SubprocessProtocol(asyncio.SubprocessProtocol): + + def __init__(self, process, name): + self.process = process + self.name = name + super().__init__() + + def pipe_data_received(self, fd, data): + stream = "stdout" if fd == 1 else ("stderr" if fd == 2 else str(fd)) + tag = stream + '@' + self.name + data = data.decode().rstrip('\n') + prio = WARNING if fd == 2 else INFO + for line in data.split('\n'): + slog(prio, "[%s] %s" % (tag, line.rstrip('\r\n'))) + + def process_exited(self): + slog(DEBUG, "[%s] process exited" % (self.name)) + super().process_exited() + self.process.exited() + + class ShutdownState: + Running = 1 + Triggered = 2 + Completed = 3 + Unnecessary = 4 + + def __init__(self, cmdline, eloop=None, name=None): + if eloop is None: + eloop = asyncio.get_running_loop() + self.__eloop = eloop + self.__cmdline = cmdline + self.__name = name if name is not None else cmdline[0] + self.__transport = None + self.__protocol = None + self.__proc = None + self.__rc = None + self.__task = None + self.__shutdown = self.ShutdownState.Unnecessary + + async def __exec(self): + + def format_cmdline(arr): + r = '' + for tok in arr: + if re.search(' ', tok): + r += ' "%s"' % tok + continue + r += ' ' + tok + return r[1:] + + try: + slog(INFO, "Running shell command [{}]: {}".format(self.__name, format_cmdline(self.__cmdline))) + self.__transport, self.__protocol = await self.__eloop.subprocess_exec( + lambda: self.SubprocessProtocol(self, self.__name), + *self.__cmdline, + ) + self.__proc = self.__transport.get_extra_info('subprocess') # Popen instance + except: + slog(ERR, "Failed to run process [{}]".format(self.__name)) + raise + + def __reap(self): + if self.__rc is None and self.__transport: + self.__transport = None + self.__rc = self.__proc.wait() + + # to be called from SubprocessProtocol / SIGCHLD handler + def exited(self): + slog(DEBUG, "Process {} exited".format(self.__name)) + self.__reap() + + async def __cleanup(self): + pid = self.__reap() + sd_fine = self.__shutdown in [ self.ShutdownState.Unnecessary, self.ShutdownState.Completed ] + if self.__rc == 0 and sd_fine: + slog(INFO, "The shell command [{}], pid {}, has exited cleanly".format(self.__name, self.__proc.pid)) + self.monitor = self.console = self.__protocol = self.__task = None + return 0 + slog(ERR, "The process ([{}], pid {}) has exited {}with status code {}, aborting".format( + self.__name, pid, "" if sd_fine else "prematurely ", self.__rc)) + exit(1) + + async def init(self): + self.__task = await self.__eloop.create_task(self.__exec()) + + async def cleanup(self): + await self.__cleanup() + + async def run(self): + await self.init() + await self.cleanup() + +if __name__ == '__main__': + import jwutils.log + jwutils.log.set_level(INFO) + async def run(): + sp = ShellCmd([ 'echo', 'hello world!' ]) + await sp.run() + + asyncio.run(run()) +