legacy_rds.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. import datetime
  2. import json
  3. from typing import Optional
  4. import boto3
  5. from ray_release.aws import (
  6. RELEASE_AWS_DB_SECRET_ARN,
  7. RELEASE_AWS_DB_RESOURCE_ARN,
  8. RELEASE_AWS_DB_NAME,
  9. RELEASE_AWS_DB_TABLE,
  10. )
  11. from ray_release.config import Test
  12. from ray_release.template import get_test_env_var
  13. from ray_release.logger import logger
  14. from ray_release.reporter.reporter import Reporter
  15. from ray_release.result import Result
  16. from ray_release.util import exponential_backoff_retry
# Name of the legacy release-test results table.
# NOTE(review): nothing in the visible part of this module references this
# constant (LegacyRDSReporter falls back to RELEASE_AWS_DB_TABLE instead) --
# confirm whether it is still used elsewhere before relying on or removing it.
DEFAULT_LEGACY_DB_TABLE = "release_test_result"
  18. class LegacyRDSReporter(Reporter):
  19. def __init__(
  20. self, database: Optional[str] = None, database_table: Optional[str] = None
  21. ):
  22. self.database = database or RELEASE_AWS_DB_NAME
  23. self.database_table = database_table or RELEASE_AWS_DB_TABLE
  24. def report_result(self, test: Test, result: Result):
  25. logger.info("Persisting results to database...")
  26. result_dict = {
  27. "_runtime": result.runtime,
  28. # Keep session url for legacy support
  29. "_session_url": result.cluster_url,
  30. "_cluster_url": result.cluster_url,
  31. "_commit_url": result.wheels_url,
  32. "_stable": result.stable,
  33. }
  34. now = datetime.datetime.utcnow()
  35. rds_data_client = boto3.client("rds-data", region_name="us-west-2")
  36. if "legacy" in test:
  37. test_name = test["legacy"]["test_name"]
  38. test_suite = test["legacy"]["test_suite"]
  39. else:
  40. test_name = test["name"]
  41. test_suite = ""
  42. team = test["team"] or ""
  43. # Branch name
  44. category = get_test_env_var("RAY_BRANCH", "")
  45. status = result.status or "invalid"
  46. last_logs = result.last_logs or ""
  47. if result.results:
  48. result_dict.update(result.results)
  49. artifacts = {}
  50. parameters = [
  51. {
  52. "name": "created_on",
  53. "typeHint": "TIMESTAMP",
  54. "value": {"stringValue": now.strftime("%Y-%m-%d %H:%M:%S")},
  55. },
  56. {"name": "test_suite", "value": {"stringValue": test_suite}},
  57. {"name": "test_name", "value": {"stringValue": test_name}},
  58. {"name": "status", "value": {"stringValue": status}},
  59. {"name": "last_logs", "value": {"stringValue": last_logs}},
  60. {
  61. "name": "results",
  62. "typeHint": "JSON",
  63. "value": {"stringValue": json.dumps(result_dict)},
  64. },
  65. {
  66. "name": "artifacts",
  67. "typeHint": "JSON",
  68. "value": {"stringValue": json.dumps(artifacts)},
  69. },
  70. {"name": "category", "value": {"stringValue": category}},
  71. {"name": "team", "value": {"stringValue": team}},
  72. {"name": "session_url", "value": {"stringValue": result.cluster_url or ""}},
  73. {"name": "commit_url", "value": {"stringValue": result.wheels_url or ""}},
  74. {"name": "runtime", "value": {"doubleValue": result.runtime or -1.0}},
  75. {"name": "stable", "value": {"booleanValue": result.stable}},
  76. {"name": "frequency", "value": {"stringValue": test.get("frequency", "")}},
  77. {"name": "return_code", "value": {"longValue": result.return_code}},
  78. ]
  79. columns = [param["name"] for param in parameters]
  80. values = [f":{param['name']}" for param in parameters]
  81. column_str = ", ".join(columns).strip(", ")
  82. value_str = ", ".join(values).strip(", ")
  83. sql = (
  84. f"INSERT INTO {self.database_table} "
  85. f"({column_str}) "
  86. f"VALUES ({value_str})"
  87. )
  88. logger.debug(f"SQL query: {sql}")
  89. # Default boto3 call timeout is 45 seconds.
  90. retry_delay_s = 64
  91. MAX_RDS_RETRY = 3
  92. exponential_backoff_retry(
  93. lambda: rds_data_client.execute_statement(
  94. database=self.database,
  95. parameters=parameters,
  96. secretArn=RELEASE_AWS_DB_SECRET_ARN,
  97. resourceArn=RELEASE_AWS_DB_RESOURCE_ARN,
  98. schema=self.database_table,
  99. sql=sql,
  100. ),
  101. retry_exceptions=rds_data_client.exceptions.StatementTimeoutException,
  102. initial_retry_delay_s=retry_delay_s,
  103. max_retries=MAX_RDS_RETRY,
  104. )
  105. logger.info("Result has been persisted to the database")