Patch from Aaron.
authorCarl Hetherington <cth@carlh.net>
Thu, 6 Jul 2023 21:09:08 +0000 (23:09 +0200)
committerCarl Hetherington <cth@carlh.net>
Fri, 7 Jul 2023 11:58:35 +0000 (13:58 +0200)
20 files changed:
src/lib/config.cc
src/lib/config.h
src/lib/dcp_encoder.cc
src/lib/dcp_encoder.h
src/lib/dcp_video.cc
src/lib/dcp_video.h
src/lib/encoder.h
src/lib/grok_context.h [new file with mode: 0644]
src/lib/grok_messenger.h [new file with mode: 0644]
src/lib/j2k_encoder.cc
src/lib/j2k_encoder.h
src/lib/job.cc
src/lib/job.h
src/lib/transcode_job.cc
src/lib/transcode_job.h
src/tools/dcpomatic.cc
src/tools/dcpomatic_batch.cc
src/tools/dcpomatic_disk.cc
src/wx/full_config_dialog.cc
src/wx/gpu_config_panel.h [new file with mode: 0644]

index 3366a2bbc018015ca119fee015aa3f0134747329..2287cd592bb069d3e3bf024f7a230673250a0b39 100644 (file)
@@ -213,6 +213,13 @@ Config::set_defaults ()
        set_notification_email_to_default ();
        set_cover_sheet_to_default ();
 
+       _gpu_binary_location = "";
+       _enable_gpu = false;
+       _selected_gpu = 0;
+       _gpu_license_server = "";
+       _gpu_license_port = 5000;
+       _gpu_license = "";
+
        _main_divider_sash_position = {};
        _main_content_divider_sash_position = {};
 
@@ -634,6 +641,13 @@ try
        _allow_smpte_bv20 = f.optional_bool_child("AllowSMPTEBv20").get_value_or(false);
        _isdcf_name_part_length = f.optional_number_child<int>("ISDCFNamePartLength").get_value_or(14);
 
+       _enable_gpu = f.optional_bool_child("EnableGpu").get_value_or(false);
+       _gpu_binary_location = f.string_child("GpuBinaryLocation");
+       _selected_gpu = f.number_child<int>("SelectedGpu");
+       _gpu_license_server = f.string_child ("GpuLicenseServer");
+       _gpu_license_port = f.number_child<int> ("GpuLicensePort");
+       _gpu_license = f.string_child("GpuLicense");
+
        _export.read(f.optional_node_child("Export"));
 }
 catch (...) {
@@ -1120,6 +1134,13 @@ Config::write_config () const
        /* [XML] ISDCFNamePartLength Maximum length of the "name" part of an ISDCF name, which should be 14 according to the standard */
        root->add_child("ISDCFNamePartLength")->add_child_text(raw_convert<string>(_isdcf_name_part_length));
 
+       root->add_child("GpuBinaryLocation")->add_child_text (_gpu_binary_location);
+       root->add_child("EnableGpu")->add_child_text ((_enable_gpu ? "1" : "0"));
+       root->add_child("SelectedGpu")->add_child_text (raw_convert<string> (_selected_gpu));
+       root->add_child("GpuLicenseServer")->add_child_text (_gpu_license_server);
+       root->add_child("GpuLicensePort")->add_child_text (raw_convert<string> (_gpu_license_port));
+       root->add_child("GpuLicense")->add_child_text (_gpu_license);
+
        _export.write(root->add_child("Export"));
 
        auto target = config_write_file();
index 0a332bcbb74abdc287153077a86601fe4d1cf00e..0c9affbb6b1b3ba9e40c096fce07926ea55e0e7a 100644 (file)
@@ -618,6 +618,28 @@ public:
                return _allow_smpte_bv20;
        }
 
+       std::string gpu_binary_location () const {
+               return _gpu_binary_location;
+       }
+
+       bool enable_gpu () const {
+               return _enable_gpu;
+       }
+
+       int selected_gpu () const {
+               return _selected_gpu;
+       }
+       std::string gpu_license_server () const {
+               return _gpu_license_server;
+       }
+
+       int gpu_license_port () const {
+               return _gpu_license_port;
+       }
+       std::string gpu_license () const {
+               return _gpu_license;
+       }
+
        int isdcf_name_part_length() const {
                return _isdcf_name_part_length;
        }
@@ -1198,11 +1220,29 @@ public:
        void set_allow_smpte_bv20(bool allow) {
                maybe_set(_allow_smpte_bv20, allow, ALLOW_SMPTE_BV20);
        }
-
+       void set_gpu_binary_location (std::string location) {
+               maybe_set (_gpu_binary_location, location);
+       }
+       void set_enable_gpu (bool enable) {
+               maybe_set (_enable_gpu, enable);
+       }
+       void set_selected_gpu (int selected) {
+               maybe_set (_selected_gpu, selected);
+       }
+       void set_gpu_license_server (std::string s) {
+               maybe_set (_gpu_license_server, s);
+       }
+       void set_gpu_license_port (int p) {
+               maybe_set (_gpu_license_port, p);
+       }
+       void set_gpu_license (std::string p) {
+               maybe_set (_gpu_license, p);
+       }
        void set_isdcf_name_part_length(int length) {
                maybe_set(_isdcf_name_part_length, length, ISDCF_NAME_PART_LENGTH);
        }
 
+
        void changed (Property p = OTHER);
        boost::signals2::signal<void (Property)> Changed;
        /** Emitted if read() failed on an existing Config file.  There is nothing
@@ -1443,6 +1483,14 @@ private:
        bool _allow_smpte_bv20;
        int _isdcf_name_part_length;
 
+       /* GPU */
+       bool _enable_gpu;
+       std::string _gpu_binary_location;
+       int _selected_gpu;
+       std::string _gpu_license_server;
+       int _gpu_license_port;
+       std::string _gpu_license;
+
        ExportConfig _export;
 
        static int const _current_version;
index 9a840c8ab916b6a594f4e6881f223ad20346f4f5..7088225cd09a5a85f3f81a7ede8f4475d9ecc5b7 100644 (file)
@@ -114,10 +114,17 @@ DCPEncoder::go ()
        }
 
        _finishing = true;
-       _j2k_encoder.end();
+       _j2k_encoder.end(true);
        _writer.finish(_film->dir(_film->dcp_name()));
 }
 
