2 Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
4 This file is part of DCP-o-matic.
6 DCP-o-matic is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 DCP-o-matic is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
21 /** @file src/encoder.h
22 * @brief Parent class for classes which can encode video and audio frames.
30 #include "dcp_video.h"
33 #include "encode_server_finder.h"
35 #include "player_video.h"
36 #include "encode_server_description.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
// Logging shorthands.  Each expands to a call on _film->log(), so they can only be
// used where a `_film` is in scope.  LOG_GENERAL, LOG_ERROR and LOG_TIMING format
// their arguments with String::compose; LOG_GENERAL_NC passes its argument to log()
// without formatting.
// NOTE(review): every expansion already ends in a semicolon, so writing `LOG_X (...);`
// produces an extra empty statement.  That is harmless inside braces but would break
// an unbraced if/else.
44 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
45 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
46 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
47 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
51 using boost::shared_ptr;
52 using boost::weak_ptr;
53 using boost::optional;
// Number of frame-completion times kept in _time_history: frame_done() trims the
// history to this length, and current_encoding_rate() takes a separate branch while
// the history is still shorter than this.
56 int const Encoder::_history_size = 25;
58 /** @param film Film that we are encoding.
 *  @param writer Writer that encoded frames are handed to (presumably kept as _writer).
 */
