Subversion Repositories Kolibri OS

Rev

Rev 9254 | Rev 9256 | Go to most recent revision | Blame | Compare with Previous | Last modification | View Log | Download | RSS feed

  1. # Copyright 2021 KolibriOS Team
  2. # Copyright 2021 Nekos Team
  3. # Published under MIT License
  4.  
  5. import io
  6. import os
  7. import subprocess
  8. import timeit
  9. import time
  10. import shlex
  11. import signal
  12.  
  13. class TestTimeoutException(Exception):
  14.     pass
  15.  
  16. class TestFailureException(Exception):
  17.     pass
  18.  
  19. class Qemu:
  20.     def __init__(self, popen):
  21.         self.popen = popen
  22.         # Qemu needs time to create debug.log file
  23.         while not os.path.exists("debug.log"):
  24.             self.wait()
  25.         self.debug = open("debug.log", "rb")
  26.         self.monitor_in = open("monitor.pipe.in", "wb")
  27.  
  28.     def wait_for_debug_log(self, needle, timeout = 1):
  29.         needle = bytes(needle, "utf-8")
  30.         start = timeit.default_timer()
  31.         log = b""
  32.  
  33.         # While no timeout, read and search logs
  34.         while timeit.default_timer() - start < timeout:
  35.             log += self.debug.read(1)
  36.             if needle in log:
  37.                 return
  38.  
  39.             # We don't have to read whole logs to find the neddle
  40.             # If we read len(needle) * 2 bytes of log then we
  41.             # already can say that if there's no needle in the data
  42.             # then it can't be in first len(needle) bytes of the data
  43.             # so first len(needle) bytes of saved logs may be thrown away
  44.             #
  45.             # So we consume lessser memory and don't search all the previous
  46.             # logs every single time
  47.             if len(log) > len(needle) * 2:
  48.                 log = log[len(needle):]
  49.  
  50.         self.timeout()
  51.  
  52.     def kill(self):
  53.         os.killpg(os.getpgid(self.popen.pid), signal.SIGTERM)
  54.  
  55.     def failure(self):
  56.         self.kill()
  57.         raise TestFailureException()
  58.  
  59.     def timeout(self):
  60.         self.kill()
  61.         raise TestTimeoutException()
  62.  
  63.     def wait(self, seconds = 0.25):
  64.         time.sleep(seconds)
  65.  
  66.     def send_keys(self, keys):
  67.         for key in keys.split():
  68.             self.send_monitor_command(f"sendkey {key}")
  69.  
  70.     def send_monitor_command(self, command):
  71.         self.monitor_in.write(bytes(command + "\n", "utf-8"))
  72.         self.monitor_in.flush()
  73.  
  74. def run():
  75.     if os.path.exists("debug.log"):
  76.         os.remove("debug.log")
  77.     if os.path.exists("monitor.pipe.in"):
  78.         os.remove("monitor.pipe.in")
  79.     if os.path.exists("monitor.pipe.out"):
  80.         os.remove("monitor.pipe.out")
  81.     os.system("mkfifo monitor.pipe.in monitor.pipe.out")
  82.     s = f"qemu-system-i386 -nographic -monitor pipe:monitor.pipe -debugcon file:debug.log -L . -m 128 -drive format=raw,file=../../kolibri_test.img,index=0,if=floppy -boot a -vga vmware -net nic,model=rtl8139 -net user -soundhw ac97"
  83.     a = shlex.split(s)
  84.     popen = subprocess.Popen(a, bufsize = 0, stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL, stdin = subprocess.DEVNULL, start_new_session = True)
  85.     return Qemu(popen)
  86.  
  87.