2 Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
4 This file is part of DCP-o-matic.
6 DCP-o-matic is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 DCP-o-matic is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
21 /** @file src/encoder.h
22 * @brief Parent class for classes which can encode video and audio frames. */
30 #include "dcp_video.h"
33 #include "encode_server_finder.h"
35 #include "player_video.h"
36 #include "encode_server_description.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
// Logging helpers. Each one forwards to _film->log()->log() tagged with a LogEntry type,
// so they can only be used where a _film member is in scope.
// LOG_GENERAL, LOG_ERROR and LOG_TIMING run their arguments through String::compose
// (a format string with %1, %2, ... placeholders followed by the values);
// LOG_GENERAL_NC passes its argument to the log without composing it.
// NOTE(review): every expansion already ends in a semicolon, so a call written as
// LOG_X (...); produces an extra empty statement. That would break an unbraced if/else
// wrapped around a call - confirm how callers use these before changing the macros.
44 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
45 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
46 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
47 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
51 using boost::shared_ptr;
52 using boost::weak_ptr;
53 using boost::optional;
// Number of frame-completion timestamps kept in _time_history. frame_done() trims the
// history to this length, and current_encoding_rate() reports no measurement until the
// history is full, then uses this value as the frame count in its rate estimate.
56 int const Encoder::_history_size = 25;
58 /** Construct an Encoder and start its worker threads.
 *  @param film Film that we are encoding.
 *  @param writer Writer for the output (presumably what the _writer member refers to;
 *  the initialiser for it is not visible in this excerpt - TODO confirm).
 */
