21,13 → 21,26 |
def is_osx():
    """Return True when the interpreter is running on macOS.

    The check compares ``sys.platform`` against ``"darwin"``.
    """
    # The comparison already yields a bool; no conditional expression needed.
    return sys.platform == "darwin"
|
class TestTimeoutException(Exception):
    """Raised when a test operation exceeds its time limit.

    Adds no state or behaviour on top of ``Exception``.
    """
class TestException(Exception):
    """Base test error that records a failure category and the Qemu command.

    Args:
        error_kind: Short category tag for the failure (subclasses in this
            file pass "TIMEOUT" or "FAILURE").
        message: Human-readable description; becomes ``str(exc)``.
        qemu_cmd: Command line associated with the failing Qemu run.
    """

    def __init__(self, error_kind, message, qemu_cmd):
        super().__init__(message)
        self.error_kind = error_kind
        self.qemu_cmd = qemu_cmd

    def kind(self):
        """Return the category tag supplied at construction."""
        return self.error_kind

    def cmd(self):
        """Return the Qemu command line supplied at construction."""
        return self.qemu_cmd


class TestFailureException(Exception):
    """Failure error that carries only a message.

    NOTE(review): looks superseded by ``TestException_Failure`` -- confirm
    before removing.
    """

    def __init__(self, message):
        # Forward the message so str(exc) and tracebacks actually show it.
        super().__init__(message)
        self.message = message
|
class TestException_Timeout(TestException):
    """``TestException`` whose kind is fixed to ``"TIMEOUT"``.

    Args:
        message: Description forwarded to ``TestException``.
        qemu_cmd: Qemu command line forwarded to ``TestException``.
    """

    def __init__(self, message, qemu_cmd):
        super().__init__(
            error_kind="TIMEOUT",
            message=message,
            qemu_cmd=qemu_cmd,
        )
|
class TestException_Failure(TestException):
    """``TestException`` whose kind is fixed to ``"FAILURE"``.

    Args:
        message: Description forwarded to ``TestException``.
        qemu_cmd: Qemu command line forwarded to ``TestException``.
    """

    def __init__(self, message, qemu_cmd):
        super().__init__(
            error_kind="FAILURE",
            message=message,
            qemu_cmd=qemu_cmd,
        )
|
class Qemu: |
def __init__(self, popen, debug_log):
    """Attach to a launched Qemu process and open its debug log.

    Args:
        popen: Handle of the Qemu process, stored as ``self.popen``
            (presumably a ``subprocess.Popen`` -- confirm against caller).
        debug_log: Path of the debug log file Qemu is expected to create.

    Raises:
        Whatever ``assert_qemu_not_died`` raises when Qemu has exited by
        the time the wait loop ends.
    """
    self.popen = popen
    # Qemu needs time to create debug.log file
    while not os.path.exists(debug_log) and self.qemu_is_alive():
        self.wait(0.250)
    # Checked once: the loop also ends when Qemu dies before the file exists.
    self.assert_qemu_not_died("waiting for the debug log file creation")
    # Binary mode; the handle stays open on the instance as self.debug.
    self.debug = open(debug_log, "rb")
|
def qemu_is_alive(self): |
42,7 → 55,7 |
|
def assert_qemu_not_died(self, while_):
    """Raise if the Qemu process is no longer running.

    Args:
        while_: Phrase describing the activity in progress; interpolated
            into the error message.

    Raises:
        TestException_Failure: When ``qemu_is_alive()`` is false. The
            exception carries the space-joined ``self.popen.args``.
    """
    if self.qemu_is_alive():
        return
    # Single raise: include the command line so the failure is reproducible.
    raise TestException_Failure(
        f"Qemu has finished while {while_}.",
        ' '.join(self.popen.args),
    )
|
def wait_for_debug_log(self, needle, timeout = 1): |
needle = bytes(needle, "utf-8") |
69,7 → 82,7 |
|
self.assert_qemu_not_died("waiting for the debug log") |
|
self.timeout() |
self.timeout("waiting for the debug log") |
|
def kill(self): |
if is_win32(): |
78,9 → 91,9 |
else: |
os.killpg(os.getpgid(self.popen.pid), signal.SIGTERM) |
|
def timeout(self, while_="waiting for Qemu"):
    """Kill the Qemu process and report a timeout.

    Args:
        while_: Description of what was being waited for; used as the
            exception message. Defaults to a generic phrase so the
            zero-argument call form keeps working.

    Raises:
        TestException_Timeout: Always, after ``kill()`` has been called.
            Carries the space-joined ``self.popen.args``.
    """
    # Kill first so the process is not left running once the error propagates.
    self.kill()
    raise TestException_Timeout(while_, ' '.join(self.popen.args))
|
def wait(self, seconds):
    """Pause the calling thread for ``seconds`` seconds via ``time.sleep``."""
    time.sleep(seconds)