6b14b269820589cf2288007365b0fd37befd00a5
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
28 #include "encoder.h"
29 #include "util.h"
30 #include "options.h"
31 #include "film.h"
32 #include "log.h"
33 #include "exceptions.h"
34 #include "filter.h"
35 #include "config.h"
36 #include "dcp_video_frame.h"
37 #include "server.h"
38 #include "format.h"
39 #include "cross.h"
40 #include "writer.h"
41
42 using std::pair;
43 using std::string;
44 using std::stringstream;
45 using std::vector;
46 using std::list;
47 using std::cout;
48 using std::make_pair;
49 using namespace boost;
50
51 int const Encoder::_history_size = 25;
52
53 /** @param f Film that we are encoding.
54  *  @param o Options.
55  */
56 Encoder::Encoder (shared_ptr<Film> f)
57         : _film (f)
58         , _just_skipped (false)
59         , _video_frames_in (0)
60         , _video_frames_out (0)
61 #ifdef HAVE_SWRESAMPLE    
62         , _swr_context (0)
63 #endif
64         , _have_a_real_frame (false)
65         , _terminate_encoder (false)
66 {
67         
68 }
69
70 Encoder::~Encoder ()
71 {
72         terminate_worker_threads ();
73         if (_writer) {
74                 _writer->finish ();
75         }
76 }
77
78 void
79 Encoder::process_begin ()
80 {
81         if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
82 #ifdef HAVE_SWRESAMPLE
83
84                 stringstream s;
85                 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
86                 _film->log()->log (s.str ());
87
88                 /* We will be using planar float data when we call the resampler */
89                 _swr_context = swr_alloc_set_opts (
90                         0,
91                         _film->audio_stream()->channel_layout(),
92                         AV_SAMPLE_FMT_FLTP,
93                         _film->target_audio_sample_rate(),
94                         _film->audio_stream()->channel_layout(),
95                         AV_SAMPLE_FMT_FLTP,
96                         _film->audio_stream()->sample_rate(),
97                         0, 0
98                         );
99                 
100                 swr_init (_swr_context);
101 #else
102                 throw EncodeError ("Cannot resample audio as libswresample is not present");
103 #endif
104         } else {
105 #ifdef HAVE_SWRESAMPLE
106                 _swr_context = 0;
107 #endif          
108         }
109
110         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
111                 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
112         }
113
114         vector<ServerDescription*> servers = Config::instance()->servers ();
115
116         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
117                 for (int j = 0; j < (*i)->threads (); ++j) {
118                         _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
119                 }
120         }
121
122         _writer.reset (new Writer (_film));
123 }
124
125
126 void
127 Encoder::process_end ()
128 {
129 #if HAVE_SWRESAMPLE     
130         if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
131
132                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
133                         
134                 while (1) {
135                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
136
137                         if (frames < 0) {
138                                 throw EncodeError ("could not run sample-rate converter");
139                         }
140
141                         if (frames == 0) {
142                                 break;
143                         }
144
145                         out->set_frames (frames);
146                         _writer->write (out);
147                 }
148
149                 swr_free (&_swr_context);
150         }
151 #endif
152
153         boost::mutex::scoped_lock lock (_worker_mutex);
154
155         _film->log()->log ("Clearing queue of " + lexical_cast<string> (_encode_queue.size ()));
156
157         /* Keep waking workers until the queue is empty */
158         while (!_encode_queue.empty ()) {
159                 _film->log()->log ("Waking with " + lexical_cast<string> (_encode_queue.size ()), Log::VERBOSE);
160                 _worker_condition.notify_all ();
161                 _worker_condition.wait (lock);
162         }
163
164         lock.unlock ();
165         
166         terminate_worker_threads ();
167
168         _film->log()->log ("Mopping up " + lexical_cast<string> (_encode_queue.size()));
169
170         /* The following sequence of events can occur in the above code:
171              1. a remote worker takes the last image off the queue
172              2. the loop above terminates
173              3. the remote worker fails to encode the image and puts it back on the queue
174              4. the remote worker is then terminated by terminate_worker_threads
175
176              So just mop up anything left in the queue here.
177         */
178
179         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _encode_queue.begin(); i != _encode_queue.end(); ++i) {
180                 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
181                 try {
182                         _writer->write ((*i)->encode_locally(), (*i)->frame ());
183                         frame_done ();
184                 } catch (std::exception& e) {
185                         _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
186                 }
187         }
188
189         _writer->finish ();
190         _writer.reset ();
191 }       
192
193 /** @return an estimate of the current number of frames we are encoding per second,
194  *  or 0 if not known.
195  */
196 float
197 Encoder::current_frames_per_second () const
198 {
199         boost::mutex::scoped_lock lock (_history_mutex);
200         if (int (_time_history.size()) < _history_size) {
201                 return 0;
202         }
203
204         struct timeval now;
205         gettimeofday (&now, 0);
206
207         return _history_size / (seconds (now) - seconds (_time_history.back ()));
208 }
209
210 /** @return true if the last frame to be processed was skipped as it already existed */
211 bool
212 Encoder::skipping () const
213 {
214         boost::mutex::scoped_lock (_history_mutex);
215         return _just_skipped;
216 }
217
218 /** @return Number of video frames that have been sent out */
219 int
220 Encoder::video_frames_out () const
221 {
222         boost::mutex::scoped_lock (_history_mutex);
223         return _video_frames_out;
224 }
225
226 /** Should be called when a frame has been encoded successfully.
227  *  @param n Source frame index.
228  */
229 void
230 Encoder::frame_done ()
231 {
232         boost::mutex::scoped_lock lock (_history_mutex);
233         _just_skipped = false;
234         
235         struct timeval tv;
236         gettimeofday (&tv, 0);
237         _time_history.push_front (tv);
238         if (int (_time_history.size()) > _history_size) {
239                 _time_history.pop_back ();
240         }
241 }
242
243 /** Called by a subclass when it has just skipped the processing
244     of a frame because it has already been done.
245 */
246 void
247 Encoder::frame_skipped ()
248 {
249         boost::mutex::scoped_lock lock (_history_mutex);
250         _just_skipped = true;
251 }
252
253 void
254 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
255 {
256         DCPFrameRate dfr (_film->frames_per_second ());
257         
258         if (dfr.skip && (_video_frames_in % 2)) {
259                 ++_video_frames_in;
260                 return;
261         }
262
263         boost::mutex::scoped_lock lock (_worker_mutex);
264
265         /* Wait until the queue has gone down a bit */
266         while (_encode_queue.size() >= _worker_threads.size() * 2 && !_terminate_encoder) {
267                 TIMING ("decoder sleeps with queue of %1", _encode_queue.size());
268                 _worker_condition.wait (lock);
269                 TIMING ("decoder wakes with queue of %1", _encode_queue.size());
270         }
271
272         if (_terminate_encoder) {
273                 return;
274         }
275
276         /* Only do the processing if we don't already have a file for this frame */
277         if (boost::filesystem::exists (_film->frame_out_path (_video_frames_out, false))) {
278                 frame_skipped ();
279                 return;
280         }
281
282         if (same && _have_a_real_frame) {
283                 /* Use the last frame that we encoded. */
284                 _writer->repeat (_video_frames_out);
285                 frame_done ();
286         } else {
287                 /* Queue this new frame for encoding */
288                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
289                 TIMING ("adding to queue of %1", _encode_queue.size ());
290                 _encode_queue.push_back (boost::shared_ptr<DCPVideoFrame> (
291                                           new DCPVideoFrame (
292                                                   image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
293                                                   _film->subtitle_offset(), _film->subtitle_scale(),
294                                                   _film->scaler(), _video_frames_out, _film->frames_per_second(), s.second,
295                                                   _film->colour_lut(), _film->j2k_bandwidth(),
296                                                   _film->log()
297                                                   )
298                                           ));
299                 
300                 _worker_condition.notify_all ();
301                 _have_a_real_frame = true;
302         }
303
304         ++_video_frames_in;
305         ++_video_frames_out;
306
307         if (dfr.repeat) {
308                 _writer->repeat (_video_frames_out);
309                 ++_video_frames_out;
310                 frame_done ();
311         }
312 }
313
314 void
315 Encoder::process_audio (shared_ptr<AudioBuffers> data)
316 {
317 #if HAVE_SWRESAMPLE
318         /* Maybe sample-rate convert */
319         if (_swr_context) {
320
321                 /* Compute the resampled frames count and add 32 for luck */
322                 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
323
324                 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
325
326                 /* Resample audio */
327                 int const resampled_frames = swr_convert (
328                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
329                         );
330                 
331                 if (resampled_frames < 0) {
332                         throw EncodeError ("could not run sample-rate converter");
333                 }
334
335                 resampled->set_frames (resampled_frames);
336                 
337                 /* And point our variables at the resampled audio */
338                 data = resampled;
339         }
340 #endif
341
342         if (_film->audio_channels() == 1) {
343                 /* We need to switch things around so that the mono channel is on
344                    the centre channel of a 5.1 set (with other channels silent).
345                 */
346
347                 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
348                 b->make_silent (libdcp::LEFT);
349                 b->make_silent (libdcp::RIGHT);
350                 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
351                 b->make_silent (libdcp::LFE);
352                 b->make_silent (libdcp::LS);
353                 b->make_silent (libdcp::RS);
354
355                 data = b;
356         }
357
358         _writer->write (data);
359 }
360
361 void
362 Encoder::terminate_worker_threads ()
363 {
364         boost::mutex::scoped_lock lock (_worker_mutex);
365         _terminate_encoder = true;
366         _worker_condition.notify_all ();
367         lock.unlock ();
368
369         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
370                 (*i)->join ();
371                 delete *i;
372         }
373 }
374
375 void
376 Encoder::encoder_thread (ServerDescription* server)
377 {
378         /* Number of seconds that we currently wait between attempts
379            to connect to the server; not relevant for localhost
380            encodings.
381         */
382         int remote_backoff = 0;
383         
384         while (1) {
385
386                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
387                 boost::mutex::scoped_lock lock (_worker_mutex);
388                 while (_encode_queue.empty () && !_terminate_encoder) {
389                         _worker_condition.wait (lock);
390                 }
391
392                 if (_terminate_encoder) {
393                         return;
394                 }
395
396                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _encode_queue.size());
397                 boost::shared_ptr<DCPVideoFrame> vf = _encode_queue.front ();
398                 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
399                 _encode_queue.pop_front ();
400                 
401                 lock.unlock ();
402
403                 shared_ptr<EncodedData> encoded;
404
405                 if (server) {
406                         try {
407                                 encoded = vf->encode_remotely (server);
408
409                                 if (remote_backoff > 0) {
410                                         _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
411                                 }
412                                 
413                                 /* This job succeeded, so remove any backoff */
414                                 remote_backoff = 0;
415                                 
416                         } catch (std::exception& e) {
417                                 if (remote_backoff < 60) {
418                                         /* back off more */
419                                         remote_backoff += 10;
420                                 }
421                                 _film->log()->log (
422                                         String::compose (
423                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
424                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
425                                         );
426                         }
427                                 
428                 } else {
429                         try {
430                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
431                                 encoded = vf->encode_locally ();
432                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
433                         } catch (std::exception& e) {
434                                 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
435                         }
436                 }
437
438                 if (encoded) {
439                         _writer->write (encoded, vf->frame ());
440                         frame_done ();
441                 } else {
442                         lock.lock ();
443                         _film->log()->log (
444                                 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
445                                 );
446                         _encode_queue.push_front (vf);
447                         lock.unlock ();
448                 }
449
450                 if (remote_backoff > 0) {
451                         dvdomatic_sleep (remote_backoff);
452                 }
453
454                 lock.lock ();
455                 _worker_condition.notify_all ();
456         }
457 }