2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/j2k_wav_encoder.cc
21 * @brief An encoder which writes JPEG2000 and WAV files.
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
33 #include "j2k_wav_encoder.h"
35 #include "film_state.h"
37 #include "exceptions.h"
38 #include "dcp_video_frame.h"
45 using namespace boost;
47 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
49 #ifdef HAVE_SWRESAMPLE
52 , _deinterleave_buffer_size (8192)
53 , _deinterleave_buffer (0)
54 , _process_end (false)
56 /* Create sound output files with .tmp suffixes; we will rename
57 them if and when we complete.
59 for (int i = 0; i < _fs->audio_channels; ++i) {
61 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate);
62 /* We write mono files */
64 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
65 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
67 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
69 _sound_files.push_back (f);
72 /* Create buffer for deinterleaving audio */
73 _deinterleave_buffer = new uint8_t[_deinterleave_buffer_size];
76 J2KWAVEncoder::~J2KWAVEncoder ()
78 terminate_worker_threads ();
79 delete[] _deinterleave_buffer;
84 J2KWAVEncoder::terminate_worker_threads ()
86 boost::mutex::scoped_lock lock (_worker_mutex);
88 _worker_condition.notify_all ();
91 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
98 J2KWAVEncoder::close_sound_files ()
100 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
104 _sound_files.clear ();
108 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame)
110 boost::mutex::scoped_lock lock (_worker_mutex);
112 /* Wait until the queue has gone down a bit */
113 while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
114 TIMING ("decoder sleeps with queue of %1", _queue.size());
115 _worker_condition.wait (lock);
116 TIMING ("decoder wakes with queue of %1", _queue.size());
123 /* Only do the processing if we don't already have a file for this frame */
124 if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
125 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters);
126 TIMING ("adding to queue of %1", _queue.size ());
127 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
129 yuv, _opt->out_size, _opt->padding, _fs->scaler, frame, _fs->frames_per_second, s.second,
130 Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
135 _worker_condition.notify_all ();
142 J2KWAVEncoder::encoder_thread (ServerDescription* server)
144 /* Number of seconds that we currently wait between attempts
145 to connect to the server; not relevant for localhost
148 int remote_backoff = 0;
152 TIMING ("encoder thread %1 sleeps", pthread_self ());
153 boost::mutex::scoped_lock lock (_worker_mutex);
154 while (_queue.empty () && !_process_end) {
155 _worker_condition.wait (lock);
162 TIMING ("encoder thread %1 wakes with queue of %2", pthread_self(), _queue.size());
163 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
164 _log->log (String::compose ("Encoder thread %1 pops frame %2 from queue", pthread_self(), vf->frame()));
169 shared_ptr<EncodedData> encoded;
173 encoded = vf->encode_remotely (server);
175 if (remote_backoff > 0) {
176 _log->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
179 /* This job succeeded, so remove any backoff */
182 } catch (std::exception& e) {
183 if (remote_backoff < 60) {
185 remote_backoff += 10;
189 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
190 vf->frame(), server->host_name(), e.what(), remote_backoff)
196 TIMING ("encoder thread %1 begins local encode of %2", pthread_self(), vf->frame());
197 encoded = vf->encode_locally ();
198 TIMING ("encoder thread %1 finishes local encode of %2", pthread_self(), vf->frame());
199 } catch (std::exception& e) {
200 _log->log (String::compose ("Local encode failed (%1)", e.what ()));
205 encoded->write (_opt, vf->frame ());
206 frame_done (vf->frame ());
209 _log->log (String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", pthread_self(), vf->frame()));
210 _queue.push_front (vf);
214 if (remote_backoff > 0) {
215 dvdomatic_sleep (remote_backoff);
219 _worker_condition.notify_all ();
224 J2KWAVEncoder::process_begin (int64_t audio_channel_layout, AVSampleFormat audio_sample_format)
226 if (_fs->audio_sample_rate != _fs->target_sample_rate ()) {
227 #ifdef HAVE_SWRESAMPLE
230 s << "Will resample audio from " << _fs->audio_sample_rate << " to " << _fs->target_sample_rate();
231 _log->log (s.str ());
233 _swr_context = swr_alloc_set_opts (
235 audio_channel_layout,
237 _fs->target_sample_rate(),
238 audio_channel_layout,
240 _fs->audio_sample_rate,
244 swr_init (_swr_context);
246 throw EncodeError ("Cannot resample audio as libswresample is not present");
249 #ifdef HAVE_SWRESAMPLE
254 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
255 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
258 vector<ServerDescription*> servers = Config::instance()->servers ();
260 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
261 for (int j = 0; j < (*i)->threads (); ++j) {
262 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
268 J2KWAVEncoder::process_end ()
270 boost::mutex::scoped_lock lock (_worker_mutex);
272 _log->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
274 /* Keep waking workers until the queue is empty */
275 while (!_queue.empty ()) {
276 _log->log ("Waking with " + lexical_cast<string> (_queue.size ()));
277 _worker_condition.notify_all ();
278 _worker_condition.wait (lock);
283 terminate_worker_threads ();
285 _log->log ("Mopping up " + lexical_cast<string> (_queue.size()));
287 /* The following sequence of events can occur in the above code:
288 1. a remote worker takes the last image off the queue
289 2. the loop above terminates
290 3. the remote worker fails to encode the image and puts it back on the queue
291 4. the remote worker is then terminated by terminate_worker_threads
293 So just mop up anything left in the queue here.
296 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
297 _log->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
299 shared_ptr<EncodedData> e = (*i)->encode_locally ();
300 e->write (_opt, (*i)->frame ());
301 frame_done ((*i)->frame ());
302 } catch (std::exception& e) {
303 _log->log (String::compose ("Local encode failed (%1)", e.what ()));
311 uint8_t buffer[256 * _fs->bytes_per_sample() * _fs->audio_channels];
317 int const frames = swr_convert (_swr_context, out, 256, 0, 0);
320 throw EncodeError ("could not run sample-rate converter");
327 write_audio (buffer, frames * _fs->bytes_per_sample() * _fs->audio_channels);
330 swr_free (&_swr_context);
334 close_sound_files ();
336 /* Rename .wav.tmp files to .wav */
337 for (int i = 0; i < _fs->audio_channels; ++i) {
338 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
339 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
341 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
346 J2KWAVEncoder::process_audio (uint8_t* data, int size)
348 /* This is a buffer we might use if we are sample-rate converting;
349 it will need freeing if so.
351 uint8_t* out_buffer = 0;
353 /* Maybe sample-rate convert */
357 uint8_t const * in[2] = {
362 /* Here's samples per channel */
363 int const samples = size / _fs->bytes_per_sample();
365 /* And here's frames (where 1 frame is a collection of samples, 1 for each channel,
366 so for 5.1 a frame would be 6 samples)
368 int const frames = samples / _fs->audio_channels;
370 /* Compute the resampled frame count and add 32 for luck */
371 int const out_buffer_size_frames = ceil (frames * _fs->target_sample_rate() / _fs->audio_sample_rate) + 32;
372 int const out_buffer_size_bytes = out_buffer_size_frames * _fs->audio_channels * _fs->bytes_per_sample();
373 out_buffer = new uint8_t[out_buffer_size_bytes];
381 int out_frames = swr_convert (_swr_context, out, out_buffer_size_frames, in, frames);
382 if (out_frames < 0) {
383 throw EncodeError ("could not run sample-rate converter");
386 /* And point our variables at the resampled audio */
388 size = out_frames * _fs->audio_channels * _fs->bytes_per_sample();
392 write_audio (data, size);
394 /* Delete the sample-rate conversion buffer, if it exists */
399 J2KWAVEncoder::write_audio (uint8_t* data, int size)
401 /* XXX: we are assuming that the _deinterleave_buffer_size is a multiple
402 of the sample size and that size is a multiple of _fs->audio_channels * sample_size.
405 assert ((size % (_fs->audio_channels * _fs->bytes_per_sample())) == 0);
406 assert ((_deinterleave_buffer_size % _fs->bytes_per_sample()) == 0);
408 /* XXX: this code is very tricksy and it must be possible to make it simpler ... */
410 /* Number of bytes left to read this time */
411 int remaining = size;
412 /* Our position in the output buffers, in bytes */
414 while (remaining > 0) {
415 /* How many bytes of the deinterleaved data to do this time */
416 int this_time = min (remaining / _fs->audio_channels, _deinterleave_buffer_size);
417 for (int i = 0; i < _fs->audio_channels; ++i) {
418 for (int j = 0; j < this_time; j += _fs->bytes_per_sample()) {
419 for (int k = 0; k < _fs->bytes_per_sample(); ++k) {
420 int const to = j + k;
421 int const from = position + (i * _fs->bytes_per_sample()) + (j * _fs->audio_channels) + k;
422 _deinterleave_buffer[to] = data[from];
426 switch (_fs->audio_sample_format) {
427 case AV_SAMPLE_FMT_S16:
428 sf_write_short (_sound_files[i], (const short *) _deinterleave_buffer, this_time / _fs->bytes_per_sample());
431 throw EncodeError ("unknown audio sample format");
435 position += this_time;
436 remaining -= this_time * _fs->audio_channels;