2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.cc
21 * @brief Parent class for classes which can encode video and audio frames.
30 #include "dcp_video_frame.h"
39 using std::stringstream;
45 using boost::shared_ptr;
46 using boost::optional;
/* Number of frame-completion timestamps kept in _time_history; also the
   numerator of the rate computed by current_encoding_rate(). */
48 int const Encoder::_history_size = 25;
50 /** @param f Film that we are encoding
 *  @param j Job; a _job member is later handed to the Writer in process_begin()
 *  (presumably initialised from j -- that initialiser is not visible here, TODO confirm)
 */
51 Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<Job> j)
/* Used by process_video() as the frame index given to the writer and to new DCPVideoFrames */
54 , _video_frames_out (0)
/* Becomes true once process_video() has queued a frame for encoding */
55 , _have_a_real_frame (false)
/** Start the worker threads and create the Writer.
 *  Every thread runs encoder_thread(); the local ones are given a null
 *  ServerDescription and the others the description of the server they use.
 */
70 Encoder::process_begin ()
/* Local workers: as many as Config's num_local_encoding_threads() */
72 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
73 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
76 vector<ServerDescription*> servers = Config::instance()->servers ();
/* Server workers: threads() of them for each configured server */
78 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
79 for (int j = 0; j < (*i)->threads (); ++j) {
80 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
/* The writer is built from the film and the job */
84 _writer.reset (new Writer (_film, _job));
/** Finish off the encode: wait for the workers to empty the queue, call
 *  terminate_threads(), then encode in this thread anything that is still queued.
 */
89 Encoder::process_end ()
91 boost::mutex::scoped_lock lock (_mutex);
93 _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
95 /* Keep waking workers until the queue is empty */
96 while (!_queue.empty ()) {
97 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
98 _condition.notify_all ();
/* wait() gives up _mutex while blocked, which lets the workers get at the queue */
99 _condition.wait (lock);
/* NOTE(review): terminate_threads() locks _mutex itself, so the lock taken at the
   top must have been released by now; that line is not visible here -- confirm. */
104 terminate_threads ();
106 _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
108 /* The following sequence of events can occur in the above code:
109 1. a remote worker takes the last image off the queue
110 2. the loop above terminates
111 3. the remote worker fails to encode the image and puts it back on the queue
112 4. the remote worker is then terminated by terminate_threads
114 So just mop up anything left in the queue here.
117 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
118 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
// Encode the left-over frame locally, in this thread, and pass the result to the writer
120 _writer->write ((*i)->encode_locally(), (*i)->frame ());
// A failure to encode is logged
122 } catch (std::exception& e) {
123 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
131 /** @return an estimate of the current number of frames we are encoding per second,
 *  computed as _history_size divided by the time between now and the oldest
 *  timestamp held in _time_history.  A different path is taken while fewer than
 *  _history_size timestamps have been recorded (its result is not visible here).
135 Encoder::current_encoding_rate () const
137 boost::mutex::scoped_lock lock (_history_mutex);
// Too little history for an estimate yet
138 if (int (_time_history.size()) < _history_size) {
143 gettimeofday (&now, 0);
// back() is the oldest timestamp: frame_done() adds at the front and removes from the back.
// seconds() presumably converts a timeval to a number of seconds -- TODO confirm.
145 return _history_size / (seconds (now) - seconds (_time_history.back ()));
148 /** @return Number of video frames that have been sent out */
150 Encoder::video_frames_out () const
/* The lock object must have a name: an unnamed `scoped_lock (_history_mutex);` is
   parsed as the declaration of a local, default-constructed lock called
   _history_mutex, so the member mutex would never be locked. */
152 boost::mutex::scoped_lock lock (_history_mutex);
153 return _video_frames_out;
156 /** Should be called when a frame has been encoded successfully.
157 * Takes no arguments.  Records the current time at the front of _time_history
 *  and discards the oldest entry once more than _history_size are held; these
 *  timestamps are what current_encoding_rate() works from.
160 Encoder::frame_done ()
162 boost::mutex::scoped_lock lock (_history_mutex);
165 gettimeofday (&tv, 0);
// Newest timestamp goes at the front
166 _time_history.push_front (tv);
// Keep no more than _history_size timestamps
167 if (int (_time_history.size()) > _history_size) {
168 _time_history.pop_back ();
/** Deal with one frame of video.
 *  Blocks while the queue holds at least twice as many frames as there are
 *  worker threads (unless _terminate is set).  The frame is then either
 *  fake-written, written as a repeat of the last encoded frame, or queued for
 *  the worker threads to encode.
 *  @param image Image for this frame.
 *  @param same When true, and a real frame has already been queued, the writer is
 *  asked to repeat the last encoded frame rather than @p image being encoded.
 */
173 Encoder::process_video (shared_ptr<const Image> image, bool same)
175 boost::mutex::scoped_lock lock (_mutex);
177 /* Wait until the queue has gone down a bit */
178 while (_queue.size() >= _threads.size() * 2 && !_terminate) {
179 TIMING ("decoder sleeps with queue of %1", _queue.size());
180 _condition.wait (lock);
181 TIMING ("decoder wakes with queue of %1", _queue.size());
/* NOTE(review): presumably true when the writer has hit an exception; what is
   done about it is on lines not visible here -- confirm. */
188 if (_writer->thrown ()) {
/* The writer can fake this frame, so nothing is queued for encoding */
192 if (_writer->can_fake_write (_video_frames_out)) {
193 _writer->fake_write (_video_frames_out);
/* Stops a following `same' frame from taking the repeat branch below */
194 _have_a_real_frame = false;
196 } else if (same && _have_a_real_frame) {
197 /* Use the last frame that we encoded. */
198 _writer->repeat (_video_frames_out);
201 /* Queue this new frame for encoding */
202 TIMING ("adding to queue of %1", _queue.size ());
203 _queue.push_back (shared_ptr<DCPVideoFrame> (
205 image, _video_frames_out, _film->dcp_video_frame_rate(),
206 _film->j2k_bandwidth(), _film->log()
/* Wake the worker threads that are waiting for the queue to fill */
210 _condition.notify_all ();
211 _have_a_real_frame = true;
/** Pass some audio straight on to the writer.
 *  @param data Audio to write.
 */
218 Encoder::process_audio (shared_ptr<const AudioBuffers> data)
220 _writer->write (data);
/** Wake everything that is waiting on _condition, then go through _threads
 *  checking each one for joinable().
 *  NOTE(review): the lines which set _terminate and which join the threads are
 *  not visible here -- confirm against the full source.
 */
224 Encoder::terminate_threads ()
226 boost::mutex::scoped_lock lock (_mutex);
228 _condition.notify_all ();
231 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
232 if ((*i)->joinable ()) {
/** Function run by each worker thread.
 *  Waits for a frame to be on the queue, takes the one at the front, encodes it
 *  with either encode_remotely() or encode_locally() and gives the result to the
 *  writer.  A frame which could not be encoded is put back on the front of the queue.
 *  (The loop and the branch between remote and local encoding are on lines
 *  not visible here.)
 *  @param server Server to use for remote encoding; process_begin() passes a
 *  null pointer for the local encoding threads.
 */
242 Encoder::encoder_thread (ServerDescription* server)
244 /* Number of seconds that we currently wait between attempts
245 to connect to the server; not relevant for localhost
248 int remote_backoff = 0;
252 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
253 boost::mutex::scoped_lock lock (_mutex);
// Sleep until there is something on the queue or we are told to terminate
254 while (_queue.empty () && !_terminate) {
255 _condition.wait (lock);
262 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
// The frame at the head of the queue is the one to work on
263 shared_ptr<DCPVideoFrame> vf = _queue.front ();
264 _film->log()->log (String::compose (N_("Encoder thread %1 pops frame %2 from queue"), boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
// Stays unset if the encode fails
269 shared_ptr<EncodedData> encoded;
273 encoded = vf->encode_remotely (server);
275 if (remote_backoff > 0) {
276 _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
279 /* This job succeeded, so remove any backoff */
282 } catch (std::exception& e) {
// Back off by another 10s on each failure for as long as the backoff is under 60s
283 if (remote_backoff < 60) {
285 remote_backoff += 10;
289 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
290 vf->frame(), server->host_name(), e.what(), remote_backoff)
296 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
297 encoded = vf->encode_locally ();
298 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
299 } catch (std::exception& e) {
300 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
305 _writer->write (encoded, vf->frame ());
310 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
// Back at the head of the queue, so it is the next frame that a worker picks up
312 _queue.push_front (vf);
// Wait for the backoff period before this thread carries on
316 if (remote_backoff > 0) {
317 dcpomatic_sleep (remote_backoff);
// Wake whatever is waiting on _condition: process_video(), process_end() and the other workers all wait on it
321 _condition.notify_all ();