replay.cc 19 KB


  1. #include "tools/replay/replay.h"
  2. #include <QDebug>
  3. #include <QtConcurrent>
  4. #include <capnp/dynamic.h>
  5. #include <csignal>
  6. #include "cereal/services.h"
  7. #include "common/params.h"
  8. #include "common/timing.h"
  9. #include "tools/replay/util.h"
  10. static void interrupt_sleep_handler(int signal) {}
  11. Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *sm_,
  12. uint32_t flags, QString data_dir, QObject *parent) : sm(sm_), flags_(flags), QObject(parent) {
  13. // Register signal handler for SIGUSR1
  14. std::signal(SIGUSR1, interrupt_sleep_handler);
  15. if (!(flags_ & REPLAY_FLAG_ALL_SERVICES)) {
  16. block << "uiDebug" << "userFlag";
  17. }
  18. auto event_struct = capnp::Schema::from<cereal::Event>().asStruct();
  19. sockets_.resize(event_struct.getUnionFields().size());
  20. for (const auto &[name, _] : services) {
  21. if (!block.contains(name.c_str()) && (allow.empty() || allow.contains(name.c_str()))) {
  22. uint16_t which = event_struct.getFieldByName(name).getProto().getDiscriminantValue();
  23. sockets_[which] = name.c_str();
  24. }
  25. }
  26. if (!allow.isEmpty()) {
  27. for (int i = 0; i < sockets_.size(); ++i) {
  28. filters_.push_back(i == cereal::Event::Which::INIT_DATA || i == cereal::Event::Which::CAR_PARAMS || sockets_[i]);
  29. }
  30. }
  31. std::vector<const char *> s;
  32. std::copy_if(sockets_.begin(), sockets_.end(), std::back_inserter(s),
  33. [](const char *name) { return name != nullptr; });
  34. qDebug() << "services " << s;
  35. qDebug() << "loading route " << route;
  36. if (sm == nullptr) {
  37. pm = std::make_unique<PubMaster>(s);
  38. }
  39. route_ = std::make_unique<Route>(route, data_dir);
  40. }
  41. Replay::~Replay() {
  42. stop();
  43. }
  44. void Replay::stop() {
  45. exit_ = true;
  46. if (stream_thread_ != nullptr) {
  47. rInfo("shutdown: in progress...");
  48. pauseStreamThread();
  49. stream_cv_.notify_one();
  50. stream_thread_->quit();
  51. stream_thread_->wait();
  52. stream_thread_->deleteLater();
  53. stream_thread_ = nullptr;
  54. rInfo("shutdown: done");
  55. }
  56. timeline_future.waitForFinished();
  57. camera_server_.reset(nullptr);
  58. segments_.clear();
  59. }
  60. bool Replay::load() {
  61. if (!route_->load()) {
  62. qCritical() << "failed to load route" << route_->name()
  63. << "from" << (route_->dir().isEmpty() ? "server" : route_->dir());
  64. return false;
  65. }
  66. for (auto &[n, f] : route_->segments()) {
  67. bool has_log = !f.rlog.isEmpty() || !f.qlog.isEmpty();
  68. bool has_video = !f.road_cam.isEmpty() || !f.qcamera.isEmpty();
  69. if (has_log && (has_video || hasFlag(REPLAY_FLAG_NO_VIPC))) {
  70. segments_.insert({n, nullptr});
  71. }
  72. }
  73. if (segments_.empty()) {
  74. qCritical() << "no valid segments in route" << route_->name();
  75. return false;
  76. }
  77. rInfo("load route %s with %zu valid segments", qPrintable(route_->name()), segments_.size());
  78. max_seconds_ = (segments_.rbegin()->first + 1) * 60;
  79. return true;
  80. }
  81. void Replay::start(int seconds) {
  82. seekTo(route_->identifier().begin_segment * 60 + seconds, false);
  83. }
  84. void Replay::updateEvents(const std::function<bool()> &update_events_function) {
  85. pauseStreamThread();
  86. {
  87. std::unique_lock lk(stream_lock_);
  88. events_ready_ = update_events_function();
  89. paused_ = user_paused_;
  90. }
  91. stream_cv_.notify_one();
  92. }
  93. void Replay::seekTo(double seconds, bool relative) {
  94. updateEvents([&]() {
  95. double target_time = relative ? seconds + currentSeconds() : seconds;
  96. target_time = std::max(double(0.0), target_time);
  97. int target_segment = (int)target_time / 60;
  98. if (segments_.count(target_segment) == 0) {
  99. rWarning("Can't seek to %.2f s segment %d is invalid", target_time, target_segment);
  100. return true;
  101. }
  102. if (target_time > max_seconds_) {
  103. rWarning("Can't seek to %.2f s, time is invalid", target_time);
  104. return true;
  105. }
  106. rInfo("Seeking to %d s, segment %d", (int)target_time, target_segment);
  107. current_segment_ = target_segment;
  108. cur_mono_time_ = route_start_ts_ + target_time * 1e9;
  109. seeking_to_ = target_time;
  110. return false;
  111. });
  112. checkSeekProgress();
  113. updateSegmentsCache();
  114. }
  115. void Replay::checkSeekProgress() {
  116. if (seeking_to_) {
  117. auto it = segments_.find(int(*seeking_to_ / 60));
  118. if (it != segments_.end() && it->second && it->second->isLoaded()) {
  119. emit seekedTo(*seeking_to_);
  120. seeking_to_ = std::nullopt;
  121. // wake up stream thread
  122. updateEvents([]() { return true; });
  123. } else {
  124. // Emit signal indicating the ongoing seek operation
  125. emit seeking(*seeking_to_);
  126. }
  127. }
  128. }
  129. void Replay::seekToFlag(FindFlag flag) {
  130. if (auto next = find(flag)) {
  131. seekTo(*next - 2, false); // seek to 2 seconds before next
  132. }
  133. }
  134. void Replay::buildTimeline() {
  135. uint64_t engaged_begin = 0;
  136. bool engaged = false;
  137. auto alert_status = cereal::SelfdriveState::AlertStatus::NORMAL;
  138. auto alert_size = cereal::SelfdriveState::AlertSize::NONE;
  139. uint64_t alert_begin = 0;
  140. std::string alert_type;
  141. const TimelineType timeline_types[] = {
  142. [(int)cereal::SelfdriveState::AlertStatus::NORMAL] = TimelineType::AlertInfo,
  143. [(int)cereal::SelfdriveState::AlertStatus::USER_PROMPT] = TimelineType::AlertWarning,
  144. [(int)cereal::SelfdriveState::AlertStatus::CRITICAL] = TimelineType::AlertCritical,
  145. };
  146. const auto &route_segments = route_->segments();
  147. for (auto it = route_segments.cbegin(); it != route_segments.cend() && !exit_; ++it) {
  148. std::shared_ptr<LogReader> log(new LogReader());
  149. if (!log->load(it->second.qlog.toStdString(), &exit_, !hasFlag(REPLAY_FLAG_NO_FILE_CACHE), 0, 3) || log->events.empty()) continue;
  150. std::vector<std::tuple<double, double, TimelineType>> timeline;
  151. for (const Event &e : log->events) {
  152. if (e.which == cereal::Event::Which::SELFDRIVE_STATE) {
  153. capnp::FlatArrayMessageReader reader(e.data);
  154. auto event = reader.getRoot<cereal::Event>();
  155. auto cs = event.getSelfdriveState();
  156. if (engaged != cs.getEnabled()) {
  157. if (engaged) {
  158. timeline.push_back({toSeconds(engaged_begin), toSeconds(e.mono_time), TimelineType::Engaged});
  159. }
  160. engaged_begin = e.mono_time;
  161. engaged = cs.getEnabled();
  162. }
  163. if (alert_type != cs.getAlertType().cStr() || alert_status != cs.getAlertStatus()) {
  164. if (!alert_type.empty() && alert_size != cereal::SelfdriveState::AlertSize::NONE) {
  165. timeline.push_back({toSeconds(alert_begin), toSeconds(e.mono_time), timeline_types[(int)alert_status]});
  166. }
  167. alert_begin = e.mono_time;
  168. alert_type = cs.getAlertType().cStr();
  169. alert_size = cs.getAlertSize();
  170. alert_status = cs.getAlertStatus();
  171. }
  172. } else if (e.which == cereal::Event::Which::USER_FLAG) {
  173. timeline.push_back({toSeconds(e.mono_time), toSeconds(e.mono_time), TimelineType::UserFlag});
  174. }
  175. }
  176. if (it->first == route_segments.rbegin()->first) {
  177. if (engaged) {
  178. timeline.push_back({toSeconds(engaged_begin), toSeconds(log->events.back().mono_time), TimelineType::Engaged});
  179. }
  180. if (!alert_type.empty() && alert_size != cereal::SelfdriveState::AlertSize::NONE) {
  181. timeline.push_back({toSeconds(alert_begin), toSeconds(log->events.back().mono_time), timeline_types[(int)alert_status]});
  182. }
  183. max_seconds_ = std::ceil(toSeconds(log->events.back().mono_time));
  184. emit minMaxTimeChanged(route_segments.cbegin()->first * 60.0, max_seconds_);
  185. }
  186. {
  187. std::lock_guard lk(timeline_lock);
  188. timeline_.insert(timeline_.end(), timeline.begin(), timeline.end());
  189. std::sort(timeline_.begin(), timeline_.end(), [](auto &l, auto &r) { return std::get<2>(l) < std::get<2>(r); });
  190. }
  191. emit qLogLoaded(log);
  192. }
  193. }
  194. std::optional<uint64_t> Replay::find(FindFlag flag) {
  195. int cur_ts = currentSeconds();
  196. for (auto [start_ts, end_ts, type] : getTimeline()) {
  197. if (type == TimelineType::Engaged) {
  198. if (flag == FindFlag::nextEngagement && start_ts > cur_ts) {
  199. return start_ts;
  200. } else if (flag == FindFlag::nextDisEngagement && end_ts > cur_ts) {
  201. return end_ts;
  202. }
  203. } else if (start_ts > cur_ts) {
  204. if ((flag == FindFlag::nextUserFlag && type == TimelineType::UserFlag) ||
  205. (flag == FindFlag::nextInfo && type == TimelineType::AlertInfo) ||
  206. (flag == FindFlag::nextWarning && type == TimelineType::AlertWarning) ||
  207. (flag == FindFlag::nextCritical && type == TimelineType::AlertCritical)) {
  208. return start_ts;
  209. }
  210. }
  211. }
  212. return std::nullopt;
  213. }
  214. void Replay::pause(bool pause) {
  215. if (user_paused_ != pause) {
  216. pauseStreamThread();
  217. {
  218. std::unique_lock lk(stream_lock_);
  219. rWarning("%s at %.2f s", pause ? "paused..." : "resuming", currentSeconds());
  220. paused_ = user_paused_ = pause;
  221. }
  222. stream_cv_.notify_one();
  223. }
  224. }
  225. void Replay::pauseStreamThread() {
  226. paused_ = true;
  227. // Send SIGUSR1 to interrupt clock_nanosleep
  228. if (stream_thread_ && stream_thread_id) {
  229. pthread_kill(stream_thread_id, SIGUSR1);
  230. }
  231. }
  232. void Replay::segmentLoadFinished(bool success) {
  233. if (!success) {
  234. Segment *seg = qobject_cast<Segment *>(sender());
  235. rWarning("failed to load segment %d, removing it from current replay list", seg->seg_num);
  236. updateEvents([&]() {
  237. segments_.erase(seg->seg_num);
  238. return !segments_.empty();
  239. });
  240. }
  241. updateSegmentsCache();
  242. }
  243. void Replay::updateSegmentsCache() {
  244. auto cur = segments_.lower_bound(current_segment_.load());
  245. if (cur == segments_.end()) return;
  246. // Calculate the range of segments to load
  247. auto begin = std::prev(cur, std::min<int>(segment_cache_limit / 2, std::distance(segments_.begin(), cur)));
  248. auto end = std::next(begin, std::min<int>(segment_cache_limit, std::distance(begin, segments_.end())));
  249. begin = std::prev(end, std::min<int>(segment_cache_limit, std::distance(segments_.begin(), end)));
  250. loadSegmentInRange(begin, cur, end);
  251. mergeSegments(begin, end);
  252. // free segments out of current semgnt window.
  253. std::for_each(segments_.begin(), begin, [](auto &e) { e.second.reset(nullptr); });
  254. std::for_each(end, segments_.end(), [](auto &e) { e.second.reset(nullptr); });
  255. // start stream thread
  256. const auto &cur_segment = cur->second;
  257. if (stream_thread_ == nullptr && cur_segment->isLoaded()) {
  258. startStream(cur_segment.get());
  259. }
  260. }
  261. void Replay::loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end) {
  262. auto loadNextSegment = [this](auto first, auto last) {
  263. auto it = std::find_if(first, last, [](const auto &seg_it) { return !seg_it.second || !seg_it.second->isLoaded(); });
  264. if (it != last && !it->second) {
  265. rDebug("loading segment %d...", it->first);
  266. it->second = std::make_unique<Segment>(it->first, route_->at(it->first), flags_, filters_);
  267. QObject::connect(it->second.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished);
  268. return true;
  269. }
  270. return false;
  271. };
  272. // Try loading forward segments, then reverse segments
  273. if (!loadNextSegment(cur, end)) {
  274. loadNextSegment(std::make_reverse_iterator(cur), std::make_reverse_iterator(begin));
  275. }
  276. }
  277. void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) {
  278. std::set<int> segments_to_merge;
  279. size_t new_events_size = 0;
  280. for (auto it = begin; it != end; ++it) {
  281. if (it->second && it->second->isLoaded()) {
  282. segments_to_merge.insert(it->first);
  283. new_events_size += it->second->log->events.size();
  284. }
  285. }
  286. if (segments_to_merge == merged_segments_) return;
  287. rDebug("merge segments %s", std::accumulate(segments_to_merge.begin(), segments_to_merge.end(), std::string{},
  288. [](auto & a, int b) { return a + (a.empty() ? "" : ", ") + std::to_string(b); }).c_str());
  289. std::vector<Event> new_events;
  290. new_events.reserve(new_events_size);
  291. // Merge events from segments_to_merge into new_events
  292. for (int n : segments_to_merge) {
  293. size_t size = new_events.size();
  294. const auto &events = segments_.at(n)->log->events;
  295. std::copy_if(events.begin(), events.end(), std::back_inserter(new_events),
  296. [this](const Event &e) { return e.which < sockets_.size() && sockets_[e.which] != nullptr; });
  297. std::inplace_merge(new_events.begin(), new_events.begin() + size, new_events.end());
  298. }
  299. if (stream_thread_) {
  300. emit segmentsMerged();
  301. }
  302. updateEvents([&]() {
  303. events_.swap(new_events);
  304. merged_segments_ = segments_to_merge;
  305. // Wake up the stream thread if the current segment is loaded or invalid.
  306. return !seeking_to_ && (isSegmentMerged(current_segment_) || (segments_.count(current_segment_) == 0));
  307. });
  308. checkSeekProgress();
  309. }
  310. void Replay::startStream(const Segment *cur_segment) {
  311. const auto &events = cur_segment->log->events;
  312. route_start_ts_ = events.front().mono_time;
  313. cur_mono_time_ += route_start_ts_ - 1;
  314. // get datetime from INIT_DATA, fallback to datetime in the route name
  315. route_date_time_ = route()->datetime();
  316. auto it = std::find_if(events.cbegin(), events.cend(),
  317. [](const Event &e) { return e.which == cereal::Event::Which::INIT_DATA; });
  318. if (it != events.cend()) {
  319. capnp::FlatArrayMessageReader reader(it->data);
  320. auto event = reader.getRoot<cereal::Event>();
  321. uint64_t wall_time = event.getInitData().getWallTimeNanos();
  322. if (wall_time > 0) {
  323. route_date_time_ = QDateTime::fromMSecsSinceEpoch(wall_time / 1e6);
  324. }
  325. }
  326. // write CarParams
  327. it = std::find_if(events.begin(), events.end(), [](const Event &e) { return e.which == cereal::Event::Which::CAR_PARAMS; });
  328. if (it != events.end()) {
  329. capnp::FlatArrayMessageReader reader(it->data);
  330. auto event = reader.getRoot<cereal::Event>();
  331. car_fingerprint_ = event.getCarParams().getCarFingerprint();
  332. capnp::MallocMessageBuilder builder;
  333. builder.setRoot(event.getCarParams());
  334. auto words = capnp::messageToFlatArray(builder);
  335. auto bytes = words.asBytes();
  336. Params().put("CarParams", (const char *)bytes.begin(), bytes.size());
  337. Params().put("CarParamsPersistent", (const char *)bytes.begin(), bytes.size());
  338. } else {
  339. rWarning("failed to read CarParams from current segment");
  340. }
  341. // start camera server
  342. if (!hasFlag(REPLAY_FLAG_NO_VIPC)) {
  343. std::pair<int, int> camera_size[MAX_CAMERAS] = {};
  344. for (auto type : ALL_CAMERAS) {
  345. if (auto &fr = cur_segment->frames[type]) {
  346. camera_size[type] = {fr->width, fr->height};
  347. }
  348. }
  349. camera_server_ = std::make_unique<CameraServer>(camera_size);
  350. }
  351. emit segmentsMerged();
  352. // start stream thread
  353. stream_thread_ = new QThread();
  354. QObject::connect(stream_thread_, &QThread::started, [=]() { streamThread(); });
  355. stream_thread_->start();
  356. timeline_future = QtConcurrent::run(this, &Replay::buildTimeline);
  357. emit streamStarted();
  358. }
  359. void Replay::publishMessage(const Event *e) {
  360. if (event_filter && event_filter(e, filter_opaque)) return;
  361. if (sm == nullptr) {
  362. auto bytes = e->data.asBytes();
  363. int ret = pm->send(sockets_[e->which], (capnp::byte *)bytes.begin(), bytes.size());
  364. if (ret == -1) {
  365. rWarning("stop publishing %s due to multiple publishers error", sockets_[e->which]);
  366. sockets_[e->which] = nullptr;
  367. }
  368. } else {
  369. capnp::FlatArrayMessageReader reader(e->data);
  370. auto event = reader.getRoot<cereal::Event>();
  371. sm->update_msgs(nanos_since_boot(), {{sockets_[e->which], event}});
  372. }
  373. }
  374. void Replay::publishFrame(const Event *e) {
  375. CameraType cam;
  376. switch (e->which) {
  377. case cereal::Event::ROAD_ENCODE_IDX: cam = RoadCam; break;
  378. case cereal::Event::DRIVER_ENCODE_IDX: cam = DriverCam; break;
  379. case cereal::Event::WIDE_ROAD_ENCODE_IDX: cam = WideRoadCam; break;
  380. default: return; // Invalid event type
  381. }
  382. if ((cam == DriverCam && !hasFlag(REPLAY_FLAG_DCAM)) || (cam == WideRoadCam && !hasFlag(REPLAY_FLAG_ECAM)))
  383. return; // Camera isdisabled
  384. if (isSegmentMerged(e->eidx_segnum)) {
  385. auto &segment = segments_.at(e->eidx_segnum);
  386. if (auto &frame = segment->frames[cam]; frame) {
  387. camera_server_->pushFrame(cam, frame.get(), e);
  388. }
  389. }
  390. }
  391. void Replay::streamThread() {
  392. stream_thread_id = pthread_self();
  393. cereal::Event::Which cur_which = cereal::Event::Which::INIT_DATA;
  394. std::unique_lock lk(stream_lock_);
  395. while (true) {
  396. stream_cv_.wait(lk, [=]() { return exit_ || ( events_ready_ && !paused_); });
  397. if (exit_) break;
  398. Event event(cur_which, cur_mono_time_, {});
  399. auto first = std::upper_bound(events_.cbegin(), events_.cend(), event);
  400. if (first == events_.cend()) {
  401. rInfo("waiting for events...");
  402. events_ready_ = false;
  403. continue;
  404. }
  405. auto it = publishEvents(first, events_.cend());
  406. // Ensure frames are sent before unlocking to prevent race conditions
  407. if (camera_server_) {
  408. camera_server_->waitForSent();
  409. }
  410. if (it != events_.cend()) {
  411. cur_which = it->which;
  412. } else if (!hasFlag(REPLAY_FLAG_NO_LOOP)) {
  413. // Check for loop end and restart if necessary
  414. int last_segment = segments_.rbegin()->first;
  415. if (current_segment_ >= last_segment && isSegmentMerged(last_segment)) {
  416. rInfo("reaches the end of route, restart from beginning");
  417. QMetaObject::invokeMethod(this, std::bind(&Replay::seekTo, this, minSeconds(), false), Qt::QueuedConnection);
  418. }
  419. }
  420. }
  421. }
  422. std::vector<Event>::const_iterator Replay::publishEvents(std::vector<Event>::const_iterator first,
  423. std::vector<Event>::const_iterator last) {
  424. uint64_t evt_start_ts = cur_mono_time_;
  425. uint64_t loop_start_ts = nanos_since_boot();
  426. double prev_replay_speed = speed_;
  427. for (; !paused_ && first != last; ++first) {
  428. const Event &evt = *first;
  429. int segment = toSeconds(evt.mono_time) / 60;
  430. if (current_segment_ != segment) {
  431. current_segment_ = segment;
  432. QMetaObject::invokeMethod(this, &Replay::updateSegmentsCache, Qt::QueuedConnection);
  433. }
  434. // Skip events if socket is not present
  435. if (!sockets_[evt.which]) continue;
  436. cur_mono_time_ = evt.mono_time;
  437. const uint64_t current_nanos = nanos_since_boot();
  438. const int64_t time_diff = (evt.mono_time - evt_start_ts) / speed_ - (current_nanos - loop_start_ts);
  439. // Reset timestamps for potential synchronization issues:
  440. // - A negative time_diff may indicate slow execution or system wake-up,
  441. // - A time_diff exceeding 1 second suggests a skipped segment.
  442. if ((time_diff < -1e9 || time_diff >= 1e9) || speed_ != prev_replay_speed) {
  443. evt_start_ts = evt.mono_time;
  444. loop_start_ts = current_nanos;
  445. prev_replay_speed = speed_;
  446. } else if (time_diff > 0) {
  447. precise_nano_sleep(time_diff, paused_);
  448. }
  449. if (paused_) break;
  450. if (evt.eidx_segnum == -1) {
  451. publishMessage(&evt);
  452. } else if (camera_server_) {
  453. if (speed_ > 1.0) {
  454. camera_server_->waitForSent();
  455. }
  456. publishFrame(&evt);
  457. }
  458. }
  459. return first;
  460. }