mirror of
ssh://git.janware.com/srv/git/janware/proj/jw-devtest
synced 2026-01-15 10:23:32 +01:00
86 lines
2.6 KiB
Python
86 lines
2.6 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import asyncio
|
|
|
|
from jwutils.log import *
|
|
from jwutils.asyncio import ShellCmd
|
|
from ...Machine import Machine as MachineBase
|
|
|
|
class Machine(MachineBase): # export
|
|
|
|
def __init__(self, env):
|
|
super().__init__(env)
|
|
self.__clear_for_tests = True
|
|
self.__running = False
|
|
self.__shutdown_requested = False
|
|
self.__cmds = {
|
|
'request-shutdown': "jw-switch-allesnix.sh off",
|
|
'wait-power-off' : "sleep 2",
|
|
'request-power-on': "jw-switch-allesnix.sh on",
|
|
'cleanup' : None
|
|
}
|
|
|
|
async def __run(self, phase):
|
|
cmd = None
|
|
if phase in self.__cmds.keys():
|
|
cmd = self.__cmds[phase]
|
|
if cmd is None:
|
|
slog(INFO, 'No command registered for phase "{}", not running'.format(phase))
|
|
return
|
|
if isinstance(cmd, str):
|
|
cmd = cmd.split(' ')
|
|
sc = ShellCmd(cmd)
|
|
await sc.run()
|
|
|
|
# ---- reimplementation of class API methods
|
|
|
|
async def init(self):
|
|
pass
|
|
#self.console = ConnSerial(self.env, spec='path=/dev/ttyS0')
|
|
#self.__task = await self.env.eloop.create_task(self.__exec_qemu(self.env))
|
|
|
|
async def cleanup(self, env):
|
|
await self.__run('cleanup')
|
|
|
|
def clear_for_tests(self):
|
|
return self.__clear_for_tests
|
|
|
|
def register_connection(self, env, info):
|
|
return super().register_connection(env, info)
|
|
|
|
async def unregister_connection(self, conn):
|
|
return await super().unregister_connection(conn)
|
|
|
|
async def request_power_on(self, env):
|
|
if self.__running:
|
|
raise Exception("Tried to power on a running shell command machine")
|
|
slog(NOTICE, "switching on DUT")
|
|
await self.__run('request-power-on')
|
|
slog(NOTICE, "switched on DUT")
|
|
#await asyncio.sleep(1)
|
|
self.__running = True
|
|
self.__clear_for_tests = True
|
|
|
|
async def wait_up(self, env):
|
|
count = 5
|
|
while not self.__running and count > 0:
|
|
await asyncio.sleep(1)
|
|
count -= 1
|
|
return self.__running
|
|
|
|
async def request_shutdown(self, env):
|
|
if not self.__shutdown_requested:
|
|
slog(NOTICE, "requesting shutdown")
|
|
await self.__run('request-shutdown')
|
|
self.__shutdown_requested = True
|
|
await asyncio.sleep(1)
|
|
self.__running = False
|
|
#self.__clear_for_tests = False
|
|
|
|
async def wait_poweroff(self, env):
|
|
slog(NOTICE, "waiting on powerdown")
|
|
count = 5
|
|
while self.__running and count > 0:
|
|
asyncio.sleep(1)
|
|
count -= 1
|
|
return self.__running
|