Merge pull request #4889 from khaneliman/bugfix/stab-001-002-sleeper-thread
fix(util): ensure SleeperThread lifecycle safety and thread sync
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
#include <ctime>
|
#include <ctime>
|
||||||
@@ -31,8 +32,8 @@ class SleeperThread {
|
|||||||
|
|
||||||
SleeperThread(std::function<void()> func)
|
SleeperThread(std::function<void()> func)
|
||||||
: thread_{[this, func] {
|
: thread_{[this, func] {
|
||||||
while (do_run_) {
|
while (do_run_.load(std::memory_order_relaxed)) {
|
||||||
signal_ = false;
|
signal_.store(false, std::memory_order_relaxed);
|
||||||
func();
|
func();
|
||||||
}
|
}
|
||||||
}} {
|
}} {
|
||||||
@@ -42,9 +43,18 @@ class SleeperThread {
|
|||||||
}
|
}
|
||||||
|
|
||||||
SleeperThread& operator=(std::function<void()> func) {
|
SleeperThread& operator=(std::function<void()> func) {
|
||||||
|
if (thread_.joinable()) {
|
||||||
|
stop();
|
||||||
|
thread_.join();
|
||||||
|
}
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lck(mutex_);
|
||||||
|
do_run_.store(true, std::memory_order_relaxed);
|
||||||
|
signal_.store(false, std::memory_order_relaxed);
|
||||||
|
}
|
||||||
thread_ = std::thread([this, func] {
|
thread_ = std::thread([this, func] {
|
||||||
while (do_run_) {
|
while (do_run_.load(std::memory_order_relaxed)) {
|
||||||
signal_ = false;
|
signal_.store(false, std::memory_order_relaxed);
|
||||||
func();
|
func();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -56,12 +66,15 @@ class SleeperThread {
|
|||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isRunning() const { return do_run_; }
|
bool isRunning() const { return do_run_.load(std::memory_order_relaxed); }
|
||||||
|
|
||||||
auto sleep() {
|
auto sleep() {
|
||||||
std::unique_lock lk(mutex_);
|
std::unique_lock lk(mutex_);
|
||||||
CancellationGuard cancel_lock;
|
CancellationGuard cancel_lock;
|
||||||
return condvar_.wait(lk, [this] { return signal_ || !do_run_; });
|
return condvar_.wait(lk, [this] {
|
||||||
|
return signal_.load(std::memory_order_relaxed) ||
|
||||||
|
!do_run_.load(std::memory_order_relaxed);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
auto sleep_for(std::chrono::system_clock::duration dur) {
|
auto sleep_for(std::chrono::system_clock::duration dur) {
|
||||||
@@ -73,7 +86,10 @@ class SleeperThread {
|
|||||||
if (now < max_time_point - dur) {
|
if (now < max_time_point - dur) {
|
||||||
wait_end = now + dur;
|
wait_end = now + dur;
|
||||||
}
|
}
|
||||||
return condvar_.wait_until(lk, wait_end, [this] { return signal_ || !do_run_; });
|
return condvar_.wait_until(lk, wait_end, [this] {
|
||||||
|
return signal_.load(std::memory_order_relaxed) ||
|
||||||
|
!do_run_.load(std::memory_order_relaxed);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
auto sleep_until(
|
auto sleep_until(
|
||||||
@@ -81,22 +97,25 @@ class SleeperThread {
|
|||||||
time_point) {
|
time_point) {
|
||||||
std::unique_lock lk(mutex_);
|
std::unique_lock lk(mutex_);
|
||||||
CancellationGuard cancel_lock;
|
CancellationGuard cancel_lock;
|
||||||
return condvar_.wait_until(lk, time_point, [this] { return signal_ || !do_run_; });
|
return condvar_.wait_until(lk, time_point, [this] {
|
||||||
|
return signal_.load(std::memory_order_relaxed) ||
|
||||||
|
!do_run_.load(std::memory_order_relaxed);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
void wake_up() {
|
void wake_up() {
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lck(mutex_);
|
std::lock_guard<std::mutex> lck(mutex_);
|
||||||
signal_ = true;
|
signal_.store(true, std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
condvar_.notify_all();
|
condvar_.notify_all();
|
||||||
}
|
}
|
||||||
|
|
||||||
auto stop() {
|
void stop() {
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lck(mutex_);
|
std::lock_guard<std::mutex> lck(mutex_);
|
||||||
signal_ = true;
|
signal_.store(true, std::memory_order_relaxed);
|
||||||
do_run_ = false;
|
do_run_.store(false, std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
condvar_.notify_all();
|
condvar_.notify_all();
|
||||||
auto handle = thread_.native_handle();
|
auto handle = thread_.native_handle();
|
||||||
@@ -118,8 +137,8 @@ class SleeperThread {
|
|||||||
std::thread thread_;
|
std::thread thread_;
|
||||||
std::condition_variable condvar_;
|
std::condition_variable condvar_;
|
||||||
std::mutex mutex_;
|
std::mutex mutex_;
|
||||||
bool do_run_ = true;
|
std::atomic<bool> do_run_ = true;
|
||||||
bool signal_ = false;
|
std::atomic<bool> signal_ = false;
|
||||||
sigc::connection connection_;
|
sigc::connection connection_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ test_src = files(
|
|||||||
'../../src/config.cpp',
|
'../../src/config.cpp',
|
||||||
'JsonParser.cpp',
|
'JsonParser.cpp',
|
||||||
'SafeSignal.cpp',
|
'SafeSignal.cpp',
|
||||||
|
'sleeper_thread.cpp',
|
||||||
'css_reload_helper.cpp',
|
'css_reload_helper.cpp',
|
||||||
'../../src/util/css_reload_helper.cpp',
|
'../../src/util/css_reload_helper.cpp',
|
||||||
)
|
)
|
||||||
|
|||||||
79
test/utils/sleeper_thread.cpp
Normal file
79
test/utils/sleeper_thread.cpp
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
#if __has_include(<catch2/catch_test_macros.hpp>)
|
||||||
|
#include <catch2/catch_test_macros.hpp>
|
||||||
|
#else
|
||||||
|
#include <catch2/catch.hpp>
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include <chrono>
|
||||||
|
#include <sys/wait.h>
|
||||||
|
#include <thread>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include "util/sleeper_thread.hpp"
|
||||||
|
|
||||||
|
namespace waybar::util {
|
||||||
|
SafeSignal<bool>& prepare_for_sleep() {
|
||||||
|
static SafeSignal<bool> signal;
|
||||||
|
return signal;
|
||||||
|
}
|
||||||
|
} // namespace waybar::util
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
int run_in_subprocess(int (*task)()) {
|
||||||
|
const auto pid = fork();
|
||||||
|
if (pid < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (pid == 0) {
|
||||||
|
alarm(5);
|
||||||
|
_exit(task());
|
||||||
|
}
|
||||||
|
|
||||||
|
int status = -1;
|
||||||
|
if (waitpid(pid, &status, 0) != pid) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (!WIFEXITED(status)) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return WEXITSTATUS(status);
|
||||||
|
}
|
||||||
|
|
||||||
|
int run_reassignment_regression() {
|
||||||
|
waybar::util::SleeperThread thread;
|
||||||
|
thread = [] { std::this_thread::sleep_for(std::chrono::milliseconds(10)); };
|
||||||
|
thread = [] { std::this_thread::sleep_for(std::chrono::milliseconds(1)); };
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int run_control_flag_stress() {
|
||||||
|
for (int i = 0; i < 200; ++i) {
|
||||||
|
waybar::util::SleeperThread thread;
|
||||||
|
thread = [&thread] { thread.sleep_for(std::chrono::milliseconds(1)); };
|
||||||
|
|
||||||
|
std::thread waker([&thread] {
|
||||||
|
for (int j = 0; j < 100; ++j) {
|
||||||
|
thread.wake_up();
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(2));
|
||||||
|
thread.stop();
|
||||||
|
waker.join();
|
||||||
|
if (thread.isRunning()) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
TEST_CASE("SleeperThread reassignment does not terminate process", "[util][sleeper_thread]") {
|
||||||
|
REQUIRE(run_in_subprocess(run_reassignment_regression) == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("SleeperThread control flags are stable under concurrent wake and stop",
|
||||||
|
"[util][sleeper_thread]") {
|
||||||
|
REQUIRE(run_in_subprocess(run_control_flag_stress) == 0);
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user