diff options
| author | Carl Hetherington <cth@carlh.net> | 2025-05-16 22:04:03 +0200 |
|---|---|---|
| committer | Carl Hetherington <cth@carlh.net> | 2025-05-28 00:33:55 +0200 |
| commit | 893728e7efe6f40d6fa1c785cdbc4820e4d8f892 (patch) | |
| tree | 4e8941687fd3ea677d26bfb2fb266a1c6fb4cb9e /src | |
| parent | 9057c23f81e5ace350cca5a7e5e78bd9278f320c (diff) | |
Cleanup: make running and init_ private and rename them.
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/grok/messenger.cc | 70 | ||||
| -rw-r--r-- | src/lib/grok/messenger.h | 6 |
2 files changed, 39 insertions, 37 deletions
diff --git a/src/lib/grok/messenger.cc b/src/lib/grok/messenger.cc index d9b3692e0..83ac18fac 100644 --- a/src/lib/grok/messenger.cc +++ b/src/lib/grok/messenger.cc @@ -26,8 +26,8 @@ using namespace grk_plugin; Messenger::Messenger(MessengerInit init) - : running(true) - , init_(init) + : _running(true) + , _init(init) { } @@ -35,7 +35,7 @@ Messenger::Messenger(MessengerInit init) Messenger::~Messenger() { - running = false; + _running = false; sendQueue.deactivate(); receiveQueue.deactivate(); @@ -66,13 +66,13 @@ Messenger::~Messenger() void Messenger::processor_thread() { - while (running) { + while (_running) { std::string message; if (!receiveQueue.waitAndPop(message)) { break; } - if (!running) { + if (!_running) { break; } @@ -84,7 +84,7 @@ Messenger::processor_thread() auto height = msg.nextUint(); auto samples_per_pixel = msg.nextUint(); msg.nextUint(); // depth - init_.uncompressedFrameSize_ = Messenger::uncompressedFrameSize(width, height, samples_per_pixel); + _init.uncompressedFrameSize_ = Messenger::uncompressedFrameSize(width, height, samples_per_pixel); auto compressed_frame_size = msg.nextUint(); auto num_frames = msg.nextUint(); initClient(compressed_frame_size, compressed_frame_size, num_frames); @@ -93,7 +93,7 @@ Messenger::processor_thread() } else if (tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED) { reclaimCompressed(msg.nextUint()); } - init_.processor_(message); + _init.processor_(message); } } @@ -108,16 +108,16 @@ Messenger::outbound_thread(const std::string &sendBuf, Synch* synch) return; } - while (running) { + while (_running) { synch->wait(SYNCH_RECEIVE_READY); - if (!running) { + if (!_running) { break; } std::string message; if (!sendQueue.waitAndPop(message)) { break; } - if (!running) { + if (!_running) { break; } memcpy(send_buffer, message.c_str(), message.size() + 1); @@ -138,9 +138,9 @@ Messenger::inbound_thread(const std::string &receiveBuf, Synch* synch) return; } - while (running) { + while (_running) { synch->wait(SYNCH_SENT); - if (!running) { + if (!_running) { break; } auto message = std::string(receive_buffer); @@ -155,13 +155,13 @@ Messenger::inbound_thread(const std::string &receiveBuf, Synch* synch) void Messenger::startThreads() { - outboundSynch_ = new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch); - outbound = std::thread(&Messenger::outbound_thread, this, init_.outboundMessageBuf, outboundSynch_); + outboundSynch_ = new Synch(_init.outboundSentSynch, _init.outboundReceiveReadySynch); + outbound = std::thread(&Messenger::outbound_thread, this, _init.outboundMessageBuf, outboundSynch_); - inboundSynch_ = new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch); - inbound = std::thread(&Messenger::inbound_thread, this, init_.inboundMessageBuf, inboundSynch_); + inboundSynch_ = new Synch(_init.inboundSentSynch, _init.inboundReceiveReadySynch); + inbound = std::thread(&Messenger::inbound_thread, this, _init.inboundMessageBuf, inboundSynch_); - for (size_t i = 0; i < init_.numProcessingThreads_; ++i) { + for (size_t i = 0; i < _init.numProcessingThreads_; ++i) { processors_.push_back(std::thread(&Messenger::processor_thread, this)); } } @@ -171,18 +171,18 @@ bool Messenger::initBuffers() { bool rc = true; - if (init_.uncompressedFrameSize_) { + if (_init.uncompressedFrameSize_) { rc = rc && SharedMemoryManager::initShm( grokUncompressedBuf, - init_.uncompressedFrameSize_ * init_.numFrames_, + _init.uncompressedFrameSize_ * _init.numFrames_, &uncompressed_fd_, &uncompressed_buffer_ ); } - if (init_.compressedFrameSize_) { + if (_init.compressedFrameSize_) { rc = rc && SharedMemoryManager::initShm( grokCompressedBuf, - init_.compressedFrameSize_ * init_.numFrames_, + _init.compressedFrameSize_ * _init.numFrames_, &compressed_fd_, &compressed_buffer_ ); } @@ -196,13 +196,13 @@ Messenger::deinit_shm() { bool rc = SharedMemoryManager::deinit_shm( grokUncompressedBuf, - init_.uncompressedFrameSize_ * init_.numFrames_, + _init.uncompressedFrameSize_ * _init.numFrames_, uncompressed_fd_, &uncompressed_buffer_ ); rc = rc && SharedMemoryManager::deinit_shm( grokCompressedBuf, - init_.compressedFrameSize_ * init_.numFrames_, + _init.compressedFrameSize_ * _init.numFrames_, compressed_fd_, &compressed_buffer_ ); @@ -230,7 +230,7 @@ Messenger::launch_grok( if (async_result_.valid()) { return true; } - init_.unlink(); + _init.unlink(); startThreads(); char cmd[4096]; snprintf(cmd, sizeof(cmd), @@ -248,15 +248,15 @@ void Messenger::initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames) { // client fills queue with pending uncompressed buffers - init_.uncompressedFrameSize_ = uncompressedFrameSize; - init_.compressedFrameSize_ = compressedFrameSize; - init_.numFrames_ = numFrames; + _init.uncompressedFrameSize_ = uncompressedFrameSize; + _init.compressedFrameSize_ = compressedFrameSize; + _init.numFrames_ = numFrames; initBuffers(); auto ptr = uncompressed_buffer_; - for(size_t i = 0; i < init_.numFrames_; ++i) + for(size_t i = 0; i < _init.numFrames_; ++i) { availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr)); - ptr += init_.uncompressedFrameSize_; + ptr += _init.uncompressedFrameSize_; } std::unique_lock<std::mutex> lk(shutdownMutex_); @@ -320,22 +320,22 @@ Messenger::reclaimUncompressed(size_t frameId) uint8_t* Messenger::getUncompressedFrame(size_t frameId) { - assert(frameId < init_.numFrames_); - if(frameId >= init_.numFrames_) + assert(frameId < _init.numFrames_); + if(frameId >= _init.numFrames_) return nullptr; - return (uint8_t*)(uncompressed_buffer_ + frameId * init_.uncompressedFrameSize_); + return (uint8_t*)(uncompressed_buffer_ + frameId * _init.uncompressedFrameSize_); } uint8_t* Messenger::getCompressedFrame(size_t frameId) { - assert(frameId < init_.numFrames_); - if(frameId >= init_.numFrames_) + assert(frameId < _init.numFrames_); + if(frameId >= _init.numFrames_) return nullptr; - return (uint8_t*)(compressed_buffer_ + frameId * init_.compressedFrameSize_); + return (uint8_t*)(compressed_buffer_ + frameId * _init.compressedFrameSize_); } bool diff --git a/src/lib/grok/messenger.h b/src/lib/grok/messenger.h index 74f812d48..f98c40074 100644 --- a/src/lib/grok/messenger.h +++ b/src/lib/grok/messenger.h @@ -479,13 +479,11 @@ public: static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel); - std::atomic_bool running; bool _initialized = false; bool _shutdown = false; MessengerBlockingQueue<std::string> sendQueue; MessengerBlockingQueue<std::string> receiveQueue; MessengerBlockingQueue<BufferSrc> availableBuffers_; - MessengerInit init_; std::string cmd_; std::future<int> async_result_; std::mutex shutdownMutex_; @@ -498,6 +496,10 @@ private: bool launch(std::string const& cmd, boost::filesystem::path const& dir); void processor_thread(); void outbound_thread(const std::string &sendBuf, Synch* synch); + void inbound_thread(const std::string &receiveBuf, Synch* synch); + + std::atomic_bool _running; + MessengerInit _init; std::thread outbound; Synch* outboundSynch_ = nullptr; |
