From: Carl Hetherington Date: Thu, 6 Jul 2023 21:09:08 +0000 (+0200) Subject: Patch from Aaron. X-Git-Url: https://git.carlh.net/gitweb/?a=commitdiff_plain;h=7158e24762c77465b2827bfa8c96d2fe2368be37;p=dcpomatic.git Patch from Aaron. --- diff --git a/src/lib/config.cc b/src/lib/config.cc index 3366a2bbc..2287cd592 100644 --- a/src/lib/config.cc +++ b/src/lib/config.cc @@ -213,6 +213,13 @@ Config::set_defaults () set_notification_email_to_default (); set_cover_sheet_to_default (); + _gpu_binary_location = ""; + _enable_gpu = false; + _selected_gpu = 0; + _gpu_license_server = ""; + _gpu_license_port = 5000; + _gpu_license = ""; + _main_divider_sash_position = {}; _main_content_divider_sash_position = {}; @@ -634,6 +641,13 @@ try _allow_smpte_bv20 = f.optional_bool_child("AllowSMPTEBv20").get_value_or(false); _isdcf_name_part_length = f.optional_number_child("ISDCFNamePartLength").get_value_or(14); + _enable_gpu = f.optional_bool_child("EnableGpu").get_value_or(false); + _gpu_binary_location = f.string_child("GpuBinaryLocation"); + _selected_gpu = f.number_child("SelectedGpu"); + _gpu_license_server = f.string_child ("GpuLicenseServer"); + _gpu_license_port = f.number_child ("GpuLicensePort"); + _gpu_license = f.string_child("GpuLicense"); + _export.read(f.optional_node_child("Export")); } catch (...) { @@ -1120,6 +1134,13 @@ Config::write_config () const /* [XML] ISDCFNamePartLength Maximum length of the "name" part of an ISDCF name, which should be 14 according to the standard */ root->add_child("ISDCFNamePartLength")->add_child_text(raw_convert(_isdcf_name_part_length)); + root->add_child("GpuBinaryLocation")->add_child_text (_gpu_binary_location); + root->add_child("EnableGpu")->add_child_text ((_enable_gpu ? "1" : "0")); + root->add_child("SelectedGpu")->add_child_text (raw_convert (_selected_gpu)); + root->add_child("GpuLicenseServer")->add_child_text (_gpu_license_server); + root->add_child("GpuLicensePort")->add_child_text (raw_convert (_gpu_license_port)); + root->add_child("GpuLicense")->add_child_text (_gpu_license); + _export.write(root->add_child("Export")); auto target = config_write_file(); diff --git a/src/lib/config.h b/src/lib/config.h index 0a332bcbb..0c9affbb6 100644 --- a/src/lib/config.h +++ b/src/lib/config.h @@ -618,6 +618,28 @@ public: return _allow_smpte_bv20; } + std::string gpu_binary_location () const { + return _gpu_binary_location; + } + + bool enable_gpu () const { + return _enable_gpu; + } + + int selected_gpu () const { + return _selected_gpu; + } + std::string gpu_license_server () const { + return _gpu_license_server; + } + + int gpu_license_port () const { + return _gpu_license_port; + } + std::string gpu_license () const { + return _gpu_license; + } + int isdcf_name_part_length() const { return _isdcf_name_part_length; } @@ -1198,11 +1220,29 @@ public: void set_allow_smpte_bv20(bool allow) { maybe_set(_allow_smpte_bv20, allow, ALLOW_SMPTE_BV20); } - + void set_gpu_binary_location (std::string location) { + maybe_set (_gpu_binary_location, location); + } + void set_enable_gpu (bool enable) { + maybe_set (_enable_gpu, enable); + } + void set_selected_gpu (int selected) { + maybe_set (_selected_gpu, selected); + } + void set_gpu_license_server (std::string s) { + maybe_set (_gpu_license_server, s); + } + void set_gpu_license_port (int p) { + maybe_set (_gpu_license_port, p); + } + void set_gpu_license (std::string p) { + maybe_set (_gpu_license, p); + } void set_isdcf_name_part_length(int length) { maybe_set(_isdcf_name_part_length, length, ISDCF_NAME_PART_LENGTH); } + void changed (Property p = OTHER); boost::signals2::signal Changed; /** Emitted if read() failed on an existing Config file. There is nothing @@ -1443,6 +1483,14 @@ private: bool _allow_smpte_bv20; int _isdcf_name_part_length; + /* GPU */ + bool _enable_gpu; + std::string _gpu_binary_location; + int _selected_gpu; + std::string _gpu_license_server; + int _gpu_license_port; + std::string _gpu_license; + ExportConfig _export; static int const _current_version; diff --git a/src/lib/dcp_encoder.cc b/src/lib/dcp_encoder.cc index 9a840c8ab..7088225cd 100644 --- a/src/lib/dcp_encoder.cc +++ b/src/lib/dcp_encoder.cc @@ -114,10 +114,17 @@ DCPEncoder::go () } _finishing = true; - _j2k_encoder.end(); + _j2k_encoder.end(true); _writer.finish(_film->dir(_film->dcp_name())); } +void DCPEncoder::pause(void) { + _j2k_encoder.pause(); +} + +void DCPEncoder::resume(void) { + _j2k_encoder.resume(); +} void DCPEncoder::video (shared_ptr data, DCPTime time) { diff --git a/src/lib/dcp_encoder.h b/src/lib/dcp_encoder.h index ad77f6951..a8043887e 100644 --- a/src/lib/dcp_encoder.h +++ b/src/lib/dcp_encoder.h @@ -52,6 +52,8 @@ public: bool finishing () const override { return _finishing; } + void pause(void) override; + void resume(void) override; private: diff --git a/src/lib/dcp_video.cc b/src/lib/dcp_video.cc index 8eb76fdd6..78e973ca3 100644 --- a/src/lib/dcp_video.cc +++ b/src/lib/dcp_video.cc @@ -118,6 +118,27 @@ DCPVideo::convert_to_xyz (shared_ptr frame, dcp::NoteHandler return xyz; } +dcp::Size +DCPVideo::get_size(void) { + auto image = _frame->image (bind(&PlayerVideo::keep_xyz_or_rgb, _1), VideoRange::FULL, false); + return image->size(); +} + +void +DCPVideo::convert_to_xyz (uint16_t *dst) { + + auto image = _frame->image (bind(&PlayerVideo::keep_xyz_or_rgb, _1), VideoRange::FULL, false); + if (_frame->colour_conversion()) { + dcp::rgb_to_xyz ( + image->data()[0], + dst, + image->size(), + image->stride()[0], + _frame->colour_conversion().get() + ); + } +} + /** J2K-encode this frame on the local host. * @return Encoded data. */ diff --git a/src/lib/dcp_video.h b/src/lib/dcp_video.h index 33df0942c..c09442d16 100644 --- a/src/lib/dcp_video.h +++ b/src/lib/dcp_video.h @@ -17,6 +17,8 @@ along with DCP-o-matic. If not, see . */ +#ifndef DCPOMATIC_DCP_VIDEO_H +#define DCPOMATIC_DCP_VIDEO_H #include "encode_server_description.h" @@ -47,6 +49,7 @@ class PlayerVideo; class DCPVideo { public: + DCPVideo (void) : DCPVideo(nullptr,0,0,0,Resolution::TWO_K){} DCPVideo (std::shared_ptr, int index, int dcp_fps, int bandwidth, Resolution r); DCPVideo (std::shared_ptr, cxml::ConstNodePtr); @@ -66,6 +69,8 @@ public: static std::shared_ptr convert_to_xyz (std::shared_ptr frame, dcp::NoteHandler note); + void convert_to_xyz (uint16_t *dst); + dcp::Size get_size(void); private: void add_metadata (xmlpp::Element *) const; @@ -76,3 +81,5 @@ private: int _j2k_bandwidth; ///< J2K bandwidth to use Resolution _resolution; ///< Resolution (2K or 4K) }; + +#endif diff --git a/src/lib/encoder.h b/src/lib/encoder.h index 9b67720d3..f921fcb51 100644 --- a/src/lib/encoder.h +++ b/src/lib/encoder.h @@ -58,6 +58,8 @@ public: /** @return the number of frames that are done */ virtual Frame frames_done () const = 0; virtual bool finishing () const = 0; + virtual void pause(void) {} + virtual void resume(void) {} protected: std::shared_ptr _film; diff --git a/src/lib/grok_context.h b/src/lib/grok_context.h new file mode 100644 index 000000000..1f9726aae --- /dev/null +++ b/src/lib/grok_context.h @@ -0,0 +1,245 @@ +/* + Copyright (C) 2023 Grok Image Compression Inc. + + This file is part of DCP-o-matic. + + DCP-o-matic is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + DCP-o-matic is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with DCP-o-matic. If not, see . + +*/ + +#pragma once + +#include "config.h" +#include "log.h" +#include "dcpomatic_log.h" +#include "writer.h" +#include "grok_messenger.h" + +class Film; +using dcp::Data; +using namespace dcpomatic; + +static std::mutex launchMutex; + +namespace grk_plugin +{ + +struct GrokLogger : public MessengerLogger { + explicit GrokLogger(const std::string &preamble) : MessengerLogger(preamble) + {} + virtual ~GrokLogger() = default; + void info(const char* fmt, ...) override{ + va_list arg; + va_start(arg, fmt); + dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_GENERAL); + va_end(arg); + } + void warn(const char* fmt, ...) override{ + va_list arg; + va_start(arg, fmt); + dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_WARNING); + va_end(arg); + } + void error(const char* fmt, ...) override{ + va_list arg; + va_start(arg, fmt); + dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_ERROR); + va_end(arg); + } +}; + +struct GrokInitializer { + GrokInitializer(void) { + setMessengerLogger(new GrokLogger("[GROK] ")); + } + ~GrokInitializer() = default; +}; + +struct FrameProxy { + FrameProxy(void) : FrameProxy(0,Eyes::LEFT,DCPVideo()) + {} + FrameProxy(int index, Eyes eyes, DCPVideo dcpv) : index_(index), eyes_(eyes), vf(dcpv) + {} + int index() const { + return index_; + } + Eyes eyes(void) const { + return eyes_; + } + int index_; + Eyes eyes_; + DCPVideo vf; +}; + +struct DcpomaticContext { + DcpomaticContext(std::shared_ptr film, Writer& writer, + EventHistory &history, const std::string &location) : + film_(film), writer_(writer), + history_(history), location_(location), + width_(0), height_(0) + {} + void setDimensions(uint32_t w, uint32_t h) { + width_ = w; + height_ = h; + } + std::shared_ptr film_; + Writer& writer_; + EventHistory &history_; + std::string location_; + uint32_t width_; + uint32_t height_; +}; + +class GrokContext { +public: + explicit GrokContext(const DcpomaticContext &dcpomaticContext) : + dcpomaticContext_(dcpomaticContext), + messenger_(nullptr), + launched_(false) + { + struct CompressedData : public dcp::Data { + explicit CompressedData(int dataLen) : data_(new uint8_t[dataLen]), dataLen_(dataLen) + {} + ~CompressedData(void){ + delete[] data_; + } + uint8_t const * data () const override { + return data_; + } + uint8_t * data () override { + return data_; + } + int size () const override { + return dataLen_; + } + uint8_t *data_; + int dataLen_; + }; + if (Config::instance()->enable_gpu ()) { + boost::filesystem::path folder(dcpomaticContext_.location_); + boost::filesystem::path binaryPath = folder / "grk_compress"; + if (!boost::filesystem::exists(binaryPath)) { + getMessengerLogger()->error("Invalid binary location %s", + dcpomaticContext_.location_.c_str()); + return; + } + auto proc = [this](const std::string& str) { + try { + Msg msg(str); + auto tag = msg.next(); + if(tag == GRK_MSGR_BATCH_SUBMIT_COMPRESSED) + { + auto clientFrameId = msg.nextUint(); + auto compressedFrameId = msg.nextUint(); + (void)compressedFrameId; + auto compressedFrameLength = msg.nextUint(); + auto processor = + [this](FrameProxy srcFrame, uint8_t* compressed, uint32_t compressedFrameLength) + { + auto compressedData = std::make_shared(compressedFrameLength); + memcpy(compressedData->data_,compressed,compressedFrameLength ); + dcpomaticContext_.writer_.write(compressedData, srcFrame.index(), srcFrame.eyes()); + frame_done (); + }; + int const minimum_size = 16384; + bool needsRecompression = compressedFrameLength < minimum_size; + messenger_->processCompressed(str, processor, needsRecompression); + if (needsRecompression) { + bool success = false; + auto fp = messenger_->retrieve(clientFrameId, success); + if (!success) + return; + + auto encoded = std::make_shared(fp.vf.encode_locally()); + dcpomaticContext_.writer_.write(encoded, fp.vf.index(), fp.vf.eyes()); + frame_done (); + } + } + } catch (std::exception &ex){ + getMessengerLogger()->error("%s",ex.what()); + } + }; + auto clientInit = + MessengerInit(clientToGrokMessageBuf, clientSentSynch, grokReceiveReadySynch, + grokToClientMessageBuf, grokSentSynch, clientReceiveReadySynch, proc, + std::thread::hardware_concurrency()); + messenger_ = new ScheduledMessenger(clientInit); + } + } + ~GrokContext(void) { + shutdown(); + } + bool launch(DCPVideo dcpv, int device){ + if (!messenger_ ) + return false; + if (launched_) + return true; + std::unique_lock lk_global(launchMutex); + if (!messenger_) + return false; + if (launched_) + return true; + if (MessengerInit::firstLaunch(true)) { + auto s = dcpv.get_size(); + dcpomaticContext_.setDimensions(s.width, s.height); + auto config = Config::instance(); + messenger_->launchGrok(dcpomaticContext_.location_, + dcpomaticContext_.width_,dcpomaticContext_.width_, + dcpomaticContext_.height_, + 3, 12, device, + dcpomaticContext_.film_->resolution() == Resolution::FOUR_K, + dcpomaticContext_.film_->video_frame_rate(), + dcpomaticContext_.film_->j2k_bandwidth(), + config->gpu_license_server(), + config->gpu_license_port(), + config->gpu_license()); + } + launched_ = messenger_->waitForClientInit(); + + return launched_; + } + bool scheduleCompress(const DCPVideo &vf){ + if (!messenger_) + return false; + + auto fp = FrameProxy(vf.index(),vf.eyes(),vf); + auto cvt = [this, &fp](BufferSrc src){ + // xyz conversion + fp.vf.convert_to_xyz((uint16_t*)src.framePtr_); + }; + return messenger_->scheduleCompress(fp, cvt); + } + void shutdown(void){ + if (!messenger_) + return; + + std::unique_lock lk_global(launchMutex); + if (!messenger_) + return; + if (launched_) + messenger_->shutdown(); + delete messenger_; + messenger_ = nullptr; + } + void frame_done () { + dcpomaticContext_.history_.event (); + } +private: + DcpomaticContext dcpomaticContext_; + ScheduledMessenger *messenger_; + bool launched_; +}; + +} + diff --git a/src/lib/grok_messenger.h b/src/lib/grok_messenger.h new file mode 100644 index 000000000..45ee752e5 --- /dev/null +++ b/src/lib/grok_messenger.h @@ -0,0 +1,930 @@ +/* + Copyright (C) 2023 Grok Image Compression Inc. + + This file is part of DCP-o-matic. + + DCP-o-matic is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + DCP-o-matic is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with DCP-o-matic. If not, see . + +*/ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef _WIN32 +#include +#include +#include +#pragma warning(disable : 4100) +#else +#include +#include +#include +#include +#include +#endif + +namespace grk_plugin +{ +static std::string grokToClientMessageBuf = "Global\\grok_to_client_message"; +static std::string grokSentSynch = "Global\\grok_sent"; +static std::string clientReceiveReadySynch = "Global\\client_receive_ready"; +static std::string clientToGrokMessageBuf = "Global\\client_to_grok_message"; +static std::string clientSentSynch = "Global\\client_sent"; +static std::string grokReceiveReadySynch = "Global\\grok_receive_ready"; +static std::string grokUncompressedBuf = "Global\\grok_uncompressed_buf"; +static std::string grokCompressedBuf = "Global\\grok_compressed_buf"; +static const std::string GRK_MSGR_BATCH_IMAGE = "GRK_MSGR_BATCH_IMAGE"; +static const std::string GRK_MSGR_BATCH_COMPRESS_INIT = "GRK_MSGR_BATCH_COMPRESS_INIT"; +static const std::string GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED = "GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED"; +static const std::string GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED = + "GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED"; +static const std::string GRK_MSGR_BATCH_SUBMIT_COMPRESSED = "GRK_MSGR_BATCH_SUBMIT_COMPRESSED"; +static const std::string GRK_MSGR_BATCH_PROCESSSED_COMPRESSED = + "GRK_MSGR_BATCH_PROCESSSED_COMPRESSED"; +static const std::string GRK_MSGR_BATCH_SHUTDOWN = "GRK_MSGR_BATCH_SHUTDOWN"; +static const std::string GRK_MSGR_BATCH_FLUSH = "GRK_MSGR_BATCH_FLUSH"; +static const size_t messageBufferLen = 256; +struct IMessengerLogger +{ + virtual ~IMessengerLogger(void) = default; + virtual void info(const char* fmt, ...) = 0; + virtual void warn(const char* fmt, ...) = 0; + virtual void error(const char* fmt, ...) = 0; + + protected: + template + std::string log_message(char const* const format, Args&... args) noexcept + { + constexpr size_t message_size = 512; + char message[message_size]; + + std::snprintf(message, message_size, format, args...); + return std::string(message); + } +}; +struct MessengerLogger : public IMessengerLogger +{ + explicit MessengerLogger(const std::string &preamble) : preamble_(preamble) {} + virtual ~MessengerLogger() = default; + virtual void info(const char* fmt, ...) override + { + va_list args; + std::string new_fmt = preamble_ + fmt + "\n"; + va_start(args, fmt); + vfprintf(stdout, new_fmt.c_str(), args); + va_end(args); + } + virtual void warn(const char* fmt, ...) override + { + va_list args; + std::string new_fmt = preamble_ + fmt + "\n"; + va_start(args, fmt); + vfprintf(stdout, new_fmt.c_str(), args); + va_end(args); + } + virtual void error(const char* fmt, ...) override + { + va_list args; + std::string new_fmt = preamble_ + fmt + "\n"; + va_start(args, fmt); + vfprintf(stderr, new_fmt.c_str(), args); + va_end(args); + } + + protected: + std::string preamble_; +}; + +static IMessengerLogger* sLogger = nullptr; +#if defined(__GNUC__) || defined(__clang__) +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-function" +#endif +static void setMessengerLogger(IMessengerLogger* logger) +{ + delete sLogger; + sLogger = logger; +} +#if defined(__GNUC__) || defined(__clang__) +#pragma GCC diagnostic pop +#endif +static IMessengerLogger* getMessengerLogger(void) +{ + return sLogger; +} +struct MessengerInit +{ + MessengerInit(const std::string &outBuf, const std::string &outSent, + const std::string &outReceiveReady, const std::string &inBuf, + const std::string &inSent, + const std::string &inReceiveReady, + std::function processor, + size_t numProcessingThreads) + : outboundMessageBuf(outBuf), outboundSentSynch(outSent), + outboundReceiveReadySynch(outReceiveReady), inboundMessageBuf(inBuf), + inboundSentSynch(inSent), inboundReceiveReadySynch(inReceiveReady), processor_(processor), + numProcessingThreads_(numProcessingThreads), + uncompressedFrameSize_(0), compressedFrameSize_(0), + numFrames_(0) + { + if(firstLaunch(true)) + unlink(); + } + void unlink(void) + { +#ifndef _WIN32 + shm_unlink(grokToClientMessageBuf.c_str()); + shm_unlink(clientToGrokMessageBuf.c_str()); +#endif + } + static bool firstLaunch(bool isClient) + { + bool debugGrok = false; + return debugGrok != isClient; + } + std::string outboundMessageBuf; + std::string outboundSentSynch; + std::string outboundReceiveReadySynch; + + std::string inboundMessageBuf; + std::string inboundSentSynch; + std::string inboundReceiveReadySynch; + + std::function processor_; + size_t numProcessingThreads_; + + size_t uncompressedFrameSize_; + size_t compressedFrameSize_; + size_t numFrames_; +}; + +/*************************** Synchronization *******************************/ +enum SynchDirection +{ + SYNCH_SENT, + SYNCH_RECEIVE_READY +}; + +typedef int grk_handle; +struct Synch +{ + Synch(const std::string &sentSemName, const std::string &receiveReadySemName) + : sentSemName_(sentSemName), receiveReadySemName_(receiveReadySemName) + { + // unlink semaphores in case of previous crash + if(MessengerInit::firstLaunch(true)) + unlink(); + open(); + } + ~Synch() + { + close(); + if(MessengerInit::firstLaunch(true)) + unlink(); + } + void post(SynchDirection dir) + { + auto sem = (dir == SYNCH_SENT ? sentSem_ : receiveReadySem_); + int rc = sem_post(sem); + if(rc) + getMessengerLogger()->error("Error posting to semaphore: %s", strerror(errno)); + } + void wait(SynchDirection dir) + { + auto sem = dir == SYNCH_SENT ? sentSem_ : receiveReadySem_; + int rc = sem_wait(sem); + if(rc) + getMessengerLogger()->error("Error waiting for semaphore: %s", strerror(errno)); + } + void open(void) + { + sentSem_ = sem_open(sentSemName_.c_str(), O_CREAT, 0666, 0); + if(!sentSem_) + getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno)); + receiveReadySem_ = sem_open(receiveReadySemName_.c_str(), O_CREAT, 0666, 1); + if(!receiveReadySem_) + getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno)); + } + void close(void) + { + int rc = sem_close(sentSem_); + if(rc) + getMessengerLogger()->error("Error closing semaphore %s: %s", sentSemName_.c_str(), + strerror(errno)); + rc = sem_close(receiveReadySem_); + if(rc) + getMessengerLogger()->error("Error closing semaphore %s: %s", + receiveReadySemName_.c_str(), strerror(errno)); + } + void unlink(void) + { + int rc = sem_unlink(sentSemName_.c_str()); + if(rc == -1 && errno != ENOENT) + getMessengerLogger()->error("Error unlinking semaphore %s: %s", sentSemName_.c_str(), + strerror(errno)); + rc = sem_unlink(receiveReadySemName_.c_str()); + if(rc == -1 && errno != ENOENT) + getMessengerLogger()->error("Error unlinking semaphore %s: %s", + receiveReadySemName_.c_str(), strerror(errno)); + } + sem_t* sentSem_; + sem_t* receiveReadySem_; + + private: + std::string sentSemName_; + std::string receiveReadySemName_; +}; +struct SharedMemoryManager +{ + static bool initShm(const std::string &name, size_t len, grk_handle* shm_fd, char** buffer) + { + *shm_fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0666); + if(*shm_fd < 0) + { + getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno)); + return false; + } + int rc = ftruncate(*shm_fd, sizeof(char) * len); + if(rc) + { + getMessengerLogger()->error("Error truncating shared memory: %s", strerror(errno)); + rc = close(*shm_fd); + if(rc) + getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno)); + rc = shm_unlink(name.c_str()); + if(rc) + getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno)); + return false; + } + *buffer = static_cast(mmap(0, len, PROT_WRITE, MAP_SHARED, *shm_fd, 0)); + if(!*buffer) + { + getMessengerLogger()->error("Error mapping shared memory: %s", strerror(errno)); + rc = close(*shm_fd); + if(rc) + getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno)); + rc = shm_unlink(name.c_str()); + if(rc) + getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno)); + } + + return *buffer != nullptr; + } + static bool deinitShm(const std::string &name, size_t len, grk_handle &shm_fd, char** buffer) + { + if (!*buffer || !shm_fd) + return true; + + int rc = munmap(*buffer, len); + *buffer = nullptr; + if(rc) + getMessengerLogger()->error("Error unmapping shared memory %s: %s", name.c_str(), strerror(errno)); + rc = close(shm_fd); + shm_fd = 0; + if(rc) + getMessengerLogger()->error("Error closing shared memory %s: %s", name.c_str(), strerror(errno)); + rc = shm_unlink(name.c_str()); + if(rc) + fprintf(stderr,"Error unlinking shared memory %s : %s\n", name.c_str(), strerror(errno)); + + return true; + } +}; + +template +class MessengerBlockingQueue +{ + public: + explicit MessengerBlockingQueue(size_t max) : active_(true), max_size_(max) {} + MessengerBlockingQueue() : MessengerBlockingQueue(UINT_MAX) {} + size_t size() const + { + return queue_.size(); + } + // deactivate and clear queue + void deactivate() + { + { + std::lock_guard lk(mutex_); + active_ = false; + while(!queue_.empty()) + queue_.pop(); + } + + // release all waiting threads + can_pop_.notify_all(); + can_push_.notify_all(); + } + void activate() + { + std::lock_guard lk(mutex_); + active_ = true; + } + bool push(Data const& value) + { + bool rc; + { + std::unique_lock lk(mutex_); + rc = push_(value); + } + if(rc) + can_pop_.notify_one(); + + return rc; + } + bool waitAndPush(Data& value) + { + bool rc; + { + std::unique_lock lk(mutex_); + if(!active_) + return false; + // in case of spurious wakeup, loop until predicate in lambda + // is satisfied. + can_push_.wait(lk, [this] { return queue_.size() < max_size_ || !active_; }); + rc = push_(value); + } + if(rc) + can_pop_.notify_one(); + + return rc; + } + bool pop(Data& value) + { + bool rc; + { + std::unique_lock lk(mutex_); + rc = pop_(value); + } + if(rc) + can_push_.notify_one(); + + return rc; + } + bool waitAndPop(Data& value) + { + bool rc; + { + std::unique_lock lk(mutex_); + if(!active_) + return false; + // in case of spurious wakeup, loop until predicate in lambda + // is satisfied. + can_pop_.wait(lk, [this] { return !queue_.empty() || !active_; }); + rc = pop_(value); + } + if(rc) + can_push_.notify_one(); + + return rc; + } + + private: + bool push_(Data const& value) + { + if(queue_.size() == max_size_ || !active_) + return false; + queue_.push(value); + + return true; + } + bool pop_(Data& value) + { + if(queue_.empty() || !active_) + return false; + value = queue_.front(); + queue_.pop(); + + return true; + } + std::queue queue_; + mutable std::mutex mutex_; + std::condition_variable can_pop_; + std::condition_variable can_push_; + bool active_; + size_t max_size_; +}; +struct BufferSrc +{ + BufferSrc(void) : BufferSrc("") {} + explicit BufferSrc(const std::string &file) : file_(file), clientFrameId_(0), frameId_(0), framePtr_(nullptr) + {} + BufferSrc(size_t clientFrameId, size_t frameId, uint8_t* framePtr) + : file_(""), clientFrameId_(clientFrameId), frameId_(frameId), framePtr_(framePtr) + {} + bool fromDisk(void) + { + return !file_.empty() && framePtr_ == nullptr; + } + size_t index() const + { + return clientFrameId_; + } + std::string file_; + size_t clientFrameId_; + size_t frameId_; + uint8_t* framePtr_; +}; + +struct Messenger; +static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch); +static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch); +static void processorThread(Messenger* messenger, std::function processor); + +struct Messenger +{ + explicit Messenger(MessengerInit init) + : running(true), initialized_(false), shutdown_(false), init_(init), + outboundSynch_(nullptr), + inboundSynch_(nullptr), uncompressed_buffer_(nullptr), compressed_buffer_(nullptr), + uncompressed_fd_(0), compressed_fd_(0) + {} + virtual ~Messenger(void) + { + running = false; + sendQueue.deactivate(); + receiveQueue.deactivate(); + + if (outboundSynch_) { + outboundSynch_->post(SYNCH_RECEIVE_READY); + outbound.join(); + } + + if (inboundSynch_) { + inboundSynch_->post(SYNCH_SENT); + inbound.join(); + } + + for(auto& p : processors_) + p.join(); + + delete outboundSynch_; + delete inboundSynch_; + + deinitShm(); + } + void startThreads(void) { + outboundSynch_ = + new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch); + outbound = std::thread(outboundThread, this, init_.outboundMessageBuf, outboundSynch_); + + inboundSynch_ = + new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch); + inbound = std::thread(inboundThread, this, init_.inboundMessageBuf, inboundSynch_); + + for(size_t i = 0; i < init_.numProcessingThreads_; ++i) + processors_.push_back(std::thread(processorThread, this, init_.processor_)); + } + size_t serialize(const std::string &dir, size_t clientFrameId, uint8_t* compressedPtr, + size_t compressedLength) + { + char fname[512]; + if(!compressedPtr || !compressedLength) + return 0; + sprintf(fname, "%s/test_%d.j2k", dir.c_str(), (int)clientFrameId); + auto fp = fopen(fname, "wb"); + if(!fp) + return 0; + size_t written = fwrite(compressedPtr, 1, compressedLength, fp); + if(written != compressedLength) + { + fclose(fp); + return 0; + } + fflush(fp); + fclose(fp); + + return written; + } + bool initBuffers(void) + { + bool rc = true; + if(init_.uncompressedFrameSize_) + { + rc = rc && SharedMemoryManager::initShm(grokUncompressedBuf, + init_.uncompressedFrameSize_ * init_.numFrames_, + &uncompressed_fd_, &uncompressed_buffer_); + } + if(init_.compressedFrameSize_) + { + rc = rc && SharedMemoryManager::initShm(grokCompressedBuf, + init_.compressedFrameSize_ * init_.numFrames_, + &compressed_fd_, &compressed_buffer_); + } + + return rc; + } + + bool deinitShm(void) + { + bool rc = SharedMemoryManager::deinitShm(grokUncompressedBuf, + init_.uncompressedFrameSize_ * init_.numFrames_, + uncompressed_fd_, &uncompressed_buffer_); + rc = rc && SharedMemoryManager::deinitShm(grokCompressedBuf, + init_.compressedFrameSize_ * init_.numFrames_, + compressed_fd_, &compressed_buffer_); + + return rc; + } + template + void send(const std::string& str, Args... args) + { + std::ostringstream oss; + oss << str; + int dummy[] = {0, ((void)(oss << ',' << args), 0)...}; + static_cast(dummy); + + sendQueue.push(oss.str()); + } + static pid_t get_pid_by_process_name(const char* name) + { + char command[256]; + snprintf(command, sizeof(command), "pgrep %s", name); + auto pgrep = popen(command, "r"); + if(!pgrep) + return -1; + pid_t pid; + if(fscanf(pgrep, "%d", &pid) != 1) + pid = -1; + pclose(pgrep); + + return pid; + } + static bool terminate_process(const char* name) + { + auto pid = get_pid_by_process_name(name); + + return (pid != -1 && kill(pid, SIGTERM) != -1); + } + static bool kill_process(const char* name) + { + auto pid = get_pid_by_process_name(name); + + return (pid != -1 && kill(pid, SIGKILL) != -1); + } + void launchGrok(const std::string &dir, uint32_t width, uint32_t stride, + uint32_t height, uint32_t samplesPerPixel, uint32_t depth, + int device, bool is4K, uint32_t fps, uint32_t bandwidth, + const std::string server, uint32_t port, + const std::string license) + { + + std::unique_lock lk(shutdownMutex_); + if (async_result_.valid()) + return; + if(MessengerInit::firstLaunch(true)) + init_.unlink(); + startThreads(); + char _cmd[4096]; + auto fullServer = server + ":" + std::to_string(port); + sprintf(_cmd, + "./grk_compress -batch_src %s,%d,%d,%d,%d,%d -out_fmt j2k -out_dir - -k 1 " + "-G %d -%s %d,%d -j %s -J %s", + GRK_MSGR_BATCH_IMAGE.c_str(), width, stride, height, samplesPerPixel, depth, + device, is4K ? "cinema4K" : "cinema2K", fps, bandwidth, + license.c_str(), fullServer.c_str()); + launch(_cmd, dir); + } + void initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames) + { + // client fills queue with pending uncompressed buffers + init_.uncompressedFrameSize_ = uncompressedFrameSize; + init_.compressedFrameSize_ = compressedFrameSize; + init_.numFrames_ = numFrames; + initBuffers(); + auto ptr = uncompressed_buffer_; + for(size_t i = 0; i < init_.numFrames_; ++i) + { + availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr)); + ptr += init_.uncompressedFrameSize_; + } + + std::unique_lock lk(shutdownMutex_); + initialized_ = true; + clientInitializedCondition_.notify_all(); + } + bool waitForClientInit(void) + { + if(initialized_) + return true; + + std::unique_lock lk(shutdownMutex_); + if(initialized_) + return true; + else if (shutdown_) + return false; + clientInitializedCondition_.wait(lk, [this]{return initialized_ || shutdown_;}); + + return initialized_ && !shutdown_; + } + static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel) + { + return sizeof(uint16_t) * w * h * samplesPerPixel; + } + void reclaimCompressed(size_t frameId) + { + availableBuffers_.push(BufferSrc(0, frameId, getCompressedFrame(frameId))); + } + void reclaimUncompressed(size_t frameId) + { + availableBuffers_.push(BufferSrc(0, frameId, getUncompressedFrame(frameId))); + } + uint8_t* getUncompressedFrame(size_t frameId) + { + assert(frameId < init_.numFrames_); + if(frameId >= init_.numFrames_) + return nullptr; + + return (uint8_t*)(uncompressed_buffer_ + frameId * init_.uncompressedFrameSize_); + } + uint8_t* getCompressedFrame(size_t frameId) + { + assert(frameId < init_.numFrames_); + if(frameId >= init_.numFrames_) + return nullptr; + + return (uint8_t*)(compressed_buffer_ + frameId * init_.compressedFrameSize_); + } + std::atomic_bool running; + bool initialized_; + bool shutdown_; + MessengerBlockingQueue sendQueue; + MessengerBlockingQueue receiveQueue; + MessengerBlockingQueue availableBuffers_; + MessengerInit init_; + std::string cmd_; + std::future async_result_; + std::mutex shutdownMutex_; + std::condition_variable shutdownCondition_; + + protected: + std::condition_variable clientInitializedCondition_; + private: + void launch(const std::string &cmd, const std::string &dir) + { + // Change the working directory + if(!dir.empty()) + { + if(chdir(dir.c_str()) != 0) + { + getMessengerLogger()->error("Error: failed to change the working directory"); + return; + } + } + // Execute the command using std::async and std::system + cmd_ = cmd; + async_result_ = std::async(std::launch::async, [this]() { return std::system(cmd_.c_str()); }); + } + std::thread outbound; + Synch* outboundSynch_; + + std::thread inbound; + Synch* inboundSynch_; + + std::vector processors_; + char* uncompressed_buffer_; + char* compressed_buffer_; + + grk_handle uncompressed_fd_; + grk_handle compressed_fd_; +}; + +static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch) +{ + grk_handle shm_fd = 0; + char* send_buffer = nullptr; + + if(!SharedMemoryManager::initShm(sendBuf, messageBufferLen, &shm_fd, &send_buffer)) + return; + while(messenger->running) + { + synch->wait(SYNCH_RECEIVE_READY); + if(!messenger->running) + break; + std::string message; + if(!messenger->sendQueue.waitAndPop(message)) + break; + if(!messenger->running) + break; + memcpy(send_buffer, message.c_str(), message.size() + 1); + synch->post(SYNCH_SENT); + } + SharedMemoryManager::deinitShm(sendBuf, messageBufferLen, shm_fd, &send_buffer); +} + +static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch) +{ + grk_handle shm_fd = 0; + char* receive_buffer = nullptr; + + if(!SharedMemoryManager::initShm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer)) + return; + while(messenger->running) + { + synch->wait(SYNCH_SENT); + if(!messenger->running) + break; + auto message = std::string(receive_buffer); + synch->post(SYNCH_RECEIVE_READY); + messenger->receiveQueue.push(message); + } + SharedMemoryManager::deinitShm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer); +} +struct Msg +{ + explicit Msg(const std::string &msg) : ct_(0) + { + std::stringstream ss(msg); + while(ss.good()) + { + std::string substr; + std::getline(ss, substr, ','); + cs_.push_back(substr); + } + } + std::string next() + { + if(ct_ == cs_.size()) + { + getMessengerLogger()->error("Msg: comma separated list exhausted. returning empty."); + return ""; + } + return cs_[ct_++]; + } + + uint32_t nextUint(void) + { + return (uint32_t)std::stoi(next()); + } + + std::vector cs_; + size_t ct_; +}; +static void processorThread(Messenger* messenger, std::function processor) +{ + while(messenger->running) + { + std::string message; + if(!messenger->receiveQueue.waitAndPop(message)) + break; + if(!messenger->running) + break; + Msg msg(message); + auto tag = msg.next(); + if(tag == GRK_MSGR_BATCH_COMPRESS_INIT) + { + auto width = msg.nextUint(); + auto stride = msg.nextUint(); + (void)stride; + auto height = msg.nextUint(); + auto samplesPerPixel = msg.nextUint(); + auto depth = msg.nextUint(); + (void)depth; + messenger->init_.uncompressedFrameSize_ = + Messenger::uncompressedFrameSize(width, height, samplesPerPixel); + auto compressedFrameSize = msg.nextUint(); + auto numFrames = msg.nextUint(); + messenger->initClient(compressedFrameSize, compressedFrameSize, numFrames); + } + else if(tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED) + { + messenger->reclaimUncompressed(msg.nextUint()); + } + else if(tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED) + { + messenger->reclaimCompressed(msg.nextUint()); + } + processor(message); + } +} + +template +struct ScheduledFrames +{ + void store(F& val) + { + std::unique_lock lk(mapMutex_); + auto it = map_.find(val.index()); + if (it == map_.end()) + map_[val.index()] = val; + } + F retrieve(size_t index, bool &success) + { + std::unique_lock lk(mapMutex_); + success = false; + auto it = map_.find(index); + if(it == map_.end()) + return F(); + + success = true; + F val = it->second; + map_.erase(index); + + return val; + } + + private: + std::mutex mapMutex_; + std::map map_; +}; + +template +struct ScheduledMessenger : public Messenger +{ + explicit ScheduledMessenger(MessengerInit init) : Messenger(init), + framesScheduled_(0), + framesCompressed_(0) + {} + ~ScheduledMessenger(void) { + shutdown(); + } + bool scheduleCompress(F proxy, std::function converter){ + size_t frameSize = init_.uncompressedFrameSize_; + assert(frameSize >= init_.uncompressedFrameSize_); + BufferSrc src; + if(!availableBuffers_.waitAndPop(src)) + return false; + converter(src); + scheduledFrames_.store(proxy); + framesScheduled_++; + send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_); + + return true; + } + void processCompressed(const std::string &message, std::function processor, bool needsRecompression) { + Msg msg(message); + msg.next(); + auto clientFrameId = msg.nextUint(); + auto compressedFrameId = msg.nextUint(); + auto compressedFrameLength = msg.nextUint(); + if (!needsRecompression) { + bool success = false; + auto srcFrame = scheduledFrames_.retrieve(clientFrameId,success); + if (!success) + return; + processor(srcFrame, getCompressedFrame(compressedFrameId),compressedFrameLength); + } + ++framesCompressed_; + send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId); + if (shutdown_ && framesCompressed_ == framesScheduled_) + shutdownCondition_.notify_all(); + } + void shutdown(void){ + try { + std::unique_lock lk(shutdownMutex_); + if (!async_result_.valid()) + return; + shutdown_ = true; + if (framesScheduled_) { + uint32_t scheduled = framesScheduled_; + send(GRK_MSGR_BATCH_FLUSH, scheduled); + shutdownCondition_.wait(lk, [this] { return framesScheduled_ == framesCompressed_; }); + } + availableBuffers_.deactivate(); + send(GRK_MSGR_BATCH_SHUTDOWN); + int result = async_result_.get(); + if(result != 0) + getMessengerLogger()->error("Accelerator failed with return code: %d\n",result); + } catch (std::exception &ex) { + getMessengerLogger()->error("%s",ex.what()); + } + + } + F retrieve(size_t index, bool &success) { + return scheduledFrames_.retrieve(index, success); + } + void store(F& val) { + scheduledFrames_.store(val); + } + +private: + ScheduledFrames scheduledFrames_; + std::atomic framesScheduled_; + std::atomic framesCompressed_; +}; + +} // namespace grk_plugin diff --git a/src/lib/j2k_encoder.cc b/src/lib/j2k_encoder.cc index d2e840f85..11f403d95 100644 --- a/src/lib/j2k_encoder.cc +++ b/src/lib/j2k_encoder.cc @@ -53,6 +53,7 @@ using boost::optional; using dcp::Data; using namespace dcpomatic; +static grk_plugin::GrokInitializer grokInitializer; /** @param film Film that we are encoding. * @param writer Writer that we are using. @@ -60,7 +61,9 @@ using namespace dcpomatic; J2KEncoder::J2KEncoder(shared_ptr film, Writer& writer) : _film (film) , _history (200) - , _writer (writer) + , _writer (writer) , + dcpomaticContext_(film,writer,_history, Config::instance()->gpu_binary_location ()), + context_(Config::instance()->enable_gpu () ? new grk_plugin::GrokContext(dcpomaticContext_) : nullptr) { servers_list_changed (); } @@ -70,10 +73,13 @@ J2KEncoder::~J2KEncoder () { _server_found_connection.disconnect(); + { boost::mutex::scoped_lock lm (_threads_mutex); terminate_threads (); -} + } + delete context_; +} void J2KEncoder::begin () @@ -83,23 +89,35 @@ J2KEncoder::begin () ); } +void J2KEncoder::pause(void){ + if (Config::instance()->enable_gpu ()) + end(false); +} + +void J2KEncoder::resume(void){ + if (Config::instance()->enable_gpu ()) { + context_ = new grk_plugin::GrokContext(dcpomaticContext_); + servers_list_changed (); + } +} void -J2KEncoder::end () +J2KEncoder::end (bool isFinal) { - boost::mutex::scoped_lock lock (_queue_mutex); + if (isFinal) { + boost::mutex::scoped_lock lock (_queue_mutex); - LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ()); + LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ()); - /* Keep waking workers until the queue is empty */ - while (!_queue.empty ()) { - rethrow (); - _empty_condition.notify_all (); - _full_condition.wait (lock); + /* Keep waking workers until the queue is empty */ + while (!_queue.empty ()) { + rethrow (); + _empty_condition.notify_all (); + _full_condition.wait (lock); + } + lock.unlock (); } - lock.unlock (); - LOG_GENERAL_NC (N_("Terminating encoder threads")); { @@ -113,27 +131,38 @@ J2KEncoder::end () LOG_GENERAL (N_("Mopping up %1"), _queue.size()); /* The following sequence of events can occur in the above code: - 1. a remote worker takes the last image off the queue - 2. the loop above terminates - 3. the remote worker fails to encode the image and puts it back on the queue - 4. the remote worker is then terminated by terminate_threads + 1. a remote worker takes the last image off the queue + 2. the loop above terminates + 3. the remote worker fails to encode the image and puts it back on the queue + 4. the remote worker is then terminated by terminate_threads - So just mop up anything left in the queue here. + So just mop up anything left in the queue here. */ - - for (auto const& i: _queue) { - LOG_GENERAL(N_("Encode left-over frame %1"), i.index()); - try { - _writer.write( - make_shared(i.encode_locally()), - i.index(), - i.eyes() - ); - frame_done (); - } catch (std::exception& e) { - LOG_ERROR (N_("Local encode failed (%1)"), e.what ()); + if (isFinal) { + for (auto & i: _queue) { + if (Config::instance()->enable_gpu ()) { + if (!context_->scheduleCompress(i)){ + LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index()); + // handle error + } + } + else { + LOG_GENERAL(N_("Encode left-over frame %1"), i.index()); + try { + _writer.write( + make_shared(i.encode_locally()), + i.index(), + i.eyes() + ); + frame_done (); + } catch (std::exception& e) { + LOG_ERROR (N_("Local encode failed (%1)"), e.what ()); + } + } } } + delete context_; + context_ = nullptr; } @@ -183,7 +212,10 @@ J2KEncoder::encode (shared_ptr pv, DCPTime time) size_t threads = 0; { boost::mutex::scoped_lock lm (_threads_mutex); - threads = _threads->size(); + if (_threads) + threads = _threads->size(); + else + threads = std::thread::hardware_concurrency(); } boost::mutex::scoped_lock queue_lock (_queue_mutex); @@ -223,13 +255,14 @@ J2KEncoder::encode (shared_ptr pv, DCPTime time) LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time)); /* Queue this new frame for encoding */ LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ()); - _queue.push_back (DCPVideo( + auto dcpv = DCPVideo( pv, position, _film->video_frame_rate(), _film->j2k_bandwidth(), _film->resolution() - )); + ); + _queue.push_back (dcpv); /* The queue might not be empty any more, so notify anything which is waiting on that. @@ -269,6 +302,8 @@ void J2KEncoder::encoder_thread (optional server) try { + auto config = Config::instance (); + start_of_thread ("J2KEncoder"); if (server) { @@ -332,14 +367,22 @@ try } } else { - try { - LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf.index()); - encoded = make_shared(vf.encode_locally()); - LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index()); - } catch (std::exception& e) { - /* This is very bad, so don't cope with it, just pass it on */ - LOG_ERROR (N_("Local encode failed (%1)"), e.what ()); - throw; + if (context_) { + if (!context_->launch(vf, config->selected_gpu()) || !context_->scheduleCompress(vf)) { + LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index()); + _queue.push_front (vf); + } + + } else { + try { + LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf.index()); + encoded = make_shared(vf.encode_locally()); + LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index()); + } catch (std::exception& e) { + /* This is very bad, so don't cope with it, just pass it on */ + LOG_ERROR (N_("Local encode failed (%1)"), e.what ()); + throw; + } } } @@ -347,10 +390,12 @@ try _writer.write(encoded, vf.index(), vf.eyes()); frame_done (); } else { - lock.lock (); - LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index()); - _queue.push_front (vf); - lock.unlock (); + if (!Config::instance()->enable_gpu ()) { + lock.lock (); + LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index()); + _queue.push_front (vf); + lock.unlock (); + } } } diff --git a/src/lib/j2k_encoder.h b/src/lib/j2k_encoder.h index 63228a6b8..6df30a3f7 100644 --- a/src/lib/j2k_encoder.h +++ b/src/lib/j2k_encoder.h @@ -29,6 +29,7 @@ #include "cross.h" +#include "dcp_video.h" #include "enum_indexed_vector.h" #include "event_history.h" #include "exception_store.h" @@ -41,8 +42,8 @@ #include #include +#include "grok_context.h" -class DCPVideo; class EncodeServerDescription; class Film; class Job; @@ -70,8 +71,11 @@ public: /** Called to pass a bit of video to be encoded as the next DCP frame */ void encode (std::shared_ptr pv, dcpomatic::DCPTime time); + void pause(void); + void resume(void); + /** Called when a processing run has finished */ - void end (); + void end (bool isFinal); boost::optional current_encoding_rate () const; int video_frames_enqueued () const; @@ -107,6 +111,9 @@ private: boost::optional _last_player_video_time; boost::signals2::scoped_connection _server_found_connection; + + grk_plugin::DcpomaticContext dcpomaticContext_; + grk_plugin::GrokContext *context_; }; diff --git a/src/lib/job.cc b/src/lib/job.cc index 912c6a6ef..d4890d98b 100644 --- a/src/lib/job.cc +++ b/src/lib/job.cc @@ -641,7 +641,7 @@ void Job::cancel () { if (_thread.joinable()) { - resume(); + Job::resume(); _thread.interrupt (); _thread.join (); @@ -668,6 +668,7 @@ Job::pause_by_user () } if (paused) { + pause(); _pause_changed.notify_all (); } @@ -680,6 +681,7 @@ Job::pause_by_priority () { if (running ()) { set_state (PAUSED_BY_PRIORITY); + pause(); _pause_changed.notify_all (); } } diff --git a/src/lib/job.h b/src/lib/job.h index 5562afc16..21d8df0f6 100644 --- a/src/lib/job.h +++ b/src/lib/job.h @@ -56,9 +56,10 @@ public: } void start (); + virtual void pause() {} bool pause_by_user (); void pause_by_priority (); - void resume (); + virtual void resume (); void cancel (); bool is_new () const; diff --git a/src/lib/transcode_job.cc b/src/lib/transcode_job.cc index 6b1563b9b..96e93a50c 100644 --- a/src/lib/transcode_job.cc +++ b/src/lib/transcode_job.cc @@ -157,6 +157,14 @@ TranscodeJob::run () } } +void TranscodeJob::pause() { + _encoder->pause(); +} + +void TranscodeJob::resume() { + _encoder->resume(); + Job::resume(); +} string TranscodeJob::status () const diff --git a/src/lib/transcode_job.h b/src/lib/transcode_job.h index 8b145e362..8c96bb437 100644 --- a/src/lib/transcode_job.h +++ b/src/lib/transcode_job.h @@ -56,6 +56,8 @@ public: std::string name () const override; std::string json_name () const override; void run () override; + void pause() override; + void resume() override; std::string status () const override; bool enable_notify () const override { return true; diff --git a/src/tools/dcpomatic.cc b/src/tools/dcpomatic.cc index b53f39b45..acdd41fdb 100644 --- a/src/tools/dcpomatic.cc +++ b/src/tools/dcpomatic.cc @@ -1195,6 +1195,7 @@ private: FontConfig::drop(); ev.Skip (); + JobManager::drop (); } void active_jobs_changed() diff --git a/src/tools/dcpomatic_batch.cc b/src/tools/dcpomatic_batch.cc index 24897dfba..f0830ad35 100644 --- a/src/tools/dcpomatic_batch.cc +++ b/src/tools/dcpomatic_batch.cc @@ -287,6 +287,7 @@ private: } ev.Skip (); + JobManager::drop (); } void file_add_film () diff --git a/src/tools/dcpomatic_disk.cc b/src/tools/dcpomatic_disk.cc index 28799013f..27caac360 100644 --- a/src/tools/dcpomatic_disk.cc +++ b/src/tools/dcpomatic_disk.cc @@ -268,6 +268,7 @@ private: } ev.Skip (); + JobManager::drop (); } void copy () diff --git a/src/wx/full_config_dialog.cc b/src/wx/full_config_dialog.cc index c61c75ece..f035a1128 100644 --- a/src/wx/full_config_dialog.cc +++ b/src/wx/full_config_dialog.cc @@ -45,6 +45,7 @@ #include "send_test_email_dialog.h" #include "server_dialog.h" #include "static_text.h" +#include "gpu_config_panel.h" #include "wx_util.h" #include "lib/config.h" #include "lib/cross.h" @@ -1943,6 +1944,7 @@ create_full_config_dialog () e->AddPage (new SoundPage (ps, border)); e->AddPage (new DefaultsPage (ps, border)); e->AddPage (new EncodingServersPage(ps, border)); + e->AddPage (new GPUPage (ps, border)); e->AddPage (new KeysPage (ps, border)); e->AddPage (new TMSPage (ps, border)); e->AddPage (new EmailPage (ps, border)); diff --git a/src/wx/gpu_config_panel.h b/src/wx/gpu_config_panel.h new file mode 100644 index 000000000..1478434be --- /dev/null +++ b/src/wx/gpu_config_panel.h @@ -0,0 +1,186 @@ +#pragma once + +static std::vector get_gpu_names(std::string binary, std::string filename) +{ + // Execute the GPU listing program and redirect its output to a file + std::system((binary + " > " + filename).c_str()); + + std::vector gpu_names; + std::ifstream file(filename); + if (file.is_open()) + { + std::string line; + while (std::getline(file, line)) + gpu_names.push_back(line); + file.close(); + } + + return gpu_names; +} + +class GpuList : public wxPanel +{ +public: + GpuList(wxPanel* parent) : wxPanel(parent, wxID_ANY), selection(0) { + comboBox = new wxComboBox(this, wxID_ANY, "", wxDefaultPosition, wxSize(400, -1)); + comboBox->Bind(wxEVT_COMBOBOX, &GpuList::OnComboBox, this); + update(); + + wxBoxSizer* sizer = new wxBoxSizer(wxHORIZONTAL); + + sizer->Add(comboBox, 0, wxALIGN_CENTER_VERTICAL); // Vertically center the comboBox + + this->SetSizerAndFit(sizer); + } + void update(void) { + auto cfg = Config::instance(); + auto lister_binary = cfg->gpu_binary_location() + "/" + "gpu_lister"; + auto lister_file = cfg->gpu_binary_location () + "/" + "gpus.txt"; + if (boost::filesystem::exists(lister_binary)) { + auto gpu_names = get_gpu_names(lister_binary, lister_file); + + comboBox->Clear(); + for (const auto& name : gpu_names) + comboBox->Append(name); + } + } + + int getSelection(void) { + return selection; + } + void setSelection(int sel) { + if ((int)comboBox->GetCount() > sel) + comboBox->SetSelection(sel); + } + +private: + void OnComboBox([[maybe_unused]] wxCommandEvent& event) { + selection = comboBox->GetSelection(); + if (selection != wxNOT_FOUND) + Config::instance ()->set_selected_gpu(selection); + } + + wxComboBox* comboBox; + int selection; +}; + +class GPUPage : public Page +{ +public: + GPUPage (wxSize panel_size, int border) + : Page (panel_size, border), + _enable_gpu(nullptr), _binary_location(nullptr), _gpu_list_control(nullptr) + {} + + wxString GetName () const override + { + return _("GPU"); + } + +#ifdef DCPOMATIC_OSX + wxBitmap GetLargeIcon () const override + { + return wxBitmap(icon_path("tms"), wxBITMAP_TYPE_PNG); + } +#endif + +private: + void setup () override + { + auto config = Config::instance (); + + _enable_gpu = new CheckBox (_panel, _("Enable GPU Acceleration")); + _panel->GetSizer()->Add (_enable_gpu, 0, wxALL | wxEXPAND, _border); + + wxFlexGridSizer* table = new wxFlexGridSizer (2, DCPOMATIC_SIZER_X_GAP, DCPOMATIC_SIZER_Y_GAP); + table->AddGrowableCol (1, 1); + _panel->GetSizer()->Add (table, 1, wxALL | wxEXPAND, _border); + + add_label_to_sizer (table, _panel, _("Acceleration Binary Folder"), true, 0, wxLEFT | wxLEFT | wxALIGN_CENTRE_VERTICAL); + _binary_location = new wxDirPickerCtrl (_panel, wxDD_DIR_MUST_EXIST); + table->Add (_binary_location, 1, wxEXPAND); + + add_label_to_sizer (table, _panel, _("GPU Selection"), true, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL); + _gpu_list_control = new GpuList(_panel); + table->Add (_gpu_list_control, 1, wxEXPAND); + + add_label_to_sizer (table, _panel, _("License Server"), true, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL); + _server = new wxTextCtrl (_panel, wxID_ANY); + table->Add (_server, 1, wxEXPAND | wxALL); + + add_label_to_sizer (table, _panel, _("Port"), false, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL); + _port = new wxSpinCtrl (_panel, wxID_ANY); + _port->SetRange (0, 65535); + table->Add (_port); + + add_label_to_sizer (table, _panel, _("License"), true, 0, wxLEFT | wxRIGHT | wxALIGN_CENTRE_VERTICAL); + _license = new PasswordEntry (_panel); + table->Add (_license->get_panel(), 1, wxEXPAND | wxALL); + + _enable_gpu->bind(&GPUPage::enable_gpu_changed, this); + _binary_location->Bind (wxEVT_DIRPICKER_CHANGED, boost::bind (&GPUPage::binary_location_changed, this)); + _server->Bind (wxEVT_TEXT, boost::bind(&GPUPage::server_changed, this)); + _port->Bind (wxEVT_SPINCTRL, boost::bind(&GPUPage::port_changed, this)); + _license->Changed.connect (boost::bind(&GPUPage::license_changed, this)); + + _binary_location->Enable(config->enable_gpu()); + _gpu_list_control->Enable(config->enable_gpu()); + _server->Enable(config->enable_gpu()); + _port->Enable(config->enable_gpu()); + _license->get_panel()->Enable(config->enable_gpu()); + } + + + void config_changed () override + { + auto config = Config::instance (); + + checked_set (_enable_gpu, config->enable_gpu()); + _binary_location->SetPath(config->gpu_binary_location ()); + _gpu_list_control->update(); + _gpu_list_control->setSelection(config->selected_gpu()); + checked_set (_server, config->gpu_license_server()); + checked_set (_port, config->gpu_license_port()); + checked_set (_license, config->gpu_license()); + } + + void enable_gpu_changed () + { + auto config = Config::instance (); + + config->set_enable_gpu (_enable_gpu->GetValue()); + _binary_location->Enable(config->enable_gpu()); + _gpu_list_control->Enable(config->enable_gpu()); + _server->Enable(config->enable_gpu()); + _port->Enable(config->enable_gpu()); + _license->get_panel()->Enable(config->enable_gpu()); + } + + void binary_location_changed () + { + Config::instance()->set_gpu_binary_location (wx_to_std (_binary_location->GetPath ())); + _gpu_list_control->update(); + } + + void server_changed () + { + Config::instance()->set_gpu_license_server(wx_to_std(_server->GetValue())); + } + + void port_changed () + { + Config::instance()->set_gpu_license_port(_port->GetValue()); + } + + void license_changed () + { + Config::instance()->set_gpu_license(_license->get()); + } + + CheckBox* _enable_gpu; + wxDirPickerCtrl* _binary_location; + GpuList *_gpu_list_control; + wxTextCtrl* _server; + wxSpinCtrl* _port; + PasswordEntry* _license; +};