diff options
| author | Carl Hetherington <cth@carlh.net> | 2025-05-29 18:19:09 +0200 |
|---|---|---|
| committer | Carl Hetherington <cth@carlh.net> | 2025-05-29 21:08:14 +0200 |
| commit | 4c036673a75f9073b2cbe3bc0bd01d73126fb64e (patch) | |
| tree | 86b6b7d9e05776f0234125a164e9660941fafec5 | |
| parent | 4c59babf7f6bd74ef05a55d7c8d61641e807e887 (diff) | |
Cleanup: coding style.
| -rw-r--r-- | src/lib/grok/messenger.cc | 6 | ||||
| -rw-r--r-- | src/lib/grok/messenger.h | 99 |
2 files changed, 66 insertions, 39 deletions
diff --git a/src/lib/grok/messenger.cc b/src/lib/grok/messenger.cc index 1fe7fbbc9..a12ad0486 100644 --- a/src/lib/grok/messenger.cc +++ b/src/lib/grok/messenger.cc @@ -93,7 +93,7 @@ Messenger::processor_thread() { while (_running) { std::string message; - if (!_receive_queue.waitAndPop(message)) { + if (!_receive_queue.wait_and_pop(message)) { break; } @@ -141,7 +141,7 @@ Messenger::outbound_thread() break; } std::string message; - if (!_send_queue.waitAndPop(message)) { + if (!_send_queue.wait_and_pop(message)) { break; } if (!_running) { @@ -398,7 +398,7 @@ bool Messenger::schedule_compress(DCPVideo const& proxy, std::function<void(BufferSrc const&)> converter) { BufferSrc src; - if (!_available_buffers.waitAndPop(src)) { + if (!_available_buffers.wait_and_pop(src)) { return false; } converter(src); diff --git a/src/lib/grok/messenger.h b/src/lib/grok/messenger.h index f3b6f915a..96b728140 100644 --- a/src/lib/grok/messenger.h +++ b/src/lib/grok/messenger.h @@ -146,99 +146,126 @@ private: template<typename Data> class MessengerBlockingQueue { - public: - explicit MessengerBlockingQueue(size_t max) : active_(true), max_size_(max) {} - MessengerBlockingQueue() : MessengerBlockingQueue(UINT_MAX) {} +public: + explicit MessengerBlockingQueue(size_t max) + : _max_size(max) + { + + } + + MessengerBlockingQueue() + : MessengerBlockingQueue(UINT_MAX) + { + + } + size_t size() const { - return queue_.size(); + return _queue.size(); } + // deactivate and clear queue void deactivate() { { - std::lock_guard<std::mutex> lk(mutex_); - active_ = false; - while(!queue_.empty()) - queue_.pop(); + std::lock_guard<std::mutex> lk(_mutex); + _active = false; + while (!_queue.empty()) { + _queue.pop(); + } } // release all waiting threads - can_pop_.notify_all(); - can_push_.notify_all(); + _can_pop.notify_all(); + _can_push.notify_all(); } + void activate() { - std::lock_guard<std::mutex> lk(mutex_); - active_ = true; + std::lock_guard<std::mutex> lk(_mutex); + _active = true; } + bool push(Data const& value) { bool rc; { - std::unique_lock<std::mutex> lk(mutex_); + std::unique_lock<std::mutex> lk(_mutex); rc = push_unlocked(value); } - if(rc) - can_pop_.notify_one(); + + if (rc) { + _can_pop.notify_one(); + } return rc; } + bool pop(Data& value) { bool rc; { - std::unique_lock<std::mutex> lk(mutex_); + std::unique_lock<std::mutex> lk(_mutex); rc = pop_unlocked(value); } - if(rc) - can_push_.notify_one(); + if (rc) { + _can_push.notify_one(); + } return rc; } - bool waitAndPop(Data& value) + + bool wait_and_pop(Data& value) { bool rc; { - std::unique_lock<std::mutex> lk(mutex_); - if(!active_) + std::unique_lock<std::mutex> lk(_mutex); + if (!_active) { return false; + } // in case of spurious wakeup, loop until predicate in lambda // is satisfied. - can_pop_.wait(lk, [this] { return !queue_.empty() || !active_; }); + _can_pop.wait(lk, [this] { return !_queue.empty() || !_active; }); rc = pop_unlocked(value); } - if(rc) - can_push_.notify_one(); + if (rc) { + _can_push.notify_one(); + } return rc; } - private: +private: bool push_unlocked(Data const& value) { - if(queue_.size() == max_size_ || !active_) + if (_queue.size() == _max_size || !_active) { return false; - queue_.push(value); + } + + _queue.push(value); return true; } bool pop_unlocked(Data& value) { - if(queue_.empty() || !active_) + if (_queue.empty() || !_active) { return false; - value = queue_.front(); - queue_.pop(); + } + + value = _queue.front(); + _queue.pop(); return true; } - std::queue<Data> queue_; - mutable std::mutex mutex_; - std::condition_variable can_pop_; - std::condition_variable can_push_; - bool active_; - size_t max_size_; + + std::queue<Data> _queue; + + mutable std::mutex _mutex; + std::condition_variable _can_pop; + std::condition_variable _can_push; + bool _active = true; + size_t _max_size; }; |
