X-Git-Url: https://git.carlh.net/gitweb/?a=blobdiff_plain;f=src%2Flib%2Fencoder.cc;h=978d787e88985c97513b2dc425b635bd7bd54c97;hb=0b760c0526b0b9d13def519ab8afba1e511d8111;hp=17a6726a6841db8c3bd753b35d5f9ac1686b1991;hpb=70447e72a5595fa03eb0a82b5e93247fcc5cad2b;p=dcpomatic.git diff --git a/src/lib/encoder.cc b/src/lib/encoder.cc index 17a6726a6..978d787e8 100644 --- a/src/lib/encoder.cc +++ b/src/lib/encoder.cc @@ -21,11 +21,31 @@ * @brief Parent class for classes which can encode video and audio frames. */ +#include +#include +#include +#include #include "encoder.h" #include "util.h" #include "options.h" +#include "film.h" +#include "log.h" +#include "exceptions.h" +#include "filter.h" +#include "config.h" +#include "dcp_video_frame.h" +#include "server.h" +#include "format.h" +#include "cross.h" +#include "writer.h" using std::pair; +using std::string; +using std::stringstream; +using std::vector; +using std::list; +using std::cout; +using std::make_pair; using namespace boost; int const Encoder::_history_size = 25; @@ -33,17 +53,142 @@ int const Encoder::_history_size = 25; /** @param f Film that we are encoding. * @param o Options. */ -Encoder::Encoder (shared_ptr f, shared_ptr o) +Encoder::Encoder (shared_ptr f) : _film (f) - , _opt (o) - , _just_skipped (false) - , _video_frame (0) - , _audio_frame (0) + , _video_frames_in (0) + , _video_frames_out (0) +#ifdef HAVE_SWRESAMPLE + , _swr_context (0) +#endif + , _have_a_real_frame (false) + , _terminate_encoder (false) { + +} + +Encoder::~Encoder () +{ + terminate_worker_threads (); + if (_writer) { + _writer->finish (); + } +} + +void +Encoder::process_begin () +{ + if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) { +#ifdef HAVE_SWRESAMPLE + + stringstream s; + s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate(); + _film->log()->log (s.str ()); + + /* We will be using planar float data when we call the resampler */ + _swr_context = swr_alloc_set_opts ( + 0, + _film->audio_stream()->channel_layout(), + AV_SAMPLE_FMT_FLTP, + _film->target_audio_sample_rate(), + _film->audio_stream()->channel_layout(), + AV_SAMPLE_FMT_FLTP, + _film->audio_stream()->sample_rate(), + 0, 0 + ); + + swr_init (_swr_context); +#else + throw EncodeError ("Cannot resample audio as libswresample is not present"); +#endif + } else { +#ifdef HAVE_SWRESAMPLE + _swr_context = 0; +#endif + } + + for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) { + _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0))); + } + + vector servers = Config::instance()->servers (); + + for (vector::iterator i = servers.begin(); i != servers.end(); ++i) { + for (int j = 0; j < (*i)->threads (); ++j) { + _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i))); + } + } + _writer.reset (new Writer (_film)); } +void +Encoder::process_end () +{ +#if HAVE_SWRESAMPLE + if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) { + + shared_ptr out (new AudioBuffers (_film->audio_stream()->channels(), 256)); + + while (1) { + int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0); + + if (frames < 0) { + throw EncodeError ("could not run sample-rate converter"); + } + + if (frames == 0) { + break; + } + + out->set_frames (frames); + _writer->write (out); + } + + swr_free (&_swr_context); + } +#endif + + boost::mutex::scoped_lock lock (_worker_mutex); + + _film->log()->log ("Clearing queue of " + lexical_cast (_encode_queue.size ())); + + /* Keep waking workers until the queue is empty */ + while (!_encode_queue.empty ()) { + _film->log()->log ("Waking with " + lexical_cast (_encode_queue.size ()), Log::VERBOSE); + _worker_condition.notify_all (); + _worker_condition.wait (lock); + } + + lock.unlock (); + + terminate_worker_threads (); + + _film->log()->log ("Mopping up " + lexical_cast (_encode_queue.size())); + + /* The following sequence of events can occur in the above code: + 1. a remote worker takes the last image off the queue + 2. the loop above terminates + 3. the remote worker fails to encode the image and puts it back on the queue + 4. the remote worker is then terminated by terminate_worker_threads + + So just mop up anything left in the queue here. + */ + + for (list >::iterator i = _encode_queue.begin(); i != _encode_queue.end(); ++i) { + _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ())); + try { + _writer->write ((*i)->encode_locally(), (*i)->frame ()); + frame_done (); + } catch (std::exception& e) { + _film->log()->log (String::compose ("Local encode failed (%1)", e.what ())); + } + } + + _writer->finish (); + _writer.reset (); +} + /** @return an estimate of the current number of frames we are encoding per second, * or 0 if not known. */ @@ -61,20 +206,12 @@ Encoder::current_frames_per_second () const return _history_size / (seconds (now) - seconds (_time_history.back ())); } -/** @return true if the last frame to be processed was skipped as it already existed */ -bool -Encoder::skipping () const +/** @return Number of video frames that have been sent out */ +int +Encoder::video_frames_out () const { boost::mutex::scoped_lock (_history_mutex); - return _just_skipped; -} - -/** @return Number of video frames that have been received */ -SourceFrame -Encoder::video_frame () const -{ - boost::mutex::scoped_lock (_history_mutex); - return _video_frame; + return _video_frames_out; } /** Should be called when a frame has been encoded successfully. @@ -84,7 +221,6 @@ void Encoder::frame_done () { boost::mutex::scoped_lock lock (_history_mutex); - _just_skipped = false; struct timeval tv; gettimeofday (&tv, 0); @@ -94,65 +230,202 @@ Encoder::frame_done () } } -/** Called by a subclass when it has just skipped the processing - of a frame because it has already been done. -*/ void -Encoder::frame_skipped () +Encoder::process_video (shared_ptr image, bool same, boost::shared_ptr sub) { - boost::mutex::scoped_lock lock (_history_mutex); - _just_skipped = true; -} + DCPFrameRate dfr (_film->frames_per_second ()); + + if (dfr.skip && (_video_frames_in % 2)) { + ++_video_frames_in; + return; + } -void -Encoder::process_video (shared_ptr i, boost::shared_ptr s) -{ - if (_opt->decode_video_skip != 0 && (_video_frame % _opt->decode_video_skip) != 0) { - ++_video_frame; + boost::mutex::scoped_lock lock (_worker_mutex); + + /* Wait until the queue has gone down a bit */ + while (_encode_queue.size() >= _worker_threads.size() * 2 && !_terminate_encoder) { + TIMING ("decoder sleeps with queue of %1", _encode_queue.size()); + _worker_condition.wait (lock); + TIMING ("decoder wakes with queue of %1", _encode_queue.size()); + } + + if (_terminate_encoder) { return; } - if (_opt->video_decode_range) { - pair const r = _opt->video_decode_range.get(); - if (_video_frame < r.first || _video_frame >= r.second) { - ++_video_frame; - return; - } + if (same && _have_a_real_frame) { + /* Use the last frame that we encoded. */ + _writer->repeat (_video_frames_out); + frame_done (); + } else { + /* Queue this new frame for encoding */ + pair const s = Filter::ffmpeg_strings (_film->filters()); + TIMING ("adding to queue of %1", _encode_queue.size ()); + _encode_queue.push_back (boost::shared_ptr ( + new DCPVideoFrame ( + image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film), + _film->subtitle_offset(), _film->subtitle_scale(), + _film->scaler(), _video_frames_out, _film->frames_per_second(), s.second, + _film->colour_lut(), _film->j2k_bandwidth(), + _film->log() + ) + )); + + _worker_condition.notify_all (); + _have_a_real_frame = true; } - do_process_video (i, s); - ++_video_frame; + ++_video_frames_in; + ++_video_frames_out; + + if (dfr.repeat) { + _writer->repeat (_video_frames_out); + ++_video_frames_out; + frame_done (); + } } void Encoder::process_audio (shared_ptr data) { - if (_opt->audio_decode_range) { +#if HAVE_SWRESAMPLE + /* Maybe sample-rate convert */ + if (_swr_context) { + + /* Compute the resampled frames count and add 32 for luck */ + int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32; + + shared_ptr resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames)); + + /* Resample audio */ + int const resampled_frames = swr_convert ( + _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames() + ); + + if (resampled_frames < 0) { + throw EncodeError ("could not run sample-rate converter"); + } - shared_ptr trimmed (new AudioBuffers (*data.get ())); + resampled->set_frames (resampled_frames); - /* Range that we are encoding */ - pair required_range = _opt->audio_decode_range.get(); - /* Range of this block of data */ - pair this_range (_audio_frame, _audio_frame + trimmed->frames()); + /* And point our variables at the resampled audio */ + data = resampled; + } +#endif + + if (_film->audio_channels() == 1) { + /* We need to switch things around so that the mono channel is on + the centre channel of a 5.1 set (with other channels silent). + */ + + shared_ptr b (new AudioBuffers (6, data->frames ())); + b->make_silent (libdcp::LEFT); + b->make_silent (libdcp::RIGHT); + memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float)); + b->make_silent (libdcp::LFE); + b->make_silent (libdcp::LS); + b->make_silent (libdcp::RS); + + data = b; + } - if (this_range.second < required_range.first || required_range.second < this_range.first) { - /* No part of this audio is within the required range */ + _writer->write (data); +} + +void +Encoder::terminate_worker_threads () +{ + boost::mutex::scoped_lock lock (_worker_mutex); + _terminate_encoder = true; + _worker_condition.notify_all (); + lock.unlock (); + + for (list::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) { + (*i)->join (); + delete *i; + } +} + +void +Encoder::encoder_thread (ServerDescription* server) +{ + /* Number of seconds that we currently wait between attempts + to connect to the server; not relevant for localhost + encodings. + */ + int remote_backoff = 0; + + while (1) { + + TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id()); + boost::mutex::scoped_lock lock (_worker_mutex); + while (_encode_queue.empty () && !_terminate_encoder) { + _worker_condition.wait (lock); + } + + if (_terminate_encoder) { return; - } else if (required_range.first >= this_range.first && required_range.first < this_range.second) { - /* Trim start */ - int64_t const shift = required_range.first - this_range.first; - trimmed->move (shift, 0, trimmed->frames() - shift); - trimmed->set_frames (trimmed->frames() - shift); - } else if (required_range.second >= this_range.first && required_range.second < this_range.second) { - /* Trim end */ - trimmed->set_frames (required_range.second - this_range.first); } - data = trimmed; - } + TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _encode_queue.size()); + boost::shared_ptr vf = _encode_queue.front (); + _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE); + _encode_queue.pop_front (); + + lock.unlock (); + + shared_ptr encoded; + + if (server) { + try { + encoded = vf->encode_remotely (server); - do_process_audio (data); + if (remote_backoff > 0) { + _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ())); + } + + /* This job succeeded, so remove any backoff */ + remote_backoff = 0; + + } catch (std::exception& e) { + if (remote_backoff < 60) { + /* back off more */ + remote_backoff += 10; + } + _film->log()->log ( + String::compose ( + "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s", + vf->frame(), server->host_name(), e.what(), remote_backoff) + ); + } + + } else { + try { + TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame()); + encoded = vf->encode_locally (); + TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame()); + } catch (std::exception& e) { + _film->log()->log (String::compose ("Local encode failed (%1)", e.what ())); + } + } + + if (encoded) { + _writer->write (encoded, vf->frame ()); + frame_done (); + } else { + lock.lock (); + _film->log()->log ( + String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame()) + ); + _encode_queue.push_front (vf); + lock.unlock (); + } - _audio_frame += data->frames (); + if (remote_backoff > 0) { + dvdomatic_sleep (remote_backoff); + } + + lock.lock (); + _worker_condition.notify_all (); + } }