summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCarl Hetherington <cth@carlh.net>2025-05-16 22:15:47 +0200
committerCarl Hetherington <cth@carlh.net>2025-05-28 00:33:55 +0200
commitd16b4f52f72d95ed9d88d2e74103c4a06a6b1e62 (patch)
treebada9c2b1a17482818f63b3f817aa047ace645d7
parent893728e7efe6f40d6fa1c785cdbc4820e4d8f892 (diff)
Cleanup: rename variables and make more private.
-rw-r--r--src/lib/grok/messenger.cc36
-rw-r--r--src/lib/grok/messenger.h37
2 files changed, 36 insertions, 37 deletions
diff --git a/src/lib/grok/messenger.cc b/src/lib/grok/messenger.cc
index 83ac18fac..df8b623f7 100644
--- a/src/lib/grok/messenger.cc
+++ b/src/lib/grok/messenger.cc
@@ -36,8 +36,8 @@ Messenger::Messenger(MessengerInit init)
Messenger::~Messenger()
{
_running = false;
- sendQueue.deactivate();
- receiveQueue.deactivate();
+ _send_queue.deactivate();
+ _receive_queue.deactivate();
if (outboundSynch_) {
outboundSynch_->post(SYNCH_RECEIVE_READY);
@@ -68,7 +68,7 @@ Messenger::processor_thread()
{
while (_running) {
std::string message;
- if (!receiveQueue.waitAndPop(message)) {
+ if (!_receive_queue.waitAndPop(message)) {
break;
}
@@ -114,7 +114,7 @@ Messenger::outbound_thread(const std::string &sendBuf, Synch* synch)
break;
}
std::string message;
- if (!sendQueue.waitAndPop(message)) {
+ if (!_send_queue.waitAndPop(message)) {
break;
}
if (!_running) {
@@ -145,7 +145,7 @@ Messenger::inbound_thread(const std::string &receiveBuf, Synch* synch)
}
auto message = std::string(receive_buffer);
synch->post(SYNCH_RECEIVE_READY);
- receiveQueue.push(message);
+ _receive_queue.push(message);
}
SharedMemoryManager::deinit_shm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer);
@@ -226,8 +226,8 @@ Messenger::launch_grok(
const std::string license
)
{
- std::unique_lock<std::mutex> lk(shutdownMutex_);
- if (async_result_.valid()) {
+ std::unique_lock<std::mutex> lk(_shutdown_mutex);
+ if (_async_result.valid()) {
return true;
}
_init.unlink();
@@ -255,13 +255,13 @@ Messenger::initClient(size_t uncompressedFrameSize, size_t compressedFrameSize,
auto ptr = uncompressed_buffer_;
for(size_t i = 0; i < _init.numFrames_; ++i)
{
- availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr));
+ _available_buffers.push(BufferSrc(0, i, (uint8_t*)ptr));
ptr += _init.uncompressedFrameSize_;
}
- std::unique_lock<std::mutex> lk(shutdownMutex_);
+ std::unique_lock<std::mutex> lk(_shutdown_mutex);
_initialized = true;
- clientInitializedCondition_.notify_all();
+ _client_initialized_condition.notify_all();
}
bool
@@ -273,7 +273,7 @@ Messenger::waitForClientInit()
return false;
}
- std::unique_lock<std::mutex> lk(shutdownMutex_);
+ std::unique_lock<std::mutex> lk(_shutdown_mutex);
if (_initialized) {
return true;
@@ -282,10 +282,10 @@ Messenger::waitForClientInit()
}
while (true) {
- if (clientInitializedCondition_.wait_for(lk, std::chrono::seconds(1), [this]{ return _initialized || _shutdown; })) {
+ if (_client_initialized_condition.wait_for(lk, std::chrono::seconds(1), [this]{ return _initialized || _shutdown; })) {
break;
}
- auto status = async_result_.wait_for(std::chrono::milliseconds(100));
+ auto status = _async_result.wait_for(std::chrono::milliseconds(100));
if (status == std::future_status::ready) {
getMessengerLogger()->error("Grok exited unexpectedly during initialization");
return false;
@@ -306,14 +306,14 @@ Messenger::uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixe
void
Messenger::reclaimCompressed(size_t frameId)
{
- availableBuffers_.push(BufferSrc(0, frameId, getCompressedFrame(frameId)));
+ _available_buffers.push(BufferSrc(0, frameId, getCompressedFrame(frameId)));
}
void
Messenger::reclaimUncompressed(size_t frameId)
{
- availableBuffers_.push(BufferSrc(0, frameId, getUncompressedFrame(frameId)));
+ _available_buffers.push(BufferSrc(0, frameId, getUncompressedFrame(frameId)));
}
@@ -352,10 +352,10 @@ Messenger::launch(std::string const& cmd, boost::filesystem::path const& dir)
}
}
// Execute the command using std::async and std::system
- cmd_ = cmd;
+ _cmd = cmd;
getMessengerLogger()->info(cmd.c_str());
- async_result_ = std::async(std::launch::async, [this]() { return std::system(cmd_.c_str()); });
- bool success = async_result_.valid();
+ _async_result = std::async(std::launch::async, [this]() { return std::system(_cmd.c_str()); });
+ bool success = _async_result.valid();
if (!success)
getMessengerLogger()->error("Grok launch failed");
diff --git a/src/lib/grok/messenger.h b/src/lib/grok/messenger.h
index f98c40074..5f85e931b 100644
--- a/src/lib/grok/messenger.h
+++ b/src/lib/grok/messenger.h
@@ -451,7 +451,7 @@ public:
int dummy[] = {0, ((void)(oss << ',' << args), 0)...};
static_cast<void>(dummy);
- sendQueue.push(oss.str());
+ _send_queue.push(oss.str());
}
bool launch_grok(
@@ -479,18 +479,13 @@ public:
static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel);
- bool _initialized = false;
- bool _shutdown = false;
- MessengerBlockingQueue<std::string> sendQueue;
- MessengerBlockingQueue<std::string> receiveQueue;
- MessengerBlockingQueue<BufferSrc> availableBuffers_;
- std::string cmd_;
- std::future<int> async_result_;
- std::mutex shutdownMutex_;
- std::condition_variable shutdownCondition_;
-
protected:
- std::condition_variable clientInitializedCondition_;
+ std::condition_variable _client_initialized_condition;
+ MessengerBlockingQueue<BufferSrc> _available_buffers;
+ std::future<int> _async_result;
+ std::condition_variable _shutdown_condition;
+ bool _shutdown = false;
+ std::mutex _shutdown_mutex;
private:
bool launch(std::string const& cmd, boost::filesystem::path const& dir);
@@ -500,6 +495,10 @@ private:
std::atomic_bool _running;
MessengerInit _init;
+ bool _initialized = false;
+ MessengerBlockingQueue<std::string> _send_queue;
+ MessengerBlockingQueue<std::string> _receive_queue;
+ std::string _cmd;
std::thread outbound;
Synch* outboundSynch_ = nullptr;
@@ -593,7 +592,7 @@ struct ScheduledMessenger : public Messenger
bool schedule_compress(F const& proxy, std::function<void(BufferSrc const&)> converter)
{
BufferSrc src;
- if (!availableBuffers_.waitAndPop(src)) {
+ if (!_available_buffers.waitAndPop(src)) {
return false;
}
converter(src);
@@ -623,26 +622,26 @@ struct ScheduledMessenger : public Messenger
send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId);
if (_shutdown && _frames_compressed == _frames_scheduled) {
- shutdownCondition_.notify_all();
+ _shutdown_condition.notify_all();
}
}
void shutdown()
{
try {
- std::unique_lock<std::mutex> lk(shutdownMutex_);
- if (!async_result_.valid()) {
+ std::unique_lock<std::mutex> lk(_shutdown_mutex);
+ if (!_async_result.valid()) {
return;
}
_shutdown = true;
if (_frames_scheduled) {
uint32_t scheduled = _frames_scheduled;
send(GRK_MSGR_BATCH_FLUSH, scheduled);
- shutdownCondition_.wait(lk, [this] { return _frames_scheduled == _frames_compressed; });
+ _shutdown_condition.wait(lk, [this] { return _frames_scheduled == _frames_compressed; });
}
- availableBuffers_.deactivate();
+ _available_buffers.deactivate();
send(GRK_MSGR_BATCH_SHUTDOWN);
- int result = async_result_.get();
+ int result = _async_result.get();
if(result != 0) {
getMessengerLogger()->error("Accelerator failed with return code: %d\n",result);
}