# queued_pipe.py (593 B)
  1. from multiprocessing import Pipe, Queue
  2. import time
  3. import threading
  4. class PipeSide(object):
  5. def __init__(self, q_2remote, q_2local) -> None:
  6. self.q_2remote = q_2remote
  7. self.q_2local = q_2local
  8. def recv(self):
  9. return self.q_2local.get()
  10. def send(self, buf):
  11. self.q_2remote.put(buf)
  12. def poll(self):
  13. return not self.q_2local.empty()
  14. def create_queue_pipe():
  15. q_p2c = Queue()
  16. q_c2p = Queue()
  17. pipe_c = PipeSide(q_2local=q_p2c, q_2remote=q_c2p)
  18. pipe_p = PipeSide(q_2local=q_c2p, q_2remote=q_p2c)
  19. return pipe_c, pipe_p