+void DCPEncoder::pause(void) {
+       _j2k_encoder.pause();
+}
+
+void DCPEncoder::resume(void) {
+       _j2k_encoder.resume();
+}
 void
 DCPEncoder::video (shared_ptr<PlayerVideo> data, DCPTime time)
 {
index ad77f6951312c03ace8fe91e7fe4d91608f2f743..a8043887eb6d8e1f2e8601a8dc12249225496485 100644 (file)
@@ -52,6 +52,8 @@ public:
        bool finishing () const override {
                return _finishing;
        }
+       void pause(void) override;
+       void resume(void) override;
 
 private:
 
index 8eb76fdd63670bebce5b09156f7900e8b8b9e6d2..78e973ca39c266a61e97d6d4cbe012c9191a424e 100644 (file)
@@ -118,6 +118,27 @@ DCPVideo::convert_to_xyz (shared_ptr<const PlayerVideo> frame, dcp::NoteHandler
        return xyz;
 }
 
+dcp::Size
+DCPVideo::get_size(void) {
+       auto image = _frame->image (bind(&PlayerVideo::keep_xyz_or_rgb, _1), VideoRange::FULL, false);
+       return image->size();
+}
+
+void
+DCPVideo::convert_to_xyz (uint16_t *dst) {
+
+       auto image = _frame->image (bind(&PlayerVideo::keep_xyz_or_rgb, _1), VideoRange::FULL, false);
+       if (_frame->colour_conversion()) {
+               dcp::rgb_to_xyz (
+                       image->data()[0],
+                       dst,
+                       image->size(),
+                       image->stride()[0],
+                       _frame->colour_conversion().get()
+                       );
+       }
+}
+
 /** J2K-encode this frame on the local host.
  *  @return Encoded data.
  */
index 33df0942cbb5e7a1e6c6c88b3edb3c9e2feb7ec2..c09442d1645c50662bddd6d11ea77247110643c7 100644 (file)
@@ -17,6 +17,8 @@
     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
 
 */
+#ifndef DCPOMATIC_DCP_VIDEO_H
+#define DCPOMATIC_DCP_VIDEO_H
 
 
 #include "encode_server_description.h"
@@ -47,6 +49,7 @@ class PlayerVideo;
 class DCPVideo
 {
 public:
+       DCPVideo (void) : DCPVideo(nullptr,0,0,0,Resolution::TWO_K){}
        DCPVideo (std::shared_ptr<const PlayerVideo>, int index, int dcp_fps, int bandwidth, Resolution r);
        DCPVideo (std::shared_ptr<const PlayerVideo>, cxml::ConstNodePtr);
 
@@ -66,6 +69,8 @@ public:
 
        static std::shared_ptr<dcp::OpenJPEGImage> convert_to_xyz (std::shared_ptr<const PlayerVideo> frame, dcp::NoteHandler note);
 
+       void convert_to_xyz (uint16_t *dst);
+       dcp::Size get_size(void);
 private:
 
        void add_metadata (xmlpp::Element *) const;
@@ -76,3 +81,5 @@ private:
        int _j2k_bandwidth;              ///< J2K bandwidth to use
        Resolution _resolution;          ///< Resolution (2K or 4K)
 };
+
+#endif
index 9b67720d361cc56931fa39867b20e70003bd08ae..f921fcb515962fb9f92af12d2189af77778c2274 100644 (file)
@@ -58,6 +58,8 @@ public:
        /** @return the number of frames that are done */
        virtual Frame frames_done () const = 0;
        virtual bool finishing () const = 0;
+       virtual void pause(void) {}
+       virtual void resume(void) {}
 
 protected:
        std::shared_ptr<const Film> _film;
diff --git a/src/lib/grok_context.h b/src/lib/grok_context.h
new file mode 100644 (file)
index 0000000..1f9726a
--- /dev/null
@@ -0,0 +1,245 @@
+/*
+    Copyright (C) 2023 Grok Image Compression Inc.
+
+    This file is part of DCP-o-matic.
+
+    DCP-o-matic is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    DCP-o-matic is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
+
+*/
+
+#pragma once
+
+#include "config.h"
+#include "log.h"
+#include "dcpomatic_log.h"
+#include "writer.h"
+#include "grok_messenger.h"
+
+class Film;
+using dcp::Data;
+using namespace dcpomatic;
+
+static std::mutex launchMutex;
+
+namespace grk_plugin
+{
+
+struct GrokLogger : public MessengerLogger {
+       explicit GrokLogger(const std::string &preamble) : MessengerLogger(preamble)
+       {}
+       virtual ~GrokLogger() = default;
+       void info(const char* fmt, ...) override{
+               va_list arg;
+               va_start(arg, fmt);
+               dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_GENERAL);
+               va_end(arg);
+       }
+       void warn(const char* fmt, ...) override{
+               va_list arg;
+               va_start(arg, fmt);
+               dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_WARNING);
+               va_end(arg);
+       }
+       void error(const char* fmt, ...) override{
+               va_list arg;
+               va_start(arg, fmt);
+               dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_ERROR);
+               va_end(arg);
+       }
+};
+
+struct GrokInitializer {
+       GrokInitializer(void) {
+               setMessengerLogger(new GrokLogger("[GROK] "));
+       }
+       ~GrokInitializer()  = default;
+};
+
+struct FrameProxy {
+       FrameProxy(void) : FrameProxy(0,Eyes::LEFT,DCPVideo())
+       {}
+       FrameProxy(int index, Eyes eyes, DCPVideo dcpv) : index_(index), eyes_(eyes), vf(dcpv)
+       {}
+       int index() const {
+               return index_;
+       }
+       Eyes eyes(void) const {
+               return eyes_;
+       }
+       int index_;
+       Eyes eyes_;
+       DCPVideo vf;
+};
+
+struct DcpomaticContext {
+       DcpomaticContext(std::shared_ptr<const Film> film, Writer& writer,
+                                               EventHistory &history, const std::string &location) :
+                                                                       film_(film), writer_(writer),
+                                                                       history_(history), location_(location),
+                                                                       width_(0), height_(0)
+       {}
+       void setDimensions(uint32_t w, uint32_t h) {
+               width_ = w;
+               height_ = h;
+       }
+       std::shared_ptr<const Film> film_;
+       Writer& writer_;
+       EventHistory &history_;
+       std::string location_;
+       uint32_t width_;
+       uint32_t height_;
+};
+
+class GrokContext {
+public:
+       explicit GrokContext(const DcpomaticContext &dcpomaticContext) :
+                                                               dcpomaticContext_(dcpomaticContext),
+                                                               messenger_(nullptr),
+                                                               launched_(false)
+       {
+               struct CompressedData : public dcp::Data {
+                       explicit CompressedData(int dataLen) : data_(new uint8_t[dataLen]), dataLen_(dataLen)
+                       {}
+                       ~CompressedData(void){
+                               delete[] data_;
+                       }
+                       uint8_t const * data () const override {
+                               return data_;
+                       }
+                       uint8_t * data () override {
+                               return data_;
+                       }
+                       int size () const override {
+                               return dataLen_;
+                       }
+                       uint8_t *data_;
+                       int dataLen_;
+               };
+               if (Config::instance()->enable_gpu ())  {
+                   boost::filesystem::path folder(dcpomaticContext_.location_);
+                   boost::filesystem::path binaryPath = folder / "grk_compress";
+                   if (!boost::filesystem::exists(binaryPath)) {
+                       getMessengerLogger()->error("Invalid binary location %s",
+                                       dcpomaticContext_.location_.c_str());
+                               return;
+                   }
+                       auto proc = [this](const std::string& str) {
+                               try {
+                                       Msg msg(str);
+                                       auto tag = msg.next();
+                                       if(tag == GRK_MSGR_BATCH_SUBMIT_COMPRESSED)
+                                       {
+                                               auto clientFrameId = msg.nextUint();
+                                               auto compressedFrameId = msg.nextUint();
+                                               (void)compressedFrameId;
+                                               auto compressedFrameLength = msg.nextUint();
+                                               auto  processor =
+                                                               [this](FrameProxy srcFrame, uint8_t* compressed, uint32_t compressedFrameLength)
+                                               {
+                                                       auto compressedData = std::make_shared<CompressedData>(compressedFrameLength);
+                                                       memcpy(compressedData->data_,compressed,compressedFrameLength );
+                                                       dcpomaticContext_.writer_.write(compressedData, srcFrame.index(), srcFrame.eyes());
+                                                       frame_done ();
+                                               };
+                                               int const minimum_size = 16384;
+                                               bool needsRecompression = compressedFrameLength < minimum_size;
+                                               messenger_->processCompressed(str, processor, needsRecompression);
+                                               if (needsRecompression) {
+                                                       bool success = false;
+                                                       auto fp = messenger_->retrieve(clientFrameId, success);
+                                                       if (!success)
+                                                               return;
+
+                                                       auto encoded = std::make_shared<dcp::ArrayData>(fp.vf.encode_locally());
+                                                       dcpomaticContext_.writer_.write(encoded, fp.vf.index(), fp.vf.eyes());
+                                                       frame_done ();
+                                               }
+                                       }
+                               } catch (std::exception &ex){
+                                       getMessengerLogger()->error("%s",ex.what());
+                               }
+                       };
+                       auto clientInit =
+                               MessengerInit(clientToGrokMessageBuf, clientSentSynch, grokReceiveReadySynch,
+                                                         grokToClientMessageBuf, grokSentSynch, clientReceiveReadySynch, proc,
+                                                         std::thread::hardware_concurrency());
+                       messenger_ = new ScheduledMessenger<FrameProxy>(clientInit);
+               }
+       }
+       ~GrokContext(void) {
+               shutdown();
+       }
+       bool launch(DCPVideo dcpv, int device){
+               if (!messenger_ )
+                       return false;
+               if (launched_)
+                       return true;
+               std::unique_lock<std::mutex> lk_global(launchMutex);
+               if (!messenger_)
+                       return false;
+               if (launched_)
+                       return true;
+               if (MessengerInit::firstLaunch(true)) {
+                       auto s = dcpv.get_size();
+                       dcpomaticContext_.setDimensions(s.width, s.height);
+                       auto config = Config::instance();
+                       messenger_->launchGrok(dcpomaticContext_.location_,
+                                       dcpomaticContext_.width_,dcpomaticContext_.width_,
+                                       dcpomaticContext_.height_,
+                                       3, 12, device,
+                                       dcpomaticContext_.film_->resolution() == Resolution::FOUR_K,
+                                       dcpomaticContext_.film_->video_frame_rate(),
+                                       dcpomaticContext_.film_->j2k_bandwidth(),
+                                       config->gpu_license_server(),
+                                       config->gpu_license_port(),
+                                       config->gpu_license());
+               }
+               launched_ =  messenger_->waitForClientInit();
+
+               return launched_;
+       }
+       bool scheduleCompress(const DCPVideo &vf){
+               if (!messenger_)
+                       return false;
+
+               auto fp = FrameProxy(vf.index(),vf.eyes(),vf);
+               auto cvt = [this, &fp](BufferSrc src){
+                       // xyz conversion
+                       fp.vf.convert_to_xyz((uint16_t*)src.framePtr_);
+               };
+               return messenger_->scheduleCompress(fp, cvt);
+       }
+       void shutdown(void){
+               if (!messenger_)
+                       return;
+
+               std::unique_lock<std::mutex> lk_global(launchMutex);
+               if (!messenger_)
+                       return;
+               if (launched_)
+                       messenger_->shutdown();
+               delete messenger_;
+               messenger_ = nullptr;
+       }
+       void frame_done () {
+               dcpomaticContext_.history_.event ();
+       }
+private:
+       DcpomaticContext dcpomaticContext_;
+       ScheduledMessenger<FrameProxy> *messenger_;
+       bool launched_;
+};
+
+}
+
diff --git a/src/lib/grok_messenger.h b/src/lib/grok_messenger.h
new file mode 100644 (file)
index 0000000..45ee752
--- /dev/null
@@ -0,0 +1,930 @@
+/*
+    Copyright (C) 2023 Grok Image Compression Inc.
+
+    This file is part of DCP-o-matic.
+
+    DCP-o-matic is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    DCP-o-matic is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
+
+*/
+#pragma once
+
+#include <iostream>
+#include <string>
+#include <cstring>
+#include <atomic>
+#include <functional>
+#include <sstream>
+#include <future>
+#include <map>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+#include <queue>
+#include <cassert>
+#include <cstdarg>
+
+#ifdef _WIN32
+#include <windows.h>
+#include <direct.h>
+#include <tlhelp32.h>
+#pragma warning(disable : 4100)
+#else
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/mman.h>
+#include <semaphore.h>
+#include <signal.h>
+#endif
+
+namespace grk_plugin
+{
+static std::string grokToClientMessageBuf = "Global\\grok_to_client_message";
+static std::string grokSentSynch = "Global\\grok_sent";
+static std::string clientReceiveReadySynch = "Global\\client_receive_ready";
+static std::string clientToGrokMessageBuf = "Global\\client_to_grok_message";
+static std::string clientSentSynch = "Global\\client_sent";
+static std::string grokReceiveReadySynch = "Global\\grok_receive_ready";
+static std::string grokUncompressedBuf = "Global\\grok_uncompressed_buf";
+static std::string grokCompressedBuf = "Global\\grok_compressed_buf";
+static const std::string GRK_MSGR_BATCH_IMAGE = "GRK_MSGR_BATCH_IMAGE";
+static const std::string GRK_MSGR_BATCH_COMPRESS_INIT = "GRK_MSGR_BATCH_COMPRESS_INIT";
+static const std::string GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED = "GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED";
+static const std::string GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED =
+       "GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED";
+static const std::string GRK_MSGR_BATCH_SUBMIT_COMPRESSED = "GRK_MSGR_BATCH_SUBMIT_COMPRESSED";
+static const std::string GRK_MSGR_BATCH_PROCESSSED_COMPRESSED =
+       "GRK_MSGR_BATCH_PROCESSSED_COMPRESSED";
+static const std::string GRK_MSGR_BATCH_SHUTDOWN = "GRK_MSGR_BATCH_SHUTDOWN";
+static const std::string GRK_MSGR_BATCH_FLUSH = "GRK_MSGR_BATCH_FLUSH";
+static const size_t messageBufferLen = 256;
+struct IMessengerLogger
+{
+       virtual ~IMessengerLogger(void) = default;
+       virtual void info(const char* fmt, ...) = 0;
+       virtual void warn(const char* fmt, ...) = 0;
+       virtual void error(const char* fmt, ...) = 0;
+
+  protected:
+       template<typename... Args>
+       std::string log_message(char const* const format, Args&... args) noexcept
+       {
+               constexpr size_t message_size = 512;
+               char message[message_size];
+
+               std::snprintf(message, message_size, format, args...);
+               return std::string(message);
+       }
+};
+struct MessengerLogger : public IMessengerLogger
+{
+       explicit MessengerLogger(const std::string &preamble) : preamble_(preamble) {}
+       virtual ~MessengerLogger() = default;
+       virtual void info(const char* fmt, ...) override
+       {
+               va_list args;
+               std::string new_fmt = preamble_ + fmt + "\n";
+               va_start(args, fmt);
+               vfprintf(stdout, new_fmt.c_str(), args);
+               va_end(args);
+       }
+       virtual void warn(const char* fmt, ...) override
+       {
+               va_list args;
+               std::string new_fmt = preamble_ + fmt + "\n";
+               va_start(args, fmt);
+               vfprintf(stdout, new_fmt.c_str(), args);
+               va_end(args);
+       }
+       virtual void error(const char* fmt, ...) override
+       {
+               va_list args;
+               std::string new_fmt = preamble_ + fmt + "\n";
+               va_start(args, fmt);
+               vfprintf(stderr, new_fmt.c_str(), args);
+               va_end(args);
+       }
+
+  protected:
+       std::string preamble_;
+};
+
+static IMessengerLogger* sLogger = nullptr;
+#if defined(__GNUC__) || defined(__clang__)
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wunused-function"
+#endif
+static void setMessengerLogger(IMessengerLogger* logger)
+{
+       delete sLogger;
+       sLogger = logger;
+}
+#if defined(__GNUC__) || defined(__clang__)
+#pragma GCC diagnostic pop
+#endif
+static IMessengerLogger* getMessengerLogger(void)
+{
+       return sLogger;
+}
+struct MessengerInit
+{
+       MessengerInit(const std::string &outBuf, const std::string &outSent,
+                                 const std::string &outReceiveReady, const std::string &inBuf,
+                                 const std::string &inSent,
+                                 const std::string &inReceiveReady,
+                                 std::function<void(std::string)> processor,
+                                 size_t numProcessingThreads)
+               : outboundMessageBuf(outBuf), outboundSentSynch(outSent),
+                 outboundReceiveReadySynch(outReceiveReady), inboundMessageBuf(inBuf),
+                 inboundSentSynch(inSent), inboundReceiveReadySynch(inReceiveReady), processor_(processor),
+                 numProcessingThreads_(numProcessingThreads),
+                 uncompressedFrameSize_(0), compressedFrameSize_(0),
+                 numFrames_(0)
+       {
+               if(firstLaunch(true))
+                       unlink();
+       }
+       void unlink(void)
+       {
+#ifndef _WIN32
+               shm_unlink(grokToClientMessageBuf.c_str());
+               shm_unlink(clientToGrokMessageBuf.c_str());
+#endif
+       }
+       static bool firstLaunch(bool isClient)
+       {
+               bool debugGrok = false;
+               return debugGrok != isClient;
+       }
+       std::string outboundMessageBuf;
+       std::string outboundSentSynch;
+       std::string outboundReceiveReadySynch;
+
+       std::string inboundMessageBuf;
+       std::string inboundSentSynch;
+       std::string inboundReceiveReadySynch;
+
+       std::function<void(std::string)> processor_;
+       size_t numProcessingThreads_;
+
+       size_t uncompressedFrameSize_;
+       size_t compressedFrameSize_;
+       size_t numFrames_;
+};
+
+/*************************** Synchronization *******************************/
+enum SynchDirection
+{
+       SYNCH_SENT,
+       SYNCH_RECEIVE_READY
+};
+
+typedef int grk_handle;
+struct Synch
+{
+       Synch(const std::string &sentSemName, const std::string &receiveReadySemName)
+               : sentSemName_(sentSemName), receiveReadySemName_(receiveReadySemName)
+       {
+               // unlink semaphores in case of previous crash
+               if(MessengerInit::firstLaunch(true))
+                       unlink();
+               open();
+       }
+       ~Synch()
+       {
+               close();
+               if(MessengerInit::firstLaunch(true))
+                       unlink();
+       }
+       void post(SynchDirection dir)
+       {
+               auto sem = (dir == SYNCH_SENT ? sentSem_ : receiveReadySem_);
+               int rc = sem_post(sem);
+               if(rc)
+                       getMessengerLogger()->error("Error posting to semaphore: %s", strerror(errno));
+       }
+       void wait(SynchDirection dir)
+       {
+               auto sem = dir == SYNCH_SENT ? sentSem_ : receiveReadySem_;
+               int rc = sem_wait(sem);
+               if(rc)
+                       getMessengerLogger()->error("Error waiting for semaphore: %s", strerror(errno));
+       }
+       void open(void)
+       {
+               sentSem_ = sem_open(sentSemName_.c_str(), O_CREAT, 0666, 0);
+               if(!sentSem_)
+                       getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
+               receiveReadySem_ = sem_open(receiveReadySemName_.c_str(), O_CREAT, 0666, 1);
+               if(!receiveReadySem_)
+                       getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
+       }
+       void close(void)
+       {
+               int rc = sem_close(sentSem_);
+               if(rc)
+                       getMessengerLogger()->error("Error closing semaphore %s: %s", sentSemName_.c_str(),
+                                                                               strerror(errno));
+               rc = sem_close(receiveReadySem_);
+               if(rc)
+                       getMessengerLogger()->error("Error closing semaphore %s: %s",
+                                                                               receiveReadySemName_.c_str(), strerror(errno));
+       }
+       void unlink(void)
+       {
+               int rc = sem_unlink(sentSemName_.c_str());
+               if(rc == -1 && errno != ENOENT)
+                       getMessengerLogger()->error("Error unlinking semaphore %s: %s", sentSemName_.c_str(),
+                                                                               strerror(errno));
+               rc = sem_unlink(receiveReadySemName_.c_str());
+               if(rc == -1 && errno != ENOENT)
+                       getMessengerLogger()->error("Error unlinking semaphore %s: %s",
+                                                                               receiveReadySemName_.c_str(), strerror(errno));
+       }
+       sem_t* sentSem_;
+       sem_t* receiveReadySem_;
+
+  private:
+       std::string sentSemName_;
+       std::string receiveReadySemName_;
+};
+struct SharedMemoryManager
+{
+       static bool initShm(const std::string &name, size_t len, grk_handle* shm_fd, char** buffer)
+       {
+               *shm_fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0666);
+               if(*shm_fd < 0)
+               {
+                       getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
+                       return false;
+               }
+               int rc = ftruncate(*shm_fd, sizeof(char) * len);
+               if(rc)
+               {
+                       getMessengerLogger()->error("Error truncating shared memory: %s", strerror(errno));
+                       rc = close(*shm_fd);
+                       if(rc)
+                               getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
+                       rc = shm_unlink(name.c_str());
+                       if(rc)
+                               getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
+                       return false;
+               }
+               *buffer = static_cast<char*>(mmap(0, len, PROT_WRITE, MAP_SHARED, *shm_fd, 0));
+               if(!*buffer)
+               {
+                       getMessengerLogger()->error("Error mapping shared memory: %s", strerror(errno));
+                       rc = close(*shm_fd);
+                       if(rc)
+                               getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
+                       rc = shm_unlink(name.c_str());
+                       if(rc)
+                               getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
+               }
+
+               return *buffer != nullptr;
+       }
+       static bool deinitShm(const std::string &name, size_t len, grk_handle &shm_fd, char** buffer)
+       {
+               if (!*buffer || !shm_fd)
+                       return true;
+
+               int rc = munmap(*buffer, len);
+               *buffer = nullptr;
+               if(rc)
+                       getMessengerLogger()->error("Error unmapping shared memory %s: %s", name.c_str(), strerror(errno));
+               rc = close(shm_fd);
+               shm_fd = 0;
+               if(rc)
+                       getMessengerLogger()->error("Error closing shared memory %s: %s", name.c_str(), strerror(errno));
+               rc = shm_unlink(name.c_str());
+               if(rc)
+                       fprintf(stderr,"Error unlinking shared memory %s : %s\n", name.c_str(), strerror(errno));
+
+               return true;
+       }
+};
+
+template<typename Data>
+class MessengerBlockingQueue
+{
+  public:
+       explicit MessengerBlockingQueue(size_t max) : active_(true), max_size_(max) {}
+       MessengerBlockingQueue() : MessengerBlockingQueue(UINT_MAX) {}
+       size_t size() const
+       {
+               return queue_.size();
+       }
+       // deactivate and clear queue
+       void deactivate()
+       {
+               {
+                       std::lock_guard<std::mutex> lk(mutex_);
+                       active_ = false;
+                       while(!queue_.empty())
+                               queue_.pop();
+               }
+
+               // release all waiting threads
+               can_pop_.notify_all();
+               can_push_.notify_all();
+       }
+       void activate()
+       {
+               std::lock_guard<std::mutex> lk(mutex_);
+               active_ = true;
+       }
+       bool push(Data const& value)
+       {
+               bool rc;
+               {
+                       std::unique_lock<std::mutex> lk(mutex_);
+                       rc = push_(value);
+               }
+               if(rc)
+                       can_pop_.notify_one();
+
+               return rc;
+       }
+       bool waitAndPush(Data& value)
+       {
+               bool rc;
+               {
+                       std::unique_lock<std::mutex> lk(mutex_);
+                       if(!active_)
+                               return false;
+                       // in case of spurious wakeup, loop until predicate in lambda
+                       // is satisfied.
+                       can_push_.wait(lk, [this] { return queue_.size() < max_size_ || !active_; });
+                       rc = push_(value);
+               }
+               if(rc)
+                       can_pop_.notify_one();
+
+               return rc;
+       }
+       bool pop(Data& value)
+       {
+               bool rc;
+               {
+                       std::unique_lock<std::mutex> lk(mutex_);
+                       rc = pop_(value);
+               }
+               if(rc)
+                       can_push_.notify_one();
+
+               return rc;
+       }
+       bool waitAndPop(Data& value)
+       {
+               bool rc;
+               {
+                       std::unique_lock<std::mutex> lk(mutex_);
+                       if(!active_)
+                               return false;
+                       // in case of spurious wakeup, loop until predicate in lambda
+                       // is satisfied.
+                       can_pop_.wait(lk, [this] { return !queue_.empty() || !active_; });
+                       rc = pop_(value);
+               }
+               if(rc)
+                       can_push_.notify_one();
+
+               return rc;
+       }
+
+  private:
+       bool push_(Data const& value)
+       {
+               if(queue_.size() == max_size_ || !active_)
+                       return false;
+               queue_.push(value);
+
+               return true;
+       }
+       bool pop_(Data& value)
+       {
+               if(queue_.empty() || !active_)
+                       return false;
+               value = queue_.front();
+               queue_.pop();
+
+               return true;
+       }
+       std::queue<Data> queue_;
+       mutable std::mutex mutex_;
+       std::condition_variable can_pop_;
+       std::condition_variable can_push_;
+       bool active_;
+       size_t max_size_;
+};
+struct BufferSrc
+{
+       BufferSrc(void) : BufferSrc("") {}
+       explicit BufferSrc(const std::string &file) : file_(file), clientFrameId_(0), frameId_(0), framePtr_(nullptr)
+       {}
+       BufferSrc(size_t clientFrameId, size_t frameId, uint8_t* framePtr)
+               : file_(""), clientFrameId_(clientFrameId), frameId_(frameId), framePtr_(framePtr)
+       {}
+       bool fromDisk(void)
+       {
+               return !file_.empty() && framePtr_ == nullptr;
+       }
+       size_t index() const
+       {
+               return clientFrameId_;
+       }
+       std::string file_;
+       size_t clientFrameId_;
+       size_t frameId_;
+       uint8_t* framePtr_;
+};
+
+struct Messenger;
+static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch);
+static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch);
+static void processorThread(Messenger* messenger, std::function<void(std::string)> processor);
+
+struct Messenger
+{
+       explicit Messenger(MessengerInit init)
+               : running(true), initialized_(false), shutdown_(false), init_(init),
+                 outboundSynch_(nullptr),
+                 inboundSynch_(nullptr), uncompressed_buffer_(nullptr), compressed_buffer_(nullptr),
+                 uncompressed_fd_(0), compressed_fd_(0)
+       {}
+       virtual ~Messenger(void)
+       {
+               running = false;
+               sendQueue.deactivate();
+               receiveQueue.deactivate();
+
+               if (outboundSynch_) {
+                       outboundSynch_->post(SYNCH_RECEIVE_READY);
+                       outbound.join();
+               }
+
+               if (inboundSynch_) {
+                       inboundSynch_->post(SYNCH_SENT);
+                       inbound.join();
+               }
+
+               for(auto& p : processors_)
+                       p.join();
+
+               delete outboundSynch_;
+               delete inboundSynch_;
+
+               deinitShm();
+       }
+       void startThreads(void) {
+               outboundSynch_ =
+                       new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch);
+               outbound = std::thread(outboundThread, this, init_.outboundMessageBuf, outboundSynch_);
+
+               inboundSynch_ =
+                       new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch);
+               inbound = std::thread(inboundThread, this, init_.inboundMessageBuf, inboundSynch_);
+
+               for(size_t i = 0; i < init_.numProcessingThreads_; ++i)
+                       processors_.push_back(std::thread(processorThread, this, init_.processor_));
+       }
+       size_t serialize(const std::string &dir, size_t clientFrameId, uint8_t* compressedPtr,
+                                        size_t compressedLength)
+       {
+               char fname[512];
+               if(!compressedPtr || !compressedLength)
+                       return 0;
+               sprintf(fname, "%s/test_%d.j2k", dir.c_str(), (int)clientFrameId);
+               auto fp = fopen(fname, "wb");
+               if(!fp)
+                       return 0;
+               size_t written = fwrite(compressedPtr, 1, compressedLength, fp);
+               if(written != compressedLength)
+               {
+                       fclose(fp);
+                       return 0;
+               }
+               fflush(fp);
+               fclose(fp);
+
+               return written;
+       }
+       bool initBuffers(void)
+       {
+               bool rc = true;
+               if(init_.uncompressedFrameSize_)
+               {
+                       rc = rc && SharedMemoryManager::initShm(grokUncompressedBuf,
+                                                                                                       init_.uncompressedFrameSize_ * init_.numFrames_,
+                                                                                                       &uncompressed_fd_, &uncompressed_buffer_);
+               }
+               if(init_.compressedFrameSize_)
+               {
+                       rc = rc && SharedMemoryManager::initShm(grokCompressedBuf,
+                                                                                                       init_.compressedFrameSize_ * init_.numFrames_,
+                                                                                                       &compressed_fd_, &compressed_buffer_);
+               }
+
+               return rc;
+       }
+
+       bool deinitShm(void)
+       {
+               bool rc = SharedMemoryManager::deinitShm(grokUncompressedBuf,
+                                                                                                init_.uncompressedFrameSize_ * init_.numFrames_,
+                                                                                                uncompressed_fd_, &uncompressed_buffer_);
+               rc = rc && SharedMemoryManager::deinitShm(grokCompressedBuf,
+                                                                                                 init_.compressedFrameSize_ * init_.numFrames_,
+                                                                                                 compressed_fd_, &compressed_buffer_);
+
+               return rc;
+       }
+       template<typename... Args>
+       void send(const std::string& str, Args... args)
+       {
+               std::ostringstream oss;
+               oss << str;
+               int dummy[] = {0, ((void)(oss << ',' << args), 0)...};
+               static_cast<void>(dummy);
+
+               sendQueue.push(oss.str());
+       }
+       static pid_t get_pid_by_process_name(const char* name)
+       {
+               char command[256];
+               snprintf(command, sizeof(command), "pgrep %s", name);
+               auto pgrep = popen(command, "r");
+               if(!pgrep)
+                       return -1;
+               pid_t pid;
+               if(fscanf(pgrep, "%d", &pid) != 1)
+                       pid = -1;
+               pclose(pgrep);
+
+               return pid;
+       }
+       static bool terminate_process(const char* name)
+       {
+               auto pid = get_pid_by_process_name(name);
+
+               return (pid != -1 && kill(pid, SIGTERM) != -1);
+       }
+       static bool kill_process(const char* name)
+       {
+               auto pid = get_pid_by_process_name(name);
+
+               return (pid != -1 && kill(pid, SIGKILL) != -1);
+       }
+       void launchGrok(const std::string &dir, uint32_t width, uint32_t stride,
+                                                               uint32_t height, uint32_t samplesPerPixel, uint32_t depth,
+                                                               int device, bool is4K, uint32_t fps, uint32_t bandwidth,
+                                                               const std::string server, uint32_t port,
+                                                               const std::string license)
+       {
+
+               std::unique_lock<std::mutex> lk(shutdownMutex_);
+               if (async_result_.valid())
+                       return;
+               if(MessengerInit::firstLaunch(true))
+                       init_.unlink();
+               startThreads();
+               char _cmd[4096];
+               auto fullServer = server + ":" + std::to_string(port);
+               sprintf(_cmd,
+                               "./grk_compress -batch_src %s,%d,%d,%d,%d,%d -out_fmt j2k -out_dir - -k 1 "
+                               "-G %d -%s %d,%d -j %s -J %s",
+                               GRK_MSGR_BATCH_IMAGE.c_str(), width, stride, height, samplesPerPixel, depth,
+                               device, is4K ? "cinema4K" : "cinema2K", fps, bandwidth,
+                               license.c_str(), fullServer.c_str());
+               launch(_cmd, dir);
+       }
+       void initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames)
+       {
+               // client fills queue with pending uncompressed buffers
+               init_.uncompressedFrameSize_ = uncompressedFrameSize;
+               init_.compressedFrameSize_ = compressedFrameSize;
+               init_.numFrames_ = numFrames;
+               initBuffers();
+               auto ptr = uncompressed_buffer_;
+               for(size_t i = 0; i < init_.numFrames_; ++i)
+               {
+                       availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr));
+                       ptr += init_.uncompressedFrameSize_;
+               }
+
+               std::unique_lock<std::mutex> lk(shutdownMutex_);
+               initialized_ = true;
+               clientInitializedCondition_.notify_all();
+       }
+       bool waitForClientInit(void)
+       {
+               if(initialized_)
+                       return true;
+
+               std::unique_lock<std::mutex> lk(shutdownMutex_);
+               if(initialized_)
+                       return true;
+               else if (shutdown_)
+                       return false;
+               clientInitializedCondition_.wait(lk, [this]{return initialized_ || shutdown_;});
+
+               return initialized_ && !shutdown_;
+       }
+       static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel)
+       {
+               return sizeof(uint16_t) * w * h * samplesPerPixel;
+       }
+       void reclaimCompressed(size_t frameId)
+       {
+               availableBuffers_.push(BufferSrc(0, frameId, getCompressedFrame(frameId)));
+       }
+       void reclaimUncompressed(size_t frameId)
+       {
+               availableBuffers_.push(BufferSrc(0, frameId, getUncompressedFrame(frameId)));
+       }
+       uint8_t* getUncompressedFrame(size_t frameId)
+       {
+               assert(frameId < init_.numFrames_);
+               if(frameId >= init_.numFrames_)
+                       return nullptr;
+
+               return (uint8_t*)(uncompressed_buffer_ + frameId * init_.uncompressedFrameSize_);
+       }
+       uint8_t* getCompressedFrame(size_t frameId)
+       {
+               assert(frameId < init_.numFrames_);
+               if(frameId >= init_.numFrames_)
+                       return nullptr;
+
+               return (uint8_t*)(compressed_buffer_ + frameId * init_.compressedFrameSize_);
+       }
+       std::atomic_bool running;
+       bool initialized_;
+       bool shutdown_;
+       MessengerBlockingQueue<std::string> sendQueue;
+       MessengerBlockingQueue<std::string> receiveQueue;
+       MessengerBlockingQueue<BufferSrc> availableBuffers_;
+       MessengerInit init_;
+       std::string cmd_;
+       std::future<int> async_result_;
+       std::mutex shutdownMutex_;
+       std::condition_variable shutdownCondition_;
+
+  protected:
+       std::condition_variable clientInitializedCondition_;
+  private:
+       void launch(const std::string &cmd, const std::string &dir)
+       {
+               // Change the working directory
+               if(!dir.empty())
+               {
+                       if(chdir(dir.c_str()) != 0)
+                       {
+                               getMessengerLogger()->error("Error: failed to change the working directory");
+                               return;
+                       }
+               }
+               // Execute the command using std::async and std::system
+               cmd_ = cmd;
+               async_result_ = std::async(std::launch::async, [this]() { return std::system(cmd_.c_str()); });
+       }
+       std::thread outbound;
+       Synch* outboundSynch_;
+
+       std::thread inbound;
+       Synch* inboundSynch_;
+
+       std::vector<std::thread> processors_;
+       char* uncompressed_buffer_;
+       char* compressed_buffer_;
+
+       grk_handle uncompressed_fd_;
+       grk_handle compressed_fd_;
+};
+
+static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch)
+{
+       grk_handle shm_fd = 0;
+       char* send_buffer = nullptr;
+
+       if(!SharedMemoryManager::initShm(sendBuf, messageBufferLen, &shm_fd, &send_buffer))
+               return;
+       while(messenger->running)
+       {
+               synch->wait(SYNCH_RECEIVE_READY);
+               if(!messenger->running)
+                       break;
+               std::string message;
+               if(!messenger->sendQueue.waitAndPop(message))
+                       break;
+               if(!messenger->running)
+                       break;
+               memcpy(send_buffer, message.c_str(), message.size() + 1);
+               synch->post(SYNCH_SENT);
+       }
+       SharedMemoryManager::deinitShm(sendBuf, messageBufferLen, shm_fd, &send_buffer);
+}
+
+static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch)
+{
+       grk_handle shm_fd = 0;
+       char* receive_buffer = nullptr;
+
+       if(!SharedMemoryManager::initShm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer))
+               return;
+       while(messenger->running)
+       {
+               synch->wait(SYNCH_SENT);
+               if(!messenger->running)
+                       break;
+               auto message = std::string(receive_buffer);
+               synch->post(SYNCH_RECEIVE_READY);
+               messenger->receiveQueue.push(message);
+       }
+       SharedMemoryManager::deinitShm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer);
+}
+struct Msg
+{
+       explicit Msg(const std::string &msg) : ct_(0)
+       {
+               std::stringstream ss(msg);
+               while(ss.good())
+               {
+                       std::string substr;
+                       std::getline(ss, substr, ',');
+                       cs_.push_back(substr);
+               }
+       }
+       std::string next()
+       {
+               if(ct_ == cs_.size())
+               {
+                       getMessengerLogger()->error("Msg: comma separated list exhausted. returning empty.");
+                       return "";
+               }
+               return cs_[ct_++];
+       }
+
+       uint32_t nextUint(void)
+       {
+               return (uint32_t)std::stoi(next());
+       }
+
+       std::vector<std::string> cs_;
+       size_t ct_;
+};
+static void processorThread(Messenger* messenger, std::function<void(std::string)> processor)
+{
+       while(messenger->running)
+       {
+               std::string message;
+               if(!messenger->receiveQueue.waitAndPop(message))
+                       break;
+               if(!messenger->running)
+                       break;
+               Msg msg(message);
+               auto tag = msg.next();
+               if(tag == GRK_MSGR_BATCH_COMPRESS_INIT)
+               {
+                       auto width = msg.nextUint();
+                       auto stride = msg.nextUint();
+                       (void)stride;
+                       auto height = msg.nextUint();
+                       auto samplesPerPixel = msg.nextUint();
+                       auto depth = msg.nextUint();
+                       (void)depth;
+                       messenger->init_.uncompressedFrameSize_ =
+                               Messenger::uncompressedFrameSize(width, height, samplesPerPixel);
+                       auto compressedFrameSize = msg.nextUint();
+                       auto numFrames = msg.nextUint();
+                       messenger->initClient(compressedFrameSize, compressedFrameSize, numFrames);
+               }
+               else if(tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED)
+               {
+                       messenger->reclaimUncompressed(msg.nextUint());
+               }
+               else if(tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED)
+               {
+                       messenger->reclaimCompressed(msg.nextUint());
+               }
+               processor(message);
+       }
+}
+
+template<typename F>
+struct ScheduledFrames
+{
+       void store(F& val)
+       {
+               std::unique_lock<std::mutex> lk(mapMutex_);
+               auto it = map_.find(val.index());
+               if (it == map_.end())
+                       map_[val.index()] = val;
+       }
+       F retrieve(size_t index, bool &success)
+       {
+               std::unique_lock<std::mutex> lk(mapMutex_);
+               success = false;
+               auto it = map_.find(index);
+               if(it == map_.end())
+                       return F();
+
+               success = true;
+               F val = it->second;
+               map_.erase(index);
+
+               return val;
+       }
+
+ private:
+       std::mutex mapMutex_;
+       std::map<size_t, F> map_;
+};
+
+template<typename F>
+struct ScheduledMessenger : public Messenger
+{
+       explicit ScheduledMessenger(MessengerInit init) : Messenger(init),
+                                                                                       framesScheduled_(0),
+                                                                                       framesCompressed_(0)
+       {}
+       ~ScheduledMessenger(void) {
+               shutdown();
+       }
+       bool scheduleCompress(F proxy, std::function<void(BufferSrc)> converter){
+               size_t frameSize = init_.uncompressedFrameSize_;
+               assert(frameSize >= init_.uncompressedFrameSize_);
+               BufferSrc src;
+               if(!availableBuffers_.waitAndPop(src))
+                       return false;
+               converter(src);
+               scheduledFrames_.store(proxy);
+               framesScheduled_++;
+               send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_);
+
+               return true;
+       }
+       void processCompressed(const std::string &message, std::function<void(F,uint8_t*,uint32_t)> processor, bool needsRecompression) {
+               Msg msg(message);
+               msg.next();
+               auto clientFrameId = msg.nextUint();
+               auto compressedFrameId = msg.nextUint();
+               auto compressedFrameLength = msg.nextUint();
+               if (!needsRecompression) {
+                       bool success = false;
+                       auto srcFrame = scheduledFrames_.retrieve(clientFrameId,success);
+                       if (!success)
+                               return;
+                       processor(srcFrame, getCompressedFrame(compressedFrameId),compressedFrameLength);
+               }
+               ++framesCompressed_;
+               send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId);
+               if (shutdown_ && framesCompressed_ == framesScheduled_)
+                       shutdownCondition_.notify_all();
+       }
+       void shutdown(void){
+               try {
+                       std::unique_lock<std::mutex> lk(shutdownMutex_);
+                       if (!async_result_.valid())
+                               return;
+                       shutdown_ = true;
+                       if (framesScheduled_) {
+                               uint32_t scheduled = framesScheduled_;
+                               send(GRK_MSGR_BATCH_FLUSH, scheduled);
+                               shutdownCondition_.wait(lk, [this] { return framesScheduled_ == framesCompressed_; });
+                       }
+                       availableBuffers_.deactivate();
+                       send(GRK_MSGR_BATCH_SHUTDOWN);
+                       int result = async_result_.get();
+                       if(result != 0)
+                               getMessengerLogger()->error("Accelerator failed with return code: %d\n",result);
+               } catch (std::exception &ex) {
+                       getMessengerLogger()->error("%s",ex.what());
+               }
+
+       }
+       F retrieve(size_t index, bool &success) {
+               return scheduledFrames_.retrieve(index, success);
+       }
+       void store(F& val) {
+               scheduledFrames_.store(val);
+       }
+
+private:
+       ScheduledFrames<F> scheduledFrames_;
+       std::atomic<uint32_t> framesScheduled_;
+       std::atomic<uint32_t> framesCompressed_;
+};
+
+} // namespace grk_plugin
index d2e840f85f2a87ce647789321525fb5a14782bad..11f403d9524fe98aa6179c11f957965514874f14 100644 (file)
@@ -53,6 +53,7 @@ using boost::optional;
 using dcp::Data;
 using namespace dcpomatic;
 
