--- /dev/null
+/*
+ Copyright (C) 2023 Grok Image Compression Inc.
+
+ This file is part of DCP-o-matic.
+
+ DCP-o-matic is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ DCP-o-matic is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
+
+*/
+
+#pragma once
+
+#include "../config.h"
+#include "../dcp_video.h"
+#include "../log.h"
+#include "../dcpomatic_log.h"
+#include "../writer.h"
+#include "messenger.h"
+
+class Film;
+using dcp::Data;
+using namespace dcpomatic;
+
+static std::mutex launchMutex;
+
+namespace grk_plugin
+{
+
+struct GrokLogger : public MessengerLogger {
+ explicit GrokLogger(const std::string &preamble) : MessengerLogger(preamble)
+ {}
+ virtual ~GrokLogger() = default;
+ void info(const char* fmt, ...) override{
+ va_list arg;
+ va_start(arg, fmt);
+ dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_GENERAL);
+ va_end(arg);
+ }
+ void warn(const char* fmt, ...) override{
+ va_list arg;
+ va_start(arg, fmt);
+ dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_WARNING);
+ va_end(arg);
+ }
+ void error(const char* fmt, ...) override{
+ va_list arg;
+ va_start(arg, fmt);
+ dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_ERROR);
+ va_end(arg);
+ }
+};
+
+struct GrokInitializer {
+ GrokInitializer(void) {
+ setMessengerLogger(new GrokLogger("[GROK] "));
+ }
+ ~GrokInitializer() = default;
+};
+
+struct FrameProxy {
+ FrameProxy(void) : FrameProxy(0,Eyes::LEFT,DCPVideo())
+ {}
+ FrameProxy(int index, Eyes eyes, DCPVideo dcpv) : index_(index), eyes_(eyes), vf(dcpv)
+ {}
+ int index() const {
+ return index_;
+ }
+ Eyes eyes(void) const {
+ return eyes_;
+ }
+ int index_;
+ Eyes eyes_;
+ DCPVideo vf;
+};
+
+struct DcpomaticContext {
+ DcpomaticContext(std::shared_ptr<const Film> film, Writer& writer,
+ EventHistory &history, const std::string &location) :
+ film_(film), writer_(writer),
+ history_(history), location_(location),
+ width_(0), height_(0)
+ {}
+ void setDimensions(uint32_t w, uint32_t h) {
+ width_ = w;
+ height_ = h;
+ }
+ std::shared_ptr<const Film> film_;
+ Writer& writer_;
+ EventHistory &history_;
+ std::string location_;
+ uint32_t width_;
+ uint32_t height_;
+};
+
+class GrokContext {
+public:
+ explicit GrokContext(const DcpomaticContext &dcpomaticContext) :
+ dcpomaticContext_(dcpomaticContext),
+ messenger_(nullptr),
+ launched_(false)
+ {
+ struct CompressedData : public dcp::Data {
+ explicit CompressedData(int dataLen) : data_(new uint8_t[dataLen]), dataLen_(dataLen)
+ {}
+ ~CompressedData(void){
+ delete[] data_;
+ }
+ uint8_t const * data () const override {
+ return data_;
+ }
+ uint8_t * data () override {
+ return data_;
+ }
+ int size () const override {
+ return dataLen_;
+ }
+ uint8_t *data_;
+ int dataLen_;
+ };
+ if (Config::instance()->enable_gpu ()) {
+ boost::filesystem::path folder(dcpomaticContext_.location_);
+ boost::filesystem::path binaryPath = folder / "grk_compress";
+ if (!boost::filesystem::exists(binaryPath)) {
+ getMessengerLogger()->error("Invalid binary location %s",
+ dcpomaticContext_.location_.c_str());
+ return;
+ }
+ auto proc = [this](const std::string& str) {
+ try {
+ Msg msg(str);
+ auto tag = msg.next();
+ if(tag == GRK_MSGR_BATCH_SUBMIT_COMPRESSED)
+ {
+ auto clientFrameId = msg.nextUint();
+ auto compressedFrameId = msg.nextUint();
+ (void)compressedFrameId;
+ auto compressedFrameLength = msg.nextUint();
+ auto processor =
+ [this](FrameProxy srcFrame, uint8_t* compressed, uint32_t compressedFrameLength)
+ {
+ auto compressedData = std::make_shared<CompressedData>(compressedFrameLength);
+ memcpy(compressedData->data_,compressed,compressedFrameLength );
+ dcpomaticContext_.writer_.write(compressedData, srcFrame.index(), srcFrame.eyes());
+ frame_done ();
+ };
+ int const minimum_size = 16384;
+ bool needsRecompression = compressedFrameLength < minimum_size;
+ messenger_->processCompressed(str, processor, needsRecompression);
+ if (needsRecompression) {
+ bool success = false;
+ auto fp = messenger_->retrieve(clientFrameId, success);
+ if (!success)
+ return;
+
+ auto encoded = std::make_shared<dcp::ArrayData>(fp.vf.encode_locally());
+ dcpomaticContext_.writer_.write(encoded, fp.vf.index(), fp.vf.eyes());
+ frame_done ();
+ }
+ }
+ } catch (std::exception &ex){
+ getMessengerLogger()->error("%s",ex.what());
+ }
+ };
+ auto clientInit =
+ MessengerInit(clientToGrokMessageBuf, clientSentSynch, grokReceiveReadySynch,
+ grokToClientMessageBuf, grokSentSynch, clientReceiveReadySynch, proc,
+ std::thread::hardware_concurrency());
+ messenger_ = new ScheduledMessenger<FrameProxy>(clientInit);
+ }
+ }
+ ~GrokContext(void) {
+ shutdown();
+ }
+ bool launch(DCPVideo dcpv, int device){
+ if (!messenger_ )
+ return false;
+ if (launched_)
+ return true;
+ std::unique_lock<std::mutex> lk_global(launchMutex);
+ if (!messenger_)
+ return false;
+ if (launched_)
+ return true;
+ if (MessengerInit::firstLaunch(true)) {
+ auto s = dcpv.get_size();
+ dcpomaticContext_.setDimensions(s.width, s.height);
+ auto config = Config::instance();
+ messenger_->launchGrok(dcpomaticContext_.location_,
+ dcpomaticContext_.width_,dcpomaticContext_.width_,
+ dcpomaticContext_.height_,
+ 3, 12, device,
+ dcpomaticContext_.film_->resolution() == Resolution::FOUR_K,
+ dcpomaticContext_.film_->video_frame_rate(),
+ dcpomaticContext_.film_->j2k_bandwidth(),
+ config->gpu_license_server(),
+ config->gpu_license_port(),
+ config->gpu_license());
+ }
+ launched_ = messenger_->waitForClientInit();
+
+ return launched_;
+ }
+ bool scheduleCompress(const DCPVideo &vf){
+ if (!messenger_)
+ return false;
+
+ auto fp = FrameProxy(vf.index(),vf.eyes(),vf);
+ auto cvt = [this, &fp](BufferSrc src){
+ // xyz conversion
+ fp.vf.convert_to_xyz((uint16_t*)src.framePtr_);
+ };
+ return messenger_->scheduleCompress(fp, cvt);
+ }
+ void shutdown(void){
+ if (!messenger_)
+ return;
+
+ std::unique_lock<std::mutex> lk_global(launchMutex);
+ if (!messenger_)
+ return;
+ if (launched_)
+ messenger_->shutdown();
+ delete messenger_;
+ messenger_ = nullptr;
+ }
+ void frame_done () {
+ dcpomaticContext_.history_.event ();
+ }
+private:
+ DcpomaticContext dcpomaticContext_;
+ ScheduledMessenger<FrameProxy> *messenger_;
+ bool launched_;
+};
+
+}
+
--- /dev/null
+/*
+ Copyright (C) 2023 Grok Image Compression Inc.
+
+ This file is part of DCP-o-matic.
+
+ DCP-o-matic is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ DCP-o-matic is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
+
+*/
+#pragma once
+
+#include <iostream>
+#include <string>
+#include <cstring>
+#include <atomic>
+#include <functional>
+#include <sstream>
+#include <future>
+#include <map>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+#include <queue>
+#include <cassert>
+#include <cstdarg>
+
+#ifdef _WIN32
+#include <windows.h>
+#include <direct.h>
+#include <tlhelp32.h>
+#pragma warning(disable : 4100)
+#else
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/mman.h>
+#include <semaphore.h>
+#include <signal.h>
+#endif
+
+namespace grk_plugin
+{
+static std::string grokToClientMessageBuf = "Global\\grok_to_client_message";
+static std::string grokSentSynch = "Global\\grok_sent";
+static std::string clientReceiveReadySynch = "Global\\client_receive_ready";
+static std::string clientToGrokMessageBuf = "Global\\client_to_grok_message";
+static std::string clientSentSynch = "Global\\client_sent";
+static std::string grokReceiveReadySynch = "Global\\grok_receive_ready";
+static std::string grokUncompressedBuf = "Global\\grok_uncompressed_buf";
+static std::string grokCompressedBuf = "Global\\grok_compressed_buf";
+static const std::string GRK_MSGR_BATCH_IMAGE = "GRK_MSGR_BATCH_IMAGE";
+static const std::string GRK_MSGR_BATCH_COMPRESS_INIT = "GRK_MSGR_BATCH_COMPRESS_INIT";
+static const std::string GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED = "GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED";
+static const std::string GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED =
+ "GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED";
+static const std::string GRK_MSGR_BATCH_SUBMIT_COMPRESSED = "GRK_MSGR_BATCH_SUBMIT_COMPRESSED";
+static const std::string GRK_MSGR_BATCH_PROCESSSED_COMPRESSED =
+ "GRK_MSGR_BATCH_PROCESSSED_COMPRESSED";
+static const std::string GRK_MSGR_BATCH_SHUTDOWN = "GRK_MSGR_BATCH_SHUTDOWN";
+static const std::string GRK_MSGR_BATCH_FLUSH = "GRK_MSGR_BATCH_FLUSH";
+static const size_t messageBufferLen = 256;
+struct IMessengerLogger
+{
+ virtual ~IMessengerLogger(void) = default;
+ virtual void info(const char* fmt, ...) = 0;
+ virtual void warn(const char* fmt, ...) = 0;
+ virtual void error(const char* fmt, ...) = 0;
+
+ protected:
+ template<typename... Args>
+ std::string log_message(char const* const format, Args&... args) noexcept
+ {
+ constexpr size_t message_size = 512;
+ char message[message_size];
+
+ std::snprintf(message, message_size, format, args...);
+ return std::string(message);
+ }
+};
+struct MessengerLogger : public IMessengerLogger
+{
+ explicit MessengerLogger(const std::string &preamble) : preamble_(preamble) {}
+ virtual ~MessengerLogger() = default;
+ virtual void info(const char* fmt, ...) override
+ {
+ va_list args;
+ std::string new_fmt = preamble_ + fmt + "\n";
+ va_start(args, fmt);
+ vfprintf(stdout, new_fmt.c_str(), args);
+ va_end(args);
+ }
+ virtual void warn(const char* fmt, ...) override
+ {
+ va_list args;
+ std::string new_fmt = preamble_ + fmt + "\n";
+ va_start(args, fmt);
+ vfprintf(stdout, new_fmt.c_str(), args);
+ va_end(args);
+ }
+ virtual void error(const char* fmt, ...) override
+ {
+ va_list args;
+ std::string new_fmt = preamble_ + fmt + "\n";
+ va_start(args, fmt);
+ vfprintf(stderr, new_fmt.c_str(), args);
+ va_end(args);
+ }
+
+ protected:
+ std::string preamble_;
+};
+
+static IMessengerLogger* sLogger = nullptr;
+#if defined(__GNUC__) || defined(__clang__)
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wunused-function"
+#endif
+static void setMessengerLogger(IMessengerLogger* logger)
+{
+ delete sLogger;
+ sLogger = logger;
+}
+#if defined(__GNUC__) || defined(__clang__)
+#pragma GCC diagnostic pop
+#endif
+static IMessengerLogger* getMessengerLogger(void)
+{
+ return sLogger;
+}
+struct MessengerInit
+{
+ MessengerInit(const std::string &outBuf, const std::string &outSent,
+ const std::string &outReceiveReady, const std::string &inBuf,
+ const std::string &inSent,
+ const std::string &inReceiveReady,
+ std::function<void(std::string)> processor,
+ size_t numProcessingThreads)
+ : outboundMessageBuf(outBuf), outboundSentSynch(outSent),
+ outboundReceiveReadySynch(outReceiveReady), inboundMessageBuf(inBuf),
+ inboundSentSynch(inSent), inboundReceiveReadySynch(inReceiveReady), processor_(processor),
+ numProcessingThreads_(numProcessingThreads),
+ uncompressedFrameSize_(0), compressedFrameSize_(0),
+ numFrames_(0)
+ {
+ if(firstLaunch(true))
+ unlink();
+ }
+ void unlink(void)
+ {
+#ifndef _WIN32
+ shm_unlink(grokToClientMessageBuf.c_str());
+ shm_unlink(clientToGrokMessageBuf.c_str());
+#endif
+ }
+ static bool firstLaunch(bool isClient)
+ {
+ bool debugGrok = false;
+ return debugGrok != isClient;
+ }
+ std::string outboundMessageBuf;
+ std::string outboundSentSynch;
+ std::string outboundReceiveReadySynch;
+
+ std::string inboundMessageBuf;
+ std::string inboundSentSynch;
+ std::string inboundReceiveReadySynch;
+
+ std::function<void(std::string)> processor_;
+ size_t numProcessingThreads_;
+
+ size_t uncompressedFrameSize_;
+ size_t compressedFrameSize_;
+ size_t numFrames_;
+};
+
+/*************************** Synchronization *******************************/
+enum SynchDirection
+{
+ SYNCH_SENT,
+ SYNCH_RECEIVE_READY
+};
+
+typedef int grk_handle;
+struct Synch
+{
+ Synch(const std::string &sentSemName, const std::string &receiveReadySemName)
+ : sentSemName_(sentSemName), receiveReadySemName_(receiveReadySemName)
+ {
+ // unlink semaphores in case of previous crash
+ if(MessengerInit::firstLaunch(true))
+ unlink();
+ open();
+ }
+ ~Synch()
+ {
+ close();
+ if(MessengerInit::firstLaunch(true))
+ unlink();
+ }
+ void post(SynchDirection dir)
+ {
+ auto sem = (dir == SYNCH_SENT ? sentSem_ : receiveReadySem_);
+ int rc = sem_post(sem);
+ if(rc)
+ getMessengerLogger()->error("Error posting to semaphore: %s", strerror(errno));
+ }
+ void wait(SynchDirection dir)
+ {
+ auto sem = dir == SYNCH_SENT ? sentSem_ : receiveReadySem_;
+ int rc = sem_wait(sem);
+ if(rc)
+ getMessengerLogger()->error("Error waiting for semaphore: %s", strerror(errno));
+ }
+ void open(void)
+ {
+ sentSem_ = sem_open(sentSemName_.c_str(), O_CREAT, 0666, 0);
+ if(!sentSem_)
+ getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
+ receiveReadySem_ = sem_open(receiveReadySemName_.c_str(), O_CREAT, 0666, 1);
+ if(!receiveReadySem_)
+ getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
+ }
+ void close(void)
+ {
+ int rc = sem_close(sentSem_);
+ if(rc)
+ getMessengerLogger()->error("Error closing semaphore %s: %s", sentSemName_.c_str(),
+ strerror(errno));
+ rc = sem_close(receiveReadySem_);
+ if(rc)
+ getMessengerLogger()->error("Error closing semaphore %s: %s",
+ receiveReadySemName_.c_str(), strerror(errno));
+ }
+ void unlink(void)
+ {
+ int rc = sem_unlink(sentSemName_.c_str());
+ if(rc == -1 && errno != ENOENT)
+ getMessengerLogger()->error("Error unlinking semaphore %s: %s", sentSemName_.c_str(),
+ strerror(errno));
+ rc = sem_unlink(receiveReadySemName_.c_str());
+ if(rc == -1 && errno != ENOENT)
+ getMessengerLogger()->error("Error unlinking semaphore %s: %s",
+ receiveReadySemName_.c_str(), strerror(errno));
+ }
+ sem_t* sentSem_;
+ sem_t* receiveReadySem_;
+
+ private:
+ std::string sentSemName_;
+ std::string receiveReadySemName_;
+};
+struct SharedMemoryManager
+{
+ static bool initShm(const std::string &name, size_t len, grk_handle* shm_fd, char** buffer)
+ {
+ *shm_fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0666);
+ if(*shm_fd < 0)
+ {
+ getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
+ return false;
+ }
+ int rc = ftruncate(*shm_fd, sizeof(char) * len);
+ if(rc)
+ {
+ getMessengerLogger()->error("Error truncating shared memory: %s", strerror(errno));
+ rc = close(*shm_fd);
+ if(rc)
+ getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
+ rc = shm_unlink(name.c_str());
+ if(rc)
+ getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
+ return false;
+ }
+ *buffer = static_cast<char*>(mmap(0, len, PROT_WRITE, MAP_SHARED, *shm_fd, 0));
+ if(!*buffer)
+ {
+ getMessengerLogger()->error("Error mapping shared memory: %s", strerror(errno));
+ rc = close(*shm_fd);
+ if(rc)
+ getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
+ rc = shm_unlink(name.c_str());
+ if(rc)
+ getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
+ }
+
+ return *buffer != nullptr;
+ }
+ static bool deinitShm(const std::string &name, size_t len, grk_handle &shm_fd, char** buffer)
+ {
+ if (!*buffer || !shm_fd)
+ return true;
+
+ int rc = munmap(*buffer, len);
+ *buffer = nullptr;
+ if(rc)
+ getMessengerLogger()->error("Error unmapping shared memory %s: %s", name.c_str(), strerror(errno));
+ rc = close(shm_fd);
+ shm_fd = 0;
+ if(rc)
+ getMessengerLogger()->error("Error closing shared memory %s: %s", name.c_str(), strerror(errno));
+ rc = shm_unlink(name.c_str());
+ if(rc)
+ fprintf(stderr,"Error unlinking shared memory %s : %s\n", name.c_str(), strerror(errno));
+
+ return true;
+ }
+};
+
+template<typename Data>
+class MessengerBlockingQueue
+{
+ public:
+ explicit MessengerBlockingQueue(size_t max) : active_(true), max_size_(max) {}
+ MessengerBlockingQueue() : MessengerBlockingQueue(UINT_MAX) {}
+ size_t size() const
+ {
+ return queue_.size();
+ }
+ // deactivate and clear queue
+ void deactivate()
+ {
+ {
+ std::lock_guard<std::mutex> lk(mutex_);
+ active_ = false;
+ while(!queue_.empty())
+ queue_.pop();
+ }
+
+ // release all waiting threads
+ can_pop_.notify_all();
+ can_push_.notify_all();
+ }
+ void activate()
+ {
+ std::lock_guard<std::mutex> lk(mutex_);
+ active_ = true;
+ }
+ bool push(Data const& value)
+ {
+ bool rc;
+ {
+ std::unique_lock<std::mutex> lk(mutex_);
+ rc = push_(value);
+ }
+ if(rc)
+ can_pop_.notify_one();
+
+ return rc;
+ }
+ bool waitAndPush(Data& value)
+ {
+ bool rc;
+ {
+ std::unique_lock<std::mutex> lk(mutex_);
+ if(!active_)
+ return false;
+ // in case of spurious wakeup, loop until predicate in lambda
+ // is satisfied.
+ can_push_.wait(lk, [this] { return queue_.size() < max_size_ || !active_; });
+ rc = push_(value);
+ }
+ if(rc)
+ can_pop_.notify_one();
+
+ return rc;
+ }
+ bool pop(Data& value)
+ {
+ bool rc;
+ {
+ std::unique_lock<std::mutex> lk(mutex_);
+ rc = pop_(value);
+ }
+ if(rc)
+ can_push_.notify_one();
+
+ return rc;
+ }
+ bool waitAndPop(Data& value)
+ {
+ bool rc;
+ {
+ std::unique_lock<std::mutex> lk(mutex_);
+ if(!active_)
+ return false;
+ // in case of spurious wakeup, loop until predicate in lambda
+ // is satisfied.
+ can_pop_.wait(lk, [this] { return !queue_.empty() || !active_; });
+ rc = pop_(value);
+ }
+ if(rc)
+ can_push_.notify_one();
+
+ return rc;
+ }
+
+ private:
+ bool push_(Data const& value)
+ {
+ if(queue_.size() == max_size_ || !active_)
+ return false;
+ queue_.push(value);
+
+ return true;
+ }
+ bool pop_(Data& value)
+ {
+ if(queue_.empty() || !active_)
+ return false;
+ value = queue_.front();
+ queue_.pop();
+
+ return true;
+ }
+ std::queue<Data> queue_;
+ mutable std::mutex mutex_;
+ std::condition_variable can_pop_;
+ std::condition_variable can_push_;
+ bool active_;
+ size_t max_size_;
+};
+struct BufferSrc
+{
+ BufferSrc(void) : BufferSrc("") {}
+ explicit BufferSrc(const std::string &file) : file_(file), clientFrameId_(0), frameId_(0), framePtr_(nullptr)
+ {}
+ BufferSrc(size_t clientFrameId, size_t frameId, uint8_t* framePtr)
+ : file_(""), clientFrameId_(clientFrameId), frameId_(frameId), framePtr_(framePtr)
+ {}
+ bool fromDisk(void)
+ {
+ return !file_.empty() && framePtr_ == nullptr;
+ }
+ size_t index() const
+ {
+ return clientFrameId_;
+ }
+ std::string file_;
+ size_t clientFrameId_;
+ size_t frameId_;
+ uint8_t* framePtr_;
+};
+
+struct Messenger;
+static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch);
+static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch);
+static void processorThread(Messenger* messenger, std::function<void(std::string)> processor);
+
+struct Messenger
+{
+ explicit Messenger(MessengerInit init)
+ : running(true), initialized_(false), shutdown_(false), init_(init),
+ outboundSynch_(nullptr),
+ inboundSynch_(nullptr), uncompressed_buffer_(nullptr), compressed_buffer_(nullptr),
+ uncompressed_fd_(0), compressed_fd_(0)
+ {}
+ virtual ~Messenger(void)
+ {
+ running = false;
+ sendQueue.deactivate();
+ receiveQueue.deactivate();
+
+ if (outboundSynch_) {
+ outboundSynch_->post(SYNCH_RECEIVE_READY);
+ outbound.join();
+ }
+
+ if (inboundSynch_) {
+ inboundSynch_->post(SYNCH_SENT);
+ inbound.join();
+ }
+
+ for(auto& p : processors_)
+ p.join();
+
+ delete outboundSynch_;
+ delete inboundSynch_;
+
+ deinitShm();
+ }
+ void startThreads(void) {
+ outboundSynch_ =
+ new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch);
+ outbound = std::thread(outboundThread, this, init_.outboundMessageBuf, outboundSynch_);
+
+ inboundSynch_ =
+ new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch);
+ inbound = std::thread(inboundThread, this, init_.inboundMessageBuf, inboundSynch_);
+
+ for(size_t i = 0; i < init_.numProcessingThreads_; ++i)
+ processors_.push_back(std::thread(processorThread, this, init_.processor_));
+ }
+ size_t serialize(const std::string &dir, size_t clientFrameId, uint8_t* compressedPtr,
+ size_t compressedLength)
+ {
+ char fname[512];
+ if(!compressedPtr || !compressedLength)
+ return 0;
+ sprintf(fname, "%s/test_%d.j2k", dir.c_str(), (int)clientFrameId);
+ auto fp = fopen(fname, "wb");
+ if(!fp)
+ return 0;
+ size_t written = fwrite(compressedPtr, 1, compressedLength, fp);
+ if(written != compressedLength)
+ {
+ fclose(fp);
+ return 0;
+ }
+ fflush(fp);
+ fclose(fp);
+
+ return written;
+ }
+ bool initBuffers(void)
+ {
+ bool rc = true;
+ if(init_.uncompressedFrameSize_)
+ {
+ rc = rc && SharedMemoryManager::initShm(grokUncompressedBuf,
+ init_.uncompressedFrameSize_ * init_.numFrames_,
+ &uncompressed_fd_, &uncompressed_buffer_);
+ }
+ if(init_.compressedFrameSize_)
+ {
+ rc = rc && SharedMemoryManager::initShm(grokCompressedBuf,
+ init_.compressedFrameSize_ * init_.numFrames_,
+ &compressed_fd_, &compressed_buffer_);
+ }
+
+ return rc;
+ }
+
+ bool deinitShm(void)
+ {
+ bool rc = SharedMemoryManager::deinitShm(grokUncompressedBuf,
+ init_.uncompressedFrameSize_ * init_.numFrames_,
+ uncompressed_fd_, &uncompressed_buffer_);
+ rc = rc && SharedMemoryManager::deinitShm(grokCompressedBuf,
+ init_.compressedFrameSize_ * init_.numFrames_,
+ compressed_fd_, &compressed_buffer_);
+
+ return rc;
+ }
+ template<typename... Args>
+ void send(const std::string& str, Args... args)
+ {
+ std::ostringstream oss;
+ oss << str;
+ int dummy[] = {0, ((void)(oss << ',' << args), 0)...};
+ static_cast<void>(dummy);
+
+ sendQueue.push(oss.str());
+ }
+ static pid_t get_pid_by_process_name(const char* name)
+ {
+ char command[256];
+ snprintf(command, sizeof(command), "pgrep %s", name);
+ auto pgrep = popen(command, "r");
+ if(!pgrep)
+ return -1;
+ pid_t pid;
+ if(fscanf(pgrep, "%d", &pid) != 1)
+ pid = -1;
+ pclose(pgrep);
+
+ return pid;
+ }
+ static bool terminate_process(const char* name)
+ {
+ auto pid = get_pid_by_process_name(name);
+
+ return (pid != -1 && kill(pid, SIGTERM) != -1);
+ }
+ static bool kill_process(const char* name)
+ {
+ auto pid = get_pid_by_process_name(name);
+
+ return (pid != -1 && kill(pid, SIGKILL) != -1);
+ }
+ void launchGrok(const std::string &dir, uint32_t width, uint32_t stride,
+ uint32_t height, uint32_t samplesPerPixel, uint32_t depth,
+ int device, bool is4K, uint32_t fps, uint32_t bandwidth,
+ const std::string server, uint32_t port,
+ const std::string license)
+ {
+
+ std::unique_lock<std::mutex> lk(shutdownMutex_);
+ if (async_result_.valid())
+ return;
+ if(MessengerInit::firstLaunch(true))
+ init_.unlink();
+ startThreads();
+ char _cmd[4096];
+ auto fullServer = server + ":" + std::to_string(port);
+ sprintf(_cmd,
+ "./grk_compress -batch_src %s,%d,%d,%d,%d,%d -out_fmt j2k -out_dir - -k 1 "
+ "-G %d -%s %d,%d -j %s -J %s",
+ GRK_MSGR_BATCH_IMAGE.c_str(), width, stride, height, samplesPerPixel, depth,
+ device, is4K ? "cinema4K" : "cinema2K", fps, bandwidth,
+ license.c_str(), fullServer.c_str());
+ launch(_cmd, dir);
+ }
+ void initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames)
+ {
+ // client fills queue with pending uncompressed buffers
+ init_.uncompressedFrameSize_ = uncompressedFrameSize;
+ init_.compressedFrameSize_ = compressedFrameSize;
+ init_.numFrames_ = numFrames;
+ initBuffers();
+ auto ptr = uncompressed_buffer_;
+ for(size_t i = 0; i < init_.numFrames_; ++i)
+ {
+ availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr));
+ ptr += init_.uncompressedFrameSize_;
+ }
+
+ std::unique_lock<std::mutex> lk(shutdownMutex_);
+ initialized_ = true;
+ clientInitializedCondition_.notify_all();
+ }
+ bool waitForClientInit(void)
+ {
+ if(initialized_)
+ return true;
+
+ std::unique_lock<std::mutex> lk(shutdownMutex_);
+ if(initialized_)
+ return true;
+ else if (shutdown_)
+ return false;
+ clientInitializedCondition_.wait(lk, [this]{return initialized_ || shutdown_;});
+
+ return initialized_ && !shutdown_;
+ }
+ static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel)
+ {
+ return sizeof(uint16_t) * w * h * samplesPerPixel;
+ }
+ void reclaimCompressed(size_t frameId)
+ {
+ availableBuffers_.push(BufferSrc(0, frameId, getCompressedFrame(frameId)));
+ }
+ void reclaimUncompressed(size_t frameId)
+ {
+ availableBuffers_.push(BufferSrc(0, frameId, getUncompressedFrame(frameId)));
+ }
+ uint8_t* getUncompressedFrame(size_t frameId)
+ {
+ assert(frameId < init_.numFrames_);
+ if(frameId >= init_.numFrames_)
+ return nullptr;
+
+ return (uint8_t*)(uncompressed_buffer_ + frameId * init_.uncompressedFrameSize_);
+ }
+ uint8_t* getCompressedFrame(size_t frameId)
+ {
+ assert(frameId < init_.numFrames_);
+ if(frameId >= init_.numFrames_)
+ return nullptr;
+
+ return (uint8_t*)(compressed_buffer_ + frameId * init_.compressedFrameSize_);
+ }
+ std::atomic_bool running;
+ bool initialized_;
+ bool shutdown_;
+ MessengerBlockingQueue<std::string> sendQueue;
+ MessengerBlockingQueue<std::string> receiveQueue;
+ MessengerBlockingQueue<BufferSrc> availableBuffers_;
+ MessengerInit init_;
+ std::string cmd_;
+ std::future<int> async_result_;
+ std::mutex shutdownMutex_;
+ std::condition_variable shutdownCondition_;
+
+ protected:
+ std::condition_variable clientInitializedCondition_;
+ private:
+ void launch(const std::string &cmd, const std::string &dir)
+ {
+ // Change the working directory
+ if(!dir.empty())
+ {
+ if(chdir(dir.c_str()) != 0)
+ {
+ getMessengerLogger()->error("Error: failed to change the working directory");
+ return;
+ }
+ }
+ // Execute the command using std::async and std::system
+ cmd_ = cmd;
+ async_result_ = std::async(std::launch::async, [this]() { return std::system(cmd_.c_str()); });
+ }
+ std::thread outbound;
+ Synch* outboundSynch_;
+
+ std::thread inbound;
+ Synch* inboundSynch_;
+
+ std::vector<std::thread> processors_;
+ char* uncompressed_buffer_;
+ char* compressed_buffer_;
+
+ grk_handle uncompressed_fd_;
+ grk_handle compressed_fd_;
+};
+
+static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch)
+{
+ grk_handle shm_fd = 0;
+ char* send_buffer = nullptr;
+
+ if(!SharedMemoryManager::initShm(sendBuf, messageBufferLen, &shm_fd, &send_buffer))
+ return;
+ while(messenger->running)
+ {
+ synch->wait(SYNCH_RECEIVE_READY);
+ if(!messenger->running)
+ break;
+ std::string message;
+ if(!messenger->sendQueue.waitAndPop(message))
+ break;
+ if(!messenger->running)
+ break;
+ memcpy(send_buffer, message.c_str(), message.size() + 1);
+ synch->post(SYNCH_SENT);
+ }
+ SharedMemoryManager::deinitShm(sendBuf, messageBufferLen, shm_fd, &send_buffer);
+}
+
+static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch)
+{
+ grk_handle shm_fd = 0;
+ char* receive_buffer = nullptr;
+
+ if(!SharedMemoryManager::initShm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer))
+ return;
+ while(messenger->running)
+ {
+ synch->wait(SYNCH_SENT);
+ if(!messenger->running)
+ break;
+ auto message = std::string(receive_buffer);
+ synch->post(SYNCH_RECEIVE_READY);
+ messenger->receiveQueue.push(message);
+ }
+ SharedMemoryManager::deinitShm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer);
+}
+struct Msg
+{
+ explicit Msg(const std::string &msg) : ct_(0)
+ {
+ std::stringstream ss(msg);
+ while(ss.good())
+ {
+ std::string substr;
+ std::getline(ss, substr, ',');
+ cs_.push_back(substr);
+ }
+ }
+ std::string next()
+ {
+ if(ct_ == cs_.size())
+ {
+ getMessengerLogger()->error("Msg: comma separated list exhausted. returning empty.");
+ return "";
+ }
+ return cs_[ct_++];
+ }
+
+ uint32_t nextUint(void)
+ {
+ return (uint32_t)std::stoi(next());
+ }
+
+ std::vector<std::string> cs_;
+ size_t ct_;
+};
+static void processorThread(Messenger* messenger, std::function<void(std::string)> processor)
+{
+ while(messenger->running)
+ {
+ std::string message;
+ if(!messenger->receiveQueue.waitAndPop(message))
+ break;
+ if(!messenger->running)
+ break;
+ Msg msg(message);
+ auto tag = msg.next();
+ if(tag == GRK_MSGR_BATCH_COMPRESS_INIT)
+ {
+ auto width = msg.nextUint();
+ auto stride = msg.nextUint();
+ (void)stride;
+ auto height = msg.nextUint();
+ auto samplesPerPixel = msg.nextUint();
+ auto depth = msg.nextUint();
+ (void)depth;
+ messenger->init_.uncompressedFrameSize_ =
+ Messenger::uncompressedFrameSize(width, height, samplesPerPixel);
+ auto compressedFrameSize = msg.nextUint();
+ auto numFrames = msg.nextUint();
+ messenger->initClient(compressedFrameSize, compressedFrameSize, numFrames);
+ }
+ else if(tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED)
+ {
+ messenger->reclaimUncompressed(msg.nextUint());
+ }
+ else if(tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED)
+ {
+ messenger->reclaimCompressed(msg.nextUint());
+ }
+ processor(message);
+ }
+}
+
+template<typename F>
+struct ScheduledFrames
+{
+ void store(F& val)
+ {
+ std::unique_lock<std::mutex> lk(mapMutex_);
+ auto it = map_.find(val.index());
+ if (it == map_.end())
+ map_[val.index()] = val;
+ }
+ F retrieve(size_t index, bool &success)
+ {
+ std::unique_lock<std::mutex> lk(mapMutex_);
+ success = false;
+ auto it = map_.find(index);
+ if(it == map_.end())
+ return F();
+
+ success = true;
+ F val = it->second;
+ map_.erase(index);
+
+ return val;
+ }
+
+ private:
+ std::mutex mapMutex_;
+ std::map<size_t, F> map_;
+};
+
+template<typename F>
+struct ScheduledMessenger : public Messenger
+{
+ explicit ScheduledMessenger(MessengerInit init) : Messenger(init),
+ framesScheduled_(0),
+ framesCompressed_(0)
+ {}
+ ~ScheduledMessenger(void) {
+ shutdown();
+ }
+ bool scheduleCompress(F proxy, std::function<void(BufferSrc)> converter){
+ size_t frameSize = init_.uncompressedFrameSize_;
+ assert(frameSize >= init_.uncompressedFrameSize_);
+ BufferSrc src;
+ if(!availableBuffers_.waitAndPop(src))
+ return false;
+ converter(src);
+ scheduledFrames_.store(proxy);
+ framesScheduled_++;
+ send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_);
+
+ return true;
+ }
+ void processCompressed(const std::string &message, std::function<void(F,uint8_t*,uint32_t)> processor, bool needsRecompression) {
+ Msg msg(message);
+ msg.next();
+ auto clientFrameId = msg.nextUint();
+ auto compressedFrameId = msg.nextUint();
+ auto compressedFrameLength = msg.nextUint();
+ if (!needsRecompression) {
+ bool success = false;
+ auto srcFrame = scheduledFrames_.retrieve(clientFrameId,success);
+ if (!success)
+ return;
+ processor(srcFrame, getCompressedFrame(compressedFrameId),compressedFrameLength);
+ }
+ ++framesCompressed_;
+ send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId);
+ if (shutdown_ && framesCompressed_ == framesScheduled_)
+ shutdownCondition_.notify_all();
+ }
+ void shutdown(void){
+ try {
+ std::unique_lock<std::mutex> lk(shutdownMutex_);
+ if (!async_result_.valid())
+ return;
+ shutdown_ = true;
+ if (framesScheduled_) {
+ uint32_t scheduled = framesScheduled_;
+ send(GRK_MSGR_BATCH_FLUSH, scheduled);
+ shutdownCondition_.wait(lk, [this] { return framesScheduled_ == framesCompressed_; });
+ }
+ availableBuffers_.deactivate();
+ send(GRK_MSGR_BATCH_SHUTDOWN);
+ int result = async_result_.get();
+ if(result != 0)
+ getMessengerLogger()->error("Accelerator failed with return code: %d\n",result);
+ } catch (std::exception &ex) {
+ getMessengerLogger()->error("%s",ex.what());
+ }
+
+ }
+ F retrieve(size_t index, bool &success) {
+ return scheduledFrames_.retrieve(index, success);
+ }
+ void store(F& val) {
+ scheduledFrames_.store(val);
+ }
+
+private:
+ ScheduledFrames<F> scheduledFrames_;
+ std::atomic<uint32_t> framesScheduled_;
+ std::atomic<uint32_t> framesCompressed_;
+};
+
+} // namespace grk_plugin
+++ /dev/null
-/*
- Copyright (C) 2023 Grok Image Compression Inc.
-
- This file is part of DCP-o-matic.
-
- DCP-o-matic is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
-
- DCP-o-matic is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
-
-*/
-
-#pragma once
-
-#include "config.h"
-#include "log.h"
-#include "dcpomatic_log.h"
-#include "writer.h"
-#include "grok_messenger.h"
-
-class Film;
-using dcp::Data;
-using namespace dcpomatic;
-
-static std::mutex launchMutex;
-
-namespace grk_plugin
-{
-
-struct GrokLogger : public MessengerLogger {
- explicit GrokLogger(const std::string &preamble) : MessengerLogger(preamble)
- {}
- virtual ~GrokLogger() = default;
- void info(const char* fmt, ...) override{
- va_list arg;
- va_start(arg, fmt);
- dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_GENERAL);
- va_end(arg);
- }
- void warn(const char* fmt, ...) override{
- va_list arg;
- va_start(arg, fmt);
- dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_WARNING);
- va_end(arg);
- }
- void error(const char* fmt, ...) override{
- va_list arg;
- va_start(arg, fmt);
- dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_ERROR);
- va_end(arg);
- }
-};
-
-struct GrokInitializer {
- GrokInitializer(void) {
- setMessengerLogger(new GrokLogger("[GROK] "));
- }
- ~GrokInitializer() = default;
-};
-
-struct FrameProxy {
- FrameProxy(void) : FrameProxy(0,Eyes::LEFT,DCPVideo())
- {}
- FrameProxy(int index, Eyes eyes, DCPVideo dcpv) : index_(index), eyes_(eyes), vf(dcpv)
- {}
- int index() const {
- return index_;
- }
- Eyes eyes(void) const {
- return eyes_;
- }
- int index_;
- Eyes eyes_;
- DCPVideo vf;
-};
-
-struct DcpomaticContext {
- DcpomaticContext(std::shared_ptr<const Film> film, Writer& writer,
- EventHistory &history, const std::string &location) :
- film_(film), writer_(writer),
- history_(history), location_(location),
- width_(0), height_(0)
- {}
- void setDimensions(uint32_t w, uint32_t h) {
- width_ = w;
- height_ = h;
- }
- std::shared_ptr<const Film> film_;
- Writer& writer_;
- EventHistory &history_;
- std::string location_;
- uint32_t width_;
- uint32_t height_;
-};
-
-class GrokContext {
-public:
- explicit GrokContext(const DcpomaticContext &dcpomaticContext) :
- dcpomaticContext_(dcpomaticContext),
- messenger_(nullptr),
- launched_(false)
- {
- struct CompressedData : public dcp::Data {
- explicit CompressedData(int dataLen) : data_(new uint8_t[dataLen]), dataLen_(dataLen)
- {}
- ~CompressedData(void){
- delete[] data_;
- }
- uint8_t const * data () const override {
- return data_;
- }
- uint8_t * data () override {
- return data_;
- }
- int size () const override {
- return dataLen_;
- }
- uint8_t *data_;
- int dataLen_;
- };
- if (Config::instance()->enable_gpu ()) {
- boost::filesystem::path folder(dcpomaticContext_.location_);
- boost::filesystem::path binaryPath = folder / "grk_compress";
- if (!boost::filesystem::exists(binaryPath)) {
- getMessengerLogger()->error("Invalid binary location %s",
- dcpomaticContext_.location_.c_str());
- return;
- }
- auto proc = [this](const std::string& str) {
- try {
- Msg msg(str);
- auto tag = msg.next();
- if(tag == GRK_MSGR_BATCH_SUBMIT_COMPRESSED)
- {
- auto clientFrameId = msg.nextUint();
- auto compressedFrameId = msg.nextUint();
- (void)compressedFrameId;
- auto compressedFrameLength = msg.nextUint();
- auto processor =
- [this](FrameProxy srcFrame, uint8_t* compressed, uint32_t compressedFrameLength)
- {
- auto compressedData = std::make_shared<CompressedData>(compressedFrameLength);
- memcpy(compressedData->data_,compressed,compressedFrameLength );
- dcpomaticContext_.writer_.write(compressedData, srcFrame.index(), srcFrame.eyes());
- frame_done ();
- };
- int const minimum_size = 16384;
- bool needsRecompression = compressedFrameLength < minimum_size;
- messenger_->processCompressed(str, processor, needsRecompression);
- if (needsRecompression) {
- bool success = false;
- auto fp = messenger_->retrieve(clientFrameId, success);
- if (!success)
- return;
-
- auto encoded = std::make_shared<dcp::ArrayData>(fp.vf.encode_locally());
- dcpomaticContext_.writer_.write(encoded, fp.vf.index(), fp.vf.eyes());
- frame_done ();
- }
- }
- } catch (std::exception &ex){
- getMessengerLogger()->error("%s",ex.what());
- }
- };
- auto clientInit =
- MessengerInit(clientToGrokMessageBuf, clientSentSynch, grokReceiveReadySynch,
- grokToClientMessageBuf, grokSentSynch, clientReceiveReadySynch, proc,
- std::thread::hardware_concurrency());
- messenger_ = new ScheduledMessenger<FrameProxy>(clientInit);
- }
- }
- ~GrokContext(void) {
- shutdown();
- }
- bool launch(DCPVideo dcpv, int device){
- if (!messenger_ )
- return false;
- if (launched_)
- return true;
- std::unique_lock<std::mutex> lk_global(launchMutex);
- if (!messenger_)
- return false;
- if (launched_)
- return true;
- if (MessengerInit::firstLaunch(true)) {
- auto s = dcpv.get_size();
- dcpomaticContext_.setDimensions(s.width, s.height);
- auto config = Config::instance();
- messenger_->launchGrok(dcpomaticContext_.location_,
- dcpomaticContext_.width_,dcpomaticContext_.width_,
- dcpomaticContext_.height_,
- 3, 12, device,
- dcpomaticContext_.film_->resolution() == Resolution::FOUR_K,
- dcpomaticContext_.film_->video_frame_rate(),
- dcpomaticContext_.film_->j2k_bandwidth(),
- config->gpu_license_server(),
- config->gpu_license_port(),
- config->gpu_license());
- }
- launched_ = messenger_->waitForClientInit();
-
- return launched_;
- }
- bool scheduleCompress(const DCPVideo &vf){
- if (!messenger_)
- return false;
-
- auto fp = FrameProxy(vf.index(),vf.eyes(),vf);
- auto cvt = [this, &fp](BufferSrc src){
- // xyz conversion
- fp.vf.convert_to_xyz((uint16_t*)src.framePtr_);
- };
- return messenger_->scheduleCompress(fp, cvt);
- }
- void shutdown(void){
- if (!messenger_)
- return;
-
- std::unique_lock<std::mutex> lk_global(launchMutex);
- if (!messenger_)
- return;
- if (launched_)
- messenger_->shutdown();
- delete messenger_;
- messenger_ = nullptr;
- }
- void frame_done () {
- dcpomaticContext_.history_.event ();
- }
-private:
- DcpomaticContext dcpomaticContext_;
- ScheduledMessenger<FrameProxy> *messenger_;
- bool launched_;
-};
-
-}
-
+++ /dev/null
-/*
- Copyright (C) 2023 Grok Image Compression Inc.
-
- This file is part of DCP-o-matic.
-
- DCP-o-matic is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
-
- DCP-o-matic is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
-
-*/
-#pragma once
-
-#include <iostream>
-#include <string>
-#include <cstring>
-#include <atomic>
-#include <functional>
-#include <sstream>
-#include <future>
-#include <map>
-#include <thread>
-#include <mutex>
-#include <condition_variable>
-#include <queue>
-#include <cassert>
-#include <cstdarg>
-
-#ifdef _WIN32
-#include <windows.h>
-#include <direct.h>
-#include <tlhelp32.h>
-#pragma warning(disable : 4100)
-#else
-#include <unistd.h>
-#include <fcntl.h>
-#include <sys/mman.h>
-#include <semaphore.h>
-#include <signal.h>
-#endif
-
-namespace grk_plugin
-{
-static std::string grokToClientMessageBuf = "Global\\grok_to_client_message";
-static std::string grokSentSynch = "Global\\grok_sent";
-static std::string clientReceiveReadySynch = "Global\\client_receive_ready";
-static std::string clientToGrokMessageBuf = "Global\\client_to_grok_message";
-static std::string clientSentSynch = "Global\\client_sent";
-static std::string grokReceiveReadySynch = "Global\\grok_receive_ready";
-static std::string grokUncompressedBuf = "Global\\grok_uncompressed_buf";
-static std::string grokCompressedBuf = "Global\\grok_compressed_buf";
-static const std::string GRK_MSGR_BATCH_IMAGE = "GRK_MSGR_BATCH_IMAGE";
-static const std::string GRK_MSGR_BATCH_COMPRESS_INIT = "GRK_MSGR_BATCH_COMPRESS_INIT";
-static const std::string GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED = "GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED";
-static const std::string GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED =
- "GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED";
-static const std::string GRK_MSGR_BATCH_SUBMIT_COMPRESSED = "GRK_MSGR_BATCH_SUBMIT_COMPRESSED";
-static const std::string GRK_MSGR_BATCH_PROCESSSED_COMPRESSED =
- "GRK_MSGR_BATCH_PROCESSSED_COMPRESSED";
-static const std::string GRK_MSGR_BATCH_SHUTDOWN = "GRK_MSGR_BATCH_SHUTDOWN";
-static const std::string GRK_MSGR_BATCH_FLUSH = "GRK_MSGR_BATCH_FLUSH";
-static const size_t messageBufferLen = 256;
-struct IMessengerLogger
-{
- virtual ~IMessengerLogger(void) = default;
- virtual void info(const char* fmt, ...) = 0;
- virtual void warn(const char* fmt, ...) = 0;
- virtual void error(const char* fmt, ...) = 0;
-
- protected:
- template<typename... Args>
- std::string log_message(char const* const format, Args&... args) noexcept
- {
- constexpr size_t message_size = 512;
- char message[message_size];
-
- std::snprintf(message, message_size, format, args...);
- return std::string(message);
- }
-};
-struct MessengerLogger : public IMessengerLogger
-{
- explicit MessengerLogger(const std::string &preamble) : preamble_(preamble) {}
- virtual ~MessengerLogger() = default;
- virtual void info(const char* fmt, ...) override
- {
- va_list args;
- std::string new_fmt = preamble_ + fmt + "\n";
- va_start(args, fmt);
- vfprintf(stdout, new_fmt.c_str(), args);
- va_end(args);
- }
- virtual void warn(const char* fmt, ...) override
- {
- va_list args;
- std::string new_fmt = preamble_ + fmt + "\n";
- va_start(args, fmt);
- vfprintf(stdout, new_fmt.c_str(), args);
- va_end(args);
- }
- virtual void error(const char* fmt, ...) override
- {
- va_list args;
- std::string new_fmt = preamble_ + fmt + "\n";
- va_start(args, fmt);
- vfprintf(stderr, new_fmt.c_str(), args);
- va_end(args);
- }
-
- protected:
- std::string preamble_;
-};
-
-static IMessengerLogger* sLogger = nullptr;
-#if defined(__GNUC__) || defined(__clang__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wunused-function"
-#endif
-static void setMessengerLogger(IMessengerLogger* logger)
-{
- delete sLogger;
- sLogger = logger;
-}
-#if defined(__GNUC__) || defined(__clang__)
-#pragma GCC diagnostic pop
-#endif
-static IMessengerLogger* getMessengerLogger(void)
-{
- return sLogger;
-}
-struct MessengerInit
-{
- MessengerInit(const std::string &outBuf, const std::string &outSent,
- const std::string &outReceiveReady, const std::string &inBuf,
- const std::string &inSent,
- const std::string &inReceiveReady,
- std::function<void(std::string)> processor,
- size_t numProcessingThreads)
- : outboundMessageBuf(outBuf), outboundSentSynch(outSent),
- outboundReceiveReadySynch(outReceiveReady), inboundMessageBuf(inBuf),
- inboundSentSynch(inSent), inboundReceiveReadySynch(inReceiveReady), processor_(processor),
- numProcessingThreads_(numProcessingThreads),
- uncompressedFrameSize_(0), compressedFrameSize_(0),
- numFrames_(0)
- {
- if(firstLaunch(true))
- unlink();
- }
- void unlink(void)
- {
-#ifndef _WIN32
- shm_unlink(grokToClientMessageBuf.c_str());
- shm_unlink(clientToGrokMessageBuf.c_str());
-#endif
- }
- static bool firstLaunch(bool isClient)
- {
- bool debugGrok = false;
- return debugGrok != isClient;
- }
- std::string outboundMessageBuf;
- std::string outboundSentSynch;
- std::string outboundReceiveReadySynch;
-
- std::string inboundMessageBuf;
- std::string inboundSentSynch;
- std::string inboundReceiveReadySynch;
-
- std::function<void(std::string)> processor_;
- size_t numProcessingThreads_;
-
- size_t uncompressedFrameSize_;
- size_t compressedFrameSize_;
- size_t numFrames_;
-};
-
-/*************************** Synchronization *******************************/
-enum SynchDirection
-{
- SYNCH_SENT,
- SYNCH_RECEIVE_READY
-};
-
-typedef int grk_handle;
-struct Synch
-{
- Synch(const std::string &sentSemName, const std::string &receiveReadySemName)
- : sentSemName_(sentSemName), receiveReadySemName_(receiveReadySemName)
- {
- // unlink semaphores in case of previous crash
- if(MessengerInit::firstLaunch(true))
- unlink();
- open();
- }
- ~Synch()
- {
- close();
- if(MessengerInit::firstLaunch(true))
- unlink();
- }
- void post(SynchDirection dir)
- {
- auto sem = (dir == SYNCH_SENT ? sentSem_ : receiveReadySem_);
- int rc = sem_post(sem);
- if(rc)
- getMessengerLogger()->error("Error posting to semaphore: %s", strerror(errno));
- }
- void wait(SynchDirection dir)
- {
- auto sem = dir == SYNCH_SENT ? sentSem_ : receiveReadySem_;
- int rc = sem_wait(sem);
- if(rc)
- getMessengerLogger()->error("Error waiting for semaphore: %s", strerror(errno));
- }
- void open(void)
- {
- sentSem_ = sem_open(sentSemName_.c_str(), O_CREAT, 0666, 0);
- if(!sentSem_)
- getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
- receiveReadySem_ = sem_open(receiveReadySemName_.c_str(), O_CREAT, 0666, 1);
- if(!receiveReadySem_)
- getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
- }
- void close(void)
- {
- int rc = sem_close(sentSem_);
- if(rc)
- getMessengerLogger()->error("Error closing semaphore %s: %s", sentSemName_.c_str(),
- strerror(errno));
- rc = sem_close(receiveReadySem_);
- if(rc)
- getMessengerLogger()->error("Error closing semaphore %s: %s",
- receiveReadySemName_.c_str(), strerror(errno));
- }
- void unlink(void)
- {
- int rc = sem_unlink(sentSemName_.c_str());
- if(rc == -1 && errno != ENOENT)
- getMessengerLogger()->error("Error unlinking semaphore %s: %s", sentSemName_.c_str(),
- strerror(errno));
- rc = sem_unlink(receiveReadySemName_.c_str());
- if(rc == -1 && errno != ENOENT)
- getMessengerLogger()->error("Error unlinking semaphore %s: %s",
- receiveReadySemName_.c_str(), strerror(errno));
- }
- sem_t* sentSem_;
- sem_t* receiveReadySem_;
-
- private:
- std::string sentSemName_;
- std::string receiveReadySemName_;
-};
-struct SharedMemoryManager
-{
- static bool initShm(const std::string &name, size_t len, grk_handle* shm_fd, char** buffer)
- {
- *shm_fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0666);
- if(*shm_fd < 0)
- {
- getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
- return false;
- }
- int rc = ftruncate(*shm_fd, sizeof(char) * len);
- if(rc)
- {
- getMessengerLogger()->error("Error truncating shared memory: %s", strerror(errno));
- rc = close(*shm_fd);
- if(rc)
- getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
- rc = shm_unlink(name.c_str());
- if(rc)
- getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
- return false;
- }
- *buffer = static_cast<char*>(mmap(0, len, PROT_WRITE, MAP_SHARED, *shm_fd, 0));
- if(!*buffer)
- {
- getMessengerLogger()->error("Error mapping shared memory: %s", strerror(errno));
- rc = close(*shm_fd);
- if(rc)
- getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
- rc = shm_unlink(name.c_str());
- if(rc)
- getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
- }
-
- return *buffer != nullptr;
- }
- static bool deinitShm(const std::string &name, size_t len, grk_handle &shm_fd, char** buffer)
- {
- if (!*buffer || !shm_fd)
- return true;
-
- int rc = munmap(*buffer, len);
- *buffer = nullptr;
- if(rc)
- getMessengerLogger()->error("Error unmapping shared memory %s: %s", name.c_str(), strerror(errno));
- rc = close(shm_fd);
- shm_fd = 0;
- if(rc)
- getMessengerLogger()->error("Error closing shared memory %s: %s", name.c_str(), strerror(errno));
- rc = shm_unlink(name.c_str());
- if(rc)
- fprintf(stderr,"Error unlinking shared memory %s : %s\n", name.c_str(), strerror(errno));
-
- return true;
- }
-};
-
-template<typename Data>
-class MessengerBlockingQueue
-{
- public:
- explicit MessengerBlockingQueue(size_t max) : active_(true), max_size_(max) {}
- MessengerBlockingQueue() : MessengerBlockingQueue(UINT_MAX) {}
- size_t size() const
- {
- return queue_.size();
- }
- // deactivate and clear queue
- void deactivate()
- {
- {
- std::lock_guard<std::mutex> lk(mutex_);
- active_ = false;
- while(!queue_.empty())
- queue_.pop();
- }
-
- // release all waiting threads
- can_pop_.notify_all();
- can_push_.notify_all();
- }
- void activate()
- {
- std::lock_guard<std::mutex> lk(mutex_);
- active_ = true;
- }
- bool push(Data const& value)
- {
- bool rc;
- {
- std::unique_lock<std::mutex> lk(mutex_);
- rc = push_(value);
- }
- if(rc)
- can_pop_.notify_one();
-
- return rc;
- }
- bool waitAndPush(Data& value)
- {
- bool rc;
- {
- std::unique_lock<std::mutex> lk(mutex_);
- if(!active_)
- return false;
- // in case of spurious wakeup, loop until predicate in lambda
- // is satisfied.
- can_push_.wait(lk, [this] { return queue_.size() < max_size_ || !active_; });
- rc = push_(value);
- }
- if(rc)
- can_pop_.notify_one();
-
- return rc;
- }
- bool pop(Data& value)
- {
- bool rc;
- {
- std::unique_lock<std::mutex> lk(mutex_);
- rc = pop_(value);
- }
- if(rc)
- can_push_.notify_one();
-
- return rc;
- }
- bool waitAndPop(Data& value)
- {
- bool rc;
- {
- std::unique_lock<std::mutex> lk(mutex_);
- if(!active_)
- return false;
- // in case of spurious wakeup, loop until predicate in lambda
- // is satisfied.
- can_pop_.wait(lk, [this] { return !queue_.empty() || !active_; });
- rc = pop_(value);
- }
- if(rc)
- can_push_.notify_one();
-
- return rc;
- }
-
- private:
- bool push_(Data const& value)
- {
- if(queue_.size() == max_size_ || !active_)
- return false;
- queue_.push(value);
-
- return true;
- }
- bool pop_(Data& value)
- {
- if(queue_.empty() || !active_)
- return false;
- value = queue_.front();
- queue_.pop();
-
- return true;
- }
- std::queue<Data> queue_;
- mutable std::mutex mutex_;
- std::condition_variable can_pop_;
- std::condition_variable can_push_;
- bool active_;
- size_t max_size_;
-};
-struct BufferSrc
-{
- BufferSrc(void) : BufferSrc("") {}
- explicit BufferSrc(const std::string &file) : file_(file), clientFrameId_(0), frameId_(0), framePtr_(nullptr)
- {}
- BufferSrc(size_t clientFrameId, size_t frameId, uint8_t* framePtr)
- : file_(""), clientFrameId_(clientFrameId), frameId_(frameId), framePtr_(framePtr)
- {}
- bool fromDisk(void)
- {
- return !file_.empty() && framePtr_ == nullptr;
- }
- size_t index() const
- {
- return clientFrameId_;
- }
- std::string file_;
- size_t clientFrameId_;
- size_t frameId_;
- uint8_t* framePtr_;
-};
-
-struct Messenger;
-static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch);
-static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch);
-static void processorThread(Messenger* messenger, std::function<void(std::string)> processor);
-
-struct Messenger
-{
- explicit Messenger(MessengerInit init)
- : running(true), initialized_(false), shutdown_(false), init_(init),
- outboundSynch_(nullptr),
- inboundSynch_(nullptr), uncompressed_buffer_(nullptr), compressed_buffer_(nullptr),
- uncompressed_fd_(0), compressed_fd_(0)
- {}
- virtual ~Messenger(void)
- {
- running = false;
- sendQueue.deactivate();
- receiveQueue.deactivate();
-
- if (outboundSynch_) {
- outboundSynch_->post(SYNCH_RECEIVE_READY);
- outbound.join();
- }
-
- if (inboundSynch_) {
- inboundSynch_->post(SYNCH_SENT);
- inbound.join();
- }
-
- for(auto& p : processors_)
- p.join();
-
- delete outboundSynch_;
- delete inboundSynch_;
-
- deinitShm();
- }
- void startThreads(void) {
- outboundSynch_ =
- new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch);
- outbound = std::thread(outboundThread, this, init_.outboundMessageBuf, outboundSynch_);
-
- inboundSynch_ =
- new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch);
- inbound = std::thread(inboundThread, this, init_.inboundMessageBuf, inboundSynch_);
-
- for(size_t i = 0; i < init_.numProcessingThreads_; ++i)
- processors_.push_back(std::thread(processorThread, this, init_.processor_));
- }
- size_t serialize(const std::string &dir, size_t clientFrameId, uint8_t* compressedPtr,
- size_t compressedLength)
- {
- char fname[512];
- if(!compressedPtr || !compressedLength)
- return 0;
- sprintf(fname, "%s/test_%d.j2k", dir.c_str(), (int)clientFrameId);
- auto fp = fopen(fname, "wb");
- if(!fp)
- return 0;
- size_t written = fwrite(compressedPtr, 1, compressedLength, fp);
- if(written != compressedLength)
- {
- fclose(fp);
- return 0;
- }
- fflush(fp);
- fclose(fp);
-
- return written;
- }
- bool initBuffers(void)
- {
- bool rc = true;
- if(init_.uncompressedFrameSize_)
- {
- rc = rc && SharedMemoryManager::initShm(grokUncompressedBuf,
- init_.uncompressedFrameSize_ * init_.numFrames_,
- &uncompressed_fd_, &uncompressed_buffer_);
- }
- if(init_.compressedFrameSize_)
- {
- rc = rc && SharedMemoryManager::initShm(grokCompressedBuf,
- init_.compressedFrameSize_ * init_.numFrames_,
- &compressed_fd_, &compressed_buffer_);
- }
-
- return rc;
- }
-
- bool deinitShm(void)
- {
- bool rc = SharedMemoryManager::deinitShm(grokUncompressedBuf,
- init_.uncompressedFrameSize_ * init_.numFrames_,
- uncompressed_fd_, &uncompressed_buffer_);
- rc = rc && SharedMemoryManager::deinitShm(grokCompressedBuf,
- init_.compressedFrameSize_ * init_.numFrames_,
- compressed_fd_, &compressed_buffer_);
-
- return rc;
- }
- template<typename... Args>
- void send(const std::string& str, Args... args)
- {
- std::ostringstream oss;
- oss << str;
- int dummy[] = {0, ((void)(oss << ',' << args), 0)...};
- static_cast<void>(dummy);
-
- sendQueue.push(oss.str());
- }
- static pid_t get_pid_by_process_name(const char* name)
- {
- char command[256];
- snprintf(command, sizeof(command), "pgrep %s", name);
- auto pgrep = popen(command, "r");
- if(!pgrep)
- return -1;
- pid_t pid;
- if(fscanf(pgrep, "%d", &pid) != 1)
- pid = -1;
- pclose(pgrep);
-
- return pid;
- }
- static bool terminate_process(const char* name)
- {
- auto pid = get_pid_by_process_name(name);
-
- return (pid != -1 && kill(pid, SIGTERM) != -1);
- }
- static bool kill_process(const char* name)
- {
- auto pid = get_pid_by_process_name(name);
-
- return (pid != -1 && kill(pid, SIGKILL) != -1);
- }
- void launchGrok(const std::string &dir, uint32_t width, uint32_t stride,
- uint32_t height, uint32_t samplesPerPixel, uint32_t depth,
- int device, bool is4K, uint32_t fps, uint32_t bandwidth,
- const std::string server, uint32_t port,
- const std::string license)
- {
-
- std::unique_lock<std::mutex> lk(shutdownMutex_);
- if (async_result_.valid())
- return;
- if(MessengerInit::firstLaunch(true))
- init_.unlink();
- startThreads();
- char _cmd[4096];
- auto fullServer = server + ":" + std::to_string(port);
- sprintf(_cmd,
- "./grk_compress -batch_src %s,%d,%d,%d,%d,%d -out_fmt j2k -out_dir - -k 1 "
- "-G %d -%s %d,%d -j %s -J %s",
- GRK_MSGR_BATCH_IMAGE.c_str(), width, stride, height, samplesPerPixel, depth,
- device, is4K ? "cinema4K" : "cinema2K", fps, bandwidth,
- license.c_str(), fullServer.c_str());
- launch(_cmd, dir);
- }
- void initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames)
- {
- // client fills queue with pending uncompressed buffers
- init_.uncompressedFrameSize_ = uncompressedFrameSize;
- init_.compressedFrameSize_ = compressedFrameSize;
- init_.numFrames_ = numFrames;
- initBuffers();
- auto ptr = uncompressed_buffer_;
- for(size_t i = 0; i < init_.numFrames_; ++i)
- {
- availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr));
- ptr += init_.uncompressedFrameSize_;
- }
-
- std::unique_lock<std::mutex> lk(shutdownMutex_);
- initialized_ = true;
- clientInitializedCondition_.notify_all();
- }
- bool waitForClientInit(void)
- {
- if(initialized_)
- return true;
-
- std::unique_lock<std::mutex> lk(shutdownMutex_);
- if(initialized_)
- return true;
- else if (shutdown_)
- return false;
- clientInitializedCondition_.wait(lk, [this]{return initialized_ || shutdown_;});
-
- return initialized_ && !shutdown_;
- }
- static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel)
- {
- return sizeof(uint16_t) * w * h * samplesPerPixel;
- }
- void reclaimCompressed(size_t frameId)
- {
- availableBuffers_.push(BufferSrc(0, frameId, getCompressedFrame(frameId)));
- }
- void reclaimUncompressed(size_t frameId)
- {
- availableBuffers_.push(BufferSrc(0, frameId, getUncompressedFrame(frameId)));
- }
- uint8_t* getUncompressedFrame(size_t frameId)
- {
- assert(frameId < init_.numFrames_);
- if(frameId >= init_.numFrames_)
- return nullptr;
-
- return (uint8_t*)(uncompressed_buffer_ + frameId * init_.uncompressedFrameSize_);
- }
- uint8_t* getCompressedFrame(size_t frameId)
- {
- assert(frameId < init_.numFrames_);
- if(frameId >= init_.numFrames_)
- return nullptr;
-
- return (uint8_t*)(compressed_buffer_ + frameId * init_.compressedFrameSize_);
- }
- std::atomic_bool running;
- bool initialized_;
- bool shutdown_;
- MessengerBlockingQueue<std::string> sendQueue;
- MessengerBlockingQueue<std::string> receiveQueue;
- MessengerBlockingQueue<BufferSrc> availableBuffers_;
- MessengerInit init_;
- std::string cmd_;
- std::future<int> async_result_;
- std::mutex shutdownMutex_;
- std::condition_variable shutdownCondition_;
-
- protected:
- std::condition_variable clientInitializedCondition_;
- private:
- void launch(const std::string &cmd, const std::string &dir)
- {
- // Change the working directory
- if(!dir.empty())
- {
- if(chdir(dir.c_str()) != 0)
- {
- getMessengerLogger()->error("Error: failed to change the working directory");
- return;
- }
- }
- // Execute the command using std::async and std::system
- cmd_ = cmd;
- async_result_ = std::async(std::launch::async, [this]() { return std::system(cmd_.c_str()); });
- }
- std::thread outbound;
- Synch* outboundSynch_;
-
- std::thread inbound;
- Synch* inboundSynch_;
-
- std::vector<std::thread> processors_;
- char* uncompressed_buffer_;
- char* compressed_buffer_;
-
- grk_handle uncompressed_fd_;
- grk_handle compressed_fd_;
-};
-
-static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch)
-{
- grk_handle shm_fd = 0;
- char* send_buffer = nullptr;
-
- if(!SharedMemoryManager::initShm(sendBuf, messageBufferLen, &shm_fd, &send_buffer))
- return;
- while(messenger->running)
- {
- synch->wait(SYNCH_RECEIVE_READY);
- if(!messenger->running)
- break;
- std::string message;
- if(!messenger->sendQueue.waitAndPop(message))
- break;
- if(!messenger->running)
- break;
- memcpy(send_buffer, message.c_str(), message.size() + 1);
- synch->post(SYNCH_SENT);
- }
- SharedMemoryManager::deinitShm(sendBuf, messageBufferLen, shm_fd, &send_buffer);
-}
-
-static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch)
-{
- grk_handle shm_fd = 0;
- char* receive_buffer = nullptr;
-
- if(!SharedMemoryManager::initShm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer))
- return;
- while(messenger->running)
- {
- synch->wait(SYNCH_SENT);
- if(!messenger->running)
- break;
- auto message = std::string(receive_buffer);
- synch->post(SYNCH_RECEIVE_READY);
- messenger->receiveQueue.push(message);
- }
- SharedMemoryManager::deinitShm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer);
-}
-struct Msg
-{
- explicit Msg(const std::string &msg) : ct_(0)
- {
- std::stringstream ss(msg);
- while(ss.good())
- {
- std::string substr;
- std::getline(ss, substr, ',');
- cs_.push_back(substr);
- }
- }
- std::string next()
- {
- if(ct_ == cs_.size())
- {
- getMessengerLogger()->error("Msg: comma separated list exhausted. returning empty.");
- return "";
- }
- return cs_[ct_++];
- }
-
- uint32_t nextUint(void)
- {
- return (uint32_t)std::stoi(next());
- }
-
- std::vector<std::string> cs_;
- size_t ct_;
-};
-static void processorThread(Messenger* messenger, std::function<void(std::string)> processor)
-{
- while(messenger->running)
- {
- std::string message;
- if(!messenger->receiveQueue.waitAndPop(message))
- break;
- if(!messenger->running)
- break;
- Msg msg(message);
- auto tag = msg.next();
- if(tag == GRK_MSGR_BATCH_COMPRESS_INIT)
- {
- auto width = msg.nextUint();
- auto stride = msg.nextUint();
- (void)stride;
- auto height = msg.nextUint();
- auto samplesPerPixel = msg.nextUint();
- auto depth = msg.nextUint();
- (void)depth;
- messenger->init_.uncompressedFrameSize_ =
- Messenger::uncompressedFrameSize(width, height, samplesPerPixel);
- auto compressedFrameSize = msg.nextUint();
- auto numFrames = msg.nextUint();
- messenger->initClient(compressedFrameSize, compressedFrameSize, numFrames);
- }
- else if(tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED)
- {
- messenger->reclaimUncompressed(msg.nextUint());
- }
- else if(tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED)
- {
- messenger->reclaimCompressed(msg.nextUint());
- }
- processor(message);
- }
-}
-
-template<typename F>
-struct ScheduledFrames
-{
- void store(F& val)
- {
- std::unique_lock<std::mutex> lk(mapMutex_);
- auto it = map_.find(val.index());
- if (it == map_.end())
- map_[val.index()] = val;
- }
- F retrieve(size_t index, bool &success)
- {
- std::unique_lock<std::mutex> lk(mapMutex_);
- success = false;
- auto it = map_.find(index);
- if(it == map_.end())
- return F();
-
- success = true;
- F val = it->second;
- map_.erase(index);
-
- return val;
- }
-
- private:
- std::mutex mapMutex_;
- std::map<size_t, F> map_;
-};
-
-template<typename F>
-struct ScheduledMessenger : public Messenger
-{
- explicit ScheduledMessenger(MessengerInit init) : Messenger(init),
- framesScheduled_(0),
- framesCompressed_(0)
- {}
- ~ScheduledMessenger(void) {
- shutdown();
- }
- bool scheduleCompress(F proxy, std::function<void(BufferSrc)> converter){
- size_t frameSize = init_.uncompressedFrameSize_;
- assert(frameSize >= init_.uncompressedFrameSize_);
- BufferSrc src;
- if(!availableBuffers_.waitAndPop(src))
- return false;
- converter(src);
- scheduledFrames_.store(proxy);
- framesScheduled_++;
- send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_);
-
- return true;
- }
- void processCompressed(const std::string &message, std::function<void(F,uint8_t*,uint32_t)> processor, bool needsRecompression) {
- Msg msg(message);
- msg.next();
- auto clientFrameId = msg.nextUint();
- auto compressedFrameId = msg.nextUint();
- auto compressedFrameLength = msg.nextUint();
- if (!needsRecompression) {
- bool success = false;
- auto srcFrame = scheduledFrames_.retrieve(clientFrameId,success);
- if (!success)
- return;
- processor(srcFrame, getCompressedFrame(compressedFrameId),compressedFrameLength);
- }
- ++framesCompressed_;
- send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId);
- if (shutdown_ && framesCompressed_ == framesScheduled_)
- shutdownCondition_.notify_all();
- }
- void shutdown(void){
- try {
- std::unique_lock<std::mutex> lk(shutdownMutex_);
- if (!async_result_.valid())
- return;
- shutdown_ = true;
- if (framesScheduled_) {
- uint32_t scheduled = framesScheduled_;
- send(GRK_MSGR_BATCH_FLUSH, scheduled);
- shutdownCondition_.wait(lk, [this] { return framesScheduled_ == framesCompressed_; });
- }
- availableBuffers_.deactivate();
- send(GRK_MSGR_BATCH_SHUTDOWN);
- int result = async_result_.get();
- if(result != 0)
- getMessengerLogger()->error("Accelerator failed with return code: %d\n",result);
- } catch (std::exception &ex) {
- getMessengerLogger()->error("%s",ex.what());
- }
-
- }
- F retrieve(size_t index, bool &success) {
- return scheduledFrames_.retrieve(index, success);
- }
- void store(F& val) {
- scheduledFrames_.store(val);
- }
-
-private:
- ScheduledFrames<F> scheduledFrames_;
- std::atomic<uint32_t> framesScheduled_;
- std::atomic<uint32_t> framesCompressed_;
-};
-
-} // namespace grk_plugin
* @brief J2KEncoder class.
*/
+#include "grok/context.h"
#include "cross.h"
#include "dcp_video.h"
#include <list>
#include <stdint.h>
-#include "grok_context.h"
-
class EncodeServerDescription;
class Film;
class Job;