2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
/** @file src/encoder.h
 *  @brief Parent class for classes which can encode video and audio frames.
 */
30 #include "dcp_video_frame.h"
39 using std::stringstream;
45 using boost::shared_ptr;
46 using boost::optional;
48 int const Encoder::_history_size = 25;
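/** @param f Film that we are encoding.
 *  @param j Job handed in with the film; presumably kept as the _job that
 *  process_begin() gives to the Writer -- TODO confirm.
 */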
51 Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<Job> j)
/* The count of video frames sent out starts at zero */
54 , _video_frames_out (0)
/* Starts false; process_video() sets it once a frame has been queued for encoding */
55 , _have_a_real_frame (false)
/** Start the worker threads and create the Writer.
 *
 *  One thread running encoder_thread() with a null ServerDescription is created
 *  for each of Config's num_local_encoding_threads().  Then, for every
 *  ServerDescription returned by Config's servers(), threads() threads are
 *  created with that server passed to encoder_thread().  Every thread pointer is
 *  appended to _threads.
 */
70 Encoder::process_begin ()
/* Workers that are given a null server pointer (presumably meaning "encode locally" -- confirm in encoder_thread) */
72 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
73 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
/* Workers bound to each configured server, as many per server as its threads() reports */
76 vector<ServerDescription*> servers = Config::instance()->servers ();
78 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
79 for (int j = 0; j < (*i)->threads (); ++j) {
80 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
/* Replace _writer with a new Writer built from _film and _job */
84 _writer.reset (new Writer (_film, _job));
/** Finish encoding: wait for the queue of frames to empty, stop the worker
 *  threads, then encode whatever is still left in the queue from this function.
 */
89 Encoder::process_end ()
/* NOTE(review): terminate_threads() below also locks _mutex; the point at which
   this lock is released is not visible in this excerpt -- confirm. */
91 boost::mutex::scoped_lock lock (_mutex);
93 _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
95 /* Keep waking workers until the queue is empty */
96 while (!_queue.empty ()) {
97 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
98 _condition.notify_all ();
/* Sleep until _condition is notified, then re-check the queue */
99 _condition.wait (lock);
104 terminate_threads ();
106 _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
108 /* The following sequence of events can occur in the above code:
109 1. a remote worker takes the last image off the queue
110 2. the loop above terminates
111 3. the remote worker fails to encode the image and puts it back on the queue
112 4. the remote worker is then terminated by terminate_threads
114 So just mop up anything left in the queue here.
// Each left-over frame is encoded with encode_locally() and handed to _writer under its frame index
117 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
118 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
120 _writer->write ((*i)->encode_locally(), (*i)->frame ());
// Exceptions derived from std::exception are caught and logged here
122 } catch (std::exception& e) {
123 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
/** @return an estimate of the current number of frames we are encoding per second. */
/* Rate estimate: _history_size divided by the seconds between now and the oldest time kept in _time_history. */
135 Encoder::current_encoding_rate () const
137 boost::mutex::scoped_lock lock (_history_mutex);
/* Fewer than _history_size completion times have been recorded so far */
138 if (int (_time_history.size()) < _history_size) {
/* Take the current time for comparison with the history */
143 gettimeofday (&now, 0);
/* frame_done() pushes new times on the front and pops from the back, so back() is the oldest time kept.
   NOTE(review): the divisor is zero if seconds() gives the same value for both times -- confirm what seconds() returns. */
145 return _history_size / (seconds (now) - seconds (_time_history.back ()));
148 /** @return Number of video frames that have been sent out */
150 Encoder::video_frames_out () const
152 boost::mutex::scoped_lock (_history_mutex);
153 return _video_frames_out;
/** Should be called when a frame has been encoded successfully. */
/* Record the current time in _time_history, keeping at most _history_size entries. */
160 Encoder::frame_done ()
162 boost::mutex::scoped_lock lock (_history_mutex);
/* The newest time goes on the front of the history */
165 gettimeofday (&tv, 0);
166 _time_history.push_front (tv);
/* Drop the oldest time once the history has grown past _history_size entries */
167 if (int (_time_history.size()) > _history_size) {
168 _time_history.pop_back ();
/** Accept one video frame: either fake-write it, repeat the previous frame, or
 *  queue it for encoding by the worker threads.
 *  @param image Image for this frame.
 *  @param same Enables the repeat path once a real frame has been queued
 *  (presumably true if this image is the same as the previous one -- TODO confirm).
 */
173 Encoder::process_video (shared_ptr<const Image> image, bool same)
175 boost::mutex::scoped_lock lock (_mutex);
177 /* Wait until the queue has gone down a bit */
/* The limit is two queued frames per worker thread; a set _terminate ends the wait */
178 while (_queue.size() >= _threads.size() * 2 && !_terminate) {
179 TIMING ("decoder sleeps with queue of %1", _queue.size());
180 _condition.wait (lock);
181 TIMING ("decoder wakes with queue of %1", _queue.size());
/* NOTE(review): what is done when the writer reports thrown() is not visible in this excerpt */
188 if (_writer->thrown ()) {
/* If the writer can fake this frame index it is fake-written instead of encoded */
192 if (_writer->can_fake_write (_video_frames_out)) {
193 _writer->fake_write (_video_frames_out);
194 _have_a_real_frame = false;
196 } else if (same && _have_a_real_frame) {
197 /* Use the last frame that we encoded. */
198 _writer->repeat (_video_frames_out);
201 /* Queue this new frame for encoding */
202 TIMING ("adding to queue of %1", _queue.size ());
204 _queue.push_back (shared_ptr<DCPVideoFrame> (
206 image, _video_frames_out, _film->dcp_video_frame_rate(),
207 _film->colour_lut(), _film->j2k_bandwidth(), _film->log()
/* Wake the threads waiting on _condition (workers sleep on it while the queue is empty) */
211 _condition.notify_all ();
212 _have_a_real_frame = true;
/** Pass audio straight on to the writer.
 *  @param data Audio buffers to write.
 */
219 Encoder::process_audio (shared_ptr<const AudioBuffers> data)
221 _writer->write (data);
/** Wake every thread waiting on _condition and go through the threads in _threads.
 *  NOTE(review): the setting of _terminate and what is done with each joinable
 *  thread are not visible in this excerpt -- presumably they are joined; confirm.
 */
225 Encoder::terminate_threads ()
227 boost::mutex::scoped_lock lock (_mutex);
229 _condition.notify_all ();
232 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
/* Only threads that report themselves joinable are handled */
233 if ((*i)->joinable ()) {
/** Body of each worker thread started by process_begin(): waits for frames in
 *  _queue, encodes them with encode_remotely (server) or encode_locally (), and
 *  passes the result to _writer.
 *  @param server Server used for remote encodes; process_begin() passes a null
 *  pointer for its local threads.
 *  NOTE(review): the statements choosing between the remote and local paths are
 *  not visible in this excerpt.
 */
243 Encoder::encoder_thread (ServerDescription* server)
245 /* Number of seconds that we currently wait between attempts
246 to connect to the server; not relevant for localhost
249 int remote_backoff = 0;
253 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
254 boost::mutex::scoped_lock lock (_mutex);
// Sleep on _condition until the queue has something in it or _terminate is set
255 while (_queue.empty () && !_terminate) {
256 _condition.wait (lock);
263 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
// The frame at the front of the queue is the one this thread works on
264 shared_ptr<DCPVideoFrame> vf = _queue.front ();
265 _film->log()->log (String::compose (N_("Encoder thread %1 pops frame %2 from queue"), boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
270 shared_ptr<EncodedData> encoded;
274 encoded = vf->encode_remotely (server);
276 if (remote_backoff > 0) {
277 _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
280 /* This job succeeded, so remove any backoff */
283 } catch (std::exception& e) {
/* Each failure adds 10 seconds to the delay while it is still below 60 */
284 if (remote_backoff < 60) {
286 remote_backoff += 10;
290 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
291 vf->frame(), server->host_name(), e.what(), remote_backoff)
297 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
298 encoded = vf->encode_locally ();
299 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
300 } catch (std::exception& e) {
301 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
/* Hand the encoded data to the writer under the frame's index */
306 _writer->write (encoded, vf->frame ());
311 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
/* A failed frame goes back on the front of the queue */
313 _queue.push_front (vf);
/* Sleep for the current backoff before going round again */
317 if (remote_backoff > 0) {
318 dcpomatic_sleep (remote_backoff);
/* Wake anything waiting on _condition */
322 _condition.notify_all ();