2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
/** @file src/encoder.h
 *  @brief Parent class for classes which can encode video and audio frames.
 */
30 #include "dcp_video_frame.h"
39 using std::stringstream;
45 using boost::shared_ptr;
46 using boost::optional;
/* Number of completion timestamps kept in _time_history: frame_done() drops
   the oldest entry once more than this many are held, and
   current_encoding_rate() takes its early branch until at least this many
   have been recorded. */
48 int const Encoder::_history_size = 25;
50 /** Construct an Encoder.
 *  @param f Film that we are encoding.
 *  @param j Job associated with this encode.  NOTE(review): the initialiser
 *  that consumes it is not visible in this listing (the embedded numbering
 *  skips 52-53 and 55-56); presumably it becomes _job, which process_begin()
 *  passes to the Writer -- confirm.
 */
51 Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<Job> j)
54 , _video_frames_out (0)
/* No frame has been queued for real encoding yet for any eye, so
   process_video() has nothing that it could repeat. */
57 _have_a_real_frame[EYES_BOTH] = false;
58 _have_a_real_frame[EYES_LEFT] = false;
59 _have_a_real_frame[EYES_RIGHT] = false;
/* Start the encoding worker threads and create the Writer.
   NOTE(review): this listing omits some original lines (the embedded
   numbering skips 70, 72, 75-76, 78, 82-84 and 86), so the return type and
   braces are not shown and short statements may be missing. */
71 Encoder::process_begin ()
/* One worker per configured local encoding thread.  Each is given an empty
   optional<ServerDescription>; presumably that selects local encoding in
   encoder_thread() -- confirm. */
73 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
74 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
77 vector<ServerDescription> servers = Config::instance()->servers ();
/* For every configured server, start i->threads() workers, each given a copy
   of that server's description. */