+static grk_plugin::GrokInitializer grokInitializer;
 
 /** @param film Film that we are encoding.
  *  @param writer Writer that we are using.
@@ -60,7 +61,9 @@ using namespace dcpomatic;
 J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer)
        : _film (film)
        , _history (200)
-       , _writer (writer)
+       , _writer (writer) ,
+       dcpomaticContext_(film,writer,_history, Config::instance()->gpu_binary_location ()),
+       context_(Config::instance()->enable_gpu () ? new grk_plugin::GrokContext(dcpomaticContext_) : nullptr)
 {
        servers_list_changed ();
 }
@@ -70,10 +73,13 @@ J2KEncoder::~J2KEncoder ()
 {
        _server_found_connection.disconnect();
 
+       {
        boost::mutex::scoped_lock lm (_threads_mutex);
        terminate_threads ();
-}
+       }
 
+       delete context_;
+}
 
 void
 J2KEncoder::begin ()
@@ -83,23 +89,35 @@ J2KEncoder::begin ()
                );
 }
 
+void J2KEncoder::pause(void){
+       if (Config::instance()->enable_gpu ())
+               end(false);
+}
+
+void J2KEncoder::resume(void){
+       if (Config::instance()->enable_gpu ()) {
+               context_ = new grk_plugin::GrokContext(dcpomaticContext_);
+               servers_list_changed ();
+       }
+}
 
 void
-J2KEncoder::end ()
+J2KEncoder::end (bool isFinal)
 {
-       boost::mutex::scoped_lock lock (_queue_mutex);
+       if (isFinal) {
+               boost::mutex::scoped_lock lock (_queue_mutex);
 
-       LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
+               LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
 
-       /* Keep waking workers until the queue is empty */
-       while (!_queue.empty ()) {
-               rethrow ();
-               _empty_condition.notify_all ();
-               _full_condition.wait (lock);
+               /* Keep waking workers until the queue is empty */
+                       while (!_queue.empty ()) {
+                               rethrow ();
+                               _empty_condition.notify_all ();
+                               _full_condition.wait (lock);
+                       }
+               lock.unlock ();
        }
 
