2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/j2k_wav_encoder.cc
21 * @brief An encoder which writes JPEG2000 and WAV files.
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
33 #include "j2k_wav_encoder.h"
35 #include "film_state.h"
37 #include "exceptions.h"
38 #include "dcp_video_frame.h"
45 using namespace boost;
// Constructor.  Takes the film state `s`, the options `o` and the log `l`.
//
// The visible member initialisers set _deinterleave_buffer_size to 8192, null
// _deinterleave_buffer and clear _process_end.  The body then opens one
// libsndfile output per audio channel and allocates the deinterleave buffer.
//
// Throws CreateFileError carrying the output path (the condition guarding the
// throw is not visible in this view; presumably a failed sf_open -- confirm).
//
// NOTE(review): the embedded line numbers skip (47 -> 49, 56 -> 58, 59 -> 61,
// 62 -> 64), so some lines of this definition are missing from this view.
47 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
49 , _deinterleave_buffer_size (8192)
50 , _deinterleave_buffer (0)
51 , _process_end (false)
53 /* Create sound output files with .tmp suffixes; we will rename
54 them if and when we complete.
56 for (int i = 0; i < _fs->audio_channels; ++i) {
58 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate);
59 /* We write mono files */
// Output format flags: WAV combined with 24-bit PCM.
61 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
// Open for writing at the path for channel i; the `true` argument presumably
// selects the .tmp name mentioned above (process_end renames true -> false).
62 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
64 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
// Keep the handle; process_audio indexes _sound_files by channel number.
66 _sound_files.push_back (f);
69 /* Create buffer for deinterleaving audio */
70 _deinterleave_buffer = new uint8_t[_deinterleave_buffer_size];
// Destructor.  Calls terminate_worker_threads and then releases the
// deinterleave buffer that the constructor allocated with new[].
// NOTE(review): embedded line numbers skip after 76, so any further clean-up
// performed here is not visible in this view.
73 J2KWAVEncoder::~J2KWAVEncoder ()
75 terminate_worker_threads ();
76 delete[] _deinterleave_buffer;
// Shut down the worker threads.  The visible code takes _worker_mutex, wakes
// every thread waiting on _worker_condition and then walks _worker_threads.
// NOTE(review): lines 84, 86-87 and the loop body are missing from this view;
// presumably they set _process_end, release the lock and join each thread --
// confirm against the full file.
81 J2KWAVEncoder::terminate_worker_threads ()
83 boost::mutex::scoped_lock lock (_worker_mutex);
85 _worker_condition.notify_all ();
88 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
// Walk every handle in _sound_files and then empty the vector, so a second
// call iterates over nothing.
// NOTE(review): the loop body is missing from this view; presumably it closes
// each SNDFILE handle -- confirm.
95 J2KWAVEncoder::close_sound_files ()
97 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
101 _sound_files.clear ();
// Queue one video frame for encoding by the worker threads.
//
// yuv   - image for the frame.
// frame - frame index; used for the output path and passed on to the queued
//         DCPVideoFrame.
//
// Holds _worker_mutex throughout.  Blocks on _worker_condition while the queue
// holds at least twice as many entries as there are worker threads and
// _process_end is not set.  A frame whose output file already exists is not
// queued.
// NOTE(review): lines 112-117, 122 and 125-128 are missing from this view, so
// what happens once _process_end is set, and the exact construction of the
// queued object, cannot be confirmed here.
105 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame)
107 boost::mutex::scoped_lock lock (_worker_mutex);
109 /* Wait until the queue has gone down a bit */
110 while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
111 _worker_condition.wait (lock);
118 /* Only do the processing if we don't already have a file for this frame */
119 if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
// Only the second string of the pair is used below.
120 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters);
121 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
123 yuv, _opt->out_size, _opt->padding, _fs->scaler, frame, _fs->frames_per_second, s.second,
124 Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
// Wake the threads waiting on the condition so that a worker can pick the frame up.
129 _worker_condition.notify_all ();
// Body of each worker thread (started from process_begin).
//
// server - description of the remote encode server this thread uses.
//          process_begin passes a null pointer for the local encoding threads.
//
// Visible behaviour: wait on _worker_condition until the queue is non-empty or
// _process_end is set; take the frame at the front of the queue; encode it with
// encode_remotely or encode_locally; on success write the result and call
// frame_done.  A failed remote encode raises remote_backoff in steps of 10
// while it is below 60, and the thread sleeps for that many seconds before its
// next pass.  Exceptions from either encode path are logged, not propagated.
//
// NOTE(review): many lines are missing from this view (for example 143-144,
// 148-153, 155-158, 160-162, 172-173, 182-185, 191-194, 197-198, 200-202), so
// the loop construct, the exit path, the locking around the queue and the
// conditions selecting each branch cannot be confirmed here.
136 J2KWAVEncoder::encoder_thread (ServerDescription* server)
138 /* Number of seconds that we currently wait between attempts
139 to connect to the server; not relevant for localhost
142 int remote_backoff = 0;
145 boost::mutex::scoped_lock lock (_worker_mutex);
146 while (_queue.empty () && !_process_end) {
147 _worker_condition.wait (lock);
154 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
// Stays null unless one of the encode calls below succeeds.
159 shared_ptr<EncodedData> encoded;
163 encoded = vf->encode_remotely (server);
// A non-zero backoff here means an earlier attempt on this server failed.
165 if (remote_backoff > 0) {
167 s << server->host_name() << " was lost, but now she is found; removing backoff";
168 _log->log (s.str ());
171 /* This job succeeded, so remove any backoff */
174 } catch (std::exception& e) {
// Backoff grows by 10 seconds per failure and stops growing once it reaches 60.
175 if (remote_backoff < 60) {
177 remote_backoff += 10;
180 s << "Remote encode of " << vf->frame() << " on " << server->host_name() << " failed (" << e.what() << "); thread sleeping for " << remote_backoff << "s.";
181 _log->log (s.str ());
186 encoded = vf->encode_locally ();
187 } catch (std::exception& e) {
189 s << "Local encode failed " << e.what() << ".";
190 _log->log (s.str ());
195 encoded->write (_opt, vf->frame ());
196 frame_done (vf->frame ());
// Return the frame to the head of the queue.  The guarding condition is not
// visible; the note in process_end says a worker re-queues an image that it
// failed to encode.
199 _queue.push_front (vf);
203 if (remote_backoff > 0) {
204 dvdomatic_sleep (remote_backoff);
// Wake waiters (process_video waits for queue space, process_end for an empty queue).
208 _worker_condition.notify_all ();
// Start the worker threads.  Every thread runs encoder_thread on this object
// and its boost::thread pointer is appended to _worker_threads.
213 J2KWAVEncoder::process_begin ()
// Local workers: one per configured local encoding thread, given a null server.
215 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
216 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
219 vector<ServerDescription*> servers = Config::instance()->servers ();
// Remote workers: for each configured server, as many threads as its threads () reports.
221 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
222 for (int j = 0; j < (*i)->threads (); ++j) {
223 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
// Finish the encode.  Visible steps, in order:
//   1. take _worker_mutex and keep waking the workers until _queue is empty;
//   2. call terminate_worker_threads;
//   3. encode locally, on the calling thread, any frame still left in _queue;
//      failures are logged, not propagated;
//   4. close the sound files;
//   5. for each audio channel, move the file at multichannel_audio_out_path (i, true)
//      to multichannel_audio_out_path (i, false), removing an existing target first.
// boost::filesystem::remove and rename are called without an error_code
// argument here.
// NOTE(review): lines 240-243, 261 and others are missing from this view; in
// particular whether the lock is released before step 2 cannot be confirmed.
229 J2KWAVEncoder::process_end ()
231 boost::mutex::scoped_lock lock (_worker_mutex);
233 _log->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
235 /* Keep waking workers until the queue is empty */
236 while (!_queue.empty ()) {
237 _log->log ("Waking with " + lexical_cast<string> (_queue.size ()));
238 _worker_condition.notify_all ();
// wait releases the mutex while blocked, which lets workers take frames.
239 _worker_condition.wait (lock);
244 terminate_worker_threads ();
246 _log->log ("Mopping up " + lexical_cast<string> (_queue.size()));
248 /* The following sequence of events can occur in the above code:
249 1. a remote worker takes the last image off the queue
250 2. the loop above terminates
251 3. the remote worker fails to encode the image and puts it back on the queue
252 4. the remote worker is then terminated by terminate_worker_threads
254 So just mop up anything left in the queue here.
257 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
259 s << "Encode left-over frame " << (*i)->frame();
260 _log->log (s.str ());
262 shared_ptr<EncodedData> e = (*i)->encode_locally ();
263 e->write (_opt, (*i)->frame ());
264 frame_done ((*i)->frame ());
265 } catch (std::exception& e) {
267 s << "Local encode failed " << e.what() << ".";
268 _log->log (s.str ());
272 close_sound_files ();
274 /* Rename .wav.tmp files to .wav */
275 for (int i = 0; i < _fs->audio_channels; ++i) {
// Remove any existing target before the rename below.
276 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
277 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
279 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
284 J2KWAVEncoder::process_audio (uint8_t* data, int data_size)
286 /* Size of a sample in bytes */
287 int const sample_size = 2;
289 /* XXX: we are assuming that sample_size is right, the _deinterleave_buffer_size is a multiple
290 of the sample size and that data_size is a multiple of _fs->audio_channels * sample_size.
293 /* XXX: this code is very tricksy and it must be possible to make it simpler ... */
295 /* Number of bytes left to read this time */
296 int remaining = data_size;
297 /* Our position in the output buffers, in bytes */
299 while (remaining > 0) {
300 /* How many bytes of the deinterleaved data to do this time */
301 int this_time = min (remaining / _fs->audio_channels, _deinterleave_buffer_size);
302 for (int i = 0; i < _fs->audio_channels; ++i) {
303 for (int j = 0; j < this_time; j += sample_size) {
304 for (int k = 0; k < sample_size; ++k) {
305 int const to = j + k;
306 int const from = position + (i * sample_size) + (j * _fs->audio_channels) + k;
307 _deinterleave_buffer[to] = data[from];
311 switch (_fs->audio_sample_format) {
312 case AV_SAMPLE_FMT_S16:
313 sf_write_short (_sound_files[i], (const short *) _deinterleave_buffer, this_time / sample_size);
316 throw DecodeError ("unknown audio sample format");
320 position += this_time;
321 remaining -= this_time * _fs->audio_channels;