Re-work again so that there's just one encoder; various tweaks to still-image-with...
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
/** @file src/lib/encoder.cc
 *  @brief Parent class for classes which can encode video and audio frames.
 */
23
24 #include <boost/filesystem.hpp>
25 #include <boost/lexical_cast.hpp>
26 #include "encoder.h"
27 #include "util.h"
28 #include "options.h"
29 #include "film.h"
30 #include "log.h"
31 #include "exceptions.h"
32 #include "filter.h"
33 #include "config.h"
34 #include "dcp_video_frame.h"
35 #include "server.h"
36 #include "cross.h"
37
38 using std::pair;
39 using std::string;
40 using std::stringstream;
41 using std::vector;
42 using std::list;
43 using std::cout;
44 using namespace boost;
45
/** Number of frame-completion times kept in _time_history; current_frames_per_second()
 *  returns 0 until this many have been recorded.
 */
int const Encoder::_history_size = 25;
47
48 /** @param f Film that we are encoding.
49  *  @param o Options.
50  */
51 Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<const EncodeOptions> o)
52         : _film (f)
53         , _opt (o)
54         , _just_skipped (false)
55         , _video_frame (0)
56         , _audio_frame (0)
57 #ifdef HAVE_SWRESAMPLE    
58         , _swr_context (0)
59 #endif    
60         , _audio_frames_written (0)
61         , _process_end (false)
62 {
63         if (_film->audio_stream()) {
64                 /* Create sound output files with .tmp suffixes; we will rename
65                    them if and when we complete.
66                 */
67                 for (int i = 0; i < _film->audio_channels(); ++i) {
68                         SF_INFO sf_info;
69                         sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
70                         /* We write mono files */
71                         sf_info.channels = 1;
72                         sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
73                         SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
74                         if (f == 0) {
75                                 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
76                         }
77                         _sound_files.push_back (f);
78                 }
79         }
80 }
81
82 Encoder::~Encoder ()
83 {
84         close_sound_files ();
85         terminate_worker_threads ();
86 }
87
88 void
89 Encoder::process_begin ()
90 {
91         if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
92 #ifdef HAVE_SWRESAMPLE
93
94                 stringstream s;
95                 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
96                 _film->log()->log (s.str ());
97
98                 /* We will be using planar float data when we call the resampler */
99                 _swr_context = swr_alloc_set_opts (
100                         0,
101                         _film->audio_stream()->channel_layout(),
102                         AV_SAMPLE_FMT_FLTP,
103                         _film->target_audio_sample_rate(),
104                         _film->audio_stream()->channel_layout(),
105                         AV_SAMPLE_FMT_FLTP,
106                         _film->audio_stream()->sample_rate(),
107                         0, 0
108                         );
109                 
110                 swr_init (_swr_context);
111 #else
112                 throw EncodeError ("Cannot resample audio as libswresample is not present");
113 #endif
114         } else {
115 #ifdef HAVE_SWRESAMPLE
116                 _swr_context = 0;
117 #endif          
118         }
119
120         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
121                 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
122         }
123
124         vector<ServerDescription*> servers = Config::instance()->servers ();
125
126         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
127                 for (int j = 0; j < (*i)->threads (); ++j) {
128                         _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
129                 }
130         }
131 }
132
133
134 void
135 Encoder::process_end ()
136 {
137 #if HAVE_SWRESAMPLE     
138         if (_film->audio_stream() && _swr_context) {
139
140                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
141                         
142                 while (1) {
143                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
144
145                         if (frames < 0) {
146                                 throw EncodeError ("could not run sample-rate converter");
147                         }
148
149                         if (frames == 0) {
150                                 break;
151                         }
152
153                         out->set_frames (frames);
154                         write_audio (out);
155                 }
156
157                 swr_free (&_swr_context);
158         }
159 #endif
160
161         if (_film->audio_stream()) {
162                 close_sound_files ();
163                 
164                 /* Rename .wav.tmp files to .wav */
165                 for (int i = 0; i < _film->audio_channels(); ++i) {
166                         if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
167                                 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
168                         }
169                         boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
170                 }
171         }
172
173         boost::mutex::scoped_lock lock (_worker_mutex);
174
175         _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
176
177         /* Keep waking workers until the queue is empty */
178         while (!_queue.empty ()) {
179                 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
180                 _worker_condition.notify_all ();
181                 _worker_condition.wait (lock);
182         }
183
184         lock.unlock ();
185         
186         terminate_worker_threads ();
187
188         _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
189
190         /* The following sequence of events can occur in the above code:
191              1. a remote worker takes the last image off the queue
192              2. the loop above terminates
193              3. the remote worker fails to encode the image and puts it back on the queue
194              4. the remote worker is then terminated by terminate_worker_threads
195
196              So just mop up anything left in the queue here.
197         */
198
199         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
200                 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
201                 try {
202                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
203                         e->write (_opt, (*i)->frame ());
204                         frame_done ();
205                 } catch (std::exception& e) {
206                         _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
207                 }
208         }
209 }       
210
211 /** @return an estimate of the current number of frames we are encoding per second,
212  *  or 0 if not known.
213  */
214 float
215 Encoder::current_frames_per_second () const
216 {
217         boost::mutex::scoped_lock lock (_history_mutex);
218         if (int (_time_history.size()) < _history_size) {
219                 return 0;
220         }
221
222         struct timeval now;
223         gettimeofday (&now, 0);
224
225         return _history_size / (seconds (now) - seconds (_time_history.back ()));
226 }
227
228 /** @return true if the last frame to be processed was skipped as it already existed */
229 bool
230 Encoder::skipping () const
231 {
232         boost::mutex::scoped_lock (_history_mutex);
233         return _just_skipped;
234 }
235
236 /** @return Number of video frames that have been received */
237 SourceFrame
238 Encoder::video_frame () const
239 {
240         boost::mutex::scoped_lock (_history_mutex);
241         return _video_frame;
242 }
243
244 /** Should be called when a frame has been encoded successfully.
245  *  @param n Source frame index.
246  */
247 void
248 Encoder::frame_done ()
249 {
250         boost::mutex::scoped_lock lock (_history_mutex);
251         _just_skipped = false;
252         
253         struct timeval tv;
254         gettimeofday (&tv, 0);
255         _time_history.push_front (tv);
256         if (int (_time_history.size()) > _history_size) {
257                 _time_history.pop_back ();
258         }
259 }
260
261 /** Called by a subclass when it has just skipped the processing
262     of a frame because it has already been done.
263 */
264 void
265 Encoder::frame_skipped ()
266 {
267         boost::mutex::scoped_lock lock (_history_mutex);
268         _just_skipped = true;
269 }
270
271 void
272 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
273 {
274         if (_opt->video_skip != 0 && (_video_frame % _opt->video_skip) != 0) {
275                 ++_video_frame;
276                 return;
277         }
278
279         if (_opt->video_range) {
280                 pair<SourceFrame, SourceFrame> const r = _opt->video_range.get();
281                 if (_video_frame < r.first || _video_frame >= r.second) {
282                         ++_video_frame;
283                         return;
284                 }
285         }
286
287         boost::mutex::scoped_lock lock (_worker_mutex);
288
289         /* Wait until the queue has gone down a bit */
290         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
291                 TIMING ("decoder sleeps with queue of %1", _queue.size());
292                 _worker_condition.wait (lock);
293                 TIMING ("decoder wakes with queue of %1", _queue.size());
294         }
295
296         if (_process_end) {
297                 return;
298         }
299
300         /* Only do the processing if we don't already have a file for this frame */
301         if (boost::filesystem::exists (_opt->frame_out_path (_video_frame, false))) {
302                 frame_skipped ();
303                 return;
304         }
305
306         if (same) {
307                 /* Use the last frame that we encoded */
308                 assert (_last_real_frame);
309                 link (_opt->frame_out_path (_last_real_frame.get(), false), _opt->frame_out_path (_video_frame, false));
310                 link (_opt->hash_out_path (_last_real_frame.get(), false), _opt->hash_out_path (_video_frame, false));
311         } else {
312                 /* Queue this new frame for encoding */
313                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
314                 TIMING ("adding to queue of %1", _queue.size ());
315                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
316                                           new DCPVideoFrame (
317                                                   image, sub, _opt->out_size, _opt->padding, _film->subtitle_offset(), _film->subtitle_scale(),
318                                                   _film->scaler(), _video_frame, _film->frames_per_second(), s.second,
319                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
320                                                   _film->log()
321                                                   )
322                                           ));
323                 
324                 _worker_condition.notify_all ();
325                 _last_real_frame = _video_frame;
326         }
327
328         ++_video_frame;
329 }
330
331 void
332 Encoder::process_audio (shared_ptr<AudioBuffers> data)
333 {
334         if (_opt->audio_range) {
335                 shared_ptr<AudioBuffers> trimmed (new AudioBuffers (*data.get ()));
336                 
337                 /* Range that we are encoding */
338                 pair<int64_t, int64_t> required_range = _opt->audio_range.get();
339                 /* Range of this block of data */
340                 pair<int64_t, int64_t> this_range (_audio_frame, _audio_frame + trimmed->frames());
341
342                 if (this_range.second < required_range.first || required_range.second < this_range.first) {
343                         /* No part of this audio is within the required range */
344                         return;
345                 } else if (required_range.first >= this_range.first && required_range.first < this_range.second) {
346                         /* Trim start */
347                         int64_t const shift = required_range.first - this_range.first;
348                         trimmed->move (shift, 0, trimmed->frames() - shift);
349                         trimmed->set_frames (trimmed->frames() - shift);
350                 } else if (required_range.second >= this_range.first && required_range.second < this_range.second) {
351                         /* Trim end */
352                         trimmed->set_frames (required_range.second - this_range.first);
353                 }
354
355                 data = trimmed;
356         }
357
358 #if HAVE_SWRESAMPLE
359         /* Maybe sample-rate convert */
360         if (_swr_context) {
361
362                 /* Compute the resampled frames count and add 32 for luck */
363                 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
364
365                 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
366
367                 /* Resample audio */
368                 int const resampled_frames = swr_convert (
369                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
370                         );
371                 
372                 if (resampled_frames < 0) {
373                         throw EncodeError ("could not run sample-rate converter");
374                 }
375
376                 resampled->set_frames (resampled_frames);
377                 
378                 /* And point our variables at the resampled audio */
379                 data = resampled;
380         }
381 #endif
382
383         write_audio (data);
384         
385         _audio_frame += data->frames ();
386 }
387
388 void
389 Encoder::write_audio (shared_ptr<const AudioBuffers> audio)
390 {
391         for (int i = 0; i < _film->audio_channels(); ++i) {
392                 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
393         }
394
395         _audio_frames_written += audio->frames ();
396 }
397
398 void
399 Encoder::close_sound_files ()
400 {
401         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
402                 sf_close (*i);
403         }
404
405         _sound_files.clear ();
406 }       
407
408 void
409 Encoder::terminate_worker_threads ()
410 {
411         boost::mutex::scoped_lock lock (_worker_mutex);
412         _process_end = true;
413         _worker_condition.notify_all ();
414         lock.unlock ();
415
416         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
417                 (*i)->join ();
418                 delete *i;
419         }
420 }
421
422 void
423 Encoder::encoder_thread (ServerDescription* server)
424 {
425         /* Number of seconds that we currently wait between attempts
426            to connect to the server; not relevant for localhost
427            encodings.
428         */
429         int remote_backoff = 0;
430         
431         while (1) {
432
433                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
434                 boost::mutex::scoped_lock lock (_worker_mutex);
435                 while (_queue.empty () && !_process_end) {
436                         _worker_condition.wait (lock);
437                 }
438
439                 if (_process_end) {
440                         return;
441                 }
442
443                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
444                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
445                 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
446                 _queue.pop_front ();
447                 
448                 lock.unlock ();
449
450                 shared_ptr<EncodedData> encoded;
451
452                 if (server) {
453                         try {
454                                 encoded = vf->encode_remotely (server);
455
456                                 if (remote_backoff > 0) {
457                                         _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
458                                 }
459                                 
460                                 /* This job succeeded, so remove any backoff */
461                                 remote_backoff = 0;
462                                 
463                         } catch (std::exception& e) {
464                                 if (remote_backoff < 60) {
465                                         /* back off more */
466                                         remote_backoff += 10;
467                                 }
468                                 _film->log()->log (
469                                         String::compose (
470                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
471                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
472                                         );
473                         }
474                                 
475                 } else {
476                         try {
477                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
478                                 encoded = vf->encode_locally ();
479                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
480                         } catch (std::exception& e) {
481                                 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
482                         }
483                 }
484
485                 if (encoded) {
486                         encoded->write (_opt, vf->frame ());
487                         frame_done ();
488                 } else {
489                         lock.lock ();
490                         _film->log()->log (
491                                 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
492                                 );
493                         _queue.push_front (vf);
494                         lock.unlock ();
495                 }
496
497                 if (remote_backoff > 0) {
498                         dvdomatic_sleep (remote_backoff);
499                 }
500
501                 lock.lock ();
502                 _worker_condition.notify_all ();
503         }
504 }
505
506 void
507 Encoder::link (string a, string b) const
508 {
509 #ifdef DVDOMATIC_POSIX                  
510         int const r = symlink (a.c_str(), b.c_str());
511         if (r) {
512                 throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b));
513         }
514 #endif
515         
516 #ifdef DVDOMATIC_WINDOWS
517         boost::filesystem::copy_file (a, b);
518 #endif                  
519 }