test_valgrind_replay.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. #!/usr/bin/env python3
  2. import os
  3. import threading
  4. import time
  5. import unittest
  6. import subprocess
  7. import signal
  8. if "CI" in os.environ:
  9. def tqdm(x):
  10. return x
  11. else:
  12. from tqdm import tqdm # type: ignore
  13. import cereal.messaging as messaging
  14. from collections import namedtuple
  15. from tools.lib.logreader import LogReader
  16. from selfdrive.test.openpilotci import get_url
  17. from common.basedir import BASEDIR
  18. ProcessConfig = namedtuple('ProcessConfig', ['proc_name', 'pub_sub', 'ignore', 'command', 'path', 'segment', 'wait_for_response'])
  19. CONFIGS = [
  20. ProcessConfig(
  21. proc_name="ubloxd",
  22. pub_sub={
  23. "ubloxRaw": ["ubloxGnss", "gpsLocationExternal"],
  24. },
  25. ignore=[],
  26. command="./ubloxd",
  27. path="selfdrive/locationd/",
  28. segment="0375fdf7b1ce594d|2019-06-13--08-32-25--3",
  29. wait_for_response=True
  30. ),
  31. ]
  32. class TestValgrind(unittest.TestCase):
  33. def extract_leak_sizes(self, log):
  34. if "All heap blocks were freed -- no leaks are possible" in log:
  35. return (0,0,0)
  36. log = log.replace(",","") # fixes casting to int issue with large leaks
  37. err_lost1 = log.split("definitely lost: ")[1]
  38. err_lost2 = log.split("indirectly lost: ")[1]
  39. err_lost3 = log.split("possibly lost: ")[1]
  40. definitely_lost = int(err_lost1.split(" ")[0])
  41. indirectly_lost = int(err_lost2.split(" ")[0])
  42. possibly_lost = int(err_lost3.split(" ")[0])
  43. return (definitely_lost, indirectly_lost, possibly_lost)
  44. def valgrindlauncher(self, arg, cwd):
  45. os.chdir(os.path.join(BASEDIR, cwd))
  46. # Run valgrind on a process
  47. command = "valgrind --leak-check=full " + arg
  48. p = subprocess.Popen(command, stderr=subprocess.PIPE, shell=True, preexec_fn=os.setsid) # pylint: disable=W1509
  49. while not self.replay_done:
  50. time.sleep(0.1)
  51. # Kill valgrind and extract leak output
  52. os.killpg(os.getpgid(p.pid), signal.SIGINT)
  53. _, err = p.communicate()
  54. error_msg = str(err, encoding='utf-8')
  55. with open(os.path.join(BASEDIR, "selfdrive/test/valgrind_logs.txt"), "a") as f:
  56. f.write(error_msg)
  57. f.write(5 * "\n")
  58. definitely_lost, indirectly_lost, possibly_lost = self.extract_leak_sizes(error_msg)
  59. if max(definitely_lost, indirectly_lost, possibly_lost) > 0:
  60. self.leak = True
  61. print("LEAKS from", arg, "\nDefinitely lost:", definitely_lost, "\nIndirectly lost", indirectly_lost, "\nPossibly lost", possibly_lost)
  62. else:
  63. self.leak = False
  64. def replay_process(self, config, logreader):
  65. pub_sockets = [s for s in config.pub_sub.keys()] # We dump data from logs here
  66. sub_sockets = [s for _, sub in config.pub_sub.items() for s in sub] # We get responses here
  67. pm = messaging.PubMaster(pub_sockets)
  68. sm = messaging.SubMaster(sub_sockets)
  69. print("Sorting logs")
  70. all_msgs = sorted(logreader, key=lambda msg: msg.logMonoTime)
  71. pub_msgs = [msg for msg in all_msgs if msg.which() in list(config.pub_sub.keys())]
  72. thread = threading.Thread(target=self.valgrindlauncher, args=(config.command, config.path))
  73. thread.daemon = True
  74. thread.start()
  75. while not all(pm.all_readers_updated(s) for s in config.pub_sub.keys()):
  76. time.sleep(0)
  77. for msg in tqdm(pub_msgs):
  78. pm.send(msg.which(), msg.as_builder())
  79. if config.wait_for_response:
  80. sm.update(100)
  81. self.replay_done = True
  82. def test_config(self):
  83. open(os.path.join(BASEDIR, "selfdrive/test/valgrind_logs.txt"), "w").close()
  84. for cfg in CONFIGS:
  85. self.leak = None
  86. self.replay_done = False
  87. r, n = cfg.segment.rsplit("--", 1)
  88. lr = LogReader(get_url(r, n))
  89. self.replay_process(cfg, lr)
  90. while self.leak is None:
  91. time.sleep(0.1) # Wait for the valgrind to finish
  92. self.assertFalse(self.leak)
  93. if __name__ == "__main__":
  94. unittest.main()