diff options
| author | Carl Hetherington <cth@carlh.net> | 2025-05-16 21:37:20 +0200 |
|---|---|---|
| committer | Carl Hetherington <cth@carlh.net> | 2025-05-28 00:33:55 +0200 |
| commit | f51215cee869d647eff9b7a69b55387d8a214062 (patch) | |
| tree | 68c53621cf3263a33eb769a9661eb8c7cbf039ad /src/lib | |
| parent | 7d2b3873bab8e0e6362ef45422500785be79af7a (diff) | |
Clenaup: move some methods into messenger.cc.
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/grok/messenger.cc | 352 | ||||
| -rw-r--r-- | src/lib/grok/messenger.h | 302 | ||||
| -rw-r--r-- | src/lib/wscript | 2 |
3 files changed, 371 insertions, 285 deletions
diff --git a/src/lib/grok/messenger.cc b/src/lib/grok/messenger.cc new file mode 100644 index 000000000..019350584 --- /dev/null +++ b/src/lib/grok/messenger.cc @@ -0,0 +1,352 @@ +/* + Copyright (C) 2023 Grok Image Compression Inc. + + This file is part of DCP-o-matic. + + DCP-o-matic is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + DCP-o-matic is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>. + +*/ + + +#include "messenger.h" + + +using namespace grk_plugin; + + +Messenger::Messenger(MessengerInit init) + : running(true) + , init_(init) +{ + +} + + +Messenger::~Messenger() +{ + running = false; + sendQueue.deactivate(); + receiveQueue.deactivate(); + + if (outboundSynch_) { + outboundSynch_->post(SYNCH_RECEIVE_READY); + outbound.join(); + } + + if (inboundSynch_) { + inboundSynch_->post(SYNCH_SENT); + inbound.join(); + } + + for(auto& p: processors_) { + p.join(); + } + + delete outboundSynch_; + delete inboundSynch_; + + deinit_shm(); +} + + +/* One of these is created for each core, to receive messages (e.g. "frame is encoded") + * from the grok process and handle them. + */ +static void +processor_thread(Messenger* messenger, std::function<void (std::string)> processor) +{ + while (messenger->running) { + std::string message; + if (!messenger->receiveQueue.waitAndPop(message)) { + break; + } + + if (!messenger->running) { + break; + } + + Msg msg(message); + auto tag = msg.next(); + if (tag == GRK_MSGR_BATCH_COMPRESS_INIT) { + auto width = msg.nextUint(); + msg.nextUint(); // stride + auto height = msg.nextUint(); + auto samples_per_pixel = msg.nextUint(); + msg.nextUint(); // depth + messenger->init_.uncompressedFrameSize_ = Messenger::uncompressedFrameSize(width, height, samples_per_pixel); + auto compressed_frame_size = msg.nextUint(); + auto num_frames = msg.nextUint(); + messenger->initClient(compressed_frame_size, compressed_frame_size, num_frames); + } else if (tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED) { + messenger->reclaimUncompressed(msg.nextUint()); + } else if (tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED) { + messenger->reclaimCompressed(msg.nextUint()); + } + processor(message); + } +} + + +static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch) +{ + grk_handle shm_fd = 0; + char* send_buffer = nullptr; + + if(!SharedMemoryManager::initShm(sendBuf, messageBufferLen, &shm_fd, &send_buffer)) + return; + while(messenger->running) + { + synch->wait(SYNCH_RECEIVE_READY); + if(!messenger->running) + break; + std::string message; + if(!messenger->sendQueue.waitAndPop(message)) + break; + if(!messenger->running) + break; + memcpy(send_buffer, message.c_str(), message.size() + 1); + synch->post(SYNCH_SENT); + } + SharedMemoryManager::deinit_shm(sendBuf, messageBufferLen, shm_fd, &send_buffer); +} + +static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch) +{ + grk_handle shm_fd = 0; + char* receive_buffer = nullptr; + + if(!SharedMemoryManager::initShm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer)) + return; + while(messenger->running) + { + synch->wait(SYNCH_SENT); + if(!messenger->running) + break; + auto message = std::string(receive_buffer); + synch->post(SYNCH_RECEIVE_READY); + messenger->receiveQueue.push(message); + } + SharedMemoryManager::deinit_shm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer); +} + + +void +Messenger::startThreads() +{ + outboundSynch_ = new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch); + outbound = std::thread(outboundThread, this, init_.outboundMessageBuf, outboundSynch_); + + inboundSynch_ = new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch); + inbound = std::thread(inboundThread, this, init_.inboundMessageBuf, inboundSynch_); + + for (size_t i = 0; i < init_.numProcessingThreads_; ++i) { + processors_.push_back(std::thread(processor_thread, this, init_.processor_)); + } +} + + +bool +Messenger::initBuffers() +{ + bool rc = true; + if (init_.uncompressedFrameSize_) { + rc = rc && SharedMemoryManager::initShm( + grokUncompressedBuf, + init_.uncompressedFrameSize_ * init_.numFrames_, + &uncompressed_fd_, &uncompressed_buffer_ + ); + } + + if (init_.compressedFrameSize_) { + rc = rc && SharedMemoryManager::initShm( + grokCompressedBuf, + init_.compressedFrameSize_ * init_.numFrames_, + &compressed_fd_, &compressed_buffer_ + ); + } + + return rc; +} + + +bool +Messenger::deinit_shm() +{ + bool rc = SharedMemoryManager::deinit_shm( + grokUncompressedBuf, + init_.uncompressedFrameSize_ * init_.numFrames_, + uncompressed_fd_, &uncompressed_buffer_ + ); + + rc = rc && SharedMemoryManager::deinit_shm( + grokCompressedBuf, + init_.compressedFrameSize_ * init_.numFrames_, + compressed_fd_, &compressed_buffer_ + ); + + return rc; +} + + +bool +Messenger::launch_grok( + boost::filesystem::path const& dir, + uint32_t width, + uint32_t stride, + uint32_t height, + uint32_t samplesPerPixel, + uint32_t depth, + int device, + bool is4K, + uint32_t fps, + uint32_t bandwidth, + const std::string server, + const std::string license + ) +{ + std::unique_lock<std::mutex> lk(shutdownMutex_); + if (async_result_.valid()) { + return true; + } + init_.unlink(); + startThreads(); + char cmd[4096]; + snprintf(cmd, sizeof(cmd), + "./grk_compress -batch_src %s,%d,%d,%d,%d,%d -out_fmt j2k -k 1 " + "-G %d -%s %d,%d -j %s -J %s -v", + GRK_MSGR_BATCH_IMAGE.c_str(), width, stride, height, samplesPerPixel, depth, + device, is4K ? "cinema4K" : "cinema2K", fps, bandwidth, + license.c_str(), server.c_str()); + + return launch(cmd, dir); +} + + +void +Messenger::initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames) +{ + // client fills queue with pending uncompressed buffers + init_.uncompressedFrameSize_ = uncompressedFrameSize; + init_.compressedFrameSize_ = compressedFrameSize; + init_.numFrames_ = numFrames; + initBuffers(); + auto ptr = uncompressed_buffer_; + for(size_t i = 0; i < init_.numFrames_; ++i) + { + availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr)); + ptr += init_.uncompressedFrameSize_; + } + + std::unique_lock<std::mutex> lk(shutdownMutex_); + _initialized = true; + clientInitializedCondition_.notify_all(); +} + +bool +Messenger::waitForClientInit() +{ + if (_initialized) { + return true; + } else if (_shutdown) { + return false; + } + + std::unique_lock<std::mutex> lk(shutdownMutex_); + + if (_initialized) { + return true; + } else if (_shutdown) { + return false; + } + + while (true) { + if (clientInitializedCondition_.wait_for(lk, std::chrono::seconds(1), [this]{ return _initialized || _shutdown; })) { + break; + } + auto status = async_result_.wait_for(std::chrono::milliseconds(100)); + if (status == std::future_status::ready) { + getMessengerLogger()->error("Grok exited unexpectedly during initialization"); + return false; + } + } + + return _initialized && !_shutdown; +} + + +size_t +Messenger::uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel) +{ + return sizeof(uint16_t) * w * h * samplesPerPixel; +} + + +void +Messenger::reclaimCompressed(size_t frameId) +{ + availableBuffers_.push(BufferSrc(0, frameId, getCompressedFrame(frameId))); +} + + +void +Messenger::reclaimUncompressed(size_t frameId) +{ + availableBuffers_.push(BufferSrc(0, frameId, getUncompressedFrame(frameId))); +} + + +uint8_t* +Messenger::getUncompressedFrame(size_t frameId) +{ + assert(frameId < init_.numFrames_); + if(frameId >= init_.numFrames_) + return nullptr; + + return (uint8_t*)(uncompressed_buffer_ + frameId * init_.uncompressedFrameSize_); +} + + +uint8_t* +Messenger::getCompressedFrame(size_t frameId) +{ + assert(frameId < init_.numFrames_); + if(frameId >= init_.numFrames_) + return nullptr; + + return (uint8_t*)(compressed_buffer_ + frameId * init_.compressedFrameSize_); +} + +bool +Messenger::launch(std::string const& cmd, boost::filesystem::path const& dir) +{ + // Change the working directory + if(!dir.empty()) + { + boost::system::error_code ec; + boost::filesystem::current_path(dir, ec); + if (ec) { + getMessengerLogger()->error("Error: failed to change the working directory"); + return false; + } + } + // Execute the command using std::async and std::system + cmd_ = cmd; + getMessengerLogger()->info(cmd.c_str()); + async_result_ = std::async(std::launch::async, [this]() { return std::system(cmd_.c_str()); }); + bool success = async_result_.valid(); + if (!success) + getMessengerLogger()->error("Grok launch failed"); + + return success; +} diff --git a/src/lib/grok/messenger.h b/src/lib/grok/messenger.h index 3a770c906..e475ae1c2 100644 --- a/src/lib/grok/messenger.h +++ b/src/lib/grok/messenger.h @@ -19,6 +19,9 @@ */ #pragma once +#include <boost/filesystem.hpp> +#include <boost/optional.hpp> + #include <iostream> #include <string> #include <cstring> @@ -33,6 +36,7 @@ #include <queue> #include <cassert> #include <cstdarg> +#include <climits> #ifdef _WIN32 #include <windows.h> @@ -428,97 +432,16 @@ struct BufferSrc uint8_t* framePtr_; }; -class Messenger; -static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch); -static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch); -static void processor_thread(Messenger* messenger, std::function<void (std::string)> processor); - class Messenger { public: - explicit Messenger(MessengerInit init) - : running(true) - , init_(init) - {} - - virtual ~Messenger() - { - running = false; - sendQueue.deactivate(); - receiveQueue.deactivate(); - - if (outboundSynch_) { - outboundSynch_->post(SYNCH_RECEIVE_READY); - outbound.join(); - } - - if (inboundSynch_) { - inboundSynch_->post(SYNCH_SENT); - inbound.join(); - } - - for(auto& p: processors_) { - p.join(); - } - - delete outboundSynch_; - delete inboundSynch_; + explicit Messenger(MessengerInit init); + virtual ~Messenger(); - deinit_shm(); - } - - void startThreads() - { - outboundSynch_ = new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch); - outbound = std::thread(outboundThread, this, init_.outboundMessageBuf, outboundSynch_); - - inboundSynch_ = new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch); - inbound = std::thread(inboundThread, this, init_.inboundMessageBuf, inboundSynch_); - - for (size_t i = 0; i < init_.numProcessingThreads_; ++i) { - processors_.push_back(std::thread(processor_thread, this, init_.processor_)); - } - } - - bool initBuffers() - { - bool rc = true; - if (init_.uncompressedFrameSize_) { - rc = rc && SharedMemoryManager::initShm( - grokUncompressedBuf, - init_.uncompressedFrameSize_ * init_.numFrames_, - &uncompressed_fd_, &uncompressed_buffer_ - ); - } - - if (init_.compressedFrameSize_) { - rc = rc && SharedMemoryManager::initShm( - grokCompressedBuf, - init_.compressedFrameSize_ * init_.numFrames_, - &compressed_fd_, &compressed_buffer_ - ); - } - - return rc; - } - - bool deinit_shm() - { - bool rc = SharedMemoryManager::deinit_shm( - grokUncompressedBuf, - init_.uncompressedFrameSize_ * init_.numFrames_, - uncompressed_fd_, &uncompressed_buffer_ - ); - - rc = rc && SharedMemoryManager::deinit_shm( - grokCompressedBuf, - init_.compressedFrameSize_ * init_.numFrames_, - compressed_fd_, &compressed_buffer_ - ); - - return rc; - } + void startThreads(); + bool initBuffers(); + bool deinit_shm(); template<typename... Args> void send(const std::string& str, Args... args) @@ -544,106 +467,17 @@ public: uint32_t bandwidth, const std::string server, const std::string license - ) - { - std::unique_lock<std::mutex> lk(shutdownMutex_); - if (async_result_.valid()) { - return true; - } - init_.unlink(); - startThreads(); - char cmd[4096]; - snprintf(cmd, sizeof(cmd), - "./grk_compress -batch_src %s,%d,%d,%d,%d,%d -out_fmt j2k -k 1 " - "-G %d -%s %d,%d -j %s -J %s -v", - GRK_MSGR_BATCH_IMAGE.c_str(), width, stride, height, samplesPerPixel, depth, - device, is4K ? "cinema4K" : "cinema2K", fps, bandwidth, - license.c_str(), server.c_str()); - - return launch(cmd, dir); - } - - void initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames) - { - // client fills queue with pending uncompressed buffers - init_.uncompressedFrameSize_ = uncompressedFrameSize; - init_.compressedFrameSize_ = compressedFrameSize; - init_.numFrames_ = numFrames; - initBuffers(); - auto ptr = uncompressed_buffer_; - for(size_t i = 0; i < init_.numFrames_; ++i) - { - availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr)); - ptr += init_.uncompressedFrameSize_; - } - - std::unique_lock<std::mutex> lk(shutdownMutex_); - _initialized = true; - clientInitializedCondition_.notify_all(); - } - - bool waitForClientInit() - { - if (_initialized) { - return true; - } else if (_shutdown) { - return false; - } - - std::unique_lock<std::mutex> lk(shutdownMutex_); - - if (_initialized) { - return true; - } else if (_shutdown) { - return false; - } - - while (true) { - if (clientInitializedCondition_.wait_for(lk, std::chrono::seconds(1), [this]{ return _initialized || _shutdown; })) { - break; - } - auto status = async_result_.wait_for(std::chrono::milliseconds(100)); - if (status == std::future_status::ready) { - getMessengerLogger()->error("Grok exited unexpectedly during initialization"); - return false; - } - } - - return _initialized && !_shutdown; - } - - static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel) - { - return sizeof(uint16_t) * w * h * samplesPerPixel; - } - - void reclaimCompressed(size_t frameId) - { - availableBuffers_.push(BufferSrc(0, frameId, getCompressedFrame(frameId))); - } - - void reclaimUncompressed(size_t frameId) - { - availableBuffers_.push(BufferSrc(0, frameId, getUncompressedFrame(frameId))); - } - - uint8_t* getUncompressedFrame(size_t frameId) - { - assert(frameId < init_.numFrames_); - if(frameId >= init_.numFrames_) - return nullptr; + ); - return (uint8_t*)(uncompressed_buffer_ + frameId * init_.uncompressedFrameSize_); - } + void initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames); + bool waitForClientInit(); - uint8_t* getCompressedFrame(size_t frameId) - { - assert(frameId < init_.numFrames_); - if(frameId >= init_.numFrames_) - return nullptr; + void reclaimCompressed(size_t frameId); + void reclaimUncompressed(size_t frameId); + uint8_t* getUncompressedFrame(size_t frameId); + uint8_t* getCompressedFrame(size_t frameId); - return (uint8_t*)(compressed_buffer_ + frameId * init_.compressedFrameSize_); - } + static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel); std::atomic_bool running; bool _initialized = false; @@ -661,29 +495,8 @@ protected: std::condition_variable clientInitializedCondition_; private: - bool launch(std::string const& cmd, boost::filesystem::path const& dir) - { - // Change the working directory - if(!dir.empty()) - { - boost::system::error_code ec; - boost::filesystem::current_path(dir, ec); - if (ec) { - getMessengerLogger()->error("Error: failed to change the working directory"); - return false; - } - } - // Execute the command using std::async and std::system - cmd_ = cmd; - getMessengerLogger()->info(cmd.c_str()); - async_result_ = std::async(std::launch::async, [this]() { return std::system(cmd_.c_str()); }); - bool success = async_result_.valid(); - if (!success) - getMessengerLogger()->error("Grok launch failed"); + bool launch(std::string const& cmd, boost::filesystem::path const& dir); - return success; - - } std::thread outbound; Synch* outboundSynch_ = nullptr; @@ -699,47 +512,6 @@ private: }; -static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch) -{ - grk_handle shm_fd = 0; - char* send_buffer = nullptr; - - if(!SharedMemoryManager::initShm(sendBuf, messageBufferLen, &shm_fd, &send_buffer)) - return; - while(messenger->running) - { - synch->wait(SYNCH_RECEIVE_READY); - if(!messenger->running) - break; - std::string message; - if(!messenger->sendQueue.waitAndPop(message)) - break; - if(!messenger->running) - break; - memcpy(send_buffer, message.c_str(), message.size() + 1); - synch->post(SYNCH_SENT); - } - SharedMemoryManager::deinit_shm(sendBuf, messageBufferLen, shm_fd, &send_buffer); -} - -static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch) -{ - grk_handle shm_fd = 0; - char* receive_buffer = nullptr; - - if(!SharedMemoryManager::initShm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer)) - return; - while(messenger->running) - { - synch->wait(SYNCH_SENT); - if(!messenger->running) - break; - auto message = std::string(receive_buffer); - synch->post(SYNCH_RECEIVE_READY); - messenger->receiveQueue.push(message); - } - SharedMemoryManager::deinit_shm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer); -} struct Msg { explicit Msg(const std::string &msg) : ct_(0) @@ -772,44 +544,6 @@ struct Msg }; -/* One of these is created for each core, to receive messages (e.g. "frame is encoded") - * from the grok process and handle them. - */ -static void -processor_thread(Messenger* messenger, std::function<void (std::string)> processor) -{ - while (messenger->running) { - std::string message; - if (!messenger->receiveQueue.waitAndPop(message)) { - break; - } - - if (!messenger->running) { - break; - } - - Msg msg(message); - auto tag = msg.next(); - if (tag == GRK_MSGR_BATCH_COMPRESS_INIT) { - auto width = msg.nextUint(); - msg.nextUint(); // stride - auto height = msg.nextUint(); - auto samples_per_pixel = msg.nextUint(); - msg.nextUint(); // depth - messenger->init_.uncompressedFrameSize_ = Messenger::uncompressedFrameSize(width, height, samples_per_pixel); - auto compressed_frame_size = msg.nextUint(); - auto num_frames = msg.nextUint(); - messenger->initClient(compressed_frame_size, compressed_frame_size, num_frames); - } else if (tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED) { - messenger->reclaimUncompressed(msg.nextUint()); - } else if (tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED) { - messenger->reclaimCompressed(msg.nextUint()); - } - processor(message); - } -} - - template<typename F> struct ScheduledFrames { diff --git a/src/lib/wscript b/src/lib/wscript index ea6994eb1..6c642cc72 100644 --- a/src/lib/wscript +++ b/src/lib/wscript @@ -268,7 +268,7 @@ def build(bld): obj.uselib += ' POLKIT' if bld.env.ENABLE_GROK: - obj.source += ' grok_j2k_encoder_thread.cc grok/util.cc' + obj.source += ' grok_j2k_encoder_thread.cc grok/messenger.cc grok/util.cc' if bld.env.TARGET_WINDOWS_64 or bld.env.TARGET_WINDOWS_32: obj.uselib += ' WINSOCK2 DBGHELP SHLWAPI MSWSOCK BOOST_LOCALE SETUPAPI OLE32 UUID' |
