Various fixes to resampling.
[dcpomatic.git] / src / lib / j2k_wav_encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file  src/j2k_wav_encoder.cc
21  *  @brief An encoder which writes JPEG2000 and WAV files.
22  */
23
24 #include <sstream>
25 #include <stdexcept>
26 #include <iomanip>
27 #include <iostream>
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
31 #include <sndfile.h>
32 #include <openjpeg.h>
33 #include "j2k_wav_encoder.h"
34 #include "config.h"
35 #include "film_state.h"
36 #include "options.h"
37 #include "exceptions.h"
38 #include "dcp_video_frame.h"
39 #include "server.h"
40 #include "filter.h"
41 #include "log.h"
42 #include "cross.h"
43
44 using namespace std;
45 using namespace boost;
46
47 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
48         : Encoder (s, o, l)
49 #ifdef HAVE_SWRESAMPLE    
50         , _swr_context (0)
51 #endif    
52         , _deinterleave_buffer_size (8192)
53         , _deinterleave_buffer (0)
54         , _process_end (false)
55 {
56         /* Create sound output files with .tmp suffixes; we will rename
57            them if and when we complete.
58         */
59         for (int i = 0; i < _fs->audio_channels; ++i) {
60                 SF_INFO sf_info;
61                 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate);
62                 /* We write mono files */
63                 sf_info.channels = 1;
64                 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
65                 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
66                 if (f == 0) {
67                         throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
68                 }
69                 _sound_files.push_back (f);
70         }
71
72         /* Create buffer for deinterleaving audio */
73         _deinterleave_buffer = new uint8_t[_deinterleave_buffer_size];
74 }
75
76 J2KWAVEncoder::~J2KWAVEncoder ()
77 {
78         terminate_worker_threads ();
79         delete[] _deinterleave_buffer;
80         close_sound_files ();
81 }
82
83 void
84 J2KWAVEncoder::terminate_worker_threads ()
85 {
86         boost::mutex::scoped_lock lock (_worker_mutex);
87         _process_end = true;
88         _worker_condition.notify_all ();
89         lock.unlock ();
90
91         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
92                 (*i)->join ();
93                 delete *i;
94         }
95 }
96
97 void
98 J2KWAVEncoder::close_sound_files ()
99 {
100         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
101                 sf_close (*i);
102         }
103
104         _sound_files.clear ();
105 }       
106
107 void
108 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame)
109 {
110         boost::mutex::scoped_lock lock (_worker_mutex);
111
112         /* Wait until the queue has gone down a bit */
113         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
114                 _worker_condition.wait (lock);
115         }
116
117         if (_process_end) {
118                 return;
119         }
120
121         /* Only do the processing if we don't already have a file for this frame */
122         if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
123                 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters);
124                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
125                                           new DCPVideoFrame (
126                                                   yuv, _opt->out_size, _opt->padding, _fs->scaler, frame, _fs->frames_per_second, s.second,
127                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
128                                                   _log
129                                                   )
130                                           ));
131                 
132                 _worker_condition.notify_all ();
133         } else {
134                 frame_skipped ();
135         }
136 }
137
138 void
139 J2KWAVEncoder::encoder_thread (ServerDescription* server)
140 {
141         /* Number of seconds that we currently wait between attempts
142            to connect to the server; not relevant for localhost
143            encodings.
144         */
145         int remote_backoff = 0;
146         
147         while (1) {
148                 boost::mutex::scoped_lock lock (_worker_mutex);
149                 while (_queue.empty () && !_process_end) {
150                         _worker_condition.wait (lock);
151                 }
152
153                 if (_process_end) {
154                         return;
155                 }
156
157                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
158                 _queue.pop_front ();
159                 
160                 lock.unlock ();
161
162                 shared_ptr<EncodedData> encoded;
163
164                 if (server) {
165                         try {
166                                 encoded = vf->encode_remotely (server);
167
168                                 if (remote_backoff > 0) {
169                                         stringstream s;
170                                         s << server->host_name() << " was lost, but now she is found; removing backoff";
171                                         _log->log (s.str ());
172                                 }
173                                 
174                                 /* This job succeeded, so remove any backoff */
175                                 remote_backoff = 0;
176                                 
177                         } catch (std::exception& e) {
178                                 if (remote_backoff < 60) {
179                                         /* back off more */
180                                         remote_backoff += 10;
181                                 }
182                                 stringstream s;
183                                 s << "Remote encode of " << vf->frame() << " on " << server->host_name() << " failed (" << e.what() << "); thread sleeping for " << remote_backoff << "s.";
184                                 _log->log (s.str ());
185                         }
186                                 
187                 } else {
188                         try {
189                                 encoded = vf->encode_locally ();
190                         } catch (std::exception& e) {
191                                 stringstream s;
192                                 s << "Local encode failed " << e.what() << ".";
193                                 _log->log (s.str ());
194                         }
195                 }
196
197                 if (encoded) {
198                         encoded->write (_opt, vf->frame ());
199                         frame_done (vf->frame ());
200                 } else {
201                         lock.lock ();
202                         _queue.push_front (vf);
203                         lock.unlock ();
204                 }
205
206                 if (remote_backoff > 0) {
207                         dvdomatic_sleep (remote_backoff);
208                 }
209
210                 lock.lock ();
211                 _worker_condition.notify_all ();
212         }
213 }
214
215 void
216 J2KWAVEncoder::process_begin (int64_t audio_channel_layout, AVSampleFormat audio_sample_format)
217 {
218         if ((_fs->audio_sample_rate != dcp_audio_sample_rate (_fs->audio_sample_rate)) || (rint (_fs->frames_per_second) != _fs->frames_per_second)) {
219 #ifdef HAVE_SWRESAMPLE
220
221                 stringstream s;
222                 s << "Will resample audio from " << _fs->audio_sample_rate << " to " << _fs->target_sample_rate();
223                 _log->log (s.str ());
224                 
225                 _swr_context = swr_alloc_set_opts (
226                         0,
227                         audio_channel_layout,
228                         audio_sample_format,
229                         _fs->target_sample_rate(),
230                         audio_channel_layout,
231                         audio_sample_format,
232                         _fs->audio_sample_rate,
233                         0, 0
234                         );
235                 
236                 swr_init (_swr_context);
237 #else
238                 throw EncodeError ("Cannot resample audio as libswresample is not present");
239 #endif
240         } else {
241 #ifdef HAVE_SWRESAMPLE
242                 _swr_context = 0;
243 #endif          
244         }
245         
246         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
247                 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
248         }
249
250         vector<ServerDescription*> servers = Config::instance()->servers ();
251
252         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
253                 for (int j = 0; j < (*i)->threads (); ++j) {
254                         _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
255                 }
256         }
257 }
258
259 void
260 J2KWAVEncoder::process_end ()
261 {
262         boost::mutex::scoped_lock lock (_worker_mutex);
263
264         _log->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
265
266         /* Keep waking workers until the queue is empty */
267         while (!_queue.empty ()) {
268                 _log->log ("Waking with " + lexical_cast<string> (_queue.size ()));
269                 _worker_condition.notify_all ();
270                 _worker_condition.wait (lock);
271         }
272
273         lock.unlock ();
274         
275         terminate_worker_threads ();
276
277         _log->log ("Mopping up " + lexical_cast<string> (_queue.size()));
278
279         /* The following sequence of events can occur in the above code:
280              1. a remote worker takes the last image off the queue
281              2. the loop above terminates
282              3. the remote worker fails to encode the image and puts it back on the queue
283              4. the remote worker is then terminated by terminate_worker_threads
284
285              So just mop up anything left in the queue here.
286         */
287
288         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
289                 stringstream s;
290                 s << "Encode left-over frame " << (*i)->frame();
291                 _log->log (s.str ());
292                 try {
293                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
294                         e->write (_opt, (*i)->frame ());
295                         frame_done ((*i)->frame ());
296                 } catch (std::exception& e) {
297                         stringstream s;
298                         s << "Local encode failed " << e.what() << ".";
299                         _log->log (s.str ());
300                 }
301         }
302
303 #if HAVE_SWRESAMPLE     
304         if (_swr_context) {
305
306                 while (1) {
307                         uint8_t buffer[256 * _fs->bytes_per_sample() * _fs->audio_channels];
308                         uint8_t* out[2] = {
309                                 buffer,
310                                 0
311                         };
312
313                         int const frames = swr_convert (_swr_context, out, 256, 0, 0);
314
315                         if (frames < 0) {
316                                 throw EncodeError ("could not run sample-rate converter");
317                         }
318
319                         if (frames == 0) {
320                                 break;
321                         }
322
323                         write_audio (buffer, frames * _fs->bytes_per_sample() * _fs->audio_channels);
324                 }
325
326                 swr_free (&_swr_context);
327         }
328 #endif  
329         
330         close_sound_files ();
331
332         /* Rename .wav.tmp files to .wav */
333         for (int i = 0; i < _fs->audio_channels; ++i) {
334                 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
335                         boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
336                 }
337                 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
338         }
339 }
340
341 void
342 J2KWAVEncoder::process_audio (uint8_t* data, int size)
343 {
344         /* This is a buffer we might use if we are sample-rate converting;
345            it will need freeing if so.
346         */
347         uint8_t* out_buffer = 0;
348         
349         /* Maybe sample-rate convert */
350 #if HAVE_SWRESAMPLE     
351         if (_swr_context) {
352
353                 uint8_t const * in[2] = {
354                         data,
355                         0
356                 };
357
358                 /* Here's samples per channel */
359                 int const samples = size / _fs->bytes_per_sample();
360                 
361                 /* And here's frames (where 1 frame is a collection of samples, 1 for each channel,
362                    so for 5.1 a frame would be 6 samples)
363                 */
364                 int const frames = samples / _fs->audio_channels;
365
366                 /* Compute the resampled frame count and add 32 for luck */
367                 int const out_buffer_size_frames = ceil (frames * _fs->target_sample_rate() / _fs->audio_sample_rate) + 32;
368                 int const out_buffer_size_bytes = out_buffer_size_frames * _fs->audio_channels * _fs->bytes_per_sample();
369                 out_buffer = new uint8_t[out_buffer_size_bytes];
370
371                 uint8_t* out[2] = {
372                         out_buffer, 
373                         0
374                 };
375
376                 /* Resample audio */
377                 int out_frames = swr_convert (_swr_context, out, out_buffer_size_frames, in, frames);
378                 if (out_frames < 0) {
379                         throw EncodeError ("could not run sample-rate converter");
380                 }
381
382                 /* And point our variables at the resampled audio */
383                 data = out_buffer;
384                 size = out_frames * _fs->audio_channels * _fs->bytes_per_sample();
385         }
386 #endif
387
388         write_audio (data, size);
389
390         /* Delete the sample-rate conversion buffer, if it exists */
391         delete[] out_buffer;
392 }
393
394 void
395 J2KWAVEncoder::write_audio (uint8_t* data, int size)
396 {
397         /* XXX: we are assuming that the _deinterleave_buffer_size is a multiple
398            of the sample size and that size is a multiple of _fs->audio_channels * sample_size.
399         */
400
401         assert ((size % (_fs->audio_channels * _fs->bytes_per_sample())) == 0);
402         assert ((_deinterleave_buffer_size % _fs->bytes_per_sample()) == 0);
403         
404         /* XXX: this code is very tricksy and it must be possible to make it simpler ... */
405         
406         /* Number of bytes left to read this time */
407         int remaining = size;
408         /* Our position in the output buffers, in bytes */
409         int position = 0;
410         while (remaining > 0) {
411                 /* How many bytes of the deinterleaved data to do this time */
412                 int this_time = min (remaining / _fs->audio_channels, _deinterleave_buffer_size);
413                 for (int i = 0; i < _fs->audio_channels; ++i) {
414                         for (int j = 0; j < this_time; j += _fs->bytes_per_sample()) {
415                                 for (int k = 0; k < _fs->bytes_per_sample(); ++k) {
416                                         int const to = j + k;
417                                         int const from = position + (i * _fs->bytes_per_sample()) + (j * _fs->audio_channels) + k;
418                                         _deinterleave_buffer[to] = data[from];
419                                 }
420                         }
421                         
422                         switch (_fs->audio_sample_format) {
423                         case AV_SAMPLE_FMT_S16:
424                                 sf_write_short (_sound_files[i], (const short *) _deinterleave_buffer, this_time / _fs->bytes_per_sample());
425                                 break;
426                         default:
427                                 throw EncodeError ("unknown audio sample format");
428                         }
429                 }
430                 
431                 position += this_time;
432                 remaining -= this_time * _fs->audio_channels;
433         }
434 }
435