Trim logging slightly.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include <boost/lambda/lambda.hpp>
26 #include <libcxml/cxml.h>
27 #include "encoder.h"
28 #include "util.h"
29 #include "film.h"
30 #include "log.h"
31 #include "config.h"
32 #include "dcp_video_frame.h"
33 #include "server.h"
34 #include "cross.h"
35 #include "writer.h"
36 #include "server_finder.h"
37
38 #include "i18n.h"
39
40 using std::pair;
41 using std::string;
42 using std::stringstream;
43 using std::vector;
44 using std::list;
45 using std::cout;
46 using std::min;
47 using std::make_pair;
48 using boost::shared_ptr;
49 using boost::weak_ptr;
50 using boost::optional;
51 using boost::scoped_array;
52
53 int const Encoder::_history_size = 25;
54
55 /** @param f Film that we are encoding */
56 Encoder::Encoder (shared_ptr<const Film> f, weak_ptr<Job> j)
57         : _film (f)
58         , _job (j)
59         , _video_frames_out (0)
60         , _terminate (false)
61 {
62         _have_a_real_frame[EYES_BOTH] = false;
63         _have_a_real_frame[EYES_LEFT] = false;
64         _have_a_real_frame[EYES_RIGHT] = false;
65 }
66
67 Encoder::~Encoder ()
68 {
69         terminate_threads ();
70         if (_writer) {
71                 _writer->finish ();
72         }
73 }
74
75 /** Add a worker thread for a each thread on a remote server.  Caller must hold
76  *  a lock on _mutex, or know that one is not currently required to
77  *  safely modify _threads.
78  */
79 void
80 Encoder::add_worker_threads (ServerDescription d)
81 {
82         for (int i = 0; i < d.threads(); ++i) {
83                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)));
84         }
85 }
86
87 void
88 Encoder::process_begin ()
89 {
90         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
91                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
92         }
93
94         _writer.reset (new Writer (_film, _job));
95         ServerFinder::instance()->connect (boost::bind (&Encoder::server_found, this, _1));
96 }
97
98
99 void
100 Encoder::process_end ()
101 {
102         boost::mutex::scoped_lock lock (_mutex);
103
104         _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
105
106         /* Keep waking workers until the queue is empty */
107         while (!_queue.empty ()) {
108                 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
109                 _condition.notify_all ();
110                 _condition.wait (lock);
111         }
112
113         lock.unlock ();
114         
115         terminate_threads ();
116
117         _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
118
119         /* The following sequence of events can occur in the above code:
120              1. a remote worker takes the last image off the queue
121              2. the loop above terminates
122              3. the remote worker fails to encode the image and puts it back on the queue
123              4. the remote worker is then terminated by terminate_threads
124
125              So just mop up anything left in the queue here.
126         */
127
128         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
129                 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
130                 try {
131                         _writer->write ((*i)->encode_locally(), (*i)->frame (), (*i)->eyes ());
132                         frame_done ();
133                 } catch (std::exception& e) {
134                         _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
135                 }
136         }
137                 
138         _writer->finish ();
139         _writer.reset ();
140 }       
141
142 /** @return an estimate of the current number of frames we are encoding per second,
143  *  or 0 if not known.
144  */
145 float
146 Encoder::current_encoding_rate () const
147 {
148         boost::mutex::scoped_lock lock (_state_mutex);
149         if (int (_time_history.size()) < _history_size) {
150                 return 0;
151         }
152
153         struct timeval now;
154         gettimeofday (&now, 0);
155
156         return _history_size / (seconds (now) - seconds (_time_history.back ()));
157 }
158
159 /** @return Number of video frames that have been sent out */
160 int
161 Encoder::video_frames_out () const
162 {
163         boost::mutex::scoped_lock (_state_mutex);
164         return _video_frames_out;
165 }
166
167 /** Should be called when a frame has been encoded successfully.
168  *  @param n Source frame index.
169  */
170 void
171 Encoder::frame_done ()
172 {
173         boost::mutex::scoped_lock lock (_state_mutex);
174         
175         struct timeval tv;
176         gettimeofday (&tv, 0);
177         _time_history.push_front (tv);
178         if (int (_time_history.size()) > _history_size) {
179                 _time_history.pop_back ();
180         }
181 }
182
183 void
184 Encoder::process_video (shared_ptr<const Image> image, Eyes eyes, ColourConversion conversion, bool same)
185 {
186         boost::mutex::scoped_lock lock (_mutex);
187
188         /* XXX: discard 3D here if required */
189
190         /* Wait until the queue has gone down a bit */
191         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
192                 TIMING ("decoder sleeps with queue of %1", _queue.size());
193                 _condition.wait (lock);
194                 TIMING ("decoder wakes with queue of %1", _queue.size());
195         }
196
197         if (_terminate) {
198                 return;
199         }
200
201         if (_writer->thrown ()) {
202                 _writer->rethrow ();
203         }
204
205         if (_writer->can_fake_write (_video_frames_out)) {
206                 _writer->fake_write (_video_frames_out, eyes);
207                 _have_a_real_frame[eyes] = false;
208                 frame_done ();
209         } else if (same && _have_a_real_frame[eyes]) {
210                 /* Use the last frame that we encoded. */
211                 _writer->repeat (_video_frames_out, eyes);
212                 frame_done ();
213         } else {
214                 /* Queue this new frame for encoding */
215                 TIMING ("adding to queue of %1", _queue.size ());
216                 _queue.push_back (shared_ptr<DCPVideoFrame> (
217                                           new DCPVideoFrame (
218                                                   image, _video_frames_out, eyes, conversion, _film->video_frame_rate(),
219                                                   _film->j2k_bandwidth(), _film->log()
220                                                   )
221                                           ));
222                 
223                 _condition.notify_all ();
224                 _have_a_real_frame[eyes] = true;
225         }
226
227         if (eyes != EYES_LEFT) {
228                 ++_video_frames_out;
229         }
230 }
231
232 void
233 Encoder::process_audio (shared_ptr<const AudioBuffers> data)
234 {
235         _writer->write (data);
236 }
237
238 void
239 Encoder::terminate_threads ()
240 {
241         {
242                 boost::mutex::scoped_lock lock (_mutex);
243                 _terminate = true;
244                 _condition.notify_all ();
245         }
246
247         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
248                 if ((*i)->joinable ()) {
249                         (*i)->join ();
250                 }
251                 delete *i;
252         }
253
254         _threads.clear ();
255 }
256
257 void
258 Encoder::encoder_thread (optional<ServerDescription> server)
259 {
260         /* Number of seconds that we currently wait between attempts
261            to connect to the server; not relevant for localhost
262            encodings.
263         */
264         int remote_backoff = 0;
265         
266         while (1) {
267
268                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
269                 boost::mutex::scoped_lock lock (_mutex);
270                 while (_queue.empty () && !_terminate) {
271                         _condition.wait (lock);
272                 }
273
274                 if (_terminate) {
275                         return;
276                 }
277
278                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
279                 shared_ptr<DCPVideoFrame> vf = _queue.front ();
280                 TIMING ("encoder thread %1 pops frame %2 (%3) from queue", boost::this_thread::get_id(), vf->frame(), vf->eyes ());
281                 _queue.pop_front ();
282                 
283                 lock.unlock ();
284
285                 shared_ptr<EncodedData> encoded;
286
287                 if (server) {
288                         try {
289                                 encoded = vf->encode_remotely (server.get ());
290
291                                 if (remote_backoff > 0) {
292                                         _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
293                                 }
294                                 
295                                 /* This job succeeded, so remove any backoff */
296                                 remote_backoff = 0;
297                                 
298                         } catch (std::exception& e) {
299                                 if (remote_backoff < 60) {
300                                         /* back off more */
301                                         remote_backoff += 10;
302                                 }
303                                 _film->log()->log (
304                                         String::compose (
305                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
306                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
307                                         );
308                         }
309                                 
310                 } else {
311                         try {
312                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
313                                 encoded = vf->encode_locally ();
314                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
315                         } catch (std::exception& e) {
316                                 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
317                         }
318                 }
319
320                 if (encoded) {
321                         _writer->write (encoded, vf->frame (), vf->eyes ());
322                         frame_done ();
323                 } else {
324                         lock.lock ();
325                         _film->log()->log (
326                                 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
327                                 );
328                         _queue.push_front (vf);
329                         lock.unlock ();
330                 }
331
332                 if (remote_backoff > 0) {
333                         dcpomatic_sleep (remote_backoff);
334                 }
335
336                 lock.lock ();
337                 _condition.notify_all ();
338         }
339 }
340
341 void
342 Encoder::server_found (ServerDescription s)
343 {
344         add_worker_threads (s);
345 }