-       lock.unlock ();
-
        LOG_GENERAL_NC (N_("Terminating encoder threads"));
 
        {
@@ -113,27 +131,38 @@ J2KEncoder::end ()
        LOG_GENERAL (N_("Mopping up %1"), _queue.size());
 
        /* The following sequence of events can occur in the above code:
-            1. a remote worker takes the last image off the queue
-            2. the loop above terminates
-            3. the remote worker fails to encode the image and puts it back on the queue
-            4. the remote worker is then terminated by terminate_threads
+                1. a remote worker takes the last image off the queue
+                2. the loop above terminates
+                3. the remote worker fails to encode the image and puts it back on the queue
+                4. the remote worker is then terminated by terminate_threads
 
-            So just mop up anything left in the queue here.
+                So just mop up anything left in the queue here.
        */
-
-       for (auto const& i: _queue) {
-               LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
-               try {
-                       _writer.write(
-                               make_shared<dcp::ArrayData>(i.encode_locally()),
-                               i.index(),
-                               i.eyes()
-                               );
-                       frame_done ();
-               } catch (std::exception& e) {
-                       LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
+       if (isFinal) {
+               for (auto & i: _queue) {
+                       if (Config::instance()->enable_gpu ()) {
+                               if (!context_->scheduleCompress(i)){
+                                       LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index());
+                                       // handle error
+                               }
+                       }
+                       else {
+                               LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
+                               try {
+                                       _writer.write(
+                                                       make_shared<dcp::ArrayData>(i.encode_locally()),
+                                               i.index(),
+                                               i.eyes()
+                                               );
+                                       frame_done ();
+                               } catch (std::exception& e) {
+                                       LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
+                               }
+                       }
                }
        }
+       delete context_;
+       context_ = nullptr;
 }
 
 
@@ -183,7 +212,10 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
        size_t threads = 0;
        {
                boost::mutex::scoped_lock lm (_threads_mutex);
-               threads = _threads->size();
+               if (_threads)
+                       threads = _threads->size();
+               else
+                       threads = std::thread::hardware_concurrency();
        }
 
        boost::mutex::scoped_lock queue_lock (_queue_mutex);
@@ -223,13 +255,14 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
                LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
                /* Queue this new frame for encoding */
                LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
-               _queue.push_back (DCPVideo(
+               auto dcpv = DCPVideo(
                                pv,
                                position,
                                _film->video_frame_rate(),
                                _film->j2k_bandwidth(),
                                _film->resolution()
-                               ));
+                               );
+               _queue.push_back (dcpv);
 
                /* The queue might not be empty any more, so notify anything which is
                   waiting on that.
@@ -269,6 +302,8 @@ void
 J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
 try
 {
+       auto config = Config::instance ();
+
        start_of_thread ("J2KEncoder");
 
        if (server) {
@@ -332,14 +367,22 @@ try
                                }
 
                        } else {
-                               try {
-                                       LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf.index());
-                                       encoded = make_shared<dcp::ArrayData>(vf.encode_locally());
-                                       LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index());
-                               } catch (std::exception& e) {
-                                       /* This is very bad, so don't cope with it, just pass it on */
-                                       LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
-                                       throw;
+                               if (context_) {
+                                       if (!context_->launch(vf, config->selected_gpu()) || !context_->scheduleCompress(vf)) {
+                                               LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
+                                               _queue.push_front (vf);
+                                       }
+
+                               } else {
+                                       try {
+                                               LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf.index());
+                                               encoded = make_shared<dcp::ArrayData>(vf.encode_locally());
+                                               LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index());
+                                       } catch (std::exception& e) {
+                                               /* This is very bad, so don't cope with it, just pass it on */
+                                               LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
+                                               throw;
+                                       }
                                }
                        }
 
