diff options
| author | Carl Hetherington <cth@carlh.net> | 2025-05-16 23:16:58 +0200 |
|---|---|---|
| committer | Carl Hetherington <cth@carlh.net> | 2025-05-28 00:33:55 +0200 |
| commit | 915b20098427ef8c4732b87af22f901ba6bd5e2a (patch) | |
| tree | faf11bf3ff812cd242e7b043e3908f8aa51d2646 | |
| parent | ec66fd6126bc1a29539a65ace35bbae8c03796f6 (diff) | |
Cleanup: move all of Messenger into ScheduledMessenger.
| -rw-r--r-- | src/lib/grok/messenger.cc | 46 | ||||
| -rw-r--r-- | src/lib/grok/messenger.h | 161 |
2 files changed, 97 insertions, 110 deletions
diff --git a/src/lib/grok/messenger.cc b/src/lib/grok/messenger.cc index e8dfcf3df..5c6b050ff 100644 --- a/src/lib/grok/messenger.cc +++ b/src/lib/grok/messenger.cc @@ -25,16 +25,20 @@ using namespace grk_plugin; -Messenger::Messenger(MessengerInit init) +ScheduledMessenger::ScheduledMessenger(MessengerInit init) : _running(true) , _init(init) + , _frames_scheduled(0) + , _frames_compressed(0) { } -Messenger::~Messenger() +ScheduledMessenger::~ScheduledMessenger() { + shutdown(); + _running = false; _send_queue.deactivate(); _receive_queue.deactivate(); @@ -64,7 +68,7 @@ Messenger::~Messenger() * from the grok process and handle them. */ void -Messenger::processor_thread() +ScheduledMessenger::processor_thread() { while (_running) { std::string message; @@ -84,7 +88,7 @@ Messenger::processor_thread() auto height = msg.nextUint(); auto samples_per_pixel = msg.nextUint(); msg.nextUint(); // depth - _init.uncompressedFrameSize_ = Messenger::uncompressedFrameSize(width, height, samples_per_pixel); + _init.uncompressedFrameSize_ = ScheduledMessenger::uncompressedFrameSize(width, height, samples_per_pixel); auto compressed_frame_size = msg.nextUint(); auto num_frames = msg.nextUint(); initClient(compressed_frame_size, compressed_frame_size, num_frames); @@ -101,7 +105,7 @@ Messenger::processor_thread() void -Messenger::outbound_thread() +ScheduledMessenger::outbound_thread() { grk_handle shm_fd = 0; char* send_buffer = nullptr; @@ -131,7 +135,7 @@ Messenger::outbound_thread() void -Messenger::inbound_thread() +ScheduledMessenger::inbound_thread() { grk_handle shm_fd = 0; char* receive_buffer = nullptr; @@ -155,22 +159,22 @@ Messenger::inbound_thread() void -Messenger::startThreads() +ScheduledMessenger::startThreads() { _outbound_synch = new Synch(_init.outboundSentSynch, _init.outboundReceiveReadySynch); - _outbound = std::thread(&Messenger::outbound_thread, this); + _outbound = std::thread(&ScheduledMessenger::outbound_thread, this); _inbound_synch = new Synch(_init.inboundSentSynch, _init.inboundReceiveReadySynch); - _inbound = std::thread(&Messenger::inbound_thread, this); + _inbound = std::thread(&ScheduledMessenger::inbound_thread, this); for (size_t i = 0; i < _init.numProcessingThreads_; ++i) { - _processors.push_back(std::thread(&Messenger::processor_thread, this)); + _processors.push_back(std::thread(&ScheduledMessenger::processor_thread, this)); } } bool -Messenger::initBuffers() +ScheduledMessenger::initBuffers() { bool rc = true; if (_init.uncompressedFrameSize_) { @@ -194,7 +198,7 @@ Messenger::initBuffers() bool -Messenger::deinit_shm() +ScheduledMessenger::deinit_shm() { bool rc = SharedMemoryManager::deinit_shm( grokUncompressedBuf, @@ -213,7 +217,7 @@ Messenger::deinit_shm() bool -Messenger::launch_grok( +ScheduledMessenger::launch_grok( boost::filesystem::path const& dir, uint32_t width, uint32_t stride, @@ -247,7 +251,7 @@ Messenger::launch_grok( void -Messenger::initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames) +ScheduledMessenger::initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames) { // client fills queue with pending uncompressed buffers _init.uncompressedFrameSize_ = uncompressedFrameSize; @@ -267,7 +271,7 @@ Messenger::initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, } bool -Messenger::waitForClientInit() +ScheduledMessenger::waitForClientInit() { if (_initialized) { return true; @@ -299,28 +303,28 @@ Messenger::waitForClientInit() size_t -Messenger::uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel) +ScheduledMessenger::uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel) { return sizeof(uint16_t) * w * h * samplesPerPixel; } void -Messenger::reclaimCompressed(size_t frameId) +ScheduledMessenger::reclaimCompressed(size_t frameId) { _available_buffers.push(BufferSrc(0, frameId, getCompressedFrame(frameId))); } void -Messenger::reclaimUncompressed(size_t frameId) +ScheduledMessenger::reclaimUncompressed(size_t frameId) { _available_buffers.push(BufferSrc(0, frameId, getUncompressedFrame(frameId))); } uint8_t* -Messenger::getUncompressedFrame(size_t frameId) +ScheduledMessenger::getUncompressedFrame(size_t frameId) { assert(frameId < _init.numFrames_); if(frameId >= _init.numFrames_) @@ -331,7 +335,7 @@ Messenger::getUncompressedFrame(size_t frameId) uint8_t* -Messenger::getCompressedFrame(size_t frameId) +ScheduledMessenger::getCompressedFrame(size_t frameId) { assert(frameId < _init.numFrames_); if(frameId >= _init.numFrames_) @@ -341,7 +345,7 @@ Messenger::getCompressedFrame(size_t frameId) } bool -Messenger::launch(std::string const& cmd, boost::filesystem::path const& dir) +ScheduledMessenger::launch(std::string const& cmd, boost::filesystem::path const& dir) { // Change the working directory if(!dir.empty()) diff --git a/src/lib/grok/messenger.h b/src/lib/grok/messenger.h index 99c9b35ac..648aa5a63 100644 --- a/src/lib/grok/messenger.h +++ b/src/lib/grok/messenger.h @@ -437,87 +437,6 @@ struct BufferSrc }; -class Messenger -{ -public: - explicit Messenger(MessengerInit init); - virtual ~Messenger(); - - void startThreads(); - bool initBuffers(); - bool deinit_shm(); - - template<typename... Args> - void send(const std::string& str, Args... args) - { - std::ostringstream oss; - oss << str; - int dummy[] = {0, ((void)(oss << ',' << args), 0)...}; - static_cast<void>(dummy); - - _send_queue.push(oss.str()); - } - - bool launch_grok( - boost::filesystem::path const& dir, - uint32_t width, - uint32_t stride, - uint32_t height, - uint32_t samplesPerPixel, - uint32_t depth, - int device, - bool is4K, - uint32_t fps, - uint32_t bandwidth, - const std::string server, - const std::string license - ); - - void initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames); - bool waitForClientInit(); - - void reclaimCompressed(size_t frameId); - void reclaimUncompressed(size_t frameId); - uint8_t* getUncompressedFrame(size_t frameId); - uint8_t* getCompressedFrame(size_t frameId); - - static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel); - -protected: - std::condition_variable _client_initialized_condition; - MessengerBlockingQueue<BufferSrc> _available_buffers; - std::future<int> _async_result; - std::condition_variable _shutdown_condition; - bool _shutdown = false; - std::mutex _shutdown_mutex; - -private: - bool launch(std::string const& cmd, boost::filesystem::path const& dir); - void processor_thread(); - void outbound_thread(); - void inbound_thread(); - - std::atomic_bool _running; - MessengerInit _init; - bool _initialized = false; - MessengerBlockingQueue<std::string> _send_queue; - MessengerBlockingQueue<std::string> _receive_queue; - - std::thread _outbound; - Synch* _outbound_synch = nullptr; - - std::thread _inbound; - Synch* _inbound_synch = nullptr; - - std::vector<std::thread> _processors; - char* _uncompressed_buffer = nullptr; - char* _compressed_buffer = nullptr; - - grk_handle _uncompressed_fd = 0; - grk_handle _compressed_fd = 0; -}; - - struct Msg { explicit Msg(const std::string &msg) : ct_(0) @@ -578,19 +497,52 @@ struct ScheduledFrames }; -struct ScheduledMessenger : public Messenger +class ScheduledMessenger { - explicit ScheduledMessenger(MessengerInit init) - : Messenger(init) - , _frames_scheduled(0) - , _frames_compressed(0) - {} +public: + explicit ScheduledMessenger(MessengerInit init); + ~ScheduledMessenger(); - ~ScheduledMessenger() + void startThreads(); + bool initBuffers(); + bool deinit_shm(); + + template<typename... Args> + void send(const std::string& str, Args... args) { - shutdown(); + std::ostringstream oss; + oss << str; + int dummy[] = {0, ((void)(oss << ',' << args), 0)...}; + static_cast<void>(dummy); + + _send_queue.push(oss.str()); } + bool launch_grok( + boost::filesystem::path const& dir, + uint32_t width, + uint32_t stride, + uint32_t height, + uint32_t samplesPerPixel, + uint32_t depth, + int device, + bool is4K, + uint32_t fps, + uint32_t bandwidth, + const std::string server, + const std::string license + ); + + void initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames); + bool waitForClientInit(); + + void reclaimCompressed(size_t frameId); + void reclaimUncompressed(size_t frameId); + uint8_t* getUncompressedFrame(size_t frameId); + uint8_t* getCompressedFrame(size_t frameId); + + static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel); + bool schedule_compress(DCPVideo const& proxy, std::function<void(BufferSrc const&)> converter) { BufferSrc src; @@ -663,6 +615,37 @@ struct ScheduledMessenger : public Messenger } private: + bool launch(std::string const& cmd, boost::filesystem::path const& dir); + void processor_thread(); + void outbound_thread(); + void inbound_thread(); + + std::atomic_bool _running; + MessengerInit _init; + bool _initialized = false; + MessengerBlockingQueue<std::string> _send_queue; + MessengerBlockingQueue<std::string> _receive_queue; + + std::thread _outbound; + Synch* _outbound_synch = nullptr; + + std::thread _inbound; + Synch* _inbound_synch = nullptr; + + std::vector<std::thread> _processors; + char* _uncompressed_buffer = nullptr; + char* _compressed_buffer = nullptr; + + grk_handle _uncompressed_fd = 0; + grk_handle _compressed_fd = 0; + + std::condition_variable _client_initialized_condition; + MessengerBlockingQueue<BufferSrc> _available_buffers; + std::future<int> _async_result; + std::condition_variable _shutdown_condition; + bool _shutdown = false; + std::mutex _shutdown_mutex; + ScheduledFrames _scheduled_frames; std::atomic<uint32_t> _frames_scheduled; std::atomic<uint32_t> _frames_compressed; |
