summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCarl Hetherington <cth@carlh.net>2025-05-29 18:19:09 +0200
committerCarl Hetherington <cth@carlh.net>2025-05-29 21:08:14 +0200
commit4c036673a75f9073b2cbe3bc0bd01d73126fb64e (patch)
tree86b6b7d9e05776f0234125a164e9660941fafec5
parent4c59babf7f6bd74ef05a55d7c8d61641e807e887 (diff)
Cleanup: coding style.
-rw-r--r--src/lib/grok/messenger.cc6
-rw-r--r--src/lib/grok/messenger.h99
2 files changed, 66 insertions, 39 deletions
diff --git a/src/lib/grok/messenger.cc b/src/lib/grok/messenger.cc
index 1fe7fbbc9..a12ad0486 100644
--- a/src/lib/grok/messenger.cc
+++ b/src/lib/grok/messenger.cc
@@ -93,7 +93,7 @@ Messenger::processor_thread()
{
while (_running) {
std::string message;
- if (!_receive_queue.waitAndPop(message)) {
+ if (!_receive_queue.wait_and_pop(message)) {
break;
}
@@ -141,7 +141,7 @@ Messenger::outbound_thread()
break;
}
std::string message;
- if (!_send_queue.waitAndPop(message)) {
+ if (!_send_queue.wait_and_pop(message)) {
break;
}
if (!_running) {
@@ -398,7 +398,7 @@ bool
Messenger::schedule_compress(DCPVideo const& proxy, std::function<void(BufferSrc const&)> converter)
{
BufferSrc src;
- if (!_available_buffers.waitAndPop(src)) {
+ if (!_available_buffers.wait_and_pop(src)) {
return false;
}
converter(src);
diff --git a/src/lib/grok/messenger.h b/src/lib/grok/messenger.h
index f3b6f915a..96b728140 100644
--- a/src/lib/grok/messenger.h
+++ b/src/lib/grok/messenger.h
@@ -146,99 +146,126 @@ private:
template<typename Data>
class MessengerBlockingQueue
{
- public:
- explicit MessengerBlockingQueue(size_t max) : active_(true), max_size_(max) {}
- MessengerBlockingQueue() : MessengerBlockingQueue(UINT_MAX) {}
+public:
+ explicit MessengerBlockingQueue(size_t max)
+ : _max_size(max)
+ {
+
+ }
+
+ MessengerBlockingQueue()
+ : MessengerBlockingQueue(UINT_MAX)
+ {
+
+ }
+
size_t size() const
{
- return queue_.size();
+ return _queue.size();
}
+
// deactivate and clear queue
void deactivate()
{
{
- std::lock_guard<std::mutex> lk(mutex_);
- active_ = false;
- while(!queue_.empty())
- queue_.pop();
+ std::lock_guard<std::mutex> lk(_mutex);
+ _active = false;
+ while (!_queue.empty()) {
+ _queue.pop();
+ }
}
// release all waiting threads
- can_pop_.notify_all();
- can_push_.notify_all();
+ _can_pop.notify_all();
+ _can_push.notify_all();
}
+
void activate()
{
- std::lock_guard<std::mutex> lk(mutex_);
- active_ = true;
+ std::lock_guard<std::mutex> lk(_mutex);
+ _active = true;
}
+
bool push(Data const& value)
{
bool rc;
{
- std::unique_lock<std::mutex> lk(mutex_);
+ std::unique_lock<std::mutex> lk(_mutex);
rc = push_unlocked(value);
}
- if(rc)
- can_pop_.notify_one();
+
+ if (rc) {
+ _can_pop.notify_one();
+ }
return rc;
}
+
bool pop(Data& value)
{
bool rc;
{
- std::unique_lock<std::mutex> lk(mutex_);
+ std::unique_lock<std::mutex> lk(_mutex);
rc = pop_unlocked(value);
}
- if(rc)
- can_push_.notify_one();
+ if (rc) {
+ _can_push.notify_one();
+ }
return rc;
}
- bool waitAndPop(Data& value)
+
+ bool wait_and_pop(Data& value)
{
bool rc;
{
- std::unique_lock<std::mutex> lk(mutex_);
- if(!active_)
+ std::unique_lock<std::mutex> lk(_mutex);
+ if (!_active) {
return false;
+ }
// in case of spurious wakeup, loop until predicate in lambda
// is satisfied.
- can_pop_.wait(lk, [this] { return !queue_.empty() || !active_; });
+ _can_pop.wait(lk, [this] { return !_queue.empty() || !_active; });
rc = pop_unlocked(value);
}
- if(rc)
- can_push_.notify_one();
+ if (rc) {
+ _can_push.notify_one();
+ }
return rc;
}
- private:
+private:
bool push_unlocked(Data const& value)
{
- if(queue_.size() == max_size_ || !active_)
+ if (_queue.size() == _max_size || !_active) {
return false;
- queue_.push(value);
+ }
+
+ _queue.push(value);
return true;
}
bool pop_unlocked(Data& value)
{
- if(queue_.empty() || !active_)
+ if (_queue.empty() || !_active) {
return false;
- value = queue_.front();
- queue_.pop();
+ }
+
+ value = _queue.front();
+ _queue.pop();
return true;
}
- std::queue<Data> queue_;
- mutable std::mutex mutex_;
- std::condition_variable can_pop_;
- std::condition_variable can_push_;
- bool active_;
- size_t max_size_;
+
+ std::queue<Data> _queue;
+
+ mutable std::mutex _mutex;
+ std::condition_variable _can_pop;
+ std::condition_variable _can_push;
+ bool _active = true;
+ size_t _max_size;
};