79 for (vector<ServerDescription>::iterator i = servers.begin(); i != servers.end(); ++i) {
80 for (int j = 0; j < i->threads (); ++j) {
81 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
/* (Re)create the writer from the film and job held by this encoder. */
85 _writer.reset (new Writer (_film, _job));
/* Finish the encode: wait for the queue to drain, stop the worker threads,
   then encode on this thread anything that is still on the queue.
   NOTE(review): this listing omits some original lines (the embedded
   numbering skips e.g. 91, 93, 101-104, 120, 122 and 125-131), so the return
   type, braces and possibly some short statements are not shown here. */
90 Encoder::process_end ()
92 boost::mutex::scoped_lock lock (_mutex);
94 _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
96 /* Keep waking workers until the queue is empty */
97 while (!_queue.empty ()) {
98 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
99 _condition.notify_all ();
/* Waiting releases _mutex while blocked, which lets the workers get at the
   queue. */
100 _condition.wait (lock);
/* NOTE(review): terminate_threads() locks _mutex itself, so `lock` has to
   have been released before this call.  The statement doing so is not in
   this listing -- confirm that it exists in the real file. */
105 terminate_threads ();
107 _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
109 /* The following sequence of events can occur in the above code:
     1. a remote worker takes the last image off the queue
     2. the loop above terminates
     3. the remote worker fails to encode the image and puts it back on the queue
     4. the remote worker is then terminated by terminate_threads

     So just mop up anything left in the queue here.
*/
118 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
119 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
/* Encode on this thread and pass the result straight to the writer. */
121 _writer->write ((*i)->encode_locally(), (*i)->frame (), (*i)->eyes ());
/* A failed local encode is logged here. */
123 } catch (std::exception& e) {
124 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
132 /** @return an estimate of the current number of frames we are encoding per second,
 *  computed as _history_size divided by the time between now and the oldest
 *  timestamp held in _time_history.
 *  NOTE(review): the rest of this comment, the return type, the body of the
 *  early branch and the declaration of `now` are not in this listing (the
 *  embedded numbering skips 133-135, 137, 140-143 and 145).  Presumably the
 *  early branch returns 0 -- confirm.
 */
136 Encoder::current_encoding_rate () const
138 boost::mutex::scoped_lock lock (_history_mutex);
/* Too few completions have been recorded for an estimate. */
139 if (int (_time_history.size()) < _history_size) {
144 gettimeofday (&now, 0);
/* back() is the oldest entry, because frame_done() pushes new timestamps at
   the front.  seconds() presumably converts a timeval to seconds -- confirm. */
146 return _history_size / (seconds (now) - seconds (_time_history.back ()));
149 /** @return Number of video frames that have been sent out */
151 Encoder::video_frames_out () const
153 boost::mutex::scoped_lock (_history_mutex);
154 return _video_frames_out;
157 /** Should be called when a frame has been encoded successfully.
 *  Takes no parameters: under _history_mutex it puts the current time at the
 *  front of _time_history and drops the oldest entry once more than
 *  _history_size are held.
 *  NOTE(review): the return type, braces and the declaration of `tv` are not
 *  in this listing (the embedded numbering skips 159-160, 162, 164-165 and
 *  170-171); gettimeofday() needs `tv` to be a struct timeval.
 */
161 Encoder::frame_done ()
163 boost::mutex::scoped_lock lock (_history_mutex);
166 gettimeofday (&tv, 0);
/* Newest timestamp goes to the front, so back() is always the oldest. */
167 _time_history.push_front (tv);
168 if (int (_time_history.size()) > _history_size) {
169 _time_history.pop_back ();
/* Accept one video frame and either fake-write it, ask the writer to repeat
   the previous frame, or queue it for encoding by the worker threads.
   @param image Image data for the frame.
   @param eyes Which eye(s) the frame is for; also indexes _have_a_real_frame.
   @param conversion Colour conversion handed on to the queued frame.
   @param same When true, and a real frame has already been queued for this
   eye, the writer is asked to repeat instead of encoding again.  Presumably
   this means the image is identical to the previous one -- confirm with the
   caller.
   NOTE(review): this listing omits some original lines (the embedded
   numbering skips e.g. 185-190, 192-194, 198, 202-203, 207, 210-212 and
   218-221), so the return type, braces and the bodies of some branches are
   not shown. */
174 Encoder::process_video (shared_ptr<const Image> image, Eyes eyes, ColourConversion conversion, bool same)
176 boost::mutex::scoped_lock lock (_mutex);
178 /* XXX: discard 3D here if required */
180 /* Wait until the queue has gone down a bit */
/* That is: block while at least two frames per worker thread are pending,
   unless termination has been requested. */
181 while (_queue.size() >= _threads.size() * 2 && !_terminate) {
182 TIMING ("decoder sleeps with queue of %1", _queue.size());
183 _condition.wait (lock);
184 TIMING ("decoder wakes with queue of %1", _queue.size());
/* NOTE(review): the body of this check is not shown; presumably it surfaces
   an error raised inside the writer -- confirm. */
191 if (_writer->thrown ()) {
/* The writer reports that this frame index can be fake-written, so nothing
   is encoded and afterwards there is no real frame available to repeat. */
195 if (_writer->can_fake_write (_video_frames_out)) {
196 _writer->fake_write (_video_frames_out, eyes);
197 _have_a_real_frame[eyes] = false;
199 } else if (same && _have_a_real_frame[eyes]) {
200 /* Use the last frame that we encoded. */
201 _writer->repeat (_video_frames_out, eyes);
204 /* Queue this new frame for encoding */
205 TIMING ("adding to queue of %1", _queue.size ());
206 _queue.push_back (shared_ptr<DCPVideoFrame> (
208 image, _video_frames_out, eyes, conversion, _film->video_frame_rate(),
209 _film->j2k_bandwidth(), _film->log()
/* Wake the workers waiting for something to appear on the queue. */
213 _condition.notify_all ();
214 _have_a_real_frame[eyes] = true;
/* NOTE(review): the body of this branch is not shown; presumably it advances
   _video_frames_out, skipping the left eye so that a left/right pair shares
   one index -- confirm. */
217 if (eyes != EYES_LEFT) {
/* Pass a block of audio straight to the writer.
   @param data Audio buffers to write.
   NOTE(review): the return type and braces are not in this listing (the
   embedded numbering skips 222, 224 and 226). */
223 Encoder::process_audio (shared_ptr<const AudioBuffers> data)
225 _writer->write (data);
/* Stop the worker threads started by process_begin().
   NOTE(review): this listing omits some original lines (the embedded
   numbering skips 228, 230, 232, 234-235 and 238 onwards).  The statement
   that raises _terminate -- the flag tested by the workers and by
   process_video() -- is not shown, nor is what is done with each thread
   inside the joinable() check. */
229 Encoder::terminate_threads ()
231 boost::mutex::scoped_lock lock (_mutex);
/* Wake everything waiting on the condition so that it re-checks its loop
   condition. */
233 _condition.notify_all ();
236 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
237 if ((*i)->joinable ()) {
/* Body of every encoding worker thread: take frames off the queue, encode
   them and pass the result to the writer.
   @param server Description of the remote server that this thread encodes
   on.  process_begin() passes an empty optional for local threads, but the
   test that chooses between the remote and local branches below is not in
   this listing -- confirm.
   NOTE(review): this listing omits lines throughout the function (the
   embedded numbering skips e.g. 254-256, 261-266, 270-273, 306-309 and
   311-314), so the loop header, the exit path and any unlock/lock calls
   around the encode are not shown. */
247 Encoder::encoder_thread (optional<ServerDescription> server)
249 /* Number of seconds that we currently wait between attempts
   to connect to the server; not relevant for localhost
   encodings.
*/
253 int remote_backoff = 0;
257 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
258 boost::mutex::scoped_lock lock (_mutex);
/* Sleep until there is a frame to encode or termination has been requested. */
259 while (_queue.empty () && !_terminate) {
260 _condition.wait (lock);
267 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
/* Work on the frame at the head of the queue. */
268 shared_ptr<DCPVideoFrame> vf = _queue.front ();
269 _film->log()->log (String::compose (N_("Encoder thread %1 pops frame %2 (%3) from queue"), boost::this_thread::get_id(), vf->frame(), vf->eyes ()));
/* Starts out null; assigned only if one of the encodes below returns. */
274 shared_ptr<EncodedData> encoded;
278 encoded = vf->encode_remotely (server.get ());
280 if (remote_backoff > 0) {
281 _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
284 /* This job succeeded, so remove any backoff */
287 } catch (std::exception& e) {
/* Lengthen the delay in steps of 10 for as long as it is below 60. */
288 if (remote_backoff < 60) {
290 remote_backoff += 10;
294 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
295 vf->frame(), server->host_name(), e.what(), remote_backoff)
301 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
302 encoded = vf->encode_locally ();
303 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
304 } catch (std::exception& e) {
305 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
/* Hand the encoded data to the writer. */
310 _writer->write (encoded, vf->frame (), vf->eyes ());
315 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
/* The frame goes back to the head of the queue, which is where workers take
   their next frame from.  NOTE(review): _queue is accessed under _mutex
   elsewhere; the locking around this statement is not shown -- confirm. */
317 _queue.push_front (vf);
/* Sleep for the current delay; dcpomatic_sleep() presumably takes seconds,
   matching the comment on remote_backoff -- confirm. */
321 if (remote_backoff > 0) {
322 dcpomatic_sleep (remote_backoff);
/* Wake any threads waiting on the condition. */
326 _condition.notify_all ();