Try to fix crash on finishing off DCPs with mono -> 5.1 conversions.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
/** @file src/lib/encoder.cc
 *  @brief Parent class for classes which can encode video and audio frames.
 */
23
24 #include <iostream>
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
28 #include "encoder.h"
29 #include "util.h"
30 #include "options.h"
31 #include "film.h"
32 #include "log.h"
33 #include "exceptions.h"
34 #include "filter.h"
35 #include "config.h"
36 #include "dcp_video_frame.h"
37 #include "server.h"
38 #include "format.h"
39 #include "cross.h"
40 #include "writer.h"
41
42 using std::pair;
43 using std::string;
44 using std::stringstream;
45 using std::vector;
46 using std::list;
47 using std::cout;
48 using std::make_pair;
49 using namespace boost;
50
51 int const Encoder::_history_size = 25;
52
53 /** @param f Film that we are encoding */
54 Encoder::Encoder (shared_ptr<Film> f)
55         : _film (f)
56         , _video_frames_in (0)
57         , _video_frames_out (0)
58 #ifdef HAVE_SWRESAMPLE    
59         , _swr_context (0)
60 #endif
61         , _have_a_real_frame (false)
62         , _terminate (false)
63 {
64         
65 }
66
67 Encoder::~Encoder ()
68 {
69         terminate_threads ();
70         if (_writer) {
71                 _writer->finish ();
72         }
73 }
74
75 void
76 Encoder::process_begin ()
77 {
78         if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
79 #ifdef HAVE_SWRESAMPLE
80
81                 stringstream s;
82                 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
83                 _film->log()->log (s.str ());
84
85                 /* We will be using planar float data when we call the resampler */
86                 _swr_context = swr_alloc_set_opts (
87                         0,
88                         _film->audio_stream()->channel_layout(),
89                         AV_SAMPLE_FMT_FLTP,
90                         _film->target_audio_sample_rate(),
91                         _film->audio_stream()->channel_layout(),
92                         AV_SAMPLE_FMT_FLTP,
93                         _film->audio_stream()->sample_rate(),
94                         0, 0
95                         );
96                 
97                 swr_init (_swr_context);
98 #else
99                 throw EncodeError ("Cannot resample audio as libswresample is not present");
100 #endif
101         } else {
102 #ifdef HAVE_SWRESAMPLE
103                 _swr_context = 0;
104 #endif          
105         }
106
107         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
108                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
109         }
110
111         vector<ServerDescription*> servers = Config::instance()->servers ();
112
113         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
114                 for (int j = 0; j < (*i)->threads (); ++j) {
115                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
116                 }
117         }
118
119         _writer.reset (new Writer (_film));
120 }
121
122
123 void
124 Encoder::process_end ()
125 {
126 #if HAVE_SWRESAMPLE     
127         if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
128
129                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
130                         
131                 while (1) {
132                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
133
134                         if (frames < 0) {
135                                 throw EncodeError ("could not run sample-rate converter");
136                         }
137
138                         if (frames == 0) {
139                                 break;
140                         }
141
142                         out->set_frames (frames);
143                         write_audio (out);
144                 }
145
146                 swr_free (&_swr_context);
147         }
148 #endif
149
150         boost::mutex::scoped_lock lock (_mutex);
151
152         _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
153
154         /* Keep waking workers until the queue is empty */
155         while (!_queue.empty ()) {
156                 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
157                 _condition.notify_all ();
158                 _condition.wait (lock);
159         }
160
161         lock.unlock ();
162         
163         terminate_threads ();
164
165         _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
166
167         /* The following sequence of events can occur in the above code:
168              1. a remote worker takes the last image off the queue
169              2. the loop above terminates
170              3. the remote worker fails to encode the image and puts it back on the queue
171              4. the remote worker is then terminated by terminate_threads
172
173              So just mop up anything left in the queue here.
174         */
175
176         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
177                 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
178                 try {
179                         _writer->write ((*i)->encode_locally(), (*i)->frame ());
180                         frame_done ();
181                 } catch (std::exception& e) {
182                         _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
183                 }
184         }
185
186         _writer->finish ();
187         _writer.reset ();
188 }       
189
190 /** @return an estimate of the current number of frames we are encoding per second,
191  *  or 0 if not known.
192  */
193 float
194 Encoder::current_frames_per_second () const
195 {
196         boost::mutex::scoped_lock lock (_history_mutex);
197         if (int (_time_history.size()) < _history_size) {
198                 return 0;
199         }
200
201         struct timeval now;
202         gettimeofday (&now, 0);
203
204         return _history_size / (seconds (now) - seconds (_time_history.back ()));
205 }
206
207 /** @return Number of video frames that have been sent out */
208 int
209 Encoder::video_frames_out () const
210 {
211         boost::mutex::scoped_lock (_history_mutex);
212         return _video_frames_out;
213 }
214
215 /** Should be called when a frame has been encoded successfully.
216  *  @param n Source frame index.
217  */
218 void
219 Encoder::frame_done ()
220 {
221         boost::mutex::scoped_lock lock (_history_mutex);
222         
223         struct timeval tv;
224         gettimeofday (&tv, 0);
225         _time_history.push_front (tv);
226         if (int (_time_history.size()) > _history_size) {
227                 _time_history.pop_back ();
228         }
229 }
230
231 void
232 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
233 {
234         DCPFrameRate dfr (_film->frames_per_second ());
235         
236         if (dfr.skip && (_video_frames_in % 2)) {
237                 ++_video_frames_in;
238                 return;
239         }
240
241         boost::mutex::scoped_lock lock (_mutex);
242
243         /* Wait until the queue has gone down a bit */
244         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
245                 TIMING ("decoder sleeps with queue of %1", _queue.size());
246                 _condition.wait (lock);
247                 TIMING ("decoder wakes with queue of %1", _queue.size());
248         }
249
250         if (_terminate) {
251                 return;
252         }
253
254         if (_writer->thrown ()) {
255                 _writer->rethrow ();
256         }
257
258         if (_writer->can_fake_write (_video_frames_out)) {
259                 _writer->fake_write (_video_frames_out);
260                 _have_a_real_frame = false;
261                 frame_done ();
262         } else if (same && _have_a_real_frame) {
263                 /* Use the last frame that we encoded. */
264                 _writer->repeat (_video_frames_out);
265                 frame_done ();
266         } else {
267                 /* Queue this new frame for encoding */
268                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
269                 TIMING ("adding to queue of %1", _queue.size ());
270                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
271                                           new DCPVideoFrame (
272                                                   image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
273                                                   _film->subtitle_offset(), _film->subtitle_scale(),
274                                                   _film->scaler(), _video_frames_out, _film->frames_per_second(), s.second,
275                                                   _film->colour_lut(), _film->j2k_bandwidth(),
276                                                   _film->log()
277                                                   )
278                                           ));
279                 
280                 _condition.notify_all ();
281                 _have_a_real_frame = true;
282         }
283
284         ++_video_frames_in;
285         ++_video_frames_out;
286
287         if (dfr.repeat) {
288                 _writer->repeat (_video_frames_out);
289                 ++_video_frames_out;
290                 frame_done ();
291         }
292 }
293
294 void
295 Encoder::process_audio (shared_ptr<AudioBuffers> data)
296 {
297 #if HAVE_SWRESAMPLE
298         /* Maybe sample-rate convert */
299         if (_swr_context) {
300
301                 /* Compute the resampled frames count and add 32 for luck */
302                 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
303
304                 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
305
306                 /* Resample audio */
307                 int const resampled_frames = swr_convert (
308                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
309                         );
310                 
311                 if (resampled_frames < 0) {
312                         throw EncodeError ("could not run sample-rate converter");
313                 }
314
315                 resampled->set_frames (resampled_frames);
316                 
317                 /* And point our variables at the resampled audio */
318                 data = resampled;
319         }
320 #endif
321
322         write_audio (data);
323 }
324
325 void
326 Encoder::terminate_threads ()
327 {
328         boost::mutex::scoped_lock lock (_mutex);
329         _terminate = true;
330         _condition.notify_all ();
331         lock.unlock ();
332
333         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
334                 (*i)->join ();
335                 delete *i;
336         }
337 }
338
339 void
340 Encoder::encoder_thread (ServerDescription* server)
341 {
342         /* Number of seconds that we currently wait between attempts
343            to connect to the server; not relevant for localhost
344            encodings.
345         */
346         int remote_backoff = 0;
347         
348         while (1) {
349
350                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
351                 boost::mutex::scoped_lock lock (_mutex);
352                 while (_queue.empty () && !_terminate) {
353                         _condition.wait (lock);
354                 }
355
356                 if (_terminate) {
357                         return;
358                 }
359
360                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
361                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
362                 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
363                 _queue.pop_front ();
364                 
365                 lock.unlock ();
366
367                 shared_ptr<EncodedData> encoded;
368
369                 if (server) {
370                         try {
371                                 encoded = vf->encode_remotely (server);
372
373                                 if (remote_backoff > 0) {
374                                         _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
375                                 }
376                                 
377                                 /* This job succeeded, so remove any backoff */
378                                 remote_backoff = 0;
379                                 
380                         } catch (std::exception& e) {
381                                 if (remote_backoff < 60) {
382                                         /* back off more */
383                                         remote_backoff += 10;
384                                 }
385                                 _film->log()->log (
386                                         String::compose (
387                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
388                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
389                                         );
390                         }
391                                 
392                 } else {
393                         try {
394                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
395                                 encoded = vf->encode_locally ();
396                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
397                         } catch (std::exception& e) {
398                                 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
399                         }
400                 }
401
402                 if (encoded) {
403                         _writer->write (encoded, vf->frame ());
404                         frame_done ();
405                 } else {
406                         lock.lock ();
407                         _film->log()->log (
408                                 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
409                                 );
410                         _queue.push_front (vf);
411                         lock.unlock ();
412                 }
413
414                 if (remote_backoff > 0) {
415                         dvdomatic_sleep (remote_backoff);
416                 }
417
418                 lock.lock ();
419                 _condition.notify_all ();
420         }
421 }
422
423 void
424 Encoder::write_audio (shared_ptr<const AudioBuffers> data)
425 {
426         if (_film->audio_channels() == 1) {
427                 /* We need to switch things around so that the mono channel is on
428                    the centre channel of a 5.1 set (with other channels silent).
429                 */
430
431                 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
432                 b->make_silent (libdcp::LEFT);
433                 b->make_silent (libdcp::RIGHT);
434                 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
435                 b->make_silent (libdcp::LFE);
436                 b->make_silent (libdcp::LS);
437                 b->make_silent (libdcp::RS);
438
439                 data = b;
440         }
441
442         _writer->write (data);
443 }