@@ -347,10 +390,12 @@ try
                                _writer.write(encoded, vf.index(), vf.eyes());
                                frame_done ();
                        } else {
-                               lock.lock ();
-                               LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
-                               _queue.push_front (vf);
-                               lock.unlock ();
+                               if (!Config::instance()->enable_gpu ()) {
+                                       lock.lock ();
+                                       LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
+                                       _queue.push_front (vf);
+                                       lock.unlock ();
+                               }
                        }
                }
 
index 63228a6b8fac8cb7635de681bbabf4d5b14b46d2..6df30a3f7e7fcdac976f54da141e4a6e01afa58c 100644 (file)
@@ -29,6 +29,7 @@
 
 
 #include "cross.h"
+#include "dcp_video.h"
 #include "enum_indexed_vector.h"
 #include "event_history.h"
 #include "exception_store.h"
@@ -41,8 +42,8 @@
 #include <list>
 #include <stdint.h>
 
+#include "grok_context.h"
 
-class DCPVideo;
 class EncodeServerDescription;
 class Film;
 class Job;
@@ -70,8 +71,11 @@ public:
        /** Called to pass a bit of video to be encoded as the next DCP frame */
        void encode (std::shared_ptr<PlayerVideo> pv, dcpomatic::DCPTime time);
 
