baseHandler.py 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. from time import perf_counter
  2. import logging
  3. logger = logging.getLogger(__name__)
  4. class BaseHandler:
  5. """
  6. Base class for pipeline parts. Each part of the pipeline has an input and an output queue.
  7. The `setup` method along with `setup_args` and `setup_kwargs` can be used to address the specific requirements of the implemented pipeline part.
  8. To stop a handler properly, set the stop_event and, to avoid queue deadlocks, place b"END" in the input queue.
  9. Objects placed in the input queue will be processed by the `process` method, and the yielded results will be placed in the output queue.
  10. The cleanup method handles stopping the handler, and b"END" is placed in the output queue.
  11. """
  12. def __init__(self, stop_event, queue_in, queue_out, setup_args=(), setup_kwargs={}):
  13. self.stop_event = stop_event
  14. self.queue_in = queue_in
  15. self.queue_out = queue_out
  16. self.setup(*setup_args, **setup_kwargs)
  17. self._times = []
  18. def setup(self):
  19. pass
  20. def process(self):
  21. raise NotImplementedError
  22. def run(self):
  23. while not self.stop_event.is_set():
  24. input = self.queue_in.get()
  25. if isinstance(input, bytes) and input == b"END":
  26. # sentinelle signal to avoid queue deadlock
  27. logger.debug("Stopping thread")
  28. break
  29. start_time = perf_counter()
  30. for output in self.process(input):
  31. self._times.append(perf_counter() - start_time)
  32. if self.last_time > self.min_time_to_debug:
  33. logger.debug(f"{self.__class__.__name__}: {self.last_time: .3f} s")
  34. self.queue_out.put(output)
  35. start_time = perf_counter()
  36. self.cleanup()
  37. self.queue_out.put(b"END")
  38. @property
  39. def last_time(self):
  40. return self._times[-1]
  41. @property
  42. def min_time_to_debug(self):
  43. return 0.001
  44. def cleanup(self):
  45. pass