Clean up grok's presence in the config file and make sure it's optional.
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
22 /** @file src/j2k_encoder.cc
23  *  @brief J2K encoder class.
24  */
25
26
27 #include "compose.hpp"
28 #include "config.h"
29 #include "cross.h"
30 #include "dcp_video.h"
31 #include "dcpomatic_log.h"
32 #include "encode_server_description.h"
33 #include "encode_server_finder.h"
34 #include "film.h"
35 #include "cpu_j2k_encoder_thread.h"
36 #ifdef DCPOMATIC_GROK
37 #include "grok/context.h"
38 #include "grok_j2k_encoder_thread.h"
39 #endif
40 #include "remote_j2k_encoder_thread.h"
41 #include "j2k_encoder.h"
42 #include "log.h"
43 #include "player_video.h"
44 #include "util.h"
45 #include "writer.h"
46 #include <libcxml/cxml.h>
47 #include <iostream>
48
49 #include "i18n.h"
50
51
52 using std::cout;
53 using std::dynamic_pointer_cast;
54 using std::exception;
55 using std::list;
56 using std::make_shared;
57 using std::shared_ptr;
58 using std::weak_ptr;
59 using boost::optional;
60 using dcp::Data;
61 using namespace dcpomatic;
62
63 #ifdef DCPOMATIC_GROK
64
65 namespace grk_plugin {
66
67 IMessengerLogger* sLogger = nullptr;
68
69 #if defined(__GNUC__) || defined(__clang__)
70 #pragma GCC diagnostic push
71 #pragma GCC diagnostic ignored "-Wunused-function"
72 #endif
73 void setMessengerLogger(grk_plugin::IMessengerLogger* logger)
74 {
75         delete sLogger;
76         sLogger = logger;
77 }
78 #if defined(__GNUC__) || defined(__clang__)
79 #pragma GCC diagnostic pop
80 #endif
81 grk_plugin::IMessengerLogger* getMessengerLogger(void)
82 {
83         return sLogger;
84 }
85
86 }
87
88 #endif
89
90
91 /** @param film Film that we are encoding.
92  *  @param writer Writer that we are using.
93  */
94 J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer)
95         : _film (film)
96         , _history (200)
97         , _writer (writer)
98 {
99 #ifdef DCPOMATIC_GROK
100         auto grok = Config::instance()->grok().get_value_or({});
101         _dcpomatic_context = new grk_plugin::DcpomaticContext(film, writer, _history, grok.binary_location);
102         if (grok.enable) {
103                 _context = new grk_plugin::GrokContext(_dcpomatic_context);
104         }
105 #endif
106         servers_list_changed ();
107 }
108
109
110 J2KEncoder::~J2KEncoder ()
111 {
112         _server_found_connection.disconnect();
113
114         terminate_threads();
115
116 #ifdef DCPOMATIC_GROK
117         delete _context;
118         delete _dcpomatic_context;
119 #endif
120 }
121
122
123 void
124 J2KEncoder::servers_list_changed()
125 {
126         auto config = Config::instance();
127 #ifdef DCPOMATIC_GROK
128         auto const grok_enable = config->grok().get_value_or({}).enable;
129 #else
130         auto const grok_enable = false;
131 #endif
132
133         auto const cpu = (grok_enable || config->only_servers_encode()) ? 0 : config->master_encoding_threads();
134         auto const gpu = grok_enable ? config->master_encoding_threads() : 0;
135
136         remake_threads(cpu, gpu, EncodeServerFinder::instance()->servers());
137 }
138
139
140 void
141 J2KEncoder::begin ()
142 {
143         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect(
144                 boost::bind(&J2KEncoder::servers_list_changed, this)
145                 );
146 }
147
148
149 void
150 J2KEncoder::pause()
151 {
152 #ifdef DCPOMATIC_GROK
153         if (!Config::instance()->grok().get_value_or({}).enable) {
154                 return;
155         }
156         return;
157
158         terminate_threads ();
159
160         /* Something might have been thrown during terminate_threads */
161         rethrow ();
162
163         delete _context;
164         _context = nullptr;
165 #endif
166 }
167
168
169 void J2KEncoder::resume()
170 {
171 #ifdef DCPOMATIC_GROK
172         if (!Config::instance()->grok().get_value_or({}).enable) {
173                 return;
174         }
175
176         _context = new grk_plugin::GrokContext(_dcpomatic_context);
177         servers_list_changed();
178 #endif
179 }
180
181
182 void
183 J2KEncoder::end()
184 {
185         boost::mutex::scoped_lock lock (_queue_mutex);
186
187         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
188
189         /* Keep waking workers until the queue is empty */
190         while (!_queue.empty ()) {
191                 rethrow ();
192                 _full_condition.wait (lock);
193         }
194         lock.unlock ();
195
196         LOG_GENERAL_NC (N_("Terminating encoder threads"));
197
198         terminate_threads ();
199
200         /* Something might have been thrown during terminate_threads */
201         rethrow ();
202
203         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
204
205         /* The following sequence of events can occur in the above code:
206              1. a remote worker takes the last image off the queue
207              2. the loop above terminates
208              3. the remote worker fails to encode the image and puts it back on the queue
209              4. the remote worker is then terminated by terminate_threads
210
211              So just mop up anything left in the queue here.
212         */
213         for (auto & i: _queue) {
214 #ifdef DCPOMATIC_GROK
215                 if (Config::instance()->grok().get_value_or({}).enable) {
216                         if (!_context->scheduleCompress(i)){
217                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index());
218                                 // handle error
219                         }
220                 } else {
221 #else
222                 {
223 #endif
224                         LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
225                         try {
226                                 _writer.write(
227                                         make_shared<dcp::ArrayData>(i.encode_locally()),
228                                         i.index(),
229                                         i.eyes()
230                                         );
231                                 frame_done ();
232                         } catch (std::exception& e) {
233                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
234                         }
235                 }
236         }
237
238 #ifdef DCPOMATIC_GROK
239         delete _context;
240         _context = nullptr;
241 #endif
242 }
243
244
245 /** @return an estimate of the current number of frames we are encoding per second,
246  *  if known.
247  */
248 optional<float>
249 J2KEncoder::current_encoding_rate () const
250 {
251         return _history.rate ();
252 }
253
254
255 /** @return Number of video frames that have been queued for encoding */
256 int
257 J2KEncoder::video_frames_enqueued () const
258 {
259         if (!_last_player_video_time) {
260                 return 0;
261         }
262
263         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
264 }
265
266
267 /** Should be called when a frame has been encoded successfully */
268 void
269 J2KEncoder::frame_done ()
270 {
271         _history.event ();
272 }
273
274
275 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
276  *  so each time the supplied frame is the one after the previous one.
277  *  pv represents one video frame, and could be empty if there is nothing to encode
278  *  for this DCP frame.
279  *
280  *  @param pv PlayerVideo to encode.
281  *  @param time Time of \p pv within the DCP.
282  */
283 void
284 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
285 {
286         _waker.nudge ();
287
288         size_t threads = 0;
289         {
290                 boost::mutex::scoped_lock lm (_threads_mutex);
291                 threads = _threads.size();
292         }
293
294         boost::mutex::scoped_lock queue_lock (_queue_mutex);
295
296         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
297            when there are no threads.
298         */
299         while (_queue.size() >= (threads * 2) + 1) {
300                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
301                 _full_condition.wait (queue_lock);
302                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
303         }
304
305         _writer.rethrow();
306         /* Re-throw any exception raised by one of our threads.  If more
307            than one has thrown an exception, only one will be rethrown, I think;
308            but then, if that happens something has gone badly wrong.
309         */
310         rethrow ();
311
312         auto const position = time.frames_floor(_film->video_frame_rate());
313
314         if (_writer.can_fake_write(position)) {
315                 /* We can fake-write this frame */
316                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
317                 _writer.fake_write(position, pv->eyes ());
318                 frame_done ();
319         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
320                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
321                 /* This frame already has J2K data, so just write it */
322                 _writer.write(pv->j2k(), position, pv->eyes ());
323                 frame_done ();
324         } else if (_last_player_video[pv->eyes()] && _writer.can_repeat(position) && pv->same(_last_player_video[pv->eyes()])) {
325                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
326                 _writer.repeat(position, pv->eyes());
327         } else {
328                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
329                 /* Queue this new frame for encoding */
330                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
331                 auto dcpv = DCPVideo(
332                                 pv,
333                                 position,
334                                 _film->video_frame_rate(),
335                                 _film->j2k_bandwidth(),
336                                 _film->resolution()
337                                 );
338                 _queue.push_back (dcpv);
339
340                 /* The queue might not be empty any more, so notify anything which is
341                    waiting on that.
342                 */
343                 _empty_condition.notify_all ();
344         }
345
346         _last_player_video[pv->eyes()] = pv;
347         _last_player_video_time = time;
348 }
349
350
351 void
352 J2KEncoder::terminate_threads ()
353 {
354         boost::mutex::scoped_lock lm(_threads_mutex);
355         boost::this_thread::disable_interruption dis;
356
357         for (auto& thread: _threads) {
358                 thread->stop();
359         }
360
361         _threads.clear();
362         _ending = true;
363 }
364
365
366 void
367 #ifdef DCPOMATIC_GROK
368 J2KEncoder::remake_threads(int cpu, int gpu, list<EncodeServerDescription> servers)
369 #else
370 J2KEncoder::remake_threads(int cpu, int, list<EncodeServerDescription> servers)
371 #endif
372 {
373         boost::mutex::scoped_lock lm (_threads_mutex);
374         if (_ending) {
375                 return;
376         }
377
378         auto remove_threads = [this](int wanted, int current, std::function<bool (shared_ptr<J2KEncoderThread>)> predicate) {
379                 for (auto i = wanted; i < current; ++i) {
380                         auto iter = std::find_if(_threads.begin(), _threads.end(), predicate);
381                         if (iter != _threads.end()) {
382                                 (*iter)->stop();
383                                 _threads.erase(iter);
384                         }
385                 }
386         };
387
388
389         /* CPU */
390
391         auto const is_cpu_thread = [](shared_ptr<J2KEncoderThread> thread) {
392                 return static_cast<bool>(dynamic_pointer_cast<CPUJ2KEncoderThread>(thread));
393         };
394
395         auto const current_cpu_threads = std::count_if(_threads.begin(), _threads.end(), is_cpu_thread);
396
397         for (auto i = current_cpu_threads; i < cpu; ++i) {
398                 auto thread = make_shared<CPUJ2KEncoderThread>(*this);
399                 thread->start();
400                 _threads.push_back(thread);
401         }
402
403         remove_threads(cpu, current_cpu_threads, is_cpu_thread);
404
405 #ifdef DCPOMATIC_GROK
406         /* GPU */
407
408         auto const is_grok_thread = [](shared_ptr<J2KEncoderThread> thread) {
409                 return static_cast<bool>(dynamic_pointer_cast<GrokJ2KEncoderThread>(thread));
410         };
411
412         auto const current_gpu_threads = std::count_if(_threads.begin(), _threads.end(), is_grok_thread);
413
414         for (auto i = current_gpu_threads; i < gpu; ++i) {
415                 auto thread = make_shared<GrokJ2KEncoderThread>(*this, _context);
416                 thread->start();
417                 _threads.push_back(thread);
418         }
419
420         remove_threads(gpu, current_gpu_threads, is_grok_thread);
421 #endif
422
423         /* Remote */
424
425         for (auto const& server: servers) {
426                 if (!server.current_link_version()) {
427                         continue;
428                 }
429
430                 auto is_remote_thread = [server](shared_ptr<J2KEncoderThread> thread) {
431                         auto remote = dynamic_pointer_cast<RemoteJ2KEncoderThread>(thread);
432                         return remote && remote->server().host_name() == server.host_name();
433                 };
434
435                 auto const current_threads = std::count_if(_threads.begin(), _threads.end(), is_remote_thread);
436
437                 auto const wanted_threads = server.threads();
438
439                 if (wanted_threads > current_threads) {
440                         LOG_GENERAL(N_("Adding %1 worker threads for remote %2"), wanted_threads - current_threads, server.host_name());
441                 } else if (wanted_threads < current_threads) {
442                         LOG_GENERAL(N_("Removing %1 worker threads for remote %2"), current_threads - wanted_threads, server.host_name());
443                 }
444
445                 for (auto i = current_threads; i < wanted_threads; ++i) {
446                         auto thread = make_shared<RemoteJ2KEncoderThread>(*this, server);
447                         thread->start();
448                         _threads.push_back(thread);
449                 }
450
451                 remove_threads(wanted_threads, current_threads, is_remote_thread);
452         }
453
454         _writer.set_encoder_threads(_threads.size());
455 }
456
457
458 DCPVideo
459 J2KEncoder::pop()
460 {
461         boost::mutex::scoped_lock lock(_queue_mutex);
462         while (_queue.empty()) {
463                 _empty_condition.wait (lock);
464         }
465
466         LOG_TIMING("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
467
468         auto vf = _queue.front();
469         _queue.pop_front();
470
471         _full_condition.notify_all();
472         return vf;
473 }
474
475
476 void
477 J2KEncoder::retry(DCPVideo video)
478 {
479         boost::mutex::scoped_lock lock(_queue_mutex);
480         _queue.push_front(video);
481         _empty_condition.notify_all();
482 }
483
484
485 void
486 J2KEncoder::write(shared_ptr<const dcp::Data> data, int index, Eyes eyes)
487 {
488         _writer.write(data, index, eyes);
489         frame_done();
490 }