Skip to content

Commit

Permalink
Fix data race problem related to leap array and event loop
Browse files Browse the repository at this point in the history
  • Loading branch information
chenneal committed Jan 18, 2021
1 parent 780220f commit 45a5802
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 20 deletions.
2 changes: 1 addition & 1 deletion sentinel-core/log/block/block_log_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void BlockLogTask::Log(const std::string& resource, const std::string& cause) {
}
auto key = absl::StrFormat("%s|%s", resource, cause);
{
absl::ReaderMutexLock lck(&mtx_);
absl::WriterMutexLock lck(&mtx_);
auto it = map_.find(key);
if (it != map_.end()) {
it->second.last_block_ = TimeUtils::CurrentTimeMillis().count();
Expand Down
12 changes: 9 additions & 3 deletions sentinel-core/statistic/base/leap_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class LeapArray {
const int32_t bucket_length_ms_; // time length of each bucket
private:
const std::unique_ptr<WindowWrapSharedPtr<T>[]> array_;
std::mutex mtx_;
mutable std::mutex leap_array_mtx_;

int32_t CalculateTimeIdx(/*@Valid*/ int64_t time_millis) const;
int64_t CalculateWindowStart(/*@Valid*/ int64_t time_millis) const;
Expand Down Expand Up @@ -78,9 +78,11 @@ WindowWrapSharedPtr<T> LeapArray<T>::CurrentWindow(int64_t time_millis) {
int64_t bucket_start = CalculateWindowStart(time_millis);

while (true) {
std::unique_lock<std::mutex> lck(leap_array_mtx_, std::defer_lock);
leap_array_mtx_.lock();
WindowWrapSharedPtr<T> old = array_[idx];
leap_array_mtx_.unlock();
if (old == nullptr) {
std::unique_lock<std::mutex> lck(mtx_, std::defer_lock);
if (lck.try_lock() && array_[idx] == nullptr) {
WindowWrapSharedPtr<T> bucket = std::make_shared<WindowWrap<T>>(
bucket_length_ms_, bucket_start, NewEmptyBucket(time_millis));
Expand All @@ -90,7 +92,7 @@ WindowWrapSharedPtr<T> LeapArray<T>::CurrentWindow(int64_t time_millis) {
} else if (bucket_start == old->BucketStart()) {
return old;
} else if (bucket_start > old->BucketStart()) {
std::unique_lock<std::mutex> lck(mtx_, std::defer_lock);
std::unique_lock<std::mutex> lck(leap_array_mtx_, std::defer_lock);
if (lck.try_lock()) {
ResetWindowTo(old, bucket_start);
return old;
Expand Down Expand Up @@ -148,7 +150,9 @@ std::vector<WindowWrapSharedPtr<T>> LeapArray<T>::Buckets(
}
int size = sample_count_; // array_.size()
for (int i = 0; i < size; i++) {
leap_array_mtx_.lock();
auto w = array_[i];
leap_array_mtx_.unlock();
if (w == nullptr || IsBucketDeprecated(time_millis, w)) {
continue;
}
Expand All @@ -166,7 +170,9 @@ std::vector<std::shared_ptr<T>> LeapArray<T>::Values(
}
int size = sample_count_; // array_.size()
for (int i = 0; i < size; i++) {
leap_array_mtx_.lock();
WindowWrapSharedPtr<T> w = array_[i];
leap_array_mtx_.unlock();
if (w == nullptr || IsBucketDeprecated(time_millis, w)) {
continue;
}
Expand Down
1 change: 1 addition & 0 deletions sentinel-core/transport/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ cc_library(
],
deps = [
"//:libevent",
"@com_google_absl//absl/synchronization",
],
visibility = ["//visibility:public"],
)
Expand Down
22 changes: 13 additions & 9 deletions sentinel-core/transport/common/event_loop_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@
namespace Sentinel {
namespace Transport {

EventLoopThread::EventLoopThread() : stoped_(true) {}
EventLoopThread::EventLoopThread() = default;

bool EventLoopThread::Start() {
std::promise<bool> start_promise;
auto start_future = start_promise.get_future();

thd_.reset(
new std::thread([this, &start_promise] { this->Work(start_promise); }));
thd_.reset(new std::thread(
[start_promise = std::move(start_promise), this]() mutable {
this->Work(std::move(start_promise));
}));

return start_future.get();
}
Expand All @@ -30,7 +32,7 @@ void EventLoopThread::Stop() {
thd_->join();
}

void EventLoopThread::Work(std::promise<bool>& promise) {
void EventLoopThread::Work(std::promise<bool>&& promise) {
auto ret = InitEventBase();
if (!ret) {
promise.set_value(false);
Expand All @@ -41,7 +43,9 @@ void EventLoopThread::Work(std::promise<bool>& promise) {

Dispatch();

ClearEventBase();
// Do free job outside by whom use eventloop event_base struct, i.e.,
// HttpServer. If not follow the rule above, which will lead to unexpected
// problems of data race. ClearEventBase();
}

bool EventLoopThread::InitEventBase() {
Expand Down Expand Up @@ -100,15 +104,15 @@ void EventLoopThread::Dispatch() {
}
}

void EventLoopThread::RunTask(Functor func) {
void EventLoopThread::RunTask(Functor&& func) {
if (IsInLoopThread()) {
func();
return;
}

{
std::lock_guard<std::mutex> lock(task_mutex_);
pending_tasks_.emplace_back(func);
absl::WriterMutexLock lck(&task_mutex_);
pending_tasks_.emplace_back(std::move(func));
}

Wakeup();
Expand Down Expand Up @@ -141,7 +145,7 @@ void EventLoopThread::DoPendingTasks() {
std::vector<Functor> functors;

{
std::lock_guard<std::mutex> lock(task_mutex_);
absl::WriterMutexLock lck(&task_mutex_);
functors.swap(pending_tasks_);
}

Expand Down
12 changes: 7 additions & 5 deletions sentinel-core/transport/common/event_loop_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include <event2/event.h>

#include "absl/synchronization/mutex.h"

namespace Sentinel {
namespace Transport {

Expand All @@ -22,7 +24,7 @@ class EventLoopThread {

struct event_base *GetEventBase();

void RunTask(Functor func);
void RunTask(Functor &&func);

bool IsInLoopThread() const;

Expand All @@ -31,7 +33,7 @@ class EventLoopThread {
void ClearEventBase();

void Dispatch();
void Work(std::promise<bool> &promise);
void Work(std::promise<bool> &&promise);
void Wakeup();
void DoPendingTasks();

Expand All @@ -42,11 +44,11 @@ class EventLoopThread {
struct event_base *base_ = nullptr;

std::unique_ptr<std::thread> thd_;
std::atomic<bool> stoped_;
std::atomic<bool> stoped_{true};
evutil_socket_t wakeup_fd_[2]; // 0:read 1:write

std::mutex task_mutex_;
std::vector<Functor> pending_tasks_;
absl::Mutex task_mutex_;
std::vector<Functor> pending_tasks_ GUARDED_BY(task_mutex_);
};

} // namespace Transport
Expand Down
8 changes: 6 additions & 2 deletions tests/tsan-flow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ void doAnotherEntry() { doEntry("big_brother_service:foo()"); }
int main() {
// Initialize for Sentinel.
Sentinel::Log::Logger::InitDefaultLogger();
Sentinel::Transport::HttpCommandCenterInitTarget command_center_init;
command_center_init.Initialize();
Sentinel::Transport::HttpCommandCenterInitTarget* p_command_center_init =
new Sentinel::Transport::HttpCommandCenterInitTarget();
p_command_center_init->Initialize();
Sentinel::Log::MetricLogTask metric_log_task;
metric_log_task.Initialize();

Expand Down Expand Up @@ -73,5 +74,8 @@ int main() {
t4.join();
t5.join();
t6.join();

delete p_command_center_init;

return 0;
}

0 comments on commit 45a5802

Please sign in to comment.