logreader.cc 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. #include "tools/replay/logreader.h"
  2. #include <algorithm>
  3. #include <utility>
  4. #include "tools/replay/filereader.h"
  5. #include "tools/replay/util.h"
  6. #include "common/util.h"
  7. bool LogReader::load(const std::string &url, std::atomic<bool> *abort, bool local_cache, int chunk_size, int retries) {
  8. std::string data = FileReader(local_cache, chunk_size, retries).read(url, abort);
  9. if (!data.empty()) {
  10. if (url.find(".bz2") != std::string::npos || util::starts_with(data, "BZh9")) {
  11. data = decompressBZ2(data, abort);
  12. } else if (url.find(".zst") != std::string::npos || util::starts_with(data, "\x28\xB5\x2F\xFD")) {
  13. data = decompressZST(data, abort);
  14. }
  15. }
  16. bool success = !data.empty() && load(data.data(), data.size(), abort);
  17. if (filters_.empty())
  18. raw_ = std::move(data);
  19. return success;
  20. }
  21. bool LogReader::load(const char *data, size_t size, std::atomic<bool> *abort) {
  22. try {
  23. events.reserve(65000);
  24. kj::ArrayPtr<const capnp::word> words((const capnp::word *)data, size / sizeof(capnp::word));
  25. while (words.size() > 0 && !(abort && *abort)) {
  26. capnp::FlatArrayMessageReader reader(words);
  27. auto event = reader.getRoot<cereal::Event>();
  28. auto which = event.which();
  29. auto event_data = kj::arrayPtr(words.begin(), reader.getEnd());
  30. words = kj::arrayPtr(reader.getEnd(), words.end());
  31. if (which == cereal::Event::Which::SELFDRIVE_STATE) {
  32. requires_migration = false;
  33. }
  34. if (!filters_.empty()) {
  35. if (which >= filters_.size() || !filters_[which])
  36. continue;
  37. auto buf = buffer_.allocate(event_data.size() * sizeof(capnp::word));
  38. memcpy(buf, event_data.begin(), event_data.size() * sizeof(capnp::word));
  39. event_data = kj::arrayPtr((const capnp::word *)buf, event_data.size());
  40. }
  41. uint64_t mono_time = event.getLogMonoTime();
  42. const Event &evt = events.emplace_back(which, mono_time, event_data);
  43. // Add encodeIdx packet again as a frame packet for the video stream
  44. if (evt.which == cereal::Event::ROAD_ENCODE_IDX ||
  45. evt.which == cereal::Event::DRIVER_ENCODE_IDX ||
  46. evt.which == cereal::Event::WIDE_ROAD_ENCODE_IDX) {
  47. auto idx = capnp::AnyStruct::Reader(event).getPointerSection()[0].getAs<cereal::EncodeIndex>();
  48. if (idx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C) {
  49. uint64_t sof = idx.getTimestampSof();
  50. events.emplace_back(which, sof ? sof : mono_time, event_data, idx.getSegmentNum());
  51. }
  52. }
  53. }
  54. } catch (const kj::Exception &e) {
  55. rWarning("Failed to parse log : %s.\nRetrieved %zu events from corrupt log", e.getDescription().cStr(), events.size());
  56. }
  57. if (requires_migration) {
  58. migrateOldEvents();
  59. }
  60. if (!events.empty() && !(abort && *abort)) {
  61. events.shrink_to_fit();
  62. std::sort(events.begin(), events.end());
  63. return true;
  64. }
  65. return false;
  66. }
  67. void LogReader::migrateOldEvents() {
  68. size_t events_size = events.size();
  69. for (int i = 0; i < events_size; ++i) {
  70. // Check if the event is of the old CONTROLS_STATE type
  71. auto &event = events[i];
  72. if (event.which == cereal::Event::CONTROLS_STATE) {
  73. // Read the old event data
  74. capnp::FlatArrayMessageReader reader(event.data);
  75. auto old_evt = reader.getRoot<cereal::Event>();
  76. auto old_state = old_evt.getControlsState();
  77. // Migrate relevant fields from old CONTROLS_STATE to new SelfdriveState
  78. MessageBuilder msg;
  79. auto new_evt = msg.initEvent(old_evt.getValid());
  80. new_evt.setLogMonoTime(old_evt.getLogMonoTime());
  81. auto new_state = new_evt.initSelfdriveState();
  82. new_state.setActive(old_state.getActiveDEPRECATED());
  83. new_state.setAlertSize(old_state.getAlertSizeDEPRECATED());
  84. new_state.setAlertSound(old_state.getAlertSound2DEPRECATED());
  85. new_state.setAlertStatus(old_state.getAlertStatusDEPRECATED());
  86. new_state.setAlertText1(old_state.getAlertText1DEPRECATED());
  87. new_state.setAlertText2(old_state.getAlertText2DEPRECATED());
  88. new_state.setAlertType(old_state.getAlertTypeDEPRECATED());
  89. new_state.setEnabled(old_state.getEnabledDEPRECATED());
  90. new_state.setEngageable(old_state.getEngageableDEPRECATED());
  91. new_state.setExperimentalMode(old_state.getExperimentalModeDEPRECATED());
  92. new_state.setPersonality(old_state.getPersonalityDEPRECATED());
  93. new_state.setState(old_state.getStateDEPRECATED());
  94. // Serialize the new event to the buffer
  95. auto buf_size = msg.getSerializedSize();
  96. auto buf = buffer_.allocate(buf_size);
  97. msg.serializeToBuffer(reinterpret_cast<unsigned char *>(buf), buf_size);
  98. // Store the migrated event in the events list
  99. auto event_data = kj::arrayPtr(reinterpret_cast<const capnp::word *>(buf), buf_size);
  100. events.emplace_back(new_evt.which(), new_evt.getLogMonoTime(), event_data);
  101. }
  102. }
  103. }