2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
33 #include "exceptions.h"
36 #include "dcp_video_frame.h"
44 using std::stringstream;
49 using namespace boost;
51 int const Encoder::_history_size = 25; /* how many frame-completion times frame_done() keeps for the frames-per-second estimate */
53 /** Construct with both video frame counters at zero. @param f Film that we are encoding */
54 Encoder::Encoder (shared_ptr<Film> f)
56 , _video_frames_in (0)
57 , _video_frames_out (0)
58 #ifdef HAVE_SWRESAMPLE
61 , _have_a_real_frame (false) /* process_video() sets this once a frame has been queued for encoding */
76 Encoder::process_begin ()
78 if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) { /* resampling is only set up when the source and target rates differ */
79 #ifdef HAVE_SWRESAMPLE
82 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
83 _film->log()->log (s.str ());
85 /* We will be using planar float data when we call the resampler */
86 _swr_context = swr_alloc_set_opts (
88 _film->audio_stream()->channel_layout(),
90 _film->target_audio_sample_rate(), /* rate we resample to */
91 _film->audio_stream()->channel_layout(),
93 _film->audio_stream()->sample_rate(), /* rate we resample from */
97 swr_init (_swr_context);
99 throw EncodeError ("Cannot resample audio as libswresample is not present");
102 #ifdef HAVE_SWRESAMPLE
107 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
108 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0))); /* local workers get a null server description */
111 vector<ServerDescription*> servers = Config::instance()->servers ();
113 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
114 for (int j = 0; j < (*i)->threads (); ++j) { /* one worker per thread count given by the server description */
115 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
119 _writer.reset (new Writer (_film)); /* replaces any previous Writer with a new one for this film */
124 Encoder::process_end ()
127 if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
129 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
132 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0); /* no input: fetch up to 256 frames that the resampler is still holding */
135 throw EncodeError ("could not run sample-rate converter");
142 out->set_frames (frames); /* record how many frames the resampler actually produced */
146 swr_free (&_swr_context);
150 boost::mutex::scoped_lock lock (_mutex);
152 _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
154 /* Keep waking workers until the queue is empty */
155 while (!_queue.empty ()) {
156 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
157 _condition.notify_all ();
158 _condition.wait (lock); /* gives up _mutex while waiting, so workers can take frames off the queue */
163 terminate_threads ();
165 _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
167 /* The following sequence of events can occur in the above code:
168 1. a remote worker takes the last image off the queue
169 2. the loop above terminates
170 3. the remote worker fails to encode the image and puts it back on the queue
171 4. the remote worker is then terminated by terminate_threads
173 So just mop up anything left in the queue here.
176 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
177 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
179 _writer->write ((*i)->encode_locally(), (*i)->frame ()); /* encode on the calling thread and pass the result straight to the writer */
181 } catch (std::exception& e) {
182 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ())); /* failure is logged only; the loop moves on to the next frame */
190 /** @return an estimate of the current number of frames we are encoding per second,
194 Encoder::current_frames_per_second () const
196 boost::mutex::scoped_lock lock (_history_mutex);
197 if (int (_time_history.size()) < _history_size) { /* the history is not yet full */
202 gettimeofday (&now, 0);
204 return _history_size / (seconds (now) - seconds (_time_history.back ())); /* back() is the oldest entry, as frame_done() pushes new times on the front */
207 /** @return Number of video frames that have been sent out */
209 Encoder::video_frames_out () const
211 boost::mutex::scoped_lock lock (_history_mutex); /* the lock must be named: `scoped_lock (_history_mutex);` declares an unlocked local that shadows the member, so nothing is locked */
212 return _video_frames_out;
215 /** Should be called when a frame has been encoded successfully.
216 * Records the current time in _time_history, which is capped at _history_size entries.
219 Encoder::frame_done ()
221 boost::mutex::scoped_lock lock (_history_mutex);
224 gettimeofday (&tv, 0);
225 _time_history.push_front (tv); /* newest time goes on the front */
226 if (int (_time_history.size()) > _history_size) {
227 _time_history.pop_back (); /* discard the oldest time */
232 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
234 DCPFrameRate dfr (_film->frames_per_second ());
236 if (dfr.skip && (_video_frames_in % 2)) {
241 boost::mutex::scoped_lock lock (_mutex);
243 /* Wait until the queue has gone down a bit: fewer than two queued frames per worker thread */
244 while (_queue.size() >= _threads.size() * 2 && !_terminate) {
245 TIMING ("decoder sleeps with queue of %1", _queue.size());
246 _condition.wait (lock);
247 TIMING ("decoder wakes with queue of %1", _queue.size());
254 if (_writer->thrown ()) {
258 if (_writer->can_fake_write (_video_frames_out)) {
259 _writer->fake_write (_video_frames_out);
260 _have_a_real_frame = false; /* a faked frame must not be used as the source of a later repeat (see the `same` branch) */
262 } else if (same && _have_a_real_frame) {
263 /* Use the last frame that we encoded. */
264 _writer->repeat (_video_frames_out);
267 /* Queue this new frame for encoding */
268 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
269 TIMING ("adding to queue of %1", _queue.size ());
270 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
272 image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
273 _film->subtitle_offset(), _film->subtitle_scale(),
274 _film->scaler(), _video_frames_out, _film->frames_per_second(), s.second,
275 _film->colour_lut(), _film->j2k_bandwidth(),
280 _condition.notify_all (); /* wake workers waiting in encoder_thread() for the queue to fill */
281 _have_a_real_frame = true;
288 _writer->repeat (_video_frames_out);
295 Encoder::process_audio (shared_ptr<AudioBuffers> data) /* data: audio frames, used as the resampler's input below */
298 /* Maybe sample-rate convert */
301 /* Compute the resampled frames count, rounding up, and add 32 for luck. The division must be floating-point, otherwise it truncates before ceil() sees it. */
302 int const max_resampled_frames = ceil ((double) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
304 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
307 int const resampled_frames = swr_convert (
308 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
311 if (resampled_frames < 0) { /* swr_convert reports failure with a negative return */
312 throw EncodeError ("could not run sample-rate converter");
315 resampled->set_frames (resampled_frames); /* shrink to the number of frames actually produced */
317 /* And point our variables at the resampled audio */
326 Encoder::terminate_threads ()
328 boost::mutex::scoped_lock lock (_mutex);
330 _condition.notify_all (); /* wake every thread that is waiting on _condition */
333 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) { /* visit each worker created in process_begin() */
340 Encoder::encoder_thread (ServerDescription* server)
342 /* Number of seconds that we currently wait between attempts
343 to connect to the server; not relevant for localhost
346 int remote_backoff = 0;
350 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
351 boost::mutex::scoped_lock lock (_mutex);
352 while (_queue.empty () && !_terminate) { /* sleep until there is a frame to encode or termination is requested */
353 _condition.wait (lock);
360 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
361 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
362 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
367 shared_ptr<EncodedData> encoded;
371 encoded = vf->encode_remotely (server);
373 if (remote_backoff > 0) {
374 _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
377 /* This job succeeded, so remove any backoff */
380 } catch (std::exception& e) {
381 if (remote_backoff < 60) {
383 remote_backoff += 10; /* grows in 10s steps and stops growing once it reaches 60s */
387 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
388 vf->frame(), server->host_name(), e.what(), remote_backoff)
394 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
395 encoded = vf->encode_locally ();
396 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
397 } catch (std::exception& e) {
398 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
403 _writer->write (encoded, vf->frame ());
408 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
410 _queue.push_front (vf); /* front of the queue, which is where workers take their next frame from */
414 if (remote_backoff > 0) {
415 dvdomatic_sleep (remote_backoff);
419 _condition.notify_all (); /* wake anything waiting on _condition, e.g. process_video() waiting for the queue to shrink */
424 Encoder::write_audio (shared_ptr<const AudioBuffers> data)
426 if (_film->audio_channels() == 1) {
427 /* We need to switch things around so that the mono channel is on
428 the centre channel of a 5.1 set (with other channels silent).
431 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ())); /* six channels, same frame count as the input */
432 b->make_silent (libdcp::LEFT);
433 b->make_silent (libdcp::RIGHT);
434 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float)); /* the only input channel becomes the centre channel; samples are copied as floats */
435 b->make_silent (libdcp::LFE);
436 b->make_silent (libdcp::LS);
437 b->make_silent (libdcp::RS);
442 _writer->write (data);