Formatting, variable name tidying and some const correctness.
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
/** @file src/lib/j2k_encoder.cc
 *  @brief J2K encoder class.
 */
25
26
27 #include "compose.hpp"
28 #include "config.h"
29 #include "cross.h"
30 #include "dcp_video.h"
31 #include "dcpomatic_log.h"
32 #include "encode_server_description.h"
33 #include "encode_server_finder.h"
34 #include "film.h"
35 #include "j2k_encoder.h"
36 #include "log.h"
37 #include "player_video.h"
38 #include "util.h"
39 #include "writer.h"
40 #include <libcxml/cxml.h>
41 #include <iostream>
42
43 #include "i18n.h"
44
45
46 using std::cout;
47 using std::exception;
48 using std::list;
49 using std::make_shared;
50 using std::shared_ptr;
51 using std::weak_ptr;
52 using boost::optional;
53 using dcp::Data;
54 using namespace dcpomatic;
55
56 static grk_plugin::GrokInitializer grokInitializer;
57
58 /** @param film Film that we are encoding.
59  *  @param writer Writer that we are using.
60  */
61 J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer)
62         : _film (film)
63         , _history (200)
64         , _writer (writer)
65         , _dcpomatic_context(film, writer, _history, Config::instance()->gpu_binary_location())
66         , _context(Config::instance()->enable_gpu() ? new grk_plugin::GrokContext(_dcpomatic_context) : nullptr)
67 {
68 }
69
70
71 J2KEncoder::~J2KEncoder ()
72 {
73         _server_found_connection.disconnect();
74
75         {
76         boost::mutex::scoped_lock lm (_threads_mutex);
77         terminate_threads ();
78         }
79
80         delete _context;
81 }
82
83 void
84 J2KEncoder::begin ()
85 {
86         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect(
87                 boost::bind(&J2KEncoder::servers_list_changed, this)
88                 );
89         servers_list_changed ();
90 }
91
92
93 void
94 J2KEncoder::pause()
95 {
96         if (Config::instance()->enable_gpu()) {
97                 end(false);
98         }
99 }
100
101
102 void J2KEncoder::resume()
103 {
104         if (Config::instance()->enable_gpu()) {
105                 _context = new grk_plugin::GrokContext(_dcpomatic_context);
106                 servers_list_changed();
107         }
108 }
109
110
111 void
112 J2KEncoder::end (bool isFinal)
113 {
114         if (isFinal) {
115                 boost::mutex::scoped_lock lock (_queue_mutex);
116
117                 LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
118
119                 /* Keep waking workers until the queue is empty */
120                         while (!_queue.empty ()) {
121                                 rethrow ();
122                                 _empty_condition.notify_all ();
123                                 _full_condition.wait (lock);
124                         }
125                 lock.unlock ();
126         }
127
128         LOG_GENERAL_NC (N_("Terminating encoder threads"));
129
130         {
131                 boost::mutex::scoped_lock lm (_threads_mutex);
132                 terminate_threads ();
133         }
134
135         /* Something might have been thrown during terminate_threads */
136         rethrow ();
137
138         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
139
140         /* The following sequence of events can occur in the above code:
141                  1. a remote worker takes the last image off the queue
142                  2. the loop above terminates
143                  3. the remote worker fails to encode the image and puts it back on the queue
144                  4. the remote worker is then terminated by terminate_threads
145
146                  So just mop up anything left in the queue here.
147         */
148         if (isFinal) {
149                 for (auto & i: _queue) {
150                         if (Config::instance()->enable_gpu ()) {
151                                 if (!_context->scheduleCompress(i)){
152                                         LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index());
153                                         // handle error
154                                 }
155                         }
156                         else {
157                                 LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
158                                 try {
159                                         _writer.write(
160                                                         make_shared<dcp::ArrayData>(i.encode_locally()),
161                                                 i.index(),
162                                                 i.eyes()
163                                                 );
164                                         frame_done ();
165                                 } catch (std::exception& e) {
166                                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
167                                 }
168                         }
169                 }
170         }
171         delete _context;
172         _context = nullptr;
173 }
174
175
176 /** @return an estimate of the current number of frames we are encoding per second,
177  *  if known.
178  */
179 optional<float>
180 J2KEncoder::current_encoding_rate () const
181 {
182         return _history.rate ();
183 }
184
185
186 /** @return Number of video frames that have been queued for encoding */
187 int
188 J2KEncoder::video_frames_enqueued () const
189 {
190         if (!_last_player_video_time) {
191                 return 0;
192         }
193
194         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
195 }
196
197
198 /** Should be called when a frame has been encoded successfully */
199 void
200 J2KEncoder::frame_done ()
201 {
202         _history.event ();
203 }
204
205
206 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
207  *  so each time the supplied frame is the one after the previous one.
208  *  pv represents one video frame, and could be empty if there is nothing to encode
209  *  for this DCP frame.
210  *
211  *  @param pv PlayerVideo to encode.
212  *  @param time Time of \p pv within the DCP.
213  */
214 void
215 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
216 {
217         _waker.nudge ();
218
219         size_t threads = 0;
220         {
221                 boost::mutex::scoped_lock lm (_threads_mutex);
222                 if (_threads)
223                         threads = _threads->size();
224                 else
225                         threads = std::thread::hardware_concurrency();
226         }
227
228         boost::mutex::scoped_lock queue_lock (_queue_mutex);
229
230         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
231            when there are no threads.
232         */
233         while (_queue.size() >= (threads * 2) + 1) {
234                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
235                 _full_condition.wait (queue_lock);
236                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
237         }
238
239         _writer.rethrow();
240         /* Re-throw any exception raised by one of our threads.  If more
241            than one has thrown an exception, only one will be rethrown, I think;
242            but then, if that happens something has gone badly wrong.
243         */
244         rethrow ();
245
246         auto const position = time.frames_floor(_film->video_frame_rate());
247
248         if (_writer.can_fake_write(position)) {
249                 /* We can fake-write this frame */
250                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
251                 _writer.fake_write(position, pv->eyes ());
252                 frame_done ();
253         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
254                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
255                 /* This frame already has J2K data, so just write it */
256                 _writer.write(pv->j2k(), position, pv->eyes ());
257                 frame_done ();
258         } else if (_last_player_video[pv->eyes()] && _writer.can_repeat(position) && pv->same(_last_player_video[pv->eyes()])) {
259                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
260                 _writer.repeat(position, pv->eyes());
261         } else {
262                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
263                 /* Queue this new frame for encoding */
264                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
265                 auto dcpv = DCPVideo(
266                                 pv,
267                                 position,
268                                 _film->video_frame_rate(),
269                                 _film->j2k_bandwidth(),
270                                 _film->resolution()
271                                 );
272                 _queue.push_back (dcpv);
273
274                 /* The queue might not be empty any more, so notify anything which is
275                    waiting on that.
276                 */
277                 _empty_condition.notify_all ();
278         }
279
280         _last_player_video[pv->eyes()] = pv;
281         _last_player_video_time = time;
282 }
283
284
285 /** Caller must hold a lock on _threads_mutex */
286 void
287 J2KEncoder::terminate_threads ()
288 {
289         boost::this_thread::disable_interruption dis;
290
291         if (!_threads) {
292                 return;
293         }
294
295         _threads->interrupt_all ();
296         try {
297                 _threads->join_all ();
298         } catch (exception& e) {
299                 LOG_ERROR ("join() threw an exception: %1", e.what());
300         } catch (...) {
301                 LOG_ERROR_NC ("join() threw an exception");
302         }
303
304         _threads.reset ();
305 }
306
307
308 void
309 J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
310 try
311 {
312         auto config = Config::instance ();
313
314         start_of_thread ("J2KEncoder");
315
316         if (server) {
317                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
318         } else {
319                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
320         }
321
322         /* Number of seconds that we currently wait between attempts
323            to connect to the server; not relevant for localhost
324            encodings.
325         */
326         int remote_backoff = 0;
327
328         while (true) {
329
330                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
331                 boost::mutex::scoped_lock lock (_queue_mutex);
332                 while (_queue.empty ()) {
333                         _empty_condition.wait (lock);
334                 }
335
336                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
337                 auto vf = _queue.front ();
338
339                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
340                    so we must not be interrupted until one or other of these things have happened.  This
341                    block has thread interruption disabled.
342                 */
343                 {
344                         boost::this_thread::disable_interruption dis;
345
346                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf.index(), static_cast<int>(vf.eyes()));
347                         _queue.pop_front ();
348
349                         lock.unlock ();
350
351                         shared_ptr<Data> encoded;
352
353                         /* We need to encode this input */
354                         if (server) {
355                                 try {
356                                         encoded = make_shared<dcp::ArrayData>(vf.encode_remotely(server.get()));
357
358                                         if (remote_backoff > 0) {
359                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
360                                         }
361
362                                         /* This job succeeded, so remove any backoff */
363                                         remote_backoff = 0;
364
365                                 } catch (std::exception& e) {
366                                         if (remote_backoff < 60) {
367                                                 /* back off more */
368                                                 remote_backoff += 10;
369                                         }
370                                         LOG_ERROR (
371                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
372                                                 vf.index(), server->host_name(), e.what(), remote_backoff
373                                                 );
374                                 }
375
376                         } else {
377                                 if (_context) {
378                                         if (!_context->launch(vf, config->selected_gpu()) || !_context->scheduleCompress(vf)) {
379                                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
380                                                 _queue.push_front (vf);
381                                         }
382
383                                 } else {
384                                         try {
385                                                 LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf.index());
386                                                 encoded = make_shared<dcp::ArrayData>(vf.encode_locally());
387                                                 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index());
388                                         } catch (std::exception& e) {
389                                                 /* This is very bad, so don't cope with it, just pass it on */
390                                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
391                                                 throw;
392                                         }
393                                 }
394                         }
395
396                         if (encoded) {
397                                 _writer.write(encoded, vf.index(), vf.eyes());
398                                 frame_done ();
399                         } else {
400                                 if (!Config::instance()->enable_gpu ()) {
401                                         lock.lock ();
402                                         LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
403                                         _queue.push_front (vf);
404                                         lock.unlock ();
405                                 }
406                         }
407                 }
408
409                 if (remote_backoff > 0) {
410                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
411                 }
412
413                 /* The queue might not be full any more, so notify anything that is waiting on that */
414                 lock.lock ();
415                 _full_condition.notify_all ();
416         }
417 }
418 catch (boost::thread_interrupted& e) {
419         /* Ignore these and just stop the thread */
420         _full_condition.notify_all ();
421 }
422 catch (...)
423 {
424         store_current ();
425         /* Wake anything waiting on _full_condition so it can see the exception */
426         _full_condition.notify_all ();
427 }
428
429
430 void
431 J2KEncoder::servers_list_changed ()
432 {
433         boost::mutex::scoped_lock lm (_threads_mutex);
434
435         terminate_threads ();
436         _threads = make_shared<boost::thread_group>();
437
438         /* XXX: could re-use threads */
439
440         if (!Config::instance()->only_servers_encode ()) {
441                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
442 #ifdef DCPOMATIC_LINUX
443                         auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
444                         pthread_setname_np (t->native_handle(), "encode-worker");
445 #else
446                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
447 #endif
448                 }
449         }
450
451         for (auto i: EncodeServerFinder::instance()->servers()) {
452                 if (!i.current_link_version()) {
453                         continue;
454                 }
455
456                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
457                 for (int j = 0; j < i.threads(); ++j) {
458                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i));
459                 }
460         }
461
462         _writer.set_encoder_threads(_threads->size());
463 }