From d3e29ae53e2ac55dc3ea8bb80cd74802d0aac4e4 Mon Sep 17 00:00:00 2001 From: Carl Hetherington Date: Fri, 16 May 2025 23:25:34 +0200 Subject: Cleanup: move some methods into messenger.cc. --- src/lib/grok/messenger.cc | 68 +++++++++++++++++++++++++++++++++++++++++++++++ src/lib/grok/messenger.h | 62 +++--------------------------------------- 2 files changed, 71 insertions(+), 59 deletions(-) diff --git a/src/lib/grok/messenger.cc b/src/lib/grok/messenger.cc index 38263597b..96d895287 100644 --- a/src/lib/grok/messenger.cc +++ b/src/lib/grok/messenger.cc @@ -366,3 +366,71 @@ Messenger::launch(std::string const& cmd, boost::filesystem::path const& dir) return success; } + + +bool +Messenger::schedule_compress(DCPVideo const& proxy, std::function converter) +{ + BufferSrc src; + if (!_available_buffers.waitAndPop(src)) { + return false; + } + converter(src); + _scheduled_frames.store(proxy); + _frames_scheduled++; + send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_); + + return true; +} + + +/** @param processor function taking compressed J2K data and sending it to the Writer */ +void +Messenger::process_compressed(std::string const& message, std::function processor, bool needsRecompression) +{ + Msg msg(message); + msg.next(); + auto const clientFrameId = msg.nextUint(); + auto const compressedFrameId = msg.nextUint(); + auto const compressedFrameLength = msg.nextUint(); + if (!needsRecompression) { + auto src_frame = _scheduled_frames.retrieve(clientFrameId); + if (!src_frame) { + return; + } + /* Write the compressed J2K data out */ + processor(*src_frame, getCompressedFrame(compressedFrameId), compressedFrameLength); + } + + ++_frames_compressed; + send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId); + + if (_shutdown && _frames_compressed == _frames_scheduled) { + _shutdown_condition.notify_all(); + } +} + +void +Messenger::shutdown() +{ + try { + std::unique_lock lk(_shutdown_mutex); + if (!_async_result.valid()) { + return; + } + _shutdown = true; + if (_frames_scheduled) { + uint32_t scheduled = _frames_scheduled; + send(GRK_MSGR_BATCH_FLUSH, scheduled); + _shutdown_condition.wait(lk, [this] { return _frames_scheduled == _frames_compressed; }); + } + _available_buffers.deactivate(); + send(GRK_MSGR_BATCH_SHUTDOWN); + int result = _async_result.get(); + if(result != 0) { + getMessengerLogger()->error("Accelerator failed with return code: %d\n",result); + } + } catch (std::exception &ex) { + getMessengerLogger()->error("%s",ex.what()); + } +} diff --git a/src/lib/grok/messenger.h b/src/lib/grok/messenger.h index ce4b74025..c47ef0107 100644 --- a/src/lib/grok/messenger.h +++ b/src/lib/grok/messenger.h @@ -543,68 +543,12 @@ public: static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel); - bool schedule_compress(DCPVideo const& proxy, std::function converter) - { - BufferSrc src; - if (!_available_buffers.waitAndPop(src)) { - return false; - } - converter(src); - _scheduled_frames.store(proxy); - _frames_scheduled++; - send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_); - - return true; - } + bool schedule_compress(DCPVideo const& proxy, std::function converter); /** @param processor function taking compressed J2K data and sending it to the Writer */ - void process_compressed(std::string const& message, std::function processor, bool needsRecompression) - { - Msg msg(message); - msg.next(); - auto const clientFrameId = msg.nextUint(); - auto const compressedFrameId = msg.nextUint(); - auto const compressedFrameLength = msg.nextUint(); - if (!needsRecompression) { - auto src_frame = _scheduled_frames.retrieve(clientFrameId); - if (!src_frame) { - return; - } - /* Write the compressed J2K data out */ - processor(*src_frame, getCompressedFrame(compressedFrameId), compressedFrameLength); - } + void process_compressed(std::string const& message, std::function processor, bool needsRecompression); - ++_frames_compressed; - send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId); - - if (_shutdown && _frames_compressed == _frames_scheduled) { - _shutdown_condition.notify_all(); - } - } - - void shutdown() - { - try { - std::unique_lock lk(_shutdown_mutex); - if (!_async_result.valid()) { - return; - } - _shutdown = true; - if (_frames_scheduled) { - uint32_t scheduled = _frames_scheduled; - send(GRK_MSGR_BATCH_FLUSH, scheduled); - _shutdown_condition.wait(lk, [this] { return _frames_scheduled == _frames_compressed; }); - } - _available_buffers.deactivate(); - send(GRK_MSGR_BATCH_SHUTDOWN); - int result = _async_result.get(); - if(result != 0) { - getMessengerLogger()->error("Accelerator failed with return code: %d\n",result); - } - } catch (std::exception &ex) { - getMessengerLogger()->error("%s",ex.what()); - } - } + void shutdown(); boost::optional retrieve(size_t index) { return _scheduled_frames.retrieve(index); -- cgit v1.2.3