summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCarl Hetherington <cth@carlh.net>2025-05-17 00:19:23 +0200
committerCarl Hetherington <cth@carlh.net>2025-05-28 00:33:55 +0200
commit2b7ba76b217800cd08e251ebc6e06ecbb9c60134 (patch)
treec39189ed1d8009735fe5d306bdfe2d71ddc16435
parent93f0c532e16cbcb9a89f7c32046b61cda16f1967 (diff)
Cleanup: remove MessengerInit.
-rw-r--r--src/lib/grok/context.h4
-rw-r--r--src/lib/grok/messenger.cc83
-rw-r--r--src/lib/grok/messenger.h80
3 files changed, 80 insertions, 87 deletions
diff --git a/src/lib/grok/context.h b/src/lib/grok/context.h
index 729a4f041..05343f01b 100644
--- a/src/lib/grok/context.h
+++ b/src/lib/grok/context.h
@@ -150,7 +150,7 @@ public:
}
};
- auto clientInit = MessengerInit(
+ _messenger = new Messenger(
clientToGrokMessageBuf,
clientSentSynch,
grokReceiveReadySynch,
@@ -160,8 +160,6 @@ public:
proc,
std::thread::hardware_concurrency()
);
-
- _messenger = new Messenger(clientInit);
}
~GrokContext()
diff --git a/src/lib/grok/messenger.cc b/src/lib/grok/messenger.cc
index dec3f2a8a..2b8b9d2bb 100644
--- a/src/lib/grok/messenger.cc
+++ b/src/lib/grok/messenger.cc
@@ -25,13 +25,33 @@
using namespace grk_plugin;
-Messenger::Messenger(MessengerInit init)
+Messenger::Messenger(
+ std::string const& outBuf,
+ std::string const& outSent,
+ std::string const& outReceiveReady,
+ std::string const& inBuf,
+ std::string const& inSent,
+ std::string const& inReceiveReady,
+ std::function<void(std::string)> processor,
+ size_t numProcessingThreads
+ )
: _running(true)
- , _init(init)
, _frames_scheduled(0)
, _frames_compressed(0)
+ , outboundMessageBuf(outBuf)
+ , outboundSentSynch(outSent)
+ , outboundReceiveReadySynch(outReceiveReady)
+ , inboundMessageBuf(inBuf)
+ , inboundSentSynch(inSent)
+ , inboundReceiveReadySynch(inReceiveReady)
+ , processor_(processor)
+ , numProcessingThreads_(numProcessingThreads)
+ , uncompressedFrameSize_(0)
+ , compressedFrameSize_(0)
+ , numFrames_(0)
{
-
+ shm_unlink(grokToClientMessageBuf.c_str());
+ shm_unlink(clientToGrokMessageBuf.c_str());
}
@@ -88,7 +108,7 @@ Messenger::processor_thread()
auto height = msg.nextUint();
auto samples_per_pixel = msg.nextUint();
msg.nextUint(); // depth
- _init.uncompressedFrameSize_ = Messenger::uncompressedFrameSize(width, height, samples_per_pixel);
+ uncompressedFrameSize_ = Messenger::uncompressedFrameSize(width, height, samples_per_pixel);
auto compressed_frame_size = msg.nextUint();
auto num_frames = msg.nextUint();
initClient(compressed_frame_size, compressed_frame_size, num_frames);
@@ -99,7 +119,7 @@ Messenger::processor_thread()
}
/* Handle writing J2K data to disk */
- _init.processor_(message);
+ processor_(message);
}
}
@@ -110,7 +130,7 @@ Messenger::outbound_thread()
int shm_fd = 0;
char* send_buffer = nullptr;
- if (!SharedMemoryManager::initShm(_init.outboundMessageBuf, messageBufferLen, &shm_fd, &send_buffer)) {
+ if (!SharedMemoryManager::initShm(outboundMessageBuf, messageBufferLen, &shm_fd, &send_buffer)) {
return;
}
@@ -130,7 +150,7 @@ Messenger::outbound_thread()
_outbound_synch->post(SYNCH_SENT);
}
- SharedMemoryManager::deinit_shm(_init.outboundMessageBuf, messageBufferLen, shm_fd, &send_buffer);
+ SharedMemoryManager::deinit_shm(outboundMessageBuf, messageBufferLen, shm_fd, &send_buffer);
}
@@ -140,7 +160,7 @@ Messenger::inbound_thread()
int shm_fd = 0;
char* receive_buffer = nullptr;
- if (!SharedMemoryManager::initShm(_init.inboundMessageBuf, messageBufferLen, &shm_fd, &receive_buffer)) {
+ if (!SharedMemoryManager::initShm(inboundMessageBuf, messageBufferLen, &shm_fd, &receive_buffer)) {
return;
}
@@ -154,20 +174,20 @@ Messenger::inbound_thread()
_receive_queue.push(message);
}
- SharedMemoryManager::deinit_shm(_init.inboundMessageBuf, messageBufferLen, shm_fd, &receive_buffer);
+ SharedMemoryManager::deinit_shm(inboundMessageBuf, messageBufferLen, shm_fd, &receive_buffer);
}
void
Messenger::startThreads()
{
- _outbound_synch = new Synch(_init.outboundSentSynch, _init.outboundReceiveReadySynch);
+ _outbound_synch = new Synch(outboundSentSynch, outboundReceiveReadySynch);
_outbound = std::thread(&Messenger::outbound_thread, this);
- _inbound_synch = new Synch(_init.inboundSentSynch, _init.inboundReceiveReadySynch);
+ _inbound_synch = new Synch(inboundSentSynch, inboundReceiveReadySynch);
_inbound = std::thread(&Messenger::inbound_thread, this);
- for (size_t i = 0; i < _init.numProcessingThreads_; ++i) {
+ for (size_t i = 0; i < numProcessingThreads_; ++i) {
_processors.push_back(std::thread(&Messenger::processor_thread, this));
}
}
@@ -177,18 +197,18 @@ bool
Messenger::initBuffers()
{
bool rc = true;
- if (_init.uncompressedFrameSize_) {
+ if (uncompressedFrameSize_) {
rc = rc && SharedMemoryManager::initShm(
grokUncompressedBuf,
- _init.uncompressedFrameSize_ * _init.numFrames_,
+ uncompressedFrameSize_ * numFrames_,
&_uncompressed_fd, &_uncompressed_buffer
);
}
- if (_init.compressedFrameSize_) {
+ if (compressedFrameSize_) {
rc = rc && SharedMemoryManager::initShm(
grokCompressedBuf,
- _init.compressedFrameSize_ * _init.numFrames_,
+ compressedFrameSize_ * numFrames_,
&_compressed_fd, &_compressed_buffer
);
}
@@ -202,13 +222,13 @@ Messenger::deinit_shm()
{
bool rc = SharedMemoryManager::deinit_shm(
grokUncompressedBuf,
- _init.uncompressedFrameSize_ * _init.numFrames_,
+ uncompressedFrameSize_ * numFrames_,
_uncompressed_fd, &_uncompressed_buffer
);
rc = rc && SharedMemoryManager::deinit_shm(
grokCompressedBuf,
- _init.compressedFrameSize_ * _init.numFrames_,
+ compressedFrameSize_ * numFrames_,
_compressed_fd, &_compressed_buffer
);
@@ -236,7 +256,10 @@ Messenger::launch_grok(
if (_async_result.valid()) {
return true;
}
- _init.unlink();
+
+ shm_unlink(grokToClientMessageBuf.c_str());
+ shm_unlink(clientToGrokMessageBuf.c_str());
+
startThreads();
char cmd[4096];
snprintf(cmd, sizeof(cmd),
@@ -254,15 +277,15 @@ void
Messenger::initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames)
{
// client fills queue with pending uncompressed buffers
- _init.uncompressedFrameSize_ = uncompressedFrameSize;
- _init.compressedFrameSize_ = compressedFrameSize;
- _init.numFrames_ = numFrames;
+ uncompressedFrameSize_ = uncompressedFrameSize;
+ compressedFrameSize_ = compressedFrameSize;
+ numFrames_ = numFrames;
initBuffers();
auto ptr = _uncompressed_buffer;
- for(size_t i = 0; i < _init.numFrames_; ++i)
+ for(size_t i = 0; i < numFrames_; ++i)
{
_available_buffers.push(BufferSrc(0, i, (uint8_t*)ptr));
- ptr += _init.uncompressedFrameSize_;
+ ptr += uncompressedFrameSize_;
}
std::unique_lock<std::mutex> lk(_shutdown_mutex);
@@ -326,22 +349,22 @@ Messenger::reclaimUncompressed(size_t frameId)
uint8_t*
Messenger::getUncompressedFrame(size_t frameId)
{
- assert(frameId < _init.numFrames_);
- if(frameId >= _init.numFrames_)
+ assert(frameId < numFrames_);
+ if(frameId >= numFrames_)
return nullptr;
- return (uint8_t*)(_uncompressed_buffer + frameId * _init.uncompressedFrameSize_);
+ return (uint8_t*)(_uncompressed_buffer + frameId * uncompressedFrameSize_);
}
uint8_t*
Messenger::getCompressedFrame(size_t frameId)
{
- assert(frameId < _init.numFrames_);
- if(frameId >= _init.numFrames_)
+ assert(frameId < numFrames_);
+ if(frameId >= numFrames_)
return nullptr;
- return (uint8_t*)(_compressed_buffer + frameId * _init.compressedFrameSize_);
+ return (uint8_t*)(_compressed_buffer + frameId * compressedFrameSize_);
}
bool
diff --git a/src/lib/grok/messenger.h b/src/lib/grok/messenger.h
index 1ef3e3ee7..6b6575584 100644
--- a/src/lib/grok/messenger.h
+++ b/src/lib/grok/messenger.h
@@ -133,58 +133,6 @@ void setMessengerLogger(IMessengerLogger* logger);
IMessengerLogger* getMessengerLogger(void);
-struct MessengerInit
-{
- MessengerInit(
- std::string const& outBuf,
- std::string const& outSent,
- std::string const& outReceiveReady,
- std::string const& inBuf,
- std::string const& inSent,
- std::string const& inReceiveReady,
- std::function<void(std::string)> processor,
- size_t numProcessingThreads
- )
- : outboundMessageBuf(outBuf)
- , outboundSentSynch(outSent)
- , outboundReceiveReadySynch(outReceiveReady)
- , inboundMessageBuf(inBuf)
- , inboundSentSynch(inSent)
- , inboundReceiveReadySynch(inReceiveReady)
- , processor_(processor)
- , numProcessingThreads_(numProcessingThreads)
- , uncompressedFrameSize_(0)
- , compressedFrameSize_(0)
- , numFrames_(0)
- {
- unlink();
- }
-
- void unlink()
- {
-#ifndef _WIN32
- shm_unlink(grokToClientMessageBuf.c_str());
- shm_unlink(clientToGrokMessageBuf.c_str());
-#endif
- }
-
- std::string outboundMessageBuf;
- std::string outboundSentSynch;
- std::string outboundReceiveReadySynch;
-
- std::string inboundMessageBuf;
- std::string inboundSentSynch;
- std::string inboundReceiveReadySynch;
-
- std::function<void(std::string)> processor_;
- size_t numProcessingThreads_;
-
- size_t uncompressedFrameSize_;
- size_t compressedFrameSize_;
- size_t numFrames_;
-};
-
-
enum SynchDirection
{
SYNCH_SENT,
@@ -457,7 +405,17 @@ struct ScheduledFrames
class Messenger
{
public:
- explicit Messenger(MessengerInit init);
+ Messenger(
+ std::string const& outBuf,
+ std::string const& outSent,
+ std::string const& outReceiveReady,
+ std::string const& inBuf,
+ std::string const& inSent,
+ std::string const& inReceiveReady,
+ std::function<void(std::string)> processor,
+ size_t numProcessingThreads
+ );
+
~Messenger();
void startThreads();
@@ -522,7 +480,6 @@ private:
void inbound_thread();
std::atomic_bool _running;
- MessengerInit _init;
bool _initialized = false;
/** a queue of messages sent (by outbound_thread) via shared memory to the Grok process */
MessengerBlockingQueue<std::string> _send_queue;
@@ -558,6 +515,21 @@ private:
ScheduledFrames _scheduled_frames;
std::atomic<uint32_t> _frames_scheduled;
std::atomic<uint32_t> _frames_compressed;
+
+ std::string outboundMessageBuf;
+ std::string outboundSentSynch;
+ std::string outboundReceiveReadySynch;
+
+ std::string inboundMessageBuf;
+ std::string inboundSentSynch;
+ std::string inboundReceiveReadySynch;
+
+ std::function<void(std::string)> processor_;
+ size_t numProcessingThreads_;
+
+ size_t uncompressedFrameSize_;
+ size_t compressedFrameSize_;
+ size_t numFrames_;
};
} // namespace grk_plugin