Subversion Repositories Kolibri OS

Rev

Rev 9320 | Rev 9331 | Go to most recent revision | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
9320 Boppan 1
# Copyright 2021 Magomed Kostoev
9249 Boppan 2
# Published under MIT License
3
 
4
import io
5
import os
9319 Boppan 6
import sys
9249 Boppan 7
import subprocess
8
import timeit
9
import time
10
import shlex
11
import signal
9319 Boppan 12
import shutil
9316 Boppan 13
from . makeflop import Floppy
9249 Boppan 14
 
9319 Boppan 15
def is_win32():
    """Return True when the interpreter reports the "win32" platform."""
    # The comparison already yields a bool; no conditional expression needed
    return sys.platform == "win32"
17
 
18
def is_linux():
    """Return True when `sys.platform` is "linux" or "linux2"."""
    return sys.platform in ("linux", "linux2")
20
 
21
def is_osx():
    """Return True when the interpreter reports the "darwin" platform."""
    return sys.platform == "darwin"
23
 
9249 Boppan 24
class TestTimeoutException(Exception):
    """Raised by `Qemu.timeout()` once the emulator has been killed."""
26
 
27
class TestFailureException(Exception):
    """Raised by `Qemu.failure()` once the emulator has been killed."""
29
 
30
class Qemu:
    """Handle to a running QEMU process and the `debug.log` file it writes.

    The constructor blocks until `debug.log` exists in the current working
    directory and then opens it in binary mode, so that tests can watch the
    emulator output through `wait_for_debug_log()`.
    """

    # Pause between polls while the log has no new data, in seconds
    _POLL_INTERVAL = 0.01

    def __init__(self, popen):
        """Wrap *popen* and open `debug.log` once QEMU has created it.

        Args:
            popen: The `subprocess.Popen` object of the started emulator.

        Raises:
            TestFailureException: If the process exits before `debug.log`
                shows up; without this check the wait would never end.
        """
        self.popen = popen
        # Qemu needs time to create debug.log file
        while not os.path.exists("debug.log"):
            exit_code = self.popen.poll()
            # Re-check the file: it may have appeared just before the exit
            if exit_code is not None and not os.path.exists("debug.log"):
                raise TestFailureException(
                    f"QEMU exited with code {exit_code} before creating debug.log"
                )
            self.wait()
        self.debug = open("debug.log", "rb")

    def wait_for_debug_log(self, needle, timeout = 1):
        """Block until *needle* shows up in the debug log.

        The log is consumed one byte at a time, continuing from where the
        previous call stopped, so on success the read position is left
        immediately after the needle.

        Args:
            needle: Text to look for; it is encoded as UTF-8 before searching.
            timeout: Maximum time to keep looking, in seconds.

        Raises:
            TestTimeoutException: Via `timeout()`, which also kills QEMU,
                when the needle was not seen in time.
        """
        needle = bytes(needle, "utf-8")
        start = timeit.default_timer()
        log = b""

        # While no timeout, read and search logs
        while timeit.default_timer() - start < timeout:
            chunk = self.debug.read(1)
            log += chunk
            if needle in log:
                return

            if not chunk:
                # Nothing new was written yet: pause briefly instead of
                # spinning on read() at full CPU speed
                self.wait(self._POLL_INTERVAL)
            elif len(log) > len(needle) * 2:
                # We don't have to keep the whole log to find the needle.
                # The buffer is searched after every byte, so once it holds
                # more than len(needle) * 2 bytes without a match, the
                # needle cannot start inside the first len(needle) bytes.
                # Dropping them keeps memory use small and avoids searching
                # all the previous logs every single time.
                log = log[len(needle):]

        self.timeout()

    def kill(self):
        """Terminate QEMU together with the processes started alongside it."""
        if is_win32():
            # TASKKILL /T ends the whole process tree rooted at the PID and
            # /F forces termination. The helper process is not waited for.
            subprocess.Popen(f"TASKKILL /F /PID {self.popen.pid} /T", stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL, stdin = subprocess.DEVNULL)
        else:
            try:
                os.killpg(os.getpgid(self.popen.pid), signal.SIGTERM)
            except ProcessLookupError:
                # The process is already gone, so there is nothing to kill
                pass

    def failure(self):
        """Kill QEMU and raise `TestFailureException`."""
        self.kill()
        raise TestFailureException()

    def timeout(self):
        """Kill QEMU and raise `TestTimeoutException`."""
        self.kill()
        raise TestTimeoutException()

    def wait(self, seconds = 0.25):
        """Sleep for *seconds* seconds."""
        time.sleep(seconds)
