actor_utils.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. import time
  2. import re
  3. from collections import defaultdict
# Lazily captures a non-empty run of characters that is immediately followed
# by "(". Presumably used to pull a class name out of a "ClassName(...)"
# style string -- TODO confirm; nothing in the visible part of this file
# references it.
PYCLASSNAME_RE = re.compile(r"(.+?)\(")
  5. def construct_actor_groups(actors):
  6. """actors is a dict from actor id to an actor or an
  7. actor creation task The shared fields currently are
  8. "actorClass", "actorId", and "state" """
  9. actor_groups = _group_actors_by_python_class(actors)
  10. stats_by_group = {
  11. name: _get_actor_group_stats(group) for name, group in actor_groups.items()
  12. }
  13. summarized_actor_groups = {}
  14. for name, group in actor_groups.items():
  15. summarized_actor_groups[name] = {
  16. "entries": group,
  17. "summary": stats_by_group[name],
  18. }
  19. return summarized_actor_groups
  20. def actor_classname_from_task_spec(task_spec):
  21. return (
  22. task_spec.get("functionDescriptor", {})
  23. .get("pythonFunctionDescriptor", {})
  24. .get("className", "Unknown actor class")
  25. .split(".")[-1]
  26. )
  27. def _group_actors_by_python_class(actors):
  28. groups = defaultdict(list)
  29. for actor in actors.values():
  30. actor_class = actor["actorClass"]
  31. groups[actor_class].append(actor)
  32. return dict(groups)
  33. def _get_actor_group_stats(group):
  34. state_to_count = defaultdict(lambda: 0)
  35. executed_tasks = 0
  36. min_timestamp = None
  37. num_timestamps = 0
  38. sum_timestamps = 0
  39. now = time.time() * 1000 # convert S -> MS
  40. for actor in group:
  41. state_to_count[actor["state"]] += 1
  42. if "timestamp" in actor:
  43. if not min_timestamp or actor["timestamp"] < min_timestamp:
  44. min_timestamp = actor["timestamp"]
  45. num_timestamps += 1
  46. sum_timestamps += now - actor["timestamp"]
  47. if "numExecutedTasks" in actor:
  48. executed_tasks += actor["numExecutedTasks"]
  49. if num_timestamps > 0:
  50. avg_lifetime = int((sum_timestamps / num_timestamps) / 1000)
  51. max_lifetime = int((now - min_timestamp) / 1000)
  52. else:
  53. avg_lifetime = 0
  54. max_lifetime = 0
  55. return {
  56. "stateToCount": state_to_count,
  57. "avgLifetime": avg_lifetime,
  58. "maxLifetime": max_lifetime,
  59. "numExecutedTasks": executed_tasks,
  60. }