2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/j2k_wav_encoder.cc
21 * @brief An encoder which writes JPEG2000 and WAV files.
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
33 #include "j2k_wav_encoder.h"
36 #include "exceptions.h"
37 #include "dcp_video_frame.h"
45 using std::stringstream;
49 using boost::shared_ptr;
51 using boost::lexical_cast;
53 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const Film> f, shared_ptr<const Options> o)
55 #ifdef HAVE_SWRESAMPLE
58 , _audio_frames_written (0)
59 , _process_end (false)
61 /* Create sound output files with .tmp suffixes; we will rename
62 them if and when we complete.
64 for (int i = 0; i < _film->audio_channels(); ++i) {
66 sf_info.samplerate = dcp_audio_sample_rate (_film->audio_sample_rate());
67 /* We write mono files */
69 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
70 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
72 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
74 _sound_files.push_back (f);
78 J2KWAVEncoder::~J2KWAVEncoder ()
80 terminate_worker_threads ();
85 J2KWAVEncoder::terminate_worker_threads ()
87 boost::mutex::scoped_lock lock (_worker_mutex);
89 _worker_condition.notify_all ();
92 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
99 J2KWAVEncoder::close_sound_files ()
101 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
105 _sound_files.clear ();
109 J2KWAVEncoder::process_video (shared_ptr<const Image> yuv, SourceFrame frame, shared_ptr<Subtitle> sub)
111 boost::mutex::scoped_lock lock (_worker_mutex);
113 /* Wait until the queue has gone down a bit */
114 while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
115 TIMING ("decoder sleeps with queue of %1", _queue.size());
116 _worker_condition.wait (lock);
117 TIMING ("decoder wakes with queue of %1", _queue.size());
124 /* Only do the processing if we don't already have a file for this frame */
125 if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
126 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
127 TIMING ("adding to queue of %1", _queue.size ());
128 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
130 yuv, sub, _opt->out_size, _opt->padding, _film->subtitle_offset(), _film->subtitle_scale(),
131 _film->scaler(), frame, _film->frames_per_second(), s.second,
132 Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
137 _worker_condition.notify_all ();
144 J2KWAVEncoder::encoder_thread (ServerDescription* server)
146 /* Number of seconds that we currently wait between attempts
147 to connect to the server; not relevant for localhost
150 int remote_backoff = 0;
154 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
155 boost::mutex::scoped_lock lock (_worker_mutex);
156 while (_queue.empty () && !_process_end) {
157 _worker_condition.wait (lock);
164 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
165 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
166 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
171 shared_ptr<EncodedData> encoded;
175 encoded = vf->encode_remotely (server);
177 if (remote_backoff > 0) {
178 _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
181 /* This job succeeded, so remove any backoff */
184 } catch (std::exception& e) {
185 if (remote_backoff < 60) {
187 remote_backoff += 10;
191 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
192 vf->frame(), server->host_name(), e.what(), remote_backoff)
198 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
199 encoded = vf->encode_locally ();
200 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
201 } catch (std::exception& e) {
202 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
207 encoded->write (_opt, vf->frame ());
208 frame_done (vf->frame ());
212 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
214 _queue.push_front (vf);
218 if (remote_backoff > 0) {
219 dvdomatic_sleep (remote_backoff);
223 _worker_condition.notify_all ();
228 J2KWAVEncoder::process_begin (int64_t audio_channel_layout)
230 if (_film->audio_sample_rate() != _film->target_audio_sample_rate()) {
231 #ifdef HAVE_SWRESAMPLE
234 s << "Will resample audio from " << _film->audio_sample_rate() << " to " << _film->target_audio_sample_rate();
235 _film->log()->log (s.str ());
237 /* We will be using planar float data when we call the resampler */
238 _swr_context = swr_alloc_set_opts (
240 audio_channel_layout,
242 _film->target_audio_sample_rate(),
243 audio_channel_layout,
245 _film->audio_sample_rate(),
249 swr_init (_swr_context);
251 throw EncodeError ("Cannot resample audio as libswresample is not present");
254 #ifdef HAVE_SWRESAMPLE
259 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
260 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
263 vector<ServerDescription*> servers = Config::instance()->servers ();
265 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
266 for (int j = 0; j < (*i)->threads (); ++j) {
267 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
273 J2KWAVEncoder::process_end ()
275 boost::mutex::scoped_lock lock (_worker_mutex);
277 _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
279 /* Keep waking workers until the queue is empty */
280 while (!_queue.empty ()) {
281 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
282 _worker_condition.notify_all ();
283 _worker_condition.wait (lock);
288 terminate_worker_threads ();
290 _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
292 /* The following sequence of events can occur in the above code:
293 1. a remote worker takes the last image off the queue
294 2. the loop above terminates
295 3. the remote worker fails to encode the image and puts it back on the queue
296 4. the remote worker is then terminated by terminate_worker_threads
298 So just mop up anything left in the queue here.
301 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
302 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
304 shared_ptr<EncodedData> e = (*i)->encode_locally ();
305 e->write (_opt, (*i)->frame ());
306 frame_done ((*i)->frame ());
307 } catch (std::exception& e) {
308 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
315 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_channels(), 256));
318 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
321 throw EncodeError ("could not run sample-rate converter");
328 out->set_frames (frames);
332 swr_free (&_swr_context);
336 int const dcp_sr = dcp_audio_sample_rate (_film->audio_sample_rate ());
337 int64_t const extra_audio_frames = dcp_sr - (_audio_frames_written % dcp_sr);
338 shared_ptr<AudioBuffers> silence (new AudioBuffers (_film->audio_channels(), extra_audio_frames));
339 silence->make_silent ();
340 write_audio (silence);
342 close_sound_files ();
344 /* Rename .wav.tmp files to .wav */
345 for (int i = 0; i < _film->audio_channels(); ++i) {
346 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
347 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
349 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
354 J2KWAVEncoder::process_audio (shared_ptr<const AudioBuffers> audio)
356 shared_ptr<AudioBuffers> resampled;
359 /* Maybe sample-rate convert */
362 /* Compute the resampled frames count and add 32 for luck */
363 int const max_resampled_frames = ceil (audio->frames() * _film->target_audio_sample_rate() / _film->audio_sample_rate()) + 32;
365 resampled.reset (new AudioBuffers (_film->audio_channels(), max_resampled_frames));
368 int const resampled_frames = swr_convert (
369 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) audio->data(), audio->frames()
372 if (resampled_frames < 0) {
373 throw EncodeError ("could not run sample-rate converter");
376 resampled->set_frames (resampled_frames);
378 /* And point our variables at the resampled audio */
387 J2KWAVEncoder::write_audio (shared_ptr<const AudioBuffers> audio)
389 for (int i = 0; i < _film->audio_channels(); ++i) {
390 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
393 _audio_frames_written += audio->frames ();