Try to put mono sound tracks on the centre speaker.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
/** @file src/lib/encoder.cc
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include "encoder.h"
28 #include "util.h"
29 #include "options.h"
30 #include "film.h"
31 #include "log.h"
32 #include "exceptions.h"
33 #include "filter.h"
34 #include "config.h"
35 #include "dcp_video_frame.h"
36 #include "server.h"
37 #include "cross.h"
38
39 using std::pair;
40 using std::string;
41 using std::stringstream;
42 using std::vector;
43 using std::list;
44 using std::cout;
45 using namespace boost;
46
47 int const Encoder::_history_size = 25;
48
49 /** @param f Film that we are encoding.
50  *  @param o Options.
51  */
52 Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<const EncodeOptions> o)
53         : _film (f)
54         , _opt (o)
55         , _just_skipped (false)
56         , _video_frame (0)
57         , _audio_frame (0)
58 #ifdef HAVE_SWRESAMPLE    
59         , _swr_context (0)
60 #endif    
61         , _audio_frames_written (0)
62         , _process_end (false)
63 {
64         if (_film->audio_stream()) {
65                 /* Create sound output files with .tmp suffixes; we will rename
66                    them if and when we complete.
67                 */
68                 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
69                         SF_INFO sf_info;
70                         sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
71                         /* We write mono files */
72                         sf_info.channels = 1;
73                         sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
74                         SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
75                         if (f == 0) {
76                                 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
77                         }
78                         _sound_files.push_back (f);
79                 }
80         }
81 }
82
83 Encoder::~Encoder ()
84 {
85         close_sound_files ();
86         terminate_worker_threads ();
87 }
88
89 void
90 Encoder::process_begin ()
91 {
92         if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
93 #ifdef HAVE_SWRESAMPLE
94
95                 stringstream s;
96                 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
97                 _film->log()->log (s.str ());
98
99                 /* We will be using planar float data when we call the resampler */
100                 _swr_context = swr_alloc_set_opts (
101                         0,
102                         _film->audio_stream()->channel_layout(),
103                         AV_SAMPLE_FMT_FLTP,
104                         _film->target_audio_sample_rate(),
105                         _film->audio_stream()->channel_layout(),
106                         AV_SAMPLE_FMT_FLTP,
107                         _film->audio_stream()->sample_rate(),
108                         0, 0
109                         );
110                 
111                 swr_init (_swr_context);
112 #else
113                 throw EncodeError ("Cannot resample audio as libswresample is not present");
114 #endif
115         } else {
116 #ifdef HAVE_SWRESAMPLE
117                 _swr_context = 0;
118 #endif          
119         }
120
121         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
122                 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
123         }
124
125         vector<ServerDescription*> servers = Config::instance()->servers ();
126
127         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
128                 for (int j = 0; j < (*i)->threads (); ++j) {
129                         _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
130                 }
131         }
132 }
133
134
135 void
136 Encoder::process_end ()
137 {
138 #if HAVE_SWRESAMPLE     
139         if (_film->audio_stream() && _swr_context) {
140
141                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
142                         
143                 while (1) {
144                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
145
146                         if (frames < 0) {
147                                 throw EncodeError ("could not run sample-rate converter");
148                         }
149
150                         if (frames == 0) {
151                                 break;
152                         }
153
154                         out->set_frames (frames);
155                         write_audio (out);
156                 }
157
158                 swr_free (&_swr_context);
159         }
160 #endif
161
162         if (_film->audio_stream()) {
163                 close_sound_files ();
164                 
165                 /* Rename .wav.tmp files to .wav */
166                 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
167                         if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
168                                 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
169                         }
170                         boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
171                 }
172         }
173
174         boost::mutex::scoped_lock lock (_worker_mutex);
175
176         _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
177
178         /* Keep waking workers until the queue is empty */
179         while (!_queue.empty ()) {
180                 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
181                 _worker_condition.notify_all ();
182                 _worker_condition.wait (lock);
183         }
184
185         lock.unlock ();
186         
187         terminate_worker_threads ();
188
189         _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
190
191         /* The following sequence of events can occur in the above code:
192              1. a remote worker takes the last image off the queue
193              2. the loop above terminates
194              3. the remote worker fails to encode the image and puts it back on the queue
195              4. the remote worker is then terminated by terminate_worker_threads
196
197              So just mop up anything left in the queue here.
198         */
199
200         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
201                 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
202                 try {
203                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
204                         e->write (_opt, (*i)->frame ());
205                         frame_done ();
206                 } catch (std::exception& e) {
207                         _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
208                 }
209         }
210 }       
211
212 /** @return an estimate of the current number of frames we are encoding per second,
213  *  or 0 if not known.
214  */
215 float
216 Encoder::current_frames_per_second () const
217 {
218         boost::mutex::scoped_lock lock (_history_mutex);
219         if (int (_time_history.size()) < _history_size) {
220                 return 0;
221         }
222
223         struct timeval now;
224         gettimeofday (&now, 0);
225
226         return _history_size / (seconds (now) - seconds (_time_history.back ()));
227 }
228
229 /** @return true if the last frame to be processed was skipped as it already existed */
230 bool
231 Encoder::skipping () const
232 {
233         boost::mutex::scoped_lock (_history_mutex);
234         return _just_skipped;
235 }
236
237 /** @return Number of video frames that have been received */
238 SourceFrame
239 Encoder::video_frame () const
240 {
241         boost::mutex::scoped_lock (_history_mutex);
242         return _video_frame;
243 }
244
245 /** Should be called when a frame has been encoded successfully.
246  *  @param n Source frame index.
247  */
248 void
249 Encoder::frame_done ()
250 {
251         boost::mutex::scoped_lock lock (_history_mutex);
252         _just_skipped = false;
253         
254         struct timeval tv;
255         gettimeofday (&tv, 0);
256         _time_history.push_front (tv);
257         if (int (_time_history.size()) > _history_size) {
258                 _time_history.pop_back ();
259         }
260 }
261
262 /** Called by a subclass when it has just skipped the processing
263     of a frame because it has already been done.
264 */
265 void
266 Encoder::frame_skipped ()
267 {
268         boost::mutex::scoped_lock lock (_history_mutex);
269         _just_skipped = true;
270 }
271
272 void
273 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
274 {
275         if (_opt->video_skip != 0 && (_video_frame % _opt->video_skip) != 0) {
276                 ++_video_frame;
277                 return;
278         }
279
280         if (_opt->video_range) {
281                 pair<SourceFrame, SourceFrame> const r = _opt->video_range.get();
282                 if (_video_frame < r.first || _video_frame >= r.second) {
283                         ++_video_frame;
284                         return;
285                 }
286         }
287
288         boost::mutex::scoped_lock lock (_worker_mutex);
289
290         /* Wait until the queue has gone down a bit */
291         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
292                 TIMING ("decoder sleeps with queue of %1", _queue.size());
293                 _worker_condition.wait (lock);
294                 TIMING ("decoder wakes with queue of %1", _queue.size());
295         }
296
297         if (_process_end) {
298                 return;
299         }
300
301         /* Only do the processing if we don't already have a file for this frame */
302         if (boost::filesystem::exists (_opt->frame_out_path (_video_frame, false))) {
303                 frame_skipped ();
304                 return;
305         }
306
307         if (same && _last_real_frame) {
308                 /* Use the last frame that we encoded */
309                 link (_opt->frame_out_path (_last_real_frame.get(), false), _opt->frame_out_path (_video_frame, false));
310                 link (_opt->hash_out_path (_last_real_frame.get(), false), _opt->hash_out_path (_video_frame, false));
311         } else {
312                 /* Queue this new frame for encoding */
313                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
314                 TIMING ("adding to queue of %1", _queue.size ());
315                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
316                                           new DCPVideoFrame (
317                                                   image, sub, _opt->out_size, _opt->padding, _film->subtitle_offset(), _film->subtitle_scale(),
318                                                   _film->scaler(), _video_frame, _film->frames_per_second(), s.second,
319                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
320                                                   _film->log()
321                                                   )
322                                           ));
323                 
324                 _worker_condition.notify_all ();
325                 _last_real_frame = _video_frame;
326         }
327
328         ++_video_frame;
329 }
330
331 void
332 Encoder::process_audio (shared_ptr<AudioBuffers> data)
333 {
334         if (_opt->audio_range) {
335                 shared_ptr<AudioBuffers> trimmed (new AudioBuffers (*data.get ()));
336                 
337                 /* Range that we are encoding */
338                 pair<int64_t, int64_t> required_range = _opt->audio_range.get();
339                 /* Range of this block of data */
340                 pair<int64_t, int64_t> this_range (_audio_frame, _audio_frame + trimmed->frames());
341
342                 if (this_range.second < required_range.first || required_range.second < this_range.first) {
343                         /* No part of this audio is within the required range */
344                         return;
345                 } else if (required_range.first >= this_range.first && required_range.first < this_range.second) {
346                         /* Trim start */
347                         int64_t const shift = required_range.first - this_range.first;
348                         trimmed->move (shift, 0, trimmed->frames() - shift);
349                         trimmed->set_frames (trimmed->frames() - shift);
350                 } else if (required_range.second >= this_range.first && required_range.second < this_range.second) {
351                         /* Trim end */
352                         trimmed->set_frames (required_range.second - this_range.first);
353                 }
354
355                 data = trimmed;
356         }
357
358 #if HAVE_SWRESAMPLE
359         /* Maybe sample-rate convert */
360         if (_swr_context) {
361
362                 /* Compute the resampled frames count and add 32 for luck */
363                 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
364
365                 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
366
367                 /* Resample audio */
368                 int const resampled_frames = swr_convert (
369                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
370                         );
371                 
372                 if (resampled_frames < 0) {
373                         throw EncodeError ("could not run sample-rate converter");
374                 }
375
376                 resampled->set_frames (resampled_frames);
377                 
378                 /* And point our variables at the resampled audio */
379                 data = resampled;
380         }
381 #endif
382
383         if (_film->audio_channels() == 1) {
384                 /* We need to switch things around so that the mono channel is on
385                    the centre channel of a 5.1 set (with other channels silent).
386                 */
387
388                 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
389                 b->make_silent (libdcp::LEFT);
390                 b->make_silent (libdcp::RIGHT);
391                 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
392                 b->make_silent (libdcp::LFE);
393                 b->make_silent (libdcp::LS);
394                 b->make_silent (libdcp::RS);
395         }
396
397         write_audio (data);
398         
399         _audio_frame += data->frames ();
400 }
401
402 void
403 Encoder::write_audio (shared_ptr<const AudioBuffers> audio)
404 {
405         for (int i = 0; i < audio->channels(); ++i) {
406                 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
407         }
408
409         _audio_frames_written += audio->frames ();
410 }
411
412 void
413 Encoder::close_sound_files ()
414 {
415         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
416                 sf_close (*i);
417         }
418
419         _sound_files.clear ();
420 }       
421
422 void
423 Encoder::terminate_worker_threads ()
424 {
425         boost::mutex::scoped_lock lock (_worker_mutex);
426         _process_end = true;
427         _worker_condition.notify_all ();
428         lock.unlock ();
429
430         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
431                 (*i)->join ();
432                 delete *i;
433         }
434 }
435
436 void
437 Encoder::encoder_thread (ServerDescription* server)
438 {
439         /* Number of seconds that we currently wait between attempts
440            to connect to the server; not relevant for localhost
441            encodings.
442         */
443         int remote_backoff = 0;
444         
445         while (1) {
446
447                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
448                 boost::mutex::scoped_lock lock (_worker_mutex);
449                 while (_queue.empty () && !_process_end) {
450                         _worker_condition.wait (lock);
451                 }
452
453                 if (_process_end) {
454                         return;
455                 }
456
457                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
458                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
459                 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
460                 _queue.pop_front ();
461                 
462                 lock.unlock ();
463
464                 shared_ptr<EncodedData> encoded;
465
466                 if (server) {
467                         try {
468                                 encoded = vf->encode_remotely (server);
469
470                                 if (remote_backoff > 0) {
471                                         _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
472                                 }
473                                 
474                                 /* This job succeeded, so remove any backoff */
475                                 remote_backoff = 0;
476                                 
477                         } catch (std::exception& e) {
478                                 if (remote_backoff < 60) {
479                                         /* back off more */
480                                         remote_backoff += 10;
481                                 }
482                                 _film->log()->log (
483                                         String::compose (
484                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
485                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
486                                         );
487                         }
488                                 
489                 } else {
490                         try {
491                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
492                                 encoded = vf->encode_locally ();
493                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
494                         } catch (std::exception& e) {
495                                 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
496                         }
497                 }
498
499                 if (encoded) {
500                         encoded->write (_opt, vf->frame ());
501                         frame_done ();
502                 } else {
503                         lock.lock ();
504                         _film->log()->log (
505                                 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
506                                 );
507                         _queue.push_front (vf);
508                         lock.unlock ();
509                 }
510
511                 if (remote_backoff > 0) {
512                         dvdomatic_sleep (remote_backoff);
513                 }
514
515                 lock.lock ();
516                 _worker_condition.notify_all ();
517         }
518 }
519
520 void
521 Encoder::link (string a, string b) const
522 {
523 #ifdef DVDOMATIC_POSIX                  
524         int const r = symlink (a.c_str(), b.c_str());
525         if (r) {
526                 throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b));
527         }
528 #endif
529         
530 #ifdef DVDOMATIC_WINDOWS
531         boost::filesystem::copy_file (a, b);
532 #endif                  
533 }