Bump version
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2014 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include <boost/lambda/lambda.hpp>
26 #include <libcxml/cxml.h>
27 #include "encoder.h"
28 #include "util.h"
29 #include "film.h"
30 #include "log.h"
31 #include "config.h"
32 #include "dcp_video_frame.h"
33 #include "server.h"
34 #include "cross.h"
35 #include "writer.h"
36 #include "server_finder.h"
37 #include "player.h"
38 #include "player_video_frame.h"
39
40 #include "i18n.h"
41
42 using std::pair;
43 using std::string;
44 using std::stringstream;
45 using std::vector;
46 using std::list;
47 using std::cout;
48 using std::min;
49 using std::make_pair;
50 using boost::shared_ptr;
51 using boost::weak_ptr;
52 using boost::optional;
53 using boost::scoped_array;
54
55 int const Encoder::_history_size = 25;
56
57 /** @param f Film that we are encoding */
58 Encoder::Encoder (shared_ptr<const Film> f, weak_ptr<Job> j)
59         : _film (f)
60         , _job (j)
61         , _video_frames_out (0)
62         , _terminate (false)
63 {
64         _have_a_real_frame[EYES_BOTH] = false;
65         _have_a_real_frame[EYES_LEFT] = false;
66         _have_a_real_frame[EYES_RIGHT] = false;
67 }
68
69 Encoder::~Encoder ()
70 {
71         terminate_threads ();
72 }
73
74 /** Add a worker thread for a each thread on a remote server.  Caller must hold
75  *  a lock on _mutex, or know that one is not currently required to
76  *  safely modify _threads.
77  */
78 void
79 Encoder::add_worker_threads (ServerDescription d)
80 {
81         _film->log()->log (String::compose (N_("Adding %1 worker threads for remote %2"), d.host_name ()));
82         for (int i = 0; i < d.threads(); ++i) {
83                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)));
84         }
85 }
86
87 void
88 Encoder::process_begin ()
89 {
90         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
91                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
92         }
93
94         _writer.reset (new Writer (_film, _job));
95         ServerFinder::instance()->connect (boost::bind (&Encoder::server_found, this, _1));
96 }
97
98 void
99 Encoder::process_end ()
100 {
101         boost::mutex::scoped_lock lock (_mutex);
102
103         _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
104
105         /* Keep waking workers until the queue is empty */
106         while (!_queue.empty ()) {
107                 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
108                 _condition.notify_all ();
109                 _condition.wait (lock);
110         }
111
112         lock.unlock ();
113         
114         terminate_threads ();
115
116         _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
117
118         /* The following sequence of events can occur in the above code:
119              1. a remote worker takes the last image off the queue
120              2. the loop above terminates
121              3. the remote worker fails to encode the image and puts it back on the queue
122              4. the remote worker is then terminated by terminate_threads
123
124              So just mop up anything left in the queue here.
125         */
126
127         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
128                 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->index ()));
129                 try {
130                         _writer->write ((*i)->encode_locally(), (*i)->index (), (*i)->eyes ());
131                         frame_done ();
132                 } catch (std::exception& e) {
133                         _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
134                 }
135         }
136                 
137         _writer->finish ();
138         _writer.reset ();
139 }       
140
141 /** @return an estimate of the current number of frames we are encoding per second,
142  *  or 0 if not known.
143  */
144 float
145 Encoder::current_encoding_rate () const
146 {
147         boost::mutex::scoped_lock lock (_state_mutex);
148         if (int (_time_history.size()) < _history_size) {
149                 return 0;
150         }
151
152         struct timeval now;
153         gettimeofday (&now, 0);
154
155         return _history_size / (seconds (now) - seconds (_time_history.back ()));
156 }
157
158 /** @return Number of video frames that have been sent out */
159 int
160 Encoder::video_frames_out () const
161 {
162         boost::mutex::scoped_lock (_state_mutex);
163         return _video_frames_out;
164 }
165
166 /** Should be called when a frame has been encoded successfully.
167  *  @param n Source frame index.
168  */
169 void
170 Encoder::frame_done ()
171 {
172         boost::mutex::scoped_lock lock (_state_mutex);
173         
174         struct timeval tv;
175         gettimeofday (&tv, 0);
176         _time_history.push_front (tv);
177         if (int (_time_history.size()) > _history_size) {
178                 _time_history.pop_back ();
179         }
180 }
181
182 void
183 Encoder::process_video (shared_ptr<PlayerVideoFrame> pvf, bool same)
184 {
185         _waker.nudge ();
186         
187         boost::mutex::scoped_lock lock (_mutex);
188
189         /* XXX: discard 3D here if required */
190
191         /* Wait until the queue has gone down a bit */
192         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
193                 TIMING ("decoder sleeps with queue of %1", _queue.size());
194                 _condition.wait (lock);
195                 TIMING ("decoder wakes with queue of %1", _queue.size());
196         }
197
198         if (_terminate) {
199                 return;
200         }
201
202         _writer->rethrow ();
203         /* Re-throw any exception raised by one of our threads.  If more
204            than one has thrown an exception, only one will be rethrown, I think;
205            but then, if that happens something has gone badly wrong.
206         */
207         rethrow ();
208
209         if (_writer->can_fake_write (_video_frames_out)) {
210                 _writer->fake_write (_video_frames_out, pvf->eyes ());
211                 _have_a_real_frame[pvf->eyes()] = false;
212                 frame_done ();
213         } else if (same && _have_a_real_frame[pvf->eyes()]) {
214                 /* Use the last frame that we encoded. */
215                 _writer->repeat (_video_frames_out, pvf->eyes());
216                 frame_done ();
217         } else {
218                 /* Queue this new frame for encoding */
219                 TIMING ("adding to queue of %1", _queue.size ());
220                 _queue.push_back (shared_ptr<DCPVideoFrame> (
221                                           new DCPVideoFrame (
222                                                   pvf, _video_frames_out, _film->video_frame_rate(),
223                                                   _film->j2k_bandwidth(), _film->resolution(), _film->log()
224                                                   )
225                                           ));
226                 
227                 _condition.notify_all ();
228                 _have_a_real_frame[pvf->eyes()] = true;
229         }
230
231         if (pvf->eyes() != EYES_LEFT) {
232                 ++_video_frames_out;
233         }
234 }
235
236 void
237 Encoder::process_audio (shared_ptr<const AudioBuffers> data)
238 {
239         _writer->write (data);
240 }
241
242 void
243 Encoder::terminate_threads ()
244 {
245         {
246                 boost::mutex::scoped_lock lock (_mutex);
247                 _terminate = true;
248                 _condition.notify_all ();
249         }
250
251         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
252                 if ((*i)->joinable ()) {
253                         (*i)->join ();
254                 }
255                 delete *i;
256         }
257
258         _threads.clear ();
259 }
260
261 void
262 Encoder::encoder_thread (optional<ServerDescription> server)
263 try
264 {
265         /* Number of seconds that we currently wait between attempts
266            to connect to the server; not relevant for localhost
267            encodings.
268         */
269         int remote_backoff = 0;
270         
271         while (1) {
272
273                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
274                 boost::mutex::scoped_lock lock (_mutex);
275                 while (_queue.empty () && !_terminate) {
276                         _condition.wait (lock);
277                 }
278
279                 if (_terminate) {
280                         return;
281                 }
282
283                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
284                 shared_ptr<DCPVideoFrame> vf = _queue.front ();
285                 TIMING ("encoder thread %1 pops frame %2 (%3) from queue", boost::this_thread::get_id(), vf->index(), vf->eyes ());
286                 _queue.pop_front ();
287                 
288                 lock.unlock ();
289
290                 shared_ptr<EncodedData> encoded;
291
292                 if (server) {
293                         try {
294                                 encoded = vf->encode_remotely (server.get ());
295
296                                 if (remote_backoff > 0) {
297                                         _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
298                                 }
299                                 
300                                 /* This job succeeded, so remove any backoff */
301                                 remote_backoff = 0;
302                                 
303                         } catch (std::exception& e) {
304                                 if (remote_backoff < 60) {
305                                         /* back off more */
306                                         remote_backoff += 10;
307                                 }
308                                 _film->log()->log (
309                                         String::compose (
310                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
311                                                 vf->index(), server->host_name(), e.what(), remote_backoff)
312                                         );
313                         }
314                                 
315                 } else {
316                         try {
317                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->index());
318                                 encoded = vf->encode_locally ();
319                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->index());
320                         } catch (std::exception& e) {
321                                 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
322                         }
323                 }
324
325                 if (encoded) {
326                         _writer->write (encoded, vf->index (), vf->eyes ());
327                         frame_done ();
328                 } else {
329                         lock.lock ();
330                         _film->log()->log (
331                                 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index())
332                                 );
333                         _queue.push_front (vf);
334                         lock.unlock ();
335                 }
336
337                 if (remote_backoff > 0) {
338                         dcpomatic_sleep (remote_backoff);
339                 }
340
341                 lock.lock ();
342                 _condition.notify_all ();
343         }
344 }
345 catch (...)
346 {
347         store_current ();
348 }
349
350 void
351 Encoder::server_found (ServerDescription s)
352 {
353         add_worker_threads (s);
354 }