summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorCarl Hetherington <cth@carlh.net>2025-05-16 22:04:03 +0200
committerCarl Hetherington <cth@carlh.net>2025-05-28 00:33:55 +0200
commit893728e7efe6f40d6fa1c785cdbc4820e4d8f892 (patch)
tree4e8941687fd3ea677d26bfb2fb266a1c6fb4cb9e /src
parent9057c23f81e5ace350cca5a7e5e78bd9278f320c (diff)
Cleanup: make running and init_ private and rename them.
Diffstat (limited to 'src')
-rw-r--r--src/lib/grok/messenger.cc70
-rw-r--r--src/lib/grok/messenger.h6
2 files changed, 39 insertions, 37 deletions
diff --git a/src/lib/grok/messenger.cc b/src/lib/grok/messenger.cc
index d9b3692e0..83ac18fac 100644
--- a/src/lib/grok/messenger.cc
+++ b/src/lib/grok/messenger.cc
@@ -26,8 +26,8 @@ using namespace grk_plugin;
Messenger::Messenger(MessengerInit init)
- : running(true)
- , init_(init)
+ : _running(true)
+ , _init(init)
{
}
@@ -35,7 +35,7 @@ Messenger::Messenger(MessengerInit init)
Messenger::~Messenger()
{
- running = false;
+ _running = false;
sendQueue.deactivate();
receiveQueue.deactivate();
@@ -66,13 +66,13 @@ Messenger::~Messenger()
void
Messenger::processor_thread()
{
- while (running) {
+ while (_running) {
std::string message;
if (!receiveQueue.waitAndPop(message)) {
break;
}
- if (!running) {
+ if (!_running) {
break;
}
@@ -84,7 +84,7 @@ Messenger::processor_thread()
auto height = msg.nextUint();
auto samples_per_pixel = msg.nextUint();
msg.nextUint(); // depth
- init_.uncompressedFrameSize_ = Messenger::uncompressedFrameSize(width, height, samples_per_pixel);
+ _init.uncompressedFrameSize_ = Messenger::uncompressedFrameSize(width, height, samples_per_pixel);
auto compressed_frame_size = msg.nextUint();
auto num_frames = msg.nextUint();
initClient(compressed_frame_size, compressed_frame_size, num_frames);
@@ -93,7 +93,7 @@ Messenger::processor_thread()
} else if (tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED) {
reclaimCompressed(msg.nextUint());
}
- init_.processor_(message);
+ _init.processor_(message);
}
}
@@ -108,16 +108,16 @@ Messenger::outbound_thread(const std::string &sendBuf, Synch* synch)
return;
}
- while (running) {
+ while (_running) {
synch->wait(SYNCH_RECEIVE_READY);
- if (!running) {
+ if (!_running) {
break;
}
std::string message;
if (!sendQueue.waitAndPop(message)) {
break;
}
- if (!running) {
+ if (!_running) {
break;
}
memcpy(send_buffer, message.c_str(), message.size() + 1);
@@ -138,9 +138,9 @@ Messenger::inbound_thread(const std::string &receiveBuf, Synch* synch)
return;
}
- while (running) {
+ while (_running) {
synch->wait(SYNCH_SENT);
- if (!running) {
+ if (!_running) {
break;
}
auto message = std::string(receive_buffer);
@@ -155,13 +155,13 @@ Messenger::inbound_thread(const std::string &receiveBuf, Synch* synch)
void
Messenger::startThreads()
{
- outboundSynch_ = new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch);
- outbound = std::thread(&Messenger::outbound_thread, this, init_.outboundMessageBuf, outboundSynch_);
+ outboundSynch_ = new Synch(_init.outboundSentSynch, _init.outboundReceiveReadySynch);
+ outbound = std::thread(&Messenger::outbound_thread, this, _init.outboundMessageBuf, outboundSynch_);
- inboundSynch_ = new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch);
- inbound = std::thread(&Messenger::inbound_thread, this, init_.inboundMessageBuf, inboundSynch_);
+ inboundSynch_ = new Synch(_init.inboundSentSynch, _init.inboundReceiveReadySynch);
+ inbound = std::thread(&Messenger::inbound_thread, this, _init.inboundMessageBuf, inboundSynch_);
- for (size_t i = 0; i < init_.numProcessingThreads_; ++i) {
+ for (size_t i = 0; i < _init.numProcessingThreads_; ++i) {
processors_.push_back(std::thread(&Messenger::processor_thread, this));
}
}
@@ -171,18 +171,18 @@ bool
Messenger::initBuffers()
{
bool rc = true;
- if (init_.uncompressedFrameSize_) {
+ if (_init.uncompressedFrameSize_) {
rc = rc && SharedMemoryManager::initShm(
grokUncompressedBuf,
- init_.uncompressedFrameSize_ * init_.numFrames_,
+ _init.uncompressedFrameSize_ * _init.numFrames_,
&uncompressed_fd_, &uncompressed_buffer_
);
}
- if (init_.compressedFrameSize_) {
+ if (_init.compressedFrameSize_) {
rc = rc && SharedMemoryManager::initShm(
grokCompressedBuf,
- init_.compressedFrameSize_ * init_.numFrames_,
+ _init.compressedFrameSize_ * _init.numFrames_,
&compressed_fd_, &compressed_buffer_
);
}
@@ -196,13 +196,13 @@ Messenger::deinit_shm()
{
bool rc = SharedMemoryManager::deinit_shm(
grokUncompressedBuf,
- init_.uncompressedFrameSize_ * init_.numFrames_,
+ _init.uncompressedFrameSize_ * _init.numFrames_,
uncompressed_fd_, &uncompressed_buffer_
);
rc = rc && SharedMemoryManager::deinit_shm(
grokCompressedBuf,
- init_.compressedFrameSize_ * init_.numFrames_,
+ _init.compressedFrameSize_ * _init.numFrames_,
compressed_fd_, &compressed_buffer_
);
@@ -230,7 +230,7 @@ Messenger::launch_grok(
if (async_result_.valid()) {
return true;
}
- init_.unlink();
+ _init.unlink();
startThreads();
char cmd[4096];
snprintf(cmd, sizeof(cmd),
@@ -248,15 +248,15 @@ void
Messenger::initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames)
{
// client fills queue with pending uncompressed buffers
- init_.uncompressedFrameSize_ = uncompressedFrameSize;
- init_.compressedFrameSize_ = compressedFrameSize;
- init_.numFrames_ = numFrames;
+ _init.uncompressedFrameSize_ = uncompressedFrameSize;
+ _init.compressedFrameSize_ = compressedFrameSize;
+ _init.numFrames_ = numFrames;
initBuffers();
auto ptr = uncompressed_buffer_;
- for(size_t i = 0; i < init_.numFrames_; ++i)
+ for(size_t i = 0; i < _init.numFrames_; ++i)
{
availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr));
- ptr += init_.uncompressedFrameSize_;
+ ptr += _init.uncompressedFrameSize_;
}
std::unique_lock<std::mutex> lk(shutdownMutex_);
@@ -320,22 +320,22 @@ Messenger::reclaimUncompressed(size_t frameId)
uint8_t*
Messenger::getUncompressedFrame(size_t frameId)
{
- assert(frameId < init_.numFrames_);
- if(frameId >= init_.numFrames_)
+ assert(frameId < _init.numFrames_);
+ if(frameId >= _init.numFrames_)
return nullptr;
- return (uint8_t*)(uncompressed_buffer_ + frameId * init_.uncompressedFrameSize_);
+ return (uint8_t*)(uncompressed_buffer_ + frameId * _init.uncompressedFrameSize_);
}
uint8_t*
Messenger::getCompressedFrame(size_t frameId)
{
- assert(frameId < init_.numFrames_);
- if(frameId >= init_.numFrames_)
+ assert(frameId < _init.numFrames_);
+ if(frameId >= _init.numFrames_)
return nullptr;
- return (uint8_t*)(compressed_buffer_ + frameId * init_.compressedFrameSize_);
+ return (uint8_t*)(compressed_buffer_ + frameId * _init.compressedFrameSize_);
}
bool
diff --git a/src/lib/grok/messenger.h b/src/lib/grok/messenger.h
index 74f812d48..f98c40074 100644
--- a/src/lib/grok/messenger.h
+++ b/src/lib/grok/messenger.h
@@ -479,13 +479,11 @@ public:
static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel);
- std::atomic_bool running;
bool _initialized = false;
bool _shutdown = false;
MessengerBlockingQueue<std::string> sendQueue;
MessengerBlockingQueue<std::string> receiveQueue;
MessengerBlockingQueue<BufferSrc> availableBuffers_;
- MessengerInit init_;
std::string cmd_;
std::future<int> async_result_;
std::mutex shutdownMutex_;
@@ -498,6 +496,10 @@ private:
bool launch(std::string const& cmd, boost::filesystem::path const& dir);
void processor_thread();
void outbound_thread(const std::string &sendBuf, Synch* synch);
+ void inbound_thread(const std::string &receiveBuf, Synch* synch);
+
+ std::atomic_bool _running;
+ MessengerInit _init;
std::thread outbound;
Synch* outboundSynch_ = nullptr;