Subversion Repositories Kolibri OS

Rev

Rev 9369 | Rev 9920 | Go to most recent revision | Blame | Compare with Previous | Last modification | View Log | Download | RSS feed

  1. # Copyright 2021 Magomed Kostoev
  2. # Published under MIT License
  3.  
  4. import io
  5. import os
  6. import sys
  7. import subprocess
  8. import timeit
  9. import time
  10. import shlex
  11. import signal
  12. import shutil
  13. from . makeflop import Floppy
  14.  
  15. def is_win32():
  16.     return True if sys.platform == "win32" else False
  17.  
  18. def is_linux():
  19.     return True if sys.platform == "linux" or sys.platform == "linux2" else False
  20.  
  21. def  is_osx():
  22.     return True if sys.platform == "darwin" else False
  23.  
  24. class TestTimeoutException(Exception):
  25.     pass
  26.  
  27. class TestFailureException(Exception):
  28.     def __init__(self, message):
  29.         self.message = message
  30.  
  31. class Qemu:
  32.     def __init__(self, popen, debug_log):
  33.         self.popen = popen
  34.         # Qemu needs time to create debug.log file
  35.         while not os.path.exists(debug_log) and self.qemu_is_alive():
  36.             self.wait(0.250)
  37.         self.assert_qemu_not_died("waiting for the debug log file")
  38.         self.debug = open(debug_log, "rb")
  39.  
  40.     def qemu_is_alive(self):
  41.         return self.popen.poll() == None
  42.  
  43.     def assert_qemu_not_died(self, while_):
  44.         if not self.qemu_is_alive():
  45.             raise TestFailureException(f"Qemu has finished while {while_}.")
  46.  
  47.     def wait_for_debug_log(self, needle, timeout = 1):
  48.         needle = bytes(needle, "utf-8")
  49.         start = timeit.default_timer()
  50.         log = b""
  51.  
  52.         # While no timeout, read and search logs
  53.         while timeit.default_timer() - start < timeout:
  54.             # TODO: Non-blocking read.
  55.             log += self.debug.read(1)
  56.             if needle in log:
  57.                 return
  58.  
  59.             # We don't have to read whole logs to find the neddle
  60.             # If we read len(needle) * 2 bytes of log then we
  61.             # already can say that if there's no needle in the data
  62.             # then it can't be in first len(needle) bytes of the data
  63.             # so first len(needle) bytes of saved logs may be thrown away
  64.             #
  65.             # So we consume lessser memory and don't search all the previous
  66.             # logs every single time
  67.             if len(log) > len(needle) * 2:
  68.                 log = log[len(needle):]
  69.  
  70.             self.assert_qemu_not_died("waiting for the debug log")
  71.  
  72.         self.timeout()
  73.  
  74.     def kill(self):
  75.         if is_win32():
  76.             # FIXME: This is shit, isn't there anything better?
  77.             subprocess.Popen(f"TASKKILL /F /PID {self.popen.pid} /T", stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL, stdin = subprocess.DEVNULL)
  78.         else:
  79.             os.killpg(os.getpgid(self.popen.pid), signal.SIGTERM)
  80.  
  81.     def timeout(self):
  82.         self.kill()
  83.         raise TestTimeoutException()
  84.  
  85.     def wait(self, seconds):
  86.         time.sleep(seconds)
  87.  
  88. def run_qemu(root_dir, test_dir, debug_log):
  89.     # Make local copy of IMG, so we will be able to run the test in parallel
  90.     if os.path.exists(f"{test_dir}/kolibri_test.img"): # If previous test run interrupted the file may be busy
  91.         os.remove(f"{test_dir}/kolibri_test.img")
  92.     shutil.copyfile(f"{root_dir}/kolibri_test.img", f"{test_dir}/kolibri_test.img")
  93.     qemu_command = f"qemu-system-i386"
  94.     flags = ""
  95.     flags += "-nographic " # Makes it faster
  96.     flags += f"-debugcon file:{debug_log} " # 0xe9 port output
  97.     flags += "-L . " # IDK why it does not work without this
  98.     flags += "-m 128 "
  99.     flags += f"-drive format=raw,file={test_dir}/kolibri_test.img,index=0,if=floppy -boot a "
  100.     flags += "-vga vmware "
  101.     flags += "-net nic,model=rtl8139 -net user "
  102.     flags += "-soundhw ac97 "
  103.     if is_win32():
  104.         qemu_full_path = shutil.which(qemu_command)
  105.         qemu_directory = os.path.dirname(qemu_full_path)
  106.         flags += f"-L {qemu_directory} "
  107.     s = f"{qemu_command} {flags}"
  108.     qemu_stdout = open(f"{test_dir}/qemu_stdout.log", "w")
  109.     qemu_stderr = open(f"{test_dir}/qemu_stderr.log", "w")
  110.     if is_win32():
  111.         return subprocess.Popen(s, bufsize = 0, stdout = qemu_stdout, stderr = qemu_stderr, stdin = subprocess.DEVNULL, shell = True, start_new_session = True)
  112.     else:
  113.         a = shlex.split(s)
  114.         return subprocess.Popen(a, bufsize = 0, stdout = qemu_stdout, stderr = qemu_stderr, stdin = subprocess.DEVNULL, start_new_session = True)
  115.  
  116. def run(root_dir, test_dir):
  117.     debug_log = f"{test_dir}/debug.log"
  118.     if os.path.exists(debug_log):
  119.         os.remove(debug_log)
  120.     popen = run_qemu(root_dir, test_dir, debug_log)
  121.     return Qemu(popen, debug_log)
  122.  
  123.