+       void pause(void);
+       void resume(void);
+
        /** Called when a processing run has finished */
-       void end ();
+       void end (bool isFinal);
 
        boost::optional<float> current_encoding_rate () const;
        int video_frames_enqueued () const;
@@ -107,6 +111,9 @@ private:
        boost::optional<dcpomatic::DCPTime> _last_player_video_time;
 
        boost::signals2::scoped_connection _server_found_connection;
+
+       grk_plugin::DcpomaticContext dcpomaticContext_;
+       grk_plugin::GrokContext *context_;
 };
 
 
index 912c6a6ef3c961374a81bd4863509a94d0c524f2..d4890d98bf6d574a287c208b83f0d09cfb60bb7f 100644 (file)
@@ -641,7 +641,7 @@ void
 Job::cancel ()
 {
        if (_thread.joinable()) {
-               resume();
+               Job::resume();
 
                _thread.interrupt ();
                _thread.join ();
@@ -668,6 +668,7 @@ Job::pause_by_user ()
        }
 
        if (paused) {
+               pause();
                _pause_changed.notify_all ();
        }
 
@@ -680,6 +681,7 @@ Job::pause_by_priority ()
 {
        if (running ()) {
                set_state (PAUSED_BY_PRIORITY);
+               pause();
                _pause_changed.notify_all ();
        }
 }
index 5562afc16cc89cae1e9dec8941ee837622598d8b..21d8df0f667215863cfeb1dd30ad4446d77892a6 100644 (file)
@@ -56,9 +56,10 @@ public:
        }
 
        void start ();
