Rev 9255 | Rev 9257 | Go to most recent revision | Details | Compare with Previous | Last modification | View Log | RSS feed
Rev | Author | Line No. | Line |
---|---|---|---|
9249 | Boppan | 1 | # Copyright 2021 KolibriOS Team |
2 | # Copyright 2021 Nekos Team |
||
3 | # Published under MIT License |
||
4 | |||
5 | import io |
||
6 | import os |
||
7 | import subprocess |
||
8 | import timeit |
||
9 | import time |
||
10 | import shlex |
||
11 | import signal |
||
12 | |||
13 | class TestTimeoutException(Exception): |
||
14 | pass |
||
15 | |||
16 | class TestFailureException(Exception): |
||
17 | pass |
||
18 | |||
19 | class Qemu: |
||
20 | def __init__(self, popen): |
||
21 | self.popen = popen |
||
9254 | Boppan | 22 | # Qemu needs time to create debug.log file |
23 | while not os.path.exists("debug.log"): |
||
24 | self.wait() |
||
9255 | Boppan | 25 | self.debug = open("debug.log", "rb") |
26 | self.monitor_in = open("monitor.pipe.in", "wb") |
||
9249 | Boppan | 27 | |
28 | def wait_for_debug_log(self, needle, timeout = 1): |
||
29 | needle = bytes(needle, "utf-8") |
||
30 | start = timeit.default_timer() |
||
31 | log = b"" |
||
32 | |||
33 | # While no timeout, read and search logs |
||
34 | while timeit.default_timer() - start < timeout: |
||
9255 | Boppan | 35 | log += self.debug.read(1) |
9249 | Boppan | 36 | if needle in log: |
37 | return |
||
38 | |||
39 | # We don't have to read whole logs to find the neddle |
||
40 | # If we read len(needle) * 2 bytes of log then we |
||
41 | # already can say that if there's no needle in the data |
||
42 | # then it can't be in first len(needle) bytes of the data |
||
43 | # so first len(needle) bytes of saved logs may be thrown away |
||
44 | # |
||
45 | # So we consume lessser memory and don't search all the previous |
||
46 | # logs every single time |
||
47 | if len(log) > len(needle) * 2: |
||
48 | log = log[len(needle):] |
||
49 | |||
50 | self.timeout() |
||
51 | |||
52 | def kill(self): |
||
53 | os.killpg(os.getpgid(self.popen.pid), signal.SIGTERM) |
||
54 | |||
55 | def failure(self): |
||
56 | self.kill() |
||
57 | raise TestFailureException() |
||
58 | |||
59 | def timeout(self): |
||
60 | self.kill() |
||
61 | raise TestTimeoutException() |
||
62 | |||
63 | def wait(self, seconds = 0.25): |
||
64 | time.sleep(seconds) |
||
65 | |||
9255 | Boppan | 66 | def send_keys(self, keys): |
67 | for key in keys.split(): |
||
68 | self.send_monitor_command(f"sendkey {key}") |
||
69 | |||
9256 | Boppan | 70 | def take_screenshot(self, fname): |
71 | self.send_monitor_command(f"screendump {fname}") |
||
72 | |||
9255 | Boppan | 73 | def send_monitor_command(self, command): |
74 | self.monitor_in.write(bytes(command + "\n", "utf-8")) |
||
75 | self.monitor_in.flush() |
||
76 | |||
9249 | Boppan | 77 | def run(): |
9255 | Boppan | 78 | if os.path.exists("debug.log"): |
79 | os.remove("debug.log") |
||
80 | if os.path.exists("monitor.pipe.in"): |
||
81 | os.remove("monitor.pipe.in") |
||
82 | if os.path.exists("monitor.pipe.out"): |
||
83 | os.remove("monitor.pipe.out") |
||
84 | os.system("mkfifo monitor.pipe.in monitor.pipe.out") |
||
85 | s = f"qemu-system-i386 -nographic -monitor pipe:monitor.pipe -debugcon file:debug.log -L . -m 128 -drive format=raw,file=../../kolibri_test.img,index=0,if=floppy -boot a -vga vmware -net nic,model=rtl8139 -net user -soundhw ac97" |
||
9249 | Boppan | 86 | a = shlex.split(s) |
9251 | Boppan | 87 | popen = subprocess.Popen(a, bufsize = 0, stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL, stdin = subprocess.DEVNULL, start_new_session = True) |
9249 | Boppan | 88 | return Qemu(popen)> |
89 |