/*
Copyright (C) 2023 Grok Image Compression Inc.
This file is part of DCP-o-matic.
DCP-o-matic is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
DCP-o-matic is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with DCP-o-matic. If not, see .
*/
#pragma once
#include "../config.h"
#include "../dcp_video.h"
#include "../log.h"
#include "../dcpomatic_log.h"
#include "../writer.h"
#include "messenger.h"
class Film;
using dcp::Data;
using namespace dcpomatic;
static std::mutex launchMutex;
namespace grk_plugin
{
struct GrokLogger : public MessengerLogger {
explicit GrokLogger(const std::string &preamble) : MessengerLogger(preamble)
{}
virtual ~GrokLogger() = default;
void info(const char* fmt, ...) override{
va_list arg;
va_start(arg, fmt);
dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_GENERAL);
va_end(arg);
}
void warn(const char* fmt, ...) override{
va_list arg;
va_start(arg, fmt);
dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_WARNING);
va_end(arg);
}
void error(const char* fmt, ...) override{
va_list arg;
va_start(arg, fmt);
dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_ERROR);
va_end(arg);
}
};
struct GrokInitializer {
GrokInitializer(void) {
setMessengerLogger(new GrokLogger("[GROK] "));
}
~GrokInitializer() = default;
};
struct FrameProxy {
FrameProxy(void) : FrameProxy(0,Eyes::LEFT,DCPVideo())
{}
FrameProxy(int index, Eyes eyes, DCPVideo dcpv) : index_(index), eyes_(eyes), vf(dcpv)
{}
int index() const {
return index_;
}
Eyes eyes(void) const {
return eyes_;
}
int index_;
Eyes eyes_;
DCPVideo vf;
};
struct DcpomaticContext {
DcpomaticContext(std::shared_ptr film, Writer& writer,
EventHistory &history, const std::string &location) :
film_(film), writer_(writer),
history_(history), location_(location),
width_(0), height_(0)
{}
void setDimensions(uint32_t w, uint32_t h) {
width_ = w;
height_ = h;
}
std::shared_ptr film_;
Writer& writer_;
EventHistory &history_;
std::string location_;
uint32_t width_;
uint32_t height_;
};
class GrokContext {
public:
explicit GrokContext(const DcpomaticContext &dcpomaticContext) :
dcpomaticContext_(dcpomaticContext),
messenger_(nullptr),
launched_(false)
{
struct CompressedData : public dcp::Data {
explicit CompressedData(int dataLen) : data_(new uint8_t[dataLen]), dataLen_(dataLen)
{}
~CompressedData(void){
delete[] data_;
}
uint8_t const * data () const override {
return data_;
}
uint8_t * data () override {
return data_;
}
int size () const override {
return dataLen_;
}
uint8_t *data_;
int dataLen_;
};
if (Config::instance()->enable_gpu ()) {
boost::filesystem::path folder(dcpomaticContext_.location_);
boost::filesystem::path binaryPath = folder / "grk_compress";
if (!boost::filesystem::exists(binaryPath)) {
getMessengerLogger()->error("Invalid binary location %s",
dcpomaticContext_.location_.c_str());
return;
}
auto proc = [this](const std::string& str) {
try {
Msg msg(str);
auto tag = msg.next();
if(tag == GRK_MSGR_BATCH_SUBMIT_COMPRESSED)
{
auto clientFrameId = msg.nextUint();
auto compressedFrameId = msg.nextUint();
(void)compressedFrameId;
auto compressedFrameLength = msg.nextUint();
auto processor =
[this](FrameProxy srcFrame, uint8_t* compressed, uint32_t compressedFrameLength)
{
auto compressedData = std::make_shared(compressedFrameLength);
memcpy(compressedData->data_,compressed,compressedFrameLength );
dcpomaticContext_.writer_.write(compressedData, srcFrame.index(), srcFrame.eyes());
frame_done ();
};
int const minimum_size = 16384;
bool needsRecompression = compressedFrameLength < minimum_size;
messenger_->processCompressed(str, processor, needsRecompression);
if (needsRecompression) {
bool success = false;
auto fp = messenger_->retrieve(clientFrameId, success);
if (!success)
return;
auto encoded = std::make_shared(fp.vf.encode_locally());
dcpomaticContext_.writer_.write(encoded, fp.vf.index(), fp.vf.eyes());
frame_done ();
}
}
} catch (std::exception &ex){
getMessengerLogger()->error("%s",ex.what());
}
};
auto clientInit =
MessengerInit(clientToGrokMessageBuf, clientSentSynch, grokReceiveReadySynch,
grokToClientMessageBuf, grokSentSynch, clientReceiveReadySynch, proc,
std::thread::hardware_concurrency());
messenger_ = new ScheduledMessenger(clientInit);
}
}
~GrokContext(void) {
shutdown();
}
bool launch(DCPVideo dcpv, int device){
if (!messenger_ )
return false;
if (launched_)
return true;
std::unique_lock lk_global(launchMutex);
if (!messenger_)
return false;
if (launched_)
return true;
if (MessengerInit::firstLaunch(true)) {
auto s = dcpv.get_size();
dcpomaticContext_.setDimensions(s.width, s.height);
auto config = Config::instance();
messenger_->launchGrok(dcpomaticContext_.location_,
dcpomaticContext_.width_,dcpomaticContext_.width_,
dcpomaticContext_.height_,
3, 12, device,
dcpomaticContext_.film_->resolution() == Resolution::FOUR_K,
dcpomaticContext_.film_->video_frame_rate(),
dcpomaticContext_.film_->j2k_bandwidth(),
config->gpu_license_server(),
config->gpu_license_port(),
config->gpu_license());
}
launched_ = messenger_->waitForClientInit();
return launched_;
}
bool scheduleCompress(const DCPVideo &vf){
if (!messenger_)
return false;
auto fp = FrameProxy(vf.index(),vf.eyes(),vf);
auto cvt = [this, &fp](BufferSrc src){
// xyz conversion
fp.vf.convert_to_xyz((uint16_t*)src.framePtr_);
};
return messenger_->scheduleCompress(fp, cvt);
}
void shutdown(void){
if (!messenger_)
return;
std::unique_lock lk_global(launchMutex);
if (!messenger_)
return;
if (launched_)
messenger_->shutdown();
delete messenger_;
messenger_ = nullptr;
}
void frame_done () {
dcpomaticContext_.history_.event ();
}
private:
DcpomaticContext dcpomaticContext_;
ScheduledMessenger *messenger_;
bool launched_;
};
}