2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
33 #include "exceptions.h"
36 #include "dcp_video_frame.h"
44 using std::stringstream;
49 using namespace boost;
// Number of frame-completion times retained in _time_history: frame_done trims
// the history to this length, and current_frames_per_second compares the
// history length against it before computing a rate.
51 int const Encoder::_history_size = 25;
53 /** @param f Film that we are encoding */
// The visible initialisers start both video frame counters at zero and record
// that no real frame has been queued yet (process_video sets that flag to true
// when it queues a frame for encoding).
54 Encoder::Encoder (shared_ptr<Film> f)
56 , _video_frames_in (0)
57 , _video_frames_out (0)
// Anything initialised under HAVE_SWRESAMPLE exists only in builds that have
// libswresample (the guarded initialiser itself is not visible here).
58 #ifdef HAVE_SWRESAMPLE
61 , _have_a_real_frame (false)
// Prepare for encoding: set up the audio resampler when the source sample rate
// differs from the target rate, start the encoding worker threads and create
// the Writer for this film.
// Throws EncodeError when resampling is required but libswresample is not
// available (per the message of the throw below).
76 Encoder::process_begin ()
// Resampling is only considered when there is an audio stream and its rate is
// not already the target rate.
78 if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
79 #ifdef HAVE_SWRESAMPLE
82 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
83 _film->log()->log (s.str ());
85 /* We will be using planar float data when we call the resampler */
86 _swr_context = swr_alloc_set_opts (
// Output side: the channel layout of the source stream, at the target rate.
88 _film->audio_stream()->channel_layout(),
90 _film->target_audio_sample_rate(),
// Input side: the channel layout and sample rate of the source stream.
91 _film->audio_stream()->channel_layout(),
93 _film->audio_stream()->sample_rate(),
97 swr_init (_swr_context);
// Presumably the branch taken in builds without libswresample (the
// preprocessor line that selects it is not visible here).
99 throw EncodeError ("Cannot resample audio as libswresample is not present");
102 #ifdef HAVE_SWRESAMPLE
// One worker per configured local encoding thread.  These are given a null
// ServerDescription; presumably encoder_thread treats that as a request to
// encode locally (confirm against encoder_thread).
107 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
108 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
// One worker for every thread offered by each configured encoding server; each
// such worker is bound to the description of its server.
111 vector<ServerDescription*> servers = Config::instance()->servers ();
113 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
114 for (int j = 0; j < (*i)->threads (); ++j) {
115 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
// The writer receives encoded video, repeats and audio from the other methods.
119 _writer.reset (new Writer (_film));
// Finish encoding: flush audio still held by the resampler to the writer, wait
// for the worker threads to empty the queue, stop the workers, and finally
// encode locally any frame that a worker pushed back onto the queue.
// Throws EncodeError if the resampler flush fails.
124 Encoder::process_end ()
// Only flush when there is audio with at least one channel and a resampler
// context was created.
127 if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
129 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
// A null input with a zero count asks swr_convert for samples it has buffered;
// at most 256 frames are collected by this single call.
132 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
// Presumably guarded by a check that frames is negative, as in process_audio
// (the guard line is not visible here).
135 throw EncodeError ("could not run sample-rate converter");
142 out->set_frames (frames);
143 _writer->write (out);
146 swr_free (&_swr_context);
// _mutex protects _queue and is the mutex paired with _condition.
150 boost::mutex::scoped_lock lock (_mutex);
152 _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
154 /* Keep waking workers until the queue is empty */
155 while (!_queue.empty ()) {
156 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
// Waiting releases _mutex so that workers can take frames; encoder_thread
// notifies _condition after it has dealt with a frame, which wakes this loop.
157 _condition.notify_all ();
158 _condition.wait (lock);
163 terminate_threads ();
165 _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
167 /* The following sequence of events can occur in the above code:
168 1. a remote worker takes the last image off the queue
169 2. the loop above terminates
170 3. the remote worker fails to encode the image and puts it back on the queue
171 4. the remote worker is then terminated by terminate_threads
173 So just mop up anything left in the queue here.
// Every left-over frame is encoded on this thread and written under its own
// frame index; a failure is logged rather than propagated.
176 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
177 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
179 _writer->write ((*i)->encode_locally(), (*i)->frame ());
181 } catch (std::exception& e) {
182 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
190 /** @return an estimate of the current number of frames we are encoding per second,
194 Encoder::current_frames_per_second () const
// _time_history is also modified by frame_done, which takes the same mutex.
196 boost::mutex::scoped_lock lock (_history_mutex);
// Fewer than _history_size completions recorded so far: handled separately
// (the body of this branch is not visible here).
197 if (int (_time_history.size()) < _history_size) {
202 gettimeofday (&now, 0);
// frame_done pushes new times at the front, so back() is the oldest retained
// completion: _history_size frames finished between then and now.
// NOTE(review): if now and the oldest time convert to the same value the
// divisor is zero; presumably seconds() yields a floating-point value, confirm.
204 return _history_size / (seconds (now) - seconds (_time_history.back ()));
207 /** @return Number of video frames that have been sent out */
209 Encoder::video_frames_out () const
211 boost::mutex::scoped_lock (_history_mutex);
212 return _video_frames_out;
215 /** Should be called when a frame has been encoded successfully.
216 * Records the current time in _time_history, which current_frames_per_second reads.
219 Encoder::frame_done ()
// The history is shared with current_frames_per_second, which takes the same mutex.
221 boost::mutex::scoped_lock lock (_history_mutex);
224 gettimeofday (&tv, 0);
// Newest completion time goes at the front; once more than _history_size are
// held the oldest one (at the back) is discarded.
225 _time_history.push_front (tv);
226 if (int (_time_history.size()) > _history_size) {
227 _time_history.pop_back ();
// Handle one video frame.  Depending on the state visible below the frame is
// fake-written, written as a repeat of the previous frame, or wrapped in a
// DCPVideoFrame and queued for the worker threads to encode.
// image: picture data handed to the new DCPVideoFrame.
// same: when true and a real frame has already been queued, the previous
//       frame is repeated instead of encoding this one.
// sub: subtitle handed to the new DCPVideoFrame.
232 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
234 DCPFrameRate dfr (_film->frames_per_second ());
// When the frame-rate conversion asks for skipping, odd-numbered input frames
// are singled out here; presumably they are dropped (the branch body is not
// visible here, confirm).
236 if (dfr.skip && (_video_frames_in % 2)) {
241 boost::mutex::scoped_lock lock (_mutex);
243 /* Wait until the queue has gone down a bit */
// Back-pressure: block while the queue holds at least two frames per worker
// thread, unless termination has been requested.
244 while (_queue.size() >= _threads.size() * 2 && !_terminate) {
245 TIMING ("decoder sleeps with queue of %1", _queue.size());
246 _condition.wait (lock);
247 TIMING ("decoder wakes with queue of %1", _queue.size());
// Presumably reacts to an exception raised inside the writer (the branch body
// is not visible here, confirm).
254 if (_writer->thrown ()) {
258 if (_writer->can_fake_write (_video_frames_out)) {
259 _writer->fake_write (_video_frames_out);
// After a fake write there is no freshly encoded frame that a later frame
// flagged as same could repeat.
260 _have_a_real_frame = false;
262 } else if (same && _have_a_real_frame) {
263 /* Use the last frame that we encoded. */
264 _writer->repeat (_video_frames_out);
267 /* Queue this new frame for encoding */
// Only the second string of this pair is passed on to the DCPVideoFrame.
268 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
269 TIMING ("adding to queue of %1", _queue.size ());
270 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
272 image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
273 _film->subtitle_offset(), _film->subtitle_scale(),
274 _film->scaler(), _video_frames_out, _film->frames_per_second(), s.second,
275 _film->colour_lut(), _film->j2k_bandwidth(),
// Wake any worker waiting in encoder_thread for the queue to become non-empty.
280 _condition.notify_all ();
281 _have_a_real_frame = true;
// NOTE(review): the condition guarding this additional repeat is not visible
// here; presumably it belongs to the frame-rate conversion, confirm.
288 _writer->repeat (_video_frames_out);
// Handle a block of audio: resample it where the visible code does so, place
// mono audio on the centre channel of a six-channel buffer, and pass the
// result to the writer.
// data: the audio to process.
// Throws EncodeError if swr_convert reports a failure.
295 Encoder::process_audio (shared_ptr<AudioBuffers> data)
298 /* Maybe sample-rate convert */
301 /* Compute the resampled frames count and add 32 for luck */
// NOTE(review): if both sample rates are integers the division truncates
// before ceil is applied, so ceil has no effect; the margin of 32 frames is
// what actually covers the rounding.  Confirm the types of the two rates.
302 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
304 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
// swr_convert returns the number of frames it produced per channel, or a
// negative value on error.
307 int const resampled_frames = swr_convert (
308 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
311 if (resampled_frames < 0) {
312 throw EncodeError ("could not run sample-rate converter");
// Shrink the buffer from its worst-case size to what was actually produced.
315 resampled->set_frames (resampled_frames);
317 /* And point our variables at the resampled audio */
322 if (_film->audio_channels() == 1) {
323 /* We need to switch things around so that the mono channel is on
324 the centre channel of a 5.1 set (with other channels silent).
327 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
328 b->make_silent (libdcp::LEFT);
329 b->make_silent (libdcp::RIGHT);
// The byte count is frames times sizeof(float), so this relies on each sample
// being stored as a float.
330 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
331 b->make_silent (libdcp::LFE);
332 b->make_silent (libdcp::LS);
333 b->make_silent (libdcp::RS);
// Presumably data has been re-pointed at the resampled and remapped buffers on
// lines not visible here, so that this write sends the processed audio (confirm).
338 _writer->write (data);
// Stop the worker threads.  Called from process_end once the queue has drained.
// NOTE(review): the line that sets _terminate and the body of the loop are not
// visible here; presumably each thread is joined and then deleted, confirm.
342 Encoder::terminate_threads ()
344 boost::mutex::scoped_lock lock (_mutex);
// Wake every thread blocked on _condition so that it can re-check its wait
// condition, which includes _terminate.
346 _condition.notify_all ();
// Visit every boost::thread created in process_begin.
349 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
// Body of one encoding worker thread.  It waits for a frame to appear on
// _queue, encodes it (remotely on the given server, or locally) and passes the
// encoded data to the writer.  A frame that could not be encoded is put back
// at the front of the queue.
// server: description of the remote server this worker uses; process_begin
//         passes a null pointer for workers that are meant to encode locally.
356 Encoder::encoder_thread (ServerDescription* server)
358 /* Number of seconds that we currently wait between attempts
359 to connect to the server; not relevant for localhost
362 int remote_backoff = 0;
366 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
// _mutex protects _queue; sleep until there is work or termination is requested.
367 boost::mutex::scoped_lock lock (_mutex);
368 while (_queue.empty () && !_terminate) {
369 _condition.wait (lock);
376 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
// Take the oldest queued frame (process_video appends at the back).
377 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
378 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
// Presumably stays null when encoding fails, which selects the push-back path
// further down (the test itself is not visible here, confirm).
383 shared_ptr<EncodedData> encoded;
387 encoded = vf->encode_remotely (server);
// A non-zero backoff means earlier attempts on this server had failed.
389 if (remote_backoff > 0) {
390 _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
393 /* This job succeeded, so remove any backoff */
396 } catch (std::exception& e) {
// Each remote failure lengthens the wait by 10 seconds; growth stops once the
// backoff has reached 60 seconds.
397 if (remote_backoff < 60) {
399 remote_backoff += 10;
403 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
404 vf->frame(), server->host_name(), e.what(), remote_backoff)
410 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
411 encoded = vf->encode_locally ();
412 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
// A local failure is logged and not rethrown from this handler.
413 } catch (std::exception& e) {
414 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
// Hand the encoded data to the writer under the index of its frame.
419 _writer->write (encoded, vf->frame ());
424 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
// Put back at the front so that the failed frame is the next one to be taken.
426 _queue.push_front (vf);
// Sleep for the current backoff (in seconds, per the log message above) before
// this worker tries again.
430 if (remote_backoff > 0) {
431 dvdomatic_sleep (remote_backoff);
// Wake threads waiting on _condition: process_video waiting for room in the
// queue and process_end waiting for the queue to empty.
435 _condition.notify_all ();