Subversion Repositories Kolibri OS

Rev

Rev 9919 | Go to most recent revision | Blame | Compare with Previous | Last modification | View Log | Download | RSS feed

  1. # Copyright 2021 Magomed Kostoev
  2. # Published under MIT License
  3.  
  4. import io
  5. import os
  6. import sys
  7. import subprocess
  8. import timeit
  9. import time
  10. import shlex
  11. import signal
  12. import shutil
  13. from . makeflop import Floppy
  14.  
  15. def is_win32():
  16.     return True if sys.platform == "win32" else False
  17.  
  18. def is_linux():
  19.     return True if sys.platform == "linux" or sys.platform == "linux2" else False
  20.  
  21. def  is_osx():
  22.     return True if sys.platform == "darwin" else False
  23.  
  24. class TestException(Exception):
  25.     def __init__(self, error_kind, message, qemu_cmd):
  26.         self.error_kind = error_kind
  27.         super().__init__(message)
  28.         self.qemu_cmd = qemu_cmd
  29.  
  30.     def kind(self):
  31.         return self.error_kind
  32.  
  33.     def cmd(self):
  34.         return self.qemu_cmd
  35.  
  36. class TestException_Timeout(TestException):
  37.     def __init__(self, message, qemu_cmd):
  38.         super().__init__("TIMEOUT", message, qemu_cmd)
  39.  
  40. class TestException_Failure(TestException):
  41.     def __init__(self, message, qemu_cmd):
  42.         super().__init__("FAILURE", message, qemu_cmd)
  43.  
  44. class Qemu:
  45.     def __init__(self, popen, debug_log):
  46.         self.popen = popen
  47.         # Qemu needs time to create debug.log file
  48.         while not os.path.exists(debug_log) and self.qemu_is_alive():
  49.             self.wait(0.250)
  50.         self.assert_qemu_not_died("waiting for the debug log file creation")
  51.         self.debug = open(debug_log, "rb")
  52.  
  53.     def qemu_is_alive(self):
  54.         return self.popen.poll() == None
  55.  
  56.     def assert_qemu_not_died(self, while_):
  57.         if not self.qemu_is_alive():
  58.             raise TestException_Failure(f"Qemu has finished while {while_}.", ' '.join(self.popen.args))
  59.  
  60.     def wait_for_debug_log(self, needle, timeout = 1):
  61.         needle = bytes(needle, "utf-8")
  62.         start = timeit.default_timer()
  63.         log = b""
  64.  
  65.         # While no timeout, read and search logs
  66.         while timeit.default_timer() - start < timeout:
  67.             # TODO: Non-blocking read.
  68.             log += self.debug.read(1)
  69.             if needle in log:
  70.                 return
  71.  
  72.             # We don't have to read whole logs to find the neddle
  73.             # If we read len(needle) * 2 bytes of log then we
  74.             # already can say that if there's no needle in the data
  75.             # then it can't be in first len(needle) bytes of the data
  76.             # so first len(needle) bytes of saved logs may be thrown away
  77.             #
  78.             # So we consume lessser memory and don't search all the previous
  79.             # logs every single time
  80.             if len(log) > len(needle) * 2:
  81.                 log = log[len(needle):]
  82.  
  83.             self.assert_qemu_not_died("waiting for the debug log")
  84.  
  85.         self.timeout("waiting for the debug log")
  86.  
  87.     def kill(self):
  88.         if is_win32():
  89.             # FIXME: This is shit, isn't there anything better?
  90.             subprocess.Popen(f"TASKKILL /F /PID {self.popen.pid} /T", stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL, stdin = subprocess.DEVNULL)
  91.         else:
  92.             os.killpg(os.getpgid(self.popen.pid), signal.SIGTERM)
  93.  
  94.     def timeout(self, while_):
  95.         self.kill()
  96.         raise TestException_Timeout(while_, ' '.join(self.popen.args))
  97.  
  98.     def wait(self, seconds):
  99.         time.sleep(seconds)
  100.  
  101. def run_qemu(root_dir, test_dir, debug_log):
  102.     # Make local copy of IMG, so we will be able to run the test in parallel
  103.     if os.path.exists(f"{test_dir}/kolibri_test.img"): # If previous test run interrupted the file may be busy
  104.         os.remove(f"{test_dir}/kolibri_test.img")
  105.     shutil.copyfile(f"{root_dir}/kolibri_test.img", f"{test_dir}/kolibri_test.img")
  106.     qemu_command = f"qemu-system-i386"
  107.     flags = ""
  108.     flags += "-nographic " # Makes it faster
  109.     flags += f"-debugcon file:{debug_log} " # 0xe9 port output
  110.     flags += "-L . " # IDK why it does not work without this
  111.     flags += "-m 128 "
  112.     flags += f"-drive format=raw,file={test_dir}/kolibri_test.img,index=0,if=floppy -boot a "
  113.     flags += "-vga vmware "
  114.     flags += "-net nic,model=rtl8139 -net user "
  115.     flags += "-soundhw ac97 "
  116.     if is_win32():
  117.         qemu_full_path = shutil.which(qemu_command)
  118.         qemu_directory = os.path.dirname(qemu_full_path)
  119.         flags += f"-L {qemu_directory} "
  120.     s = f"{qemu_command} {flags}"
  121.     qemu_stdout = open(f"{test_dir}/qemu_stdout.log", "w")
  122.     qemu_stderr = open(f"{test_dir}/qemu_stderr.log", "w")
  123.     if is_win32():
  124.         return subprocess.Popen(s, bufsize = 0, stdout = qemu_stdout, stderr = qemu_stderr, stdin = subprocess.DEVNULL, shell = True, start_new_session = True)
  125.     else:
  126.         a = shlex.split(s)
  127.         return subprocess.Popen(a, bufsize = 0, stdout = qemu_stdout, stderr = qemu_stderr, stdin = subprocess.DEVNULL, start_new_session = True)
  128.  
  129. def run(root_dir, test_dir):
  130.     debug_log = f"{test_dir}/debug.log"
  131.     if os.path.exists(debug_log):
  132.         os.remove(debug_log)
  133.     popen = run_qemu(root_dir, test_dir, debug_log)
  134.     return Qemu(popen, debug_log)
  135.  
  136.