summaryrefslogtreecommitdiff
path: root/src/lib/grok/messenger.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/grok/messenger.cc')
-rw-r--r--src/lib/grok/messenger.cc68
1 files changed, 68 insertions, 0 deletions
diff --git a/src/lib/grok/messenger.cc b/src/lib/grok/messenger.cc
index 38263597b..96d895287 100644
--- a/src/lib/grok/messenger.cc
+++ b/src/lib/grok/messenger.cc
@@ -366,3 +366,71 @@ Messenger::launch(std::string const& cmd, boost::filesystem::path const& dir)
return success;
}
+
+
+bool
+Messenger::schedule_compress(DCPVideo const& proxy, std::function<void(BufferSrc const&)> converter)
+{
+ BufferSrc src;
+ if (!_available_buffers.waitAndPop(src)) {
+ return false;
+ }
+ converter(src);
+ _scheduled_frames.store(proxy);
+ _frames_scheduled++;
+ send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_);
+
+ return true;
+}
+
+
+/** @param processor function taking compressed J2K data and sending it to the Writer */
+void
+Messenger::process_compressed(std::string const& message, std::function<void (DCPVideo, uint8_t*, uint32_t)> processor, bool needsRecompression)
+{
+ Msg msg(message);
+ msg.next();
+ auto const clientFrameId = msg.nextUint();
+ auto const compressedFrameId = msg.nextUint();
+ auto const compressedFrameLength = msg.nextUint();
+ if (!needsRecompression) {
+ auto src_frame = _scheduled_frames.retrieve(clientFrameId);
+ if (!src_frame) {
+ return;
+ }
+ /* Write the compressed J2K data out */
+ processor(*src_frame, getCompressedFrame(compressedFrameId), compressedFrameLength);
+ }
+
+ ++_frames_compressed;
+ send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId);
+
+ if (_shutdown && _frames_compressed == _frames_scheduled) {
+ _shutdown_condition.notify_all();
+ }
+}
+
+void
+Messenger::shutdown()
+{
+ try {
+ std::unique_lock<std::mutex> lk(_shutdown_mutex);
+ if (!_async_result.valid()) {
+ return;
+ }
+ _shutdown = true;
+ if (_frames_scheduled) {
+ uint32_t scheduled = _frames_scheduled;
+ send(GRK_MSGR_BATCH_FLUSH, scheduled);
+ _shutdown_condition.wait(lk, [this] { return _frames_scheduled == _frames_compressed; });
+ }
+ _available_buffers.deactivate();
+ send(GRK_MSGR_BATCH_SHUTDOWN);
+ int result = _async_result.get();
+ if(result != 0) {
+ getMessengerLogger()->error("Accelerator failed with return code: %d\n",result);
+ }
+ } catch (std::exception &ex) {
+ getMessengerLogger()->error("%s",ex.what());
+ }
+}