+       virtual void pause() {}
        bool pause_by_user ();
        void pause_by_priority ();
-       void resume ();
+       virtual void resume ();
        void cancel ();
 
        bool is_new () const;
index 6b1563b9b85da2b30a37f4a09e3b2cbd47a255b6..96e93a50c8222e3ab1518d4db45e7a4d95aeb0ee 100644 (file)
@@ -157,6 +157,14 @@ TranscodeJob::run ()
        }
 }
 
+void TranscodeJob::pause() {
+       _encoder->pause();
+}
+
+void TranscodeJob::resume() {
+       _encoder->resume();
+       Job::resume();
+}
 
 string
 TranscodeJob::status () const
index 8b145e362333279e97daec904e61757f8cb832fb..8c96bb4371f4c4166fe3488e6902ae694d0a258b 100644 (file)
@@ -56,6 +56,8 @@ public:
        std::string name () const override;
        std::string json_name () const override;
        void run () override;
+       void pause() override;
+       void resume() override;
        std::string status () const override;
        bool enable_notify () const override {
                return true;
index b53f39b451ab3aca2cede112d120e283ad135d75..acdd41fdbf2a1c87e186be075d40781a79caf885 100644 (file)
@@ -1195,6 +1195,7 @@ private:
                FontConfig::drop();
 
                ev.Skip ();
+               JobManager::drop ();
        }
 
        void active_jobs_changed()
index 24897dfba1aaa993f8819a6fc9a7b0b8a018320e..f0830ad35cc92ebe84449ebccc190f1b47e2b579 100644 (file)
@@ -287,6 +287,7 @@ private:
                }
 
                ev.Skip ();
+               JobManager::drop ();
        }
 
        void file_add_film ()
