4e4759bc669d46bde4a022c9002a9bcf471ce389
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
/** @file src/lib/j2k_encoder.cc
 *  @brief J2K encoder class.
 */
25
26
27 #include "compose.hpp"
28 #include "config.h"
29 #include "cross.h"
30 #include "dcp_video.h"
31 #include "dcpomatic_log.h"
32 #include "encode_server_description.h"
33 #include "encode_server_finder.h"
34 #include "film.h"
35 #include "j2k_encoder.h"
36 #include "log.h"
37 #include "player_video.h"
38 #include "util.h"
39 #include "writer.h"
40 #include <libcxml/cxml.h>
41 #include <iostream>
42
43 #include "i18n.h"
44
45
46 using std::cout;
47 using std::exception;
48 using std::list;
49 using std::make_shared;
50 using std::shared_ptr;
51 using std::weak_ptr;
52 using boost::optional;
53 using dcp::Data;
54 using namespace dcpomatic;
55
56 static grk_plugin::GrokInitializer grokInitializer;
57
58 /** @param film Film that we are encoding.
59  *  @param writer Writer that we are using.
60  */
61 J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer)
62         : _film (film)
63         , _history (200)
64         , _writer (writer)
65         , _dcpomatic_context(film, writer, _history, Config::instance()->gpu_binary_location())
66         , _context(Config::instance()->enable_gpu() ? new grk_plugin::GrokContext(_dcpomatic_context) : nullptr)
67 {
68 }
69
70
71 J2KEncoder::~J2KEncoder ()
72 {
73         _server_found_connection.disconnect();
74
75         {
76         boost::mutex::scoped_lock lm (_threads_mutex);
77         terminate_threads ();
78         }
79
80         delete _context;
81 }
82
83 void
84 J2KEncoder::begin ()
85 {
86         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect(
87                 boost::bind(&J2KEncoder::servers_list_changed, this)
88                 );
89         servers_list_changed ();
90 }
91
92
93 void
94 J2KEncoder::pause()
95 {
96         if (!Config::instance()->enable_gpu()) {
97                 return;
98         }
99
100         {
101                 boost::mutex::scoped_lock lm (_threads_mutex);
102                 terminate_threads ();
103         }
104
105         /* Something might have been thrown during terminate_threads */
106         rethrow ();
107
108         delete _context;
109         _context = nullptr;
110 }
111
112
113 void J2KEncoder::resume()
114 {
115         if (!Config::instance()->enable_gpu()) {
116                 return;
117         }
118
119         _context = new grk_plugin::GrokContext(_dcpomatic_context);
120         servers_list_changed();
121 }
122
123
124 void
125 J2KEncoder::end()
126 {
127         boost::mutex::scoped_lock lock (_queue_mutex);
128
129         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
130
131         /* Keep waking workers until the queue is empty */
132         while (!_queue.empty ()) {
133                 rethrow ();
134                 _empty_condition.notify_all ();
135                 _full_condition.wait (lock);
136         }
137         lock.unlock ();
138
139         LOG_GENERAL_NC (N_("Terminating encoder threads"));
140
141         {
142                 boost::mutex::scoped_lock lm (_threads_mutex);
143                 terminate_threads ();
144         }
145
146         /* Something might have been thrown during terminate_threads */
147         rethrow ();
148
149         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
150
151         /* The following sequence of events can occur in the above code:
152              1. a remote worker takes the last image off the queue
153              2. the loop above terminates
154              3. the remote worker fails to encode the image and puts it back on the queue
155              4. the remote worker is then terminated by terminate_threads
156
157              So just mop up anything left in the queue here.
158         */
159         for (auto & i: _queue) {
160                 if (Config::instance()->enable_gpu ()) {
161                         if (!_context->scheduleCompress(i)){
162                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index());
163                                 // handle error
164                         }
165                 }
166                 else {
167                         LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
168                         try {
169                                 _writer.write(
170                                                 make_shared<dcp::ArrayData>(i.encode_locally()),
171                                         i.index(),
172                                         i.eyes()
173                                         );
174                                 frame_done ();
175                         } catch (std::exception& e) {
176                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
177                         }
178                 }
179         }
180
181         delete _context;
182         _context = nullptr;
183 }
184
185
186 /** @return an estimate of the current number of frames we are encoding per second,
187  *  if known.
188  */
189 optional<float>
190 J2KEncoder::current_encoding_rate () const
191 {
192         return _history.rate ();
193 }
194
195
196 /** @return Number of video frames that have been queued for encoding */
197 int
198 J2KEncoder::video_frames_enqueued () const
199 {
200         if (!_last_player_video_time) {
201                 return 0;
202         }
203
204         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
205 }
206
207
208 /** Should be called when a frame has been encoded successfully */
209 void
210 J2KEncoder::frame_done ()
211 {
212         _history.event ();
213 }
214
215
216 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
217  *  so each time the supplied frame is the one after the previous one.
218  *  pv represents one video frame, and could be empty if there is nothing to encode
219  *  for this DCP frame.
220  *
221  *  @param pv PlayerVideo to encode.
222  *  @param time Time of \p pv within the DCP.
223  */
224 void
225 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
226 {
227         _waker.nudge ();
228
229         size_t threads = 0;
230         {
231                 boost::mutex::scoped_lock lm (_threads_mutex);
232                 if (_threads)
233                         threads = _threads->size();
234                 else
235                         threads = std::thread::hardware_concurrency();
236         }
237
238         boost::mutex::scoped_lock queue_lock (_queue_mutex);
239
240         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
241            when there are no threads.
242         */
243         while (_queue.size() >= (threads * 2) + 1) {
244                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
245                 _full_condition.wait (queue_lock);
246                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
247         }
248
249         _writer.rethrow();
250         /* Re-throw any exception raised by one of our threads.  If more
251            than one has thrown an exception, only one will be rethrown, I think;
252            but then, if that happens something has gone badly wrong.
253         */
254         rethrow ();
255
256         auto const position = time.frames_floor(_film->video_frame_rate());
257
258         if (_writer.can_fake_write(position)) {
259                 /* We can fake-write this frame */
260                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
261                 _writer.fake_write(position, pv->eyes ());
262                 frame_done ();
263         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
264                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
265                 /* This frame already has J2K data, so just write it */
266                 _writer.write(pv->j2k(), position, pv->eyes ());
267                 frame_done ();
268         } else if (_last_player_video[pv->eyes()] && _writer.can_repeat(position) && pv->same(_last_player_video[pv->eyes()])) {
269                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
270                 _writer.repeat(position, pv->eyes());
271         } else {
272                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
273                 /* Queue this new frame for encoding */
274                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
275                 auto dcpv = DCPVideo(
276                                 pv,
277                                 position,
278                                 _film->video_frame_rate(),
279                                 _film->j2k_bandwidth(),
280                                 _film->resolution()
281                                 );
282                 _queue.push_back (dcpv);
283
284                 /* The queue might not be empty any more, so notify anything which is
285                    waiting on that.
286                 */
287                 _empty_condition.notify_all ();
288         }
289
290         _last_player_video[pv->eyes()] = pv;
291         _last_player_video_time = time;
292 }
293
294
295 /** Caller must hold a lock on _threads_mutex */
296 void
297 J2KEncoder::terminate_threads ()
298 {
299         boost::this_thread::disable_interruption dis;
300
301         if (!_threads) {
302                 return;
303         }
304
305         _threads->interrupt_all ();
306         try {
307                 _threads->join_all ();
308         } catch (exception& e) {
309                 LOG_ERROR ("join() threw an exception: %1", e.what());
310         } catch (...) {
311                 LOG_ERROR_NC ("join() threw an exception");
312         }
313
314         _threads.reset ();
315 }
316
317
318 void
319 J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
320 try
321 {
322         auto config = Config::instance ();
323
324         start_of_thread ("J2KEncoder");
325
326         if (server) {
327                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
328         } else {
329                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
330         }
331
332         /* Number of seconds that we currently wait between attempts
333            to connect to the server; not relevant for localhost
334            encodings.
335         */
336         int remote_backoff = 0;
337
338         while (true) {
339
340                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
341                 boost::mutex::scoped_lock lock (_queue_mutex);
342                 while (_queue.empty ()) {
343                         _empty_condition.wait (lock);
344                 }
345
346                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
347                 auto vf = _queue.front ();
348
349                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
350                    so we must not be interrupted until one or other of these things have happened.  This
351                    block has thread interruption disabled.
352                 */
353                 {
354                         boost::this_thread::disable_interruption dis;
355
356                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf.index(), static_cast<int>(vf.eyes()));
357                         _queue.pop_front ();
358
359                         lock.unlock ();
360
361                         shared_ptr<Data> encoded;
362
363                         /* We need to encode this input */
364                         if (server) {
365                                 try {
366                                         encoded = make_shared<dcp::ArrayData>(vf.encode_remotely(server.get()));
367
368                                         if (remote_backoff > 0) {
369                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
370                                         }
371
372                                         /* This job succeeded, so remove any backoff */
373                                         remote_backoff = 0;
374
375                                 } catch (std::exception& e) {
376                                         if (remote_backoff < 60) {
377                                                 /* back off more */
378                                                 remote_backoff += 10;
379                                         }
380                                         LOG_ERROR (
381                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
382                                                 vf.index(), server->host_name(), e.what(), remote_backoff
383                                                 );
384                                 }
385
386                         } else {
387                                 if (_context) {
388                                         if (!_context->launch(vf, config->selected_gpu()) || !_context->scheduleCompress(vf)) {
389                                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
390                                                 _queue.push_front (vf);
391                                         }
392                                 } else {
393                                         try {
394                                                 LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf.index());
395                                                 encoded = make_shared<dcp::ArrayData>(vf.encode_locally());
396                                                 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index());
397                                         } catch (std::exception& e) {
398                                                 /* This is very bad, so don't cope with it, just pass it on */
399                                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
400                                                 throw;
401                                         }
402                                 }
403                         }
404
405                         if (encoded) {
406                                 _writer.write(encoded, vf.index(), vf.eyes());
407                                 frame_done ();
408                         } else {
409                                 if (!Config::instance()->enable_gpu ()) {
410                                         lock.lock ();
411                                         LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
412                                         _queue.push_front (vf);
413                                         lock.unlock ();
414                                 }
415                         }
416                 }
417
418                 if (remote_backoff > 0) {
419                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
420                 }
421
422                 /* The queue might not be full any more, so notify anything that is waiting on that */
423                 lock.lock ();
424                 _full_condition.notify_all ();
425         }
426 }
427 catch (boost::thread_interrupted& e) {
428         /* Ignore these and just stop the thread */
429         _full_condition.notify_all ();
430 }
431 catch (...)
432 {
433         store_current ();
434         /* Wake anything waiting on _full_condition so it can see the exception */
435         _full_condition.notify_all ();
436 }
437
438
439 void
440 J2KEncoder::servers_list_changed ()
441 {
442         boost::mutex::scoped_lock lm (_threads_mutex);
443
444         terminate_threads ();
445         _threads = make_shared<boost::thread_group>();
446
447         /* XXX: could re-use threads */
448
449         if (!Config::instance()->only_servers_encode ()) {
450                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
451 #ifdef DCPOMATIC_LINUX
452                         auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
453                         pthread_setname_np (t->native_handle(), "encode-worker");
454 #else
455                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
456 #endif
457                 }
458         }
459
460         for (auto i: EncodeServerFinder::instance()->servers()) {
461                 if (!i.current_link_version()) {
462                         continue;
463                 }
464
465                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
466                 for (int j = 0; j < i.threads(); ++j) {
467                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i));
468                 }
469         }
470
471         _writer.set_encoder_threads(_threads->size());
472 }