capture.py 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. import os
  2. import sys
  3. from typing import no_type_check
  4. class FdRedirect:
  5. def __init__(self, file_prefix: str, fd: int):
  6. fname = os.path.join("/tmp", f"{file_prefix}.{fd}")
  7. if os.path.exists(fname):
  8. os.unlink(fname)
  9. self.dest_fd = os.open(fname, os.O_WRONLY | os.O_CREAT)
  10. self.dest_fname = fname
  11. self.source_fd = fd
  12. os.set_inheritable(self.dest_fd, True)
  13. def __del__(self):
  14. os.close(self.dest_fd)
  15. def purge(self) -> None:
  16. os.unlink(self.dest_fname)
  17. def read(self) -> bytes:
  18. with open(self.dest_fname, "rb") as f:
  19. return f.read() or b""
  20. def link(self) -> None:
  21. os.dup2(self.dest_fd, self.source_fd)
  22. class ProcessOutputCapture:
  23. def __init__(self, proc_name: str, prefix: str):
  24. prefix = f"{proc_name}_{prefix}"
  25. self.stdout_redirect = FdRedirect(prefix, 1)
  26. self.stderr_redirect = FdRedirect(prefix, 2)
  27. def __del__(self):
  28. self.stdout_redirect.purge()
  29. self.stderr_redirect.purge()
  30. @no_type_check # ipython classes have incompatible signatures
  31. def link_with_current_proc(self) -> None:
  32. try:
  33. # prevent ipykernel from redirecting stdout/stderr of python subprocesses
  34. from ipykernel.iostream import OutStream
  35. if isinstance(sys.stdout, OutStream):
  36. sys.stdout = sys.__stdout__
  37. if isinstance(sys.stderr, OutStream):
  38. sys.stderr = sys.__stderr__
  39. except ImportError:
  40. pass
  41. # link stdout/stderr to the fifo
  42. self.stdout_redirect.link()
  43. self.stderr_redirect.link()
  44. def read_outerr(self) -> tuple[str, str]:
  45. out_str = self.stdout_redirect.read().decode()
  46. err_str = self.stderr_redirect.read().decode()
  47. return out_str, err_str