e2a3a5ed7193dd3b591d0785999aff4c48dfacad
[dcpomatic.git] / src / lib / j2k_wav_encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
/** @file  src/lib/j2k_wav_encoder.cc
 *  @brief An encoder which writes JPEG2000 and WAV files.
 */
23
24 #include <sstream>
25 #include <stdexcept>
26 #include <iomanip>
27 #include <iostream>
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
31 #include <sndfile.h>
32 #include <openjpeg.h>
33 #include "j2k_wav_encoder.h"
34 #include "config.h"
35 #include "film_state.h"
36 #include "options.h"
37 #include "exceptions.h"
38 #include "dcp_video_frame.h"
39 #include "server.h"
40 #include "filter.h"
41 #include "log.h"
42 #include "cross.h"
43
44 using namespace std;
45 using namespace boost;
46
47 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
48         : Encoder (s, o, l)
49 #ifdef HAVE_SWRESAMPLE    
50         , _swr_context (0)
51 #endif    
52         , _deinterleave_buffer_size (8192)
53         , _deinterleave_buffer (0)
54         , _process_end (false)
55 {
56         /* Create sound output files with .tmp suffixes; we will rename
57            them if and when we complete.
58         */
59         for (int i = 0; i < _fs->audio_channels; ++i) {
60                 SF_INFO sf_info;
61                 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate);
62                 /* We write mono files */
63                 sf_info.channels = 1;
64                 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
65                 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
66                 if (f == 0) {
67                         throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
68                 }
69                 _sound_files.push_back (f);
70         }
71
72         /* Create buffer for deinterleaving audio */
73         _deinterleave_buffer = new uint8_t[_deinterleave_buffer_size];
74 }
75
76 J2KWAVEncoder::~J2KWAVEncoder ()
77 {
78         terminate_worker_threads ();
79         delete[] _deinterleave_buffer;
80         close_sound_files ();
81 }
82
83 void
84 J2KWAVEncoder::terminate_worker_threads ()
85 {
86         boost::mutex::scoped_lock lock (_worker_mutex);
87         _process_end = true;
88         _worker_condition.notify_all ();
89         lock.unlock ();
90
91         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
92                 (*i)->join ();
93                 delete *i;
94         }
95 }
96
97 void
98 J2KWAVEncoder::close_sound_files ()
99 {
100         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
101                 sf_close (*i);
102         }
103
104         _sound_files.clear ();
105 }       
106
107 void
108 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame, shared_ptr<Subtitle> sub)
109 {
110         boost::mutex::scoped_lock lock (_worker_mutex);
111
112         /* Wait until the queue has gone down a bit */
113         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
114                 TIMING ("decoder sleeps with queue of %1", _queue.size());
115                 _worker_condition.wait (lock);
116                 TIMING ("decoder wakes with queue of %1", _queue.size());
117         }
118
119         if (_process_end) {
120                 return;
121         }
122
123         /* Only do the processing if we don't already have a file for this frame */
124         if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
125                 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters);
126                 TIMING ("adding to queue of %1", _queue.size ());
127                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
128                                           new DCPVideoFrame (
129                                                   yuv, sub, _opt->out_size, _opt->padding, _fs->subtitle_offset, _fs->subtitle_scale,
130                                                   _fs->scaler, frame, _fs->frames_per_second, s.second,
131                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
132                                                   _log
133                                                   )
134                                           ));
135                 
136                 _worker_condition.notify_all ();
137         } else {
138                 frame_skipped ();
139         }
140 }
141
142 void
143 J2KWAVEncoder::encoder_thread (ServerDescription* server)
144 {
145         /* Number of seconds that we currently wait between attempts
146            to connect to the server; not relevant for localhost
147            encodings.
148         */
149         int remote_backoff = 0;
150         
151         while (1) {
152
153                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
154                 boost::mutex::scoped_lock lock (_worker_mutex);
155                 while (_queue.empty () && !_process_end) {
156                         _worker_condition.wait (lock);
157                 }
158
159                 if (_process_end) {
160                         return;
161                 }
162
163                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
164                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
165                 _log->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()));
166                 _queue.pop_front ();
167                 
168                 lock.unlock ();
169
170                 shared_ptr<EncodedData> encoded;
171
172                 if (server) {
173                         try {
174                                 encoded = vf->encode_remotely (server);
175
176                                 if (remote_backoff > 0) {
177                                         _log->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
178                                 }
179                                 
180                                 /* This job succeeded, so remove any backoff */
181                                 remote_backoff = 0;
182                                 
183                         } catch (std::exception& e) {
184                                 if (remote_backoff < 60) {
185                                         /* back off more */
186                                         remote_backoff += 10;
187                                 }
188                                 _log->log (
189                                         String::compose (
190                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
191                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
192                                         );
193                         }
194                                 
195                 } else {
196                         try {
197                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
198                                 encoded = vf->encode_locally ();
199                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
200                         } catch (std::exception& e) {
201                                 _log->log (String::compose ("Local encode failed (%1)", e.what ()));
202                         }
203                 }
204
205                 if (encoded) {
206                         encoded->write (_opt, vf->frame ());
207                         frame_done (vf->frame ());
208                 } else {
209                         lock.lock ();
210                         _log->log (String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame()));
211                         _queue.push_front (vf);
212                         lock.unlock ();
213                 }
214
215                 if (remote_backoff > 0) {
216                         dvdomatic_sleep (remote_backoff);
217                 }
218
219                 lock.lock ();
220                 _worker_condition.notify_all ();
221         }
222 }
223
224 void
225 J2KWAVEncoder::process_begin (int64_t audio_channel_layout, AVSampleFormat audio_sample_format)
226 {
227         if (_fs->audio_sample_rate != _fs->target_sample_rate ()) {
228 #ifdef HAVE_SWRESAMPLE
229
230                 stringstream s;
231                 s << "Will resample audio from " << _fs->audio_sample_rate << " to " << _fs->target_sample_rate();
232                 _log->log (s.str ());
233                 
234                 _swr_context = swr_alloc_set_opts (
235                         0,
236                         audio_channel_layout,
237                         audio_sample_format,
238                         _fs->target_sample_rate(),
239                         audio_channel_layout,
240                         audio_sample_format,
241                         _fs->audio_sample_rate,
242                         0, 0
243                         );
244                 
245                 swr_init (_swr_context);
246 #else
247                 throw EncodeError ("Cannot resample audio as libswresample is not present");
248 #endif
249         } else {
250 #ifdef HAVE_SWRESAMPLE
251                 _swr_context = 0;
252 #endif          
253         }
254         
255         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
256                 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
257         }
258
259         vector<ServerDescription*> servers = Config::instance()->servers ();
260
261         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
262                 for (int j = 0; j < (*i)->threads (); ++j) {
263                         _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
264                 }
265         }
266 }
267
268 void
269 J2KWAVEncoder::process_end ()
270 {
271         boost::mutex::scoped_lock lock (_worker_mutex);
272
273         _log->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
274
275         /* Keep waking workers until the queue is empty */
276         while (!_queue.empty ()) {
277                 _log->log ("Waking with " + lexical_cast<string> (_queue.size ()));
278                 _worker_condition.notify_all ();
279                 _worker_condition.wait (lock);
280         }
281
282         lock.unlock ();
283         
284         terminate_worker_threads ();
285
286         _log->log ("Mopping up " + lexical_cast<string> (_queue.size()));
287
288         /* The following sequence of events can occur in the above code:
289              1. a remote worker takes the last image off the queue
290              2. the loop above terminates
291              3. the remote worker fails to encode the image and puts it back on the queue
292              4. the remote worker is then terminated by terminate_worker_threads
293
294              So just mop up anything left in the queue here.
295         */
296
297         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
298                 _log->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
299                 try {
300                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
301                         e->write (_opt, (*i)->frame ());
302                         frame_done ((*i)->frame ());
303                 } catch (std::exception& e) {
304                         _log->log (String::compose ("Local encode failed (%1)", e.what ()));
305                 }
306         }
307
308 #if HAVE_SWRESAMPLE     
309         if (_swr_context) {
310
311                 while (1) {
312                         uint8_t buffer[256 * _fs->bytes_per_sample() * _fs->audio_channels];
313                         uint8_t* out[2] = {
314                                 buffer,
315                                 0
316                         };
317
318                         int const frames = swr_convert (_swr_context, out, 256, 0, 0);
319
320                         if (frames < 0) {
321                                 throw EncodeError ("could not run sample-rate converter");
322                         }
323
324                         if (frames == 0) {
325                                 break;
326                         }
327
328                         write_audio (buffer, frames * _fs->bytes_per_sample() * _fs->audio_channels);
329                 }
330
331                 swr_free (&_swr_context);
332         }
333 #endif  
334         
335         close_sound_files ();
336
337         /* Rename .wav.tmp files to .wav */
338         for (int i = 0; i < _fs->audio_channels; ++i) {
339                 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
340                         boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
341                 }
342                 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
343         }
344 }
345
346 void
347 J2KWAVEncoder::process_audio (uint8_t* data, int size)
348 {
349         /* This is a buffer we might use if we are sample-rate converting;
350            it will need freeing if so.
351         */
352         uint8_t* out_buffer = 0;
353         
354         /* Maybe sample-rate convert */
355 #if HAVE_SWRESAMPLE     
356         if (_swr_context) {
357
358                 uint8_t const * in[2] = {
359                         data,
360                         0
361                 };
362
363                 /* Here's samples per channel */
364                 int const samples = size / _fs->bytes_per_sample();
365                 
366                 /* And here's frames (where 1 frame is a collection of samples, 1 for each channel,
367                    so for 5.1 a frame would be 6 samples)
368                 */
369                 int const frames = samples / _fs->audio_channels;
370
371                 /* Compute the resampled frame count and add 32 for luck */
372                 int const out_buffer_size_frames = ceil (frames * _fs->target_sample_rate() / _fs->audio_sample_rate) + 32;
373                 int const out_buffer_size_bytes = out_buffer_size_frames * _fs->audio_channels * _fs->bytes_per_sample();
374                 out_buffer = new uint8_t[out_buffer_size_bytes];
375
376                 uint8_t* out[2] = {
377                         out_buffer, 
378                         0
379                 };
380
381                 /* Resample audio */
382                 int out_frames = swr_convert (_swr_context, out, out_buffer_size_frames, in, frames);
383                 if (out_frames < 0) {
384                         throw EncodeError ("could not run sample-rate converter");
385                 }
386
387                 /* And point our variables at the resampled audio */
388                 data = out_buffer;
389                 size = out_frames * _fs->audio_channels * _fs->bytes_per_sample();
390         }
391 #endif
392
393         write_audio (data, size);
394
395         /* Delete the sample-rate conversion buffer, if it exists */
396         delete[] out_buffer;
397 }
398
399 void
400 J2KWAVEncoder::write_audio (uint8_t* data, int size)
401 {
402         /* XXX: we are assuming that the _deinterleave_buffer_size is a multiple
403            of the sample size and that size is a multiple of _fs->audio_channels * sample_size.
404         */
405
406         assert ((size % (_fs->audio_channels * _fs->bytes_per_sample())) == 0);
407         assert ((_deinterleave_buffer_size % _fs->bytes_per_sample()) == 0);
408         
409         /* XXX: this code is very tricksy and it must be possible to make it simpler ... */
410         
411         /* Number of bytes left to read this time */
412         int remaining = size;
413         /* Our position in the output buffers, in bytes */
414         int position = 0;
415         while (remaining > 0) {
416                 /* How many bytes of the deinterleaved data to do this time */
417                 int this_time = min (remaining / _fs->audio_channels, _deinterleave_buffer_size);
418                 for (int i = 0; i < _fs->audio_channels; ++i) {
419                         for (int j = 0; j < this_time; j += _fs->bytes_per_sample()) {
420                                 for (int k = 0; k < _fs->bytes_per_sample(); ++k) {
421                                         int const to = j + k;
422                                         int const from = position + (i * _fs->bytes_per_sample()) + (j * _fs->audio_channels) + k;
423                                         _deinterleave_buffer[to] = data[from];
424                                 }
425                         }
426                         
427                         switch (_fs->audio_sample_format) {
428                         case AV_SAMPLE_FMT_S16:
429                                 sf_write_short (_sound_files[i], (const short *) _deinterleave_buffer, this_time / _fs->bytes_per_sample());
430                                 break;
431                         default:
432                                 throw EncodeError ("unknown audio sample format");
433                         }
434                 }
435                 
436                 position += this_time;
437                 remaining -= this_time * _fs->audio_channels;
438         }
439 }
440