Fix up include and add a log.
[dcpomatic.git] / src / lib / j2k_wav_encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file  src/lib/j2k_wav_encoder.cc
21  *  @brief An encoder which writes JPEG2000 and WAV files.
22  */
23
24 #include <sstream>
25 #include <stdexcept>
26 #include <iomanip>
27 #include <iostream>
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
31 #include <sndfile.h>
32 #include <openjpeg.h>
33 #include "j2k_wav_encoder.h"
34 #include "config.h"
35 #include "film_state.h"
36 #include "options.h"
37 #include "exceptions.h"
38 #include "dcp_video_frame.h"
39 #include "server.h"
40 #include "filter.h"
41 #include "log.h"
42 #include "cross.h"
43
44 using namespace std;
45 using namespace boost;
46
47 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
48         : Encoder (s, o, l)
49 #ifdef HAVE_SWRESAMPLE    
50         , _swr_context (0)
51 #endif    
52         , _deinterleave_buffer_size (8192)
53         , _deinterleave_buffer (0)
54         , _process_end (false)
55 {
56         /* Create sound output files with .tmp suffixes; we will rename
57            them if and when we complete.
58         */
59         for (int i = 0; i < _fs->audio_channels; ++i) {
60                 SF_INFO sf_info;
61                 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate);
62                 /* We write mono files */
63                 sf_info.channels = 1;
64                 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
65                 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
66                 if (f == 0) {
67                         throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
68                 }
69                 _sound_files.push_back (f);
70         }
71
72         /* Create buffer for deinterleaving audio */
73         _deinterleave_buffer = new uint8_t[_deinterleave_buffer_size];
74 }
75
76 J2KWAVEncoder::~J2KWAVEncoder ()
77 {
78         terminate_worker_threads ();
79         delete[] _deinterleave_buffer;
80         close_sound_files ();
81 }
82
83 void
84 J2KWAVEncoder::terminate_worker_threads ()
85 {
86         boost::mutex::scoped_lock lock (_worker_mutex);
87         _process_end = true;
88         _worker_condition.notify_all ();
89         lock.unlock ();
90
91         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
92                 (*i)->join ();
93                 delete *i;
94         }
95 }
96
97 void
98 J2KWAVEncoder::close_sound_files ()
99 {
100         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
101                 sf_close (*i);
102         }
103
104         _sound_files.clear ();
105 }       
106
107 void
108 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame)
109 {
110         boost::mutex::scoped_lock lock (_worker_mutex);
111
112         /* Wait until the queue has gone down a bit */
113         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
114                 TIMING ("decoder sleeps with queue of %1", _queue.size());
115                 _worker_condition.wait (lock);
116                 TIMING ("decoder wakes with queue of %1", _queue.size());
117         }
118
119         if (_process_end) {
120                 return;
121         }
122
123         /* Only do the processing if we don't already have a file for this frame */
124         if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
125                 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters);
126                 TIMING ("adding to queue of %1", _queue.size ());
127                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
128                                           new DCPVideoFrame (
129                                                   yuv, _opt->out_size, _opt->padding, _fs->scaler, frame, _fs->frames_per_second, s.second,
130                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
131                                                   _log
132                                                   )
133                                           ));
134                 
135                 _worker_condition.notify_all ();
136         } else {
137                 frame_skipped ();
138         }
139 }
140
141 void
142 J2KWAVEncoder::encoder_thread (ServerDescription* server)
143 {
144         /* Number of seconds that we currently wait between attempts
145            to connect to the server; not relevant for localhost
146            encodings.
147         */
148         int remote_backoff = 0;
149         
150         while (1) {
151
152                 TIMING ("encoder thread %1 sleeps", pthread_self ());
153                 boost::mutex::scoped_lock lock (_worker_mutex);
154                 while (_queue.empty () && !_process_end) {
155                         _worker_condition.wait (lock);
156                 }
157
158                 if (_process_end) {
159                         return;
160                 }
161
162                 TIMING ("encoder thread %1 wakes with queue of %2", pthread_self(), _queue.size());
163                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
164                 _log->log (String::compose ("Encoder thread %1 pops frame %2 from queue", pthread_self(), vf->frame()));
165                 _queue.pop_front ();
166                 
167                 lock.unlock ();
168
169                 shared_ptr<EncodedData> encoded;
170
171                 if (server) {
172                         try {
173                                 encoded = vf->encode_remotely (server);
174
175                                 if (remote_backoff > 0) {
176                                         _log->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
177                                 }
178                                 
179                                 /* This job succeeded, so remove any backoff */
180                                 remote_backoff = 0;
181                                 
182                         } catch (std::exception& e) {
183                                 if (remote_backoff < 60) {
184                                         /* back off more */
185                                         remote_backoff += 10;
186                                 }
187                                 _log->log (
188                                         String::compose (
189                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
190                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
191                                         );
192                         }
193                                 
194                 } else {
195                         try {
196                                 TIMING ("encoder thread %1 begins local encode of %2", pthread_self(), vf->frame());
197                                 encoded = vf->encode_locally ();
198                                 TIMING ("encoder thread %1 finishes local encode of %2", pthread_self(), vf->frame());
199                         } catch (std::exception& e) {
200                                 _log->log (String::compose ("Local encode failed (%1)", e.what ()));
201                         }
202                 }
203
204                 if (encoded) {
205                         encoded->write (_opt, vf->frame ());
206                         frame_done (vf->frame ());
207                 } else {
208                         lock.lock ();
209                         _queue.push_front (vf);
210                         lock.unlock ();
211                 }
212
213                 if (remote_backoff > 0) {
214                         dvdomatic_sleep (remote_backoff);
215                 }
216
217                 lock.lock ();
218                 _worker_condition.notify_all ();
219         }
220 }
221
222 void
223 J2KWAVEncoder::process_begin (int64_t audio_channel_layout, AVSampleFormat audio_sample_format)
224 {
225         if (_fs->audio_sample_rate != _fs->target_sample_rate ()) {
226 #ifdef HAVE_SWRESAMPLE
227
228                 stringstream s;
229                 s << "Will resample audio from " << _fs->audio_sample_rate << " to " << _fs->target_sample_rate();
230                 _log->log (s.str ());
231                 
232                 _swr_context = swr_alloc_set_opts (
233                         0,
234                         audio_channel_layout,
235                         audio_sample_format,
236                         _fs->target_sample_rate(),
237                         audio_channel_layout,
238                         audio_sample_format,
239                         _fs->audio_sample_rate,
240                         0, 0
241                         );
242                 
243                 swr_init (_swr_context);
244 #else
245                 throw EncodeError ("Cannot resample audio as libswresample is not present");
246 #endif
247         } else {
248 #ifdef HAVE_SWRESAMPLE
249                 _swr_context = 0;
250 #endif          
251         }
252         
253         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
254                 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
255         }
256
257         vector<ServerDescription*> servers = Config::instance()->servers ();
258
259         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
260                 for (int j = 0; j < (*i)->threads (); ++j) {
261                         _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
262                 }
263         }
264 }
265
266 void
267 J2KWAVEncoder::process_end ()
268 {
269         boost::mutex::scoped_lock lock (_worker_mutex);
270
271         _log->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
272
273         /* Keep waking workers until the queue is empty */
274         while (!_queue.empty ()) {
275                 _log->log ("Waking with " + lexical_cast<string> (_queue.size ()));
276                 _worker_condition.notify_all ();
277                 _worker_condition.wait (lock);
278         }
279
280         lock.unlock ();
281         
282         terminate_worker_threads ();
283
284         _log->log ("Mopping up " + lexical_cast<string> (_queue.size()));
285
286         /* The following sequence of events can occur in the above code:
287              1. a remote worker takes the last image off the queue
288              2. the loop above terminates
289              3. the remote worker fails to encode the image and puts it back on the queue
290              4. the remote worker is then terminated by terminate_worker_threads
291
292              So just mop up anything left in the queue here.
293         */
294
295         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
296                 _log->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
297                 try {
298                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
299                         e->write (_opt, (*i)->frame ());
300                         frame_done ((*i)->frame ());
301                 } catch (std::exception& e) {
302                         _log->log (String::compose ("Local encode failed (%1)", e.what ()));
303                 }
304         }
305
306 #if HAVE_SWRESAMPLE     
307         if (_swr_context) {
308
309                 while (1) {
310                         uint8_t buffer[256 * _fs->bytes_per_sample() * _fs->audio_channels];
311                         uint8_t* out[2] = {
312                                 buffer,
313                                 0
314                         };
315
316                         int const frames = swr_convert (_swr_context, out, 256, 0, 0);
317
318                         if (frames < 0) {
319                                 throw EncodeError ("could not run sample-rate converter");
320                         }
321
322                         if (frames == 0) {
323                                 break;
324                         }
325
326                         write_audio (buffer, frames * _fs->bytes_per_sample() * _fs->audio_channels);
327                 }
328
329                 swr_free (&_swr_context);
330         }
331 #endif  
332         
333         close_sound_files ();
334
335         /* Rename .wav.tmp files to .wav */
336         for (int i = 0; i < _fs->audio_channels; ++i) {
337                 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
338                         boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
339                 }
340                 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
341         }
342 }
343
344 void
345 J2KWAVEncoder::process_audio (uint8_t* data, int size)
346 {
347         /* This is a buffer we might use if we are sample-rate converting;
348            it will need freeing if so.
349         */
350         uint8_t* out_buffer = 0;
351         
352         /* Maybe sample-rate convert */
353 #if HAVE_SWRESAMPLE     
354         if (_swr_context) {
355
356                 uint8_t const * in[2] = {
357                         data,
358                         0
359                 };
360
361                 /* Here's samples per channel */
362                 int const samples = size / _fs->bytes_per_sample();
363                 
364                 /* And here's frames (where 1 frame is a collection of samples, 1 for each channel,
365                    so for 5.1 a frame would be 6 samples)
366                 */
367                 int const frames = samples / _fs->audio_channels;
368
369                 /* Compute the resampled frame count and add 32 for luck */
370                 int const out_buffer_size_frames = ceil (frames * _fs->target_sample_rate() / _fs->audio_sample_rate) + 32;
371                 int const out_buffer_size_bytes = out_buffer_size_frames * _fs->audio_channels * _fs->bytes_per_sample();
372                 out_buffer = new uint8_t[out_buffer_size_bytes];
373
374                 uint8_t* out[2] = {
375                         out_buffer, 
376                         0
377                 };
378
379                 /* Resample audio */
380                 int out_frames = swr_convert (_swr_context, out, out_buffer_size_frames, in, frames);
381                 if (out_frames < 0) {
382                         throw EncodeError ("could not run sample-rate converter");
383                 }
384
385                 /* And point our variables at the resampled audio */
386                 data = out_buffer;
387                 size = out_frames * _fs->audio_channels * _fs->bytes_per_sample();
388         }
389 #endif
390
391         write_audio (data, size);
392
393         /* Delete the sample-rate conversion buffer, if it exists */
394         delete[] out_buffer;
395 }
396
397 void
398 J2KWAVEncoder::write_audio (uint8_t* data, int size)
399 {
400         /* XXX: we are assuming that the _deinterleave_buffer_size is a multiple
401            of the sample size and that size is a multiple of _fs->audio_channels * sample_size.
402         */
403
404         assert ((size % (_fs->audio_channels * _fs->bytes_per_sample())) == 0);
405         assert ((_deinterleave_buffer_size % _fs->bytes_per_sample()) == 0);
406         
407         /* XXX: this code is very tricksy and it must be possible to make it simpler ... */
408         
409         /* Number of bytes left to read this time */
410         int remaining = size;
411         /* Our position in the output buffers, in bytes */
412         int position = 0;
413         while (remaining > 0) {
414                 /* How many bytes of the deinterleaved data to do this time */
415                 int this_time = min (remaining / _fs->audio_channels, _deinterleave_buffer_size);
416                 for (int i = 0; i < _fs->audio_channels; ++i) {
417                         for (int j = 0; j < this_time; j += _fs->bytes_per_sample()) {
418                                 for (int k = 0; k < _fs->bytes_per_sample(); ++k) {
419                                         int const to = j + k;
420                                         int const from = position + (i * _fs->bytes_per_sample()) + (j * _fs->audio_channels) + k;
421                                         _deinterleave_buffer[to] = data[from];
422                                 }
423                         }
424                         
425                         switch (_fs->audio_sample_format) {
426                         case AV_SAMPLE_FMT_S16:
427                                 sf_write_short (_sound_files[i], (const short *) _deinterleave_buffer, this_time / _fs->bytes_per_sample());
428                                 break;
429                         default:
430                                 throw EncodeError ("unknown audio sample format");
431                         }
432                 }
433                 
434                 position += this_time;
435                 remaining -= this_time * _fs->audio_channels;
436         }
437 }
438