2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
32 #include "exceptions.h"
35 #include "dcp_video_frame.h"
41 using std::stringstream;
45 using namespace boost;
47 int const Encoder::_history_size = 25;
49 /** @param f Film that we are encoding.
52 Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<const EncodeOptions> o)
55 , _just_skipped (false)
58 #ifdef HAVE_SWRESAMPLE
61 , _audio_frames_written (0)
62 , _process_end (false)
64 if (_film->audio_stream()) {
65 /* Create sound output files with .tmp suffixes; we will rename
66 them if and when we complete.
68 for (int i = 0; i < _film->audio_channels(); ++i) {
70 sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
71 /* We write mono files */
73 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
74 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
76 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
78 _sound_files.push_back (f);
86 terminate_worker_threads ();
90 Encoder::process_begin ()
92 if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
93 #ifdef HAVE_SWRESAMPLE
96 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
97 _film->log()->log (s.str ());
99 /* We will be using planar float data when we call the resampler */
100 _swr_context = swr_alloc_set_opts (
102 _film->audio_stream()->channel_layout(),
104 _film->target_audio_sample_rate(),
105 _film->audio_stream()->channel_layout(),
107 _film->audio_stream()->sample_rate(),
111 swr_init (_swr_context);
113 throw EncodeError ("Cannot resample audio as libswresample is not present");
116 #ifdef HAVE_SWRESAMPLE
121 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
122 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
125 vector<ServerDescription*> servers = Config::instance()->servers ();
127 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
128 for (int j = 0; j < (*i)->threads (); ++j) {
129 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
136 Encoder::process_end ()
139 if (_film->audio_stream() && _swr_context) {
141 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
144 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
147 throw EncodeError ("could not run sample-rate converter");
154 out->set_frames (frames);
158 swr_free (&_swr_context);
162 if (_film->audio_stream()) {
163 close_sound_files ();
165 /* Rename .wav.tmp files to .wav */
166 for (int i = 0; i < _film->audio_channels(); ++i) {
167 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
168 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
170 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
174 boost::mutex::scoped_lock lock (_worker_mutex);
176 _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
178 /* Keep waking workers until the queue is empty */
179 while (!_queue.empty ()) {
180 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
181 _worker_condition.notify_all ();
182 _worker_condition.wait (lock);
187 terminate_worker_threads ();
189 _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
191 /* The following sequence of events can occur in the above code:
192 1. a remote worker takes the last image off the queue
193 2. the loop above terminates
194 3. the remote worker fails to encode the image and puts it back on the queue
195 4. the remote worker is then terminated by terminate_worker_threads
197 So just mop up anything left in the queue here.
200 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
201 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
203 shared_ptr<EncodedData> e = (*i)->encode_locally ();
204 e->write (_opt, (*i)->frame ());
206 } catch (std::exception& e) {
207 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
212 /** @return an estimate of the current number of frames we are encoding per second,
216 Encoder::current_frames_per_second () const
218 boost::mutex::scoped_lock lock (_history_mutex);
219 if (int (_time_history.size()) < _history_size) {
224 gettimeofday (&now, 0);
226 return _history_size / (seconds (now) - seconds (_time_history.back ()));
229 /** @return true if the last frame to be processed was skipped as it already existed */
231 Encoder::skipping () const
233 boost::mutex::scoped_lock (_history_mutex);
234 return _just_skipped;
237 /** @return Number of video frames that have been received */
239 Encoder::video_frame () const
241 boost::mutex::scoped_lock (_history_mutex);
245 /** Should be called when a frame has been encoded successfully.
246 * @param n Source frame index.
249 Encoder::frame_done ()
251 boost::mutex::scoped_lock lock (_history_mutex);
252 _just_skipped = false;
255 gettimeofday (&tv, 0);
256 _time_history.push_front (tv);
257 if (int (_time_history.size()) > _history_size) {
258 _time_history.pop_back ();
262 /** Called by a subclass when it has just skipped the processing
263 of a frame because it has already been done.
266 Encoder::frame_skipped ()
268 boost::mutex::scoped_lock lock (_history_mutex);
269 _just_skipped = true;
273 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
275 if (_opt->video_skip != 0 && (_video_frame % _opt->video_skip) != 0) {
280 if (_opt->video_range) {
281 pair<SourceFrame, SourceFrame> const r = _opt->video_range.get();
282 if (_video_frame < r.first || _video_frame >= r.second) {
288 boost::mutex::scoped_lock lock (_worker_mutex);
290 /* Wait until the queue has gone down a bit */
291 while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
292 TIMING ("decoder sleeps with queue of %1", _queue.size());
293 _worker_condition.wait (lock);
294 TIMING ("decoder wakes with queue of %1", _queue.size());
301 /* Only do the processing if we don't already have a file for this frame */
302 if (boost::filesystem::exists (_opt->frame_out_path (_video_frame, false))) {
308 /* Use the last frame that we encoded */
309 assert (_last_real_frame);
310 link (_opt->frame_out_path (_last_real_frame.get(), false), _opt->frame_out_path (_video_frame, false));
311 link (_opt->hash_out_path (_last_real_frame.get(), false), _opt->hash_out_path (_video_frame, false));
313 /* Queue this new frame for encoding */
314 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
315 TIMING ("adding to queue of %1", _queue.size ());
316 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
318 image, sub, _opt->out_size, _opt->padding, _film->subtitle_offset(), _film->subtitle_scale(),
319 _film->scaler(), _video_frame, _film->frames_per_second(), s.second,
320 Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
325 _worker_condition.notify_all ();
326 _last_real_frame = _video_frame;
333 Encoder::process_audio (shared_ptr<AudioBuffers> data)
335 if (_opt->audio_range) {
336 shared_ptr<AudioBuffers> trimmed (new AudioBuffers (*data.get ()));
338 /* Range that we are encoding */
339 pair<int64_t, int64_t> required_range = _opt->audio_range.get();
340 /* Range of this block of data */
341 pair<int64_t, int64_t> this_range (_audio_frame, _audio_frame + trimmed->frames());
343 if (this_range.second < required_range.first || required_range.second < this_range.first) {
344 /* No part of this audio is within the required range */
346 } else if (required_range.first >= this_range.first && required_range.first < this_range.second) {
348 int64_t const shift = required_range.first - this_range.first;
349 trimmed->move (shift, 0, trimmed->frames() - shift);
350 trimmed->set_frames (trimmed->frames() - shift);
351 } else if (required_range.second >= this_range.first && required_range.second < this_range.second) {
353 trimmed->set_frames (required_range.second - this_range.first);
360 /* Maybe sample-rate convert */
363 /* Compute the resampled frames count and add 32 for luck */
364 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
366 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
369 int const resampled_frames = swr_convert (
370 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
373 if (resampled_frames < 0) {
374 throw EncodeError ("could not run sample-rate converter");
377 resampled->set_frames (resampled_frames);
379 /* And point our variables at the resampled audio */
386 _audio_frame += data->frames ();
390 Encoder::write_audio (shared_ptr<const AudioBuffers> audio)
392 for (int i = 0; i < _film->audio_channels(); ++i) {
393 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
396 _audio_frames_written += audio->frames ();
400 Encoder::close_sound_files ()
402 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
406 _sound_files.clear ();
410 Encoder::terminate_worker_threads ()
412 boost::mutex::scoped_lock lock (_worker_mutex);
414 _worker_condition.notify_all ();
417 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
424 Encoder::encoder_thread (ServerDescription* server)
426 /* Number of seconds that we currently wait between attempts
427 to connect to the server; not relevant for localhost
430 int remote_backoff = 0;
434 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
435 boost::mutex::scoped_lock lock (_worker_mutex);
436 while (_queue.empty () && !_process_end) {
437 _worker_condition.wait (lock);
444 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
445 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
446 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
451 shared_ptr<EncodedData> encoded;
455 encoded = vf->encode_remotely (server);
457 if (remote_backoff > 0) {
458 _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
461 /* This job succeeded, so remove any backoff */
464 } catch (std::exception& e) {
465 if (remote_backoff < 60) {
467 remote_backoff += 10;
471 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
472 vf->frame(), server->host_name(), e.what(), remote_backoff)
478 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
479 encoded = vf->encode_locally ();
480 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
481 } catch (std::exception& e) {
482 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
487 encoded->write (_opt, vf->frame ());
492 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
494 _queue.push_front (vf);
498 if (remote_backoff > 0) {
499 dvdomatic_sleep (remote_backoff);
503 _worker_condition.notify_all ();
508 Encoder::link (string a, string b) const
510 #ifdef DVDOMATIC_POSIX
511 int const r = symlink (a.c_str(), b.c_str());
513 throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b));
517 #ifdef DVDOMATIC_WINDOWS
518 boost::filesystem::copy_file (a, b);