diff options
| author | Carl Hetherington <cth@carlh.net> | 2023-10-18 13:47:29 +0200 |
|---|---|---|
| committer | Carl Hetherington <cth@carlh.net> | 2023-10-18 13:47:29 +0200 |
| commit | 1baae6f052775956bab33a8d9ae9f94066227225 (patch) | |
| tree | 9c634a1b2914d4bf889e171b73c770469415f799 | |
| parent | 6eba9bffa2371aa71b8981b1a7bcde0448d7623e (diff) | |
| parent | b0c1482f98c7e00634c1bc3dd801e76ce69907e2 (diff) | |
Merge branch 'grok2' into v2.17.xv2.17.8
This is the DoM support for Aaron Boxer's "grok" GPU J2K encoder,
with some cleanups and other assorted/related DoM changes.
49 files changed, 2444 insertions, 187 deletions
@@ -508,7 +508,7 @@ def dependencies(target, options): # Use distro-provided FFmpeg on Arch deps = [] - deps.append(('libdcp', 'v1.8.85')) + deps.append(('libdcp', 'v1.8.87')) deps.append(('libsub', 'v1.6.44')) deps.append(('leqm-nrt', '30dcaea1373ac62fba050e02ce5b0c1085797a23')) deps.append(('rtaudio', 'f619b76')) @@ -564,6 +564,9 @@ def configure_options(target, options, for_package=False): if target.platform == 'osx' and target.arch == 'arm64': opt += ' --target-macos-arm64 --wx-config=%s/wx-config' % target.bin + if target.platform == 'linux' and target.distro == 'ubuntu' and target.version in ['22.04']: + opt += ' --enable-grok' + return opt def build(target, options, for_package): diff --git a/src/lib/config.cc b/src/lib/config.cc index 190817cbc..938be090c 100644 --- a/src/lib/config.cc +++ b/src/lib/config.cc @@ -212,6 +212,10 @@ Config::set_defaults () set_notification_email_to_default (); set_cover_sheet_to_default (); +#ifdef DCPOMATIC_GROK + _grok = boost::none; +#endif + _main_divider_sash_position = {}; _main_content_divider_sash_position = {}; @@ -633,6 +637,12 @@ try _allow_smpte_bv20 = f.optional_bool_child("AllowSMPTEBv20").get_value_or(false); _isdcf_name_part_length = f.optional_number_child<int>("ISDCFNamePartLength").get_value_or(14); +#ifdef DCPOMATIC_GROK + if (auto grok = f.optional_node_child("Grok")) { + _grok = Grok(grok); + } +#endif + _export.read(f.optional_node_child("Export")); } catch (...) { @@ -1119,6 +1129,12 @@ Config::write_config () const /* [XML] ISDCFNamePartLength Maximum length of the "name" part of an ISDCF name, which should be 14 according to the standard */ root->add_child("ISDCFNamePartLength")->add_child_text(raw_convert<string>(_isdcf_name_part_length)); +#ifdef DCPOMATIC_GROK + if (_grok) { + _grok->as_xml(root->add_child("Grok")); + } +#endif + _export.write(root->add_child("Export")); auto target = config_write_file(); @@ -1639,3 +1655,38 @@ Config::initial_path(string id) const return iter->second; } + +#ifdef DCPOMATIC_GROK + +Config::Grok::Grok(cxml::ConstNodePtr node) + : enable(node->bool_child("Enable")) + , binary_location(node->string_child("BinaryLocation")) + , selected(node->number_child<int>("Selected")) + , licence_server(node->string_child("LicenceServer")) + , licence_port(node->number_child<int>("LicencePort")) + , licence(node->string_child("Licence")) +{ + +} + + +void +Config::Grok::as_xml(xmlpp::Element* node) const +{ + node->add_child("BinaryLocation")->add_child_text(binary_location.string()); + node->add_child("Enable")->add_child_text((enable ? "1" : "0")); + node->add_child("Selected")->add_child_text(raw_convert<string>(selected)); + node->add_child("LicenceServer")->add_child_text(licence_server); + node->add_child("LicencePort")->add_child_text(raw_convert<string>(licence_port)); + node->add_child("Licence")->add_child_text(licence); +} + + +void +Config::set_grok(Grok const& grok) +{ + _grok = grok; + changed(OTHER); +} + +#endif diff --git a/src/lib/config.h b/src/lib/config.h index 0a332bcbb..eaf85451d 100644 --- a/src/lib/config.h +++ b/src/lib/config.h @@ -618,6 +618,28 @@ public: return _allow_smpte_bv20; } +#ifdef DCPOMATIC_GROK + class Grok + { + public: + Grok() = default; + Grok(cxml::ConstNodePtr node); + + void as_xml(xmlpp::Element* node) const; + + bool enable = false; + boost::filesystem::path binary_location; + int selected = 0; + std::string licence_server; + int licence_port = 5000; + std::string licence; + }; + + boost::optional<Grok> grok() const { + return _grok; + } +#endif + int isdcf_name_part_length() const { return _isdcf_name_part_length; } @@ -1199,10 +1221,15 @@ public: maybe_set(_allow_smpte_bv20, allow, ALLOW_SMPTE_BV20); } +#ifdef DCPOMATIC_GROK + void set_grok(Grok const& grok); +#endif + void set_isdcf_name_part_length(int length) { maybe_set(_isdcf_name_part_length, length, ISDCF_NAME_PART_LENGTH); } + void changed (Property p = OTHER); boost::signals2::signal<void (Property)> Changed; /** Emitted if read() failed on an existing Config file. There is nothing @@ -1443,6 +1470,10 @@ private: bool _allow_smpte_bv20; int _isdcf_name_part_length; +#ifdef DCPOMATIC_GROK + boost::optional<Grok> _grok; +#endif + ExportConfig _export; static int const _current_version; diff --git a/src/lib/cpu_j2k_encoder_thread.cc b/src/lib/cpu_j2k_encoder_thread.cc new file mode 100644 index 000000000..70afac236 --- /dev/null +++ b/src/lib/cpu_j2k_encoder_thread.cc @@ -0,0 +1,42 @@ +#include "cpu_j2k_encoder_thread.h" +#include "cross.h" +#include "dcpomatic_log.h" +#include "dcp_video.h" +#include "j2k_encoder.h" +#include "scope_guard.h" +#include "util.h" + +#include "i18n.h" + + +using std::make_shared; +using std::shared_ptr; + + +CPUJ2KEncoderThread::CPUJ2KEncoderThread(J2KEncoder& encoder) + : J2KSyncEncoderThread(encoder) +{ + +} + + +void +CPUJ2KEncoderThread::log_thread_start() const +{ + start_of_thread("CPUJ2KEncoder"); + LOG_TIMING("start-encoder-thread thread=%1 server=localhost", thread_id()); +} + + +shared_ptr<dcp::ArrayData> +CPUJ2KEncoderThread::encode(DCPVideo const& frame) +{ + try { + return make_shared<dcp::ArrayData>(frame.encode_locally()); + } catch (std::exception& e) { + LOG_ERROR(N_("Local encode failed (%1)"), e.what()); + } + + return {}; +} + diff --git a/src/lib/cpu_j2k_encoder_thread.h b/src/lib/cpu_j2k_encoder_thread.h new file mode 100644 index 000000000..fb138f484 --- /dev/null +++ b/src/lib/cpu_j2k_encoder_thread.h @@ -0,0 +1,16 @@ +#include "j2k_sync_encoder_thread.h" +#include <dcp/data.h> + + +class DCPVideo; + + +class CPUJ2KEncoderThread : public J2KSyncEncoderThread +{ +public: + CPUJ2KEncoderThread(J2KEncoder& encoder); + + void log_thread_start() const override; + std::shared_ptr<dcp::ArrayData> encode(DCPVideo const& frame) override; +}; + diff --git a/src/lib/dcp_encoder.cc b/src/lib/dcp_encoder.cc index 9a840c8ab..bd78312fa 100644 --- a/src/lib/dcp_encoder.cc +++ b/src/lib/dcp_encoder.cc @@ -18,6 +18,7 @@ */ + /** @file src/dcp_encoder.cc * @brief A class which takes a Film and some Options, then uses those to encode the film into a DCP. * @@ -25,31 +26,33 @@ * as a parameter to the constructor. */ + +#include "audio_decoder.h" +#include "compose.hpp" #include "dcp_encoder.h" -#include "j2k_encoder.h" #include "film.h" -#include "video_decoder.h" -#include "audio_decoder.h" -#include "player.h" +#include "j2k_encoder.h" #include "job.h" -#include "writer.h" -#include "compose.hpp" +#include "player.h" +#include "player_video.h" #include "referenced_reel_asset.h" #include "text_content.h" -#include "player_video.h" +#include "video_decoder.h" +#include "writer.h" #include <boost/signals2.hpp> #include <iostream> #include "i18n.h" -using std::string; + using std::cout; +using std::dynamic_pointer_cast; using std::list; -using std::vector; +using std::make_shared; using std::shared_ptr; +using std::string; +using std::vector; using std::weak_ptr; -using std::dynamic_pointer_cast; -using std::make_shared; using boost::optional; #if BOOST_VERSION >= 106100 using namespace boost::placeholders; @@ -118,6 +121,20 @@ DCPEncoder::go () _writer.finish(_film->dir(_film->dcp_name())); } + +void +DCPEncoder::pause() +{ + _j2k_encoder.pause(); +} + + +void +DCPEncoder::resume() +{ + _j2k_encoder.resume(); +} + void DCPEncoder::video (shared_ptr<PlayerVideo> data, DCPTime time) { diff --git a/src/lib/dcp_encoder.h b/src/lib/dcp_encoder.h index ad77f6951..ce0b72204 100644 --- a/src/lib/dcp_encoder.h +++ b/src/lib/dcp_encoder.h @@ -35,6 +35,8 @@ class Job; class Player; class PlayerVideo; +struct frames_not_lost_when_threads_disappear; + /** @class DCPEncoder */ class DCPEncoder : public Encoder @@ -53,8 +55,13 @@ public: return _finishing; } + void pause() override; + void resume() override; + private: + friend struct ::frames_not_lost_when_threads_disappear; + void video (std::shared_ptr<PlayerVideo>, dcpomatic::DCPTime); void audio (std::shared_ptr<AudioBuffers>, dcpomatic::DCPTime); void text (PlayerText, TextType, boost::optional<DCPTextTrack>, dcpomatic::DCPTimePeriod); diff --git a/src/lib/dcp_video.cc b/src/lib/dcp_video.cc index 8eb76fdd6..940ca31d2 100644 --- a/src/lib/dcp_video.cc +++ b/src/lib/dcp_video.cc @@ -118,6 +118,30 @@ DCPVideo::convert_to_xyz (shared_ptr<const PlayerVideo> frame, dcp::NoteHandler return xyz; } +dcp::Size +DCPVideo::get_size() const +{ + auto image = _frame->image(bind(&PlayerVideo::keep_xyz_or_rgb, _1), VideoRange::FULL, false); + return image->size(); +} + + +void +DCPVideo::convert_to_xyz(uint16_t* dst) const +{ + auto image = _frame->image(bind(&PlayerVideo::keep_xyz_or_rgb, _1), VideoRange::FULL, false); + if (_frame->colour_conversion()) { + dcp::rgb_to_xyz ( + image->data()[0], + dst, + image->size(), + image->stride()[0], + _frame->colour_conversion().get() + ); + } +} + + /** J2K-encode this frame on the local host. * @return Encoded data. */ diff --git a/src/lib/dcp_video.h b/src/lib/dcp_video.h index 33df0942c..5f43dabb8 100644 --- a/src/lib/dcp_video.h +++ b/src/lib/dcp_video.h @@ -17,6 +17,8 @@ along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>. */ +#ifndef DCPOMATIC_DCP_VIDEO_H +#define DCPOMATIC_DCP_VIDEO_H #include "encode_server_description.h" @@ -66,6 +68,9 @@ public: static std::shared_ptr<dcp::OpenJPEGImage> convert_to_xyz (std::shared_ptr<const PlayerVideo> frame, dcp::NoteHandler note); + void convert_to_xyz(uint16_t* dst) const; + dcp::Size get_size() const; + private: void add_metadata (xmlpp::Element *) const; @@ -76,3 +81,5 @@ private: int _j2k_bandwidth; ///< J2K bandwidth to use Resolution _resolution; ///< Resolution (2K or 4K) }; + +#endif diff --git a/src/lib/encode_server.cc b/src/lib/encode_server.cc index 6501dcde1..11dbab628 100644 --- a/src/lib/encode_server.cc +++ b/src/lib/encode_server.cc @@ -81,6 +81,7 @@ EncodeServer::EncodeServer (bool verbose, int num_threads) #endif , _verbose (verbose) , _num_threads (num_threads) + , _frames_encoded(0) { } @@ -165,6 +166,8 @@ EncodeServer::process (shared_ptr<Socket> socket, struct timeval& after_read, st throw; } + ++_frames_encoded; + return dcp_video_frame.index (); } diff --git a/src/lib/encode_server.h b/src/lib/encode_server.h index f93d66746..8059abd0f 100644 --- a/src/lib/encode_server.h +++ b/src/lib/encode_server.h @@ -32,6 +32,7 @@ #include "exception_store.h" #include "server.h" #include <boost/asio.hpp> +#include <boost/atomic.hpp> #include <boost/thread.hpp> #include <boost/thread/condition.hpp> #include <string> @@ -53,6 +54,10 @@ public: void run () override; + int frames_encoded() const { + return _frames_encoded; + } + private: void handle (std::shared_ptr<Socket>) override; void worker_thread (); @@ -67,6 +72,7 @@ private: bool _verbose; int _num_threads; Waker _waker; + boost::atomic<int> _frames_encoded; struct Broadcast { diff --git a/src/lib/encode_server_finder.h b/src/lib/encode_server_finder.h index f8a30af54..c478387f9 100644 --- a/src/lib/encode_server_finder.h +++ b/src/lib/encode_server_finder.h @@ -50,8 +50,6 @@ public: static EncodeServerFinder* instance (); static void drop (); - void stop (); - std::list<EncodeServerDescription> servers () const; /** Emitted whenever the list of servers changes */ @@ -62,6 +60,7 @@ private: ~EncodeServerFinder (); void start (); + void stop (); void search_thread (); void listen_thread (); diff --git a/src/lib/encoder.h b/src/lib/encoder.h index 9b67720d3..aeaf7f620 100644 --- a/src/lib/encoder.h +++ b/src/lib/encoder.h @@ -58,6 +58,8 @@ public: /** @return the number of frames that are done */ virtual Frame frames_done () const = 0; virtual bool finishing () const = 0; + virtual void pause() {} + virtual void resume() {} protected: std::shared_ptr<const Film> _film; diff --git a/src/lib/grok/context.h b/src/lib/grok/context.h new file mode 100644 index 000000000..521faae8d --- /dev/null +++ b/src/lib/grok/context.h @@ -0,0 +1,291 @@ +/* + Copyright (C) 2023 Grok Image Compression Inc. + + This file is part of DCP-o-matic. + + DCP-o-matic is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + DCP-o-matic is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>. + +*/ + +#pragma once + + +#include "../config.h" +#include "../dcp_video.h" +#include "../film.h" +#include "../log.h" +#include "../dcpomatic_log.h" +#include "../writer.h" +#include "messenger.h" +#include <dcp/array_data.h> +#include <boost/filesystem.hpp> + + +static std::mutex launchMutex; + +namespace grk_plugin +{ + +struct GrokLogger : public MessengerLogger { + explicit GrokLogger(const std::string &preamble) : MessengerLogger(preamble) + {} + virtual ~GrokLogger() = default; + void info(const char* fmt, ...) override{ + va_list arg; + va_start(arg, fmt); + dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_GENERAL); + va_end(arg); + } + void warn(const char* fmt, ...) override{ + va_list arg; + va_start(arg, fmt); + dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_WARNING); + va_end(arg); + } + void error(const char* fmt, ...) override{ + va_list arg; + va_start(arg, fmt); + dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_ERROR); + va_end(arg); + } +}; + +struct FrameProxy { + FrameProxy(int index, Eyes eyes, DCPVideo dcpv) : index_(index), eyes_(eyes), vf(dcpv) + {} + int index() const { + return index_; + } + Eyes eyes(void) const { + return eyes_; + } + int index_; + Eyes eyes_; + DCPVideo vf; +}; + +struct DcpomaticContext +{ + DcpomaticContext( + std::shared_ptr<const Film> film_, + Writer& writer_, + EventHistory& history_, + boost::filesystem::path const& location_ + ) + : film(film_) + , writer(writer_) + , history(history_) + , location(location_) + { + + } + + void set_dimensions(uint32_t w, uint32_t h) + { + width = w; + height = h; + } + + std::shared_ptr<const Film> film; + Writer& writer; + EventHistory& history; + boost::filesystem::path location; + uint32_t width = 0; + uint32_t height = 0; +}; + + +class GrokContext +{ +public: + explicit GrokContext(DcpomaticContext* dcpomatic_context) + : _dcpomatic_context(dcpomatic_context) + { + auto grok = Config::instance()->grok().get_value_or({}); + if (!grok.enable) { + return; + } + + boost::filesystem::path folder(_dcpomatic_context->location); + boost::filesystem::path binary_path = folder / "grk_compress"; + if (!boost::filesystem::exists(binary_path)) { + getMessengerLogger()->error( + "Invalid binary location %s", _dcpomatic_context->location.c_str() + ); + return; + } + + auto proc = [this](const std::string& str) { + try { + Msg msg(str); + auto tag = msg.next(); + if (tag == GRK_MSGR_BATCH_SUBMIT_COMPRESSED) { + auto clientFrameId = msg.nextUint(); + msg.nextUint(); // compressed frame ID + auto compressedFrameLength = msg.nextUint(); + auto processor = [this](FrameProxy srcFrame, uint8_t* compressed, uint32_t compressedFrameLength) { + auto compressed_data = std::make_shared<dcp::ArrayData>(compressed, compressedFrameLength); + _dcpomatic_context->writer.write(compressed_data, srcFrame.index(), srcFrame.eyes()); + frame_done (); + }; + + int const minimum_size = 16384; + + bool needsRecompression = compressedFrameLength < minimum_size; + _messenger->processCompressed(str, processor, needsRecompression); + + if (needsRecompression) { + auto fp = _messenger->retrieve(clientFrameId); + if (!fp) { + return; + } + + auto encoded = std::make_shared<dcp::ArrayData>(fp->vf.encode_locally()); + _dcpomatic_context->writer.write(encoded, fp->vf.index(), fp->vf.eyes()); + frame_done (); + } + } + } catch (std::exception& ex) { + getMessengerLogger()->error("%s",ex.what()); + } + }; + + auto clientInit = MessengerInit( + clientToGrokMessageBuf, + clientSentSynch, + grokReceiveReadySynch, + grokToClientMessageBuf, + grokSentSynch, + clientReceiveReadySynch, + proc, + std::thread::hardware_concurrency() + ); + + _messenger = new ScheduledMessenger<FrameProxy>(clientInit); + } + + ~GrokContext() + { + shutdown(); + } + + bool launch(DCPVideo dcpv, int device) + { + namespace fs = boost::filesystem; + + if (!_messenger) { + return false; + } + if (_launched) { + return true; + } + if (_launch_failed) { + return false; + } + + std::unique_lock<std::mutex> lk_global(launchMutex); + + if (!_messenger) { + return false; + } + if (_launched) { + return true; + } + if (_launch_failed) { + return false; + } + + if (MessengerInit::firstLaunch(true)) { + + if (!fs::exists(_dcpomatic_context->location) || !fs::is_directory(_dcpomatic_context->location)) { + getMessengerLogger()->error("Invalid directory %s", _dcpomatic_context->location.c_str()); + return false; + } + + auto s = dcpv.get_size(); + _dcpomatic_context->set_dimensions(s.width, s.height); + auto grok = Config::instance()->grok().get_value_or({}); + if (!_messenger->launchGrok( + _dcpomatic_context->location, + _dcpomatic_context->width, + _dcpomatic_context->width, + _dcpomatic_context->height, + 3, + 12, + device, + _dcpomatic_context->film->resolution() == Resolution::FOUR_K, + _dcpomatic_context->film->video_frame_rate(), + _dcpomatic_context->film->j2k_bandwidth(), + grok.licence_server, + grok.licence_port, + grok.licence)) { + _launch_failed = true; + return false; + } + } + + _launched = _messenger->waitForClientInit(); + _launch_failed = _launched; + + return _launched; + } + + bool scheduleCompress(DCPVideo const& vf) + { + if (!_messenger) { + return false; + } + + auto fp = FrameProxy(vf.index(), vf.eyes(), vf); + auto cvt = [this, &fp](BufferSrc src) { + fp.vf.convert_to_xyz((uint16_t*)src.framePtr_); + }; + + return _messenger->scheduleCompress(fp, cvt); + } + + void shutdown() + { + if (!_messenger) { + return; + } + + std::unique_lock<std::mutex> lk_global(launchMutex); + + if (!_messenger) { + return; + } + + if (_launched) { + _messenger->shutdown(); + } + + delete _messenger; + _messenger = nullptr; + } + + void frame_done() + { + _dcpomatic_context->history.event(); + } + +private: + DcpomaticContext* _dcpomatic_context; + ScheduledMessenger<FrameProxy>* _messenger = nullptr; + bool _launched = false; + bool _launch_failed = false; +}; + +} + diff --git a/src/lib/grok/messenger.h b/src/lib/grok/messenger.h new file mode 100644 index 000000000..eb2fe9560 --- /dev/null +++ b/src/lib/grok/messenger.h @@ -0,0 +1,906 @@ +/* + Copyright (C) 2023 Grok Image Compression Inc. + + This file is part of DCP-o-matic. + + DCP-o-matic is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + DCP-o-matic is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>. + +*/ +#pragma once + +#include <iostream> +#include <string> +#include <cstring> +#include <atomic> +#include <functional> +#include <sstream> +#include <future> +#include <map> +#include <thread> +#include <mutex> +#include <condition_variable> +#include <queue> +#include <cassert> +#include <cstdarg> + +#ifdef _WIN32 +#include <windows.h> +#include <direct.h> +#include <tlhelp32.h> +#pragma warning(disable : 4100) +#else +#include <unistd.h> +#include <fcntl.h> +#include <sys/mman.h> +#include <semaphore.h> +#include <signal.h> +#endif + +namespace grk_plugin +{ +static std::string grokToClientMessageBuf = "Global\\grok_to_client_message"; +static std::string grokSentSynch = "Global\\grok_sent"; +static std::string clientReceiveReadySynch = "Global\\client_receive_ready"; +static std::string clientToGrokMessageBuf = "Global\\client_to_grok_message"; +static std::string clientSentSynch = "Global\\client_sent"; +static std::string grokReceiveReadySynch = "Global\\grok_receive_ready"; +static std::string grokUncompressedBuf = "Global\\grok_uncompressed_buf"; +static std::string grokCompressedBuf = "Global\\grok_compressed_buf"; +static const std::string GRK_MSGR_BATCH_IMAGE = "GRK_MSGR_BATCH_IMAGE"; +static const std::string GRK_MSGR_BATCH_COMPRESS_INIT = "GRK_MSGR_BATCH_COMPRESS_INIT"; +static const std::string GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED = "GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED"; +static const std::string GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED = + "GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED"; +static const std::string GRK_MSGR_BATCH_SUBMIT_COMPRESSED = "GRK_MSGR_BATCH_SUBMIT_COMPRESSED"; +static const std::string GRK_MSGR_BATCH_PROCESSSED_COMPRESSED = + "GRK_MSGR_BATCH_PROCESSSED_COMPRESSED"; +static const std::string GRK_MSGR_BATCH_SHUTDOWN = "GRK_MSGR_BATCH_SHUTDOWN"; +static const std::string GRK_MSGR_BATCH_FLUSH = "GRK_MSGR_BATCH_FLUSH"; +static const size_t messageBufferLen = 256; +struct IMessengerLogger +{ + virtual ~IMessengerLogger(void) = default; + virtual void info(const char* fmt, ...) = 0; + virtual void warn(const char* fmt, ...) = 0; + virtual void error(const char* fmt, ...) = 0; + + protected: + template<typename... Args> + std::string log_message(char const* const format, Args&... args) noexcept + { + constexpr size_t message_size = 512; + char message[message_size]; + + std::snprintf(message, message_size, format, args...); + return std::string(message); + } +}; +struct MessengerLogger : public IMessengerLogger +{ + explicit MessengerLogger(const std::string &preamble) : preamble_(preamble) {} + virtual ~MessengerLogger() = default; + virtual void info(const char* fmt, ...) override + { + va_list args; + std::string new_fmt = preamble_ + fmt + "\n"; + va_start(args, fmt); + vfprintf(stdout, new_fmt.c_str(), args); + va_end(args); + } + virtual void warn(const char* fmt, ...) override + { + va_list args; + std::string new_fmt = preamble_ + fmt + "\n"; + va_start(args, fmt); + vfprintf(stdout, new_fmt.c_str(), args); + va_end(args); + } + virtual void error(const char* fmt, ...) override + { + va_list args; + std::string new_fmt = preamble_ + fmt + "\n"; + va_start(args, fmt); + vfprintf(stderr, new_fmt.c_str(), args); + va_end(args); + } + + protected: + std::string preamble_; +}; + +extern IMessengerLogger* sLogger; +void setMessengerLogger(IMessengerLogger* logger); +IMessengerLogger* getMessengerLogger(void); + +struct MessengerInit +{ + MessengerInit(const std::string &outBuf, const std::string &outSent, + const std::string &outReceiveReady, const std::string &inBuf, + const std::string &inSent, + const std::string &inReceiveReady, + std::function<void(std::string)> processor, + size_t numProcessingThreads) + : outboundMessageBuf(outBuf), outboundSentSynch(outSent), + outboundReceiveReadySynch(outReceiveReady), inboundMessageBuf(inBuf), + inboundSentSynch(inSent), inboundReceiveReadySynch(inReceiveReady), processor_(processor), + numProcessingThreads_(numProcessingThreads), + uncompressedFrameSize_(0), compressedFrameSize_(0), + numFrames_(0) + { + if(firstLaunch(true)) + unlink(); + } + void unlink(void) + { +#ifndef _WIN32 + shm_unlink(grokToClientMessageBuf.c_str()); + shm_unlink(clientToGrokMessageBuf.c_str()); +#endif + } + static bool firstLaunch(bool isClient) + { + bool debugGrok = false; + return debugGrok != isClient; + } + std::string outboundMessageBuf; + std::string outboundSentSynch; + std::string outboundReceiveReadySynch; + + std::string inboundMessageBuf; + std::string inboundSentSynch; + std::string inboundReceiveReadySynch; + + std::function<void(std::string)> processor_; + size_t numProcessingThreads_; + + size_t uncompressedFrameSize_; + size_t compressedFrameSize_; + size_t numFrames_; +}; + +/*************************** Synchronization *******************************/ +enum SynchDirection +{ + SYNCH_SENT, + SYNCH_RECEIVE_READY +}; + +typedef int grk_handle; +struct Synch +{ + Synch(const std::string &sentSemName, const std::string &receiveReadySemName) + : sentSemName_(sentSemName), receiveReadySemName_(receiveReadySemName) + { + // unlink semaphores in case of previous crash + if(MessengerInit::firstLaunch(true)) + unlink(); + open(); + } + ~Synch() + { + close(); + if(MessengerInit::firstLaunch(true)) + unlink(); + } + void post(SynchDirection dir) + { + auto sem = (dir == SYNCH_SENT ? sentSem_ : receiveReadySem_); + int rc = sem_post(sem); + if(rc) + getMessengerLogger()->error("Error posting to semaphore: %s", strerror(errno)); + } + void wait(SynchDirection dir) + { + auto sem = dir == SYNCH_SENT ? sentSem_ : receiveReadySem_; + int rc = sem_wait(sem); + if(rc) + getMessengerLogger()->error("Error waiting for semaphore: %s", strerror(errno)); + } + void open(void) + { + sentSem_ = sem_open(sentSemName_.c_str(), O_CREAT, 0666, 0); + if(!sentSem_) + getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno)); + receiveReadySem_ = sem_open(receiveReadySemName_.c_str(), O_CREAT, 0666, 1); + if(!receiveReadySem_) + getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno)); + } + void close(void) + { + int rc = sem_close(sentSem_); + if(rc) + getMessengerLogger()->error("Error closing semaphore %s: %s", sentSemName_.c_str(), + strerror(errno)); + rc = sem_close(receiveReadySem_); + if(rc) + getMessengerLogger()->error("Error closing semaphore %s: %s", + receiveReadySemName_.c_str(), strerror(errno)); + } + void unlink(void) + { + int rc = sem_unlink(sentSemName_.c_str()); + if(rc == -1 && errno != ENOENT) + getMessengerLogger()->error("Error unlinking semaphore %s: %s", sentSemName_.c_str(), + strerror(errno)); + rc = sem_unlink(receiveReadySemName_.c_str()); + if(rc == -1 && errno != ENOENT) + getMessengerLogger()->error("Error unlinking semaphore %s: %s", + receiveReadySemName_.c_str(), strerror(errno)); + } + sem_t* sentSem_; + sem_t* receiveReadySem_; + + private: + std::string sentSemName_; + std::string receiveReadySemName_; +}; +struct SharedMemoryManager +{ + static bool initShm(const std::string &name, size_t len, grk_handle* shm_fd, char** buffer) + { + *shm_fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0666); + if(*shm_fd < 0) + { + getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno)); + return false; + } + int rc = ftruncate(*shm_fd, sizeof(char) * len); + if(rc) + { + getMessengerLogger()->error("Error truncating shared memory: %s", strerror(errno)); + rc = close(*shm_fd); + if(rc) + getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno)); + rc = shm_unlink(name.c_str()); + // 2 == No such file or directory + if(rc && errno != 2) + getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno)); + return false; + } + *buffer = static_cast<char*>(mmap(0, len, PROT_WRITE, MAP_SHARED, *shm_fd, 0)); + if(!*buffer) + { + getMessengerLogger()->error("Error mapping shared memory: %s", strerror(errno)); + rc = close(*shm_fd); + if(rc) + getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno)); + rc = shm_unlink(name.c_str()); + // 2 == No such file or directory + if(rc && errno != 2) + getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno)); + } + + return *buffer != nullptr; + } + static bool deinitShm(const std::string &name, size_t len, grk_handle &shm_fd, char** buffer) + { + if (!*buffer || !shm_fd) + return true; + + int rc = munmap(*buffer, len); + *buffer = nullptr; + if(rc) + getMessengerLogger()->error("Error unmapping shared memory %s: %s", name.c_str(), strerror(errno)); + rc = close(shm_fd); + shm_fd = 0; + if(rc) + getMessengerLogger()->error("Error closing shared memory %s: %s", name.c_str(), strerror(errno)); + rc = shm_unlink(name.c_str()); + // 2 == No such file or directory + if(rc && errno != 2) + fprintf(stderr,"Error unlinking shared memory %s : %s\n", name.c_str(), strerror(errno)); + + return true; + } +}; + +template<typename Data> +class MessengerBlockingQueue +{ + public: + explicit MessengerBlockingQueue(size_t max) : active_(true), max_size_(max) {} + MessengerBlockingQueue() : MessengerBlockingQueue(UINT_MAX) {} + size_t size() const + { + return queue_.size(); + } + // deactivate and clear queue + void deactivate() + { + { + std::lock_guard<std::mutex> lk(mutex_); + active_ = false; + while(!queue_.empty()) + queue_.pop(); + } + + // release all waiting threads + can_pop_.notify_all(); + can_push_.notify_all(); + } + void activate() + { + std::lock_guard<std::mutex> lk(mutex_); + active_ = true; + } + bool push(Data const& value) + { + bool rc; + { + std::unique_lock<std::mutex> lk(mutex_); + rc = push_(value); + } + if(rc) + can_pop_.notify_one(); + + return rc; + } + bool waitAndPush(Data& value) + { + bool rc; + { + std::unique_lock<std::mutex> lk(mutex_); + if(!active_) + return false; + // in case of spurious wakeup, loop until predicate in lambda + // is satisfied. + can_push_.wait(lk, [this] { return queue_.size() < max_size_ || !active_; }); + rc = push_(value); + } + if(rc) + can_pop_.notify_one(); + + return rc; + } + bool pop(Data& value) + { + bool rc; + { + std::unique_lock<std::mutex> lk(mutex_); + rc = pop_(value); + } + if(rc) + can_push_.notify_one(); + + return rc; + } + bool waitAndPop(Data& value) + { + bool rc; + { + std::unique_lock<std::mutex> lk(mutex_); + if(!active_) + return false; + // in case of spurious wakeup, loop until predicate in lambda + // is satisfied. + can_pop_.wait(lk, [this] { return !queue_.empty() || !active_; }); + rc = pop_(value); + } + if(rc) + can_push_.notify_one(); + + return rc; + } + + private: + bool push_(Data const& value) + { + if(queue_.size() == max_size_ || !active_) + return false; + queue_.push(value); + + return true; + } + bool pop_(Data& value) + { + if(queue_.empty() || !active_) + return false; + value = queue_.front(); + queue_.pop(); + + return true; + } + std::queue<Data> queue_; + mutable std::mutex mutex_; + std::condition_variable can_pop_; + std::condition_variable can_push_; + bool active_; + size_t max_size_; +}; +struct BufferSrc +{ + BufferSrc(void) : BufferSrc("") {} + explicit BufferSrc(const std::string &file) : file_(file), clientFrameId_(0), frameId_(0), framePtr_(nullptr) + {} + BufferSrc(size_t clientFrameId, size_t frameId, uint8_t* framePtr) + : file_(""), clientFrameId_(clientFrameId), frameId_(frameId), framePtr_(framePtr) + {} + bool fromDisk(void) + { + return !file_.empty() && framePtr_ == nullptr; + } + size_t index() const + { + return clientFrameId_; + } + std::string file_; + size_t clientFrameId_; + size_t frameId_; + uint8_t* framePtr_; +}; + +struct Messenger; +static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch); +static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch); +static void processorThread(Messenger* messenger, std::function<void(std::string)> processor); + +struct Messenger +{ + explicit Messenger(MessengerInit init) + : running(true), _initialized(false), _shutdown(false), init_(init), + outboundSynch_(nullptr), + inboundSynch_(nullptr), uncompressed_buffer_(nullptr), compressed_buffer_(nullptr), + uncompressed_fd_(0), compressed_fd_(0) + {} + virtual ~Messenger(void) + { + running = false; + sendQueue.deactivate(); + receiveQueue.deactivate(); + + if (outboundSynch_) { + outboundSynch_->post(SYNCH_RECEIVE_READY); + outbound.join(); + } + + if (inboundSynch_) { + inboundSynch_->post(SYNCH_SENT); + inbound.join(); + } + + for(auto& p : processors_) + p.join(); + + delete outboundSynch_; + delete inboundSynch_; + + deinitShm(); + } + void startThreads(void) { + outboundSynch_ = + new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch); + outbound = std::thread(outboundThread, this, init_.outboundMessageBuf, outboundSynch_); + + inboundSynch_ = + new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch); + inbound = std::thread(inboundThread, this, init_.inboundMessageBuf, inboundSynch_); + + for(size_t i = 0; i < init_.numProcessingThreads_; ++i) + processors_.push_back(std::thread(processorThread, this, init_.processor_)); + } + bool initBuffers(void) + { + bool rc = true; + if(init_.uncompressedFrameSize_) + { + rc = rc && SharedMemoryManager::initShm(grokUncompressedBuf, + init_.uncompressedFrameSize_ * init_.numFrames_, + &uncompressed_fd_, &uncompressed_buffer_); + } + if(init_.compressedFrameSize_) + { + rc = rc && SharedMemoryManager::initShm(grokCompressedBuf, + init_.compressedFrameSize_ * init_.numFrames_, + &compressed_fd_, &compressed_buffer_); + } + + return rc; + } + + bool deinitShm(void) + { + bool rc = SharedMemoryManager::deinitShm(grokUncompressedBuf, + init_.uncompressedFrameSize_ * init_.numFrames_, + uncompressed_fd_, &uncompressed_buffer_); + rc = rc && SharedMemoryManager::deinitShm(grokCompressedBuf, + init_.compressedFrameSize_ * init_.numFrames_, + compressed_fd_, &compressed_buffer_); + + return rc; + } + template<typename... Args> + void send(const std::string& str, Args... args) + { + std::ostringstream oss; + oss << str; + int dummy[] = {0, ((void)(oss << ',' << args), 0)...}; + static_cast<void>(dummy); + + sendQueue.push(oss.str()); + } + + bool launchGrok( + boost::filesystem::path const& dir, + uint32_t width, + uint32_t stride, + uint32_t height, + uint32_t samplesPerPixel, + uint32_t depth, + int device, + bool is4K, + uint32_t fps, + uint32_t bandwidth, + const std::string server, + uint32_t port, + const std::string license + ) + { + + std::unique_lock<std::mutex> lk(shutdownMutex_); + if (async_result_.valid()) + return true; + if(MessengerInit::firstLaunch(true)) + init_.unlink(); + startThreads(); + char _cmd[4096]; + auto fullServer = server + ":" + std::to_string(port); + sprintf(_cmd, + "./grk_compress -batch_src %s,%d,%d,%d,%d,%d -out_fmt j2k -k 1 " + "-G %d -%s %d,%d -j %s -J %s -v", + GRK_MSGR_BATCH_IMAGE.c_str(), width, stride, height, samplesPerPixel, depth, + device, is4K ? "cinema4K" : "cinema2K", fps, bandwidth, + license.c_str(), fullServer.c_str()); + + return launch(_cmd, dir); + } + void initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames) + { + // client fills queue with pending uncompressed buffers + init_.uncompressedFrameSize_ = uncompressedFrameSize; + init_.compressedFrameSize_ = compressedFrameSize; + init_.numFrames_ = numFrames; + initBuffers(); + auto ptr = uncompressed_buffer_; + for(size_t i = 0; i < init_.numFrames_; ++i) + { + availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr)); + ptr += init_.uncompressedFrameSize_; + } + + std::unique_lock<std::mutex> lk(shutdownMutex_); + _initialized = true; + clientInitializedCondition_.notify_all(); + } + + bool waitForClientInit() + { + if (_initialized) { + return true; + } else if (_shutdown) { + return false; + } + + std::unique_lock<std::mutex> lk(shutdownMutex_); + + if (_initialized) { + return true; + } else if (_shutdown) { + return false; + } + + while (true) { + if (clientInitializedCondition_.wait_for(lk, std::chrono::seconds(1), [this]{ return _initialized || _shutdown; })) { + break; + } + auto status = async_result_.wait_for(std::chrono::milliseconds(100)); + if (status == std::future_status::ready) { + getMessengerLogger()->error("Grok exited unexpectedly during initialization"); + return false; + } + } + + return _initialized && !_shutdown; + } + + static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel) + { + return sizeof(uint16_t) * w * h * samplesPerPixel; + } + void reclaimCompressed(size_t frameId) + { + availableBuffers_.push(BufferSrc(0, frameId, getCompressedFrame(frameId))); + } + void reclaimUncompressed(size_t frameId) + { + availableBuffers_.push(BufferSrc(0, frameId, getUncompressedFrame(frameId))); + } + uint8_t* getUncompressedFrame(size_t frameId) + { + assert(frameId < init_.numFrames_); + if(frameId >= init_.numFrames_) + return nullptr; + + return (uint8_t*)(uncompressed_buffer_ + frameId * init_.uncompressedFrameSize_); + } + uint8_t* getCompressedFrame(size_t frameId) + { + assert(frameId < init_.numFrames_); + if(frameId >= init_.numFrames_) + return nullptr; + + return (uint8_t*)(compressed_buffer_ + frameId * init_.compressedFrameSize_); + } + std::atomic_bool running; + bool _initialized; + bool _shutdown; + MessengerBlockingQueue<std::string> sendQueue; + MessengerBlockingQueue<std::string> receiveQueue; + MessengerBlockingQueue<BufferSrc> availableBuffers_; + MessengerInit init_; + std::string cmd_; + std::future<int> async_result_; + std::mutex shutdownMutex_; + std::condition_variable shutdownCondition_; + + protected: + std::condition_variable clientInitializedCondition_; + private: + bool launch(std::string const& cmd, boost::filesystem::path const& dir) + { + // Change the working directory + if(!dir.empty()) + { + boost::system::error_code ec; + boost::filesystem::current_path(dir, ec); + if (ec) { + getMessengerLogger()->error("Error: failed to change the working directory"); + return false; + } + } + // Execute the command using std::async and std::system + cmd_ = cmd; + getMessengerLogger()->info(cmd.c_str()); + async_result_ = std::async(std::launch::async, [this]() { return std::system(cmd_.c_str()); }); + bool success = async_result_.valid(); + if (!success) + getMessengerLogger()->error("Grok launch failed"); + + return success; + + } + std::thread outbound; + Synch* outboundSynch_; + + std::thread inbound; + Synch* inboundSynch_; + + std::vector<std::thread> processors_; + char* uncompressed_buffer_; + char* compressed_buffer_; + + grk_handle uncompressed_fd_; + grk_handle compressed_fd_; +}; + +static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch) +{ + grk_handle shm_fd = 0; + char* send_buffer = nullptr; + + if(!SharedMemoryManager::initShm(sendBuf, messageBufferLen, &shm_fd, &send_buffer)) + return; + while(messenger->running) + { + synch->wait(SYNCH_RECEIVE_READY); + if(!messenger->running) + break; + std::string message; + if(!messenger->sendQueue.waitAndPop(message)) + break; + if(!messenger->running) + break; + memcpy(send_buffer, message.c_str(), message.size() + 1); + synch->post(SYNCH_SENT); + } + SharedMemoryManager::deinitShm(sendBuf, messageBufferLen, shm_fd, &send_buffer); +} + +static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch) +{ + grk_handle shm_fd = 0; + char* receive_buffer = nullptr; + + if(!SharedMemoryManager::initShm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer)) + return; + while(messenger->running) + { + synch->wait(SYNCH_SENT); + if(!messenger->running) + break; + auto message = std::string(receive_buffer); + synch->post(SYNCH_RECEIVE_READY); + messenger->receiveQueue.push(message); + } + SharedMemoryManager::deinitShm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer); +} +struct Msg +{ + explicit Msg(const std::string &msg) : ct_(0) + { + std::stringstream ss(msg); + while(ss.good()) + { + std::string substr; + std::getline(ss, substr, ','); + cs_.push_back(substr); + } + } + std::string next() + { + if(ct_ == cs_.size()) + { + getMessengerLogger()->error("Msg: comma separated list exhausted. returning empty."); + return ""; + } + return cs_[ct_++]; + } + + uint32_t nextUint(void) + { + return (uint32_t)std::stoi(next()); + } + + std::vector<std::string> cs_; + size_t ct_; +}; + +static void processorThread(Messenger* messenger, std::function<void(std::string)> processor) +{ + while (messenger->running) { + std::string message; + if (!messenger->receiveQueue.waitAndPop(message)) { + break; + } + + if (!messenger->running) { + break; + } + + Msg msg(message); + auto tag = msg.next(); + if (tag == GRK_MSGR_BATCH_COMPRESS_INIT) { + auto width = msg.nextUint(); + msg.nextUint(); // stride + auto height = msg.nextUint(); + auto samples_per_pixel = msg.nextUint(); + msg.nextUint(); // depth + messenger->init_.uncompressedFrameSize_ = Messenger::uncompressedFrameSize(width, height, samples_per_pixel); + auto compressed_frame_size = msg.nextUint(); + auto num_frames = msg.nextUint(); + messenger->initClient(compressed_frame_size, compressed_frame_size, num_frames); + } else if (tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED) { + messenger->reclaimUncompressed(msg.nextUint()); + } else if (tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED) { + messenger->reclaimCompressed(msg.nextUint()); + } + processor(message); + } +} + +template<typename F> +struct ScheduledFrames +{ + void store(F const& val) + { + std::unique_lock<std::mutex> lk(mapMutex_); + auto it = map_.find(val.index()); + if (it == map_.end()) + map_.emplace(std::make_pair(val.index(), val)); + } + boost::optional<F> retrieve(size_t index) + { + std::unique_lock<std::mutex> lk(mapMutex_); + auto it = map_.find(index); + if(it == map_.end()) + return {}; + + F val = it->second; + map_.erase(index); + + return val; + } + + private: + std::mutex mapMutex_; + std::map<size_t, F> map_; +}; + +template<typename F> +struct ScheduledMessenger : public Messenger +{ + explicit ScheduledMessenger(MessengerInit init) : Messenger(init), + framesScheduled_(0), + framesCompressed_(0) + {} + ~ScheduledMessenger(void) { + shutdown(); + } + bool scheduleCompress(F const& proxy, std::function<void(BufferSrc const&)> converter){ + size_t frameSize = init_.uncompressedFrameSize_; + assert(frameSize >= init_.uncompressedFrameSize_); + BufferSrc src; + if(!availableBuffers_.waitAndPop(src)) + return false; + converter(src); + scheduledFrames_.store(proxy); + framesScheduled_++; + send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_); + + return true; + } + void processCompressed(const std::string &message, std::function<void(F,uint8_t*,uint32_t)> processor, bool needsRecompression) { + Msg msg(message); + msg.next(); + auto clientFrameId = msg.nextUint(); + auto compressedFrameId = msg.nextUint(); + auto compressedFrameLength = msg.nextUint(); + if (!needsRecompression) { + auto src_frame = scheduledFrames_.retrieve(clientFrameId); + if (!src_frame) { + return; + } + processor(*src_frame, getCompressedFrame(compressedFrameId),compressedFrameLength); + } + ++framesCompressed_; + send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId); + if (_shutdown && framesCompressed_ == framesScheduled_) + shutdownCondition_.notify_all(); + } + void shutdown(void){ + try { + std::unique_lock<std::mutex> lk(shutdownMutex_); + if (!async_result_.valid()) + return; + _shutdown = true; + if (framesScheduled_) { + uint32_t scheduled = framesScheduled_; + send(GRK_MSGR_BATCH_FLUSH, scheduled); + shutdownCondition_.wait(lk, [this] { return framesScheduled_ == framesCompressed_; }); + } + availableBuffers_.deactivate(); + send(GRK_MSGR_BATCH_SHUTDOWN); + int result = async_result_.get(); + if(result != 0) + getMessengerLogger()->error("Accelerator failed with return code: %d\n",result); + } catch (std::exception &ex) { + getMessengerLogger()->error("%s",ex.what()); + } + + } + + boost::optional<F> retrieve(size_t index) { + return scheduledFrames_.retrieve(index); + } + + void store(F& val) { + scheduledFrames_.store(val); + } + +private: + ScheduledFrames<F> scheduledFrames_; + std::atomic<uint32_t> framesScheduled_; + std::atomic<uint32_t> framesCompressed_; +}; + +} // namespace grk_plugin diff --git a/src/lib/grok_j2k_encoder_thread.cc b/src/lib/grok_j2k_encoder_thread.cc new file mode 100644 index 000000000..79fb1bbae --- /dev/null +++ b/src/lib/grok_j2k_encoder_thread.cc @@ -0,0 +1,51 @@ +#include "config.h" +#include "cross.h" +#include "dcpomatic_log.h" +#include "dcp_video.h" +#include "grok/context.h" +#include "grok_j2k_encoder_thread.h" +#include "j2k_encoder.h" +#include "scope_guard.h" +#include "util.h" + +#include "i18n.h" + + +using std::make_shared; +using std::shared_ptr; + + +GrokJ2KEncoderThread::GrokJ2KEncoderThread(J2KEncoder& encoder, grk_plugin::GrokContext* context) + : J2KEncoderThread(encoder) + , _context(context) +{ + +} + + +void +GrokJ2KEncoderThread::run() +try +{ + while (true) + { + LOG_TIMING("encoder-sleep thread=%1", thread_id()); + auto frame = _encoder.pop(); + + ScopeGuard frame_guard([this, &frame]() { + LOG_ERROR("Failed to schedule encode of %1 using grok", frame.index()); + _encoder.retry(frame); + }); + + LOG_TIMING("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), frame.index(), static_cast<int>(frame.eyes())); + + auto grok = Config::instance()->grok().get_value_or({}); + + if (_context->launch(frame, grok.selected) && _context->scheduleCompress(frame)) { + frame_guard.cancel(); + } + } +} catch (boost::thread_interrupted& e) { +} catch (...) { + store_current(); +} diff --git a/src/lib/grok_j2k_encoder_thread.h b/src/lib/grok_j2k_encoder_thread.h new file mode 100644 index 000000000..8171830a9 --- /dev/null +++ b/src/lib/grok_j2k_encoder_thread.h @@ -0,0 +1,23 @@ +#include "exception_store.h" +#include "j2k_encoder_thread.h" +#include <dcp/data.h> + + +class DCPVideo; + +namespace grk_plugin { + class GrokContext; +} + + +class GrokJ2KEncoderThread : public J2KEncoderThread, public ExceptionStore +{ +public: + GrokJ2KEncoderThread(J2KEncoder& encoder, grk_plugin::GrokContext* context); + + void run() override; + +private: + grk_plugin::GrokContext* _context; +}; + diff --git a/src/lib/j2k_encoder.cc b/src/lib/j2k_encoder.cc index d2e840f85..e68402483 100644 --- a/src/lib/j2k_encoder.cc +++ b/src/lib/j2k_encoder.cc @@ -32,6 +32,12 @@ #include "encode_server_description.h" #include "encode_server_finder.h" #include "film.h" +#include "cpu_j2k_encoder_thread.h" +#ifdef DCPOMATIC_GROK +#include "grok/context.h" +#include "grok_j2k_encoder_thread.h" +#endif +#include "remote_j2k_encoder_thread.h" #include "j2k_encoder.h" #include "log.h" #include "player_video.h" @@ -44,6 +50,7 @@ using std::cout; +using std::dynamic_pointer_cast; using std::exception; using std::list; using std::make_shared; @@ -53,6 +60,33 @@ using boost::optional; using dcp::Data; using namespace dcpomatic; +#ifdef DCPOMATIC_GROK + +namespace grk_plugin { + +IMessengerLogger* sLogger = nullptr; + +#if defined(__GNUC__) || defined(__clang__) +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-function" +#endif +void setMessengerLogger(grk_plugin::IMessengerLogger* logger) +{ + delete sLogger; + sLogger = logger; +} +#if defined(__GNUC__) || defined(__clang__) +#pragma GCC diagnostic pop +#endif +grk_plugin::IMessengerLogger* getMessengerLogger(void) +{ + return sLogger; +} + +} + +#endif + /** @param film Film that we are encoding. * @param writer Writer that we are using. @@ -62,6 +96,13 @@ J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer) , _history (200) , _writer (writer) { +#ifdef DCPOMATIC_GROK + auto grok = Config::instance()->grok().get_value_or({}); + _dcpomatic_context = new grk_plugin::DcpomaticContext(film, writer, _history, grok.binary_location); + if (grok.enable) { + _context = new grk_plugin::GrokContext(_dcpomatic_context); + } +#endif servers_list_changed (); } @@ -70,8 +111,29 @@ J2KEncoder::~J2KEncoder () { _server_found_connection.disconnect(); - boost::mutex::scoped_lock lm (_threads_mutex); - terminate_threads (); + terminate_threads(); + +#ifdef DCPOMATIC_GROK + delete _context; + delete _dcpomatic_context; +#endif +} + + +void +J2KEncoder::servers_list_changed() +{ + auto config = Config::instance(); +#ifdef DCPOMATIC_GROK + auto const grok_enable = config->grok().get_value_or({}).enable; +#else + auto const grok_enable = false; +#endif + + auto const cpu = (grok_enable || config->only_servers_encode()) ? 0 : config->master_encoding_threads(); + auto const gpu = grok_enable ? config->master_encoding_threads() : 0; + + remake_threads(cpu, gpu, EncodeServerFinder::instance()->servers()); } @@ -85,7 +147,40 @@ J2KEncoder::begin () void -J2KEncoder::end () +J2KEncoder::pause() +{ +#ifdef DCPOMATIC_GROK + if (!Config::instance()->grok().get_value_or({}).enable) { + return; + } + return; + + terminate_threads (); + + /* Something might have been thrown during terminate_threads */ + rethrow (); + + delete _context; + _context = nullptr; +#endif +} + + +void J2KEncoder::resume() +{ +#ifdef DCPOMATIC_GROK + if (!Config::instance()->grok().get_value_or({}).enable) { + return; + } + + _context = new grk_plugin::GrokContext(_dcpomatic_context); + servers_list_changed(); +#endif +} + + +void +J2KEncoder::end() { boost::mutex::scoped_lock lock (_queue_mutex); @@ -94,18 +189,13 @@ J2KEncoder::end () /* Keep waking workers until the queue is empty */ while (!_queue.empty ()) { rethrow (); - _empty_condition.notify_all (); _full_condition.wait (lock); } - lock.unlock (); LOG_GENERAL_NC (N_("Terminating encoder threads")); - { - boost::mutex::scoped_lock lm (_threads_mutex); - terminate_threads (); - } + terminate_threads (); /* Something might have been thrown during terminate_threads */ rethrow (); @@ -120,20 +210,35 @@ J2KEncoder::end () So just mop up anything left in the queue here. */ - - for (auto const& i: _queue) { - LOG_GENERAL(N_("Encode left-over frame %1"), i.index()); - try { - _writer.write( - make_shared<dcp::ArrayData>(i.encode_locally()), - i.index(), - i.eyes() - ); - frame_done (); - } catch (std::exception& e) { - LOG_ERROR (N_("Local encode failed (%1)"), e.what ()); + for (auto & i: _queue) { +#ifdef DCPOMATIC_GROK + if (Config::instance()->grok().get_value_or({}).enable) { + if (!_context->scheduleCompress(i)){ + LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index()); + // handle error + } + } else { +#else + { +#endif + LOG_GENERAL(N_("Encode left-over frame %1"), i.index()); + try { + _writer.write( + make_shared<dcp::ArrayData>(i.encode_locally()), + i.index(), + i.eyes() + ); + frame_done (); + } catch (std::exception& e) { + LOG_ERROR (N_("Local encode failed (%1)"), e.what ()); + } } } + +#ifdef DCPOMATIC_GROK + delete _context; + _context = nullptr; +#endif } @@ -183,7 +288,7 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time) size_t threads = 0; { boost::mutex::scoped_lock lm (_threads_mutex); - threads = _threads->size(); + threads = _threads.size(); } boost::mutex::scoped_lock queue_lock (_queue_mutex); @@ -223,13 +328,14 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time) LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time)); /* Queue this new frame for encoding */ LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ()); - _queue.push_back (DCPVideo( + auto dcpv = DCPVideo( pv, position, _film->video_frame_rate(), _film->j2k_bandwidth(), _film->resolution() - )); + ); + _queue.push_back (dcpv); /* The queue might not be empty any more, so notify anything which is waiting on that. @@ -242,170 +348,143 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time) } -/** Caller must hold a lock on _threads_mutex */ void J2KEncoder::terminate_threads () { + boost::mutex::scoped_lock lm(_threads_mutex); boost::this_thread::disable_interruption dis; - if (!_threads) { - return; - } - - _threads->interrupt_all (); - try { - _threads->join_all (); - } catch (exception& e) { - LOG_ERROR ("join() threw an exception: %1", e.what()); - } catch (...) { - LOG_ERROR_NC ("join() threw an exception"); + for (auto& thread: _threads) { + thread->stop(); } - _threads.reset (); + _threads.clear(); + _ending = true; } void -J2KEncoder::encoder_thread (optional<EncodeServerDescription> server) -try +#ifdef DCPOMATIC_GROK +J2KEncoder::remake_threads(int cpu, int gpu, list<EncodeServerDescription> servers) +#else +J2KEncoder::remake_threads(int cpu, int, list<EncodeServerDescription> servers) +#endif { - start_of_thread ("J2KEncoder"); + boost::mutex::scoped_lock lm (_threads_mutex); + if (_ending) { + return; + } - if (server) { - LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ()); - } else { - LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ()); + auto remove_threads = [this](int wanted, int current, std::function<bool (shared_ptr<J2KEncoderThread>)> predicate) { + for (auto i = wanted; i < current; ++i) { + auto iter = std::find_if(_threads.begin(), _threads.end(), predicate); + if (iter != _threads.end()) { + (*iter)->stop(); + _threads.erase(iter); + } + } + }; + + + /* CPU */ + + auto const is_cpu_thread = [](shared_ptr<J2KEncoderThread> thread) { + return static_cast<bool>(dynamic_pointer_cast<CPUJ2KEncoderThread>(thread)); + }; + + auto const current_cpu_threads = std::count_if(_threads.begin(), _threads.end(), is_cpu_thread); + + for (auto i = current_cpu_threads; i < cpu; ++i) { + auto thread = make_shared<CPUJ2KEncoderThread>(*this); + thread->start(); + _threads.push_back(thread); } - /* Number of seconds that we currently wait between attempts - to connect to the server; not relevant for localhost - encodings. - */ - int remote_backoff = 0; + remove_threads(cpu, current_cpu_threads, is_cpu_thread); + +#ifdef DCPOMATIC_GROK + /* GPU */ + + auto const is_grok_thread = [](shared_ptr<J2KEncoderThread> thread) { + return static_cast<bool>(dynamic_pointer_cast<GrokJ2KEncoderThread>(thread)); + }; + + auto const current_gpu_threads = std::count_if(_threads.begin(), _threads.end(), is_grok_thread); + + for (auto i = current_gpu_threads; i < gpu; ++i) { + auto thread = make_shared<GrokJ2KEncoderThread>(*this, _context); + thread->start(); + _threads.push_back(thread); + } + + remove_threads(gpu, current_gpu_threads, is_grok_thread); +#endif - while (true) { + /* Remote */ - LOG_TIMING ("encoder-sleep thread=%1", thread_id ()); - boost::mutex::scoped_lock lock (_queue_mutex); - while (_queue.empty ()) { - _empty_condition.wait (lock); + for (auto const& server: servers) { + if (!server.current_link_version()) { + continue; } - LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size()); - auto vf = _queue.front (); + auto is_remote_thread = [server](shared_ptr<J2KEncoderThread> thread) { + auto remote = dynamic_pointer_cast<RemoteJ2KEncoderThread>(thread); + return remote && remote->server().host_name() == server.host_name(); + }; - /* We're about to commit to either encoding this frame or putting it back onto the queue, - so we must not be interrupted until one or other of these things have happened. This - block has thread interruption disabled. - */ - { - boost::this_thread::disable_interruption dis; - - LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf.index(), static_cast<int>(vf.eyes())); - _queue.pop_front (); - - lock.unlock (); - - shared_ptr<Data> encoded; - - /* We need to encode this input */ - if (server) { - try { - encoded = make_shared<dcp::ArrayData>(vf.encode_remotely(server.get())); - - if (remote_backoff > 0) { - LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ()); - } - - /* This job succeeded, so remove any backoff */ - remote_backoff = 0; - - } catch (std::exception& e) { - if (remote_backoff < 60) { - /* back off more */ - remote_backoff += 10; - } - LOG_ERROR ( - N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"), - vf.index(), server->host_name(), e.what(), remote_backoff - ); - } - - } else { - try { - LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf.index()); - encoded = make_shared<dcp::ArrayData>(vf.encode_locally()); - LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index()); - } catch (std::exception& e) { - /* This is very bad, so don't cope with it, just pass it on */ - LOG_ERROR (N_("Local encode failed (%1)"), e.what ()); - throw; - } - } + auto const current_threads = std::count_if(_threads.begin(), _threads.end(), is_remote_thread); - if (encoded) { - _writer.write(encoded, vf.index(), vf.eyes()); - frame_done (); - } else { - lock.lock (); - LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index()); - _queue.push_front (vf); - lock.unlock (); - } + auto const wanted_threads = server.threads(); + + if (wanted_threads > current_threads) { + LOG_GENERAL(N_("Adding %1 worker threads for remote %2"), wanted_threads - current_threads, server.host_name()); + } else if (wanted_threads < current_threads) { + LOG_GENERAL(N_("Removing %1 worker threads for remote %2"), current_threads - wanted_threads, server.host_name()); } - if (remote_backoff > 0) { - boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff)); + for (auto i = current_threads; i < wanted_threads; ++i) { + auto thread = make_shared<RemoteJ2KEncoderThread>(*this, server); + thread->start(); + _threads.push_back(thread); } - /* The queue might not be full any more, so notify anything that is waiting on that */ - lock.lock (); - _full_condition.notify_all (); + remove_threads(wanted_threads, current_threads, is_remote_thread); } -} -catch (boost::thread_interrupted& e) { - /* Ignore these and just stop the thread */ - _full_condition.notify_all (); -} -catch (...) -{ - store_current (); - /* Wake anything waiting on _full_condition so it can see the exception */ - _full_condition.notify_all (); + + _writer.set_encoder_threads(_threads.size()); } -void -J2KEncoder::servers_list_changed () +DCPVideo +J2KEncoder::pop() { - boost::mutex::scoped_lock lm (_threads_mutex); + boost::mutex::scoped_lock lock(_queue_mutex); + while (_queue.empty()) { + _empty_condition.wait (lock); + } - terminate_threads (); - _threads = make_shared<boost::thread_group>(); + LOG_TIMING("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size()); - /* XXX: could re-use threads */ + auto vf = _queue.front(); + _queue.pop_front(); - if (!Config::instance()->only_servers_encode ()) { - for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) { -#ifdef DCPOMATIC_LINUX - auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>())); - pthread_setname_np (t->native_handle(), "encode-worker"); -#else - _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>())); -#endif - } - } + _full_condition.notify_all(); + return vf; +} - for (auto i: EncodeServerFinder::instance()->servers()) { - if (!i.current_link_version()) { - continue; - } - LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ()); - for (int j = 0; j < i.threads(); ++j) { - _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i)); - } - } +void +J2KEncoder::retry(DCPVideo video) +{ + boost::mutex::scoped_lock lock(_queue_mutex); + _queue.push_front(video); + _empty_condition.notify_all(); +} - _writer.set_encoder_threads(_threads->size()); + +void +J2KEncoder::write(shared_ptr<const dcp::Data> data, int index, Eyes eyes) +{ + _writer.write(data, index, eyes); + frame_done(); } diff --git a/src/lib/j2k_encoder.h b/src/lib/j2k_encoder.h index 63228a6b8..6bfbaea49 100644 --- a/src/lib/j2k_encoder.h +++ b/src/lib/j2k_encoder.h @@ -32,6 +32,7 @@ #include "enum_indexed_vector.h" #include "event_history.h" #include "exception_store.h" +#include "j2k_encoder_thread.h" #include "writer.h" #include <boost/optional.hpp> #include <boost/signals2.hpp> @@ -48,6 +49,15 @@ class Film; class Job; class PlayerVideo; +namespace grk_plugin { + struct DcpomaticContext; + struct GrokContext; +} + +struct local_threads_created_and_destroyed; +struct remote_threads_created_and_destroyed; +struct frames_not_lost_when_threads_disappear; + /** @class J2KEncoder * @brief Class to manage encoding to J2K. @@ -70,19 +80,27 @@ public: /** Called to pass a bit of video to be encoded as the next DCP frame */ void encode (std::shared_ptr<PlayerVideo> pv, dcpomatic::DCPTime time); + void pause(); + void resume(); + /** Called when a processing run has finished */ - void end (); + void end(); boost::optional<float> current_encoding_rate () const; int video_frames_enqueued () const; - void servers_list_changed (); + DCPVideo pop(); + void retry(DCPVideo frame); + void write(std::shared_ptr<const dcp::Data> data, int index, Eyes eyes); private: + friend struct ::local_threads_created_and_destroyed; + friend struct ::remote_threads_created_and_destroyed; + friend struct ::frames_not_lost_when_threads_disappear; void frame_done (); - - void encoder_thread (boost::optional<EncodeServerDescription>); + void servers_list_changed (); + void remake_threads(int cpu, int gpu, std::list<EncodeServerDescription> servers); void terminate_threads (); /** Film that we are encoding */ @@ -91,7 +109,7 @@ private: EventHistory _history; boost::mutex _threads_mutex; - std::shared_ptr<boost::thread_group> _threads; + std::vector<std::shared_ptr<J2KEncoderThread>> _threads; mutable boost::mutex _queue_mutex; std::list<DCPVideo> _queue; @@ -107,6 +125,13 @@ private: boost::optional<dcpomatic::DCPTime> _last_player_video_time; boost::signals2::scoped_connection _server_found_connection; + +#ifdef DCPOMATIC_GROK + grk_plugin::DcpomaticContext* _dcpomatic_context = nullptr; + grk_plugin::GrokContext *_context = nullptr; +#endif + + bool _ending = false; }; diff --git a/src/lib/j2k_encoder_thread.cc b/src/lib/j2k_encoder_thread.cc new file mode 100644 index 000000000..0199209be --- /dev/null +++ b/src/lib/j2k_encoder_thread.cc @@ -0,0 +1,38 @@ +#include "dcp_video.h" +#include "dcpomatic_log.h" +#include "j2k_encoder.h" +#include "j2k_encoder_thread.h" +#include "scope_guard.h" + + +J2KEncoderThread::J2KEncoderThread(J2KEncoder& encoder) + : _encoder(encoder) +{ + +} + + +void +J2KEncoderThread::start() +{ + _thread = boost::thread(boost::bind(&J2KEncoderThread::run, this)); +#ifdef DCPOMATIC_LINUX + pthread_setname_np(_thread.native_handle(), "encode-worker"); +#endif +} + + +void +J2KEncoderThread::stop() +{ + _thread.interrupt(); + try { + _thread.join(); + } catch (std::exception& e) { + LOG_ERROR("join() threw an exception: %1", e.what()); + } catch (...) { + LOG_ERROR_NC("join() threw an exception"); + } +} + + diff --git a/src/lib/j2k_encoder_thread.h b/src/lib/j2k_encoder_thread.h new file mode 100644 index 000000000..4a4b25f37 --- /dev/null +++ b/src/lib/j2k_encoder_thread.h @@ -0,0 +1,32 @@ +#ifndef DCPOMATIC_J2K_ENCODER_THREAD +#define DCPOMATIC_J2K_ENCODER_THREAD + + +#include <boost/thread.hpp> + + +class J2KEncoder; + + +class J2KEncoderThread +{ +public: + J2KEncoderThread(J2KEncoder& encoder); + + J2KEncoderThread(J2KEncoderThread const&) = delete; + J2KEncoderThread& operator=(J2KEncoderThread const&) = delete; + + void start(); + void stop(); + + virtual void run() = 0; + +protected: + J2KEncoder& _encoder; + +private: + boost::thread _thread; +}; + + +#endif diff --git a/src/lib/j2k_sync_encoder_thread.cc b/src/lib/j2k_sync_encoder_thread.cc new file mode 100644 index 000000000..9398bcb85 --- /dev/null +++ b/src/lib/j2k_sync_encoder_thread.cc @@ -0,0 +1,44 @@ +#include "dcp_video.h" +#include "dcpomatic_log.h" +#include "j2k_encoder.h" +#include "j2k_sync_encoder_thread.h" +#include "scope_guard.h" + + +J2KSyncEncoderThread::J2KSyncEncoderThread(J2KEncoder& encoder) + : J2KEncoderThread(encoder) +{ + +} + + +void +J2KSyncEncoderThread::run() +try +{ + log_thread_start(); + + while (true) { + LOG_TIMING("encoder-sleep thread=%1", thread_id()); + auto frame = _encoder.pop(); + + ScopeGuard frame_guard([this, &frame]() { + boost::this_thread::disable_interruption dis; + _encoder.retry(frame); + }); + + LOG_TIMING("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), frame.index(), static_cast<int>(frame.eyes())); + + auto encoded = encode(frame); + + if (encoded) { + boost::this_thread::disable_interruption dis; + frame_guard.cancel(); + _encoder.write(encoded, frame.index(), frame.eyes()); + } + } +} catch (boost::thread_interrupted& e) { +} catch (...) { + store_current(); +} + diff --git a/src/lib/j2k_sync_encoder_thread.h b/src/lib/j2k_sync_encoder_thread.h new file mode 100644 index 000000000..45222279e --- /dev/null +++ b/src/lib/j2k_sync_encoder_thread.h @@ -0,0 +1,32 @@ +#ifndef DCPOMATIC_J2K_SYNC_ENCODER_THREAD_H +#define DCPOMATIC_J2K_SYNC_ENCODER_THREAD_H + + +#include "exception_store.h" +#include "j2k_encoder_thread.h" +#include <dcp/array_data.h> +#include <boost/thread.hpp> + + +class DCPVideo; +class J2KEncoder; + + +class J2KSyncEncoderThread : public J2KEncoderThread, public ExceptionStore +{ +public: + J2KSyncEncoderThread(J2KEncoder& encoder); + + J2KSyncEncoderThread(J2KSyncEncoderThread const&) = delete; + J2KSyncEncoderThread& operator=(J2KSyncEncoderThread const&) = delete; + + virtual ~J2KSyncEncoderThread() {} + + void run() override; + + virtual void log_thread_start() const = 0; + virtual std::shared_ptr<dcp::ArrayData> encode(DCPVideo const& frame) = 0; +}; + + +#endif diff --git a/src/lib/job.cc b/src/lib/job.cc index 727456523..e900a427c 100644 --- a/src/lib/job.cc +++ b/src/lib/job.cc @@ -655,7 +655,7 @@ void Job::cancel () { if (_thread.joinable()) { - resume(); + Job::resume(); _thread.interrupt (); _thread.join (); @@ -682,6 +682,7 @@ Job::pause_by_user () } if (paused) { + pause(); _pause_changed.notify_all (); } @@ -694,6 +695,7 @@ Job::pause_by_priority () { if (running ()) { set_state (PAUSED_BY_PRIORITY); + pause(); _pause_changed.notify_all (); } } diff --git a/src/lib/job.h b/src/lib/job.h index dc5f7bc34..a4b7a319d 100644 --- a/src/lib/job.h +++ b/src/lib/job.h @@ -56,9 +56,10 @@ public: } void start (); + virtual void pause() {} bool pause_by_user (); void pause_by_priority (); - void resume (); + virtual void resume (); void cancel (); bool is_new () const; diff --git a/src/lib/make_dcp.cc b/src/lib/make_dcp.cc index 17d45be46..d8d42f49a 100644 --- a/src/lib/make_dcp.cc +++ b/src/lib/make_dcp.cc @@ -40,8 +40,8 @@ using std::shared_ptr; using std::string; -/** Add suitable Jobs to the JobManager to create a DCP for a Film */ -void +/** Add suitable Job to the JobManager to create a DCP for a Film */ +shared_ptr<TranscodeJob> make_dcp (shared_ptr<Film> film, TranscodeJob::ChangedBehaviour behaviour) { if (film->dcp_name().find("/") != string::npos) { @@ -101,5 +101,7 @@ make_dcp (shared_ptr<Film> film, TranscodeJob::ChangedBehaviour behaviour) auto tj = make_shared<DCPTranscodeJob>(film, behaviour); tj->set_encoder (make_shared<DCPEncoder>(film, tj)); JobManager::instance()->add (tj); + + return tj; } diff --git a/src/lib/make_dcp.h b/src/lib/make_dcp.h index 9f5072782..fe0bcd2f6 100644 --- a/src/lib/make_dcp.h +++ b/src/lib/make_dcp.h @@ -25,5 +25,5 @@ class Film; -void make_dcp (std::shared_ptr<Film> film, TranscodeJob::ChangedBehaviour behaviour); +std::shared_ptr<TranscodeJob> make_dcp(std::shared_ptr<Film> film, TranscodeJob::ChangedBehaviour behaviour); diff --git a/src/lib/remote_j2k_encoder_thread.cc b/src/lib/remote_j2k_encoder_thread.cc new file mode 100644 index 000000000..eac7bb538 --- /dev/null +++ b/src/lib/remote_j2k_encoder_thread.cc @@ -0,0 +1,64 @@ +#include "dcp_video.h" +#include "dcpomatic_log.h" +#include "j2k_encoder.h" +#include "remote_j2k_encoder_thread.h" +#include "scope_guard.h" +#include "util.h" + +#include "i18n.h" + + +using std::make_shared; +using std::shared_ptr; + + +RemoteJ2KEncoderThread::RemoteJ2KEncoderThread(J2KEncoder& encoder, EncodeServerDescription server) + : J2KSyncEncoderThread(encoder) + , _server(server) +{ + +} + + +void +RemoteJ2KEncoderThread::log_thread_start() const +{ + start_of_thread("RemoteJ2KEncoder"); + LOG_TIMING("start-encoder-thread thread=%1 server=%2", thread_id(), _server.host_name()); +} + + +shared_ptr<dcp::ArrayData> +RemoteJ2KEncoderThread::encode(DCPVideo const& frame) +{ + shared_ptr<dcp::ArrayData> encoded; + + try { + encoded = make_shared<dcp::ArrayData>(frame.encode_remotely(_server)); + if (_remote_backoff > 0) { + LOG_GENERAL("%1 was lost, but now she is found; removing backoff", _server.host_name()); + _remote_backoff = 0; + } + } catch (std::exception& e) { + LOG_ERROR( + N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"), + frame.index(), _server.host_name(), e.what(), _remote_backoff + ); + } catch (...) { + LOG_ERROR( + N_("Remote encode of %1 on %2 failed; thread sleeping for %4s"), + frame.index(), _server.host_name(), _remote_backoff + ); + } + + if (!encoded) { + if (_remote_backoff < 60) { + /* back off more */ + _remote_backoff += 10; + } + boost::this_thread::sleep(boost::posix_time::seconds(_remote_backoff)); + } + + return encoded; +} + diff --git a/src/lib/remote_j2k_encoder_thread.h b/src/lib/remote_j2k_encoder_thread.h new file mode 100644 index 000000000..f3fe7f94a --- /dev/null +++ b/src/lib/remote_j2k_encoder_thread.h @@ -0,0 +1,21 @@ +#include "encode_server_description.h" +#include "j2k_sync_encoder_thread.h" + + +class RemoteJ2KEncoderThread : public J2KSyncEncoderThread +{ +public: + RemoteJ2KEncoderThread(J2KEncoder& encoder, EncodeServerDescription server); + + void log_thread_start() const override; + std::shared_ptr<dcp::ArrayData> encode(DCPVideo const& frame) override; + + EncodeServerDescription server() const { + return _server; + } + +private: + EncodeServerDescription _server; + /** Number of seconds that we currently wait between attempts to connect to the server */ + int _remote_backoff = 0; +}; diff --git a/src/lib/scope_guard.h b/src/lib/scope_guard.h index ac60f9fea..e0d1e81fc 100644 --- a/src/lib/scope_guard.h +++ b/src/lib/scope_guard.h @@ -45,11 +45,19 @@ public: ~ScopeGuard () { - _function(); + if (!_cancelled) { + _function(); + } + } + + void cancel() + { + _cancelled = true; } private: std::function<void()> _function; + bool _cancelled = false; }; diff --git a/src/lib/transcode_job.cc b/src/lib/transcode_job.cc index 1b2d2ddd5..7c842a99f 100644 --- a/src/lib/transcode_job.cc +++ b/src/lib/transcode_job.cc @@ -148,6 +148,20 @@ TranscodeJob::run () } +void +TranscodeJob::pause() +{ + _encoder->pause(); +} + + +void TranscodeJob::resume() +{ + _encoder->resume(); + Job::resume(); +} + + string TranscodeJob::status () const { diff --git a/src/lib/transcode_job.h b/src/lib/transcode_job.h index b05b20a16..35870231d 100644 --- a/src/lib/transcode_job.h +++ b/src/lib/transcode_job.h @@ -37,6 +37,8 @@ class Encoder; +struct frames_not_lost_when_threads_disappear; + /** @class TranscodeJob * @brief A job which transcodes a Film to another format. @@ -56,6 +58,8 @@ public: std::string name () const override; std::string json_name () const override; void run () override; + void pause() override; + void resume() override; std::string status () const override; bool enable_notify () const override { return true; @@ -64,6 +68,8 @@ public: void set_encoder (std::shared_ptr<Encoder> t); private: + friend struct ::frames_not_lost_when_threads_disappear; + virtual void post_transcode () {} float frames_per_second() const; diff --git a/src/lib/writer.h b/src/lib/writer.h index 1fbf7bbd5..0b38e9030 100644 --- a/src/lib/writer.h +++ b/src/lib/writer.h @@ -34,6 +34,7 @@ #include "exception_store.h" #include "font_id_map.h" #include "player_text.h" +#include "text_type.h" #include "weak_film.h" #include <dcp/atmos_frame.h> #include <boost/thread.hpp> diff --git a/src/lib/wscript b/src/lib/wscript index dad8947b1..67c6b5869 100644 --- a/src/lib/wscript +++ b/src/lib/wscript @@ -59,6 +59,7 @@ sources = """ content_factory.cc combine_dcp_job.cc copy_dcp_details_to_film.cc + cpu_j2k_encoder_thread.cc create_cli.cc crop.cc cross_common.cc @@ -138,6 +139,8 @@ sources = """ job.cc job_manager.cc j2k_encoder.cc + j2k_encoder_thread.cc + j2k_sync_encoder_thread.cc json_server.cc kdm_cli.cc kdm_recipient.cc @@ -163,6 +166,7 @@ sources = """ referenced_reel_asset.cc release_notes.cc render_text.cc + remote_j2k_encoder_thread.cc resampler.cc resolution.cc rgba.cc @@ -241,6 +245,9 @@ def build(bld): if bld.env.TARGET_LINUX: obj.uselib += ' POLKIT' + if bld.env.ENABLE_GROK: + obj.source += ' grok_j2k_encoder_thread.cc' + if bld.env.TARGET_WINDOWS_64 or bld.env.TARGET_WINDOWS_32: obj.uselib += ' WINSOCK2 DBGHELP SHLWAPI MSWSOCK BOOST_LOCALE SETUPAPI OLE32 UUID' obj.source += ' cross_windows.cc' diff --git a/src/tools/dcpomatic.cc b/src/tools/dcpomatic.cc index 979672fb4..db18d1a2f 100644 --- a/src/tools/dcpomatic.cc +++ b/src/tools/dcpomatic.cc @@ -75,6 +75,9 @@ #include "lib/ffmpeg_encoder.h" #include "lib/film.h" #include "lib/font_config.h" +#ifdef DCPOMATIC_GROK +#include "lib/grok/context.h" +#endif #include "lib/hints.h" #include "lib/job_manager.h" #include "lib/kdm_with_metadata.h" @@ -1180,6 +1183,7 @@ private: FontConfig::drop(); ev.Skip (); + JobManager::drop (); } void active_jobs_changed() @@ -1711,6 +1715,10 @@ private: notes.Centre(); notes.ShowModal(); } + +#ifdef DCPOMATIC_GROK + grk_plugin::setMessengerLogger(new grk_plugin::GrokLogger("[GROK] ")); +#endif } catch (exception& e) { diff --git a/src/tools/dcpomatic_batch.cc b/src/tools/dcpomatic_batch.cc index dc092bf8c..d112f2060 100644 --- a/src/tools/dcpomatic_batch.cc +++ b/src/tools/dcpomatic_batch.cc @@ -31,6 +31,9 @@ #include "lib/config.h" #include "lib/dcpomatic_socket.h" #include "lib/film.h" +#ifdef DCPOMATIC_GROK +#include "lib/grok/context.h" +#endif #include "lib/job.h" #include "lib/job_manager.h" #include "lib/make_dcp.h" @@ -288,6 +291,7 @@ private: } ev.Skip (); + JobManager::drop (); } void file_add_film () @@ -495,6 +499,10 @@ class App : public wxApp } } +#ifdef DCPOMATIC_GROK + grk_plugin::setMessengerLogger(new grk_plugin::GrokLogger("[GROK] ")); +#endif + return true; } diff --git a/src/tools/dcpomatic_cli.cc b/src/tools/dcpomatic_cli.cc index 96bf83086..2abc3a149 100644 --- a/src/tools/dcpomatic_cli.cc +++ b/src/tools/dcpomatic_cli.cc @@ -413,7 +413,7 @@ main (int argc, char* argv[]) signal_manager = new SignalManager (); if (no_remote || export_format) { - EncodeServerFinder::instance()->stop (); + EncodeServerFinder::drop(); } if (json_port) { diff --git a/src/tools/dcpomatic_disk.cc b/src/tools/dcpomatic_disk.cc index 5941d6d70..989fe3385 100644 --- a/src/tools/dcpomatic_disk.cc +++ b/src/tools/dcpomatic_disk.cc @@ -269,6 +269,7 @@ private: } ev.Skip (); + JobManager::drop (); } void copy () diff --git a/src/tools/dcpomatic_server.cc b/src/tools/dcpomatic_server.cc index e5e3a7e5a..b7100d62a 100644 --- a/src/tools/dcpomatic_server.cc +++ b/src/tools/dcpomatic_server.cc @@ -25,6 +25,9 @@ #include "lib/encoded_log_entry.h" #include "lib/encode_server.h" #include "lib/config.h" +#ifdef DCPOMATIC_GROK +#include "lib/grok/context.h" +#endif #include "lib/log.h" #include "lib/signaller.h" #include "lib/cross.h" @@ -326,6 +329,10 @@ private: SetExitOnFrameDelete (false); +#ifdef DCPOMATIC_GROK + grk_plugin::setMessengerLogger(new grk_plugin::GrokLogger("[GROK] ")); +#endif + return true; } diff --git a/src/tools/dcpomatic_server_cli.cc b/src/tools/dcpomatic_server_cli.cc index 6d7f6aba7..9e4a8814f 100644 --- a/src/tools/dcpomatic_server_cli.cc +++ b/src/tools/dcpomatic_server_cli.cc @@ -25,6 +25,9 @@ #include "lib/config.h" #include "lib/image.h" #include "lib/file_log.h" +#ifdef DCPOMATIC_GROK +#include "lib/grok/context.h" +#endif #include "lib/null_log.h" #include "lib/version.h" #include "lib/encode_server.h" @@ -109,6 +112,10 @@ main (int argc, char* argv[]) dcpomatic_log.reset (new FileLog("dcpomatic_server_cli.log")); } +#ifdef DCPOMATIC_GROK + setMessengerLogger(new grk_plugin::GrokLogger("[GROK] ")); +#endif + EncodeServer server (verbose, num_threads); try { diff --git a/src/wx/about_dialog.cc b/src/wx/about_dialog.cc index fbb89bfd6..51d49ce6c 100644 --- a/src/wx/about_dialog.cc +++ b/src/wx/about_dialog.cc @@ -86,7 +86,7 @@ AboutDialog::AboutDialog (wxWindow* parent) t = new StaticText ( this, - _("(C) 2012-2023 Carl Hetherington, Terrence Meiczinger\n Ole Laursen"), + _("(C) 2012-2023 Carl Hetherington, Terrence Meiczinger\nOle Laursen, Aaron Boxer"), wxDefaultPosition, wxDefaultSize, wxALIGN_CENTER ); @@ -99,6 +99,7 @@ AboutDialog::AboutDialog (wxWindow* parent) written_by.Add (wxT ("Terrence Meiczinger")); written_by.Add (wxT ("Mart Jansink")); written_by.Add (wxT ("Ole Laursen")); + written_by.Add (wxT ("Aaron Boxer")); add_section (_("Written by"), written_by); wxArrayString with_help_from; diff --git a/src/wx/full_config_dialog.cc b/src/wx/full_config_dialog.cc index 175d78730..bc5b5de4e 100644 --- a/src/wx/full_config_dialog.cc +++ b/src/wx/full_config_dialog.cc @@ -45,6 +45,9 @@ #include "send_test_email_dialog.h" #include "server_dialog.h" #include "static_text.h" +#ifdef DCPOMATIC_GROK +#include "grok/gpu_config_panel.h" +#endif #include "wx_util.h" #include "lib/config.h" #include "lib/cross.h" @@ -1944,6 +1947,9 @@ create_full_config_dialog () e->AddPage (new SoundPage (ps, border)); e->AddPage (new DefaultsPage (ps, border)); e->AddPage (new EncodingServersPage(ps, border)); +#ifdef DCPOMATIC_GROK + e->AddPage (new GPUPage (ps, border)); +#endif e->AddPage (new KeysPage (ps, border)); e->AddPage (new TMSPage (ps, border)); e->AddPage (new EmailPage (ps, border)); diff --git a/src/wx/grok/gpu_config_panel.h b/src/wx/grok/gpu_config_panel.h new file mode 100644 index 000000000..cbf037592 --- /dev/null +++ b/src/wx/grok/gpu_config_panel.h @@ -0,0 +1,227 @@ +/* + Copyright (C) 2023 Grok Image Compression Inc. + + This file is part of DCP-o-matic. + + DCP-o-matic is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + DCP-o-matic is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>. + +*/ + + +#pragma once + +static std::vector<std::string> get_gpu_names(boost::filesystem::path binary, boost::filesystem::path filename) +{ + // Execute the GPU listing program and redirect its output to a file + if (std::system((binary.string() + " > " + filename.string()).c_str()) < 0) { + return {}; + } + + std::vector<std::string> gpu_names; + std::ifstream file(filename.c_str()); + if (file.is_open()) + { + std::string line; + while (std::getline(file, line)) + gpu_names.push_back(line); + file.close(); + } + + return gpu_names; +} + + +class GpuList : public wxPanel +{ +public: + GpuList(wxPanel* parent) + : wxPanel(parent, wxID_ANY) + { + _combo_box = new wxComboBox(this, wxID_ANY, "", wxDefaultPosition, wxSize(400, -1)); + _combo_box->Bind(wxEVT_COMBOBOX, &GpuList::OnComboBox, this); + update(); + + auto sizer = new wxBoxSizer(wxHORIZONTAL); + sizer->Add(_combo_box, 0, wxALIGN_CENTER_VERTICAL); + SetSizerAndFit(sizer); + } + + void update() + { + auto grok = Config::instance()->grok().get_value_or({}); + auto lister_binary = grok.binary_location / "gpu_lister"; + auto lister_file = grok.binary_location / "gpus.txt"; + if (boost::filesystem::exists(lister_binary)) { + auto gpu_names = get_gpu_names(lister_binary, lister_file); + + _combo_box->Clear(); + for (auto const& name: gpu_names) { + _combo_box->Append(name); + } + } + } + + void set_selection(int sel) + { + if (sel < static_cast<int>(_combo_box->GetCount())) { + _combo_box->SetSelection(sel); + } + } + +private: + void OnComboBox(wxCommandEvent&) + { + auto selection = _combo_box->GetSelection(); + if (selection != wxNOT_FOUND) { + auto grok = Config::instance()->grok().get_value_or({}); + grok.selected = selection; + Config::instance()->set_grok(grok); + } + } + + wxComboBox* _combo_box; + int _selection = 0; +}; + + +class GPUPage : public Page +{ +public: + GPUPage(wxSize panel_size, int border) + : Page(panel_size, border) + {} + + wxString GetName() const override + { + return _("GPU"); + } + +#ifdef DCPOMATIC_OSX + /* XXX: this icon does not exist */ + wxBitmap GetLargeIcon() const override + { + return wxBitmap(icon_path("gpu"), wxBITMAP_TYPE_PNG); + } +#endif + +private: + void setup() override + { + _enable_gpu = new CheckBox(_panel, _("Enable GPU acceleration")); + _panel->GetSizer()->Add(_enable_gpu, 0, wxALL | wxEXPAND, _border); + + wxFlexGridSizer* table = new wxFlexGridSizer(2, DCPOMATIC_SIZER_X_GAP, DCPOMATIC_SIZER_Y_GAP); + table->AddGrowableCol(1, 1); + _panel->GetSizer()->Add(table, 1, wxALL | wxEXPAND, _border); + + add_label_to_sizer(table, _panel, _("Acceleration binary folder"), true, 0, wxLEFT | wxLEFT | wxALIGN_CENTRE_VERTICAL); + _binary_location = new wxDirPickerCtrl(_panel, wxDD_DIR_MUST_EXIST); + table->Add(_binary_location, 1, wxEXPAND); + + add_label_to_sizer(table, _panel, _("GPU selection"), true, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL); + _gpu_list_control = new GpuList(_panel); + table->Add(_gpu_list_control, 1, wxEXPAND); + + add_label_to_sizer(table, _panel, _("License server"), true, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL); + _server = new wxTextCtrl(_panel, wxID_ANY); + table->Add(_server, 1, wxEXPAND | wxALL); + + add_label_to_sizer(table, _panel, _("Port"), false, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL); + _port = new wxSpinCtrl(_panel, wxID_ANY); + _port->SetRange(0, 65535); + table->Add(_port); + + add_label_to_sizer(table, _panel, _("License"), true, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL); + _licence = new PasswordEntry(_panel); + table->Add(_licence->get_panel(), 1, wxEXPAND | wxALL); + + _enable_gpu->bind(&GPUPage::enable_gpu_changed, this); + _binary_location->Bind(wxEVT_DIRPICKER_CHANGED, boost::bind (&GPUPage::binary_location_changed, this)); + _server->Bind(wxEVT_TEXT, boost::bind(&GPUPage::server_changed, this)); + _port->Bind(wxEVT_SPINCTRL, boost::bind(&GPUPage::port_changed, this)); + _licence->Changed.connect(boost::bind(&GPUPage::licence_changed, this)); + + setup_sensitivity(); + } + + void setup_sensitivity() + { + auto grok = Config::instance()->grok().get_value_or({}); + + _binary_location->Enable(grok.enable); + _gpu_list_control->Enable(grok.enable); + _server->Enable(grok.enable); + _port->Enable(grok.enable); + _licence->get_panel()->Enable(grok.enable); + } + + void config_changed() override + { + auto grok = Config::instance()->grok().get_value_or({}); + + checked_set(_enable_gpu, grok.enable); + _binary_location->SetPath(std_to_wx(grok.binary_location.string())); + _gpu_list_control->update(); + _gpu_list_control->set_selection(grok.selected); + checked_set(_server, grok.licence_server); + checked_set(_port, grok.licence_port); + checked_set(_licence, grok.licence); + } + + void enable_gpu_changed() + { + auto grok = Config::instance()->grok().get_value_or({}); + grok.enable = _enable_gpu->GetValue(); + Config::instance()->set_grok(grok); + + setup_sensitivity(); + } + + void binary_location_changed() + { + auto grok = Config::instance()->grok().get_value_or({}); + grok.binary_location = wx_to_std(_binary_location->GetPath()); + Config::instance()->set_grok(grok); + + _gpu_list_control->update(); + } + + void server_changed() + { + auto grok = Config::instance()->grok().get_value_or({}); + grok.licence_server = wx_to_std(_server->GetValue()); + Config::instance()->set_grok(grok); + } + + void port_changed() + { + auto grok = Config::instance()->grok().get_value_or({}); + grok.licence_port = _port->GetValue(); + Config::instance()->set_grok(grok); + } + + void licence_changed() + { + auto grok = Config::instance()->grok().get_value_or({}); + grok.licence = wx_to_std(_licence->get()); + Config::instance()->set_grok(grok); + } + + CheckBox* _enable_gpu = nullptr; + wxDirPickerCtrl* _binary_location = nullptr; + GpuList* _gpu_list_control = nullptr; + wxTextCtrl* _server = nullptr; + wxSpinCtrl* _port = nullptr; + PasswordEntry* _licence = nullptr; +}; diff --git a/test/client_server_test.cc b/test/client_server_test.cc index 4f5015fc8..1bfa4c5a6 100644 --- a/test/client_server_test.cc +++ b/test/client_server_test.cc @@ -20,20 +20,18 @@ /** @file test/client_server_test.cc - * @brief Test the server class. + * @brief Test the remote encoding code. * @ingroup feature - * - * Create a test image and then encode it using the standard mechanism - * and also using a EncodeServer object running on localhost. Compare the resulting - * encoded data to check that they are the same. */ +#include "lib/content_factory.h" #include "lib/cross.h" #include "lib/dcp_video.h" #include "lib/dcpomatic_log.h" #include "lib/encode_server.h" #include "lib/encode_server_description.h" +#include "lib/encode_server_finder.h" #include "lib/file_log.h" #include "lib/image.h" #include "lib/j2k_image_proxy.h" @@ -316,3 +314,22 @@ BOOST_AUTO_TEST_CASE (client_server_test_j2k) } +BOOST_AUTO_TEST_CASE(real_encode_with_server) +{ + auto content = content_factory(TestPaths::private_data() / "dolby_aurora.vob"); + auto film = new_test_film2("real_encode_with_server", content); + + EncodeServerFinder::instance(); + + EncodeServer server(true, 4); + thread server_thread(boost::bind(&EncodeServer::run, &server)); + + make_and_verify_dcp(film); + + server.stop(); + server_thread.join(); + + BOOST_CHECK(server.frames_encoded() > 0); + EncodeServerFinder::drop(); +} + diff --git a/test/j2k_encode_threading_test.cc b/test/j2k_encode_threading_test.cc new file mode 100644 index 000000000..ca9bc5cd9 --- /dev/null +++ b/test/j2k_encode_threading_test.cc @@ -0,0 +1,96 @@ +#include "lib/config.h" +#include "lib/content_factory.h" +#include "lib/dcp_encoder.h" +#include "lib/dcp_transcode_job.h" +#include "lib/encode_server_description.h" +#include "lib/film.h" +#include "lib/j2k_encoder.h" +#include "lib/job_manager.h" +#include "lib/make_dcp.h" +#include "lib/transcode_job.h" +#include "test.h" +#include <dcp/cpl.h> +#include <dcp/dcp.h> +#include <dcp/reel.h> +#include <dcp/reel_picture_asset.h> +#include <boost/test/unit_test.hpp> + + +using std::dynamic_pointer_cast; +using std::list; + + +BOOST_AUTO_TEST_CASE(local_threads_created_and_destroyed) +{ + auto film = new_test_film2("local_threads_created_and_destroyed", {}); + Writer writer(film, {}); + J2KEncoder encoder(film, writer); + + encoder.remake_threads(32, 0, {}); + BOOST_CHECK_EQUAL(encoder._threads.size(), 32U); + + encoder.remake_threads(9, 0, {}); + BOOST_CHECK_EQUAL(encoder._threads.size(), 9U); + + encoder.end(); + BOOST_CHECK_EQUAL(encoder._threads.size(), 0U); +} + + +BOOST_AUTO_TEST_CASE(remote_threads_created_and_destroyed) +{ + auto film = new_test_film2("remote_threads_created_and_destroyed", {}); + Writer writer(film, {}); + J2KEncoder encoder(film, writer); + + list<EncodeServerDescription> servers = { + { "fred", 7, SERVER_LINK_VERSION }, + { "jim", 2, SERVER_LINK_VERSION }, + { "sheila", 14, SERVER_LINK_VERSION }, + }; + + encoder.remake_threads(0, 0, servers); + BOOST_CHECK_EQUAL(encoder._threads.size(), 7U + 2U + 14U); + + servers = { + { "fred", 7, SERVER_LINK_VERSION }, + { "jim", 5, SERVER_LINK_VERSION }, + { "sheila", 14, SERVER_LINK_VERSION }, + }; + + encoder.remake_threads(0, 0, servers); + BOOST_CHECK_EQUAL(encoder._threads.size(), 7U + 5U + 14U); + + servers = { + { "fred", 0, SERVER_LINK_VERSION }, + { "jim", 0, SERVER_LINK_VERSION }, + { "sheila", 11, SERVER_LINK_VERSION }, + }; + + encoder.remake_threads(0, 0, servers); + BOOST_CHECK_EQUAL(encoder._threads.size(), 11U); +} + + +BOOST_AUTO_TEST_CASE(frames_not_lost_when_threads_disappear) +{ + auto content = content_factory(TestPaths::private_data() / "clapperboard.mp4"); + auto film = new_test_film2("frames_not_lost", content); + film->write_metadata(); + auto job = make_dcp(film, TranscodeJob::ChangedBehaviour::IGNORE); + auto& encoder = dynamic_pointer_cast<DCPEncoder>(job->_encoder)->_j2k_encoder; + + while (JobManager::instance()->work_to_do()) { + encoder.remake_threads(rand() % 8, 0, {}); + dcpomatic_sleep_seconds(1); + } + + BOOST_CHECK(!JobManager::instance()->errors()); + + dcp::DCP dcp(film->dir(film->dcp_name())); + dcp.read(); + BOOST_REQUIRE_EQUAL(dcp.cpls().size(), 1U); + BOOST_REQUIRE_EQUAL(dcp.cpls()[0]->reels().size(), 1U); + BOOST_REQUIRE_EQUAL(dcp.cpls()[0]->reels()[0]->main_picture()->intrinsic_duration(), 423U); +} + diff --git a/test/map_cli_test.cc b/test/map_cli_test.cc index a4056dc46..94eb50b5a 100644 --- a/test/map_cli_test.cc +++ b/test/map_cli_test.cc @@ -433,6 +433,7 @@ BOOST_AUTO_TEST_CASE(map_with_given_config) }; boost::filesystem::remove_all(out); + boost::filesystem::remove_all("test/data/map_with_given_config/2.18"); Config::instance()->drop(); vector<string> output_messages; diff --git a/test/test.cc b/test/test.cc index 615e9d3bc..3d8c91b26 100644 --- a/test/test.cc +++ b/test/test.cc @@ -161,7 +161,7 @@ struct TestConfig setup_test_config (); capture_ffmpeg_logs(); - EncodeServerFinder::instance()->stop (); + EncodeServerFinder::drop(); signal_manager = new TestSignalManager (); diff --git a/test/wscript b/test/wscript index e8762009c..51df1b8ed 100644 --- a/test/wscript +++ b/test/wscript @@ -111,6 +111,7 @@ def build(bld): interrupt_encoder_test.cc isdcf_name_test.cc j2k_bandwidth_test.cc + j2k_encode_threading_test.cc job_manager_test.cc kdm_cli_test.cc kdm_naming_test.cc @@ -76,6 +76,7 @@ def options(opt): opt.add_option('--workaround-gssapi', action='store_true', default=False, help='link to gssapi_krb5') opt.add_option('--use-lld', action='store_true', default=False, help='use lld linker') opt.add_option('--enable-disk', action='store_true', default=False, help='build dcpomatic2_disk tool; requires Boost process, lwext4 and nanomsg libraries') + opt.add_option('--enable-grok', action='store_true', default=False, help='build with support for grok J2K encoder') opt.add_option('--warnings-are-errors', action='store_true', default=False, help='build with -Werror') opt.add_option('--wx-config', help='path to wx-config') @@ -96,6 +97,7 @@ def configure(conf): conf.env.DEBUG = conf.options.enable_debug conf.env.STATIC_DCPOMATIC = conf.options.static_dcpomatic conf.env.ENABLE_DISK = conf.options.enable_disk + conf.env.ENABLE_GROK = conf.options.enable_grok if conf.options.destdir == '': conf.env.INSTALL_PREFIX = conf.options.prefix else: @@ -110,6 +112,8 @@ def configure(conf): '-Wextra', '-Wwrite-strings', '-Wno-error=deprecated', + # getMessengerLogger() in the grok code triggers these warnings + '-Wno-nonnull', # I tried and failed to ignore these with _Pragma '-Wno-ignored-qualifiers', '-D_FILE_OFFSET_BITS=64', @@ -140,6 +144,9 @@ def configure(conf): if conf.options.enable_disk: conf.env.append_value('CXXFLAGS', '-DDCPOMATIC_DISK') + if conf.options.enable_grok: + conf.env.append_value('CXXFLAGS', '-DDCPOMATIC_GROK') + if conf.options.use_lld: try: conf.find_program('ld.lld') @@ -606,6 +613,21 @@ def configure(conf): def build(bld): create_version_cc(VERSION, bld.env.CXXFLAGS) + # waf can't find these dependencies by itself because they are only included if DCPOMATIC_GROK is defined, + # and I can't find a way to pass that to waf's dependency scanner + if bld.env.ENABLE_GROK: + for dep in ( + 'src/lib/j2k_encoder.cc', + 'src/tools/dcpomatic.cc', + 'src/tools/dcpomatic_server.cc', + 'src/tools/dcpomatic_server_cli.cc', + 'src/tools/dcpomatic_batch.cc' + ): + bld.add_manual_dependency(bld.path.find_node(dep), bld.path.find_node('src/lib/grok/context.h')) + bld.add_manual_dependency(bld.path.find_node(dep), bld.path.find_node('src/lib/grok/messenger.h')) + + bld.add_manual_dependency(bld.path.find_node('src/wx/full_config_dialog.cc'), bld.path.find_node('src/wx/grok/gpu_config_panel.h')) + bld.recurse('src') bld.recurse('graphics') |
