Merge tag 'v2.16.76' into v2.17.x
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
/** @file src/lib/j2k_encoder.cc
 *  @brief J2K encoder class.
 */
25
26
27 #include "compose.hpp"
28 #include "config.h"
29 #include "cross.h"
30 #include "dcp_video.h"
31 #include "dcpomatic_log.h"
32 #include "encode_server_description.h"
33 #include "encode_server_finder.h"
34 #include "film.h"
35 #include "cpu_j2k_encoder_thread.h"
36 #ifdef DCPOMATIC_GROK
37 #include "grok/context.h"
38 #include "grok_j2k_encoder_thread.h"
39 #endif
40 #include "remote_j2k_encoder_thread.h"
41 #include "j2k_encoder.h"
42 #include "log.h"
43 #include "player_video.h"
44 #include "util.h"
45 #include "writer.h"
46 #include <libcxml/cxml.h>
47 #include <iostream>
48
49 #include "i18n.h"
50
51
52 using std::cout;
53 using std::dynamic_pointer_cast;
54 using std::exception;
55 using std::list;
56 using std::make_shared;
57 using std::shared_ptr;
58 using std::weak_ptr;
59 using boost::optional;
60 using dcp::Data;
61 using namespace dcpomatic;
62
63 #ifdef DCPOMATIC_GROK
64
65 namespace grk_plugin {
66
67 IMessengerLogger* sLogger = nullptr;
68
69 #if defined(__GNUC__) || defined(__clang__)
70 #pragma GCC diagnostic push
71 #pragma GCC diagnostic ignored "-Wunused-function"
72 #endif
73 void setMessengerLogger(grk_plugin::IMessengerLogger* logger)
74 {
75         delete sLogger;
76         sLogger = logger;
77 }
78 #if defined(__GNUC__) || defined(__clang__)
79 #pragma GCC diagnostic pop
80 #endif
81 grk_plugin::IMessengerLogger* getMessengerLogger(void)
82 {
83         return sLogger;
84 }
85
86 }
87
88 #endif
89
90
91 /** @param film Film that we are encoding.
92  *  @param writer Writer that we are using.
93  */
94 J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer)
95         : _film (film)
96         , _history (200)
97         , _writer (writer)
98 {
99 #ifdef DCPOMATIC_GROK
100         auto grok = Config::instance()->grok().get_value_or({});
101         _dcpomatic_context = new grk_plugin::DcpomaticContext(film, writer, _history, grok.binary_location);
102         if (grok.enable) {
103                 _context = new grk_plugin::GrokContext(_dcpomatic_context);
104         }
105 #endif
106 }
107
108
109 J2KEncoder::~J2KEncoder ()
110 {
111         _server_found_connection.disconnect();
112
113         terminate_threads();
114
115 #ifdef DCPOMATIC_GROK
116         delete _context;
117         delete _dcpomatic_context;
118 #endif
119 }
120
121
122 void
123 J2KEncoder::servers_list_changed()
124 {
125         auto config = Config::instance();
126 #ifdef DCPOMATIC_GROK
127         auto const grok_enable = config->grok().get_value_or({}).enable;
128 #else
129         auto const grok_enable = false;
130 #endif
131
132         auto const cpu = (grok_enable || config->only_servers_encode()) ? 0 : config->master_encoding_threads();
133         auto const gpu = grok_enable ? config->master_encoding_threads() : 0;
134
135         remake_threads(cpu, gpu, EncodeServerFinder::instance()->servers());
136 }
137
138
139 void
140 J2KEncoder::begin ()
141 {
142         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect(
143                 boost::bind(&J2KEncoder::servers_list_changed, this)
144                 );
145         servers_list_changed ();
146 }
147
148
149 void
150 J2KEncoder::pause()
151 {
152 #ifdef DCPOMATIC_GROK
153         if (!Config::instance()->grok().get_value_or({}).enable) {
154                 return;
155         }
156         return;
157
158         terminate_threads ();
159
160         /* Something might have been thrown during terminate_threads */
161         rethrow ();
162
163         delete _context;
164         _context = nullptr;
165 #endif
166 }
167
168
169 void J2KEncoder::resume()
170 {
171 #ifdef DCPOMATIC_GROK
172         if (!Config::instance()->grok().get_value_or({}).enable) {
173                 return;
174         }
175
176         _context = new grk_plugin::GrokContext(_dcpomatic_context);
177         servers_list_changed();
178 #endif
179 }
180
181
182 void
183 J2KEncoder::end()
184 {
185         boost::mutex::scoped_lock lock (_queue_mutex);
186
187         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
188
189         /* Keep waking workers until the queue is empty */
190         while (!_queue.empty ()) {
191                 rethrow ();
192                 _full_condition.wait (lock);
193         }
194         lock.unlock ();
195
196         LOG_GENERAL_NC (N_("Terminating encoder threads"));
197
198         terminate_threads ();
199
200         /* Something might have been thrown during terminate_threads */
201         rethrow ();
202
203         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
204
205         /* The following sequence of events can occur in the above code:
206              1. a remote worker takes the last image off the queue
207              2. the loop above terminates
208              3. the remote worker fails to encode the image and puts it back on the queue
209              4. the remote worker is then terminated by terminate_threads
210
211              So just mop up anything left in the queue here.
212         */
213         for (auto & i: _queue) {
214 #ifdef DCPOMATIC_GROK
215                 if (Config::instance()->grok().get_value_or({}).enable) {
216                         if (!_context->scheduleCompress(i)){
217                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index());
218                                 // handle error
219                         }
220                 } else {
221 #else
222                 {
223 #endif
224                         LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
225                         try {
226                                 _writer.write(
227                                         make_shared<dcp::ArrayData>(i.encode_locally()),
228                                         i.index(),
229                                         i.eyes()
230                                         );
231                                 frame_done ();
232                         } catch (std::exception& e) {
233                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
234                         }
235                 }
236         }
237
238 #ifdef DCPOMATIC_GROK
239         delete _context;
240         _context = nullptr;
241 #endif
242 }
243
244
245 /** @return an estimate of the current number of frames we are encoding per second,
246  *  if known.
247  */
248 optional<float>
249 J2KEncoder::current_encoding_rate () const
250 {
251         return _history.rate ();
252 }
253
254
255 /** @return Number of video frames that have been queued for encoding */
256 int
257 J2KEncoder::video_frames_enqueued () const
258 {
259         if (!_last_player_video_time) {
260                 return 0;
261         }
262
263         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
264 }
265
266
267 /** Should be called when a frame has been encoded successfully */
268 void
269 J2KEncoder::frame_done ()
270 {
271         _history.event ();
272 }
273
274
275 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
276  *  so each time the supplied frame is the one after the previous one.
277  *  pv represents one video frame, and could be empty if there is nothing to encode
278  *  for this DCP frame.
279  *
280  *  @param pv PlayerVideo to encode.
281  *  @param time Time of \p pv within the DCP.
282  */
283 void
284 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
285 {
286         _waker.nudge ();
287
288         size_t threads = 0;
289         {
290                 boost::mutex::scoped_lock lm (_threads_mutex);
291                 threads = _threads.size();
292         }
293
294         boost::mutex::scoped_lock queue_lock (_queue_mutex);
295
296         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
297            when there are no threads.
298         */
299         while (_queue.size() >= (threads * 2) + 1) {
300                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
301                 _full_condition.wait (queue_lock);
302                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
303         }
304
305         _writer.rethrow();
306         /* Re-throw any exception raised by one of our threads.  If more
307            than one has thrown an exception, only one will be rethrown, I think;
308            but then, if that happens something has gone badly wrong.
309         */
310         rethrow ();
311
312         auto const position = time.frames_floor(_film->video_frame_rate());
313
314         if (_writer.can_fake_write(position)) {
315                 /* We can fake-write this frame */
316                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
317                 _writer.fake_write(position, pv->eyes ());
318                 frame_done ();
319         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
320                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
321                 /* This frame already has J2K data, so just write it */
322                 _writer.write(pv->j2k(), position, pv->eyes ());
323                 frame_done ();
324         } else if (_last_player_video[pv->eyes()] && _writer.can_repeat(position) && pv->same(_last_player_video[pv->eyes()])) {
325                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
326                 _writer.repeat(position, pv->eyes());
327         } else {
328                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
329                 /* Queue this new frame for encoding */
330                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
331                 auto dcpv = DCPVideo(
332                                 pv,
333                                 position,
334                                 _film->video_frame_rate(),
335                                 _film->j2k_bandwidth(),
336                                 _film->resolution()
337                                 );
338                 _queue.push_back (dcpv);
339
340                 /* The queue might not be empty any more, so notify anything which is
341                    waiting on that.
342                 */
343                 _empty_condition.notify_all ();
344         }
345
346         _last_player_video[pv->eyes()] = pv;
347         _last_player_video_time = time;
348 }
349
350
351 void
352 J2KEncoder::terminate_threads ()
353 {
354         boost::mutex::scoped_lock lm(_threads_mutex);
355         boost::this_thread::disable_interruption dis;
356
357         for (auto& thread: _threads) {
358                 thread->stop();
359         }
360
361         _threads.clear();
362         _ending = true;
363 }
364
365
366 void
367 J2KEncoder::remake_threads(int cpu, int gpu, list<EncodeServerDescription> servers)
368 {
369         LOG_GENERAL("Making threads: CPU=%1, GPU=%2, Remote=%3", cpu, gpu, servers.size());
370
371         boost::mutex::scoped_lock lm (_threads_mutex);
372         if (_ending) {
373                 return;
374         }
375
376         auto remove_threads = [this](int wanted, int current, std::function<bool (shared_ptr<J2KEncoderThread>)> predicate) {
377                 for (auto i = wanted; i < current; ++i) {
378                         auto iter = std::find_if(_threads.begin(), _threads.end(), predicate);
379                         if (iter != _threads.end()) {
380                                 (*iter)->stop();
381                                 _threads.erase(iter);
382                         }
383                 }
384         };
385
386
387         /* CPU */
388
389         auto const is_cpu_thread = [](shared_ptr<J2KEncoderThread> thread) {
390                 return static_cast<bool>(dynamic_pointer_cast<CPUJ2KEncoderThread>(thread));
391         };
392
393         auto const current_cpu_threads = std::count_if(_threads.begin(), _threads.end(), is_cpu_thread);
394
395         for (auto i = current_cpu_threads; i < cpu; ++i) {
396                 auto thread = make_shared<CPUJ2KEncoderThread>(*this);
397                 thread->start();
398                 _threads.push_back(thread);
399         }
400
401         remove_threads(cpu, current_cpu_threads, is_cpu_thread);
402
403 #ifdef DCPOMATIC_GROK
404         /* GPU */
405
406         auto const is_grok_thread = [](shared_ptr<J2KEncoderThread> thread) {
407                 return static_cast<bool>(dynamic_pointer_cast<GrokJ2KEncoderThread>(thread));
408         };
409
410         auto const current_gpu_threads = std::count_if(_threads.begin(), _threads.end(), is_grok_thread);
411
412         for (auto i = current_gpu_threads; i < gpu; ++i) {
413                 auto thread = make_shared<GrokJ2KEncoderThread>(*this, _context);
414                 thread->start();
415                 _threads.push_back(thread);
416         }
417
418         remove_threads(gpu, current_gpu_threads, is_grok_thread);
419 #endif
420
421         /* Remote */
422
423         for (auto const& server: servers) {
424                 if (!server.current_link_version()) {
425                         continue;
426                 }
427
428                 auto is_remote_thread = [server](shared_ptr<J2KEncoderThread> thread) {
429                         auto remote = dynamic_pointer_cast<RemoteJ2KEncoderThread>(thread);
430                         return remote && remote->server().host_name() == server.host_name();
431                 };
432
433                 auto const current_threads = std::count_if(_threads.begin(), _threads.end(), is_remote_thread);
434
435                 auto const wanted_threads = server.threads();
436
437                 if (wanted_threads > current_threads) {
438                         LOG_GENERAL(N_("Adding %1 worker threads for remote %2"), wanted_threads - current_threads, server.host_name());
439                 } else if (wanted_threads < current_threads) {
440                         LOG_GENERAL(N_("Removing %1 worker threads for remote %2"), current_threads - wanted_threads, server.host_name());
441                 }
442
443                 for (auto i = current_threads; i < wanted_threads; ++i) {
444                         auto thread = make_shared<RemoteJ2KEncoderThread>(*this, server);
445                         thread->start();
446                         _threads.push_back(thread);
447                 }
448
449                 remove_threads(wanted_threads, current_threads, is_remote_thread);
450         }
451
452         _writer.set_encoder_threads(_threads.size());
453 }
454
455
456 DCPVideo
457 J2KEncoder::pop()
458 {
459         boost::mutex::scoped_lock lock(_queue_mutex);
460         while (_queue.empty()) {
461                 _empty_condition.wait (lock);
462         }
463
464         LOG_TIMING("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
465
466         auto vf = _queue.front();
467         _queue.pop_front();
468
469         _full_condition.notify_all();
470         return vf;
471 }
472
473
474 void
475 J2KEncoder::retry(DCPVideo video)
476 {
477         boost::mutex::scoped_lock lock(_queue_mutex);
478         _queue.push_front(video);
479         _empty_condition.notify_all();
480 }
481
482
483 void
484 J2KEncoder::write(shared_ptr<const dcp::Data> data, int index, Eyes eyes)
485 {
486         _writer.write(data, index, eyes);
487         frame_done();
488 }