2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
25 #include <boost/lambda/lambda.hpp>
26 #include <libcxml/cxml.h>
32 #include "dcp_video_frame.h"
41 using std::stringstream;
47 using boost::shared_ptr;
48 using boost::optional;
49 using boost::scoped_array;
51 int const Encoder::_history_size = 25;
53 /** @param f Film that we are encoding */
54 Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<Job> j)
57 , _video_frames_out (0)
59 , _broadcast_thread (0)
62 _have_a_real_frame[EYES_BOTH] = false;
63 _have_a_real_frame[EYES_LEFT] = false;
64 _have_a_real_frame[EYES_RIGHT] = false;
75 /** Add a worker thread for a each thread on a remote server. Caller must hold
76 * a lock on _mutex, or know that one is not currently required to
77 * safely modify _threads.
80 Encoder::add_worker_threads (ServerDescription d)
82 for (int i = 0; i < d.threads(); ++i) {
84 make_pair (d, new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)))
90 Encoder::process_begin ()
92 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
95 optional<ServerDescription> (),
96 new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ()))
101 vector<ServerDescription> servers = Config::instance()->servers ();
103 for (vector<ServerDescription>::iterator i = servers.begin(); i != servers.end(); ++i) {
104 add_worker_threads (*i);
107 _broadcast_thread = new boost::thread (boost::bind (&Encoder::broadcast_thread, this));
108 _listen_thread = new boost::thread (boost::bind (&Encoder::listen_thread, this));
110 _writer.reset (new Writer (_film, _job));
115 Encoder::process_end ()
117 boost::mutex::scoped_lock lock (_mutex);
119 _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
121 /* Keep waking workers until the queue is empty */
122 while (!_queue.empty ()) {
123 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
124 _condition.notify_all ();
125 _condition.wait (lock);
130 terminate_threads ();
132 _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
134 /* The following sequence of events can occur in the above code:
135 1. a remote worker takes the last image off the queue
136 2. the loop above terminates
137 3. the remote worker fails to encode the image and puts it back on the queue
138 4. the remote worker is then terminated by terminate_threads
140 So just mop up anything left in the queue here.
143 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
144 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
146 _writer->write ((*i)->encode_locally(), (*i)->frame (), (*i)->eyes ());
148 } catch (std::exception& e) {
149 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
157 /** @return an estimate of the current number of frames we are encoding per second,
161 Encoder::current_encoding_rate () const
163 boost::mutex::scoped_lock lock (_state_mutex);
164 if (int (_time_history.size()) < _history_size) {
169 gettimeofday (&now, 0);
171 return _history_size / (seconds (now) - seconds (_time_history.back ()));
174 /** @return Number of video frames that have been sent out */
176 Encoder::video_frames_out () const
178 boost::mutex::scoped_lock (_state_mutex);
179 return _video_frames_out;
182 /** Should be called when a frame has been encoded successfully.
183 * @param n Source frame index.
186 Encoder::frame_done ()
188 boost::mutex::scoped_lock lock (_state_mutex);
191 gettimeofday (&tv, 0);
192 _time_history.push_front (tv);
193 if (int (_time_history.size()) > _history_size) {
194 _time_history.pop_back ();
199 Encoder::process_video (shared_ptr<const Image> image, Eyes eyes, ColourConversion conversion, bool same)
201 boost::mutex::scoped_lock lock (_mutex);
203 /* XXX: discard 3D here if required */
205 /* Wait until the queue has gone down a bit */
206 while (_queue.size() >= _threads.size() * 2 && !_terminate) {
207 TIMING ("decoder sleeps with queue of %1", _queue.size());
208 _condition.wait (lock);
209 TIMING ("decoder wakes with queue of %1", _queue.size());
216 if (_writer->thrown ()) {
220 if (_writer->can_fake_write (_video_frames_out)) {
221 _writer->fake_write (_video_frames_out, eyes);
222 _have_a_real_frame[eyes] = false;
224 } else if (same && _have_a_real_frame[eyes]) {
225 /* Use the last frame that we encoded. */
226 _writer->repeat (_video_frames_out, eyes);
229 /* Queue this new frame for encoding */
230 TIMING ("adding to queue of %1", _queue.size ());
231 _queue.push_back (shared_ptr<DCPVideoFrame> (
233 image, _video_frames_out, eyes, conversion, _film->video_frame_rate(),
234 _film->j2k_bandwidth(), _film->log()
238 _condition.notify_all ();
239 _have_a_real_frame[eyes] = true;
242 if (eyes != EYES_LEFT) {
248 Encoder::process_audio (shared_ptr<const AudioBuffers> data)
250 _writer->write (data);
254 Encoder::terminate_threads ()
257 boost::mutex::scoped_lock lock (_mutex);
259 _condition.notify_all ();
262 for (ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i) {
263 if (i->second->joinable ()) {
271 if (_broadcast_thread && _broadcast_thread->joinable ()) {
272 _broadcast_thread->join ();
274 delete _broadcast_thread;
276 if (_listen_thread && _listen_thread->joinable ()) {
277 _listen_thread->join ();
279 delete _listen_thread;
283 Encoder::encoder_thread (optional<ServerDescription> server)
285 /* Number of seconds that we currently wait between attempts
286 to connect to the server; not relevant for localhost
289 int remote_backoff = 0;
293 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
294 boost::mutex::scoped_lock lock (_mutex);
295 while (_queue.empty () && !_terminate) {
296 _condition.wait (lock);
303 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
304 shared_ptr<DCPVideoFrame> vf = _queue.front ();
305 _film->log()->log (String::compose (N_("Encoder thread %1 pops frame %2 (%3) from queue"), boost::this_thread::get_id(), vf->frame(), vf->eyes ()));
310 shared_ptr<EncodedData> encoded;
314 encoded = vf->encode_remotely (server.get ());
316 if (remote_backoff > 0) {
317 _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
320 /* This job succeeded, so remove any backoff */
323 } catch (std::exception& e) {
324 if (remote_backoff < 60) {
326 remote_backoff += 10;
330 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
331 vf->frame(), server->host_name(), e.what(), remote_backoff)
337 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
338 encoded = vf->encode_locally ();
339 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
340 } catch (std::exception& e) {
341 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
346 _writer->write (encoded, vf->frame (), vf->eyes ());
351 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
353 _queue.push_front (vf);
357 if (remote_backoff > 0) {
358 dcpomatic_sleep (remote_backoff);
362 _condition.notify_all ();
367 Encoder::broadcast_thread ()
369 boost::system::error_code error;
370 boost::asio::io_service io_service;
371 boost::asio::ip::udp::socket socket (io_service);
372 socket.open (boost::asio::ip::udp::v4(), error);
374 throw NetworkError ("failed to set up broadcast socket");
377 socket.set_option (boost::asio::ip::udp::socket::reuse_address (true));
378 socket.set_option (boost::asio::socket_base::broadcast (true));
380 boost::asio::ip::udp::endpoint end_point (boost::asio::ip::address_v4::broadcast(), Config::instance()->server_port_base() + 1);
383 boost::mutex::scoped_lock lm (_mutex);
385 socket.close (error);
389 string data = DCPOMATIC_HELLO;
390 socket.send_to (boost::asio::buffer (data.c_str(), data.size() + 1), end_point);
393 dcpomatic_sleep (10);
398 Encoder::listen_thread ()
402 /* See if we need to stop */
403 boost::mutex::scoped_lock lm (_mutex);
409 shared_ptr<Socket> sock (new Socket (10));
412 sock->accept (Config::instance()->server_port_base() + 1);
413 } catch (std::exception& e) {
417 uint32_t length = sock->read_uint32 ();
418 scoped_array<char> buffer (new char[length]);
419 sock->read (reinterpret_cast<uint8_t*> (buffer.get()), length);
421 stringstream s (buffer.get());
422 shared_ptr<cxml::Document> xml (new cxml::Document ("ServerAvailable"));
423 xml->read_stream (s);
426 /* See if we already know about this server */
427 string const ip = sock->socket().remote_endpoint().address().to_string ();
428 boost::mutex::scoped_lock lm (_mutex);
429 ThreadList::iterator i = _threads.begin();
430 while (i != _threads.end() && (!i->first || i->first->host_name() != ip)) {
434 if (i == _threads.end ()) {
435 add_worker_threads (ServerDescription (ip, xml->number_child<int> ("Threads")));