Another patch from Aaron.
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
22 /** @file src/j2k_encoder.cc
23  *  @brief J2K encoder class.
24  */
25
26
27 #include "compose.hpp"
28 #include "config.h"
29 #include "cross.h"
30 #include "dcp_video.h"
31 #include "dcpomatic_log.h"
32 #include "encode_server_description.h"
33 #include "encode_server_finder.h"
34 #include "film.h"
35 #include "cpu_j2k_encoder_thread.h"
36 #ifdef DCPOMATIC_GROK
37 #include "grok/context.h"
38 #include "grok_j2k_encoder_thread.h"
39 #endif
40 #include "remote_j2k_encoder_thread.h"
41 #include "j2k_encoder.h"
42 #include "log.h"
43 #include "player_video.h"
44 #include "util.h"
45 #include "writer.h"
46 #include <libcxml/cxml.h>
47 #include <iostream>
48
49 #include "i18n.h"
50
51
52 using std::cout;
53 using std::dynamic_pointer_cast;
54 using std::exception;
55 using std::list;
56 using std::make_shared;
57 using std::shared_ptr;
58 using std::weak_ptr;
59 using boost::optional;
60 using dcp::Data;
61 using namespace dcpomatic;
62
63 #ifdef DCPOMATIC_GROK
64
65 namespace grk_plugin {
66
67 IMessengerLogger* sLogger = nullptr;
68
69 #if defined(__GNUC__) || defined(__clang__)
70 #pragma GCC diagnostic push
71 #pragma GCC diagnostic ignored "-Wunused-function"
72 #endif
73 void setMessengerLogger(grk_plugin::IMessengerLogger* logger)
74 {
75         delete sLogger;
76         sLogger = logger;
77 }
78 #if defined(__GNUC__) || defined(__clang__)
79 #pragma GCC diagnostic pop
80 #endif
81 grk_plugin::IMessengerLogger* getMessengerLogger(void)
82 {
83         return sLogger;
84 }
85
86 }
87
88 #endif
89
90
91 /** @param film Film that we are encoding.
92  *  @param writer Writer that we are using.
93  */
94 J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer)
95         : _film (film)
96         , _history (200)
97         , _writer (writer)
98 #ifdef DCPOMATIC_GROK
99         , _dcpomatic_context(new grk_plugin::DcpomaticContext(film, writer, _history, Config::instance()->gpu_binary_location()))
100         , _context(Config::instance()->enable_gpu() ? new grk_plugin::GrokContext(_dcpomatic_context) : nullptr)
101 #endif
102 {
103         servers_list_changed ();
104 }
105
106
107 J2KEncoder::~J2KEncoder ()
108 {
109         _server_found_connection.disconnect();
110
111         terminate_threads();
112
113 #ifdef DCPOMATIC_GROK
114         delete _context;
115         delete _dcpomatic_context;
116 #endif
117 }
118
119
120 void
121 J2KEncoder::servers_list_changed()
122 {
123         auto config = Config::instance();
124
125         auto const cpu = (config->enable_gpu() || config->only_servers_encode()) ? 0 : config->master_encoding_threads();
126         auto const gpu = config->enable_gpu() ? config->master_encoding_threads() : 0;
127
128         remake_threads(cpu, gpu, EncodeServerFinder::instance()->servers());
129 }
130
131
132 void
133 J2KEncoder::begin ()
134 {
135         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect(
136                 boost::bind(&J2KEncoder::servers_list_changed, this)
137                 );
138 }
139
140
141 void
142 J2KEncoder::pause()
143 {
144         if (!Config::instance()->enable_gpu()) {
145                 return;
146         }
147
148         terminate_threads ();
149
150         /* Something might have been thrown during terminate_threads */
151         rethrow ();
152
153 #ifdef DCPOMATIC_GROK
154         delete _context;
155         _context = nullptr;
156 #endif
157 }
158
159
160 void J2KEncoder::resume()
161 {
162         if (!Config::instance()->enable_gpu()) {
163                 return;
164         }
165
166 #ifdef DCPOMATIC_GROK
167         _context = new grk_plugin::GrokContext(_dcpomatic_context);
168 #endif
169         servers_list_changed();
170 }
171
172
173 void
174 J2KEncoder::end()
175 {
176         boost::mutex::scoped_lock lock (_queue_mutex);
177
178         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
179
180         /* Keep waking workers until the queue is empty */
181         while (!_queue.empty ()) {
182                 rethrow ();
183                 _full_condition.wait (lock);
184         }
185         lock.unlock ();
186
187         LOG_GENERAL_NC (N_("Terminating encoder threads"));
188
189         terminate_threads ();
190
191         /* Something might have been thrown during terminate_threads */
192         rethrow ();
193
194         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
195
196         /* The following sequence of events can occur in the above code:
197              1. a remote worker takes the last image off the queue
198              2. the loop above terminates
199              3. the remote worker fails to encode the image and puts it back on the queue
200              4. the remote worker is then terminated by terminate_threads
201
202              So just mop up anything left in the queue here.
203         */
204         for (auto & i: _queue) {
205 #ifdef DCPOMATIC_GROK
206                 if (Config::instance()->enable_gpu ()) {
207                         if (!_context->scheduleCompress(i)){
208                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index());
209                                 // handle error
210                         }
211                 } else {
212 #else
213                 {
214 #endif
215                         LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
216                         try {
217                                 _writer.write(
218                                         make_shared<dcp::ArrayData>(i.encode_locally()),
219                                         i.index(),
220                                         i.eyes()
221                                         );
222                                 frame_done ();
223                         } catch (std::exception& e) {
224                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
225                         }
226                 }
227         }
228
229 #ifdef DCPOMATIC_GROK
230         delete _context;
231         _context = nullptr;
232 #endif
233 }
234
235
236 /** @return an estimate of the current number of frames we are encoding per second,
237  *  if known.
238  */
239 optional<float>
240 J2KEncoder::current_encoding_rate () const
241 {
242         return _history.rate ();
243 }
244
245
246 /** @return Number of video frames that have been queued for encoding */
247 int
248 J2KEncoder::video_frames_enqueued () const
249 {
250         if (!_last_player_video_time) {
251                 return 0;
252         }
253
254         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
255 }
256
257
258 /** Should be called when a frame has been encoded successfully */
259 void
260 J2KEncoder::frame_done ()
261 {
262         _history.event ();
263 }
264
265
266 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
267  *  so each time the supplied frame is the one after the previous one.
268  *  pv represents one video frame, and could be empty if there is nothing to encode
269  *  for this DCP frame.
270  *
271  *  @param pv PlayerVideo to encode.
272  *  @param time Time of \p pv within the DCP.
273  */
274 void
275 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
276 {
277         _waker.nudge ();
278
279         size_t threads = 0;
280         {
281                 boost::mutex::scoped_lock lm (_threads_mutex);
282                 threads = _threads.size();
283         }
284
285         boost::mutex::scoped_lock queue_lock (_queue_mutex);
286
287         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
288            when there are no threads.
289         */
290         while (_queue.size() >= (threads * 2) + 1) {
291                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
292                 _full_condition.wait (queue_lock);
293                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
294         }
295
296         _writer.rethrow();
297         /* Re-throw any exception raised by one of our threads.  If more
298            than one has thrown an exception, only one will be rethrown, I think;
299            but then, if that happens something has gone badly wrong.
300         */
301         rethrow ();
302
303         auto const position = time.frames_floor(_film->video_frame_rate());
304
305         if (_writer.can_fake_write(position)) {
306                 /* We can fake-write this frame */
307                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
308                 _writer.fake_write(position, pv->eyes ());
309                 frame_done ();
310         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
311                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
312                 /* This frame already has J2K data, so just write it */
313                 _writer.write(pv->j2k(), position, pv->eyes ());
314                 frame_done ();
315         } else if (_last_player_video[pv->eyes()] && _writer.can_repeat(position) && pv->same(_last_player_video[pv->eyes()])) {
316                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
317                 _writer.repeat(position, pv->eyes());
318         } else {
319                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
320                 /* Queue this new frame for encoding */
321                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
322                 auto dcpv = DCPVideo(
323                                 pv,
324                                 position,
325                                 _film->video_frame_rate(),
326                                 _film->j2k_bandwidth(),
327                                 _film->resolution()
328                                 );
329                 _queue.push_back (dcpv);
330
331                 /* The queue might not be empty any more, so notify anything which is
332                    waiting on that.
333                 */
334                 _empty_condition.notify_all ();
335         }
336
337         _last_player_video[pv->eyes()] = pv;
338         _last_player_video_time = time;
339 }
340
341
342 void
343 J2KEncoder::terminate_threads ()
344 {
345         boost::mutex::scoped_lock lm(_threads_mutex);
346         boost::this_thread::disable_interruption dis;
347
348         for (auto& thread: _threads) {
349                 thread->stop();
350         }
351
352         _threads.clear();
353         _ending = true;
354 }
355
356
357 void
358 #ifdef DCPOMATIC_GROK
359 J2KEncoder::remake_threads(int cpu, int gpu, list<EncodeServerDescription> servers)
360 #else
361 J2KEncoder::remake_threads(int cpu, int, list<EncodeServerDescription> servers)
362 #endif
363 {
364         boost::mutex::scoped_lock lm (_threads_mutex);
365         if (_ending) {
366                 return;
367         }
368
369         auto remove_threads = [this](int wanted, int current, std::function<bool (shared_ptr<J2KEncoderThread>)> predicate) {
370                 for (auto i = wanted; i < current; ++i) {
371                         auto iter = std::find_if(_threads.begin(), _threads.end(), predicate);
372                         if (iter != _threads.end()) {
373                                 (*iter)->stop();
374                                 _threads.erase(iter);
375                         }
376                 }
377         };
378
379
380         /* CPU */
381
382         auto const is_cpu_thread = [](shared_ptr<J2KEncoderThread> thread) {
383                 return static_cast<bool>(dynamic_pointer_cast<CPUJ2KEncoderThread>(thread));
384         };
385
386         auto const current_cpu_threads = std::count_if(_threads.begin(), _threads.end(), is_cpu_thread);
387
388         for (auto i = current_cpu_threads; i < cpu; ++i) {
389                 auto thread = make_shared<CPUJ2KEncoderThread>(*this);
390                 thread->start();
391                 _threads.push_back(thread);
392         }
393
394         remove_threads(cpu, current_cpu_threads, is_cpu_thread);
395
396 #ifdef DCPOMATIC_GROK
397         /* GPU */
398
399         auto const is_grok_thread = [](shared_ptr<J2KEncoderThread> thread) {
400                 return static_cast<bool>(dynamic_pointer_cast<GrokJ2KEncoderThread>(thread));
401         };
402
403         auto const current_gpu_threads = std::count_if(_threads.begin(), _threads.end(), is_grok_thread);
404
405         for (auto i = current_gpu_threads; i < gpu; ++i) {
406                 auto thread = make_shared<GrokJ2KEncoderThread>(*this, _context);
407                 thread->start();
408                 _threads.push_back(thread);
409         }
410
411         remove_threads(gpu, current_gpu_threads, is_grok_thread);
412 #endif
413
414         /* Remote */
415
416         for (auto const& server: servers) {
417                 if (!server.current_link_version()) {
418                         continue;
419                 }
420
421                 auto is_remote_thread = [server](shared_ptr<J2KEncoderThread> thread) {
422                         auto remote = dynamic_pointer_cast<RemoteJ2KEncoderThread>(thread);
423                         return remote && remote->server().host_name() == server.host_name();
424                 };
425
426                 auto const current_threads = std::count_if(_threads.begin(), _threads.end(), is_remote_thread);
427
428                 auto const wanted_threads = server.threads();
429
430                 if (wanted_threads > current_threads) {
431                         LOG_GENERAL(N_("Adding %1 worker threads for remote %2"), wanted_threads - current_threads, server.host_name());
432                 } else if (wanted_threads < current_threads) {
433                         LOG_GENERAL(N_("Removing %1 worker threads for remote %2"), current_threads - wanted_threads, server.host_name());
434                 }
435
436                 for (auto i = current_threads; i < wanted_threads; ++i) {
437                         auto thread = make_shared<RemoteJ2KEncoderThread>(*this, server);
438                         thread->start();
439                         _threads.push_back(thread);
440                 }
441
442                 remove_threads(wanted_threads, current_threads, is_remote_thread);
443         }
444
445         _writer.set_encoder_threads(_threads.size());
446 }
447
448
449 DCPVideo
450 J2KEncoder::pop()
451 {
452         boost::mutex::scoped_lock lock(_queue_mutex);
453         while (_queue.empty()) {
454                 _empty_condition.wait (lock);
455         }
456
457         LOG_TIMING("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
458
459         auto vf = _queue.front();
460         _queue.pop_front();
461
462         _full_condition.notify_all();
463         return vf;
464 }
465
466
467 void
468 J2KEncoder::retry(DCPVideo video)
469 {
470         boost::mutex::scoped_lock lock(_queue_mutex);
471         _queue.push_front(video);
472         _empty_condition.notify_all();
473 }
474
475
476 void
477 J2KEncoder::write(shared_ptr<const dcp::Data> data, int index, Eyes eyes)
478 {
479         _writer.write(data, index, eyes);
480         frame_done();
481 }