2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
33 #include "exceptions.h"
36 #include "dcp_video_frame.h"
44 using std::stringstream;
49 using namespace boost;
51 int const Encoder::_history_size = 25;
53 /** @param f Film that we are encoding.
56 Encoder::Encoder (shared_ptr<Film> f)
58 , _just_skipped (false)
59 , _video_frames_in (0)
60 , _video_frames_out (0)
61 #ifdef HAVE_SWRESAMPLE
64 , _have_a_real_frame (false)
65 , _terminate_encoder (false)
72 terminate_worker_threads ();
79 Encoder::process_begin ()
81 if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
82 #ifdef HAVE_SWRESAMPLE
85 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
86 _film->log()->log (s.str ());
88 /* We will be using planar float data when we call the resampler */
89 _swr_context = swr_alloc_set_opts (
91 _film->audio_stream()->channel_layout(),
93 _film->target_audio_sample_rate(),
94 _film->audio_stream()->channel_layout(),
96 _film->audio_stream()->sample_rate(),
100 swr_init (_swr_context);
102 throw EncodeError ("Cannot resample audio as libswresample is not present");
105 #ifdef HAVE_SWRESAMPLE
110 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
111 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
114 vector<ServerDescription*> servers = Config::instance()->servers ();
116 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
117 for (int j = 0; j < (*i)->threads (); ++j) {
118 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
122 _writer.reset (new Writer (_film));
127 Encoder::process_end ()
130 if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
132 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
135 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
138 throw EncodeError ("could not run sample-rate converter");
145 out->set_frames (frames);
146 _writer->write (out);
149 swr_free (&_swr_context);
153 boost::mutex::scoped_lock lock (_worker_mutex);
155 _film->log()->log ("Clearing queue of " + lexical_cast<string> (_encode_queue.size ()));
157 /* Keep waking workers until the queue is empty */
158 while (!_encode_queue.empty ()) {
159 _film->log()->log ("Waking with " + lexical_cast<string> (_encode_queue.size ()), Log::VERBOSE);
160 _worker_condition.notify_all ();
161 _worker_condition.wait (lock);
166 terminate_worker_threads ();
168 _film->log()->log ("Mopping up " + lexical_cast<string> (_encode_queue.size()));
170 /* The following sequence of events can occur in the above code:
171 1. a remote worker takes the last image off the queue
172 2. the loop above terminates
173 3. the remote worker fails to encode the image and puts it back on the queue
174 4. the remote worker is then terminated by terminate_worker_threads
176 So just mop up anything left in the queue here.
179 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _encode_queue.begin(); i != _encode_queue.end(); ++i) {
180 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
182 _writer->write ((*i)->encode_locally(), (*i)->frame ());
184 } catch (std::exception& e) {
185 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
193 /** @return an estimate of the current number of frames we are encoding per second,
197 Encoder::current_frames_per_second () const
199 boost::mutex::scoped_lock lock (_history_mutex);
200 if (int (_time_history.size()) < _history_size) {
205 gettimeofday (&now, 0);
207 return _history_size / (seconds (now) - seconds (_time_history.back ()));
210 /** @return true if the last frame to be processed was skipped as it already existed */
212 Encoder::skipping () const
214 boost::mutex::scoped_lock (_history_mutex);
215 return _just_skipped;
218 /** @return Number of video frames that have been sent out */
220 Encoder::video_frames_out () const
222 boost::mutex::scoped_lock (_history_mutex);
223 return _video_frames_out;
226 /** Should be called when a frame has been encoded successfully.
227 * @param n Source frame index.
230 Encoder::frame_done ()
232 boost::mutex::scoped_lock lock (_history_mutex);
233 _just_skipped = false;
236 gettimeofday (&tv, 0);
237 _time_history.push_front (tv);
238 if (int (_time_history.size()) > _history_size) {
239 _time_history.pop_back ();
243 /** Called by a subclass when it has just skipped the processing
244 of a frame because it has already been done.
247 Encoder::frame_skipped ()
249 boost::mutex::scoped_lock lock (_history_mutex);
250 _just_skipped = true;
254 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
256 DCPFrameRate dfr (_film->frames_per_second ());
258 if (dfr.skip && (_video_frames_in % 2)) {
263 boost::mutex::scoped_lock lock (_worker_mutex);
265 /* Wait until the queue has gone down a bit */
266 while (_encode_queue.size() >= _worker_threads.size() * 2 && !_terminate_encoder) {
267 TIMING ("decoder sleeps with queue of %1", _encode_queue.size());
268 _worker_condition.wait (lock);
269 TIMING ("decoder wakes with queue of %1", _encode_queue.size());
272 if (_terminate_encoder) {
276 /* Only do the processing if we don't already have a file for this frame */
277 if (boost::filesystem::exists (_film->frame_out_path (_video_frames_out, false))) {
282 if (same && _have_a_real_frame) {
283 /* Use the last frame that we encoded. */
284 _writer->repeat (_video_frames_out);
287 /* Queue this new frame for encoding */
288 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
289 TIMING ("adding to queue of %1", _encode_queue.size ());
290 _encode_queue.push_back (boost::shared_ptr<DCPVideoFrame> (
292 image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
293 _film->subtitle_offset(), _film->subtitle_scale(),
294 _film->scaler(), _video_frames_out, _film->frames_per_second(), s.second,
295 _film->colour_lut(), _film->j2k_bandwidth(),
300 _worker_condition.notify_all ();
301 _have_a_real_frame = true;
308 _writer->repeat (_video_frames_out);
315 Encoder::process_audio (shared_ptr<AudioBuffers> data)
318 /* Maybe sample-rate convert */
321 /* Compute the resampled frames count and add 32 for luck */
322 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
324 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
327 int const resampled_frames = swr_convert (
328 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
331 if (resampled_frames < 0) {
332 throw EncodeError ("could not run sample-rate converter");
335 resampled->set_frames (resampled_frames);
337 /* And point our variables at the resampled audio */
342 if (_film->audio_channels() == 1) {
343 /* We need to switch things around so that the mono channel is on
344 the centre channel of a 5.1 set (with other channels silent).
347 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
348 b->make_silent (libdcp::LEFT);
349 b->make_silent (libdcp::RIGHT);
350 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
351 b->make_silent (libdcp::LFE);
352 b->make_silent (libdcp::LS);
353 b->make_silent (libdcp::RS);
358 _writer->write (data);
362 Encoder::terminate_worker_threads ()
364 boost::mutex::scoped_lock lock (_worker_mutex);
365 _terminate_encoder = true;
366 _worker_condition.notify_all ();
369 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
376 Encoder::encoder_thread (ServerDescription* server)
378 /* Number of seconds that we currently wait between attempts
379 to connect to the server; not relevant for localhost
382 int remote_backoff = 0;
386 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
387 boost::mutex::scoped_lock lock (_worker_mutex);
388 while (_encode_queue.empty () && !_terminate_encoder) {
389 _worker_condition.wait (lock);
392 if (_terminate_encoder) {
396 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _encode_queue.size());
397 boost::shared_ptr<DCPVideoFrame> vf = _encode_queue.front ();
398 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
399 _encode_queue.pop_front ();
403 shared_ptr<EncodedData> encoded;
407 encoded = vf->encode_remotely (server);
409 if (remote_backoff > 0) {
410 _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
413 /* This job succeeded, so remove any backoff */
416 } catch (std::exception& e) {
417 if (remote_backoff < 60) {
419 remote_backoff += 10;
423 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
424 vf->frame(), server->host_name(), e.what(), remote_backoff)
430 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
431 encoded = vf->encode_locally ();
432 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
433 } catch (std::exception& e) {
434 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
439 _writer->write (encoded, vf->frame ());
444 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
446 _encode_queue.push_front (vf);
450 if (remote_backoff > 0) {
451 dvdomatic_sleep (remote_backoff);
455 _worker_condition.notify_all ();