Rename j2k_bandwidth -> video_bit_rate.
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
22 /** @file src/j2k_encoder.cc
23  *  @brief J2K encoder class.
24  */
25
26
27 #include "compose.hpp"
28 #include "config.h"
29 #include "cross.h"
30 #include "dcp_video.h"
31 #include "dcpomatic_log.h"
32 #include "encode_server_description.h"
33 #include "encode_server_finder.h"
34 #include "film.h"
35 #include "cpu_j2k_encoder_thread.h"
36 #ifdef DCPOMATIC_GROK
37 #include "grok/context.h"
38 #include "grok_j2k_encoder_thread.h"
39 #endif
40 #include "remote_j2k_encoder_thread.h"
41 #include "j2k_encoder.h"
42 #include "log.h"
43 #include "player_video.h"
44 #include "util.h"
45 #include "writer.h"
46 #include <libcxml/cxml.h>
47 #include <iostream>
48
49 #include "i18n.h"
50
51
52 using std::cout;
53 using std::dynamic_pointer_cast;
54 using std::exception;
55 using std::list;
56 using std::make_shared;
57 using std::shared_ptr;
58 using std::weak_ptr;
59 using boost::optional;
60 using dcp::Data;
61 using namespace dcpomatic;
62
63 #ifdef DCPOMATIC_GROK
64
65 namespace grk_plugin {
66
67 IMessengerLogger* sLogger = nullptr;
68
69 #if defined(__GNUC__) || defined(__clang__)
70 #pragma GCC diagnostic push
71 #pragma GCC diagnostic ignored "-Wunused-function"
72 #endif
73 void setMessengerLogger(grk_plugin::IMessengerLogger* logger)
74 {
75         delete sLogger;
76         sLogger = logger;
77 }
78 #if defined(__GNUC__) || defined(__clang__)
79 #pragma GCC diagnostic pop
80 #endif
81 grk_plugin::IMessengerLogger* getMessengerLogger(void)
82 {
83         return sLogger;
84 }
85
86 }
87
88 #endif
89
90
91 /** @param film Film that we are encoding.
92  *  @param writer Writer that we are using.
93  */
94 J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer)
95         : VideoEncoder(film, writer)
96 {
97 #ifdef DCPOMATIC_GROK
98         auto grok = Config::instance()->grok().get_value_or({});
99         _dcpomatic_context = new grk_plugin::DcpomaticContext(film, writer, _history, grok.binary_location);
100         if (grok.enable) {
101                 _context = new grk_plugin::GrokContext(_dcpomatic_context);
102         }
103 #endif
104 }
105
106
107 J2KEncoder::~J2KEncoder ()
108 {
109         _server_found_connection.disconnect();
110
111         terminate_threads();
112
113 #ifdef DCPOMATIC_GROK
114         delete _context;
115         delete _dcpomatic_context;
116 #endif
117 }
118
119
120 void
121 J2KEncoder::servers_list_changed()
122 {
123         auto config = Config::instance();
124 #ifdef DCPOMATIC_GROK
125         auto const grok_enable = config->grok().get_value_or({}).enable;
126 #else
127         auto const grok_enable = false;
128 #endif
129
130         auto const cpu = (grok_enable || config->only_servers_encode()) ? 0 : config->master_encoding_threads();
131         auto const gpu = grok_enable ? config->master_encoding_threads() : 0;
132
133         remake_threads(cpu, gpu, EncodeServerFinder::instance()->servers());
134 }
135
136
137 void
138 J2KEncoder::begin ()
139 {
140         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect(
141                 boost::bind(&J2KEncoder::servers_list_changed, this)
142                 );
143         servers_list_changed ();
144 }
145
146
147 void
148 J2KEncoder::pause()
149 {
150 #ifdef DCPOMATIC_GROK
151         if (!Config::instance()->grok().get_value_or({}).enable) {
152                 return;
153         }
154         return;
155
156         terminate_threads ();
157
158         /* Something might have been thrown during terminate_threads */
159         rethrow ();
160
161         delete _context;
162         _context = nullptr;
163 #endif
164 }
165
166
167 void J2KEncoder::resume()
168 {
169 #ifdef DCPOMATIC_GROK
170         if (!Config::instance()->grok().get_value_or({}).enable) {
171                 return;
172         }
173
174         _context = new grk_plugin::GrokContext(_dcpomatic_context);
175         servers_list_changed();
176 #endif
177 }
178
179
180 void
181 J2KEncoder::end()
182 {
183         boost::mutex::scoped_lock lock (_queue_mutex);
184
185         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
186
187         /* Keep waking workers until the queue is empty */
188         while (!_queue.empty ()) {
189                 rethrow ();
190                 _full_condition.wait (lock);
191         }
192         lock.unlock ();
193
194         LOG_GENERAL_NC (N_("Terminating encoder threads"));
195
196         terminate_threads ();
197
198         /* Something might have been thrown during terminate_threads */
199         rethrow ();
200
201         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
202
203         /* The following sequence of events can occur in the above code:
204              1. a remote worker takes the last image off the queue
205              2. the loop above terminates
206              3. the remote worker fails to encode the image and puts it back on the queue
207              4. the remote worker is then terminated by terminate_threads
208
209              So just mop up anything left in the queue here.
210         */
211         for (auto & i: _queue) {
212 #ifdef DCPOMATIC_GROK
213                 if (Config::instance()->grok().get_value_or({}).enable) {
214                         if (!_context->scheduleCompress(i)){
215                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index());
216                                 // handle error
217                         }
218                 } else {
219 #else
220                 {
221 #endif
222                         LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
223                         try {
224                                 _writer.write(
225                                         make_shared<dcp::ArrayData>(i.encode_locally()),
226                                         i.index(),
227                                         i.eyes()
228                                         );
229                                 frame_done ();
230                         } catch (std::exception& e) {
231                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
232                         }
233                 }
234         }
235
236 #ifdef DCPOMATIC_GROK
237         delete _context;
238         _context = nullptr;
239 #endif
240 }
241
242
243 /** Should be called when a frame has been encoded successfully */
244 void
245 J2KEncoder::frame_done ()
246 {
247         _history.event ();
248 }
249
250
251 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
252  *  so each time the supplied frame is the one after the previous one.
253  *  pv represents one video frame, and could be empty if there is nothing to encode
254  *  for this DCP frame.
255  *
256  *  @param pv PlayerVideo to encode.
257  *  @param time Time of \p pv within the DCP.
258  */
259 void
260 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
261 {
262         VideoEncoder::encode(pv, time);
263
264         _waker.nudge ();
265
266         size_t threads = 0;
267         {
268                 boost::mutex::scoped_lock lm (_threads_mutex);
269                 threads = _threads.size();
270         }
271
272         boost::mutex::scoped_lock queue_lock (_queue_mutex);
273
274         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
275            when there are no threads.
276         */
277         while (_queue.size() >= (threads * 2) + 1) {
278                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
279                 _full_condition.wait (queue_lock);
280                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
281         }
282
283         _writer.rethrow();
284         /* Re-throw any exception raised by one of our threads.  If more
285            than one has thrown an exception, only one will be rethrown, I think;
286            but then, if that happens something has gone badly wrong.
287         */
288         rethrow ();
289
290         auto const position = time.frames_floor(_film->video_frame_rate());
291
292         if (_writer.can_fake_write(position)) {
293                 /* We can fake-write this frame */
294                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
295                 _writer.fake_write(position, pv->eyes ());
296                 frame_done ();
297         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
298                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
299                 /* This frame already has J2K data, so just write it */
300                 _writer.write(pv->j2k(), position, pv->eyes ());
301                 frame_done ();
302         } else if (_last_player_video[pv->eyes()] && _writer.can_repeat(position) && pv->same(_last_player_video[pv->eyes()])) {
303                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
304                 _writer.repeat(position, pv->eyes());
305         } else {
306                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
307                 /* Queue this new frame for encoding */
308                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
309                 auto dcpv = DCPVideo(
310                                 pv,
311                                 position,
312                                 _film->video_frame_rate(),
313                                 _film->video_bit_rate(),
314                                 _film->resolution()
315                                 );
316                 _queue.push_back (dcpv);
317
318                 /* The queue might not be empty any more, so notify anything which is
319                    waiting on that.
320                 */
321                 _empty_condition.notify_all ();
322         }
323
324         _last_player_video[pv->eyes()] = pv;
325 }
326
327
328 void
329 J2KEncoder::terminate_threads ()
330 {
331         boost::mutex::scoped_lock lm(_threads_mutex);
332         boost::this_thread::disable_interruption dis;
333
334         for (auto& thread: _threads) {
335                 thread->stop();
336         }
337
338         _threads.clear();
339         _ending = true;
340 }
341
342
343 void
344 J2KEncoder::remake_threads(int cpu, int gpu, list<EncodeServerDescription> servers)
345 {
346         LOG_GENERAL("Making threads: CPU=%1, GPU=%2, Remote=%3", cpu, gpu, servers.size());
347
348         boost::mutex::scoped_lock lm (_threads_mutex);
349         if (_ending) {
350                 return;
351         }
352
353         auto remove_threads = [this](int wanted, int current, std::function<bool (shared_ptr<J2KEncoderThread>)> predicate) {
354                 for (auto i = wanted; i < current; ++i) {
355                         auto iter = std::find_if(_threads.begin(), _threads.end(), predicate);
356                         if (iter != _threads.end()) {
357                                 (*iter)->stop();
358                                 _threads.erase(iter);
359                         }
360                 }
361         };
362
363
364         /* CPU */
365
366         auto const is_cpu_thread = [](shared_ptr<J2KEncoderThread> thread) {
367                 return static_cast<bool>(dynamic_pointer_cast<CPUJ2KEncoderThread>(thread));
368         };
369
370         auto const current_cpu_threads = std::count_if(_threads.begin(), _threads.end(), is_cpu_thread);
371
372         for (auto i = current_cpu_threads; i < cpu; ++i) {
373                 auto thread = make_shared<CPUJ2KEncoderThread>(*this);
374                 thread->start();
375                 _threads.push_back(thread);
376         }
377
378         remove_threads(cpu, current_cpu_threads, is_cpu_thread);
379
380 #ifdef DCPOMATIC_GROK
381         /* GPU */
382
383         auto const is_grok_thread = [](shared_ptr<J2KEncoderThread> thread) {
384                 return static_cast<bool>(dynamic_pointer_cast<GrokJ2KEncoderThread>(thread));
385         };
386
387         auto const current_gpu_threads = std::count_if(_threads.begin(), _threads.end(), is_grok_thread);
388
389         for (auto i = current_gpu_threads; i < gpu; ++i) {
390                 auto thread = make_shared<GrokJ2KEncoderThread>(*this, _context);
391                 thread->start();
392                 _threads.push_back(thread);
393         }
394
395         remove_threads(gpu, current_gpu_threads, is_grok_thread);
396 #endif
397
398         /* Remote */
399
400         for (auto const& server: servers) {
401                 if (!server.current_link_version()) {
402                         continue;
403                 }
404
405                 auto is_remote_thread = [server](shared_ptr<J2KEncoderThread> thread) {
406                         auto remote = dynamic_pointer_cast<RemoteJ2KEncoderThread>(thread);
407                         return remote && remote->server().host_name() == server.host_name();
408                 };
409
410                 auto const current_threads = std::count_if(_threads.begin(), _threads.end(), is_remote_thread);
411
412                 auto const wanted_threads = server.threads();
413
414                 if (wanted_threads > current_threads) {
415                         LOG_GENERAL(N_("Adding %1 worker threads for remote %2"), wanted_threads - current_threads, server.host_name());
416                 } else if (wanted_threads < current_threads) {
417                         LOG_GENERAL(N_("Removing %1 worker threads for remote %2"), current_threads - wanted_threads, server.host_name());
418                 }
419
420                 for (auto i = current_threads; i < wanted_threads; ++i) {
421                         auto thread = make_shared<RemoteJ2KEncoderThread>(*this, server);
422                         thread->start();
423                         _threads.push_back(thread);
424                 }
425
426                 remove_threads(wanted_threads, current_threads, is_remote_thread);
427         }
428
429         _writer.set_encoder_threads(_threads.size());
430 }
431
432
433 DCPVideo
434 J2KEncoder::pop()
435 {
436         boost::mutex::scoped_lock lock(_queue_mutex);
437         while (_queue.empty()) {
438                 _empty_condition.wait (lock);
439         }
440
441         LOG_TIMING("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
442
443         auto vf = _queue.front();
444         _queue.pop_front();
445
446         _full_condition.notify_all();
447         return vf;
448 }
449
450
451 void
452 J2KEncoder::retry(DCPVideo video)
453 {
454         boost::mutex::scoped_lock lock(_queue_mutex);
455         _queue.push_front(video);
456         _empty_condition.notify_all();
457 }
458
459
460 void
461 J2KEncoder::write(shared_ptr<const dcp::Data> data, int index, Eyes eyes)
462 {
463         _writer.write(data, index, eyes);
464         frame_done();
465 }