2 Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
4 This file is part of DCP-o-matic.
6 DCP-o-matic is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 DCP-o-matic is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
22 /** @file src/j2k_encoder.cc
23 * @brief J2K encoder class.
27 #include "j2k_encoder_cpu_backend.h"
28 #include "j2k_encoder_remote_backend.h"
29 #include "j2k_encoder.h"
33 #include "dcpomatic_log.h"
35 #include "dcp_video.h"
38 #include "encode_server_finder.h"
40 #include "player_video.h"
41 #include "encode_server_description.h"
42 #include "compose.hpp"
43 #include <libcxml/cxml.h>
52 using std::shared_ptr;
54 using std::make_shared;
55 using boost::optional;
57 using namespace dcpomatic;
60 /** @param film Film that we are encoding.
61 * @param writer Writer that we are using.
63 J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
68 servers_list_changed ();
72 J2KEncoder::~J2KEncoder ()
74 boost::mutex::scoped_lock lm (_threads_mutex);
82 auto wp = shared_from_this ();
83 _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
84 boost::bind (&J2KEncoder::call_servers_list_changed, wp)
89 /* We don't want the servers-list-changed callback trying to do things
90 during destruction of J2KEncoder, and I think this is the neatest way
94 J2KEncoder::call_servers_list_changed (weak_ptr<J2KEncoder> encoder)
96 auto e = encoder.lock ();
98 e->servers_list_changed ();
106 boost::mutex::scoped_lock lock (_queue_mutex);
108 LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
110 /* Keep waking workers until the queue is empty */
111 while (!_queue.empty ()) {
113 _empty_condition.notify_all ();
114 _full_condition.wait (lock);
119 LOG_GENERAL_NC (N_("Terminating encoder threads"));
122 boost::mutex::scoped_lock lm (_threads_mutex);
123 terminate_threads ();
126 /* Something might have been thrown during terminate_threads */
129 LOG_GENERAL (N_("Mopping up %1"), _queue.size());
131 /* The following sequence of events can occur in the above code:
132 1. a remote worker takes the last image off the queue
133 2. the loop above terminates
134 3. the remote worker fails to encode the image and puts it back on the queue
135 4. the remote worker is then terminated by terminate_threads
137 So just mop up anything left in the queue here.
140 J2KEncoderCPUBackend cpu;
141 for (auto const& i: _queue) {
142 LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
144 auto enc = cpu.encode({i});
145 DCPOMATIC_ASSERT (!enc.empty());
146 _writer->write (make_shared<dcp::ArrayData>(enc.front()), i.index(), i.eyes());
148 } catch (std::exception& e) {
149 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
155 /** @return an estimate of the current number of frames we are encoding per second,
159 J2KEncoder::current_encoding_rate () const
161 return _history.rate ();
165 /** @return Number of video frames that have been queued for encoding */
167 J2KEncoder::video_frames_enqueued () const
169 if (!_last_player_video_time) {
173 return _last_player_video_time->frames_floor (_film->video_frame_rate ());
177 /** Should be called when a frame has been encoded successfully */
179 J2KEncoder::frame_done ()
185 /** Called to request encoding of the next video frame in the DCP. This is called in order,
186 * so each time the supplied frame is the one after the previous one.
187 * pv represents one video frame, and could be empty if there is nothing to encode
188 * for this DCP frame.
190 * @param pv PlayerVideo to encode.
191 * @param time Time of \p pv within the DCP.
194 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
200 boost::mutex::scoped_lock lm (_threads_mutex);
201 threads = _threads->size();
204 boost::mutex::scoped_lock queue_lock (_queue_mutex);
206 /* Wait until the queue has gone down a bit. Allow one thing in the queue even
207 when there are no threads.
209 while (static_cast<int>(_queue.size()) >= (_frames_in_parallel * 2) + 1) {
210 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
211 _full_condition.wait (queue_lock);
212 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
216 /* Re-throw any exception raised by one of our threads. If more
217 than one has thrown an exception, only one will be rethrown, I think;
218 but then, if that happens something has gone badly wrong.
222 auto const position = time.frames_floor(_film->video_frame_rate());
224 if (_writer->can_fake_write (position)) {
225 /* We can fake-write this frame */
226 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
227 _writer->fake_write (position, pv->eyes ());
229 } else if (pv->has_j2k() && !_film->reencode_j2k()) {
230 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
231 /* This frame already has J2K data, so just write it */
232 _writer->write (pv->j2k(), position, pv->eyes ());
234 } else if (_last_player_video[static_cast<int>(pv->eyes())] && _writer->can_repeat(position) && pv->same (_last_player_video[static_cast<int>(pv->eyes())])) {
235 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
236 _writer->repeat (position, pv->eyes ());
238 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
239 /* Queue this new frame for encoding */
240 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
241 _queue.push_back (DCPVideo(
244 _film->video_frame_rate(),
245 _film->j2k_bandwidth(),
249 /* The queue might not be empty any more, so notify anything which is
252 _empty_condition.notify_all ();
255 _last_player_video[static_cast<int>(pv->eyes())] = pv;
256 _last_player_video_time = time;
260 /** Caller must hold a lock on _threads_mutex */
262 J2KEncoder::terminate_threads ()
264 boost::this_thread::disable_interruption dis;
270 _threads->interrupt_all ();
272 _threads->join_all ();
273 } catch (exception& e) {
274 LOG_ERROR ("join() threw an exception: %1", e.what());
276 LOG_ERROR_NC ("join() threw an exception");
284 J2KEncoder::encoder_thread (shared_ptr<J2KEncoderBackend> backend)
287 start_of_thread ("J2KEncoder");
289 LOG_TIMING ("start-encoder-thread thread=%1", thread_id());
293 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
294 boost::mutex::scoped_lock lock (_queue_mutex);
295 while (_queue.empty ()) {
296 _empty_condition.wait (lock);
299 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
300 auto end = std::next(_queue.begin(), std::min(static_cast<int>(_queue.size()), backend->quantity()));
301 std::vector<DCPVideo> vf (_queue.begin(), end);
303 /* We're about to commit to either encoding these frames or putting them back onto the queue,
304 so we must not be interrupted until one or other of these things have happened. This
305 block has thread interruption disabled.
308 boost::this_thread::disable_interruption dis;
310 _queue.erase(_queue.begin(), end);
314 auto encoded = backend->encode(vf);
316 if (encoded.size() == vf.size()) {
317 for (auto i = 0U; i < encoded.size(); ++i) {
318 _writer->write (make_shared<dcp::ArrayData>(encoded[i]), vf[i].index(), vf[i].eyes());
323 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes %2 frames onto queue after failure"), thread_id(), vf.size());
324 _queue.insert (_queue.begin(), vf.begin(), vf.end());
329 /* The queue might not be full any more, so notify anything that is waiting on that */
331 _full_condition.notify_all ();
334 catch (boost::thread_interrupted& e) {
335 /* Ignore these and just stop the thread */
336 _full_condition.notify_all ();
341 /* Wake anything waiting on _full_condition so it can see the exception */
342 _full_condition.notify_all ();
347 J2KEncoder::servers_list_changed ()
350 boost::mutex::scoped_lock lm (_threads_mutex);
352 terminate_threads ();
353 _threads = make_shared<boost::thread_group>();
355 _frames_in_parallel = 0;
357 /* XXX: could re-use threads */
359 if (!Config::instance()->only_servers_encode ()) {
360 auto backend = std::make_shared<J2KEncoderCPUBackend>();
361 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
362 #ifdef DCPOMATIC_LINUX
363 auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
364 pthread_setname_np (t->native_handle(), "encode-worker");
366 _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
368 _frames_in_parallel += backend->quantity();
372 for (auto i: EncodeServerFinder::instance()->servers()) {
373 if (!i.current_link_version()) {
377 auto backend = std::make_shared<J2KEncoderRemoteBackend>(i);
379 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name());
380 for (int j = 0; j < i.threads(); ++j) {
381 _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
382 _frames_in_parallel += backend->quantity();
386 _writer->set_encoder_threads (_threads->size());
389 terminate_threads ();