diff options
| author | Carl Hetherington <cth@carlh.net> | 2025-05-17 00:19:23 +0200 |
|---|---|---|
| committer | Carl Hetherington <cth@carlh.net> | 2025-05-28 00:33:55 +0200 |
| commit | 2b7ba76b217800cd08e251ebc6e06ecbb9c60134 (patch) | |
| tree | c39189ed1d8009735fe5d306bdfe2d71ddc16435 | |
| parent | 93f0c532e16cbcb9a89f7c32046b61cda16f1967 (diff) | |
Cleanup: remove MessengerInit.
| -rw-r--r-- | src/lib/grok/context.h | 4 | ||||
| -rw-r--r-- | src/lib/grok/messenger.cc | 83 | ||||
| -rw-r--r-- | src/lib/grok/messenger.h | 80 |
3 files changed, 80 insertions, 87 deletions
diff --git a/src/lib/grok/context.h b/src/lib/grok/context.h index 729a4f041..05343f01b 100644 --- a/src/lib/grok/context.h +++ b/src/lib/grok/context.h @@ -150,7 +150,7 @@ public: } }; - auto clientInit = MessengerInit( + _messenger = new Messenger( clientToGrokMessageBuf, clientSentSynch, grokReceiveReadySynch, @@ -160,8 +160,6 @@ public: proc, std::thread::hardware_concurrency() ); - - _messenger = new Messenger(clientInit); } ~GrokContext() diff --git a/src/lib/grok/messenger.cc b/src/lib/grok/messenger.cc index dec3f2a8a..2b8b9d2bb 100644 --- a/src/lib/grok/messenger.cc +++ b/src/lib/grok/messenger.cc @@ -25,13 +25,33 @@ using namespace grk_plugin; -Messenger::Messenger(MessengerInit init) +Messenger::Messenger( + std::string const& outBuf, + std::string const& outSent, + std::string const& outReceiveReady, + std::string const& inBuf, + std::string const& inSent, + std::string const& inReceiveReady, + std::function<void(std::string)> processor, + size_t numProcessingThreads + ) : _running(true) - , _init(init) , _frames_scheduled(0) , _frames_compressed(0) + , outboundMessageBuf(outBuf) + , outboundSentSynch(outSent) + , outboundReceiveReadySynch(outReceiveReady) + , inboundMessageBuf(inBuf) + , inboundSentSynch(inSent) + , inboundReceiveReadySynch(inReceiveReady) + , processor_(processor) + , numProcessingThreads_(numProcessingThreads) + , uncompressedFrameSize_(0) + , compressedFrameSize_(0) + , numFrames_(0) { - + shm_unlink(grokToClientMessageBuf.c_str()); + shm_unlink(clientToGrokMessageBuf.c_str()); } @@ -88,7 +108,7 @@ Messenger::processor_thread() auto height = msg.nextUint(); auto samples_per_pixel = msg.nextUint(); msg.nextUint(); // depth - _init.uncompressedFrameSize_ = Messenger::uncompressedFrameSize(width, height, samples_per_pixel); + uncompressedFrameSize_ = Messenger::uncompressedFrameSize(width, height, samples_per_pixel); auto compressed_frame_size = msg.nextUint(); auto num_frames = msg.nextUint(); initClient(compressed_frame_size, compressed_frame_size, num_frames); @@ -99,7 +119,7 @@ Messenger::processor_thread() } /* Handle writing J2K data to disk */ - _init.processor_(message); + processor_(message); } } @@ -110,7 +130,7 @@ Messenger::outbound_thread() int shm_fd = 0; char* send_buffer = nullptr; - if (!SharedMemoryManager::initShm(_init.outboundMessageBuf, messageBufferLen, &shm_fd, &send_buffer)) { + if (!SharedMemoryManager::initShm(outboundMessageBuf, messageBufferLen, &shm_fd, &send_buffer)) { return; } @@ -130,7 +150,7 @@ Messenger::outbound_thread() _outbound_synch->post(SYNCH_SENT); } - SharedMemoryManager::deinit_shm(_init.outboundMessageBuf, messageBufferLen, shm_fd, &send_buffer); + SharedMemoryManager::deinit_shm(outboundMessageBuf, messageBufferLen, shm_fd, &send_buffer); } @@ -140,7 +160,7 @@ Messenger::inbound_thread() int shm_fd = 0; char* receive_buffer = nullptr; - if (!SharedMemoryManager::initShm(_init.inboundMessageBuf, messageBufferLen, &shm_fd, &receive_buffer)) { + if (!SharedMemoryManager::initShm(inboundMessageBuf, messageBufferLen, &shm_fd, &receive_buffer)) { return; } @@ -154,20 +174,20 @@ Messenger::inbound_thread() _receive_queue.push(message); } - SharedMemoryManager::deinit_shm(_init.inboundMessageBuf, messageBufferLen, shm_fd, &receive_buffer); + SharedMemoryManager::deinit_shm(inboundMessageBuf, messageBufferLen, shm_fd, &receive_buffer); } void Messenger::startThreads() { - _outbound_synch = new Synch(_init.outboundSentSynch, _init.outboundReceiveReadySynch); + _outbound_synch = new Synch(outboundSentSynch, outboundReceiveReadySynch); _outbound = std::thread(&Messenger::outbound_thread, this); - _inbound_synch = new Synch(_init.inboundSentSynch, _init.inboundReceiveReadySynch); + _inbound_synch = new Synch(inboundSentSynch, inboundReceiveReadySynch); _inbound = std::thread(&Messenger::inbound_thread, this); - for (size_t i = 0; i < _init.numProcessingThreads_; ++i) { + for (size_t i = 0; i < numProcessingThreads_; ++i) { _processors.push_back(std::thread(&Messenger::processor_thread, this)); } } @@ -177,18 +197,18 @@ bool Messenger::initBuffers() { bool rc = true; - if (_init.uncompressedFrameSize_) { + if (uncompressedFrameSize_) { rc = rc && SharedMemoryManager::initShm( grokUncompressedBuf, - _init.uncompressedFrameSize_ * _init.numFrames_, + uncompressedFrameSize_ * numFrames_, &_uncompressed_fd, &_uncompressed_buffer ); } - if (_init.compressedFrameSize_) { + if (compressedFrameSize_) { rc = rc && SharedMemoryManager::initShm( grokCompressedBuf, - _init.compressedFrameSize_ * _init.numFrames_, + compressedFrameSize_ * numFrames_, &_compressed_fd, &_compressed_buffer ); } @@ -202,13 +222,13 @@ Messenger::deinit_shm() { bool rc = SharedMemoryManager::deinit_shm( grokUncompressedBuf, - _init.uncompressedFrameSize_ * _init.numFrames_, + uncompressedFrameSize_ * numFrames_, _uncompressed_fd, &_uncompressed_buffer ); rc = rc && SharedMemoryManager::deinit_shm( grokCompressedBuf, - _init.compressedFrameSize_ * _init.numFrames_, + compressedFrameSize_ * numFrames_, _compressed_fd, &_compressed_buffer ); @@ -236,7 +256,10 @@ Messenger::launch_grok( if (_async_result.valid()) { return true; } - _init.unlink(); + + shm_unlink(grokToClientMessageBuf.c_str()); + shm_unlink(clientToGrokMessageBuf.c_str()); + startThreads(); char cmd[4096]; snprintf(cmd, sizeof(cmd), @@ -254,15 +277,15 @@ void Messenger::initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames) { // client fills queue with pending uncompressed buffers - _init.uncompressedFrameSize_ = uncompressedFrameSize; - _init.compressedFrameSize_ = compressedFrameSize; - _init.numFrames_ = numFrames; + uncompressedFrameSize_ = uncompressedFrameSize; + compressedFrameSize_ = compressedFrameSize; + numFrames_ = numFrames; initBuffers(); auto ptr = _uncompressed_buffer; - for(size_t i = 0; i < _init.numFrames_; ++i) + for(size_t i = 0; i < numFrames_; ++i) { _available_buffers.push(BufferSrc(0, i, (uint8_t*)ptr)); - ptr += _init.uncompressedFrameSize_; + ptr += uncompressedFrameSize_; } std::unique_lock<std::mutex> lk(_shutdown_mutex); @@ -326,22 +349,22 @@ Messenger::reclaimUncompressed(size_t frameId) uint8_t* Messenger::getUncompressedFrame(size_t frameId) { - assert(frameId < _init.numFrames_); - if(frameId >= _init.numFrames_) + assert(frameId < numFrames_); + if(frameId >= numFrames_) return nullptr; - return (uint8_t*)(_uncompressed_buffer + frameId * _init.uncompressedFrameSize_); + return (uint8_t*)(_uncompressed_buffer + frameId * uncompressedFrameSize_); } uint8_t* Messenger::getCompressedFrame(size_t frameId) { - assert(frameId < _init.numFrames_); - if(frameId >= _init.numFrames_) + assert(frameId < numFrames_); + if(frameId >= numFrames_) return nullptr; - return (uint8_t*)(_compressed_buffer + frameId * _init.compressedFrameSize_); + return (uint8_t*)(_compressed_buffer + frameId * compressedFrameSize_); } bool diff --git a/src/lib/grok/messenger.h b/src/lib/grok/messenger.h index 1ef3e3ee7..6b6575584 100644 --- a/src/lib/grok/messenger.h +++ b/src/lib/grok/messenger.h @@ -133,58 +133,6 @@ void setMessengerLogger(IMessengerLogger* logger); IMessengerLogger* getMessengerLogger(void); -struct MessengerInit -{ - MessengerInit( - std::string const& outBuf, - std::string const& outSent, - std::string const& outReceiveReady, - std::string const& inBuf, - std::string const& inSent, - std::string const& inReceiveReady, - std::function<void(std::string)> processor, - size_t numProcessingThreads - ) - : outboundMessageBuf(outBuf) - , outboundSentSynch(outSent) - , outboundReceiveReadySynch(outReceiveReady) - , inboundMessageBuf(inBuf) - , inboundSentSynch(inSent) - , inboundReceiveReadySynch(inReceiveReady) - , processor_(processor) - , numProcessingThreads_(numProcessingThreads) - , uncompressedFrameSize_(0) - , compressedFrameSize_(0) - , numFrames_(0) - { - unlink(); - } - - void unlink() - { -#ifndef _WIN32 - shm_unlink(grokToClientMessageBuf.c_str()); - shm_unlink(clientToGrokMessageBuf.c_str()); -#endif - } - - std::string outboundMessageBuf; - std::string outboundSentSynch; - std::string outboundReceiveReadySynch; - - std::string inboundMessageBuf; - std::string inboundSentSynch; - std::string inboundReceiveReadySynch; - - std::function<void(std::string)> processor_; - size_t numProcessingThreads_; - - size_t uncompressedFrameSize_; - size_t compressedFrameSize_; - size_t numFrames_; -}; - - enum SynchDirection { SYNCH_SENT, @@ -457,7 +405,17 @@ struct ScheduledFrames class Messenger { public: - explicit Messenger(MessengerInit init); + Messenger( + std::string const& outBuf, + std::string const& outSent, + std::string const& outReceiveReady, + std::string const& inBuf, + std::string const& inSent, + std::string const& inReceiveReady, + std::function<void(std::string)> processor, + size_t numProcessingThreads + ); + ~Messenger(); void startThreads(); @@ -522,7 +480,6 @@ private: void inbound_thread(); std::atomic_bool _running; - MessengerInit _init; bool _initialized = false; /** a queue of messages sent (by outbound_thread) via shared memory to the Grok process */ MessengerBlockingQueue<std::string> _send_queue; @@ -558,6 +515,21 @@ private: ScheduledFrames _scheduled_frames; std::atomic<uint32_t> _frames_scheduled; std::atomic<uint32_t> _frames_compressed; + + std::string outboundMessageBuf; + std::string outboundSentSynch; + std::string outboundReceiveReadySynch; + + std::string inboundMessageBuf; + std::string inboundSentSynch; + std::string inboundReceiveReadySynch; + + std::function<void(std::string)> processor_; + size_t numProcessingThreads_; + + size_t uncompressedFrameSize_; + size_t compressedFrameSize_; + size_t numFrames_; }; } // namespace grk_plugin |
