From f8d0dac871883c7cbfa6f31c182ca3e6d213aed1 Mon Sep 17 00:00:00 2001 From: Carl Hetherington Date: Sun, 24 Sep 2023 00:34:15 +0200 Subject: [PATCH] Rearrange encoder threading. Soon we'll add a new encoder type, and the existing structure was already creaking a bit at the seams while handling local and remote encodes. Here we split out an encoder thread and introduce the concept of a "sync" thread (which blocks while the encoding is happening). Later we'll have another type which submits the encode request to a GPU and receives the reply back later. --- src/lib/cpu_j2k_encoder_thread.cc | 62 ++++++ src/lib/cpu_j2k_encoder_thread.h | 16 ++ src/lib/dcp_encoder.h | 4 + src/lib/encode_server.cc | 3 + src/lib/encode_server.h | 6 + src/lib/grok_j2k_encoder_thread.cc | 76 ++++++++ src/lib/grok_j2k_encoder_thread.h | 41 ++++ src/lib/j2k_encoder.cc | 278 ++++++++++++--------------- src/lib/j2k_encoder.h | 20 +- src/lib/j2k_encoder_thread.cc | 58 ++++++ src/lib/j2k_encoder_thread.h | 53 +++++ src/lib/j2k_sync_encoder_thread.cc | 65 +++++++ src/lib/j2k_sync_encoder_thread.h | 32 +++ src/lib/make_dcp.cc | 6 +- src/lib/make_dcp.h | 2 +- src/lib/remote_j2k_encoder_thread.cc | 84 ++++++++ src/lib/remote_j2k_encoder_thread.h | 21 ++ src/lib/transcode_job.h | 4 + src/lib/wscript | 5 + test/client_server_test.cc | 27 ++- test/j2k_encode_threading_test.cc | 117 +++++++++++ test/wscript | 1 + 22 files changed, 813 insertions(+), 168 deletions(-) create mode 100644 src/lib/cpu_j2k_encoder_thread.cc create mode 100644 src/lib/cpu_j2k_encoder_thread.h create mode 100644 src/lib/grok_j2k_encoder_thread.cc create mode 100644 src/lib/grok_j2k_encoder_thread.h create mode 100644 src/lib/j2k_encoder_thread.cc create mode 100644 src/lib/j2k_encoder_thread.h create mode 100644 src/lib/j2k_sync_encoder_thread.cc create mode 100644 src/lib/j2k_sync_encoder_thread.h create mode 100644 src/lib/remote_j2k_encoder_thread.cc create mode 100644 src/lib/remote_j2k_encoder_thread.h create mode 100644 test/j2k_encode_threading_test.cc diff --git a/src/lib/cpu_j2k_encoder_thread.cc b/src/lib/cpu_j2k_encoder_thread.cc new file mode 100644 index 000000000..580facae9 --- /dev/null +++ b/src/lib/cpu_j2k_encoder_thread.cc @@ -0,0 +1,62 @@ +/* + Copyright (C) 2023 Carl Hetherington + + This file is part of DCP-o-matic. + + DCP-o-matic is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + DCP-o-matic is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with DCP-o-matic. If not, see . + +*/ + + +#include "cpu_j2k_encoder_thread.h" +#include "cross.h" +#include "dcpomatic_log.h" +#include "dcp_video.h" +#include "j2k_encoder.h" +#include "util.h" + +#include "i18n.h" + + +using std::make_shared; +using std::shared_ptr; + + +CPUJ2KEncoderThread::CPUJ2KEncoderThread(J2KEncoder& encoder) + : J2KSyncEncoderThread(encoder) +{ + +} + + +void +CPUJ2KEncoderThread::log_thread_start() const +{ + start_of_thread("CPUJ2KEncoder"); + LOG_TIMING("start-encoder-thread thread=%1 server=localhost", thread_id()); +} + + +shared_ptr +CPUJ2KEncoderThread::encode(DCPVideo const& frame) +{ + try { + return make_shared(frame.encode_locally()); + } catch (std::exception& e) { + LOG_ERROR(N_("Local encode failed (%1)"), e.what()); + } + + return {}; +} + diff --git a/src/lib/cpu_j2k_encoder_thread.h b/src/lib/cpu_j2k_encoder_thread.h new file mode 100644 index 000000000..fb138f484 --- /dev/null +++ b/src/lib/cpu_j2k_encoder_thread.h @@ -0,0 +1,16 @@ +#include "j2k_sync_encoder_thread.h" +#include + + +class DCPVideo; + + +class CPUJ2KEncoderThread : public J2KSyncEncoderThread +{ +public: + CPUJ2KEncoderThread(J2KEncoder& encoder); + + void log_thread_start() const override; + std::shared_ptr encode(DCPVideo const& frame) override; +}; + diff --git a/src/lib/dcp_encoder.h b/src/lib/dcp_encoder.h index 771679a98..ce0b72204 100644 --- a/src/lib/dcp_encoder.h +++ b/src/lib/dcp_encoder.h @@ -35,6 +35,8 @@ class Job; class Player; class PlayerVideo; +struct frames_not_lost_when_threads_disappear; + /** @class DCPEncoder */ class DCPEncoder : public Encoder @@ -58,6 +60,8 @@ public: private: + friend struct ::frames_not_lost_when_threads_disappear; + void video (std::shared_ptr, dcpomatic::DCPTime); void audio (std::shared_ptr, dcpomatic::DCPTime); void text (PlayerText, TextType, boost::optional, dcpomatic::DCPTimePeriod); diff --git a/src/lib/encode_server.cc b/src/lib/encode_server.cc index 036ea58a5..7eae4375f 100644 --- a/src/lib/encode_server.cc +++ b/src/lib/encode_server.cc @@ -81,6 +81,7 @@ EncodeServer::EncodeServer (bool verbose, int num_threads) #endif , _verbose (verbose) , _num_threads (num_threads) + , _frames_encoded(0) { } @@ -169,6 +170,8 @@ EncodeServer::process (shared_ptr socket, struct timeval& after_read, st throw; } + ++_frames_encoded; + return dcp_video_frame.index (); } diff --git a/src/lib/encode_server.h b/src/lib/encode_server.h index f93d66746..8059abd0f 100644 --- a/src/lib/encode_server.h +++ b/src/lib/encode_server.h @@ -32,6 +32,7 @@ #include "exception_store.h" #include "server.h" #include +#include #include #include #include @@ -53,6 +54,10 @@ public: void run () override; + int frames_encoded() const { + return _frames_encoded; + } + private: void handle (std::shared_ptr) override; void worker_thread (); @@ -67,6 +72,7 @@ private: bool _verbose; int _num_threads; Waker _waker; + boost::atomic _frames_encoded; struct Broadcast { diff --git a/src/lib/grok_j2k_encoder_thread.cc b/src/lib/grok_j2k_encoder_thread.cc new file mode 100644 index 000000000..54e5fe252 --- /dev/null +++ b/src/lib/grok_j2k_encoder_thread.cc @@ -0,0 +1,76 @@ +/* + Copyright (C) 2023 Carl Hetherington + + This file is part of DCP-o-matic. + + DCP-o-matic is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + DCP-o-matic is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with DCP-o-matic. If not, see . + +*/ + + +#include "config.h" +#include "cross.h" +#include "dcpomatic_log.h" +#include "dcp_video.h" +#include "grok_j2k_encoder_thread.h" +#include "j2k_encoder.h" +#include "util.h" +#include + +#include "i18n.h" + + +using std::make_shared; +using std::shared_ptr; + + +GrokJ2KEncoderThread::GrokJ2KEncoderThread(J2KEncoder& encoder, grk_plugin::GrokContext* context) + : J2KEncoderThread(encoder) + , _context(context) +{ + +} + + +void +GrokJ2KEncoderThread::run() +try +{ + while (true) + { + LOG_TIMING("encoder-sleep thread=%1", thread_id()); + auto frame = _encoder.pop(); + +<<<<<<< HEAD + ScopeGuard frame_guard([this, &frame]() { +||||||| parent of 04d2316ac (fixup! Rearrange encoder threading.) + ScopeGuard frame_guard([this, &frame]() { + LOG_ERROR("Failed to schedule encode of %1 using grok", frame.index()); +======= + dcp::ScopeGuard frame_guard([this, &frame]() { + LOG_ERROR("Failed to schedule encode of %1 using grok", frame.index()); +>>>>>>> 04d2316ac (fixup! Rearrange encoder threading.) + _encoder.retry(frame); + }); + + LOG_TIMING("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), frame.index(), static_cast(frame.eyes())); + + if (_context->launch(frame, Config::instance()->selected_gpu()) && _context->scheduleCompress(frame)) { + frame_guard.cancel(); + } + } +} catch (boost::thread_interrupted& e) { +} catch (...) { + store_current(); +} diff --git a/src/lib/grok_j2k_encoder_thread.h b/src/lib/grok_j2k_encoder_thread.h new file mode 100644 index 000000000..5301e1670 --- /dev/null +++ b/src/lib/grok_j2k_encoder_thread.h @@ -0,0 +1,41 @@ +/* + Copyright (C) 2023 Carl Hetherington + + This file is part of DCP-o-matic. + + DCP-o-matic is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + DCP-o-matic is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with DCP-o-matic. If not, see . + +*/ + + +#include "exception_store.h" +#include "j2k_encoder_thread.h" + + +namespace grk_plugin { + class GrokContext; +} + + +class GrokJ2KEncoderThread : public J2KEncoderThread, public ExceptionStore +{ +public: + GrokJ2KEncoderThread(J2KEncoder& encoder, grk_plugin::GrokContext* context); + + void run() override; + +private: + grk_plugin::GrokContext* _context; +}; + diff --git a/src/lib/j2k_encoder.cc b/src/lib/j2k_encoder.cc index 1f0c606d1..8c7a1ef1b 100644 --- a/src/lib/j2k_encoder.cc +++ b/src/lib/j2k_encoder.cc @@ -32,6 +32,9 @@ #include "encode_server_description.h" #include "encode_server_finder.h" #include "film.h" +#include "cpu_j2k_encoder_thread.h" +#include "grok_j2k_encoder_thread.h" +#include "remote_j2k_encoder_thread.h" #include "j2k_encoder.h" #include "log.h" #include "player_video.h" @@ -44,6 +47,7 @@ using std::cout; +using std::dynamic_pointer_cast; using std::exception; using std::list; using std::make_shared; @@ -53,6 +57,7 @@ using boost::optional; using dcp::Data; using namespace dcpomatic; + static grk_plugin::GrokInitializer grokInitializer; /** @param film Film that we are encoding. @@ -72,14 +77,24 @@ J2KEncoder::~J2KEncoder () { _server_found_connection.disconnect(); - { - boost::mutex::scoped_lock lm(_threads_mutex); - terminate_threads(); - } + terminate_threads(); delete _context; } + +void +J2KEncoder::servers_list_changed() +{ + auto config = Config::instance(); + + auto const cpu = (config->enable_gpu() || config->only_servers_encode()) ? 0 : config->master_encoding_threads(); + auto const gpu = config->enable_gpu() ? config->master_encoding_threads() : 0; + + remake_threads(cpu, gpu, EncodeServerFinder::instance()->servers()); +} + + void J2KEncoder::begin () { @@ -97,10 +112,7 @@ J2KEncoder::pause() return; } - { - boost::mutex::scoped_lock lm (_threads_mutex); - terminate_threads (); - } + terminate_threads (); /* Something might have been thrown during terminate_threads */ rethrow (); @@ -131,17 +143,13 @@ J2KEncoder::end() /* Keep waking workers until the queue is empty */ while (!_queue.empty ()) { rethrow (); - _empty_condition.notify_all (); _full_condition.wait (lock); } lock.unlock (); LOG_GENERAL_NC (N_("Terminating encoder threads")); - { - boost::mutex::scoped_lock lm (_threads_mutex); - terminate_threads (); - } + terminate_threads (); /* Something might have been thrown during terminate_threads */ rethrow (); @@ -167,7 +175,7 @@ J2KEncoder::end() LOG_GENERAL(N_("Encode left-over frame %1"), i.index()); try { _writer.write( - make_shared(i.encode_locally()), + make_shared(i.encode_locally()), i.index(), i.eyes() ); @@ -229,7 +237,7 @@ J2KEncoder::encode (shared_ptr pv, DCPTime time) size_t threads = 0; { boost::mutex::scoped_lock lm (_threads_mutex); - threads = _threads->size(); + threads = _threads.size(); } boost::mutex::scoped_lock queue_lock (_queue_mutex); @@ -289,181 +297,139 @@ J2KEncoder::encode (shared_ptr pv, DCPTime time) } -/** Caller must hold a lock on _threads_mutex */ void J2KEncoder::terminate_threads () { + boost::mutex::scoped_lock lm(_threads_mutex); boost::this_thread::disable_interruption dis; - if (!_threads) { - return; - } - - _threads->interrupt_all (); - try { - _threads->join_all (); - } catch (exception& e) { - LOG_ERROR ("join() threw an exception: %1", e.what()); - } catch (...) { - LOG_ERROR_NC ("join() threw an exception"); + for (auto& thread: _threads) { + thread->stop(); } - _threads.reset (); + _threads.clear(); + _ending = true; } void -J2KEncoder::encoder_thread (optional server) -try +J2KEncoder::remake_threads(int cpu, int gpu, list servers) { - auto config = Config::instance (); + boost::mutex::scoped_lock lm (_threads_mutex); + if (_ending) { + return; + } - start_of_thread ("J2KEncoder"); + auto remove_threads = [this](int wanted, int current, std::function)> predicate) { + for (auto i = wanted; i < current; ++i) { + auto iter = std::find_if(_threads.begin(), _threads.end(), predicate); + if (iter != _threads.end()) { + (*iter)->stop(); + _threads.erase(iter); + } + } + }; - if (server) { - LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ()); - } else { - LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ()); + + /* CPU */ + + auto const is_cpu_thread = [](shared_ptr thread) { + return static_cast(dynamic_pointer_cast(thread)); + }; + + auto const current_cpu_threads = std::count_if(_threads.begin(), _threads.end(), is_cpu_thread); + + for (auto i = current_cpu_threads; i < cpu; ++i) { + auto thread = make_shared(*this); + thread->start(); + _threads.push_back(thread); } - /* Number of seconds that we currently wait between attempts - to connect to the server; not relevant for localhost - encodings. - */ - int remote_backoff = 0; + remove_threads(cpu, current_cpu_threads, is_cpu_thread); + - while (true) { + /* GPU */ - LOG_TIMING ("encoder-sleep thread=%1", thread_id ()); - boost::mutex::scoped_lock lock (_queue_mutex); - while (_queue.empty ()) { - _empty_condition.wait (lock); + auto const is_grok_thread = [](shared_ptr thread) { + return static_cast(dynamic_pointer_cast(thread)); + }; + + auto const current_gpu_threads = std::count_if(_threads.begin(), _threads.end(), is_grok_thread); + + for (auto i = current_gpu_threads; i < gpu; ++i) { + auto thread = make_shared(*this, _context); + thread->start(); + _threads.push_back(thread); + } + + remove_threads(gpu, current_gpu_threads, is_grok_thread); + + + /* Remote */ + + for (auto const& server: servers) { + if (!server.current_link_version()) { + continue; } - LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size()); - auto vf = _queue.front (); + auto is_remote_thread = [server](shared_ptr thread) { + auto remote = dynamic_pointer_cast(thread); + return remote && remote->server().host_name() == server.host_name(); + }; - /* We're about to commit to either encoding this frame or putting it back onto the queue, - so we must not be interrupted until one or other of these things have happened. This - block has thread interruption disabled. - */ - { - boost::this_thread::disable_interruption dis; - - LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf.index(), static_cast(vf.eyes())); - _queue.pop_front (); - - lock.unlock (); - - shared_ptr encoded; - - /* We need to encode this input */ - if (server) { - try { - encoded = make_shared(vf.encode_remotely(server.get())); - - if (remote_backoff > 0) { - LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ()); - } - - /* This job succeeded, so remove any backoff */ - remote_backoff = 0; - - } catch (std::exception& e) { - if (remote_backoff < 60) { - /* back off more */ - remote_backoff += 10; - } - LOG_ERROR ( - N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"), - vf.index(), server->host_name(), e.what(), remote_backoff - ); - } - - } else { - if (_context) { - if (!_context->launch(vf, config->selected_gpu()) || !_context->scheduleCompress(vf)) { - LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index()); - _queue.push_front (vf); - } - } else { - try { - LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf.index()); - encoded = make_shared(vf.encode_locally()); - LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index()); - } catch (std::exception& e) { - /* This is very bad, so don't cope with it, just pass it on */ - LOG_ERROR (N_("Local encode failed (%1)"), e.what ()); - throw; - } - } - } + auto const current_threads = std::count_if(_threads.begin(), _threads.end(), is_remote_thread); - if (encoded) { - _writer.write(encoded, vf.index(), vf.eyes()); - frame_done (); - } else { - if (!Config::instance()->enable_gpu ()) { - lock.lock (); - LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index()); - _queue.push_front (vf); - lock.unlock (); - } - } + auto const wanted_threads = server.threads(); + + if (wanted_threads > current_threads) { + LOG_GENERAL(N_("Adding %1 worker threads for remote %2"), wanted_threads - current_threads, server.host_name()); + } else if (wanted_threads < current_threads) { + LOG_GENERAL(N_("Removing %1 worker threads for remote %2"), current_threads - wanted_threads, server.host_name()); } - if (remote_backoff > 0) { - boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff)); + for (auto i = current_threads; i < wanted_threads; ++i) { + auto thread = make_shared(*this, server); + thread->start(); + _threads.push_back(thread); } - /* The queue might not be full any more, so notify anything that is waiting on that */ - lock.lock (); - _full_condition.notify_all (); + remove_threads(wanted_threads, current_threads, is_remote_thread); } + + _writer.set_encoder_threads(_threads.size()); } -catch (boost::thread_interrupted& e) { - /* Ignore these and just stop the thread */ - _full_condition.notify_all (); -} -catch (...) + + +DCPVideo +J2KEncoder::pop() { - store_current (); - /* Wake anything waiting on _full_condition so it can see the exception */ - _full_condition.notify_all (); + boost::mutex::scoped_lock lock(_queue_mutex); + while (_queue.empty()) { + _empty_condition.wait (lock); + } + + LOG_TIMING("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size()); + + auto vf = _queue.front(); + _queue.pop_front(); + + _full_condition.notify_all(); + return vf; } void -J2KEncoder::servers_list_changed () +J2KEncoder::retry(DCPVideo video) { - boost::mutex::scoped_lock lm (_threads_mutex); - - terminate_threads (); - _threads = make_shared(); - - /* XXX: could re-use threads */ - - if (!Config::instance()->only_servers_encode ()) { - for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) { -#ifdef DCPOMATIC_LINUX - auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional())); - pthread_setname_np (t->native_handle(), "encode-worker"); -#else - _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional())); -#endif - } - } - - for (auto i: EncodeServerFinder::instance()->servers()) { - if (!i.current_link_version()) { - continue; - } + boost::mutex::scoped_lock lock(_queue_mutex); + _queue.push_front(video); + _empty_condition.notify_all(); +} - LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ()); - for (int j = 0; j < i.threads(); ++j) { - _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i)); - } - } - _writer.set_encoder_threads(_threads->size()); +void +J2KEncoder::write(shared_ptr data, int index, Eyes eyes) +{ + _writer.write(data, index, eyes); + frame_done(); } diff --git a/src/lib/j2k_encoder.h b/src/lib/j2k_encoder.h index 840602dbb..a6e190dcf 100644 --- a/src/lib/j2k_encoder.h +++ b/src/lib/j2k_encoder.h @@ -33,6 +33,7 @@ #include "enum_indexed_vector.h" #include "event_history.h" #include "exception_store.h" +#include "j2k_encoder_thread.h" #include "writer.h" #include #include @@ -49,6 +50,10 @@ class Film; class Job; class PlayerVideo; +struct local_threads_created_and_destroyed; +struct remote_threads_created_and_destroyed; +struct frames_not_lost_when_threads_disappear; + /** @class J2KEncoder * @brief Class to manage encoding to J2K. @@ -80,13 +85,18 @@ public: boost::optional current_encoding_rate () const; int video_frames_enqueued () const; - void servers_list_changed (); + DCPVideo pop(); + void retry(DCPVideo frame); + void write(std::shared_ptr data, int index, Eyes eyes); private: + friend struct ::local_threads_created_and_destroyed; + friend struct ::remote_threads_created_and_destroyed; + friend struct ::frames_not_lost_when_threads_disappear; void frame_done (); - - void encoder_thread (boost::optional); + void servers_list_changed (); + void remake_threads(int cpu, int gpu, std::list servers); void terminate_threads (); /** Film that we are encoding */ @@ -95,7 +105,7 @@ private: EventHistory _history; boost::mutex _threads_mutex; - std::shared_ptr _threads; + std::vector> _threads; mutable boost::mutex _queue_mutex; std::list _queue; @@ -114,6 +124,8 @@ private: grk_plugin::DcpomaticContext _dcpomatic_context; grk_plugin::GrokContext *_context; + + bool _ending = false; }; diff --git a/src/lib/j2k_encoder_thread.cc b/src/lib/j2k_encoder_thread.cc new file mode 100644 index 000000000..d0e8a439c --- /dev/null +++ b/src/lib/j2k_encoder_thread.cc @@ -0,0 +1,58 @@ +/* + Copyright (C) 2023 Carl Hetherington + + This file is part of DCP-o-matic. + + DCP-o-matic is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + DCP-o-matic is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with DCP-o-matic. If not, see . + +*/ + + +#include "dcp_video.h" +#include "dcpomatic_log.h" +#include "j2k_encoder.h" +#include "j2k_encoder_thread.h" + + +J2KEncoderThread::J2KEncoderThread(J2KEncoder& encoder) + : _encoder(encoder) +{ + +} + + +void +J2KEncoderThread::start() +{ + _thread = boost::thread(boost::bind(&J2KEncoderThread::run, this)); +#ifdef DCPOMATIC_LINUX + pthread_setname_np(_thread.native_handle(), "encode-worker"); +#endif +} + + +void +J2KEncoderThread::stop() +{ + _thread.interrupt(); + try { + _thread.join(); + } catch (std::exception& e) { + LOG_ERROR("join() threw an exception: %1", e.what()); + } catch (...) { + LOG_ERROR_NC("join() threw an exception"); + } +} + + diff --git a/src/lib/j2k_encoder_thread.h b/src/lib/j2k_encoder_thread.h new file mode 100644 index 000000000..b03b6f356 --- /dev/null +++ b/src/lib/j2k_encoder_thread.h @@ -0,0 +1,53 @@ +/* + Copyright (C) 2023 Carl Hetherington + + This file is part of DCP-o-matic. + + DCP-o-matic is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + DCP-o-matic is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with DCP-o-matic. If not, see . + +*/ + + +#ifndef DCPOMATIC_J2K_ENCODER_THREAD +#define DCPOMATIC_J2K_ENCODER_THREAD + + +#include + + +class J2KEncoder; + + +class J2KEncoderThread +{ +public: + J2KEncoderThread(J2KEncoder& encoder); + + J2KEncoderThread(J2KEncoderThread const&) = delete; + J2KEncoderThread& operator=(J2KEncoderThread const&) = delete; + + void start(); + void stop(); + + virtual void run() = 0; + +protected: + J2KEncoder& _encoder; + +private: + boost::thread _thread; +}; + + +#endif diff --git a/src/lib/j2k_sync_encoder_thread.cc b/src/lib/j2k_sync_encoder_thread.cc new file mode 100644 index 000000000..ef6834f60 --- /dev/null +++ b/src/lib/j2k_sync_encoder_thread.cc @@ -0,0 +1,65 @@ +/* + Copyright (C) 2023 Carl Hetherington + + This file is part of DCP-o-matic. + + DCP-o-matic is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + DCP-o-matic is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with DCP-o-matic. If not, see . + +*/ + + +#include "dcp_video.h" +#include "dcpomatic_log.h" +#include "j2k_encoder.h" +#include "j2k_sync_encoder_thread.h" +#include + + +J2KSyncEncoderThread::J2KSyncEncoderThread(J2KEncoder& encoder) + : J2KEncoderThread(encoder) +{ + +} + + +void +J2KSyncEncoderThread::run() +try +{ + log_thread_start(); + + while (true) { + LOG_TIMING("encoder-sleep thread=%1", thread_id()); + auto frame = _encoder.pop(); + + dcp::ScopeGuard frame_guard([this, &frame]() { + boost::this_thread::disable_interruption dis; + _encoder.retry(frame); + }); + + LOG_TIMING("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), frame.index(), static_cast(frame.eyes())); + + auto encoded = encode(frame); + + if (encoded) { + boost::this_thread::disable_interruption dis; + frame_guard.cancel(); + _encoder.write(encoded, frame.index(), frame.eyes()); + } + } +} catch (boost::thread_interrupted& e) { +} catch (...) { + store_current(); +} + diff --git a/src/lib/j2k_sync_encoder_thread.h b/src/lib/j2k_sync_encoder_thread.h new file mode 100644 index 000000000..45222279e --- /dev/null +++ b/src/lib/j2k_sync_encoder_thread.h @@ -0,0 +1,32 @@ +#ifndef DCPOMATIC_J2K_SYNC_ENCODER_THREAD_H +#define DCPOMATIC_J2K_SYNC_ENCODER_THREAD_H + + +#include "exception_store.h" +#include "j2k_encoder_thread.h" +#include +#include + + +class DCPVideo; +class J2KEncoder; + + +class J2KSyncEncoderThread : public J2KEncoderThread, public ExceptionStore +{ +public: + J2KSyncEncoderThread(J2KEncoder& encoder); + + J2KSyncEncoderThread(J2KSyncEncoderThread const&) = delete; + J2KSyncEncoderThread& operator=(J2KSyncEncoderThread const&) = delete; + + virtual ~J2KSyncEncoderThread() {} + + void run() override; + + virtual void log_thread_start() const = 0; + virtual std::shared_ptr encode(DCPVideo const& frame) = 0; +}; + + +#endif diff --git a/src/lib/make_dcp.cc b/src/lib/make_dcp.cc index 17d45be46..d8d42f49a 100644 --- a/src/lib/make_dcp.cc +++ b/src/lib/make_dcp.cc @@ -40,8 +40,8 @@ using std::shared_ptr; using std::string; -/** Add suitable Jobs to the JobManager to create a DCP for a Film */ -void +/** Add suitable Job to the JobManager to create a DCP for a Film */ +shared_ptr make_dcp (shared_ptr film, TranscodeJob::ChangedBehaviour behaviour) { if (film->dcp_name().find("/") != string::npos) { @@ -101,5 +101,7 @@ make_dcp (shared_ptr film, TranscodeJob::ChangedBehaviour behaviour) auto tj = make_shared(film, behaviour); tj->set_encoder (make_shared(film, tj)); JobManager::instance()->add (tj); + + return tj; } diff --git a/src/lib/make_dcp.h b/src/lib/make_dcp.h index 9f5072782..fe0bcd2f6 100644 --- a/src/lib/make_dcp.h +++ b/src/lib/make_dcp.h @@ -25,5 +25,5 @@ class Film; -void make_dcp (std::shared_ptr film, TranscodeJob::ChangedBehaviour behaviour); +std::shared_ptr make_dcp(std::shared_ptr film, TranscodeJob::ChangedBehaviour behaviour); diff --git a/src/lib/remote_j2k_encoder_thread.cc b/src/lib/remote_j2k_encoder_thread.cc new file mode 100644 index 000000000..49d80953d --- /dev/null +++ b/src/lib/remote_j2k_encoder_thread.cc @@ -0,0 +1,84 @@ +/* + Copyright (C) 2023 Carl Hetherington + + This file is part of DCP-o-matic. + + DCP-o-matic is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + DCP-o-matic is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with DCP-o-matic. If not, see . + +*/ + + +#include "dcp_video.h" +#include "dcpomatic_log.h" +#include "j2k_encoder.h" +#include "remote_j2k_encoder_thread.h" +#include "util.h" + +#include "i18n.h" + + +using std::make_shared; +using std::shared_ptr; + + +RemoteJ2KEncoderThread::RemoteJ2KEncoderThread(J2KEncoder& encoder, EncodeServerDescription server) + : J2KSyncEncoderThread(encoder) + , _server(server) +{ + +} + + +void +RemoteJ2KEncoderThread::log_thread_start() const +{ + start_of_thread("RemoteJ2KEncoder"); + LOG_TIMING("start-encoder-thread thread=%1 server=%2", thread_id(), _server.host_name()); +} + + +shared_ptr +RemoteJ2KEncoderThread::encode(DCPVideo const& frame) +{ + shared_ptr encoded; + + try { + encoded = make_shared(frame.encode_remotely(_server)); + if (_remote_backoff > 0) { + LOG_GENERAL("%1 was lost, but now she is found; removing backoff", _server.host_name()); + _remote_backoff = 0; + } + } catch (std::exception& e) { + LOG_ERROR( + N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"), + frame.index(), _server.host_name(), e.what(), _remote_backoff + ); + } catch (...) { + LOG_ERROR( + N_("Remote encode of %1 on %2 failed; thread sleeping for %4s"), + frame.index(), _server.host_name(), _remote_backoff + ); + } + + if (!encoded) { + if (_remote_backoff < 60) { + /* back off more */ + _remote_backoff += 10; + } + boost::this_thread::sleep(boost::posix_time::seconds(_remote_backoff)); + } + + return encoded; +} + diff --git a/src/lib/remote_j2k_encoder_thread.h b/src/lib/remote_j2k_encoder_thread.h new file mode 100644 index 000000000..f3fe7f94a --- /dev/null +++ b/src/lib/remote_j2k_encoder_thread.h @@ -0,0 +1,21 @@ +#include "encode_server_description.h" +#include "j2k_sync_encoder_thread.h" + + +class RemoteJ2KEncoderThread : public J2KSyncEncoderThread +{ +public: + RemoteJ2KEncoderThread(J2KEncoder& encoder, EncodeServerDescription server); + + void log_thread_start() const override; + std::shared_ptr encode(DCPVideo const& frame) override; + + EncodeServerDescription server() const { + return _server; + } + +private: + EncodeServerDescription _server; + /** Number of seconds that we currently wait between attempts to connect to the server */ + int _remote_backoff = 0; +}; diff --git a/src/lib/transcode_job.h b/src/lib/transcode_job.h index 794cf7016..35870231d 100644 --- a/src/lib/transcode_job.h +++ b/src/lib/transcode_job.h @@ -37,6 +37,8 @@ class Encoder; +struct frames_not_lost_when_threads_disappear; + /** @class TranscodeJob * @brief A job which transcodes a Film to another format. @@ -66,6 +68,8 @@ public: void set_encoder (std::shared_ptr t); private: + friend struct ::frames_not_lost_when_threads_disappear; + virtual void post_transcode () {} float frames_per_second() const; diff --git a/src/lib/wscript b/src/lib/wscript index d71f2a368..00d50b279 100644 --- a/src/lib/wscript +++ b/src/lib/wscript @@ -59,6 +59,7 @@ sources = """ content_factory.cc combine_dcp_job.cc copy_dcp_details_to_film.cc + cpu_j2k_encoder_thread.cc create_cli.cc crop.cc cross_common.cc @@ -122,6 +123,7 @@ sources = """ font_id_map.cc frame_interval_checker.cc frame_rate_change.cc + grok_j2k_encoder_thread.cc guess_crop.cc hints.cc internet.cc @@ -138,6 +140,8 @@ sources = """ job.cc job_manager.cc j2k_encoder.cc + j2k_encoder_thread.cc + j2k_sync_encoder_thread.cc json_server.cc kdm_cli.cc kdm_recipient.cc @@ -163,6 +167,7 @@ sources = """ referenced_reel_asset.cc release_notes.cc render_text.cc + remote_j2k_encoder_thread.cc resampler.cc resolution.cc rgba.cc diff --git a/test/client_server_test.cc b/test/client_server_test.cc index 4f5015fc8..1bfa4c5a6 100644 --- a/test/client_server_test.cc +++ b/test/client_server_test.cc @@ -20,20 +20,18 @@ /** @file test/client_server_test.cc - * @brief Test the server class. + * @brief Test the remote encoding code. * @ingroup feature - * - * Create a test image and then encode it using the standard mechanism - * and also using a EncodeServer object running on localhost. Compare the resulting - * encoded data to check that they are the same. */ +#include "lib/content_factory.h" #include "lib/cross.h" #include "lib/dcp_video.h" #include "lib/dcpomatic_log.h" #include "lib/encode_server.h" #include "lib/encode_server_description.h" +#include "lib/encode_server_finder.h" #include "lib/file_log.h" #include "lib/image.h" #include "lib/j2k_image_proxy.h" @@ -316,3 +314,22 @@ BOOST_AUTO_TEST_CASE (client_server_test_j2k) } +BOOST_AUTO_TEST_CASE(real_encode_with_server) +{ + auto content = content_factory(TestPaths::private_data() / "dolby_aurora.vob"); + auto film = new_test_film2("real_encode_with_server", content); + + EncodeServerFinder::instance(); + + EncodeServer server(true, 4); + thread server_thread(boost::bind(&EncodeServer::run, &server)); + + make_and_verify_dcp(film); + + server.stop(); + server_thread.join(); + + BOOST_CHECK(server.frames_encoded() > 0); + EncodeServerFinder::drop(); +} + diff --git a/test/j2k_encode_threading_test.cc b/test/j2k_encode_threading_test.cc new file mode 100644 index 000000000..ff2e7b0dc --- /dev/null +++ b/test/j2k_encode_threading_test.cc @@ -0,0 +1,117 @@ +/* + Copyright (C) 2023 Carl Hetherington + + This file is part of DCP-o-matic. + + DCP-o-matic is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + DCP-o-matic is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with DCP-o-matic. If not, see . + +*/ + + +#include "lib/config.h" +#include "lib/content_factory.h" +#include "lib/dcp_encoder.h" +#include "lib/dcp_transcode_job.h" +#include "lib/encode_server_description.h" +#include "lib/film.h" +#include "lib/j2k_encoder.h" +#include "lib/job_manager.h" +#include "lib/make_dcp.h" +#include "lib/transcode_job.h" +#include "test.h" +#include +#include +#include +#include +#include + + +using std::dynamic_pointer_cast; +using std::list; + + +BOOST_AUTO_TEST_CASE(local_threads_created_and_destroyed) +{ + auto film = new_test_film2("local_threads_created_and_destroyed", {}); + Writer writer(film, {}); + J2KEncoder encoder(film, writer); + + encoder.remake_threads(32, 0, {}); + BOOST_CHECK_EQUAL(encoder._threads.size(), 32U); + + encoder.remake_threads(9, 0, {}); + BOOST_CHECK_EQUAL(encoder._threads.size(), 9U); + + encoder.end(); + BOOST_CHECK_EQUAL(encoder._threads.size(), 0U); +} + + +BOOST_AUTO_TEST_CASE(remote_threads_created_and_destroyed) +{ + auto film = new_test_film2("remote_threads_created_and_destroyed", {}); + Writer writer(film, {}); + J2KEncoder encoder(film, writer); + + list servers = { + { "fred", 7, SERVER_LINK_VERSION }, + { "jim", 2, SERVER_LINK_VERSION }, + { "sheila", 14, SERVER_LINK_VERSION }, + }; + + encoder.remake_threads(0, 0, servers); + BOOST_CHECK_EQUAL(encoder._threads.size(), 7U + 2U + 14U); + + servers = { + { "fred", 7, SERVER_LINK_VERSION }, + { "jim", 5, SERVER_LINK_VERSION }, + { "sheila", 14, SERVER_LINK_VERSION }, + }; + + encoder.remake_threads(0, 0, servers); + BOOST_CHECK_EQUAL(encoder._threads.size(), 7U + 5U + 14U); + + servers = { + { "fred", 0, SERVER_LINK_VERSION }, + { "jim", 0, SERVER_LINK_VERSION }, + { "sheila", 11, SERVER_LINK_VERSION }, + }; + + encoder.remake_threads(0, 0, servers); + BOOST_CHECK_EQUAL(encoder._threads.size(), 11U); +} + + +BOOST_AUTO_TEST_CASE(frames_not_lost_when_threads_disappear) +{ + auto content = content_factory(TestPaths::private_data() / "clapperboard.mp4"); + auto film = new_test_film2("frames_not_lost", content); + film->write_metadata(); + auto job = make_dcp(film, TranscodeJob::ChangedBehaviour::IGNORE); + auto& encoder = dynamic_pointer_cast(job->_encoder)->_j2k_encoder; + + while (JobManager::instance()->work_to_do()) { + encoder.remake_threads(rand() % 8, 0, {}); + dcpomatic_sleep_seconds(1); + } + + BOOST_CHECK(!JobManager::instance()->errors()); + + dcp::DCP dcp(film->dir(film->dcp_name())); + dcp.read(); + BOOST_REQUIRE_EQUAL(dcp.cpls().size(), 1U); + BOOST_REQUIRE_EQUAL(dcp.cpls()[0]->reels().size(), 1U); + BOOST_REQUIRE_EQUAL(dcp.cpls()[0]->reels()[0]->main_picture()->intrinsic_duration(), 423U); +} + diff --git a/test/wscript b/test/wscript index 7930397f6..f464b34cd 100644 --- a/test/wscript +++ b/test/wscript @@ -111,6 +111,7 @@ def build(bld): interrupt_encoder_test.cc isdcf_name_test.cc j2k_bandwidth_test.cc + j2k_encode_threading_test.cc job_manager_test.cc kdm_cli_test.cc kdm_naming_test.cc -- 2.30.2