123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- diff --git opencensus/stats/internal/delta_producer.cc opencensus/stats/internal/delta_producer.cc
- index c61b4d9..b3e4ef2 100644
- --- opencensus/stats/internal/delta_producer.cc
- +++ opencensus/stats/internal/delta_producer.cc
- @@ -75,6 +75,20 @@ DeltaProducer* DeltaProducer::Get() {
- return global_delta_producer;
- }
-
- +void DeltaProducer::Shutdown() {
- + {
- + absl::MutexLock l(&mu_);
- + if (!thread_started_) {
- + return;
- + }
- + thread_started_ = false;
- + }
- + // Join loop thread when shutdown.
- + if (harvester_thread_.joinable()) {
- + harvester_thread_.join();
- + }
- +}
- +
- void DeltaProducer::AddMeasure() {
- delta_mu_.Lock();
- absl::MutexLock harvester_lock(&harvester_mu_);
- @@ -115,7 +129,10 @@ void DeltaProducer::Flush() {
- }
-
- DeltaProducer::DeltaProducer()
- - : harvester_thread_(&DeltaProducer::RunHarvesterLoop, this) {}
- + : harvester_thread_(&DeltaProducer::RunHarvesterLoop, this) {
- + absl::MutexLock l(&mu_);
- + thread_started_ = true;
- +}
-
- void DeltaProducer::SwapDeltas() {
- ABSL_ASSERT(last_delta_.delta().empty() && "Last delta was not consumed.");
- @@ -131,11 +148,19 @@ void DeltaProducer::RunHarvesterLoop() {
- absl::Time next_harvest_time = absl::Now() + harvest_interval_;
- while (true) {
- const absl::Time now = absl::Now();
- - absl::SleepFor(next_harvest_time - now);
- + absl::SleepFor(absl::Seconds(0.1));
- // Account for the possibility that the last harvest took longer than
- // harvest_interval_ and we are already past next_harvest_time.
- - next_harvest_time = std::max(next_harvest_time, now) + harvest_interval_;
- - Flush();
- + if (absl::Now() > next_harvest_time) {
- + next_harvest_time = std::max(next_harvest_time, now) + harvest_interval_;
- + Flush();
- + }
- + {
- + absl::MutexLock l(&mu_);
- + if (!thread_started_) {
- + break;
- + }
- + }
- }
- }
-
- diff --git opencensus/stats/internal/delta_producer.h opencensus/stats/internal/delta_producer.h
- index 2cff522..c8e2e95 100644
- --- opencensus/stats/internal/delta_producer.h
- +++ opencensus/stats/internal/delta_producer.h
- @@ -71,6 +71,8 @@ class DeltaProducer final {
- // Returns a pointer to the singleton DeltaProducer.
- static DeltaProducer* Get();
-
- + void Shutdown();
- +
- // Adds a new Measure.
- void AddMeasure();
-
- @@ -122,6 +124,9 @@ class DeltaProducer final {
- // thread when calling a flush during harvesting.
- Delta last_delta_ GUARDED_BY(harvester_mu_);
- std::thread harvester_thread_ GUARDED_BY(harvester_mu_);
- +
- + mutable absl::Mutex mu_;
- + bool thread_started_ GUARDED_BY(mu_) = false;
- };
-
- } // namespace stats
- diff --git opencensus/stats/internal/stats_exporter.cc opencensus/stats/internal/stats_exporter.cc
- index 43ddbc7..37b4ae1 100644
- --- opencensus/stats/internal/stats_exporter.cc
- +++ opencensus/stats/internal/stats_exporter.cc
- @@ -95,25 +95,52 @@ void StatsExporterImpl::ClearHandlersForTesting() {
- }
-
- void StatsExporterImpl::StartExportThread() EXCLUSIVE_LOCKS_REQUIRED(mu_) {
- - t_ = std::thread(&StatsExporterImpl::RunWorkerLoop, this);
- thread_started_ = true;
- + t_ = std::thread(&StatsExporterImpl::RunWorkerLoop, this);
- +}
- +
- +void StatsExporterImpl::Shutdown() {
- + {
- + absl::MutexLock l(&mu_);
- + if (!thread_started_) {
- + return;
- + }
- + thread_started_ = false;
- + }
- + // Join loop thread when shutdown.
- + if (t_.joinable()) {
- + t_.join();
- + }
- }
-
- void StatsExporterImpl::RunWorkerLoop() {
- absl::Time next_export_time = GetNextExportTime();
- while (true) {
- // SleepFor() returns immediately when given a negative duration.
- - absl::SleepFor(next_export_time - absl::Now());
- + absl::SleepFor(absl::Seconds(0.1));
- // In case the last export took longer than the export interval, we
- // calculate the next time from now.
- - next_export_time = GetNextExportTime();
- - Export();
- + if (absl::Now() > next_export_time) {
- + next_export_time = GetNextExportTime();
- + Export();
- + }
- + {
- + absl::MutexLock l(&mu_);
- + if (!thread_started_) {
- + break;
- + }
- + }
- }
- }
-
- // StatsExporter
- // -------------
-
- +void StatsExporter::Shutdown() {
- + StatsExporterImpl::Get()->Shutdown();
- + StatsExporterImpl::Get()->ClearHandlersForTesting();
- +}
- +
- // static
- void StatsExporter::SetInterval(absl::Duration interval) {
- StatsExporterImpl::Get()->SetInterval(interval);
- diff --git opencensus/stats/internal/stats_exporter_impl.h opencensus/stats/internal/stats_exporter_impl.h
- index 11ae3c4..ebe9c4d 100644
- --- opencensus/stats/internal/stats_exporter_impl.h
- +++ opencensus/stats/internal/stats_exporter_impl.h
- @@ -34,6 +34,7 @@ class StatsExporterImpl {
- public:
- static StatsExporterImpl* Get();
- void SetInterval(absl::Duration interval);
- + void Shutdown();
- absl::Time GetNextExportTime() const;
- void AddView(const ViewDescriptor& view);
- void RemoveView(absl::string_view name);
- diff --git opencensus/stats/stats_exporter.h opencensus/stats/stats_exporter.h
- index 6756858..65e0262 100644
- --- opencensus/stats/stats_exporter.h
- +++ opencensus/stats/stats_exporter.h
- @@ -45,6 +45,8 @@ class StatsExporter final {
- // Removes the view with 'name' from the registry, if one is registered.
- static void RemoveView(absl::string_view name);
-
- + static void Shutdown();
- +
- // StatsExporter::Handler is the interface for push exporters that export
- // recorded data for registered views. The exporter should provide a static
- // Register() method that takes any arguments needed by the exporter (e.g. a
|