Add an option to dump the decryption certificate from the KDM CLI.
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
22 /** @file src/lib/j2k_encoder.cc
23  *  @brief J2K encoder class.
24  */
25
26
27 #include "compose.hpp"
28 #include "config.h"
29 #include "cross.h"
30 #include "dcp_video.h"
31 #include "dcpomatic_log.h"
32 #include "encode_server_description.h"
33 #include "encode_server_finder.h"
34 #include "film.h"
35 #include "j2k_encoder.h"
36 #include "log.h"
37 #include "player_video.h"
38 #include "util.h"
39 #include "writer.h"
40 #include <libcxml/cxml.h>
41 #include <iostream>
42
43 #include "i18n.h"
44
45
46 using std::cout;
47 using std::exception;
48 using std::list;
49 using std::make_shared;
50 using std::shared_ptr;
51 using std::weak_ptr;
52 using boost::optional;
53 using dcp::Data;
54 using namespace dcpomatic;
55
56
57 /** @param film Film that we are encoding.
58  *  @param writer Writer that we are using.
59  */
60 J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer)
61         : _film (film)
62         , _history (200)
63         , _writer (writer)
64 {
65 }
66
67
68 J2KEncoder::~J2KEncoder ()
69 {
70         _server_found_connection.disconnect();
71
72         /* One of our encoder threads may be waiting on Writer::write() to return, if that method
73          * is blocked with the writer queue full waiting for _full_condition.  In that case, the
74          * attempt to terminate the encoder threads below (in terminate_threads()) will fail because
75          * the encoder thread waiting for ::write() will have interruption disabled.
76          *
77          * To work around that, make the writer into a zombie to unblock any pending write()s and
78          * not block on any future ones.
79          */
80         _writer.zombify();
81
82         boost::mutex::scoped_lock lm (_threads_mutex);
83         terminate_threads ();
84 }
85
86
87 void
88 J2KEncoder::begin ()
89 {
90         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect(
91                 boost::bind(&J2KEncoder::servers_list_changed, this)
92                 );
93         servers_list_changed ();
94 }
95
96
97 void
98 J2KEncoder::end ()
99 {
100         boost::mutex::scoped_lock lock (_queue_mutex);
101
102         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
103
104         /* Keep waking workers until the queue is empty */
105         while (!_queue.empty ()) {
106                 rethrow ();
107                 _empty_condition.notify_all ();
108                 _full_condition.wait (lock);
109         }
110
111         lock.unlock ();
112
113         LOG_GENERAL_NC (N_("Terminating encoder threads"));
114
115         {
116                 boost::mutex::scoped_lock lm (_threads_mutex);
117                 terminate_threads ();
118         }
119
120         /* Something might have been thrown during terminate_threads */
121         rethrow ();
122
123         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
124
125         /* The following sequence of events can occur in the above code:
126              1. a remote worker takes the last image off the queue
127              2. the loop above terminates
128              3. the remote worker fails to encode the image and puts it back on the queue
129              4. the remote worker is then terminated by terminate_threads
130
131              So just mop up anything left in the queue here.
132         */
133
134         for (auto const& i: _queue) {
135                 LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
136                 try {
137                         _writer.write(
138                                 make_shared<dcp::ArrayData>(i.encode_locally()),
139                                 i.index(),
140                                 i.eyes()
141                                 );
142                         frame_done ();
143                 } catch (std::exception& e) {
144                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
145                 }
146         }
147 }
148
149
150 /** @return an estimate of the current number of frames we are encoding per second,
151  *  if known.
152  */
153 optional<float>
154 J2KEncoder::current_encoding_rate () const
155 {
156         return _history.rate ();
157 }
158
159
160 /** @return Number of video frames that have been queued for encoding */
161 int
162 J2KEncoder::video_frames_enqueued () const
163 {
164         if (!_last_player_video_time) {
165                 return 0;
166         }
167
168         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
169 }
170
171
172 /** Should be called when a frame has been encoded successfully */
173 void
174 J2KEncoder::frame_done ()
175 {
176         _history.event ();
177 }
178
179
180 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
181  *  so each time the supplied frame is the one after the previous one.
182  *  pv represents one video frame, and could be empty if there is nothing to encode
183  *  for this DCP frame.
184  *
185  *  @param pv PlayerVideo to encode.
186  *  @param time Time of \p pv within the DCP.
187  */
188 void
189 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
190 {
191         _waker.nudge ();
192
193         size_t threads = 0;
194         {
195                 boost::mutex::scoped_lock lm (_threads_mutex);
196                 threads = _threads->size();
197         }
198
199         boost::mutex::scoped_lock queue_lock (_queue_mutex);
200
201         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
202            when there are no threads.
203         */
204         while (_queue.size() >= (threads * 2) + 1) {
205                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
206                 _full_condition.wait (queue_lock);
207                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
208         }
209
210         _writer.rethrow();
211         /* Re-throw any exception raised by one of our threads.  If more
212            than one has thrown an exception, only one will be rethrown, I think;
213            but then, if that happens something has gone badly wrong.
214         */
215         rethrow ();
216
217         auto const position = time.frames_floor(_film->video_frame_rate());
218
219         if (_writer.can_fake_write(position)) {
220                 /* We can fake-write this frame */
221                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
222                 _writer.fake_write(position, pv->eyes ());
223                 frame_done ();
224         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
225                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
226                 /* This frame already has J2K data, so just write it */
227                 _writer.write(pv->j2k(), position, pv->eyes ());
228                 frame_done ();
229         } else if (_last_player_video[pv->eyes()] && _writer.can_repeat(position) && pv->same(_last_player_video[pv->eyes()])) {
230                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
231                 _writer.repeat(position, pv->eyes());
232         } else {
233                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
234                 /* Queue this new frame for encoding */
235                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
236                 _queue.push_back (DCPVideo(
237                                 pv,
238                                 position,
239                                 _film->video_frame_rate(),
240                                 _film->j2k_bandwidth(),
241                                 _film->resolution()
242                                 ));
243
244                 /* The queue might not be empty any more, so notify anything which is
245                    waiting on that.
246                 */
247                 _empty_condition.notify_all ();
248         }
249
250         _last_player_video[pv->eyes()] = pv;
251         _last_player_video_time = time;
252 }
253
254
255 /** Caller must hold a lock on _threads_mutex */
256 void
257 J2KEncoder::terminate_threads ()
258 {
259         boost::this_thread::disable_interruption dis;
260
261         if (!_threads) {
262                 return;
263         }
264
265         _threads->interrupt_all ();
266         try {
267                 _threads->join_all ();
268         } catch (exception& e) {
269                 LOG_ERROR ("join() threw an exception: %1", e.what());
270         } catch (...) {
271                 LOG_ERROR_NC ("join() threw an exception");
272         }
273
274         _threads.reset ();
275 }
276
277
278 void
279 J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
280 try
281 {
282         start_of_thread ("J2KEncoder");
283
284         if (server) {
285                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
286         } else {
287                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
288         }
289
290         /* Number of seconds that we currently wait between attempts
291            to connect to the server; not relevant for localhost
292            encodings.
293         */
294         int remote_backoff = 0;
295
296         while (true) {
297
298                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
299                 boost::mutex::scoped_lock lock (_queue_mutex);
300                 while (_queue.empty ()) {
301                         _empty_condition.wait (lock);
302                 }
303
304                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
305                 auto vf = _queue.front ();
306
307                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
308                    so we must not be interrupted until one or other of these things have happened.  This
309                    block has thread interruption disabled.
310                 */
311                 {
312                         boost::this_thread::disable_interruption dis;
313
314                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf.index(), static_cast<int>(vf.eyes()));
315                         _queue.pop_front ();
316
317                         lock.unlock ();
318
319                         shared_ptr<Data> encoded;
320
321                         /* We need to encode this input */
322                         if (server) {
323                                 try {
324                                         encoded = make_shared<dcp::ArrayData>(vf.encode_remotely(server.get()));
325
326                                         if (remote_backoff > 0) {
327                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
328                                         }
329
330                                         /* This job succeeded, so remove any backoff */
331                                         remote_backoff = 0;
332
333                                 } catch (std::exception& e) {
334                                         if (remote_backoff < 60) {
335                                                 /* back off more */
336                                                 remote_backoff += 10;
337                                         }
338                                         LOG_ERROR (
339                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
340                                                 vf.index(), server->host_name(), e.what(), remote_backoff
341                                                 );
342                                 }
343
344                         } else {
345                                 try {
346                                         LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf.index());
347                                         encoded = make_shared<dcp::ArrayData>(vf.encode_locally());
348                                         LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index());
349                                 } catch (std::exception& e) {
350                                         /* This is very bad, so don't cope with it, just pass it on */
351                                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
352                                         throw;
353                                 }
354                         }
355
356                         if (encoded) {
357                                 _writer.write(encoded, vf.index(), vf.eyes());
358                                 frame_done ();
359                         } else {
360                                 lock.lock ();
361                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
362                                 _queue.push_front (vf);
363                                 lock.unlock ();
364                         }
365                 }
366
367                 if (remote_backoff > 0) {
368                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
369                 }
370
371                 /* The queue might not be full any more, so notify anything that is waiting on that */
372                 lock.lock ();
373                 _full_condition.notify_all ();
374         }
375 }
376 catch (boost::thread_interrupted& e) {
377         /* Ignore these and just stop the thread */
378         _full_condition.notify_all ();
379 }
380 catch (...)
381 {
382         store_current ();
383         /* Wake anything waiting on _full_condition so it can see the exception */
384         _full_condition.notify_all ();
385 }
386
387
388 void
389 J2KEncoder::servers_list_changed ()
390 {
391         boost::mutex::scoped_lock lm (_threads_mutex);
392
393         terminate_threads ();
394         _threads = make_shared<boost::thread_group>();
395
396         /* XXX: could re-use threads */
397
398         if (!Config::instance()->only_servers_encode ()) {
399                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
400 #ifdef DCPOMATIC_LINUX
401                         auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
402                         pthread_setname_np (t->native_handle(), "encode-worker");
403 #else
404                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
405 #endif
406                 }
407         }
408
409         for (auto i: EncodeServerFinder::instance()->servers()) {
410                 if (!i.current_link_version()) {
411                         continue;
412                 }
413
414                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
415                 for (int j = 0; j < i.threads(); ++j) {
416                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i));
417                 }
418         }
419
420         _writer.set_encoder_threads(_threads->size());
421 }