diff options
| author | Carl Hetherington <cth@carlh.net> | 2012-12-19 23:50:17 +0000 |
|---|---|---|
| committer | Carl Hetherington <cth@carlh.net> | 2012-12-19 23:50:17 +0000 |
| commit | 2f56f38ce56b36f20d59593f56981e7ed330c484 (patch) | |
| tree | 1889f6eff9545010815775671df54064bc796201 /src/lib/encoder.cc | |
| parent | 13337c62d8c0d052ba0377af9c00fe1d940be3cc (diff) | |
Re-work again so that there's just one encoder; various tweaks to still-image-with-audio.
Diffstat (limited to 'src/lib/encoder.cc')
| -rw-r--r-- | src/lib/encoder.cc | 217 |
1 files changed, 214 insertions, 3 deletions
diff --git a/src/lib/encoder.cc b/src/lib/encoder.cc index f352f5a52..7a475a859 100644 --- a/src/lib/encoder.cc +++ b/src/lib/encoder.cc @@ -22,16 +22,25 @@ */ #include <boost/filesystem.hpp> +#include <boost/lexical_cast.hpp> #include "encoder.h" #include "util.h" #include "options.h" #include "film.h" #include "log.h" #include "exceptions.h" +#include "filter.h" +#include "config.h" +#include "dcp_video_frame.h" +#include "server.h" +#include "cross.h" using std::pair; +using std::string; using std::stringstream; using std::vector; +using std::list; +using std::cout; using namespace boost; int const Encoder::_history_size = 25; @@ -49,6 +58,7 @@ Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<const EncodeOptions> o) , _swr_context (0) #endif , _audio_frames_written (0) + , _process_end (false) { if (_film->audio_stream()) { /* Create sound output files with .tmp suffixes; we will rename @@ -72,6 +82,7 @@ Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<const EncodeOptions> o) Encoder::~Encoder () { close_sound_files (); + terminate_worker_threads (); } void @@ -105,6 +116,18 @@ Encoder::process_begin () _swr_context = 0; #endif } + + for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) { + _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0))); + } + + vector<ServerDescription*> servers = Config::instance()->servers (); + + for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) { + for (int j = 0; j < (*i)->threads (); ++j) { + _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i))); + } + } } @@ -146,6 +169,43 @@ Encoder::process_end () boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false)); } } + + boost::mutex::scoped_lock lock (_worker_mutex); + + _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ())); + + /* Keep waking workers until the queue is empty */ + while (!_queue.empty ()) { + _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE); + _worker_condition.notify_all (); + _worker_condition.wait (lock); + } + + lock.unlock (); + + terminate_worker_threads (); + + _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size())); + + /* The following sequence of events can occur in the above code: + 1. a remote worker takes the last image off the queue + 2. the loop above terminates + 3. the remote worker fails to encode the image and puts it back on the queue + 4. the remote worker is then terminated by terminate_worker_threads + + So just mop up anything left in the queue here. + */ + + for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) { + _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ())); + try { + shared_ptr<EncodedData> e = (*i)->encode_locally (); + e->write (_opt, (*i)->frame ()); + frame_done (); + } catch (std::exception& e) { + _film->log()->log (String::compose ("Local encode failed (%1)", e.what ())); + } + } } /** @return an estimate of the current number of frames we are encoding per second, @@ -209,7 +269,7 @@ Encoder::frame_skipped () } void -Encoder::process_video (shared_ptr<Image> i, boost::shared_ptr<Subtitle> s) +Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub) { if (_opt->video_skip != 0 && (_video_frame % _opt->video_skip) != 0) { ++_video_frame; @@ -224,7 +284,47 @@ Encoder::process_video (shared_ptr<Image> i, boost::shared_ptr<Subtitle> s) } } - do_process_video (i, s); + boost::mutex::scoped_lock lock (_worker_mutex); + + /* Wait until the queue has gone down a bit */ + while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) { + TIMING ("decoder sleeps with queue of %1", _queue.size()); + _worker_condition.wait (lock); + TIMING ("decoder wakes with queue of %1", _queue.size()); + } + + if (_process_end) { + return; + } + + /* Only do the processing if we don't already have a file for this frame */ + if (boost::filesystem::exists (_opt->frame_out_path (_video_frame, false))) { + frame_skipped (); + return; + } + + if (same) { + /* Use the last frame that we encoded */ + assert (_last_real_frame); + link (_opt->frame_out_path (_last_real_frame.get(), false), _opt->frame_out_path (_video_frame, false)); + link (_opt->hash_out_path (_last_real_frame.get(), false), _opt->hash_out_path (_video_frame, false)); + } else { + /* Queue this new frame for encoding */ + pair<string, string> const s = Filter::ffmpeg_strings (_film->filters()); + TIMING ("adding to queue of %1", _queue.size ()); + _queue.push_back (boost::shared_ptr<DCPVideoFrame> ( + new DCPVideoFrame ( + image, sub, _opt->out_size, _opt->padding, _film->subtitle_offset(), _film->subtitle_scale(), + _film->scaler(), _video_frame, _film->frames_per_second(), s.second, + Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (), + _film->log() + ) + )); + + _worker_condition.notify_all (); + _last_real_frame = _video_frame; + } + ++_video_frame; } @@ -232,7 +332,6 @@ void Encoder::process_audio (shared_ptr<AudioBuffers> data) { if (_opt->audio_range) { - shared_ptr<AudioBuffers> trimmed (new AudioBuffers (*data.get ())); /* Range that we are encoding */ @@ -306,3 +405,115 @@ Encoder::close_sound_files () _sound_files.clear (); } +void +Encoder::terminate_worker_threads () +{ + boost::mutex::scoped_lock lock (_worker_mutex); + _process_end = true; + _worker_condition.notify_all (); + lock.unlock (); + + for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) { + (*i)->join (); + delete *i; + } +} + +void +Encoder::encoder_thread (ServerDescription* server) +{ + /* Number of seconds that we currently wait between attempts + to connect to the server; not relevant for localhost + encodings. + */ + int remote_backoff = 0; + + while (1) { + + TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id()); + boost::mutex::scoped_lock lock (_worker_mutex); + while (_queue.empty () && !_process_end) { + _worker_condition.wait (lock); + } + + if (_process_end) { + return; + } + + TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size()); + boost::shared_ptr<DCPVideoFrame> vf = _queue.front (); + _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE); + _queue.pop_front (); + + lock.unlock (); + + shared_ptr<EncodedData> encoded; + + if (server) { + try { + encoded = vf->encode_remotely (server); + + if (remote_backoff > 0) { + _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ())); + } + + /* This job succeeded, so remove any backoff */ + remote_backoff = 0; + + } catch (std::exception& e) { + if (remote_backoff < 60) { + /* back off more */ + remote_backoff += 10; + } + _film->log()->log ( + String::compose ( + "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s", + vf->frame(), server->host_name(), e.what(), remote_backoff) + ); + } + + } else { + try { + TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame()); + encoded = vf->encode_locally (); + TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame()); + } catch (std::exception& e) { + _film->log()->log (String::compose ("Local encode failed (%1)", e.what ())); + } + } + + if (encoded) { + encoded->write (_opt, vf->frame ()); + frame_done (); + } else { + lock.lock (); + _film->log()->log ( + String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame()) + ); + _queue.push_front (vf); + lock.unlock (); + } + + if (remote_backoff > 0) { + dvdomatic_sleep (remote_backoff); + } + + lock.lock (); + _worker_condition.notify_all (); + } +} + +void +Encoder::link (string a, string b) const +{ +#ifdef DVDOMATIC_POSIX + int const r = symlink (a.c_str(), b.c_str()); + if (r) { + throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b)); + } +#endif + +#ifdef DVDOMATIC_WINDOWS + boost::filesystem::copy_file (a, b); +#endif +} |
