2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/j2k_wav_encoder.cc
21 * @brief An encoder which writes JPEG2000 and WAV files.
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
33 #include "j2k_wav_encoder.h"
35 #include "film_state.h"
37 #include "exceptions.h"
38 #include "dcp_video_frame.h"
45 using namespace boost;
47 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
49 #ifdef HAVE_SWRESAMPLE
52 , _process_end (false)
54 /* Create sound output files with .tmp suffixes; we will rename
55 them if and when we complete.
57 for (int i = 0; i < _fs->audio_channels(); ++i) {
59 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate());
60 /* We write mono files */
62 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
63 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
65 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
67 _sound_files.push_back (f);
71 J2KWAVEncoder::~J2KWAVEncoder ()
73 terminate_worker_threads ();
78 J2KWAVEncoder::terminate_worker_threads ()
80 boost::mutex::scoped_lock lock (_worker_mutex);
82 _worker_condition.notify_all ();
85 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
92 J2KWAVEncoder::close_sound_files ()
94 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
98 _sound_files.clear ();
102 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame, shared_ptr<Subtitle> sub)
104 boost::mutex::scoped_lock lock (_worker_mutex);
106 /* Wait until the queue has gone down a bit */
107 while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
108 TIMING ("decoder sleeps with queue of %1", _queue.size());
109 _worker_condition.wait (lock);
110 TIMING ("decoder wakes with queue of %1", _queue.size());
117 /* Only do the processing if we don't already have a file for this frame */
118 if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
119 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters());
120 TIMING ("adding to queue of %1", _queue.size ());
121 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
123 yuv, sub, _opt->out_size, _opt->padding, _fs->subtitle_offset(), _fs->subtitle_scale(),
124 _fs->scaler(), frame, _fs->frames_per_second(), s.second,
125 Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
130 _worker_condition.notify_all ();
137 J2KWAVEncoder::encoder_thread (ServerDescription* server)
139 /* Number of seconds that we currently wait between attempts
140 to connect to the server; not relevant for localhost
143 int remote_backoff = 0;
147 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
148 boost::mutex::scoped_lock lock (_worker_mutex);
149 while (_queue.empty () && !_process_end) {
150 _worker_condition.wait (lock);
157 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
158 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
159 _log->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()));
164 shared_ptr<EncodedData> encoded;
168 encoded = vf->encode_remotely (server);
170 if (remote_backoff > 0) {
171 _log->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
174 /* This job succeeded, so remove any backoff */
177 } catch (std::exception& e) {
178 if (remote_backoff < 60) {
180 remote_backoff += 10;
184 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
185 vf->frame(), server->host_name(), e.what(), remote_backoff)
191 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
192 encoded = vf->encode_locally ();
193 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
194 } catch (std::exception& e) {
195 _log->log (String::compose ("Local encode failed (%1)", e.what ()));
200 encoded->write (_opt, vf->frame ());
201 frame_done (vf->frame ());
204 _log->log (String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame()));
205 _queue.push_front (vf);
209 if (remote_backoff > 0) {
210 dvdomatic_sleep (remote_backoff);
214 _worker_condition.notify_all ();
219 J2KWAVEncoder::process_begin (int64_t audio_channel_layout, AVSampleFormat audio_sample_format)
221 if (_fs->audio_sample_rate() != _fs->target_sample_rate()) {
222 #ifdef HAVE_SWRESAMPLE
225 s << "Will resample audio from " << _fs->audio_sample_rate() << " to " << _fs->target_sample_rate();
226 _log->log (s.str ());
228 /* We will be using planar float data when we call the resampler */
229 _swr_context = swr_alloc_set_opts (
231 audio_channel_layout,
233 _fs->target_sample_rate(),
234 audio_channel_layout,
236 _fs->audio_sample_rate(),
240 swr_init (_swr_context);
242 throw EncodeError ("Cannot resample audio as libswresample is not present");
245 #ifdef HAVE_SWRESAMPLE
250 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
251 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
254 vector<ServerDescription*> servers = Config::instance()->servers ();
256 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
257 for (int j = 0; j < (*i)->threads (); ++j) {
258 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
264 J2KWAVEncoder::process_end ()
266 boost::mutex::scoped_lock lock (_worker_mutex);
268 _log->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
270 /* Keep waking workers until the queue is empty */
271 while (!_queue.empty ()) {
272 _log->log ("Waking with " + lexical_cast<string> (_queue.size ()));
273 _worker_condition.notify_all ();
274 _worker_condition.wait (lock);
279 terminate_worker_threads ();
281 _log->log ("Mopping up " + lexical_cast<string> (_queue.size()));
283 /* The following sequence of events can occur in the above code:
284 1. a remote worker takes the last image off the queue
285 2. the loop above terminates
286 3. the remote worker fails to encode the image and puts it back on the queue
287 4. the remote worker is then terminated by terminate_worker_threads
289 So just mop up anything left in the queue here.
292 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
293 _log->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
295 shared_ptr<EncodedData> e = (*i)->encode_locally ();
296 e->write (_opt, (*i)->frame ());
297 frame_done ((*i)->frame ());
298 } catch (std::exception& e) {
299 _log->log (String::compose ("Local encode failed (%1)", e.what ()));
306 shared_ptr<AudioBuffers> out (new AudioBuffers (_fs->audio_channels(), 256));
309 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
312 throw EncodeError ("could not run sample-rate converter");
322 swr_free (&_swr_context);
326 close_sound_files ();
328 /* Rename .wav.tmp files to .wav */
329 for (int i = 0; i < _fs->audio_channels(); ++i) {
330 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
331 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
333 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
338 J2KWAVEncoder::process_audio (shared_ptr<const AudioBuffers> audio)
340 shared_ptr<AudioBuffers> resampled;
343 /* Maybe sample-rate convert */
346 /* Compute the resampled frames count and add 32 for luck */
347 int const max_resampled_frames = ceil (audio->frames() * _fs->target_sample_rate() / _fs->audio_sample_rate()) + 32;
349 resampled.reset (new AudioBuffers (_fs->audio_channels(), max_resampled_frames));
352 int const resampled_frames = swr_convert (
353 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) audio->data(), audio->frames()
356 if (resampled_frames < 0) {
357 throw EncodeError ("could not run sample-rate converter");
360 resampled->set_frames (resampled_frames);
362 /* And point our variables at the resampled audio */
371 J2KWAVEncoder::write_audio (shared_ptr<const AudioBuffers> audio) const
373 for (int i = 0; i < _fs->audio_channels(); ++i) {
374 sf_write_float (_sound_files[i], audio->data(i), audio->frames());