_enable_gpu = f.optional_bool_child("EnableGpu").get_value_or(false);
_gpu_binary_location = f.string_child("GpuBinaryLocation");
_selected_gpu = f.number_child<int>("SelectedGpu");
- _gpu_license_server = f.string_child ("GpuLicenseServer");
- _gpu_license_port = f.number_child<int> ("GpuLicensePort");
+ _gpu_license_server = f.string_child("GpuLicenseServer");
+ _gpu_license_port = f.number_child<int>("GpuLicensePort");
_gpu_license = f.string_child("GpuLicense");
_export.read(f.optional_node_child("Export"));
/* [XML] ISDCFNamePartLength Maximum length of the "name" part of an ISDCF name, which should be 14 according to the standard */
root->add_child("ISDCFNamePartLength")->add_child_text(raw_convert<string>(_isdcf_name_part_length));
- root->add_child("GpuBinaryLocation")->add_child_text (_gpu_binary_location);
- root->add_child("EnableGpu")->add_child_text ((_enable_gpu ? "1" : "0"));
- root->add_child("SelectedGpu")->add_child_text (raw_convert<string> (_selected_gpu));
- root->add_child("GpuLicenseServer")->add_child_text (_gpu_license_server);
- root->add_child("GpuLicensePort")->add_child_text (raw_convert<string> (_gpu_license_port));
- root->add_child("GpuLicense")->add_child_text (_gpu_license);
+ root->add_child("GpuBinaryLocation")->add_child_text(_gpu_binary_location);
+ root->add_child("EnableGpu")->add_child_text((_enable_gpu ? "1" : "0"));
+ root->add_child("SelectedGpu")->add_child_text(raw_convert<string>(_selected_gpu));
+ root->add_child("GpuLicenseServer")->add_child_text(_gpu_license_server);
+ root->add_child("GpuLicensePort")->add_child_text(raw_convert<string>(_gpu_license_port));
+ root->add_child("GpuLicense")->add_child_text(_gpu_license);
_export.write(root->add_child("Export"));
return _allow_smpte_bv20;
}
- std::string gpu_binary_location () const {
+ std::string gpu_binary_location() const
+ {
return _gpu_binary_location;
}
- bool enable_gpu () const {
+ bool enable_gpu() const
+ {
return _enable_gpu;
}
- int selected_gpu () const {
+ int selected_gpu() const
+ {
return _selected_gpu;
}
- std::string gpu_license_server () const {
+
+ std::string gpu_license_server() const
+ {
return _gpu_license_server;
}
- int gpu_license_port () const {
+ int gpu_license_port() const
+ {
return _gpu_license_port;
}
- std::string gpu_license () const {
+
+ std::string gpu_license() const
+ {
return _gpu_license;
}
void set_allow_smpte_bv20(bool allow) {
maybe_set(_allow_smpte_bv20, allow, ALLOW_SMPTE_BV20);
}
- void set_gpu_binary_location (std::string location) {
- maybe_set (_gpu_binary_location, location);
+
+ void set_gpu_binary_location(std::string location)
+ {
+ maybe_set(_gpu_binary_location, location);
}
- void set_enable_gpu (bool enable) {
- maybe_set (_enable_gpu, enable);
+
+ void set_enable_gpu(bool enable)
+ {
+ maybe_set(_enable_gpu, enable);
}
- void set_selected_gpu (int selected) {
- maybe_set (_selected_gpu, selected);
+
+ void set_selected_gpu(int selected)
+ {
+ maybe_set(_selected_gpu, selected);
}
- void set_gpu_license_server (std::string s) {
- maybe_set (_gpu_license_server, s);
+
+ void set_gpu_license_server(std::string s)
+ {
+ maybe_set(_gpu_license_server, s);
}
- void set_gpu_license_port (int p) {
- maybe_set (_gpu_license_port, p);
+
+ void set_gpu_license_port(int p)
+ {
+ maybe_set(_gpu_license_port, p);
}
- void set_gpu_license (std::string p) {
- maybe_set (_gpu_license, p);
+
+ void set_gpu_license(std::string p)
+ {
+ maybe_set(_gpu_license, p);
}
void set_isdcf_name_part_length(int length) {
maybe_set(_isdcf_name_part_length, length, ISDCF_NAME_PART_LENGTH);
_writer.finish(_film->dir(_film->dcp_name()));
}
-void DCPEncoder::pause(void) {
+void
+DCPEncoder::pause(void)
+{
_j2k_encoder.pause();
}
-void DCPEncoder::resume(void) {
+void
+DCPEncoder::resume(void)
+{
_j2k_encoder.resume();
}
void
bool finishing () const override {
return _finishing;
}
+
void pause(void) override;
void resume(void) override;
}
dcp::Size
-DCPVideo::get_size(void) {
- auto image = _frame->image (bind(&PlayerVideo::keep_xyz_or_rgb, _1), VideoRange::FULL, false);
+DCPVideo::get_size(void)
+{
+ auto image = _frame->image(bind(&PlayerVideo::keep_xyz_or_rgb, _1), VideoRange::FULL, false);
return image->size();
}
void
-DCPVideo::convert_to_xyz (uint16_t *dst) {
+DCPVideo::convert_to_xyz(uint16_t* dst)
+{
- auto image = _frame->image (bind(&PlayerVideo::keep_xyz_or_rgb, _1), VideoRange::FULL, false);
+ auto image = _frame->image(bind(&PlayerVideo::keep_xyz_or_rgb, _1), VideoRange::FULL, false);
if (_frame->colour_conversion()) {
- dcp::rgb_to_xyz (
- image->data()[0],
- dst,
- image->size(),
- image->stride()[0],
- _frame->colour_conversion().get()
- );
+ dcp::rgb_to_xyz(
+ image->data()[0],
+ dst,
+ image->size(),
+ image->stride()[0],
+ _frame->colour_conversion().get());
}
}
class DCPVideo
{
public:
- DCPVideo (void) : DCPVideo(nullptr,0,0,0,Resolution::TWO_K){}
+ DCPVideo(void)
+ : DCPVideo(nullptr, 0, 0, 0, Resolution::TWO_K)
+ {
+ }
DCPVideo (std::shared_ptr<const PlayerVideo>, int index, int dcp_fps, int bandwidth, Resolution r);
DCPVideo (std::shared_ptr<const PlayerVideo>, cxml::ConstNodePtr);
static std::shared_ptr<dcp::OpenJPEGImage> convert_to_xyz (std::shared_ptr<const PlayerVideo> frame, dcp::NoteHandler note);
- void convert_to_xyz (uint16_t *dst);
+ void convert_to_xyz(uint16_t* dst);
dcp::Size get_size(void);
+
private:
void add_metadata (xmlpp::Element *) const;
/** @return the number of frames that are done */
virtual Frame frames_done () const = 0;
- virtual bool finishing () const = 0;
+ virtual bool finishing() const = 0;
+
virtual void pause(void) {}
+
virtual void resume(void) {}
protected:
#pragma once
#include "config.h"
-#include "log.h"
#include "dcpomatic_log.h"
-#include "writer.h"
#include "grok_messenger.h"
+#include "log.h"
+#include "writer.h"
class Film;
using dcp::Data;
static std::mutex launchMutex;
-namespace grk_plugin
+namespace grk_plugin {
+
+struct GrokLogger : public MessengerLogger
{
+ explicit GrokLogger(const std::string& preamble)
+ : MessengerLogger(preamble)
+ {
+ }
-struct GrokLogger : public MessengerLogger {
- explicit GrokLogger(const std::string &preamble) : MessengerLogger(preamble)
- {}
virtual ~GrokLogger() = default;
- void info(const char* fmt, ...) override{
+
+ void info(const char* fmt, ...) override
+ {
va_list arg;
va_start(arg, fmt);
- dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_GENERAL);
+ dcpomatic_log->log(preamble_ + log_message(fmt, arg), LogEntry::TYPE_GENERAL);
va_end(arg);
}
- void warn(const char* fmt, ...) override{
+
+ void warn(const char* fmt, ...) override
+ {
va_list arg;
va_start(arg, fmt);
- dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_WARNING);
+ dcpomatic_log->log(preamble_ + log_message(fmt, arg), LogEntry::TYPE_WARNING);
va_end(arg);
}
- void error(const char* fmt, ...) override{
+
+ void error(const char* fmt, ...) override
+ {
va_list arg;
va_start(arg, fmt);
- dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_ERROR);
+ dcpomatic_log->log(preamble_ + log_message(fmt, arg), LogEntry::TYPE_ERROR);
va_end(arg);
}
};
-struct GrokInitializer {
- GrokInitializer(void) {
+struct GrokInitializer
+{
+ GrokInitializer(void)
+ {
setMessengerLogger(new GrokLogger("[GROK] "));
}
- ~GrokInitializer() = default;
+
+ ~GrokInitializer() = default;
};
-struct FrameProxy {
- FrameProxy(void) : FrameProxy(0,Eyes::LEFT,DCPVideo())
- {}
- FrameProxy(int index, Eyes eyes, DCPVideo dcpv) : index_(index), eyes_(eyes), vf(dcpv)
- {}
- int index() const {
+struct FrameProxy
+{
+ FrameProxy(void)
+ : FrameProxy(0, Eyes::LEFT, DCPVideo())
+ {
+ }
+
+ FrameProxy(int index, Eyes eyes, DCPVideo dcpv)
+ : index_(index)
+ , eyes_(eyes)
+ , vf(dcpv)
+ {
+ }
+
+ int index() const
+ {
return index_;
}
- Eyes eyes(void) const {
+
+ Eyes eyes(void) const
+ {
return eyes_;
}
+
int index_;
Eyes eyes_;
DCPVideo vf;
};
-struct DcpomaticContext {
+struct DcpomaticContext
+{
DcpomaticContext(std::shared_ptr<const Film> film, Writer& writer,
- EventHistory &history, const std::string &location) :
- film_(film), writer_(writer),
- history_(history), location_(location),
- width_(0), height_(0)
- {}
- void setDimensions(uint32_t w, uint32_t h) {
+ EventHistory& history, const std::string& location)
+ : film_(film)
+ , writer_(writer)
+ , history_(history)
+ , location_(location)
+ , width_(0)
+ , height_(0)
+ {
+ }
+
+ void setDimensions(uint32_t w, uint32_t h)
+ {
width_ = w;
height_ = h;
}
+
std::shared_ptr<const Film> film_;
Writer& writer_;
- EventHistory &history_;
+ EventHistory& history_;
std::string location_;
uint32_t width_;
uint32_t height_;
};
-class GrokContext {
+class GrokContext
+{
public:
- explicit GrokContext(const DcpomaticContext &dcpomaticContext) :
- dcpomaticContext_(dcpomaticContext),
- messenger_(nullptr),
- launched_(false)
- {
- struct CompressedData : public dcp::Data {
- explicit CompressedData(int dataLen) : data_(new uint8_t[dataLen]), dataLen_(dataLen)
- {}
- ~CompressedData(void){
+ explicit GrokContext(const DcpomaticContext& dcpomaticContext)
+ : dcpomaticContext_(dcpomaticContext)
+ , messenger_(nullptr)
+ , launched_(false)
+ {
+ struct CompressedData : public dcp::Data
+ {
+ explicit CompressedData(int dataLen)
+ : data_(new uint8_t[dataLen])
+ , dataLen_(dataLen)
+ {
+ }
+
+ ~CompressedData(void)
+ {
delete[] data_;
}
- uint8_t const * data () const override {
+
+ uint8_t const* data() const override
+ {
return data_;
}
- uint8_t * data () override {
+
+ uint8_t* data() override
+ {
return data_;
}
- int size () const override {
+
+ int size() const override
+ {
return dataLen_;
}
- uint8_t *data_;
+
+ uint8_t* data_;
int dataLen_;
};
- if (Config::instance()->enable_gpu ()) {
- boost::filesystem::path folder(dcpomaticContext_.location_);
- boost::filesystem::path binaryPath = folder / "grk_compress";
- if (!boost::filesystem::exists(binaryPath)) {
- getMessengerLogger()->error("Invalid binary location %s",
- dcpomaticContext_.location_.c_str());
+
+ if (Config::instance()->enable_gpu()) {
+ boost::filesystem::path folder(dcpomaticContext_.location_);
+ boost::filesystem::path binaryPath = folder / "grk_compress";
+ if (!boost::filesystem::exists(binaryPath)) {
+ getMessengerLogger()->error("Invalid binary location %s",
+ dcpomaticContext_.location_.c_str());
return;
- }
+ }
auto proc = [this](const std::string& str) {
try {
Msg msg(str);
auto tag = msg.next();
- if(tag == GRK_MSGR_BATCH_SUBMIT_COMPRESSED)
- {
+ if (tag == GRK_MSGR_BATCH_SUBMIT_COMPRESSED) {
auto clientFrameId = msg.nextUint();
auto compressedFrameId = msg.nextUint();
(void)compressedFrameId;
auto compressedFrameLength = msg.nextUint();
- auto processor =
- [this](FrameProxy srcFrame, uint8_t* compressed, uint32_t compressedFrameLength)
- {
- auto compressedData = std::make_shared<CompressedData>(compressedFrameLength);
- memcpy(compressedData->data_,compressed,compressedFrameLength );
- dcpomaticContext_.writer_.write(compressedData, srcFrame.index(), srcFrame.eyes());
- frame_done ();
- };
+ auto processor =
+ [this](FrameProxy srcFrame, uint8_t* compressed, uint32_t compressedFrameLength) {
+ auto compressedData = std::make_shared<CompressedData>(compressedFrameLength);
+ memcpy(compressedData->data_, compressed, compressedFrameLength);
+ dcpomaticContext_.writer_.write(compressedData, srcFrame.index(), srcFrame.eyes());
+ frame_done();
+ };
int const minimum_size = 16384;
bool needsRecompression = compressedFrameLength < minimum_size;
messenger_->processCompressed(str, processor, needsRecompression);
auto encoded = std::make_shared<dcp::ArrayData>(fp.vf.encode_locally());
dcpomaticContext_.writer_.write(encoded, fp.vf.index(), fp.vf.eyes());
- frame_done ();
+ frame_done();
}
}
- } catch (std::exception &ex){
- getMessengerLogger()->error("%s",ex.what());
+ } catch (std::exception& ex) {
+ getMessengerLogger()->error("%s", ex.what());
}
};
auto clientInit =
- MessengerInit(clientToGrokMessageBuf, clientSentSynch, grokReceiveReadySynch,
- grokToClientMessageBuf, grokSentSynch, clientReceiveReadySynch, proc,
- std::thread::hardware_concurrency());
+ MessengerInit(clientToGrokMessageBuf, clientSentSynch, grokReceiveReadySynch,
+ grokToClientMessageBuf, grokSentSynch, clientReceiveReadySynch, proc,
+ std::thread::hardware_concurrency());
messenger_ = new ScheduledMessenger<FrameProxy>(clientInit);
}
}
- ~GrokContext(void) {
+
+ ~GrokContext(void)
+ {
shutdown();
}
- bool launch(DCPVideo dcpv, int device){
- if (!messenger_ )
+
+ bool launch(DCPVideo dcpv, int device)
+ {
+ if (!messenger_)
return false;
if (launched_)
return true;
dcpomaticContext_.setDimensions(s.width, s.height);
auto config = Config::instance();
messenger_->launchGrok(dcpomaticContext_.location_,
- dcpomaticContext_.width_,dcpomaticContext_.width_,
- dcpomaticContext_.height_,
- 3, 12, device,
- dcpomaticContext_.film_->resolution() == Resolution::FOUR_K,
- dcpomaticContext_.film_->video_frame_rate(),
- dcpomaticContext_.film_->j2k_bandwidth(),
- config->gpu_license_server(),
- config->gpu_license_port(),
- config->gpu_license());
+ dcpomaticContext_.width_, dcpomaticContext_.width_,
+ dcpomaticContext_.height_,
+ 3, 12, device,
+ dcpomaticContext_.film_->resolution() == Resolution::FOUR_K,
+ dcpomaticContext_.film_->video_frame_rate(),
+ dcpomaticContext_.film_->j2k_bandwidth(),
+ config->gpu_license_server(),
+ config->gpu_license_port(),
+ config->gpu_license());
}
- launched_ = messenger_->waitForClientInit();
+ launched_ = messenger_->waitForClientInit();
return launched_;
}
- bool scheduleCompress(const DCPVideo &vf){
+
+ bool scheduleCompress(const DCPVideo& vf)
+ {
if (!messenger_)
return false;
- auto fp = FrameProxy(vf.index(),vf.eyes(),vf);
- auto cvt = [this, &fp](BufferSrc src){
+ auto fp = FrameProxy(vf.index(), vf.eyes(), vf);
+ auto cvt = [this, &fp](BufferSrc src) {
// xyz conversion
fp.vf.convert_to_xyz((uint16_t*)src.framePtr_);
};
return messenger_->scheduleCompress(fp, cvt);
}
- void shutdown(void){
+
+ void shutdown(void)
+ {
if (!messenger_)
return;
delete messenger_;
messenger_ = nullptr;
}
- void frame_done () {
- dcpomaticContext_.history_.event ();
+
+ void frame_done()
+ {
+ dcpomaticContext_.history_.event();
}
+
private:
DcpomaticContext dcpomaticContext_;
- ScheduledMessenger<FrameProxy> *messenger_;
+ ScheduledMessenger<FrameProxy>* messenger_;
bool launched_;
};
-}
-
+} // namespace grk_plugin
*/
#pragma once
-#include <iostream>
-#include <string>
-#include <cstring>
#include <atomic>
+#include <cassert>
+#include <condition_variable>
+#include <cstdarg>
+#include <cstring>
#include <functional>
-#include <sstream>
#include <future>
+#include <iostream>
#include <map>
-#include <thread>
#include <mutex>
-#include <condition_variable>
#include <queue>
-#include <cassert>
-#include <cstdarg>
+#include <sstream>
+#include <string>
+#include <thread>
#ifdef _WIN32
-#include <windows.h>
#include <direct.h>
#include <tlhelp32.h>
+#include <windows.h>
#pragma warning(disable : 4100)
#else
-#include <unistd.h>
#include <fcntl.h>
-#include <sys/mman.h>
#include <semaphore.h>
#include <signal.h>
+#include <sys/mman.h>
+#include <unistd.h>
#endif
-namespace grk_plugin
-{
+namespace grk_plugin {
static std::string grokToClientMessageBuf = "Global\\grok_to_client_message";
static std::string grokSentSynch = "Global\\grok_sent";
static std::string clientReceiveReadySynch = "Global\\client_receive_ready";
static const std::string GRK_MSGR_BATCH_COMPRESS_INIT = "GRK_MSGR_BATCH_COMPRESS_INIT";
static const std::string GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED = "GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED";
static const std::string GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED =
- "GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED";
+ "GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED";
static const std::string GRK_MSGR_BATCH_SUBMIT_COMPRESSED = "GRK_MSGR_BATCH_SUBMIT_COMPRESSED";
static const std::string GRK_MSGR_BATCH_PROCESSSED_COMPRESSED =
- "GRK_MSGR_BATCH_PROCESSSED_COMPRESSED";
+ "GRK_MSGR_BATCH_PROCESSSED_COMPRESSED";
static const std::string GRK_MSGR_BATCH_SHUTDOWN = "GRK_MSGR_BATCH_SHUTDOWN";
static const std::string GRK_MSGR_BATCH_FLUSH = "GRK_MSGR_BATCH_FLUSH";
static const size_t messageBufferLen = 256;
+
struct IMessengerLogger
{
virtual ~IMessengerLogger(void) = default;
virtual void warn(const char* fmt, ...) = 0;
virtual void error(const char* fmt, ...) = 0;
- protected:
- template<typename... Args>
+protected:
+ template <typename... Args>
std::string log_message(char const* const format, Args&... args) noexcept
{
constexpr size_t message_size = 512;
return std::string(message);
}
};
+
struct MessengerLogger : public IMessengerLogger
{
- explicit MessengerLogger(const std::string &preamble) : preamble_(preamble) {}
+ explicit MessengerLogger(const std::string& preamble)
+ : preamble_(preamble)
+ {
+ }
+
virtual ~MessengerLogger() = default;
+
virtual void info(const char* fmt, ...) override
{
va_list args;
vfprintf(stdout, new_fmt.c_str(), args);
va_end(args);
}
+
virtual void warn(const char* fmt, ...) override
{
va_list args;
vfprintf(stdout, new_fmt.c_str(), args);
va_end(args);
}
+
virtual void error(const char* fmt, ...) override
{
va_list args;
va_end(args);
}
- protected:
+protected:
std::string preamble_;
};
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-function"
#endif
-static void setMessengerLogger(IMessengerLogger* logger)
+static void
+setMessengerLogger(IMessengerLogger* logger)
{
- delete sLogger;
- sLogger = logger;
+ delete sLogger;
+ sLogger = logger;
}
#if defined(__GNUC__) || defined(__clang__)
#pragma GCC diagnostic pop
#endif
-static IMessengerLogger* getMessengerLogger(void)
+static IMessengerLogger*
+getMessengerLogger(void)
{
return sLogger;
}
+
struct MessengerInit
{
- MessengerInit(const std::string &outBuf, const std::string &outSent,
- const std::string &outReceiveReady, const std::string &inBuf,
- const std::string &inSent,
- const std::string &inReceiveReady,
- std::function<void(std::string)> processor,
- size_t numProcessingThreads)
- : outboundMessageBuf(outBuf), outboundSentSynch(outSent),
- outboundReceiveReadySynch(outReceiveReady), inboundMessageBuf(inBuf),
- inboundSentSynch(inSent), inboundReceiveReadySynch(inReceiveReady), processor_(processor),
- numProcessingThreads_(numProcessingThreads),
- uncompressedFrameSize_(0), compressedFrameSize_(0),
- numFrames_(0)
- {
- if(firstLaunch(true))
+ MessengerInit(const std::string& outBuf, const std::string& outSent,
+ const std::string& outReceiveReady, const std::string& inBuf,
+ const std::string& inSent,
+ const std::string& inReceiveReady,
+ std::function<void(std::string)> processor,
+ size_t numProcessingThreads)
+ : outboundMessageBuf(outBuf)
+ , outboundSentSynch(outSent)
+ , outboundReceiveReadySynch(outReceiveReady)
+ , inboundMessageBuf(inBuf)
+ , inboundSentSynch(inSent)
+ , inboundReceiveReadySynch(inReceiveReady)
+ , processor_(processor)
+ , numProcessingThreads_(numProcessingThreads)
+ , uncompressedFrameSize_(0)
+ , compressedFrameSize_(0)
+ , numFrames_(0)
+ {
+ if (firstLaunch(true))
unlink();
}
+
void unlink(void)
{
#ifndef _WIN32
shm_unlink(clientToGrokMessageBuf.c_str());
#endif
}
+
static bool firstLaunch(bool isClient)
{
bool debugGrok = false;
return debugGrok != isClient;
}
+
std::string outboundMessageBuf;
std::string outboundSentSynch;
std::string outboundReceiveReadySynch;
};
typedef int grk_handle;
+
struct Synch
{
- Synch(const std::string &sentSemName, const std::string &receiveReadySemName)
- : sentSemName_(sentSemName), receiveReadySemName_(receiveReadySemName)
+ Synch(const std::string& sentSemName, const std::string& receiveReadySemName)
+ : sentSemName_(sentSemName)
+ , receiveReadySemName_(receiveReadySemName)
{
// unlink semaphores in case of previous crash
- if(MessengerInit::firstLaunch(true))
+ if (MessengerInit::firstLaunch(true))
unlink();
open();
}
+
~Synch()
{
close();
- if(MessengerInit::firstLaunch(true))
+ if (MessengerInit::firstLaunch(true))
unlink();
}
+
void post(SynchDirection dir)
{
auto sem = (dir == SYNCH_SENT ? sentSem_ : receiveReadySem_);
int rc = sem_post(sem);
- if(rc)
+ if (rc)
getMessengerLogger()->error("Error posting to semaphore: %s", strerror(errno));
}
+
void wait(SynchDirection dir)
{
auto sem = dir == SYNCH_SENT ? sentSem_ : receiveReadySem_;
int rc = sem_wait(sem);
- if(rc)
+ if (rc)
getMessengerLogger()->error("Error waiting for semaphore: %s", strerror(errno));
}
+
void open(void)
{
sentSem_ = sem_open(sentSemName_.c_str(), O_CREAT, 0666, 0);
- if(!sentSem_)
+ if (!sentSem_)
getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
receiveReadySem_ = sem_open(receiveReadySemName_.c_str(), O_CREAT, 0666, 1);
- if(!receiveReadySem_)
+ if (!receiveReadySem_)
getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
}
+
void close(void)
{
int rc = sem_close(sentSem_);
- if(rc)
+ if (rc)
getMessengerLogger()->error("Error closing semaphore %s: %s", sentSemName_.c_str(),
- strerror(errno));
+ strerror(errno));
rc = sem_close(receiveReadySem_);
- if(rc)
+ if (rc)
getMessengerLogger()->error("Error closing semaphore %s: %s",
- receiveReadySemName_.c_str(), strerror(errno));
+ receiveReadySemName_.c_str(), strerror(errno));
}
+
void unlink(void)
{
int rc = sem_unlink(sentSemName_.c_str());
- if(rc == -1 && errno != ENOENT)
+ if (rc == -1 && errno != ENOENT)
getMessengerLogger()->error("Error unlinking semaphore %s: %s", sentSemName_.c_str(),
- strerror(errno));
+ strerror(errno));
rc = sem_unlink(receiveReadySemName_.c_str());
- if(rc == -1 && errno != ENOENT)
+ if (rc == -1 && errno != ENOENT)
getMessengerLogger()->error("Error unlinking semaphore %s: %s",
- receiveReadySemName_.c_str(), strerror(errno));
+ receiveReadySemName_.c_str(), strerror(errno));
}
+
sem_t* sentSem_;
sem_t* receiveReadySem_;
- private:
+private:
std::string sentSemName_;
std::string receiveReadySemName_;
};
+
struct SharedMemoryManager
{
- static bool initShm(const std::string &name, size_t len, grk_handle* shm_fd, char** buffer)
+ static bool initShm(const std::string& name, size_t len, grk_handle* shm_fd, char** buffer)
{
*shm_fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0666);
- if(*shm_fd < 0)
- {
+ if (*shm_fd < 0) {
getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
return false;
}
int rc = ftruncate(*shm_fd, sizeof(char) * len);
- if(rc)
- {
+ if (rc) {
getMessengerLogger()->error("Error truncating shared memory: %s", strerror(errno));
rc = close(*shm_fd);
- if(rc)
+ if (rc)
getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
rc = shm_unlink(name.c_str());
- if(rc)
+ if (rc)
getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
return false;
}
*buffer = static_cast<char*>(mmap(0, len, PROT_WRITE, MAP_SHARED, *shm_fd, 0));
- if(!*buffer)
- {
+ if (!*buffer) {
getMessengerLogger()->error("Error mapping shared memory: %s", strerror(errno));
rc = close(*shm_fd);
- if(rc)
+ if (rc)
getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
rc = shm_unlink(name.c_str());
- if(rc)
+ if (rc)
getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
}
return *buffer != nullptr;
}
- static bool deinitShm(const std::string &name, size_t len, grk_handle &shm_fd, char** buffer)
+
+ static bool deinitShm(const std::string& name, size_t len, grk_handle& shm_fd, char** buffer)
{
if (!*buffer || !shm_fd)
return true;
int rc = munmap(*buffer, len);
*buffer = nullptr;
- if(rc)
+ if (rc)
getMessengerLogger()->error("Error unmapping shared memory %s: %s", name.c_str(), strerror(errno));
rc = close(shm_fd);
shm_fd = 0;
- if(rc)
+ if (rc)
getMessengerLogger()->error("Error closing shared memory %s: %s", name.c_str(), strerror(errno));
rc = shm_unlink(name.c_str());
- if(rc)
- fprintf(stderr,"Error unlinking shared memory %s : %s\n", name.c_str(), strerror(errno));
+ if (rc)
+ fprintf(stderr, "Error unlinking shared memory %s : %s\n", name.c_str(), strerror(errno));
return true;
}
};
-template<typename Data>
+template <typename Data>
class MessengerBlockingQueue
{
- public:
- explicit MessengerBlockingQueue(size_t max) : active_(true), max_size_(max) {}
- MessengerBlockingQueue() : MessengerBlockingQueue(UINT_MAX) {}
+public:
+ explicit MessengerBlockingQueue(size_t max)
+ : active_(true)
+ , max_size_(max)
+ {
+ }
+
+ MessengerBlockingQueue()
+ : MessengerBlockingQueue(UINT_MAX)
+ {
+ }
+
size_t size() const
{
return queue_.size();
}
+
// deactivate and clear queue
void deactivate()
{
{
std::lock_guard<std::mutex> lk(mutex_);
active_ = false;
- while(!queue_.empty())
+ while (!queue_.empty())
queue_.pop();
}
can_pop_.notify_all();
can_push_.notify_all();
}
+
void activate()
{
std::lock_guard<std::mutex> lk(mutex_);
active_ = true;
}
+
bool push(Data const& value)
{
bool rc;
std::unique_lock<std::mutex> lk(mutex_);
rc = push_(value);
}
- if(rc)
+ if (rc)
can_pop_.notify_one();
return rc;
}
+
bool waitAndPush(Data& value)
{
bool rc;
{
std::unique_lock<std::mutex> lk(mutex_);
- if(!active_)
+ if (!active_)
return false;
// in case of spurious wakeup, loop until predicate in lambda
// is satisfied.
can_push_.wait(lk, [this] { return queue_.size() < max_size_ || !active_; });
rc = push_(value);
}
- if(rc)
+ if (rc)
can_pop_.notify_one();
return rc;
}
+
bool pop(Data& value)
{
bool rc;
std::unique_lock<std::mutex> lk(mutex_);
rc = pop_(value);
}
- if(rc)
+ if (rc)
can_push_.notify_one();
return rc;
}
+
bool waitAndPop(Data& value)
{
bool rc;
{
std::unique_lock<std::mutex> lk(mutex_);
- if(!active_)
+ if (!active_)
return false;
// in case of spurious wakeup, loop until predicate in lambda
// is satisfied.
can_pop_.wait(lk, [this] { return !queue_.empty() || !active_; });
rc = pop_(value);
}
- if(rc)
+ if (rc)
can_push_.notify_one();
return rc;
}
- private:
+private:
bool push_(Data const& value)
{
- if(queue_.size() == max_size_ || !active_)
+ if (queue_.size() == max_size_ || !active_)
return false;
queue_.push(value);
return true;
}
+
bool pop_(Data& value)
{
- if(queue_.empty() || !active_)
+ if (queue_.empty() || !active_)
return false;
value = queue_.front();
queue_.pop();
return true;
}
+
std::queue<Data> queue_;
mutable std::mutex mutex_;
std::condition_variable can_pop_;
bool active_;
size_t max_size_;
};
+
struct BufferSrc
{
- BufferSrc(void) : BufferSrc("") {}
- explicit BufferSrc(const std::string &file) : file_(file), clientFrameId_(0), frameId_(0), framePtr_(nullptr)
- {}
+ BufferSrc(void)
+ : BufferSrc("")
+ {
+ }
+
+ explicit BufferSrc(const std::string& file)
+ : file_(file)
+ , clientFrameId_(0)
+ , frameId_(0)
+ , framePtr_(nullptr)
+ {
+ }
+
BufferSrc(size_t clientFrameId, size_t frameId, uint8_t* framePtr)
- : file_(""), clientFrameId_(clientFrameId), frameId_(frameId), framePtr_(framePtr)
- {}
+ : file_("")
+ , clientFrameId_(clientFrameId)
+ , frameId_(frameId)
+ , framePtr_(framePtr)
+ {
+ }
+
bool fromDisk(void)
{
return !file_.empty() && framePtr_ == nullptr;
}
+
size_t index() const
{
return clientFrameId_;
}
+
std::string file_;
size_t clientFrameId_;
size_t frameId_;
};
struct Messenger;
-static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch);
-static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch);
-static void processorThread(Messenger* messenger, std::function<void(std::string)> processor);
+static void
+outboundThread(Messenger* messenger, const std::string& sendBuf, Synch* synch);
+static void
+inboundThread(Messenger* messenger, const std::string& receiveBuf, Synch* synch);
+static void
+processorThread(Messenger* messenger, std::function<void(std::string)> processor);
struct Messenger
{
explicit Messenger(MessengerInit init)
- : running(true), initialized_(false), shutdown_(false), init_(init),
- outboundSynch_(nullptr),
- inboundSynch_(nullptr), uncompressed_buffer_(nullptr), compressed_buffer_(nullptr),
- uncompressed_fd_(0), compressed_fd_(0)
- {}
+ : running(true)
+ , initialized_(false)
+ , shutdown_(false)
+ , init_(init)
+ , outboundSynch_(nullptr)
+ , inboundSynch_(nullptr)
+ , uncompressed_buffer_(nullptr)
+ , compressed_buffer_(nullptr)
+ , uncompressed_fd_(0)
+ , compressed_fd_(0)
+ {
+ }
+
virtual ~Messenger(void)
{
running = false;
inbound.join();
}
- for(auto& p : processors_)
+ for (auto& p : processors_)
p.join();
delete outboundSynch_;
deinitShm();
}
- void startThreads(void) {
+
+ void startThreads(void)
+ {
outboundSynch_ =
- new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch);
+ new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch);
outbound = std::thread(outboundThread, this, init_.outboundMessageBuf, outboundSynch_);
inboundSynch_ =
- new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch);
+ new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch);
inbound = std::thread(inboundThread, this, init_.inboundMessageBuf, inboundSynch_);
- for(size_t i = 0; i < init_.numProcessingThreads_; ++i)
+ for (size_t i = 0; i < init_.numProcessingThreads_; ++i)
processors_.push_back(std::thread(processorThread, this, init_.processor_));
}
- size_t serialize(const std::string &dir, size_t clientFrameId, uint8_t* compressedPtr,
- size_t compressedLength)
+
+ size_t serialize(const std::string& dir, size_t clientFrameId, uint8_t* compressedPtr,
+ size_t compressedLength)
{
char fname[512];
- if(!compressedPtr || !compressedLength)
+ if (!compressedPtr || !compressedLength)
return 0;
sprintf(fname, "%s/test_%d.j2k", dir.c_str(), (int)clientFrameId);
auto fp = fopen(fname, "wb");
- if(!fp)
+ if (!fp)
return 0;
size_t written = fwrite(compressedPtr, 1, compressedLength, fp);
- if(written != compressedLength)
- {
+ if (written != compressedLength) {
fclose(fp);
return 0;
}
return written;
}
+
bool initBuffers(void)
{
bool rc = true;
- if(init_.uncompressedFrameSize_)
- {
+ if (init_.uncompressedFrameSize_) {
rc = rc && SharedMemoryManager::initShm(grokUncompressedBuf,
- init_.uncompressedFrameSize_ * init_.numFrames_,
- &uncompressed_fd_, &uncompressed_buffer_);
+ init_.uncompressedFrameSize_ * init_.numFrames_,
+ &uncompressed_fd_, &uncompressed_buffer_);
}
- if(init_.compressedFrameSize_)
- {
+ if (init_.compressedFrameSize_) {
rc = rc && SharedMemoryManager::initShm(grokCompressedBuf,
- init_.compressedFrameSize_ * init_.numFrames_,
- &compressed_fd_, &compressed_buffer_);
+ init_.compressedFrameSize_ * init_.numFrames_,
+ &compressed_fd_, &compressed_buffer_);
}
return rc;
bool deinitShm(void)
{
bool rc = SharedMemoryManager::deinitShm(grokUncompressedBuf,
- init_.uncompressedFrameSize_ * init_.numFrames_,
- uncompressed_fd_, &uncompressed_buffer_);
+ init_.uncompressedFrameSize_ * init_.numFrames_,
+ uncompressed_fd_, &uncompressed_buffer_);
rc = rc && SharedMemoryManager::deinitShm(grokCompressedBuf,
- init_.compressedFrameSize_ * init_.numFrames_,
- compressed_fd_, &compressed_buffer_);
+ init_.compressedFrameSize_ * init_.numFrames_,
+ compressed_fd_, &compressed_buffer_);
return rc;
}
- template<typename... Args>
+
+ template <typename... Args>
void send(const std::string& str, Args... args)
{
std::ostringstream oss;
sendQueue.push(oss.str());
}
+
static pid_t get_pid_by_process_name(const char* name)
{
char command[256];
snprintf(command, sizeof(command), "pgrep %s", name);
auto pgrep = popen(command, "r");
- if(!pgrep)
+ if (!pgrep)
return -1;
pid_t pid;
- if(fscanf(pgrep, "%d", &pid) != 1)
+ if (fscanf(pgrep, "%d", &pid) != 1)
pid = -1;
pclose(pgrep);
return pid;
}
+
static bool terminate_process(const char* name)
{
auto pid = get_pid_by_process_name(name);
return (pid != -1 && kill(pid, SIGTERM) != -1);
}
+
static bool kill_process(const char* name)
{
auto pid = get_pid_by_process_name(name);
return (pid != -1 && kill(pid, SIGKILL) != -1);
}
- void launchGrok(const std::string &dir, uint32_t width, uint32_t stride,
- uint32_t height, uint32_t samplesPerPixel, uint32_t depth,
- int device, bool is4K, uint32_t fps, uint32_t bandwidth,
- const std::string server, uint32_t port,
- const std::string license)
+
+ void launchGrok(const std::string& dir, uint32_t width, uint32_t stride,
+ uint32_t height, uint32_t samplesPerPixel, uint32_t depth,
+ int device, bool is4K, uint32_t fps, uint32_t bandwidth,
+ const std::string server, uint32_t port,
+ const std::string license)
{
std::unique_lock<std::mutex> lk(shutdownMutex_);
if (async_result_.valid())
return;
- if(MessengerInit::firstLaunch(true))
+ if (MessengerInit::firstLaunch(true))
init_.unlink();
startThreads();
char _cmd[4096];
auto fullServer = server + ":" + std::to_string(port);
sprintf(_cmd,
- "./grk_compress -batch_src %s,%d,%d,%d,%d,%d -out_fmt j2k -out_dir - -k 1 "
- "-G %d -%s %d,%d -j %s -J %s",
- GRK_MSGR_BATCH_IMAGE.c_str(), width, stride, height, samplesPerPixel, depth,
- device, is4K ? "cinema4K" : "cinema2K", fps, bandwidth,
- license.c_str(), fullServer.c_str());
+ "./grk_compress -batch_src %s,%d,%d,%d,%d,%d -out_fmt j2k -out_dir - -k 1 "
+ "-G %d -%s %d,%d -j %s -J %s",
+ GRK_MSGR_BATCH_IMAGE.c_str(), width, stride, height, samplesPerPixel, depth,
+ device, is4K ? "cinema4K" : "cinema2K", fps, bandwidth,
+ license.c_str(), fullServer.c_str());
launch(_cmd, dir);
}
+
void initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames)
{
// client fills queue with pending uncompressed buffers
init_.numFrames_ = numFrames;
initBuffers();
auto ptr = uncompressed_buffer_;
- for(size_t i = 0; i < init_.numFrames_; ++i)
- {
+ for (size_t i = 0; i < init_.numFrames_; ++i) {
availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr));
ptr += init_.uncompressedFrameSize_;
}
initialized_ = true;
clientInitializedCondition_.notify_all();
}
+
bool waitForClientInit(void)
{
- if(initialized_)
+ if (initialized_)
return true;
std::unique_lock<std::mutex> lk(shutdownMutex_);
- if(initialized_)
+ if (initialized_)
return true;
else if (shutdown_)
return false;
- clientInitializedCondition_.wait(lk, [this]{return initialized_ || shutdown_;});
+ clientInitializedCondition_.wait(lk, [this] { return initialized_ || shutdown_; });
return initialized_ && !shutdown_;
}
+
static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel)
{
return sizeof(uint16_t) * w * h * samplesPerPixel;
}
+
void reclaimCompressed(size_t frameId)
{
availableBuffers_.push(BufferSrc(0, frameId, getCompressedFrame(frameId)));
}
+
void reclaimUncompressed(size_t frameId)
{
availableBuffers_.push(BufferSrc(0, frameId, getUncompressedFrame(frameId)));
}
+
uint8_t* getUncompressedFrame(size_t frameId)
{
assert(frameId < init_.numFrames_);
- if(frameId >= init_.numFrames_)
+ if (frameId >= init_.numFrames_)
return nullptr;
return (uint8_t*)(uncompressed_buffer_ + frameId * init_.uncompressedFrameSize_);
}
+
uint8_t* getCompressedFrame(size_t frameId)
{
assert(frameId < init_.numFrames_);
- if(frameId >= init_.numFrames_)
+ if (frameId >= init_.numFrames_)
return nullptr;
return (uint8_t*)(compressed_buffer_ + frameId * init_.compressedFrameSize_);
}
+
std::atomic_bool running;
bool initialized_;
bool shutdown_;
std::mutex shutdownMutex_;
std::condition_variable shutdownCondition_;
- protected:
+protected:
std::condition_variable clientInitializedCondition_;
- private:
- void launch(const std::string &cmd, const std::string &dir)
+
+private:
+ void launch(const std::string& cmd, const std::string& dir)
{
// Change the working directory
- if(!dir.empty())
- {
- if(chdir(dir.c_str()) != 0)
- {
+ if (!dir.empty()) {
+ if (chdir(dir.c_str()) != 0) {
getMessengerLogger()->error("Error: failed to change the working directory");
return;
}
cmd_ = cmd;
async_result_ = std::async(std::launch::async, [this]() { return std::system(cmd_.c_str()); });
}
+
std::thread outbound;
Synch* outboundSynch_;
grk_handle compressed_fd_;
};
-static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch)
+static void
+outboundThread(Messenger* messenger, const std::string& sendBuf, Synch* synch)
{
grk_handle shm_fd = 0;
char* send_buffer = nullptr;
- if(!SharedMemoryManager::initShm(sendBuf, messageBufferLen, &shm_fd, &send_buffer))
+ if (!SharedMemoryManager::initShm(sendBuf, messageBufferLen, &shm_fd, &send_buffer))
return;
- while(messenger->running)
- {
+ while (messenger->running) {
synch->wait(SYNCH_RECEIVE_READY);
- if(!messenger->running)
+ if (!messenger->running)
break;
std::string message;
- if(!messenger->sendQueue.waitAndPop(message))
+ if (!messenger->sendQueue.waitAndPop(message))
break;
- if(!messenger->running)
+ if (!messenger->running)
break;
memcpy(send_buffer, message.c_str(), message.size() + 1);
synch->post(SYNCH_SENT);
SharedMemoryManager::deinitShm(sendBuf, messageBufferLen, shm_fd, &send_buffer);
}
-static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch)
+static void
+inboundThread(Messenger* messenger, const std::string& receiveBuf, Synch* synch)
{
grk_handle shm_fd = 0;
char* receive_buffer = nullptr;
- if(!SharedMemoryManager::initShm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer))
+ if (!SharedMemoryManager::initShm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer))
return;
- while(messenger->running)
- {
+ while (messenger->running) {
synch->wait(SYNCH_SENT);
- if(!messenger->running)
+ if (!messenger->running)
break;
auto message = std::string(receive_buffer);
synch->post(SYNCH_RECEIVE_READY);
}
SharedMemoryManager::deinitShm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer);
}
+
struct Msg
{
- explicit Msg(const std::string &msg) : ct_(0)
+ explicit Msg(const std::string& msg)
+ : ct_(0)
{
std::stringstream ss(msg);
- while(ss.good())
- {
+ while (ss.good()) {
std::string substr;
std::getline(ss, substr, ',');
cs_.push_back(substr);
}
}
+
std::string next()
{
- if(ct_ == cs_.size())
- {
+ if (ct_ == cs_.size()) {
getMessengerLogger()->error("Msg: comma separated list exhausted. returning empty.");
return "";
}
std::vector<std::string> cs_;
size_t ct_;
};
-static void processorThread(Messenger* messenger, std::function<void(std::string)> processor)
+
+static void
+processorThread(Messenger* messenger, std::function<void(std::string)> processor)
{
- while(messenger->running)
- {
+ while (messenger->running) {
std::string message;
- if(!messenger->receiveQueue.waitAndPop(message))
+ if (!messenger->receiveQueue.waitAndPop(message))
break;
- if(!messenger->running)
+ if (!messenger->running)
break;
Msg msg(message);
auto tag = msg.next();
- if(tag == GRK_MSGR_BATCH_COMPRESS_INIT)
- {
+ if (tag == GRK_MSGR_BATCH_COMPRESS_INIT) {
auto width = msg.nextUint();
auto stride = msg.nextUint();
(void)stride;
auto depth = msg.nextUint();
(void)depth;
messenger->init_.uncompressedFrameSize_ =
- Messenger::uncompressedFrameSize(width, height, samplesPerPixel);
+ Messenger::uncompressedFrameSize(width, height, samplesPerPixel);
auto compressedFrameSize = msg.nextUint();
auto numFrames = msg.nextUint();
messenger->initClient(compressedFrameSize, compressedFrameSize, numFrames);
- }
- else if(tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED)
- {
+ } else if (tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED) {
messenger->reclaimUncompressed(msg.nextUint());
- }
- else if(tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED)
- {
+ } else if (tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED) {
messenger->reclaimCompressed(msg.nextUint());
}
processor(message);
}
}
-template<typename F>
+template <typename F>
struct ScheduledFrames
{
void store(F& val)
if (it == map_.end())
map_[val.index()] = val;
}
- F retrieve(size_t index, bool &success)
+
+ F retrieve(size_t index, bool& success)
{
std::unique_lock<std::mutex> lk(mapMutex_);
success = false;
auto it = map_.find(index);
- if(it == map_.end())
+ if (it == map_.end())
return F();
success = true;
return val;
}
- private:
+private:
std::mutex mapMutex_;
std::map<size_t, F> map_;
};
-template<typename F>
+template <typename F>
struct ScheduledMessenger : public Messenger
{
- explicit ScheduledMessenger(MessengerInit init) : Messenger(init),
- framesScheduled_(0),
- framesCompressed_(0)
- {}
- ~ScheduledMessenger(void) {
+ explicit ScheduledMessenger(MessengerInit init)
+ : Messenger(init)
+ , framesScheduled_(0)
+ , framesCompressed_(0)
+ {
+ }
+
+ ~ScheduledMessenger(void)
+ {
shutdown();
}
- bool scheduleCompress(F proxy, std::function<void(BufferSrc)> converter){
+
+ bool scheduleCompress(F proxy, std::function<void(BufferSrc)> converter)
+ {
size_t frameSize = init_.uncompressedFrameSize_;
assert(frameSize >= init_.uncompressedFrameSize_);
BufferSrc src;
- if(!availableBuffers_.waitAndPop(src))
+ if (!availableBuffers_.waitAndPop(src))
return false;
converter(src);
scheduledFrames_.store(proxy);
return true;
}
- void processCompressed(const std::string &message, std::function<void(F,uint8_t*,uint32_t)> processor, bool needsRecompression) {
+
+ void processCompressed(const std::string& message, std::function<void(F, uint8_t*, uint32_t)> processor, bool needsRecompression)
+ {
Msg msg(message);
msg.next();
auto clientFrameId = msg.nextUint();
auto compressedFrameLength = msg.nextUint();
if (!needsRecompression) {
bool success = false;
- auto srcFrame = scheduledFrames_.retrieve(clientFrameId,success);
+ auto srcFrame = scheduledFrames_.retrieve(clientFrameId, success);
if (!success)
return;
- processor(srcFrame, getCompressedFrame(compressedFrameId),compressedFrameLength);
+ processor(srcFrame, getCompressedFrame(compressedFrameId), compressedFrameLength);
}
++framesCompressed_;
send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId);
if (shutdown_ && framesCompressed_ == framesScheduled_)
shutdownCondition_.notify_all();
}
- void shutdown(void){
+
+ void shutdown(void)
+ {
try {
std::unique_lock<std::mutex> lk(shutdownMutex_);
if (!async_result_.valid())
availableBuffers_.deactivate();
send(GRK_MSGR_BATCH_SHUTDOWN);
int result = async_result_.get();
- if(result != 0)
- getMessengerLogger()->error("Accelerator failed with return code: %d\n",result);
- } catch (std::exception &ex) {
- getMessengerLogger()->error("%s",ex.what());
+ if (result != 0)
+ getMessengerLogger()->error("Accelerator failed with return code: %d\n", result);
+ } catch (std::exception& ex) {
+ getMessengerLogger()->error("%s", ex.what());
}
-
}
- F retrieve(size_t index, bool &success) {
+
+ F retrieve(size_t index, bool& success)
+ {
return scheduledFrames_.retrieve(index, success);
}
- void store(F& val) {
+
+ void store(F& val)
+ {
scheduledFrames_.store(val);
}
* @param writer Writer that we are using.
*/
J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer)
- : _film (film)
- , _history (200)
- , _writer (writer) ,
- dcpomaticContext_(film,writer,_history, Config::instance()->gpu_binary_location ()),
- context_(Config::instance()->enable_gpu () ? new grk_plugin::GrokContext(dcpomaticContext_) : nullptr)
+ : _film(film)
+ , _history(200)
+ , _writer(writer)
+ , dcpomaticContext_(film, writer, _history, Config::instance()->gpu_binary_location())
+ , context_(Config::instance()->enable_gpu() ? new grk_plugin::GrokContext(dcpomaticContext_) : nullptr)
{
servers_list_changed ();
}
_server_found_connection.disconnect();
{
- boost::mutex::scoped_lock lm (_threads_mutex);
- terminate_threads ();
+ boost::mutex::scoped_lock lm(_threads_mutex);
+ terminate_threads();
}
delete context_;
);
}
-void J2KEncoder::pause(void){
- if (Config::instance()->enable_gpu ())
+void
+J2KEncoder::pause(void)
+{
+ if (Config::instance()->enable_gpu())
end(false);
}
-void J2KEncoder::resume(void){
- if (Config::instance()->enable_gpu ()) {
+void
+J2KEncoder::resume(void)
+{
+ if (Config::instance()->enable_gpu()) {
context_ = new grk_plugin::GrokContext(dcpomaticContext_);
- servers_list_changed ();
+ servers_list_changed();
}
}
void
-J2KEncoder::end (bool isFinal)
+J2KEncoder::end(bool isFinal)
{
if (isFinal) {
- boost::mutex::scoped_lock lock (_queue_mutex);
+ boost::mutex::scoped_lock lock(_queue_mutex);
- LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
+ LOG_GENERAL(N_("Clearing queue of %1"), _queue.size());
/* Keep waking workers until the queue is empty */
- while (!_queue.empty ()) {
- rethrow ();
- _empty_condition.notify_all ();
- _full_condition.wait (lock);
- }
- lock.unlock ();
+ while (!_queue.empty()) {
+ rethrow();
+ _empty_condition.notify_all();
+ _full_condition.wait(lock);
+ }
+ lock.unlock();
}
LOG_GENERAL_NC (N_("Terminating encoder threads"));
LOG_GENERAL (N_("Mopping up %1"), _queue.size());
/* The following sequence of events can occur in the above code:
- 1. a remote worker takes the last image off the queue
- 2. the loop above terminates
- 3. the remote worker fails to encode the image and puts it back on the queue
- 4. the remote worker is then terminated by terminate_threads
+ 1. a remote worker takes the last image off the queue
+ 2. the loop above terminates
+ 3. the remote worker fails to encode the image and puts it back on the queue
+ 4. the remote worker is then terminated by terminate_threads
- So just mop up anything left in the queue here.
+ So just mop up anything left in the queue here.
*/
if (isFinal) {
- for (auto & i: _queue) {
- if (Config::instance()->enable_gpu ()) {
- if (!context_->scheduleCompress(i)){
- LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index());
+ for (auto& i : _queue) {
+ if (Config::instance()->enable_gpu()) {
+ if (!context_->scheduleCompress(i)) {
+ LOG_GENERAL(N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index());
// handle error
}
- }
- else {
+ } else {
LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
try {
_writer.write(
- make_shared<dcp::ArrayData>(i.encode_locally()),
- i.index(),
- i.eyes()
- );
- frame_done ();
+ make_shared<dcp::ArrayData>(i.encode_locally()),
+ i.index(),
+ i.eyes());
+ frame_done();
} catch (std::exception& e) {
- LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
+ LOG_ERROR(N_("Local encode failed (%1)"), e.what());
}
}
}
/* Queue this new frame for encoding */
LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
auto dcpv = DCPVideo(
- pv,
- position,
- _film->video_frame_rate(),
- _film->j2k_bandwidth(),
- _film->resolution()
- );
- _queue.push_back (dcpv);
+ pv,
+ position,
+ _film->video_frame_rate(),
+ _film->j2k_bandwidth(),
+ _film->resolution());
+ _queue.push_back(dcpv);
/* The queue might not be empty any more, so notify anything which is
waiting on that.
J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
try
{
- auto config = Config::instance ();
+ auto config = Config::instance();
start_of_thread ("J2KEncoder");
} else {
if (context_) {
if (!context_->launch(vf, config->selected_gpu()) || !context_->scheduleCompress(vf)) {
- LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
- _queue.push_front (vf);
+ LOG_GENERAL(N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
+ _queue.push_front(vf);
}
} else {
try {
- LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf.index());
+ LOG_TIMING("start-local-encode thread=%1 frame=%2", thread_id(), vf.index());
encoded = make_shared<dcp::ArrayData>(vf.encode_locally());
- LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index());
+ LOG_TIMING("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index());
} catch (std::exception& e) {
/* This is very bad, so don't cope with it, just pass it on */
- LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
+ LOG_ERROR(N_("Local encode failed (%1)"), e.what());
throw;
}
}
_writer.write(encoded, vf.index(), vf.eyes());
frame_done ();
} else {
- if (!Config::instance()->enable_gpu ()) {
- lock.lock ();
- LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
- _queue.push_front (vf);
- lock.unlock ();
+ if (!Config::instance()->enable_gpu()) {
+ lock.lock();
+ LOG_GENERAL(N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
+ _queue.push_front(vf);
+ lock.unlock();
}
}
}
void resume(void);
/** Called when a processing run has finished */
- void end (bool isFinal);
+ void end(bool isFinal);
boost::optional<float> current_encoding_rate () const;
int video_frames_enqueued () const;
boost::signals2::scoped_connection _server_found_connection;
grk_plugin::DcpomaticContext dcpomaticContext_;
- grk_plugin::GrokContext *context_;
+ grk_plugin::GrokContext* context_;
};
return false;
}
- void start ();
+ void start();
+
virtual void pause() {}
bool pause_by_user ();
void pause_by_priority ();
- virtual void resume ();
+ virtual void resume();
void cancel ();
bool is_new () const;
}
}
-void TranscodeJob::pause() {
+void
+TranscodeJob::pause()
+{
_encoder->pause();
}
-void TranscodeJob::resume() {
+void
+TranscodeJob::resume()
+{
_encoder->resume();
Job::resume();
}
FontConfig::drop();
ev.Skip ();
- JobManager::drop ();
+ JobManager::drop();
}
void active_jobs_changed()
}
ev.Skip ();
- JobManager::drop ();
+ JobManager::drop();
}
void file_add_film ()
}
ev.Skip ();
- JobManager::drop ();
+ JobManager::drop();
}
void copy ()
#pragma once
-static std::vector<std::string> get_gpu_names(std::string binary, std::string filename)
+static std::vector<std::string>
+get_gpu_names(std::string binary, std::string filename)
{
- // Execute the GPU listing program and redirect its output to a file
- std::system((binary + " > " + filename).c_str());
-
- std::vector<std::string> gpu_names;
- std::ifstream file(filename);
- if (file.is_open())
- {
- std::string line;
- while (std::getline(file, line))
- gpu_names.push_back(line);
- file.close();
- }
-
- return gpu_names;
+ // Execute the GPU listing program and redirect its output to a file
+ std::system((binary + " > " + filename).c_str());
+
+ std::vector<std::string> gpu_names;
+ std::ifstream file(filename);
+ if (file.is_open()) {
+ std::string line;
+ while (std::getline(file, line))
+ gpu_names.push_back(line);
+ file.close();
+ }
+
+ return gpu_names;
}
class GpuList : public wxPanel
{
public:
- GpuList(wxPanel* parent) : wxPanel(parent, wxID_ANY), selection(0) {
- comboBox = new wxComboBox(this, wxID_ANY, "", wxDefaultPosition, wxSize(400, -1));
- comboBox->Bind(wxEVT_COMBOBOX, &GpuList::OnComboBox, this);
- update();
-
- wxBoxSizer* sizer = new wxBoxSizer(wxHORIZONTAL);
-
- sizer->Add(comboBox, 0, wxALIGN_CENTER_VERTICAL); // Vertically center the comboBox
-
- this->SetSizerAndFit(sizer);
- }
- void update(void) {
- auto cfg = Config::instance();
- auto lister_binary = cfg->gpu_binary_location() + "/" + "gpu_lister";
- auto lister_file = cfg->gpu_binary_location () + "/" + "gpus.txt";
- if (boost::filesystem::exists(lister_binary)) {
+ GpuList(wxPanel* parent)
+ : wxPanel(parent, wxID_ANY)
+ , selection(0)
+ {
+ comboBox = new wxComboBox(this, wxID_ANY, "", wxDefaultPosition, wxSize(400, -1));
+ comboBox->Bind(wxEVT_COMBOBOX, &GpuList::OnComboBox, this);
+ update();
+
+ wxBoxSizer* sizer = new wxBoxSizer(wxHORIZONTAL);
+
+ sizer->Add(comboBox, 0, wxALIGN_CENTER_VERTICAL); // Vertically center the comboBox
+
+ this->SetSizerAndFit(sizer);
+ }
+
+ void update(void)
+ {
+ auto cfg = Config::instance();
+ auto lister_binary = cfg->gpu_binary_location() + "/" + "gpu_lister";
+ auto lister_file = cfg->gpu_binary_location() + "/" + "gpus.txt";
+ if (boost::filesystem::exists(lister_binary)) {
auto gpu_names = get_gpu_names(lister_binary, lister_file);
comboBox->Clear();
for (const auto& name : gpu_names)
- comboBox->Append(name);
- }
- }
-
- int getSelection(void) {
- return selection;
- }
- void setSelection(int sel) {
- if ((int)comboBox->GetCount() > sel)
- comboBox->SetSelection(sel);
- }
+ comboBox->Append(name);
+ }
+ }
+
+ int getSelection(void)
+ {
+ return selection;
+ }
+
+ void setSelection(int sel)
+ {
+ if ((int)comboBox->GetCount() > sel)
+ comboBox->SetSelection(sel);
+ }
private:
- void OnComboBox([[maybe_unused]] wxCommandEvent& event) {
- selection = comboBox->GetSelection();
- if (selection != wxNOT_FOUND)
- Config::instance ()->set_selected_gpu(selection);
- }
-
- wxComboBox* comboBox;
- int selection;
+ void OnComboBox([[maybe_unused]] wxCommandEvent& event)
+ {
+ selection = comboBox->GetSelection();
+ if (selection != wxNOT_FOUND)
+ Config::instance()->set_selected_gpu(selection);
+ }
+
+ wxComboBox* comboBox;
+ int selection;
};
class GPUPage : public Page
{
public:
- GPUPage (wxSize panel_size, int border)
- : Page (panel_size, border),
- _enable_gpu(nullptr), _binary_location(nullptr), _gpu_list_control(nullptr)
- {}
+ GPUPage(wxSize panel_size, int border)
+ : Page(panel_size, border)
+ , _enable_gpu(nullptr)
+ , _binary_location(nullptr)
+ , _gpu_list_control(nullptr)
+ {
+ }
- wxString GetName () const override
+ wxString GetName() const override
{
return _("GPU");
}
#ifdef DCPOMATIC_OSX
- wxBitmap GetLargeIcon () const override
+ wxBitmap GetLargeIcon() const override
{
return wxBitmap(icon_path("tms"), wxBITMAP_TYPE_PNG);
}
#endif
private:
- void setup () override
+ void setup() override
{
- auto config = Config::instance ();
+ auto config = Config::instance();
- _enable_gpu = new CheckBox (_panel, _("Enable GPU Acceleration"));
- _panel->GetSizer()->Add (_enable_gpu, 0, wxALL | wxEXPAND, _border);
+ _enable_gpu = new CheckBox(_panel, _("Enable GPU Acceleration"));
+ _panel->GetSizer()->Add(_enable_gpu, 0, wxALL | wxEXPAND, _border);
- wxFlexGridSizer* table = new wxFlexGridSizer (2, DCPOMATIC_SIZER_X_GAP, DCPOMATIC_SIZER_Y_GAP);
- table->AddGrowableCol (1, 1);
- _panel->GetSizer()->Add (table, 1, wxALL | wxEXPAND, _border);
+ wxFlexGridSizer* table = new wxFlexGridSizer(2, DCPOMATIC_SIZER_X_GAP, DCPOMATIC_SIZER_Y_GAP);
+ table->AddGrowableCol(1, 1);
+ _panel->GetSizer()->Add(table, 1, wxALL | wxEXPAND, _border);
- add_label_to_sizer (table, _panel, _("Acceleration Binary Folder"), true, 0, wxLEFT | wxLEFT | wxALIGN_CENTRE_VERTICAL);
- _binary_location = new wxDirPickerCtrl (_panel, wxDD_DIR_MUST_EXIST);
- table->Add (_binary_location, 1, wxEXPAND);
+ add_label_to_sizer(table, _panel, _("Acceleration Binary Folder"), true, 0, wxLEFT | wxLEFT | wxALIGN_CENTRE_VERTICAL);
+ _binary_location = new wxDirPickerCtrl(_panel, wxDD_DIR_MUST_EXIST);
+ table->Add(_binary_location, 1, wxEXPAND);
- add_label_to_sizer (table, _panel, _("GPU Selection"), true, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL);
+ add_label_to_sizer(table, _panel, _("GPU Selection"), true, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL);
_gpu_list_control = new GpuList(_panel);
- table->Add (_gpu_list_control, 1, wxEXPAND);
+ table->Add(_gpu_list_control, 1, wxEXPAND);
- add_label_to_sizer (table, _panel, _("License Server"), true, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL);
- _server = new wxTextCtrl (_panel, wxID_ANY);
- table->Add (_server, 1, wxEXPAND | wxALL);
+ add_label_to_sizer(table, _panel, _("License Server"), true, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL);
+ _server = new wxTextCtrl(_panel, wxID_ANY);
+ table->Add(_server, 1, wxEXPAND | wxALL);
- add_label_to_sizer (table, _panel, _("Port"), false, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL);
- _port = new wxSpinCtrl (_panel, wxID_ANY);
- _port->SetRange (0, 65535);
- table->Add (_port);
+ add_label_to_sizer(table, _panel, _("Port"), false, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL);
+ _port = new wxSpinCtrl(_panel, wxID_ANY);
+ _port->SetRange(0, 65535);
+ table->Add(_port);
- add_label_to_sizer (table, _panel, _("License"), true, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL);
- _license = new PasswordEntry (_panel);
- table->Add (_license->get_panel(), 1, wxEXPAND | wxALL);
+ add_label_to_sizer(table, _panel, _("License"), true, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL);
+ _license = new PasswordEntry(_panel);
+ table->Add(_license->get_panel(), 1, wxEXPAND | wxALL);
_enable_gpu->bind(&GPUPage::enable_gpu_changed, this);
- _binary_location->Bind (wxEVT_DIRPICKER_CHANGED, boost::bind (&GPUPage::binary_location_changed, this));
- _server->Bind (wxEVT_TEXT, boost::bind(&GPUPage::server_changed, this));
- _port->Bind (wxEVT_SPINCTRL, boost::bind(&GPUPage::port_changed, this));
- _license->Changed.connect (boost::bind(&GPUPage::license_changed, this));
+ _binary_location->Bind(wxEVT_DIRPICKER_CHANGED, boost::bind(&GPUPage::binary_location_changed, this));
+ _server->Bind(wxEVT_TEXT, boost::bind(&GPUPage::server_changed, this));
+ _port->Bind(wxEVT_SPINCTRL, boost::bind(&GPUPage::port_changed, this));
+ _license->Changed.connect(boost::bind(&GPUPage::license_changed, this));
_binary_location->Enable(config->enable_gpu());
_gpu_list_control->Enable(config->enable_gpu());
_license->get_panel()->Enable(config->enable_gpu());
}
-
- void config_changed () override
+ void config_changed() override
{
- auto config = Config::instance ();
+ auto config = Config::instance();
- checked_set (_enable_gpu, config->enable_gpu());
- _binary_location->SetPath(config->gpu_binary_location ());
+ checked_set(_enable_gpu, config->enable_gpu());
+ _binary_location->SetPath(config->gpu_binary_location());
_gpu_list_control->update();
_gpu_list_control->setSelection(config->selected_gpu());
- checked_set (_server, config->gpu_license_server());
- checked_set (_port, config->gpu_license_port());
- checked_set (_license, config->gpu_license());
+ checked_set(_server, config->gpu_license_server());
+ checked_set(_port, config->gpu_license_port());
+ checked_set(_license, config->gpu_license());
}
- void enable_gpu_changed ()
+ void enable_gpu_changed()
{
- auto config = Config::instance ();
+ auto config = Config::instance();
- config->set_enable_gpu (_enable_gpu->GetValue());
+ config->set_enable_gpu(_enable_gpu->GetValue());
_binary_location->Enable(config->enable_gpu());
_gpu_list_control->Enable(config->enable_gpu());
_server->Enable(config->enable_gpu());
_license->get_panel()->Enable(config->enable_gpu());
}
- void binary_location_changed ()
+ void binary_location_changed()
{
- Config::instance()->set_gpu_binary_location (wx_to_std (_binary_location->GetPath ()));
+ Config::instance()->set_gpu_binary_location(wx_to_std(_binary_location->GetPath()));
_gpu_list_control->update();
}
- void server_changed ()
+ void server_changed()
{
Config::instance()->set_gpu_license_server(wx_to_std(_server->GetValue()));
}
- void port_changed ()
+ void port_changed()
{
Config::instance()->set_gpu_license_port(_port->GetValue());
}
- void license_changed ()
+ void license_changed()
{
Config::instance()->set_gpu_license(_license->get());
}
CheckBox* _enable_gpu;
wxDirPickerCtrl* _binary_location;
- GpuList *_gpu_list_control;
+ GpuList* _gpu_list_control;
wxTextCtrl* _server;
wxSpinCtrl* _port;
PasswordEntry* _license;