79
 
9319 Boppan 80
def get_file_directory(path):
    """Return the directory part of *path*, written with forward slashes.

    Backslashes are treated as path separators and converted to "/".

    Args:
        path: File path using "/" or "\\" separators.

    Returns:
        Everything before the last separator. A bare filename gives ".",
        a file in the root folder gives "/", and a file in a drive root
        such as "C:\\file" gives "C:/".
    """
    normalized = path.replace("\\", "/")
    folder, separator, _ = normalized.rpartition("/")
    if not separator:
        return "." # Just a filename, let's return current folder
    if folder == "":
        return "/" # It was a file in the root folder
    if len(folder) == 2 and folder[0].isalpha() and folder[1] == ":":
        # A bare "C:" means the current directory on that drive, not its
        # root, so keep the separator for files located in a drive root
        return folder + "/"
    return folder
9255 Boppan 89
 
9319 Boppan 90
def run_qemu():
    """Start `qemu-system-i386` on the test floppy image.

    QEMU's stdout and stderr are redirected to `qemu_stdout.log` and
    `qemu_stderr.log` in the current working directory, and its debug
    console output goes to `debug.log`.

    Returns:
        The `subprocess.Popen` object of the started process.

    Raises:
        FileNotFoundError: On Windows, when the QEMU executable cannot be
            located through PATH.
    """
    qemu_command = "qemu-system-i386"
    on_windows = is_win32()

    args = [
        qemu_command,
        "-nographic", # Makes it faster
        "-debugcon", "file:debug.log", # 0xe9 port output
        "-L", ".", # Reported not to work without this
        "-m", "128",
        "-drive", "format=raw,file=../../kolibri_test.img,index=0,if=floppy",
        "-boot", "a",
        "-vga", "vmware",
        "-net", "nic,model=rtl8139",
        "-net", "user",
        # NOTE(review): -soundhw appears to have been removed in newer QEMU
        # releases - confirm against the QEMU version used for testing
        "-soundhw", "ac97",
    ]

    if on_windows:
        qemu_full_path = shutil.which(qemu_command)
        if qemu_full_path is None:
            raise FileNotFoundError(f"{qemu_command} was not found in PATH")
        # Also pass the directory QEMU is installed in via -L
        args += ["-L", get_file_directory(qemu_full_path)]

    # The child process receives its own copies of the log file handles, so
    # ours can be closed as soon as the process has been started
    with open("qemu_stdout.log", "w") as qemu_stdout, open("qemu_stderr.log", "w") as qemu_stderr:
        if on_windows:
            # The command goes through the shell as a single string, so an
            # argument containing spaces (e.g. an install directory under
            # "Program Files") has to be quoted
            command_line = " ".join(f'"{arg}"' if " " in arg else arg for arg in args)
            return subprocess.Popen(command_line, bufsize = 0, stdout = qemu_stdout, stderr = qemu_stderr, stdin = subprocess.DEVNULL, shell = True, start_new_session = True)
        # start_new_session puts QEMU into its own session and process group
        return subprocess.Popen(args, bufsize = 0, stdout = qemu_stdout, stderr = qemu_stderr, stdin = subprocess.DEVNULL, start_new_session = True)
9256 Boppan 113
 
9249 Boppan 114
def run():
    """Start a fresh QEMU session and return a `Qemu` handle for it."""
    # Remove the log of a previous run so that Qemu.__init__ waits for the
    # file created by the new process. Attempting the removal directly
    # avoids the gap between an existence check and the removal.
    try:
        os.remove("debug.log")
    except FileNotFoundError:
        pass
    return Qemu(run_qemu())
119