2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/j2k_wav_encoder.cc
21 * @brief An encoder which writes JPEG2000 and WAV files.
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
33 #include "j2k_wav_encoder.h"
36 #include "exceptions.h"
37 #include "dcp_video_frame.h"
45 using std::stringstream;
49 using boost::shared_ptr;
51 using boost::lexical_cast;
53 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const Film> f, shared_ptr<const Options> o)
55 #ifdef HAVE_SWRESAMPLE
58 , _audio_frames_written (0)
59 , _process_end (false)
61 if (_film->audio_stream()) {
62 /* Create sound output files with .tmp suffixes; we will rename
63 them if and when we complete.
65 for (int i = 0; i < _film->audio_channels(); ++i) {
67 sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream().get().sample_rate());
68 /* We write mono files */
70 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
71 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
73 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
75 _sound_files.push_back (f);
80 J2KWAVEncoder::~J2KWAVEncoder ()
82 terminate_worker_threads ();
87 J2KWAVEncoder::terminate_worker_threads ()
89 boost::mutex::scoped_lock lock (_worker_mutex);
91 _worker_condition.notify_all ();
94 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
101 J2KWAVEncoder::close_sound_files ()
103 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
107 _sound_files.clear ();
111 J2KWAVEncoder::do_process_video (shared_ptr<const Image> yuv, SourceFrame frame, shared_ptr<Subtitle> sub)
113 boost::mutex::scoped_lock lock (_worker_mutex);
115 /* Wait until the queue has gone down a bit */
116 while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
117 TIMING ("decoder sleeps with queue of %1", _queue.size());
118 _worker_condition.wait (lock);
119 TIMING ("decoder wakes with queue of %1", _queue.size());
126 /* Only do the processing if we don't already have a file for this frame */
127 if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
128 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
129 TIMING ("adding to queue of %1", _queue.size ());
130 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
132 yuv, sub, _opt->out_size, _opt->padding, _film->subtitle_offset(), _film->subtitle_scale(),
133 _film->scaler(), frame, _film->frames_per_second(), s.second,
134 Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
139 _worker_condition.notify_all ();
146 J2KWAVEncoder::encoder_thread (ServerDescription* server)
148 /* Number of seconds that we currently wait between attempts
149 to connect to the server; not relevant for localhost
152 int remote_backoff = 0;
156 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
157 boost::mutex::scoped_lock lock (_worker_mutex);
158 while (_queue.empty () && !_process_end) {
159 _worker_condition.wait (lock);
166 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
167 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
168 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
173 shared_ptr<EncodedData> encoded;
177 encoded = vf->encode_remotely (server);
179 if (remote_backoff > 0) {
180 _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
183 /* This job succeeded, so remove any backoff */
186 } catch (std::exception& e) {
187 if (remote_backoff < 60) {
189 remote_backoff += 10;
193 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
194 vf->frame(), server->host_name(), e.what(), remote_backoff)
200 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
201 encoded = vf->encode_locally ();
202 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
203 } catch (std::exception& e) {
204 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
209 encoded->write (_opt, vf->frame ());
210 frame_done (vf->frame ());
214 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
216 _queue.push_front (vf);
220 if (remote_backoff > 0) {
221 dvdomatic_sleep (remote_backoff);
225 _worker_condition.notify_all ();
230 J2KWAVEncoder::process_begin ()
232 if (_film->audio_stream() && _film->audio_stream().get().sample_rate() != _film->target_audio_sample_rate()) {
233 #ifdef HAVE_SWRESAMPLE
236 s << "Will resample audio from " << _film->audio_stream().get().sample_rate() << " to " << _film->target_audio_sample_rate();
237 _film->log()->log (s.str ());
239 /* We will be using planar float data when we call the resampler */
240 _swr_context = swr_alloc_set_opts (
242 _film->audio_stream().get().channel_layout(),
244 _film->target_audio_sample_rate(),
245 _film->audio_stream().get().channel_layout(),
247 _film->audio_stream().get().sample_rate(),
251 swr_init (_swr_context);
253 throw EncodeError ("Cannot resample audio as libswresample is not present");
256 #ifdef HAVE_SWRESAMPLE
261 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
262 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
265 vector<ServerDescription*> servers = Config::instance()->servers ();
267 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
268 for (int j = 0; j < (*i)->threads (); ++j) {
269 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
275 J2KWAVEncoder::process_end ()
277 boost::mutex::scoped_lock lock (_worker_mutex);
279 _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
281 /* Keep waking workers until the queue is empty */
282 while (!_queue.empty ()) {
283 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
284 _worker_condition.notify_all ();
285 _worker_condition.wait (lock);
290 terminate_worker_threads ();
292 _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
294 /* The following sequence of events can occur in the above code:
295 1. a remote worker takes the last image off the queue
296 2. the loop above terminates
297 3. the remote worker fails to encode the image and puts it back on the queue
298 4. the remote worker is then terminated by terminate_worker_threads
300 So just mop up anything left in the queue here.
303 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
304 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
306 shared_ptr<EncodedData> e = (*i)->encode_locally ();
307 e->write (_opt, (*i)->frame ());
308 frame_done ((*i)->frame ());
309 } catch (std::exception& e) {
310 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
315 if (_film->audio_stream() && _swr_context) {
317 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream().get().channels(), 256));
320 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
323 throw EncodeError ("could not run sample-rate converter");
330 out->set_frames (frames);
334 swr_free (&_swr_context);
338 if (_film->audio_stream()) {
339 int const dcp_sr = dcp_audio_sample_rate (_film->audio_stream().get().sample_rate ());
340 int64_t const extra_audio_frames = dcp_sr - (_audio_frames_written % dcp_sr);
341 shared_ptr<AudioBuffers> silence (new AudioBuffers (_film->audio_stream().get().channels(), extra_audio_frames));
342 silence->make_silent ();
343 write_audio (silence);
345 close_sound_files ();
347 /* Rename .wav.tmp files to .wav */
348 for (int i = 0; i < _film->audio_channels(); ++i) {
349 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
350 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
352 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
358 J2KWAVEncoder::do_process_audio (shared_ptr<const AudioBuffers> audio)
360 shared_ptr<AudioBuffers> resampled;
363 /* Maybe sample-rate convert */
366 /* Compute the resampled frames count and add 32 for luck */
367 int const max_resampled_frames = ceil (audio->frames() * _film->target_audio_sample_rate() / _film->audio_stream().get().sample_rate()) + 32;
369 resampled.reset (new AudioBuffers (_film->audio_stream().get().channels(), max_resampled_frames));
372 int const resampled_frames = swr_convert (
373 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) audio->data(), audio->frames()
376 if (resampled_frames < 0) {
377 throw EncodeError ("could not run sample-rate converter");
380 resampled->set_frames (resampled_frames);
382 /* And point our variables at the resampled audio */
391 J2KWAVEncoder::write_audio (shared_ptr<const AudioBuffers> audio)
393 for (int i = 0; i < _film->audio_channels(); ++i) {
394 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
397 _audio_frames_written += audio->frames ();