jw-devtest/src/python/devtest/os/be/shellcmd/Machine.py

86 lines
2.6 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
import asyncio
from jwutils.log import *
from jwutils.asyncio import ShellCmd
from ...Machine import Machine as MachineBase
class Machine(MachineBase): # export
    """Machine backend that is switched on and off through shell commands.

    Each lifecycle phase ('request-power-on', 'request-shutdown', ...) maps
    to an optional shell command which is executed through ShellCmd. The
    power state is not probed; it is tracked purely by internal flags that
    are updated after the respective command has been run.
    """

    def __init__(self, env):
        """Initialise state flags and the phase -> shell command table."""
        super().__init__(env)
        self.__clear_for_tests = True
        self.__running = False
        self.__shutdown_requested = False
        # Phase name -> command line (space-separated string) or None when
        # nothing should be executed for that phase.
        # NOTE(review): 'wait-power-off' is registered here but no method of
        # this class runs it -- confirm whether wait_poweroff() should.
        self.__cmds = {
            'request-shutdown': "jw-switch-allesnix.sh off",
            'wait-power-off'  : "sleep 2",
            'request-power-on': "jw-switch-allesnix.sh on",
            'cleanup'         : None
        }

    async def __run(self, phase):
        """Run the shell command registered for *phase*, if there is one.

        Unknown phases and phases mapped to None are logged and skipped.
        String commands are split on single spaces into an argument list
        before being handed to ShellCmd.
        """
        cmd = self.__cmds.get(phase)
        if cmd is None:
            slog(INFO, 'No command registered for phase "{}", not running'.format(phase))
            return
        if isinstance(cmd, str):
            cmd = cmd.split(' ')
        sc = ShellCmd(cmd)
        await sc.run()

    # ---- reimplementation of class API methods

    async def init(self):
        """Nothing to set up for this backend."""
        pass

    async def cleanup(self, env):
        """Run the 'cleanup' phase command (none is registered by default)."""
        await self.__run('cleanup')

    def clear_for_tests(self):
        """Return the internal clear-for-tests flag."""
        return self.__clear_for_tests

    def register_connection(self, env, info):
        """Delegate to the base class implementation."""
        return super().register_connection(env, info)

    async def unregister_connection(self, conn):
        """Delegate to the base class implementation."""
        return await super().unregister_connection(conn)

    async def request_power_on(self, env):
        """Run the power-on command and mark the machine as running.

        Raises:
            Exception: if the machine is already marked as running.
        """
        if self.__running:
            raise Exception("Tried to power on a running shell command machine")
        slog(NOTICE, "switching on CPU")
        await self.__run('request-power-on')
        slog(NOTICE, "switched on CPU")
        self.__running = True
        self.__clear_for_tests = True
        # A freshly powered-on machine has no pending shutdown request;
        # without this reset a second request_shutdown() after a power
        # cycle would be skipped.
        self.__shutdown_requested = False

    async def wait_up(self, env):
        """Wait up to roughly 5 seconds for the running flag to be set.

        Returns:
            The value of the running flag after waiting.
        """
        count = 5
        while not self.__running and count > 0:
            await asyncio.sleep(1)
            count -= 1
        return self.__running

    async def request_shutdown(self, env):
        """Run the shutdown command once and mark the machine as stopped.

        Calls made while a shutdown has already been requested are no-ops.
        """
        if self.__shutdown_requested:
            return
        slog(NOTICE, "requesting shutdown")
        await self.__run('request-shutdown')
        self.__shutdown_requested = True
        await asyncio.sleep(1)
        self.__running = False

    async def wait_poweroff(self, env):
        """Wait up to roughly 5 seconds for the running flag to be cleared.

        Returns:
            The value of the running flag after waiting, i.e. a true value
            means the machine is still marked as running.
        """
        slog(NOTICE, "waiting on powerdown")
        count = 5
        while self.__running and count > 0:
            # Must be awaited: a bare asyncio.sleep() call only creates a
            # coroutine object and neither waits nor yields to the loop.
            await asyncio.sleep(1)
            count -= 1
        return self.__running