diff options
| author | Carl Hetherington <cth@carlh.net> | 2025-05-16 22:15:47 +0200 |
|---|---|---|
| committer | Carl Hetherington <cth@carlh.net> | 2025-05-28 00:33:55 +0200 |
| commit | d16b4f52f72d95ed9d88d2e74103c4a06a6b1e62 (patch) | |
| tree | bada9c2b1a17482818f63b3f817aa047ace645d7 | |
| parent | 893728e7efe6f40d6fa1c785cdbc4820e4d8f892 (diff) | |
Cleanup: rename variables and make more private.
| -rw-r--r-- | src/lib/grok/messenger.cc | 36 | ||||
| -rw-r--r-- | src/lib/grok/messenger.h | 37 |
2 files changed, 36 insertions, 37 deletions
diff --git a/src/lib/grok/messenger.cc b/src/lib/grok/messenger.cc index 83ac18fac..df8b623f7 100644 --- a/src/lib/grok/messenger.cc +++ b/src/lib/grok/messenger.cc @@ -36,8 +36,8 @@ Messenger::Messenger(MessengerInit init) Messenger::~Messenger() { _running = false; - sendQueue.deactivate(); - receiveQueue.deactivate(); + _send_queue.deactivate(); + _receive_queue.deactivate(); if (outboundSynch_) { outboundSynch_->post(SYNCH_RECEIVE_READY); @@ -68,7 +68,7 @@ Messenger::processor_thread() { while (_running) { std::string message; - if (!receiveQueue.waitAndPop(message)) { + if (!_receive_queue.waitAndPop(message)) { break; } @@ -114,7 +114,7 @@ Messenger::outbound_thread(const std::string &sendBuf, Synch* synch) break; } std::string message; - if (!sendQueue.waitAndPop(message)) { + if (!_send_queue.waitAndPop(message)) { break; } if (!_running) { @@ -145,7 +145,7 @@ Messenger::inbound_thread(const std::string &receiveBuf, Synch* synch) } auto message = std::string(receive_buffer); synch->post(SYNCH_RECEIVE_READY); - receiveQueue.push(message); + _receive_queue.push(message); } SharedMemoryManager::deinit_shm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer); @@ -226,8 +226,8 @@ Messenger::launch_grok( const std::string license ) { - std::unique_lock<std::mutex> lk(shutdownMutex_); - if (async_result_.valid()) { + std::unique_lock<std::mutex> lk(_shutdown_mutex); + if (_async_result.valid()) { return true; } _init.unlink(); @@ -255,13 +255,13 @@ Messenger::initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, auto ptr = uncompressed_buffer_; for(size_t i = 0; i < _init.numFrames_; ++i) { - availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr)); + _available_buffers.push(BufferSrc(0, i, (uint8_t*)ptr)); ptr += _init.uncompressedFrameSize_; } - std::unique_lock<std::mutex> lk(shutdownMutex_); + std::unique_lock<std::mutex> lk(_shutdown_mutex); _initialized = true; - clientInitializedCondition_.notify_all(); + _client_initialized_condition.notify_all(); } bool @@ -273,7 +273,7 @@ Messenger::waitForClientInit() return false; } - std::unique_lock<std::mutex> lk(shutdownMutex_); + std::unique_lock<std::mutex> lk(_shutdown_mutex); if (_initialized) { return true; @@ -282,10 +282,10 @@ Messenger::waitForClientInit() } while (true) { - if (clientInitializedCondition_.wait_for(lk, std::chrono::seconds(1), [this]{ return _initialized || _shutdown; })) { + if (_client_initialized_condition.wait_for(lk, std::chrono::seconds(1), [this]{ return _initialized || _shutdown; })) { break; } - auto status = async_result_.wait_for(std::chrono::milliseconds(100)); + auto status = _async_result.wait_for(std::chrono::milliseconds(100)); if (status == std::future_status::ready) { getMessengerLogger()->error("Grok exited unexpectedly during initialization"); return false; @@ -306,14 +306,14 @@ Messenger::uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixe void Messenger::reclaimCompressed(size_t frameId) { - availableBuffers_.push(BufferSrc(0, frameId, getCompressedFrame(frameId))); + _available_buffers.push(BufferSrc(0, frameId, getCompressedFrame(frameId))); } void Messenger::reclaimUncompressed(size_t frameId) { - availableBuffers_.push(BufferSrc(0, frameId, getUncompressedFrame(frameId))); + _available_buffers.push(BufferSrc(0, frameId, getUncompressedFrame(frameId))); } @@ -352,10 +352,10 @@ Messenger::launch(std::string const& cmd, boost::filesystem::path const& dir) } } // Execute the command using std::async and std::system - cmd_ = cmd; + _cmd = cmd; getMessengerLogger()->info(cmd.c_str()); - async_result_ = std::async(std::launch::async, [this]() { return std::system(cmd_.c_str()); }); - bool success = async_result_.valid(); + _async_result = std::async(std::launch::async, [this]() { return std::system(_cmd.c_str()); }); + bool success = _async_result.valid(); if (!success) getMessengerLogger()->error("Grok launch failed"); diff --git a/src/lib/grok/messenger.h b/src/lib/grok/messenger.h index f98c40074..5f85e931b 100644 --- a/src/lib/grok/messenger.h +++ b/src/lib/grok/messenger.h @@ -451,7 +451,7 @@ public: int dummy[] = {0, ((void)(oss << ',' << args), 0)...}; static_cast<void>(dummy); - sendQueue.push(oss.str()); + _send_queue.push(oss.str()); } bool launch_grok( @@ -479,18 +479,13 @@ public: static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel); - bool _initialized = false; - bool _shutdown = false; - MessengerBlockingQueue<std::string> sendQueue; - MessengerBlockingQueue<std::string> receiveQueue; - MessengerBlockingQueue<BufferSrc> availableBuffers_; - std::string cmd_; - std::future<int> async_result_; - std::mutex shutdownMutex_; - std::condition_variable shutdownCondition_; - protected: - std::condition_variable clientInitializedCondition_; + std::condition_variable _client_initialized_condition; + MessengerBlockingQueue<BufferSrc> _available_buffers; + std::future<int> _async_result; + std::condition_variable _shutdown_condition; + bool _shutdown = false; + std::mutex _shutdown_mutex; private: bool launch(std::string const& cmd, boost::filesystem::path const& dir); @@ -500,6 +495,10 @@ private: std::atomic_bool _running; MessengerInit _init; + bool _initialized = false; + MessengerBlockingQueue<std::string> _send_queue; + MessengerBlockingQueue<std::string> _receive_queue; + std::string _cmd; std::thread outbound; Synch* outboundSynch_ = nullptr; @@ -593,7 +592,7 @@ struct ScheduledMessenger : public Messenger bool schedule_compress(F const& proxy, std::function<void(BufferSrc const&)> converter) { BufferSrc src; - if (!availableBuffers_.waitAndPop(src)) { + if (!_available_buffers.waitAndPop(src)) { return false; } converter(src); @@ -623,26 +622,26 @@ struct ScheduledMessenger : public Messenger send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId); if (_shutdown && _frames_compressed == _frames_scheduled) { - shutdownCondition_.notify_all(); + _shutdown_condition.notify_all(); } } void shutdown() { try { - std::unique_lock<std::mutex> lk(shutdownMutex_); - if (!async_result_.valid()) { + std::unique_lock<std::mutex> lk(_shutdown_mutex); + if (!_async_result.valid()) { return; } _shutdown = true; if (_frames_scheduled) { uint32_t scheduled = _frames_scheduled; send(GRK_MSGR_BATCH_FLUSH, scheduled); - shutdownCondition_.wait(lk, [this] { return _frames_scheduled == _frames_compressed; }); + _shutdown_condition.wait(lk, [this] { return _frames_scheduled == _frames_compressed; }); } - availableBuffers_.deactivate(); + _available_buffers.deactivate(); send(GRK_MSGR_BATCH_SHUTDOWN); - int result = async_result_.get(); + int result = _async_result.get(); if(result != 0) { getMessengerLogger()->error("Accelerator failed with return code: %d\n",result); } |
