/*
    Copyright (C) 2012 Carl Hetherington <cth@carlh.net>

    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.

*/
/** @file  src/j2k_wav_encoder.cc
 *  @brief An encoder which writes JPEG2000 and WAV files.
 */
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
33 #include "j2k_wav_encoder.h"
35 #include "film_state.h"
37 #include "exceptions.h"
38 #include "dcp_video_frame.h"
45 using namespace boost;
47 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
49 #ifdef HAVE_SWRESAMPLE
52 , _deinterleave_buffer_size (8192)
53 , _deinterleave_buffer (0)
54 , _process_end (false)
56 /* Create sound output files with .tmp suffixes; we will rename
57 them if and when we complete.
59 for (int i = 0; i < _fs->audio_channels; ++i) {
61 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate);
62 /* We write mono files */
64 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
65 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
67 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
69 _sound_files.push_back (f);
72 /* Create buffer for deinterleaving audio */
73 _deinterleave_buffer = new uint8_t[_deinterleave_buffer_size];
76 J2KWAVEncoder::~J2KWAVEncoder ()
78 terminate_worker_threads ();
79 delete[] _deinterleave_buffer;
84 J2KWAVEncoder::terminate_worker_threads ()
86 boost::mutex::scoped_lock lock (_worker_mutex);
88 _worker_condition.notify_all ();
91 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
98 J2KWAVEncoder::close_sound_files ()
100 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
104 _sound_files.clear ();
108 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame, shared_ptr<Subtitle> sub)
110 boost::mutex::scoped_lock lock (_worker_mutex);
112 /* Wait until the queue has gone down a bit */
113 while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
114 TIMING ("decoder sleeps with queue of %1", _queue.size());
115 _worker_condition.wait (lock);
116 TIMING ("decoder wakes with queue of %1", _queue.size());
123 /* Only do the processing if we don't already have a file for this frame */
124 if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
125 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters);
126 TIMING ("adding to queue of %1", _queue.size ());
127 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
129 yuv, sub, _opt->out_size, _opt->padding, _fs->subtitle_offset, _fs->subtitle_scale,
130 _fs->scaler, frame, _fs->frames_per_second, s.second,
131 Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
136 _worker_condition.notify_all ();
143 J2KWAVEncoder::encoder_thread (ServerDescription* server)
145 /* Number of seconds that we currently wait between attempts
146 to connect to the server; not relevant for localhost
149 int remote_backoff = 0;
153 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
154 boost::mutex::scoped_lock lock (_worker_mutex);
155 while (_queue.empty () && !_process_end) {
156 _worker_condition.wait (lock);
163 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
164 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
165 _log->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()));
170 shared_ptr<EncodedData> encoded;
174 encoded = vf->encode_remotely (server);
176 if (remote_backoff > 0) {
177 _log->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
180 /* This job succeeded, so remove any backoff */
183 } catch (std::exception& e) {
184 if (remote_backoff < 60) {
186 remote_backoff += 10;
190 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
191 vf->frame(), server->host_name(), e.what(), remote_backoff)
197 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
198 encoded = vf->encode_locally ();
199 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
200 } catch (std::exception& e) {
201 _log->log (String::compose ("Local encode failed (%1)", e.what ()));
206 encoded->write (_opt, vf->frame ());
207 frame_done (vf->frame ());
210 _log->log (String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame()));
211 _queue.push_front (vf);
215 if (remote_backoff > 0) {
216 dvdomatic_sleep (remote_backoff);
220 _worker_condition.notify_all ();
225 J2KWAVEncoder::process_begin (int64_t audio_channel_layout, AVSampleFormat audio_sample_format)
227 if (_fs->audio_sample_rate != _fs->target_sample_rate ()) {
228 #ifdef HAVE_SWRESAMPLE
231 s << "Will resample audio from " << _fs->audio_sample_rate << " to " << _fs->target_sample_rate();
232 _log->log (s.str ());
234 _swr_context = swr_alloc_set_opts (
236 audio_channel_layout,
238 _fs->target_sample_rate(),
239 audio_channel_layout,
241 _fs->audio_sample_rate,
245 swr_init (_swr_context);
247 throw EncodeError ("Cannot resample audio as libswresample is not present");
250 #ifdef HAVE_SWRESAMPLE
255 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
256 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
259 vector<ServerDescription*> servers = Config::instance()->servers ();
261 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
262 for (int j = 0; j < (*i)->threads (); ++j) {
263 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
269 J2KWAVEncoder::process_end ()
271 boost::mutex::scoped_lock lock (_worker_mutex);
273 _log->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
275 /* Keep waking workers until the queue is empty */
276 while (!_queue.empty ()) {
277 _log->log ("Waking with " + lexical_cast<string> (_queue.size ()));
278 _worker_condition.notify_all ();
279 _worker_condition.wait (lock);
284 terminate_worker_threads ();
286 _log->log ("Mopping up " + lexical_cast<string> (_queue.size()));
288 /* The following sequence of events can occur in the above code:
289 1. a remote worker takes the last image off the queue
290 2. the loop above terminates
291 3. the remote worker fails to encode the image and puts it back on the queue
292 4. the remote worker is then terminated by terminate_worker_threads
294 So just mop up anything left in the queue here.
297 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
298 _log->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
300 shared_ptr<EncodedData> e = (*i)->encode_locally ();
301 e->write (_opt, (*i)->frame ());
302 frame_done ((*i)->frame ());
303 } catch (std::exception& e) {
304 _log->log (String::compose ("Local encode failed (%1)", e.what ()));
312 uint8_t buffer[256 * _fs->bytes_per_sample() * _fs->audio_channels];
318 int const frames = swr_convert (_swr_context, out, 256, 0, 0);
321 throw EncodeError ("could not run sample-rate converter");
328 write_audio (buffer, frames * _fs->bytes_per_sample() * _fs->audio_channels);
331 swr_free (&_swr_context);
335 close_sound_files ();
337 /* Rename .wav.tmp files to .wav */
338 for (int i = 0; i < _fs->audio_channels; ++i) {
339 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
340 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
342 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
347 J2KWAVEncoder::process_audio (uint8_t* data, int size)
349 /* This is a buffer we might use if we are sample-rate converting;
350 it will need freeing if so.
352 uint8_t* out_buffer = 0;
354 /* Maybe sample-rate convert */
358 uint8_t const * in[2] = {
363 /* Here's samples per channel */
364 int const samples = size / _fs->bytes_per_sample();
366 /* And here's frames (where 1 frame is a collection of samples, 1 for each channel,
367 so for 5.1 a frame would be 6 samples)
369 int const frames = samples / _fs->audio_channels;
371 /* Compute the resampled frame count and add 32 for luck */
372 int const out_buffer_size_frames = ceil (frames * _fs->target_sample_rate() / _fs->audio_sample_rate) + 32;
373 int const out_buffer_size_bytes = out_buffer_size_frames * _fs->audio_channels * _fs->bytes_per_sample();
374 out_buffer = new uint8_t[out_buffer_size_bytes];
382 int out_frames = swr_convert (_swr_context, out, out_buffer_size_frames, in, frames);
383 if (out_frames < 0) {
384 throw EncodeError ("could not run sample-rate converter");
387 /* And point our variables at the resampled audio */
389 size = out_frames * _fs->audio_channels * _fs->bytes_per_sample();
393 write_audio (data, size);
395 /* Delete the sample-rate conversion buffer, if it exists */
400 J2KWAVEncoder::write_audio (uint8_t* data, int size)
402 /* XXX: we are assuming that the _deinterleave_buffer_size is a multiple
403 of the sample size and that size is a multiple of _fs->audio_channels * sample_size.
406 assert ((size % (_fs->audio_channels * _fs->bytes_per_sample())) == 0);
407 assert ((_deinterleave_buffer_size % _fs->bytes_per_sample()) == 0);
409 /* XXX: this code is very tricksy and it must be possible to make it simpler ... */
411 /* Number of bytes left to read this time */
412 int remaining = size;
413 /* Our position in the output buffers, in bytes */
415 while (remaining > 0) {
416 /* How many bytes of the deinterleaved data to do this time */
417 int this_time = min (remaining / _fs->audio_channels, _deinterleave_buffer_size);
418 for (int i = 0; i < _fs->audio_channels; ++i) {
419 for (int j = 0; j < this_time; j += _fs->bytes_per_sample()) {
420 for (int k = 0; k < _fs->bytes_per_sample(); ++k) {
421 int const to = j + k;
422 int const from = position + (i * _fs->bytes_per_sample()) + (j * _fs->audio_channels) + k;
423 _deinterleave_buffer[to] = data[from];
427 switch (_fs->audio_sample_format) {
428 case AV_SAMPLE_FMT_S16:
429 sf_write_short (_sound_files[i], (const short *) _deinterleave_buffer, this_time / _fs->bytes_per_sample());
432 throw EncodeError ("unknown audio sample format");
436 position += this_time;
437 remaining -= this_time * _fs->audio_channels;