Rev 9331 | Rev 9334 | Go to most recent revision | Details | Compare with Previous | Last modification | View Log | RSS feed
Rev | Author | Line No. | Line |
---|---|---|---|
9320 | Boppan | 1 | # Copyright 2021 Magomed Kostoev |
9249 | Boppan | 2 | # Published under MIT License |
3 | |||
4 | import io |
||
5 | import os |
||
9319 | Boppan | 6 | import sys |
9249 | Boppan | 7 | import subprocess |
8 | import timeit |
||
9 | import time |
||
10 | import shlex |
||
11 | import signal |
||
9319 | Boppan | 12 | import shutil |
9316 | Boppan | 13 | from . makeflop import Floppy |
9249 | Boppan | 14 | |
9319 | Boppan | 15 | def is_win32(): |
16 | return True if sys.platform == "win32" else False |
||
17 | |||
18 | def is_linux(): |
||
19 | return True if sys.platform == "linux" or sys.platform == "linux2" else False |
||
20 | |||
21 | def is_osx(): |
||
22 | return True if sys.platform == "darwin" else False |
||
23 | |||
9249 | Boppan | 24 | class TestTimeoutException(Exception): |
25 | pass |
||
26 | |||
27 | class TestFailureException(Exception): |
||
28 | pass |
||
29 | |||
30 | class Qemu: |
||
9331 | Boppan | 31 | def __init__(self, popen, debug_log): |
9249 | Boppan | 32 | self.popen = popen |
9254 | Boppan | 33 | # Qemu needs time to create debug.log file |
9331 | Boppan | 34 | while not os.path.exists(debug_log): |
9254 | Boppan | 35 | self.wait() |
9331 | Boppan | 36 | self.debug = open(debug_log, "rb") |
9249 | Boppan | 37 | |
38 | def wait_for_debug_log(self, needle, timeout = 1): |
||
39 | needle = bytes(needle, "utf-8") |
||
40 | start = timeit.default_timer() |
||
41 | log = b"" |
||
42 | |||
43 | # While no timeout, read and search logs |
||
44 | while timeit.default_timer() - start < timeout: |
||
9255 | Boppan | 45 | log += self.debug.read(1) |
9249 | Boppan | 46 | if needle in log: |
47 | return |
||
48 | |||
49 | # We don't have to read whole logs to find the neddle |
||
50 | # If we read len(needle) * 2 bytes of log then we |
||
51 | # already can say that if there's no needle in the data |
||
52 | # then it can't be in first len(needle) bytes of the data |
||
53 | # so first len(needle) bytes of saved logs may be thrown away |
||
54 | # |
||
55 | # So we consume lessser memory and don't search all the previous |
||
56 | # logs every single time |
||
57 | if len(log) > len(needle) * 2: |
||
58 | log = log[len(needle):] |
||
59 | |||
60 | self.timeout() |
||
61 | |||
62 | def kill(self): |
||
9319 | Boppan | 63 | if is_win32(): |
64 | # FIXME: This is shit, isn't there anything better? |
||
65 | subprocess.Popen(f"TASKKILL /F /PID {self.popen.pid} /T", stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL, stdin = subprocess.DEVNULL) |
||
66 | else: |
||
67 | os.killpg(os.getpgid(self.popen.pid), signal.SIGTERM) |
||
9249 | Boppan | 68 | |
69 | def failure(self): |
||
70 | self.kill() |
||
71 | raise TestFailureException() |
||
72 | |||
73 | def timeout(self): |
||
74 | self.kill() |
||
75 | raise TestTimeoutException() |
||
76 | |||
77 | def wait(self, seconds = 0.25): |
||
78 | time.sleep(seconds) |
||
79 | |||
9319 | Boppan | 80 | def get_file_directory(path): |
81 | path = path.replace("\\", "/") |
||
82 | if "/" in path: |
||
83 | folder = "/".join(path.split("/")[:-1]) |
||
84 | if folder == "": |
||
85 | return "/" # It was a file in the root folder |
||
86 | return folder |
||
87 | else: |
||
88 | return "." # Just a filename, let's return current folder |
||
9255 | Boppan | 89 | |
9331 | Boppan | 90 | def run_qemu(root_dir, test_dir, debug_log): |
9332 | Boppan | 91 | # Copy IMG to make local copy, so we will be able to run the test in parallel |
92 | shutil.copyfile(f"{root_dir}/kolibri_test.img", f"{test_dir}/kolibri_test.img") |
||
9319 | Boppan | 93 | qemu_command = f"qemu-system-i386" |
94 | flags = "" |
||
95 | flags += "-nographic " # Makes it faster |
||
9331 | Boppan | 96 | flags += f"-debugcon file:{debug_log} " # 0xe9 port output |
9319 | Boppan | 97 | flags += "-L . " # IDK why it does not work without this |
98 | flags += "-m 128 " |
||
9332 | Boppan | 99 | flags += f"-drive format=raw,file={test_dir}/kolibri_test.img,index=0,if=floppy -boot a " |
9319 | Boppan | 100 | flags += "-vga vmware " |
101 | flags += "-net nic,model=rtl8139 -net user " |
||
102 | flags += "-soundhw ac97 " |
||
103 | if is_win32(): |
||
104 | qemu_full_path = shutil.which(qemu_command) |
||
105 | qemu_directory = get_file_directory(qemu_full_path) |
||
106 | flags += f"-L {qemu_directory} " |
||
107 | s = f"{qemu_command} {flags}" |
||
9331 | Boppan | 108 | qemu_stdout = open(f"{test_dir}/qemu_stdout.log", "w") |
109 | qemu_stderr = open(f"{test_dir}/qemu_stderr.log", "w") |
||
9319 | Boppan | 110 | if is_win32(): |
9324 | Boppan | 111 | return subprocess.Popen(s, bufsize = 0, stdout = qemu_stdout, stderr = qemu_stderr, stdin = subprocess.DEVNULL, shell = True, start_new_session = True) |
9319 | Boppan | 112 | else: |
113 | a = shlex.split(s) |
||
9324 | Boppan | 114 | return subprocess.Popen(a, bufsize = 0, stdout = qemu_stdout, stderr = qemu_stderr, stdin = subprocess.DEVNULL, start_new_session = True) |
9256 | Boppan | 115 | |
9331 | Boppan | 116 | def run(root_dir, test_dir): |
117 | debug_log = f"{test_dir}/debug.log" |
||
118 | if os.path.exists(debug_log): |
||
119 | os.remove(debug_log) |
||
120 | popen = run_qemu(root_dir, test_dir, debug_log) |
||
121 | return Qemu(popen, debug_log)> |
||
9249 | Boppan | 122 |