08c796350c54746b586fb8b6706e98a69936b534
[dcpomatic.git] / src / lib / j2k_wav_encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file  src/j2k_wav_encoder.cc
21  *  @brief An encoder which writes JPEG2000 and WAV files.
22  */
23
24 #include <sstream>
25 #include <stdexcept>
26 #include <iomanip>
27 #include <iostream>
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
31 #include <sndfile.h>
32 #include <openjpeg.h>
33 #include "j2k_wav_encoder.h"
34 #include "config.h"
35 #include "film_state.h"
36 #include "options.h"
37 #include "exceptions.h"
38 #include "dcp_video_frame.h"
39 #include "server.h"
40 #include "filter.h"
41 #include "log.h"
42 #include "cross.h"
43
44 using namespace std;
45 using namespace boost;
46
47 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
48         : Encoder (s, o, l)
49         , _deinterleave_buffer_size (8192)
50         , _deinterleave_buffer (0)
51         , _process_end (false)
52 {
53         /* Create sound output files with .tmp suffixes; we will rename
54            them if and when we complete.
55         */
56         for (int i = 0; i < _fs->audio_channels; ++i) {
57                 SF_INFO sf_info;
58                 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate);
59                 /* We write mono files */
60                 sf_info.channels = 1;
61                 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
62                 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
63                 if (f == 0) {
64                         throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
65                 }
66                 _sound_files.push_back (f);
67         }
68
69         /* Create buffer for deinterleaving audio */
70         _deinterleave_buffer = new uint8_t[_deinterleave_buffer_size];
71 }
72
73 J2KWAVEncoder::~J2KWAVEncoder ()
74 {
75         terminate_worker_threads ();
76         delete[] _deinterleave_buffer;
77         close_sound_files ();
78 }
79
80 void
81 J2KWAVEncoder::terminate_worker_threads ()
82 {
83         boost::mutex::scoped_lock lock (_worker_mutex);
84         _process_end = true;
85         _worker_condition.notify_all ();
86         lock.unlock ();
87
88         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
89                 (*i)->join ();
90                 delete *i;
91         }
92 }
93
94 void
95 J2KWAVEncoder::close_sound_files ()
96 {
97         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
98                 sf_close (*i);
99         }
100
101         _sound_files.clear ();
102 }       
103
104 void
105 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame)
106 {
107         boost::mutex::scoped_lock lock (_worker_mutex);
108
109         /* Wait until the queue has gone down a bit */
110         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
111                 TIMING ("decoder sleeps with queue of %1", _queue.size());
112                 _worker_condition.wait (lock);
113                 TIMING ("decoder wakes with queue of %1", _queue.size());
114         }
115
116         if (_process_end) {
117                 return;
118         }
119
120         /* Only do the processing if we don't already have a file for this frame */
121         if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
122                 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters);
123                 TIMING ("adding to queue of %1", _queue.size ());
124                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
125                                           new DCPVideoFrame (
126                                                   yuv, _opt->out_size, _opt->padding, _fs->scaler, frame, _fs->frames_per_second, s.second,
127                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
128                                                   _log
129                                                   )
130                                           ));
131                 
132                 _worker_condition.notify_all ();
133         } else {
134                 frame_skipped ();
135         }
136 }
137
138 void
139 J2KWAVEncoder::encoder_thread (ServerDescription* server)
140 {
141         /* Number of seconds that we currently wait between attempts
142            to connect to the server; not relevant for localhost
143            encodings.
144         */
145         int remote_backoff = 0;
146         
147         while (1) {
148
149                 TIMING ("encoder thread %1 sleeps", pthread_self ());
150                 boost::mutex::scoped_lock lock (_worker_mutex);
151                 while (_queue.empty () && !_process_end) {
152                         _worker_condition.wait (lock);
153                 }
154
155                 if (_process_end) {
156                         return;
157                 }
158
159                 TIMING ("encoder thread %1 wakes with queue of %2", pthread_self(), _queue.size());
160                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
161                 _queue.pop_front ();
162                 
163                 lock.unlock ();
164
165                 shared_ptr<EncodedData> encoded;
166
167                 if (server) {
168                         try {
169                                 encoded = vf->encode_remotely (server);
170
171                                 if (remote_backoff > 0) {
172                                         _log->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
173                                 }
174                                 
175                                 /* This job succeeded, so remove any backoff */
176                                 remote_backoff = 0;
177                                 
178                         } catch (std::exception& e) {
179                                 if (remote_backoff < 60) {
180                                         /* back off more */
181                                         remote_backoff += 10;
182                                 }
183                                 _log->log (
184                                         String::compose (
185                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
186                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
187                                         );
188                         }
189                                 
190                 } else {
191                         try {
192                                 TIMING ("encoder thread %1 begins local encode of %2", pthread_self(), vf->frame());
193                                 encoded = vf->encode_locally ();
194                                 TIMING ("encoder thread %1 finishes local encode of %2", pthread_self(), vf->frame());
195                         } catch (std::exception& e) {
196                                 _log->log (String::compose ("Local encode failed (%1)", e.what ()));
197                         }
198                 }
199
200                 if (encoded) {
201                         encoded->write (_opt, vf->frame ());
202                         frame_done (vf->frame ());
203                 } else {
204                         lock.lock ();
205                         _queue.push_front (vf);
206                         lock.unlock ();
207                 }
208
209                 if (remote_backoff > 0) {
210                         dvdomatic_sleep (remote_backoff);
211                 }
212
213                 lock.lock ();
214                 _worker_condition.notify_all ();
215         }
216 }
217
218 void
219 J2KWAVEncoder::process_begin ()
220 {
221         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
222                 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
223         }
224
225         vector<ServerDescription*> servers = Config::instance()->servers ();
226
227         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
228                 for (int j = 0; j < (*i)->threads (); ++j) {
229                         _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
230                 }
231         }
232 }
233
234 void
235 J2KWAVEncoder::process_end ()
236 {
237         boost::mutex::scoped_lock lock (_worker_mutex);
238
239         _log->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
240
241         /* Keep waking workers until the queue is empty */
242         while (!_queue.empty ()) {
243                 _log->log ("Waking with " + lexical_cast<string> (_queue.size ()));
244                 _worker_condition.notify_all ();
245                 _worker_condition.wait (lock);
246         }
247
248         lock.unlock ();
249         
250         terminate_worker_threads ();
251
252         _log->log ("Mopping up " + lexical_cast<string> (_queue.size()));
253
254         /* The following sequence of events can occur in the above code:
255              1. a remote worker takes the last image off the queue
256              2. the loop above terminates
257              3. the remote worker fails to encode the image and puts it back on the queue
258              4. the remote worker is then terminated by terminate_worker_threads
259
260              So just mop up anything left in the queue here.
261         */
262
263         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
264                 _log->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
265                 try {
266                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
267                         e->write (_opt, (*i)->frame ());
268                         frame_done ((*i)->frame ());
269                 } catch (std::exception& e) {
270                         _log->log (String::compose ("Local encode failed (%1)", e.what ()));
271                 }
272         }
273         
274         close_sound_files ();
275
276         /* Rename .wav.tmp files to .wav */
277         for (int i = 0; i < _fs->audio_channels; ++i) {
278                 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
279                         boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
280                 }
281                 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
282         }
283 }
284
285 void
286 J2KWAVEncoder::process_audio (uint8_t* data, int data_size)
287 {
288         /* Size of a sample in bytes */
289         int const sample_size = 2;
290         
291         /* XXX: we are assuming that sample_size is right, the _deinterleave_buffer_size is a multiple
292            of the sample size and that data_size is a multiple of _fs->audio_channels * sample_size.
293         */
294         
295         /* XXX: this code is very tricksy and it must be possible to make it simpler ... */
296         
297         /* Number of bytes left to read this time */
298         int remaining = data_size;
299         /* Our position in the output buffers, in bytes */
300         int position = 0;
301         while (remaining > 0) {
302                 /* How many bytes of the deinterleaved data to do this time */
303                 int this_time = min (remaining / _fs->audio_channels, _deinterleave_buffer_size);
304                 for (int i = 0; i < _fs->audio_channels; ++i) {
305                         for (int j = 0; j < this_time; j += sample_size) {
306                                 for (int k = 0; k < sample_size; ++k) {
307                                         int const to = j + k;
308                                         int const from = position + (i * sample_size) + (j * _fs->audio_channels) + k;
309                                         _deinterleave_buffer[to] = data[from];
310                                 }
311                         }
312                         
313                         switch (_fs->audio_sample_format) {
314                         case AV_SAMPLE_FMT_S16:
315                                 sf_write_short (_sound_files[i], (const short *) _deinterleave_buffer, this_time / sample_size);
316                                 break;
317                         default:
318                                 throw DecodeError ("unknown audio sample format");
319                         }
320                 }
321                 
322                 position += this_time;
323                 remaining -= this_time * _fs->audio_channels;
324         }
325 }