Various bits of server tidying up.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include <boost/lambda/lambda.hpp>
26 #include <libcxml/cxml.h>
27 #include "encoder.h"
28 #include "util.h"
29 #include "film.h"
30 #include "log.h"
31 #include "config.h"
32 #include "dcp_video_frame.h"
33 #include "server.h"
34 #include "cross.h"
35 #include "writer.h"
36 #include "server_finder.h"
37
38 #include "i18n.h"
39
40 using std::pair;
41 using std::string;
42 using std::stringstream;
43 using std::vector;
44 using std::list;
45 using std::cout;
46 using std::min;
47 using std::make_pair;
48 using boost::shared_ptr;
49 using boost::optional;
50 using boost::scoped_array;
51
52 int const Encoder::_history_size = 25;
53
54 /** @param f Film that we are encoding */
55 Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<Job> j)
56         : _film (f)
57         , _job (j)
58         , _video_frames_out (0)
59         , _terminate (false)
60 {
61         _have_a_real_frame[EYES_BOTH] = false;
62         _have_a_real_frame[EYES_LEFT] = false;
63         _have_a_real_frame[EYES_RIGHT] = false;
64 }
65
66 Encoder::~Encoder ()
67 {
68         terminate_threads ();
69         if (_writer) {
70                 _writer->finish ();
71         }
72 }
73
74 /** Add a worker thread for a each thread on a remote server.  Caller must hold
75  *  a lock on _mutex, or know that one is not currently required to
76  *  safely modify _threads.
77  */
78 void
79 Encoder::add_worker_threads (ServerDescription d)
80 {
81         for (int i = 0; i < d.threads(); ++i) {
82                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)));
83         }
84 }
85
86 void
87 Encoder::process_begin ()
88 {
89         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
90                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
91         }
92
93         vector<ServerDescription> servers = Config::instance()->servers ();
94
95         for (vector<ServerDescription>::iterator i = servers.begin(); i != servers.end(); ++i) {
96                 add_worker_threads (*i);
97         }
98
99         _writer.reset (new Writer (_film, _job));
100         ServerFinder::instance()->connect (boost::bind (&Encoder::server_found, this, _1));
101 }
102
103
104 void
105 Encoder::process_end ()
106 {
107         boost::mutex::scoped_lock lock (_mutex);
108
109         _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
110
111         /* Keep waking workers until the queue is empty */
112         while (!_queue.empty ()) {
113                 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
114                 _condition.notify_all ();
115                 _condition.wait (lock);
116         }
117
118         lock.unlock ();
119         
120         terminate_threads ();
121
122         _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
123
124         /* The following sequence of events can occur in the above code:
125              1. a remote worker takes the last image off the queue
126              2. the loop above terminates
127              3. the remote worker fails to encode the image and puts it back on the queue
128              4. the remote worker is then terminated by terminate_threads
129
130              So just mop up anything left in the queue here.
131         */
132
133         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
134                 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
135                 try {
136                         _writer->write ((*i)->encode_locally(), (*i)->frame (), (*i)->eyes ());
137                         frame_done ();
138                 } catch (std::exception& e) {
139                         _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
140                 }
141         }
142                 
143         _writer->finish ();
144         _writer.reset ();
145 }       
146
147 /** @return an estimate of the current number of frames we are encoding per second,
148  *  or 0 if not known.
149  */
150 float
151 Encoder::current_encoding_rate () const
152 {
153         boost::mutex::scoped_lock lock (_state_mutex);
154         if (int (_time_history.size()) < _history_size) {
155                 return 0;
156         }
157
158         struct timeval now;
159         gettimeofday (&now, 0);
160
161         return _history_size / (seconds (now) - seconds (_time_history.back ()));
162 }
163
164 /** @return Number of video frames that have been sent out */
165 int
166 Encoder::video_frames_out () const
167 {
168         boost::mutex::scoped_lock (_state_mutex);
169         return _video_frames_out;
170 }
171
172 /** Should be called when a frame has been encoded successfully.
173  *  @param n Source frame index.
174  */
175 void
176 Encoder::frame_done ()
177 {
178         boost::mutex::scoped_lock lock (_state_mutex);
179         
180         struct timeval tv;
181         gettimeofday (&tv, 0);
182         _time_history.push_front (tv);
183         if (int (_time_history.size()) > _history_size) {
184                 _time_history.pop_back ();
185         }
186 }
187
188 void
189 Encoder::process_video (shared_ptr<const Image> image, Eyes eyes, ColourConversion conversion, bool same)
190 {
191         boost::mutex::scoped_lock lock (_mutex);
192
193         /* XXX: discard 3D here if required */
194
195         /* Wait until the queue has gone down a bit */
196         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
197                 TIMING ("decoder sleeps with queue of %1", _queue.size());
198                 _condition.wait (lock);
199                 TIMING ("decoder wakes with queue of %1", _queue.size());
200         }
201
202         if (_terminate) {
203                 return;
204         }
205
206         if (_writer->thrown ()) {
207                 _writer->rethrow ();
208         }
209
210         if (_writer->can_fake_write (_video_frames_out)) {
211                 _writer->fake_write (_video_frames_out, eyes);
212                 _have_a_real_frame[eyes] = false;
213                 frame_done ();
214         } else if (same && _have_a_real_frame[eyes]) {
215                 /* Use the last frame that we encoded. */
216                 _writer->repeat (_video_frames_out, eyes);
217                 frame_done ();
218         } else {
219                 /* Queue this new frame for encoding */
220                 TIMING ("adding to queue of %1", _queue.size ());
221                 _queue.push_back (shared_ptr<DCPVideoFrame> (
222                                           new DCPVideoFrame (
223                                                   image, _video_frames_out, eyes, conversion, _film->video_frame_rate(),
224                                                   _film->j2k_bandwidth(), _film->log()
225                                                   )
226                                           ));
227                 
228                 _condition.notify_all ();
229                 _have_a_real_frame[eyes] = true;
230         }
231
232         if (eyes != EYES_LEFT) {
233                 ++_video_frames_out;
234         }
235 }
236
237 void
238 Encoder::process_audio (shared_ptr<const AudioBuffers> data)
239 {
240         _writer->write (data);
241 }
242
243 void
244 Encoder::terminate_threads ()
245 {
246         {
247                 boost::mutex::scoped_lock lock (_mutex);
248                 _terminate = true;
249                 _condition.notify_all ();
250         }
251
252         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
253                 if ((*i)->joinable ()) {
254                         (*i)->join ();
255                 }
256                 delete *i;
257         }
258
259         _threads.clear ();
260 }
261
262 void
263 Encoder::encoder_thread (optional<ServerDescription> server)
264 {
265         /* Number of seconds that we currently wait between attempts
266            to connect to the server; not relevant for localhost
267            encodings.
268         */
269         int remote_backoff = 0;
270         
271         while (1) {
272
273                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
274                 boost::mutex::scoped_lock lock (_mutex);
275                 while (_queue.empty () && !_terminate) {
276                         _condition.wait (lock);
277                 }
278
279                 if (_terminate) {
280                         return;
281                 }
282
283                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
284                 shared_ptr<DCPVideoFrame> vf = _queue.front ();
285                 _film->log()->log (String::compose (N_("Encoder thread %1 pops frame %2 (%3) from queue"), boost::this_thread::get_id(), vf->frame(), vf->eyes ()));
286                 _queue.pop_front ();
287                 
288                 lock.unlock ();
289
290                 shared_ptr<EncodedData> encoded;
291
292                 if (server) {
293                         try {
294                                 encoded = vf->encode_remotely (server.get ());
295
296                                 if (remote_backoff > 0) {
297                                         _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
298                                 }
299                                 
300                                 /* This job succeeded, so remove any backoff */
301                                 remote_backoff = 0;
302                                 
303                         } catch (std::exception& e) {
304                                 if (remote_backoff < 60) {
305                                         /* back off more */
306                                         remote_backoff += 10;
307                                 }
308                                 _film->log()->log (
309                                         String::compose (
310                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
311                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
312                                         );
313                         }
314                                 
315                 } else {
316                         try {
317                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
318                                 encoded = vf->encode_locally ();
319                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
320                         } catch (std::exception& e) {
321                                 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
322                         }
323                 }
324
325                 if (encoded) {
326                         _writer->write (encoded, vf->frame (), vf->eyes ());
327                         frame_done ();
328                 } else {
329                         lock.lock ();
330                         _film->log()->log (
331                                 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
332                                 );
333                         _queue.push_front (vf);
334                         lock.unlock ();
335                 }
336
337                 if (remote_backoff > 0) {
338                         dcpomatic_sleep (remote_backoff);
339                 }
340
341                 lock.lock ();
342                 _condition.notify_all ();
343         }
344 }
345
346 void
347 Encoder::server_found (ServerDescription s)
348 {
349         add_worker_threads (s);
350 }