2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/j2k_wav_encoder.cc
21 * @brief An encoder which writes JPEG2000 and WAV files.
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
33 #include "j2k_wav_encoder.h"
35 #include "film_state.h"
37 #include "exceptions.h"
38 #include "dcp_video_frame.h"
45 using namespace boost;
47 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
49 , _deinterleave_buffer_size (8192)
50 , _deinterleave_buffer (0)
51 , _process_end (false)
53 /* Create sound output files with .tmp suffixes; we will rename
54 them if and when we complete.
56 for (int i = 0; i < _fs->audio_channels; ++i) {
58 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate);
59 /* We write mono files */
61 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
62 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
64 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
66 _sound_files.push_back (f);
69 /* Create buffer for deinterleaving audio */
70 _deinterleave_buffer = new uint8_t[_deinterleave_buffer_size];
73 J2KWAVEncoder::~J2KWAVEncoder ()
75 terminate_worker_threads ();
76 delete[] _deinterleave_buffer;
81 J2KWAVEncoder::terminate_worker_threads ()
83 boost::mutex::scoped_lock lock (_worker_mutex);
85 _worker_condition.notify_all ();
88 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
95 J2KWAVEncoder::close_sound_files ()
97 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
101 _sound_files.clear ();
105 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame)
107 boost::mutex::scoped_lock lock (_worker_mutex);
109 /* Wait until the queue has gone down a bit */
110 while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
111 TIMING ("decoder sleeps with queue of %1", _queue.size());
112 _worker_condition.wait (lock);
113 TIMING ("decoder wakes with queue of %1", _queue.size());
120 /* Only do the processing if we don't already have a file for this frame */
121 if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
122 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters);
123 TIMING ("adding to queue of %1", _queue.size ());
124 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
126 yuv, _opt->out_size, _opt->padding, _fs->scaler, frame, _fs->frames_per_second, s.second,
127 Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
132 _worker_condition.notify_all ();
139 J2KWAVEncoder::encoder_thread (ServerDescription* server)
141 /* Number of seconds that we currently wait between attempts
142 to connect to the server; not relevant for localhost
145 int remote_backoff = 0;
149 TIMING ("encoder thread %1 sleeps", pthread_self ());
150 boost::mutex::scoped_lock lock (_worker_mutex);
151 while (_queue.empty () && !_process_end) {
152 _worker_condition.wait (lock);
159 TIMING ("encoder thread %1 wakes with queue of %2", pthread_self(), _queue.size());
160 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
165 shared_ptr<EncodedData> encoded;
169 encoded = vf->encode_remotely (server);
171 if (remote_backoff > 0) {
172 _log->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
175 /* This job succeeded, so remove any backoff */
178 } catch (std::exception& e) {
179 if (remote_backoff < 60) {
181 remote_backoff += 10;
185 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
186 vf->frame(), server->host_name(), e.what(), remote_backoff)
192 TIMING ("encoder thread %1 begins local encode of %2", pthread_self(), vf->frame());
193 encoded = vf->encode_locally ();
194 TIMING ("encoder thread %1 finishes local encode of %2", pthread_self(), vf->frame());
195 } catch (std::exception& e) {
196 _log->log (String::compose ("Local encode failed (%1)", e.what ()));
201 encoded->write (_opt, vf->frame ());
202 frame_done (vf->frame ());
205 _queue.push_front (vf);
209 if (remote_backoff > 0) {
210 dvdomatic_sleep (remote_backoff);
214 _worker_condition.notify_all ();
219 J2KWAVEncoder::process_begin ()
221 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
222 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
225 vector<ServerDescription*> servers = Config::instance()->servers ();
227 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
228 for (int j = 0; j < (*i)->threads (); ++j) {
229 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
235 J2KWAVEncoder::process_end ()
237 boost::mutex::scoped_lock lock (_worker_mutex);
239 _log->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
241 /* Keep waking workers until the queue is empty */
242 while (!_queue.empty ()) {
243 _log->log ("Waking with " + lexical_cast<string> (_queue.size ()));
244 _worker_condition.notify_all ();
245 _worker_condition.wait (lock);
250 terminate_worker_threads ();
252 _log->log ("Mopping up " + lexical_cast<string> (_queue.size()));
254 /* The following sequence of events can occur in the above code:
255 1. a remote worker takes the last image off the queue
256 2. the loop above terminates
257 3. the remote worker fails to encode the image and puts it back on the queue
258 4. the remote worker is then terminated by terminate_worker_threads
260 So just mop up anything left in the queue here.
263 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
264 _log->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
266 shared_ptr<EncodedData> e = (*i)->encode_locally ();
267 e->write (_opt, (*i)->frame ());
268 frame_done ((*i)->frame ());
269 } catch (std::exception& e) {
270 _log->log (String::compose ("Local encode failed (%1)", e.what ()));
274 close_sound_files ();
276 /* Rename .wav.tmp files to .wav */
277 for (int i = 0; i < _fs->audio_channels; ++i) {
278 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
279 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
281 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
286 J2KWAVEncoder::process_audio (uint8_t* data, int data_size)
288 /* Size of a sample in bytes */
289 int const sample_size = 2;
291 /* XXX: we are assuming that sample_size is right, the _deinterleave_buffer_size is a multiple
292 of the sample size and that data_size is a multiple of _fs->audio_channels * sample_size.
295 /* XXX: this code is very tricksy and it must be possible to make it simpler ... */
297 /* Number of bytes left to read this time */
298 int remaining = data_size;
299 /* Our position in the output buffers, in bytes */
301 while (remaining > 0) {
302 /* How many bytes of the deinterleaved data to do this time */
303 int this_time = min (remaining / _fs->audio_channels, _deinterleave_buffer_size);
304 for (int i = 0; i < _fs->audio_channels; ++i) {
305 for (int j = 0; j < this_time; j += sample_size) {
306 for (int k = 0; k < sample_size; ++k) {
307 int const to = j + k;
308 int const from = position + (i * sample_size) + (j * _fs->audio_channels) + k;
309 _deinterleave_buffer[to] = data[from];
313 switch (_fs->audio_sample_format) {
314 case AV_SAMPLE_FMT_S16:
315 sf_write_short (_sound_files[i], (const short *) _deinterleave_buffer, this_time / sample_size);
318 throw DecodeError ("unknown audio sample format");
322 position += this_time;
323 remaining -= this_time * _fs->audio_channels;