Only build grok for Ubuntu 22.04.
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
22 /** @file src/j2k_encoder.cc
23  *  @brief J2K encoder class.
24  */
25
26
27 #include "compose.hpp"
28 #include "config.h"
29 #include "cross.h"
30 #include "dcp_video.h"
31 #include "dcpomatic_log.h"
32 #include "encode_server_description.h"
33 #include "encode_server_finder.h"
34 #include "film.h"
35 #include "cpu_j2k_encoder_thread.h"
36 #ifdef DCPOMATIC_GROK
37 #include "grok_j2k_encoder_thread.h"
38 #endif
39 #include "remote_j2k_encoder_thread.h"
40 #include "j2k_encoder.h"
41 #include "log.h"
42 #include "player_video.h"
43 #include "util.h"
44 #include "writer.h"
45 #include <libcxml/cxml.h>
46 #include <iostream>
47
48 #include "i18n.h"
49
50
51 using std::cout;
52 using std::dynamic_pointer_cast;
53 using std::exception;
54 using std::list;
55 using std::make_shared;
56 using std::shared_ptr;
57 using std::weak_ptr;
58 using boost::optional;
59 using dcp::Data;
60 using namespace dcpomatic;
61
62
63 /** @param film Film that we are encoding.
64  *  @param writer Writer that we are using.
65  */
66 J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer)
67         : _film (film)
68         , _history (200)
69         , _writer (writer)
70 #ifdef DCPOMATIC_GROK
71         , _dcpomatic_context(film, writer, _history, Config::instance()->gpu_binary_location())
72         , _context(Config::instance()->enable_gpu() ? new grk_plugin::GrokContext(_dcpomatic_context) : nullptr)
73 #endif
74 {
75 }
76
77
78 J2KEncoder::~J2KEncoder ()
79 {
80         _server_found_connection.disconnect();
81
82         terminate_threads();
83
84 #ifdef DCPOMATIC_GROK
85         delete _context;
86 #endif
87 }
88
89
90 void
91 J2KEncoder::servers_list_changed()
92 {
93         auto config = Config::instance();
94
95         auto const cpu = (config->enable_gpu() || config->only_servers_encode()) ? 0 : config->master_encoding_threads();
96         auto const gpu = config->enable_gpu() ? config->master_encoding_threads() : 0;
97
98         remake_threads(cpu, gpu, EncodeServerFinder::instance()->servers());
99 }
100
101
102 void
103 J2KEncoder::begin ()
104 {
105         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect(
106                 boost::bind(&J2KEncoder::servers_list_changed, this)
107                 );
108         servers_list_changed ();
109 }
110
111
112 void
113 J2KEncoder::pause()
114 {
115         if (!Config::instance()->enable_gpu()) {
116                 return;
117         }
118
119         terminate_threads ();
120
121         /* Something might have been thrown during terminate_threads */
122         rethrow ();
123
124 #ifdef DCPOMATIC_GROK
125         delete _context;
126         _context = nullptr;
127 #endif
128 }
129
130
131 void J2KEncoder::resume()
132 {
133         if (!Config::instance()->enable_gpu()) {
134                 return;
135         }
136
137 #ifdef DCPOMATIC_GROK
138         _context = new grk_plugin::GrokContext(_dcpomatic_context);
139 #endif
140         servers_list_changed();
141 }
142
143
144 void
145 J2KEncoder::end()
146 {
147         boost::mutex::scoped_lock lock (_queue_mutex);
148
149         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
150
151         /* Keep waking workers until the queue is empty */
152         while (!_queue.empty ()) {
153                 rethrow ();
154                 _full_condition.wait (lock);
155         }
156         lock.unlock ();
157
158         LOG_GENERAL_NC (N_("Terminating encoder threads"));
159
160         terminate_threads ();
161
162         /* Something might have been thrown during terminate_threads */
163         rethrow ();
164
165         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
166
167         /* The following sequence of events can occur in the above code:
168              1. a remote worker takes the last image off the queue
169              2. the loop above terminates
170              3. the remote worker fails to encode the image and puts it back on the queue
171              4. the remote worker is then terminated by terminate_threads
172
173              So just mop up anything left in the queue here.
174         */
175         for (auto & i: _queue) {
176 #ifdef DCPOMATIC_GROK
177                 if (Config::instance()->enable_gpu ()) {
178                         if (!_context->scheduleCompress(i)){
179                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index());
180                                 // handle error
181                         }
182                 } else {
183 #else
184                 {
185 #endif
186                         LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
187                         try {
188                                 _writer.write(
189                                         make_shared<dcp::ArrayData>(i.encode_locally()),
190                                         i.index(),
191                                         i.eyes()
192                                         );
193                                 frame_done ();
194                         } catch (std::exception& e) {
195                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
196                         }
197                 }
198         }
199
200 #ifdef DCPOMATIC_GROK
201         delete _context;
202         _context = nullptr;
203 #endif
204 }
205
206
207 /** @return an estimate of the current number of frames we are encoding per second,
208  *  if known.
209  */
210 optional<float>
211 J2KEncoder::current_encoding_rate () const
212 {
213         return _history.rate ();
214 }
215
216
217 /** @return Number of video frames that have been queued for encoding */
218 int
219 J2KEncoder::video_frames_enqueued () const
220 {
221         if (!_last_player_video_time) {
222                 return 0;
223         }
224
225         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
226 }
227
228
229 /** Should be called when a frame has been encoded successfully */
230 void
231 J2KEncoder::frame_done ()
232 {
233         _history.event ();
234 }
235
236
237 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
238  *  so each time the supplied frame is the one after the previous one.
239  *  pv represents one video frame, and could be empty if there is nothing to encode
240  *  for this DCP frame.
241  *
242  *  @param pv PlayerVideo to encode.
243  *  @param time Time of \p pv within the DCP.
244  */
245 void
246 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
247 {
248         _waker.nudge ();
249
250         size_t threads = 0;
251         {
252                 boost::mutex::scoped_lock lm (_threads_mutex);
253                 threads = _threads.size();
254         }
255
256         boost::mutex::scoped_lock queue_lock (_queue_mutex);
257
258         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
259            when there are no threads.
260         */
261         while (_queue.size() >= (threads * 2) + 1) {
262                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
263                 _full_condition.wait (queue_lock);
264                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
265         }
266
267         _writer.rethrow();
268         /* Re-throw any exception raised by one of our threads.  If more
269            than one has thrown an exception, only one will be rethrown, I think;
270            but then, if that happens something has gone badly wrong.
271         */
272         rethrow ();
273
274         auto const position = time.frames_floor(_film->video_frame_rate());
275
276         if (_writer.can_fake_write(position)) {
277                 /* We can fake-write this frame */
278                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
279                 _writer.fake_write(position, pv->eyes ());
280                 frame_done ();
281         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
282                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
283                 /* This frame already has J2K data, so just write it */
284                 _writer.write(pv->j2k(), position, pv->eyes ());
285                 frame_done ();
286         } else if (_last_player_video[pv->eyes()] && _writer.can_repeat(position) && pv->same(_last_player_video[pv->eyes()])) {
287                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
288                 _writer.repeat(position, pv->eyes());
289         } else {
290                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
291                 /* Queue this new frame for encoding */
292                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
293                 auto dcpv = DCPVideo(
294                                 pv,
295                                 position,
296                                 _film->video_frame_rate(),
297                                 _film->j2k_bandwidth(),
298                                 _film->resolution()
299                                 );
300                 _queue.push_back (dcpv);
301
302                 /* The queue might not be empty any more, so notify anything which is
303                    waiting on that.
304                 */
305                 _empty_condition.notify_all ();
306         }
307
308         _last_player_video[pv->eyes()] = pv;
309         _last_player_video_time = time;
310 }
311
312
313 void
314 J2KEncoder::terminate_threads ()
315 {
316         boost::mutex::scoped_lock lm(_threads_mutex);
317         boost::this_thread::disable_interruption dis;
318
319         for (auto& thread: _threads) {
320                 thread->stop();
321         }
322
323         _threads.clear();
324         _ending = true;
325 }
326
327
328 void
329 #ifdef DCPOMATIC_GROK
330 J2KEncoder::remake_threads(int cpu, int gpu, list<EncodeServerDescription> servers)
331 #else
332 J2KEncoder::remake_threads(int cpu, int, list<EncodeServerDescription> servers)
333 #endif
334 {
335         boost::mutex::scoped_lock lm (_threads_mutex);
336         if (_ending) {
337                 return;
338         }
339
340         auto remove_threads = [this](int wanted, int current, std::function<bool (shared_ptr<J2KEncoderThread>)> predicate) {
341                 for (auto i = wanted; i < current; ++i) {
342                         auto iter = std::find_if(_threads.begin(), _threads.end(), predicate);
343                         if (iter != _threads.end()) {
344                                 (*iter)->stop();
345                                 _threads.erase(iter);
346                         }
347                 }
348         };
349
350
351         /* CPU */
352
353         auto const is_cpu_thread = [](shared_ptr<J2KEncoderThread> thread) {
354                 return static_cast<bool>(dynamic_pointer_cast<CPUJ2KEncoderThread>(thread));
355         };
356
357         auto const current_cpu_threads = std::count_if(_threads.begin(), _threads.end(), is_cpu_thread);
358
359         for (auto i = current_cpu_threads; i < cpu; ++i) {
360                 auto thread = make_shared<CPUJ2KEncoderThread>(*this);
361                 thread->start();
362                 _threads.push_back(thread);
363         }
364
365         remove_threads(cpu, current_cpu_threads, is_cpu_thread);
366
367 #ifdef DCPOMATIC_GROK
368         /* GPU */
369
370         auto const is_grok_thread = [](shared_ptr<J2KEncoderThread> thread) {
371                 return static_cast<bool>(dynamic_pointer_cast<GrokJ2KEncoderThread>(thread));
372         };
373
374         auto const current_gpu_threads = std::count_if(_threads.begin(), _threads.end(), is_grok_thread);
375
376         for (auto i = current_gpu_threads; i < gpu; ++i) {
377                 auto thread = make_shared<GrokJ2KEncoderThread>(*this, _context);
378                 thread->start();
379                 _threads.push_back(thread);
380         }
381
382         remove_threads(gpu, current_gpu_threads, is_grok_thread);
383 #endif
384
385         /* Remote */
386
387         for (auto const& server: servers) {
388                 if (!server.current_link_version()) {
389                         continue;
390                 }
391
392                 auto is_remote_thread = [server](shared_ptr<J2KEncoderThread> thread) {
393                         auto remote = dynamic_pointer_cast<RemoteJ2KEncoderThread>(thread);
394                         return remote && remote->server().host_name() == server.host_name();
395                 };
396
397                 auto const current_threads = std::count_if(_threads.begin(), _threads.end(), is_remote_thread);
398
399                 auto const wanted_threads = server.threads();
400
401                 if (wanted_threads > current_threads) {
402                         LOG_GENERAL(N_("Adding %1 worker threads for remote %2"), wanted_threads - current_threads, server.host_name());
403                 } else if (wanted_threads < current_threads) {
404                         LOG_GENERAL(N_("Removing %1 worker threads for remote %2"), current_threads - wanted_threads, server.host_name());
405                 }
406
407                 for (auto i = current_threads; i < wanted_threads; ++i) {
408                         auto thread = make_shared<RemoteJ2KEncoderThread>(*this, server);
409                         thread->start();
410                         _threads.push_back(thread);
411                 }
412
413                 remove_threads(wanted_threads, current_threads, is_remote_thread);
414         }
415
416         _writer.set_encoder_threads(_threads.size());
417 }
418
419
420 DCPVideo
421 J2KEncoder::pop()
422 {
423         boost::mutex::scoped_lock lock(_queue_mutex);
424         while (_queue.empty()) {
425                 _empty_condition.wait (lock);
426         }
427
428         LOG_TIMING("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
429
430         auto vf = _queue.front();
431         _queue.pop_front();
432
433         _full_condition.notify_all();
434         return vf;
435 }
436
437
438 void
439 J2KEncoder::retry(DCPVideo video)
440 {
441         boost::mutex::scoped_lock lock(_queue_mutex);
442         _queue.push_front(video);
443         _empty_condition.notify_all();
444 }
445
446
447 void
448 J2KEncoder::write(shared_ptr<const dcp::Data> data, int index, Eyes eyes)
449 {
450         _writer.write(data, index, eyes);
451         frame_done();
452 }