Subversion Repositories Kolibri OS

Rev

Rev 9257 | Rev 9319 | Go to most recent revision | Blame | Compare with Previous | Last modification | View Log | Download | RSS feed

  1. # Copyright 2021 KolibriOS Team
  2. # Copyright 2021 Nekos Team
  3. # Published under MIT License
  4.  
  5. import io
  6. import os
  7. import subprocess
  8. import timeit
  9. import time
  10. import shlex
  11. import signal
  12. from . makeflop import Floppy
  13.  
  14. class TestTimeoutException(Exception):
  15.     pass
  16.  
  17. class TestFailureException(Exception):
  18.     pass
  19.  
  20. class Qemu:
  21.     def __init__(self, popen):
  22.         self.popen = popen
  23.         # Qemu needs time to create debug.log file
  24.         while not os.path.exists("debug.log"):
  25.             self.wait()
  26.         self.debug = open("debug.log", "rb")
  27.         self.monitor_in = open("monitor.pipe.in", "wb")
  28.  
  29.     def wait_for_debug_log(self, needle, timeout = 1):
  30.         needle = bytes(needle, "utf-8")
  31.         start = timeit.default_timer()
  32.         log = b""
  33.  
  34.         # While no timeout, read and search logs
  35.         while timeit.default_timer() - start < timeout:
  36.             log += self.debug.read(1)
  37.             if needle in log:
  38.                 return
  39.  
  40.             # We don't have to read whole logs to find the neddle
  41.             # If we read len(needle) * 2 bytes of log then we
  42.             # already can say that if there's no needle in the data
  43.             # then it can't be in first len(needle) bytes of the data
  44.             # so first len(needle) bytes of saved logs may be thrown away
  45.             #
  46.             # So we consume lessser memory and don't search all the previous
  47.             # logs every single time
  48.             if len(log) > len(needle) * 2:
  49.                 log = log[len(needle):]
  50.  
  51.         self.timeout()
  52.  
  53.     def kill(self):
  54.         os.killpg(os.getpgid(self.popen.pid), signal.SIGTERM)
  55.  
  56.     def failure(self):
  57.         self.kill()
  58.         raise TestFailureException()
  59.  
  60.     def timeout(self):
  61.         self.kill()
  62.         raise TestTimeoutException()
  63.  
  64.     def wait(self, seconds = 0.25):
  65.         time.sleep(seconds)
  66.  
  67.     def send_keys(self, keys):
  68.         for key in keys.split():
  69.             self.send_monitor_command(f"sendkey {key}")
  70.  
  71.     def take_screenshot(self, fname):
  72.         self.send_monitor_command(f"screendump {fname}")
  73.  
  74.     def send_monitor_command(self, command):
  75.         self.monitor_in.write(bytes(command + "\n", "utf-8"))
  76.         self.monitor_in.flush()
  77.  
  78.     def images_diff(self, i0, i1, expect=True):
  79.         diff = bool(os.system(f"perceptualdiff {i0} {i1} > /dev/null"))
  80.         if diff != expect:
  81.             self.failure()
  82.  
  83. def run():
  84.     if os.path.exists("debug.log"):
  85.         os.remove("debug.log")
  86.     if os.path.exists("monitor.pipe.in"):
  87.         os.remove("monitor.pipe.in")
  88.     if os.path.exists("monitor.pipe.out"):
  89.         os.remove("monitor.pipe.out")
  90.     os.system("mkfifo monitor.pipe.in monitor.pipe.out")
  91.     s = f"qemu-system-i386 -nographic -monitor pipe:monitor.pipe -debugcon file:debug.log -L . -m 128 -drive format=raw,file=../../kolibri_test.img,index=0,if=floppy -boot a -vga vmware -net nic,model=rtl8139 -net user -soundhw ac97"
  92.     a = shlex.split(s)
  93.     popen = subprocess.Popen(a, bufsize = 0, stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL, stdin = subprocess.DEVNULL, start_new_session = True)
  94.     return Qemu(popen)
  95.  
  96.