First try at doing fake writes.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
28 #include "encoder.h"
29 #include "util.h"
30 #include "options.h"
31 #include "film.h"
32 #include "log.h"
33 #include "exceptions.h"
34 #include "filter.h"
35 #include "config.h"
36 #include "dcp_video_frame.h"
37 #include "server.h"
38 #include "format.h"
39 #include "cross.h"
40 #include "writer.h"
41
42 using std::pair;
43 using std::string;
44 using std::stringstream;
45 using std::vector;
46 using std::list;
47 using std::cout;
48 using std::make_pair;
49 using namespace boost;
50
51 int const Encoder::_history_size = 25;
52
53 /** @param f Film that we are encoding.
54  *  @param o Options.
55  */
56 Encoder::Encoder (shared_ptr<Film> f)
57         : _film (f)
58         , _video_frames_in (0)
59         , _video_frames_out (0)
60 #ifdef HAVE_SWRESAMPLE    
61         , _swr_context (0)
62 #endif
63         , _have_a_real_frame (false)
64         , _terminate_encoder (false)
65 {
66         
67 }
68
69 Encoder::~Encoder ()
70 {
71         terminate_worker_threads ();
72         if (_writer) {
73                 _writer->finish ();
74         }
75 }
76
77 void
78 Encoder::process_begin ()
79 {
80         if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
81 #ifdef HAVE_SWRESAMPLE
82
83                 stringstream s;
84                 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
85                 _film->log()->log (s.str ());
86
87                 /* We will be using planar float data when we call the resampler */
88                 _swr_context = swr_alloc_set_opts (
89                         0,
90                         _film->audio_stream()->channel_layout(),
91                         AV_SAMPLE_FMT_FLTP,
92                         _film->target_audio_sample_rate(),
93                         _film->audio_stream()->channel_layout(),
94                         AV_SAMPLE_FMT_FLTP,
95                         _film->audio_stream()->sample_rate(),
96                         0, 0
97                         );
98                 
99                 swr_init (_swr_context);
100 #else
101                 throw EncodeError ("Cannot resample audio as libswresample is not present");
102 #endif
103         } else {
104 #ifdef HAVE_SWRESAMPLE
105                 _swr_context = 0;
106 #endif          
107         }
108
109         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
110                 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
111         }
112
113         vector<ServerDescription*> servers = Config::instance()->servers ();
114
115         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
116                 for (int j = 0; j < (*i)->threads (); ++j) {
117                         _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
118                 }
119         }
120
121         _writer.reset (new Writer (_film));
122 }
123
124
125 void
126 Encoder::process_end ()
127 {
128 #if HAVE_SWRESAMPLE     
129         if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
130
131                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
132                         
133                 while (1) {
134                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
135
136                         if (frames < 0) {
137                                 throw EncodeError ("could not run sample-rate converter");
138                         }
139
140                         if (frames == 0) {
141                                 break;
142                         }
143
144                         out->set_frames (frames);
145                         _writer->write (out);
146                 }
147
148                 swr_free (&_swr_context);
149         }
150 #endif
151
152         boost::mutex::scoped_lock lock (_worker_mutex);
153
154         _film->log()->log ("Clearing queue of " + lexical_cast<string> (_encode_queue.size ()));
155
156         /* Keep waking workers until the queue is empty */
157         while (!_encode_queue.empty ()) {
158                 _film->log()->log ("Waking with " + lexical_cast<string> (_encode_queue.size ()), Log::VERBOSE);
159                 _worker_condition.notify_all ();
160                 _worker_condition.wait (lock);
161         }
162
163         lock.unlock ();
164         
165         terminate_worker_threads ();
166
167         _film->log()->log ("Mopping up " + lexical_cast<string> (_encode_queue.size()));
168
169         /* The following sequence of events can occur in the above code:
170              1. a remote worker takes the last image off the queue
171              2. the loop above terminates
172              3. the remote worker fails to encode the image and puts it back on the queue
173              4. the remote worker is then terminated by terminate_worker_threads
174
175              So just mop up anything left in the queue here.
176         */
177
178         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _encode_queue.begin(); i != _encode_queue.end(); ++i) {
179                 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
180                 try {
181                         _writer->write ((*i)->encode_locally(), (*i)->frame ());
182                         frame_done ();
183                 } catch (std::exception& e) {
184                         _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
185                 }
186         }
187
188         _writer->finish ();
189         _writer.reset ();
190 }       
191
192 /** @return an estimate of the current number of frames we are encoding per second,
193  *  or 0 if not known.
194  */
195 float
196 Encoder::current_frames_per_second () const
197 {
198         boost::mutex::scoped_lock lock (_history_mutex);
199         if (int (_time_history.size()) < _history_size) {
200                 return 0;
201         }
202
203         struct timeval now;
204         gettimeofday (&now, 0);
205
206         return _history_size / (seconds (now) - seconds (_time_history.back ()));
207 }
208
209 /** @return Number of video frames that have been sent out */
210 int
211 Encoder::video_frames_out () const
212 {
213         boost::mutex::scoped_lock (_history_mutex);
214         return _video_frames_out;
215 }
216
217 /** Should be called when a frame has been encoded successfully.
218  *  @param n Source frame index.
219  */
220 void
221 Encoder::frame_done ()
222 {
223         boost::mutex::scoped_lock lock (_history_mutex);
224         
225         struct timeval tv;
226         gettimeofday (&tv, 0);
227         _time_history.push_front (tv);
228         if (int (_time_history.size()) > _history_size) {
229                 _time_history.pop_back ();
230         }
231 }
232
233 void
234 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
235 {
236         DCPFrameRate dfr (_film->frames_per_second ());
237         
238         if (dfr.skip && (_video_frames_in % 2)) {
239                 ++_video_frames_in;
240                 return;
241         }
242
243         boost::mutex::scoped_lock lock (_worker_mutex);
244
245         /* Wait until the queue has gone down a bit */
246         while (_encode_queue.size() >= _worker_threads.size() * 2 && !_terminate_encoder) {
247                 TIMING ("decoder sleeps with queue of %1", _encode_queue.size());
248                 _worker_condition.wait (lock);
249                 TIMING ("decoder wakes with queue of %1", _encode_queue.size());
250         }
251
252         if (_terminate_encoder) {
253                 return;
254         }
255
256         if (_writer->can_fake_write (_video_frames_out)) {
257                 _writer->fake_write (_video_frames_out);
258                 _have_a_real_frame = false;
259         } else if (same && _have_a_real_frame) {
260                 /* Use the last frame that we encoded. */
261                 _writer->repeat (_video_frames_out);
262                 frame_done ();
263         } else {
264                 /* Queue this new frame for encoding */
265                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
266                 TIMING ("adding to queue of %1", _encode_queue.size ());
267                 _encode_queue.push_back (boost::shared_ptr<DCPVideoFrame> (
268                                           new DCPVideoFrame (
269                                                   image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
270                                                   _film->subtitle_offset(), _film->subtitle_scale(),
271                                                   _film->scaler(), _video_frames_out, _film->frames_per_second(), s.second,
272                                                   _film->colour_lut(), _film->j2k_bandwidth(),
273                                                   _film->log()
274                                                   )
275                                           ));
276                 
277                 _worker_condition.notify_all ();
278                 _have_a_real_frame = true;
279         }
280
281         ++_video_frames_in;
282         ++_video_frames_out;
283
284         if (dfr.repeat) {
285                 _writer->repeat (_video_frames_out);
286                 ++_video_frames_out;
287                 frame_done ();
288         }
289 }
290
291 void
292 Encoder::process_audio (shared_ptr<AudioBuffers> data)
293 {
294 #if HAVE_SWRESAMPLE
295         /* Maybe sample-rate convert */
296         if (_swr_context) {
297
298                 /* Compute the resampled frames count and add 32 for luck */
299                 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
300
301                 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
302
303                 /* Resample audio */
304                 int const resampled_frames = swr_convert (
305                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
306                         );
307                 
308                 if (resampled_frames < 0) {
309                         throw EncodeError ("could not run sample-rate converter");
310                 }
311
312                 resampled->set_frames (resampled_frames);
313                 
314                 /* And point our variables at the resampled audio */
315                 data = resampled;
316         }
317 #endif
318
319         if (_film->audio_channels() == 1) {
320                 /* We need to switch things around so that the mono channel is on
321                    the centre channel of a 5.1 set (with other channels silent).
322                 */
323
324                 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
325                 b->make_silent (libdcp::LEFT);
326                 b->make_silent (libdcp::RIGHT);
327                 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
328                 b->make_silent (libdcp::LFE);
329                 b->make_silent (libdcp::LS);
330                 b->make_silent (libdcp::RS);
331
332                 data = b;
333         }
334
335         _writer->write (data);
336 }
337
338 void
339 Encoder::terminate_worker_threads ()
340 {
341         boost::mutex::scoped_lock lock (_worker_mutex);
342         _terminate_encoder = true;
343         _worker_condition.notify_all ();
344         lock.unlock ();
345
346         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
347                 (*i)->join ();
348                 delete *i;
349         }
350 }
351
352 void
353 Encoder::encoder_thread (ServerDescription* server)
354 {
355         /* Number of seconds that we currently wait between attempts
356            to connect to the server; not relevant for localhost
357            encodings.
358         */
359         int remote_backoff = 0;
360         
361         while (1) {
362
363                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
364                 boost::mutex::scoped_lock lock (_worker_mutex);
365                 while (_encode_queue.empty () && !_terminate_encoder) {
366                         _worker_condition.wait (lock);
367                 }
368
369                 if (_terminate_encoder) {
370                         return;
371                 }
372
373                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _encode_queue.size());
374                 boost::shared_ptr<DCPVideoFrame> vf = _encode_queue.front ();
375                 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
376                 _encode_queue.pop_front ();
377                 
378                 lock.unlock ();
379
380                 shared_ptr<EncodedData> encoded;
381
382                 if (server) {
383                         try {
384                                 encoded = vf->encode_remotely (server);
385
386                                 if (remote_backoff > 0) {
387                                         _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
388                                 }
389                                 
390                                 /* This job succeeded, so remove any backoff */
391                                 remote_backoff = 0;
392                                 
393                         } catch (std::exception& e) {
394                                 if (remote_backoff < 60) {
395                                         /* back off more */
396                                         remote_backoff += 10;
397                                 }
398                                 _film->log()->log (
399                                         String::compose (
400                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
401                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
402                                         );
403                         }
404                                 
405                 } else {
406                         try {
407                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
408                                 encoded = vf->encode_locally ();
409                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
410                         } catch (std::exception& e) {
411                                 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
412                         }
413                 }
414
415                 if (encoded) {
416                         _writer->write (encoded, vf->frame ());
417                         frame_done ();
418                 } else {
419                         lock.lock ();
420                         _film->log()->log (
421                                 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
422                                 );
423                         _encode_queue.push_front (vf);
424                         lock.unlock ();
425                 }
426
427                 if (remote_backoff > 0) {
428                         dvdomatic_sleep (remote_backoff);
429                 }
430
431                 lock.lock ();
432                 _worker_condition.notify_all ();
433         }
434 }