Patch from Aaron.
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
/** @file src/lib/j2k_encoder.cc
23  *  @brief J2K encoder class.
24  */
25
26
27 #include "compose.hpp"
28 #include "config.h"
29 #include "cross.h"
30 #include "dcp_video.h"
31 #include "dcpomatic_log.h"
32 #include "encode_server_description.h"
33 #include "encode_server_finder.h"
34 #include "film.h"
35 #include "j2k_encoder.h"
36 #include "log.h"
37 #include "player_video.h"
38 #include "util.h"
39 #include "writer.h"
40 #include <libcxml/cxml.h>
41 #include <iostream>
42
43 #include "i18n.h"
44
45
46 using std::cout;
47 using std::exception;
48 using std::list;
49 using std::make_shared;
50 using std::shared_ptr;
51 using std::weak_ptr;
52 using boost::optional;
53 using dcp::Data;
54 using namespace dcpomatic;
55
56 static grk_plugin::GrokInitializer grokInitializer;
57
58 /** @param film Film that we are encoding.
59  *  @param writer Writer that we are using.
60  */
61 J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer)
62         : _film (film)
63         , _history (200)
64         , _writer (writer) ,
65         dcpomaticContext_(film,writer,_history, Config::instance()->gpu_binary_location ()),
66         context_(Config::instance()->enable_gpu () ? new grk_plugin::GrokContext(dcpomaticContext_) : nullptr)
67 {
68         servers_list_changed ();
69 }
70
71
72 J2KEncoder::~J2KEncoder ()
73 {
74         _server_found_connection.disconnect();
75
76         {
77         boost::mutex::scoped_lock lm (_threads_mutex);
78         terminate_threads ();
79         }
80
81         delete context_;
82 }
83
84 void
85 J2KEncoder::begin ()
86 {
87         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect(
88                 boost::bind(&J2KEncoder::servers_list_changed, this)
89                 );
90 }
91
92 void J2KEncoder::pause(void){
93         if (Config::instance()->enable_gpu ())
94                 end(false);
95 }
96
97 void J2KEncoder::resume(void){
98         if (Config::instance()->enable_gpu ()) {
99                 context_ = new grk_plugin::GrokContext(dcpomaticContext_);
100                 servers_list_changed ();
101         }
102 }
103
104 void
105 J2KEncoder::end (bool isFinal)
106 {
107         if (isFinal) {
108                 boost::mutex::scoped_lock lock (_queue_mutex);
109
110                 LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
111
112                 /* Keep waking workers until the queue is empty */
113                         while (!_queue.empty ()) {
114                                 rethrow ();
115                                 _empty_condition.notify_all ();
116                                 _full_condition.wait (lock);
117                         }
118                 lock.unlock ();
119         }
120
121         LOG_GENERAL_NC (N_("Terminating encoder threads"));
122
123         {
124                 boost::mutex::scoped_lock lm (_threads_mutex);
125                 terminate_threads ();
126         }
127
128         /* Something might have been thrown during terminate_threads */
129         rethrow ();
130
131         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
132
133         /* The following sequence of events can occur in the above code:
134                  1. a remote worker takes the last image off the queue
135                  2. the loop above terminates
136                  3. the remote worker fails to encode the image and puts it back on the queue
137                  4. the remote worker is then terminated by terminate_threads
138
139                  So just mop up anything left in the queue here.
140         */
141         if (isFinal) {
142                 for (auto & i: _queue) {
143                         if (Config::instance()->enable_gpu ()) {
144                                 if (!context_->scheduleCompress(i)){
145                                         LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index());
146                                         // handle error
147                                 }
148                         }
149                         else {
150                                 LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
151                                 try {
152                                         _writer.write(
153                                                         make_shared<dcp::ArrayData>(i.encode_locally()),
154                                                 i.index(),
155                                                 i.eyes()
156                                                 );
157                                         frame_done ();
158                                 } catch (std::exception& e) {
159                                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
160                                 }
161                         }
162                 }
163         }
164         delete context_;
165         context_ = nullptr;
166 }
167
168
169 /** @return an estimate of the current number of frames we are encoding per second,
170  *  if known.
171  */
172 optional<float>
173 J2KEncoder::current_encoding_rate () const
174 {
175         return _history.rate ();
176 }
177
178
179 /** @return Number of video frames that have been queued for encoding */
180 int
181 J2KEncoder::video_frames_enqueued () const
182 {
183         if (!_last_player_video_time) {
184                 return 0;
185         }
186
187         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
188 }
189
190
191 /** Should be called when a frame has been encoded successfully */
192 void
193 J2KEncoder::frame_done ()
194 {
195         _history.event ();
196 }
197
198
199 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
200  *  so each time the supplied frame is the one after the previous one.
201  *  pv represents one video frame, and could be empty if there is nothing to encode
202  *  for this DCP frame.
203  *
204  *  @param pv PlayerVideo to encode.
205  *  @param time Time of \p pv within the DCP.
206  */
207 void
208 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
209 {
210         _waker.nudge ();
211
212         size_t threads = 0;
213         {
214                 boost::mutex::scoped_lock lm (_threads_mutex);
215                 if (_threads)
216                         threads = _threads->size();
217                 else
218                         threads = std::thread::hardware_concurrency();
219         }
220
221         boost::mutex::scoped_lock queue_lock (_queue_mutex);
222
223         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
224            when there are no threads.
225         */
226         while (_queue.size() >= (threads * 2) + 1) {
227                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
228                 _full_condition.wait (queue_lock);
229                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
230         }
231
232         _writer.rethrow();
233         /* Re-throw any exception raised by one of our threads.  If more
234            than one has thrown an exception, only one will be rethrown, I think;
235            but then, if that happens something has gone badly wrong.
236         */
237         rethrow ();
238
239         auto const position = time.frames_floor(_film->video_frame_rate());
240
241         if (_writer.can_fake_write(position)) {
242                 /* We can fake-write this frame */
243                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
244                 _writer.fake_write(position, pv->eyes ());
245                 frame_done ();
246         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
247                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
248                 /* This frame already has J2K data, so just write it */
249                 _writer.write(pv->j2k(), position, pv->eyes ());
250                 frame_done ();
251         } else if (_last_player_video[pv->eyes()] && _writer.can_repeat(position) && pv->same(_last_player_video[pv->eyes()])) {
252                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
253                 _writer.repeat(position, pv->eyes());
254         } else {
255                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
256                 /* Queue this new frame for encoding */
257                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
258                 auto dcpv = DCPVideo(
259                                 pv,
260                                 position,
261                                 _film->video_frame_rate(),
262                                 _film->j2k_bandwidth(),
263                                 _film->resolution()
264                                 );
265                 _queue.push_back (dcpv);
266
267                 /* The queue might not be empty any more, so notify anything which is
268                    waiting on that.
269                 */
270                 _empty_condition.notify_all ();
271         }
272
273         _last_player_video[pv->eyes()] = pv;
274         _last_player_video_time = time;
275 }
276
277
278 /** Caller must hold a lock on _threads_mutex */
279 void
280 J2KEncoder::terminate_threads ()
281 {
282         boost::this_thread::disable_interruption dis;
283
284         if (!_threads) {
285                 return;
286         }
287
288         _threads->interrupt_all ();
289         try {
290                 _threads->join_all ();
291         } catch (exception& e) {
292                 LOG_ERROR ("join() threw an exception: %1", e.what());
293         } catch (...) {
294                 LOG_ERROR_NC ("join() threw an exception");
295         }
296
297         _threads.reset ();
298 }
299
300
301 void
302 J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
303 try
304 {
305         auto config = Config::instance ();
306
307         start_of_thread ("J2KEncoder");
308
309         if (server) {
310                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
311         } else {
312                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
313         }
314
315         /* Number of seconds that we currently wait between attempts
316            to connect to the server; not relevant for localhost
317            encodings.
318         */
319         int remote_backoff = 0;
320
321         while (true) {
322
323                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
324                 boost::mutex::scoped_lock lock (_queue_mutex);
325                 while (_queue.empty ()) {
326                         _empty_condition.wait (lock);
327                 }
328
329                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
330                 auto vf = _queue.front ();
331
332                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
333                    so we must not be interrupted until one or other of these things have happened.  This
334                    block has thread interruption disabled.
335                 */
336                 {
337                         boost::this_thread::disable_interruption dis;
338
339                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf.index(), static_cast<int>(vf.eyes()));
340                         _queue.pop_front ();
341
342                         lock.unlock ();
343
344                         shared_ptr<Data> encoded;
345
346                         /* We need to encode this input */
347                         if (server) {
348                                 try {
349                                         encoded = make_shared<dcp::ArrayData>(vf.encode_remotely(server.get()));
350
351                                         if (remote_backoff > 0) {
352                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
353                                         }
354
355                                         /* This job succeeded, so remove any backoff */
356                                         remote_backoff = 0;
357
358                                 } catch (std::exception& e) {
359                                         if (remote_backoff < 60) {
360                                                 /* back off more */
361                                                 remote_backoff += 10;
362                                         }
363                                         LOG_ERROR (
364                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
365                                                 vf.index(), server->host_name(), e.what(), remote_backoff
366                                                 );
367                                 }
368
369                         } else {
370                                 if (context_) {
371                                         if (!context_->launch(vf, config->selected_gpu()) || !context_->scheduleCompress(vf)) {
372                                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
373                                                 _queue.push_front (vf);
374                                         }
375
376                                 } else {
377                                         try {
378                                                 LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf.index());
379                                                 encoded = make_shared<dcp::ArrayData>(vf.encode_locally());
380                                                 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index());
381                                         } catch (std::exception& e) {
382                                                 /* This is very bad, so don't cope with it, just pass it on */
383                                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
384                                                 throw;
385                                         }
386                                 }
387                         }
388
389                         if (encoded) {
390                                 _writer.write(encoded, vf.index(), vf.eyes());
391                                 frame_done ();
392                         } else {
393                                 if (!Config::instance()->enable_gpu ()) {
394                                         lock.lock ();
395                                         LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
396                                         _queue.push_front (vf);
397                                         lock.unlock ();
398                                 }
399                         }
400                 }
401
402                 if (remote_backoff > 0) {
403                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
404                 }
405
406                 /* The queue might not be full any more, so notify anything that is waiting on that */
407                 lock.lock ();
408                 _full_condition.notify_all ();
409         }
410 }
411 catch (boost::thread_interrupted& e) {
412         /* Ignore these and just stop the thread */
413         _full_condition.notify_all ();
414 }
415 catch (...)
416 {
417         store_current ();
418         /* Wake anything waiting on _full_condition so it can see the exception */
419         _full_condition.notify_all ();
420 }
421
422
423 void
424 J2KEncoder::servers_list_changed ()
425 {
426         boost::mutex::scoped_lock lm (_threads_mutex);
427
428         terminate_threads ();
429         _threads = make_shared<boost::thread_group>();
430
431         /* XXX: could re-use threads */
432
433         if (!Config::instance()->only_servers_encode ()) {
434                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
435 #ifdef DCPOMATIC_LINUX
436                         auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
437                         pthread_setname_np (t->native_handle(), "encode-worker");
438 #else
439                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
440 #endif
441                 }
442         }
443
444         for (auto i: EncodeServerFinder::instance()->servers()) {
445                 if (!i.current_link_version()) {
446                         continue;
447                 }
448
449                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
450                 for (int j = 0; j < i.threads(); ++j) {
451                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i));
452                 }
453         }
454
455         _writer.set_encoder_threads(_threads->size());
456 }