2 Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
4 This file is part of DCP-o-matic.
6 DCP-o-matic is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 DCP-o-matic is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
22 /** @file src/j2k_encoder.cc
23 * @brief J2K encoder class.
27 #include "compose.hpp"
30 #include "dcp_video.h"
31 #include "dcpomatic_log.h"
32 #include "encode_server_description.h"
33 #include "encode_server_finder.h"
35 #include "j2k_encoder.h"
37 #include "player_video.h"
40 #include <libcxml/cxml.h>
49 using std::make_shared;
50 using std::shared_ptr;
52 using boost::optional;
54 using namespace dcpomatic;
// File-scope object of type grk_plugin::GrokInitializer. It is constructed when this
// translation unit is initialised and is not referenced anywhere else in the visible code.
// NOTE(review): presumably its constructor performs one-time Grok plugin setup - confirm
// against the GrokInitializer definition.
56 static grk_plugin::GrokInitializer grokInitializer;
// Constructor. Only part of the member-initialiser list is present in this listing, so
// only the visible initialisers are described:
//   - dcpomaticContext_ is built from film, writer, _history and the GPU binary location
//     read from Config;
//   - context_ is a heap-allocated grk_plugin::GrokContext when Config enable_gpu() is
//     true at construction time, otherwise nullptr.
// The one visible body statement calls servers_list_changed(), which creates the worker
// threads.
// NOTE(review): context_ is an owning raw pointer created with new; the matching delete
// is not visible in this listing - confirm it is released (e.g. in the destructor).
58 /** @param film Film that we are encoding.
59 * @param writer Writer that we are using.
61 J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer)
65 , dcpomaticContext_(film, writer, _history, Config::instance()->gpu_binary_location())
66 , context_(Config::instance()->enable_gpu() ? new grk_plugin::GrokContext(dcpomaticContext_) : nullptr)
68 servers_list_changed ();
// Destructor. First disconnects _server_found_connection so that no further
// ServersListChanged callbacks reach this object, then takes _threads_mutex.
// The statements executed while that lock is held are not visible in this listing.
// NOTE(review): terminate_threads() documents that its caller must hold _threads_mutex,
// so the lock is presumably taken for that call - confirm.
72 J2KEncoder::~J2KEncoder ()
74 _server_found_connection.disconnect();
77 boost::mutex::scoped_lock lm(_threads_mutex);
// Subscribes servers_list_changed() to the ServersListChanged signal of the
// EncodeServerFinder singleton and stores the resulting connection in
// _server_found_connection (the destructor disconnects it again).
// NOTE(review): the signature of the function that contains this statement, and the
// closing part of the statement itself, are not visible in this listing.
87 _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect(
88 boost::bind(&J2KEncoder::servers_list_changed, this)
// pause(): acts only when Config enable_gpu() is true.
// NOTE(review): the statement guarded by this condition is not visible in this listing,
// so what pausing actually does cannot be stated from here.
93 J2KEncoder::pause(void)
95 if (Config::instance()->enable_gpu())
// resume(): when Config enable_gpu() is true, allocates a new grk_plugin::GrokContext
// bound to dcpomaticContext_, stores it in context_, and calls servers_list_changed()
// to rebuild the worker threads. Nothing visible happens when GPU encoding is disabled.
// NOTE(review): context_ is overwritten with a raw new and no delete of the previous
// object is visible here; this leaks unless the old context was already destroyed
// elsewhere (for example while pausing) - confirm.
100 J2KEncoder::resume(void)
102 if (Config::instance()->enable_gpu()) {
103 context_ = new grk_plugin::GrokContext(dcpomaticContext_);
104 servers_list_changed();
// end(isFinal): drains the encode queue, stops the worker threads, then processes any
// frames that were put back on the queue while the workers were being stopped.
// @param isFinal  not used in any line visible in this listing.
//                 NOTE(review): confirm its role against the full source.
// Several lines of this function (braces, short statements) are absent from this listing.
109 J2KEncoder::end(bool isFinal)
112 boost::mutex::scoped_lock lock(_queue_mutex);
114 LOG_GENERAL(N_("Clearing queue of %1"), _queue.size());
116 /* Keep waking workers until the queue is empty */
117 while (!_queue.empty()) {
119 _empty_condition.notify_all();
// Workers notify _full_condition after handling a frame (see encoder_thread), which is
// what wakes this wait so the loop can re-check the queue.
120 _full_condition.wait(lock);
125 LOG_GENERAL_NC (N_("Terminating encoder threads"));
// terminate_threads() requires _threads_mutex to be held by the caller.
128 boost::mutex::scoped_lock lm (_threads_mutex);
129 terminate_threads ();
132 /* Something might have been thrown during terminate_threads */
135 LOG_GENERAL (N_("Mopping up %1"), _queue.size());
137 /* The following sequence of events can occur in the above code:
138 1. a remote worker takes the last image off the queue
139 2. the loop above terminates
140 3. the remote worker fails to encode the image and puts it back on the queue
141 4. the remote worker is then terminated by terminate_threads
143 So just mop up anything left in the queue here.
// Mop-up loop over every frame still in _queue. With GPU enabled each frame is handed
// to context_->scheduleCompress(); the later visible lines encode the frame locally and
// log any std::exception (presumably the non-GPU branch - confirm).
// NOTE(review): context_ is dereferenced with no visible null check, yet the constructor
// leaves it as nullptr when GPU encoding was disabled at construction time.
146 for (auto& i : _queue) {
147 if (Config::instance()->enable_gpu()) {
148 if (!context_->scheduleCompress(i)) {
// NOTE(review): the message below talks about pushing the frame back onto the queue, but
// no push is visible in this loop - confirm the failure is actually handled.
149 LOG_GENERAL(N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index());
153 LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
156 make_shared<dcp::ArrayData>(i.encode_locally()),
160 } catch (std::exception& e) {
161 LOG_ERROR(N_("Local encode failed (%1)"), e.what());
// Thin accessor: returns _history.rate() unchanged.
// NOTE(review): the remainder of the original doc comment below is not visible in this
// listing.
171 /** @return an estimate of the current number of frames we are encoding per second,
175 J2KEncoder::current_encoding_rate () const
177 return _history.rate ();
// Converts _last_player_video_time (recorded by encode()) into a frame count at the
// film video frame rate using frames_floor.
// When no time has been recorded yet the guard below is taken; the value returned in
// that case is not visible in this listing.
181 /** @return Number of video frames that have been queued for encoding */
183 J2KEncoder::video_frames_enqueued () const
185 if (!_last_player_video_time) {
189 return _last_player_video_time->frames_floor (_film->video_frame_rate ());
// NOTE(review): the body of this function is not visible in this listing, so nothing
// beyond the existing comment can be stated about what it does.
193 /** Should be called when a frame has been encoded successfully */
195 J2KEncoder::frame_done ()
// Flow, as far as the visible lines show:
//   1. Under _threads_mutex, read the worker count from _threads->size(); a fallback to
//      std::thread::hardware_concurrency() is also present (the condition selecting
//      between the two is not visible in this listing).
//   2. Under _queue_mutex, block on _full_condition while the queue holds at least
//      threads * 2 + 1 frames.
//   3. Handle the frame in one of four ways, tested in this order: fake write,
//      pass-through of existing J2K data, repeat of the previous frame for the same eye,
//      or queueing a DCPVideo for the worker threads.
//   4. Record pv and time as the most recent frame.
// NOTE(review): the doc comment below says pv could be empty, yet pv is dereferenced
// (pv->eyes(), pv->has_j2k()) with no null check visible in this listing - confirm.
201 /** Called to request encoding of the next video frame in the DCP. This is called in order,
202 * so each time the supplied frame is the one after the previous one.
203 * pv represents one video frame, and could be empty if there is nothing to encode
204 * for this DCP frame.
206 * @param pv PlayerVideo to encode.
207 * @param time Time of \p pv within the DCP.
210 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
216 boost::mutex::scoped_lock lm (_threads_mutex);
218 threads = _threads->size();
// NOTE(review): std::thread::hardware_concurrency() is allowed to return 0 when the
// value cannot be determined; the wait below then lets exactly one frame sit in the queue.
220 threads = std::thread::hardware_concurrency();
223 boost::mutex::scoped_lock queue_lock (_queue_mutex);
225 /* Wait until the queue has gone down a bit. Allow one thing in the queue even
226 when there are no threads.
228 while (_queue.size() >= (threads * 2) + 1) {
229 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
// Woken by the _full_condition.notify_all() calls in encoder_thread.
230 _full_condition.wait (queue_lock);
231 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
235 /* Re-throw any exception raised by one of our threads. If more
236 than one has thrown an exception, only one will be rethrown, I think;
237 but then, if that happens something has gone badly wrong.
// Frame index of this video within the DCP, from frames_floor at the film frame rate.
241 auto const position = time.frames_floor(_film->video_frame_rate());
243 if (_writer.can_fake_write(position)) {
244 /* We can fake-write this frame */
245 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
246 _writer.fake_write(position, pv->eyes ());
248 } else if (pv->has_j2k() && !_film->reencode_j2k()) {
249 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
250 /* This frame already has J2K data, so just write it */
251 _writer.write(pv->j2k(), position, pv->eyes ());
// Repeat is chosen only when a previous frame exists for this eye, the writer allows a
// repeat at this position, and pv->same() reports the two frames as the same.
253 } else if (_last_player_video[pv->eyes()] && _writer.can_repeat(position) && pv->same(_last_player_video[pv->eyes()])) {
254 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
255 _writer.repeat(position, pv->eyes());
257 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
258 /* Queue this new frame for encoding */
259 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
260 auto dcpv = DCPVideo(
263 _film->video_frame_rate(),
264 _film->j2k_bandwidth(),
265 _film->resolution());
266 _queue.push_back(dcpv);
268 /* The queue might not be empty any more, so notify anything which is
271 _empty_condition.notify_all ();
// Remembered per eye so the next call can detect a repeat, and so that
// video_frames_enqueued() can report progress.
274 _last_player_video[pv->eyes()] = pv;
275 _last_player_video_time = time;
// Interrupts every thread in _threads and then joins them all. The two visible handlers
// log a failure of the join; whether they also rethrow is not visible in this listing.
// NOTE(review): servers_list_changed() calls this before it assigns _threads, and the
// constructor calls servers_list_changed(), so on first use _threads has not been set
// yet. A guard for that case presumably sits in the lines absent from this listing -
// confirm.
279 /** Caller must hold a lock on _threads_mutex */
281 J2KEncoder::terminate_threads ()
// Boost documents disable_interruption as turning off interruption of the current
// thread for the lifetime of this object, so the calling thread cannot itself be
// interrupted part-way through the shutdown below.
283 boost::this_thread::disable_interruption dis;
289 _threads->interrupt_all ();
291 _threads->join_all ();
292 } catch (exception& e) {
293 LOG_ERROR ("join() threw an exception: %1", e.what());
295 LOG_ERROR_NC ("join() threw an exception");
// Worker loop run by each encoder thread.
// @param server  remote server this thread sends frames to. servers_list_changed()
//                passes an empty optional for the threads it creates for local encoding.
// Visible flow for each iteration:
//   1. Under _queue_mutex, wait on _empty_condition until the queue is non-empty and
//      take the front frame.
//   2. With thread interruption disabled, produce the encoded data by one of three
//      routes: remote encode on server, hand-off to the GPU context (launch followed by
//      scheduleCompress), or local encode.
//   3. Write the result to _writer, or push the frame back onto the front of the queue
//      after a failure.
//   4. Sleep for remote_backoff seconds when it is non-zero, then notify _full_condition.
// Many structural lines (braces, else, try, lock and unlock calls, the loop header) are
// absent from this listing, so which branch each statement belongs to is partly inferred.
303 J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
306 auto config = Config::instance();
308 start_of_thread ("J2KEncoder");
311 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
313 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
316 /* Number of seconds that we currently wait between attempts
317 to connect to the server; not relevant for localhost
320 int remote_backoff = 0;
324 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
325 boost::mutex::scoped_lock lock (_queue_mutex);
// Loop rather than a single wait, so the queue state is re-checked after every wake-up.
326 while (_queue.empty ()) {
327 _empty_condition.wait (lock);
330 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
// Takes a copy of the front frame; the matching removal from the queue is not visible
// in this listing.
331 auto vf = _queue.front ();
333 /* We're about to commit to either encoding this frame or putting it back onto the queue,
334 so we must not be interrupted until one or other of these things have happened. This
335 block has thread interruption disabled.
338 boost::this_thread::disable_interruption dis;
340 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf.index(), static_cast<int>(vf.eyes()));
// Stays null unless one of the encode routes below succeeds.
345 shared_ptr<Data> encoded;
347 /* We need to encode this input */
350 encoded = make_shared<dcp::ArrayData>(vf.encode_remotely(server.get()));
352 if (remote_backoff > 0) {
353 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
356 /* This job succeeded, so remove any backoff */
359 } catch (std::exception& e) {
// Each remote failure adds 10 seconds to the backoff while it is still below 60.
360 if (remote_backoff < 60) {
362 remote_backoff += 10;
365 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
366 vf.index(), server->host_name(), e.what(), remote_backoff
// GPU route: if either launch() or scheduleCompress() reports failure the frame goes
// back to the front of the queue.
// NOTE(review): context_ is dereferenced here; the guard that selects this route is not
// visible, and context_ is nullptr when GPU was disabled at construction time - confirm.
// NOTE(review): confirm _queue_mutex is held at the push_front below; the lock and
// unlock calls around the encode are not visible in this listing.
372 if (!context_->launch(vf, config->selected_gpu()) || !context_->scheduleCompress(vf)) {
373 LOG_GENERAL(N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
374 _queue.push_front(vf);
379 LOG_TIMING("start-local-encode thread=%1 frame=%2", thread_id(), vf.index());
380 encoded = make_shared<dcp::ArrayData>(vf.encode_locally());
381 LOG_TIMING("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index());
382 } catch (std::exception& e) {
383 /* This is very bad, so don't cope with it, just pass it on */
384 LOG_ERROR(N_("Local encode failed (%1)"), e.what());
391 _writer.write(encoded, vf.index(), vf.eyes());
394 if (!Config::instance()->enable_gpu()) {
396 LOG_GENERAL(N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
397 _queue.push_front(vf);
403 if (remote_backoff > 0) {
// NOTE(review): Boost.Thread marks this_thread::sleep as deprecated in favour of
// sleep_for - confirm against the Boost version in use before changing it.
404 boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
407 /* The queue might not be full any more, so notify anything that is waiting on that */
409 _full_condition.notify_all ();
412 catch (boost::thread_interrupted& e) {
413 /* Ignore these and just stop the thread */
414 _full_condition.notify_all ();
419 /* Wake anything waiting on _full_condition so it can see the exception */
420 _full_condition.notify_all ();
// Rebuilds the whole pool of encoder threads. Under _threads_mutex it:
//   1. stops the existing threads with terminate_threads();
//   2. replaces _threads with a fresh boost::thread_group;
//   3. unless Config only_servers_encode() is set, starts master_encoding_threads()
//      workers that receive an empty server description;
//   4. starts i.threads() workers for each server reported by EncodeServerFinder;
//   5. reports the resulting thread count to the writer.
// Connected to the ServersListChanged signal and also called directly from the
// constructor and from resume().
425 J2KEncoder::servers_list_changed ()
427 boost::mutex::scoped_lock lm (_threads_mutex);
429 terminate_threads ();
430 _threads = make_shared<boost::thread_group>();
432 /* XXX: could re-use threads */
434 if (!Config::instance()->only_servers_encode ()) {
435 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
// On Linux the thread is additionally given a name; the other create_thread call below
// is presumably the non-Linux alternative (the else and endif directives are not visible).
436 #ifdef DCPOMATIC_LINUX
437 auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
// The Linux man page limits thread names to 16 bytes including the terminator;
// encode-worker is 13 characters and fits.
438 pthread_setname_np (t->native_handle(), "encode-worker");
440 _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
// Iterates by value, so each server description is a copy that is then bound (by value)
// into the worker threads created for it.
445 for (auto i: EncodeServerFinder::instance()->servers()) {
// NOTE(review): the statement guarded by this test is not visible in this listing;
// presumably servers without the current link version are skipped - confirm.
446 if (!i.current_link_version()) {
450 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
451 for (int j = 0; j < i.threads(); ++j) {
452 _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i));
456 _writer.set_encoder_threads(_threads->size());