Merge in 0.70 branch with 0.71 fix.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include "encoder.h"
28 #include "util.h"
29 #include "options.h"
30 #include "film.h"
31 #include "log.h"
32 #include "exceptions.h"
33 #include "filter.h"
34 #include "config.h"
35 #include "dcp_video_frame.h"
36 #include "server.h"
37 #include "cross.h"
38
39 using std::pair;
40 using std::string;
41 using std::stringstream;
42 using std::vector;
43 using std::list;
44 using std::cout;
45 using std::make_pair;
46 using namespace boost;
47
48 int const Encoder::_history_size = 25;
49
50 /** @param f Film that we are encoding.
51  *  @param o Options.
52  */
53 Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<const EncodeOptions> o)
54         : _film (f)
55         , _opt (o)
56         , _just_skipped (false)
57         , _video_frame (0)
58         , _audio_frame (0)
59 #ifdef HAVE_SWRESAMPLE    
60         , _swr_context (0)
61 #endif    
62         , _audio_frames_written (0)
63         , _process_end (false)
64 {
65         if (_film->audio_stream()) {
66                 /* Create sound output files with .tmp suffixes; we will rename
67                    them if and when we complete.
68                 */
69                 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
70                         SF_INFO sf_info;
71                         sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
72                         /* We write mono files */
73                         sf_info.channels = 1;
74                         sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
75                         SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
76                         if (f == 0) {
77                                 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
78                         }
79                         _sound_files.push_back (f);
80                 }
81         }
82 }
83
84 Encoder::~Encoder ()
85 {
86         close_sound_files ();
87         terminate_worker_threads ();
88 }
89
90 void
91 Encoder::process_begin ()
92 {
93         if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
94 #ifdef HAVE_SWRESAMPLE
95
96                 stringstream s;
97                 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
98                 _film->log()->log (s.str ());
99
100                 /* We will be using planar float data when we call the resampler */
101                 _swr_context = swr_alloc_set_opts (
102                         0,
103                         _film->audio_stream()->channel_layout(),
104                         AV_SAMPLE_FMT_FLTP,
105                         _film->target_audio_sample_rate(),
106                         _film->audio_stream()->channel_layout(),
107                         AV_SAMPLE_FMT_FLTP,
108                         _film->audio_stream()->sample_rate(),
109                         0, 0
110                         );
111                 
112                 swr_init (_swr_context);
113 #else
114                 throw EncodeError ("Cannot resample audio as libswresample is not present");
115 #endif
116         } else {
117 #ifdef HAVE_SWRESAMPLE
118                 _swr_context = 0;
119 #endif          
120         }
121
122         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
123                 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
124         }
125
126         vector<ServerDescription*> servers = Config::instance()->servers ();
127
128         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
129                 for (int j = 0; j < (*i)->threads (); ++j) {
130                         _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
131                 }
132         }
133 }
134
135
136 void
137 Encoder::process_end ()
138 {
139 #if HAVE_SWRESAMPLE     
140         if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
141
142                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
143                         
144                 while (1) {
145                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
146
147                         if (frames < 0) {
148                                 throw EncodeError ("could not run sample-rate converter");
149                         }
150
151                         if (frames == 0) {
152                                 break;
153                         }
154
155                         out->set_frames (frames);
156                         write_audio (out);
157                 }
158
159                 swr_free (&_swr_context);
160         }
161 #endif
162
163         if (_film->audio_stream()) {
164                 close_sound_files ();
165                 
166                 /* Rename .wav.tmp files to .wav */
167                 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
168                         if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
169                                 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
170                         }
171                         boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
172                 }
173         }
174
175         boost::mutex::scoped_lock lock (_worker_mutex);
176
177         _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
178
179         /* Keep waking workers until the queue is empty */
180         while (!_queue.empty ()) {
181                 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
182                 _worker_condition.notify_all ();
183                 _worker_condition.wait (lock);
184         }
185
186         lock.unlock ();
187         
188         terminate_worker_threads ();
189
190         _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
191
192         /* The following sequence of events can occur in the above code:
193              1. a remote worker takes the last image off the queue
194              2. the loop above terminates
195              3. the remote worker fails to encode the image and puts it back on the queue
196              4. the remote worker is then terminated by terminate_worker_threads
197
198              So just mop up anything left in the queue here.
199         */
200
201         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
202                 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
203                 try {
204                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
205                         e->write (_opt, (*i)->frame ());
206                         frame_done ();
207                 } catch (std::exception& e) {
208                         _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
209                 }
210         }
211
212         /* Now do links (or copies on windows) to duplicate frames */
213         for (list<pair<int, int> >::iterator i = _links_required.begin(); i != _links_required.end(); ++i) {
214                 link (_opt->frame_out_path (i->first, false), _opt->frame_out_path (i->second, false));
215                 link (_opt->hash_out_path (i->first, false), _opt->hash_out_path (i->second, false));
216         }
217 }       
218
219 /** @return an estimate of the current number of frames we are encoding per second,
220  *  or 0 if not known.
221  */
222 float
223 Encoder::current_frames_per_second () const
224 {
225         boost::mutex::scoped_lock lock (_history_mutex);
226         if (int (_time_history.size()) < _history_size) {
227                 return 0;
228         }
229
230         struct timeval now;
231         gettimeofday (&now, 0);
232
233         return _history_size / (seconds (now) - seconds (_time_history.back ()));
234 }
235
236 /** @return true if the last frame to be processed was skipped as it already existed */
237 bool
238 Encoder::skipping () const
239 {
240         boost::mutex::scoped_lock (_history_mutex);
241         return _just_skipped;
242 }
243
244 /** @return Number of video frames that have been received */
245 SourceFrame
246 Encoder::video_frame () const
247 {
248         boost::mutex::scoped_lock (_history_mutex);
249         return _video_frame;
250 }
251
252 /** Should be called when a frame has been encoded successfully.
253  *  @param n Source frame index.
254  */
255 void
256 Encoder::frame_done ()
257 {
258         boost::mutex::scoped_lock lock (_history_mutex);
259         _just_skipped = false;
260         
261         struct timeval tv;
262         gettimeofday (&tv, 0);
263         _time_history.push_front (tv);
264         if (int (_time_history.size()) > _history_size) {
265                 _time_history.pop_back ();
266         }
267 }
268
269 /** Called by a subclass when it has just skipped the processing
270     of a frame because it has already been done.
271 */
272 void
273 Encoder::frame_skipped ()
274 {
275         boost::mutex::scoped_lock lock (_history_mutex);
276         _just_skipped = true;
277 }
278
279 void
280 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
281 {
282         if (_opt->video_skip != 0 && (_video_frame % _opt->video_skip) != 0) {
283                 ++_video_frame;
284                 return;
285         }
286
287         if (_opt->video_range) {
288                 pair<SourceFrame, SourceFrame> const r = _opt->video_range.get();
289                 if (_video_frame < r.first || _video_frame >= r.second) {
290                         ++_video_frame;
291                         return;
292                 }
293         }
294
295         boost::mutex::scoped_lock lock (_worker_mutex);
296
297         /* Wait until the queue has gone down a bit */
298         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
299                 TIMING ("decoder sleeps with queue of %1", _queue.size());
300                 _worker_condition.wait (lock);
301                 TIMING ("decoder wakes with queue of %1", _queue.size());
302         }
303
304         if (_process_end) {
305                 return;
306         }
307
308         /* Only do the processing if we don't already have a file for this frame */
309         if (boost::filesystem::exists (_opt->frame_out_path (_video_frame, false))) {
310                 frame_skipped ();
311                 return;
312         }
313
314         if (same && _last_real_frame) {
315                 /* Use the last frame that we encoded.  We need to postpone doing the actual link,
316                    as on windows the link is really a copy and the reference frame might not have
317                    finished encoding yet.
318                 */
319                 _links_required.push_back (make_pair (_last_real_frame.get(), _video_frame));
320         } else {
321                 /* Queue this new frame for encoding */
322                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
323                 TIMING ("adding to queue of %1", _queue.size ());
324                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
325                                           new DCPVideoFrame (
326                                                   image, sub, _opt->out_size, _opt->padding, _film->subtitle_offset(), _film->subtitle_scale(),
327                                                   _film->scaler(), _video_frame, _film->frames_per_second(), s.second,
328                                                   _film->colour_lut(), _film->j2k_bandwidth(),
329                                                   _film->log()
330                                                   )
331                                           ));
332                 
333                 _worker_condition.notify_all ();
334                 _last_real_frame = _video_frame;
335         }
336
337         ++_video_frame;
338 }
339
340 void
341 Encoder::process_audio (shared_ptr<AudioBuffers> data)
342 {
343         if (_opt->audio_range) {
344                 shared_ptr<AudioBuffers> trimmed (new AudioBuffers (*data.get ()));
345                 
346                 /* Range that we are encoding */
347                 pair<int64_t, int64_t> required_range = _opt->audio_range.get();
348                 /* Range of this block of data */
349                 pair<int64_t, int64_t> this_range (_audio_frame, _audio_frame + trimmed->frames());
350
351                 if (this_range.second < required_range.first || required_range.second < this_range.first) {
352                         /* No part of this audio is within the required range */
353                         _audio_frame += data->frames();
354                         return;
355                 } else if (required_range.first >= this_range.first && required_range.first < this_range.second) {
356                         /* Trim start */
357                         int64_t const shift = required_range.first - this_range.first;
358                         trimmed->move (shift, 0, trimmed->frames() - shift);
359                         trimmed->set_frames (trimmed->frames() - shift);
360                 } else if (required_range.second >= this_range.first && required_range.second < this_range.second) {
361                         /* Trim end */
362                         trimmed->set_frames (required_range.second - this_range.first);
363                 }
364
365                 data = trimmed;
366         }
367
368 #if HAVE_SWRESAMPLE
369         /* Maybe sample-rate convert */
370         if (_swr_context) {
371
372                 /* Compute the resampled frames count and add 32 for luck */
373                 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
374
375                 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
376
377                 /* Resample audio */
378                 int const resampled_frames = swr_convert (
379                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
380                         );
381                 
382                 if (resampled_frames < 0) {
383                         throw EncodeError ("could not run sample-rate converter");
384                 }
385
386                 resampled->set_frames (resampled_frames);
387                 
388                 /* And point our variables at the resampled audio */
389                 data = resampled;
390         }
391 #endif
392
393         if (_film->audio_channels() == 1) {
394                 /* We need to switch things around so that the mono channel is on
395                    the centre channel of a 5.1 set (with other channels silent).
396                 */
397
398                 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
399                 b->make_silent (libdcp::LEFT);
400                 b->make_silent (libdcp::RIGHT);
401                 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
402                 b->make_silent (libdcp::LFE);
403                 b->make_silent (libdcp::LS);
404                 b->make_silent (libdcp::RS);
405
406                 data = b;
407         }
408
409         write_audio (data);
410         
411         _audio_frame += data->frames ();
412 }
413
414 void
415 Encoder::write_audio (shared_ptr<const AudioBuffers> audio)
416 {
417         for (int i = 0; i < audio->channels(); ++i) {
418                 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
419         }
420
421         _audio_frames_written += audio->frames ();
422 }
423
424 void
425 Encoder::close_sound_files ()
426 {
427         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
428                 sf_close (*i);
429         }
430
431         _sound_files.clear ();
432 }       
433
434 void
435 Encoder::terminate_worker_threads ()
436 {
437         boost::mutex::scoped_lock lock (_worker_mutex);
438         _process_end = true;
439         _worker_condition.notify_all ();
440         lock.unlock ();
441
442         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
443                 (*i)->join ();
444                 delete *i;
445         }
446 }
447
448 void
449 Encoder::encoder_thread (ServerDescription* server)
450 {
451         /* Number of seconds that we currently wait between attempts
452            to connect to the server; not relevant for localhost
453            encodings.
454         */
455         int remote_backoff = 0;
456         
457         while (1) {
458
459                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
460                 boost::mutex::scoped_lock lock (_worker_mutex);
461                 while (_queue.empty () && !_process_end) {
462                         _worker_condition.wait (lock);
463                 }
464
465                 if (_process_end) {
466                         return;
467                 }
468
469                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
470                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
471                 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
472                 _queue.pop_front ();
473                 
474                 lock.unlock ();
475
476                 shared_ptr<EncodedData> encoded;
477
478                 if (server) {
479                         try {
480                                 encoded = vf->encode_remotely (server);
481
482                                 if (remote_backoff > 0) {
483                                         _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
484                                 }
485                                 
486                                 /* This job succeeded, so remove any backoff */
487                                 remote_backoff = 0;
488                                 
489                         } catch (std::exception& e) {
490                                 if (remote_backoff < 60) {
491                                         /* back off more */
492                                         remote_backoff += 10;
493                                 }
494                                 _film->log()->log (
495                                         String::compose (
496                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
497                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
498                                         );
499                         }
500                                 
501                 } else {
502                         try {
503                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
504                                 encoded = vf->encode_locally ();
505                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
506                         } catch (std::exception& e) {
507                                 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
508                         }
509                 }
510
511                 if (encoded) {
512                         encoded->write (_opt, vf->frame ());
513                         frame_done ();
514                 } else {
515                         lock.lock ();
516                         _film->log()->log (
517                                 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
518                                 );
519                         _queue.push_front (vf);
520                         lock.unlock ();
521                 }
522
523                 if (remote_backoff > 0) {
524                         dvdomatic_sleep (remote_backoff);
525                 }
526
527                 lock.lock ();
528                 _worker_condition.notify_all ();
529         }
530 }
531
532 void
533 Encoder::link (string a, string b) const
534 {
535 #ifdef DVDOMATIC_POSIX                  
536         int const r = symlink (a.c_str(), b.c_str());
537         if (r) {
538                 throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b));
539         }
540 #endif
541         
542 #ifdef DVDOMATIC_WINDOWS
543         boost::filesystem::copy_file (a, b);
544 #endif                  
545 }