2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
32 #include "exceptions.h"
35 #include "dcp_video_frame.h"
41 using std::stringstream;
45 using namespace boost;
// Size of the timing window: frame_done() keeps at most this many completion
// timestamps in _time_history, and current_frames_per_second() divides this
// count by the time spanned by that history to estimate the encoding rate.
47 int const Encoder::_history_size = 25;
// Set up an encoder for a film. When the film has an audio stream, one WAV
// file (24-bit PCM) is opened for writing per DCP audio channel and its handle
// is appended to _sound_files.
// The second parameter, o, supplies the output paths used below
// (multichannel_audio_out_path).
// Throws CreateFileError naming the output path; the condition guarding the
// throw is not visible in this excerpt (presumably a failed sf_open; confirm).
49 /** @param f Film that we are encoding.
52 Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<const EncodeOptions> o)
55 , _just_skipped (false)
58 #ifdef HAVE_SWRESAMPLE
61 , _audio_frames_written (0)
62 , _process_end (false)
64 if (_film->audio_stream()) {
65 /* Create sound output files with .tmp suffixes; we will rename
66 them if and when we complete.
// One output file per DCP channel. Passing true as the second argument of
// multichannel_audio_out_path selects the temporary path; process_end()
// renames it to the path obtained with false.
68 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
70 sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
71 /* We write mono files */
73 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
// Note: this local f shadows the constructor parameter f inside the loop.
74 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
76 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
78 _sound_files.push_back (f);
// NOTE(review): the header of the definition containing this call is not
// visible in this excerpt (presumably the destructor; confirm). The call asks
// the worker threads to stop via terminate_worker_threads().
86 terminate_worker_threads ();
// Prepare for encoding: set up the audio resampler when the source and target
// sample rates differ, then start the worker threads that run encoder_thread().
// Throws EncodeError when resampling is needed but libswresample is not present.
90 Encoder::process_begin ()
// Resampling is only needed when there is audio and its rate is not already
// the target rate.
92 if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
93 #ifdef HAVE_SWRESAMPLE
96 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
97 _film->log()->log (s.str ());
99 /* We will be using planar float data when we call the resampler */
// Visible arguments, in order: output channel layout (the same as the input's),
// output sample rate, input channel layout, input sample rate. The remaining
// arguments are not visible in this excerpt.
100 _swr_context = swr_alloc_set_opts (
102 _film->audio_stream()->channel_layout(),
104 _film->target_audio_sample_rate(),
105 _film->audio_stream()->channel_layout(),
107 _film->audio_stream()->sample_rate(),
// The value returned by swr_init() is not checked here.
111 swr_init (_swr_context);
// Presumably the branch taken when HAVE_SWRESAMPLE is not defined; the
// preprocessor line selecting it is not visible in this excerpt.
113 throw EncodeError ("Cannot resample audio as libswresample is not present");
116 #ifdef HAVE_SWRESAMPLE
// Local workers: one thread per configured local encoding thread, each bound
// to a null ServerDescription. The threads are allocated with new and the raw
// pointers are kept in _worker_threads.
121 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
122 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
// Remote workers: for every configured server, as many threads as its
// threads() accessor reports, each bound to that server's description.
125 vector<ServerDescription*> servers = Config::instance()->servers ();
127 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
128 for (int j = 0; j < (*i)->threads (); ++j) {
129 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
// Finish encoding: flush and free the resampler, close the sound files and
// move them to their final names, wait for the frame queue to drain, stop the
// worker threads and finally encode any frames still left in the queue.
// Throws EncodeError from the flush step; the guarding condition is not
// visible in this excerpt (presumably a negative swr_convert result, as in
// process_audio; confirm).
136 Encoder::process_end ()
139 if (_film->audio_stream() && _swr_context) {
// Scratch buffer with room for 256 frames per flush call.
141 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
// A null input with a zero count asks swr_convert() for the samples it still
// has buffered.
144 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
147 throw EncodeError ("could not run sample-rate converter");
154 out->set_frames (frames);
158 swr_free (&_swr_context);
162 if (_film->audio_stream()) {
163 close_sound_files ();
165 /* Rename .wav.tmp files to .wav */
// Any existing final file is removed before the temporary file is renamed
// onto its path.
166 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
167 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
168 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
170 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
// _queue is shared with the worker threads, so hold _worker_mutex while
// inspecting it.
174 boost::mutex::scoped_lock lock (_worker_mutex);
176 _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
178 /* Keep waking workers until the queue is empty */
// wait() releases the lock while blocked; workers call notify_all() after
// each attempt (see encoder_thread), which wakes this loop to re-check.
179 while (!_queue.empty ()) {
180 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
181 _worker_condition.notify_all ();
182 _worker_condition.wait (lock);
// NOTE(review): terminate_worker_threads() locks _worker_mutex itself, so the
// lock taken above must have been released before this call; the release is
// not visible in this excerpt. Confirm.
187 terminate_worker_threads ();
189 _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
191 /* The following sequence of events can occur in the above code:
192 1. a remote worker takes the last image off the queue
193 2. the loop above terminates
194 3. the remote worker fails to encode the image and puts it back on the queue
195 4. the remote worker is then terminated by terminate_worker_threads
197 So just mop up anything left in the queue here.
// Left-over frames are encoded on the calling thread and written out. The
// catch below logs a failure; the matching try is not visible in this excerpt.
200 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
201 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
203 shared_ptr<EncodedData> e = (*i)->encode_locally ();
204 e->write (_opt, (*i)->frame ());
206 } catch (std::exception& e) {
207 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
212 /** @return an estimate of the current number of frames we are encoding per second,
216 Encoder::current_frames_per_second () const
// _time_history is modified by frame_done() under the same mutex.
218 boost::mutex::scoped_lock lock (_history_mutex);
// With fewer than _history_size timestamps there is not yet a full window; the
// value returned in that case is not visible in this excerpt.
219 if (int (_time_history.size()) < _history_size) {
224 gettimeofday (&now, 0);
// frame_done() pushes new timestamps at the front, so back() is the oldest
// one. The estimate is the window size divided by the time since that entry.
// NOTE(review): the divisor is zero if no time has elapsed since the oldest
// timestamp; confirm how seconds() and the callers cope with that.
226 return _history_size / (seconds (now) - seconds (_time_history.back ()));
229 /** @return true if the last frame to be processed was skipped as it already existed */
231 Encoder::skipping () const
// _just_skipped is written by frame_done() and frame_skipped() while they hold
// _history_mutex, so the same mutex is taken for this read.
// The lock object must be named: an unnamed `scoped_lock (_history_mutex);`
// is parsed as the declaration of a default-constructed lock called
// _history_mutex (shadowing the member) and would lock nothing.
233 boost::mutex::scoped_lock lock (_history_mutex);
234 return _just_skipped;
237 /** @return Number of video frames that have been received */
239 Encoder::video_frame () const
// Hold _history_mutex for the duration of the read. The lock object must be
// named: an unnamed `scoped_lock (_history_mutex);` is parsed as the
// declaration of a default-constructed lock called _history_mutex (shadowing
// the member) and would lock nothing.
// The statement that returns the count is not visible in this excerpt.
241 boost::mutex::scoped_lock lock (_history_mutex);
245 /** Should be called when a frame has been encoded successfully.
246 * @param n Source frame index.
// NOTE(review): the @param n entry above is stale; frame_done() takes no
// arguments.
249 Encoder::frame_done ()
// Guards _just_skipped and _time_history, which the reporting methods
// (current_frames_per_second, skipping) also access.
251 boost::mutex::scoped_lock lock (_history_mutex);
252 _just_skipped = false;
// Record the completion time at the front of the history...
255 gettimeofday (&tv, 0);
256 _time_history.push_front (tv);
// ...and drop the oldest entry so that at most _history_size timestamps are kept.
257 if (int (_time_history.size()) > _history_size) {
258 _time_history.pop_back ();
262 /** Called by a subclass when it has just skipped the processing
263 of a frame because it has already been done.
266 Encoder::frame_skipped ()
// Sets the flag reported by skipping(); frame_done() clears it again. Both
// writers hold _history_mutex while changing it.
268 boost::mutex::scoped_lock lock (_history_mutex);
269 _just_skipped = true;
// Handle one decoded video frame: apply the frame-skip and frame-range
// options, wait for room in the work queue, then either link to the output of
// an earlier frame or queue this frame for the worker threads to encode.
// image: the frame's picture, handed to the DCPVideoFrame that is queued.
// same:  when true and an earlier frame has been recorded in _last_real_frame,
//        that frame's output files are linked to instead of encoding again.
// sub:   subtitle handed to the DCPVideoFrame that is queued.
273 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
// With video_skip set, frames whose index is not a multiple of it take this
// branch (its body is not visible in this excerpt).
275 if (_opt->video_skip != 0 && (_video_frame % _opt->video_skip) != 0) {
// video_range is treated as the half-open interval [first, second); frames
// outside it take the inner branch (its body is not visible in this excerpt).
280 if (_opt->video_range) {
281 pair<SourceFrame, SourceFrame> const r = _opt->video_range.get();
282 if (_video_frame < r.first || _video_frame >= r.second) {
// _queue and _worker_condition are shared with the worker threads.
288 boost::mutex::scoped_lock lock (_worker_mutex);
290 /* Wait until the queue has gone down a bit */
// Back-pressure: block while the queue holds at least two frames per worker
// thread, unless _process_end has been set.
291 while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
292 TIMING ("decoder sleeps with queue of %1", _queue.size());
293 _worker_condition.wait (lock);
294 TIMING ("decoder wakes with queue of %1", _queue.size());
301 /* Only do the processing if we don't already have a file for this frame */
302 if (boost::filesystem::exists (_opt->frame_out_path (_video_frame, false))) {
// The shortcut for a repeated frame needs an earlier frame to link to.
307 if (same && _last_real_frame) {
308 /* Use the last frame that we encoded */
// Both the frame file and its hash file are linked.
309 link (_opt->frame_out_path (_last_real_frame.get(), false), _opt->frame_out_path (_video_frame, false));
310 link (_opt->hash_out_path (_last_real_frame.get(), false), _opt->hash_out_path (_video_frame, false));
312 /* Queue this new frame for encoding */
// Only the second string of this pair is passed on to the DCPVideoFrame below.
313 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
314 TIMING ("adding to queue of %1", _queue.size ());
315 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
317 image, sub, _opt->out_size, _opt->padding, _film->subtitle_offset(), _film->subtitle_scale(),
318 _film->scaler(), _video_frame, _film->frames_per_second(), s.second,
319 Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
// Wake any worker waiting for work, and remember this frame as the one that
// later repeated frames can link to.
324 _worker_condition.notify_all ();
325 _last_real_frame = _video_frame;
// Handle a block of decoded audio: optionally trim it to the requested audio
// range, optionally resample it, place mono audio on the centre channel of a
// six-channel buffer, and advance the running frame count.
// data: the audio to process.
// Throws EncodeError if swr_convert() reports an error.
332 Encoder::process_audio (shared_ptr<AudioBuffers> data)
334 if (_opt->audio_range) {
// Trimming is done on a copy constructed from the incoming buffers.
335 shared_ptr<AudioBuffers> trimmed (new AudioBuffers (*data.get ()));
337 /* Range that we are encoding */
338 pair<int64_t, int64_t> required_range = _opt->audio_range.get();
339 /* Range of this block of data */
// Both ranges are in frames; _audio_frame is the running position that this
// method advances at its end.
340 pair<int64_t, int64_t> this_range (_audio_frame, _audio_frame + trimmed->frames());
342 if (this_range.second < required_range.first || required_range.second < this_range.first) {
343 /* No part of this audio is within the required range */
345 } else if (required_range.first >= this_range.first && required_range.first < this_range.second) {
// The required range starts inside this block: drop the first `shift` frames
// by moving the remainder to the start of the buffer and shortening it.
347 int64_t const shift = required_range.first - this_range.first;
348 trimmed->move (shift, 0, trimmed->frames() - shift);
349 trimmed->set_frames (trimmed->frames() - shift);
// NOTE(review): these branches are mutually exclusive, so a block that
// contains both the start and the end of the required range only has its
// start trimmed. Confirm whether that case can occur.
350 } else if (required_range.second >= this_range.first && required_range.second < this_range.second) {
352 trimmed->set_frames (required_range.second - this_range.first);
359 /* Maybe sample-rate convert */
362 /* Compute the resampled frames count and add 32 for luck */
// NOTE(review): if the sample-rate accessors return integers, the division
// truncates before ceil() sees the value, so ceil() has no effect and the +32
// margin absorbs the difference. Confirm the accessor types.
363 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
365 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
368 int const resampled_frames = swr_convert (
369 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
372 if (resampled_frames < 0) {
373 throw EncodeError ("could not run sample-rate converter");
// swr_convert() returns the number of frames it produced; record that as the
// buffer's length.
376 resampled->set_frames (resampled_frames);
378 /* And point our variables at the resampled audio */
383 if (_film->audio_channels() == 1) {
384 /* We need to switch things around so that the mono channel is on
385 the centre channel of a 5.1 set (with other channels silent).
// Six-channel buffer of the same length: the mono samples are copied to the
// centre channel and every other channel is silenced. The memcpy length
// assumes float samples.
388 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
389 b->make_silent (libdcp::LEFT);
390 b->make_silent (libdcp::RIGHT);
391 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
392 b->make_silent (libdcp::LFE);
393 b->make_silent (libdcp::LS);
394 b->make_silent (libdcp::RS);
// Advance the running position by the number of frames in `data`.
401 _audio_frame += data->frames ();
// Append a block of audio to the per-channel sound files.
// audio: buffers to write; channel i goes to _sound_files[i].
405 Encoder::write_audio (shared_ptr<const AudioBuffers> audio)
// NOTE(review): assumes audio->channels() does not exceed _sound_files.size();
// the count returned by sf_write_float() is not checked.
407 for (int i = 0; i < audio->channels(); ++i) {
408 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
// Running total of frames written so far.
411 _audio_frames_written += audio->frames ();
// Visit every entry in _sound_files and then empty the container, so a second
// call finds nothing to visit. The per-file action inside the loop is not
// visible in this excerpt (presumably sf_close; confirm).
415 Encoder::close_sound_files ()
417 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
421 _sound_files.clear ();
// Wake the worker threads and then visit each entry in _worker_threads.
// Called from process_end() and from one other place in this file.
425 Encoder::terminate_worker_threads ()
// Workers wait on _worker_condition with _worker_mutex held, so the mutex is
// taken before waking them.
427 boost::mutex::scoped_lock lock (_worker_mutex);
// Wake every waiting worker so that it re-checks its wait condition, which
// includes _process_end.
429 _worker_condition.notify_all ();
// NOTE(review): the statement that sets _process_end and the per-thread action
// in this loop (presumably join and delete) are not visible in this excerpt;
// confirm, and confirm that calling this method twice is safe.
432 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
// Body of each worker thread: take a frame from the queue, encode it, write
// the result, and put the frame back on the queue if encoding failed.
// server: description of the remote server this worker encodes on;
//         process_begin() passes a null pointer for the local workers.
// Much of the control structure (loop, try blocks, branch conditions) is not
// visible in this excerpt.
439 Encoder::encoder_thread (ServerDescription* server)
441 /* Number of seconds that we currently wait between attempts
442 to connect to the server; not relevant for localhost
445 int remote_backoff = 0;
449 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
// Sleep until there is a frame to take or _process_end has been set.
450 boost::mutex::scoped_lock lock (_worker_mutex);
451 while (_queue.empty () && !_process_end) {
452 _worker_condition.wait (lock);
459 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
// Take the frame at the head of the queue; its removal from the queue is not
// visible in this excerpt.
460 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
461 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
// Null until an encode succeeds.
466 shared_ptr<EncodedData> encoded;
470 encoded = vf->encode_remotely (server);
// A non-zero back-off here means earlier attempts on this server had failed.
472 if (remote_backoff > 0) {
473 _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
476 /* This job succeeded, so remove any backoff */
479 } catch (std::exception& e) {
// Lengthen the wait in 10-second steps, stopping once it has reached 60 seconds.
480 if (remote_backoff < 60) {
482 remote_backoff += 10;
486 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
487 vf->frame(), server->host_name(), e.what(), remote_backoff)
493 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
494 encoded = vf->encode_locally ();
495 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
496 } catch (std::exception& e) {
497 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
// Write the encoded data out under the frame's own index.
502 encoded->write (_opt, vf->frame ());
// Failure path: the frame goes back to the front of the queue so that it is
// the next one taken.
507 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
509 _queue.push_front (vf);
// Wait out the back-off before this thread tries again.
513 if (remote_backoff > 0) {
514 dvdomatic_sleep (remote_backoff);
// Tell the other waiters (process_video, process_end and the other workers)
// that the queue may have changed.
518 _worker_condition.notify_all ();
// Make path b refer to the existing file a. On DVDOMATIC_POSIX builds this
// creates a symbolic link at b pointing to a; on DVDOMATIC_WINDOWS builds it
// copies a to b.
// a: existing file.
// b: path of the link (or copy) to create.
// Throws EncodeError on POSIX builds; the condition guarding the throw is not
// visible in this excerpt (presumably a failed symlink call; confirm).
523 Encoder::link (string a, string b) const
525 #ifdef DVDOMATIC_POSIX
// NOTE(review): symlink() stores a exactly as given, so a relative a is
// resolved relative to the directory containing b; confirm the callers pass
// paths for which that is correct.
526 int const r = symlink (a.c_str(), b.c_str());
528 throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b));
532 #ifdef DVDOMATIC_WINDOWS
533 boost::filesystem::copy_file (a, b);