Missing include.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.cc
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include "encoder.h"
28 #include "util.h"
29 #include "options.h"
30 #include "film.h"
31 #include "log.h"
32 #include "exceptions.h"
33 #include "filter.h"
34 #include "config.h"
35 #include "dcp_video_frame.h"
36 #include "server.h"
37 #include "cross.h"
38
39 using std::pair;
40 using std::string;
41 using std::stringstream;
42 using std::vector;
43 using std::list;
44 using std::cout;
45 using namespace boost;
46
47 int const Encoder::_history_size = 25;
48
49 /** @param f Film that we are encoding.
50  *  @param o Options.
51  */
52 Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<const EncodeOptions> o)
53         : _film (f)
54         , _opt (o)
55         , _just_skipped (false)
56         , _video_frame (0)
57         , _audio_frame (0)
58 #ifdef HAVE_SWRESAMPLE    
59         , _swr_context (0)
60 #endif    
61         , _audio_frames_written (0)
62         , _process_end (false)
63 {
64         if (_film->audio_stream()) {
65                 /* Create sound output files with .tmp suffixes; we will rename
66                    them if and when we complete.
67                 */
68                 for (int i = 0; i < _film->audio_channels(); ++i) {
69                         SF_INFO sf_info;
70                         sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
71                         /* We write mono files */
72                         sf_info.channels = 1;
73                         sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
74                         SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
75                         if (f == 0) {
76                                 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
77                         }
78                         _sound_files.push_back (f);
79                 }
80         }
81 }
82
83 Encoder::~Encoder ()
84 {
85         close_sound_files ();
86         terminate_worker_threads ();
87 }
88
89 void
90 Encoder::process_begin ()
91 {
92         if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
93 #ifdef HAVE_SWRESAMPLE
94
95                 stringstream s;
96                 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
97                 _film->log()->log (s.str ());
98
99                 /* We will be using planar float data when we call the resampler */
100                 _swr_context = swr_alloc_set_opts (
101                         0,
102                         _film->audio_stream()->channel_layout(),
103                         AV_SAMPLE_FMT_FLTP,
104                         _film->target_audio_sample_rate(),
105                         _film->audio_stream()->channel_layout(),
106                         AV_SAMPLE_FMT_FLTP,
107                         _film->audio_stream()->sample_rate(),
108                         0, 0
109                         );
110                 
111                 swr_init (_swr_context);
112 #else
113                 throw EncodeError ("Cannot resample audio as libswresample is not present");
114 #endif
115         } else {
116 #ifdef HAVE_SWRESAMPLE
117                 _swr_context = 0;
118 #endif          
119         }
120
121         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
122                 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
123         }
124
125         vector<ServerDescription*> servers = Config::instance()->servers ();
126
127         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
128                 for (int j = 0; j < (*i)->threads (); ++j) {
129                         _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
130                 }
131         }
132 }
133
134
135 void
136 Encoder::process_end ()
137 {
138 #if HAVE_SWRESAMPLE     
139         if (_film->audio_stream() && _swr_context) {
140
141                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
142                         
143                 while (1) {
144                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
145
146                         if (frames < 0) {
147                                 throw EncodeError ("could not run sample-rate converter");
148                         }
149
150                         if (frames == 0) {
151                                 break;
152                         }
153
154                         out->set_frames (frames);
155                         write_audio (out);
156                 }
157
158                 swr_free (&_swr_context);
159         }
160 #endif
161
162         if (_film->audio_stream()) {
163                 close_sound_files ();
164                 
165                 /* Rename .wav.tmp files to .wav */
166                 for (int i = 0; i < _film->audio_channels(); ++i) {
167                         if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
168                                 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
169                         }
170                         boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
171                 }
172         }
173
174         boost::mutex::scoped_lock lock (_worker_mutex);
175
176         _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
177
178         /* Keep waking workers until the queue is empty */
179         while (!_queue.empty ()) {
180                 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
181                 _worker_condition.notify_all ();
182                 _worker_condition.wait (lock);
183         }
184
185         lock.unlock ();
186         
187         terminate_worker_threads ();
188
189         _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
190
191         /* The following sequence of events can occur in the above code:
192              1. a remote worker takes the last image off the queue
193              2. the loop above terminates
194              3. the remote worker fails to encode the image and puts it back on the queue
195              4. the remote worker is then terminated by terminate_worker_threads
196
197              So just mop up anything left in the queue here.
198         */
199
200         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
201                 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
202                 try {
203                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
204                         e->write (_opt, (*i)->frame ());
205                         frame_done ();
206                 } catch (std::exception& e) {
207                         _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
208                 }
209         }
210 }       
211
212 /** @return an estimate of the current number of frames we are encoding per second,
213  *  or 0 if not known.
214  */
215 float
216 Encoder::current_frames_per_second () const
217 {
218         boost::mutex::scoped_lock lock (_history_mutex);
219         if (int (_time_history.size()) < _history_size) {
220                 return 0;
221         }
222
223         struct timeval now;
224         gettimeofday (&now, 0);
225
226         return _history_size / (seconds (now) - seconds (_time_history.back ()));
227 }
228
229 /** @return true if the last frame to be processed was skipped as it already existed */
230 bool
231 Encoder::skipping () const
232 {
233         boost::mutex::scoped_lock (_history_mutex);
234         return _just_skipped;
235 }
236
237 /** @return Number of video frames that have been received */
238 SourceFrame
239 Encoder::video_frame () const
240 {
241         boost::mutex::scoped_lock (_history_mutex);
242         return _video_frame;
243 }
244
245 /** Should be called when a frame has been encoded successfully.
246  *  @param n Source frame index.
247  */
248 void
249 Encoder::frame_done ()
250 {
251         boost::mutex::scoped_lock lock (_history_mutex);
252         _just_skipped = false;
253         
254         struct timeval tv;
255         gettimeofday (&tv, 0);
256         _time_history.push_front (tv);
257         if (int (_time_history.size()) > _history_size) {
258                 _time_history.pop_back ();
259         }
260 }
261
262 /** Called by a subclass when it has just skipped the processing
263     of a frame because it has already been done.
264 */
265 void
266 Encoder::frame_skipped ()
267 {
268         boost::mutex::scoped_lock lock (_history_mutex);
269         _just_skipped = true;
270 }
271
272 void
273 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
274 {
275         if (_opt->video_skip != 0 && (_video_frame % _opt->video_skip) != 0) {
276                 ++_video_frame;
277                 return;
278         }
279
280         if (_opt->video_range) {
281                 pair<SourceFrame, SourceFrame> const r = _opt->video_range.get();
282                 if (_video_frame < r.first || _video_frame >= r.second) {
283                         ++_video_frame;
284                         return;
285                 }
286         }
287
288         boost::mutex::scoped_lock lock (_worker_mutex);
289
290         /* Wait until the queue has gone down a bit */
291         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
292                 TIMING ("decoder sleeps with queue of %1", _queue.size());
293                 _worker_condition.wait (lock);
294                 TIMING ("decoder wakes with queue of %1", _queue.size());
295         }
296
297         if (_process_end) {
298                 return;
299         }
300
301         /* Only do the processing if we don't already have a file for this frame */
302         if (boost::filesystem::exists (_opt->frame_out_path (_video_frame, false))) {
303                 frame_skipped ();
304                 return;
305         }
306
307         if (same) {
308                 /* Use the last frame that we encoded */
309                 assert (_last_real_frame);
310                 link (_opt->frame_out_path (_last_real_frame.get(), false), _opt->frame_out_path (_video_frame, false));
311                 link (_opt->hash_out_path (_last_real_frame.get(), false), _opt->hash_out_path (_video_frame, false));
312         } else {
313                 /* Queue this new frame for encoding */
314                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
315                 TIMING ("adding to queue of %1", _queue.size ());
316                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
317                                           new DCPVideoFrame (
318                                                   image, sub, _opt->out_size, _opt->padding, _film->subtitle_offset(), _film->subtitle_scale(),
319                                                   _film->scaler(), _video_frame, _film->frames_per_second(), s.second,
320                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
321                                                   _film->log()
322                                                   )
323                                           ));
324                 
325                 _worker_condition.notify_all ();
326                 _last_real_frame = _video_frame;
327         }
328
329         ++_video_frame;
330 }
331
332 void
333 Encoder::process_audio (shared_ptr<AudioBuffers> data)
334 {
335         if (_opt->audio_range) {
336                 shared_ptr<AudioBuffers> trimmed (new AudioBuffers (*data.get ()));
337                 
338                 /* Range that we are encoding */
339                 pair<int64_t, int64_t> required_range = _opt->audio_range.get();
340                 /* Range of this block of data */
341                 pair<int64_t, int64_t> this_range (_audio_frame, _audio_frame + trimmed->frames());
342
343                 if (this_range.second < required_range.first || required_range.second < this_range.first) {
344                         /* No part of this audio is within the required range */
345                         return;
346                 } else if (required_range.first >= this_range.first && required_range.first < this_range.second) {
347                         /* Trim start */
348                         int64_t const shift = required_range.first - this_range.first;
349                         trimmed->move (shift, 0, trimmed->frames() - shift);
350                         trimmed->set_frames (trimmed->frames() - shift);
351                 } else if (required_range.second >= this_range.first && required_range.second < this_range.second) {
352                         /* Trim end */
353                         trimmed->set_frames (required_range.second - this_range.first);
354                 }
355
356                 data = trimmed;
357         }
358
359 #if HAVE_SWRESAMPLE
360         /* Maybe sample-rate convert */
361         if (_swr_context) {
362
363                 /* Compute the resampled frames count and add 32 for luck */
364                 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
365
366                 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
367
368                 /* Resample audio */
369                 int const resampled_frames = swr_convert (
370                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
371                         );
372                 
373                 if (resampled_frames < 0) {
374                         throw EncodeError ("could not run sample-rate converter");
375                 }
376
377                 resampled->set_frames (resampled_frames);
378                 
379                 /* And point our variables at the resampled audio */
380                 data = resampled;
381         }
382 #endif
383
384         write_audio (data);
385         
386         _audio_frame += data->frames ();
387 }
388
389 void
390 Encoder::write_audio (shared_ptr<const AudioBuffers> audio)
391 {
392         for (int i = 0; i < _film->audio_channels(); ++i) {
393                 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
394         }
395
396         _audio_frames_written += audio->frames ();
397 }
398
399 void
400 Encoder::close_sound_files ()
401 {
402         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
403                 sf_close (*i);
404         }
405
406         _sound_files.clear ();
407 }       
408
409 void
410 Encoder::terminate_worker_threads ()
411 {
412         boost::mutex::scoped_lock lock (_worker_mutex);
413         _process_end = true;
414         _worker_condition.notify_all ();
415         lock.unlock ();
416
417         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
418                 (*i)->join ();
419                 delete *i;
420         }
421 }
422
423 void
424 Encoder::encoder_thread (ServerDescription* server)
425 {
426         /* Number of seconds that we currently wait between attempts
427            to connect to the server; not relevant for localhost
428            encodings.
429         */
430         int remote_backoff = 0;
431         
432         while (1) {
433
434                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
435                 boost::mutex::scoped_lock lock (_worker_mutex);
436                 while (_queue.empty () && !_process_end) {
437                         _worker_condition.wait (lock);
438                 }
439
440                 if (_process_end) {
441                         return;
442                 }
443
444                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
445                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
446                 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
447                 _queue.pop_front ();
448                 
449                 lock.unlock ();
450
451                 shared_ptr<EncodedData> encoded;
452
453                 if (server) {
454                         try {
455                                 encoded = vf->encode_remotely (server);
456
457                                 if (remote_backoff > 0) {
458                                         _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
459                                 }
460                                 
461                                 /* This job succeeded, so remove any backoff */
462                                 remote_backoff = 0;
463                                 
464                         } catch (std::exception& e) {
465                                 if (remote_backoff < 60) {
466                                         /* back off more */
467                                         remote_backoff += 10;
468                                 }
469                                 _film->log()->log (
470                                         String::compose (
471                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
472                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
473                                         );
474                         }
475                                 
476                 } else {
477                         try {
478                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
479                                 encoded = vf->encode_locally ();
480                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
481                         } catch (std::exception& e) {
482                                 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
483                         }
484                 }
485
486                 if (encoded) {
487                         encoded->write (_opt, vf->frame ());
488                         frame_done ();
489                 } else {
490                         lock.lock ();
491                         _film->log()->log (
492                                 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
493                                 );
494                         _queue.push_front (vf);
495                         lock.unlock ();
496                 }
497
498                 if (remote_backoff > 0) {
499                         dvdomatic_sleep (remote_backoff);
500                 }
501
502                 lock.lock ();
503                 _worker_condition.notify_all ();
504         }
505 }
506
507 void
508 Encoder::link (string a, string b) const
509 {
510 #ifdef DVDOMATIC_POSIX                  
511         int const r = symlink (a.c_str(), b.c_str());
512         if (r) {
513                 throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b));
514         }
515 #endif
516         
517 #ifdef DVDOMATIC_WINDOWS
518         boost::filesystem::copy_file (a, b);
519 #endif                  
520 }