2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/j2k_wav_encoder.cc
21 * @brief An encoder which writes JPEG2000 and WAV files.
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
33 #include "j2k_wav_encoder.h"
36 #include "exceptions.h"
37 #include "dcp_video_frame.h"
45 using std::stringstream;
50 using boost::shared_ptr;
52 using boost::lexical_cast;
54 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const Film> f, shared_ptr<const Options> o)
56 #ifdef HAVE_SWRESAMPLE
59 , _audio_frames_written (0)
60 , _process_end (false)
62 if (_film->audio_stream()) {
63 /* Create sound output files with .tmp suffixes; we will rename
64 them if and when we complete.
66 for (int i = 0; i < _film->audio_channels(); ++i) {
68 sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
69 /* We write mono files */
71 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
72 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
74 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
76 _sound_files.push_back (f);
81 J2KWAVEncoder::~J2KWAVEncoder ()
83 terminate_worker_threads ();
88 J2KWAVEncoder::terminate_worker_threads ()
90 boost::mutex::scoped_lock lock (_worker_mutex);
92 _worker_condition.notify_all ();
95 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
102 J2KWAVEncoder::close_sound_files ()
104 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
108 _sound_files.clear ();
112 J2KWAVEncoder::do_process_video (shared_ptr<Image> yuv, shared_ptr<Subtitle> sub)
114 boost::mutex::scoped_lock lock (_worker_mutex);
116 /* Wait until the queue has gone down a bit */
117 while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
118 TIMING ("decoder sleeps with queue of %1", _queue.size());
119 _worker_condition.wait (lock);
120 TIMING ("decoder wakes with queue of %1", _queue.size());
127 /* Only do the processing if we don't already have a file for this frame */
128 if (!boost::filesystem::exists (_opt->frame_out_path (_video_frame, false))) {
129 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
130 TIMING ("adding to queue of %1", _queue.size ());
131 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
133 yuv, sub, _opt->out_size, _opt->padding, _film->subtitle_offset(), _film->subtitle_scale(),
134 _film->scaler(), _video_frame, _film->frames_per_second(), s.second,
135 Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
140 _worker_condition.notify_all ();
147 J2KWAVEncoder::encoder_thread (ServerDescription* server)
149 /* Number of seconds that we currently wait between attempts
150 to connect to the server; not relevant for localhost
153 int remote_backoff = 0;
157 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
158 boost::mutex::scoped_lock lock (_worker_mutex);
159 while (_queue.empty () && !_process_end) {
160 _worker_condition.wait (lock);
167 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
168 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
169 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
174 shared_ptr<EncodedData> encoded;
178 encoded = vf->encode_remotely (server);
180 if (remote_backoff > 0) {
181 _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
184 /* This job succeeded, so remove any backoff */
187 } catch (std::exception& e) {
188 if (remote_backoff < 60) {
190 remote_backoff += 10;
194 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
195 vf->frame(), server->host_name(), e.what(), remote_backoff)
201 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
202 encoded = vf->encode_locally ();
203 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
204 } catch (std::exception& e) {
205 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
210 encoded->write (_opt, vf->frame ());
215 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
217 _queue.push_front (vf);
221 if (remote_backoff > 0) {
222 dvdomatic_sleep (remote_backoff);
226 _worker_condition.notify_all ();
231 J2KWAVEncoder::process_begin ()
233 if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
234 #ifdef HAVE_SWRESAMPLE
237 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
238 _film->log()->log (s.str ());
240 /* We will be using planar float data when we call the resampler */
241 _swr_context = swr_alloc_set_opts (
243 _film->audio_stream()->channel_layout(),
245 _film->target_audio_sample_rate(),
246 _film->audio_stream()->channel_layout(),
248 _film->audio_stream()->sample_rate(),
252 swr_init (_swr_context);
254 throw EncodeError ("Cannot resample audio as libswresample is not present");
257 #ifdef HAVE_SWRESAMPLE
262 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
263 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
266 vector<ServerDescription*> servers = Config::instance()->servers ();
268 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
269 for (int j = 0; j < (*i)->threads (); ++j) {
270 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
276 J2KWAVEncoder::process_end ()
278 boost::mutex::scoped_lock lock (_worker_mutex);
280 _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
282 /* Keep waking workers until the queue is empty */
283 while (!_queue.empty ()) {
284 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
285 _worker_condition.notify_all ();
286 _worker_condition.wait (lock);
291 terminate_worker_threads ();
293 _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
295 /* The following sequence of events can occur in the above code:
296 1. a remote worker takes the last image off the queue
297 2. the loop above terminates
298 3. the remote worker fails to encode the image and puts it back on the queue
299 4. the remote worker is then terminated by terminate_worker_threads
301 So just mop up anything left in the queue here.
304 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
305 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
307 shared_ptr<EncodedData> e = (*i)->encode_locally ();
308 e->write (_opt, (*i)->frame ());
310 } catch (std::exception& e) {
311 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
316 if (_film->audio_stream() && _swr_context) {
318 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
321 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
324 throw EncodeError ("could not run sample-rate converter");
331 out->set_frames (frames);
335 swr_free (&_swr_context);
339 if (_film->audio_stream()) {
340 close_sound_files ();
342 /* Rename .wav.tmp files to .wav */
343 for (int i = 0; i < _film->audio_channels(); ++i) {
344 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
345 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
347 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
353 J2KWAVEncoder::do_process_audio (shared_ptr<AudioBuffers> audio)
355 shared_ptr<AudioBuffers> resampled;
358 /* Maybe sample-rate convert */
361 /* Compute the resampled frames count and add 32 for luck */
362 int const max_resampled_frames = ceil ((int64_t) audio->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
364 resampled.reset (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
367 int const resampled_frames = swr_convert (
368 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) audio->data(), audio->frames()
371 if (resampled_frames < 0) {
372 throw EncodeError ("could not run sample-rate converter");
375 resampled->set_frames (resampled_frames);
377 /* And point our variables at the resampled audio */
386 J2KWAVEncoder::write_audio (shared_ptr<const AudioBuffers> audio)
388 for (int i = 0; i < _film->audio_channels(); ++i) {
389 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
392 _audio_frames_written += audio->frames ();