Use openssl for all MD5-ing.
[dcpomatic.git] / src / lib / j2k_wav_encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file  src/lib/j2k_wav_encoder.cc
21  *  @brief An encoder which writes JPEG2000 and WAV files.
22  */
23
24 #include <sstream>
25 #include <stdexcept>
26 #include <iomanip>
27 #include <iostream>
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
31 #include <sndfile.h>
32 #include <openjpeg.h>
33 #include "j2k_wav_encoder.h"
34 #include "config.h"
35 #include "film_state.h"
36 #include "options.h"
37 #include "exceptions.h"
38 #include "dcp_video_frame.h"
39 #include "server.h"
40 #include "filter.h"
41 #include "log.h"
42 #include "cross.h"
43
44 using namespace std;
45 using namespace boost;
46
47 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
48         : Encoder (s, o, l)
49         , _deinterleave_buffer_size (8192)
50         , _deinterleave_buffer (0)
51         , _process_end (false)
52 {
53         /* Create sound output files with .tmp suffixes; we will rename
54            them if and when we complete.
55         */
56         for (int i = 0; i < _fs->audio_channels; ++i) {
57                 SF_INFO sf_info;
58                 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate);
59                 /* We write mono files */
60                 sf_info.channels = 1;
61                 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
62                 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
63                 if (f == 0) {
64                         throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
65                 }
66                 _sound_files.push_back (f);
67         }
68
69         /* Create buffer for deinterleaving audio */
70         _deinterleave_buffer = new uint8_t[_deinterleave_buffer_size];
71 }
72
73 J2KWAVEncoder::~J2KWAVEncoder ()
74 {
75         terminate_worker_threads ();
76         delete[] _deinterleave_buffer;
77         close_sound_files ();
78 }
79
80 void
81 J2KWAVEncoder::terminate_worker_threads ()
82 {
83         boost::mutex::scoped_lock lock (_worker_mutex);
84         _process_end = true;
85         _worker_condition.notify_all ();
86         lock.unlock ();
87
88         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
89                 (*i)->join ();
90                 delete *i;
91         }
92 }
93
94 void
95 J2KWAVEncoder::close_sound_files ()
96 {
97         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
98                 sf_close (*i);
99         }
100
101         _sound_files.clear ();
102 }       
103
104 void
105 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame)
106 {
107         boost::mutex::scoped_lock lock (_worker_mutex);
108
109         /* Wait until the queue has gone down a bit */
110         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
111                 _worker_condition.wait (lock);
112         }
113
114         if (_process_end) {
115                 return;
116         }
117
118         /* Only do the processing if we don't already have a file for this frame */
119         if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
120                 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters);
121                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
122                                           new DCPVideoFrame (
123                                                   yuv, _opt->out_size, _opt->padding, _fs->scaler, frame, _fs->frames_per_second, s.second,
124                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
125                                                   _log
126                                                   )
127                                           ));
128                 
129                 _worker_condition.notify_all ();
130         } else {
131                 frame_skipped ();
132         }
133 }
134
135 void
136 J2KWAVEncoder::encoder_thread (ServerDescription* server)
137 {
138         /* Number of seconds that we currently wait between attempts
139            to connect to the server; not relevant for localhost
140            encodings.
141         */
142         int remote_backoff = 0;
143         
144         while (1) {
145                 boost::mutex::scoped_lock lock (_worker_mutex);
146                 while (_queue.empty () && !_process_end) {
147                         _worker_condition.wait (lock);
148                 }
149
150                 if (_process_end) {
151                         return;
152                 }
153
154                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
155                 _queue.pop_front ();
156                 
157                 lock.unlock ();
158
159                 shared_ptr<EncodedData> encoded;
160
161                 if (server) {
162                         try {
163                                 encoded = vf->encode_remotely (server);
164
165                                 if (remote_backoff > 0) {
166                                         stringstream s;
167                                         s << server->host_name() << " was lost, but now she is found; removing backoff";
168                                         _log->log (s.str ());
169                                 }
170                                 
171                                 /* This job succeeded, so remove any backoff */
172                                 remote_backoff = 0;
173                                 
174                         } catch (std::exception& e) {
175                                 if (remote_backoff < 60) {
176                                         /* back off more */
177                                         remote_backoff += 10;
178                                 }
179                                 stringstream s;
180                                 s << "Remote encode of " << vf->frame() << " on " << server->host_name() << " failed (" << e.what() << "); thread sleeping for " << remote_backoff << "s.";
181                                 _log->log (s.str ());
182                         }
183                                 
184                 } else {
185                         try {
186                                 encoded = vf->encode_locally ();
187                         } catch (std::exception& e) {
188                                 stringstream s;
189                                 s << "Local encode failed " << e.what() << ".";
190                                 _log->log (s.str ());
191                         }
192                 }
193
194                 if (encoded) {
195                         encoded->write (_opt, vf->frame ());
196                         frame_done (vf->frame ());
197                 } else {
198                         lock.lock ();
199                         _queue.push_front (vf);
200                         lock.unlock ();
201                 }
202
203                 if (remote_backoff > 0) {
204                         dvdomatic_sleep (remote_backoff);
205                 }
206
207                 lock.lock ();
208                 _worker_condition.notify_all ();
209         }
210 }
211
212 void
213 J2KWAVEncoder::process_begin ()
214 {
215         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
216                 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
217         }
218
219         vector<ServerDescription*> servers = Config::instance()->servers ();
220
221         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
222                 for (int j = 0; j < (*i)->threads (); ++j) {
223                         _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
224                 }
225         }
226 }
227
228 void
229 J2KWAVEncoder::process_end ()
230 {
231         boost::mutex::scoped_lock lock (_worker_mutex);
232
233         _log->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
234
235         /* Keep waking workers until the queue is empty */
236         while (!_queue.empty ()) {
237                 _log->log ("Waking with " + lexical_cast<string> (_queue.size ()));
238                 _worker_condition.notify_all ();
239                 _worker_condition.wait (lock);
240         }
241
242         lock.unlock ();
243         
244         terminate_worker_threads ();
245
246         _log->log ("Mopping up " + lexical_cast<string> (_queue.size()));
247
248         /* The following sequence of events can occur in the above code:
249              1. a remote worker takes the last image off the queue
250              2. the loop above terminates
251              3. the remote worker fails to encode the image and puts it back on the queue
252              4. the remote worker is then terminated by terminate_worker_threads
253
254              So just mop up anything left in the queue here.
255         */
256
257         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
258                 stringstream s;
259                 s << "Encode left-over frame " << (*i)->frame();
260                 _log->log (s.str ());
261                 try {
262                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
263                         e->write (_opt, (*i)->frame ());
264                         frame_done ((*i)->frame ());
265                 } catch (std::exception& e) {
266                         stringstream s;
267                         s << "Local encode failed " << e.what() << ".";
268                         _log->log (s.str ());
269                 }
270         }
271         
272         close_sound_files ();
273
274         /* Rename .wav.tmp files to .wav */
275         for (int i = 0; i < _fs->audio_channels; ++i) {
276                 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
277                         boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
278                 }
279                 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
280         }
281 }
282
283 void
284 J2KWAVEncoder::process_audio (uint8_t* data, int data_size)
285 {
286         /* Size of a sample in bytes */
287         int const sample_size = 2;
288         
289         /* XXX: we are assuming that sample_size is right, the _deinterleave_buffer_size is a multiple
290            of the sample size and that data_size is a multiple of _fs->audio_channels * sample_size.
291         */
292         
293         /* XXX: this code is very tricksy and it must be possible to make it simpler ... */
294         
295         /* Number of bytes left to read this time */
296         int remaining = data_size;
297         /* Our position in the output buffers, in bytes */
298         int position = 0;
299         while (remaining > 0) {
300                 /* How many bytes of the deinterleaved data to do this time */
301                 int this_time = min (remaining / _fs->audio_channels, _deinterleave_buffer_size);
302                 for (int i = 0; i < _fs->audio_channels; ++i) {
303                         for (int j = 0; j < this_time; j += sample_size) {
304                                 for (int k = 0; k < sample_size; ++k) {
305                                         int const to = j + k;
306                                         int const from = position + (i * sample_size) + (j * _fs->audio_channels) + k;
307                                         _deinterleave_buffer[to] = data[from];
308                                 }
309                         }
310                         
311                         switch (_fs->audio_sample_format) {
312                         case AV_SAMPLE_FMT_S16:
313                                 sf_write_short (_sound_files[i], (const short *) _deinterleave_buffer, this_time / sample_size);
314                                 break;
315                         default:
316                                 throw DecodeError ("unknown audio sample format");
317                         }
318                 }
319                 
320                 position += this_time;
321                 remaining -= this_time * _fs->audio_channels;
322         }
323 }