25,16 → 25,25 |
pass |
|
class TestFailureException(Exception):
    """Signals that a test run failed, e.g. because Qemu exited early.

    Args:
        message: human-readable reason for the failure. Optional, so call
            sites that raise the exception without arguments keep working.
    """

    def __init__(self, message=""):
        # Forward to Exception so str(exc) and exc.args carry the message.
        super().__init__(message)
        self.message = message
|
class Qemu: |
def __init__(self, popen, debug_log): |
self.popen = popen |
# Qemu needs time to create debug.log file |
while not os.path.exists(debug_log): |
self.wait() |
while not os.path.exists(debug_log) and self.qemu_is_alive(): |
self.wait(0.250) |
self.assert_qemu_not_died("waiting for the debug log file") |
self.debug = open(debug_log, "rb") |
|
def qemu_is_alive(self):
    """Return True while the Qemu process has not exited.

    ``poll()`` returns ``None`` for a running process and the exit code
    otherwise. The identity test keeps an exit code of 0 (falsy) from
    being mistaken for "still running".
    """
    return self.popen.poll() is None
|
def assert_qemu_not_died(self, while_):
    """Raise TestFailureException if Qemu is no longer running.

    Args:
        while_: short description of the activity in progress; it is
            embedded in the exception message.
    """
    if self.qemu_is_alive():
        return
    raise TestFailureException(f"Qemu has finished while {while_}.")
|
def wait_for_debug_log(self, needle, timeout = 1): |
needle = bytes(needle, "utf-8") |
start = timeit.default_timer() |
42,6 → 51,7 |
|
# While no timeout, read and search logs |
while timeit.default_timer() - start < timeout: |
# TODO: Non-blocking read. |
log += self.debug.read(1) |
if needle in log: |
return |
57,6 → 67,8 |
if len(log) > len(needle) * 2: |
log = log[len(needle):] |
|
self.assert_qemu_not_died("waiting for the debug log") |
|
self.timeout() |
|
def kill(self): |
66,15 → 78,11 |
else: |
os.killpg(os.getpgid(self.popen.pid), signal.SIGTERM) |
|
def failure(self):
    """Kill Qemu and abort the test by raising TestFailureException."""
    self.kill()
    # The exception constructor takes a message; raising it bare would
    # fail with a TypeError instead of reporting the test failure.
    raise TestFailureException("Test failed.")
|
def timeout(self):
    """Kill Qemu, then report the timeout by raising TestTimeoutException."""
    # Stop the process first so it does not outlive the failed test.
    self.kill()
    raise TestTimeoutException()
|
def wait(self, seconds=0.25):
    """Sleep for ``seconds`` (a quarter of a second by default).

    The default keeps call sites that pass no argument working alongside
    those that pass an explicit delay.
    """
    time.sleep(seconds)
|
def run_qemu(root_dir, test_dir, debug_log): |