2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
32 #include "exceptions.h"
35 #include "dcp_video_frame.h"
41 using std::stringstream;
46 using namespace boost;
48 int const Encoder::_history_size = 25;
50 /** @param f Film that we are encoding.
53 Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<const EncodeOptions> o)
56 , _just_skipped (false)
59 #ifdef HAVE_SWRESAMPLE
62 , _audio_frames_written (0)
63 , _process_end (false)
65 if (_film->audio_stream()) {
66 /* Create sound output files with .tmp suffixes; we will rename
67 them if and when we complete.
69 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
71 sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
72 /* We write mono files */
74 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
75 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
77 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
79 _sound_files.push_back (f);
87 terminate_worker_threads ();
91 Encoder::process_begin ()
93 if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
94 #ifdef HAVE_SWRESAMPLE
97 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
98 _film->log()->log (s.str ());
100 /* We will be using planar float data when we call the resampler */
101 _swr_context = swr_alloc_set_opts (
103 _film->audio_stream()->channel_layout(),
105 _film->target_audio_sample_rate(),
106 _film->audio_stream()->channel_layout(),
108 _film->audio_stream()->sample_rate(),
112 swr_init (_swr_context);
114 throw EncodeError ("Cannot resample audio as libswresample is not present");
117 #ifdef HAVE_SWRESAMPLE
122 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
123 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
126 vector<ServerDescription*> servers = Config::instance()->servers ();
128 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
129 for (int j = 0; j < (*i)->threads (); ++j) {
130 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
137 Encoder::process_end ()
140 if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
142 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
145 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
148 throw EncodeError ("could not run sample-rate converter");
155 out->set_frames (frames);
159 swr_free (&_swr_context);
163 if (_film->audio_stream()) {
164 close_sound_files ();
166 /* Rename .wav.tmp files to .wav */
167 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
168 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
169 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
171 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
175 boost::mutex::scoped_lock lock (_worker_mutex);
177 _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
179 /* Keep waking workers until the queue is empty */
180 while (!_queue.empty ()) {
181 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
182 _worker_condition.notify_all ();
183 _worker_condition.wait (lock);
188 terminate_worker_threads ();
190 _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
192 /* The following sequence of events can occur in the above code:
193 1. a remote worker takes the last image off the queue
194 2. the loop above terminates
195 3. the remote worker fails to encode the image and puts it back on the queue
196 4. the remote worker is then terminated by terminate_worker_threads
198 So just mop up anything left in the queue here.
201 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
202 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
204 shared_ptr<EncodedData> e = (*i)->encode_locally ();
205 e->write (_opt, (*i)->frame ());
207 } catch (std::exception& e) {
208 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
212 /* Now do links (or copies on windows) to duplicate frames */
213 for (list<pair<int, int> >::iterator i = _links_required.begin(); i != _links_required.end(); ++i) {
214 link (_opt->frame_out_path (i->first, false), _opt->frame_out_path (i->second, false));
215 link (_opt->hash_out_path (i->first, false), _opt->hash_out_path (i->second, false));
219 /** @return an estimate of the current number of frames we are encoding per second,
223 Encoder::current_frames_per_second () const
225 boost::mutex::scoped_lock lock (_history_mutex);
226 if (int (_time_history.size()) < _history_size) {
231 gettimeofday (&now, 0);
233 return _history_size / (seconds (now) - seconds (_time_history.back ()));
236 /** @return true if the last frame to be processed was skipped as it already existed */
238 Encoder::skipping () const
240 boost::mutex::scoped_lock (_history_mutex);
241 return _just_skipped;
244 /** @return Number of video frames that have been received */
246 Encoder::video_frame () const
248 boost::mutex::scoped_lock (_history_mutex);
252 /** Should be called when a frame has been encoded successfully.
253 * @param n Source frame index.
256 Encoder::frame_done ()
258 boost::mutex::scoped_lock lock (_history_mutex);
259 _just_skipped = false;
262 gettimeofday (&tv, 0);
263 _time_history.push_front (tv);
264 if (int (_time_history.size()) > _history_size) {
265 _time_history.pop_back ();
269 /** Called by a subclass when it has just skipped the processing
270 of a frame because it has already been done.
273 Encoder::frame_skipped ()
275 boost::mutex::scoped_lock lock (_history_mutex);
276 _just_skipped = true;
280 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
282 if (_opt->video_skip != 0 && (_video_frame % _opt->video_skip) != 0) {
287 if (_opt->video_range) {
288 pair<SourceFrame, SourceFrame> const r = _opt->video_range.get();
289 if (_video_frame < r.first || _video_frame >= r.second) {
295 boost::mutex::scoped_lock lock (_worker_mutex);
297 /* Wait until the queue has gone down a bit */
298 while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
299 TIMING ("decoder sleeps with queue of %1", _queue.size());
300 _worker_condition.wait (lock);
301 TIMING ("decoder wakes with queue of %1", _queue.size());
308 /* Only do the processing if we don't already have a file for this frame */
309 if (boost::filesystem::exists (_opt->frame_out_path (_video_frame, false))) {
314 if (same && _last_real_frame) {
315 /* Use the last frame that we encoded. We need to postpone doing the actual link,
316 as on windows the link is really a copy and the reference frame might not have
317 finished encoding yet.
319 _links_required.push_back (make_pair (_last_real_frame.get(), _video_frame));
321 /* Queue this new frame for encoding */
322 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
323 TIMING ("adding to queue of %1", _queue.size ());
324 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
326 image, sub, _opt->out_size, _opt->padding, _film->subtitle_offset(), _film->subtitle_scale(),
327 _film->scaler(), _video_frame, _film->frames_per_second(), s.second,
328 _film->colour_lut(), _film->j2k_bandwidth(),
333 _worker_condition.notify_all ();
334 _last_real_frame = _video_frame;
341 Encoder::process_audio (shared_ptr<AudioBuffers> data)
343 if (_opt->audio_range) {
344 shared_ptr<AudioBuffers> trimmed (new AudioBuffers (*data.get ()));
346 /* Range that we are encoding */
347 pair<int64_t, int64_t> required_range = _opt->audio_range.get();
348 /* Range of this block of data */
349 pair<int64_t, int64_t> this_range (_audio_frame, _audio_frame + trimmed->frames());
351 if (this_range.second < required_range.first || required_range.second < this_range.first) {
352 /* No part of this audio is within the required range */
354 } else if (required_range.first >= this_range.first && required_range.first < this_range.second) {
356 int64_t const shift = required_range.first - this_range.first;
357 trimmed->move (shift, 0, trimmed->frames() - shift);
358 trimmed->set_frames (trimmed->frames() - shift);
359 } else if (required_range.second >= this_range.first && required_range.second < this_range.second) {
361 trimmed->set_frames (required_range.second - this_range.first);
368 /* Maybe sample-rate convert */
371 /* Compute the resampled frames count and add 32 for luck */
372 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
374 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
377 int const resampled_frames = swr_convert (
378 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
381 if (resampled_frames < 0) {
382 throw EncodeError ("could not run sample-rate converter");
385 resampled->set_frames (resampled_frames);
387 /* And point our variables at the resampled audio */
392 if (_film->audio_channels() == 1) {
393 /* We need to switch things around so that the mono channel is on
394 the centre channel of a 5.1 set (with other channels silent).
397 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
398 b->make_silent (libdcp::LEFT);
399 b->make_silent (libdcp::RIGHT);
400 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
401 b->make_silent (libdcp::LFE);
402 b->make_silent (libdcp::LS);
403 b->make_silent (libdcp::RS);
410 _audio_frame += data->frames ();
414 Encoder::write_audio (shared_ptr<const AudioBuffers> audio)
416 for (int i = 0; i < audio->channels(); ++i) {
417 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
420 _audio_frames_written += audio->frames ();
424 Encoder::close_sound_files ()
426 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
430 _sound_files.clear ();
434 Encoder::terminate_worker_threads ()
436 boost::mutex::scoped_lock lock (_worker_mutex);
438 _worker_condition.notify_all ();
441 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
448 Encoder::encoder_thread (ServerDescription* server)
450 /* Number of seconds that we currently wait between attempts
451 to connect to the server; not relevant for localhost
454 int remote_backoff = 0;
458 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
459 boost::mutex::scoped_lock lock (_worker_mutex);
460 while (_queue.empty () && !_process_end) {
461 _worker_condition.wait (lock);
468 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
469 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
470 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
475 shared_ptr<EncodedData> encoded;
479 encoded = vf->encode_remotely (server);
481 if (remote_backoff > 0) {
482 _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
485 /* This job succeeded, so remove any backoff */
488 } catch (std::exception& e) {
489 if (remote_backoff < 60) {
491 remote_backoff += 10;
495 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
496 vf->frame(), server->host_name(), e.what(), remote_backoff)
502 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
503 encoded = vf->encode_locally ();
504 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
505 } catch (std::exception& e) {
506 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
511 encoded->write (_opt, vf->frame ());
516 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
518 _queue.push_front (vf);
522 if (remote_backoff > 0) {
523 dvdomatic_sleep (remote_backoff);
527 _worker_condition.notify_all ();
532 Encoder::link (string a, string b) const
534 #ifdef DVDOMATIC_POSIX
535 int const r = symlink (a.c_str(), b.c_str());
537 throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b));
541 #ifdef DVDOMATIC_WINDOWS
542 boost::filesystem::copy_file (a, b);