diff options
Diffstat (limited to 'src/lib/grok/messenger.cc')
| -rw-r--r-- | src/lib/grok/messenger.cc | 68 |
1 files changed, 68 insertions, 0 deletions
diff --git a/src/lib/grok/messenger.cc b/src/lib/grok/messenger.cc index 38263597b..96d895287 100644 --- a/src/lib/grok/messenger.cc +++ b/src/lib/grok/messenger.cc @@ -366,3 +366,71 @@ Messenger::launch(std::string const& cmd, boost::filesystem::path const& dir) return success; } + + +bool +Messenger::schedule_compress(DCPVideo const& proxy, std::function<void(BufferSrc const&)> converter) +{ + BufferSrc src; + if (!_available_buffers.waitAndPop(src)) { + return false; + } + converter(src); + _scheduled_frames.store(proxy); + _frames_scheduled++; + send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_); + + return true; +} + + +/** @param processor function taking compressed J2K data and sending it to the Writer */ +void +Messenger::process_compressed(std::string const& message, std::function<void (DCPVideo, uint8_t*, uint32_t)> processor, bool needsRecompression) +{ + Msg msg(message); + msg.next(); + auto const clientFrameId = msg.nextUint(); + auto const compressedFrameId = msg.nextUint(); + auto const compressedFrameLength = msg.nextUint(); + if (!needsRecompression) { + auto src_frame = _scheduled_frames.retrieve(clientFrameId); + if (!src_frame) { + return; + } + /* Write the compressed J2K data out */ + processor(*src_frame, getCompressedFrame(compressedFrameId), compressedFrameLength); + } + + ++_frames_compressed; + send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId); + + if (_shutdown && _frames_compressed == _frames_scheduled) { + _shutdown_condition.notify_all(); + } +} + +void +Messenger::shutdown() +{ + try { + std::unique_lock<std::mutex> lk(_shutdown_mutex); + if (!_async_result.valid()) { + return; + } + _shutdown = true; + if (_frames_scheduled) { + uint32_t scheduled = _frames_scheduled; + send(GRK_MSGR_BATCH_FLUSH, scheduled); + _shutdown_condition.wait(lk, [this] { return _frames_scheduled == _frames_compressed; }); + } + _available_buffers.deactivate(); + send(GRK_MSGR_BATCH_SHUTDOWN); + int result = _async_result.get(); + if(result != 0) { + getMessengerLogger()->error("Accelerator failed with return code: %d\n",result); + } + } catch (std::exception &ex) { + getMessengerLogger()->error("%s",ex.what()); + } +} |
