fix(util): bound SafeSignal queue growth under burst load
SafeSignal could queue events forever when worker threads emitted faster than the main loop could consume, which risks memory growth and stale updates. I added a queue cap with a drop-oldest policy so growth stays bounded under burst load, plus a regression test that validates bounded delivery. Signed-off-by: Austin Horstman <khaneliman12@gmail.com>
This commit is contained in:
@@ -6,6 +6,7 @@
|
|||||||
#include <functional>
|
#include <functional>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <queue>
|
#include <queue>
|
||||||
|
#include <cstddef>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <tuple>
|
#include <tuple>
|
||||||
#include <type_traits>
|
#include <type_traits>
|
||||||
@@ -27,6 +28,12 @@ struct SafeSignal : sigc::signal<void(std::decay_t<Args>...)> {
|
|||||||
public:
|
public:
|
||||||
SafeSignal() { dp_.connect(sigc::mem_fun(*this, &SafeSignal::handle_event)); }
|
SafeSignal() { dp_.connect(sigc::mem_fun(*this, &SafeSignal::handle_event)); }
|
||||||
|
|
||||||
|
void set_max_queued_events(std::size_t max_queued_events) {
|
||||||
|
std::unique_lock lock(mutex_);
|
||||||
|
max_queued_events_ = max_queued_events;
|
||||||
|
trim_queue_locked();
|
||||||
|
}
|
||||||
|
|
||||||
template <typename... EmitArgs>
|
template <typename... EmitArgs>
|
||||||
void emit(EmitArgs&&... args) {
|
void emit(EmitArgs&&... args) {
|
||||||
if (main_tid_ == std::this_thread::get_id()) {
|
if (main_tid_ == std::this_thread::get_id()) {
|
||||||
@@ -41,6 +48,9 @@ struct SafeSignal : sigc::signal<void(std::decay_t<Args>...)> {
|
|||||||
} else {
|
} else {
|
||||||
{
|
{
|
||||||
std::unique_lock lock(mutex_);
|
std::unique_lock lock(mutex_);
|
||||||
|
if (max_queued_events_ != 0 && queue_.size() >= max_queued_events_) {
|
||||||
|
queue_.pop();
|
||||||
|
}
|
||||||
queue_.emplace(std::forward<EmitArgs>(args)...);
|
queue_.emplace(std::forward<EmitArgs>(args)...);
|
||||||
}
|
}
|
||||||
dp_.emit();
|
dp_.emit();
|
||||||
@@ -60,6 +70,15 @@ struct SafeSignal : sigc::signal<void(std::decay_t<Args>...)> {
|
|||||||
using signal_t::emit_reverse;
|
using signal_t::emit_reverse;
|
||||||
using signal_t::make_slot;
|
using signal_t::make_slot;
|
||||||
|
|
||||||
|
void trim_queue_locked() {
|
||||||
|
if (max_queued_events_ == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
while (queue_.size() > max_queued_events_) {
|
||||||
|
queue_.pop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void handle_event() {
|
void handle_event() {
|
||||||
for (std::unique_lock lock(mutex_); !queue_.empty(); lock.lock()) {
|
for (std::unique_lock lock(mutex_); !queue_.empty(); lock.lock()) {
|
||||||
auto args = queue_.front();
|
auto args = queue_.front();
|
||||||
@@ -72,6 +91,7 @@ struct SafeSignal : sigc::signal<void(std::decay_t<Args>...)> {
|
|||||||
Glib::Dispatcher dp_;
|
Glib::Dispatcher dp_;
|
||||||
std::mutex mutex_;
|
std::mutex mutex_;
|
||||||
std::queue<arg_tuple_t> queue_;
|
std::queue<arg_tuple_t> queue_;
|
||||||
|
std::size_t max_queued_events_ = 4096;
|
||||||
const std::thread::id main_tid_ = std::this_thread::get_id();
|
const std::thread::id main_tid_ = std::this_thread::get_id();
|
||||||
// cache functor for signal emission to avoid recreating it on each event
|
// cache functor for signal emission to avoid recreating it on each event
|
||||||
const slot_t cached_fn_ = make_slot();
|
const slot_t cached_fn_ = make_slot();
|
||||||
|
|||||||
@@ -9,6 +9,7 @@
|
|||||||
#endif
|
#endif
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <type_traits>
|
#include <type_traits>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
#include "fixtures/GlibTestsFixture.hpp"
|
#include "fixtures/GlibTestsFixture.hpp"
|
||||||
|
|
||||||
@@ -141,3 +142,33 @@ TEST_CASE_METHOD(GlibTestsFixture, "SafeSignal copy/move counter", "[signal][thr
|
|||||||
producer.join();
|
producer.join();
|
||||||
REQUIRE(count == NUM_EVENTS);
|
REQUIRE(count == NUM_EVENTS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_CASE_METHOD(GlibTestsFixture, "SafeSignal queue stays bounded under burst load",
|
||||||
|
"[signal][thread][util][perf]") {
|
||||||
|
constexpr int NUM_EVENTS = 200;
|
||||||
|
constexpr std::size_t MAX_QUEUED_EVENTS = 8;
|
||||||
|
std::vector<int> received;
|
||||||
|
|
||||||
|
SafeSignal<int> test_signal;
|
||||||
|
test_signal.set_max_queued_events(MAX_QUEUED_EVENTS);
|
||||||
|
|
||||||
|
setTimeout(500);
|
||||||
|
|
||||||
|
test_signal.connect([&](auto value) { received.push_back(value); });
|
||||||
|
|
||||||
|
run([&]() {
|
||||||
|
std::thread producer([&]() {
|
||||||
|
for (int i = 1; i <= NUM_EVENTS; ++i) {
|
||||||
|
test_signal.emit(i);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
producer.join();
|
||||||
|
|
||||||
|
Glib::signal_timeout().connect_once([this]() { this->quit(); }, 50);
|
||||||
|
});
|
||||||
|
|
||||||
|
REQUIRE(received.size() <= MAX_QUEUED_EVENTS);
|
||||||
|
REQUIRE_FALSE(received.empty());
|
||||||
|
REQUIRE(received.back() == NUM_EVENTS);
|
||||||
|
REQUIRE(received.front() == NUM_EVENTS - static_cast<int>(received.size()) + 1);
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user