2 Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
4 This file is part of DCP-o-matic.
6 DCP-o-matic is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 DCP-o-matic is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
21 /** @file src/encoder.h
22 * @brief Parent class for classes which can encode video and audio frames.
30 #include "dcp_video.h"
33 #include "encode_server_finder.h"
35 #include "player_video.h"
36 #include "encode_server_description.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
44 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
45 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
46 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
47 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
51 using boost::shared_ptr;
52 using boost::weak_ptr;
53 using boost::optional;
56 int const Encoder::_history_size = 25;
58 /** @param f Film that we are encoding */
59 Encoder::Encoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
63 servers_list_changed ();
74 if (!EncodeServerFinder::instance()->disabled ()) {
75 _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
82 boost::mutex::scoped_lock lock (_queue_mutex);
84 LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
86 /* Keep waking workers until the queue is empty */
87 while (!_queue.empty ()) {
89 _empty_condition.notify_all ();
90 _full_condition.wait (lock);
95 LOG_GENERAL_NC (N_("Terminating encoder threads"));
99 LOG_GENERAL (N_("Mopping up %1"), _queue.size());
101 /* The following sequence of events can occur in the above code:
102 1. a remote worker takes the last image off the queue
103 2. the loop above terminates
104 3. the remote worker fails to encode the image and puts it back on the queue
105 4. the remote worker is then terminated by terminate_threads
107 So just mop up anything left in the queue here.
110 for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
111 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
114 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
119 } catch (std::exception& e) {
120 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
125 /** @return an estimate of the current number of frames we are encoding per second,
129 Encoder::current_encoding_rate () const
131 boost::mutex::scoped_lock lock (_state_mutex);
132 if (int (_time_history.size()) < _history_size) {
137 gettimeofday (&now, 0);
139 return _history_size / (seconds (now) - seconds (_time_history.back ()));
142 /** @return Number of video frames that have been queued for encoding */
144 Encoder::video_frames_enqueued () const
146 return _last_player_video->time().frames_floor (_film->video_frame_rate ());
149 /** Should be called when a frame has been encoded successfully.
150 * @param n Source frame index.
153 Encoder::frame_done ()
155 boost::mutex::scoped_lock lock (_state_mutex);
158 gettimeofday (&tv, 0);
159 _time_history.push_front (tv);
160 if (int (_time_history.size()) > _history_size) {
161 _time_history.pop_back ();
165 /** Called to start encoding of the next video frame in the DCP. This is called in order,
166 * so each time the supplied frame is the one after the previous one.
167 * pv represents one video frame, and could be empty if there is nothing to encode
168 * for this DCP frame.
171 Encoder::encode (shared_ptr<PlayerVideo> pv)
177 boost::mutex::scoped_lock threads_lock (_threads_mutex);
178 threads = _threads.size ();
181 boost::mutex::scoped_lock queue_lock (_queue_mutex);
183 /* Wait until the queue has gone down a bit */
184 while (_queue.size() >= threads * 2) {
185 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
186 _full_condition.wait (queue_lock);
187 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
191 /* Re-throw any exception raised by one of our threads. If more
192 than one has thrown an exception, only one will be rethrown, I think;
193 but then, if that happens something has gone badly wrong.
197 Frame const position = pv->time().frames_floor(_film->video_frame_rate());
199 if (_writer->can_fake_write (position)) {
200 /* We can fake-write this frame */
201 _writer->fake_write (position, pv->eyes ());
203 } else if (pv->has_j2k ()) {
204 /* This frame already has JPEG2000 data, so just write it */
205 _writer->write (pv->j2k(), position, pv->eyes ());
206 } else if (_last_player_video && _writer->can_repeat(position) && pv->same (_last_player_video)) {
207 _writer->repeat (position, pv->eyes ());
209 /* Queue this new frame for encoding */
210 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
211 _queue.push_back (shared_ptr<DCPVideo> (
215 _film->video_frame_rate(),
216 _film->j2k_bandwidth(),
222 /* The queue might not be empty any more, so notify anything which is
225 _empty_condition.notify_all ();
228 _last_player_video = pv;
232 Encoder::terminate_threads ()
234 boost::mutex::scoped_lock threads_lock (_threads_mutex);
237 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
238 LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
240 DCPOMATIC_ASSERT ((*i)->joinable ());
243 LOG_GENERAL_NC ("Thread terminated");
251 Encoder::encoder_thread (optional<EncodeServerDescription> server)
255 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
257 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
260 /* Number of seconds that we currently wait between attempts
261 to connect to the server; not relevant for localhost
264 int remote_backoff = 0;
268 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
269 boost::mutex::scoped_lock lock (_queue_mutex);
270 while (_queue.empty ()) {
271 _empty_condition.wait (lock);
274 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
275 shared_ptr<DCPVideo> vf = _queue.front ();
276 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
281 optional<Data> encoded;
283 /* We need to encode this input */
286 encoded = vf->encode_remotely (server.get ());
288 if (remote_backoff > 0) {
289 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
292 /* This job succeeded, so remove any backoff */
295 } catch (std::exception& e) {
296 if (remote_backoff < 60) {
298 remote_backoff += 10;
301 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
302 vf->index(), server->host_name(), e.what(), remote_backoff
308 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
309 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
310 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
311 } catch (std::exception& e) {
312 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
318 _writer->write (encoded.get(), vf->index (), vf->eyes ());
322 LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
323 _queue.push_front (vf);
327 if (remote_backoff > 0) {
328 boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
331 /* The queue might not be full any more, so notify anything that is waiting on that */
333 _full_condition.notify_all ();
339 /* Wake anything waiting on _full_condition so it can see the exception */
340 _full_condition.notify_all ();
344 Encoder::servers_list_changed ()
346 terminate_threads ();
348 /* XXX: could re-use threads */
350 boost::mutex::scoped_lock lm (_threads_mutex);
352 #ifdef BOOST_THREAD_PLATFORM_WIN32
354 info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
355 GetVersionEx (&info);
356 bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
358 LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
362 if (!Config::instance()->only_servers_encode ()) {
363 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
364 boost::thread* t = new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<EncodeServerDescription> ()));
365 _threads.push_back (t);
366 #ifdef BOOST_THREAD_PLATFORM_WIN32
368 SetThreadAffinityMask (t->native_handle(), 1 << i);
374 BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
375 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
376 for (int j = 0; j < i.threads(); ++j) {
377 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
381 _writer->set_encoder_threads (_threads.size ());