59 Encoder::Encoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
// Both termination flags start cleared: nothing has asked enqueue() or the worker
// threads to stop yet.
62 , _terminate_enqueue (false)
63 , _terminate_encoding (false)
// Build the initial set of encoder threads; servers_list_changed() terminates any
// existing threads and repopulates _threads.
66 servers_list_changed ();
// NOTE(review): the signature of the function that contains these statements is not
// visible in this excerpt.
// Under _queue_mutex, record that enqueueing must stop, then wake every thread blocked
// on either condition variable so that it re-evaluates its wait condition. enqueue()
// leaves its wait loop as soon as _terminate_enqueue is set.
73 boost::mutex::scoped_lock lm (_queue_mutex);
74 _terminate_enqueue = true;
75 _full_condition.notify_all ();
76 _empty_condition.notify_all ();
// NOTE(review): the signature of the function that contains these statements is not
// visible in this excerpt.
// Unless the encode-server finder is disabled, connect servers_list_changed() to its
// ServersListChanged signal and keep the resulting connection in
// _server_found_connection.
// NOTE(review): the slot is bound to a raw `this`; presumably the connection is
// released before the Encoder is destroyed - confirm against the member's type.
82 if (!EncodeServerFinder::instance()->disabled ()) {
83 _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
// NOTE(review): the signature of the function that contains these statements is not
// visible in this excerpt.
// Shuts encoding down in three steps: wait for the workers to drain _queue, terminate
// the worker threads, then encode locally any frame that a failing worker pushed back
// onto the queue in the meantime.
90 boost::mutex::scoped_lock lock (_queue_mutex);
92 LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
94 /* Keep waking workers until the queue is empty */
95 while (!_queue.empty ()) {
// Workers sleep on _empty_condition while they have nothing to do and notify
// _full_condition after handling a frame, so wake them and then wait (which releases
// the lock) until one of them reports progress.
97 _empty_condition.notify_all ();
98 _full_condition.wait (lock);
103 LOG_GENERAL_NC (N_("Terminating encoder threads"));
// NOTE(review): terminate_threads() locks _queue_mutex itself, so `lock` must already
// have been released here; the statement that does so is not visible - confirm.
105 terminate_threads ();
107 LOG_GENERAL (N_("Mopping up %1"), _queue.size());
109 /* The following sequence of events can occur in the above code:
110 1. a remote worker takes the last image off the queue
111 2. the loop above terminates
112 3. the remote worker fails to encode the image and puts it back on the queue
113 4. the remote worker is then terminated by terminate_threads
115 So just mop up anything left in the queue here.
// Walk whatever is still queued and encode each frame on this thread.
118 for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
119 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
// Log::dcp_log is bound as the callback handed to encode_locally. The trailing comma
// shows this is one argument of a call whose other lines are not visible here
// (presumably a write through _writer - TODO confirm).
122 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
// A failed local encode is reported through LOG_ERROR.
127 } catch (std::exception& e) {
128 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
133 /** @return an estimate of the current number of frames we are encoding per second. */
// Estimates the current encoding speed, in frames per second, from the completion
// timestamps that frame_done() stores in _time_history.
137 Encoder::current_encoding_rate () const
// _state_mutex guards _time_history; frame_done() takes the same lock when it writes.
139 boost::mutex::scoped_lock lock (_state_mutex);
// Fewer than _history_size frames have completed, so there is not yet a full window.
// NOTE(review): what is returned in this case is not visible in this excerpt.
140 if (int (_time_history.size()) < _history_size) {
145 gettimeofday (&now, 0);
// frame_done() pushes new timestamps at the front, so back() is the oldest sample held.
// The estimate is _history_size frames divided by the time elapsed since that sample.
// NOTE(review): seconds() is defined elsewhere - presumably it converts a timeval to
// fractional seconds. If both values were equal the divisor would be zero - confirm
// whether that can happen in practice.
147 return _history_size / (seconds (now) - seconds (_time_history.back ()));
150 /** @return Number of video frames that have been sent out.
 *
 *  Takes _state_mutex for the remainder of the function. The lock object has to be
 *  given a name: the earlier spelling `boost::mutex::scoped_lock (_state_mutex);`
 *  is parsed as the declaration of a new, default-constructed (unlocked) scoped_lock
 *  called _state_mutex that shadows the member, so the mutex was never acquired.
 */
152 Encoder::video_frames_out () const
154 boost::mutex::scoped_lock lock (_state_mutex);
158 /** Should be called when a frame has been encoded successfully. */
// Records the completion time of one encoded frame in _time_history, which
// current_encoding_rate() reads to estimate the encoding speed.
162 Encoder::frame_done ()
// _state_mutex guards _time_history; current_encoding_rate() takes the same lock.
164 boost::mutex::scoped_lock lock (_state_mutex);
167 gettimeofday (&tv, 0);
// Newest timestamp goes at the front ...
168 _time_history.push_front (tv);
// ... and the oldest is dropped from the back once more than _history_size are held,
// so the history never grows beyond that length.
169 if (int (_time_history.size()) > _history_size) {
170 _time_history.pop_back ();
174 /** Called to start encoding of the next video frame in the DCP. This is called in order,
175 * so each time the supplied frame is the one after the previous one.
176 * pv represents one video frame, and could be empty if there is nothing to encode
177 * for this DCP frame. */
// Takes the list of PlayerVideo for one DCP frame and visits its elements in order.
// The list is received by value, and an empty list makes the loop a no-op.
// NOTE(review): the loop body is not visible in this excerpt; presumably each element
// is passed to enqueue() - confirm.
180 Encoder::encode (list<shared_ptr<PlayerVideo> > pv)
182 BOOST_FOREACH (shared_ptr<PlayerVideo> i, pv) {
// Accepts one PlayerVideo and decides how the frame at _position reaches the writer:
// fake-written, written straight from existing JPEG2000 data, repeated from the
// previous frame, or queued as a DCPVideo for a worker thread to encode. Blocks while
// the queue is considered full.
189 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
// Snapshot the number of worker threads under _threads_mutex; it sets the queue limit.
195 boost::mutex::scoped_lock threads_lock (_threads_mutex);
196 threads = _threads.size ();
199 boost::mutex::scoped_lock queue_lock (_queue_mutex);
201 /* XXX: discard 3D here if required */
203 /* Wait until the queue has gone down a bit */
// The queue is treated as full at two pending frames per worker thread. Workers notify
// _full_condition after handling a frame, which re-runs this check.
// NOTE(review): if `threads` is 0 the limit is 0, the size test is always true and
// this call blocks until _terminate_enqueue is set - confirm that at least one worker
// always exists when frames are enqueued.
204 while (_queue.size() >= threads * 2 && !_terminate_enqueue) {
205 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
206 _full_condition.wait (queue_lock);
207 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
// Woken because enqueueing is being shut down rather than because space appeared.
210 if (_terminate_enqueue) {
215 /* Re-throw any exception raised by one of our threads. If more
216 than one has thrown an exception, only one will be rethrown, I think;
217 but then, if that happens something has gone badly wrong.
221 if (_writer->can_fake_write (_position)) {
222 /* We can fake-write this frame */
223 _writer->fake_write (_position, pv->eyes ());
225 } else if (pv->has_j2k ()) {
226 /* This frame already has JPEG2000 data, so just write it */
227 _writer->write (pv->j2k(), _position, pv->eyes ());
// Same content as the previous PlayerVideo and the writer allows a repeat: ask the
// writer to repeat instead of encoding again.
228 } else if (_last_player_video && _writer->can_repeat(_position) && pv->same (_last_player_video)) {
229 _writer->repeat (_position, pv->eyes ());
231 /* Queue this new frame for encoding */
232 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
// Only two of the DCPVideo constructor arguments are visible in this excerpt.
233 _queue.push_back (shared_ptr<DCPVideo> (
237 _film->video_frame_rate(),
238 _film->j2k_bandwidth(),
244 /* The queue might not be empty any more, so notify anything which is
// Worker threads sleep on _empty_condition while the queue is empty.
247 _empty_condition.notify_all ();
// Remembered so that the next call can detect a repeated frame with pv->same().
250 _last_player_video = pv;
// Asks every worker thread to stop and walks the _threads list, logging progress for
// each entry. encoder_thread() checks _terminate_encoding in its wait loop and again
// after waking.
// NOTE(review): the statements that wake or interrupt the threads, join them and
// release the boost::thread objects are not visible in this excerpt.
254 Encoder::terminate_threads ()
// The flag is set under _queue_mutex, the mutex the workers hold when they test it.
// NOTE(review): presumably this lock is confined to an inner scope so that it is not
// still held while the threads are joined - the braces are not visible, confirm.
257 boost::mutex::scoped_lock queue_lock (_queue_mutex);
258 _terminate_encoding = true;
261 boost::mutex::scoped_lock threads_lock (_threads_mutex);
264 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
// `n` is a counter declared on a line that is not visible here.
265 LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
267 if ((*i)->joinable ()) {
271 LOG_GENERAL_NC ("Thread terminated");
// Clear the flag again so that threads created afterwards (servers_list_changed()
// calls this function and then starts new ones) do not stop immediately.
276 _terminate_encoding = false;
// Body of one worker thread. `server` holds the description of a remote encode server,
// or is empty for a thread that encodes on this machine (the start-up log line reports
// server=localhost in that case). The thread repeatedly takes the frame at the front of
// _queue, encodes it, and either hands the result to _writer or puts the frame back on
// the queue after a failure.
280 Encoder::encoder_thread (optional<EncodeServerDescription> server)
284 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
286 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
289 /* Number of seconds that we currently wait between attempts
290 to connect to the server; not relevant for localhost
293 int remote_backoff = 0;
297 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
// Sleep on _empty_condition until there is a frame to encode or termination has been
// requested; enqueue() notifies this condition after it adds a frame.
298 boost::mutex::scoped_lock lock (_queue_mutex);
299 while (_queue.empty () && !_terminate_encoding) {
300 _empty_condition.wait (lock);
// terminate_threads() sets _terminate_encoding to ask this thread to stop.
303 if (_terminate_encoding) {
307 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
// NOTE(review): front() only reads the element; the statement that removes it from
// _queue, and any unlock before encoding, are not visible in this excerpt.
308 shared_ptr<DCPVideo> vf = _queue.front ();
309 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
// Stays empty if the encode attempt below fails.
314 optional<Data> encoded;
316 /* We need to encode this input */
// Remote path: ask the server described by `server` to encode the frame.
319 encoded = vf->encode_remotely (server.get ());
321 if (remote_backoff > 0) {
322 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
325 /* This job succeeded, so remove any backoff */
328 } catch (std::exception& e) {
// Each failure lengthens the back-off by 10 seconds while it is still below 60.
329 if (remote_backoff < 60) {
331 remote_backoff += 10;
334 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
335 vf->index(), server->host_name(), e.what(), remote_backoff
// Local path: Log::dcp_log is bound as the callback handed to encode_locally.
341 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
342 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
343 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
344 } catch (std::exception& e) {
345 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
// Encoded data goes to the writer under the frame's own index and eyes (presumably
// guarded by a check that `encoded` is set - the condition is not visible here).
351 _writer->write (encoded.get(), vf->index (), vf->eyes ());
// On failure the frame returns to the front of the queue, so it is the next one taken.
355 LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
356 _queue.push_front (vf);
// Pause this thread for the current back-off before it tries again.
360 if (remote_backoff > 0) {
361 boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
364 /* The queue might not be full any more, so notify anything that is waiting on that */
366 _full_condition.notify_all ();
372 /* Wake anything waiting on _full_condition so it can see the exception */
373 _full_condition.notify_all ();
// Rebuilds the set of worker threads: stops the existing ones, starts local encoding
// threads (unless only servers may encode) plus a group of threads for each known
// encode server, then tells the writer how many encoder threads now exist.
// Called from the constructor and connected to the finder's ServersListChanged signal.
377 Encoder::servers_list_changed ()
// terminate_threads() takes _threads_mutex itself, so it runs before the lock below.
379 terminate_threads ();
381 /* XXX: could re-use threads */
383 boost::mutex::scoped_lock lm (_threads_mutex);
// Windows only: version 5.1 is treated as Windows XP, for which thread affinity is set.
385 #ifdef BOOST_THREAD_PLATFORM_WIN32
387 info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
388 GetVersionEx (&info);
389 bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
391 LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
395 if (!Config::instance()->only_servers_encode ()) {
396 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
// An empty optional tells encoder_thread() to encode on this machine.
// NOTE(review): the thread object is allocated with new and stored as a raw pointer;
// presumably terminate_threads() releases it - the delete is not visible, confirm.
397 boost::thread* t = new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<EncodeServerDescription> ()));
398 _threads.push_back (t);
399 #ifdef BOOST_THREAD_PLATFORM_WIN32
// Affinity mask with only bit i set (presumably applied only when windows_xp is true;
// the guarding condition is not visible in this excerpt).
401 SetThreadAffinityMask (t->native_handle(), 1 << i);
// One group of threads per server reported by the finder, sized by the server's own
// threads() value; each thread receives a copy of that server's description.
407 BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
408 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
409 for (int j = 0; j < i.threads(); ++j) {
410 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
414 _writer->set_encoder_threads (_threads.size ());