Missed offset for multi-reel.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include "encoder.h"
28 #include "util.h"
29 #include "options.h"
30 #include "film.h"
31 #include "log.h"
32 #include "exceptions.h"
33 #include "filter.h"
34 #include "config.h"
35 #include "dcp_video_frame.h"
36 #include "server.h"
37 #include "format.h"
38 #include "cross.h"
39
40 using std::pair;
41 using std::string;
42 using std::stringstream;
43 using std::vector;
44 using std::list;
45 using std::cout;
46 using std::make_pair;
47 using namespace boost;
48
49 int const Encoder::_history_size = 25;
50
51 /** @param f Film that we are encoding.
52  *  @param o Options.
53  */
54 Encoder::Encoder (shared_ptr<const Film> f)
55         : _film (f)
56         , _just_skipped (false)
57         , _video_frames_in (0)
58         , _audio_frames_in (0)
59         , _video_frames_out (0)
60         , _audio_frames_out (0)
61 #ifdef HAVE_SWRESAMPLE    
62         , _swr_context (0)
63 #endif    
64         , _process_end (false)
65 {
66         if (_film->audio_stream()) {
67                 /* Create sound output files with .tmp suffixes; we will rename
68                    them if and when we complete.
69                 */
70                 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
71                         SF_INFO sf_info;
72                         sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
73                         /* We write mono files */
74                         sf_info.channels = 1;
75                         sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
76                         SNDFILE* f = sf_open (_film->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
77                         if (f == 0) {
78                                 throw CreateFileError (_film->multichannel_audio_out_path (i, true));
79                         }
80                         _sound_files.push_back (f);
81                 }
82         }
83 }
84
85 Encoder::~Encoder ()
86 {
87         close_sound_files ();
88         terminate_worker_threads ();
89 }
90
91 void
92 Encoder::process_begin ()
93 {
94         if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
95 #ifdef HAVE_SWRESAMPLE
96
97                 stringstream s;
98                 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
99                 _film->log()->log (s.str ());
100
101                 /* We will be using planar float data when we call the resampler */
102                 _swr_context = swr_alloc_set_opts (
103                         0,
104                         _film->audio_stream()->channel_layout(),
105                         AV_SAMPLE_FMT_FLTP,
106                         _film->target_audio_sample_rate(),
107                         _film->audio_stream()->channel_layout(),
108                         AV_SAMPLE_FMT_FLTP,
109                         _film->audio_stream()->sample_rate(),
110                         0, 0
111                         );
112                 
113                 swr_init (_swr_context);
114 #else
115                 throw EncodeError ("Cannot resample audio as libswresample is not present");
116 #endif
117         } else {
118 #ifdef HAVE_SWRESAMPLE
119                 _swr_context = 0;
120 #endif          
121         }
122
123         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
124                 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
125         }
126
127         vector<ServerDescription*> servers = Config::instance()->servers ();
128
129         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
130                 for (int j = 0; j < (*i)->threads (); ++j) {
131                         _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
132                 }
133         }
134 }
135
136
137 void
138 Encoder::process_end ()
139 {
140 #if HAVE_SWRESAMPLE     
141         if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
142
143                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
144                         
145                 while (1) {
146                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
147
148                         if (frames < 0) {
149                                 throw EncodeError ("could not run sample-rate converter");
150                         }
151
152                         if (frames == 0) {
153                                 break;
154                         }
155
156                         out->set_frames (frames);
157                         write_audio (out);
158                 }
159
160                 swr_free (&_swr_context);
161         }
162 #endif
163
164         if (_film->audio_stream()) {
165                 close_sound_files ();
166                 
167                 /* Rename .wav.tmp files to .wav */
168                 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
169                         if (boost::filesystem::exists (_film->multichannel_audio_out_path (i, false))) {
170                                 boost::filesystem::remove (_film->multichannel_audio_out_path (i, false));
171                         }
172                         boost::filesystem::rename (_film->multichannel_audio_out_path (i, true), _film->multichannel_audio_out_path (i, false));
173                 }
174         }
175
176         boost::mutex::scoped_lock lock (_worker_mutex);
177
178         _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
179
180         /* Keep waking workers until the queue is empty */
181         while (!_queue.empty ()) {
182                 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
183                 _worker_condition.notify_all ();
184                 _worker_condition.wait (lock);
185         }
186
187         lock.unlock ();
188         
189         terminate_worker_threads ();
190
191         _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
192
193         /* The following sequence of events can occur in the above code:
194              1. a remote worker takes the last image off the queue
195              2. the loop above terminates
196              3. the remote worker fails to encode the image and puts it back on the queue
197              4. the remote worker is then terminated by terminate_worker_threads
198
199              So just mop up anything left in the queue here.
200         */
201
202         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
203                 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
204                 try {
205                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
206                         e->write (_film, (*i)->frame ());
207                         frame_done ();
208                 } catch (std::exception& e) {
209                         _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
210                 }
211         }
212
213         /* Now do links (or copies on windows) to duplicate frames */
214         for (list<pair<int, int> >::iterator i = _links_required.begin(); i != _links_required.end(); ++i) {
215                 link (_film->frame_out_path (i->first, false), _film->frame_out_path (i->second, false));
216                 link (_film->hash_out_path (i->first, false), _film->hash_out_path (i->second, false));
217         }
218 }       
219
220 /** @return an estimate of the current number of frames we are encoding per second,
221  *  or 0 if not known.
222  */
223 float
224 Encoder::current_frames_per_second () const
225 {
226         boost::mutex::scoped_lock lock (_history_mutex);
227         if (int (_time_history.size()) < _history_size) {
228                 return 0;
229         }
230
231         struct timeval now;
232         gettimeofday (&now, 0);
233
234         return _history_size / (seconds (now) - seconds (_time_history.back ()));
235 }
236
237 /** @return true if the last frame to be processed was skipped as it already existed */
238 bool
239 Encoder::skipping () const
240 {
241         boost::mutex::scoped_lock (_history_mutex);
242         return _just_skipped;
243 }
244
245 /** @return Number of video frames that have been sent out */
246 int
247 Encoder::video_frames_out () const
248 {
249         boost::mutex::scoped_lock (_history_mutex);
250         return _video_frames_out;
251 }
252
253 /** Should be called when a frame has been encoded successfully.
254  *  @param n Source frame index.
255  */
256 void
257 Encoder::frame_done ()
258 {
259         boost::mutex::scoped_lock lock (_history_mutex);
260         _just_skipped = false;
261         
262         struct timeval tv;
263         gettimeofday (&tv, 0);
264         _time_history.push_front (tv);
265         if (int (_time_history.size()) > _history_size) {
266                 _time_history.pop_back ();
267         }
268 }
269
270 /** Called by a subclass when it has just skipped the processing
271     of a frame because it has already been done.
272 */
273 void
274 Encoder::frame_skipped ()
275 {
276         boost::mutex::scoped_lock lock (_history_mutex);
277         _just_skipped = true;
278 }
279
280 void
281 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
282 {
283         DCPFrameRate dfr (_film->frames_per_second ());
284         
285         if (dfr.skip && (_video_frames_in % 2)) {
286                 ++_video_frames_in;
287                 return;
288         }
289
290         boost::mutex::scoped_lock lock (_worker_mutex);
291
292         /* Wait until the queue has gone down a bit */
293         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
294                 TIMING ("decoder sleeps with queue of %1", _queue.size());
295                 _worker_condition.wait (lock);
296                 TIMING ("decoder wakes with queue of %1", _queue.size());
297         }
298
299         if (_process_end) {
300                 return;
301         }
302
303         /* Only do the processing if we don't already have a file for this frame */
304         if (boost::filesystem::exists (_film->frame_out_path (_video_frames_out, false))) {
305                 frame_skipped ();
306                 return;
307         }
308
309         if (same && _last_real_frame) {
310                 /* Use the last frame that we encoded.  We need to postpone doing the actual link,
311                    as on windows the link is really a copy and the reference frame might not have
312                    finished encoding yet.
313                 */
314                 _links_required.push_back (make_pair (_last_real_frame.get(), _video_frames_out));
315         } else {
316                 /* Queue this new frame for encoding */
317                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
318                 TIMING ("adding to queue of %1", _queue.size ());
319                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
320                                           new DCPVideoFrame (
321                                                   image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
322                                                   _film->subtitle_offset(), _film->subtitle_scale(),
323                                                   _film->scaler(), _video_frames_out, _film->frames_per_second(), s.second,
324                                                   _film->colour_lut(), _film->j2k_bandwidth(),
325                                                   _film->log()
326                                                   )
327                                           ));
328                 
329                 _worker_condition.notify_all ();
330                 _last_real_frame = _video_frames_out;
331         }
332
333         ++_video_frames_in;
334         ++_video_frames_out;
335
336         if (dfr.repeat) {
337                 _links_required.push_back (make_pair (_video_frames_out, _video_frames_out - 1));
338                 ++_video_frames_out;
339         }
340 }
341
342 void
343 Encoder::process_audio (shared_ptr<AudioBuffers> data)
344 {
345 #if HAVE_SWRESAMPLE
346         /* Maybe sample-rate convert */
347         if (_swr_context) {
348
349                 /* Compute the resampled frames count and add 32 for luck */
350                 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
351
352                 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
353
354                 /* Resample audio */
355                 int const resampled_frames = swr_convert (
356                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
357                         );
358                 
359                 if (resampled_frames < 0) {
360                         throw EncodeError ("could not run sample-rate converter");
361                 }
362
363                 resampled->set_frames (resampled_frames);
364                 
365                 /* And point our variables at the resampled audio */
366                 data = resampled;
367         }
368 #endif
369
370         if (_film->audio_channels() == 1) {
371                 /* We need to switch things around so that the mono channel is on
372                    the centre channel of a 5.1 set (with other channels silent).
373                 */
374
375                 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
376                 b->make_silent (libdcp::LEFT);
377                 b->make_silent (libdcp::RIGHT);
378                 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
379                 b->make_silent (libdcp::LFE);
380                 b->make_silent (libdcp::LS);
381                 b->make_silent (libdcp::RS);
382
383                 data = b;
384         }
385
386         write_audio (data);
387         
388         _audio_frames_in += data->frames ();
389 }
390
391 void
392 Encoder::write_audio (shared_ptr<const AudioBuffers> audio)
393 {
394         for (int i = 0; i < audio->channels(); ++i) {
395                 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
396         }
397
398         _audio_frames_out += audio->frames ();
399 }
400
401 void
402 Encoder::close_sound_files ()
403 {
404         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
405                 sf_close (*i);
406         }
407
408         _sound_files.clear ();
409 }       
410
411 void
412 Encoder::terminate_worker_threads ()
413 {
414         boost::mutex::scoped_lock lock (_worker_mutex);
415         _process_end = true;
416         _worker_condition.notify_all ();
417         lock.unlock ();
418
419         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
420                 (*i)->join ();
421                 delete *i;
422         }
423 }
424
425 void
426 Encoder::encoder_thread (ServerDescription* server)
427 {
428         /* Number of seconds that we currently wait between attempts
429            to connect to the server; not relevant for localhost
430            encodings.
431         */
432         int remote_backoff = 0;
433         
434         while (1) {
435
436                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
437                 boost::mutex::scoped_lock lock (_worker_mutex);
438                 while (_queue.empty () && !_process_end) {
439                         _worker_condition.wait (lock);
440                 }
441
442                 if (_process_end) {
443                         return;
444                 }
445
446                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
447                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
448                 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
449                 _queue.pop_front ();
450                 
451                 lock.unlock ();
452
453                 shared_ptr<EncodedData> encoded;
454
455                 if (server) {
456                         try {
457                                 encoded = vf->encode_remotely (server);
458
459                                 if (remote_backoff > 0) {
460                                         _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
461                                 }
462                                 
463                                 /* This job succeeded, so remove any backoff */
464                                 remote_backoff = 0;
465                                 
466                         } catch (std::exception& e) {
467                                 if (remote_backoff < 60) {
468                                         /* back off more */
469                                         remote_backoff += 10;
470                                 }
471                                 _film->log()->log (
472                                         String::compose (
473                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
474                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
475                                         );
476                         }
477                                 
478                 } else {
479                         try {
480                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
481                                 encoded = vf->encode_locally ();
482                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
483                         } catch (std::exception& e) {
484                                 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
485                         }
486                 }
487
488                 if (encoded) {
489                         encoded->write (_film, vf->frame ());
490                         frame_done ();
491                 } else {
492                         lock.lock ();
493                         _film->log()->log (
494                                 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
495                                 );
496                         _queue.push_front (vf);
497                         lock.unlock ();
498                 }
499
500                 if (remote_backoff > 0) {
501                         dvdomatic_sleep (remote_backoff);
502                 }
503
504                 lock.lock ();
505                 _worker_condition.notify_all ();
506         }
507 }
508
509 void
510 Encoder::link (string a, string b) const
511 {
512 #ifdef DVDOMATIC_POSIX                  
513         int const r = symlink (a.c_str(), b.c_str());
514         if (r) {
515                 throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b));
516         }
517 #endif
518         
519 #ifdef DVDOMATIC_WINDOWS
520         boost::filesystem::copy_file (a, b);
521 #endif                  
522 }