Bump version
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
/** @file src/lib/encoder.cc
 *  @brief Parent class for classes which can encode video and audio frames.
 */
23
24 #include <iostream>
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
28 #include "encoder.h"
29 #include "util.h"
30 #include "options.h"
31 #include "film.h"
32 #include "log.h"
33 #include "exceptions.h"
34 #include "filter.h"
35 #include "config.h"
36 #include "dcp_video_frame.h"
37 #include "server.h"
38 #include "format.h"
39 #include "cross.h"
40 #include "writer.h"
41
42 #include "i18n.h"
43
44 using std::pair;
45 using std::string;
46 using std::stringstream;
47 using std::vector;
48 using std::list;
49 using std::cout;
50 using std::min;
51 using std::make_pair;
52 using namespace boost;
53
54 int const Encoder::_history_size = 25;
55
56 /** @param f Film that we are encoding */
57 Encoder::Encoder (shared_ptr<Film> f)
58         : _film (f)
59         , _video_frames_in (0)
60         , _video_frames_out (0)
61 #ifdef HAVE_SWRESAMPLE    
62         , _swr_context (0)
63 #endif
64         , _have_a_real_frame (false)
65         , _terminate (false)
66 {
67         
68 }
69
70 Encoder::~Encoder ()
71 {
72         terminate_threads ();
73         if (_writer) {
74                 _writer->finish ();
75         }
76 }
77
78 void
79 Encoder::process_begin ()
80 {
81         if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
82 #ifdef HAVE_SWRESAMPLE
83
84                 stringstream s;
85                 s << String::compose (N_("Will resample audio from %1 to %2"), _film->audio_stream()->sample_rate(), _film->target_audio_sample_rate());
86                 _film->log()->log (s.str ());
87
88                 /* We will be using planar float data when we call the resampler */
89                 _swr_context = swr_alloc_set_opts (
90                         0,
91                         _film->audio_stream()->channel_layout(),
92                         AV_SAMPLE_FMT_FLTP,
93                         _film->target_audio_sample_rate(),
94                         _film->audio_stream()->channel_layout(),
95                         AV_SAMPLE_FMT_FLTP,
96                         _film->audio_stream()->sample_rate(),
97                         0, 0
98                         );
99                 
100                 swr_init (_swr_context);
101 #else
102                 throw EncodeError (_("Cannot resample audio as libswresample is not present"));
103 #endif
104         } else {
105 #ifdef HAVE_SWRESAMPLE
106                 _swr_context = 0;
107 #endif          
108         }
109
110         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
111                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
112         }
113
114         vector<ServerDescription*> servers = Config::instance()->servers ();
115
116         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
117                 for (int j = 0; j < (*i)->threads (); ++j) {
118                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
119                 }
120         }
121
122         _writer.reset (new Writer (_film));
123 }
124
125
126 void
127 Encoder::process_end ()
128 {
129 #if HAVE_SWRESAMPLE     
130         if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
131
132                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
133                         
134                 while (1) {
135                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
136
137                         if (frames < 0) {
138                                 throw EncodeError (_("could not run sample-rate converter"));
139                         }
140
141                         if (frames == 0) {
142                                 break;
143                         }
144
145                         out->set_frames (frames);
146                         write_audio (out);
147                 }
148
149                 swr_free (&_swr_context);
150         }
151 #endif
152
153         if (_film->audio_channels() == 0 && _film->minimum_audio_channels() > 0) {
154                 /* Put audio in where there is none at all */
155                 int64_t af = video_frames_to_audio_frames (_video_frames_out, 48000, _film->dcp_frame_rate ());
156                 while (af) {
157                         int64_t const this_time = min (af, static_cast<int64_t> (24000));
158                         shared_ptr<AudioBuffers> out (new AudioBuffers (_film->minimum_audio_channels(), this_time));
159                         out->make_silent ();
160                         out->set_frames (this_time);
161                         write_audio (out);
162
163                         af -= this_time;
164                 }
165         }
166
167         boost::mutex::scoped_lock lock (_mutex);
168
169         _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
170
171         /* Keep waking workers until the queue is empty */
172         while (!_queue.empty ()) {
173                 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
174                 _condition.notify_all ();
175                 _condition.wait (lock);
176         }
177
178         lock.unlock ();
179         
180         terminate_threads ();
181
182         _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
183
184         /* The following sequence of events can occur in the above code:
185              1. a remote worker takes the last image off the queue
186              2. the loop above terminates
187              3. the remote worker fails to encode the image and puts it back on the queue
188              4. the remote worker is then terminated by terminate_threads
189
190              So just mop up anything left in the queue here.
191         */
192
193         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
194                 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
195                 try {
196                         _writer->write ((*i)->encode_locally(), (*i)->frame ());
197                         frame_done ();
198                 } catch (std::exception& e) {
199                         _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
200                 }
201         }
202
203         _writer->finish ();
204         _writer.reset ();
205 }       
206
207 /** @return an estimate of the current number of frames we are encoding per second,
208  *  or 0 if not known.
209  */
210 float
211 Encoder::current_frames_per_second () const
212 {
213         boost::mutex::scoped_lock lock (_history_mutex);
214         if (int (_time_history.size()) < _history_size) {
215                 return 0;
216         }
217
218         struct timeval now;
219         gettimeofday (&now, 0);
220
221         return _history_size / (seconds (now) - seconds (_time_history.back ()));
222 }
223
224 /** @return Number of video frames that have been sent out */
225 int
226 Encoder::video_frames_out () const
227 {
228         boost::mutex::scoped_lock (_history_mutex);
229         return _video_frames_out;
230 }
231
232 /** Should be called when a frame has been encoded successfully.
233  *  @param n Source frame index.
234  */
235 void
236 Encoder::frame_done ()
237 {
238         boost::mutex::scoped_lock lock (_history_mutex);
239         
240         struct timeval tv;
241         gettimeofday (&tv, 0);
242         _time_history.push_front (tv);
243         if (int (_time_history.size()) > _history_size) {
244                 _time_history.pop_back ();
245         }
246 }
247
248 void
249 Encoder::process_video (shared_ptr<const Image> image, bool same, boost::shared_ptr<Subtitle> sub)
250 {
251         FrameRateConversion frc (_film->source_frame_rate(), _film->dcp_frame_rate());
252         
253         if (frc.skip && (_video_frames_in % 2)) {
254                 ++_video_frames_in;
255                 return;
256         }
257
258         boost::mutex::scoped_lock lock (_mutex);
259
260         /* Wait until the queue has gone down a bit */
261         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
262                 TIMING ("decoder sleeps with queue of %1", _queue.size());
263                 _condition.wait (lock);
264                 TIMING ("decoder wakes with queue of %1", _queue.size());
265         }
266
267         if (_terminate) {
268                 return;
269         }
270
271         if (_writer->thrown ()) {
272                 _writer->rethrow ();
273         }
274
275         if (_writer->can_fake_write (_video_frames_out)) {
276                 _writer->fake_write (_video_frames_out);
277                 _have_a_real_frame = false;
278                 frame_done ();
279         } else if (same && _have_a_real_frame) {
280                 /* Use the last frame that we encoded. */
281                 _writer->repeat (_video_frames_out);
282                 frame_done ();
283         } else {
284                 /* Queue this new frame for encoding */
285                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
286                 TIMING ("adding to queue of %1", _queue.size ());
287                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
288                                           new DCPVideoFrame (
289                                                   image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
290                                                   _film->subtitle_offset(), _film->subtitle_scale(),
291                                                   _film->scaler(), _video_frames_out, _film->dcp_frame_rate(), s.second,
292                                                   _film->colour_lut(), _film->j2k_bandwidth(),
293                                                   _film->log()
294                                                   )
295                                           ));
296                 
297                 _condition.notify_all ();
298                 _have_a_real_frame = true;
299         }
300
301         ++_video_frames_in;
302         ++_video_frames_out;
303
304         if (frc.repeat) {
305                 _writer->repeat (_video_frames_out);
306                 ++_video_frames_out;
307                 frame_done ();
308         }
309 }
310
311 void
312 Encoder::process_audio (shared_ptr<const AudioBuffers> data)
313 {
314         if (!data->frames ()) {
315                 return;
316         }
317         
318 #if HAVE_SWRESAMPLE
319         /* Maybe sample-rate convert */
320         if (_swr_context) {
321
322                 /* Compute the resampled frames count and add 32 for luck */
323                 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
324
325                 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
326
327                 /* Resample audio */
328                 int const resampled_frames = swr_convert (
329                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
330                         );
331                 
332                 if (resampled_frames < 0) {
333                         throw EncodeError (_("could not run sample-rate converter"));
334                 }
335
336                 resampled->set_frames (resampled_frames);
337                 
338                 /* And point our variables at the resampled audio */
339                 data = resampled;
340         }
341 #endif
342
343         write_audio (data);
344 }
345
346 void
347 Encoder::terminate_threads ()
348 {
349         boost::mutex::scoped_lock lock (_mutex);
350         _terminate = true;
351         _condition.notify_all ();
352         lock.unlock ();
353
354         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
355                 if ((*i)->joinable ()) {
356                         (*i)->join ();
357                 }
358                 delete *i;
359         }
360
361         _threads.clear ();
362 }
363
364 void
365 Encoder::encoder_thread (ServerDescription* server)
366 {
367         /* Number of seconds that we currently wait between attempts
368            to connect to the server; not relevant for localhost
369            encodings.
370         */
371         int remote_backoff = 0;
372         
373         while (1) {
374
375                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
376                 boost::mutex::scoped_lock lock (_mutex);
377                 while (_queue.empty () && !_terminate) {
378                         _condition.wait (lock);
379                 }
380
381                 if (_terminate) {
382                         return;
383                 }
384
385                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
386                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
387                 _film->log()->log (String::compose (N_("Encoder thread %1 pops frame %2 from queue"), boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
388                 _queue.pop_front ();
389                 
390                 lock.unlock ();
391
392                 shared_ptr<EncodedData> encoded;
393
394                 if (server) {
395                         try {
396                                 encoded = vf->encode_remotely (server);
397
398                                 if (remote_backoff > 0) {
399                                         _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
400                                 }
401                                 
402                                 /* This job succeeded, so remove any backoff */
403                                 remote_backoff = 0;
404                                 
405                         } catch (std::exception& e) {
406                                 if (remote_backoff < 60) {
407                                         /* back off more */
408                                         remote_backoff += 10;
409                                 }
410                                 _film->log()->log (
411                                         String::compose (
412                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
413                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
414                                         );
415                         }
416                                 
417                 } else {
418                         try {
419                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
420                                 encoded = vf->encode_locally ();
421                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
422                         } catch (std::exception& e) {
423                                 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
424                         }
425                 }
426
427                 if (encoded) {
428                         _writer->write (encoded, vf->frame ());
429                         frame_done ();
430                 } else {
431                         lock.lock ();
432                         _film->log()->log (
433                                 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
434                                 );
435                         _queue.push_front (vf);
436                         lock.unlock ();
437                 }
438
439                 if (remote_backoff > 0) {
440                         dvdomatic_sleep (remote_backoff);
441                 }
442
443                 lock.lock ();
444                 _condition.notify_all ();
445         }
446 }
447
448 void
449 Encoder::write_audio (shared_ptr<const AudioBuffers> data)
450 {
451         AudioMapping m (_film);
452         if (m.dcp_channels() != _film->audio_channels()) {
453
454                 /* Remap and pad with silence */
455
456                 shared_ptr<AudioBuffers> b (new AudioBuffers (m.dcp_channels(), data->frames ()));
457                 for (int i = 0; i < m.dcp_channels(); ++i) {
458                         optional<int> s = m.dcp_to_source (static_cast<libdcp::Channel> (i));
459                         if (!s) {
460                                 b->make_silent (i);
461                         } else {
462                                 memcpy (b->data()[i], data->data()[s.get()], data->frames() * sizeof(float));
463                         }
464                 }
465
466                 data = b;
467         }
468
469         _writer->write (data);
470 }