Fix merge.
[dcpomatic.git] / src / lib / j2k_wav_encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file  src/j2k_wav_encoder.cc
21  *  @brief An encoder which writes JPEG2000 and WAV files.
22  */
23
24 #include <sstream>
25 #include <stdexcept>
26 #include <iomanip>
27 #include <iostream>
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
31 #include <sndfile.h>
32 #include <openjpeg.h>
33 #include "j2k_wav_encoder.h"
34 #include "config.h"
35 #include "options.h"
36 #include "exceptions.h"
37 #include "dcp_video_frame.h"
38 #include "server.h"
39 #include "filter.h"
40 #include "log.h"
41 #include "cross.h"
42 #include "film.h"
43
44 using std::string;
45 using std::stringstream;
46 using std::list;
47 using std::vector;
48 using std::pair;
49 using std::cout;
50 using boost::shared_ptr;
51 using boost::thread;
52 using boost::lexical_cast;
53
54 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const Film> f, shared_ptr<const Options> o)
55         : Encoder (f, o)
56 #ifdef HAVE_SWRESAMPLE    
57         , _swr_context (0)
58 #endif    
59         , _audio_frames_written (0)
60         , _process_end (false)
61 {
62         if (_film->audio_stream()) {
63                 /* Create sound output files with .tmp suffixes; we will rename
64                    them if and when we complete.
65                 */
66                 for (int i = 0; i < _film->audio_channels(); ++i) {
67                         SF_INFO sf_info;
68                         sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
69                         /* We write mono files */
70                         sf_info.channels = 1;
71                         sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
72                         SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
73                         if (f == 0) {
74                                 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
75                         }
76                         _sound_files.push_back (f);
77                 }
78         }
79 }
80
81 J2KWAVEncoder::~J2KWAVEncoder ()
82 {
83         terminate_worker_threads ();
84         close_sound_files ();
85 }
86
87 void
88 J2KWAVEncoder::terminate_worker_threads ()
89 {
90         boost::mutex::scoped_lock lock (_worker_mutex);
91         _process_end = true;
92         _worker_condition.notify_all ();
93         lock.unlock ();
94
95         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
96                 (*i)->join ();
97                 delete *i;
98         }
99 }
100
101 void
102 J2KWAVEncoder::close_sound_files ()
103 {
104         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
105                 sf_close (*i);
106         }
107
108         _sound_files.clear ();
109 }       
110
111 void
112 J2KWAVEncoder::do_process_video (shared_ptr<Image> yuv, shared_ptr<Subtitle> sub)
113 {
114         boost::mutex::scoped_lock lock (_worker_mutex);
115
116         /* Wait until the queue has gone down a bit */
117         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
118                 TIMING ("decoder sleeps with queue of %1", _queue.size());
119                 _worker_condition.wait (lock);
120                 TIMING ("decoder wakes with queue of %1", _queue.size());
121         }
122
123         if (_process_end) {
124                 return;
125         }
126
127         /* Only do the processing if we don't already have a file for this frame */
128         if (!boost::filesystem::exists (_opt->frame_out_path (_video_frame, false))) {
129                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
130                 TIMING ("adding to queue of %1", _queue.size ());
131                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
132                                           new DCPVideoFrame (
133                                                   yuv, sub, _opt->out_size, _opt->padding, _film->subtitle_offset(), _film->subtitle_scale(),
134                                                   _film->scaler(), _video_frame, _film->frames_per_second(), s.second,
135                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
136                                                   _film->log()
137                                                   )
138                                           ));
139                 
140                 _worker_condition.notify_all ();
141         } else {
142                 frame_skipped ();
143         }
144 }
145
146 void
147 J2KWAVEncoder::encoder_thread (ServerDescription* server)
148 {
149         /* Number of seconds that we currently wait between attempts
150            to connect to the server; not relevant for localhost
151            encodings.
152         */
153         int remote_backoff = 0;
154         
155         while (1) {
156
157                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
158                 boost::mutex::scoped_lock lock (_worker_mutex);
159                 while (_queue.empty () && !_process_end) {
160                         _worker_condition.wait (lock);
161                 }
162
163                 if (_process_end) {
164                         return;
165                 }
166
167                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
168                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
169                 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
170                 _queue.pop_front ();
171                 
172                 lock.unlock ();
173
174                 shared_ptr<EncodedData> encoded;
175
176                 if (server) {
177                         try {
178                                 encoded = vf->encode_remotely (server);
179
180                                 if (remote_backoff > 0) {
181                                         _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
182                                 }
183                                 
184                                 /* This job succeeded, so remove any backoff */
185                                 remote_backoff = 0;
186                                 
187                         } catch (std::exception& e) {
188                                 if (remote_backoff < 60) {
189                                         /* back off more */
190                                         remote_backoff += 10;
191                                 }
192                                 _film->log()->log (
193                                         String::compose (
194                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
195                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
196                                         );
197                         }
198                                 
199                 } else {
200                         try {
201                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
202                                 encoded = vf->encode_locally ();
203                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
204                         } catch (std::exception& e) {
205                                 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
206                         }
207                 }
208
209                 if (encoded) {
210                         encoded->write (_opt, vf->frame ());
211                         frame_done ();
212                 } else {
213                         lock.lock ();
214                         _film->log()->log (
215                                 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
216                                 );
217                         _queue.push_front (vf);
218                         lock.unlock ();
219                 }
220
221                 if (remote_backoff > 0) {
222                         dvdomatic_sleep (remote_backoff);
223                 }
224
225                 lock.lock ();
226                 _worker_condition.notify_all ();
227         }
228 }
229
230 void
231 J2KWAVEncoder::process_begin ()
232 {
233         if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
234 #ifdef HAVE_SWRESAMPLE
235
236                 stringstream s;
237                 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
238                 _film->log()->log (s.str ());
239
240                 /* We will be using planar float data when we call the resampler */
241                 _swr_context = swr_alloc_set_opts (
242                         0,
243                         _film->audio_stream()->channel_layout(),
244                         AV_SAMPLE_FMT_FLTP,
245                         _film->target_audio_sample_rate(),
246                         _film->audio_stream()->channel_layout(),
247                         AV_SAMPLE_FMT_FLTP,
248                         _film->audio_stream()->sample_rate(),
249                         0, 0
250                         );
251                 
252                 swr_init (_swr_context);
253 #else
254                 throw EncodeError ("Cannot resample audio as libswresample is not present");
255 #endif
256         } else {
257 #ifdef HAVE_SWRESAMPLE
258                 _swr_context = 0;
259 #endif          
260         }
261         
262         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
263                 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
264         }
265
266         vector<ServerDescription*> servers = Config::instance()->servers ();
267
268         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
269                 for (int j = 0; j < (*i)->threads (); ++j) {
270                         _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
271                 }
272         }
273 }
274
275 void
276 J2KWAVEncoder::process_end ()
277 {
278         boost::mutex::scoped_lock lock (_worker_mutex);
279
280         _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
281
282         /* Keep waking workers until the queue is empty */
283         while (!_queue.empty ()) {
284                 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
285                 _worker_condition.notify_all ();
286                 _worker_condition.wait (lock);
287         }
288
289         lock.unlock ();
290         
291         terminate_worker_threads ();
292
293         _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
294
295         /* The following sequence of events can occur in the above code:
296              1. a remote worker takes the last image off the queue
297              2. the loop above terminates
298              3. the remote worker fails to encode the image and puts it back on the queue
299              4. the remote worker is then terminated by terminate_worker_threads
300
301              So just mop up anything left in the queue here.
302         */
303
304         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
305                 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
306                 try {
307                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
308                         e->write (_opt, (*i)->frame ());
309                         frame_done ();
310                 } catch (std::exception& e) {
311                         _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
312                 }
313         }
314
315 #if HAVE_SWRESAMPLE     
316         if (_film->audio_stream() && _swr_context) {
317
318                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
319                         
320                 while (1) {
321                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
322
323                         if (frames < 0) {
324                                 throw EncodeError ("could not run sample-rate converter");
325                         }
326
327                         if (frames == 0) {
328                                 break;
329                         }
330
331                         out->set_frames (frames);
332                         write_audio (out);
333                 }
334
335                 swr_free (&_swr_context);
336         }
337 #endif
338
339         if (_film->audio_stream()) {
340                 close_sound_files ();
341                 
342                 /* Rename .wav.tmp files to .wav */
343                 for (int i = 0; i < _film->audio_channels(); ++i) {
344                         if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
345                                 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
346                         }
347                         boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
348                 }
349         }
350 }
351
352 void
353 J2KWAVEncoder::do_process_audio (shared_ptr<AudioBuffers> audio)
354 {
355         shared_ptr<AudioBuffers> resampled;
356         
357 #if HAVE_SWRESAMPLE
358         /* Maybe sample-rate convert */
359         if (_swr_context) {
360
361                 /* Compute the resampled frames count and add 32 for luck */
362                 int const max_resampled_frames = ceil ((int64_t) audio->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
363
364                 resampled.reset (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
365
366                 /* Resample audio */
367                 int const resampled_frames = swr_convert (
368                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) audio->data(), audio->frames()
369                         );
370                 
371                 if (resampled_frames < 0) {
372                         throw EncodeError ("could not run sample-rate converter");
373                 }
374
375                 resampled->set_frames (resampled_frames);
376                 
377                 /* And point our variables at the resampled audio */
378                 audio = resampled;
379         }
380 #endif
381
382         write_audio (audio);
383 }
384
385 void
386 J2KWAVEncoder::write_audio (shared_ptr<const AudioBuffers> audio)
387 {
388         for (int i = 0; i < _film->audio_channels(); ++i) {
389                 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
390         }
391
392         _audio_frames_written += audio->frames ();
393 }
394