2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
33 #include "exceptions.h"
36 #include "dcp_video_frame.h"
43 using std::stringstream;
48 using namespace boost;
50 int const Encoder::_history_size = 25;
51 unsigned int const Encoder::_maximum_frames_in_memory = 8;
53 /** @param f Film that we are encoding.
56 Encoder::Encoder (shared_ptr<const Film> f)
58 , _just_skipped (false)
59 , _video_frames_in (0)
60 , _audio_frames_in (0)
61 , _video_frames_out (0)
62 , _audio_frames_out (0)
63 #ifdef HAVE_SWRESAMPLE
66 , _have_a_real_frame (false)
67 , _terminate_encoder (false)
69 , _finish_writer (false)
70 , _last_written_frame (-1)
72 if (_film->audio_stream()) {
73 /* Create sound output files with .tmp suffixes; we will rename
74 them if and when we complete.
76 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
78 sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
79 /* We write mono files */
81 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
82 SNDFILE* f = sf_open (_film->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
84 throw CreateFileError (_film->multichannel_audio_out_path (i, true));
86 _sound_files.push_back (f);
94 terminate_worker_threads ();
95 finish_writer_thread ();
99 Encoder::process_begin ()
101 if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
102 #ifdef HAVE_SWRESAMPLE
105 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
106 _film->log()->log (s.str ());
108 /* We will be using planar float data when we call the resampler */
109 _swr_context = swr_alloc_set_opts (
111 _film->audio_stream()->channel_layout(),
113 _film->target_audio_sample_rate(),
114 _film->audio_stream()->channel_layout(),
116 _film->audio_stream()->sample_rate(),
120 swr_init (_swr_context);
122 throw EncodeError ("Cannot resample audio as libswresample is not present");
125 #ifdef HAVE_SWRESAMPLE
130 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
131 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
134 vector<ServerDescription*> servers = Config::instance()->servers ();
136 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
137 for (int j = 0; j < (*i)->threads (); ++j) {
138 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
143 _picture_asset.reset (
144 new libdcp::MonoPictureAsset (
145 _film->dir (_film->dcp_name()),
146 String::compose ("video_%1.mxf", 0),
147 DCPFrameRate (_film->frames_per_second()).frames_per_second,
148 _film->format()->dcp_size()
152 _picture_asset_writer = _picture_asset->start_write ();
153 _writer_thread = new boost::thread (boost::bind (&Encoder::writer_thread, this));
158 Encoder::process_end ()
161 if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
163 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
166 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
169 throw EncodeError ("could not run sample-rate converter");
176 out->set_frames (frames);
180 swr_free (&_swr_context);
184 if (_film->audio_stream()) {
185 close_sound_files ();
187 /* Rename .wav.tmp files to .wav */
188 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
189 if (boost::filesystem::exists (_film->multichannel_audio_out_path (i, false))) {
190 boost::filesystem::remove (_film->multichannel_audio_out_path (i, false));
192 boost::filesystem::rename (_film->multichannel_audio_out_path (i, true), _film->multichannel_audio_out_path (i, false));
196 boost::mutex::scoped_lock lock (_worker_mutex);
198 _film->log()->log ("Clearing queue of " + lexical_cast<string> (_encode_queue.size ()));
200 /* Keep waking workers until the queue is empty */
201 while (!_encode_queue.empty ()) {
202 _film->log()->log ("Waking with " + lexical_cast<string> (_encode_queue.size ()), Log::VERBOSE);
203 _worker_condition.notify_all ();
204 _worker_condition.wait (lock);
209 terminate_worker_threads ();
211 _film->log()->log ("Mopping up " + lexical_cast<string> (_encode_queue.size()));
213 /* The following sequence of events can occur in the above code:
214 1. a remote worker takes the last image off the queue
215 2. the loop above terminates
216 3. the remote worker fails to encode the image and puts it back on the queue
217 4. the remote worker is then terminated by terminate_worker_threads
219 So just mop up anything left in the queue here.
222 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _encode_queue.begin(); i != _encode_queue.end(); ++i) {
223 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
225 shared_ptr<EncodedData> e = (*i)->encode_locally ();
227 boost::mutex::scoped_lock lock2 (_writer_mutex);
228 _write_queue.push_back (make_pair (e, (*i)->frame ()));
229 _writer_condition.notify_all ();
232 } catch (std::exception& e) {
233 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
237 finish_writer_thread ();
238 _picture_asset_writer->finalize ();
241 /** @return an estimate of the current number of frames we are encoding per second,
245 Encoder::current_frames_per_second () const
247 boost::mutex::scoped_lock lock (_history_mutex);
248 if (int (_time_history.size()) < _history_size) {
253 gettimeofday (&now, 0);
255 return _history_size / (seconds (now) - seconds (_time_history.back ()));
258 /** @return true if the last frame to be processed was skipped as it already existed */
260 Encoder::skipping () const
262 boost::mutex::scoped_lock (_history_mutex);
263 return _just_skipped;
266 /** @return Number of video frames that have been sent out */
268 Encoder::video_frames_out () const
270 boost::mutex::scoped_lock (_history_mutex);
271 return _video_frames_out;
274 /** Should be called when a frame has been encoded successfully.
275 * @param n Source frame index.
278 Encoder::frame_done ()
280 boost::mutex::scoped_lock lock (_history_mutex);
281 _just_skipped = false;
284 gettimeofday (&tv, 0);
285 _time_history.push_front (tv);
286 if (int (_time_history.size()) > _history_size) {
287 _time_history.pop_back ();
291 /** Called by a subclass when it has just skipped the processing
292 of a frame because it has already been done.
295 Encoder::frame_skipped ()
297 boost::mutex::scoped_lock lock (_history_mutex);
298 _just_skipped = true;
302 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
304 DCPFrameRate dfr (_film->frames_per_second ());
306 if (dfr.skip && (_video_frames_in % 2)) {
311 boost::mutex::scoped_lock lock (_worker_mutex);
313 /* Wait until the queue has gone down a bit */
314 while (_encode_queue.size() >= _worker_threads.size() * 2 && !_terminate_encoder) {
315 TIMING ("decoder sleeps with queue of %1", _encode_queue.size());
316 _worker_condition.wait (lock);
317 TIMING ("decoder wakes with queue of %1", _encode_queue.size());
320 if (_terminate_encoder) {
324 /* Only do the processing if we don't already have a file for this frame */
325 if (boost::filesystem::exists (_film->frame_out_path (_video_frames_out, false))) {
330 if (same && _have_a_real_frame) {
331 /* Use the last frame that we encoded. We do this by putting a null encoded
332 frame straight onto the writer's queue. It will know to duplicate the previous frame
335 boost::mutex::scoped_lock lock2 (_writer_mutex);
336 _write_queue.push_back (make_pair (shared_ptr<EncodedData> (), _video_frames_out));
339 /* Queue this new frame for encoding */
340 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
341 TIMING ("adding to queue of %1", _encode_queue.size ());
342 _encode_queue.push_back (boost::shared_ptr<DCPVideoFrame> (
344 image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
345 _film->subtitle_offset(), _film->subtitle_scale(),
346 _film->scaler(), _video_frames_out, _film->frames_per_second(), s.second,
347 _film->colour_lut(), _film->j2k_bandwidth(),
352 _worker_condition.notify_all ();
353 _have_a_real_frame = true;
360 boost::mutex::scoped_lock lock2 (_writer_mutex);
361 _write_queue.push_back (make_pair (shared_ptr<EncodedData> (), _video_frames_out));
368 Encoder::process_audio (shared_ptr<AudioBuffers> data)
371 /* Maybe sample-rate convert */
374 /* Compute the resampled frames count and add 32 for luck */
375 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
377 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
380 int const resampled_frames = swr_convert (
381 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
384 if (resampled_frames < 0) {
385 throw EncodeError ("could not run sample-rate converter");
388 resampled->set_frames (resampled_frames);
390 /* And point our variables at the resampled audio */
395 if (_film->audio_channels() == 1) {
396 /* We need to switch things around so that the mono channel is on
397 the centre channel of a 5.1 set (with other channels silent).
400 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
401 b->make_silent (libdcp::LEFT);
402 b->make_silent (libdcp::RIGHT);
403 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
404 b->make_silent (libdcp::LFE);
405 b->make_silent (libdcp::LS);
406 b->make_silent (libdcp::RS);
413 _audio_frames_in += data->frames ();
417 Encoder::write_audio (shared_ptr<const AudioBuffers> audio)
419 for (int i = 0; i < audio->channels(); ++i) {
420 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
423 _audio_frames_out += audio->frames ();
427 Encoder::close_sound_files ()
429 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
433 _sound_files.clear ();
437 Encoder::terminate_worker_threads ()
439 boost::mutex::scoped_lock lock (_worker_mutex);
440 _terminate_encoder = true;
441 _worker_condition.notify_all ();
444 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
451 Encoder::finish_writer_thread ()
453 if (!_writer_thread) {
457 boost::mutex::scoped_lock lock (_writer_mutex);
458 _finish_writer = true;
459 _writer_condition.notify_all ();
462 _writer_thread->join ();
463 delete _writer_thread;
468 Encoder::encoder_thread (ServerDescription* server)
470 /* Number of seconds that we currently wait between attempts
471 to connect to the server; not relevant for localhost
474 int remote_backoff = 0;
478 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
479 boost::mutex::scoped_lock lock (_worker_mutex);
480 while (_encode_queue.empty () && !_terminate_encoder) {
481 _worker_condition.wait (lock);
484 if (_terminate_encoder) {
488 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _encode_queue.size());
489 boost::shared_ptr<DCPVideoFrame> vf = _encode_queue.front ();
490 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
491 _encode_queue.pop_front ();
495 shared_ptr<EncodedData> encoded;
499 encoded = vf->encode_remotely (server);
501 if (remote_backoff > 0) {
502 _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
505 /* This job succeeded, so remove any backoff */
508 } catch (std::exception& e) {
509 if (remote_backoff < 60) {
511 remote_backoff += 10;
515 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
516 vf->frame(), server->host_name(), e.what(), remote_backoff)
522 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
523 encoded = vf->encode_locally ();
524 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
525 } catch (std::exception& e) {
526 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
531 boost::mutex::scoped_lock lock2 (_writer_mutex);
532 _write_queue.push_back (make_pair (encoded, vf->frame ()));
533 _writer_condition.notify_all ();
538 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
540 _encode_queue.push_front (vf);
544 if (remote_backoff > 0) {
545 dvdomatic_sleep (remote_backoff);
549 _worker_condition.notify_all ();
554 Encoder::link (string a, string b) const
556 #ifdef DVDOMATIC_POSIX
557 int const r = symlink (a.c_str(), b.c_str());
559 throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b));
563 #ifdef DVDOMATIC_WINDOWS
564 boost::filesystem::copy_file (a, b);
568 struct WriteQueueSorter
570 bool operator() (pair<shared_ptr<EncodedData>, int> const & a, pair<shared_ptr<EncodedData>, int> const & b) {
571 return a.second < b.second;
576 Encoder::writer_thread ()
580 boost::mutex::scoped_lock lock (_writer_mutex);
583 if (_finish_writer ||
584 _write_queue.size() > _maximum_frames_in_memory ||
585 (!_write_queue.empty() && _write_queue.front().second == (_last_written_frame + 1))) {
590 TIMING ("writer sleeps with a queue of %1; %2 pending", _write_queue.size(), _pending.size());
591 _writer_condition.wait (lock);
592 TIMING ("writer wakes with a queue of %1", _write_queue.size());
594 _write_queue.sort (WriteQueueSorter ());
597 if (_finish_writer && _write_queue.empty() && _pending.empty()) {
601 /* Write any frames that we can write; i.e. those that are in sequence */
602 while (!_write_queue.empty() && _write_queue.front().second == (_last_written_frame + 1)) {
603 pair<boost::shared_ptr<EncodedData>, int> encoded = _write_queue.front ();
604 _write_queue.pop_front ();
607 _film->log()->log (String::compose ("Writer writes %1 to MXF", encoded.second));
609 _picture_asset_writer->write (encoded.first->data(), encoded.first->size());
610 _last_written = encoded.first;
612 _picture_asset_writer->write (_last_written->data(), _last_written->size());
616 ++_last_written_frame;
619 while (_write_queue.size() > _maximum_frames_in_memory) {
620 /* Too many frames in memory which can't yet be written to the stream.
624 pair<boost::shared_ptr<EncodedData>, int> encoded = _write_queue.back ();
625 _write_queue.pop_back ();
626 if (!encoded.first) {
627 /* This is a `repeat-last' frame, so no need to write it to disk */
632 _film->log()->log (String::compose ("Writer full (awaiting %1); pushes %2 to disk", _last_written_frame + 1, encoded.second));
633 encoded.first->write (_film, encoded.second);
636 _pending.push_back (encoded.second);
639 while (_write_queue.size() < _maximum_frames_in_memory && !_pending.empty()) {
640 /* We have some space in memory. Fetch some frames back off disk. */
643 int const fetch = _pending.front ();
646 _film->log()->log (String::compose ("Writer pulls %1 back from disk", fetch));
647 shared_ptr<EncodedData> encoded;
648 if (boost::filesystem::exists (_film->frame_out_path (fetch, false))) {
649 /* It's an actual frame (not a repeat-last); load it in */
650 encoded.reset (new EncodedData (_film->frame_out_path (fetch, false)));
654 _write_queue.push_back (make_pair (encoded, fetch));
655 _pending.remove (fetch);