a83c283d7fc08a062d1af4f07f21c9c24b97ad6f
[dcpomatic.git] / src / lib / j2k_wav_encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file  src/j2k_wav_encoder.cc
21  *  @brief An encoder which writes JPEG2000 and WAV files.
22  */
23
24 #include <sstream>
25 #include <stdexcept>
26 #include <iomanip>
27 #include <iostream>
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
31 #include <sndfile.h>
32 #include <openjpeg.h>
33 #include "j2k_wav_encoder.h"
34 #include "config.h"
35 #include "film_state.h"
36 #include "options.h"
37 #include "exceptions.h"
38 #include "dcp_video_frame.h"
39 #include "server.h"
40 #include "filter.h"
41 #include "log.h"
42 #include "cross.h"
43
44 using namespace std;
45 using namespace boost;
46
47 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
48         : Encoder (s, o, l)
49 #ifdef HAVE_SWRESAMPLE    
50         , _swr_context (0)
51 #endif    
52         , _process_end (false)
53 {
54         /* Create sound output files with .tmp suffixes; we will rename
55            them if and when we complete.
56         */
57         for (int i = 0; i < _fs->audio_channels(); ++i) {
58                 SF_INFO sf_info;
59                 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate());
60                 /* We write mono files */
61                 sf_info.channels = 1;
62                 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
63                 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
64                 if (f == 0) {
65                         throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
66                 }
67                 _sound_files.push_back (f);
68         }
69 }
70
71 J2KWAVEncoder::~J2KWAVEncoder ()
72 {
73         terminate_worker_threads ();
74         close_sound_files ();
75 }
76
77 void
78 J2KWAVEncoder::terminate_worker_threads ()
79 {
80         boost::mutex::scoped_lock lock (_worker_mutex);
81         _process_end = true;
82         _worker_condition.notify_all ();
83         lock.unlock ();
84
85         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
86                 (*i)->join ();
87                 delete *i;
88         }
89 }
90
91 void
92 J2KWAVEncoder::close_sound_files ()
93 {
94         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
95                 sf_close (*i);
96         }
97
98         _sound_files.clear ();
99 }       
100
101 void
102 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame, shared_ptr<Subtitle> sub)
103 {
104         boost::mutex::scoped_lock lock (_worker_mutex);
105
106         /* Wait until the queue has gone down a bit */
107         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
108                 TIMING ("decoder sleeps with queue of %1", _queue.size());
109                 _worker_condition.wait (lock);
110                 TIMING ("decoder wakes with queue of %1", _queue.size());
111         }
112
113         if (_process_end) {
114                 return;
115         }
116
117         /* Only do the processing if we don't already have a file for this frame */
118         if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
119                 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters());
120                 TIMING ("adding to queue of %1", _queue.size ());
121                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
122                                           new DCPVideoFrame (
123                                                   yuv, sub, _opt->out_size, _opt->padding, _fs->subtitle_offset(), _fs->subtitle_scale(),
124                                                   _fs->scaler(), frame, _fs->frames_per_second(), s.second,
125                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
126                                                   _log
127                                                   )
128                                           ));
129                 
130                 _worker_condition.notify_all ();
131         } else {
132                 frame_skipped ();
133         }
134 }
135
136 void
137 J2KWAVEncoder::encoder_thread (ServerDescription* server)
138 {
139         /* Number of seconds that we currently wait between attempts
140            to connect to the server; not relevant for localhost
141            encodings.
142         */
143         int remote_backoff = 0;
144         
145         while (1) {
146
147                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
148                 boost::mutex::scoped_lock lock (_worker_mutex);
149                 while (_queue.empty () && !_process_end) {
150                         _worker_condition.wait (lock);
151                 }
152
153                 if (_process_end) {
154                         return;
155                 }
156
157                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
158                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
159                 _log->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()));
160                 _queue.pop_front ();
161                 
162                 lock.unlock ();
163
164                 shared_ptr<EncodedData> encoded;
165
166                 if (server) {
167                         try {
168                                 encoded = vf->encode_remotely (server);
169
170                                 if (remote_backoff > 0) {
171                                         _log->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
172                                 }
173                                 
174                                 /* This job succeeded, so remove any backoff */
175                                 remote_backoff = 0;
176                                 
177                         } catch (std::exception& e) {
178                                 if (remote_backoff < 60) {
179                                         /* back off more */
180                                         remote_backoff += 10;
181                                 }
182                                 _log->log (
183                                         String::compose (
184                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
185                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
186                                         );
187                         }
188                                 
189                 } else {
190                         try {
191                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
192                                 encoded = vf->encode_locally ();
193                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
194                         } catch (std::exception& e) {
195                                 _log->log (String::compose ("Local encode failed (%1)", e.what ()));
196                         }
197                 }
198
199                 if (encoded) {
200                         encoded->write (_opt, vf->frame ());
201                         frame_done (vf->frame ());
202                 } else {
203                         lock.lock ();
204                         _log->log (String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame()));
205                         _queue.push_front (vf);
206                         lock.unlock ();
207                 }
208
209                 if (remote_backoff > 0) {
210                         dvdomatic_sleep (remote_backoff);
211                 }
212
213                 lock.lock ();
214                 _worker_condition.notify_all ();
215         }
216 }
217
218 void
219 J2KWAVEncoder::process_begin (int64_t audio_channel_layout, AVSampleFormat audio_sample_format)
220 {
221         if (_fs->audio_sample_rate() != _fs->target_sample_rate()) {
222 #ifdef HAVE_SWRESAMPLE
223
224                 stringstream s;
225                 s << "Will resample audio from " << _fs->audio_sample_rate() << " to " << _fs->target_sample_rate();
226                 _log->log (s.str ());
227
228                 /* We will be using planar float data when we call the resampler */
229                 _swr_context = swr_alloc_set_opts (
230                         0,
231                         audio_channel_layout,
232                         AV_SAMPLE_FMT_FLTP,
233                         _fs->target_sample_rate(),
234                         audio_channel_layout,
235                         AV_SAMPLE_FMT_FLTP,
236                         _fs->audio_sample_rate(),
237                         0, 0
238                         );
239                 
240                 swr_init (_swr_context);
241 #else
242                 throw EncodeError ("Cannot resample audio as libswresample is not present");
243 #endif
244         } else {
245 #ifdef HAVE_SWRESAMPLE
246                 _swr_context = 0;
247 #endif          
248         }
249         
250         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
251                 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
252         }
253
254         vector<ServerDescription*> servers = Config::instance()->servers ();
255
256         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
257                 for (int j = 0; j < (*i)->threads (); ++j) {
258                         _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
259                 }
260         }
261 }
262
263 void
264 J2KWAVEncoder::process_end ()
265 {
266         boost::mutex::scoped_lock lock (_worker_mutex);
267
268         _log->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
269
270         /* Keep waking workers until the queue is empty */
271         while (!_queue.empty ()) {
272                 _log->log ("Waking with " + lexical_cast<string> (_queue.size ()));
273                 _worker_condition.notify_all ();
274                 _worker_condition.wait (lock);
275         }
276
277         lock.unlock ();
278         
279         terminate_worker_threads ();
280
281         _log->log ("Mopping up " + lexical_cast<string> (_queue.size()));
282
283         /* The following sequence of events can occur in the above code:
284              1. a remote worker takes the last image off the queue
285              2. the loop above terminates
286              3. the remote worker fails to encode the image and puts it back on the queue
287              4. the remote worker is then terminated by terminate_worker_threads
288
289              So just mop up anything left in the queue here.
290         */
291
292         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
293                 _log->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
294                 try {
295                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
296                         e->write (_opt, (*i)->frame ());
297                         frame_done ((*i)->frame ());
298                 } catch (std::exception& e) {
299                         _log->log (String::compose ("Local encode failed (%1)", e.what ()));
300                 }
301         }
302
303 #if HAVE_SWRESAMPLE     
304         if (_swr_context) {
305
306                 shared_ptr<AudioBuffers> out (new AudioBuffers (_fs->audio_channels(), 256));
307                         
308                 while (1) {
309                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
310
311                         if (frames < 0) {
312                                 throw EncodeError ("could not run sample-rate converter");
313                         }
314
315                         if (frames == 0) {
316                                 break;
317                         }
318
319                         write_audio (out);
320                 }
321
322                 swr_free (&_swr_context);
323         }
324 #endif  
325         
326         close_sound_files ();
327
328         /* Rename .wav.tmp files to .wav */
329         for (int i = 0; i < _fs->audio_channels(); ++i) {
330                 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
331                         boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
332                 }
333                 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
334         }
335 }
336
337 void
338 J2KWAVEncoder::process_audio (shared_ptr<const AudioBuffers> audio)
339 {
340         shared_ptr<AudioBuffers> resampled;
341         
342 #if HAVE_SWRESAMPLE
343         /* Maybe sample-rate convert */
344         if (_swr_context) {
345
346                 /* Compute the resampled frames count and add 32 for luck */
347                 int const max_resampled_frames = ceil (audio->frames() * _fs->target_sample_rate() / _fs->audio_sample_rate()) + 32;
348
349                 resampled.reset (new AudioBuffers (_fs->audio_channels(), max_resampled_frames));
350
351                 /* Resample audio */
352                 int const resampled_frames = swr_convert (
353                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) audio->data(), audio->frames()
354                         );
355                 
356                 if (resampled_frames < 0) {
357                         throw EncodeError ("could not run sample-rate converter");
358                 }
359
360                 resampled->set_frames (resampled_frames);
361                 
362                 /* And point our variables at the resampled audio */
363                 audio = resampled;
364         }
365 #endif
366
367         write_audio (audio);
368 }
369
370 void
371 J2KWAVEncoder::write_audio (shared_ptr<const AudioBuffers> audio) const
372 {
373         for (int i = 0; i < _fs->audio_channels(); ++i) {
374                 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
375         }
376 }
377