diff options
| author | Carl Hetherington <cth@carlh.net> | 2023-09-10 22:56:51 +0200 |
|---|---|---|
| committer | Carl Hetherington <cth@carlh.net> | 2023-09-10 22:56:51 +0200 |
| commit | 96f25981e914d27377d13cccdd733d9996130703 (patch) | |
| tree | 91d779656477467d17a6661cb8b22f65cce866cd | |
| parent | 23306255085a0b48168d51629f38597ee0bf80ce (diff) | |
Rearrange encoder.rearrange-encoder
| -rw-r--r-- | src/lib/cpu_j2k_encoder_thread.cc | 42 | ||||
| -rw-r--r-- | src/lib/cpu_j2k_encoder_thread.h | 16 | ||||
| -rw-r--r-- | src/lib/dcp_encoder.h | 4 | ||||
| -rw-r--r-- | src/lib/encode_server.cc | 2 | ||||
| -rw-r--r-- | src/lib/encode_server.h | 5 | ||||
| -rw-r--r-- | src/lib/j2k_encoder.cc | 235 | ||||
| -rw-r--r-- | src/lib/j2k_encoder.h | 25 | ||||
| -rw-r--r-- | src/lib/j2k_encoder_thread.cc | 66 | ||||
| -rw-r--r-- | src/lib/j2k_encoder_thread.h | 38 | ||||
| -rw-r--r-- | src/lib/make_dcp.cc | 6 | ||||
| -rw-r--r-- | src/lib/make_dcp.h | 3 | ||||
| -rw-r--r-- | src/lib/remote_j2k_encoder_thread.cc | 64 | ||||
| -rw-r--r-- | src/lib/remote_j2k_encoder_thread.h | 21 | ||||
| -rw-r--r-- | src/lib/scope_guard.h | 10 | ||||
| -rw-r--r-- | src/lib/transcode_job.h | 4 | ||||
| -rw-r--r-- | src/lib/writer.h | 1 | ||||
| -rw-r--r-- | src/lib/wscript | 3 | ||||
| -rw-r--r-- | test/client_server_test.cc | 20 | ||||
| -rw-r--r-- | test/j2k_encode_threading_test.cc | 94 | ||||
| -rw-r--r-- | test/wscript | 1 |
20 files changed, 509 insertions, 151 deletions
diff --git a/src/lib/cpu_j2k_encoder_thread.cc b/src/lib/cpu_j2k_encoder_thread.cc new file mode 100644 index 000000000..f715bfcd6 --- /dev/null +++ b/src/lib/cpu_j2k_encoder_thread.cc @@ -0,0 +1,42 @@ +#include "cpu_j2k_encoder_thread.h" +#include "cross.h" +#include "dcpomatic_log.h" +#include "dcp_video.h" +#include "j2k_encoder.h" +#include "scope_guard.h" +#include "util.h" + +#include "i18n.h" + + +using std::make_shared; +using std::shared_ptr; + + +CPUJ2KEncoderThread::CPUJ2KEncoderThread(J2KEncoder& encoder) + : J2KEncoderThread(encoder) +{ + +} + + +void +CPUJ2KEncoderThread::log_thread_start() const +{ + start_of_thread("CPUJ2KEncoder"); + LOG_TIMING("start-encoder-thread thread=%1 server=localhost", thread_id()); +} + + +shared_ptr<dcp::ArrayData> +CPUJ2KEncoderThread::encode(DCPVideo const& frame) +{ + try { + return make_shared<dcp::ArrayData>(frame.encode_locally()); + } catch (std::exception& e) { + LOG_ERROR(N_("Local encode failed (%1)"), e.what()); + } + + return {}; +} + diff --git a/src/lib/cpu_j2k_encoder_thread.h b/src/lib/cpu_j2k_encoder_thread.h new file mode 100644 index 000000000..52336a408 --- /dev/null +++ b/src/lib/cpu_j2k_encoder_thread.h @@ -0,0 +1,16 @@ +#include "j2k_encoder_thread.h" +#include <dcp/data.h> + + +class DCPVideo; + + +class CPUJ2KEncoderThread : public J2KEncoderThread +{ +public: + CPUJ2KEncoderThread(J2KEncoder& encoder); + + void log_thread_start() const override; + std::shared_ptr<dcp::ArrayData> encode(DCPVideo const& frame) override; +}; + diff --git a/src/lib/dcp_encoder.h b/src/lib/dcp_encoder.h index ad77f6951..43b9d372e 100644 --- a/src/lib/dcp_encoder.h +++ b/src/lib/dcp_encoder.h @@ -35,6 +35,8 @@ class Job; class Player; class PlayerVideo; +struct frames_not_lost_when_threads_disappear; + /** @class DCPEncoder */ class DCPEncoder : public Encoder @@ -55,6 +57,8 @@ public: private: + friend struct ::frames_not_lost_when_threads_disappear; + void video (std::shared_ptr<PlayerVideo>, dcpomatic::DCPTime); void audio (std::shared_ptr<AudioBuffers>, dcpomatic::DCPTime); void text (PlayerText, TextType, boost::optional<DCPTextTrack>, dcpomatic::DCPTimePeriod); diff --git a/src/lib/encode_server.cc b/src/lib/encode_server.cc index 6501dcde1..91eb20dee 100644 --- a/src/lib/encode_server.cc +++ b/src/lib/encode_server.cc @@ -165,6 +165,8 @@ EncodeServer::process (shared_ptr<Socket> socket, struct timeval& after_read, st throw; } + ++_frames_encoded; + return dcp_video_frame.index (); } diff --git a/src/lib/encode_server.h b/src/lib/encode_server.h index f93d66746..01793ea72 100644 --- a/src/lib/encode_server.h +++ b/src/lib/encode_server.h @@ -53,6 +53,10 @@ public: void run () override; + int frames_encoded() const { + return _frames_encoded; + } + private: void handle (std::shared_ptr<Socket>) override; void worker_thread (); @@ -67,6 +71,7 @@ private: bool _verbose; int _num_threads; Waker _waker; + boost::atomic<int> _frames_encoded; struct Broadcast { diff --git a/src/lib/j2k_encoder.cc b/src/lib/j2k_encoder.cc index d2e840f85..3ee2c3bb6 100644 --- a/src/lib/j2k_encoder.cc +++ b/src/lib/j2k_encoder.cc @@ -26,6 +26,7 @@ #include "compose.hpp" #include "config.h" +#include "cpu_j2k_encoder_thread.h" #include "cross.h" #include "dcp_video.h" #include "dcpomatic_log.h" @@ -33,8 +34,10 @@ #include "encode_server_finder.h" #include "film.h" #include "j2k_encoder.h" +#include "j2k_encoder_thread.h" #include "log.h" #include "player_video.h" +#include "remote_j2k_encoder_thread.h" #include "util.h" #include "writer.h" #include <libcxml/cxml.h> @@ -44,11 +47,13 @@ using std::cout; +using std::dynamic_pointer_cast; using std::exception; using std::list; using std::make_shared; using std::shared_ptr; using std::weak_ptr; +using std::vector; using boost::optional; using dcp::Data; using namespace dcpomatic; @@ -70,12 +75,21 @@ J2KEncoder::~J2KEncoder () { _server_found_connection.disconnect(); - boost::mutex::scoped_lock lm (_threads_mutex); terminate_threads (); } void +J2KEncoder::servers_list_changed() +{ + remake_threads( + Config::instance()->only_servers_encode() ? 0 : Config::instance()->master_encoding_threads(), + EncodeServerFinder::instance()->servers() + ); +} + + +void J2KEncoder::begin () { _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect( @@ -94,7 +108,6 @@ J2KEncoder::end () /* Keep waking workers until the queue is empty */ while (!_queue.empty ()) { rethrow (); - _empty_condition.notify_all (); _full_condition.wait (lock); } @@ -102,10 +115,7 @@ J2KEncoder::end () LOG_GENERAL_NC (N_("Terminating encoder threads")); - { - boost::mutex::scoped_lock lm (_threads_mutex); - terminate_threads (); - } + terminate_threads(); /* Something might have been thrown during terminate_threads */ rethrow (); @@ -183,7 +193,7 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time) size_t threads = 0; { boost::mutex::scoped_lock lm (_threads_mutex); - threads = _threads->size(); + threads = _threads.size(); } boost::mutex::scoped_lock queue_lock (_queue_mutex); @@ -242,170 +252,119 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time) } -/** Caller must hold a lock on _threads_mutex */ void J2KEncoder::terminate_threads () { + boost::mutex::scoped_lock lm(_threads_mutex); boost::this_thread::disable_interruption dis; - if (!_threads) { - return; + for (auto& thread: _threads) { + thread->stop(); } - _threads->interrupt_all (); - try { - _threads->join_all (); - } catch (exception& e) { - LOG_ERROR ("join() threw an exception: %1", e.what()); - } catch (...) { - LOG_ERROR_NC ("join() threw an exception"); - } - - _threads.reset (); + _threads.clear(); + _ending = true; } void -J2KEncoder::encoder_thread (optional<EncodeServerDescription> server) -try +J2KEncoder::remake_threads(int local, list<EncodeServerDescription> servers) { - start_of_thread ("J2KEncoder"); + boost::mutex::scoped_lock lm (_threads_mutex); + if (_ending) { + return; + } - if (server) { - LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ()); - } else { - LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ()); + auto remove_threads = [this](int wanted, int current, std::function<bool (shared_ptr<J2KEncoderThread>)> predicate) { + for (auto i = wanted; i < current; ++i) { + auto iter = std::find_if(_threads.begin(), _threads.end(), predicate); + if (iter != _threads.end()) { + (*iter)->stop(); + _threads.erase(iter); + } + } + }; + + + auto const current_threads = std::count_if(_threads.begin(), _threads.end(), [](shared_ptr<J2KEncoderThread> thread) { + return static_cast<bool>(dynamic_pointer_cast<CPUJ2KEncoderThread>(thread)); + }); + + for (auto i = current_threads; i < local; ++i) { + auto thread = make_shared<CPUJ2KEncoderThread>(*this); + thread->start(); + _threads.push_back(thread); } - /* Number of seconds that we currently wait between attempts - to connect to the server; not relevant for localhost - encodings. - */ - int remote_backoff = 0; + remove_threads(local, current_threads, [](shared_ptr<J2KEncoderThread> thread) { + return static_cast<bool>(dynamic_pointer_cast<CPUJ2KEncoderThread>(thread)); + }); - while (true) { - LOG_TIMING ("encoder-sleep thread=%1", thread_id ()); - boost::mutex::scoped_lock lock (_queue_mutex); - while (_queue.empty ()) { - _empty_condition.wait (lock); + for (auto const& server: servers) { + if (!server.current_link_version()) { + continue; } - LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size()); - auto vf = _queue.front (); + auto const current_threads = std::count_if(_threads.begin(), _threads.end(), [server](shared_ptr<J2KEncoderThread> thread) { + auto remote = dynamic_pointer_cast<RemoteJ2KEncoderThread>(thread); + return remote && remote->server().host_name() == server.host_name(); + }); - /* We're about to commit to either encoding this frame or putting it back onto the queue, - so we must not be interrupted until one or other of these things have happened. This - block has thread interruption disabled. - */ - { - boost::this_thread::disable_interruption dis; - - LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf.index(), static_cast<int>(vf.eyes())); - _queue.pop_front (); - - lock.unlock (); - - shared_ptr<Data> encoded; - - /* We need to encode this input */ - if (server) { - try { - encoded = make_shared<dcp::ArrayData>(vf.encode_remotely(server.get())); - - if (remote_backoff > 0) { - LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ()); - } - - /* This job succeeded, so remove any backoff */ - remote_backoff = 0; - - } catch (std::exception& e) { - if (remote_backoff < 60) { - /* back off more */ - remote_backoff += 10; - } - LOG_ERROR ( - N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"), - vf.index(), server->host_name(), e.what(), remote_backoff - ); - } - - } else { - try { - LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf.index()); - encoded = make_shared<dcp::ArrayData>(vf.encode_locally()); - LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index()); - } catch (std::exception& e) { - /* This is very bad, so don't cope with it, just pass it on */ - LOG_ERROR (N_("Local encode failed (%1)"), e.what ()); - throw; - } - } + auto const wanted_threads = server.threads(); - if (encoded) { - _writer.write(encoded, vf.index(), vf.eyes()); - frame_done (); - } else { - lock.lock (); - LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index()); - _queue.push_front (vf); - lock.unlock (); - } + if (wanted_threads > current_threads) { + LOG_GENERAL(N_("Adding %1 worker threads for remote %2"), wanted_threads - current_threads, server.host_name()); + } else if (wanted_threads < current_threads) { + LOG_GENERAL(N_("Removing %1 worker threads for remote %2"), current_threads - wanted_threads, server.host_name()); } - if (remote_backoff > 0) { - boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff)); + for (auto i = current_threads; i < wanted_threads; ++i) { + auto thread = make_shared<RemoteJ2KEncoderThread>(*this, server); + thread->start(); + _threads.push_back(thread); } - /* The queue might not be full any more, so notify anything that is waiting on that */ - lock.lock (); - _full_condition.notify_all (); + remove_threads(wanted_threads, current_threads, [server](shared_ptr<J2KEncoderThread> thread) { + auto remote = dynamic_pointer_cast<RemoteJ2KEncoderThread>(thread); + return remote && remote->server().host_name() == server.host_name(); + }); } + + _writer.set_encoder_threads(_threads.size()); } -catch (boost::thread_interrupted& e) { - /* Ignore these and just stop the thread */ - _full_condition.notify_all (); -} -catch (...) + + +DCPVideo +J2KEncoder::pop() { - store_current (); - /* Wake anything waiting on _full_condition so it can see the exception */ - _full_condition.notify_all (); + boost::mutex::scoped_lock lock(_queue_mutex); + while (_queue.empty()) { + _empty_condition.wait (lock); + } + + LOG_TIMING("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size()); + + auto vf = _queue.front(); + _queue.pop_front(); + + _full_condition.notify_all(); + return vf; } void -J2KEncoder::servers_list_changed () +J2KEncoder::retry(DCPVideo video) { - boost::mutex::scoped_lock lm (_threads_mutex); - - terminate_threads (); - _threads = make_shared<boost::thread_group>(); - - /* XXX: could re-use threads */ - - if (!Config::instance()->only_servers_encode ()) { - for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) { -#ifdef DCPOMATIC_LINUX - auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>())); - pthread_setname_np (t->native_handle(), "encode-worker"); -#else - _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>())); -#endif - } - } - - for (auto i: EncodeServerFinder::instance()->servers()) { - if (!i.current_link_version()) { - continue; - } + boost::mutex::scoped_lock lock(_queue_mutex); + _queue.push_front(video); + _empty_condition.notify_all(); +} - LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ()); - for (int j = 0; j < i.threads(); ++j) { - _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i)); - } - } - _writer.set_encoder_threads(_threads->size()); +void +J2KEncoder::write(shared_ptr<const dcp::Data> data, int index, Eyes eyes) +{ + _writer.write(data, index, eyes); + frame_done(); } diff --git a/src/lib/j2k_encoder.h b/src/lib/j2k_encoder.h index 63228a6b8..94490f16a 100644 --- a/src/lib/j2k_encoder.h +++ b/src/lib/j2k_encoder.h @@ -32,6 +32,7 @@ #include "enum_indexed_vector.h" #include "event_history.h" #include "exception_store.h" +#include "types.h" #include "writer.h" #include <boost/optional.hpp> #include <boost/signals2.hpp> @@ -45,9 +46,14 @@ class DCPVideo; class EncodeServerDescription; class Film; +class J2KEncoderThread; class Job; class PlayerVideo; +struct local_threads_created_and_destroyed; +struct remote_threads_created_and_destroyed; +struct frames_not_lost_when_threads_disappear; + /** @class J2KEncoder * @brief Class to manage encoding to J2K. @@ -76,22 +82,33 @@ public: boost::optional<float> current_encoding_rate () const; int video_frames_enqueued () const; - void servers_list_changed (); + DCPVideo pop(); + void retry(DCPVideo frame); + void write(std::shared_ptr<const dcp::Data> data, int index, Eyes eyes); private: - void frame_done (); + friend struct ::local_threads_created_and_destroyed; + friend struct ::remote_threads_created_and_destroyed; + friend struct ::frames_not_lost_when_threads_disappear; - void encoder_thread (boost::optional<EncodeServerDescription>); + void frame_done (); void terminate_threads (); + void servers_list_changed (); + void remake_threads(int local, std::list<EncodeServerDescription> servers); + /** Film that we are encoding */ std::shared_ptr<const Film> _film; EventHistory _history; boost::mutex _threads_mutex; - std::shared_ptr<boost::thread_group> _threads; + std::list<std::shared_ptr<J2KEncoderThread>> _threads; + /** true if we shouldn't make any more threads, even if remake_threads() + * or servers_list_changed() is called. + */ + bool _ending = false; mutable boost::mutex _queue_mutex; std::list<DCPVideo> _queue; diff --git a/src/lib/j2k_encoder_thread.cc b/src/lib/j2k_encoder_thread.cc new file mode 100644 index 000000000..3ac380477 --- /dev/null +++ b/src/lib/j2k_encoder_thread.cc @@ -0,0 +1,66 @@ +#include "dcp_video.h" +#include "dcpomatic_log.h" +#include "j2k_encoder.h" +#include "j2k_encoder_thread.h" +#include "scope_guard.h" + + +J2KEncoderThread::J2KEncoderThread(J2KEncoder& encoder) + : _encoder(encoder) +{ + +} + + +void +J2KEncoderThread::start() +{ + _thread = boost::thread(boost::bind(&J2KEncoderThread::run, this)); +#ifdef DCPOMATIC_LINUX + pthread_setname_np(_thread.native_handle(), "encode-worker"); +#endif +} + + +void +J2KEncoderThread::stop() +{ + _thread.interrupt(); + try { + _thread.join(); + } catch (std::exception& e) { + LOG_ERROR("join() threw an exception: %1", e.what()); + } catch (...) { + LOG_ERROR_NC("join() threw an exception"); + } +} + + +void +J2KEncoderThread::run() +try +{ + log_thread_start(); + + while (true) { + LOG_TIMING("encoder-sleep thread=%1", thread_id()); + auto frame = _encoder.pop(); + + ScopeGuard frame_guard([this, &frame]() { + _encoder.retry(frame); + }); + + LOG_TIMING("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), frame.index(), static_cast<int>(frame.eyes())); + + auto encoded = encode(frame); + + if (encoded) { + frame_guard.cancel(); + _encoder.write(encoded, frame.index(), frame.eyes()); + } + } +} catch (boost::thread_interrupted& e) { +} catch (...) { + store_current(); +} + diff --git a/src/lib/j2k_encoder_thread.h b/src/lib/j2k_encoder_thread.h new file mode 100644 index 000000000..a1be194ee --- /dev/null +++ b/src/lib/j2k_encoder_thread.h @@ -0,0 +1,38 @@ +#ifndef DCPOMATIC_J2K_ENCODER_THREAD_H +#define DCPOMATIC_J2K_ENCODER_THREAD_H + + +#include "exception_store.h" +#include <dcp/array_data.h> +#include <boost/thread.hpp> + + +class DCPVideo; +class J2KEncoder; + + +class J2KEncoderThread : public ExceptionStore +{ +public: + J2KEncoderThread(J2KEncoder& encoder); + + J2KEncoderThread(J2KEncoderThread const&) = delete; + J2KEncoderThread& operator=(J2KEncoderThread const&) = delete; + + void start(); + void stop(); + + void run(); + + virtual void log_thread_start() const = 0; + virtual std::shared_ptr<dcp::ArrayData> encode(DCPVideo const& frame) = 0; + +protected: + J2KEncoder& _encoder; + +private: + boost::thread _thread; +}; + + +#endif diff --git a/src/lib/make_dcp.cc b/src/lib/make_dcp.cc index 17d45be46..6a195c52c 100644 --- a/src/lib/make_dcp.cc +++ b/src/lib/make_dcp.cc @@ -40,8 +40,8 @@ using std::shared_ptr; using std::string; -/** Add suitable Jobs to the JobManager to create a DCP for a Film */ -void +/** Add a suitable Job to the JobManager to create a DCP for a Film */ +shared_ptr<DCPTranscodeJob> make_dcp (shared_ptr<Film> film, TranscodeJob::ChangedBehaviour behaviour) { if (film->dcp_name().find("/") != string::npos) { @@ -101,5 +101,7 @@ make_dcp (shared_ptr<Film> film, TranscodeJob::ChangedBehaviour behaviour) auto tj = make_shared<DCPTranscodeJob>(film, behaviour); tj->set_encoder (make_shared<DCPEncoder>(film, tj)); JobManager::instance()->add (tj); + + return tj; } diff --git a/src/lib/make_dcp.h b/src/lib/make_dcp.h index 9f5072782..0acc89470 100644 --- a/src/lib/make_dcp.h +++ b/src/lib/make_dcp.h @@ -22,8 +22,9 @@ #include "transcode_job.h" +class DCPTranscodeJob; class Film; -void make_dcp (std::shared_ptr<Film> film, TranscodeJob::ChangedBehaviour behaviour); +std::shared_ptr<DCPTranscodeJob> make_dcp(std::shared_ptr<Film> film, TranscodeJob::ChangedBehaviour behaviour); diff --git a/src/lib/remote_j2k_encoder_thread.cc b/src/lib/remote_j2k_encoder_thread.cc new file mode 100644 index 000000000..4afa2d79d --- /dev/null +++ b/src/lib/remote_j2k_encoder_thread.cc @@ -0,0 +1,64 @@ +#include "dcp_video.h" +#include "dcpomatic_log.h" +#include "j2k_encoder.h" +#include "remote_j2k_encoder_thread.h" +#include "scope_guard.h" +#include "util.h" + +#include "i18n.h" + + +using std::make_shared; +using std::shared_ptr; + + +RemoteJ2KEncoderThread::RemoteJ2KEncoderThread(J2KEncoder& encoder, EncodeServerDescription server) + : J2KEncoderThread(encoder) + , _server(server) +{ + +} + + +void +RemoteJ2KEncoderThread::log_thread_start() const +{ + start_of_thread("RemoteJ2KEncoder"); + LOG_TIMING("start-encoder-thread thread=%1 server=%2", thread_id(), _server.host_name()); +} + + +shared_ptr<dcp::ArrayData> +RemoteJ2KEncoderThread::encode(DCPVideo const& frame) +{ + shared_ptr<dcp::ArrayData> encoded; + + try { + encoded = make_shared<dcp::ArrayData>(frame.encode_remotely(_server)); + if (_remote_backoff > 0) { + LOG_GENERAL("%1 was lost, but now she is found; removing backoff", _server.host_name()); + _remote_backoff = 0; + } + } catch (std::exception& e) { + LOG_ERROR( + N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"), + frame.index(), _server.host_name(), e.what(), _remote_backoff + ); + } catch (...) { + LOG_ERROR( + N_("Remote encode of %1 on %2 failed; thread sleeping for %4s"), + frame.index(), _server.host_name(), _remote_backoff + ); + } + + if (!encoded) { + if (_remote_backoff < 60) { + /* back off more */ + _remote_backoff += 10; + } + boost::this_thread::sleep(boost::posix_time::seconds(_remote_backoff)); + } + + return encoded; +} + diff --git a/src/lib/remote_j2k_encoder_thread.h b/src/lib/remote_j2k_encoder_thread.h new file mode 100644 index 000000000..f3c485c26 --- /dev/null +++ b/src/lib/remote_j2k_encoder_thread.h @@ -0,0 +1,21 @@ +#include "encode_server_description.h" +#include "j2k_encoder_thread.h" + + +class RemoteJ2KEncoderThread : public J2KEncoderThread +{ +public: + RemoteJ2KEncoderThread(J2KEncoder& encoder, EncodeServerDescription server); + + void log_thread_start() const override; + std::shared_ptr<dcp::ArrayData> encode(DCPVideo const& frame) override; + + EncodeServerDescription server() const { + return _server; + } + +private: + EncodeServerDescription _server; + /** Number of seconds that we currently wait between attempts to connect to the server */ + int _remote_backoff = 0; +}; diff --git a/src/lib/scope_guard.h b/src/lib/scope_guard.h index ac60f9fea..c53acd9ce 100644 --- a/src/lib/scope_guard.h +++ b/src/lib/scope_guard.h @@ -43,13 +43,21 @@ public: ScopeGuard (ScopeGuard const&) = delete; ScopeGuard& operator=(ScopeGuard const&) = delete; + void cancel() + { + _cancelled = true; + } + ~ScopeGuard () { - _function(); + if (!_cancelled) { + _function(); + } } private: std::function<void()> _function; + bool _cancelled = false; }; diff --git a/src/lib/transcode_job.h b/src/lib/transcode_job.h index b05b20a16..629fcb14b 100644 --- a/src/lib/transcode_job.h +++ b/src/lib/transcode_job.h @@ -37,6 +37,8 @@ class Encoder; +struct frames_not_lost_when_threads_disappear; + /** @class TranscodeJob * @brief A job which transcodes a Film to another format. @@ -64,6 +66,8 @@ public: void set_encoder (std::shared_ptr<Encoder> t); private: + friend struct ::frames_not_lost_when_threads_disappear; + virtual void post_transcode () {} float frames_per_second() const; diff --git a/src/lib/writer.h b/src/lib/writer.h index 1fbf7bbd5..0b38e9030 100644 --- a/src/lib/writer.h +++ b/src/lib/writer.h @@ -34,6 +34,7 @@ #include "exception_store.h" #include "font_id_map.h" #include "player_text.h" +#include "text_type.h" #include "weak_film.h" #include <dcp/atmos_frame.h> #include <boost/thread.hpp> diff --git a/src/lib/wscript b/src/lib/wscript index 251f09cf7..1c1c94b05 100644 --- a/src/lib/wscript +++ b/src/lib/wscript @@ -59,6 +59,7 @@ sources = """ content_factory.cc combine_dcp_job.cc copy_dcp_details_to_film.cc + cpu_j2k_encoder_thread.cc create_cli.cc crop.cc cross_common.cc @@ -138,6 +139,7 @@ sources = """ job.cc job_manager.cc j2k_encoder.cc + j2k_encoder_thread.cc json_server.cc kdm_cli.cc kdm_recipient.cc @@ -162,6 +164,7 @@ sources = """ reel_writer.cc referenced_reel_asset.cc release_notes.cc + remote_j2k_encoder_thread.cc render_text.cc resampler.cc resolution.cc diff --git a/test/client_server_test.cc b/test/client_server_test.cc index 596668aae..ded3a5dfc 100644 --- a/test/client_server_test.cc +++ b/test/client_server_test.cc @@ -20,15 +20,12 @@ /** @file test/client_server_test.cc - * @brief Test the server class. + * @brief Test the remote encoding code. * @ingroup feature - * - * Create a test image and then encode it using the standard mechanism - * and also using a EncodeServer object running on localhost. Compare the resulting - * encoded data to check that they are the same. */ +#include "lib/content_factory.h" #include "lib/cross.h" #include "lib/dcp_video.h" #include "lib/dcpomatic_log.h" @@ -315,3 +312,16 @@ BOOST_AUTO_TEST_CASE (client_server_test_j2k) } +BOOST_AUTO_TEST_CASE(real_encode_with_server) +{ + auto content = content_factory(TestPaths::private_data() / "dolby_aurora.vob"); + auto film = new_test_film2("real_encode_with_server", content); + + EncodeServer server(true, 4); + thread server_thread(boost::bind(&EncodeServer::run, &server)); + + make_and_verify_dcp(film); + + BOOST_CHECK(server.frames_encoded() > 0); +} + diff --git a/test/j2k_encode_threading_test.cc b/test/j2k_encode_threading_test.cc new file mode 100644 index 000000000..42bb582e1 --- /dev/null +++ b/test/j2k_encode_threading_test.cc @@ -0,0 +1,94 @@ +#include "lib/config.h" +#include "lib/content_factory.h" +#include "lib/dcp_encoder.h" +#include "lib/dcp_transcode_job.h" +#include "lib/encode_server_description.h" +#include "lib/film.h" +#include "lib/j2k_encoder.h" +#include "lib/job_manager.h" +#include "lib/make_dcp.h" +#include "lib/transcode_job.h" +#include "test.h" +#include <dcp/cpl.h> +#include <dcp/dcp.h> +#include <dcp/reel.h> +#include <dcp/reel_picture_asset.h> +#include <boost/test/unit_test.hpp> + + +using std::dynamic_pointer_cast; +using std::list; + + +BOOST_AUTO_TEST_CASE(local_threads_created_and_destroyed) +{ + auto film = new_test_film2("local_threads_created_and_destroyed", {}); + Writer writer(film, {}); + J2KEncoder encoder(film, writer); + + encoder.remake_threads(32, {}); + BOOST_CHECK_EQUAL(encoder._threads.size(), 32); + + encoder.remake_threads(9, {}); + BOOST_CHECK_EQUAL(encoder._threads.size(), 9); + + encoder.end(); + BOOST_CHECK_EQUAL(encoder._threads.size(), 0); +} + + +BOOST_AUTO_TEST_CASE(remote_threads_created_and_destroyed) +{ + auto film = new_test_film2("remote_threads_created_and_destroyed", {}); + Writer writer(film, {}); + J2KEncoder encoder(film, writer); + + list<EncodeServerDescription> servers = { + { "fred", 7, SERVER_LINK_VERSION }, + { "jim", 2, SERVER_LINK_VERSION }, + { "sheila", 14, SERVER_LINK_VERSION }, + }; + + encoder.remake_threads(0, servers); + BOOST_CHECK_EQUAL(encoder._threads.size(), 7 + 2 + 14); + + servers = { + { "fred", 7, SERVER_LINK_VERSION }, + { "jim", 5, SERVER_LINK_VERSION }, + { "sheila", 14, SERVER_LINK_VERSION }, + }; + + encoder.remake_threads(0, servers); + BOOST_CHECK_EQUAL(encoder._threads.size(), 7 + 5 + 14); + + servers = { + { "fred", 0, SERVER_LINK_VERSION }, + { "jim", 0, SERVER_LINK_VERSION }, + { "sheila", 11, SERVER_LINK_VERSION }, + }; + + encoder.remake_threads(0, servers); + BOOST_CHECK_EQUAL(encoder._threads.size(), 11); +} + + +BOOST_AUTO_TEST_CASE(frames_not_lost_when_threads_disappear) +{ + auto content = content_factory(TestPaths::private_data() / "clapperboard.mp4"); + auto film = new_test_film2("frames_not_lost", content); + film->write_metadata(); + auto job = make_dcp(film, TranscodeJob::ChangedBehaviour::IGNORE); + auto& encoder = dynamic_pointer_cast<DCPEncoder>(job->_encoder)->_j2k_encoder; + + while (JobManager::instance()->work_to_do()) { + encoder.remake_threads(rand() % 8, {}); + dcpomatic_sleep_seconds(1); + } + + dcp::DCP dcp(film->dir(film->dcp_name())); + dcp.read(); + BOOST_REQUIRE_EQUAL(dcp.cpls().size(), 1U); + BOOST_REQUIRE_EQUAL(dcp.cpls()[0]->reels().size(), 1U); + BOOST_REQUIRE_EQUAL(dcp.cpls()[0]->reels()[0]->main_picture()->intrinsic_duration(), 423U); +} + diff --git a/test/wscript b/test/wscript index f40568e3c..14b682357 100644 --- a/test/wscript +++ b/test/wscript @@ -111,6 +111,7 @@ def build(bld): interrupt_encoder_test.cc isdcf_name_test.cc j2k_bandwidth_test.cc + j2k_encode_threading_test.cc job_manager_test.cc kdm_cli_test.cc kdm_naming_test.cc |
