2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
32 #include "exceptions.h"
35 #include "dcp_video_frame.h"
42 using std::stringstream;
47 using namespace boost;
49 int const Encoder::_history_size = 25;
51 /** @param f Film that we are encoding.
54 Encoder::Encoder (shared_ptr<const Film> f)
56 , _just_skipped (false)
57 , _video_frames_in (0)
58 , _audio_frames_in (0)
59 , _video_frames_out (0)
60 , _audio_frames_out (0)
61 #ifdef HAVE_SWRESAMPLE
64 , _process_end (false)
66 if (_film->audio_stream()) {
67 /* Create sound output files with .tmp suffixes; we will rename
68 them if and when we complete.
70 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
72 sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
73 /* We write mono files */
75 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
76 SNDFILE* f = sf_open (_film->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
78 throw CreateFileError (_film->multichannel_audio_out_path (i, true));
80 _sound_files.push_back (f);
88 terminate_worker_threads ();
92 Encoder::process_begin ()
94 if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
95 #ifdef HAVE_SWRESAMPLE
98 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
99 _film->log()->log (s.str ());
101 /* We will be using planar float data when we call the resampler */
102 _swr_context = swr_alloc_set_opts (
104 _film->audio_stream()->channel_layout(),
106 _film->target_audio_sample_rate(),
107 _film->audio_stream()->channel_layout(),
109 _film->audio_stream()->sample_rate(),
113 swr_init (_swr_context);
115 throw EncodeError ("Cannot resample audio as libswresample is not present");
118 #ifdef HAVE_SWRESAMPLE
123 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
124 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
127 vector<ServerDescription*> servers = Config::instance()->servers ();
129 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
130 for (int j = 0; j < (*i)->threads (); ++j) {
131 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
138 Encoder::process_end ()
141 if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
143 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
146 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
149 throw EncodeError ("could not run sample-rate converter");
156 out->set_frames (frames);
160 swr_free (&_swr_context);
164 if (_film->audio_stream()) {
165 close_sound_files ();
167 /* Rename .wav.tmp files to .wav */
168 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
169 if (boost::filesystem::exists (_film->multichannel_audio_out_path (i, false))) {
170 boost::filesystem::remove (_film->multichannel_audio_out_path (i, false));
172 boost::filesystem::rename (_film->multichannel_audio_out_path (i, true), _film->multichannel_audio_out_path (i, false));
176 boost::mutex::scoped_lock lock (_worker_mutex);
178 _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
180 /* Keep waking workers until the queue is empty */
181 while (!_queue.empty ()) {
182 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
183 _worker_condition.notify_all ();
184 _worker_condition.wait (lock);
189 terminate_worker_threads ();
191 _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
193 /* The following sequence of events can occur in the above code:
194 1. a remote worker takes the last image off the queue
195 2. the loop above terminates
196 3. the remote worker fails to encode the image and puts it back on the queue
197 4. the remote worker is then terminated by terminate_worker_threads
199 So just mop up anything left in the queue here.
202 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
203 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
205 shared_ptr<EncodedData> e = (*i)->encode_locally ();
206 e->write (_film, (*i)->frame ());
208 } catch (std::exception& e) {
209 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
213 /* Now do links (or copies on windows) to duplicate frames */
214 for (list<pair<int, int> >::iterator i = _links_required.begin(); i != _links_required.end(); ++i) {
215 link (_film->frame_out_path (i->first, false), _film->frame_out_path (i->second, false));
216 link (_film->hash_out_path (i->first, false), _film->hash_out_path (i->second, false));
220 /** @return an estimate of the current number of frames we are encoding per second,
224 Encoder::current_frames_per_second () const
226 boost::mutex::scoped_lock lock (_history_mutex);
227 if (int (_time_history.size()) < _history_size) {
232 gettimeofday (&now, 0);
234 return _history_size / (seconds (now) - seconds (_time_history.back ()));
237 /** @return true if the last frame to be processed was skipped as it already existed */
239 Encoder::skipping () const
241 boost::mutex::scoped_lock (_history_mutex);
242 return _just_skipped;
245 /** @return Number of video frames that have been sent out */
247 Encoder::video_frames_out () const
249 boost::mutex::scoped_lock (_history_mutex);
250 return _video_frames_out;
253 /** Should be called when a frame has been encoded successfully.
254 * @param n Source frame index.
257 Encoder::frame_done ()
259 boost::mutex::scoped_lock lock (_history_mutex);
260 _just_skipped = false;
263 gettimeofday (&tv, 0);
264 _time_history.push_front (tv);
265 if (int (_time_history.size()) > _history_size) {
266 _time_history.pop_back ();
270 /** Called by a subclass when it has just skipped the processing
271 of a frame because it has already been done.
274 Encoder::frame_skipped ()
276 boost::mutex::scoped_lock lock (_history_mutex);
277 _just_skipped = true;
281 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
283 DCPFrameRate dfr (_film->frames_per_second ());
285 if (dfr.skip && (_video_frames_in % 2)) {
290 boost::mutex::scoped_lock lock (_worker_mutex);
292 /* Wait until the queue has gone down a bit */
293 while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
294 TIMING ("decoder sleeps with queue of %1", _queue.size());
295 _worker_condition.wait (lock);
296 TIMING ("decoder wakes with queue of %1", _queue.size());
303 /* Only do the processing if we don't already have a file for this frame */
304 if (boost::filesystem::exists (_film->frame_out_path (_video_frames_out, false))) {
309 if (same && _last_real_frame) {
310 /* Use the last frame that we encoded. We need to postpone doing the actual link,
311 as on windows the link is really a copy and the reference frame might not have
312 finished encoding yet.
314 _links_required.push_back (make_pair (_last_real_frame.get(), _video_frames_out));
316 /* Queue this new frame for encoding */
317 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
318 TIMING ("adding to queue of %1", _queue.size ());
319 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
321 image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
322 _film->subtitle_offset(), _film->subtitle_scale(),
323 _film->scaler(), _video_frames_out, _film->frames_per_second(), s.second,
324 _film->colour_lut(), _film->j2k_bandwidth(),
329 _worker_condition.notify_all ();
330 _last_real_frame = _video_frames_out;
337 _links_required.push_back (make_pair (_video_frames_out, _video_frames_out - 1));
343 Encoder::process_audio (shared_ptr<AudioBuffers> data)
346 /* Maybe sample-rate convert */
349 /* Compute the resampled frames count and add 32 for luck */
350 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
352 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
355 int const resampled_frames = swr_convert (
356 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
359 if (resampled_frames < 0) {
360 throw EncodeError ("could not run sample-rate converter");
363 resampled->set_frames (resampled_frames);
365 /* And point our variables at the resampled audio */
370 if (_film->audio_channels() == 1) {
371 /* We need to switch things around so that the mono channel is on
372 the centre channel of a 5.1 set (with other channels silent).
375 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
376 b->make_silent (libdcp::LEFT);
377 b->make_silent (libdcp::RIGHT);
378 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
379 b->make_silent (libdcp::LFE);
380 b->make_silent (libdcp::LS);
381 b->make_silent (libdcp::RS);
388 _audio_frames_in += data->frames ();
392 Encoder::write_audio (shared_ptr<const AudioBuffers> audio)
394 for (int i = 0; i < audio->channels(); ++i) {
395 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
398 _audio_frames_out += audio->frames ();
402 Encoder::close_sound_files ()
404 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
408 _sound_files.clear ();
412 Encoder::terminate_worker_threads ()
414 boost::mutex::scoped_lock lock (_worker_mutex);
416 _worker_condition.notify_all ();
419 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
426 Encoder::encoder_thread (ServerDescription* server)
428 /* Number of seconds that we currently wait between attempts
429 to connect to the server; not relevant for localhost
432 int remote_backoff = 0;
436 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
437 boost::mutex::scoped_lock lock (_worker_mutex);
438 while (_queue.empty () && !_process_end) {
439 _worker_condition.wait (lock);
446 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
447 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
448 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
453 shared_ptr<EncodedData> encoded;
457 encoded = vf->encode_remotely (server);
459 if (remote_backoff > 0) {
460 _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
463 /* This job succeeded, so remove any backoff */
466 } catch (std::exception& e) {
467 if (remote_backoff < 60) {
469 remote_backoff += 10;
473 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
474 vf->frame(), server->host_name(), e.what(), remote_backoff)
480 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
481 encoded = vf->encode_locally ();
482 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
483 } catch (std::exception& e) {
484 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
489 encoded->write (_film, vf->frame ());
494 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
496 _queue.push_front (vf);
500 if (remote_backoff > 0) {
501 dvdomatic_sleep (remote_backoff);
505 _worker_condition.notify_all ();
510 Encoder::link (string a, string b) const
512 #ifdef DVDOMATIC_POSIX
513 int const r = symlink (a.c_str(), b.c_str());
515 throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b));
519 #ifdef DVDOMATIC_WINDOWS
520 boost::filesystem::copy_file (a, b);