Merge branch '1.0' of /home/carl/git/dvdomatic into 1.0
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include <boost/lambda/lambda.hpp>
26 #include <libcxml/cxml.h>
27 #include "encoder.h"
28 #include "util.h"
29 #include "film.h"
30 #include "log.h"
31 #include "config.h"
32 #include "dcp_video_frame.h"
33 #include "server.h"
34 #include "cross.h"
35 #include "writer.h"
36
37 #include "i18n.h"
38
39 using std::pair;
40 using std::string;
41 using std::stringstream;
42 using std::vector;
43 using std::list;
44 using std::cout;
45 using std::min;
46 using std::make_pair;
47 using boost::shared_ptr;
48 using boost::optional;
49 using boost::scoped_array;
50
51 int const Encoder::_history_size = 25;
52
53 /** @param f Film that we are encoding */
54 Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<Job> j)
55         : _film (f)
56         , _job (j)
57         , _video_frames_out (0)
58         , _terminate (false)
59         , _broadcast_thread (0)
60         , _listen_thread (0)
61 {
62         _have_a_real_frame[EYES_BOTH] = false;
63         _have_a_real_frame[EYES_LEFT] = false;
64         _have_a_real_frame[EYES_RIGHT] = false;
65 }
66
67 Encoder::~Encoder ()
68 {
69         terminate_threads ();
70         if (_writer) {
71                 _writer->finish ();
72         }
73 }
74
75 /** Add a worker thread for a each thread on a remote server.  Caller must hold
76  *  a lock on _mutex, or know that one is not currently required to
77  *  safely modify _threads.
78  */
79 void
80 Encoder::add_worker_threads (ServerDescription d)
81 {
82         for (int i = 0; i < d.threads(); ++i) {
83                 _threads.push_back (
84                         make_pair (d, new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)))
85                         );
86         }
87 }
88
89 void
90 Encoder::process_begin ()
91 {
92         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
93                 _threads.push_back (
94                         make_pair (
95                                 optional<ServerDescription> (),
96                                 new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ()))
97                                 )
98                         );
99         }
100
101         vector<ServerDescription> servers = Config::instance()->servers ();
102
103         for (vector<ServerDescription>::iterator i = servers.begin(); i != servers.end(); ++i) {
104                 add_worker_threads (*i);
105         }
106
107         _broadcast_thread = new boost::thread (boost::bind (&Encoder::broadcast_thread, this));
108         _listen_thread = new boost::thread (boost::bind (&Encoder::listen_thread, this));
109
110         _writer.reset (new Writer (_film, _job));
111 }
112
113
114 void
115 Encoder::process_end ()
116 {
117         boost::mutex::scoped_lock lock (_mutex);
118
119         _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
120
121         /* Keep waking workers until the queue is empty */
122         while (!_queue.empty ()) {
123                 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
124                 _condition.notify_all ();
125                 _condition.wait (lock);
126         }
127
128         lock.unlock ();
129         
130         terminate_threads ();
131
132         _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
133
134         /* The following sequence of events can occur in the above code:
135              1. a remote worker takes the last image off the queue
136              2. the loop above terminates
137              3. the remote worker fails to encode the image and puts it back on the queue
138              4. the remote worker is then terminated by terminate_threads
139
140              So just mop up anything left in the queue here.
141         */
142
143         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
144                 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
145                 try {
146                         _writer->write ((*i)->encode_locally(), (*i)->frame (), (*i)->eyes ());
147                         frame_done ();
148                 } catch (std::exception& e) {
149                         _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
150                 }
151         }
152                 
153         _writer->finish ();
154         _writer.reset ();
155 }       
156
157 /** @return an estimate of the current number of frames we are encoding per second,
158  *  or 0 if not known.
159  */
160 float
161 Encoder::current_encoding_rate () const
162 {
163         boost::mutex::scoped_lock lock (_state_mutex);
164         if (int (_time_history.size()) < _history_size) {
165                 return 0;
166         }
167
168         struct timeval now;
169         gettimeofday (&now, 0);
170
171         return _history_size / (seconds (now) - seconds (_time_history.back ()));
172 }
173
174 /** @return Number of video frames that have been sent out */
175 int
176 Encoder::video_frames_out () const
177 {
178         boost::mutex::scoped_lock (_state_mutex);
179         return _video_frames_out;
180 }
181
182 /** Should be called when a frame has been encoded successfully.
183  *  @param n Source frame index.
184  */
185 void
186 Encoder::frame_done ()
187 {
188         boost::mutex::scoped_lock lock (_state_mutex);
189         
190         struct timeval tv;
191         gettimeofday (&tv, 0);
192         _time_history.push_front (tv);
193         if (int (_time_history.size()) > _history_size) {
194                 _time_history.pop_back ();
195         }
196 }
197
198 void
199 Encoder::process_video (shared_ptr<const Image> image, Eyes eyes, ColourConversion conversion, bool same)
200 {
201         boost::mutex::scoped_lock lock (_mutex);
202
203         /* XXX: discard 3D here if required */
204
205         /* Wait until the queue has gone down a bit */
206         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
207                 TIMING ("decoder sleeps with queue of %1", _queue.size());
208                 _condition.wait (lock);
209                 TIMING ("decoder wakes with queue of %1", _queue.size());
210         }
211
212         if (_terminate) {
213                 return;
214         }
215
216         if (_writer->thrown ()) {
217                 _writer->rethrow ();
218         }
219
220         if (_writer->can_fake_write (_video_frames_out)) {
221                 _writer->fake_write (_video_frames_out, eyes);
222                 _have_a_real_frame[eyes] = false;
223                 frame_done ();
224         } else if (same && _have_a_real_frame[eyes]) {
225                 /* Use the last frame that we encoded. */
226                 _writer->repeat (_video_frames_out, eyes);
227                 frame_done ();
228         } else {
229                 /* Queue this new frame for encoding */
230                 TIMING ("adding to queue of %1", _queue.size ());
231                 _queue.push_back (shared_ptr<DCPVideoFrame> (
232                                           new DCPVideoFrame (
233                                                   image, _video_frames_out, eyes, conversion, _film->video_frame_rate(),
234                                                   _film->j2k_bandwidth(), _film->log()
235                                                   )
236                                           ));
237                 
238                 _condition.notify_all ();
239                 _have_a_real_frame[eyes] = true;
240         }
241
242         if (eyes != EYES_LEFT) {
243                 ++_video_frames_out;
244         }
245 }
246
247 void
248 Encoder::process_audio (shared_ptr<const AudioBuffers> data)
249 {
250         _writer->write (data);
251 }
252
253 void
254 Encoder::terminate_threads ()
255 {
256         {
257                 boost::mutex::scoped_lock lock (_mutex);
258                 _terminate = true;
259                 _condition.notify_all ();
260         }
261
262         for (ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i) {
263                 if (i->second->joinable ()) {
264                         i->second->join ();
265                 }
266                 delete i->second;
267         }
268
269         _threads.clear ();
270                      
271         if (_broadcast_thread && _broadcast_thread->joinable ()) {
272                 _broadcast_thread->join ();
273         }
274         delete _broadcast_thread;
275
276         if (_listen_thread && _listen_thread->joinable ()) {
277                 _listen_thread->join ();
278         }
279         delete _listen_thread;
280 }
281
282 void
283 Encoder::encoder_thread (optional<ServerDescription> server)
284 {
285         /* Number of seconds that we currently wait between attempts
286            to connect to the server; not relevant for localhost
287            encodings.
288         */
289         int remote_backoff = 0;
290         
291         while (1) {
292
293                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
294                 boost::mutex::scoped_lock lock (_mutex);
295                 while (_queue.empty () && !_terminate) {
296                         _condition.wait (lock);
297                 }
298
299                 if (_terminate) {
300                         return;
301                 }
302
303                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
304                 shared_ptr<DCPVideoFrame> vf = _queue.front ();
305                 _film->log()->log (String::compose (N_("Encoder thread %1 pops frame %2 (%3) from queue"), boost::this_thread::get_id(), vf->frame(), vf->eyes ()));
306                 _queue.pop_front ();
307                 
308                 lock.unlock ();
309
310                 shared_ptr<EncodedData> encoded;
311
312                 if (server) {
313                         try {
314                                 encoded = vf->encode_remotely (server.get ());
315
316                                 if (remote_backoff > 0) {
317                                         _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
318                                 }
319                                 
320                                 /* This job succeeded, so remove any backoff */
321                                 remote_backoff = 0;
322                                 
323                         } catch (std::exception& e) {
324                                 if (remote_backoff < 60) {
325                                         /* back off more */
326                                         remote_backoff += 10;
327                                 }
328                                 _film->log()->log (
329                                         String::compose (
330                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
331                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
332                                         );
333                         }
334                                 
335                 } else {
336                         try {
337                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
338                                 encoded = vf->encode_locally ();
339                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
340                         } catch (std::exception& e) {
341                                 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
342                         }
343                 }
344
345                 if (encoded) {
346                         _writer->write (encoded, vf->frame (), vf->eyes ());
347                         frame_done ();
348                 } else {
349                         lock.lock ();
350                         _film->log()->log (
351                                 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
352                                 );
353                         _queue.push_front (vf);
354                         lock.unlock ();
355                 }
356
357                 if (remote_backoff > 0) {
358                         dcpomatic_sleep (remote_backoff);
359                 }
360
361                 lock.lock ();
362                 _condition.notify_all ();
363         }
364 }
365
366 void
367 Encoder::broadcast_thread ()
368 {
369         boost::system::error_code error;
370         boost::asio::io_service io_service;
371         boost::asio::ip::udp::socket socket (io_service);
372         socket.open (boost::asio::ip::udp::v4(), error);
373         if (error) {
374                 throw NetworkError ("failed to set up broadcast socket");
375         }
376
377         socket.set_option (boost::asio::ip::udp::socket::reuse_address (true));
378         socket.set_option (boost::asio::socket_base::broadcast (true));
379         
380         boost::asio::ip::udp::endpoint end_point (boost::asio::ip::address_v4::broadcast(), Config::instance()->server_port_base() + 1);            
381
382         while (1) {
383                 boost::mutex::scoped_lock lm (_mutex);
384                 if (_terminate) {
385                         socket.close (error);
386                         return;
387                 }
388                 
389                 string data = DCPOMATIC_HELLO;
390                 socket.send_to (boost::asio::buffer (data.c_str(), data.size() + 1), end_point);
391
392                 lm.unlock ();
393                 dcpomatic_sleep (10);
394         }
395 }
396
397 void
398 Encoder::listen_thread ()
399 {
400         while (1) {
401                 {
402                         /* See if we need to stop */
403                         boost::mutex::scoped_lock lm (_mutex);
404                         if (_terminate) {
405                                 return;
406                         }
407                 }
408
409                 shared_ptr<Socket> sock (new Socket (10));
410
411                 try {
412                         sock->accept (Config::instance()->server_port_base() + 1);
413                 } catch (std::exception& e) {
414                         continue;
415                 }
416
417                 uint32_t length = sock->read_uint32 ();
418                 scoped_array<char> buffer (new char[length]);
419                 sock->read (reinterpret_cast<uint8_t*> (buffer.get()), length);
420                 
421                 stringstream s (buffer.get());
422                 shared_ptr<cxml::Document> xml (new cxml::Document ("ServerAvailable"));
423                 xml->read_stream (s);
424
425                 {
426                         /* See if we already know about this server */
427                         string const ip = sock->socket().remote_endpoint().address().to_string ();
428                         boost::mutex::scoped_lock lm (_mutex);
429                         ThreadList::iterator i = _threads.begin();
430                         while (i != _threads.end() && (!i->first || i->first->host_name() != ip)) {
431                                 ++i;
432                         }
433
434                         if (i == _threads.end ()) {
435                                 add_worker_threads (ServerDescription (ip, xml->number_child<int> ("Threads")));
436                         }
437                 }
438         }
439 }