Rev 9369 | Rev 9920 | Go to most recent revision | Details | Compare with Previous | Last modification | View Log | RSS feed
Rev | Author | Line No. | Line |
---|---|---|---|
9320 | Boppan | 1 | # Copyright 2021 Magomed Kostoev |
9249 | Boppan | 2 | # Published under MIT License |
3 | |||
4 | import io |
||
5 | import os |
||
9319 | Boppan | 6 | import sys |
9249 | Boppan | 7 | import subprocess |
8 | import timeit |
||
9 | import time |
||
10 | import shlex |
||
11 | import signal |
||
9319 | Boppan | 12 | import shutil |
9316 | Boppan | 13 | from . makeflop import Floppy |
9249 | Boppan | 14 | |
9319 | Boppan | 15 | def is_win32(): |
16 | return True if sys.platform == "win32" else False |
||
17 | |||
18 | def is_linux(): |
||
19 | return True if sys.platform == "linux" or sys.platform == "linux2" else False |
||
20 | |||
21 | def is_osx(): |
||
22 | return True if sys.platform == "darwin" else False |
||
23 | |||
9249 | Boppan | 24 | class TestTimeoutException(Exception): |
25 | pass |
||
26 | |||
27 | class TestFailureException(Exception): |
||
9919 | Boppan | 28 | def __init__(self, message): |
29 | self.message = message |
||
9249 | Boppan | 30 | |
31 | class Qemu: |
||
9331 | Boppan | 32 | def __init__(self, popen, debug_log): |
9249 | Boppan | 33 | self.popen = popen |
9254 | Boppan | 34 | # Qemu needs time to create debug.log file |
9919 | Boppan | 35 | while not os.path.exists(debug_log) and self.qemu_is_alive(): |
36 | self.wait(0.250) |
||
37 | self.assert_qemu_not_died("waiting for the debug log file") |
||
9331 | Boppan | 38 | self.debug = open(debug_log, "rb") |
9249 | Boppan | 39 | |
9919 | Boppan | 40 | def qemu_is_alive(self): |
41 | return self.popen.poll() == None |
||
42 | |||
43 | def assert_qemu_not_died(self, while_): |
||
44 | if not self.qemu_is_alive(): |
||
45 | raise TestFailureException(f"Qemu has finished while {while_}.") |
||
46 | |||
9249 | Boppan | 47 | def wait_for_debug_log(self, needle, timeout = 1): |
48 | needle = bytes(needle, "utf-8") |
||
49 | start = timeit.default_timer() |
||
50 | log = b"" |
||
51 | |||
52 | # While no timeout, read and search logs |
||
53 | while timeit.default_timer() - start < timeout: |
||
9919 | Boppan | 54 | # TODO: Non-blocking read. |
9255 | Boppan | 55 | log += self.debug.read(1) |
9249 | Boppan | 56 | if needle in log: |
57 | return |
||
58 | |||
59 | # We don't have to read whole logs to find the neddle |
||
60 | # If we read len(needle) * 2 bytes of log then we |
||
61 | # already can say that if there's no needle in the data |
||
62 | # then it can't be in first len(needle) bytes of the data |
||
63 | # so first len(needle) bytes of saved logs may be thrown away |
||
64 | # |
||
65 | # So we consume lessser memory and don't search all the previous |
||
66 | # logs every single time |
||
67 | if len(log) > len(needle) * 2: |
||
68 | log = log[len(needle):] |
||
69 | |||
9919 | Boppan | 70 | self.assert_qemu_not_died("waiting for the debug log") |
71 | |||
9249 | Boppan | 72 | self.timeout() |
73 | |||
74 | def kill(self): |
||
9319 | Boppan | 75 | if is_win32(): |
76 | # FIXME: This is shit, isn't there anything better? |
||
77 | subprocess.Popen(f"TASKKILL /F /PID {self.popen.pid} /T", stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL, stdin = subprocess.DEVNULL) |
||
78 | else: |
||
79 | os.killpg(os.getpgid(self.popen.pid), signal.SIGTERM) |
||
9249 | Boppan | 80 | |
81 | def timeout(self): |
||
82 | self.kill() |
||
83 | raise TestTimeoutException() |
||
84 | |||
9919 | Boppan | 85 | def wait(self, seconds): |
9249 | Boppan | 86 | time.sleep(seconds) |
87 | |||
9331 | Boppan | 88 | def run_qemu(root_dir, test_dir, debug_log): |
9334 | Boppan | 89 | # Make local copy of IMG, so we will be able to run the test in parallel |
9342 | Boppan | 90 | if os.path.exists(f"{test_dir}/kolibri_test.img"): # If previous test run interrupted the file may be busy |
91 | os.remove(f"{test_dir}/kolibri_test.img") |
||
9332 | Boppan | 92 | shutil.copyfile(f"{root_dir}/kolibri_test.img", f"{test_dir}/kolibri_test.img") |
9319 | Boppan | 93 | qemu_command = f"qemu-system-i386" |
94 | flags = "" |
||
95 | flags += "-nographic " # Makes it faster |
||
9331 | Boppan | 96 | flags += f"-debugcon file:{debug_log} " # 0xe9 port output |
9319 | Boppan | 97 | flags += "-L . " # IDK why it does not work without this |
98 | flags += "-m 128 " |
||
9332 | Boppan | 99 | flags += f"-drive format=raw,file={test_dir}/kolibri_test.img,index=0,if=floppy -boot a " |
9319 | Boppan | 100 | flags += "-vga vmware " |
101 | flags += "-net nic,model=rtl8139 -net user " |
||
102 | flags += "-soundhw ac97 " |
||
103 | if is_win32(): |
||
104 | qemu_full_path = shutil.which(qemu_command) |
||
9369 | Boppan | 105 | qemu_directory = os.path.dirname(qemu_full_path) |
9319 | Boppan | 106 | flags += f"-L {qemu_directory} " |
107 | s = f"{qemu_command} {flags}" |
||
9331 | Boppan | 108 | qemu_stdout = open(f"{test_dir}/qemu_stdout.log", "w") |
109 | qemu_stderr = open(f"{test_dir}/qemu_stderr.log", "w") |
||
9319 | Boppan | 110 | if is_win32(): |
9324 | Boppan | 111 | return subprocess.Popen(s, bufsize = 0, stdout = qemu_stdout, stderr = qemu_stderr, stdin = subprocess.DEVNULL, shell = True, start_new_session = True) |
9319 | Boppan | 112 | else: |
113 | a = shlex.split(s) |
||
9324 | Boppan | 114 | return subprocess.Popen(a, bufsize = 0, stdout = qemu_stdout, stderr = qemu_stderr, stdin = subprocess.DEVNULL, start_new_session = True) |
9256 | Boppan | 115 | |
9331 | Boppan | 116 | def run(root_dir, test_dir): |
117 | debug_log = f"{test_dir}/debug.log" |
||
118 | if os.path.exists(debug_log): |
||
119 | os.remove(debug_log) |
||
120 | popen = run_qemu(root_dir, test_dir, debug_log) |
||
121 | return Qemu(popen, debug_log)> |
||
9249 | Boppan | 122 |