// Constructor.  The visible part of the body calls servers_list_changed(), which
// terminates any existing worker threads and creates the current set.
59 Encoder::Encoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
64 servers_list_changed ();
75 if (!EncodeServerFinder::instance()->disabled ()) {
76 _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
83 boost::mutex::scoped_lock lock (_queue_mutex);
85 LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
87 /* Keep waking workers until the queue is empty */
88 while (!_queue.empty ()) {
90 _empty_condition.notify_all ();
91 _full_condition.wait (lock);
96 LOG_GENERAL_NC (N_("Terminating encoder threads"));
100 LOG_GENERAL (N_("Mopping up %1"), _queue.size());
102 /* The following sequence of events can occur in the above code:
103 1. a remote worker takes the last image off the queue
104 2. the loop above terminates
105 3. the remote worker fails to encode the image and puts it back on the queue
106 4. the remote worker is then terminated by terminate_threads
108 So just mop up anything left in the queue here.
111 for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
112 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
115 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
120 } catch (std::exception& e) {
121 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
126 /** @return an estimate of the current number of frames we are encoding per second,
130 Encoder::current_encoding_rate () const
// _time_history is also modified by frame_done(), which takes the same mutex.
132 boost::mutex::scoped_lock lock (_state_mutex);
// Separate path while fewer than _history_size completion times have been recorded.
133 if (int (_time_history.size()) < _history_size) {
138 gettimeofday (&now, 0);
// frame_done() pushes new times at the front, so back() is the oldest time kept:
// the estimate is _history_size frames over the time elapsed since that entry.
140 return _history_size / (seconds (now) - seconds (_time_history.back ()));
143 /** @return Number of video frames that have been sent out */
145 Encoder::video_frames_out () const
// Hold _state_mutex for the rest of this scope.  The lock object must be named:
// the previous `scoped_lock (_state_mutex);` declared an unlocked local called
// _state_mutex (shadowing the member) and so protected nothing.  This now matches
// current_encoding_rate() and frame_done().
147 boost::mutex::scoped_lock lock (_state_mutex);
151 /** Should be called when a frame has been encoded successfully.
152 * Takes no parameters: it records the current time in _time_history, which current_encoding_rate() reads.
155 Encoder::frame_done ()
// Guards _time_history, which current_encoding_rate() reads under the same mutex.
157 boost::mutex::scoped_lock lock (_state_mutex);
160 gettimeofday (&tv, 0);
// The newest completion time goes at the front of the history...
161 _time_history.push_front (tv);
// ...and the oldest is dropped from the back once more than _history_size are held.
162 if (int (_time_history.size()) > _history_size) {
163 _time_history.pop_back ();
167 /** Called to start encoding of the next video frame in the DCP. This is called in order,
168 * so each time the supplied frame is the one after the previous one.
169 * pv represents one video frame, and could be empty if there is nothing to encode
170 * for this DCP frame.
// Walks the supplied frames.  When the film is not 3D, right-eye frames are
// discarded and left-eye frames are relabelled EYES_BOTH (see the inline comments);
// what happens to each frame afterwards is not visible in this listing.
173 Encoder::encode (list<shared_ptr<PlayerVideo> > pv)
175 BOOST_FOREACH (shared_ptr<PlayerVideo> i, pv) {
176 if (!_film->three_d()) {
178 if (i->eyes() == EYES_RIGHT) {
179 /* Discard right-eye images */
181 } else if (i->eyes() == EYES_LEFT) {
182 /* Use left-eye images for both eyes */
183 i->set_eyes (EYES_BOTH);
// Handles one frame: waits for room in the encode queue, then either passes the
// frame straight to the writer (fake write, existing JPEG2000 data, or a repeat of
// the previous frame) or pushes it onto _queue for the encoder threads.
193 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
// Read the current thread count under its own mutex.
199 boost::mutex::scoped_lock threads_lock (_threads_mutex);
200 threads = _threads.size ();
203 boost::mutex::scoped_lock queue_lock (_queue_mutex);
205 /* Wait until the queue has gone down a bit */
// The queue may hold up to two frames per encoder thread.
// NOTE(review): if threads is 0 this condition is always true, so the loop relies
// on at least one thread existing -- confirm.
206 while (_queue.size() >= threads * 2) {
207 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
208 _full_condition.wait (queue_lock);
209 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
213 /* Re-throw any exception raised by one of our threads. If more
214 than one has thrown an exception, only one will be rethrown, I think;
215 but then, if that happens something has gone badly wrong.
219 if (_writer->can_fake_write (_position)) {
220 /* We can fake-write this frame */
221 _writer->fake_write (_position, pv->eyes ());
223 } else if (pv->has_j2k ()) {
224 /* This frame already has JPEG2000 data, so just write it */
225 _writer->write (pv->j2k(), _position, pv->eyes ());
226 } else if (_last_player_video && _writer->can_repeat(_position) && pv->same (_last_player_video)) {
227 _writer->repeat (_position, pv->eyes ());
229 /* Queue this new frame for encoding */
230 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
231 _queue.push_back (shared_ptr<DCPVideo> (
235 _film->video_frame_rate(),
236 _film->j2k_bandwidth(),
242 /* The queue might not be empty any more, so notify anything which is
245 _empty_condition.notify_all ();
// Remember this frame so the next call can test pv->same() against it.
248 _last_player_video = pv;
// Walks _threads while holding _threads_mutex, logging before and after each
// thread is dealt with; every thread is asserted to be joinable.  The calls that
// actually stop the threads are not visible in this listing.
252 Encoder::terminate_threads ()
254 boost::mutex::scoped_lock threads_lock (_threads_mutex);
257 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
258 LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
260 DCPOMATIC_ASSERT ((*i)->joinable ());
263 LOG_GENERAL_NC ("Thread terminated");
// Body of one worker thread.  `server` holds the remote encode server that this
// thread sends frames to; it is empty for a thread that encodes locally (see
// servers_list_changed(), which starts both kinds).  The thread waits on
// _empty_condition for frames, encodes them, passes results to the writer and
// pushes a frame back onto the queue after a failure.
271 Encoder::encoder_thread (optional<EncodeServerDescription> server)
275 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
277 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
280 /* Number of seconds that we currently wait between attempts
281 to connect to the server; not relevant for localhost
284 int remote_backoff = 0;
288 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
289 boost::mutex::scoped_lock lock (_queue_mutex);
// Sleep until there is something in the queue to encode.
290 while (_queue.empty ()) {
291 _empty_condition.wait (lock);
294 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
295 shared_ptr<DCPVideo> vf = _queue.front ();
296 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
301 optional<Data> encoded;
303 /* We need to encode this input */
306 encoded = vf->encode_remotely (server.get ());
308 if (remote_backoff > 0) {
309 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
312 /* This job succeeded, so remove any backoff */
315 } catch (std::exception& e) {
// Each failed remote encode adds 10 seconds of backoff while it is below 60.
316 if (remote_backoff < 60) {
318 remote_backoff += 10;
321 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
322 vf->index(), server->host_name(), e.what(), remote_backoff
328 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
329 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
330 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
331 } catch (std::exception& e) {
332 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
338 _writer->write (encoded.get(), vf->index (), vf->eyes ());
342 LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
// The failed frame goes back at the front, so it is the next one taken.
343 _queue.push_front (vf);
// Wait out the backoff before this thread tries the remote server again.
347 if (remote_backoff > 0) {
348 boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
351 /* The queue might not be full any more, so notify anything that is waiting on that */
353 _full_condition.notify_all ();
359 /* Wake anything waiting on _full_condition so it can see the exception */
360 _full_condition.notify_all ();
// Rebuilds the set of worker threads: terminates the existing ones, then (unless
// Config says that only servers encode) starts num_local_encoding_threads() local
// threads, plus i.threads() threads for each server reported by EncodeServerFinder,
// and finally tells the writer how many threads there now are.
// NOTE(review): threads are allocated with `new`; confirm that they are deleted
// when terminated, as that code is not visible in this listing.
364 Encoder::servers_list_changed ()
366 terminate_threads ();
368 /* XXX: could re-use threads */
370 boost::mutex::scoped_lock lm (_threads_mutex);
372 #ifdef BOOST_THREAD_PLATFORM_WIN32
374 info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
375 GetVersionEx (&info);
376 bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
378 LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
382 if (!Config::instance()->only_servers_encode ()) {
383 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
// A local thread is started with an empty optional, i.e. no remote server.
384 boost::thread* t = new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<EncodeServerDescription> ()));
385 _threads.push_back (t);
386 #ifdef BOOST_THREAD_PLATFORM_WIN32
// Windows builds only: affinity mask with just bit i set for local thread i.
388 SetThreadAffinityMask (t->native_handle(), 1 << i);
394 BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
395 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
396 for (int j = 0; j < i.threads(); ++j) {
397 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
401 _writer->set_encoder_threads (_threads.size ());