2 Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
29 #include "dcp_video.h"
33 #include "server_finder.h"
35 #include "player_video.h"
37 #include "server_description.h"
38 #include "compose.hpp"
39 #include <libcxml/cxml.h>
40 #include <boost/foreach.hpp>
// Logging helpers.  Each one formats its arguments with String::compose and sends the
// result to _film->log(), so they can only be used where `_film` is in scope.
// LOG_GENERAL and LOG_ERROR call log(); LOG_TIMING calls microsecond_log().
// NOTE(review): every expansion already ends in `;`, so `LOG_X (...);` at a call site
// produces an extra empty statement -- be careful when one is the sole body of an
// unbraced if/else.
45 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_GENERAL);
46 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_ERROR);
47 #define LOG_TIMING(...) _film->log()->microsecond_log (String::compose (__VA_ARGS__), Log::TYPE_TIMING);
51 using boost::shared_ptr;
52 using boost::weak_ptr;
53 using boost::optional;
// Maximum number of completion timestamps kept in _time_history (see frame_done), and the
// number of samples current_encoding_rate requires before it uses its history-based formula.
55 int const Encoder::_history_size = 25;
57 /** @param film Film that we are encoding */
58 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
// @param j Job, held weakly; how it is stored is not visible in this excerpt.
// @param writer Writer; presumably what _writer refers to elsewhere in this file -- confirm.
// The frame counter starts at zero.
61 , _video_frames_enqueued (0)
// Build the initial set of encoder threads.
67 servers_list_changed ();
// NOTE(review): the embedded numbering jumps from 67 to 78, so the lines below probably
// belong to a later function whose header is not visible in this excerpt.
// Unless server discovery is disabled, arrange for servers_list_changed() to be called
// whenever ServerFinder emits ServersListChanged.
78 if (!ServerFinder::instance()->disabled ()) {
79 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
// NOTE(review): the header of the function containing these lines is not visible in this
// excerpt.  Visible behaviour: with _queue_mutex held, keep waking the workers and waiting
// until _queue is empty, then locally encode any frame that was pushed back onto the queue,
// logging any failure.
86 boost::mutex::scoped_lock lock (_queue_mutex);
88 LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
90 /* Keep waking workers until the queue is empty */
91 while (!_queue.empty ()) {
92 _empty_condition.notify_all ();
93 _full_condition.wait (lock);
100 LOG_GENERAL (N_("Mopping up %1"), _queue.size());
102 /* The following sequence of events can occur in the above code:
103 1. a remote worker takes the last image off the queue
104 2. the loop above terminates
105 3. the remote worker fails to encode the image and puts it back on the queue
106 4. the remote worker is then terminated by terminate_threads
108 So just mop up anything left in the queue here.
111 for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
112 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
// The trailing comma shows that this local encode is an argument to an enclosing call
// which is not visible in this excerpt.
115 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
// A failed local encode is logged here; whatever else the handler does is not visible.
120 } catch (std::exception& e) {
121 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
126 /** @return an estimate of the current number of frames we are encoding per second,
130 Encoder::current_encoding_rate () const
132 boost::mutex::scoped_lock lock (_state_mutex);
// Fewer than _history_size completion times have been recorded so far; what is returned
// in that case is not visible in this excerpt.
133 if (int (_time_history.size()) < _history_size) {
138 gettimeofday (&now, 0);
// frame_done pushes new timestamps at the front, so back() is the oldest retained sample:
// the estimate is _history_size divided by the seconds elapsed since that sample.
// NOTE(review): the denominator is zero if `now` equals the oldest timestamp -- confirm
// that seconds() returns a floating-point value and that callers tolerate the result.
140 return _history_size / (seconds (now) - seconds (_time_history.back ()));
143 /** @return Number of video frames that have been sent out */
145 Encoder::video_frames_out () const
// Hold _state_mutex while reading the counter that enqueue() advances.
// The lock object must be named: `scoped_lock (_state_mutex);` would instead declare an
// unlocked, default-constructed lock called _state_mutex and protect nothing.
147 boost::mutex::scoped_lock lock (_state_mutex);
148 return _video_frames_enqueued;
151 /** Should be called when a frame has been encoded successfully.
152 * Records the current time at the front of _time_history (takes no parameters).
155 Encoder::frame_done ()
157 boost::mutex::scoped_lock lock (_state_mutex);
160 gettimeofday (&tv, 0);
161 _time_history.push_front (tv);
// Drop the oldest sample once more than _history_size are held.
162 if (int (_time_history.size()) > _history_size) {
163 _time_history.pop_back ();
// Accept the next frame from the player.  It is either passed straight to the writer
// (fake-write, existing JPEG2000 data, or a repeat of the previous frame) or queued
// for the encoder threads.
// @param pv Frame to handle.
167 /** Called in order, so each time this is called the supplied frame is the one
168 * after the previous one.
171 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
// Read the worker count under _threads_mutex; it sets the queue-depth limit used below.
177 boost::mutex::scoped_lock threads_lock (_threads_mutex);
178 threads = _threads.size ();
181 boost::mutex::scoped_lock queue_lock (_queue_mutex);
183 /* XXX: discard 3D here if required */
185 /* Wait until the queue has gone down a bit */
// The limit is two queued frames per worker thread.
// NOTE(review): with zero worker threads the limit is 0, so this loop waits until
// _terminate is set -- confirm that case cannot occur.
186 while (_queue.size() >= threads * 2 && !_terminate) {
187 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
188 _full_condition.wait (queue_lock);
189 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
197 /* Re-throw any exception raised by one of our threads. If more
198 than one has thrown an exception, only one will be rethrown, I think;
199 but then, if that happens something has gone badly wrong.
203 if (_writer->can_fake_write (_video_frames_enqueued)) {
204 /* We can fake-write this frame */
205 _writer->fake_write (_video_frames_enqueued, pv->eyes ());
207 } else if (pv->has_j2k ()) {
208 /* This frame already has JPEG2000 data, so just write it */
209 _writer->write (pv->j2k(), _video_frames_enqueued, pv->eyes ());
210 } else if (_last_player_video && pv->same (_last_player_video)) {
// Same as the previous frame: ask the writer to repeat it instead of queueing an encode.
211 _writer->repeat (_video_frames_enqueued, pv->eyes ());
213 /* Queue this new frame for encoding */
214 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
215 _queue.push_back (shared_ptr<DCPVideo> (
218 _video_frames_enqueued,
219 _film->video_frame_rate(),
220 _film->j2k_bandwidth(),
226 /* The queue might not be empty any more, so notify anything which is
229 _empty_condition.notify_all ();
// Advance the frame counter according to which eye(s) this frame carries.  The case labels
// are not visible in this excerpt: one path increments immediately, the other only once
// both _left_done and _right_done are set, after which the flags are cleared.
232 switch (pv->eyes ()) {
234 ++_video_frames_enqueued;
246 if (_left_done && _right_done) {
247 ++_video_frames_enqueued;
248 _left_done = _right_done = false;
// Remember this frame so that the next call can detect a repeat with same().
251 _last_player_video = pv;
// Stop the encoder threads.
255 Encoder::terminate_threads ()
// With _queue_mutex held, wake everything blocked on either condition variable so that the
// waiting loops re-check _terminate (the write to _terminate is not visible in this excerpt).
258 boost::mutex::scoped_lock queue_lock (_queue_mutex);
260 _full_condition.notify_all ();
261 _empty_condition.notify_all ();
// With _threads_mutex held, visit every worker thread.  What is done with joinable threads
// is in lines not shown here (presumably they are joined).
264 boost::mutex::scoped_lock threads_lock (_threads_mutex);
266 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
267 if ((*i)->joinable ()) {
// Body of one encoder worker thread: take frames from _queue, encode them, and pass the
// result to the writer.
// @param server Remote server to encode on; servers_list_changed() passes an empty optional
// for local workers, in which case frames are encoded with encode_locally().
278 Encoder::encoder_thread (optional<ServerDescription> server)
// Log start-up, naming either the remote host or localhost.
282 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
284 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
287 /* Number of seconds that we currently wait between attempts
288 to connect to the server; not relevant for localhost
291 int remote_backoff = 0;
// Sleep until there is work on the queue or termination has been requested.
295 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
296 boost::mutex::scoped_lock lock (_queue_mutex);
297 while (_queue.empty () && !_terminate) {
298 _empty_condition.wait (lock);
// Take the frame at the head of the queue (its removal from _queue is in lines not shown).
305 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
306 shared_ptr<DCPVideo> vf = _queue.front ();
307 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
// Stays empty if encoding fails.
312 optional<Data> encoded;
314 /* We need to encode this input */
317 encoded = vf->encode_remotely (server.get ());
319 if (remote_backoff > 0) {
320 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
323 /* This job succeeded, so remove any backoff */
326 } catch (std::exception& e) {
// Each failure adds 10 seconds of backoff, with no further increase once 60 is reached.
327 if (remote_backoff < 60) {
329 remote_backoff += 10;
332 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
333 vf->index(), server->host_name(), e.what(), remote_backoff
339 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
340 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
341 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
342 } catch (std::exception& e) {
343 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
// Hand the encoded data to the writer.
348 _writer->write (encoded.get(), vf->index (), vf->eyes ());
// Otherwise put the frame back at the head of the queue so that it is not lost.
352 LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
353 _queue.push_front (vf);
// Sleep for the current backoff before continuing.
357 if (remote_backoff > 0) {
358 dcpomatic_sleep (remote_backoff);
361 /* The queue might not be full any more, so notify anything that is waiting on that */
363 _full_condition.notify_all ();
// Rebuild the set of encoder threads: stop the existing workers, start local threads
// (unless only servers may encode) and a group of threads for each known server, then
// tell the writer how many threads there are.
372 Encoder::servers_list_changed ()
374 terminate_threads ();
376 /* XXX: could re-use threads */
378 boost::mutex::scoped_lock lm (_threads_mutex);
// Local workers are given an empty optional<ServerDescription>.
// NOTE(review): the threads are allocated with `new` and stored as raw pointers in _threads;
// where they are deleted is not visible in this excerpt.
380 if (!Config::instance()->only_servers_encode ()) {
381 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
382 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
// Remote workers: i.threads() threads per server, each bound to a copy of its description.
386 BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
387 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
388 for (int j = 0; j < i.threads(); ++j) {
389 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
393 _writer->set_encoder_threads (_threads.size ());