b37a3c098d9a245381b05ac13a61ec7439509c75
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
/** @file src/lib/encoder.cc
 *  @brief Parent class for classes which can encode video and audio frames.
 */
23
24 #include <iostream>
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
28 #include "encoder.h"
29 #include "util.h"
30 #include "options.h"
31 #include "film.h"
32 #include "log.h"
33 #include "exceptions.h"
34 #include "filter.h"
35 #include "config.h"
36 #include "dcp_video_frame.h"
37 #include "server.h"
38 #include "format.h"
39 #include "cross.h"
40
41 using std::pair;
42 using std::string;
43 using std::stringstream;
44 using std::vector;
45 using std::list;
46 using std::cout;
47 using std::make_pair;
48 using namespace boost;
49
50 int const Encoder::_history_size = 25;
51 unsigned int const Encoder::_maximum_frames_in_memory = 8;
52
53 /** @param f Film that we are encoding.
54  *  @param o Options.
55  */
56 Encoder::Encoder (shared_ptr<const Film> f)
57         : _film (f)
58         , _just_skipped (false)
59         , _video_frames_in (0)
60         , _audio_frames_in (0)
61         , _video_frames_out (0)
62         , _audio_frames_out (0)
63 #ifdef HAVE_SWRESAMPLE    
64         , _swr_context (0)
65 #endif
66         , _have_a_real_frame (false)
67         , _terminate_encoder (false)
68         , _writer_thread (0)
69         , _finish_writer (false)
70         , _last_written_frame (-1)
71 {
72         if (_film->audio_stream()) {
73                 /* Create sound output files with .tmp suffixes; we will rename
74                    them if and when we complete.
75                 */
76                 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
77                         SF_INFO sf_info;
78                         sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
79                         /* We write mono files */
80                         sf_info.channels = 1;
81                         sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
82                         SNDFILE* f = sf_open (_film->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
83                         if (f == 0) {
84                                 throw CreateFileError (_film->multichannel_audio_out_path (i, true));
85                         }
86                         _sound_files.push_back (f);
87                 }
88         }
89 }
90
91 Encoder::~Encoder ()
92 {
93         close_sound_files ();
94         terminate_worker_threads ();
95         finish_writer_thread ();
96 }
97
98 void
99 Encoder::process_begin ()
100 {
101         if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
102 #ifdef HAVE_SWRESAMPLE
103
104                 stringstream s;
105                 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
106                 _film->log()->log (s.str ());
107
108                 /* We will be using planar float data when we call the resampler */
109                 _swr_context = swr_alloc_set_opts (
110                         0,
111                         _film->audio_stream()->channel_layout(),
112                         AV_SAMPLE_FMT_FLTP,
113                         _film->target_audio_sample_rate(),
114                         _film->audio_stream()->channel_layout(),
115                         AV_SAMPLE_FMT_FLTP,
116                         _film->audio_stream()->sample_rate(),
117                         0, 0
118                         );
119                 
120                 swr_init (_swr_context);
121 #else
122                 throw EncodeError ("Cannot resample audio as libswresample is not present");
123 #endif
124         } else {
125 #ifdef HAVE_SWRESAMPLE
126                 _swr_context = 0;
127 #endif          
128         }
129
130         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
131                 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
132         }
133
134         vector<ServerDescription*> servers = Config::instance()->servers ();
135
136         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
137                 for (int j = 0; j < (*i)->threads (); ++j) {
138                         _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
139                 }
140         }
141
142         /* XXX! */
143         _picture_asset.reset (
144                 new libdcp::MonoPictureAsset (
145                         _film->dir (_film->dcp_name()),
146                         String::compose ("video_%1.mxf", 0),
147                         DCPFrameRate (_film->frames_per_second()).frames_per_second,
148                         _film->format()->dcp_size()
149                         )
150                 );
151         
152         _picture_asset_writer = _picture_asset->start_write ();
153         _writer_thread = new boost::thread (boost::bind (&Encoder::writer_thread, this));
154 }
155
156
157 void
158 Encoder::process_end ()
159 {
160 #if HAVE_SWRESAMPLE     
161         if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
162
163                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
164                         
165                 while (1) {
166                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
167
168                         if (frames < 0) {
169                                 throw EncodeError ("could not run sample-rate converter");
170                         }
171
172                         if (frames == 0) {
173                                 break;
174                         }
175
176                         out->set_frames (frames);
177                         write_audio (out);
178                 }
179
180                 swr_free (&_swr_context);
181         }
182 #endif
183
184         if (_film->audio_stream()) {
185                 close_sound_files ();
186                 
187                 /* Rename .wav.tmp files to .wav */
188                 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
189                         if (boost::filesystem::exists (_film->multichannel_audio_out_path (i, false))) {
190                                 boost::filesystem::remove (_film->multichannel_audio_out_path (i, false));
191                         }
192                         boost::filesystem::rename (_film->multichannel_audio_out_path (i, true), _film->multichannel_audio_out_path (i, false));
193                 }
194         }
195
196         boost::mutex::scoped_lock lock (_worker_mutex);
197
198         _film->log()->log ("Clearing queue of " + lexical_cast<string> (_encode_queue.size ()));
199
200         /* Keep waking workers until the queue is empty */
201         while (!_encode_queue.empty ()) {
202                 _film->log()->log ("Waking with " + lexical_cast<string> (_encode_queue.size ()), Log::VERBOSE);
203                 _worker_condition.notify_all ();
204                 _worker_condition.wait (lock);
205         }
206
207         lock.unlock ();
208         
209         terminate_worker_threads ();
210
211         _film->log()->log ("Mopping up " + lexical_cast<string> (_encode_queue.size()));
212
213         /* The following sequence of events can occur in the above code:
214              1. a remote worker takes the last image off the queue
215              2. the loop above terminates
216              3. the remote worker fails to encode the image and puts it back on the queue
217              4. the remote worker is then terminated by terminate_worker_threads
218
219              So just mop up anything left in the queue here.
220         */
221
222         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _encode_queue.begin(); i != _encode_queue.end(); ++i) {
223                 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
224                 try {
225                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
226                         {
227                                 boost::mutex::scoped_lock lock2 (_writer_mutex);
228                                 _write_queue.push_back (make_pair (e, (*i)->frame ()));
229                                 _writer_condition.notify_all ();
230                         }
231                         frame_done ();
232                 } catch (std::exception& e) {
233                         _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
234                 }
235         }
236
237         finish_writer_thread ();
238         _picture_asset_writer->finalize ();
239 }       
240
241 /** @return an estimate of the current number of frames we are encoding per second,
242  *  or 0 if not known.
243  */
244 float
245 Encoder::current_frames_per_second () const
246 {
247         boost::mutex::scoped_lock lock (_history_mutex);
248         if (int (_time_history.size()) < _history_size) {
249                 return 0;
250         }
251
252         struct timeval now;
253         gettimeofday (&now, 0);
254
255         return _history_size / (seconds (now) - seconds (_time_history.back ()));
256 }
257
258 /** @return true if the last frame to be processed was skipped as it already existed */
259 bool
260 Encoder::skipping () const
261 {
262         boost::mutex::scoped_lock (_history_mutex);
263         return _just_skipped;
264 }
265
266 /** @return Number of video frames that have been sent out */
267 int
268 Encoder::video_frames_out () const
269 {
270         boost::mutex::scoped_lock (_history_mutex);
271         return _video_frames_out;
272 }
273
274 /** Should be called when a frame has been encoded successfully.
275  *  @param n Source frame index.
276  */
277 void
278 Encoder::frame_done ()
279 {
280         boost::mutex::scoped_lock lock (_history_mutex);
281         _just_skipped = false;
282         
283         struct timeval tv;
284         gettimeofday (&tv, 0);
285         _time_history.push_front (tv);
286         if (int (_time_history.size()) > _history_size) {
287                 _time_history.pop_back ();
288         }
289 }
290
291 /** Called by a subclass when it has just skipped the processing
292     of a frame because it has already been done.
293 */
294 void
295 Encoder::frame_skipped ()
296 {
297         boost::mutex::scoped_lock lock (_history_mutex);
298         _just_skipped = true;
299 }
300
301 void
302 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
303 {
304         DCPFrameRate dfr (_film->frames_per_second ());
305         
306         if (dfr.skip && (_video_frames_in % 2)) {
307                 ++_video_frames_in;
308                 return;
309         }
310
311         boost::mutex::scoped_lock lock (_worker_mutex);
312
313         /* Wait until the queue has gone down a bit */
314         while (_encode_queue.size() >= _worker_threads.size() * 2 && !_terminate_encoder) {
315                 TIMING ("decoder sleeps with queue of %1", _encode_queue.size());
316                 _worker_condition.wait (lock);
317                 TIMING ("decoder wakes with queue of %1", _encode_queue.size());
318         }
319
320         if (_terminate_encoder) {
321                 return;
322         }
323
324         /* Only do the processing if we don't already have a file for this frame */
325         if (boost::filesystem::exists (_film->frame_out_path (_video_frames_out, false))) {
326                 frame_skipped ();
327                 return;
328         }
329
330         if (same && _have_a_real_frame) {
331                 /* Use the last frame that we encoded.  We do this by putting a null encoded
332                    frame straight onto the writer's queue.  It will know to duplicate the previous frame
333                    in this case.
334                 */
335                 boost::mutex::scoped_lock lock2 (_writer_mutex);
336                 _write_queue.push_back (make_pair (shared_ptr<EncodedData> (), _video_frames_out));
337                 frame_done ();
338         } else {
339                 /* Queue this new frame for encoding */
340                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
341                 TIMING ("adding to queue of %1", _encode_queue.size ());
342                 _encode_queue.push_back (boost::shared_ptr<DCPVideoFrame> (
343                                           new DCPVideoFrame (
344                                                   image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
345                                                   _film->subtitle_offset(), _film->subtitle_scale(),
346                                                   _film->scaler(), _video_frames_out, _film->frames_per_second(), s.second,
347                                                   _film->colour_lut(), _film->j2k_bandwidth(),
348                                                   _film->log()
349                                                   )
350                                           ));
351                 
352                 _worker_condition.notify_all ();
353                 _have_a_real_frame = true;
354         }
355
356         ++_video_frames_in;
357         ++_video_frames_out;
358
359         if (dfr.repeat) {
360                 boost::mutex::scoped_lock lock2 (_writer_mutex);
361                 _write_queue.push_back (make_pair (shared_ptr<EncodedData> (), _video_frames_out));
362                 ++_video_frames_out;
363                 frame_done ();
364         }
365 }
366
367 void
368 Encoder::process_audio (shared_ptr<AudioBuffers> data)
369 {
370 #if HAVE_SWRESAMPLE
371         /* Maybe sample-rate convert */
372         if (_swr_context) {
373
374                 /* Compute the resampled frames count and add 32 for luck */
375                 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
376
377                 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
378
379                 /* Resample audio */
380                 int const resampled_frames = swr_convert (
381                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
382                         );
383                 
384                 if (resampled_frames < 0) {
385                         throw EncodeError ("could not run sample-rate converter");
386                 }
387
388                 resampled->set_frames (resampled_frames);
389                 
390                 /* And point our variables at the resampled audio */
391                 data = resampled;
392         }
393 #endif
394
395         if (_film->audio_channels() == 1) {
396                 /* We need to switch things around so that the mono channel is on
397                    the centre channel of a 5.1 set (with other channels silent).
398                 */
399
400                 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
401                 b->make_silent (libdcp::LEFT);
402                 b->make_silent (libdcp::RIGHT);
403                 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
404                 b->make_silent (libdcp::LFE);
405                 b->make_silent (libdcp::LS);
406                 b->make_silent (libdcp::RS);
407
408                 data = b;
409         }
410
411         write_audio (data);
412         
413         _audio_frames_in += data->frames ();
414 }
415
416 void
417 Encoder::write_audio (shared_ptr<const AudioBuffers> audio)
418 {
419         for (int i = 0; i < audio->channels(); ++i) {
420                 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
421         }
422
423         _audio_frames_out += audio->frames ();
424 }
425
426 void
427 Encoder::close_sound_files ()
428 {
429         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
430                 sf_close (*i);
431         }
432
433         _sound_files.clear ();
434 }       
435
436 void
437 Encoder::terminate_worker_threads ()
438 {
439         boost::mutex::scoped_lock lock (_worker_mutex);
440         _terminate_encoder = true;
441         _worker_condition.notify_all ();
442         lock.unlock ();
443
444         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
445                 (*i)->join ();
446                 delete *i;
447         }
448 }
449
450 void
451 Encoder::finish_writer_thread ()
452 {
453         if (!_writer_thread) {
454                 return;
455         }
456         
457         boost::mutex::scoped_lock lock (_writer_mutex);
458         _finish_writer = true;
459         _writer_condition.notify_all ();
460         lock.unlock ();
461
462         _writer_thread->join ();
463         delete _writer_thread;
464         _writer_thread = 0;
465 }
466
467 void
468 Encoder::encoder_thread (ServerDescription* server)
469 {
470         /* Number of seconds that we currently wait between attempts
471            to connect to the server; not relevant for localhost
472            encodings.
473         */
474         int remote_backoff = 0;
475         
476         while (1) {
477
478                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
479                 boost::mutex::scoped_lock lock (_worker_mutex);
480                 while (_encode_queue.empty () && !_terminate_encoder) {
481                         _worker_condition.wait (lock);
482                 }
483
484                 if (_terminate_encoder) {
485                         return;
486                 }
487
488                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _encode_queue.size());
489                 boost::shared_ptr<DCPVideoFrame> vf = _encode_queue.front ();
490                 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
491                 _encode_queue.pop_front ();
492                 
493                 lock.unlock ();
494
495                 shared_ptr<EncodedData> encoded;
496
497                 if (server) {
498                         try {
499                                 encoded = vf->encode_remotely (server);
500
501                                 if (remote_backoff > 0) {
502                                         _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
503                                 }
504                                 
505                                 /* This job succeeded, so remove any backoff */
506                                 remote_backoff = 0;
507                                 
508                         } catch (std::exception& e) {
509                                 if (remote_backoff < 60) {
510                                         /* back off more */
511                                         remote_backoff += 10;
512                                 }
513                                 _film->log()->log (
514                                         String::compose (
515                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
516                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
517                                         );
518                         }
519                                 
520                 } else {
521                         try {
522                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
523                                 encoded = vf->encode_locally ();
524                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
525                         } catch (std::exception& e) {
526                                 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
527                         }
528                 }
529
530                 if (encoded) {
531                         boost::mutex::scoped_lock lock2 (_writer_mutex);
532                         _write_queue.push_back (make_pair (encoded, vf->frame ()));
533                         _writer_condition.notify_all ();
534                         frame_done ();
535                 } else {
536                         lock.lock ();
537                         _film->log()->log (
538                                 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
539                                 );
540                         _encode_queue.push_front (vf);
541                         lock.unlock ();
542                 }
543
544                 if (remote_backoff > 0) {
545                         dvdomatic_sleep (remote_backoff);
546                 }
547
548                 lock.lock ();
549                 _worker_condition.notify_all ();
550         }
551 }
552
553 void
554 Encoder::link (string a, string b) const
555 {
556 #ifdef DVDOMATIC_POSIX                  
557         int const r = symlink (a.c_str(), b.c_str());
558         if (r) {
559                 throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b));
560         }
561 #endif
562         
563 #ifdef DVDOMATIC_WINDOWS
564         boost::filesystem::copy_file (a, b);
565 #endif                  
566 }
567
568 struct WriteQueueSorter
569 {
570         bool operator() (pair<shared_ptr<EncodedData>, int> const & a, pair<shared_ptr<EncodedData>, int> const & b) {
571                 return a.second < b.second;
572         }
573 };
574
575 void
576 Encoder::writer_thread ()
577 {
578         while (1)
579         {
580                 boost::mutex::scoped_lock lock (_writer_mutex);
581
582                 while (1) {
583                         if (_finish_writer ||
584                             _write_queue.size() > _maximum_frames_in_memory ||
585                             (!_write_queue.empty() && _write_queue.front().second == (_last_written_frame + 1))) {
586                                     
587                                     break;
588                             }
589
590                             TIMING ("writer sleeps with a queue of %1; %2 pending", _write_queue.size(), _pending.size());
591                             _writer_condition.wait (lock);
592                             TIMING ("writer wakes with a queue of %1", _write_queue.size());
593
594                             _write_queue.sort (WriteQueueSorter ());
595                 }
596
597                 if (_finish_writer && _write_queue.empty() && _pending.empty()) {
598                         return;
599                 }
600
601                 /* Write any frames that we can write; i.e. those that are in sequence */
602                 while (!_write_queue.empty() && _write_queue.front().second == (_last_written_frame + 1)) {
603                         pair<boost::shared_ptr<EncodedData>, int> encoded = _write_queue.front ();
604                         _write_queue.pop_front ();
605
606                         lock.unlock ();
607                         _film->log()->log (String::compose ("Writer writes %1 to MXF", encoded.second));
608                         if (encoded.first) {
609                                 _picture_asset_writer->write (encoded.first->data(), encoded.first->size());
610                                 _last_written = encoded.first;
611                         } else {
612                                 _picture_asset_writer->write (_last_written->data(), _last_written->size());
613                         }
614                         lock.lock ();
615
616                         ++_last_written_frame;
617                 }
618
619                 while (_write_queue.size() > _maximum_frames_in_memory) {
620                         /* Too many frames in memory which can't yet be written to the stream.
621                            Put some to disk.
622                         */
623
624                         pair<boost::shared_ptr<EncodedData>, int> encoded = _write_queue.back ();
625                         _write_queue.pop_back ();
626                         if (!encoded.first) {
627                                 /* This is a `repeat-last' frame, so no need to write it to disk */
628                                 continue;
629                         }
630
631                         lock.unlock ();
632                         _film->log()->log (String::compose ("Writer full (awaiting %1); pushes %2 to disk", _last_written_frame + 1, encoded.second));
633                         encoded.first->write (_film, encoded.second);
634                         lock.lock ();
635
636                         _pending.push_back (encoded.second);
637                 }
638
639                 while (_write_queue.size() < _maximum_frames_in_memory && !_pending.empty()) {
640                         /* We have some space in memory.  Fetch some frames back off disk. */
641
642                         _pending.sort ();
643                         int const fetch = _pending.front ();
644
645                         lock.unlock ();
646                         _film->log()->log (String::compose ("Writer pulls %1 back from disk", fetch));
647                         shared_ptr<EncodedData> encoded;
648                         if (boost::filesystem::exists (_film->frame_out_path (fetch, false))) {
649                                 /* It's an actual frame (not a repeat-last); load it in */
650                                 encoded.reset (new EncodedData (_film->frame_out_path (fetch, false)));
651                         }
652                         lock.lock ();
653
654                         _write_queue.push_back (make_pair (encoded, fetch));
655                         _pending.remove (fetch);
656                 }
657         }
658 }