index 28799013f700f5e643abf455b83c2e191f4dd933..27caac360d6e83d4983cc904efc3eecc7321394a 100644 (file)
@@ -268,6 +268,7 @@ private:
                }
 
                ev.Skip ();
+               JobManager::drop ();
        }
 
        void copy ()
index c61c75eced8660635dfd288ef95138fa8c29ec02..f035a11289477aa6201d15f05af09e8b3013c23b 100644 (file)
@@ -45,6 +45,7 @@
 #include "send_test_email_dialog.h"
 #include "server_dialog.h"
 #include "static_text.h"
+#include "gpu_config_panel.h"
 #include "wx_util.h"
 #include "lib/config.h"
 #include "lib/cross.h"
@@ -1943,6 +1944,7 @@ create_full_config_dialog ()
        e->AddPage (new SoundPage          (ps, border));
        e->AddPage (new DefaultsPage       (ps, border));
        e->AddPage (new EncodingServersPage(ps, border));
+       e->AddPage (new GPUPage            (ps, border));
        e->AddPage (new KeysPage           (ps, border));
        e->AddPage (new TMSPage            (ps, border));
        e->AddPage (new EmailPage          (ps, border));
diff --git a/src/wx/gpu_config_panel.h b/src/wx/gpu_config_panel.h
new file mode 100644 (file)
index 0000000..1478434
--- /dev/null
@@ -0,0 +1,186 @@
+#pragma once
+
+static std::vector<std::string> get_gpu_names(std::string binary, std::string filename)
+{
+    // Execute the GPU listing program and redirect its output to a file
+    std::system((binary + " > " +  filename).c_str());
+
+    std::vector<std::string> gpu_names;
+    std::ifstream file(filename);
+    if (file.is_open())
+    {
+        std::string line;
+        while (std::getline(file, line))
+            gpu_names.push_back(line);
+        file.close();
+    }
+
+    return gpu_names;
+}
+
+class GpuList : public wxPanel
+{
+public:
+    GpuList(wxPanel* parent) : wxPanel(parent, wxID_ANY), selection(0) {
+        comboBox = new wxComboBox(this, wxID_ANY, "", wxDefaultPosition, wxSize(400, -1));
+        comboBox->Bind(wxEVT_COMBOBOX, &GpuList::OnComboBox, this);
+        update();
+
+        wxBoxSizer* sizer = new wxBoxSizer(wxHORIZONTAL);
+
+        sizer->Add(comboBox, 0, wxALIGN_CENTER_VERTICAL); // Vertically center the comboBox
+
+        this->SetSizerAndFit(sizer);
+    }
+    void update(void) {
+       auto cfg = Config::instance();
+       auto lister_binary = cfg->gpu_binary_location() + "/" + "gpu_lister";
+       auto lister_file = cfg->gpu_binary_location () + "/" + "gpus.txt";
+       if (boost::filesystem::exists(lister_binary)) {
+                       auto gpu_names = get_gpu_names(lister_binary, lister_file);
+
+                       comboBox->Clear();
+                       for (const auto& name : gpu_names)
+                                comboBox->Append(name);
+       }
+    }
+
+    int getSelection(void) {
+       return selection;
+    }
+    void setSelection(int sel) {
+        if ((int)comboBox->GetCount() > sel)
+            comboBox->SetSelection(sel);
+    }
+
+private:
+    void OnComboBox([[maybe_unused]] wxCommandEvent& event) {
+        selection = comboBox->GetSelection();
+        if (selection != wxNOT_FOUND)
+               Config::instance ()->set_selected_gpu(selection);
+    }
+
+    wxComboBox* comboBox;
+    int selection;
+};
+
+class GPUPage : public Page
+{
+public:
+       GPUPage (wxSize panel_size, int border)
+               : Page (panel_size, border),
+                 _enable_gpu(nullptr), _binary_location(nullptr), _gpu_list_control(nullptr)
+       {}
+
+       wxString GetName () const override
+       {
+               return _("GPU");
+       }
+
+#ifdef DCPOMATIC_OSX
+       wxBitmap GetLargeIcon () const override
+       {
+               return wxBitmap(icon_path("tms"), wxBITMAP_TYPE_PNG);
+       }
+#endif
+
+private:
+       void setup () override
+       {
+               auto config = Config::instance ();
+
+               _enable_gpu = new CheckBox (_panel, _("Enable GPU Acceleration"));
+               _panel->GetSizer()->Add (_enable_gpu, 0, wxALL | wxEXPAND, _border);
+
+               wxFlexGridSizer* table = new wxFlexGridSizer (2, DCPOMATIC_SIZER_X_GAP, DCPOMATIC_SIZER_Y_GAP);
+               table->AddGrowableCol (1, 1);
+               _panel->GetSizer()->Add (table, 1, wxALL | wxEXPAND, _border);
+
+               add_label_to_sizer (table, _panel, _("Acceleration Binary Folder"), true, 0, wxLEFT | wxLEFT | wxALIGN_CENTRE_VERTICAL);
+               _binary_location = new wxDirPickerCtrl (_panel, wxDD_DIR_MUST_EXIST);
+               table->Add (_binary_location, 1, wxEXPAND);
+
+               add_label_to_sizer (table, _panel, _("GPU Selection"), true, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL);
+               _gpu_list_control = new GpuList(_panel);
+               table->Add (_gpu_list_control, 1, wxEXPAND);
+
+               add_label_to_sizer (table, _panel, _("License Server"), true, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL);
+               _server = new wxTextCtrl (_panel, wxID_ANY);
+               table->Add (_server, 1, wxEXPAND | wxALL);
+
+               add_label_to_sizer (table, _panel, _("Port"), false, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL);
+               _port = new wxSpinCtrl (_panel, wxID_ANY);
+               _port->SetRange (0, 65535);
+               table->Add (_port);
+
+               add_label_to_sizer (table, _panel, _("License"), true, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL);
+               _license = new PasswordEntry (_panel);
+               table->Add (_license->get_panel(), 1, wxEXPAND | wxALL);
+
+               _enable_gpu->bind(&GPUPage::enable_gpu_changed, this);
+               _binary_location->Bind (wxEVT_DIRPICKER_CHANGED, boost::bind (&GPUPage::binary_location_changed, this));
+               _server->Bind (wxEVT_TEXT, boost::bind(&GPUPage::server_changed, this));
+               _port->Bind (wxEVT_SPINCTRL, boost::bind(&GPUPage::port_changed, this));
+               _license->Changed.connect (boost::bind(&GPUPage::license_changed, this));
+
+               _binary_location->Enable(config->enable_gpu());
+               _gpu_list_control->Enable(config->enable_gpu());
+               _server->Enable(config->enable_gpu());
+               _port->Enable(config->enable_gpu());
+               _license->get_panel()->Enable(config->enable_gpu());
+       }
+
+
+       void config_changed () override
+       {
+               auto config = Config::instance ();
+
+               checked_set (_enable_gpu, config->enable_gpu());
+               _binary_location->SetPath(config->gpu_binary_location ());
+               _gpu_list_control->update();
+               _gpu_list_control->setSelection(config->selected_gpu());
+               checked_set (_server, config->gpu_license_server());
+               checked_set (_port, config->gpu_license_port());
+               checked_set (_license, config->gpu_license());
+       }
+
+       void enable_gpu_changed ()
+       {
+               auto config = Config::instance ();
+
+               config->set_enable_gpu (_enable_gpu->GetValue());
+               _binary_location->Enable(config->enable_gpu());
+               _gpu_list_control->Enable(config->enable_gpu());
+               _server->Enable(config->enable_gpu());
+               _port->Enable(config->enable_gpu());
+               _license->get_panel()->Enable(config->enable_gpu());
+       }
+
+       void binary_location_changed ()
+       {
+               Config::instance()->set_gpu_binary_location (wx_to_std (_binary_location->GetPath ()));
+               _gpu_list_control->update();
+       }
+
+       void server_changed ()
+       {
+               Config::instance()->set_gpu_license_server(wx_to_std(_server->GetValue()));
+       }
+
+       void port_changed ()
+       {
+               Config::instance()->set_gpu_license_port(_port->GetValue());
+       }
+
+       void license_changed ()
+       {
+               Config::instance()->set_gpu_license(_license->get());
+       }
+
+       CheckBox* _enable_gpu;
+       wxDirPickerCtrl* _binary_location;
+       GpuList *_gpu_list_control;
+       wxTextCtrl* _server;
+       wxSpinCtrl* _port;
+       PasswordEntry* _license;
+};