X-Git-Url: https://git.carlh.net/gitweb/?a=blobdiff_plain;f=src%2Flib%2Fencoder.cc;h=2ec32deb72e037bdb9d55dd2a2c35a9785787731;hb=d98bdad019ba9be5d800dece0414d7a080609027;hp=a1c02479936d00fbb2af25f9f81cf04ae35fe539;hpb=74a8d26a8907c6e00e29f054178a3425f44e38ed;p=dcpomatic.git diff --git a/src/lib/encoder.cc b/src/lib/encoder.cc index a1c024799..2ec32deb7 100644 --- a/src/lib/encoder.cc +++ b/src/lib/encoder.cc @@ -22,6 +22,8 @@ */ #include +#include +#include #include "encoder.h" #include "util.h" #include "film.h" @@ -31,6 +33,7 @@ #include "server.h" #include "cross.h" #include "writer.h" +#include "server_finder.h" #include "i18n.h" @@ -44,6 +47,7 @@ using std::min; using std::make_pair; using boost::shared_ptr; using boost::optional; +using boost::scoped_array; int const Encoder::_history_size = 25; @@ -67,22 +71,41 @@ Encoder::~Encoder () } } +/** Add a worker thread for a each thread on a remote server. Caller must hold + * a lock on _mutex, or know that one is not currently required to + * safely modify _threads. + */ +void +Encoder::add_worker_threads (ServerDescription d) +{ + for (int i = 0; i < d.threads(); ++i) { + _threads.push_back ( + make_pair (d, new boost::thread (boost::bind (&Encoder::encoder_thread, this, d))) + ); + } +} + void Encoder::process_begin () { for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) { - _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, shared_ptr ()))); + _threads.push_back ( + make_pair ( + optional (), + new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional ())) + ) + ); } - vector > servers = Config::instance()->servers (); + vector servers = Config::instance()->servers (); - for (vector >::iterator i = servers.begin(); i != servers.end(); ++i) { - for (int j = 0; j < (*i)->threads (); ++j) { - _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i))); - } + for (vector::iterator i = servers.begin(); i != servers.end(); ++i) { + add_worker_threads (*i); } _writer.reset (new Writer (_film, _job)); + _server_finder.reset (new ServerFinder ()); + _server_finder->ServerFound.connect (boost::bind (&Encoder::server_found, this, _1)); } @@ -124,7 +147,7 @@ Encoder::process_end () _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ())); } } - + _writer->finish (); _writer.reset (); } @@ -135,7 +158,7 @@ Encoder::process_end () float Encoder::current_encoding_rate () const { - boost::mutex::scoped_lock lock (_history_mutex); + boost::mutex::scoped_lock lock (_state_mutex); if (int (_time_history.size()) < _history_size) { return 0; } @@ -150,7 +173,7 @@ Encoder::current_encoding_rate () const int Encoder::video_frames_out () const { - boost::mutex::scoped_lock (_history_mutex); + boost::mutex::scoped_lock (_state_mutex); return _video_frames_out; } @@ -160,7 +183,7 @@ Encoder::video_frames_out () const void Encoder::frame_done () { - boost::mutex::scoped_lock lock (_history_mutex); + boost::mutex::scoped_lock lock (_state_mutex); struct timeval tv; gettimeofday (&tv, 0); @@ -171,7 +194,7 @@ Encoder::frame_done () } void -Encoder::process_video (shared_ptr image, Eyes eyes, bool same) +Encoder::process_video (shared_ptr image, Eyes eyes, ColourConversion conversion, bool same) { boost::mutex::scoped_lock lock (_mutex); @@ -205,7 +228,7 @@ Encoder::process_video (shared_ptr image, Eyes eyes, bool same) TIMING ("adding to queue of %1", _queue.size ()); _queue.push_back (shared_ptr ( new DCPVideoFrame ( - image, _video_frames_out, eyes, _film->video_frame_rate(), + image, _video_frames_out, eyes, conversion, _film->video_frame_rate(), _film->j2k_bandwidth(), _film->log() ) )); @@ -228,23 +251,24 @@ Encoder::process_audio (shared_ptr data) void Encoder::terminate_threads () { - boost::mutex::scoped_lock lock (_mutex); - _terminate = true; - _condition.notify_all (); - lock.unlock (); + { + boost::mutex::scoped_lock lock (_mutex); + _terminate = true; + _condition.notify_all (); + } - for (list::iterator i = _threads.begin(); i != _threads.end(); ++i) { - if ((*i)->joinable ()) { - (*i)->join (); + for (ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i) { + if (i->second->joinable ()) { + i->second->join (); } - delete *i; + delete i->second; } _threads.clear (); } void -Encoder::encoder_thread (shared_ptr server) +Encoder::encoder_thread (optional server) { /* Number of seconds that we currently wait between attempts to connect to the server; not relevant for localhost @@ -275,7 +299,7 @@ Encoder::encoder_thread (shared_ptr server) if (server) { try { - encoded = vf->encode_remotely (server); + encoded = vf->encode_remotely (server.get ()); if (remote_backoff > 0) { _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ())); @@ -326,3 +350,18 @@ Encoder::encoder_thread (shared_ptr server) _condition.notify_all (); } } + +void +Encoder::server_found (ServerDescription s) +{ + /* See if we already know about this server */ + boost::mutex::scoped_lock lm (_mutex); + ThreadList::iterator i = _threads.begin(); + while (i != _threads.end() && (!i->first || i->first.get().host_name() != s.host_name())) { + ++i; + } + + if (i == _threads.end ()) { + add_worker_threads (s); + } +}