1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 |
- import os
- import sys
- from typing import no_type_check
class FdRedirect:
    """Redirect one file descriptor of the current process into a file in /tmp.

    The capture file is ``/tmp/<file_prefix>.<fd>``. It is (re)created when the
    object is constructed; the redirection itself only happens once
    :meth:`link` is called.
    """

    def __init__(self, file_prefix: str, fd: int):
        """Create the capture file and open it for writing.

        Args:
            file_prefix: Base name of the capture file inside ``/tmp``.
            fd: Descriptor that :meth:`link` will redirect; also used as the
                suffix of the capture file name.

        Raises:
            OSError: If a stale capture file cannot be removed or the new one
                cannot be opened.
        """
        # Set the sentinel first so __del__ stays safe if anything below raises.
        self.dest_fd = -1

        fname = os.path.join("/tmp", f"{file_prefix}.{fd}")
        # EAFP instead of exists()+unlink(): no check/act race, and a dangling
        # symlink (for which os.path.exists() is False) is removed as well.
        try:
            os.unlink(fname)
        except FileNotFoundError:
            pass

        self.dest_fd = os.open(fname, os.O_WRONLY | os.O_CREAT)
        self.dest_fname = fname
        self.source_fd = fd
        # Descriptors are non-inheritable by default; mark this one inheritable
        # so it survives into child processes.
        os.set_inheritable(self.dest_fd, True)

    def __del__(self):
        """Close the capture file descriptor; safe to run more than once."""
        fd = getattr(self, "dest_fd", -1)
        if fd < 0:
            # Never opened (constructor failed early) or already closed.
            return
        # Forget the number before closing so a second call can never close an
        # unrelated descriptor that happens to reuse it.
        self.dest_fd = -1
        try:
            os.close(fd)
        except OSError:
            # Best effort: a finalizer has nowhere to report the failure.
            pass

    def purge(self) -> None:
        """Delete the capture file; a file that is already gone is not an error."""
        try:
            os.unlink(self.dest_fname)
        except FileNotFoundError:
            pass

    def read(self) -> bytes:
        """Return everything written to the capture file so far.

        Raises:
            FileNotFoundError: If the capture file has been purged.
        """
        with open(self.dest_fname, "rb") as f:
            return f.read()

    def link(self) -> None:
        """Make ``source_fd`` refer to the capture file via ``os.dup2``."""
        os.dup2(self.dest_fd, self.source_fd)
class ProcessOutputCapture:
    """Capture a process's stdout and stderr into two files via ``FdRedirect``.

    One redirect is created for descriptor 1 and one for descriptor 2, both
    using the file prefix ``<proc_name>_<prefix>``.
    """

    def __init__(self, proc_name: str, prefix: str):
        """Create the stdout and stderr capture files.

        Args:
            proc_name: First component of the capture file prefix.
            prefix: Second component of the capture file prefix.
        """
        prefix = f"{proc_name}_{prefix}"
        self.stdout_redirect = FdRedirect(prefix, 1)
        self.stderr_redirect = FdRedirect(prefix, 2)

    def __del__(self):
        """Remove both capture files, tolerating partial construction."""
        for attr in ("stdout_redirect", "stderr_redirect"):
            redirect = getattr(self, attr, None)
            if redirect is None:
                # __init__ failed before this redirect was created.
                continue
            try:
                redirect.purge()
            except OSError:
                # Best effort: a missing/unremovable file must not prevent the
                # other capture file from being cleaned up.
                pass

    @no_type_check  # ipython classes have incompatible signatures
    def link_with_current_proc(self) -> None:
        """Point this process's stdout/stderr descriptors at the capture files."""
        try:
            from ipykernel.iostream import OutStream
        except ImportError:
            # ipykernel is not installed, so there is nothing to undo.
            pass
        else:
            # prevent ipykernel from redirecting stdout/stderr of python subprocesses
            if isinstance(sys.stdout, OutStream):
                sys.stdout = sys.__stdout__
            if isinstance(sys.stderr, OutStream):
                sys.stderr = sys.__stderr__

        # link stdout/stderr to the capture files
        self.stdout_redirect.link()
        self.stderr_redirect.link()

    def read_outerr(self) -> tuple[str, str]:
        """Return the captured ``(stdout, stderr)`` text.

        Bytes that are not valid UTF-8 (binary output, or a multi-byte
        character that is only partly written when the file is read) are
        replaced with U+FFFD instead of raising ``UnicodeDecodeError``.
        """
        out_str = self.stdout_redirect.read().decode(errors="replace")
        err_str = self.stderr_redirect.read().decode(errors="replace")
        return out_str, err_str
|