diff --git a/sentinel-core/log/block/block_log_task.cc b/sentinel-core/log/block/block_log_task.cc index 27192d63..5ae21994 100644 --- a/sentinel-core/log/block/block_log_task.cc +++ b/sentinel-core/log/block/block_log_task.cc @@ -81,7 +81,7 @@ void BlockLogTask::Log(const std::string& resource, const std::string& cause) { } auto key = absl::StrFormat("%s|%s", resource, cause); { - absl::ReaderMutexLock lck(&mtx_); + absl::WriterMutexLock lck(&mtx_); auto it = map_.find(key); if (it != map_.end()) { it->second.last_block_ = TimeUtils::CurrentTimeMillis().count(); diff --git a/sentinel-core/statistic/base/leap_array.h b/sentinel-core/statistic/base/leap_array.h index 7a43f1c1..7cdd345d 100644 --- a/sentinel-core/statistic/base/leap_array.h +++ b/sentinel-core/statistic/base/leap_array.h @@ -48,7 +48,7 @@ class LeapArray { const int32_t bucket_length_ms_; // time length of each bucket private: const std::unique_ptr[]> array_; - std::mutex mtx_; + mutable std::mutex leap_array_mtx_; int32_t CalculateTimeIdx(/*@Valid*/ int64_t time_millis) const; int64_t CalculateWindowStart(/*@Valid*/ int64_t time_millis) const; @@ -78,9 +78,11 @@ WindowWrapSharedPtr LeapArray::CurrentWindow(int64_t time_millis) { int64_t bucket_start = CalculateWindowStart(time_millis); while (true) { + std::unique_lock lck(leap_array_mtx_, std::defer_lock); + leap_array_mtx_.lock(); WindowWrapSharedPtr old = array_[idx]; + leap_array_mtx_.unlock(); if (old == nullptr) { - std::unique_lock lck(mtx_, std::defer_lock); if (lck.try_lock() && array_[idx] == nullptr) { WindowWrapSharedPtr bucket = std::make_shared>( bucket_length_ms_, bucket_start, NewEmptyBucket(time_millis)); @@ -90,7 +92,7 @@ WindowWrapSharedPtr LeapArray::CurrentWindow(int64_t time_millis) { } else if (bucket_start == old->BucketStart()) { return old; } else if (bucket_start > old->BucketStart()) { - std::unique_lock lck(mtx_, std::defer_lock); + std::unique_lock lck(leap_array_mtx_, std::defer_lock); if (lck.try_lock()) { ResetWindowTo(old, bucket_start); return old; @@ -148,7 +150,9 @@ std::vector> LeapArray::Buckets( } int size = sample_count_; // array_.size() for (int i = 0; i < size; i++) { + leap_array_mtx_.lock(); auto w = array_[i]; + leap_array_mtx_.unlock(); if (w == nullptr || IsBucketDeprecated(time_millis, w)) { continue; } @@ -166,7 +170,9 @@ std::vector> LeapArray::Values( } int size = sample_count_; // array_.size() for (int i = 0; i < size; i++) { + leap_array_mtx_.lock(); WindowWrapSharedPtr w = array_[i]; + leap_array_mtx_.unlock(); if (w == nullptr || IsBucketDeprecated(time_millis, w)) { continue; } diff --git a/sentinel-core/transport/common/BUILD b/sentinel-core/transport/common/BUILD index efd476b8..7f977f71 100644 --- a/sentinel-core/transport/common/BUILD +++ b/sentinel-core/transport/common/BUILD @@ -11,6 +11,7 @@ cc_library( ], deps = [ "//:libevent", + "@com_google_absl//absl/synchronization", ], visibility = ["//visibility:public"], ) diff --git a/sentinel-core/transport/common/event_loop_thread.cc b/sentinel-core/transport/common/event_loop_thread.cc index 7bd8c843..e1075234 100644 --- a/sentinel-core/transport/common/event_loop_thread.cc +++ b/sentinel-core/transport/common/event_loop_thread.cc @@ -7,14 +7,16 @@ namespace Sentinel { namespace Transport { -EventLoopThread::EventLoopThread() : stoped_(true) {} +EventLoopThread::EventLoopThread() = default; bool EventLoopThread::Start() { std::promise start_promise; auto start_future = start_promise.get_future(); - thd_.reset( - new std::thread([this, &start_promise] { this->Work(start_promise); })); + thd_.reset(new std::thread( + [start_promise = std::move(start_promise), this]() mutable { + this->Work(std::move(start_promise)); + })); return start_future.get(); } @@ -30,7 +32,7 @@ void EventLoopThread::Stop() { thd_->join(); } -void EventLoopThread::Work(std::promise& promise) { +void EventLoopThread::Work(std::promise&& promise) { auto ret = InitEventBase(); if (!ret) { promise.set_value(false); @@ -41,7 +43,9 @@ void EventLoopThread::Work(std::promise& promise) { Dispatch(); - ClearEventBase(); + // Do free job outside by whom use eventloop event_base struct, i.e., + // HttpServer. If not follow the rule above, which will lead to unexpected + // problems of data race. ClearEventBase(); } bool EventLoopThread::InitEventBase() { @@ -100,15 +104,15 @@ void EventLoopThread::Dispatch() { } } -void EventLoopThread::RunTask(Functor func) { +void EventLoopThread::RunTask(Functor&& func) { if (IsInLoopThread()) { func(); return; } { - std::lock_guard lock(task_mutex_); - pending_tasks_.emplace_back(func); + absl::WriterMutexLock lck(&task_mutex_); + pending_tasks_.emplace_back(std::move(func)); } Wakeup(); @@ -141,7 +145,7 @@ void EventLoopThread::DoPendingTasks() { std::vector functors; { - std::lock_guard lock(task_mutex_); + absl::WriterMutexLock lck(&task_mutex_); functors.swap(pending_tasks_); } diff --git a/sentinel-core/transport/common/event_loop_thread.h b/sentinel-core/transport/common/event_loop_thread.h index 6afcec1d..64957915 100644 --- a/sentinel-core/transport/common/event_loop_thread.h +++ b/sentinel-core/transport/common/event_loop_thread.h @@ -7,6 +7,8 @@ #include +#include "absl/synchronization/mutex.h" + namespace Sentinel { namespace Transport { @@ -22,7 +24,7 @@ class EventLoopThread { struct event_base *GetEventBase(); - void RunTask(Functor func); + void RunTask(Functor &&func); bool IsInLoopThread() const; @@ -31,7 +33,7 @@ class EventLoopThread { void ClearEventBase(); void Dispatch(); - void Work(std::promise &promise); + void Work(std::promise &&promise); void Wakeup(); void DoPendingTasks(); @@ -42,11 +44,11 @@ class EventLoopThread { struct event_base *base_ = nullptr; std::unique_ptr thd_; - std::atomic stoped_; + std::atomic stoped_{true}; evutil_socket_t wakeup_fd_[2]; // 0:read 1:write - std::mutex task_mutex_; - std::vector pending_tasks_; + absl::Mutex task_mutex_; + std::vector pending_tasks_ GUARDED_BY(task_mutex_); }; } // namespace Transport diff --git a/tests/tsan-flow.cc b/tests/tsan-flow.cc index 3a858390..6a55dc2f 100644 --- a/tests/tsan-flow.cc +++ b/tests/tsan-flow.cc @@ -36,8 +36,9 @@ void doAnotherEntry() { doEntry("big_brother_service:foo()"); } int main() { // Initialize for Sentinel. Sentinel::Log::Logger::InitDefaultLogger(); - Sentinel::Transport::HttpCommandCenterInitTarget command_center_init; - command_center_init.Initialize(); + Sentinel::Transport::HttpCommandCenterInitTarget* p_command_center_init = + new Sentinel::Transport::HttpCommandCenterInitTarget(); + p_command_center_init->Initialize(); Sentinel::Log::MetricLogTask metric_log_task; metric_log_task.Initialize(); @@ -73,5 +74,8 @@ int main() { t4.join(); t5.join(); t6.join(); + + delete p_command_center_init; + return 0; }