2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
33 #include "exceptions.h"
36 #include "dcp_video_frame.h"
46 using std::stringstream;
52 using namespace boost;
// Number of timestamps that frame_done() keeps in _time_history; current_frames_per_second() divides this by the time elapsed since the oldest of them.
54 int const Encoder::_history_size = 25;
56 /** Construct an Encoder whose video frame counters start at zero and which has not yet seen a real frame.
 *  @param f Film that we are encoding
 */
57 Encoder::Encoder (shared_ptr<Film> f)
59 , _video_frames_in (0)
60 , _video_frames_out (0)
61 #ifdef HAVE_SWRESAMPLE
64 , _have_a_real_frame (false)
/** Get ready to encode: set up a resampler if the source audio sample rate differs from
 *  the film's target rate, start the encoder threads and create the Writer.
 *  Throws EncodeError if resampling is required but cannot be done.
 */
79 Encoder::process_begin ()
81 if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
82 #ifdef HAVE_SWRESAMPLE
85 s << String::compose (N_("Will resample audio from %1 to %2"), _film->audio_stream()->sample_rate(), _film->target_audio_sample_rate());
86 _film->log()->log (s.str ());
88 /* We will be using planar float data when we call the resampler */
89 _swr_context = swr_alloc_set_opts (
91 _film->audio_stream()->channel_layout(),
93 _film->target_audio_sample_rate(),
94 _film->audio_stream()->channel_layout(),
96 _film->audio_stream()->sample_rate(),
100 swr_init (_swr_context);
/* NOTE(review): presumably this is the branch built without HAVE_SWRESAMPLE -- confirm */
102 throw EncodeError (_("Cannot resample audio as libswresample is not present"));
105 #ifdef HAVE_SWRESAMPLE
/* Start the local encoder threads; these are given a null ServerDescription */
110 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
111 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
114 vector<ServerDescription*> servers = Config::instance()->servers ();
/* Start (*i)->threads() threads for each configured server, each one bound to that server */
116 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
117 for (int j = 0; j < (*i)->threads (); ++j) {
118 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
/* Create the Writer for this film */
122 _writer.reset (new Writer (_film));
/** Finish off the encode: flush the resampler if there is one, put audio in if the film has
 *  none but a minimum number of channels is required, wait for the queue to empty, stop the
 *  encoder threads and finally encode locally anything that ended up back on the queue.
 *  Throws EncodeError if the resampler cannot be run.
 */
127 Encoder::process_end ()
130 if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
132 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
/* No input is given (the trailing 0, 0), so this collects up to 256 frames that the resampler still holds */
135 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
138 throw EncodeError (_("could not run sample-rate converter"));
145 out->set_frames (frames);
149 swr_free (&_swr_context);
153 if (_film->audio_channels() == 0 && _film->minimum_audio_channels() > 0) {
154 /* Put audio in where there is none at all */
/* NOTE(review): 48000 and 24000 look like a fixed sample rate and a maximum chunk length in frames -- confirm */
155 int64_t af = video_frames_to_audio_frames (_video_frames_out, 48000, _film->dcp_frame_rate ());
157 int64_t const this_time = min (af, static_cast<int64_t> (24000));
158 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->minimum_audio_channels(), this_time));
160 out->set_frames (this_time);
/* _condition is waited on below with this lock held */
167 boost::mutex::scoped_lock lock (_mutex);
169 _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
171 /* Keep waking workers until the queue is empty */
172 while (!_queue.empty ()) {
173 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
174 _condition.notify_all ();
175 _condition.wait (lock);
180 terminate_threads ();
182 _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
184 /* The following sequence of events can occur in the above code:
185 1. a remote worker takes the last image off the queue
186 2. the loop above terminates
187 3. the remote worker fails to encode the image and puts it back on the queue
188 4. the remote worker is then terminated by terminate_threads
190 So just mop up anything left in the queue here.
193 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
194 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
196 _writer->write ((*i)->encode_locally(), (*i)->frame ());
198 } catch (std::exception& e) {
199 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
207 /** @return an estimate of the current number of frames we are encoding per second,
211 Encoder::current_frames_per_second () const
213 boost::mutex::scoped_lock lock (_history_mutex);
// Fewer than _history_size completion times have been recorded so far
214 if (int (_time_history.size()) < _history_size) {
219 gettimeofday (&now, 0);
// frame_done() pushes at the front, so back() is the oldest recorded completion time
221 return _history_size / (seconds (now) - seconds (_time_history.back ()));
224 /** @return Number of video frames that have been sent out */
226 Encoder::video_frames_out () const
/* The lock must be a named object: `scoped_lock (_history_mutex);' would declare a new,
   unlocked scoped_lock called _history_mutex rather than locking the member mutex. */
228 boost::mutex::scoped_lock lock (_history_mutex);
229 return _video_frames_out;
232 /** Should be called when a frame has been encoded successfully.
233 * Adds the current time to the front of _time_history and discards the oldest entry once more than _history_size are held.
236 Encoder::frame_done ()
238 boost::mutex::scoped_lock lock (_history_mutex);
241 gettimeofday (&tv, 0);
242 _time_history.push_front (tv);
// Keep at most _history_size entries; the oldest one is at the back
243 if (int (_time_history.size()) > _history_size) {
244 _time_history.pop_back ();
/// Handle one decoded video frame: it may be skipped, faked or repeated by the Writer, or queued for encoding.
/// @param image Image for the frame; passed on to the DCPVideoFrame that is queued.
/// @param same true if this frame is the same as the last one; if we already have a real frame the Writer is told to repeat it.
/// @param sub Subtitle passed on to the DCPVideoFrame that is queued.
249 Encoder::process_video (shared_ptr<const Image> image, bool same, boost::shared_ptr<Subtitle> sub)
251 FrameRateConversion frc (_film->source_frame_rate(), _film->dcp_frame_rate());
// When the frame-rate conversion asks for skipping, every odd-numbered input frame takes this branch
253 if (frc.skip && (_video_frames_in % 2)) {
258 boost::mutex::scoped_lock lock (_mutex);
260 /* Wait until the queue has gone down a bit */
261 while (_queue.size() >= _threads.size() * 2 && !_terminate) {
262 TIMING ("decoder sleeps with queue of %1", _queue.size());
263 _condition.wait (lock);
264 TIMING ("decoder wakes with queue of %1", _queue.size());
271 if (_writer->thrown ()) {
/* The Writer says it can fake this frame, so nothing is encoded and it does not count as a real frame */
275 if (_writer->can_fake_write (_video_frames_out)) {
276 _writer->fake_write (_video_frames_out);
277 _have_a_real_frame = false;
279 } else if (same && _have_a_real_frame) {
280 /* Use the last frame that we encoded. */
281 _writer->repeat (_video_frames_out);
284 /* Queue this new frame for encoding */
285 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
286 TIMING ("adding to queue of %1", _queue.size ());
287 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
289 image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
290 _film->subtitle_offset(), _film->subtitle_scale(),
291 _film->scaler(), _video_frames_out, _film->dcp_frame_rate(), s.second,
292 _film->colour_lut(), _film->j2k_bandwidth(),
297 _condition.notify_all ();
298 _have_a_real_frame = true;
/* NOTE(review): presumably for frame-rate conversions that need a frame written twice -- confirm */
305 _writer->repeat (_video_frames_out);
/** Accept a block of audio, sample-rate converting it first if that is required.
 *  Throws EncodeError if the sample-rate converter fails.
 *  @param data Audio to process; a block with no frames is special-cased before any resampling.
 */
312 Encoder::process_audio (shared_ptr<const AudioBuffers> data)
314 if (!data->frames ()) {
319 /* Maybe sample-rate convert */
322 /* Compute the resampled frames count and add 32 for luck.  The division is done in
   floating point so that ceil() really does round up; with integer operands the quotient
   would already have been truncated before ceil() saw it. */
323 int const max_resampled_frames = ceil (static_cast<double> (data->frames()) * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
325 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
328 int const resampled_frames = swr_convert (
329 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
332 if (resampled_frames < 0) {
333 throw EncodeError (_("could not run sample-rate converter"));
/* swr_convert() may produce fewer frames than the buffer has room for */
336 resampled->set_frames (resampled_frames);
338 /* And point our variables at the resampled audio */
/** Wake every thread waiting on _condition and then deal with each joinable thread in _threads.
 *  NOTE(review): presumably this sets _terminate and joins the threads -- confirm.
 */
347 Encoder::terminate_threads ()
349 boost::mutex::scoped_lock lock (_mutex);
351 _condition.notify_all ();
354 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
355 if ((*i)->joinable ()) {
/** Body of an encoder thread: wait for a frame on the queue, encode it (remotely on `server'
 *  or locally) and pass the result to the Writer; a frame that could not be encoded is pushed
 *  back onto the front of the queue.
 *  @param server Server to encode on; process_begin() passes 0 for the local threads.
 */
365 Encoder::encoder_thread (ServerDescription* server)
367 /* Number of seconds that we currently wait between attempts
368 to connect to the server; not relevant for localhost
371 int remote_backoff = 0;
375 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
376 boost::mutex::scoped_lock lock (_mutex);
377 while (_queue.empty () && !_terminate) {
378 _condition.wait (lock);
385 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
386 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
387 _film->log()->log (String::compose (N_("Encoder thread %1 pops frame %2 from queue"), boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
392 shared_ptr<EncodedData> encoded;
396 encoded = vf->encode_remotely (server);
398 if (remote_backoff > 0) {
399 _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
402 /* This job succeeded, so remove any backoff */
405 } catch (std::exception& e) {
/* Each failure while the wait is under 60 seconds adds another 10 seconds to it */
406 if (remote_backoff < 60) {
408 remote_backoff += 10;
412 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
413 vf->frame(), server->host_name(), e.what(), remote_backoff)
419 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
420 encoded = vf->encode_locally ();
421 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
422 } catch (std::exception& e) {
423 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
428 _writer->write (encoded, vf->frame ());
433 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
/* Put the frame at the front, where frames are taken from, so that it is the next one to be tried */
435 _queue.push_front (vf);
/* Sleep off any backoff before going round again */
439 if (remote_backoff > 0) {
440 dvdomatic_sleep (remote_backoff);
/* Wake anything waiting on the state of the queue, e.g. process_video() waiting for it to go down */
444 _condition.notify_all ();
/** Pass some audio to the Writer, first remapping it if the AudioMapping's DCP channel count
 *  differs from the film's audio channel count.
 *  @param data Audio to write.
 */
449 Encoder::write_audio (shared_ptr<const AudioBuffers> data)
451 AudioMapping m (_film);
452 if (m.dcp_channels() != _film->audio_channels()) {
454 /* Remap and pad with silence */
456 shared_ptr<AudioBuffers> b (new AudioBuffers (m.dcp_channels(), data->frames ()));
457 for (int i = 0; i < m.dcp_channels(); ++i) {
/* Source channel that feeds DCP channel i, if there is one */
458 optional<int> s = m.dcp_to_source (static_cast<libdcp::Channel> (i));
/* Copy all of that source channel's samples (floats) into DCP channel i */
462 memcpy (b->data()[i], data->data()[s.get()], data->frames() * sizeof(float));
469 _writer->write (data);