spinner.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. import os
  2. import subprocess
  3. from openpilot.common.basedir import BASEDIR
  4. class Spinner:
  5. def __init__(self):
  6. try:
  7. self.spinner_proc = subprocess.Popen(["./spinner"],
  8. stdin=subprocess.PIPE,
  9. cwd=os.path.join(BASEDIR, "selfdrive", "ui"),
  10. close_fds=True)
  11. except OSError:
  12. self.spinner_proc = None
  13. def __enter__(self):
  14. return self
  15. def update(self, spinner_text: str):
  16. if self.spinner_proc is not None:
  17. self.spinner_proc.stdin.write(spinner_text.encode('utf8') + b"\n")
  18. try:
  19. self.spinner_proc.stdin.flush()
  20. except BrokenPipeError:
  21. pass
  22. def update_progress(self, cur: float, total: float):
  23. self.update(str(round(100 * cur / total)))
  24. def close(self):
  25. if self.spinner_proc is not None:
  26. self.spinner_proc.kill()
  27. try:
  28. self.spinner_proc.communicate(timeout=2.)
  29. except subprocess.TimeoutExpired:
  30. print("WARNING: failed to kill spinner")
  31. self.spinner_proc = None
  32. def __del__(self):
  33. self.close()
  34. def __exit__(self, exc_type, exc_value, traceback):
  35. self.close()
  36. if __name__ == "__main__":
  37. import time
  38. with Spinner() as s:
  39. s.update("Spinner text")
  40. time.sleep(5.0)
  41. print("gone")
  42. time.sleep(5.0)