diff options
| author | Carl Hetherington <cth@carlh.net> | 2025-05-16 23:25:34 +0200 |
|---|---|---|
| committer | Carl Hetherington <cth@carlh.net> | 2025-05-28 00:33:55 +0200 |
| commit | d3e29ae53e2ac55dc3ea8bb80cd74802d0aac4e4 (patch) | |
| tree | 44997b8856e174c526704027ac4b30f321f21de2 | |
| parent | da99ccd7b3bff2ff0de99331d540e645ce2039fa (diff) | |
Cleanup: move some methods into messenger.cc.
| -rw-r--r-- | src/lib/grok/messenger.cc | 68 | ||||
| -rw-r--r-- | src/lib/grok/messenger.h | 62 |
2 files changed, 71 insertions, 59 deletions
diff --git a/src/lib/grok/messenger.cc b/src/lib/grok/messenger.cc index 38263597b..96d895287 100644 --- a/src/lib/grok/messenger.cc +++ b/src/lib/grok/messenger.cc @@ -366,3 +366,71 @@ Messenger::launch(std::string const& cmd, boost::filesystem::path const& dir) return success; } + + +bool +Messenger::schedule_compress(DCPVideo const& proxy, std::function<void(BufferSrc const&)> converter) +{ + BufferSrc src; + if (!_available_buffers.waitAndPop(src)) { + return false; + } + converter(src); + _scheduled_frames.store(proxy); + _frames_scheduled++; + send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_); + + return true; +} + + +/** @param processor function taking compressed J2K data and sending it to the Writer */ +void +Messenger::process_compressed(std::string const& message, std::function<void (DCPVideo, uint8_t*, uint32_t)> processor, bool needsRecompression) +{ + Msg msg(message); + msg.next(); + auto const clientFrameId = msg.nextUint(); + auto const compressedFrameId = msg.nextUint(); + auto const compressedFrameLength = msg.nextUint(); + if (!needsRecompression) { + auto src_frame = _scheduled_frames.retrieve(clientFrameId); + if (!src_frame) { + return; + } + /* Write the compressed J2K data out */ + processor(*src_frame, getCompressedFrame(compressedFrameId), compressedFrameLength); + } + + ++_frames_compressed; + send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId); + + if (_shutdown && _frames_compressed == _frames_scheduled) { + _shutdown_condition.notify_all(); + } +} + +void +Messenger::shutdown() +{ + try { + std::unique_lock<std::mutex> lk(_shutdown_mutex); + if (!_async_result.valid()) { + return; + } + _shutdown = true; + if (_frames_scheduled) { + uint32_t scheduled = _frames_scheduled; + send(GRK_MSGR_BATCH_FLUSH, scheduled); + _shutdown_condition.wait(lk, [this] { return _frames_scheduled == _frames_compressed; }); + } + _available_buffers.deactivate(); + send(GRK_MSGR_BATCH_SHUTDOWN); + int result = _async_result.get(); + if(result != 0) { + getMessengerLogger()->error("Accelerator failed with return code: %d\n",result); + } + } catch (std::exception &ex) { + getMessengerLogger()->error("%s",ex.what()); + } +} diff --git a/src/lib/grok/messenger.h b/src/lib/grok/messenger.h index ce4b74025..c47ef0107 100644 --- a/src/lib/grok/messenger.h +++ b/src/lib/grok/messenger.h @@ -543,68 +543,12 @@ public: static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel); - bool schedule_compress(DCPVideo const& proxy, std::function<void(BufferSrc const&)> converter) - { - BufferSrc src; - if (!_available_buffers.waitAndPop(src)) { - return false; - } - converter(src); - _scheduled_frames.store(proxy); - _frames_scheduled++; - send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_); - - return true; - } + bool schedule_compress(DCPVideo const& proxy, std::function<void(BufferSrc const&)> converter); /** @param processor function taking compressed J2K data and sending it to the Writer */ - void process_compressed(std::string const& message, std::function<void (DCPVideo, uint8_t*, uint32_t)> processor, bool needsRecompression) - { - Msg msg(message); - msg.next(); - auto const clientFrameId = msg.nextUint(); - auto const compressedFrameId = msg.nextUint(); - auto const compressedFrameLength = msg.nextUint(); - if (!needsRecompression) { - auto src_frame = _scheduled_frames.retrieve(clientFrameId); - if (!src_frame) { - return; - } - /* Write the compressed J2K data out */ - processor(*src_frame, getCompressedFrame(compressedFrameId), compressedFrameLength); - } + void process_compressed(std::string const& message, std::function<void (DCPVideo, uint8_t*, uint32_t)> processor, bool needsRecompression); - ++_frames_compressed; - send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId); - - if (_shutdown && _frames_compressed == _frames_scheduled) { - _shutdown_condition.notify_all(); - } - } - - void shutdown() - { - try { - std::unique_lock<std::mutex> lk(_shutdown_mutex); - if (!_async_result.valid()) { - return; - } - _shutdown = true; - if (_frames_scheduled) { - uint32_t scheduled = _frames_scheduled; - send(GRK_MSGR_BATCH_FLUSH, scheduled); - _shutdown_condition.wait(lk, [this] { return _frames_scheduled == _frames_compressed; }); - } - _available_buffers.deactivate(); - send(GRK_MSGR_BATCH_SHUTDOWN); - int result = _async_result.get(); - if(result != 0) { - getMessengerLogger()->error("Accelerator failed with return code: %d\n",result); - } - } catch (std::exception &ex) { - getMessengerLogger()->error("%s",ex.what()); - } - } + void shutdown(); boost::optional<DCPVideo> retrieve(size_t index) { return _scheduled_frames.retrieve(index); |
