threading.py 1.0 KB

12345678910111213141516171819202122232425262728293031323334
  1. from typing import Callable
  2. from ray.rllib.utils.annotations import DeveloperAPI
  3. @DeveloperAPI
  4. def with_lock(func: Callable) -> Callable:
  5. """Use as decorator (@withlock) around object methods that need locking.
  6. Note: The object must have a self._lock = threading.Lock() property.
  7. Locking thus works on the object level (no two locked methods of the same
  8. object can be called asynchronously).
  9. Args:
  10. func: The function to decorate/wrap.
  11. Returns:
  12. The wrapped (object-level locked) function.
  13. """
  14. def wrapper(self, *a, **k):
  15. try:
  16. with self._lock:
  17. return func(self, *a, **k)
  18. except AttributeError as e:
  19. if "has no attribute '_lock'" in e.args[0]:
  20. raise AttributeError(
  21. "Object {} must have a `self._lock` property (assigned "
  22. "to a threading.RLock() object in its "
  23. "constructor)!".format(self)
  24. )
  25. raise e
  26. return wrapper