Subversion Repositories Kolibri OS

Rev

Rev 9334 | Rev 9369 | Go to most recent revision | Blame | Compare with Previous | Last modification | View Log | Download | RSS feed

  1. # Copyright 2021 Magomed Kostoev
  2. # Published under MIT License
  3.  
  4. import io
  5. import os
  6. import sys
  7. import subprocess
  8. import timeit
  9. import time
  10. import shlex
  11. import signal
  12. import shutil
  13. from . makeflop import Floppy
  14.  
  15. def is_win32():
  16.     return True if sys.platform == "win32" else False
  17.  
  18. def is_linux():
  19.     return True if sys.platform == "linux" or sys.platform == "linux2" else False
  20.  
  21. def  is_osx():
  22.     return True if sys.platform == "darwin" else False
  23.  
  24. class TestTimeoutException(Exception):
  25.     pass
  26.  
  27. class TestFailureException(Exception):
  28.     pass
  29.  
  30. class Qemu:
  31.     def __init__(self, popen, debug_log):
  32.         self.popen = popen
  33.         # Qemu needs time to create debug.log file
  34.         while not os.path.exists(debug_log):
  35.             self.wait()
  36.         self.debug = open(debug_log, "rb")
  37.  
  38.     def wait_for_debug_log(self, needle, timeout = 1):
  39.         needle = bytes(needle, "utf-8")
  40.         start = timeit.default_timer()
  41.         log = b""
  42.  
  43.         # While no timeout, read and search logs
  44.         while timeit.default_timer() - start < timeout:
  45.             log += self.debug.read(1)
  46.             if needle in log:
  47.                 return
  48.  
  49.             # We don't have to read whole logs to find the neddle
  50.             # If we read len(needle) * 2 bytes of log then we
  51.             # already can say that if there's no needle in the data
  52.             # then it can't be in first len(needle) bytes of the data
  53.             # so first len(needle) bytes of saved logs may be thrown away
  54.             #
  55.             # So we consume lessser memory and don't search all the previous
  56.             # logs every single time
  57.             if len(log) > len(needle) * 2:
  58.                 log = log[len(needle):]
  59.  
  60.         self.timeout()
  61.  
  62.     def kill(self):
  63.         if is_win32():
  64.             # FIXME: This is shit, isn't there anything better?
  65.             subprocess.Popen(f"TASKKILL /F /PID {self.popen.pid} /T", stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL, stdin = subprocess.DEVNULL)
  66.         else:
  67.             os.killpg(os.getpgid(self.popen.pid), signal.SIGTERM)
  68.  
  69.     def failure(self):
  70.         self.kill()
  71.         raise TestFailureException()
  72.  
  73.     def timeout(self):
  74.         self.kill()
  75.         raise TestTimeoutException()
  76.  
  77.     def wait(self, seconds = 0.25):
  78.         time.sleep(seconds)
  79.  
  80. def get_file_directory(path):
  81.     path = path.replace("\\", "/")
  82.     if "/" in path:
  83.         folder = "/".join(path.split("/")[:-1])
  84.         if folder == "":
  85.             return "/" # It was a file in the root folder
  86.         return folder
  87.     else:
  88.         return "." # Just a filename, let's return current folder
  89.  
  90. def run_qemu(root_dir, test_dir, debug_log):
  91.     # Make local copy of IMG, so we will be able to run the test in parallel
  92.     if os.path.exists(f"{test_dir}/kolibri_test.img"): # If previous test run interrupted the file may be busy
  93.         os.remove(f"{test_dir}/kolibri_test.img")
  94.     shutil.copyfile(f"{root_dir}/kolibri_test.img", f"{test_dir}/kolibri_test.img")
  95.     qemu_command = f"qemu-system-i386"
  96.     flags = ""
  97.     flags += "-nographic " # Makes it faster
  98.     flags += f"-debugcon file:{debug_log} " # 0xe9 port output
  99.     flags += "-L . " # IDK why it does not work without this
  100.     flags += "-m 128 "
  101.     flags += f"-drive format=raw,file={test_dir}/kolibri_test.img,index=0,if=floppy -boot a "
  102.     flags += "-vga vmware "
  103.     flags += "-net nic,model=rtl8139 -net user "
  104.     flags += "-soundhw ac97 "
  105.     if is_win32():
  106.         qemu_full_path = shutil.which(qemu_command)
  107.         qemu_directory = get_file_directory(qemu_full_path)
  108.         flags += f"-L {qemu_directory} "
  109.     s = f"{qemu_command} {flags}"
  110.     qemu_stdout = open(f"{test_dir}/qemu_stdout.log", "w")
  111.     qemu_stderr = open(f"{test_dir}/qemu_stderr.log", "w")
  112.     if is_win32():
  113.         return subprocess.Popen(s, bufsize = 0, stdout = qemu_stdout, stderr = qemu_stderr, stdin = subprocess.DEVNULL, shell = True, start_new_session = True)
  114.     else:
  115.         a = shlex.split(s)
  116.         return subprocess.Popen(a, bufsize = 0, stdout = qemu_stdout, stderr = qemu_stderr, stdin = subprocess.DEVNULL, start_new_session = True)
  117.  
  118. def run(root_dir, test_dir):
  119.     debug_log = f"{test_dir}/debug.log"
  120.     if os.path.exists(debug_log):
  121.         os.remove(debug_log)
  122.     popen = run_qemu(root_dir, test_dir, debug_log)
  123.     return Qemu(popen, debug_log)
  124.  
  125.