Basic attempt to catch exceptions in the writer thread and pass them safely back...
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/lib/encoder.cc
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
28 #include "encoder.h"
29 #include "util.h"
30 #include "options.h"
31 #include "film.h"
32 #include "log.h"
33 #include "exceptions.h"
34 #include "filter.h"
35 #include "config.h"
36 #include "dcp_video_frame.h"
37 #include "server.h"
38 #include "format.h"
39 #include "cross.h"
40 #include "writer.h"
41
42 using std::pair;
43 using std::string;
44 using std::stringstream;
45 using std::vector;
46 using std::list;
47 using std::cout;
48 using std::make_pair;
49 using namespace boost;
50
51 int const Encoder::_history_size = 25;
52
53 /** @param f Film that we are encoding */
54 Encoder::Encoder (shared_ptr<Film> f)
55         : _film (f)
56         , _video_frames_in (0)
57         , _video_frames_out (0)
58 #ifdef HAVE_SWRESAMPLE    
59         , _swr_context (0)
60 #endif
61         , _have_a_real_frame (false)
62         , _terminate (false)
63 {
64         
65 }
66
67 Encoder::~Encoder ()
68 {
69         terminate_threads ();
70         if (_writer) {
71                 _writer->finish ();
72         }
73 }
74
75 void
76 Encoder::process_begin ()
77 {
78         if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
79 #ifdef HAVE_SWRESAMPLE
80
81                 stringstream s;
82                 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
83                 _film->log()->log (s.str ());
84
85                 /* We will be using planar float data when we call the resampler */
86                 _swr_context = swr_alloc_set_opts (
87                         0,
88                         _film->audio_stream()->channel_layout(),
89                         AV_SAMPLE_FMT_FLTP,
90                         _film->target_audio_sample_rate(),
91                         _film->audio_stream()->channel_layout(),
92                         AV_SAMPLE_FMT_FLTP,
93                         _film->audio_stream()->sample_rate(),
94                         0, 0
95                         );
96                 
97                 swr_init (_swr_context);
98 #else
99                 throw EncodeError ("Cannot resample audio as libswresample is not present");
100 #endif
101         } else {
102 #ifdef HAVE_SWRESAMPLE
103                 _swr_context = 0;
104 #endif          
105         }
106
107         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
108                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
109         }
110
111         vector<ServerDescription*> servers = Config::instance()->servers ();
112
113         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
114                 for (int j = 0; j < (*i)->threads (); ++j) {
115                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
116                 }
117         }
118
119         _writer.reset (new Writer (_film));
120 }
121
122
123 void
124 Encoder::process_end ()
125 {
126 #if HAVE_SWRESAMPLE     
127         if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
128
129                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
130                         
131                 while (1) {
132                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
133
134                         if (frames < 0) {
135                                 throw EncodeError ("could not run sample-rate converter");
136                         }
137
138                         if (frames == 0) {
139                                 break;
140                         }
141
142                         out->set_frames (frames);
143                         _writer->write (out);
144                 }
145
146                 swr_free (&_swr_context);
147         }
148 #endif
149
150         boost::mutex::scoped_lock lock (_mutex);
151
152         _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
153
154         /* Keep waking workers until the queue is empty */
155         while (!_queue.empty ()) {
156                 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
157                 _condition.notify_all ();
158                 _condition.wait (lock);
159         }
160
161         lock.unlock ();
162         
163         terminate_threads ();
164
165         _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
166
167         /* The following sequence of events can occur in the above code:
168              1. a remote worker takes the last image off the queue
169              2. the loop above terminates
170              3. the remote worker fails to encode the image and puts it back on the queue
171              4. the remote worker is then terminated by terminate_threads
172
173              So just mop up anything left in the queue here.
174         */
175
176         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
177                 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
178                 try {
179                         _writer->write ((*i)->encode_locally(), (*i)->frame ());
180                         frame_done ();
181                 } catch (std::exception& e) {
182                         _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
183                 }
184         }
185
186         _writer->finish ();
187         _writer.reset ();
188 }       
189
190 /** @return an estimate of the current number of frames we are encoding per second,
191  *  or 0 if not known.
192  */
193 float
194 Encoder::current_frames_per_second () const
195 {
196         boost::mutex::scoped_lock lock (_history_mutex);
197         if (int (_time_history.size()) < _history_size) {
198                 return 0;
199         }
200
201         struct timeval now;
202         gettimeofday (&now, 0);
203
204         return _history_size / (seconds (now) - seconds (_time_history.back ()));
205 }
206
207 /** @return Number of video frames that have been sent out */
208 int
209 Encoder::video_frames_out () const
210 {
211         boost::mutex::scoped_lock (_history_mutex);
212         return _video_frames_out;
213 }
214
215 /** Should be called when a frame has been encoded successfully.
216  *  @param n Source frame index.
217  */
218 void
219 Encoder::frame_done ()
220 {
221         boost::mutex::scoped_lock lock (_history_mutex);
222         
223         struct timeval tv;
224         gettimeofday (&tv, 0);
225         _time_history.push_front (tv);
226         if (int (_time_history.size()) > _history_size) {
227                 _time_history.pop_back ();
228         }
229 }
230
231 void
232 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
233 {
234         DCPFrameRate dfr (_film->frames_per_second ());
235         
236         if (dfr.skip && (_video_frames_in % 2)) {
237                 ++_video_frames_in;
238                 return;
239         }
240
241         boost::mutex::scoped_lock lock (_mutex);
242
243         /* Wait until the queue has gone down a bit */
244         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
245                 TIMING ("decoder sleeps with queue of %1", _queue.size());
246                 _condition.wait (lock);
247                 TIMING ("decoder wakes with queue of %1", _queue.size());
248         }
249
250         if (_terminate) {
251                 return;
252         }
253
254         if (_writer->thrown ()) {
255                 _writer->rethrow ();
256         }
257
258         if (_writer->can_fake_write (_video_frames_out)) {
259                 _writer->fake_write (_video_frames_out);
260                 _have_a_real_frame = false;
261                 frame_done ();
262         } else if (same && _have_a_real_frame) {
263                 /* Use the last frame that we encoded. */
264                 _writer->repeat (_video_frames_out);
265                 frame_done ();
266         } else {
267                 /* Queue this new frame for encoding */
268                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
269                 TIMING ("adding to queue of %1", _queue.size ());
270                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
271                                           new DCPVideoFrame (
272                                                   image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
273                                                   _film->subtitle_offset(), _film->subtitle_scale(),
274                                                   _film->scaler(), _video_frames_out, _film->frames_per_second(), s.second,
275                                                   _film->colour_lut(), _film->j2k_bandwidth(),
276                                                   _film->log()
277                                                   )
278                                           ));
279                 
280                 _condition.notify_all ();
281                 _have_a_real_frame = true;
282         }
283
284         ++_video_frames_in;
285         ++_video_frames_out;
286
287         if (dfr.repeat) {
288                 _writer->repeat (_video_frames_out);
289                 ++_video_frames_out;
290                 frame_done ();
291         }
292 }
293
294 void
295 Encoder::process_audio (shared_ptr<AudioBuffers> data)
296 {
297 #if HAVE_SWRESAMPLE
298         /* Maybe sample-rate convert */
299         if (_swr_context) {
300
301                 /* Compute the resampled frames count and add 32 for luck */
302                 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
303
304                 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
305
306                 /* Resample audio */
307                 int const resampled_frames = swr_convert (
308                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
309                         );
310                 
311                 if (resampled_frames < 0) {
312                         throw EncodeError ("could not run sample-rate converter");
313                 }
314
315                 resampled->set_frames (resampled_frames);
316                 
317                 /* And point our variables at the resampled audio */
318                 data = resampled;
319         }
320 #endif
321
322         if (_film->audio_channels() == 1) {
323                 /* We need to switch things around so that the mono channel is on
324                    the centre channel of a 5.1 set (with other channels silent).
325                 */
326
327                 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
328                 b->make_silent (libdcp::LEFT);
329                 b->make_silent (libdcp::RIGHT);
330                 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
331                 b->make_silent (libdcp::LFE);
332                 b->make_silent (libdcp::LS);
333                 b->make_silent (libdcp::RS);
334
335                 data = b;
336         }
337
338         _writer->write (data);
339 }
340
341 void
342 Encoder::terminate_threads ()
343 {
344         boost::mutex::scoped_lock lock (_mutex);
345         _terminate = true;
346         _condition.notify_all ();
347         lock.unlock ();
348
349         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
350                 (*i)->join ();
351                 delete *i;
352         }
353 }
354
355 void
356 Encoder::encoder_thread (ServerDescription* server)
357 {
358         /* Number of seconds that we currently wait between attempts
359            to connect to the server; not relevant for localhost
360            encodings.
361         */
362         int remote_backoff = 0;
363         
364         while (1) {
365
366                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
367                 boost::mutex::scoped_lock lock (_mutex);
368                 while (_queue.empty () && !_terminate) {
369                         _condition.wait (lock);
370                 }
371
372                 if (_terminate) {
373                         return;
374                 }
375
376                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
377                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
378                 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
379                 _queue.pop_front ();
380                 
381                 lock.unlock ();
382
383                 shared_ptr<EncodedData> encoded;
384
385                 if (server) {
386                         try {
387                                 encoded = vf->encode_remotely (server);
388
389                                 if (remote_backoff > 0) {
390                                         _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
391                                 }
392                                 
393                                 /* This job succeeded, so remove any backoff */
394                                 remote_backoff = 0;
395                                 
396                         } catch (std::exception& e) {
397                                 if (remote_backoff < 60) {
398                                         /* back off more */
399                                         remote_backoff += 10;
400                                 }
401                                 _film->log()->log (
402                                         String::compose (
403                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
404                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
405                                         );
406                         }
407                                 
408                 } else {
409                         try {
410                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
411                                 encoded = vf->encode_locally ();
412                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
413                         } catch (std::exception& e) {
414                                 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
415                         }
416                 }
417
418                 if (encoded) {
419                         _writer->write (encoded, vf->frame ());
420                         frame_done ();
421                 } else {
422                         lock.lock ();
423                         _film->log()->log (
424                                 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
425                                 );
426                         _queue.push_front (vf);
427                         lock.unlock ();
428                 }
429
430                 if (remote_backoff > 0) {
431                         dvdomatic_sleep (remote_backoff);
432                 }
433
434                 lock.lock ();
435                 _condition.notify_all ();
436         }
437 }