Merge writer-thread with original which was time-cleanup.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/lib/encoder.cc
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include "encoder.h"
28 #include "util.h"
29 #include "options.h"
30 #include "film.h"
31 #include "log.h"
32 #include "exceptions.h"
33 #include "filter.h"
34 #include "config.h"
35 #include "dcp_video_frame.h"
36 #include "server.h"
37 #include "format.h"
38 #include "cross.h"
39
40 using std::pair;
41 using std::string;
42 using std::stringstream;
43 using std::vector;
44 using std::list;
45 using std::cout;
46 using std::make_pair;
47 using namespace boost;
48
49 int const Encoder::_history_size = 25;
50
51 /** @param f Film that we are encoding.
52  *  @param o Options.
53  */
54 Encoder::Encoder (shared_ptr<const Film> f)
55         : _film (f)
56         , _just_skipped (false)
57         , _video_frames_in (0)
58         , _audio_frames_in (0)
59         , _video_frames_out (0)
60         , _audio_frames_out (0)
61 #ifdef HAVE_SWRESAMPLE    
62         , _swr_context (0)
63 #endif    
64         , _terminate_encoder (false)
65         , _writer_thread (0)
66         , _terminate_writer (false)
67 {
68         if (_film->audio_stream()) {
69                 /* Create sound output files with .tmp suffixes; we will rename
70                    them if and when we complete.
71                 */
72                 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
73                         SF_INFO sf_info;
74                         sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
75                         /* We write mono files */
76                         sf_info.channels = 1;
77                         sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
78                         SNDFILE* f = sf_open (_film->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
79                         if (f == 0) {
80                                 throw CreateFileError (_film->multichannel_audio_out_path (i, true));
81                         }
82                         _sound_files.push_back (f);
83                 }
84         }
85 }
86
87 Encoder::~Encoder ()
88 {
89         close_sound_files ();
90         terminate_worker_threads ();
91         terminate_writer_thread ();
92 }
93
94 void
95 Encoder::process_begin ()
96 {
97         if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
98 #ifdef HAVE_SWRESAMPLE
99
100                 stringstream s;
101                 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
102                 _film->log()->log (s.str ());
103
104                 /* We will be using planar float data when we call the resampler */
105                 _swr_context = swr_alloc_set_opts (
106                         0,
107                         _film->audio_stream()->channel_layout(),
108                         AV_SAMPLE_FMT_FLTP,
109                         _film->target_audio_sample_rate(),
110                         _film->audio_stream()->channel_layout(),
111                         AV_SAMPLE_FMT_FLTP,
112                         _film->audio_stream()->sample_rate(),
113                         0, 0
114                         );
115                 
116                 swr_init (_swr_context);
117 #else
118                 throw EncodeError ("Cannot resample audio as libswresample is not present");
119 #endif
120         } else {
121 #ifdef HAVE_SWRESAMPLE
122                 _swr_context = 0;
123 #endif          
124         }
125
126         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
127                 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
128         }
129
130         vector<ServerDescription*> servers = Config::instance()->servers ();
131
132         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
133                 for (int j = 0; j < (*i)->threads (); ++j) {
134                         _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
135                 }
136         }
137
138         _writer_thread = new boost::thread (boost::bind (&Encoder::writer_thread, this));
139 }
140
141
142 void
143 Encoder::process_end ()
144 {
145 #if HAVE_SWRESAMPLE     
146         if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
147
148                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
149                         
150                 while (1) {
151                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
152
153                         if (frames < 0) {
154                                 throw EncodeError ("could not run sample-rate converter");
155                         }
156
157                         if (frames == 0) {
158                                 break;
159                         }
160
161                         out->set_frames (frames);
162                         write_audio (out);
163                 }
164
165                 swr_free (&_swr_context);
166         }
167 #endif
168
169         if (_film->audio_stream()) {
170                 close_sound_files ();
171                 
172                 /* Rename .wav.tmp files to .wav */
173                 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
174                         if (boost::filesystem::exists (_film->multichannel_audio_out_path (i, false))) {
175                                 boost::filesystem::remove (_film->multichannel_audio_out_path (i, false));
176                         }
177                         boost::filesystem::rename (_film->multichannel_audio_out_path (i, true), _film->multichannel_audio_out_path (i, false));
178                 }
179         }
180
181         boost::mutex::scoped_lock lock (_worker_mutex);
182
183         _film->log()->log ("Clearing queue of " + lexical_cast<string> (_encode_queue.size ()));
184
185         /* Keep waking workers until the queue is empty */
186         while (!_encode_queue.empty ()) {
187                 _film->log()->log ("Waking with " + lexical_cast<string> (_encode_queue.size ()), Log::VERBOSE);
188                 _worker_condition.notify_all ();
189                 _worker_condition.wait (lock);
190         }
191
192         lock.unlock ();
193         
194         terminate_worker_threads ();
195         terminate_writer_thread ();
196
197         _film->log()->log ("Mopping up " + lexical_cast<string> (_encode_queue.size()));
198
199         /* The following sequence of events can occur in the above code:
200              1. a remote worker takes the last image off the queue
201              2. the loop above terminates
202              3. the remote worker fails to encode the image and puts it back on the queue
203              4. the remote worker is then terminated by terminate_worker_threads
204
205              So just mop up anything left in the queue here.
206         */
207
208         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _encode_queue.begin(); i != _encode_queue.end(); ++i) {
209                 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
210                 try {
211                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
212                         e->write (_film, (*i)->frame ());
213                         frame_done ();
214                 } catch (std::exception& e) {
215                         _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
216                 }
217         }
218
219         /* Mop up any unwritten things in the writer's queue */
220         for (list<pair<shared_ptr<EncodedData>, int> >::iterator i = _write_queue.begin(); i != _write_queue.end(); ++i) {
221                 i->first->write (_opt, i->second);
222         }
223
224         /* Now do links (or copies on windows) to duplicate frames */
225         for (list<pair<int, int> >::iterator i = _links_required.begin(); i != _links_required.end(); ++i) {
226                 link (_film->frame_out_path (i->first, false), _film->frame_out_path (i->second, false));
227                 link (_film->hash_out_path (i->first, false), _film->hash_out_path (i->second, false));
228         }
229 }       
230
231 /** @return an estimate of the current number of frames we are encoding per second,
232  *  or 0 if not known.
233  */
234 float
235 Encoder::current_frames_per_second () const
236 {
237         boost::mutex::scoped_lock lock (_history_mutex);
238         if (int (_time_history.size()) < _history_size) {
239                 return 0;
240         }
241
242         struct timeval now;
243         gettimeofday (&now, 0);
244
245         return _history_size / (seconds (now) - seconds (_time_history.back ()));
246 }
247
248 /** @return true if the last frame to be processed was skipped as it already existed */
249 bool
250 Encoder::skipping () const
251 {
252         boost::mutex::scoped_lock (_history_mutex);
253         return _just_skipped;
254 }
255
256 /** @return Number of video frames that have been sent out */
257 int
258 Encoder::video_frames_out () const
259 {
260         boost::mutex::scoped_lock (_history_mutex);
261         return _video_frames_out;
262 }
263
264 /** Should be called when a frame has been encoded successfully.
265  *  @param n Source frame index.
266  */
267 void
268 Encoder::frame_done ()
269 {
270         boost::mutex::scoped_lock lock (_history_mutex);
271         _just_skipped = false;
272         
273         struct timeval tv;
274         gettimeofday (&tv, 0);
275         _time_history.push_front (tv);
276         if (int (_time_history.size()) > _history_size) {
277                 _time_history.pop_back ();
278         }
279 }
280
281 /** Called by a subclass when it has just skipped the processing
282     of a frame because it has already been done.
283 */
284 void
285 Encoder::frame_skipped ()
286 {
287         boost::mutex::scoped_lock lock (_history_mutex);
288         _just_skipped = true;
289 }
290
291 void
292 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
293 {
294         DCPFrameRate dfr (_film->frames_per_second ());
295         
296         if (dfr.skip && (_video_frames_in % 2)) {
297                 ++_video_frames_in;
298                 return;
299         }
300
301         boost::mutex::scoped_lock lock (_worker_mutex);
302
303         /* Wait until the queue has gone down a bit */
304         while (_encode_queue.size() >= _worker_threads.size() * 2 && !_terminate_encoder) {
305                 TIMING ("decoder sleeps with queue of %1", _encode_queue.size());
306                 _worker_condition.wait (lock);
307                 TIMING ("decoder wakes with queue of %1", _encode_queue.size());
308         }
309
310         if (_terminate_encoder) {
311                 return;
312         }
313
314         /* Only do the processing if we don't already have a file for this frame */
315         if (boost::filesystem::exists (_film->frame_out_path (_video_frames_out, false))) {
316                 frame_skipped ();
317                 return;
318         }
319
320         if (same && _last_real_frame) {
321                 /* Use the last frame that we encoded.  We need to postpone doing the actual link,
322                    as on windows the link is really a copy and the reference frame might not have
323                    finished encoding yet.
324                 */
325                 _links_required.push_back (make_pair (_last_real_frame.get(), _video_frames_out));
326         } else {
327                 /* Queue this new frame for encoding */
328                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
329                 TIMING ("adding to queue of %1", _encode_queue.size ());
330                 _encode_queue.push_back (boost::shared_ptr<DCPVideoFrame> (
331                                           new DCPVideoFrame (
332                                                   image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
333                                                   _film->subtitle_offset(), _film->subtitle_scale(),
334                                                   _film->scaler(), _video_frames_out, _film->frames_per_second(), s.second,
335                                                   _film->colour_lut(), _film->j2k_bandwidth(),
336                                                   _film->log()
337                                                   )
338                                           ));
339                 
340                 _worker_condition.notify_all ();
341                 _last_real_frame = _video_frames_out;
342         }
343
344         ++_video_frames_in;
345         ++_video_frames_out;
346
347         if (dfr.repeat) {
348                 _links_required.push_back (make_pair (_video_frames_out, _video_frames_out - 1));
349                 ++_video_frames_out;
350         }
351 }
352
353 void
354 Encoder::process_audio (shared_ptr<AudioBuffers> data)
355 {
356 #if HAVE_SWRESAMPLE
357         /* Maybe sample-rate convert */
358         if (_swr_context) {
359
360                 /* Compute the resampled frames count and add 32 for luck */
361                 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
362
363                 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
364
365                 /* Resample audio */
366                 int const resampled_frames = swr_convert (
367                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
368                         );
369                 
370                 if (resampled_frames < 0) {
371                         throw EncodeError ("could not run sample-rate converter");
372                 }
373
374                 resampled->set_frames (resampled_frames);
375                 
376                 /* And point our variables at the resampled audio */
377                 data = resampled;
378         }
379 #endif
380
381         if (_film->audio_channels() == 1) {
382                 /* We need to switch things around so that the mono channel is on
383                    the centre channel of a 5.1 set (with other channels silent).
384                 */
385
386                 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
387                 b->make_silent (libdcp::LEFT);
388                 b->make_silent (libdcp::RIGHT);
389                 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
390                 b->make_silent (libdcp::LFE);
391                 b->make_silent (libdcp::LS);
392                 b->make_silent (libdcp::RS);
393
394                 data = b;
395         }
396
397         write_audio (data);
398         
399         _audio_frames_in += data->frames ();
400 }
401
402 void
403 Encoder::write_audio (shared_ptr<const AudioBuffers> audio)
404 {
405         for (int i = 0; i < audio->channels(); ++i) {
406                 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
407         }
408
409         _audio_frames_out += audio->frames ();
410 }
411
412 void
413 Encoder::close_sound_files ()
414 {
415         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
416                 sf_close (*i);
417         }
418
419         _sound_files.clear ();
420 }       
421
422 void
423 Encoder::terminate_worker_threads ()
424 {
425         boost::mutex::scoped_lock lock (_worker_mutex);
426         _terminate_encoder = true;
427         _worker_condition.notify_all ();
428         lock.unlock ();
429
430         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
431                 (*i)->join ();
432                 delete *i;
433         }
434 }
435
436 void
437 Encoder::terminate_writer_thread ()
438 {
439         if (!_writer_thread) {
440                 return;
441         }
442         
443         boost::mutex::scoped_lock lock (_writer_mutex);
444         _terminate_writer = true;
445         _writer_condition.notify_all ();
446         lock.unlock ();
447
448         _writer_thread->join ();
449         delete _writer_thread;
450         _writer_thread = 0;
451 }
452
453 void
454 Encoder::encoder_thread (ServerDescription* server)
455 {
456         /* Number of seconds that we currently wait between attempts
457            to connect to the server; not relevant for localhost
458            encodings.
459         */
460         int remote_backoff = 0;
461         
462         while (1) {
463
464                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
465                 boost::mutex::scoped_lock lock (_worker_mutex);
466                 while (_encode_queue.empty () && !_terminate_encoder) {
467                         _worker_condition.wait (lock);
468                 }
469
470                 if (_terminate_encoder) {
471                         return;
472                 }
473
474                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _encode_queue.size());
475                 boost::shared_ptr<DCPVideoFrame> vf = _encode_queue.front ();
476                 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
477                 _encode_queue.pop_front ();
478                 
479                 lock.unlock ();
480
481                 shared_ptr<EncodedData> encoded;
482
483                 if (server) {
484                         try {
485                                 encoded = vf->encode_remotely (server);
486
487                                 if (remote_backoff > 0) {
488                                         _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
489                                 }
490                                 
491                                 /* This job succeeded, so remove any backoff */
492                                 remote_backoff = 0;
493                                 
494                         } catch (std::exception& e) {
495                                 if (remote_backoff < 60) {
496                                         /* back off more */
497                                         remote_backoff += 10;
498                                 }
499                                 _film->log()->log (
500                                         String::compose (
501                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
502                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
503                                         );
504                         }
505                                 
506                 } else {
507                         try {
508                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
509                                 encoded = vf->encode_locally ();
510                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
511                         } catch (std::exception& e) {
512                                 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
513                         }
514                 }
515
516                 if (encoded) {
517                         boost::mutex::scoped_lock lock2 (_writer_mutex);
518                         _write_queue.push_back (make_pair (encoded, vf->frame ()));
519                         _writer_condition.notify_all ();
520                 } else {
521                         lock.lock ();
522                         _film->log()->log (
523                                 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
524                                 );
525                         _encode_queue.push_front (vf);
526                         lock.unlock ();
527                 }
528
529                 if (remote_backoff > 0) {
530                         dvdomatic_sleep (remote_backoff);
531                 }
532
533                 lock.lock ();
534                 _worker_condition.notify_all ();
535         }
536 }
537
538 void
539 Encoder::link (string a, string b) const
540 {
541 #ifdef DVDOMATIC_POSIX                  
542         int const r = symlink (a.c_str(), b.c_str());
543         if (r) {
544                 throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b));
545         }
546 #endif
547         
548 #ifdef DVDOMATIC_WINDOWS
549         boost::filesystem::copy_file (a, b);
550 #endif                  
551 }
552
553 void
554 Encoder::writer_thread ()
555 {
556         while (1)
557         {
558                 boost::mutex::scoped_lock lock (_writer_mutex);
559                 TIMING ("writer sleeps with a queue of %1", _write_queue.size());
560                 while (_write_queue.empty() && !_terminate_writer) {
561                         _writer_condition.wait (lock);
562                 }
563                 TIMING ("writer wakes with a queue of %1", _write_queue.size());
564
565                 if (_terminate_writer) {
566                         return;
567                 }
568
569                 pair<boost::shared_ptr<EncodedData>, int> encoded = _write_queue.front ();
570                 _write_queue.pop_front ();
571
572                 lock.unlock ();
573                 encoded.first->write (_opt, encoded.second);
574                 lock.lock ();
575         }
576 }