Move grok headers into src/lib/grok
authorCarl Hetherington <cth@carlh.net>
Fri, 7 Jul 2023 21:24:45 +0000 (23:24 +0200)
committerCarl Hetherington <cth@carlh.net>
Sun, 28 Jan 2024 01:01:57 +0000 (02:01 +0100)
src/lib/grok/context.h [new file with mode: 0644]
src/lib/grok/messenger.h [new file with mode: 0644]
src/lib/grok_context.h [deleted file]
src/lib/grok_messenger.h [deleted file]
src/lib/j2k_encoder.h

diff --git a/src/lib/grok/context.h b/src/lib/grok/context.h
new file mode 100644 (file)
index 0000000..7a447d3
--- /dev/null
@@ -0,0 +1,246 @@
+/*
+    Copyright (C) 2023 Grok Image Compression Inc.
+
+    This file is part of DCP-o-matic.
+
+    DCP-o-matic is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    DCP-o-matic is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
+
+*/
+
+#pragma once
+
+#include "../config.h"
+#include "../dcp_video.h"
+#include "../log.h"
+#include "../dcpomatic_log.h"
+#include "../writer.h"
+#include "messenger.h"
+
+class Film;
+using dcp::Data;
+using namespace dcpomatic;
+
+static std::mutex launchMutex;
+
+namespace grk_plugin
+{
+
+struct GrokLogger : public MessengerLogger {
+       explicit GrokLogger(const std::string &preamble) : MessengerLogger(preamble)
+       {}
+       virtual ~GrokLogger() = default;
+       void info(const char* fmt, ...) override{
+               va_list arg;
+               va_start(arg, fmt);
+               dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_GENERAL);
+               va_end(arg);
+       }
+       void warn(const char* fmt, ...) override{
+               va_list arg;
+               va_start(arg, fmt);
+               dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_WARNING);
+               va_end(arg);
+       }
+       void error(const char* fmt, ...) override{
+               va_list arg;
+               va_start(arg, fmt);
+               dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_ERROR);
+               va_end(arg);
+       }
+};
+
+struct GrokInitializer {
+       GrokInitializer(void) {
+               setMessengerLogger(new GrokLogger("[GROK] "));
+       }
+       ~GrokInitializer()  = default;
+};
+
+struct FrameProxy {
+       FrameProxy(void) : FrameProxy(0,Eyes::LEFT,DCPVideo())
+       {}
+       FrameProxy(int index, Eyes eyes, DCPVideo dcpv) : index_(index), eyes_(eyes), vf(dcpv)
+       {}
+       int index() const {
+               return index_;
+       }
+       Eyes eyes(void) const {
+               return eyes_;
+       }
+       int index_;
+       Eyes eyes_;
+       DCPVideo vf;
+};
+
+struct DcpomaticContext {
+       DcpomaticContext(std::shared_ptr<const Film> film, Writer& writer,
+                                               EventHistory &history, const std::string &location) :
+                                                                       film_(film), writer_(writer),
+                                                                       history_(history), location_(location),
+                                                                       width_(0), height_(0)
+       {}
+       void setDimensions(uint32_t w, uint32_t h) {
+               width_ = w;
+               height_ = h;
+       }
+       std::shared_ptr<const Film> film_;
+       Writer& writer_;
+       EventHistory &history_;
+       std::string location_;
+       uint32_t width_;
+       uint32_t height_;
+};
+
+class GrokContext {
+public:
+       explicit GrokContext(const DcpomaticContext &dcpomaticContext) :
+                                                               dcpomaticContext_(dcpomaticContext),
+                                                               messenger_(nullptr),
+                                                               launched_(false)
+       {
+               struct CompressedData : public dcp::Data {
+                       explicit CompressedData(int dataLen) : data_(new uint8_t[dataLen]), dataLen_(dataLen)
+                       {}
+                       ~CompressedData(void){
+                               delete[] data_;
+                       }
+                       uint8_t const * data () const override {
+                               return data_;
+                       }
+                       uint8_t * data () override {
+                               return data_;
+                       }
+                       int size () const override {
+                               return dataLen_;
+                       }
+                       uint8_t *data_;
+                       int dataLen_;
+               };
+               if (Config::instance()->enable_gpu ())  {
+                   boost::filesystem::path folder(dcpomaticContext_.location_);
+                   boost::filesystem::path binaryPath = folder / "grk_compress";
+                   if (!boost::filesystem::exists(binaryPath)) {
+                       getMessengerLogger()->error("Invalid binary location %s",
+                                       dcpomaticContext_.location_.c_str());
+                               return;
+                   }
+                       auto proc = [this](const std::string& str) {
+                               try {
+                                       Msg msg(str);
+                                       auto tag = msg.next();
+                                       if(tag == GRK_MSGR_BATCH_SUBMIT_COMPRESSED)
+                                       {
+                                               auto clientFrameId = msg.nextUint();
+                                               auto compressedFrameId = msg.nextUint();
+                                               (void)compressedFrameId;
+                                               auto compressedFrameLength = msg.nextUint();
+                                               auto  processor =
+                                                               [this](FrameProxy srcFrame, uint8_t* compressed, uint32_t compressedFrameLength)
+                                               {
+                                                       auto compressedData = std::make_shared<CompressedData>(compressedFrameLength);
+                                                       memcpy(compressedData->data_,compressed,compressedFrameLength );
+                                                       dcpomaticContext_.writer_.write(compressedData, srcFrame.index(), srcFrame.eyes());
+                                                       frame_done ();
+                                               };
+                                               int const minimum_size = 16384;
+                                               bool needsRecompression = compressedFrameLength < minimum_size;
+                                               messenger_->processCompressed(str, processor, needsRecompression);
+                                               if (needsRecompression) {
+                                                       bool success = false;
+                                                       auto fp = messenger_->retrieve(clientFrameId, success);
+                                                       if (!success)
+                                                               return;
+
+                                                       auto encoded = std::make_shared<dcp::ArrayData>(fp.vf.encode_locally());
+                                                       dcpomaticContext_.writer_.write(encoded, fp.vf.index(), fp.vf.eyes());
+                                                       frame_done ();
+                                               }
+                                       }
+                               } catch (std::exception &ex){
+                                       getMessengerLogger()->error("%s",ex.what());
+                               }
+                       };
+                       auto clientInit =
+                               MessengerInit(clientToGrokMessageBuf, clientSentSynch, grokReceiveReadySynch,
+                                                         grokToClientMessageBuf, grokSentSynch, clientReceiveReadySynch, proc,
+                                                         std::thread::hardware_concurrency());
+                       messenger_ = new ScheduledMessenger<FrameProxy>(clientInit);
+               }
+       }
+       ~GrokContext(void) {
+               shutdown();
+       }
+       bool launch(DCPVideo dcpv, int device){
+               if (!messenger_ )
+                       return false;
+               if (launched_)
+                       return true;
+               std::unique_lock<std::mutex> lk_global(launchMutex);
+               if (!messenger_)
+                       return false;
+               if (launched_)
+                       return true;
+               if (MessengerInit::firstLaunch(true)) {
+                       auto s = dcpv.get_size();
+                       dcpomaticContext_.setDimensions(s.width, s.height);
+                       auto config = Config::instance();
+                       messenger_->launchGrok(dcpomaticContext_.location_,
+                                       dcpomaticContext_.width_,dcpomaticContext_.width_,
+                                       dcpomaticContext_.height_,
+                                       3, 12, device,
+                                       dcpomaticContext_.film_->resolution() == Resolution::FOUR_K,
+                                       dcpomaticContext_.film_->video_frame_rate(),
+                                       dcpomaticContext_.film_->j2k_bandwidth(),
+                                       config->gpu_license_server(),
+                                       config->gpu_license_port(),
+                                       config->gpu_license());
+               }
+               launched_ =  messenger_->waitForClientInit();
+
+               return launched_;
+       }
+       bool scheduleCompress(const DCPVideo &vf){
+               if (!messenger_)
+                       return false;
+
+               auto fp = FrameProxy(vf.index(),vf.eyes(),vf);
+               auto cvt = [this, &fp](BufferSrc src){
+                       // xyz conversion
+                       fp.vf.convert_to_xyz((uint16_t*)src.framePtr_);
+               };
+               return messenger_->scheduleCompress(fp, cvt);
+       }
+       void shutdown(void){
+               if (!messenger_)
+                       return;
+
+               std::unique_lock<std::mutex> lk_global(launchMutex);
+               if (!messenger_)
+                       return;
+               if (launched_)
+                       messenger_->shutdown();
+               delete messenger_;
+               messenger_ = nullptr;
+       }
+       void frame_done () {
+               dcpomaticContext_.history_.event ();
+       }
+private:
+       DcpomaticContext dcpomaticContext_;
+       ScheduledMessenger<FrameProxy> *messenger_;
+       bool launched_;
+};
+
+}
+
diff --git a/src/lib/grok/messenger.h b/src/lib/grok/messenger.h
new file mode 100644 (file)
index 0000000..45ee752
--- /dev/null
@@ -0,0 +1,930 @@
+/*
+    Copyright (C) 2023 Grok Image Compression Inc.
+
+    This file is part of DCP-o-matic.
+
+    DCP-o-matic is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    DCP-o-matic is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
+
+*/
+#pragma once
+
+#include <iostream>
+#include <string>
+#include <cstring>
+#include <atomic>
+#include <functional>
+#include <sstream>
+#include <future>
+#include <map>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+#include <queue>
+#include <cassert>
+#include <cstdarg>
+
+#ifdef _WIN32
+#include <windows.h>
+#include <direct.h>
+#include <tlhelp32.h>
+#pragma warning(disable : 4100)
+#else
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/mman.h>
+#include <semaphore.h>
+#include <signal.h>
+#endif
+
+namespace grk_plugin
+{
+static std::string grokToClientMessageBuf = "Global\\grok_to_client_message";
+static std::string grokSentSynch = "Global\\grok_sent";
+static std::string clientReceiveReadySynch = "Global\\client_receive_ready";
+static std::string clientToGrokMessageBuf = "Global\\client_to_grok_message";
+static std::string clientSentSynch = "Global\\client_sent";
+static std::string grokReceiveReadySynch = "Global\\grok_receive_ready";
+static std::string grokUncompressedBuf = "Global\\grok_uncompressed_buf";
+static std::string grokCompressedBuf = "Global\\grok_compressed_buf";
+static const std::string GRK_MSGR_BATCH_IMAGE = "GRK_MSGR_BATCH_IMAGE";
+static const std::string GRK_MSGR_BATCH_COMPRESS_INIT = "GRK_MSGR_BATCH_COMPRESS_INIT";
+static const std::string GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED = "GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED";
+static const std::string GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED =
+       "GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED";
+static const std::string GRK_MSGR_BATCH_SUBMIT_COMPRESSED = "GRK_MSGR_BATCH_SUBMIT_COMPRESSED";
+static const std::string GRK_MSGR_BATCH_PROCESSSED_COMPRESSED =
+       "GRK_MSGR_BATCH_PROCESSSED_COMPRESSED";
+static const std::string GRK_MSGR_BATCH_SHUTDOWN = "GRK_MSGR_BATCH_SHUTDOWN";
+static const std::string GRK_MSGR_BATCH_FLUSH = "GRK_MSGR_BATCH_FLUSH";
+static const size_t messageBufferLen = 256;
+struct IMessengerLogger
+{
+       virtual ~IMessengerLogger(void) = default;
+       virtual void info(const char* fmt, ...) = 0;
+       virtual void warn(const char* fmt, ...) = 0;
+       virtual void error(const char* fmt, ...) = 0;
+
+  protected:
+       template<typename... Args>
+       std::string log_message(char const* const format, Args&... args) noexcept
+       {
+               constexpr size_t message_size = 512;
+               char message[message_size];
+
+               std::snprintf(message, message_size, format, args...);
+               return std::string(message);
+       }
+};
+struct MessengerLogger : public IMessengerLogger
+{
+       explicit MessengerLogger(const std::string &preamble) : preamble_(preamble) {}
+       virtual ~MessengerLogger() = default;
+       virtual void info(const char* fmt, ...) override
+       {
+               va_list args;
+               std::string new_fmt = preamble_ + fmt + "\n";
+               va_start(args, fmt);
+               vfprintf(stdout, new_fmt.c_str(), args);
+               va_end(args);
+       }
+       virtual void warn(const char* fmt, ...) override
+       {
+               va_list args;
+               std::string new_fmt = preamble_ + fmt + "\n";
+               va_start(args, fmt);
+               vfprintf(stdout, new_fmt.c_str(), args);
+               va_end(args);
+       }
+       virtual void error(const char* fmt, ...) override
+       {
+               va_list args;
+               std::string new_fmt = preamble_ + fmt + "\n";
+               va_start(args, fmt);
+               vfprintf(stderr, new_fmt.c_str(), args);
+               va_end(args);
+       }
+
+  protected:
+       std::string preamble_;
+};
+
+static IMessengerLogger* sLogger = nullptr;
+#if defined(__GNUC__) || defined(__clang__)
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wunused-function"
+#endif
+static void setMessengerLogger(IMessengerLogger* logger)
+{
+       delete sLogger;
+       sLogger = logger;
+}
+#if defined(__GNUC__) || defined(__clang__)
+#pragma GCC diagnostic pop
+#endif
+static IMessengerLogger* getMessengerLogger(void)
+{
+       return sLogger;
+}
+struct MessengerInit
+{
+       MessengerInit(const std::string &outBuf, const std::string &outSent,
+                                 const std::string &outReceiveReady, const std::string &inBuf,
+                                 const std::string &inSent,
+                                 const std::string &inReceiveReady,
+                                 std::function<void(std::string)> processor,
+                                 size_t numProcessingThreads)
+               : outboundMessageBuf(outBuf), outboundSentSynch(outSent),
+                 outboundReceiveReadySynch(outReceiveReady), inboundMessageBuf(inBuf),
+                 inboundSentSynch(inSent), inboundReceiveReadySynch(inReceiveReady), processor_(processor),
+                 numProcessingThreads_(numProcessingThreads),
+                 uncompressedFrameSize_(0), compressedFrameSize_(0),
+                 numFrames_(0)
+       {
+               if(firstLaunch(true))
+                       unlink();
+       }
+       void unlink(void)
+       {
+#ifndef _WIN32
+               shm_unlink(grokToClientMessageBuf.c_str());
+               shm_unlink(clientToGrokMessageBuf.c_str());
+#endif
+       }
+       static bool firstLaunch(bool isClient)
+       {
+               bool debugGrok = false;
+               return debugGrok != isClient;
+       }
+       std::string outboundMessageBuf;
+       std::string outboundSentSynch;
+       std::string outboundReceiveReadySynch;
+
+       std::string inboundMessageBuf;
+       std::string inboundSentSynch;
+       std::string inboundReceiveReadySynch;
+
+       std::function<void(std::string)> processor_;
+       size_t numProcessingThreads_;
+
+       size_t uncompressedFrameSize_;
+       size_t compressedFrameSize_;
+       size_t numFrames_;
+};
+
+/*************************** Synchronization *******************************/
+enum SynchDirection
+{
+       SYNCH_SENT,
+       SYNCH_RECEIVE_READY
+};
+
+typedef int grk_handle;
+struct Synch
+{
+       Synch(const std::string &sentSemName, const std::string &receiveReadySemName)
+               : sentSemName_(sentSemName), receiveReadySemName_(receiveReadySemName)
+       {
+               // unlink semaphores in case of previous crash
+               if(MessengerInit::firstLaunch(true))
+                       unlink();
+               open();
+       }
+       ~Synch()
+       {
+               close();
+               if(MessengerInit::firstLaunch(true))
+                       unlink();
+       }
+       void post(SynchDirection dir)
+       {
+               auto sem = (dir == SYNCH_SENT ? sentSem_ : receiveReadySem_);
+               int rc = sem_post(sem);
+               if(rc)
+                       getMessengerLogger()->error("Error posting to semaphore: %s", strerror(errno));
+       }
+       void wait(SynchDirection dir)
+       {
+               auto sem = dir == SYNCH_SENT ? sentSem_ : receiveReadySem_;
+               int rc = sem_wait(sem);
+               if(rc)
+                       getMessengerLogger()->error("Error waiting for semaphore: %s", strerror(errno));
+       }
+       void open(void)
+       {
+               sentSem_ = sem_open(sentSemName_.c_str(), O_CREAT, 0666, 0);
+               if(!sentSem_)
+                       getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
+               receiveReadySem_ = sem_open(receiveReadySemName_.c_str(), O_CREAT, 0666, 1);
+               if(!receiveReadySem_)
+                       getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
+       }
+       void close(void)
+       {
+               int rc = sem_close(sentSem_);
+               if(rc)
+                       getMessengerLogger()->error("Error closing semaphore %s: %s", sentSemName_.c_str(),
+                                                                               strerror(errno));
+               rc = sem_close(receiveReadySem_);
+               if(rc)
+                       getMessengerLogger()->error("Error closing semaphore %s: %s",
+                                                                               receiveReadySemName_.c_str(), strerror(errno));
+       }
+       void unlink(void)
+       {
+               int rc = sem_unlink(sentSemName_.c_str());
+               if(rc == -1 && errno != ENOENT)
+                       getMessengerLogger()->error("Error unlinking semaphore %s: %s", sentSemName_.c_str(),
+                                                                               strerror(errno));
+               rc = sem_unlink(receiveReadySemName_.c_str());
+               if(rc == -1 && errno != ENOENT)
+                       getMessengerLogger()->error("Error unlinking semaphore %s: %s",
+                                                                               receiveReadySemName_.c_str(), strerror(errno));
+       }
+       sem_t* sentSem_;
+       sem_t* receiveReadySem_;
+
+  private:
+       std::string sentSemName_;
+       std::string receiveReadySemName_;
+};
+struct SharedMemoryManager
+{
+       static bool initShm(const std::string &name, size_t len, grk_handle* shm_fd, char** buffer)
+       {
+               *shm_fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0666);
+               if(*shm_fd < 0)
+               {
+                       getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
+                       return false;
+               }
+               int rc = ftruncate(*shm_fd, sizeof(char) * len);
+               if(rc)
+               {
+                       getMessengerLogger()->error("Error truncating shared memory: %s", strerror(errno));
+                       rc = close(*shm_fd);
+                       if(rc)
+                               getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
+                       rc = shm_unlink(name.c_str());
+                       if(rc)
+                               getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
+                       return false;
+               }
+               *buffer = static_cast<char*>(mmap(0, len, PROT_WRITE, MAP_SHARED, *shm_fd, 0));
+               if(!*buffer)
+               {
+                       getMessengerLogger()->error("Error mapping shared memory: %s", strerror(errno));
+                       rc = close(*shm_fd);
+                       if(rc)
+                               getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
+                       rc = shm_unlink(name.c_str());
+                       if(rc)
+                               getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
+               }
+
+               return *buffer != nullptr;
+       }
+       static bool deinitShm(const std::string &name, size_t len, grk_handle &shm_fd, char** buffer)
+       {
+               if (!*buffer || !shm_fd)
+                       return true;
+
+               int rc = munmap(*buffer, len);
+               *buffer = nullptr;
+               if(rc)
+                       getMessengerLogger()->error("Error unmapping shared memory %s: %s", name.c_str(), strerror(errno));
+               rc = close(shm_fd);
+               shm_fd = 0;
+               if(rc)
+                       getMessengerLogger()->error("Error closing shared memory %s: %s", name.c_str(), strerror(errno));
+               rc = shm_unlink(name.c_str());
+               if(rc)
+                       fprintf(stderr,"Error unlinking shared memory %s : %s\n", name.c_str(), strerror(errno));
+
+               return true;
+       }
+};
+
+template<typename Data>
+class MessengerBlockingQueue
+{
+  public:
+       explicit MessengerBlockingQueue(size_t max) : active_(true), max_size_(max) {}
+       MessengerBlockingQueue() : MessengerBlockingQueue(UINT_MAX) {}
+       size_t size() const
+       {
+               return queue_.size();
+       }
+       // deactivate and clear queue
+       void deactivate()
+       {
+               {
+                       std::lock_guard<std::mutex> lk(mutex_);
+                       active_ = false;
+                       while(!queue_.empty())
+                               queue_.pop();
+               }
+
+               // release all waiting threads
+               can_pop_.notify_all();
+               can_push_.notify_all();
+       }
+       void activate()
+       {
+               std::lock_guard<std::mutex> lk(mutex_);
+               active_ = true;
+       }
+       bool push(Data const& value)
+       {
+               bool rc;
+               {
+                       std::unique_lock<std::mutex> lk(mutex_);
+                       rc = push_(value);
+               }
+               if(rc)
+                       can_pop_.notify_one();
+
+               return rc;
+       }
+       bool waitAndPush(Data& value)
+       {
+               bool rc;
+               {
+                       std::unique_lock<std::mutex> lk(mutex_);
+                       if(!active_)
+                               return false;
+                       // in case of spurious wakeup, loop until predicate in lambda
+                       // is satisfied.
+                       can_push_.wait(lk, [this] { return queue_.size() < max_size_ || !active_; });
+                       rc = push_(value);
+               }
+               if(rc)
+                       can_pop_.notify_one();
+
+               return rc;
+       }
+       bool pop(Data& value)
+       {
+               bool rc;
+               {
+                       std::unique_lock<std::mutex> lk(mutex_);
+                       rc = pop_(value);
+               }
+               if(rc)
+                       can_push_.notify_one();
+
+               return rc;
+       }
+       bool waitAndPop(Data& value)
+       {
+               bool rc;
+               {
+                       std::unique_lock<std::mutex> lk(mutex_);
+                       if(!active_)
+                               return false;
+                       // in case of spurious wakeup, loop until predicate in lambda
+                       // is satisfied.
+                       can_pop_.wait(lk, [this] { return !queue_.empty() || !active_; });
+                       rc = pop_(value);
+               }
+               if(rc)
+                       can_push_.notify_one();
+
+               return rc;
+       }
+
+  private:
+       bool push_(Data const& value)
+       {
+               if(queue_.size() == max_size_ || !active_)
+                       return false;
+               queue_.push(value);
+
+               return true;
+       }
+       bool pop_(Data& value)
+       {
+               if(queue_.empty() || !active_)
+                       return false;
+               value = queue_.front();
+               queue_.pop();
+
+               return true;
+       }
+       std::queue<Data> queue_;
+       mutable std::mutex mutex_;
+       std::condition_variable can_pop_;
+       std::condition_variable can_push_;
+       bool active_;
+       size_t max_size_;
+};
+struct BufferSrc
+{
+       BufferSrc(void) : BufferSrc("") {}
+       explicit BufferSrc(const std::string &file) : file_(file), clientFrameId_(0), frameId_(0), framePtr_(nullptr)
+       {}
+       BufferSrc(size_t clientFrameId, size_t frameId, uint8_t* framePtr)
+               : file_(""), clientFrameId_(clientFrameId), frameId_(frameId), framePtr_(framePtr)
+       {}
+       bool fromDisk(void)
+       {
+               return !file_.empty() && framePtr_ == nullptr;
+       }
+       size_t index() const
+       {
+               return clientFrameId_;
+       }
+       std::string file_;
+       size_t clientFrameId_;
+       size_t frameId_;
+       uint8_t* framePtr_;
+};
+
+struct Messenger;
+static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch);
+static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch);
+static void processorThread(Messenger* messenger, std::function<void(std::string)> processor);
+
+struct Messenger
+{
+       explicit Messenger(MessengerInit init)
+               : running(true), initialized_(false), shutdown_(false), init_(init),
+                 outboundSynch_(nullptr),
+                 inboundSynch_(nullptr), uncompressed_buffer_(nullptr), compressed_buffer_(nullptr),
+                 uncompressed_fd_(0), compressed_fd_(0)
+       {}
+       virtual ~Messenger(void)
+       {
+               running = false;
+               sendQueue.deactivate();
+               receiveQueue.deactivate();
+
+               if (outboundSynch_) {
+                       outboundSynch_->post(SYNCH_RECEIVE_READY);
+                       outbound.join();
+               }
+
+               if (inboundSynch_) {
+                       inboundSynch_->post(SYNCH_SENT);
+                       inbound.join();
+               }
+
+               for(auto& p : processors_)
+                       p.join();
+
+               delete outboundSynch_;
+               delete inboundSynch_;
+
+               deinitShm();
+       }
+       void startThreads(void) {
+               outboundSynch_ =
+                       new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch);
+               outbound = std::thread(outboundThread, this, init_.outboundMessageBuf, outboundSynch_);
+
+               inboundSynch_ =
+                       new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch);
+               inbound = std::thread(inboundThread, this, init_.inboundMessageBuf, inboundSynch_);
+
+               for(size_t i = 0; i < init_.numProcessingThreads_; ++i)
+                       processors_.push_back(std::thread(processorThread, this, init_.processor_));
+       }
+       size_t serialize(const std::string &dir, size_t clientFrameId, uint8_t* compressedPtr,
+                                        size_t compressedLength)
+       {
+               char fname[512];
+               if(!compressedPtr || !compressedLength)
+                       return 0;
+               sprintf(fname, "%s/test_%d.j2k", dir.c_str(), (int)clientFrameId);
+               auto fp = fopen(fname, "wb");
+               if(!fp)
+                       return 0;
+               size_t written = fwrite(compressedPtr, 1, compressedLength, fp);
+               if(written != compressedLength)
+               {
+                       fclose(fp);
+                       return 0;
+               }
+               fflush(fp);
+               fclose(fp);
+
+               return written;
+       }
+       bool initBuffers(void)
+       {
+               bool rc = true;
+               if(init_.uncompressedFrameSize_)
+               {
+                       rc = rc && SharedMemoryManager::initShm(grokUncompressedBuf,
+                                                                                                       init_.uncompressedFrameSize_ * init_.numFrames_,
+                                                                                                       &uncompressed_fd_, &uncompressed_buffer_);
+               }
+               if(init_.compressedFrameSize_)
+               {
+                       rc = rc && SharedMemoryManager::initShm(grokCompressedBuf,
+                                                                                                       init_.compressedFrameSize_ * init_.numFrames_,
+                                                                                                       &compressed_fd_, &compressed_buffer_);
+               }
+
+               return rc;
+       }
+
+       bool deinitShm(void)
+       {
+               bool rc = SharedMemoryManager::deinitShm(grokUncompressedBuf,
+                                                                                                init_.uncompressedFrameSize_ * init_.numFrames_,
+                                                                                                uncompressed_fd_, &uncompressed_buffer_);
+               rc = rc && SharedMemoryManager::deinitShm(grokCompressedBuf,
+                                                                                                 init_.compressedFrameSize_ * init_.numFrames_,
+                                                                                                 compressed_fd_, &compressed_buffer_);
+
+               return rc;
+       }
+       template<typename... Args>
+       void send(const std::string& str, Args... args)
+       {
+               std::ostringstream oss;
+               oss << str;
+               int dummy[] = {0, ((void)(oss << ',' << args), 0)...};
+               static_cast<void>(dummy);
+
+               sendQueue.push(oss.str());
+       }
+       static pid_t get_pid_by_process_name(const char* name)
+       {
+               char command[256];
+               snprintf(command, sizeof(command), "pgrep %s", name);
+               auto pgrep = popen(command, "r");
+               if(!pgrep)
+                       return -1;
+               pid_t pid;
+               if(fscanf(pgrep, "%d", &pid) != 1)
+                       pid = -1;
+               pclose(pgrep);
+
+               return pid;
+       }
+       static bool terminate_process(const char* name)
+       {
+               auto pid = get_pid_by_process_name(name);
+
+               return (pid != -1 && kill(pid, SIGTERM) != -1);
+       }
+       static bool kill_process(const char* name)
+       {
+               auto pid = get_pid_by_process_name(name);
+
+               return (pid != -1 && kill(pid, SIGKILL) != -1);
+       }
+       void launchGrok(const std::string &dir, uint32_t width, uint32_t stride,
+                                                               uint32_t height, uint32_t samplesPerPixel, uint32_t depth,
+                                                               int device, bool is4K, uint32_t fps, uint32_t bandwidth,
+                                                               const std::string server, uint32_t port,
+                                                               const std::string license)
+       {
+
+               std::unique_lock<std::mutex> lk(shutdownMutex_);
+               if (async_result_.valid())
+                       return;
+               if(MessengerInit::firstLaunch(true))
+                       init_.unlink();
+               startThreads();
+               char _cmd[4096];
+               auto fullServer = server + ":" + std::to_string(port);
+               sprintf(_cmd,
+                               "./grk_compress -batch_src %s,%d,%d,%d,%d,%d -out_fmt j2k -out_dir - -k 1 "
+                               "-G %d -%s %d,%d -j %s -J %s",
+                               GRK_MSGR_BATCH_IMAGE.c_str(), width, stride, height, samplesPerPixel, depth,
+                               device, is4K ? "cinema4K" : "cinema2K", fps, bandwidth,
+                               license.c_str(), fullServer.c_str());
+               launch(_cmd, dir);
+       }
+       void initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames)
+       {
+               // client fills queue with pending uncompressed buffers
+               init_.uncompressedFrameSize_ = uncompressedFrameSize;
+               init_.compressedFrameSize_ = compressedFrameSize;
+               init_.numFrames_ = numFrames;
+               initBuffers();
+               auto ptr = uncompressed_buffer_;
+               for(size_t i = 0; i < init_.numFrames_; ++i)
+               {
+                       availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr));
+                       ptr += init_.uncompressedFrameSize_;
+               }
+
+               std::unique_lock<std::mutex> lk(shutdownMutex_);
+               initialized_ = true;
+               clientInitializedCondition_.notify_all();
+       }
+       bool waitForClientInit(void)
+       {
+               if(initialized_)
+                       return true;
+
+               std::unique_lock<std::mutex> lk(shutdownMutex_);
+               if(initialized_)
+                       return true;
+               else if (shutdown_)
+                       return false;
+               clientInitializedCondition_.wait(lk, [this]{return initialized_ || shutdown_;});
+
+               return initialized_ && !shutdown_;
+       }
+       static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel)
+       {
+               return sizeof(uint16_t) * w * h * samplesPerPixel;
+       }
+       void reclaimCompressed(size_t frameId)
+       {
+               availableBuffers_.push(BufferSrc(0, frameId, getCompressedFrame(frameId)));
+       }
+       void reclaimUncompressed(size_t frameId)
+       {
+               availableBuffers_.push(BufferSrc(0, frameId, getUncompressedFrame(frameId)));
+       }
+       uint8_t* getUncompressedFrame(size_t frameId)
+       {
+               assert(frameId < init_.numFrames_);
+               if(frameId >= init_.numFrames_)
+                       return nullptr;
+
+               return (uint8_t*)(uncompressed_buffer_ + frameId * init_.uncompressedFrameSize_);
+       }
+       uint8_t* getCompressedFrame(size_t frameId)
+       {
+               assert(frameId < init_.numFrames_);
+               if(frameId >= init_.numFrames_)
+                       return nullptr;
+
+               return (uint8_t*)(compressed_buffer_ + frameId * init_.compressedFrameSize_);
+       }
+       std::atomic_bool running;
+       bool initialized_;
+       bool shutdown_;
+       MessengerBlockingQueue<std::string> sendQueue;
+       MessengerBlockingQueue<std::string> receiveQueue;
+       MessengerBlockingQueue<BufferSrc> availableBuffers_;
+       MessengerInit init_;
+       std::string cmd_;
+       std::future<int> async_result_;
+       std::mutex shutdownMutex_;
+       std::condition_variable shutdownCondition_;
+
+  protected:
+       std::condition_variable clientInitializedCondition_;
+  private:
+       void launch(const std::string &cmd, const std::string &dir)
+       {
+               // Change the working directory
+               if(!dir.empty())
+               {
+                       if(chdir(dir.c_str()) != 0)
+                       {
+                               getMessengerLogger()->error("Error: failed to change the working directory");
+                               return;
+                       }
+               }
+               // Execute the command using std::async and std::system
+               cmd_ = cmd;
+               async_result_ = std::async(std::launch::async, [this]() { return std::system(cmd_.c_str()); });
+       }
+       std::thread outbound;
+       Synch* outboundSynch_;
+
+       std::thread inbound;
+       Synch* inboundSynch_;
+
+       std::vector<std::thread> processors_;
+       char* uncompressed_buffer_;
+       char* compressed_buffer_;
+
+       grk_handle uncompressed_fd_;
+       grk_handle compressed_fd_;
+};
+
+static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch)
+{
+       grk_handle shm_fd = 0;
+       char* send_buffer = nullptr;
+
+       if(!SharedMemoryManager::initShm(sendBuf, messageBufferLen, &shm_fd, &send_buffer))
+               return;
+       while(messenger->running)
+       {
+               synch->wait(SYNCH_RECEIVE_READY);
+               if(!messenger->running)
+                       break;
+               std::string message;
+               if(!messenger->sendQueue.waitAndPop(message))
+                       break;
+               if(!messenger->running)
+                       break;
+               memcpy(send_buffer, message.c_str(), message.size() + 1);
+               synch->post(SYNCH_SENT);
+       }
+       SharedMemoryManager::deinitShm(sendBuf, messageBufferLen, shm_fd, &send_buffer);
+}
+
+static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch)
+{
+       grk_handle shm_fd = 0;
+       char* receive_buffer = nullptr;
+
+       if(!SharedMemoryManager::initShm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer))
+               return;
+       while(messenger->running)
+       {
+               synch->wait(SYNCH_SENT);
+               if(!messenger->running)
+                       break;
+               auto message = std::string(receive_buffer);
+               synch->post(SYNCH_RECEIVE_READY);
+               messenger->receiveQueue.push(message);
+       }
+       SharedMemoryManager::deinitShm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer);
+}
+struct Msg
+{
+       explicit Msg(const std::string &msg) : ct_(0)
+       {
+               std::stringstream ss(msg);
+               while(ss.good())
+               {
+                       std::string substr;
+                       std::getline(ss, substr, ',');
+                       cs_.push_back(substr);
+               }
+       }
+       std::string next()
+       {
+               if(ct_ == cs_.size())
+               {
+                       getMessengerLogger()->error("Msg: comma separated list exhausted. returning empty.");
+                       return "";
+               }
+               return cs_[ct_++];
+       }
+
+       uint32_t nextUint(void)
+       {
+               return (uint32_t)std::stoi(next());
+       }
+
+       std::vector<std::string> cs_;
+       size_t ct_;
+};
+static void processorThread(Messenger* messenger, std::function<void(std::string)> processor)
+{
+       while(messenger->running)
+       {
+               std::string message;
+               if(!messenger->receiveQueue.waitAndPop(message))
+                       break;
+               if(!messenger->running)
+                       break;
+               Msg msg(message);
+               auto tag = msg.next();
+               if(tag == GRK_MSGR_BATCH_COMPRESS_INIT)
+               {
+                       auto width = msg.nextUint();
+                       auto stride = msg.nextUint();
+                       (void)stride;
+                       auto height = msg.nextUint();
+                       auto samplesPerPixel = msg.nextUint();
+                       auto depth = msg.nextUint();
+                       (void)depth;
+                       messenger->init_.uncompressedFrameSize_ =
+                               Messenger::uncompressedFrameSize(width, height, samplesPerPixel);
+                       auto compressedFrameSize = msg.nextUint();
+                       auto numFrames = msg.nextUint();
+                       messenger->initClient(compressedFrameSize, compressedFrameSize, numFrames);
+               }
+               else if(tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED)
+               {
+                       messenger->reclaimUncompressed(msg.nextUint());
+               }
+               else if(tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED)
+               {
+                       messenger->reclaimCompressed(msg.nextUint());
+               }
+               processor(message);
+       }
+}
+
+template<typename F>
+struct ScheduledFrames
+{
+       void store(F& val)
+       {
+               std::unique_lock<std::mutex> lk(mapMutex_);
+               auto it = map_.find(val.index());
+               if (it == map_.end())
+                       map_[val.index()] = val;
+       }
+       F retrieve(size_t index, bool &success)
+       {
+               std::unique_lock<std::mutex> lk(mapMutex_);
+               success = false;
+               auto it = map_.find(index);
+               if(it == map_.end())
+                       return F();
+
+               success = true;
+               F val = it->second;
+               map_.erase(index);
+
+               return val;
+       }
+
+ private:
+       std::mutex mapMutex_;
+       std::map<size_t, F> map_;
+};
+
+template<typename F>
+struct ScheduledMessenger : public Messenger
+{
+       explicit ScheduledMessenger(MessengerInit init) : Messenger(init),
+                                                                                       framesScheduled_(0),
+                                                                                       framesCompressed_(0)
+       {}
+       ~ScheduledMessenger(void) {
+               shutdown();
+       }
+       bool scheduleCompress(F proxy, std::function<void(BufferSrc)> converter){
+               size_t frameSize = init_.uncompressedFrameSize_;
+               assert(frameSize >= init_.uncompressedFrameSize_);
+               BufferSrc src;
+               if(!availableBuffers_.waitAndPop(src))
+                       return false;
+               converter(src);
+               scheduledFrames_.store(proxy);
+               framesScheduled_++;
+               send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_);
+
+               return true;
+       }
+       void processCompressed(const std::string &message, std::function<void(F,uint8_t*,uint32_t)> processor, bool needsRecompression) {
+               Msg msg(message);
+               msg.next();
+               auto clientFrameId = msg.nextUint();
+               auto compressedFrameId = msg.nextUint();
+               auto compressedFrameLength = msg.nextUint();
+               if (!needsRecompression) {
+                       bool success = false;
+                       auto srcFrame = scheduledFrames_.retrieve(clientFrameId,success);
+                       if (!success)
+                               return;
+                       processor(srcFrame, getCompressedFrame(compressedFrameId),compressedFrameLength);
+               }
+               ++framesCompressed_;
+               send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId);
+               if (shutdown_ && framesCompressed_ == framesScheduled_)
+                       shutdownCondition_.notify_all();
+       }
+       void shutdown(void){
+               try {
+                       std::unique_lock<std::mutex> lk(shutdownMutex_);
+                       if (!async_result_.valid())
+                               return;
+                       shutdown_ = true;
+                       if (framesScheduled_) {
+                               uint32_t scheduled = framesScheduled_;
+                               send(GRK_MSGR_BATCH_FLUSH, scheduled);
+                               shutdownCondition_.wait(lk, [this] { return framesScheduled_ == framesCompressed_; });
+                       }
+                       availableBuffers_.deactivate();
+                       send(GRK_MSGR_BATCH_SHUTDOWN);
+                       int result = async_result_.get();
+                       if(result != 0)
+                               getMessengerLogger()->error("Accelerator failed with return code: %d\n",result);
+               } catch (std::exception &ex) {
+                       getMessengerLogger()->error("%s",ex.what());
+               }
+
+       }
+       F retrieve(size_t index, bool &success) {
+               return scheduledFrames_.retrieve(index, success);
+       }
+       void store(F& val) {
+               scheduledFrames_.store(val);
+       }
+
+private:
+       ScheduledFrames<F> scheduledFrames_;
+       std::atomic<uint32_t> framesScheduled_;
+       std::atomic<uint32_t> framesCompressed_;
+};
+
+} // namespace grk_plugin
diff --git a/src/lib/grok_context.h b/src/lib/grok_context.h
deleted file mode 100644 (file)
index 1f9726a..0000000
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
-    Copyright (C) 2023 Grok Image Compression Inc.
-
-    This file is part of DCP-o-matic.
-
-    DCP-o-matic is free software; you can redistribute it and/or modify
-    it under the terms of the GNU General Public License as published by
-    the Free Software Foundation; either version 2 of the License, or
-    (at your option) any later version.
-
-    DCP-o-matic is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU General Public License for more details.
-
-    You should have received a copy of the GNU General Public License
-    along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
-
-*/
-
-#pragma once
-
-#include "config.h"
-#include "log.h"
-#include "dcpomatic_log.h"
-#include "writer.h"
-#include "grok_messenger.h"
-
-class Film;
-using dcp::Data;
-using namespace dcpomatic;
-
-static std::mutex launchMutex;
-
-namespace grk_plugin
-{
-
-struct GrokLogger : public MessengerLogger {
-       explicit GrokLogger(const std::string &preamble) : MessengerLogger(preamble)
-       {}
-       virtual ~GrokLogger() = default;
-       void info(const char* fmt, ...) override{
-               va_list arg;
-               va_start(arg, fmt);
-               dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_GENERAL);
-               va_end(arg);
-       }
-       void warn(const char* fmt, ...) override{
-               va_list arg;
-               va_start(arg, fmt);
-               dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_WARNING);
-               va_end(arg);
-       }
-       void error(const char* fmt, ...) override{
-               va_list arg;
-               va_start(arg, fmt);
-               dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_ERROR);
-               va_end(arg);
-       }
-};
-
-struct GrokInitializer {
-       GrokInitializer(void) {
-               setMessengerLogger(new GrokLogger("[GROK] "));
-       }
-       ~GrokInitializer()  = default;
-};
-
-struct FrameProxy {
-       FrameProxy(void) : FrameProxy(0,Eyes::LEFT,DCPVideo())
-       {}
-       FrameProxy(int index, Eyes eyes, DCPVideo dcpv) : index_(index), eyes_(eyes), vf(dcpv)
-       {}
-       int index() const {
-               return index_;
-       }
-       Eyes eyes(void) const {
-               return eyes_;
-       }
-       int index_;
-       Eyes eyes_;
-       DCPVideo vf;
-};
-
-struct DcpomaticContext {
-       DcpomaticContext(std::shared_ptr<const Film> film, Writer& writer,
-                                               EventHistory &history, const std::string &location) :
-                                                                       film_(film), writer_(writer),
-                                                                       history_(history), location_(location),
-                                                                       width_(0), height_(0)
-       {}
-       void setDimensions(uint32_t w, uint32_t h) {
-               width_ = w;
-               height_ = h;
-       }
-       std::shared_ptr<const Film> film_;
-       Writer& writer_;
-       EventHistory &history_;
-       std::string location_;
-       uint32_t width_;
-       uint32_t height_;
-};
-
-class GrokContext {
-public:
-       explicit GrokContext(const DcpomaticContext &dcpomaticContext) :
-                                                               dcpomaticContext_(dcpomaticContext),
-                                                               messenger_(nullptr),
-                                                               launched_(false)
-       {
-               struct CompressedData : public dcp::Data {
-                       explicit CompressedData(int dataLen) : data_(new uint8_t[dataLen]), dataLen_(dataLen)
-                       {}
-                       ~CompressedData(void){
-                               delete[] data_;
-                       }
-                       uint8_t const * data () const override {
-                               return data_;
-                       }
-                       uint8_t * data () override {
-                               return data_;
-                       }
-                       int size () const override {
-                               return dataLen_;
-                       }
-                       uint8_t *data_;
-                       int dataLen_;
-               };
-               if (Config::instance()->enable_gpu ())  {
-                   boost::filesystem::path folder(dcpomaticContext_.location_);
-                   boost::filesystem::path binaryPath = folder / "grk_compress";
-                   if (!boost::filesystem::exists(binaryPath)) {
-                       getMessengerLogger()->error("Invalid binary location %s",
-                                       dcpomaticContext_.location_.c_str());
-                               return;
-                   }
-                       auto proc = [this](const std::string& str) {
-                               try {
-                                       Msg msg(str);
-                                       auto tag = msg.next();
-                                       if(tag == GRK_MSGR_BATCH_SUBMIT_COMPRESSED)
-                                       {
-                                               auto clientFrameId = msg.nextUint();
-                                               auto compressedFrameId = msg.nextUint();
-                                               (void)compressedFrameId;
-                                               auto compressedFrameLength = msg.nextUint();
-                                               auto  processor =
-                                                               [this](FrameProxy srcFrame, uint8_t* compressed, uint32_t compressedFrameLength)
-                                               {
-                                                       auto compressedData = std::make_shared<CompressedData>(compressedFrameLength);
-                                                       memcpy(compressedData->data_,compressed,compressedFrameLength );
-                                                       dcpomaticContext_.writer_.write(compressedData, srcFrame.index(), srcFrame.eyes());
-                                                       frame_done ();
-                                               };
-                                               int const minimum_size = 16384;
-                                               bool needsRecompression = compressedFrameLength < minimum_size;
-                                               messenger_->processCompressed(str, processor, needsRecompression);
-                                               if (needsRecompression) {
-                                                       bool success = false;
-                                                       auto fp = messenger_->retrieve(clientFrameId, success);
-                                                       if (!success)
-                                                               return;
-
-                                                       auto encoded = std::make_shared<dcp::ArrayData>(fp.vf.encode_locally());
-                                                       dcpomaticContext_.writer_.write(encoded, fp.vf.index(), fp.vf.eyes());
-                                                       frame_done ();
-                                               }
-                                       }
-                               } catch (std::exception &ex){
-                                       getMessengerLogger()->error("%s",ex.what());
-                               }
-                       };
-                       auto clientInit =
-                               MessengerInit(clientToGrokMessageBuf, clientSentSynch, grokReceiveReadySynch,
-                                                         grokToClientMessageBuf, grokSentSynch, clientReceiveReadySynch, proc,
-                                                         std::thread::hardware_concurrency());
-                       messenger_ = new ScheduledMessenger<FrameProxy>(clientInit);
-               }
-       }
-       ~GrokContext(void) {
-               shutdown();
-       }
-       bool launch(DCPVideo dcpv, int device){
-               if (!messenger_ )
-                       return false;
-               if (launched_)
-                       return true;
-               std::unique_lock<std::mutex> lk_global(launchMutex);
-               if (!messenger_)
-                       return false;
-               if (launched_)
-                       return true;
-               if (MessengerInit::firstLaunch(true)) {
-                       auto s = dcpv.get_size();
-                       dcpomaticContext_.setDimensions(s.width, s.height);
-                       auto config = Config::instance();
-                       messenger_->launchGrok(dcpomaticContext_.location_,
-                                       dcpomaticContext_.width_,dcpomaticContext_.width_,
-                                       dcpomaticContext_.height_,
-                                       3, 12, device,
-                                       dcpomaticContext_.film_->resolution() == Resolution::FOUR_K,
-                                       dcpomaticContext_.film_->video_frame_rate(),
-                                       dcpomaticContext_.film_->j2k_bandwidth(),
-                                       config->gpu_license_server(),
-                                       config->gpu_license_port(),
-                                       config->gpu_license());
-               }
-               launched_ =  messenger_->waitForClientInit();
-
-               return launched_;
-       }
-       bool scheduleCompress(const DCPVideo &vf){
-               if (!messenger_)
-                       return false;
-
-               auto fp = FrameProxy(vf.index(),vf.eyes(),vf);
-               auto cvt = [this, &fp](BufferSrc src){
-                       // xyz conversion
-                       fp.vf.convert_to_xyz((uint16_t*)src.framePtr_);
-               };
-               return messenger_->scheduleCompress(fp, cvt);
-       }
-       void shutdown(void){
-               if (!messenger_)
-                       return;
-
-               std::unique_lock<std::mutex> lk_global(launchMutex);
-               if (!messenger_)
-                       return;
-               if (launched_)
-                       messenger_->shutdown();
-               delete messenger_;
-               messenger_ = nullptr;
-       }
-       void frame_done () {
-               dcpomaticContext_.history_.event ();
-       }
-private:
-       DcpomaticContext dcpomaticContext_;
-       ScheduledMessenger<FrameProxy> *messenger_;
-       bool launched_;
-};
-
-}
-
diff --git a/src/lib/grok_messenger.h b/src/lib/grok_messenger.h
deleted file mode 100644 (file)
index 45ee752..0000000
+++ /dev/null
@@ -1,930 +0,0 @@
-/*
-    Copyright (C) 2023 Grok Image Compression Inc.
-
-    This file is part of DCP-o-matic.
-
-    DCP-o-matic is free software; you can redistribute it and/or modify
-    it under the terms of the GNU General Public License as published by
-    the Free Software Foundation; either version 2 of the License, or
-    (at your option) any later version.
-
-    DCP-o-matic is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU General Public License for more details.
-
-    You should have received a copy of the GNU General Public License
-    along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
-
-*/
-#pragma once
-
-#include <iostream>
-#include <string>
-#include <cstring>
-#include <atomic>
-#include <functional>
-#include <sstream>
-#include <future>
-#include <map>
-#include <thread>
-#include <mutex>
-#include <condition_variable>
-#include <queue>
-#include <cassert>
-#include <cstdarg>
-
-#ifdef _WIN32
-#include <windows.h>
-#include <direct.h>
-#include <tlhelp32.h>
-#pragma warning(disable : 4100)
-#else
-#include <unistd.h>
-#include <fcntl.h>
-#include <sys/mman.h>
-#include <semaphore.h>
-#include <signal.h>
-#endif
-
-namespace grk_plugin
-{
-static std::string grokToClientMessageBuf = "Global\\grok_to_client_message";
-static std::string grokSentSynch = "Global\\grok_sent";
-static std::string clientReceiveReadySynch = "Global\\client_receive_ready";
-static std::string clientToGrokMessageBuf = "Global\\client_to_grok_message";
-static std::string clientSentSynch = "Global\\client_sent";
-static std::string grokReceiveReadySynch = "Global\\grok_receive_ready";
-static std::string grokUncompressedBuf = "Global\\grok_uncompressed_buf";
-static std::string grokCompressedBuf = "Global\\grok_compressed_buf";
-static const std::string GRK_MSGR_BATCH_IMAGE = "GRK_MSGR_BATCH_IMAGE";
-static const std::string GRK_MSGR_BATCH_COMPRESS_INIT = "GRK_MSGR_BATCH_COMPRESS_INIT";
-static const std::string GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED = "GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED";
-static const std::string GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED =
-       "GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED";
-static const std::string GRK_MSGR_BATCH_SUBMIT_COMPRESSED = "GRK_MSGR_BATCH_SUBMIT_COMPRESSED";
-static const std::string GRK_MSGR_BATCH_PROCESSSED_COMPRESSED =
-       "GRK_MSGR_BATCH_PROCESSSED_COMPRESSED";
-static const std::string GRK_MSGR_BATCH_SHUTDOWN = "GRK_MSGR_BATCH_SHUTDOWN";
-static const std::string GRK_MSGR_BATCH_FLUSH = "GRK_MSGR_BATCH_FLUSH";
-static const size_t messageBufferLen = 256;
-struct IMessengerLogger
-{
-       virtual ~IMessengerLogger(void) = default;
-       virtual void info(const char* fmt, ...) = 0;
-       virtual void warn(const char* fmt, ...) = 0;
-       virtual void error(const char* fmt, ...) = 0;
-
-  protected:
-       template<typename... Args>
-       std::string log_message(char const* const format, Args&... args) noexcept
-       {
-               constexpr size_t message_size = 512;
-               char message[message_size];
-
-               std::snprintf(message, message_size, format, args...);
-               return std::string(message);
-       }
-};
-struct MessengerLogger : public IMessengerLogger
-{
-       explicit MessengerLogger(const std::string &preamble) : preamble_(preamble) {}
-       virtual ~MessengerLogger() = default;
-       virtual void info(const char* fmt, ...) override
-       {
-               va_list args;
-               std::string new_fmt = preamble_ + fmt + "\n";
-               va_start(args, fmt);
-               vfprintf(stdout, new_fmt.c_str(), args);
-               va_end(args);
-       }
-       virtual void warn(const char* fmt, ...) override
-       {
-               va_list args;
-               std::string new_fmt = preamble_ + fmt + "\n";
-               va_start(args, fmt);
-               vfprintf(stdout, new_fmt.c_str(), args);
-               va_end(args);
-       }
-       virtual void error(const char* fmt, ...) override
-       {
-               va_list args;
-               std::string new_fmt = preamble_ + fmt + "\n";
-               va_start(args, fmt);
-               vfprintf(stderr, new_fmt.c_str(), args);
-               va_end(args);
-       }
-
-  protected:
-       std::string preamble_;
-};
-
-static IMessengerLogger* sLogger = nullptr;
-#if defined(__GNUC__) || defined(__clang__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wunused-function"
-#endif
-static void setMessengerLogger(IMessengerLogger* logger)
-{
-       delete sLogger;
-       sLogger = logger;
-}
-#if defined(__GNUC__) || defined(__clang__)
-#pragma GCC diagnostic pop
-#endif
-static IMessengerLogger* getMessengerLogger(void)
-{
-       return sLogger;
-}
-struct MessengerInit
-{
-       MessengerInit(const std::string &outBuf, const std::string &outSent,
-                                 const std::string &outReceiveReady, const std::string &inBuf,
-                                 const std::string &inSent,
-                                 const std::string &inReceiveReady,
-                                 std::function<void(std::string)> processor,
-                                 size_t numProcessingThreads)
-               : outboundMessageBuf(outBuf), outboundSentSynch(outSent),
-                 outboundReceiveReadySynch(outReceiveReady), inboundMessageBuf(inBuf),
-                 inboundSentSynch(inSent), inboundReceiveReadySynch(inReceiveReady), processor_(processor),
-                 numProcessingThreads_(numProcessingThreads),
-                 uncompressedFrameSize_(0), compressedFrameSize_(0),
-                 numFrames_(0)
-       {
-               if(firstLaunch(true))
-                       unlink();
-       }
-       void unlink(void)
-       {
-#ifndef _WIN32
-               shm_unlink(grokToClientMessageBuf.c_str());
-               shm_unlink(clientToGrokMessageBuf.c_str());
-#endif
-       }
-       static bool firstLaunch(bool isClient)
-       {
-               bool debugGrok = false;
-               return debugGrok != isClient;
-       }
-       std::string outboundMessageBuf;
-       std::string outboundSentSynch;
-       std::string outboundReceiveReadySynch;
-
-       std::string inboundMessageBuf;
-       std::string inboundSentSynch;
-       std::string inboundReceiveReadySynch;
-
-       std::function<void(std::string)> processor_;
-       size_t numProcessingThreads_;
-
-       size_t uncompressedFrameSize_;
-       size_t compressedFrameSize_;
-       size_t numFrames_;
-};
-
-/*************************** Synchronization *******************************/
-enum SynchDirection
-{
-       SYNCH_SENT,
-       SYNCH_RECEIVE_READY
-};
-
-typedef int grk_handle;
-struct Synch
-{
-       Synch(const std::string &sentSemName, const std::string &receiveReadySemName)
-               : sentSemName_(sentSemName), receiveReadySemName_(receiveReadySemName)
-       {
-               // unlink semaphores in case of previous crash
-               if(MessengerInit::firstLaunch(true))
-                       unlink();
-               open();
-       }
-       ~Synch()
-       {
-               close();
-               if(MessengerInit::firstLaunch(true))
-                       unlink();
-       }
-       void post(SynchDirection dir)
-       {
-               auto sem = (dir == SYNCH_SENT ? sentSem_ : receiveReadySem_);
-               int rc = sem_post(sem);
-               if(rc)
-                       getMessengerLogger()->error("Error posting to semaphore: %s", strerror(errno));
-       }
-       void wait(SynchDirection dir)
-       {
-               auto sem = dir == SYNCH_SENT ? sentSem_ : receiveReadySem_;
-               int rc = sem_wait(sem);
-               if(rc)
-                       getMessengerLogger()->error("Error waiting for semaphore: %s", strerror(errno));
-       }
-       void open(void)
-       {
-               sentSem_ = sem_open(sentSemName_.c_str(), O_CREAT, 0666, 0);
-               if(!sentSem_)
-                       getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
-               receiveReadySem_ = sem_open(receiveReadySemName_.c_str(), O_CREAT, 0666, 1);
-               if(!receiveReadySem_)
-                       getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
-       }
-       void close(void)
-       {
-               int rc = sem_close(sentSem_);
-               if(rc)
-                       getMessengerLogger()->error("Error closing semaphore %s: %s", sentSemName_.c_str(),
-                                                                               strerror(errno));
-               rc = sem_close(receiveReadySem_);
-               if(rc)
-                       getMessengerLogger()->error("Error closing semaphore %s: %s",
-                                                                               receiveReadySemName_.c_str(), strerror(errno));
-       }
-       void unlink(void)
-       {
-               int rc = sem_unlink(sentSemName_.c_str());
-               if(rc == -1 && errno != ENOENT)
-                       getMessengerLogger()->error("Error unlinking semaphore %s: %s", sentSemName_.c_str(),
-                                                                               strerror(errno));
-               rc = sem_unlink(receiveReadySemName_.c_str());
-               if(rc == -1 && errno != ENOENT)
-                       getMessengerLogger()->error("Error unlinking semaphore %s: %s",
-                                                                               receiveReadySemName_.c_str(), strerror(errno));
-       }
-       sem_t* sentSem_;
-       sem_t* receiveReadySem_;
-
-  private:
-       std::string sentSemName_;
-       std::string receiveReadySemName_;
-};
-struct SharedMemoryManager
-{
-       static bool initShm(const std::string &name, size_t len, grk_handle* shm_fd, char** buffer)
-       {
-               *shm_fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0666);
-               if(*shm_fd < 0)
-               {
-                       getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
-                       return false;
-               }
-               int rc = ftruncate(*shm_fd, sizeof(char) * len);
-               if(rc)
-               {
-                       getMessengerLogger()->error("Error truncating shared memory: %s", strerror(errno));
-                       rc = close(*shm_fd);
-                       if(rc)
-                               getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
-                       rc = shm_unlink(name.c_str());
-                       if(rc)
-                               getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
-                       return false;
-               }
-               *buffer = static_cast<char*>(mmap(0, len, PROT_WRITE, MAP_SHARED, *shm_fd, 0));
-               if(!*buffer)
-               {
-                       getMessengerLogger()->error("Error mapping shared memory: %s", strerror(errno));
-                       rc = close(*shm_fd);
-                       if(rc)
-                               getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
-                       rc = shm_unlink(name.c_str());
-                       if(rc)
-                               getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
-               }
-
-               return *buffer != nullptr;
-       }
-       static bool deinitShm(const std::string &name, size_t len, grk_handle &shm_fd, char** buffer)
-       {
-               if (!*buffer || !shm_fd)
-                       return true;
-
-               int rc = munmap(*buffer, len);
-               *buffer = nullptr;
-               if(rc)
-                       getMessengerLogger()->error("Error unmapping shared memory %s: %s", name.c_str(), strerror(errno));
-               rc = close(shm_fd);
-               shm_fd = 0;
-               if(rc)
-                       getMessengerLogger()->error("Error closing shared memory %s: %s", name.c_str(), strerror(errno));
-               rc = shm_unlink(name.c_str());
-               if(rc)
-                       fprintf(stderr,"Error unlinking shared memory %s : %s\n", name.c_str(), strerror(errno));
-
-               return true;
-       }
-};
-
-template<typename Data>
-class MessengerBlockingQueue
-{
-  public:
-       explicit MessengerBlockingQueue(size_t max) : active_(true), max_size_(max) {}
-       MessengerBlockingQueue() : MessengerBlockingQueue(UINT_MAX) {}
-       size_t size() const
-       {
-               return queue_.size();
-       }
-       // deactivate and clear queue
-       void deactivate()
-       {
-               {
-                       std::lock_guard<std::mutex> lk(mutex_);
-                       active_ = false;
-                       while(!queue_.empty())
-                               queue_.pop();
-               }
-
-               // release all waiting threads
-               can_pop_.notify_all();
-               can_push_.notify_all();
-       }
-       void activate()
-       {
-               std::lock_guard<std::mutex> lk(mutex_);
-               active_ = true;
-       }
-       bool push(Data const& value)
-       {
-               bool rc;
-               {
-                       std::unique_lock<std::mutex> lk(mutex_);
-                       rc = push_(value);
-               }
-               if(rc)
-                       can_pop_.notify_one();
-
-               return rc;
-       }
-       bool waitAndPush(Data& value)
-       {
-               bool rc;
-               {
-                       std::unique_lock<std::mutex> lk(mutex_);
-                       if(!active_)
-                               return false;
-                       // in case of spurious wakeup, loop until predicate in lambda
-                       // is satisfied.
-                       can_push_.wait(lk, [this] { return queue_.size() < max_size_ || !active_; });
-                       rc = push_(value);
-               }
-               if(rc)
-                       can_pop_.notify_one();
-
-               return rc;
-       }
-       bool pop(Data& value)
-       {
-               bool rc;
-               {
-                       std::unique_lock<std::mutex> lk(mutex_);
-                       rc = pop_(value);
-               }
-               if(rc)
-                       can_push_.notify_one();
-
-               return rc;
-       }
-       bool waitAndPop(Data& value)
-       {
-               bool rc;
-               {
-                       std::unique_lock<std::mutex> lk(mutex_);
-                       if(!active_)
-                               return false;
-                       // in case of spurious wakeup, loop until predicate in lambda
-                       // is satisfied.
-                       can_pop_.wait(lk, [this] { return !queue_.empty() || !active_; });
-                       rc = pop_(value);
-               }
-               if(rc)
-                       can_push_.notify_one();
-
-               return rc;
-       }
-
-  private:
-       bool push_(Data const& value)
-       {
-               if(queue_.size() == max_size_ || !active_)
-                       return false;
-               queue_.push(value);
-
-               return true;
-       }
-       bool pop_(Data& value)
-       {
-               if(queue_.empty() || !active_)
-                       return false;
-               value = queue_.front();
-               queue_.pop();
-
-               return true;
-       }
-       std::queue<Data> queue_;
-       mutable std::mutex mutex_;
-       std::condition_variable can_pop_;
-       std::condition_variable can_push_;
-       bool active_;
-       size_t max_size_;
-};
-struct BufferSrc
-{
-       BufferSrc(void) : BufferSrc("") {}
-       explicit BufferSrc(const std::string &file) : file_(file), clientFrameId_(0), frameId_(0), framePtr_(nullptr)
-       {}
-       BufferSrc(size_t clientFrameId, size_t frameId, uint8_t* framePtr)
-               : file_(""), clientFrameId_(clientFrameId), frameId_(frameId), framePtr_(framePtr)
-       {}
-       bool fromDisk(void)
-       {
-               return !file_.empty() && framePtr_ == nullptr;
-       }
-       size_t index() const
-       {
-               return clientFrameId_;
-       }
-       std::string file_;
-       size_t clientFrameId_;
-       size_t frameId_;
-       uint8_t* framePtr_;
-};
-
-struct Messenger;
-static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch);
-static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch);
-static void processorThread(Messenger* messenger, std::function<void(std::string)> processor);
-
-struct Messenger
-{
-       explicit Messenger(MessengerInit init)
-               : running(true), initialized_(false), shutdown_(false), init_(init),
-                 outboundSynch_(nullptr),
-                 inboundSynch_(nullptr), uncompressed_buffer_(nullptr), compressed_buffer_(nullptr),
-                 uncompressed_fd_(0), compressed_fd_(0)
-       {}
-       virtual ~Messenger(void)
-       {
-               running = false;
-               sendQueue.deactivate();
-               receiveQueue.deactivate();
-
-               if (outboundSynch_) {
-                       outboundSynch_->post(SYNCH_RECEIVE_READY);
-                       outbound.join();
-               }
-
-               if (inboundSynch_) {
-                       inboundSynch_->post(SYNCH_SENT);
-                       inbound.join();
-               }
-
-               for(auto& p : processors_)
-                       p.join();
-
-               delete outboundSynch_;
-               delete inboundSynch_;
-
-               deinitShm();
-       }
-       void startThreads(void) {
-               outboundSynch_ =
-                       new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch);
-               outbound = std::thread(outboundThread, this, init_.outboundMessageBuf, outboundSynch_);
-
-               inboundSynch_ =
-                       new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch);
-               inbound = std::thread(inboundThread, this, init_.inboundMessageBuf, inboundSynch_);
-
-               for(size_t i = 0; i < init_.numProcessingThreads_; ++i)
-                       processors_.push_back(std::thread(processorThread, this, init_.processor_));
-       }
-       size_t serialize(const std::string &dir, size_t clientFrameId, uint8_t* compressedPtr,
-                                        size_t compressedLength)
-       {
-               char fname[512];
-               if(!compressedPtr || !compressedLength)
-                       return 0;
-               sprintf(fname, "%s/test_%d.j2k", dir.c_str(), (int)clientFrameId);
-               auto fp = fopen(fname, "wb");
-               if(!fp)
-                       return 0;
-               size_t written = fwrite(compressedPtr, 1, compressedLength, fp);
-               if(written != compressedLength)
-               {
-                       fclose(fp);
-                       return 0;
-               }
-               fflush(fp);
-               fclose(fp);
-
-               return written;
-       }
-       bool initBuffers(void)
-       {
-               bool rc = true;
-               if(init_.uncompressedFrameSize_)
-               {
-                       rc = rc && SharedMemoryManager::initShm(grokUncompressedBuf,
-                                                                                                       init_.uncompressedFrameSize_ * init_.numFrames_,
-                                                                                                       &uncompressed_fd_, &uncompressed_buffer_);
-               }
-               if(init_.compressedFrameSize_)
-               {
-                       rc = rc && SharedMemoryManager::initShm(grokCompressedBuf,
-                                                                                                       init_.compressedFrameSize_ * init_.numFrames_,
-                                                                                                       &compressed_fd_, &compressed_buffer_);
-               }
-
-               return rc;
-       }
-
-       bool deinitShm(void)
-       {
-               bool rc = SharedMemoryManager::deinitShm(grokUncompressedBuf,
-                                                                                                init_.uncompressedFrameSize_ * init_.numFrames_,
-                                                                                                uncompressed_fd_, &uncompressed_buffer_);
-               rc = rc && SharedMemoryManager::deinitShm(grokCompressedBuf,
-                                                                                                 init_.compressedFrameSize_ * init_.numFrames_,
-                                                                                                 compressed_fd_, &compressed_buffer_);
-
-               return rc;
-       }
-       template<typename... Args>
-       void send(const std::string& str, Args... args)
-       {
-               std::ostringstream oss;
-               oss << str;
-               int dummy[] = {0, ((void)(oss << ',' << args), 0)...};
-               static_cast<void>(dummy);
-
-               sendQueue.push(oss.str());
-       }
-       static pid_t get_pid_by_process_name(const char* name)
-       {
-               char command[256];
-               snprintf(command, sizeof(command), "pgrep %s", name);
-               auto pgrep = popen(command, "r");
-               if(!pgrep)
-                       return -1;
-               pid_t pid;
-               if(fscanf(pgrep, "%d", &pid) != 1)
-                       pid = -1;
-               pclose(pgrep);
-
-               return pid;
-       }
-       static bool terminate_process(const char* name)
-       {
-               auto pid = get_pid_by_process_name(name);
-
-               return (pid != -1 && kill(pid, SIGTERM) != -1);
-       }
-       static bool kill_process(const char* name)
-       {
-               auto pid = get_pid_by_process_name(name);
-
-               return (pid != -1 && kill(pid, SIGKILL) != -1);
-       }
-       void launchGrok(const std::string &dir, uint32_t width, uint32_t stride,
-                                                               uint32_t height, uint32_t samplesPerPixel, uint32_t depth,
-                                                               int device, bool is4K, uint32_t fps, uint32_t bandwidth,
-                                                               const std::string server, uint32_t port,
-                                                               const std::string license)
-       {
-
-               std::unique_lock<std::mutex> lk(shutdownMutex_);
-               if (async_result_.valid())
-                       return;
-               if(MessengerInit::firstLaunch(true))
-                       init_.unlink();
-               startThreads();
-               char _cmd[4096];
-               auto fullServer = server + ":" + std::to_string(port);
-               sprintf(_cmd,
-                               "./grk_compress -batch_src %s,%d,%d,%d,%d,%d -out_fmt j2k -out_dir - -k 1 "
-                               "-G %d -%s %d,%d -j %s -J %s",
-                               GRK_MSGR_BATCH_IMAGE.c_str(), width, stride, height, samplesPerPixel, depth,
-                               device, is4K ? "cinema4K" : "cinema2K", fps, bandwidth,
-                               license.c_str(), fullServer.c_str());
-               launch(_cmd, dir);
-       }
-       void initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames)
-       {
-               // client fills queue with pending uncompressed buffers
-               init_.uncompressedFrameSize_ = uncompressedFrameSize;
-               init_.compressedFrameSize_ = compressedFrameSize;
-               init_.numFrames_ = numFrames;
-               initBuffers();
-               auto ptr = uncompressed_buffer_;
-               for(size_t i = 0; i < init_.numFrames_; ++i)
-               {
-                       availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr));
-                       ptr += init_.uncompressedFrameSize_;
-               }
-
-               std::unique_lock<std::mutex> lk(shutdownMutex_);
-               initialized_ = true;
-               clientInitializedCondition_.notify_all();
-       }
-       bool waitForClientInit(void)
-       {
-               if(initialized_)
-                       return true;
-
-               std::unique_lock<std::mutex> lk(shutdownMutex_);
-               if(initialized_)
-                       return true;
-               else if (shutdown_)
-                       return false;
-               clientInitializedCondition_.wait(lk, [this]{return initialized_ || shutdown_;});
-
-               return initialized_ && !shutdown_;
-       }
-       static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel)
-       {
-               return sizeof(uint16_t) * w * h * samplesPerPixel;
-       }
-       void reclaimCompressed(size_t frameId)
-       {
-               availableBuffers_.push(BufferSrc(0, frameId, getCompressedFrame(frameId)));
-       }
-       void reclaimUncompressed(size_t frameId)
-       {
-               availableBuffers_.push(BufferSrc(0, frameId, getUncompressedFrame(frameId)));
-       }
-       uint8_t* getUncompressedFrame(size_t frameId)
-       {
-               assert(frameId < init_.numFrames_);
-               if(frameId >= init_.numFrames_)
-                       return nullptr;
-
-               return (uint8_t*)(uncompressed_buffer_ + frameId * init_.uncompressedFrameSize_);
-       }
-       uint8_t* getCompressedFrame(size_t frameId)
-       {
-               assert(frameId < init_.numFrames_);
-               if(frameId >= init_.numFrames_)
-                       return nullptr;
-
-               return (uint8_t*)(compressed_buffer_ + frameId * init_.compressedFrameSize_);
-       }
-       std::atomic_bool running;
-       bool initialized_;
-       bool shutdown_;
-       MessengerBlockingQueue<std::string> sendQueue;
-       MessengerBlockingQueue<std::string> receiveQueue;
-       MessengerBlockingQueue<BufferSrc> availableBuffers_;
-       MessengerInit init_;
-       std::string cmd_;
-       std::future<int> async_result_;
-       std::mutex shutdownMutex_;
-       std::condition_variable shutdownCondition_;
-
-  protected:
-       std::condition_variable clientInitializedCondition_;
-  private:
-       void launch(const std::string &cmd, const std::string &dir)
-       {
-               // Change the working directory
-               if(!dir.empty())
-               {
-                       if(chdir(dir.c_str()) != 0)
-                       {
-                               getMessengerLogger()->error("Error: failed to change the working directory");
-                               return;
-                       }
-               }
-               // Execute the command using std::async and std::system
-               cmd_ = cmd;
-               async_result_ = std::async(std::launch::async, [this]() { return std::system(cmd_.c_str()); });
-       }
-       std::thread outbound;
-       Synch* outboundSynch_;
-
-       std::thread inbound;
-       Synch* inboundSynch_;
-
-       std::vector<std::thread> processors_;
-       char* uncompressed_buffer_;
-       char* compressed_buffer_;
-
-       grk_handle uncompressed_fd_;
-       grk_handle compressed_fd_;
-};
-
-static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch)
-{
-       grk_handle shm_fd = 0;
-       char* send_buffer = nullptr;
-
-       if(!SharedMemoryManager::initShm(sendBuf, messageBufferLen, &shm_fd, &send_buffer))
-               return;
-       while(messenger->running)
-       {
-               synch->wait(SYNCH_RECEIVE_READY);
-               if(!messenger->running)
-                       break;
-               std::string message;
-               if(!messenger->sendQueue.waitAndPop(message))
-                       break;
-               if(!messenger->running)
-                       break;
-               memcpy(send_buffer, message.c_str(), message.size() + 1);
-               synch->post(SYNCH_SENT);
-       }
-       SharedMemoryManager::deinitShm(sendBuf, messageBufferLen, shm_fd, &send_buffer);
-}
-
-static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch)
-{
-       grk_handle shm_fd = 0;
-       char* receive_buffer = nullptr;
-
-       if(!SharedMemoryManager::initShm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer))
-               return;
-       while(messenger->running)
-       {
-               synch->wait(SYNCH_SENT);
-               if(!messenger->running)
-                       break;
-               auto message = std::string(receive_buffer);
-               synch->post(SYNCH_RECEIVE_READY);
-               messenger->receiveQueue.push(message);
-       }
-       SharedMemoryManager::deinitShm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer);
-}
-struct Msg
-{
-       explicit Msg(const std::string &msg) : ct_(0)
-       {
-               std::stringstream ss(msg);
-               while(ss.good())
-               {
-                       std::string substr;
-                       std::getline(ss, substr, ',');
-                       cs_.push_back(substr);
-               }
-       }
-       std::string next()
-       {
-               if(ct_ == cs_.size())
-               {
-                       getMessengerLogger()->error("Msg: comma separated list exhausted. returning empty.");
-                       return "";
-               }
-               return cs_[ct_++];
-       }
-
-       uint32_t nextUint(void)
-       {
-               return (uint32_t)std::stoi(next());
-       }
-
-       std::vector<std::string> cs_;
-       size_t ct_;
-};
-static void processorThread(Messenger* messenger, std::function<void(std::string)> processor)
-{
-       while(messenger->running)
-       {
-               std::string message;
-               if(!messenger->receiveQueue.waitAndPop(message))
-                       break;
-               if(!messenger->running)
-                       break;
-               Msg msg(message);
-               auto tag = msg.next();
-               if(tag == GRK_MSGR_BATCH_COMPRESS_INIT)
-               {
-                       auto width = msg.nextUint();
-                       auto stride = msg.nextUint();
-                       (void)stride;
-                       auto height = msg.nextUint();
-                       auto samplesPerPixel = msg.nextUint();
-                       auto depth = msg.nextUint();
-                       (void)depth;
-                       messenger->init_.uncompressedFrameSize_ =
-                               Messenger::uncompressedFrameSize(width, height, samplesPerPixel);
-                       auto compressedFrameSize = msg.nextUint();
-                       auto numFrames = msg.nextUint();
-                       messenger->initClient(compressedFrameSize, compressedFrameSize, numFrames);
-               }
-               else if(tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED)
-               {
-                       messenger->reclaimUncompressed(msg.nextUint());
-               }
-               else if(tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED)
-               {
-                       messenger->reclaimCompressed(msg.nextUint());
-               }
-               processor(message);
-       }
-}
-
-template<typename F>
-struct ScheduledFrames
-{
-       void store(F& val)
-       {
-               std::unique_lock<std::mutex> lk(mapMutex_);
-               auto it = map_.find(val.index());
-               if (it == map_.end())
-                       map_[val.index()] = val;
-       }
-       F retrieve(size_t index, bool &success)
-       {
-               std::unique_lock<std::mutex> lk(mapMutex_);
-               success = false;
-               auto it = map_.find(index);
-               if(it == map_.end())
-                       return F();
-
-               success = true;
-               F val = it->second;
-               map_.erase(index);
-
-               return val;
-       }
-
- private:
-       std::mutex mapMutex_;
-       std::map<size_t, F> map_;
-};
-
-template<typename F>
-struct ScheduledMessenger : public Messenger
-{
-       explicit ScheduledMessenger(MessengerInit init) : Messenger(init),
-                                                                                       framesScheduled_(0),
-                                                                                       framesCompressed_(0)
-       {}
-       ~ScheduledMessenger(void) {
-               shutdown();
-       }
-       bool scheduleCompress(F proxy, std::function<void(BufferSrc)> converter){
-               size_t frameSize = init_.uncompressedFrameSize_;
-               assert(frameSize >= init_.uncompressedFrameSize_);
-               BufferSrc src;
-               if(!availableBuffers_.waitAndPop(src))
-                       return false;
-               converter(src);
-               scheduledFrames_.store(proxy);
-               framesScheduled_++;
-               send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_);
-
-               return true;
-       }
-       void processCompressed(const std::string &message, std::function<void(F,uint8_t*,uint32_t)> processor, bool needsRecompression) {
-               Msg msg(message);
-               msg.next();
-               auto clientFrameId = msg.nextUint();
-               auto compressedFrameId = msg.nextUint();
-               auto compressedFrameLength = msg.nextUint();
-               if (!needsRecompression) {
-                       bool success = false;
-                       auto srcFrame = scheduledFrames_.retrieve(clientFrameId,success);
-                       if (!success)
-                               return;
-                       processor(srcFrame, getCompressedFrame(compressedFrameId),compressedFrameLength);
-               }
-               ++framesCompressed_;
-               send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId);
-               if (shutdown_ && framesCompressed_ == framesScheduled_)
-                       shutdownCondition_.notify_all();
-       }
-       void shutdown(void){
-               try {
-                       std::unique_lock<std::mutex> lk(shutdownMutex_);
-                       if (!async_result_.valid())
-                               return;
-                       shutdown_ = true;
-                       if (framesScheduled_) {
-                               uint32_t scheduled = framesScheduled_;
-                               send(GRK_MSGR_BATCH_FLUSH, scheduled);
-                               shutdownCondition_.wait(lk, [this] { return framesScheduled_ == framesCompressed_; });
-                       }
-                       availableBuffers_.deactivate();
-                       send(GRK_MSGR_BATCH_SHUTDOWN);
-                       int result = async_result_.get();
-                       if(result != 0)
-                               getMessengerLogger()->error("Accelerator failed with return code: %d\n",result);
-               } catch (std::exception &ex) {
-                       getMessengerLogger()->error("%s",ex.what());
-               }
-
-       }
-       F retrieve(size_t index, bool &success) {
-               return scheduledFrames_.retrieve(index, success);
-       }
-       void store(F& val) {
-               scheduledFrames_.store(val);
-       }
-
-private:
-       ScheduledFrames<F> scheduledFrames_;
-       std::atomic<uint32_t> framesScheduled_;
-       std::atomic<uint32_t> framesCompressed_;
-};
-
-} // namespace grk_plugin
index 6df30a3f7e7fcdac976f54da141e4a6e01afa58c..a3ea9f3e8684bd52c46a65c5b39024b74b206fca 100644 (file)
@@ -27,6 +27,7 @@
  *  @brief J2KEncoder class.
  */
 
+#include "grok/context.h"
 
 #include "cross.h"
 #include "dcp_video.h"
@@ -42,8 +43,6 @@
 #include <list>
 #include <stdint.h>
 
-#include "grok_context.h"
-
 class EncodeServerDescription;
 class Film;
 class Job;