fix(sleeper-thread): synchronize control flags with atomics
SleeperThread control flags were shared across threads without consistent synchronization. I converted the run/signal flags to atomics and updated wait predicates and lifecycle transitions to use explicit atomic loads/stores. Signed-off-by: Austin Horstman <khaneliman12@gmail.com>
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <ctime>
|
||||
@@ -31,8 +32,8 @@ class SleeperThread {
|
||||
|
||||
SleeperThread(std::function<void()> func)
|
||||
: thread_{[this, func] {
|
||||
while (do_run_) {
|
||||
signal_ = false;
|
||||
while (do_run_.load(std::memory_order_relaxed)) {
|
||||
signal_.store(false, std::memory_order_relaxed);
|
||||
func();
|
||||
}
|
||||
}} {
|
||||
@@ -48,12 +49,12 @@ class SleeperThread {
|
||||
}
|
||||
{
|
||||
std::lock_guard<std::mutex> lck(mutex_);
|
||||
do_run_ = true;
|
||||
signal_ = false;
|
||||
do_run_.store(true, std::memory_order_relaxed);
|
||||
signal_.store(false, std::memory_order_relaxed);
|
||||
}
|
||||
thread_ = std::thread([this, func] {
|
||||
while (do_run_) {
|
||||
signal_ = false;
|
||||
while (do_run_.load(std::memory_order_relaxed)) {
|
||||
signal_.store(false, std::memory_order_relaxed);
|
||||
func();
|
||||
}
|
||||
});
|
||||
@@ -65,12 +66,15 @@ class SleeperThread {
|
||||
return *this;
|
||||
}
|
||||
|
||||
bool isRunning() const { return do_run_; }
|
||||
bool isRunning() const { return do_run_.load(std::memory_order_relaxed); }
|
||||
|
||||
auto sleep() {
|
||||
std::unique_lock lk(mutex_);
|
||||
CancellationGuard cancel_lock;
|
||||
return condvar_.wait(lk, [this] { return signal_ || !do_run_; });
|
||||
return condvar_.wait(lk, [this] {
|
||||
return signal_.load(std::memory_order_relaxed) ||
|
||||
!do_run_.load(std::memory_order_relaxed);
|
||||
});
|
||||
}
|
||||
|
||||
auto sleep_for(std::chrono::system_clock::duration dur) {
|
||||
@@ -82,7 +86,10 @@ class SleeperThread {
|
||||
if (now < max_time_point - dur) {
|
||||
wait_end = now + dur;
|
||||
}
|
||||
return condvar_.wait_until(lk, wait_end, [this] { return signal_ || !do_run_; });
|
||||
return condvar_.wait_until(lk, wait_end, [this] {
|
||||
return signal_.load(std::memory_order_relaxed) ||
|
||||
!do_run_.load(std::memory_order_relaxed);
|
||||
});
|
||||
}
|
||||
|
||||
auto sleep_until(
|
||||
@@ -90,13 +97,16 @@ class SleeperThread {
|
||||
time_point) {
|
||||
std::unique_lock lk(mutex_);
|
||||
CancellationGuard cancel_lock;
|
||||
return condvar_.wait_until(lk, time_point, [this] { return signal_ || !do_run_; });
|
||||
return condvar_.wait_until(lk, time_point, [this] {
|
||||
return signal_.load(std::memory_order_relaxed) ||
|
||||
!do_run_.load(std::memory_order_relaxed);
|
||||
});
|
||||
}
|
||||
|
||||
void wake_up() {
|
||||
{
|
||||
std::lock_guard<std::mutex> lck(mutex_);
|
||||
signal_ = true;
|
||||
signal_.store(true, std::memory_order_relaxed);
|
||||
}
|
||||
condvar_.notify_all();
|
||||
}
|
||||
@@ -104,8 +114,8 @@ class SleeperThread {
|
||||
void stop() {
|
||||
{
|
||||
std::lock_guard<std::mutex> lck(mutex_);
|
||||
signal_ = true;
|
||||
do_run_ = false;
|
||||
signal_.store(true, std::memory_order_relaxed);
|
||||
do_run_.store(false, std::memory_order_relaxed);
|
||||
}
|
||||
condvar_.notify_all();
|
||||
auto handle = thread_.native_handle();
|
||||
@@ -127,8 +137,8 @@ class SleeperThread {
|
||||
std::thread thread_;
|
||||
std::condition_variable condvar_;
|
||||
std::mutex mutex_;
|
||||
bool do_run_ = true;
|
||||
bool signal_ = false;
|
||||
std::atomic<bool> do_run_ = true;
|
||||
std::atomic<bool> signal_ = false;
|
||||
sigc::connection connection_;
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user