Tidy up test for whether or not to resample.
[dcpomatic.git] / src / lib / j2k_wav_encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file  src/lib/j2k_wav_encoder.cc
21  *  @brief An encoder which writes JPEG2000 and WAV files.
22  */
23
24 #include <sstream>
25 #include <stdexcept>
26 #include <iomanip>
27 #include <iostream>
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
31 #include <sndfile.h>
32 #include <openjpeg.h>
33 #include "j2k_wav_encoder.h"
34 #include "config.h"
35 #include "film_state.h"
36 #include "options.h"
37 #include "exceptions.h"
38 #include "dcp_video_frame.h"
39 #include "server.h"
40 #include "filter.h"
41 #include "log.h"
42 #include "cross.h"
43
44 using namespace std;
45 using namespace boost;
46
47 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
48         : Encoder (s, o, l)
49 #ifdef HAVE_SWRESAMPLE    
50         , _swr_context (0)
51 #endif    
52         , _deinterleave_buffer_size (8192)
53         , _deinterleave_buffer (0)
54         , _process_end (false)
55 {
56         /* Create sound output files with .tmp suffixes; we will rename
57            them if and when we complete.
58         */
59         for (int i = 0; i < _fs->audio_channels; ++i) {
60                 SF_INFO sf_info;
61                 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate);
62                 /* We write mono files */
63                 sf_info.channels = 1;
64                 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
65                 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
66                 if (f == 0) {
67                         throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
68                 }
69                 _sound_files.push_back (f);
70         }
71
72         /* Create buffer for deinterleaving audio */
73         _deinterleave_buffer = new uint8_t[_deinterleave_buffer_size];
74 }
75
76 J2KWAVEncoder::~J2KWAVEncoder ()
77 {
78         terminate_worker_threads ();
79         delete[] _deinterleave_buffer;
80         close_sound_files ();
81 }
82
83 void
84 J2KWAVEncoder::terminate_worker_threads ()
85 {
86         boost::mutex::scoped_lock lock (_worker_mutex);
87         _process_end = true;
88         _worker_condition.notify_all ();
89         lock.unlock ();
90
91         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
92                 (*i)->join ();
93                 delete *i;
94         }
95 }
96
97 void
98 J2KWAVEncoder::close_sound_files ()
99 {
100         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
101                 sf_close (*i);
102         }
103
104         _sound_files.clear ();
105 }       
106
107 void
108 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame)
109 {
110         boost::mutex::scoped_lock lock (_worker_mutex);
111
112         /* Wait until the queue has gone down a bit */
113         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
114                 TIMING ("decoder sleeps with queue of %1", _queue.size());
115                 _worker_condition.wait (lock);
116                 TIMING ("decoder wakes with queue of %1", _queue.size());
117         }
118
119         if (_process_end) {
120                 return;
121         }
122
123         /* Only do the processing if we don't already have a file for this frame */
124         if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
125                 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters);
126                 TIMING ("adding to queue of %1", _queue.size ());
127                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
128                                           new DCPVideoFrame (
129                                                   yuv, _opt->out_size, _opt->padding, _fs->scaler, frame, _fs->frames_per_second, s.second,
130                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
131                                                   _log
132                                                   )
133                                           ));
134                 
135                 _worker_condition.notify_all ();
136         } else {
137                 frame_skipped ();
138         }
139 }
140
141 void
142 J2KWAVEncoder::encoder_thread (ServerDescription* server)
143 {
144         /* Number of seconds that we currently wait between attempts
145            to connect to the server; not relevant for localhost
146            encodings.
147         */
148         int remote_backoff = 0;
149         
150         while (1) {
151
152                 TIMING ("encoder thread %1 sleeps", pthread_self ());
153                 boost::mutex::scoped_lock lock (_worker_mutex);
154                 while (_queue.empty () && !_process_end) {
155                         _worker_condition.wait (lock);
156                 }
157
158                 if (_process_end) {
159                         return;
160                 }
161
162                 TIMING ("encoder thread %1 wakes with queue of %2", pthread_self(), _queue.size());
163                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
164                 _queue.pop_front ();
165                 
166                 lock.unlock ();
167
168                 shared_ptr<EncodedData> encoded;
169
170                 if (server) {
171                         try {
172                                 encoded = vf->encode_remotely (server);
173
174                                 if (remote_backoff > 0) {
175                                         _log->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
176                                 }
177                                 
178                                 /* This job succeeded, so remove any backoff */
179                                 remote_backoff = 0;
180                                 
181                         } catch (std::exception& e) {
182                                 if (remote_backoff < 60) {
183                                         /* back off more */
184                                         remote_backoff += 10;
185                                 }
186                                 _log->log (
187                                         String::compose (
188                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
189                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
190                                         );
191                         }
192                                 
193                 } else {
194                         try {
195                                 TIMING ("encoder thread %1 begins local encode of %2", pthread_self(), vf->frame());
196                                 encoded = vf->encode_locally ();
197                                 TIMING ("encoder thread %1 finishes local encode of %2", pthread_self(), vf->frame());
198                         } catch (std::exception& e) {
199                                 _log->log (String::compose ("Local encode failed (%1)", e.what ()));
200                         }
201                 }
202
203                 if (encoded) {
204                         encoded->write (_opt, vf->frame ());
205                         frame_done (vf->frame ());
206                 } else {
207                         lock.lock ();
208                         _queue.push_front (vf);
209                         lock.unlock ();
210                 }
211
212                 if (remote_backoff > 0) {
213                         dvdomatic_sleep (remote_backoff);
214                 }
215
216                 lock.lock ();
217                 _worker_condition.notify_all ();
218         }
219 }
220
221 void
222 J2KWAVEncoder::process_begin (int64_t audio_channel_layout, AVSampleFormat audio_sample_format)
223 {
224         if (_fs->audio_sample_rate != _fs->target_sample_rate ()) {
225 #ifdef HAVE_SWRESAMPLE
226
227                 stringstream s;
228                 s << "Will resample audio from " << _fs->audio_sample_rate << " to " << _fs->target_sample_rate();
229                 _log->log (s.str ());
230                 
231                 _swr_context = swr_alloc_set_opts (
232                         0,
233                         audio_channel_layout,
234                         audio_sample_format,
235                         _fs->target_sample_rate(),
236                         audio_channel_layout,
237                         audio_sample_format,
238                         _fs->audio_sample_rate,
239                         0, 0
240                         );
241                 
242                 swr_init (_swr_context);
243 #else
244                 throw EncodeError ("Cannot resample audio as libswresample is not present");
245 #endif
246         } else {
247 #ifdef HAVE_SWRESAMPLE
248                 _swr_context = 0;
249 #endif          
250         }
251         
252         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
253                 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
254         }
255
256         vector<ServerDescription*> servers = Config::instance()->servers ();
257
258         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
259                 for (int j = 0; j < (*i)->threads (); ++j) {
260                         _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
261                 }
262         }
263 }
264
265 void
266 J2KWAVEncoder::process_end ()
267 {
268         boost::mutex::scoped_lock lock (_worker_mutex);
269
270         _log->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
271
272         /* Keep waking workers until the queue is empty */
273         while (!_queue.empty ()) {
274                 _log->log ("Waking with " + lexical_cast<string> (_queue.size ()));
275                 _worker_condition.notify_all ();
276                 _worker_condition.wait (lock);
277         }
278
279         lock.unlock ();
280         
281         terminate_worker_threads ();
282
283         _log->log ("Mopping up " + lexical_cast<string> (_queue.size()));
284
285         /* The following sequence of events can occur in the above code:
286              1. a remote worker takes the last image off the queue
287              2. the loop above terminates
288              3. the remote worker fails to encode the image and puts it back on the queue
289              4. the remote worker is then terminated by terminate_worker_threads
290
291              So just mop up anything left in the queue here.
292         */
293
294         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
295                 _log->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
296                 try {
297                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
298                         e->write (_opt, (*i)->frame ());
299                         frame_done ((*i)->frame ());
300                 } catch (std::exception& e) {
301                         _log->log (String::compose ("Local encode failed (%1)", e.what ()));
302                 }
303         }
304
305 #if HAVE_SWRESAMPLE     
306         if (_swr_context) {
307
308                 while (1) {
309                         uint8_t buffer[256 * _fs->bytes_per_sample() * _fs->audio_channels];
310                         uint8_t* out[2] = {
311                                 buffer,
312                                 0
313                         };
314
315                         int const frames = swr_convert (_swr_context, out, 256, 0, 0);
316
317                         if (frames < 0) {
318                                 throw EncodeError ("could not run sample-rate converter");
319                         }
320
321                         if (frames == 0) {
322                                 break;
323                         }
324
325                         write_audio (buffer, frames * _fs->bytes_per_sample() * _fs->audio_channels);
326                 }
327
328                 swr_free (&_swr_context);
329         }
330 #endif  
331         
332         close_sound_files ();
333
334         /* Rename .wav.tmp files to .wav */
335         for (int i = 0; i < _fs->audio_channels; ++i) {
336                 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
337                         boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
338                 }
339                 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
340         }
341 }
342
343 void
344 J2KWAVEncoder::process_audio (uint8_t* data, int size)
345 {
346         /* This is a buffer we might use if we are sample-rate converting;
347            it will need freeing if so.
348         */
349         uint8_t* out_buffer = 0;
350         
351         /* Maybe sample-rate convert */
352 #if HAVE_SWRESAMPLE     
353         if (_swr_context) {
354
355                 uint8_t const * in[2] = {
356                         data,
357                         0
358                 };
359
360                 /* Here's samples per channel */
361                 int const samples = size / _fs->bytes_per_sample();
362                 
363                 /* And here's frames (where 1 frame is a collection of samples, 1 for each channel,
364                    so for 5.1 a frame would be 6 samples)
365                 */
366                 int const frames = samples / _fs->audio_channels;
367
368                 /* Compute the resampled frame count and add 32 for luck */
369                 int const out_buffer_size_frames = ceil (frames * _fs->target_sample_rate() / _fs->audio_sample_rate) + 32;
370                 int const out_buffer_size_bytes = out_buffer_size_frames * _fs->audio_channels * _fs->bytes_per_sample();
371                 out_buffer = new uint8_t[out_buffer_size_bytes];
372
373                 uint8_t* out[2] = {
374                         out_buffer, 
375                         0
376                 };
377
378                 /* Resample audio */
379                 int out_frames = swr_convert (_swr_context, out, out_buffer_size_frames, in, frames);
380                 if (out_frames < 0) {
381                         throw EncodeError ("could not run sample-rate converter");
382                 }
383
384                 /* And point our variables at the resampled audio */
385                 data = out_buffer;
386                 size = out_frames * _fs->audio_channels * _fs->bytes_per_sample();
387         }
388 #endif
389
390         write_audio (data, size);
391
392         /* Delete the sample-rate conversion buffer, if it exists */
393         delete[] out_buffer;
394 }
395
396 void
397 J2KWAVEncoder::write_audio (uint8_t* data, int size)
398 {
399         /* XXX: we are assuming that the _deinterleave_buffer_size is a multiple
400            of the sample size and that size is a multiple of _fs->audio_channels * sample_size.
401         */
402
403         assert ((size % (_fs->audio_channels * _fs->bytes_per_sample())) == 0);
404         assert ((_deinterleave_buffer_size % _fs->bytes_per_sample()) == 0);
405         
406         /* XXX: this code is very tricksy and it must be possible to make it simpler ... */
407         
408         /* Number of bytes left to read this time */
409         int remaining = size;
410         /* Our position in the output buffers, in bytes */
411         int position = 0;
412         while (remaining > 0) {
413                 /* How many bytes of the deinterleaved data to do this time */
414                 int this_time = min (remaining / _fs->audio_channels, _deinterleave_buffer_size);
415                 for (int i = 0; i < _fs->audio_channels; ++i) {
416                         for (int j = 0; j < this_time; j += _fs->bytes_per_sample()) {
417                                 for (int k = 0; k < _fs->bytes_per_sample(); ++k) {
418                                         int const to = j + k;
419                                         int const from = position + (i * _fs->bytes_per_sample()) + (j * _fs->audio_channels) + k;
420                                         _deinterleave_buffer[to] = data[from];
421                                 }
422                         }
423                         
424                         switch (_fs->audio_sample_format) {
425                         case AV_SAMPLE_FMT_S16:
426                                 sf_write_short (_sound_files[i], (const short *) _deinterleave_buffer, this_time / _fs->bytes_per_sample());
427                                 break;
428                         default:
429                                 throw EncodeError ("unknown audio sample format");
430                         }
431                 }
432                 
433                 position += this_time;
434                 remaining -= this_time * _fs->audio_channels;
435         }
436 }
437