8e0d1cd91e5efe6b6843e64c4e2c3f6c6aac136a
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/lib/encoder.cc
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
28 #include "encoder.h"
29 #include "util.h"
30 #include "film.h"
31 #include "log.h"
32 #include "exceptions.h"
33 #include "filter.h"
34 #include "config.h"
35 #include "dcp_video_frame.h"
36 #include "server.h"
37 #include "format.h"
38 #include "cross.h"
39 #include "writer.h"
40 #include "player.h"
41 #include "audio_mapping.h"
42
43 #include "i18n.h"
44
45 using std::pair;
46 using std::string;
47 using std::stringstream;
48 using std::vector;
49 using std::list;
50 using std::cout;
51 using std::make_pair;
52 using boost::shared_ptr;
53 using boost::optional;
54
55 int const Encoder::_history_size = 25;
56
57 /** @param f Film that we are encoding */
58 Encoder::Encoder (shared_ptr<Film> f, shared_ptr<Job> j)
59         : _film (f)
60         , _job (j)
61         , _video_frames_in (0)
62         , _video_frames_out (0)
63         , _swr_context (0)
64         , _have_a_real_frame (false)
65         , _terminate (false)
66 {
67         
68 }
69
70 Encoder::~Encoder ()
71 {
72         terminate_threads ();
73         if (_writer) {
74                 _writer->finish ();
75         }
76 }
77
78 void
79 Encoder::process_begin ()
80 {
81         if (_film->has_audio() && _film->audio_frame_rate() != _film->target_audio_sample_rate()) {
82
83                 stringstream s;
84                 s << String::compose (N_("Will resample audio from %1 to %2"), _film->audio_frame_rate(), _film->target_audio_sample_rate());
85                 _film->log()->log (s.str ());
86
87                 /* We will be using planar float data when we call the
88                    resampler.  As far as I can see, the audio channel
89                    layout is not necessary for our purposes; it seems
90                    only to be used get the number of channels and
91                    decide if rematrixing is needed.  It won't be, since
92                    input and output layouts are the same.
93                 */
94
95                 _swr_context = swr_alloc_set_opts (
96                         0,
97                         av_get_default_channel_layout (_film->audio_mapping().dcp_channels ()),
98                         AV_SAMPLE_FMT_FLTP,
99                         _film->target_audio_sample_rate(),
100                         av_get_default_channel_layout (_film->audio_mapping().dcp_channels ()),
101                         AV_SAMPLE_FMT_FLTP,
102                         _film->audio_frame_rate(),
103                         0, 0
104                         );
105                 
106                 swr_init (_swr_context);
107         } else {
108                 _swr_context = 0;
109         }
110
111         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
112                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
113         }
114
115         vector<ServerDescription*> servers = Config::instance()->servers ();
116
117         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
118                 for (int j = 0; j < (*i)->threads (); ++j) {
119                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
120                 }
121         }
122
123         _writer.reset (new Writer (_film, _job));
124 }
125
126
127 void
128 Encoder::process_end ()
129 {
130         if (_film->has_audio() && _swr_context) {
131
132                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_mapping().dcp_channels(), 256));
133                         
134                 while (1) {
135                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
136
137                         if (frames < 0) {
138                                 throw EncodeError (_("could not run sample-rate converter"));
139                         }
140
141                         if (frames == 0) {
142                                 break;
143                         }
144
145                         out->set_frames (frames);
146                         _writer->write (out);
147                 }
148
149                 swr_free (&_swr_context);
150         }
151
152         boost::mutex::scoped_lock lock (_mutex);
153
154         _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
155
156         /* Keep waking workers until the queue is empty */
157         while (!_queue.empty ()) {
158                 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
159                 _condition.notify_all ();
160                 _condition.wait (lock);
161         }
162
163         lock.unlock ();
164         
165         terminate_threads ();
166
167         _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
168
169         /* The following sequence of events can occur in the above code:
170              1. a remote worker takes the last image off the queue
171              2. the loop above terminates
172              3. the remote worker fails to encode the image and puts it back on the queue
173              4. the remote worker is then terminated by terminate_threads
174
175              So just mop up anything left in the queue here.
176         */
177
178         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
179                 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
180                 try {
181                         _writer->write ((*i)->encode_locally(), (*i)->frame ());
182                         frame_done ();
183                 } catch (std::exception& e) {
184                         _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
185                 }
186         }
187
188         _writer->finish ();
189         _writer.reset ();
190 }       
191
192 /** @return an estimate of the current number of frames we are encoding per second,
193  *  or 0 if not known.
194  */
195 float
196 Encoder::current_encoding_rate () const
197 {
198         boost::mutex::scoped_lock lock (_history_mutex);
199         if (int (_time_history.size()) < _history_size) {
200                 return 0;
201         }
202
203         struct timeval now;
204         gettimeofday (&now, 0);
205
206         return _history_size / (seconds (now) - seconds (_time_history.back ()));
207 }
208
209 /** @return Number of video frames that have been sent out */
210 int
211 Encoder::video_frames_out () const
212 {
213         boost::mutex::scoped_lock (_history_mutex);
214         return _video_frames_out;
215 }
216
217 /** Should be called when a frame has been encoded successfully.
218  *  @param n Source frame index.
219  */
220 void
221 Encoder::frame_done ()
222 {
223         boost::mutex::scoped_lock lock (_history_mutex);
224         
225         struct timeval tv;
226         gettimeofday (&tv, 0);
227         _time_history.push_front (tv);
228         if (int (_time_history.size()) > _history_size) {
229                 _time_history.pop_back ();
230         }
231 }
232
233 void
234 Encoder::process_video (shared_ptr<const Image> image, bool same, shared_ptr<Subtitle> sub)
235 {
236         FrameRateConversion frc (_film->video_frame_rate(), _film->dcp_frame_rate());
237         
238         if (frc.skip && (_video_frames_in % 2)) {
239                 ++_video_frames_in;
240                 return;
241         }
242
243         boost::mutex::scoped_lock lock (_mutex);
244
245         /* Wait until the queue has gone down a bit */
246         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
247                 TIMING ("decoder sleeps with queue of %1", _queue.size());
248                 _condition.wait (lock);
249                 TIMING ("decoder wakes with queue of %1", _queue.size());
250         }
251
252         if (_terminate) {
253                 return;
254         }
255
256         if (_writer->thrown ()) {
257                 _writer->rethrow ();
258         }
259
260         if (_writer->can_fake_write (_video_frames_out)) {
261                 _writer->fake_write (_video_frames_out);
262                 _have_a_real_frame = false;
263                 frame_done ();
264         } else if (same && _have_a_real_frame) {
265                 /* Use the last frame that we encoded. */
266                 _writer->repeat (_video_frames_out);
267                 frame_done ();
268         } else {
269                 /* Queue this new frame for encoding */
270                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
271                 TIMING ("adding to queue of %1", _queue.size ());
272                 _queue.push_back (shared_ptr<DCPVideoFrame> (
273                                           new DCPVideoFrame (
274                                                   image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
275                                                   _film->subtitle_offset(), _film->subtitle_scale(),
276                                                   _film->scaler(), _video_frames_out, _film->dcp_frame_rate(), s.second,
277                                                   _film->colour_lut(), _film->j2k_bandwidth(),
278                                                   _film->log()
279                                                   )
280                                           ));
281                 
282                 _condition.notify_all ();
283                 _have_a_real_frame = true;
284         }
285
286         ++_video_frames_in;
287         ++_video_frames_out;
288
289         if (frc.repeat) {
290                 _writer->repeat (_video_frames_out);
291                 ++_video_frames_out;
292                 frame_done ();
293         }
294 }
295
296 void
297 Encoder::process_audio (shared_ptr<const AudioBuffers> data)
298 {
299         /* Maybe sample-rate convert */
300         if (_swr_context) {
301
302                 /* Compute the resampled frames count and add 32 for luck */
303                 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_frame_rate()) + 32;
304
305                 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_mapping().dcp_channels(), max_resampled_frames));
306
307                 /* Resample audio */
308                 int const resampled_frames = swr_convert (
309                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
310                         );
311                 
312                 if (resampled_frames < 0) {
313                         throw EncodeError (_("could not run sample-rate converter"));
314                 }
315
316                 resampled->set_frames (resampled_frames);
317                 
318                 /* And point our variables at the resampled audio */
319                 data = resampled;
320         }
321
322         _writer->write (data);
323 }
324
325 void
326 Encoder::terminate_threads ()
327 {
328         boost::mutex::scoped_lock lock (_mutex);
329         _terminate = true;
330         _condition.notify_all ();
331         lock.unlock ();
332
333         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
334                 if ((*i)->joinable ()) {
335                         (*i)->join ();
336                 }
337                 delete *i;
338         }
339 }
340
341 void
342 Encoder::encoder_thread (ServerDescription* server)
343 {
344         /* Number of seconds that we currently wait between attempts
345            to connect to the server; not relevant for localhost
346            encodings.
347         */
348         int remote_backoff = 0;
349         
350         while (1) {
351
352                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
353                 boost::mutex::scoped_lock lock (_mutex);
354                 while (_queue.empty () && !_terminate) {
355                         _condition.wait (lock);
356                 }
357
358                 if (_terminate) {
359                         return;
360                 }
361
362                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
363                 shared_ptr<DCPVideoFrame> vf = _queue.front ();
364                 _film->log()->log (String::compose (N_("Encoder thread %1 pops frame %2 from queue"), boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
365                 _queue.pop_front ();
366                 
367                 lock.unlock ();
368
369                 shared_ptr<EncodedData> encoded;
370
371                 if (server) {
372                         try {
373                                 encoded = vf->encode_remotely (server);
374
375                                 if (remote_backoff > 0) {
376                                         _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
377                                 }
378                                 
379                                 /* This job succeeded, so remove any backoff */
380                                 remote_backoff = 0;
381                                 
382                         } catch (std::exception& e) {
383                                 if (remote_backoff < 60) {
384                                         /* back off more */
385                                         remote_backoff += 10;
386                                 }
387                                 _film->log()->log (
388                                         String::compose (
389                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
390                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
391                                         );
392                         }
393                                 
394                 } else {
395                         try {
396                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
397                                 encoded = vf->encode_locally ();
398                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
399                         } catch (std::exception& e) {
400                                 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
401                         }
402                 }
403
404                 if (encoded) {
405                         _writer->write (encoded, vf->frame ());
406                         frame_done ();
407                 } else {
408                         lock.lock ();
409                         _film->log()->log (
410                                 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
411                                 );
412                         _queue.push_front (vf);
413                         lock.unlock ();
414                 }
415
416                 if (remote_backoff > 0) {
417                         dcpomatic_sleep (remote_backoff);
418                 }
419
420                 lock.lock ();
421                 _condition.notify_all ();
422         }
423 }