Subversion Repositories Kolibri OS

Rev

Rev 9316 | Rev 9320 | Go to most recent revision | Blame | Compare with Previous | Last modification | View Log | Download | RSS feed

  1. # Copyright 2021 KolibriOS Team
  2. # Copyright 2021 Nekos Team
  3. # Published under MIT License
  4.  
  5. import io
  6. import os
  7. import sys
  8. import subprocess
  9. import timeit
  10. import time
  11. import shlex
  12. import signal
  13. import shutil
  14. from . makeflop import Floppy
  15.  
  16. def is_win32():
  17.     return True if sys.platform == "win32" else False
  18.  
  19. def is_linux():
  20.     return True if sys.platform == "linux" or sys.platform == "linux2" else False
  21.  
  22. def  is_osx():
  23.     return True if sys.platform == "darwin" else False
  24.  
  25. class TestTimeoutException(Exception):
  26.     pass
  27.  
  28. class TestFailureException(Exception):
  29.     pass
  30.  
  31. class Qemu:
  32.     def __init__(self, popen):
  33.         self.popen = popen
  34.         # Qemu needs time to create debug.log file
  35.         while not os.path.exists("debug.log"):
  36.             self.wait()
  37.         self.debug = open("debug.log", "rb")
  38.  
  39.     def wait_for_debug_log(self, needle, timeout = 1):
  40.         needle = bytes(needle, "utf-8")
  41.         start = timeit.default_timer()
  42.         log = b""
  43.  
  44.         # While no timeout, read and search logs
  45.         while timeit.default_timer() - start < timeout:
  46.             log += self.debug.read(1)
  47.             if needle in log:
  48.                 return
  49.  
  50.             # We don't have to read whole logs to find the neddle
  51.             # If we read len(needle) * 2 bytes of log then we
  52.             # already can say that if there's no needle in the data
  53.             # then it can't be in first len(needle) bytes of the data
  54.             # so first len(needle) bytes of saved logs may be thrown away
  55.             #
  56.             # So we consume lessser memory and don't search all the previous
  57.             # logs every single time
  58.             if len(log) > len(needle) * 2:
  59.                 log = log[len(needle):]
  60.  
  61.         self.timeout()
  62.  
  63.     def kill(self):
  64.         if is_win32():
  65.             # FIXME: This is shit, isn't there anything better?
  66.             subprocess.Popen(f"TASKKILL /F /PID {self.popen.pid} /T", stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL, stdin = subprocess.DEVNULL)
  67.         else:
  68.             os.killpg(os.getpgid(self.popen.pid), signal.SIGTERM)
  69.  
  70.     def failure(self):
  71.         self.kill()
  72.         raise TestFailureException()
  73.  
  74.     def timeout(self):
  75.         self.kill()
  76.         raise TestTimeoutException()
  77.  
  78.     def wait(self, seconds = 0.25):
  79.         time.sleep(seconds)
  80.  
  81. def get_file_directory(path):
  82.     path = path.replace("\\", "/")
  83.     if "/" in path:
  84.         folder = "/".join(path.split("/")[:-1])
  85.         if folder == "":
  86.             return "/" # It was a file in the root folder
  87.         return folder
  88.     else:
  89.         return "." # Just a filename, let's return current folder
  90.  
  91. def run_qemu():
  92.     qemu_command = f"qemu-system-i386"
  93.     flags = ""
  94.     flags += "-nographic " # Makes it faster
  95.     flags += "-debugcon file:debug.log " # 0xe9 port output
  96.     flags += "-L . " # IDK why it does not work without this
  97.     flags += "-m 128 "
  98.     flags += "-drive format=raw,file=../../kolibri_test.img,index=0,if=floppy -boot a "
  99.     flags += "-vga vmware "
  100.     flags += "-net nic,model=rtl8139 -net user "
  101.     flags += "-soundhw ac97 "
  102.     if is_win32():
  103.         qemu_full_path = shutil.which(qemu_command)
  104.         qemu_directory = get_file_directory(qemu_full_path)
  105.         flags += f"-L {qemu_directory} "
  106.     s = f"{qemu_command} {flags}"
  107.     if is_win32():
  108.         return subprocess.Popen(s, bufsize = 0, stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL, stdin = subprocess.DEVNULL, shell = True, start_new_session = True)
  109.     else:
  110.         a = shlex.split(s)
  111.         return subprocess.Popen(a, bufsize = 0, stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL, stdin = subprocess.DEVNULL, start_new_session = True)
  112.  
  113. def run():
  114.     if os.path.exists("debug.log"):
  115.         os.remove("debug.log")
  116.     popen = run_qemu()
  117.     return Qemu(popen)
  118.  
  119.