123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- #!/usr/bin/env python3
- import os
- import threading
- import time
- import unittest
- import subprocess
- import signal
# Progress bars are only useful for interactive runs. Whenever a "CI" variable
# exists in the environment (its value is not inspected), tqdm is replaced by
# a pass-through that hands the iterable back untouched.
if "CI" not in os.environ:
    from tqdm import tqdm  # type: ignore
else:
    def tqdm(x):
        return x
- import cereal.messaging as messaging
- from collections import namedtuple
- from openpilot.tools.lib.logreader import LogReader
- from openpilot.tools.lib.openpilotci import get_url
- from openpilot.common.basedir import BASEDIR
# Immutable description of one process to replay under valgrind.
# The field names are given as a single space-separated string, which
# namedtuple treats exactly like a list of names.
ProcessConfig = namedtuple(
    'ProcessConfig',
    'proc_name pub_sub ignore command path segment wait_for_response',
)
# Processes checked for leaks by TestValgrind.test_config, one entry each.
#   pub_sub:           {service replayed from the log: [services read back from the process]}
#   command / path:    command line handed to valgrind, and the directory
#                      (relative to BASEDIR) it is launched from
#   segment:           split at its last "--" into the two arguments of get_url
#   wait_for_response: when true, the subscriber is updated after every sent message
#   ignore:            not read by the code in this file
CONFIGS = [
    ProcessConfig(
        proc_name="ubloxd",
        pub_sub={"ubloxRaw": ["ubloxGnss", "gpsLocationExternal"]},
        ignore=[],
        command="./ubloxd",
        path="system/ubloxd",
        segment="0375fdf7b1ce594d|2019-06-13--08-32-25--3",
        wait_for_response=True,
    ),
]
class TestValgrind(unittest.TestCase):
    """Replay logged messages into processes running under valgrind and fail on leaks.

    State shared between the test thread and the launcher thread:
      replay_done     -- set once the replay has finished (or failed); tells the
                         launcher to stop the process
      leak            -- None until valgrind's report has been parsed, then a bool
      valgrind_thread -- the launcher thread of the config currently being replayed
    """

    def _valgrind_log_path(self):
        """Return the file that collects valgrind's raw output for every config."""
        return os.path.join(BASEDIR, "selfdrive/test/valgrind_logs.txt")

    def extract_leak_sizes(self, log):
        """Return the leaked byte counts found in valgrind's output.

        Args:
            log: valgrind's stderr output as text.

        Returns:
            A ``(definitely_lost, indirectly_lost, possibly_lost)`` tuple of ints;
            all zeros when valgrind reports that every heap block was freed.

        Raises:
            ValueError: if one of the three entries is missing or is not followed
                by a number (for example because valgrind never printed a summary).
        """
        if "All heap blocks were freed -- no leaks are possible" in log:
            return (0, 0, 0)

        log = log.replace(",", "")  # fixes casting to int issue with large leaks

        def lost_bytes(label):
            # The byte count is the first token after the first occurrence of the label
            _, found, tail = log.partition(label)
            fields = tail.split(None, 1)
            if not found or not fields:
                raise ValueError("valgrind output has no '" + label.strip() + "' entry")
            return int(fields[0])

        return (
            lost_bytes("definitely lost: "),
            lost_bytes("indirectly lost: "),
            lost_bytes("possibly lost: "),
        )

    def valgrindlauncher(self, arg, cwd):
        """Run ``arg`` under valgrind until the replay is over, then record the result.

        Meant to run on its own thread. The raw valgrind output is appended to the
        valgrind log file and ``self.leak`` is set to True or False. If anything
        here raises, ``self.leak`` stays None, which ``test_config`` reports as a
        failure.

        Args:
            arg: command line to append to the valgrind invocation.
            cwd: directory, relative to BASEDIR, to launch the command from.
        """
        # Run valgrind on a process.
        # cwd= keeps the directory change local to the child; os.chdir() from this
        # thread would move the whole test process. start_new_session=True replaces
        # preexec_fn=os.setsid, which is not safe to use once threads exist.
        command = "valgrind --leak-check=full " + arg
        p = subprocess.Popen(command, stderr=subprocess.PIPE, shell=True,
                             cwd=os.path.join(BASEDIR, cwd), start_new_session=True)

        # Also stop waiting if the process dies on its own, so its output still gets reported
        while not self.replay_done and p.poll() is None:
            time.sleep(0.1)

        # Kill valgrind and extract leak output
        if p.poll() is None:
            try:
                os.killpg(os.getpgid(p.pid), signal.SIGINT)
            except ProcessLookupError:
                pass  # exited between the poll and the kill; nothing left to signal
        _, err = p.communicate()
        # Undecodable bytes must not prevent the report from being logged and parsed
        error_msg = err.decode('utf-8', errors='replace')

        with open(self._valgrind_log_path(), "a") as f:
            f.write(error_msg)
            f.write(5 * "\n")

        definitely_lost, indirectly_lost, possibly_lost = self.extract_leak_sizes(error_msg)
        if max(definitely_lost, indirectly_lost, possibly_lost) > 0:
            self.leak = True
            print("LEAKS from", arg, "\nDefinitely lost:", definitely_lost, "\nIndirectly lost", indirectly_lost, "\nPossibly lost", possibly_lost)
        else:
            self.leak = False

    def replay_process(self, config, logreader):
        """Start ``config``'s process under valgrind and feed it the logged messages.

        Sets ``self.replay_done`` when finished, even if the replay raises, so the
        launcher thread always gets to stop the process.

        Args:
            config: the ProcessConfig to replay.
            logreader: iterable of log messages; only those whose type is a key of
                ``config.pub_sub`` are sent.
        """
        pub_sockets = list(config.pub_sub)  # We dump data from logs here
        sub_sockets = [s for subs in config.pub_sub.values() for s in subs]  # We get responses here

        pm = messaging.PubMaster(pub_sockets)
        sm = messaging.SubMaster(sub_sockets)

        print("Sorting logs")
        all_msgs = sorted(logreader, key=lambda msg: msg.logMonoTime)
        pub_msgs = [msg for msg in all_msgs if msg.which() in config.pub_sub]

        thread = threading.Thread(target=self.valgrindlauncher, args=(config.command, config.path))
        thread.daemon = True
        self.valgrind_thread = thread
        thread.start()

        try:
            while not all(pm.all_readers_updated(s) for s in config.pub_sub):
                # A finished launcher means the process is gone and will never subscribe
                if not thread.is_alive():
                    self.fail(config.proc_name + " exited under valgrind before subscribing to its inputs")
                time.sleep(0)

            for msg in tqdm(pub_msgs):
                pm.send(msg.which(), msg.as_builder())
                if config.wait_for_response:
                    sm.update(100)
        finally:
            self.replay_done = True

    def test_config(self):
        """Replay every entry of CONFIGS under valgrind and require a leak-free report."""
        # Start from an empty log file; valgrindlauncher appends one report per config
        with open(self._valgrind_log_path(), "w"):
            pass

        for cfg in CONFIGS:
            self.leak = None
            self.replay_done = False

            r, n = cfg.segment.rsplit("--", 1)
            lr = LogReader(get_url(r, n))
            self.replay_process(cfg, lr)

            # Wait for valgrind to finish. Joining the thread, instead of polling
            # self.leak, turns a crashed launcher into a test failure, not a hang.
            self.valgrind_thread.join()
            self.assertIsNotNone(self.leak, "valgrind produced no leak report for " + cfg.proc_name)
            self.assertFalse(self.leak)
# Run the tests through unittest's command-line runner when this file is
# executed directly; importing the module does not start anything.
if __name__ == "__main__":
    unittest.main()
|