2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
33 #include "exceptions.h"
36 #include "dcp_video_frame.h"
44 using std::stringstream;
49 using namespace boost;
/* Maximum number of frame-completion timestamps kept in _time_history.
   frame_done() trims the history to this many entries, and
   current_frames_per_second() checks for a full history before it
   computes a rate from it.
*/
51 int const Encoder::_history_size = 25;
/* Constructor.  The visible initialisers zero the in/out video frame counters
   and clear the _have_a_real_frame and _terminate_encoder flags.
   NOTE(review): whatever is initialised under HAVE_SWRESAMPLE is not visible
   in this listing.
*/
53 /** @param f Film that we are encoding.
56 Encoder::Encoder (shared_ptr<Film> f)
58 , _video_frames_in (0)
59 , _video_frames_out (0)
60 #ifdef HAVE_SWRESAMPLE
63 , _have_a_real_frame (false)
64 , _terminate_encoder (false)
71 terminate_worker_threads ();
/* Prepare for encoding: set up a resampler when the audio stream's sample rate
   differs from the film's target rate, start the encoder worker threads and
   create the Writer.

   Throws EncodeError with a message saying libswresample is not present;
   NOTE(review): presumably that throw sits in the branch compiled without
   HAVE_SWRESAMPLE, but the #else / #endif lines are not visible here.
*/
78 Encoder::process_begin ()
/* Resampling is only wanted when there is an audio stream and its rate is not the target rate */
80 if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
81 #ifdef HAVE_SWRESAMPLE
84 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
85 _film->log()->log (s.str ());
87 /* We will be using planar float data when we call the resampler */
88 _swr_context = swr_alloc_set_opts (
/* swr_alloc_set_opts takes the output description before the input one, so
   the first layout / rate pair below is the output side (target rate) and
   the second pair is the input side (source rate); the channel layout is the
   same on both sides.
*/
90 _film->audio_stream()->channel_layout(),
92 _film->target_audio_sample_rate(),
93 _film->audio_stream()->channel_layout(),
95 _film->audio_stream()->sample_rate(),
99 swr_init (_swr_context);
101 throw EncodeError ("Cannot resample audio as libswresample is not present");
104 #ifdef HAVE_SWRESAMPLE
/* One worker per configured local encoding thread.  These are started with a
   null ServerDescription (presumably meaning: encode locally -- the test on
   the pointer in encoder_thread() is not visible in this listing).
*/
109 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
110 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
113 vector<ServerDescription*> servers = Config::instance()->servers ();
/* Plus, for every configured server, as many workers as that server's threads() value */
115 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
116 for (int j = 0; j < (*i)->threads (); ++j) {
117 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
/* process_video(), process_audio(), process_end() and the worker threads all
   send their output through this Writer.
*/
121 _writer.reset (new Writer (_film));
/* Finish encoding: flush any audio still held by the resampler, wait for the
   worker threads to empty the encode queue, stop the workers, and then encode
   on this thread any frames which were put back on the queue in the meantime.

   Throws EncodeError ("could not run sample-rate converter") from the
   resampler flush.
*/
126 Encoder::process_end ()
/* Only flush if there is an audio stream with some channels and a resampler was set up */
129 if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
131 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
/* Null input with a zero count asks swr_convert for whatever it still has
   buffered; at most 256 frames are fetched per call.
*/
134 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
/* NOTE(review): the condition guarding this throw is not visible here;
   process_audio() throws the same error when swr_convert returns a negative
   value.
*/
137 throw EncodeError ("could not run sample-rate converter");
144 out->set_frames (frames);
145 _writer->write (out);
148 swr_free (&_swr_context);
152 boost::mutex::scoped_lock lock (_worker_mutex);
154 _film->log()->log ("Clearing queue of " + lexical_cast<string> (_encode_queue.size ()));
156 /* Keep waking workers until the queue is empty */
157 while (!_encode_queue.empty ()) {
158 _film->log()->log ("Waking with " + lexical_cast<string> (_encode_queue.size ()), Log::VERBOSE);
159 _worker_condition.notify_all ();
160 _worker_condition.wait (lock);
/* NOTE(review): terminate_worker_threads() takes _worker_mutex itself, so the
   lock taken above has to be released before this call; the release is not
   visible in this listing -- confirm.
*/
165 terminate_worker_threads ();
167 _film->log()->log ("Mopping up " + lexical_cast<string> (_encode_queue.size()));
169 /* The following sequence of events can occur in the above code:
170 1. a remote worker takes the last image off the queue
171 2. the loop above terminates
172 3. the remote worker fails to encode the image and puts it back on the queue
173 4. the remote worker is then terminated by terminate_worker_threads
175 So just mop up anything left in the queue here.
178 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _encode_queue.begin(); i != _encode_queue.end(); ++i) {
179 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
// Left-over frames are always encoded locally, on the calling thread.
181 _writer->write ((*i)->encode_locally(), (*i)->frame ());
// A failed local encode is logged rather than propagated to the caller.
183 } catch (std::exception& e) {
184 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
192 /** @return an estimate of the current number of frames we are encoding per second,
196 Encoder::current_frames_per_second () const
198 boost::mutex::scoped_lock lock (_history_mutex);
199 if (int (_time_history.size()) < _history_size) {
204 gettimeofday (&now, 0);
206 return _history_size / (seconds (now) - seconds (_time_history.back ()));
209 /** @return Number of video frames that have been sent out */
211 Encoder::video_frames_out () const
213 boost::mutex::scoped_lock (_history_mutex);
214 return _video_frames_out;
217 /** Should be called when a frame has been encoded successfully.
218 * @param n Source frame index.
221 Encoder::frame_done ()
223 boost::mutex::scoped_lock lock (_history_mutex);
226 gettimeofday (&tv, 0);
227 _time_history.push_front (tv);
228 if (int (_time_history.size()) > _history_size) {
229 _time_history.pop_back ();
/* Accept one decoded video frame.  Depending on what the writer reports and on
   the `same' flag the frame is fake-written, written as a repeat of the last
   frame, or queued as a DCPVideoFrame for the worker threads to encode.

   @param image Image for the frame; handed to the queued DCPVideoFrame.
   @param same When true and a real frame has already been queued
   (_have_a_real_frame), the writer is asked to repeat rather than a new
   encode being queued.
   @param sub Subtitle; handed to the queued DCPVideoFrame.
*/
234 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
236 DCPFrameRate dfr (_film->frames_per_second ());
/* With dfr.skip set, odd-numbered input frames take this branch.
   NOTE(review): the body of the branch is not visible in this listing.
*/
238 if (dfr.skip && (_video_frames_in % 2)) {
243 boost::mutex::scoped_lock lock (_worker_mutex);
245 /* Wait until the queue has gone down a bit */
/* The queue is capped at two frames per worker thread.
   NOTE(review): with no worker threads the size test is 0 >= 0, so this
   would wait until _terminate_encoder is set.
*/
246 while (_encode_queue.size() >= _worker_threads.size() * 2 && !_terminate_encoder) {
247 TIMING ("decoder sleeps with queue of %1", _encode_queue.size());
248 _worker_condition.wait (lock);
249 TIMING ("decoder wakes with queue of %1", _encode_queue.size());
252 if (_terminate_encoder) {
/* Nothing is queued when the writer says it can fake-write this frame; the
   flag is cleared so that a following `same' frame is not treated as a repeat
   of a frame that was never really encoded.
*/
256 if (_writer->can_fake_write (_video_frames_out)) {
257 _writer->fake_write (_video_frames_out);
258 _have_a_real_frame = false;
259 } else if (same && _have_a_real_frame) {
260 /* Use the last frame that we encoded. */
261 _writer->repeat (_video_frames_out);
264 /* Queue this new frame for encoding */
265 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
266 TIMING ("adding to queue of %1", _encode_queue.size ());
267 _encode_queue.push_back (boost::shared_ptr<DCPVideoFrame> (
269 image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
270 _film->subtitle_offset(), _film->subtitle_scale(),
271 _film->scaler(), _video_frames_out, _film->frames_per_second(), s.second,
272 _film->colour_lut(), _film->j2k_bandwidth(),
/* Wake the workers waiting for something to appear on the queue */
277 _worker_condition.notify_all ();
278 _have_a_real_frame = true;
/* NOTE(review): the condition guarding this second repeat is not visible in this listing */
285 _writer->repeat (_video_frames_out);
/* Accept a block of audio, resample it through _swr_context, move mono audio
   onto the centre channel of a 6-channel buffer, and hand the result to the
   writer.
   NOTE(review): the conditions under which resampling runs, and the
   assignments that point `data' at the new buffers, are not visible in this
   listing.

   @param data Audio to process.
   Throws EncodeError if swr_convert returns a negative value.
*/
292 Encoder::process_audio (shared_ptr<AudioBuffers> data)
295 /* Maybe sample-rate convert */
298 /* Compute the resampled frames count and add 32 for luck */
/* NOTE(review): if the two sample-rate accessors return integers, the division
   truncates before ceil() sees the value, so ceil() has no effect and only
   the + 32 provides headroom -- confirm.
*/
299 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
301 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
304 int const resampled_frames = swr_convert (
305 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
308 if (resampled_frames < 0) {
309 throw EncodeError ("could not run sample-rate converter");
/* Record how many frames the resampler actually produced */
312 resampled->set_frames (resampled_frames);
314 /* And point our variables at the resampled audio */
319 if (_film->audio_channels() == 1) {
320 /* We need to switch things around so that the mono channel is on
321 the centre channel of a 5.1 set (with other channels silent).
324 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
325 b->make_silent (libdcp::LEFT);
326 b->make_silent (libdcp::RIGHT);
// Channel 0 of the input is copied into the centre channel; samples are floats.
327 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
328 b->make_silent (libdcp::LFE);
329 b->make_silent (libdcp::LS);
330 b->make_silent (libdcp::RS);
335 _writer->write (data);
/* Ask every worker thread to stop: set _terminate_encoder while holding
   _worker_mutex, wake everything waiting on _worker_condition, then visit
   each entry of _worker_threads.

   NOTE(review): the body of the loop is not visible in this listing
   (presumably it joins and frees each thread).  This function is called from
   process_end() and from one other place in this file, so confirm that a
   second call is safe, e.g. that the list is emptied once its threads have
   been released.
*/
339 Encoder::terminate_worker_threads ()
341 boost::mutex::scoped_lock lock (_worker_mutex);
342 _terminate_encoder = true;
343 _worker_condition.notify_all ();
346 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
/* Body of each worker thread started by process_begin().  It waits for a frame
   on _encode_queue, takes the front one, encodes it either remotely on
   `server' or locally, and passes the encoded data to the writer.  A frame is
   pushed back onto the front of the queue after a failure, and remote
   failures make the thread sleep for a growing backoff period.

   @param server Server to encode on; process_begin() passes a null pointer
   for the local worker threads.
   NOTE(review): the enclosing loop, the test that chooses between remote and
   local encoding, and the point where _worker_mutex is released are not
   visible in this listing.
*/
353 Encoder::encoder_thread (ServerDescription* server)
355 /* Number of seconds that we currently wait between attempts
356 to connect to the server; not relevant for localhost
359 int remote_backoff = 0;
363 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
364 boost::mutex::scoped_lock lock (_worker_mutex);
// Sleep until there is a frame to encode or we have been told to stop.
365 while (_encode_queue.empty () && !_terminate_encoder) {
366 _worker_condition.wait (lock);
369 if (_terminate_encoder) {
373 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _encode_queue.size());
374 boost::shared_ptr<DCPVideoFrame> vf = _encode_queue.front ();
375 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
376 _encode_queue.pop_front ();
380 shared_ptr<EncodedData> encoded;
384 encoded = vf->encode_remotely (server);
386 if (remote_backoff > 0) {
387 _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
390 /* This job succeeded, so remove any backoff */
393 } catch (std::exception& e) {
// Each remote failure adds 10 to the backoff while it is still below 60.
394 if (remote_backoff < 60) {
396 remote_backoff += 10;
400 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
401 vf->frame(), server->host_name(), e.what(), remote_backoff)
407 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
408 encoded = vf->encode_locally ();
409 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
410 } catch (std::exception& e) {
411 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
416 _writer->write (encoded, vf->frame ());
421 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
// The failed frame goes back on the front of the queue, so it is the next one to be taken.
423 _encode_queue.push_front (vf);
// The backoff value is passed to dvdomatic_sleep; it is described above as a number of seconds.
427 if (remote_backoff > 0) {
428 dvdomatic_sleep (remote_backoff);
// Wake the other waiters on the condition; process_video() waits on it for queue space.
432 _worker_condition.notify_all ();