summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/lib/config.cc51
-rw-r--r--src/lib/config.h31
-rw-r--r--src/lib/cpu_j2k_encoder_thread.cc42
-rw-r--r--src/lib/cpu_j2k_encoder_thread.h16
-rw-r--r--src/lib/dcp_encoder.cc39
-rw-r--r--src/lib/dcp_encoder.h7
-rw-r--r--src/lib/dcp_video.cc24
-rw-r--r--src/lib/dcp_video.h7
-rw-r--r--src/lib/encode_server.cc3
-rw-r--r--src/lib/encode_server.h6
-rw-r--r--src/lib/encode_server_finder.h3
-rw-r--r--src/lib/encoder.h2
-rw-r--r--src/lib/grok/context.h291
-rw-r--r--src/lib/grok/messenger.h906
-rw-r--r--src/lib/grok_j2k_encoder_thread.cc51
-rw-r--r--src/lib/grok_j2k_encoder_thread.h23
-rw-r--r--src/lib/j2k_encoder.cc387
-rw-r--r--src/lib/j2k_encoder.h35
-rw-r--r--src/lib/j2k_encoder_thread.cc38
-rw-r--r--src/lib/j2k_encoder_thread.h32
-rw-r--r--src/lib/j2k_sync_encoder_thread.cc44
-rw-r--r--src/lib/j2k_sync_encoder_thread.h32
-rw-r--r--src/lib/job.cc4
-rw-r--r--src/lib/job.h3
-rw-r--r--src/lib/make_dcp.cc6
-rw-r--r--src/lib/make_dcp.h2
-rw-r--r--src/lib/remote_j2k_encoder_thread.cc64
-rw-r--r--src/lib/remote_j2k_encoder_thread.h21
-rw-r--r--src/lib/scope_guard.h10
-rw-r--r--src/lib/transcode_job.cc14
-rw-r--r--src/lib/transcode_job.h6
-rw-r--r--src/lib/writer.h1
-rw-r--r--src/lib/wscript7
-rw-r--r--src/tools/dcpomatic.cc8
-rw-r--r--src/tools/dcpomatic_batch.cc8
-rw-r--r--src/tools/dcpomatic_cli.cc2
-rw-r--r--src/tools/dcpomatic_disk.cc1
-rw-r--r--src/tools/dcpomatic_server.cc7
-rw-r--r--src/tools/dcpomatic_server_cli.cc7
-rw-r--r--src/wx/about_dialog.cc3
-rw-r--r--src/wx/full_config_dialog.cc6
-rw-r--r--src/wx/grok/gpu_config_panel.h227
42 files changed, 2297 insertions, 180 deletions
diff --git a/src/lib/config.cc b/src/lib/config.cc
index 190817cbc..938be090c 100644
--- a/src/lib/config.cc
+++ b/src/lib/config.cc
@@ -212,6 +212,10 @@ Config::set_defaults ()
set_notification_email_to_default ();
set_cover_sheet_to_default ();
+#ifdef DCPOMATIC_GROK
+ _grok = boost::none;
+#endif
+
_main_divider_sash_position = {};
_main_content_divider_sash_position = {};
@@ -633,6 +637,12 @@ try
_allow_smpte_bv20 = f.optional_bool_child("AllowSMPTEBv20").get_value_or(false);
_isdcf_name_part_length = f.optional_number_child<int>("ISDCFNamePartLength").get_value_or(14);
+#ifdef DCPOMATIC_GROK
+ if (auto grok = f.optional_node_child("Grok")) {
+ _grok = Grok(grok);
+ }
+#endif
+
_export.read(f.optional_node_child("Export"));
}
catch (...) {
@@ -1119,6 +1129,12 @@ Config::write_config () const
/* [XML] ISDCFNamePartLength Maximum length of the "name" part of an ISDCF name, which should be 14 according to the standard */
root->add_child("ISDCFNamePartLength")->add_child_text(raw_convert<string>(_isdcf_name_part_length));
+#ifdef DCPOMATIC_GROK
+ if (_grok) {
+ _grok->as_xml(root->add_child("Grok"));
+ }
+#endif
+
_export.write(root->add_child("Export"));
auto target = config_write_file();
@@ -1639,3 +1655,38 @@ Config::initial_path(string id) const
return iter->second;
}
+
+#ifdef DCPOMATIC_GROK
+
+Config::Grok::Grok(cxml::ConstNodePtr node)
+ : enable(node->bool_child("Enable"))
+ , binary_location(node->string_child("BinaryLocation"))
+ , selected(node->number_child<int>("Selected"))
+ , licence_server(node->string_child("LicenceServer"))
+ , licence_port(node->number_child<int>("LicencePort"))
+ , licence(node->string_child("Licence"))
+{
+
+}
+
+
+void
+Config::Grok::as_xml(xmlpp::Element* node) const
+{
+ node->add_child("BinaryLocation")->add_child_text(binary_location.string());
+ node->add_child("Enable")->add_child_text((enable ? "1" : "0"));
+ node->add_child("Selected")->add_child_text(raw_convert<string>(selected));
+ node->add_child("LicenceServer")->add_child_text(licence_server);
+ node->add_child("LicencePort")->add_child_text(raw_convert<string>(licence_port));
+ node->add_child("Licence")->add_child_text(licence);
+}
+
+
+void
+Config::set_grok(Grok const& grok)
+{
+ _grok = grok;
+ changed(OTHER);
+}
+
+#endif
diff --git a/src/lib/config.h b/src/lib/config.h
index 0a332bcbb..eaf85451d 100644
--- a/src/lib/config.h
+++ b/src/lib/config.h
@@ -618,6 +618,28 @@ public:
return _allow_smpte_bv20;
}
+#ifdef DCPOMATIC_GROK
+ class Grok
+ {
+ public:
+ Grok() = default;
+ Grok(cxml::ConstNodePtr node);
+
+ void as_xml(xmlpp::Element* node) const;
+
+ bool enable = false;
+ boost::filesystem::path binary_location;
+ int selected = 0;
+ std::string licence_server;
+ int licence_port = 5000;
+ std::string licence;
+ };
+
+ boost::optional<Grok> grok() const {
+ return _grok;
+ }
+#endif
+
int isdcf_name_part_length() const {
return _isdcf_name_part_length;
}
@@ -1199,10 +1221,15 @@ public:
maybe_set(_allow_smpte_bv20, allow, ALLOW_SMPTE_BV20);
}
+#ifdef DCPOMATIC_GROK
+ void set_grok(Grok const& grok);
+#endif
+
void set_isdcf_name_part_length(int length) {
maybe_set(_isdcf_name_part_length, length, ISDCF_NAME_PART_LENGTH);
}
+
void changed (Property p = OTHER);
boost::signals2::signal<void (Property)> Changed;
/** Emitted if read() failed on an existing Config file. There is nothing
@@ -1443,6 +1470,10 @@ private:
bool _allow_smpte_bv20;
int _isdcf_name_part_length;
+#ifdef DCPOMATIC_GROK
+ boost::optional<Grok> _grok;
+#endif
+
ExportConfig _export;
static int const _current_version;
diff --git a/src/lib/cpu_j2k_encoder_thread.cc b/src/lib/cpu_j2k_encoder_thread.cc
new file mode 100644
index 000000000..70afac236
--- /dev/null
+++ b/src/lib/cpu_j2k_encoder_thread.cc
@@ -0,0 +1,42 @@
+#include "cpu_j2k_encoder_thread.h"
+#include "cross.h"
+#include "dcpomatic_log.h"
+#include "dcp_video.h"
+#include "j2k_encoder.h"
+#include "scope_guard.h"
+#include "util.h"
+
+#include "i18n.h"
+
+
+using std::make_shared;
+using std::shared_ptr;
+
+
+CPUJ2KEncoderThread::CPUJ2KEncoderThread(J2KEncoder& encoder)
+ : J2KSyncEncoderThread(encoder)
+{
+
+}
+
+
+void
+CPUJ2KEncoderThread::log_thread_start() const
+{
+ start_of_thread("CPUJ2KEncoder");
+ LOG_TIMING("start-encoder-thread thread=%1 server=localhost", thread_id());
+}
+
+
+shared_ptr<dcp::ArrayData>
+CPUJ2KEncoderThread::encode(DCPVideo const& frame)
+{
+ try {
+ return make_shared<dcp::ArrayData>(frame.encode_locally());
+ } catch (std::exception& e) {
+ LOG_ERROR(N_("Local encode failed (%1)"), e.what());
+ }
+
+ return {};
+}
+
diff --git a/src/lib/cpu_j2k_encoder_thread.h b/src/lib/cpu_j2k_encoder_thread.h
new file mode 100644
index 000000000..fb138f484
--- /dev/null
+++ b/src/lib/cpu_j2k_encoder_thread.h
@@ -0,0 +1,16 @@
+#include "j2k_sync_encoder_thread.h"
+#include <dcp/data.h>
+
+
+class DCPVideo;
+
+
+class CPUJ2KEncoderThread : public J2KSyncEncoderThread
+{
+public:
+ CPUJ2KEncoderThread(J2KEncoder& encoder);
+
+ void log_thread_start() const override;
+ std::shared_ptr<dcp::ArrayData> encode(DCPVideo const& frame) override;
+};
+
diff --git a/src/lib/dcp_encoder.cc b/src/lib/dcp_encoder.cc
index 9a840c8ab..bd78312fa 100644
--- a/src/lib/dcp_encoder.cc
+++ b/src/lib/dcp_encoder.cc
@@ -18,6 +18,7 @@
*/
+
/** @file src/dcp_encoder.cc
* @brief A class which takes a Film and some Options, then uses those to encode the film into a DCP.
*
@@ -25,31 +26,33 @@
* as a parameter to the constructor.
*/
+
+#include "audio_decoder.h"
+#include "compose.hpp"
#include "dcp_encoder.h"
-#include "j2k_encoder.h"
#include "film.h"
-#include "video_decoder.h"
-#include "audio_decoder.h"
-#include "player.h"
+#include "j2k_encoder.h"
#include "job.h"
-#include "writer.h"
-#include "compose.hpp"
+#include "player.h"
+#include "player_video.h"
#include "referenced_reel_asset.h"
#include "text_content.h"
-#include "player_video.h"
+#include "video_decoder.h"
+#include "writer.h"
#include <boost/signals2.hpp>
#include <iostream>
#include "i18n.h"
-using std::string;
+
using std::cout;
+using std::dynamic_pointer_cast;
using std::list;
-using std::vector;
+using std::make_shared;
using std::shared_ptr;
+using std::string;
+using std::vector;
using std::weak_ptr;
-using std::dynamic_pointer_cast;
-using std::make_shared;
using boost::optional;
#if BOOST_VERSION >= 106100
using namespace boost::placeholders;
@@ -118,6 +121,20 @@ DCPEncoder::go ()
_writer.finish(_film->dir(_film->dcp_name()));
}
+
+void
+DCPEncoder::pause()
+{
+ _j2k_encoder.pause();
+}
+
+
+void
+DCPEncoder::resume()
+{
+ _j2k_encoder.resume();
+}
+
void
DCPEncoder::video (shared_ptr<PlayerVideo> data, DCPTime time)
{
diff --git a/src/lib/dcp_encoder.h b/src/lib/dcp_encoder.h
index ad77f6951..ce0b72204 100644
--- a/src/lib/dcp_encoder.h
+++ b/src/lib/dcp_encoder.h
@@ -35,6 +35,8 @@ class Job;
class Player;
class PlayerVideo;
+struct frames_not_lost_when_threads_disappear;
+
/** @class DCPEncoder */
class DCPEncoder : public Encoder
@@ -53,8 +55,13 @@ public:
return _finishing;
}
+ void pause() override;
+ void resume() override;
+
private:
+ friend struct ::frames_not_lost_when_threads_disappear;
+
void video (std::shared_ptr<PlayerVideo>, dcpomatic::DCPTime);
void audio (std::shared_ptr<AudioBuffers>, dcpomatic::DCPTime);
void text (PlayerText, TextType, boost::optional<DCPTextTrack>, dcpomatic::DCPTimePeriod);
diff --git a/src/lib/dcp_video.cc b/src/lib/dcp_video.cc
index 8eb76fdd6..940ca31d2 100644
--- a/src/lib/dcp_video.cc
+++ b/src/lib/dcp_video.cc
@@ -118,6 +118,30 @@ DCPVideo::convert_to_xyz (shared_ptr<const PlayerVideo> frame, dcp::NoteHandler
return xyz;
}
+dcp::Size
+DCPVideo::get_size() const
+{
+ auto image = _frame->image(bind(&PlayerVideo::keep_xyz_or_rgb, _1), VideoRange::FULL, false);
+ return image->size();
+}
+
+
+void
+DCPVideo::convert_to_xyz(uint16_t* dst) const
+{
+ auto image = _frame->image(bind(&PlayerVideo::keep_xyz_or_rgb, _1), VideoRange::FULL, false);
+ if (_frame->colour_conversion()) {
+ dcp::rgb_to_xyz (
+ image->data()[0],
+ dst,
+ image->size(),
+ image->stride()[0],
+ _frame->colour_conversion().get()
+ );
+ }
+}
+
+
/** J2K-encode this frame on the local host.
* @return Encoded data.
*/
diff --git a/src/lib/dcp_video.h b/src/lib/dcp_video.h
index 33df0942c..5f43dabb8 100644
--- a/src/lib/dcp_video.h
+++ b/src/lib/dcp_video.h
@@ -17,6 +17,8 @@
along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
*/
+#ifndef DCPOMATIC_DCP_VIDEO_H
+#define DCPOMATIC_DCP_VIDEO_H
#include "encode_server_description.h"
@@ -66,6 +68,9 @@ public:
static std::shared_ptr<dcp::OpenJPEGImage> convert_to_xyz (std::shared_ptr<const PlayerVideo> frame, dcp::NoteHandler note);
+ void convert_to_xyz(uint16_t* dst) const;
+ dcp::Size get_size() const;
+
private:
void add_metadata (xmlpp::Element *) const;
@@ -76,3 +81,5 @@ private:
int _j2k_bandwidth; ///< J2K bandwidth to use
Resolution _resolution; ///< Resolution (2K or 4K)
};
+
+#endif
diff --git a/src/lib/encode_server.cc b/src/lib/encode_server.cc
index 6501dcde1..11dbab628 100644
--- a/src/lib/encode_server.cc
+++ b/src/lib/encode_server.cc
@@ -81,6 +81,7 @@ EncodeServer::EncodeServer (bool verbose, int num_threads)
#endif
, _verbose (verbose)
, _num_threads (num_threads)
+ , _frames_encoded(0)
{
}
@@ -165,6 +166,8 @@ EncodeServer::process (shared_ptr<Socket> socket, struct timeval& after_read, st
throw;
}
+ ++_frames_encoded;
+
return dcp_video_frame.index ();
}
diff --git a/src/lib/encode_server.h b/src/lib/encode_server.h
index f93d66746..8059abd0f 100644
--- a/src/lib/encode_server.h
+++ b/src/lib/encode_server.h
@@ -32,6 +32,7 @@
#include "exception_store.h"
#include "server.h"
#include <boost/asio.hpp>
+#include <boost/atomic.hpp>
#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
#include <string>
@@ -53,6 +54,10 @@ public:
void run () override;
+ int frames_encoded() const {
+ return _frames_encoded;
+ }
+
private:
void handle (std::shared_ptr<Socket>) override;
void worker_thread ();
@@ -67,6 +72,7 @@ private:
bool _verbose;
int _num_threads;
Waker _waker;
+ boost::atomic<int> _frames_encoded;
struct Broadcast {
diff --git a/src/lib/encode_server_finder.h b/src/lib/encode_server_finder.h
index f8a30af54..c478387f9 100644
--- a/src/lib/encode_server_finder.h
+++ b/src/lib/encode_server_finder.h
@@ -50,8 +50,6 @@ public:
static EncodeServerFinder* instance ();
static void drop ();
- void stop ();
-
std::list<EncodeServerDescription> servers () const;
/** Emitted whenever the list of servers changes */
@@ -62,6 +60,7 @@ private:
~EncodeServerFinder ();
void start ();
+ void stop ();
void search_thread ();
void listen_thread ();
diff --git a/src/lib/encoder.h b/src/lib/encoder.h
index 9b67720d3..aeaf7f620 100644
--- a/src/lib/encoder.h
+++ b/src/lib/encoder.h
@@ -58,6 +58,8 @@ public:
/** @return the number of frames that are done */
virtual Frame frames_done () const = 0;
virtual bool finishing () const = 0;
+ virtual void pause() {}
+ virtual void resume() {}
protected:
std::shared_ptr<const Film> _film;
diff --git a/src/lib/grok/context.h b/src/lib/grok/context.h
new file mode 100644
index 000000000..521faae8d
--- /dev/null
+++ b/src/lib/grok/context.h
@@ -0,0 +1,291 @@
+/*
+ Copyright (C) 2023 Grok Image Compression Inc.
+
+ This file is part of DCP-o-matic.
+
+ DCP-o-matic is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ DCP-o-matic is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
+
+*/
+
+#pragma once
+
+
+#include "../config.h"
+#include "../dcp_video.h"
+#include "../film.h"
+#include "../log.h"
+#include "../dcpomatic_log.h"
+#include "../writer.h"
+#include "messenger.h"
+#include <dcp/array_data.h>
+#include <boost/filesystem.hpp>
+
+
+static std::mutex launchMutex;
+
+namespace grk_plugin
+{
+
+struct GrokLogger : public MessengerLogger {
+ explicit GrokLogger(const std::string &preamble) : MessengerLogger(preamble)
+ {}
+ virtual ~GrokLogger() = default;
+ void info(const char* fmt, ...) override{
+ va_list arg;
+ va_start(arg, fmt);
+ dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_GENERAL);
+ va_end(arg);
+ }
+ void warn(const char* fmt, ...) override{
+ va_list arg;
+ va_start(arg, fmt);
+ dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_WARNING);
+ va_end(arg);
+ }
+ void error(const char* fmt, ...) override{
+ va_list arg;
+ va_start(arg, fmt);
+ dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_ERROR);
+ va_end(arg);
+ }
+};
+
+struct FrameProxy {
+ FrameProxy(int index, Eyes eyes, DCPVideo dcpv) : index_(index), eyes_(eyes), vf(dcpv)
+ {}
+ int index() const {
+ return index_;
+ }
+ Eyes eyes(void) const {
+ return eyes_;
+ }
+ int index_;
+ Eyes eyes_;
+ DCPVideo vf;
+};
+
+struct DcpomaticContext
+{
+ DcpomaticContext(
+ std::shared_ptr<const Film> film_,
+ Writer& writer_,
+ EventHistory& history_,
+ boost::filesystem::path const& location_
+ )
+ : film(film_)
+ , writer(writer_)
+ , history(history_)
+ , location(location_)
+ {
+
+ }
+
+ void set_dimensions(uint32_t w, uint32_t h)
+ {
+ width = w;
+ height = h;
+ }
+
+ std::shared_ptr<const Film> film;
+ Writer& writer;
+ EventHistory& history;
+ boost::filesystem::path location;
+ uint32_t width = 0;
+ uint32_t height = 0;
+};
+
+
+class GrokContext
+{
+public:
+ explicit GrokContext(DcpomaticContext* dcpomatic_context)
+ : _dcpomatic_context(dcpomatic_context)
+ {
+ auto grok = Config::instance()->grok().get_value_or({});
+ if (!grok.enable) {
+ return;
+ }
+
+ boost::filesystem::path folder(_dcpomatic_context->location);
+ boost::filesystem::path binary_path = folder / "grk_compress";
+ if (!boost::filesystem::exists(binary_path)) {
+ getMessengerLogger()->error(
+ "Invalid binary location %s", _dcpomatic_context->location.c_str()
+ );
+ return;
+ }
+
+ auto proc = [this](const std::string& str) {
+ try {
+ Msg msg(str);
+ auto tag = msg.next();
+ if (tag == GRK_MSGR_BATCH_SUBMIT_COMPRESSED) {
+ auto clientFrameId = msg.nextUint();
+ msg.nextUint(); // compressed frame ID
+ auto compressedFrameLength = msg.nextUint();
+ auto processor = [this](FrameProxy srcFrame, uint8_t* compressed, uint32_t compressedFrameLength) {
+ auto compressed_data = std::make_shared<dcp::ArrayData>(compressed, compressedFrameLength);
+ _dcpomatic_context->writer.write(compressed_data, srcFrame.index(), srcFrame.eyes());
+ frame_done ();
+ };
+
+ int const minimum_size = 16384;
+
+ bool needsRecompression = compressedFrameLength < minimum_size;
+ _messenger->processCompressed(str, processor, needsRecompression);
+
+ if (needsRecompression) {
+ auto fp = _messenger->retrieve(clientFrameId);
+ if (!fp) {
+ return;
+ }
+
+ auto encoded = std::make_shared<dcp::ArrayData>(fp->vf.encode_locally());
+ _dcpomatic_context->writer.write(encoded, fp->vf.index(), fp->vf.eyes());
+ frame_done ();
+ }
+ }
+ } catch (std::exception& ex) {
+ getMessengerLogger()->error("%s",ex.what());
+ }
+ };
+
+ auto clientInit = MessengerInit(
+ clientToGrokMessageBuf,
+ clientSentSynch,
+ grokReceiveReadySynch,
+ grokToClientMessageBuf,
+ grokSentSynch,
+ clientReceiveReadySynch,
+ proc,
+ std::thread::hardware_concurrency()
+ );
+
+ _messenger = new ScheduledMessenger<FrameProxy>(clientInit);
+ }
+
+ ~GrokContext()
+ {
+ shutdown();
+ }
+
+ bool launch(DCPVideo dcpv, int device)
+ {
+ namespace fs = boost::filesystem;
+
+ if (!_messenger) {
+ return false;
+ }
+ if (_launched) {
+ return true;
+ }
+ if (_launch_failed) {
+ return false;
+ }
+
+ std::unique_lock<std::mutex> lk_global(launchMutex);
+
+ if (!_messenger) {
+ return false;
+ }
+ if (_launched) {
+ return true;
+ }
+ if (_launch_failed) {
+ return false;
+ }
+
+ if (MessengerInit::firstLaunch(true)) {
+
+ if (!fs::exists(_dcpomatic_context->location) || !fs::is_directory(_dcpomatic_context->location)) {
+ getMessengerLogger()->error("Invalid directory %s", _dcpomatic_context->location.c_str());
+ return false;
+ }
+
+ auto s = dcpv.get_size();
+ _dcpomatic_context->set_dimensions(s.width, s.height);
+ auto grok = Config::instance()->grok().get_value_or({});
+ if (!_messenger->launchGrok(
+ _dcpomatic_context->location,
+ _dcpomatic_context->width,
+ _dcpomatic_context->width,
+ _dcpomatic_context->height,
+ 3,
+ 12,
+ device,
+ _dcpomatic_context->film->resolution() == Resolution::FOUR_K,
+ _dcpomatic_context->film->video_frame_rate(),
+ _dcpomatic_context->film->j2k_bandwidth(),
+ grok.licence_server,
+ grok.licence_port,
+ grok.licence)) {
+ _launch_failed = true;
+ return false;
+ }
+ }
+
+ _launched = _messenger->waitForClientInit();
+ _launch_failed = _launched;
+
+ return _launched;
+ }
+
+ bool scheduleCompress(DCPVideo const& vf)
+ {
+ if (!_messenger) {
+ return false;
+ }
+
+ auto fp = FrameProxy(vf.index(), vf.eyes(), vf);
+ auto cvt = [this, &fp](BufferSrc src) {
+ fp.vf.convert_to_xyz((uint16_t*)src.framePtr_);
+ };
+
+ return _messenger->scheduleCompress(fp, cvt);
+ }
+
+ void shutdown()
+ {
+ if (!_messenger) {
+ return;
+ }
+
+ std::unique_lock<std::mutex> lk_global(launchMutex);
+
+ if (!_messenger) {
+ return;
+ }
+
+ if (_launched) {
+ _messenger->shutdown();
+ }
+
+ delete _messenger;
+ _messenger = nullptr;
+ }
+
+ void frame_done()
+ {
+ _dcpomatic_context->history.event();
+ }
+
+private:
+ DcpomaticContext* _dcpomatic_context;
+ ScheduledMessenger<FrameProxy>* _messenger = nullptr;
+ bool _launched = false;
+ bool _launch_failed = false;
+};
+
+}
+
diff --git a/src/lib/grok/messenger.h b/src/lib/grok/messenger.h
new file mode 100644
index 000000000..eb2fe9560
--- /dev/null
+++ b/src/lib/grok/messenger.h
@@ -0,0 +1,906 @@
+/*
+ Copyright (C) 2023 Grok Image Compression Inc.
+
+ This file is part of DCP-o-matic.
+
+ DCP-o-matic is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ DCP-o-matic is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
+
+*/
+#pragma once
+
+#include <iostream>
+#include <string>
+#include <cstring>
+#include <atomic>
+#include <functional>
+#include <sstream>
+#include <future>
+#include <map>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+#include <queue>
+#include <cassert>
+#include <cstdarg>
+
+#ifdef _WIN32
+#include <windows.h>
+#include <direct.h>
+#include <tlhelp32.h>
+#pragma warning(disable : 4100)
+#else
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/mman.h>
+#include <semaphore.h>
+#include <signal.h>
+#endif
+
+namespace grk_plugin
+{
+static std::string grokToClientMessageBuf = "Global\\grok_to_client_message";
+static std::string grokSentSynch = "Global\\grok_sent";
+static std::string clientReceiveReadySynch = "Global\\client_receive_ready";
+static std::string clientToGrokMessageBuf = "Global\\client_to_grok_message";
+static std::string clientSentSynch = "Global\\client_sent";
+static std::string grokReceiveReadySynch = "Global\\grok_receive_ready";
+static std::string grokUncompressedBuf = "Global\\grok_uncompressed_buf";
+static std::string grokCompressedBuf = "Global\\grok_compressed_buf";
+static const std::string GRK_MSGR_BATCH_IMAGE = "GRK_MSGR_BATCH_IMAGE";
+static const std::string GRK_MSGR_BATCH_COMPRESS_INIT = "GRK_MSGR_BATCH_COMPRESS_INIT";
+static const std::string GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED = "GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED";
+static const std::string GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED =
+ "GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED";
+static const std::string GRK_MSGR_BATCH_SUBMIT_COMPRESSED = "GRK_MSGR_BATCH_SUBMIT_COMPRESSED";
+static const std::string GRK_MSGR_BATCH_PROCESSSED_COMPRESSED =
+ "GRK_MSGR_BATCH_PROCESSSED_COMPRESSED";
+static const std::string GRK_MSGR_BATCH_SHUTDOWN = "GRK_MSGR_BATCH_SHUTDOWN";
+static const std::string GRK_MSGR_BATCH_FLUSH = "GRK_MSGR_BATCH_FLUSH";
+static const size_t messageBufferLen = 256;
+struct IMessengerLogger
+{
+ virtual ~IMessengerLogger(void) = default;
+ virtual void info(const char* fmt, ...) = 0;
+ virtual void warn(const char* fmt, ...) = 0;
+ virtual void error(const char* fmt, ...) = 0;
+
+ protected:
+ template<typename... Args>
+ std::string log_message(char const* const format, Args&... args) noexcept
+ {
+ constexpr size_t message_size = 512;
+ char message[message_size];
+
+ std::snprintf(message, message_size, format, args...);
+ return std::string(message);
+ }
+};
+struct MessengerLogger : public IMessengerLogger
+{
+ explicit MessengerLogger(const std::string &preamble) : preamble_(preamble) {}
+ virtual ~MessengerLogger() = default;
+ virtual void info(const char* fmt, ...) override
+ {
+ va_list args;
+ std::string new_fmt = preamble_ + fmt + "\n";
+ va_start(args, fmt);
+ vfprintf(stdout, new_fmt.c_str(), args);
+ va_end(args);
+ }
+ virtual void warn(const char* fmt, ...) override
+ {
+ va_list args;
+ std::string new_fmt = preamble_ + fmt + "\n";
+ va_start(args, fmt);
+ vfprintf(stdout, new_fmt.c_str(), args);
+ va_end(args);
+ }
+ virtual void error(const char* fmt, ...) override
+ {
+ va_list args;
+ std::string new_fmt = preamble_ + fmt + "\n";
+ va_start(args, fmt);
+ vfprintf(stderr, new_fmt.c_str(), args);
+ va_end(args);
+ }
+
+ protected:
+ std::string preamble_;
+};
+
+extern IMessengerLogger* sLogger;
+void setMessengerLogger(IMessengerLogger* logger);
+IMessengerLogger* getMessengerLogger(void);
+
+struct MessengerInit
+{
+ MessengerInit(const std::string &outBuf, const std::string &outSent,
+ const std::string &outReceiveReady, const std::string &inBuf,
+ const std::string &inSent,
+ const std::string &inReceiveReady,
+ std::function<void(std::string)> processor,
+ size_t numProcessingThreads)
+ : outboundMessageBuf(outBuf), outboundSentSynch(outSent),
+ outboundReceiveReadySynch(outReceiveReady), inboundMessageBuf(inBuf),
+ inboundSentSynch(inSent), inboundReceiveReadySynch(inReceiveReady), processor_(processor),
+ numProcessingThreads_(numProcessingThreads),
+ uncompressedFrameSize_(0), compressedFrameSize_(0),
+ numFrames_(0)
+ {
+ if(firstLaunch(true))
+ unlink();
+ }
+ void unlink(void)
+ {
+#ifndef _WIN32
+ shm_unlink(grokToClientMessageBuf.c_str());
+ shm_unlink(clientToGrokMessageBuf.c_str());
+#endif
+ }
+ static bool firstLaunch(bool isClient)
+ {
+ bool debugGrok = false;
+ return debugGrok != isClient;
+ }
+ std::string outboundMessageBuf;
+ std::string outboundSentSynch;
+ std::string outboundReceiveReadySynch;
+
+ std::string inboundMessageBuf;
+ std::string inboundSentSynch;
+ std::string inboundReceiveReadySynch;
+
+ std::function<void(std::string)> processor_;
+ size_t numProcessingThreads_;
+
+ size_t uncompressedFrameSize_;
+ size_t compressedFrameSize_;
+ size_t numFrames_;
+};
+
+/*************************** Synchronization *******************************/
+enum SynchDirection
+{
+ SYNCH_SENT,
+ SYNCH_RECEIVE_READY
+};
+
+typedef int grk_handle;
+struct Synch
+{
+ Synch(const std::string &sentSemName, const std::string &receiveReadySemName)
+ : sentSemName_(sentSemName), receiveReadySemName_(receiveReadySemName)
+ {
+ // unlink semaphores in case of previous crash
+ if(MessengerInit::firstLaunch(true))
+ unlink();
+ open();
+ }
+ ~Synch()
+ {
+ close();
+ if(MessengerInit::firstLaunch(true))
+ unlink();
+ }
+ void post(SynchDirection dir)
+ {
+ auto sem = (dir == SYNCH_SENT ? sentSem_ : receiveReadySem_);
+ int rc = sem_post(sem);
+ if(rc)
+ getMessengerLogger()->error("Error posting to semaphore: %s", strerror(errno));
+ }
+ void wait(SynchDirection dir)
+ {
+ auto sem = dir == SYNCH_SENT ? sentSem_ : receiveReadySem_;
+ int rc = sem_wait(sem);
+ if(rc)
+ getMessengerLogger()->error("Error waiting for semaphore: %s", strerror(errno));
+ }
+ void open(void)
+ {
+ sentSem_ = sem_open(sentSemName_.c_str(), O_CREAT, 0666, 0);
+ if(!sentSem_)
+ getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
+ receiveReadySem_ = sem_open(receiveReadySemName_.c_str(), O_CREAT, 0666, 1);
+ if(!receiveReadySem_)
+ getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
+ }
+ void close(void)
+ {
+ int rc = sem_close(sentSem_);
+ if(rc)
+ getMessengerLogger()->error("Error closing semaphore %s: %s", sentSemName_.c_str(),
+ strerror(errno));
+ rc = sem_close(receiveReadySem_);
+ if(rc)
+ getMessengerLogger()->error("Error closing semaphore %s: %s",
+ receiveReadySemName_.c_str(), strerror(errno));
+ }
+ void unlink(void)
+ {
+ int rc = sem_unlink(sentSemName_.c_str());
+ if(rc == -1 && errno != ENOENT)
+ getMessengerLogger()->error("Error unlinking semaphore %s: %s", sentSemName_.c_str(),
+ strerror(errno));
+ rc = sem_unlink(receiveReadySemName_.c_str());
+ if(rc == -1 && errno != ENOENT)
+ getMessengerLogger()->error("Error unlinking semaphore %s: %s",
+ receiveReadySemName_.c_str(), strerror(errno));
+ }
+ sem_t* sentSem_;
+ sem_t* receiveReadySem_;
+
+ private:
+ std::string sentSemName_;
+ std::string receiveReadySemName_;
+};
+struct SharedMemoryManager
+{
+ static bool initShm(const std::string &name, size_t len, grk_handle* shm_fd, char** buffer)
+ {
+ *shm_fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0666);
+ if(*shm_fd < 0)
+ {
+ getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
+ return false;
+ }
+ int rc = ftruncate(*shm_fd, sizeof(char) * len);
+ if(rc)
+ {
+ getMessengerLogger()->error("Error truncating shared memory: %s", strerror(errno));
+ rc = close(*shm_fd);
+ if(rc)
+ getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
+ rc = shm_unlink(name.c_str());
+ // 2 == No such file or directory
+ if(rc && errno != 2)
+ getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
+ return false;
+ }
+ *buffer = static_cast<char*>(mmap(0, len, PROT_WRITE, MAP_SHARED, *shm_fd, 0));
+ if(!*buffer)
+ {
+ getMessengerLogger()->error("Error mapping shared memory: %s", strerror(errno));
+ rc = close(*shm_fd);
+ if(rc)
+ getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
+ rc = shm_unlink(name.c_str());
+ // 2 == No such file or directory
+ if(rc && errno != 2)
+ getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
+ }
+
+ return *buffer != nullptr;
+ }
+ static bool deinitShm(const std::string &name, size_t len, grk_handle &shm_fd, char** buffer)
+ {
+ if (!*buffer || !shm_fd)
+ return true;
+
+ int rc = munmap(*buffer, len);
+ *buffer = nullptr;
+ if(rc)
+ getMessengerLogger()->error("Error unmapping shared memory %s: %s", name.c_str(), strerror(errno));
+ rc = close(shm_fd);
+ shm_fd = 0;
+ if(rc)
+ getMessengerLogger()->error("Error closing shared memory %s: %s", name.c_str(), strerror(errno));
+ rc = shm_unlink(name.c_str());
+ // 2 == No such file or directory
+ if(rc && errno != 2)
+ fprintf(stderr,"Error unlinking shared memory %s : %s\n", name.c_str(), strerror(errno));
+
+ return true;
+ }
+};
+
+template<typename Data>
+class MessengerBlockingQueue
+{
+ public:
+ explicit MessengerBlockingQueue(size_t max) : active_(true), max_size_(max) {}
+ MessengerBlockingQueue() : MessengerBlockingQueue(UINT_MAX) {}
+ size_t size() const
+ {
+ return queue_.size();
+ }
+ // deactivate and clear queue
+ void deactivate()
+ {
+ {
+ std::lock_guard<std::mutex> lk(mutex_);
+ active_ = false;
+ while(!queue_.empty())
+ queue_.pop();
+ }
+
+ // release all waiting threads
+ can_pop_.notify_all();
+ can_push_.notify_all();
+ }
+ void activate()
+ {
+ std::lock_guard<std::mutex> lk(mutex_);
+ active_ = true;
+ }
+ bool push(Data const& value)
+ {
+ bool rc;
+ {
+ std::unique_lock<std::mutex> lk(mutex_);
+ rc = push_(value);
+ }
+ if(rc)
+ can_pop_.notify_one();
+
+ return rc;
+ }
+ bool waitAndPush(Data& value)
+ {
+ bool rc;
+ {
+ std::unique_lock<std::mutex> lk(mutex_);
+ if(!active_)
+ return false;
+ // in case of spurious wakeup, loop until predicate in lambda
+ // is satisfied.
+ can_push_.wait(lk, [this] { return queue_.size() < max_size_ || !active_; });
+ rc = push_(value);
+ }
+ if(rc)
+ can_pop_.notify_one();
+
+ return rc;
+ }
+ bool pop(Data& value)
+ {
+ bool rc;
+ {
+ std::unique_lock<std::mutex> lk(mutex_);
+ rc = pop_(value);
+ }
+ if(rc)
+ can_push_.notify_one();
+
+ return rc;
+ }
+ bool waitAndPop(Data& value)
+ {
+ bool rc;
+ {
+ std::unique_lock<std::mutex> lk(mutex_);
+ if(!active_)
+ return false;
+ // in case of spurious wakeup, loop until predicate in lambda
+ // is satisfied.
+ can_pop_.wait(lk, [this] { return !queue_.empty() || !active_; });
+ rc = pop_(value);
+ }
+ if(rc)
+ can_push_.notify_one();
+
+ return rc;
+ }
+
+ private:
+ bool push_(Data const& value)
+ {
+ if(queue_.size() == max_size_ || !active_)
+ return false;
+ queue_.push(value);
+
+ return true;
+ }
+ bool pop_(Data& value)
+ {
+ if(queue_.empty() || !active_)
+ return false;
+ value = queue_.front();
+ queue_.pop();
+
+ return true;
+ }
+ std::queue<Data> queue_;
+ mutable std::mutex mutex_;
+ std::condition_variable can_pop_;
+ std::condition_variable can_push_;
+ bool active_;
+ size_t max_size_;
+};
+struct BufferSrc
+{
+ BufferSrc(void) : BufferSrc("") {}
+ explicit BufferSrc(const std::string &file) : file_(file), clientFrameId_(0), frameId_(0), framePtr_(nullptr)
+ {}
+ BufferSrc(size_t clientFrameId, size_t frameId, uint8_t* framePtr)
+ : file_(""), clientFrameId_(clientFrameId), frameId_(frameId), framePtr_(framePtr)
+ {}
+ bool fromDisk(void)
+ {
+ return !file_.empty() && framePtr_ == nullptr;
+ }
+ size_t index() const
+ {
+ return clientFrameId_;
+ }
+ std::string file_;
+ size_t clientFrameId_;
+ size_t frameId_;
+ uint8_t* framePtr_;
+};
+
+struct Messenger;
+static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch);
+static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch);
+static void processorThread(Messenger* messenger, std::function<void(std::string)> processor);
+
+struct Messenger
+{
+ explicit Messenger(MessengerInit init)
+ : running(true), _initialized(false), _shutdown(false), init_(init),
+ outboundSynch_(nullptr),
+ inboundSynch_(nullptr), uncompressed_buffer_(nullptr), compressed_buffer_(nullptr),
+ uncompressed_fd_(0), compressed_fd_(0)
+ {}
+ virtual ~Messenger(void)
+ {
+ running = false;
+ sendQueue.deactivate();
+ receiveQueue.deactivate();
+
+ if (outboundSynch_) {
+ outboundSynch_->post(SYNCH_RECEIVE_READY);
+ outbound.join();
+ }
+
+ if (inboundSynch_) {
+ inboundSynch_->post(SYNCH_SENT);
+ inbound.join();
+ }
+
+ for(auto& p : processors_)
+ p.join();
+
+ delete outboundSynch_;
+ delete inboundSynch_;
+
+ deinitShm();
+ }
+ void startThreads(void) {
+ outboundSynch_ =
+ new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch);
+ outbound = std::thread(outboundThread, this, init_.outboundMessageBuf, outboundSynch_);
+
+ inboundSynch_ =
+ new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch);
+ inbound = std::thread(inboundThread, this, init_.inboundMessageBuf, inboundSynch_);
+
+ for(size_t i = 0; i < init_.numProcessingThreads_; ++i)
+ processors_.push_back(std::thread(processorThread, this, init_.processor_));
+ }
+ bool initBuffers(void)
+ {
+ bool rc = true;
+ if(init_.uncompressedFrameSize_)
+ {
+ rc = rc && SharedMemoryManager::initShm(grokUncompressedBuf,
+ init_.uncompressedFrameSize_ * init_.numFrames_,
+ &uncompressed_fd_, &uncompressed_buffer_);
+ }
+ if(init_.compressedFrameSize_)
+ {
+ rc = rc && SharedMemoryManager::initShm(grokCompressedBuf,
+ init_.compressedFrameSize_ * init_.numFrames_,
+ &compressed_fd_, &compressed_buffer_);
+ }
+
+ return rc;
+ }
+
+ bool deinitShm(void)
+ {
+ bool rc = SharedMemoryManager::deinitShm(grokUncompressedBuf,
+ init_.uncompressedFrameSize_ * init_.numFrames_,
+ uncompressed_fd_, &uncompressed_buffer_);
+ rc = rc && SharedMemoryManager::deinitShm(grokCompressedBuf,
+ init_.compressedFrameSize_ * init_.numFrames_,
+ compressed_fd_, &compressed_buffer_);
+
+ return rc;
+ }
+ template<typename... Args>
+ void send(const std::string& str, Args... args)
+ {
+ std::ostringstream oss;
+ oss << str;
+ int dummy[] = {0, ((void)(oss << ',' << args), 0)...};
+ static_cast<void>(dummy);
+
+ sendQueue.push(oss.str());
+ }
+
+ bool launchGrok(
+ boost::filesystem::path const& dir,
+ uint32_t width,
+ uint32_t stride,
+ uint32_t height,
+ uint32_t samplesPerPixel,
+ uint32_t depth,
+ int device,
+ bool is4K,
+ uint32_t fps,
+ uint32_t bandwidth,
+ const std::string server,
+ uint32_t port,
+ const std::string license
+ )
+ {
+
+ std::unique_lock<std::mutex> lk(shutdownMutex_);
+ if (async_result_.valid())
+ return true;
+ if(MessengerInit::firstLaunch(true))
+ init_.unlink();
+ startThreads();
+ char _cmd[4096];
+ auto fullServer = server + ":" + std::to_string(port);
+ sprintf(_cmd,
+ "./grk_compress -batch_src %s,%d,%d,%d,%d,%d -out_fmt j2k -k 1 "
+ "-G %d -%s %d,%d -j %s -J %s -v",
+ GRK_MSGR_BATCH_IMAGE.c_str(), width, stride, height, samplesPerPixel, depth,
+ device, is4K ? "cinema4K" : "cinema2K", fps, bandwidth,
+ license.c_str(), fullServer.c_str());
+
+ return launch(_cmd, dir);
+ }
+ void initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames)
+ {
+ // client fills queue with pending uncompressed buffers
+ init_.uncompressedFrameSize_ = uncompressedFrameSize;
+ init_.compressedFrameSize_ = compressedFrameSize;
+ init_.numFrames_ = numFrames;
+ initBuffers();
+ auto ptr = uncompressed_buffer_;
+ for(size_t i = 0; i < init_.numFrames_; ++i)
+ {
+ availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr));
+ ptr += init_.uncompressedFrameSize_;
+ }
+
+ std::unique_lock<std::mutex> lk(shutdownMutex_);
+ _initialized = true;
+ clientInitializedCondition_.notify_all();
+ }
+
+ bool waitForClientInit()
+ {
+ if (_initialized) {
+ return true;
+ } else if (_shutdown) {
+ return false;
+ }
+
+ std::unique_lock<std::mutex> lk(shutdownMutex_);
+
+ if (_initialized) {
+ return true;
+ } else if (_shutdown) {
+ return false;
+ }
+
+ while (true) {
+ if (clientInitializedCondition_.wait_for(lk, std::chrono::seconds(1), [this]{ return _initialized || _shutdown; })) {
+ break;
+ }
+ auto status = async_result_.wait_for(std::chrono::milliseconds(100));
+ if (status == std::future_status::ready) {
+ getMessengerLogger()->error("Grok exited unexpectedly during initialization");
+ return false;
+ }
+ }
+
+ return _initialized && !_shutdown;
+ }
+
+ static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel)
+ {
+ return sizeof(uint16_t) * w * h * samplesPerPixel;
+ }
+ void reclaimCompressed(size_t frameId)
+ {
+ availableBuffers_.push(BufferSrc(0, frameId, getCompressedFrame(frameId)));
+ }
+ void reclaimUncompressed(size_t frameId)
+ {
+ availableBuffers_.push(BufferSrc(0, frameId, getUncompressedFrame(frameId)));
+ }
+ uint8_t* getUncompressedFrame(size_t frameId)
+ {
+ assert(frameId < init_.numFrames_);
+ if(frameId >= init_.numFrames_)
+ return nullptr;
+
+ return (uint8_t*)(uncompressed_buffer_ + frameId * init_.uncompressedFrameSize_);
+ }
+ uint8_t* getCompressedFrame(size_t frameId)
+ {
+ assert(frameId < init_.numFrames_);
+ if(frameId >= init_.numFrames_)
+ return nullptr;
+
+ return (uint8_t*)(compressed_buffer_ + frameId * init_.compressedFrameSize_);
+ }
+ std::atomic_bool running;
+ bool _initialized;
+ bool _shutdown;
+ MessengerBlockingQueue<std::string> sendQueue;
+ MessengerBlockingQueue<std::string> receiveQueue;
+ MessengerBlockingQueue<BufferSrc> availableBuffers_;
+ MessengerInit init_;
+ std::string cmd_;
+ std::future<int> async_result_;
+ std::mutex shutdownMutex_;
+ std::condition_variable shutdownCondition_;
+
+ protected:
+ std::condition_variable clientInitializedCondition_;
+ private:
+ bool launch(std::string const& cmd, boost::filesystem::path const& dir)
+ {
+ // Change the working directory
+ if(!dir.empty())
+ {
+ boost::system::error_code ec;
+ boost::filesystem::current_path(dir, ec);
+ if (ec) {
+ getMessengerLogger()->error("Error: failed to change the working directory");
+ return false;
+ }
+ }
+ // Execute the command using std::async and std::system
+ cmd_ = cmd;
+ getMessengerLogger()->info(cmd.c_str());
+ async_result_ = std::async(std::launch::async, [this]() { return std::system(cmd_.c_str()); });
+ bool success = async_result_.valid();
+ if (!success)
+ getMessengerLogger()->error("Grok launch failed");
+
+ return success;
+
+ }
+ std::thread outbound;
+ Synch* outboundSynch_;
+
+ std::thread inbound;
+ Synch* inboundSynch_;
+
+ std::vector<std::thread> processors_;
+ char* uncompressed_buffer_;
+ char* compressed_buffer_;
+
+ grk_handle uncompressed_fd_;
+ grk_handle compressed_fd_;
+};
+
+static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch)
+{
+ grk_handle shm_fd = 0;
+ char* send_buffer = nullptr;
+
+ if(!SharedMemoryManager::initShm(sendBuf, messageBufferLen, &shm_fd, &send_buffer))
+ return;
+ while(messenger->running)
+ {
+ synch->wait(SYNCH_RECEIVE_READY);
+ if(!messenger->running)
+ break;
+ std::string message;
+ if(!messenger->sendQueue.waitAndPop(message))
+ break;
+ if(!messenger->running)
+ break;
+ memcpy(send_buffer, message.c_str(), message.size() + 1);
+ synch->post(SYNCH_SENT);
+ }
+ SharedMemoryManager::deinitShm(sendBuf, messageBufferLen, shm_fd, &send_buffer);
+}
+
+static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch)
+{
+ grk_handle shm_fd = 0;
+ char* receive_buffer = nullptr;
+
+ if(!SharedMemoryManager::initShm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer))
+ return;
+ while(messenger->running)
+ {
+ synch->wait(SYNCH_SENT);
+ if(!messenger->running)
+ break;
+ auto message = std::string(receive_buffer);
+ synch->post(SYNCH_RECEIVE_READY);
+ messenger->receiveQueue.push(message);
+ }
+ SharedMemoryManager::deinitShm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer);
+}
+struct Msg
+{
+ explicit Msg(const std::string &msg) : ct_(0)
+ {
+ std::stringstream ss(msg);
+ while(ss.good())
+ {
+ std::string substr;
+ std::getline(ss, substr, ',');
+ cs_.push_back(substr);
+ }
+ }
+ std::string next()
+ {
+ if(ct_ == cs_.size())
+ {
+ getMessengerLogger()->error("Msg: comma separated list exhausted. returning empty.");
+ return "";
+ }
+ return cs_[ct_++];
+ }
+
+ uint32_t nextUint(void)
+ {
+ return (uint32_t)std::stoi(next());
+ }
+
+ std::vector<std::string> cs_;
+ size_t ct_;
+};
+
+static void processorThread(Messenger* messenger, std::function<void(std::string)> processor)
+{
+ while (messenger->running) {
+ std::string message;
+ if (!messenger->receiveQueue.waitAndPop(message)) {
+ break;
+ }
+
+ if (!messenger->running) {
+ break;
+ }
+
+ Msg msg(message);
+ auto tag = msg.next();
+ if (tag == GRK_MSGR_BATCH_COMPRESS_INIT) {
+ auto width = msg.nextUint();
+ msg.nextUint(); // stride
+ auto height = msg.nextUint();
+ auto samples_per_pixel = msg.nextUint();
+ msg.nextUint(); // depth
+ messenger->init_.uncompressedFrameSize_ = Messenger::uncompressedFrameSize(width, height, samples_per_pixel);
+ auto compressed_frame_size = msg.nextUint();
+ auto num_frames = msg.nextUint();
+ messenger->initClient(compressed_frame_size, compressed_frame_size, num_frames);
+ } else if (tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED) {
+ messenger->reclaimUncompressed(msg.nextUint());
+ } else if (tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED) {
+ messenger->reclaimCompressed(msg.nextUint());
+ }
+ processor(message);
+ }
+}
+
+template<typename F>
+struct ScheduledFrames
+{
+ void store(F const& val)
+ {
+ std::unique_lock<std::mutex> lk(mapMutex_);
+ auto it = map_.find(val.index());
+ if (it == map_.end())
+ map_.emplace(std::make_pair(val.index(), val));
+ }
+ boost::optional<F> retrieve(size_t index)
+ {
+ std::unique_lock<std::mutex> lk(mapMutex_);
+ auto it = map_.find(index);
+ if(it == map_.end())
+ return {};
+
+ F val = it->second;
+ map_.erase(index);
+
+ return val;
+ }
+
+ private:
+ std::mutex mapMutex_;
+ std::map<size_t, F> map_;
+};
+
+template<typename F>
+struct ScheduledMessenger : public Messenger
+{
+ explicit ScheduledMessenger(MessengerInit init) : Messenger(init),
+ framesScheduled_(0),
+ framesCompressed_(0)
+ {}
+ ~ScheduledMessenger(void) {
+ shutdown();
+ }
+ bool scheduleCompress(F const& proxy, std::function<void(BufferSrc const&)> converter){
+ size_t frameSize = init_.uncompressedFrameSize_;
+ assert(frameSize >= init_.uncompressedFrameSize_);
+ BufferSrc src;
+ if(!availableBuffers_.waitAndPop(src))
+ return false;
+ converter(src);
+ scheduledFrames_.store(proxy);
+ framesScheduled_++;
+ send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_);
+
+ return true;
+ }
+ void processCompressed(const std::string &message, std::function<void(F,uint8_t*,uint32_t)> processor, bool needsRecompression) {
+ Msg msg(message);
+ msg.next();
+ auto clientFrameId = msg.nextUint();
+ auto compressedFrameId = msg.nextUint();
+ auto compressedFrameLength = msg.nextUint();
+ if (!needsRecompression) {
+ auto src_frame = scheduledFrames_.retrieve(clientFrameId);
+ if (!src_frame) {
+ return;
+ }
+ processor(*src_frame, getCompressedFrame(compressedFrameId),compressedFrameLength);
+ }
+ ++framesCompressed_;
+ send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId);
+ if (_shutdown && framesCompressed_ == framesScheduled_)
+ shutdownCondition_.notify_all();
+ }
+ void shutdown(void){
+ try {
+ std::unique_lock<std::mutex> lk(shutdownMutex_);
+ if (!async_result_.valid())
+ return;
+ _shutdown = true;
+ if (framesScheduled_) {
+ uint32_t scheduled = framesScheduled_;
+ send(GRK_MSGR_BATCH_FLUSH, scheduled);
+ shutdownCondition_.wait(lk, [this] { return framesScheduled_ == framesCompressed_; });
+ }
+ availableBuffers_.deactivate();
+ send(GRK_MSGR_BATCH_SHUTDOWN);
+ int result = async_result_.get();
+ if(result != 0)
+ getMessengerLogger()->error("Accelerator failed with return code: %d\n",result);
+ } catch (std::exception &ex) {
+ getMessengerLogger()->error("%s",ex.what());
+ }
+
+ }
+
+ boost::optional<F> retrieve(size_t index) {
+ return scheduledFrames_.retrieve(index);
+ }
+
+ void store(F& val) {
+ scheduledFrames_.store(val);
+ }
+
+private:
+ ScheduledFrames<F> scheduledFrames_;
+ std::atomic<uint32_t> framesScheduled_;
+ std::atomic<uint32_t> framesCompressed_;
+};
+
+} // namespace grk_plugin
diff --git a/src/lib/grok_j2k_encoder_thread.cc b/src/lib/grok_j2k_encoder_thread.cc
new file mode 100644
index 000000000..79fb1bbae
--- /dev/null
+++ b/src/lib/grok_j2k_encoder_thread.cc
@@ -0,0 +1,51 @@
+#include "config.h"
+#include "cross.h"
+#include "dcpomatic_log.h"
+#include "dcp_video.h"
+#include "grok/context.h"
+#include "grok_j2k_encoder_thread.h"
+#include "j2k_encoder.h"
+#include "scope_guard.h"
+#include "util.h"
+
+#include "i18n.h"
+
+
+using std::make_shared;
+using std::shared_ptr;
+
+
+GrokJ2KEncoderThread::GrokJ2KEncoderThread(J2KEncoder& encoder, grk_plugin::GrokContext* context)
+ : J2KEncoderThread(encoder)
+ , _context(context)
+{
+
+}
+
+
+void
+GrokJ2KEncoderThread::run()
+try
+{
+ while (true)
+ {
+ LOG_TIMING("encoder-sleep thread=%1", thread_id());
+ auto frame = _encoder.pop();
+
+ ScopeGuard frame_guard([this, &frame]() {
+ LOG_ERROR("Failed to schedule encode of %1 using grok", frame.index());
+ _encoder.retry(frame);
+ });
+
+ LOG_TIMING("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), frame.index(), static_cast<int>(frame.eyes()));
+
+ auto grok = Config::instance()->grok().get_value_or({});
+
+ if (_context->launch(frame, grok.selected) && _context->scheduleCompress(frame)) {
+ frame_guard.cancel();
+ }
+ }
+} catch (boost::thread_interrupted& e) {
+} catch (...) {
+ store_current();
+}
diff --git a/src/lib/grok_j2k_encoder_thread.h b/src/lib/grok_j2k_encoder_thread.h
new file mode 100644
index 000000000..8171830a9
--- /dev/null
+++ b/src/lib/grok_j2k_encoder_thread.h
@@ -0,0 +1,23 @@
+#include "exception_store.h"
+#include "j2k_encoder_thread.h"
+#include <dcp/data.h>
+
+
+class DCPVideo;
+
+namespace grk_plugin {
+ class GrokContext;
+}
+
+
+class GrokJ2KEncoderThread : public J2KEncoderThread, public ExceptionStore
+{
+public:
+ GrokJ2KEncoderThread(J2KEncoder& encoder, grk_plugin::GrokContext* context);
+
+ void run() override;
+
+private:
+ grk_plugin::GrokContext* _context;
+};
+
diff --git a/src/lib/j2k_encoder.cc b/src/lib/j2k_encoder.cc
index d2e840f85..e68402483 100644
--- a/src/lib/j2k_encoder.cc
+++ b/src/lib/j2k_encoder.cc
@@ -32,6 +32,12 @@
#include "encode_server_description.h"
#include "encode_server_finder.h"
#include "film.h"
+#include "cpu_j2k_encoder_thread.h"
+#ifdef DCPOMATIC_GROK
+#include "grok/context.h"
+#include "grok_j2k_encoder_thread.h"
+#endif
+#include "remote_j2k_encoder_thread.h"
#include "j2k_encoder.h"
#include "log.h"
#include "player_video.h"
@@ -44,6 +50,7 @@
using std::cout;
+using std::dynamic_pointer_cast;
using std::exception;
using std::list;
using std::make_shared;
@@ -53,6 +60,33 @@ using boost::optional;
using dcp::Data;
using namespace dcpomatic;
+#ifdef DCPOMATIC_GROK
+
+namespace grk_plugin {
+
+IMessengerLogger* sLogger = nullptr;
+
+#if defined(__GNUC__) || defined(__clang__)
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wunused-function"
+#endif
+void setMessengerLogger(grk_plugin::IMessengerLogger* logger)
+{
+ delete sLogger;
+ sLogger = logger;
+}
+#if defined(__GNUC__) || defined(__clang__)
+#pragma GCC diagnostic pop
+#endif
+grk_plugin::IMessengerLogger* getMessengerLogger(void)
+{
+ return sLogger;
+}
+
+}
+
+#endif
+
/** @param film Film that we are encoding.
* @param writer Writer that we are using.
@@ -62,6 +96,13 @@ J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer)
, _history (200)
, _writer (writer)
{
+#ifdef DCPOMATIC_GROK
+ auto grok = Config::instance()->grok().get_value_or({});
+ _dcpomatic_context = new grk_plugin::DcpomaticContext(film, writer, _history, grok.binary_location);
+ if (grok.enable) {
+ _context = new grk_plugin::GrokContext(_dcpomatic_context);
+ }
+#endif
servers_list_changed ();
}
@@ -70,8 +111,29 @@ J2KEncoder::~J2KEncoder ()
{
_server_found_connection.disconnect();
- boost::mutex::scoped_lock lm (_threads_mutex);
- terminate_threads ();
+ terminate_threads();
+
+#ifdef DCPOMATIC_GROK
+ delete _context;
+ delete _dcpomatic_context;
+#endif
+}
+
+
+void
+J2KEncoder::servers_list_changed()
+{
+ auto config = Config::instance();
+#ifdef DCPOMATIC_GROK
+ auto const grok_enable = config->grok().get_value_or({}).enable;
+#else
+ auto const grok_enable = false;
+#endif
+
+ auto const cpu = (grok_enable || config->only_servers_encode()) ? 0 : config->master_encoding_threads();
+ auto const gpu = grok_enable ? config->master_encoding_threads() : 0;
+
+ remake_threads(cpu, gpu, EncodeServerFinder::instance()->servers());
}
@@ -85,7 +147,40 @@ J2KEncoder::begin ()
void
-J2KEncoder::end ()
+J2KEncoder::pause()
+{
+#ifdef DCPOMATIC_GROK
+ if (!Config::instance()->grok().get_value_or({}).enable) {
+ return;
+ }
+ return;
+
+ terminate_threads ();
+
+ /* Something might have been thrown during terminate_threads */
+ rethrow ();
+
+ delete _context;
+ _context = nullptr;
+#endif
+}
+
+
+void J2KEncoder::resume()
+{
+#ifdef DCPOMATIC_GROK
+ if (!Config::instance()->grok().get_value_or({}).enable) {
+ return;
+ }
+
+ _context = new grk_plugin::GrokContext(_dcpomatic_context);
+ servers_list_changed();
+#endif
+}
+
+
+void
+J2KEncoder::end()
{
boost::mutex::scoped_lock lock (_queue_mutex);
@@ -94,18 +189,13 @@ J2KEncoder::end ()
/* Keep waking workers until the queue is empty */
while (!_queue.empty ()) {
rethrow ();
- _empty_condition.notify_all ();
_full_condition.wait (lock);
}
-
lock.unlock ();
LOG_GENERAL_NC (N_("Terminating encoder threads"));
- {
- boost::mutex::scoped_lock lm (_threads_mutex);
- terminate_threads ();
- }
+ terminate_threads ();
/* Something might have been thrown during terminate_threads */
rethrow ();
@@ -120,20 +210,35 @@ J2KEncoder::end ()
So just mop up anything left in the queue here.
*/
-
- for (auto const& i: _queue) {
- LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
- try {
- _writer.write(
- make_shared<dcp::ArrayData>(i.encode_locally()),
- i.index(),
- i.eyes()
- );
- frame_done ();
- } catch (std::exception& e) {
- LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
+ for (auto & i: _queue) {
+#ifdef DCPOMATIC_GROK
+ if (Config::instance()->grok().get_value_or({}).enable) {
+ if (!_context->scheduleCompress(i)){
+ LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index());
+ // handle error
+ }
+ } else {
+#else
+ {
+#endif
+ LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
+ try {
+ _writer.write(
+ make_shared<dcp::ArrayData>(i.encode_locally()),
+ i.index(),
+ i.eyes()
+ );
+ frame_done ();
+ } catch (std::exception& e) {
+ LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
+ }
}
}
+
+#ifdef DCPOMATIC_GROK
+ delete _context;
+ _context = nullptr;
+#endif
}
@@ -183,7 +288,7 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
size_t threads = 0;
{
boost::mutex::scoped_lock lm (_threads_mutex);
- threads = _threads->size();
+ threads = _threads.size();
}
boost::mutex::scoped_lock queue_lock (_queue_mutex);
@@ -223,13 +328,14 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
/* Queue this new frame for encoding */
LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
- _queue.push_back (DCPVideo(
+ auto dcpv = DCPVideo(
pv,
position,
_film->video_frame_rate(),
_film->j2k_bandwidth(),
_film->resolution()
- ));
+ );
+ _queue.push_back (dcpv);
/* The queue might not be empty any more, so notify anything which is
waiting on that.
@@ -242,170 +348,143 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
}
-/** Caller must hold a lock on _threads_mutex */
void
J2KEncoder::terminate_threads ()
{
+ boost::mutex::scoped_lock lm(_threads_mutex);
boost::this_thread::disable_interruption dis;
- if (!_threads) {
- return;
- }
-
- _threads->interrupt_all ();
- try {
- _threads->join_all ();
- } catch (exception& e) {
- LOG_ERROR ("join() threw an exception: %1", e.what());
- } catch (...) {
- LOG_ERROR_NC ("join() threw an exception");
+ for (auto& thread: _threads) {
+ thread->stop();
}
- _threads.reset ();
+ _threads.clear();
+ _ending = true;
}
void
-J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
-try
+#ifdef DCPOMATIC_GROK
+J2KEncoder::remake_threads(int cpu, int gpu, list<EncodeServerDescription> servers)
+#else
+J2KEncoder::remake_threads(int cpu, int, list<EncodeServerDescription> servers)
+#endif
{
- start_of_thread ("J2KEncoder");
+ boost::mutex::scoped_lock lm (_threads_mutex);
+ if (_ending) {
+ return;
+ }
- if (server) {
- LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
- } else {
- LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
+ auto remove_threads = [this](int wanted, int current, std::function<bool (shared_ptr<J2KEncoderThread>)> predicate) {
+ for (auto i = wanted; i < current; ++i) {
+ auto iter = std::find_if(_threads.begin(), _threads.end(), predicate);
+ if (iter != _threads.end()) {
+ (*iter)->stop();
+ _threads.erase(iter);
+ }
+ }
+ };
+
+
+ /* CPU */
+
+ auto const is_cpu_thread = [](shared_ptr<J2KEncoderThread> thread) {
+ return static_cast<bool>(dynamic_pointer_cast<CPUJ2KEncoderThread>(thread));
+ };
+
+ auto const current_cpu_threads = std::count_if(_threads.begin(), _threads.end(), is_cpu_thread);
+
+ for (auto i = current_cpu_threads; i < cpu; ++i) {
+ auto thread = make_shared<CPUJ2KEncoderThread>(*this);
+ thread->start();
+ _threads.push_back(thread);
}
- /* Number of seconds that we currently wait between attempts
- to connect to the server; not relevant for localhost
- encodings.
- */
- int remote_backoff = 0;
+ remove_threads(cpu, current_cpu_threads, is_cpu_thread);
+
+#ifdef DCPOMATIC_GROK
+ /* GPU */
+
+ auto const is_grok_thread = [](shared_ptr<J2KEncoderThread> thread) {
+ return static_cast<bool>(dynamic_pointer_cast<GrokJ2KEncoderThread>(thread));
+ };
+
+ auto const current_gpu_threads = std::count_if(_threads.begin(), _threads.end(), is_grok_thread);
+
+ for (auto i = current_gpu_threads; i < gpu; ++i) {
+ auto thread = make_shared<GrokJ2KEncoderThread>(*this, _context);
+ thread->start();
+ _threads.push_back(thread);
+ }
+
+ remove_threads(gpu, current_gpu_threads, is_grok_thread);
+#endif
- while (true) {
+ /* Remote */
- LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
- boost::mutex::scoped_lock lock (_queue_mutex);
- while (_queue.empty ()) {
- _empty_condition.wait (lock);
+ for (auto const& server: servers) {
+ if (!server.current_link_version()) {
+ continue;
}
- LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
- auto vf = _queue.front ();
+ auto is_remote_thread = [server](shared_ptr<J2KEncoderThread> thread) {
+ auto remote = dynamic_pointer_cast<RemoteJ2KEncoderThread>(thread);
+ return remote && remote->server().host_name() == server.host_name();
+ };
- /* We're about to commit to either encoding this frame or putting it back onto the queue,
- so we must not be interrupted until one or other of these things have happened. This
- block has thread interruption disabled.
- */
- {
- boost::this_thread::disable_interruption dis;
-
- LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf.index(), static_cast<int>(vf.eyes()));
- _queue.pop_front ();
-
- lock.unlock ();
-
- shared_ptr<Data> encoded;
-
- /* We need to encode this input */
- if (server) {
- try {
- encoded = make_shared<dcp::ArrayData>(vf.encode_remotely(server.get()));
-
- if (remote_backoff > 0) {
- LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
- }
-
- /* This job succeeded, so remove any backoff */
- remote_backoff = 0;
-
- } catch (std::exception& e) {
- if (remote_backoff < 60) {
- /* back off more */
- remote_backoff += 10;
- }
- LOG_ERROR (
- N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
- vf.index(), server->host_name(), e.what(), remote_backoff
- );
- }
-
- } else {
- try {
- LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf.index());
- encoded = make_shared<dcp::ArrayData>(vf.encode_locally());
- LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index());
- } catch (std::exception& e) {
- /* This is very bad, so don't cope with it, just pass it on */
- LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
- throw;
- }
- }
+ auto const current_threads = std::count_if(_threads.begin(), _threads.end(), is_remote_thread);
- if (encoded) {
- _writer.write(encoded, vf.index(), vf.eyes());
- frame_done ();
- } else {
- lock.lock ();
- LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
- _queue.push_front (vf);
- lock.unlock ();
- }
+ auto const wanted_threads = server.threads();
+
+ if (wanted_threads > current_threads) {
+ LOG_GENERAL(N_("Adding %1 worker threads for remote %2"), wanted_threads - current_threads, server.host_name());
+ } else if (wanted_threads < current_threads) {
+ LOG_GENERAL(N_("Removing %1 worker threads for remote %2"), current_threads - wanted_threads, server.host_name());
}
- if (remote_backoff > 0) {
- boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
+ for (auto i = current_threads; i < wanted_threads; ++i) {
+ auto thread = make_shared<RemoteJ2KEncoderThread>(*this, server);
+ thread->start();
+ _threads.push_back(thread);
}
- /* The queue might not be full any more, so notify anything that is waiting on that */
- lock.lock ();
- _full_condition.notify_all ();
+ remove_threads(wanted_threads, current_threads, is_remote_thread);
}
-}
-catch (boost::thread_interrupted& e) {
- /* Ignore these and just stop the thread */
- _full_condition.notify_all ();
-}
-catch (...)
-{
- store_current ();
- /* Wake anything waiting on _full_condition so it can see the exception */
- _full_condition.notify_all ();
+
+ _writer.set_encoder_threads(_threads.size());
}
-void
-J2KEncoder::servers_list_changed ()
+DCPVideo
+J2KEncoder::pop()
{
- boost::mutex::scoped_lock lm (_threads_mutex);
+ boost::mutex::scoped_lock lock(_queue_mutex);
+ while (_queue.empty()) {
+ _empty_condition.wait (lock);
+ }
- terminate_threads ();
- _threads = make_shared<boost::thread_group>();
+ LOG_TIMING("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
- /* XXX: could re-use threads */
+ auto vf = _queue.front();
+ _queue.pop_front();
- if (!Config::instance()->only_servers_encode ()) {
- for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
-#ifdef DCPOMATIC_LINUX
- auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
- pthread_setname_np (t->native_handle(), "encode-worker");
-#else
- _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
-#endif
- }
- }
+ _full_condition.notify_all();
+ return vf;
+}
- for (auto i: EncodeServerFinder::instance()->servers()) {
- if (!i.current_link_version()) {
- continue;
- }
- LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
- for (int j = 0; j < i.threads(); ++j) {
- _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i));
- }
- }
+void
+J2KEncoder::retry(DCPVideo video)
+{
+ boost::mutex::scoped_lock lock(_queue_mutex);
+ _queue.push_front(video);
+ _empty_condition.notify_all();
+}
- _writer.set_encoder_threads(_threads->size());
+
+void
+J2KEncoder::write(shared_ptr<const dcp::Data> data, int index, Eyes eyes)
+{
+ _writer.write(data, index, eyes);
+ frame_done();
}
diff --git a/src/lib/j2k_encoder.h b/src/lib/j2k_encoder.h
index 63228a6b8..6bfbaea49 100644
--- a/src/lib/j2k_encoder.h
+++ b/src/lib/j2k_encoder.h
@@ -32,6 +32,7 @@
#include "enum_indexed_vector.h"
#include "event_history.h"
#include "exception_store.h"
+#include "j2k_encoder_thread.h"
#include "writer.h"
#include <boost/optional.hpp>
#include <boost/signals2.hpp>
@@ -48,6 +49,15 @@ class Film;
class Job;
class PlayerVideo;
+namespace grk_plugin {
+ struct DcpomaticContext;
+ struct GrokContext;
+}
+
+struct local_threads_created_and_destroyed;
+struct remote_threads_created_and_destroyed;
+struct frames_not_lost_when_threads_disappear;
+
/** @class J2KEncoder
* @brief Class to manage encoding to J2K.
@@ -70,19 +80,27 @@ public:
/** Called to pass a bit of video to be encoded as the next DCP frame */
void encode (std::shared_ptr<PlayerVideo> pv, dcpomatic::DCPTime time);
+ void pause();
+ void resume();
+
/** Called when a processing run has finished */
- void end ();
+ void end();
boost::optional<float> current_encoding_rate () const;
int video_frames_enqueued () const;
- void servers_list_changed ();
+ DCPVideo pop();
+ void retry(DCPVideo frame);
+ void write(std::shared_ptr<const dcp::Data> data, int index, Eyes eyes);
private:
+ friend struct ::local_threads_created_and_destroyed;
+ friend struct ::remote_threads_created_and_destroyed;
+ friend struct ::frames_not_lost_when_threads_disappear;
void frame_done ();
-
- void encoder_thread (boost::optional<EncodeServerDescription>);
+ void servers_list_changed ();
+ void remake_threads(int cpu, int gpu, std::list<EncodeServerDescription> servers);
void terminate_threads ();
/** Film that we are encoding */
@@ -91,7 +109,7 @@ private:
EventHistory _history;
boost::mutex _threads_mutex;
- std::shared_ptr<boost::thread_group> _threads;
+ std::vector<std::shared_ptr<J2KEncoderThread>> _threads;
mutable boost::mutex _queue_mutex;
std::list<DCPVideo> _queue;
@@ -107,6 +125,13 @@ private:
boost::optional<dcpomatic::DCPTime> _last_player_video_time;
boost::signals2::scoped_connection _server_found_connection;
+
+#ifdef DCPOMATIC_GROK
+ grk_plugin::DcpomaticContext* _dcpomatic_context = nullptr;
+ grk_plugin::GrokContext *_context = nullptr;
+#endif
+
+ bool _ending = false;
};
diff --git a/src/lib/j2k_encoder_thread.cc b/src/lib/j2k_encoder_thread.cc
new file mode 100644
index 000000000..0199209be
--- /dev/null
+++ b/src/lib/j2k_encoder_thread.cc
@@ -0,0 +1,38 @@
+#include "dcp_video.h"
+#include "dcpomatic_log.h"
+#include "j2k_encoder.h"
+#include "j2k_encoder_thread.h"
+#include "scope_guard.h"
+
+
+J2KEncoderThread::J2KEncoderThread(J2KEncoder& encoder)
+ : _encoder(encoder)
+{
+
+}
+
+
+void
+J2KEncoderThread::start()
+{
+ _thread = boost::thread(boost::bind(&J2KEncoderThread::run, this));
+#ifdef DCPOMATIC_LINUX
+ pthread_setname_np(_thread.native_handle(), "encode-worker");
+#endif
+}
+
+
+void
+J2KEncoderThread::stop()
+{
+ _thread.interrupt();
+ try {
+ _thread.join();
+ } catch (std::exception& e) {
+ LOG_ERROR("join() threw an exception: %1", e.what());
+ } catch (...) {
+ LOG_ERROR_NC("join() threw an exception");
+ }
+}
+
+
diff --git a/src/lib/j2k_encoder_thread.h b/src/lib/j2k_encoder_thread.h
new file mode 100644
index 000000000..4a4b25f37
--- /dev/null
+++ b/src/lib/j2k_encoder_thread.h
@@ -0,0 +1,32 @@
+#ifndef DCPOMATIC_J2K_ENCODER_THREAD
+#define DCPOMATIC_J2K_ENCODER_THREAD
+
+
+#include <boost/thread.hpp>
+
+
+class J2KEncoder;
+
+
+class J2KEncoderThread
+{
+public:
+ J2KEncoderThread(J2KEncoder& encoder);
+
+ J2KEncoderThread(J2KEncoderThread const&) = delete;
+ J2KEncoderThread& operator=(J2KEncoderThread const&) = delete;
+
+ void start();
+ void stop();
+
+ virtual void run() = 0;
+
+protected:
+ J2KEncoder& _encoder;
+
+private:
+ boost::thread _thread;
+};
+
+
+#endif
diff --git a/src/lib/j2k_sync_encoder_thread.cc b/src/lib/j2k_sync_encoder_thread.cc
new file mode 100644
index 000000000..9398bcb85
--- /dev/null
+++ b/src/lib/j2k_sync_encoder_thread.cc
@@ -0,0 +1,44 @@
+#include "dcp_video.h"
+#include "dcpomatic_log.h"
+#include "j2k_encoder.h"
+#include "j2k_sync_encoder_thread.h"
+#include "scope_guard.h"
+
+
+J2KSyncEncoderThread::J2KSyncEncoderThread(J2KEncoder& encoder)
+ : J2KEncoderThread(encoder)
+{
+
+}
+
+
+void
+J2KSyncEncoderThread::run()
+try
+{
+ log_thread_start();
+
+ while (true) {
+ LOG_TIMING("encoder-sleep thread=%1", thread_id());
+ auto frame = _encoder.pop();
+
+ ScopeGuard frame_guard([this, &frame]() {
+ boost::this_thread::disable_interruption dis;
+ _encoder.retry(frame);
+ });
+
+ LOG_TIMING("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), frame.index(), static_cast<int>(frame.eyes()));
+
+ auto encoded = encode(frame);
+
+ if (encoded) {
+ boost::this_thread::disable_interruption dis;
+ frame_guard.cancel();
+ _encoder.write(encoded, frame.index(), frame.eyes());
+ }
+ }
+} catch (boost::thread_interrupted& e) {
+} catch (...) {
+ store_current();
+}
+
diff --git a/src/lib/j2k_sync_encoder_thread.h b/src/lib/j2k_sync_encoder_thread.h
new file mode 100644
index 000000000..45222279e
--- /dev/null
+++ b/src/lib/j2k_sync_encoder_thread.h
@@ -0,0 +1,32 @@
+#ifndef DCPOMATIC_J2K_SYNC_ENCODER_THREAD_H
+#define DCPOMATIC_J2K_SYNC_ENCODER_THREAD_H
+
+
+#include "exception_store.h"
+#include "j2k_encoder_thread.h"
+#include <dcp/array_data.h>
+#include <boost/thread.hpp>
+
+
+class DCPVideo;
+class J2KEncoder;
+
+
+class J2KSyncEncoderThread : public J2KEncoderThread, public ExceptionStore
+{
+public:
+ J2KSyncEncoderThread(J2KEncoder& encoder);
+
+ J2KSyncEncoderThread(J2KSyncEncoderThread const&) = delete;
+ J2KSyncEncoderThread& operator=(J2KSyncEncoderThread const&) = delete;
+
+ virtual ~J2KSyncEncoderThread() {}
+
+ void run() override;
+
+ virtual void log_thread_start() const = 0;
+ virtual std::shared_ptr<dcp::ArrayData> encode(DCPVideo const& frame) = 0;
+};
+
+
+#endif
diff --git a/src/lib/job.cc b/src/lib/job.cc
index 727456523..e900a427c 100644
--- a/src/lib/job.cc
+++ b/src/lib/job.cc
@@ -655,7 +655,7 @@ void
Job::cancel ()
{
if (_thread.joinable()) {
- resume();
+ Job::resume();
_thread.interrupt ();
_thread.join ();
@@ -682,6 +682,7 @@ Job::pause_by_user ()
}
if (paused) {
+ pause();
_pause_changed.notify_all ();
}
@@ -694,6 +695,7 @@ Job::pause_by_priority ()
{
if (running ()) {
set_state (PAUSED_BY_PRIORITY);
+ pause();
_pause_changed.notify_all ();
}
}
diff --git a/src/lib/job.h b/src/lib/job.h
index dc5f7bc34..a4b7a319d 100644
--- a/src/lib/job.h
+++ b/src/lib/job.h
@@ -56,9 +56,10 @@ public:
}
void start ();
+ virtual void pause() {}
bool pause_by_user ();
void pause_by_priority ();
- void resume ();
+ virtual void resume ();
void cancel ();
bool is_new () const;
diff --git a/src/lib/make_dcp.cc b/src/lib/make_dcp.cc
index 17d45be46..d8d42f49a 100644
--- a/src/lib/make_dcp.cc
+++ b/src/lib/make_dcp.cc
@@ -40,8 +40,8 @@ using std::shared_ptr;
using std::string;
-/** Add suitable Jobs to the JobManager to create a DCP for a Film */
-void
+/** Add suitable Job to the JobManager to create a DCP for a Film */
+shared_ptr<TranscodeJob>
make_dcp (shared_ptr<Film> film, TranscodeJob::ChangedBehaviour behaviour)
{
if (film->dcp_name().find("/") != string::npos) {
@@ -101,5 +101,7 @@ make_dcp (shared_ptr<Film> film, TranscodeJob::ChangedBehaviour behaviour)
auto tj = make_shared<DCPTranscodeJob>(film, behaviour);
tj->set_encoder (make_shared<DCPEncoder>(film, tj));
JobManager::instance()->add (tj);
+
+ return tj;
}
diff --git a/src/lib/make_dcp.h b/src/lib/make_dcp.h
index 9f5072782..fe0bcd2f6 100644
--- a/src/lib/make_dcp.h
+++ b/src/lib/make_dcp.h
@@ -25,5 +25,5 @@
class Film;
-void make_dcp (std::shared_ptr<Film> film, TranscodeJob::ChangedBehaviour behaviour);
+std::shared_ptr<TranscodeJob> make_dcp(std::shared_ptr<Film> film, TranscodeJob::ChangedBehaviour behaviour);
diff --git a/src/lib/remote_j2k_encoder_thread.cc b/src/lib/remote_j2k_encoder_thread.cc
new file mode 100644
index 000000000..eac7bb538
--- /dev/null
+++ b/src/lib/remote_j2k_encoder_thread.cc
@@ -0,0 +1,64 @@
+#include "dcp_video.h"
+#include "dcpomatic_log.h"
+#include "j2k_encoder.h"
+#include "remote_j2k_encoder_thread.h"
+#include "scope_guard.h"
+#include "util.h"
+
+#include "i18n.h"
+
+
+using std::make_shared;
+using std::shared_ptr;
+
+
+RemoteJ2KEncoderThread::RemoteJ2KEncoderThread(J2KEncoder& encoder, EncodeServerDescription server)
+ : J2KSyncEncoderThread(encoder)
+ , _server(server)
+{
+
+}
+
+
+void
+RemoteJ2KEncoderThread::log_thread_start() const
+{
+ start_of_thread("RemoteJ2KEncoder");
+ LOG_TIMING("start-encoder-thread thread=%1 server=%2", thread_id(), _server.host_name());
+}
+
+
+shared_ptr<dcp::ArrayData>
+RemoteJ2KEncoderThread::encode(DCPVideo const& frame)
+{
+ shared_ptr<dcp::ArrayData> encoded;
+
+ try {
+ encoded = make_shared<dcp::ArrayData>(frame.encode_remotely(_server));
+ if (_remote_backoff > 0) {
+ LOG_GENERAL("%1 was lost, but now she is found; removing backoff", _server.host_name());
+ _remote_backoff = 0;
+ }
+ } catch (std::exception& e) {
+ LOG_ERROR(
+ N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
+ frame.index(), _server.host_name(), e.what(), _remote_backoff
+ );
+ } catch (...) {
+ LOG_ERROR(
+ N_("Remote encode of %1 on %2 failed; thread sleeping for %4s"),
+ frame.index(), _server.host_name(), _remote_backoff
+ );
+ }
+
+ if (!encoded) {
+ if (_remote_backoff < 60) {
+ /* back off more */
+ _remote_backoff += 10;
+ }
+ boost::this_thread::sleep(boost::posix_time::seconds(_remote_backoff));
+ }
+
+ return encoded;
+}
+
diff --git a/src/lib/remote_j2k_encoder_thread.h b/src/lib/remote_j2k_encoder_thread.h
new file mode 100644
index 000000000..f3fe7f94a
--- /dev/null
+++ b/src/lib/remote_j2k_encoder_thread.h
@@ -0,0 +1,21 @@
+#include "encode_server_description.h"
+#include "j2k_sync_encoder_thread.h"
+
+
+class RemoteJ2KEncoderThread : public J2KSyncEncoderThread
+{
+public:
+ RemoteJ2KEncoderThread(J2KEncoder& encoder, EncodeServerDescription server);
+
+ void log_thread_start() const override;
+ std::shared_ptr<dcp::ArrayData> encode(DCPVideo const& frame) override;
+
+ EncodeServerDescription server() const {
+ return _server;
+ }
+
+private:
+ EncodeServerDescription _server;
+ /** Number of seconds that we currently wait between attempts to connect to the server */
+ int _remote_backoff = 0;
+};
diff --git a/src/lib/scope_guard.h b/src/lib/scope_guard.h
index ac60f9fea..e0d1e81fc 100644
--- a/src/lib/scope_guard.h
+++ b/src/lib/scope_guard.h
@@ -45,11 +45,19 @@ public:
~ScopeGuard ()
{
- _function();
+ if (!_cancelled) {
+ _function();
+ }
+ }
+
+ void cancel()
+ {
+ _cancelled = true;
}
private:
std::function<void()> _function;
+ bool _cancelled = false;
};
diff --git a/src/lib/transcode_job.cc b/src/lib/transcode_job.cc
index 1b2d2ddd5..7c842a99f 100644
--- a/src/lib/transcode_job.cc
+++ b/src/lib/transcode_job.cc
@@ -148,6 +148,20 @@ TranscodeJob::run ()
}
+void
+TranscodeJob::pause()
+{
+ _encoder->pause();
+}
+
+
+void TranscodeJob::resume()
+{
+ _encoder->resume();
+ Job::resume();
+}
+
+
string
TranscodeJob::status () const
{
diff --git a/src/lib/transcode_job.h b/src/lib/transcode_job.h
index b05b20a16..35870231d 100644
--- a/src/lib/transcode_job.h
+++ b/src/lib/transcode_job.h
@@ -37,6 +37,8 @@
class Encoder;
+struct frames_not_lost_when_threads_disappear;
+
/** @class TranscodeJob
* @brief A job which transcodes a Film to another format.
@@ -56,6 +58,8 @@ public:
std::string name () const override;
std::string json_name () const override;
void run () override;
+ void pause() override;
+ void resume() override;
std::string status () const override;
bool enable_notify () const override {
return true;
@@ -64,6 +68,8 @@ public:
void set_encoder (std::shared_ptr<Encoder> t);
private:
+ friend struct ::frames_not_lost_when_threads_disappear;
+
virtual void post_transcode () {}
float frames_per_second() const;
diff --git a/src/lib/writer.h b/src/lib/writer.h
index 1fbf7bbd5..0b38e9030 100644
--- a/src/lib/writer.h
+++ b/src/lib/writer.h
@@ -34,6 +34,7 @@
#include "exception_store.h"
#include "font_id_map.h"
#include "player_text.h"
+#include "text_type.h"
#include "weak_film.h"
#include <dcp/atmos_frame.h>
#include <boost/thread.hpp>
diff --git a/src/lib/wscript b/src/lib/wscript
index dad8947b1..67c6b5869 100644
--- a/src/lib/wscript
+++ b/src/lib/wscript
@@ -59,6 +59,7 @@ sources = """
content_factory.cc
combine_dcp_job.cc
copy_dcp_details_to_film.cc
+ cpu_j2k_encoder_thread.cc
create_cli.cc
crop.cc
cross_common.cc
@@ -138,6 +139,8 @@ sources = """
job.cc
job_manager.cc
j2k_encoder.cc
+ j2k_encoder_thread.cc
+ j2k_sync_encoder_thread.cc
json_server.cc
kdm_cli.cc
kdm_recipient.cc
@@ -163,6 +166,7 @@ sources = """
referenced_reel_asset.cc
release_notes.cc
render_text.cc
+ remote_j2k_encoder_thread.cc
resampler.cc
resolution.cc
rgba.cc
@@ -241,6 +245,9 @@ def build(bld):
if bld.env.TARGET_LINUX:
obj.uselib += ' POLKIT'
+ if bld.env.ENABLE_GROK:
+ obj.source += ' grok_j2k_encoder_thread.cc'
+
if bld.env.TARGET_WINDOWS_64 or bld.env.TARGET_WINDOWS_32:
obj.uselib += ' WINSOCK2 DBGHELP SHLWAPI MSWSOCK BOOST_LOCALE SETUPAPI OLE32 UUID'
obj.source += ' cross_windows.cc'
diff --git a/src/tools/dcpomatic.cc b/src/tools/dcpomatic.cc
index 979672fb4..db18d1a2f 100644
--- a/src/tools/dcpomatic.cc
+++ b/src/tools/dcpomatic.cc
@@ -75,6 +75,9 @@
#include "lib/ffmpeg_encoder.h"
#include "lib/film.h"
#include "lib/font_config.h"
+#ifdef DCPOMATIC_GROK
+#include "lib/grok/context.h"
+#endif
#include "lib/hints.h"
#include "lib/job_manager.h"
#include "lib/kdm_with_metadata.h"
@@ -1180,6 +1183,7 @@ private:
FontConfig::drop();
ev.Skip ();
+ JobManager::drop ();
}
void active_jobs_changed()
@@ -1711,6 +1715,10 @@ private:
notes.Centre();
notes.ShowModal();
}
+
+#ifdef DCPOMATIC_GROK
+ grk_plugin::setMessengerLogger(new grk_plugin::GrokLogger("[GROK] "));
+#endif
}
catch (exception& e)
{
diff --git a/src/tools/dcpomatic_batch.cc b/src/tools/dcpomatic_batch.cc
index dc092bf8c..d112f2060 100644
--- a/src/tools/dcpomatic_batch.cc
+++ b/src/tools/dcpomatic_batch.cc
@@ -31,6 +31,9 @@
#include "lib/config.h"
#include "lib/dcpomatic_socket.h"
#include "lib/film.h"
+#ifdef DCPOMATIC_GROK
+#include "lib/grok/context.h"
+#endif
#include "lib/job.h"
#include "lib/job_manager.h"
#include "lib/make_dcp.h"
@@ -288,6 +291,7 @@ private:
}
ev.Skip ();
+ JobManager::drop ();
}
void file_add_film ()
@@ -495,6 +499,10 @@ class App : public wxApp
}
}
+#ifdef DCPOMATIC_GROK
+ grk_plugin::setMessengerLogger(new grk_plugin::GrokLogger("[GROK] "));
+#endif
+
return true;
}
diff --git a/src/tools/dcpomatic_cli.cc b/src/tools/dcpomatic_cli.cc
index 96bf83086..2abc3a149 100644
--- a/src/tools/dcpomatic_cli.cc
+++ b/src/tools/dcpomatic_cli.cc
@@ -413,7 +413,7 @@ main (int argc, char* argv[])
signal_manager = new SignalManager ();
if (no_remote || export_format) {
- EncodeServerFinder::instance()->stop ();
+ EncodeServerFinder::drop();
}
if (json_port) {
diff --git a/src/tools/dcpomatic_disk.cc b/src/tools/dcpomatic_disk.cc
index 5941d6d70..989fe3385 100644
--- a/src/tools/dcpomatic_disk.cc
+++ b/src/tools/dcpomatic_disk.cc
@@ -269,6 +269,7 @@ private:
}
ev.Skip ();
+ JobManager::drop ();
}
void copy ()
diff --git a/src/tools/dcpomatic_server.cc b/src/tools/dcpomatic_server.cc
index e5e3a7e5a..b7100d62a 100644
--- a/src/tools/dcpomatic_server.cc
+++ b/src/tools/dcpomatic_server.cc
@@ -25,6 +25,9 @@
#include "lib/encoded_log_entry.h"
#include "lib/encode_server.h"
#include "lib/config.h"
+#ifdef DCPOMATIC_GROK
+#include "lib/grok/context.h"
+#endif
#include "lib/log.h"
#include "lib/signaller.h"
#include "lib/cross.h"
@@ -326,6 +329,10 @@ private:
SetExitOnFrameDelete (false);
+#ifdef DCPOMATIC_GROK
+ grk_plugin::setMessengerLogger(new grk_plugin::GrokLogger("[GROK] "));
+#endif
+
return true;
}
diff --git a/src/tools/dcpomatic_server_cli.cc b/src/tools/dcpomatic_server_cli.cc
index 6d7f6aba7..9e4a8814f 100644
--- a/src/tools/dcpomatic_server_cli.cc
+++ b/src/tools/dcpomatic_server_cli.cc
@@ -25,6 +25,9 @@
#include "lib/config.h"
#include "lib/image.h"
#include "lib/file_log.h"
+#ifdef DCPOMATIC_GROK
+#include "lib/grok/context.h"
+#endif
#include "lib/null_log.h"
#include "lib/version.h"
#include "lib/encode_server.h"
@@ -109,6 +112,10 @@ main (int argc, char* argv[])
dcpomatic_log.reset (new FileLog("dcpomatic_server_cli.log"));
}
+#ifdef DCPOMATIC_GROK
+ setMessengerLogger(new grk_plugin::GrokLogger("[GROK] "));
+#endif
+
EncodeServer server (verbose, num_threads);
try {
diff --git a/src/wx/about_dialog.cc b/src/wx/about_dialog.cc
index fbb89bfd6..51d49ce6c 100644
--- a/src/wx/about_dialog.cc
+++ b/src/wx/about_dialog.cc
@@ -86,7 +86,7 @@ AboutDialog::AboutDialog (wxWindow* parent)
t = new StaticText (
this,
- _("(C) 2012-2023 Carl Hetherington, Terrence Meiczinger\n Ole Laursen"),
+ _("(C) 2012-2023 Carl Hetherington, Terrence Meiczinger\nOle Laursen, Aaron Boxer"),
wxDefaultPosition, wxDefaultSize, wxALIGN_CENTER
);
@@ -99,6 +99,7 @@ AboutDialog::AboutDialog (wxWindow* parent)
written_by.Add (wxT ("Terrence Meiczinger"));
written_by.Add (wxT ("Mart Jansink"));
written_by.Add (wxT ("Ole Laursen"));
+ written_by.Add (wxT ("Aaron Boxer"));
add_section (_("Written by"), written_by);
wxArrayString with_help_from;
diff --git a/src/wx/full_config_dialog.cc b/src/wx/full_config_dialog.cc
index 175d78730..bc5b5de4e 100644
--- a/src/wx/full_config_dialog.cc
+++ b/src/wx/full_config_dialog.cc
@@ -45,6 +45,9 @@
#include "send_test_email_dialog.h"
#include "server_dialog.h"
#include "static_text.h"
+#ifdef DCPOMATIC_GROK
+#include "grok/gpu_config_panel.h"
+#endif
#include "wx_util.h"
#include "lib/config.h"
#include "lib/cross.h"
@@ -1944,6 +1947,9 @@ create_full_config_dialog ()
e->AddPage (new SoundPage (ps, border));
e->AddPage (new DefaultsPage (ps, border));
e->AddPage (new EncodingServersPage(ps, border));
+#ifdef DCPOMATIC_GROK
+ e->AddPage (new GPUPage (ps, border));
+#endif
e->AddPage (new KeysPage (ps, border));
e->AddPage (new TMSPage (ps, border));
e->AddPage (new EmailPage (ps, border));
diff --git a/src/wx/grok/gpu_config_panel.h b/src/wx/grok/gpu_config_panel.h
new file mode 100644
index 000000000..cbf037592
--- /dev/null
+++ b/src/wx/grok/gpu_config_panel.h
@@ -0,0 +1,227 @@
+/*
+ Copyright (C) 2023 Grok Image Compression Inc.
+
+ This file is part of DCP-o-matic.
+
+ DCP-o-matic is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ DCP-o-matic is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
+
+*/
+
+
+#pragma once
+
+static std::vector<std::string> get_gpu_names(boost::filesystem::path binary, boost::filesystem::path filename)
+{
+ // Execute the GPU listing program and redirect its output to a file
+ if (std::system((binary.string() + " > " + filename.string()).c_str()) < 0) {
+ return {};
+ }
+
+ std::vector<std::string> gpu_names;
+ std::ifstream file(filename.c_str());
+ if (file.is_open())
+ {
+ std::string line;
+ while (std::getline(file, line))
+ gpu_names.push_back(line);
+ file.close();
+ }
+
+ return gpu_names;
+}
+
+
+class GpuList : public wxPanel
+{
+public:
+ GpuList(wxPanel* parent)
+ : wxPanel(parent, wxID_ANY)
+ {
+ _combo_box = new wxComboBox(this, wxID_ANY, "", wxDefaultPosition, wxSize(400, -1));
+ _combo_box->Bind(wxEVT_COMBOBOX, &GpuList::OnComboBox, this);
+ update();
+
+ auto sizer = new wxBoxSizer(wxHORIZONTAL);
+ sizer->Add(_combo_box, 0, wxALIGN_CENTER_VERTICAL);
+ SetSizerAndFit(sizer);
+ }
+
+ void update()
+ {
+ auto grok = Config::instance()->grok().get_value_or({});
+ auto lister_binary = grok.binary_location / "gpu_lister";
+ auto lister_file = grok.binary_location / "gpus.txt";
+ if (boost::filesystem::exists(lister_binary)) {
+ auto gpu_names = get_gpu_names(lister_binary, lister_file);
+
+ _combo_box->Clear();
+ for (auto const& name: gpu_names) {
+ _combo_box->Append(name);
+ }
+ }
+ }
+
+ void set_selection(int sel)
+ {
+ if (sel < static_cast<int>(_combo_box->GetCount())) {
+ _combo_box->SetSelection(sel);
+ }
+ }
+
+private:
+ void OnComboBox(wxCommandEvent&)
+ {
+ auto selection = _combo_box->GetSelection();
+ if (selection != wxNOT_FOUND) {
+ auto grok = Config::instance()->grok().get_value_or({});
+ grok.selected = selection;
+ Config::instance()->set_grok(grok);
+ }
+ }
+
+ wxComboBox* _combo_box;
+ int _selection = 0;
+};
+
+
+class GPUPage : public Page
+{
+public:
+ GPUPage(wxSize panel_size, int border)
+ : Page(panel_size, border)
+ {}
+
+ wxString GetName() const override
+ {
+ return _("GPU");
+ }
+
+#ifdef DCPOMATIC_OSX
+ /* XXX: this icon does not exist */
+ wxBitmap GetLargeIcon() const override
+ {
+ return wxBitmap(icon_path("gpu"), wxBITMAP_TYPE_PNG);
+ }
+#endif
+
+private:
+ void setup() override
+ {
+ _enable_gpu = new CheckBox(_panel, _("Enable GPU acceleration"));
+ _panel->GetSizer()->Add(_enable_gpu, 0, wxALL | wxEXPAND, _border);
+
+ wxFlexGridSizer* table = new wxFlexGridSizer(2, DCPOMATIC_SIZER_X_GAP, DCPOMATIC_SIZER_Y_GAP);
+ table->AddGrowableCol(1, 1);
+ _panel->GetSizer()->Add(table, 1, wxALL | wxEXPAND, _border);
+
+ add_label_to_sizer(table, _panel, _("Acceleration binary folder"), true, 0, wxLEFT | wxLEFT | wxALIGN_CENTRE_VERTICAL);
+ _binary_location = new wxDirPickerCtrl(_panel, wxDD_DIR_MUST_EXIST);
+ table->Add(_binary_location, 1, wxEXPAND);
+
+ add_label_to_sizer(table, _panel, _("GPU selection"), true, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL);
+ _gpu_list_control = new GpuList(_panel);
+ table->Add(_gpu_list_control, 1, wxEXPAND);
+
+ add_label_to_sizer(table, _panel, _("License server"), true, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL);
+ _server = new wxTextCtrl(_panel, wxID_ANY);
+ table->Add(_server, 1, wxEXPAND | wxALL);
+
+ add_label_to_sizer(table, _panel, _("Port"), false, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL);
+ _port = new wxSpinCtrl(_panel, wxID_ANY);
+ _port->SetRange(0, 65535);
+ table->Add(_port);
+
+ add_label_to_sizer(table, _panel, _("License"), true, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL);
+ _licence = new PasswordEntry(_panel);
+ table->Add(_licence->get_panel(), 1, wxEXPAND | wxALL);
+
+ _enable_gpu->bind(&GPUPage::enable_gpu_changed, this);
+ _binary_location->Bind(wxEVT_DIRPICKER_CHANGED, boost::bind (&GPUPage::binary_location_changed, this));
+ _server->Bind(wxEVT_TEXT, boost::bind(&GPUPage::server_changed, this));
+ _port->Bind(wxEVT_SPINCTRL, boost::bind(&GPUPage::port_changed, this));
+ _licence->Changed.connect(boost::bind(&GPUPage::licence_changed, this));
+
+ setup_sensitivity();
+ }
+
+ void setup_sensitivity()
+ {
+ auto grok = Config::instance()->grok().get_value_or({});
+
+ _binary_location->Enable(grok.enable);
+ _gpu_list_control->Enable(grok.enable);
+ _server->Enable(grok.enable);
+ _port->Enable(grok.enable);
+ _licence->get_panel()->Enable(grok.enable);
+ }
+
+ void config_changed() override
+ {
+ auto grok = Config::instance()->grok().get_value_or({});
+
+ checked_set(_enable_gpu, grok.enable);
+ _binary_location->SetPath(std_to_wx(grok.binary_location.string()));
+ _gpu_list_control->update();
+ _gpu_list_control->set_selection(grok.selected);
+ checked_set(_server, grok.licence_server);
+ checked_set(_port, grok.licence_port);
+ checked_set(_licence, grok.licence);
+ }
+
+ void enable_gpu_changed()
+ {
+ auto grok = Config::instance()->grok().get_value_or({});
+ grok.enable = _enable_gpu->GetValue();
+ Config::instance()->set_grok(grok);
+
+ setup_sensitivity();
+ }
+
+ void binary_location_changed()
+ {
+ auto grok = Config::instance()->grok().get_value_or({});
+ grok.binary_location = wx_to_std(_binary_location->GetPath());
+ Config::instance()->set_grok(grok);
+
+ _gpu_list_control->update();
+ }
+
+ void server_changed()
+ {
+ auto grok = Config::instance()->grok().get_value_or({});
+ grok.licence_server = wx_to_std(_server->GetValue());
+ Config::instance()->set_grok(grok);
+ }
+
+ void port_changed()
+ {
+ auto grok = Config::instance()->grok().get_value_or({});
+ grok.licence_port = _port->GetValue();
+ Config::instance()->set_grok(grok);
+ }
+
+ void licence_changed()
+ {
+ auto grok = Config::instance()->grok().get_value_or({});
+ grok.licence = wx_to_std(_licence->get());
+ Config::instance()->set_grok(grok);
+ }
+
+ CheckBox* _enable_gpu = nullptr;
+ wxDirPickerCtrl* _binary_location = nullptr;
+ GpuList* _gpu_list_control = nullptr;
+ wxTextCtrl* _server = nullptr;
+ wxSpinCtrl* _port = nullptr;
+ PasswordEntry* _licence = nullptr;
+};