2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
24 #include <boost/filesystem.hpp>
25 #include <boost/lexical_cast.hpp>
31 #include "exceptions.h"
34 #include "dcp_video_frame.h"
40 using std::stringstream;
44 using namespace boost;
// Definition of the static constant Encoder::_history_size (25).
// Within this file it caps the length of _time_history (see frame_done) and
// is the numerator of the estimate in current_frames_per_second.
46 int const Encoder::_history_size = 25;
// Constructor. Takes the film being encoded (f) and the encode options (o).
// When the film has an audio stream, one WAV file per audio channel is opened
// for writing with sf_open, using 24-bit PCM (SF_FORMAT_WAV | SF_FORMAT_PCM_24),
// and each handle is appended to _sound_files.
// NOTE(review): several lines of this definition are not visible in this
// excerpt (part of the initialiser list, the sf_info declaration, braces).
48 /** @param f Film that we are encoding.
51 Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<const EncodeOptions> o)
54 , _just_skipped (false)
57 #ifdef HAVE_SWRESAMPLE
60 , _audio_frames_written (0)
61 , _process_end (false)
63 if (_film->audio_stream()) {
64 /* Create sound output files with .tmp suffixes; we will rename
65 them if and when we complete.
67 for (int i = 0; i < _film->audio_channels(); ++i) {
// The output sample rate is whatever dcp_audio_sample_rate returns for the
// sample rate of the source audio stream.
69 sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
70 /* We write mono files */
72 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
// The path helper is called with true here and with false in process_end;
// judging by the rename done there, true presumably selects the temporary name.
// NOTE(review): the local f declared below shadows the constructor parameter f.
73 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
// NOTE(review): if this throws after earlier iterations succeeded, no code
// visible here closes the handles already stored in _sound_files - confirm
// whether they are cleaned up elsewhere.
75 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
77 _sound_files.push_back (f);
// Shuts down the worker threads through terminate_worker_threads.
// NOTE(review): the signature of the enclosing function is not visible in this
// excerpt; presumably this is the destructor body - TODO confirm.
85 terminate_worker_threads ();
// Called at the start of an encode. Sets up audio resampling when the source
// stream sample rate differs from the film target rate, then starts the worker
// threads: one per configured local encoding thread plus one per thread offered
// by each configured encoding server.
89 Encoder::process_begin ()
// Resampling is only needed when there is an audio stream and its rate is not
// already the target rate.
91 if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
92 #ifdef HAVE_SWRESAMPLE
95 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
96 _film->log()->log (s.str ());
98 /* We will be using planar float data when we call the resampler */
// In the visible arguments the same channel layout is passed twice, once with
// the target rate and once with the source stream rate; the remaining
// arguments (sample formats, log settings) are not visible in this excerpt.
99 _swr_context = swr_alloc_set_opts (
101 _film->audio_stream()->channel_layout(),
103 _film->target_audio_sample_rate(),
104 _film->audio_stream()->channel_layout(),
106 _film->audio_stream()->sample_rate(),
110 swr_init (_swr_context);
// Presumably the branch taken when HAVE_SWRESAMPLE is not defined: a needed
// resample cannot be done, so the encode is aborted with EncodeError.
112 throw EncodeError ("Cannot resample audio as libswresample is not present");
115 #ifdef HAVE_SWRESAMPLE
// Local workers: encoder_thread is bound with a null ServerDescription pointer.
// The threads are allocated with new and the raw pointers kept in _worker_threads.
120 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
121 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
124 vector<ServerDescription*> servers = Config::instance()->servers ();
// Remote workers: for every configured server, start as many threads as that
// server reports through threads(), each bound to that server description.
126 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
127 for (int j = 0; j < (*i)->threads (); ++j) {
128 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
// Called at the end of an encode. In order: flushes the resampler (if one was
// set up), closes the sound files and renames them from their temporary names
// to their final names, waits until the worker threads have emptied the frame
// queue, terminates the workers, and finally encodes on the calling thread any
// frame that is still in the queue.
135 Encoder::process_end ()
138 if (_film->audio_stream() && _swr_context) {
// Flush: swr_convert is called with no input (null pointer, zero frames) and
// room for 256 output frames, so that samples still buffered in the resampler
// are returned.
140 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
143 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
146 throw EncodeError ("could not run sample-rate converter");
// Record how many frames the converter actually produced.
153 out->set_frames (frames);
157 swr_free (&_swr_context);
161 if (_film->audio_stream()) {
162 close_sound_files ();
164 /* Rename .wav.tmp files to .wav */
165 for (int i = 0; i < _film->audio_channels(); ++i) {
// An existing final file is removed first, then the temporary file is renamed
// onto the final name.
166 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
167 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
169 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
// _queue is inspected under _worker_mutex from here on.
173 boost::mutex::scoped_lock lock (_worker_mutex);
175 _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
177 /* Keep waking workers until the queue is empty */
178 while (!_queue.empty ()) {
179 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
// Wake every waiting worker, then wait on the same condition; encoder_thread
// notifies it after each frame it has dealt with.
180 _worker_condition.notify_all ();
181 _worker_condition.wait (lock);
186 terminate_worker_threads ();
188 _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
190 /* The following sequence of events can occur in the above code:
191 1. a remote worker takes the last image off the queue
192 2. the loop above terminates
193 3. the remote worker fails to encode the image and puts it back on the queue
194 4. the remote worker is then terminated by terminate_worker_threads
196 So just mop up anything left in the queue here.
// Each left-over frame is encoded locally and written out; a std::exception
// raised while doing so is logged.
199 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
200 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
202 shared_ptr<EncodedData> e = (*i)->encode_locally ();
203 e->write (_opt, (*i)->frame ());
205 } catch (std::exception& e) {
206 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
// Throughput estimate, computed under _history_mutex. Once _time_history holds
// at least _history_size entries, the result is _history_size divided by the
// number of seconds between now and _time_history.back(). frame_done pushes new
// timestamps at the front, so back() is the oldest retained one.
// With fewer entries the size check below takes an early branch whose body is
// not visible in this excerpt.
// NOTE(review): nothing visible guards against the two timestamps being equal,
// which would make the divisor zero.
211 /** @return an estimate of the current number of frames we are encoding per second,
215 Encoder::current_frames_per_second () const
217 boost::mutex::scoped_lock lock (_history_mutex);
218 if (int (_time_history.size()) < _history_size) {
223 gettimeofday (&now, 0);
225 return _history_size / (seconds (now) - seconds (_time_history.back ()));
228 /** @return true if the last frame to be processed was skipped as it already existed */
230 Encoder::skipping () const
232 boost::mutex::scoped_lock (_history_mutex);
233 return _just_skipped;
236 /** @return Number of video frames that have been received */
238 Encoder::video_frame () const
240 boost::mutex::scoped_lock (_history_mutex);
// Records the completion of one encoded frame. Under _history_mutex it clears
// _just_skipped, pushes the current time of day onto the front of
// _time_history, and drops the oldest entry from the back once the history
// holds more than _history_size timestamps.
// NOTE(review): the @param n tag in the comment below is stale; frame_done
// takes no parameters.
244 /** Should be called when a frame has been encoded successfully.
245 * @param n Source frame index.
248 Encoder::frame_done ()
250 boost::mutex::scoped_lock lock (_history_mutex);
251 _just_skipped = false;
254 gettimeofday (&tv, 0);
255 _time_history.push_front (tv);
256 if (int (_time_history.size()) > _history_size) {
257 _time_history.pop_back ();
// Marks the most recent frame as skipped: sets _just_skipped to true while
// holding _history_mutex. frame_done resets the flag to false.
261 /** Called by a subclass when it has just skipped the processing
262 of a frame because it has already been done.
265 Encoder::frame_skipped ()
267 boost::mutex::scoped_lock lock (_history_mutex);
268 _just_skipped = true;
// Handles one incoming video frame (image, with optional subtitle sub).
// Visible steps: test the frame against the video_skip and video_range options;
// wait while the queue is at least twice as long as the number of worker
// threads; check whether an output file for this frame already exists; either
// link this frame to the files of the last really-encoded frame or wrap the
// image in a DCPVideoFrame and queue it for the workers.
// NOTE(review): the bodies of the skip and range tests and the condition that
// selects the link branch are not visible in this excerpt; presumably the link
// branch is taken when same is true - TODO confirm.
272 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
// True for frames whose index is not a multiple of video_skip (when set).
274 if (_opt->video_skip != 0 && (_video_frame % _opt->video_skip) != 0) {
279 if (_opt->video_range) {
280 pair<SourceFrame, SourceFrame> const r = _opt->video_range.get();
// The range is half-open: first is included, second is excluded.
281 if (_video_frame < r.first || _video_frame >= r.second) {
287 boost::mutex::scoped_lock lock (_worker_mutex);
289 /* Wait until the queue has gone down a bit */
// The wait also ends as soon as _process_end is set.
290 while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
291 TIMING ("decoder sleeps with queue of %1", _queue.size());
292 _worker_condition.wait (lock);
293 TIMING ("decoder wakes with queue of %1", _queue.size());
300 /* Only do the processing if we don't already have a file for this frame */
301 if (boost::filesystem::exists (_opt->frame_out_path (_video_frame, false))) {
307 /* Use the last frame that we encoded */
// Both the frame file and its hash file are linked to those of _last_real_frame.
308 assert (_last_real_frame);
309 link (_opt->frame_out_path (_last_real_frame.get(), false), _opt->frame_out_path (_video_frame, false));
310 link (_opt->hash_out_path (_last_real_frame.get(), false), _opt->hash_out_path (_video_frame, false));
312 /* Queue this new frame for encoding */
// Only the second string of the pair returned by Filter::ffmpeg_strings is
// passed on to the DCPVideoFrame in the visible arguments.
313 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
314 TIMING ("adding to queue of %1", _queue.size ());
315 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
317 image, sub, _opt->out_size, _opt->padding, _film->subtitle_offset(), _film->subtitle_scale(),
318 _film->scaler(), _video_frame, _film->frames_per_second(), s.second,
319 Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
// Wake the workers waiting for work and remember this frame as the last one
// that was really queued for encoding.
324 _worker_condition.notify_all ();
325 _last_real_frame = _video_frame;
// Handles one block of decoded audio. When an audio_range option is set, a copy
// of the block is trimmed to that range; the audio is then passed through
// swr_convert into a freshly allocated buffer; finally _audio_frame is advanced
// by the number of frames in data.
// NOTE(review): the statements that re-point data at the trimmed or resampled
// buffer, the condition guarding the resample, and the call that writes the
// audio out are not visible in this excerpt.
332 Encoder::process_audio (shared_ptr<AudioBuffers> data)
334 if (_opt->audio_range) {
// Work on a copy so that the caller's buffers are not modified by the trim.
335 shared_ptr<AudioBuffers> trimmed (new AudioBuffers (*data.get ()));
337 /* Range that we are encoding */
338 pair<int64_t, int64_t> required_range = _opt->audio_range.get();
339 /* Range of this block of data */
340 pair<int64_t, int64_t> this_range (_audio_frame, _audio_frame + trimmed->frames());
// NOTE(review): the branches below form one if / else-if chain, so when the
// required range both starts and ends inside this block only the first
// matching branch (trimming the start) runs - confirm this is intended.
342 if (this_range.second < required_range.first || required_range.second < this_range.first) {
343 /* No part of this audio is within the required range */
345 } else if (required_range.first >= this_range.first && required_range.first < this_range.second) {
// Drop the leading frames that come before the start of the required range.
347 int64_t const shift = required_range.first - this_range.first;
348 trimmed->move (shift, 0, trimmed->frames() - shift);
349 trimmed->set_frames (trimmed->frames() - shift);
350 } else if (required_range.second >= this_range.first && required_range.second < this_range.second) {
// Keep only the frames up to the end of the required range.
352 trimmed->set_frames (required_range.second - this_range.first);
359 /* Maybe sample-rate convert */
362 /* Compute the resampled frames count and add 32 for luck */
// NOTE(review): if both sample-rate accessors return integers the division is
// an integer division and ceil has no effect; the extra 32 frames then provide
// the only headroom - TODO confirm the accessor types.
363 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
365 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
368 int const resampled_frames = swr_convert (
369 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
// A negative return value from swr_convert is reported as an EncodeError.
372 if (resampled_frames < 0) {
373 throw EncodeError ("could not run sample-rate converter");
376 resampled->set_frames (resampled_frames);
378 /* And point our variables at the resampled audio */
385 _audio_frame += data->frames ();
// Writes one block of audio to the per-channel sound files: for each film audio
// channel i, the float samples of channel i go to _sound_files[i] through
// sf_write_float. _audio_frames_written is then advanced by the block length.
// NOTE(review): the return value of sf_write_float is not checked in the
// visible code.
389 Encoder::write_audio (shared_ptr<const AudioBuffers> audio)
391 for (int i = 0; i < _film->audio_channels(); ++i) {
392 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
395 _audio_frames_written += audio->frames ();
// Walks over every handle in _sound_files and then empties the vector, so that
// a second call finds nothing left to process.
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// closes each SNDFILE handle - TODO confirm.
399 Encoder::close_sound_files ()
401 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
405 _sound_files.clear ();
// Asks the worker threads to stop: takes _worker_mutex, wakes every thread
// waiting on _worker_condition, then visits each entry of _worker_threads.
// NOTE(review): the statements that set the termination flag, release the lock
// and deal with each thread object are not visible in this excerpt.
409 Encoder::terminate_worker_threads ()
411 boost::mutex::scoped_lock lock (_worker_mutex);
413 _worker_condition.notify_all ();
416 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
// Body of one worker thread. server is the encode server this thread sends
// frames to; process_begin passes a null pointer for local workers.
// The thread waits on _worker_condition until the queue has a frame or
// _process_end is set, takes the frame at the front of the queue, encodes it
// (encode_remotely or encode_locally), writes the encoded data, and pushes the
// frame back onto the front of the queue after a failure. It notifies
// _worker_condition after each frame.
// NOTE(review): the loop construct, the test that chooses between the remote
// and local paths, and the removal of the frame from the queue are not visible
// in this excerpt.
423 Encoder::encoder_thread (ServerDescription* server)
425 /* Number of seconds that we currently wait between attempts
426 to connect to the server; not relevant for localhost
429 int remote_backoff = 0;
433 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
434 boost::mutex::scoped_lock lock (_worker_mutex);
// Sleep until there is work to do or the encoder is shutting down.
435 while (_queue.empty () && !_process_end) {
436 _worker_condition.wait (lock);
443 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
444 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
445 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
// Stays null when encoding fails.
450 shared_ptr<EncodedData> encoded;
454 encoded = vf->encode_remotely (server);
// A non-zero backoff at this point means earlier attempts on this server failed.
456 if (remote_backoff > 0) {
457 _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
460 /* This job succeeded, so remove any backoff */
463 } catch (std::exception& e) {
// Each failed remote attempt adds 10 seconds while the backoff is below 60.
464 if (remote_backoff < 60) {
466 remote_backoff += 10;
470 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
471 vf->frame(), server->host_name(), e.what(), remote_backoff)
477 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
478 encoded = vf->encode_locally ();
479 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
480 } catch (std::exception& e) {
481 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
486 encoded->write (_opt, vf->frame ());
491 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
// Putting the frame at the front means it is the next one to be picked up.
493 _queue.push_front (vf);
// Sleep for the current backoff before this thread tries again.
497 if (remote_backoff > 0) {
498 dvdomatic_sleep (remote_backoff);
// Wake the threads waiting on the condition in process_video and process_end.
502 _worker_condition.notify_all ();
// Makes path b stand for the same content as path a. In the DVDOMATIC_POSIX
// section a symbolic link named b pointing at a is created with symlink; in
// the DVDOMATIC_WINDOWS section the file a is copied to b instead.
// The POSIX section throws EncodeError with a message naming both paths; the
// test on r that guards the throw is not visible in this excerpt.
// NOTE(review): both paths are taken by value, so each call copies two strings.
507 Encoder::link (string a, string b) const
509 #ifdef DVDOMATIC_POSIX
510 int const r = symlink (a.c_str(), b.c_str());
512 throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b));
516 #ifdef DVDOMATIC_WINDOWS
517 boost::filesystem::copy_file (a, b);