2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
32 #include "exceptions.h"
35 #include "dcp_video_frame.h"
41 #include "audio_mapping.h"
47 using std::stringstream;
52 using boost::shared_ptr;
53 using boost::optional;
55 int const Encoder::_history_size = 25;
57 /** @param f Film that we are encoding */
58 Encoder::Encoder (shared_ptr<Film> f, shared_ptr<Job> j)
61 , _video_frames_out (0)
62 , _have_a_real_frame (false)
77 Encoder::process_begin ()
79 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
80 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
83 vector<ServerDescription*> servers = Config::instance()->servers ();
85 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
86 for (int j = 0; j < (*i)->threads (); ++j) {
87 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
91 _writer.reset (new Writer (_film, _job));
96 Encoder::process_end ()
98 boost::mutex::scoped_lock lock (_mutex);
100 _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
102 /* Keep waking workers until the queue is empty */
103 while (!_queue.empty ()) {
104 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
105 _condition.notify_all ();
106 _condition.wait (lock);
111 terminate_threads ();
113 _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
115 /* The following sequence of events can occur in the above code:
116 1. a remote worker takes the last image off the queue
117 2. the loop above terminates
118 3. the remote worker fails to encode the image and puts it back on the queue
119 4. the remote worker is then terminated by terminate_threads
121 So just mop up anything left in the queue here.
124 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
125 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
127 _writer->write ((*i)->encode_locally(), (*i)->frame ());
129 } catch (std::exception& e) {
130 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
138 /** @return an estimate of the current number of frames we are encoding per second,
142 Encoder::current_encoding_rate () const
144 boost::mutex::scoped_lock lock (_history_mutex);
145 if (int (_time_history.size()) < _history_size) {
150 gettimeofday (&now, 0);
152 return _history_size / (seconds (now) - seconds (_time_history.back ()));
155 /** @return Number of video frames that have been sent out */
157 Encoder::video_frames_out () const
159 boost::mutex::scoped_lock (_history_mutex);
160 return _video_frames_out;
163 /** Should be called when a frame has been encoded successfully.
164 * @param n Source frame index.
167 Encoder::frame_done ()
169 boost::mutex::scoped_lock lock (_history_mutex);
172 gettimeofday (&tv, 0);
173 _time_history.push_front (tv);
174 if (int (_time_history.size()) > _history_size) {
175 _time_history.pop_back ();
180 Encoder::process_video (shared_ptr<const Image> image, bool same, shared_ptr<Subtitle> sub, Time)
182 boost::mutex::scoped_lock lock (_mutex);
184 /* Wait until the queue has gone down a bit */
185 while (_queue.size() >= _threads.size() * 2 && !_terminate) {
186 TIMING ("decoder sleeps with queue of %1", _queue.size());
187 _condition.wait (lock);
188 TIMING ("decoder wakes with queue of %1", _queue.size());
195 if (_writer->thrown ()) {
199 if (_writer->can_fake_write (_video_frames_out)) {
200 _writer->fake_write (_video_frames_out);
201 _have_a_real_frame = false;
203 } else if (same && _have_a_real_frame) {
204 /* Use the last frame that we encoded. */
205 _writer->repeat (_video_frames_out);
208 /* Queue this new frame for encoding */
209 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
210 TIMING ("adding to queue of %1", _queue.size ());
211 _queue.push_back (shared_ptr<DCPVideoFrame> (
213 image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
214 _film->subtitle_offset(), _film->subtitle_scale(),
215 _film->scaler(), _video_frames_out, _film->dcp_video_frame_rate(), s.second,
216 _film->colour_lut(), _film->j2k_bandwidth(),
221 _condition.notify_all ();
222 _have_a_real_frame = true;
229 Encoder::process_audio (shared_ptr<const AudioBuffers> data, Time)
231 _writer->write (data);
235 Encoder::terminate_threads ()
237 boost::mutex::scoped_lock lock (_mutex);
239 _condition.notify_all ();
242 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
243 if ((*i)->joinable ()) {
251 Encoder::encoder_thread (ServerDescription* server)
253 /* Number of seconds that we currently wait between attempts
254 to connect to the server; not relevant for localhost
257 int remote_backoff = 0;
261 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
262 boost::mutex::scoped_lock lock (_mutex);
263 while (_queue.empty () && !_terminate) {
264 _condition.wait (lock);
271 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
272 shared_ptr<DCPVideoFrame> vf = _queue.front ();
273 _film->log()->log (String::compose (N_("Encoder thread %1 pops frame %2 from queue"), boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
278 shared_ptr<EncodedData> encoded;
282 encoded = vf->encode_remotely (server);
284 if (remote_backoff > 0) {
285 _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
288 /* This job succeeded, so remove any backoff */
291 } catch (std::exception& e) {
292 if (remote_backoff < 60) {
294 remote_backoff += 10;
298 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
299 vf->frame(), server->host_name(), e.what(), remote_backoff)
305 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
306 encoded = vf->encode_locally ();
307 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
308 } catch (std::exception& e) {
309 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
314 _writer->write (encoded, vf->frame ());
319 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
321 _queue.push_front (vf);
325 if (remote_backoff > 0) {
326 dcpomatic_sleep (remote_backoff);
330 _condition.notify_all ();