2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
32 #include "exceptions.h"
35 #include "dcp_video_frame.h"
41 using std::stringstream;
46 using namespace boost;
// Definition of the static constant Encoder::_history_size.  In this file it
// caps the number of timestamps kept in _time_history (frame_done) and is the
// numerator of the estimate made by current_frames_per_second.
48 int const Encoder::_history_size = 25;
// Constructor.  When the film has an audio stream, one 24-bit PCM WAV file is
// opened for writing per DCP audio channel and its handle is appended to
// _sound_files.  The files are opened at multichannel_audio_out_path (i, true),
// the temporary names that process_end later renames to the (i, false) names.
// Throws CreateFileError naming the path of a file that could not be created.
50 /** @param f Film that we are encoding.
53 Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<const EncodeOptions> o)
56 , _just_skipped (false)
59 #ifdef HAVE_SWRESAMPLE
62 , _audio_frames_written (0)
63 , _terminate_encoder (false)
65 , _terminate_writer (false)
67 if (_film->audio_stream()) {
68 /* Create sound output files with .tmp suffixes; we will rename
69 them if and when we complete.
71 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
73 sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
74 /* We write mono files */
76 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
// NOTE(review): this local `f` shadows the constructor parameter `f` (the Film).
77 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
// NOTE(review): handles already stored in _sound_files do not appear to be
// closed on this path before the exception leaves the constructor - confirm.
79 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
81 _sound_files.push_back (f);
// NOTE(review): the header of the enclosing function is not visible here;
// presumably this is the body of Encoder::~Encoder - confirm.
// Asks the encoder worker threads, and then the writer thread, to terminate.
89 terminate_worker_threads ();
90 terminate_writer_thread ();
// Begins a processing run.  If the film has an audio stream whose sample rate
// differs from the film's target rate, a libswresample context is allocated and
// initialised (or EncodeError is thrown when libswresample is not compiled in).
// Then the encoder worker threads and the single writer thread are started.
94 Encoder::process_begin ()
// Resampling is only needed when source and target sample rates differ.
96 if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
97 #ifdef HAVE_SWRESAMPLE
100 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
101 _film->log()->log (s.str ());
// The same channel layout is passed for both sides of the conversion, with the
// target rate first and the source rate second.
103 /* We will be using planar float data when we call the resampler */
104 _swr_context = swr_alloc_set_opts (
106 _film->audio_stream()->channel_layout(),
108 _film->target_audio_sample_rate(),
109 _film->audio_stream()->channel_layout(),
111 _film->audio_stream()->sample_rate(),
// NOTE(review): the result of swr_init is not checked here.
115 swr_init (_swr_context);
// Presumably the branch compiled when HAVE_SWRESAMPLE is not defined.
117 throw EncodeError ("Cannot resample audio as libswresample is not present");
120 #ifdef HAVE_SWRESAMPLE
// One worker per configured local encoding thread.  A null ServerDescription is
// bound as the argument, presumably meaning "encode locally".
125 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
126 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
129 vector<ServerDescription*> servers = Config::instance()->servers ();
// For each configured server, one worker per thread that the server reports.
131 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
132 for (int j = 0; j < (*i)->threads (); ++j) {
133 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
// A single thread running writer_thread; terminate_writer_thread joins and deletes it.
137 _writer_thread = new boost::thread (boost::bind (&Encoder::writer_thread, this));
// Ends a processing run.  In order: drains whatever the resampler still holds,
// closes the sound files and renames them from their temporary names, waits for
// the encode queue to empty, terminates the worker and writer threads, encodes
// any frames left on the encode queue in this thread, writes anything left on
// the write queue, and finally creates the links for duplicate frames.
142 Encoder::process_end ()
145 if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
147 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
// swr_convert is called with no input (0, 0) and room for 256 output frames,
// presumably to pull out samples still buffered inside the resampler.
150 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
153 throw EncodeError ("could not run sample-rate converter");
160 out->set_frames (frames);
164 swr_free (&_swr_context);
168 if (_film->audio_stream()) {
169 close_sound_files ();
171 /* Rename .wav.tmp files to .wav */
172 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
// An existing final file is removed first, then the temporary one is renamed over it.
173 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
174 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
176 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
180 boost::mutex::scoped_lock lock (_worker_mutex);
182 _film->log()->log ("Clearing queue of " + lexical_cast<string> (_encode_queue.size ()));
184 /* Keep waking workers until the queue is empty */
185 while (!_encode_queue.empty ()) {
186 _film->log()->log ("Waking with " + lexical_cast<string> (_encode_queue.size ()), Log::VERBOSE);
187 _worker_condition.notify_all ();
188 _worker_condition.wait (lock);
193 terminate_worker_threads ();
194 terminate_writer_thread ();
196 _film->log()->log ("Mopping up " + lexical_cast<string> (_encode_queue.size()));
198 /* The following sequence of events can occur in the above code:
199 1. a remote worker takes the last image off the queue
200 2. the loop above terminates
201 3. the remote worker fails to encode the image and puts it back on the queue
202 4. the remote worker is then terminated by terminate_worker_threads
204 So just mop up anything left in the queue here.
207 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _encode_queue.begin(); i != _encode_queue.end(); ++i) {
208 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
// Left-over frames are encoded locally by the calling thread and written
// directly, bypassing the write queue.
210 shared_ptr<EncodedData> e = (*i)->encode_locally ();
211 e->write (_opt, (*i)->frame ());
// A failure is logged and the loop carries on with the next left-over frame.
213 } catch (std::exception& e) {
214 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
218 /* Mop up any unwritten things in the writer's queue */
219 for (list<pair<shared_ptr<EncodedData>, int> >::iterator i = _write_queue.begin(); i != _write_queue.end(); ++i) {
220 i->first->write (_opt, i->second);
// Each entry was recorded by process_video as (last really-encoded frame,
// duplicate frame); both the frame file and its hash file are linked.
223 /* Now do links (or copies on windows) to duplicate frames */
224 for (list<pair<int, int> >::iterator i = _links_required.begin(); i != _links_required.end(); ++i) {
225 link (_opt->frame_out_path (i->first, false), _opt->frame_out_path (i->second, false));
226 link (_opt->hash_out_path (i->first, false), _opt->hash_out_path (i->second, false));
// Estimates the encoding rate from the timestamps recorded by frame_done.
230 /** @return an estimate of the current number of frames we are encoding per second,
234 Encoder::current_frames_per_second () const
236 boost::mutex::scoped_lock lock (_history_mutex);
// The estimate is only attempted once _time_history holds at least
// _history_size timestamps.
237 if (int (_time_history.size()) < _history_size) {
242 gettimeofday (&now, 0);
// frame_done pushes at the front and pops from the back, so back() is the
// oldest retained timestamp; the rate is _history_size frames over the time
// elapsed since then.
// NOTE(review): the denominator is zero if `now` equals that timestamp - confirm
// this cannot matter in practice.
244 return _history_size / (seconds (now) - seconds (_time_history.back ()));
247 /** @return true if the last frame to be processed was skipped as it already existed */
249 Encoder::skipping () const
251 boost::mutex::scoped_lock (_history_mutex);
252 return _just_skipped;
255 /** @return Number of video frames that have been received */
257 Encoder::video_frame () const
259 boost::mutex::scoped_lock (_history_mutex);
263 /** Should be called when a frame has been encoded successfully.
264 * @param n Source frame index.
267 Encoder::frame_done ()
269 boost::mutex::scoped_lock lock (_history_mutex);
270 _just_skipped = false;
273 gettimeofday (&tv, 0);
274 _time_history.push_front (tv);
275 if (int (_time_history.size()) > _history_size) {
276 _time_history.pop_back ();
280 /** Called by a subclass when it has just skipped the processing
281 of a frame because it has already been done.
284 Encoder::frame_skipped ()
286 boost::mutex::scoped_lock lock (_history_mutex);
287 _just_skipped = true;
// Accepts one decoded video frame.  Depending on the options and on what is
// already on disk, the frame is either left unprocessed, recorded as a
// duplicate of the last frame really encoded, or wrapped in a DCPVideoFrame
// and queued for the encoder worker threads.
//   image - the frame's picture
//   same  - true if this frame may reuse the last really-encoded frame
//   sub   - subtitle passed on to the DCPVideoFrame
291 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
// With a non-zero video_skip, only frames whose index is a multiple of it go further.
293 if (_opt->video_skip != 0 && (_video_frame % _opt->video_skip) != 0) {
298 if (_opt->video_range) {
299 pair<SourceFrame, SourceFrame> const r = _opt->video_range.get();
// The range is treated as half-open: first inclusive, second exclusive.
300 if (_video_frame < r.first || _video_frame >= r.second) {
306 boost::mutex::scoped_lock lock (_worker_mutex);
// Back-pressure: block while the queue holds at least two frames per worker.
// NOTE(review): with no worker threads the size test is always true, so this
// waits until _terminate_encoder is set - confirm that cannot happen.
308 /* Wait until the queue has gone down a bit */
309 while (_encode_queue.size() >= _worker_threads.size() * 2 && !_terminate_encoder) {
310 TIMING ("decoder sleeps with queue of %1", _encode_queue.size());
311 _worker_condition.wait (lock);
312 TIMING ("decoder wakes with queue of %1", _encode_queue.size());
315 if (_terminate_encoder) {
319 /* Only do the processing if we don't already have a file for this frame */
320 if (boost::filesystem::exists (_opt->frame_out_path (_video_frame, false))) {
// A repeat of the previous picture is only possible once some frame has really
// been encoded (_last_real_frame is set).
325 if (same && _last_real_frame) {
326 /* Use the last frame that we encoded. We need to postpone doing the actual link,
327 as on windows the link is really a copy and the reference frame might not have
328 finished encoding yet.
330 _links_required.push_back (make_pair (_last_real_frame.get(), _video_frame));
332 /* Queue this new frame for encoding */
333 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
334 TIMING ("adding to queue of %1", _encode_queue.size ());
335 _encode_queue.push_back (boost::shared_ptr<DCPVideoFrame> (
337 image, sub, _opt->out_size, _opt->padding, _film->subtitle_offset(), _film->subtitle_scale(),
338 _film->scaler(), _video_frame, _film->frames_per_second(), s.second,
339 Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
// Wake the workers waiting for work, and remember this frame as the one later
// duplicates should link to.
344 _worker_condition.notify_all ();
345 _last_real_frame = _video_frame;
// Accepts one block of decoded audio.  The visible steps are: trim the block
// to the configured audio range (if any), resample it through _swr_context,
// place mono audio on the centre channel of a six-channel buffer, and advance
// _audio_frame by the block's frame count.
352 Encoder::process_audio (shared_ptr<AudioBuffers> data)
354 if (_opt->audio_range) {
// Trimming is done on a copy of the incoming buffers.
355 shared_ptr<AudioBuffers> trimmed (new AudioBuffers (*data.get ()));
357 /* Range that we are encoding */
358 pair<int64_t, int64_t> required_range = _opt->audio_range.get();
359 /* Range of this block of data */
360 pair<int64_t, int64_t> this_range (_audio_frame, _audio_frame + trimmed->frames());
// NOTE(review): both comparisons are strict, so a block that only touches the
// boundary of the required range is not rejected here - confirm whether the
// ranges are meant to be half-open.
362 if (this_range.second < required_range.first || required_range.second < this_range.first) {
363 /* No part of this audio is within the required range */
// The required range starts inside this block: shift the wanted frames to the
// start of the buffer and shorten it by the number dropped.
// NOTE(review): because this is an else-if chain, a block that contains both
// the start and the end of the required range only has its head trimmed - confirm.
365 } else if (required_range.first >= this_range.first && required_range.first < this_range.second) {
367 int64_t const shift = required_range.first - this_range.first;
368 trimmed->move (shift, 0, trimmed->frames() - shift);
369 trimmed->set_frames (trimmed->frames() - shift);
// The required range ends inside this block: keep only the frames before the end.
370 } else if (required_range.second >= this_range.first && required_range.second < this_range.second) {
372 trimmed->set_frames (required_range.second - this_range.first);
379 /* Maybe sample-rate convert */
// NOTE(review): the operands look integral, in which case the division
// truncates before ceil() is applied; the +32 margin presumably covers the
// difference - confirm.
382 /* Compute the resampled frames count and add 32 for luck */
383 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
385 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
388 int const resampled_frames = swr_convert (
389 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
// A negative result from swr_convert is treated as failure.
392 if (resampled_frames < 0) {
393 throw EncodeError ("could not run sample-rate converter");
396 resampled->set_frames (resampled_frames);
398 /* And point our variables at the resampled audio */
403 if (_film->audio_channels() == 1) {
404 /* We need to switch things around so that the mono channel is on
405 the centre channel of a 5.1 set (with other channels silent).
// Six-channel buffer of the same length; the samples are copied as floats.
408 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
409 b->make_silent (libdcp::LEFT);
410 b->make_silent (libdcp::RIGHT);
411 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
412 b->make_silent (libdcp::LFE);
413 b->make_silent (libdcp::LS);
414 b->make_silent (libdcp::RS);
// Advance the running position used as the start of the next block's range.
421 _audio_frame += data->frames ();
425 Encoder::write_audio (shared_ptr<const AudioBuffers> audio)
427 for (int i = 0; i < audio->channels(); ++i) {
428 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
431 _audio_frames_written += audio->frames ();
// Visits every handle in _sound_files and then empties the vector, so a later
// call iterates over nothing.
// NOTE(review): the loop body is not visible here; presumably it closes each
// SNDFILE - confirm.
435 Encoder::close_sound_files ()
437 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
441 _sound_files.clear ();
// Asks the encoder worker threads to stop: sets _terminate_encoder while
// holding _worker_mutex and wakes everything waiting on _worker_condition
// (the waits in encoder_thread and process_video both test this flag).
445 Encoder::terminate_worker_threads ()
447 boost::mutex::scoped_lock lock (_worker_mutex);
448 _terminate_encoder = true;
449 _worker_condition.notify_all ();
// NOTE(review): the loop body is not visible here; presumably each thread is
// joined and deleted.  Confirm that _worker_mutex has been released before any
// join, since a woken worker must re-acquire it to leave its wait.
452 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
// Stops the writer thread: sets _terminate_writer under _writer_mutex, wakes
// the thread, waits for it to finish and deletes the thread object.
459 Encoder::terminate_writer_thread ()
// Guard for the case where there is no writer thread object.
461 if (!_writer_thread) {
465 boost::mutex::scoped_lock lock (_writer_mutex);
466 _terminate_writer = true;
467 _writer_condition.notify_all ();
// NOTE(review): confirm _writer_mutex is released before this join; the writer
// has to re-acquire it when it wakes from its wait.
470 _writer_thread->join ();
// NOTE(review): this function is called from more than one place in this file;
// presumably _writer_thread is reset to 0 after the delete so that the guard
// above prevents a second delete - confirm.
471 delete _writer_thread;
// Body of each encoder worker thread.  It repeatedly takes the frame at the
// front of _encode_queue, encodes it (remotely via `server`, or locally), and
// hands the result to the writer through _write_queue.  A frame that could not
// be encoded is put back at the front of the queue.
//   server - server to encode on; process_begin starts the local workers with
//            a null pointer here.
476 Encoder::encoder_thread (ServerDescription* server)
478 /* Number of seconds that we currently wait between attempts
479 to connect to the server; not relevant for localhost
482 int remote_backoff = 0;
486 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
487 boost::mutex::scoped_lock lock (_worker_mutex);
// Sleep until there is a frame to encode or termination has been requested.
488 while (_encode_queue.empty () && !_terminate_encoder) {
489 _worker_condition.wait (lock);
492 if (_terminate_encoder) {
496 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _encode_queue.size());
// Take the frame at the front of the queue.
497 boost::shared_ptr<DCPVideoFrame> vf = _encode_queue.front ();
498 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
499 _encode_queue.pop_front ();
// Stays null if encoding fails.
503 shared_ptr<EncodedData> encoded;
507 encoded = vf->encode_remotely (server);
// A non-zero backoff at this point means an earlier attempt on this server failed.
509 if (remote_backoff > 0) {
510 _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
513 /* This job succeeded, so remove any backoff */
516 } catch (std::exception& e) {
// Each failure adds 10 seconds to the backoff while it is still below 60.
517 if (remote_backoff < 60) {
519 remote_backoff += 10;
523 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
524 vf->frame(), server->host_name(), e.what(), remote_backoff)
530 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
531 encoded = vf->encode_locally ();
532 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
533 } catch (std::exception& e) {
534 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
// Hand the encoded data and its frame index to the writer thread.
539 boost::mutex::scoped_lock lock2 (_writer_mutex);
540 _write_queue.push_back (make_pair (encoded, vf->frame ()));
541 _writer_condition.notify_all ();
545 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
// The failed frame goes back to the front so that it is the next one taken.
547 _encode_queue.push_front (vf);
// After a remote failure, sleep for the current backoff before trying again.
551 if (remote_backoff > 0) {
552 dvdomatic_sleep (remote_backoff);
// Wake others waiting on _worker_condition (process_video, process_end and
// other workers all wait on it).
556 _worker_condition.notify_all ();
561 Encoder::link (string a, string b) const
563 #ifdef DVDOMATIC_POSIX
564 int const r = symlink (a.c_str(), b.c_str());
566 throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b));
570 #ifdef DVDOMATIC_WINDOWS
571 boost::filesystem::copy_file (a, b);
// Body of the writer thread: waits for entries on _write_queue and writes each
// encoded frame out through EncodedData::write.
576 Encoder::writer_thread ()
580 TIMING ("writer sleeps");
581 boost::mutex::scoped_lock lock (_writer_mutex);
// Sleep until there is something to write or termination has been requested.
582 while (_write_queue.empty() && !_terminate_writer) {
583 _writer_condition.wait (lock);
585 TIMING ("writer wakes with a queue of %1", _write_queue.size());
// Termination is tested before the queue is consulted, so entries still queued
// at that point are presumably left for process_end, which writes whatever
// remains in _write_queue.
587 if (_terminate_writer) {
// Take the entry at the front of the queue: (encoded data, frame index).
591 pair<boost::shared_ptr<EncodedData>, int> encoded = _write_queue.front ();
592 _write_queue.pop_front ();
595 encoded.first->write (_opt, encoded.second);