X-Git-Url: https://git.carlh.net/gitweb/?a=blobdiff_plain;ds=sidebyside;f=src%2Flib%2Fencoder.cc;h=c32d688342e994ab81c32c8d5786c44953e1c17c;hb=3882d34aed9dee417ceed93bf0bf5372b3970ff6;hp=7352dcfb126c60f654af4a62f8217c253b1aae4e;hpb=bd8fa9a370f1739952c83107352baa08c79d095e;p=dcpomatic.git diff --git a/src/lib/encoder.cc b/src/lib/encoder.cc index 7352dcfb1..c32d68834 100644 --- a/src/lib/encoder.cc +++ b/src/lib/encoder.cc @@ -21,9 +21,29 @@ * @brief Parent class for classes which can encode video and audio frames. */ +#include +#include +#include #include "encoder.h" #include "util.h" +#include "options.h" +#include "film.h" +#include "log.h" +#include "exceptions.h" +#include "filter.h" +#include "config.h" +#include "dcp_video_frame.h" +#include "server.h" +#include "format.h" +#include "cross.h" +using std::pair; +using std::string; +using std::stringstream; +using std::vector; +using std::list; +using std::cout; +using std::make_pair; using namespace boost; int const Encoder::_history_size = 25; @@ -31,15 +51,170 @@ int const Encoder::_history_size = 25; /** @param f Film that we are encoding. * @param o Options. */ -Encoder::Encoder (shared_ptr f, shared_ptr o) +Encoder::Encoder (shared_ptr f) : _film (f) - , _opt (o) , _just_skipped (false) - , _last_frame (0) + , _video_frame (0) + , _audio_frame (0) +#ifdef HAVE_SWRESAMPLE + , _swr_context (0) +#endif + , _audio_frames_written (0) + , _process_end (false) { + if (_film->audio_stream()) { + /* Create sound output files with .tmp suffixes; we will rename + them if and when we complete. + */ + for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) { + SF_INFO sf_info; + sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate()); + /* We write mono files */ + sf_info.channels = 1; + sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24; + SNDFILE* f = sf_open (_film->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info); + if (f == 0) { + throw CreateFileError (_film->multichannel_audio_out_path (i, true)); + } + _sound_files.push_back (f); + } + } +} +Encoder::~Encoder () +{ + close_sound_files (); + terminate_worker_threads (); } +void +Encoder::process_begin () +{ + if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) { +#ifdef HAVE_SWRESAMPLE + + stringstream s; + s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate(); + _film->log()->log (s.str ()); + + /* We will be using planar float data when we call the resampler */ + _swr_context = swr_alloc_set_opts ( + 0, + _film->audio_stream()->channel_layout(), + AV_SAMPLE_FMT_FLTP, + _film->target_audio_sample_rate(), + _film->audio_stream()->channel_layout(), + AV_SAMPLE_FMT_FLTP, + _film->audio_stream()->sample_rate(), + 0, 0 + ); + + swr_init (_swr_context); +#else + throw EncodeError ("Cannot resample audio as libswresample is not present"); +#endif + } else { +#ifdef HAVE_SWRESAMPLE + _swr_context = 0; +#endif + } + + for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) { + _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0))); + } + + vector servers = Config::instance()->servers (); + + for (vector::iterator i = servers.begin(); i != servers.end(); ++i) { + for (int j = 0; j < (*i)->threads (); ++j) { + _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i))); + } + } +} + + +void +Encoder::process_end () +{ +#if HAVE_SWRESAMPLE + if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) { + + shared_ptr out (new AudioBuffers (_film->audio_stream()->channels(), 256)); + + while (1) { + int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0); + + if (frames < 0) { + throw EncodeError ("could not run sample-rate converter"); + } + + if (frames == 0) { + break; + } + + out->set_frames (frames); + write_audio (out); + } + + swr_free (&_swr_context); + } +#endif + + if (_film->audio_stream()) { + close_sound_files (); + + /* Rename .wav.tmp files to .wav */ + for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) { + if (boost::filesystem::exists (_film->multichannel_audio_out_path (i, false))) { + boost::filesystem::remove (_film->multichannel_audio_out_path (i, false)); + } + boost::filesystem::rename (_film->multichannel_audio_out_path (i, true), _film->multichannel_audio_out_path (i, false)); + } + } + + boost::mutex::scoped_lock lock (_worker_mutex); + + _film->log()->log ("Clearing queue of " + lexical_cast (_queue.size ())); + + /* Keep waking workers until the queue is empty */ + while (!_queue.empty ()) { + _film->log()->log ("Waking with " + lexical_cast (_queue.size ()), Log::VERBOSE); + _worker_condition.notify_all (); + _worker_condition.wait (lock); + } + + lock.unlock (); + + terminate_worker_threads (); + + _film->log()->log ("Mopping up " + lexical_cast (_queue.size())); + + /* The following sequence of events can occur in the above code: + 1. a remote worker takes the last image off the queue + 2. the loop above terminates + 3. the remote worker fails to encode the image and puts it back on the queue + 4. the remote worker is then terminated by terminate_worker_threads + + So just mop up anything left in the queue here. + */ + + for (list >::iterator i = _queue.begin(); i != _queue.end(); ++i) { + _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ())); + try { + shared_ptr e = (*i)->encode_locally (); + e->write (_film, (*i)->frame ()); + frame_done (); + } catch (std::exception& e) { + _film->log()->log (String::compose ("Local encode failed (%1)", e.what ())); + } + } + + /* Now do links (or copies on windows) to duplicate frames */ + for (list >::iterator i = _links_required.begin(); i != _links_required.end(); ++i) { + link (_film->frame_out_path (i->first, false), _film->frame_out_path (i->second, false)); + link (_film->hash_out_path (i->first, false), _film->hash_out_path (i->second, false)); + } +} /** @return an estimate of the current number of frames we are encoding per second, * or 0 if not known. @@ -66,23 +241,22 @@ Encoder::skipping () const return _just_skipped; } -/** @return Index of last frame to be successfully encoded */ -int -Encoder::last_frame () const +/** @return Number of video frames that have been received */ +SourceFrame +Encoder::video_frame () const { boost::mutex::scoped_lock (_history_mutex); - return _last_frame; + return _video_frame; } /** Should be called when a frame has been encoded successfully. - * @param n Frame index. + * @param n Source frame index. */ void -Encoder::frame_done (int n) +Encoder::frame_done () { boost::mutex::scoped_lock lock (_history_mutex); _just_skipped = false; - _last_frame = n; struct timeval tv; gettimeofday (&tv, 0); @@ -101,3 +275,241 @@ Encoder::frame_skipped () boost::mutex::scoped_lock lock (_history_mutex); _just_skipped = true; } + +void +Encoder::process_video (shared_ptr image, bool same, boost::shared_ptr sub) +{ + DCPFrameRate dfr (_film->frames_per_second ()); + + if (dfr.skip && (_video_frame % 2)) { + ++_video_frame; + return; + } + + boost::mutex::scoped_lock lock (_worker_mutex); + + /* Wait until the queue has gone down a bit */ + while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) { + TIMING ("decoder sleeps with queue of %1", _queue.size()); + _worker_condition.wait (lock); + TIMING ("decoder wakes with queue of %1", _queue.size()); + } + + if (_process_end) { + return; + } + + /* Only do the processing if we don't already have a file for this frame */ + if (boost::filesystem::exists (_film->frame_out_path (_video_frame, false))) { + frame_skipped (); + return; + } + + if (same && _last_real_frame) { + /* Use the last frame that we encoded. We need to postpone doing the actual link, + as on windows the link is really a copy and the reference frame might not have + finished encoding yet. + */ + _links_required.push_back (make_pair (_last_real_frame.get(), _video_frame)); + } else { + /* Queue this new frame for encoding */ + pair const s = Filter::ffmpeg_strings (_film->filters()); + TIMING ("adding to queue of %1", _queue.size ()); + _queue.push_back (boost::shared_ptr ( + new DCPVideoFrame ( + image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film), + _film->subtitle_offset(), _film->subtitle_scale(), + _film->scaler(), _video_frame, _film->frames_per_second(), s.second, + _film->colour_lut(), _film->j2k_bandwidth(), + _film->log() + ) + )); + + _worker_condition.notify_all (); + _last_real_frame = _video_frame; + } + + ++_video_frame; +} + +void +Encoder::process_audio (shared_ptr data) +{ +#if HAVE_SWRESAMPLE + /* Maybe sample-rate convert */ + if (_swr_context) { + + /* Compute the resampled frames count and add 32 for luck */ + int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32; + + shared_ptr resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames)); + + /* Resample audio */ + int const resampled_frames = swr_convert ( + _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames() + ); + + if (resampled_frames < 0) { + throw EncodeError ("could not run sample-rate converter"); + } + + resampled->set_frames (resampled_frames); + + /* And point our variables at the resampled audio */ + data = resampled; + } +#endif + + if (_film->audio_channels() == 1) { + /* We need to switch things around so that the mono channel is on + the centre channel of a 5.1 set (with other channels silent). + */ + + shared_ptr b (new AudioBuffers (6, data->frames ())); + b->make_silent (libdcp::LEFT); + b->make_silent (libdcp::RIGHT); + memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float)); + b->make_silent (libdcp::LFE); + b->make_silent (libdcp::LS); + b->make_silent (libdcp::RS); + + data = b; + } + + write_audio (data); + + _audio_frame += data->frames (); +} + +void +Encoder::write_audio (shared_ptr audio) +{ + for (int i = 0; i < audio->channels(); ++i) { + sf_write_float (_sound_files[i], audio->data(i), audio->frames()); + } + + _audio_frames_written += audio->frames (); +} + +void +Encoder::close_sound_files () +{ + for (vector::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) { + sf_close (*i); + } + + _sound_files.clear (); +} + +void +Encoder::terminate_worker_threads () +{ + boost::mutex::scoped_lock lock (_worker_mutex); + _process_end = true; + _worker_condition.notify_all (); + lock.unlock (); + + for (list::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) { + (*i)->join (); + delete *i; + } +} + +void +Encoder::encoder_thread (ServerDescription* server) +{ + /* Number of seconds that we currently wait between attempts + to connect to the server; not relevant for localhost + encodings. + */ + int remote_backoff = 0; + + while (1) { + + TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id()); + boost::mutex::scoped_lock lock (_worker_mutex); + while (_queue.empty () && !_process_end) { + _worker_condition.wait (lock); + } + + if (_process_end) { + return; + } + + TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size()); + boost::shared_ptr vf = _queue.front (); + _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE); + _queue.pop_front (); + + lock.unlock (); + + shared_ptr encoded; + + if (server) { + try { + encoded = vf->encode_remotely (server); + + if (remote_backoff > 0) { + _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ())); + } + + /* This job succeeded, so remove any backoff */ + remote_backoff = 0; + + } catch (std::exception& e) { + if (remote_backoff < 60) { + /* back off more */ + remote_backoff += 10; + } + _film->log()->log ( + String::compose ( + "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s", + vf->frame(), server->host_name(), e.what(), remote_backoff) + ); + } + + } else { + try { + TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame()); + encoded = vf->encode_locally (); + TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame()); + } catch (std::exception& e) { + _film->log()->log (String::compose ("Local encode failed (%1)", e.what ())); + } + } + + if (encoded) { + encoded->write (_film, vf->frame ()); + frame_done (); + } else { + lock.lock (); + _film->log()->log ( + String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame()) + ); + _queue.push_front (vf); + lock.unlock (); + } + + if (remote_backoff > 0) { + dvdomatic_sleep (remote_backoff); + } + + lock.lock (); + _worker_condition.notify_all (); + } +} + +void +Encoder::link (string a, string b) const +{ +#ifdef DVDOMATIC_POSIX + int const r = symlink (a.c_str(), b.c_str()); + if (r) { + throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b)); + } +#endif + +#ifdef DVDOMATIC_WINDOWS + boost::filesystem::copy_file (a, b); +#endif +}