Merge master.
[dcpomatic.git] / src / lib / j2k_wav_encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file  src/j2k_wav_encoder.cc
21  *  @brief An encoder which writes JPEG2000 and WAV files.
22  */
23
24 #include <sstream>
25 #include <stdexcept>
26 #include <iomanip>
27 #include <iostream>
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
31 #include <sndfile.h>
32 #include <openjpeg.h>
33 #include "j2k_wav_encoder.h"
34 #include "config.h"
35 #include "options.h"
36 #include "exceptions.h"
37 #include "dcp_video_frame.h"
38 #include "server.h"
39 #include "filter.h"
40 #include "log.h"
41 #include "cross.h"
42 #include "film.h"
43
44 using std::string;
45 using std::stringstream;
46 using std::list;
47 using std::vector;
48 using std::pair;
49 using boost::shared_ptr;
50 using boost::thread;
51 using boost::lexical_cast;
52
53 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const Film> f, shared_ptr<const Options> o)
54         : Encoder (f, o)
55 #ifdef HAVE_SWRESAMPLE    
56         , _swr_context (0)
57 #endif    
58         , _audio_frames_written (0)
59         , _process_end (false)
60 {
61         if (_film->audio_stream()) {
62                 /* Create sound output files with .tmp suffixes; we will rename
63                    them if and when we complete.
64                 */
65                 for (int i = 0; i < _film->audio_channels(); ++i) {
66                         SF_INFO sf_info;
67                         sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream().get().sample_rate());
68                         /* We write mono files */
69                         sf_info.channels = 1;
70                         sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
71                         SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
72                         if (f == 0) {
73                                 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
74                         }
75                         _sound_files.push_back (f);
76                 }
77         }
78 }
79
80 J2KWAVEncoder::~J2KWAVEncoder ()
81 {
82         terminate_worker_threads ();
83         close_sound_files ();
84 }
85
86 void
87 J2KWAVEncoder::terminate_worker_threads ()
88 {
89         boost::mutex::scoped_lock lock (_worker_mutex);
90         _process_end = true;
91         _worker_condition.notify_all ();
92         lock.unlock ();
93
94         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
95                 (*i)->join ();
96                 delete *i;
97         }
98 }
99
100 void
101 J2KWAVEncoder::close_sound_files ()
102 {
103         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
104                 sf_close (*i);
105         }
106
107         _sound_files.clear ();
108 }       
109
110 void
111 J2KWAVEncoder::do_process_video (shared_ptr<const Image> yuv, SourceFrame frame, shared_ptr<Subtitle> sub)
112 {
113         boost::mutex::scoped_lock lock (_worker_mutex);
114
115         /* Wait until the queue has gone down a bit */
116         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
117                 TIMING ("decoder sleeps with queue of %1", _queue.size());
118                 _worker_condition.wait (lock);
119                 TIMING ("decoder wakes with queue of %1", _queue.size());
120         }
121
122         if (_process_end) {
123                 return;
124         }
125
126         /* Only do the processing if we don't already have a file for this frame */
127         if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
128                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
129                 TIMING ("adding to queue of %1", _queue.size ());
130                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
131                                           new DCPVideoFrame (
132                                                   yuv, sub, _opt->out_size, _opt->padding, _film->subtitle_offset(), _film->subtitle_scale(),
133                                                   _film->scaler(), frame, _film->frames_per_second(), s.second,
134                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
135                                                   _film->log()
136                                                   )
137                                           ));
138                 
139                 _worker_condition.notify_all ();
140         } else {
141                 frame_skipped ();
142         }
143 }
144
145 void
146 J2KWAVEncoder::encoder_thread (ServerDescription* server)
147 {
148         /* Number of seconds that we currently wait between attempts
149            to connect to the server; not relevant for localhost
150            encodings.
151         */
152         int remote_backoff = 0;
153         
154         while (1) {
155
156                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
157                 boost::mutex::scoped_lock lock (_worker_mutex);
158                 while (_queue.empty () && !_process_end) {
159                         _worker_condition.wait (lock);
160                 }
161
162                 if (_process_end) {
163                         return;
164                 }
165
166                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
167                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
168                 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
169                 _queue.pop_front ();
170                 
171                 lock.unlock ();
172
173                 shared_ptr<EncodedData> encoded;
174
175                 if (server) {
176                         try {
177                                 encoded = vf->encode_remotely (server);
178
179                                 if (remote_backoff > 0) {
180                                         _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
181                                 }
182                                 
183                                 /* This job succeeded, so remove any backoff */
184                                 remote_backoff = 0;
185                                 
186                         } catch (std::exception& e) {
187                                 if (remote_backoff < 60) {
188                                         /* back off more */
189                                         remote_backoff += 10;
190                                 }
191                                 _film->log()->log (
192                                         String::compose (
193                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
194                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
195                                         );
196                         }
197                                 
198                 } else {
199                         try {
200                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
201                                 encoded = vf->encode_locally ();
202                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
203                         } catch (std::exception& e) {
204                                 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
205                         }
206                 }
207
208                 if (encoded) {
209                         encoded->write (_opt, vf->frame ());
210                         frame_done (vf->frame ());
211                 } else {
212                         lock.lock ();
213                         _film->log()->log (
214                                 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
215                                 );
216                         _queue.push_front (vf);
217                         lock.unlock ();
218                 }
219
220                 if (remote_backoff > 0) {
221                         dvdomatic_sleep (remote_backoff);
222                 }
223
224                 lock.lock ();
225                 _worker_condition.notify_all ();
226         }
227 }
228
229 void
230 J2KWAVEncoder::process_begin ()
231 {
232         if (_film->audio_stream() && _film->audio_stream().get().sample_rate() != _film->target_audio_sample_rate()) {
233 #ifdef HAVE_SWRESAMPLE
234
235                 stringstream s;
236                 s << "Will resample audio from " << _film->audio_stream().get().sample_rate() << " to " << _film->target_audio_sample_rate();
237                 _film->log()->log (s.str ());
238
239                 /* We will be using planar float data when we call the resampler */
240                 _swr_context = swr_alloc_set_opts (
241                         0,
242                         _film->audio_stream().get().channel_layout(),
243                         AV_SAMPLE_FMT_FLTP,
244                         _film->target_audio_sample_rate(),
245                         _film->audio_stream().get().channel_layout(),
246                         AV_SAMPLE_FMT_FLTP,
247                         _film->audio_stream().get().sample_rate(),
248                         0, 0
249                         );
250                 
251                 swr_init (_swr_context);
252 #else
253                 throw EncodeError ("Cannot resample audio as libswresample is not present");
254 #endif
255         } else {
256 #ifdef HAVE_SWRESAMPLE
257                 _swr_context = 0;
258 #endif          
259         }
260         
261         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
262                 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
263         }
264
265         vector<ServerDescription*> servers = Config::instance()->servers ();
266
267         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
268                 for (int j = 0; j < (*i)->threads (); ++j) {
269                         _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
270                 }
271         }
272 }
273
274 void
275 J2KWAVEncoder::process_end ()
276 {
277         boost::mutex::scoped_lock lock (_worker_mutex);
278
279         _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
280
281         /* Keep waking workers until the queue is empty */
282         while (!_queue.empty ()) {
283                 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
284                 _worker_condition.notify_all ();
285                 _worker_condition.wait (lock);
286         }
287
288         lock.unlock ();
289         
290         terminate_worker_threads ();
291
292         _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
293
294         /* The following sequence of events can occur in the above code:
295              1. a remote worker takes the last image off the queue
296              2. the loop above terminates
297              3. the remote worker fails to encode the image and puts it back on the queue
298              4. the remote worker is then terminated by terminate_worker_threads
299
300              So just mop up anything left in the queue here.
301         */
302
303         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
304                 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
305                 try {
306                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
307                         e->write (_opt, (*i)->frame ());
308                         frame_done ((*i)->frame ());
309                 } catch (std::exception& e) {
310                         _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
311                 }
312         }
313
314 #if HAVE_SWRESAMPLE     
315         if (_film->audio_stream() && _swr_context) {
316
317                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream().get().channels(), 256));
318                         
319                 while (1) {
320                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
321
322                         if (frames < 0) {
323                                 throw EncodeError ("could not run sample-rate converter");
324                         }
325
326                         if (frames == 0) {
327                                 break;
328                         }
329
330                         out->set_frames (frames);
331                         write_audio (out);
332                 }
333
334                 swr_free (&_swr_context);
335         }
336 #endif
337
338         if (_film->audio_stream()) {
339                 int const dcp_sr = dcp_audio_sample_rate (_film->audio_stream().get().sample_rate ());
340                 int64_t const extra_audio_frames = dcp_sr - (_audio_frames_written % dcp_sr);
341                 shared_ptr<AudioBuffers> silence (new AudioBuffers (_film->audio_stream().get().channels(), extra_audio_frames));
342                 silence->make_silent ();
343                 write_audio (silence);
344                 
345                 close_sound_files ();
346                 
347                 /* Rename .wav.tmp files to .wav */
348                 for (int i = 0; i < _film->audio_channels(); ++i) {
349                         if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
350                                 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
351                         }
352                         boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
353                 }
354         }
355 }
356
357 void
358 J2KWAVEncoder::do_process_audio (shared_ptr<const AudioBuffers> audio)
359 {
360         shared_ptr<AudioBuffers> resampled;
361         
362 #if HAVE_SWRESAMPLE
363         /* Maybe sample-rate convert */
364         if (_swr_context) {
365
366                 /* Compute the resampled frames count and add 32 for luck */
367                 int const max_resampled_frames = ceil (audio->frames() * _film->target_audio_sample_rate() / _film->audio_stream().get().sample_rate()) + 32;
368
369                 resampled.reset (new AudioBuffers (_film->audio_stream().get().channels(), max_resampled_frames));
370
371                 /* Resample audio */
372                 int const resampled_frames = swr_convert (
373                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) audio->data(), audio->frames()
374                         );
375                 
376                 if (resampled_frames < 0) {
377                         throw EncodeError ("could not run sample-rate converter");
378                 }
379
380                 resampled->set_frames (resampled_frames);
381                 
382                 /* And point our variables at the resampled audio */
383                 audio = resampled;
384         }
385 #endif
386
387         write_audio (audio);
388 }
389
390 void
391 J2KWAVEncoder::write_audio (shared_ptr<const AudioBuffers> audio)
392 {
393         for (int i = 0; i < _film->audio_channels(); ++i) {
394                 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
395         }
396
397         _audio_frames_written += audio->frames ();
398 }
399