import asyncio

from jwutils.log import *

# FIXME: Derive this from Process, or merge the classes entirely
class ShellCmd: # export
    """Run one command as a child process of an asyncio event loop.

    The command is started with the loop's ``subprocess_exec()``. Everything
    the child writes to its pipes is forwarded line by line to ``slog()``,
    and the exit status is evaluated in ``cleanup()``.

    Typical use is ``await ShellCmd([...]).run()``, which is ``init()``
    followed by ``cleanup()``.
    """

    class SubprocessProtocol(asyncio.SubprocessProtocol):
        """Protocol that logs the child's output and reports its exit."""

        def __init__(self, process, name):
            """Remember the owning object and the name used as log tag.

            process -- object whose ``exited()`` is called on child exit
            name    -- label used in the log lines
            """
            self.process = process
            self.name = name
            super().__init__()

        def pipe_data_received(self, fd, data):
            """Log every line of a chunk received from the child.

            Data from fd 2 is logged at WARNING, everything else at INFO.
            """
            stream = "stdout" if fd == 1 else ("stderr" if fd == 2 else str(fd))
            tag = stream + '@' + self.name
            # A chunk can end in the middle of a multi-byte character or hold
            # bytes that are not valid text at all; never raise from here.
            text = data.decode(errors="replace").rstrip('\n')
            prio = WARNING if fd == 2 else INFO
            for line in text.split('\n'):
                slog(prio, "[%s] %s" % (tag, line.rstrip('\r\n')))

        def process_exited(self):
            """Log the exit and notify the owning object."""
            slog(DEBUG, "[%s] process exited" % (self.name))
            super().process_exited()
            self.process.exited()

    class ShutdownState:
        """Shutdown progress markers.

        __cleanup() accepts only Unnecessary and Completed as a regular end.
        """
        Running     = 1
        Triggered   = 2
        Completed   = 3
        Unnecessary = 4

    def __init__(self, cmdline, eloop=None, name=None):
        """Prepare the command; nothing is started yet.

        cmdline -- sequence of program and arguments
        eloop   -- event loop to use, defaults to the running loop
        name    -- label for log messages, defaults to cmdline[0]
        """
        if eloop is None:
            eloop = asyncio.get_running_loop()
        self.__eloop = eloop
        self.__cmdline = cmdline
        self.__name = name if name is not None else cmdline[0]
        self.__transport = None
        self.__protocol = None
        self.__proc = None
        self.__rc = None
        self.__task = None
        self.__exited = None  # future resolved by exited()
        self.__shutdown = self.ShutdownState.Unnecessary

    async def __exec(self):
        """Start the child process and store transport, protocol and Popen.

        Logs and re-raises any exception raised while starting the process.
        """

        def format_cmdline(arr):
            # Tokens containing a blank are wrapped in double quotes, for
            # display in the log only.
            return ' '.join('"%s"' % tok if ' ' in tok else tok for tok in arr)

        # Has to exist before the child can possibly report its exit.
        self.__exited = self.__eloop.create_future()
        try:
            slog(INFO, "Running shell command [{}]: {}".format(self.__name, format_cmdline(self.__cmdline)))
            self.__transport, self.__protocol = await self.__eloop.subprocess_exec(
                lambda: self.SubprocessProtocol(self, self.__name),
                *self.__cmdline,
            )
            self.__proc = self.__transport.get_extra_info('subprocess') # Popen instance
        except Exception:
            slog(ERR, "Failed to run process [{}]".format(self.__name))
            raise

    def __reap(self):
        """Collect the exit status once and return the child's pid.

        Returns None if no process has been started.
        """
        if self.__rc is None and self.__transport:
            self.__transport = None
            self.__rc = self.__proc.wait()
        return self.__proc.pid if self.__proc is not None else None

    # to be called from SubprocessProtocol / SIGCHLD handler
    def exited(self):
        """Record that the child has exited and wake up a waiting cleanup."""
        slog(DEBUG, "Process {} exited".format(self.__name))
        if self.__exited is not None and not self.__exited.done():
            self.__exited.set_result(None)
        self.__reap()

    async def __cleanup(self):
        """Wait for the child to exit and evaluate its status.

        Returns 0 if the exit status is 0 and the shutdown state is regular.
        Otherwise logs an error and raises SystemExit(1).
        """
        if self.__proc is not None and self.__exited is not None:
            # Let the event loop keep running (and keep delivering the
            # child's output) until the exit has been reported, instead of
            # blocking the loop in Popen.wait().
            await self.__exited
        pid = self.__reap()
        sd_fine = self.__shutdown in (self.ShutdownState.Unnecessary, self.ShutdownState.Completed)
        if self.__rc == 0 and sd_fine:
            slog(INFO, "The shell command [{}], pid {}, has exited cleanly".format(self.__name, pid))
            # NOTE(review): monitor and console are not used anywhere else in
            # this class -- presumably leftovers; kept for compatibility.
            self.monitor = self.console = self.__protocol = self.__task = None
            return 0
        slog(ERR, "The process ([{}], pid {}) has exited {}with status code {}, aborting".format(
            self.__name, pid, "" if sd_fine else "prematurely ", self.__rc))
        raise SystemExit(1)

    async def init(self):
        """Start the child process; returns once it has been spawned."""
        self.__task = self.__eloop.create_task(self.__exec())
        await self.__task

    async def cleanup(self):
        """Wait for the child and check its exit status, see __cleanup()."""
        await self.__cleanup()

    async def run(self):
        """Start the command and wait for it to finish."""
        await self.init()
        await self.cleanup()

if __name__ == '__main__':

    import jwutils.log

    jwutils.log.set_level(INFO)

    async def run():
        sp = ShellCmd([ 'echo', 'hello world!' ])
        await sp.run()

    asyncio.run(run())