Add minimal player HTTP server (#2830).
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
22 /** @file src/j2k_encoder.cc
23  *  @brief J2K encoder class.
24  */
25
26
27 #include "compose.hpp"
28 #include "config.h"
29 #include "cross.h"
30 #include "dcp_video.h"
31 #include "dcpomatic_log.h"
32 #include "encode_server_description.h"
33 #include "encode_server_finder.h"
34 #include "film.h"
35 #include "cpu_j2k_encoder_thread.h"
36 #ifdef DCPOMATIC_GROK
37 #include "grok/context.h"
38 #include "grok_j2k_encoder_thread.h"
39 #endif
40 #include "remote_j2k_encoder_thread.h"
41 #include "j2k_encoder.h"
42 #include "log.h"
43 #include "player_video.h"
44 #include "util.h"
45 #include "writer.h"
46 #include <libcxml/cxml.h>
47 #include <iostream>
48
49 #include "i18n.h"
50
51
52 using std::cout;
53 using std::dynamic_pointer_cast;
54 using std::exception;
55 using std::list;
56 using std::make_shared;
57 using std::shared_ptr;
58 using std::weak_ptr;
59 using boost::optional;
60 using dcp::Data;
61 using namespace dcpomatic;
62
63 #ifdef DCPOMATIC_GROK
64
65 namespace grk_plugin {
66
67 IMessengerLogger* sLogger = nullptr;
68
69 #if defined(__GNUC__) || defined(__clang__)
70 #pragma GCC diagnostic push
71 #pragma GCC diagnostic ignored "-Wunused-function"
72 #endif
73 void setMessengerLogger(grk_plugin::IMessengerLogger* logger)
74 {
75         delete sLogger;
76         sLogger = logger;
77 }
78 #if defined(__GNUC__) || defined(__clang__)
79 #pragma GCC diagnostic pop
80 #endif
81 grk_plugin::IMessengerLogger* getMessengerLogger(void)
82 {
83         return sLogger;
84 }
85
86 }
87
88 #endif
89
90
91 /** @param film Film that we are encoding.
92  *  @param writer Writer that we are using.
93  */
94 J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer)
95         : VideoEncoder(film, writer)
96 {
97 #ifdef DCPOMATIC_GROK
98         auto grok = Config::instance()->grok().get_value_or({});
99         _dcpomatic_context = new grk_plugin::DcpomaticContext(film, writer, _history, grok.binary_location);
100         if (grok.enable) {
101                 _context = new grk_plugin::GrokContext(_dcpomatic_context);
102         }
103 #endif
104 }
105
106
107 J2KEncoder::~J2KEncoder ()
108 {
109         _server_found_connection.disconnect();
110
111         /* One of our encoder threads may be waiting on Writer::write() to return, if that method
112          * is blocked with the writer queue full waiting for _full_condition.  In that case, the
113          * attempt to terminate the encoder threads below (in terminate_threads()) will fail because
114          * the encoder thread waiting for ::write() will have interruption disabled.
115          *
116          * To work around that, make the writer into a zombie to unblock any pending write()s and
117          * not block on any future ones.
118          */
119         _writer.zombify();
120
121         terminate_threads();
122
123 #ifdef DCPOMATIC_GROK
124         delete _context;
125         delete _dcpomatic_context;
126 #endif
127 }
128
129
130 void
131 J2KEncoder::servers_list_changed()
132 {
133         auto config = Config::instance();
134 #ifdef DCPOMATIC_GROK
135         auto const grok_enable = config->grok().get_value_or({}).enable;
136 #else
137         auto const grok_enable = false;
138 #endif
139
140         auto const cpu = (grok_enable || config->only_servers_encode()) ? 0 : config->master_encoding_threads();
141         auto const gpu = grok_enable ? config->master_encoding_threads() : 0;
142
143         remake_threads(cpu, gpu, EncodeServerFinder::instance()->servers());
144 }
145
146
147 void
148 J2KEncoder::begin ()
149 {
150         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect(
151                 boost::bind(&J2KEncoder::servers_list_changed, this)
152                 );
153         servers_list_changed ();
154 }
155
156
157 void
158 J2KEncoder::pause()
159 {
160 #ifdef DCPOMATIC_GROK
161         if (!Config::instance()->grok().get_value_or({}).enable) {
162                 return;
163         }
164         return;
165
166         /* XXX; the same problem may occur here as in the destructor, perhaps? */
167
168         terminate_threads ();
169
170         /* Something might have been thrown during terminate_threads */
171         rethrow ();
172
173         delete _context;
174         _context = nullptr;
175 #endif
176 }
177
178
179 void J2KEncoder::resume()
180 {
181 #ifdef DCPOMATIC_GROK
182         if (!Config::instance()->grok().get_value_or({}).enable) {
183                 return;
184         }
185
186         _context = new grk_plugin::GrokContext(_dcpomatic_context);
187         servers_list_changed();
188 #endif
189 }
190
191
192 void
193 J2KEncoder::end()
194 {
195         boost::mutex::scoped_lock lock (_queue_mutex);
196
197         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
198
199         /* Keep waking workers until the queue is empty */
200         while (!_queue.empty ()) {
201                 rethrow ();
202                 _full_condition.wait (lock);
203         }
204         lock.unlock ();
205
206         LOG_GENERAL_NC (N_("Terminating encoder threads"));
207
208         terminate_threads ();
209
210         /* Something might have been thrown during terminate_threads */
211         rethrow ();
212
213         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
214
215         /* The following sequence of events can occur in the above code:
216              1. a remote worker takes the last image off the queue
217              2. the loop above terminates
218              3. the remote worker fails to encode the image and puts it back on the queue
219              4. the remote worker is then terminated by terminate_threads
220
221              So just mop up anything left in the queue here.
222         */
223         for (auto & i: _queue) {
224 #ifdef DCPOMATIC_GROK
225                 if (Config::instance()->grok().get_value_or({}).enable) {
226                         if (!_context->scheduleCompress(i)){
227                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index());
228                                 // handle error
229                         }
230                 } else {
231 #else
232                 {
233 #endif
234                         LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
235                         try {
236                                 _writer.write(
237                                         make_shared<dcp::ArrayData>(i.encode_locally()),
238                                         i.index(),
239                                         i.eyes()
240                                         );
241                                 frame_done ();
242                         } catch (std::exception& e) {
243                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
244                         }
245                 }
246         }
247
248 #ifdef DCPOMATIC_GROK
249         delete _context;
250         _context = nullptr;
251 #endif
252 }
253
254
255 /** Should be called when a frame has been encoded successfully */
256 void
257 J2KEncoder::frame_done ()
258 {
259         _history.event ();
260 }
261
262
263 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
264  *  so each time the supplied frame is the one after the previous one.
265  *  pv represents one video frame, and could be empty if there is nothing to encode
266  *  for this DCP frame.
267  *
268  *  @param pv PlayerVideo to encode.
269  *  @param time Time of \p pv within the DCP.
270  */
271 void
272 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
273 {
274         VideoEncoder::encode(pv, time);
275
276         _waker.nudge ();
277
278         size_t threads = 0;
279         {
280                 boost::mutex::scoped_lock lm (_threads_mutex);
281                 threads = _threads.size();
282         }
283
284         boost::mutex::scoped_lock queue_lock (_queue_mutex);
285
286         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
287            when there are no threads.
288         */
289         while (_queue.size() >= (threads * 2) + 1) {
290                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
291                 _full_condition.wait (queue_lock);
292                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
293         }
294
295         _writer.rethrow();
296         /* Re-throw any exception raised by one of our threads.  If more
297            than one has thrown an exception, only one will be rethrown, I think;
298            but then, if that happens something has gone badly wrong.
299         */
300         rethrow ();
301
302         auto const position = time.frames_floor(_film->video_frame_rate());
303
304         if (_writer.can_fake_write(position)) {
305                 /* We can fake-write this frame */
306                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
307                 _writer.fake_write(position, pv->eyes ());
308                 frame_done ();
309         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
310                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
311                 /* This frame already has J2K data, so just write it */
312                 _writer.write(pv->j2k(), position, pv->eyes ());
313                 frame_done ();
314         } else if (_last_player_video[pv->eyes()] && _writer.can_repeat(position) && pv->same(_last_player_video[pv->eyes()])) {
315                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
316                 _writer.repeat(position, pv->eyes());
317         } else {
318                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
319                 /* Queue this new frame for encoding */
320                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
321                 auto dcpv = DCPVideo(
322                                 pv,
323                                 position,
324                                 _film->video_frame_rate(),
325                                 _film->video_bit_rate(VideoEncoding::JPEG2000),
326                                 _film->resolution()
327                                 );
328                 _queue.push_back (dcpv);
329
330                 /* The queue might not be empty any more, so notify anything which is
331                    waiting on that.
332                 */
333                 _empty_condition.notify_all ();
334         }
335
336         _last_player_video[pv->eyes()] = pv;
337 }
338
339
340 void
341 J2KEncoder::terminate_threads ()
342 {
343         boost::mutex::scoped_lock lm(_threads_mutex);
344         boost::this_thread::disable_interruption dis;
345
346         for (auto& thread: _threads) {
347                 thread->stop();
348         }
349
350         _threads.clear();
351         _ending = true;
352 }
353
354
355 void
356 J2KEncoder::remake_threads(int cpu, int gpu, list<EncodeServerDescription> servers)
357 {
358         LOG_GENERAL("Making threads: CPU=%1, GPU=%2, Remote=%3", cpu, gpu, servers.size());
359
360         boost::mutex::scoped_lock lm (_threads_mutex);
361         if (_ending) {
362                 return;
363         }
364
365         auto remove_threads = [this](int wanted, int current, std::function<bool (shared_ptr<J2KEncoderThread>)> predicate) {
366                 for (auto i = wanted; i < current; ++i) {
367                         auto iter = std::find_if(_threads.begin(), _threads.end(), predicate);
368                         if (iter != _threads.end()) {
369                                 (*iter)->stop();
370                                 _threads.erase(iter);
371                         }
372                 }
373         };
374
375
376         /* CPU */
377
378         auto const is_cpu_thread = [](shared_ptr<J2KEncoderThread> thread) {
379                 return static_cast<bool>(dynamic_pointer_cast<CPUJ2KEncoderThread>(thread));
380         };
381
382         auto const current_cpu_threads = std::count_if(_threads.begin(), _threads.end(), is_cpu_thread);
383
384         for (auto i = current_cpu_threads; i < cpu; ++i) {
385                 auto thread = make_shared<CPUJ2KEncoderThread>(*this);
386                 thread->start();
387                 _threads.push_back(thread);
388         }
389
390         remove_threads(cpu, current_cpu_threads, is_cpu_thread);
391
392 #ifdef DCPOMATIC_GROK
393         /* GPU */
394
395         auto const is_grok_thread = [](shared_ptr<J2KEncoderThread> thread) {
396                 return static_cast<bool>(dynamic_pointer_cast<GrokJ2KEncoderThread>(thread));
397         };
398
399         auto const current_gpu_threads = std::count_if(_threads.begin(), _threads.end(), is_grok_thread);
400
401         for (auto i = current_gpu_threads; i < gpu; ++i) {
402                 auto thread = make_shared<GrokJ2KEncoderThread>(*this, _context);
403                 thread->start();
404                 _threads.push_back(thread);
405         }
406
407         remove_threads(gpu, current_gpu_threads, is_grok_thread);
408 #endif
409
410         /* Remote */
411
412         for (auto const& server: servers) {
413                 if (!server.current_link_version()) {
414                         continue;
415                 }
416
417                 auto is_remote_thread = [server](shared_ptr<J2KEncoderThread> thread) {
418                         auto remote = dynamic_pointer_cast<RemoteJ2KEncoderThread>(thread);
419                         return remote && remote->server().host_name() == server.host_name();
420                 };
421
422                 auto const current_threads = std::count_if(_threads.begin(), _threads.end(), is_remote_thread);
423
424                 auto const wanted_threads = server.threads();
425
426                 if (wanted_threads > current_threads) {
427                         LOG_GENERAL(N_("Adding %1 worker threads for remote %2"), wanted_threads - current_threads, server.host_name());
428                 } else if (wanted_threads < current_threads) {
429                         LOG_GENERAL(N_("Removing %1 worker threads for remote %2"), current_threads - wanted_threads, server.host_name());
430                 }
431
432                 for (auto i = current_threads; i < wanted_threads; ++i) {
433                         auto thread = make_shared<RemoteJ2KEncoderThread>(*this, server);
434                         thread->start();
435                         _threads.push_back(thread);
436                 }
437
438                 remove_threads(wanted_threads, current_threads, is_remote_thread);
439         }
440
441         _writer.set_encoder_threads(_threads.size());
442 }
443
444
445 DCPVideo
446 J2KEncoder::pop()
447 {
448         boost::mutex::scoped_lock lock(_queue_mutex);
449         while (_queue.empty()) {
450                 _empty_condition.wait (lock);
451         }
452
453         LOG_TIMING("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
454
455         auto vf = _queue.front();
456         _queue.pop_front();
457
458         _full_condition.notify_all();
459         return vf;
460 }
461
462
463 void
464 J2KEncoder::retry(DCPVideo video)
465 {
466         boost::mutex::scoped_lock lock(_queue_mutex);
467         _queue.push_front(video);
468         _empty_condition.notify_all();
469 }
470
471
472 void
473 J2KEncoder::write(shared_ptr<const dcp::Data> data, int index, Eyes eyes)
474 {
475         _writer.write(data, index, eyes);
476         frame_done();
477 }