Add some missing test stuff; split server discovery off into ServerFinder.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
/** @file src/lib/encoder.cc
 *  @brief Parent class for classes which can encode video and audio frames.
 */
23
24 #include <iostream>
25 #include <boost/lambda/lambda.hpp>
26 #include <libcxml/cxml.h>
27 #include "encoder.h"
28 #include "util.h"
29 #include "film.h"
30 #include "log.h"
31 #include "config.h"
32 #include "dcp_video_frame.h"
33 #include "server.h"
34 #include "cross.h"
35 #include "writer.h"
36 #include "server_finder.h"
37
38 #include "i18n.h"
39
40 using std::pair;
41 using std::string;
42 using std::stringstream;
43 using std::vector;
44 using std::list;
45 using std::cout;
46 using std::min;
47 using std::make_pair;
48 using boost::shared_ptr;
49 using boost::optional;
50 using boost::scoped_array;
51
52 int const Encoder::_history_size = 25;
53
54 /** @param f Film that we are encoding */
55 Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<Job> j)
56         : _film (f)
57         , _job (j)
58         , _video_frames_out (0)
59         , _terminate (false)
60 {
61         _have_a_real_frame[EYES_BOTH] = false;
62         _have_a_real_frame[EYES_LEFT] = false;
63         _have_a_real_frame[EYES_RIGHT] = false;
64 }
65
66 Encoder::~Encoder ()
67 {
68         terminate_threads ();
69         if (_writer) {
70                 _writer->finish ();
71         }
72 }
73
74 /** Add a worker thread for a each thread on a remote server.  Caller must hold
75  *  a lock on _mutex, or know that one is not currently required to
76  *  safely modify _threads.
77  */
78 void
79 Encoder::add_worker_threads (ServerDescription d)
80 {
81         for (int i = 0; i < d.threads(); ++i) {
82                 _threads.push_back (
83                         make_pair (d, new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)))
84                         );
85         }
86 }
87
88 void
89 Encoder::process_begin ()
90 {
91         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
92                 _threads.push_back (
93                         make_pair (
94                                 optional<ServerDescription> (),
95                                 new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ()))
96                                 )
97                         );
98         }
99
100         vector<ServerDescription> servers = Config::instance()->servers ();
101
102         for (vector<ServerDescription>::iterator i = servers.begin(); i != servers.end(); ++i) {
103                 add_worker_threads (*i);
104         }
105
106         _writer.reset (new Writer (_film, _job));
107         _server_finder.reset (new ServerFinder ());
108         _server_finder->ServerFound.connect (boost::bind (&Encoder::server_found, this, _1));
109 }
110
111
112 void
113 Encoder::process_end ()
114 {
115         boost::mutex::scoped_lock lock (_mutex);
116
117         _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
118
119         /* Keep waking workers until the queue is empty */
120         while (!_queue.empty ()) {
121                 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
122                 _condition.notify_all ();
123                 _condition.wait (lock);
124         }
125
126         lock.unlock ();
127         
128         terminate_threads ();
129
130         _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
131
132         /* The following sequence of events can occur in the above code:
133              1. a remote worker takes the last image off the queue
134              2. the loop above terminates
135              3. the remote worker fails to encode the image and puts it back on the queue
136              4. the remote worker is then terminated by terminate_threads
137
138              So just mop up anything left in the queue here.
139         */
140
141         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
142                 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
143                 try {
144                         _writer->write ((*i)->encode_locally(), (*i)->frame (), (*i)->eyes ());
145                         frame_done ();
146                 } catch (std::exception& e) {
147                         _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
148                 }
149         }
150                 
151         _writer->finish ();
152         _writer.reset ();
153 }       
154
155 /** @return an estimate of the current number of frames we are encoding per second,
156  *  or 0 if not known.
157  */
158 float
159 Encoder::current_encoding_rate () const
160 {
161         boost::mutex::scoped_lock lock (_state_mutex);
162         if (int (_time_history.size()) < _history_size) {
163                 return 0;
164         }
165
166         struct timeval now;
167         gettimeofday (&now, 0);
168
169         return _history_size / (seconds (now) - seconds (_time_history.back ()));
170 }
171
172 /** @return Number of video frames that have been sent out */
173 int
174 Encoder::video_frames_out () const
175 {
176         boost::mutex::scoped_lock (_state_mutex);
177         return _video_frames_out;
178 }
179
180 /** Should be called when a frame has been encoded successfully.
181  *  @param n Source frame index.
182  */
183 void
184 Encoder::frame_done ()
185 {
186         boost::mutex::scoped_lock lock (_state_mutex);
187         
188         struct timeval tv;
189         gettimeofday (&tv, 0);
190         _time_history.push_front (tv);
191         if (int (_time_history.size()) > _history_size) {
192                 _time_history.pop_back ();
193         }
194 }
195
196 void
197 Encoder::process_video (shared_ptr<const Image> image, Eyes eyes, ColourConversion conversion, bool same)
198 {
199         boost::mutex::scoped_lock lock (_mutex);
200
201         /* XXX: discard 3D here if required */
202
203         /* Wait until the queue has gone down a bit */
204         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
205                 TIMING ("decoder sleeps with queue of %1", _queue.size());
206                 _condition.wait (lock);
207                 TIMING ("decoder wakes with queue of %1", _queue.size());
208         }
209
210         if (_terminate) {
211                 return;
212         }
213
214         if (_writer->thrown ()) {
215                 _writer->rethrow ();
216         }
217
218         if (_writer->can_fake_write (_video_frames_out)) {
219                 _writer->fake_write (_video_frames_out, eyes);
220                 _have_a_real_frame[eyes] = false;
221                 frame_done ();
222         } else if (same && _have_a_real_frame[eyes]) {
223                 /* Use the last frame that we encoded. */
224                 _writer->repeat (_video_frames_out, eyes);
225                 frame_done ();
226         } else {
227                 /* Queue this new frame for encoding */
228                 TIMING ("adding to queue of %1", _queue.size ());
229                 _queue.push_back (shared_ptr<DCPVideoFrame> (
230                                           new DCPVideoFrame (
231                                                   image, _video_frames_out, eyes, conversion, _film->video_frame_rate(),
232                                                   _film->j2k_bandwidth(), _film->log()
233                                                   )
234                                           ));
235                 
236                 _condition.notify_all ();
237                 _have_a_real_frame[eyes] = true;
238         }
239
240         if (eyes != EYES_LEFT) {
241                 ++_video_frames_out;
242         }
243 }
244
245 void
246 Encoder::process_audio (shared_ptr<const AudioBuffers> data)
247 {
248         _writer->write (data);
249 }
250
251 void
252 Encoder::terminate_threads ()
253 {
254         {
255                 boost::mutex::scoped_lock lock (_mutex);
256                 _terminate = true;
257                 _condition.notify_all ();
258         }
259
260         for (ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i) {
261                 if (i->second->joinable ()) {
262                         i->second->join ();
263                 }
264                 delete i->second;
265         }
266
267         _threads.clear ();
268 }
269
270 void
271 Encoder::encoder_thread (optional<ServerDescription> server)
272 {
273         /* Number of seconds that we currently wait between attempts
274            to connect to the server; not relevant for localhost
275            encodings.
276         */
277         int remote_backoff = 0;
278         
279         while (1) {
280
281                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
282                 boost::mutex::scoped_lock lock (_mutex);
283                 while (_queue.empty () && !_terminate) {
284                         _condition.wait (lock);
285                 }
286
287                 if (_terminate) {
288                         return;
289                 }
290
291                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
292                 shared_ptr<DCPVideoFrame> vf = _queue.front ();
293                 _film->log()->log (String::compose (N_("Encoder thread %1 pops frame %2 (%3) from queue"), boost::this_thread::get_id(), vf->frame(), vf->eyes ()));
294                 _queue.pop_front ();
295                 
296                 lock.unlock ();
297
298                 shared_ptr<EncodedData> encoded;
299
300                 if (server) {
301                         try {
302                                 encoded = vf->encode_remotely (server.get ());
303
304                                 if (remote_backoff > 0) {
305                                         _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
306                                 }
307                                 
308                                 /* This job succeeded, so remove any backoff */
309                                 remote_backoff = 0;
310                                 
311                         } catch (std::exception& e) {
312                                 if (remote_backoff < 60) {
313                                         /* back off more */
314                                         remote_backoff += 10;
315                                 }
316                                 _film->log()->log (
317                                         String::compose (
318                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
319                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
320                                         );
321                         }
322                                 
323                 } else {
324                         try {
325                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
326                                 encoded = vf->encode_locally ();
327                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
328                         } catch (std::exception& e) {
329                                 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
330                         }
331                 }
332
333                 if (encoded) {
334                         _writer->write (encoded, vf->frame (), vf->eyes ());
335                         frame_done ();
336                 } else {
337                         lock.lock ();
338                         _film->log()->log (
339                                 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
340                                 );
341                         _queue.push_front (vf);
342                         lock.unlock ();
343                 }
344
345                 if (remote_backoff > 0) {
346                         dcpomatic_sleep (remote_backoff);
347                 }
348
349                 lock.lock ();
350                 _condition.notify_all ();
351         }
352 }
353
354 void
355 Encoder::server_found (ServerDescription s)
356 {
357         /* See if we already know about this server */
358         boost::mutex::scoped_lock lm (_mutex);
359         ThreadList::iterator i = _threads.begin();
360         while (i != _threads.end() && (!i->first || i->first.get().host_name() != s.host_name())) {
361                 ++i;
362         }
363         
364         if (i == _threads.end ()) {
365                 add_worker_threads (s);
366         }
367 }