dbb7f4f7bf33e6db50445c690ed2e251092352e2
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
22 /** @file src/lib/j2k_encoder.cc
23  *  @brief J2K encoder class.
24  */
25
26
27 #include "compose.hpp"
28 #include "config.h"
29 #include "cross.h"
30 #include "dcp_video.h"
31 #include "dcpomatic_log.h"
32 #include "encode_server_description.h"
33 #include "encode_server_finder.h"
34 #include "film.h"
35 #include "j2k_encoder.h"
36 #include "log.h"
37 #include "player_video.h"
38 #include "util.h"
39 #include "writer.h"
40 #include <libcxml/cxml.h>
41 #include <iostream>
42
43 #include "i18n.h"
44
45
46 using std::cout;
47 using std::exception;
48 using std::list;
49 using std::make_shared;
50 using std::shared_ptr;
51 using std::weak_ptr;
52 using boost::optional;
53 using dcp::Data;
54 using namespace dcpomatic;
55
/* File-scope grk_plugin::GrokInitializer object, private to this translation unit.
   NOTE(review): presumably its constructor performs one-time Grok plugin set-up;
   confirm against the grk_plugin headers.
*/
56 static grk_plugin::GrokInitializer grokInitializer;
57
58 /** @param film Film that we are encoding.
59  *  @param writer Writer that we are using.
60  */
61 J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer)
62         : _film(film)
63         , _history(200)
64         , _writer(writer)
65         , dcpomaticContext_(film, writer, _history, Config::instance()->gpu_binary_location())
66         , context_(Config::instance()->enable_gpu() ? new grk_plugin::GrokContext(dcpomaticContext_) : nullptr)
67 {
68         servers_list_changed ();
69 }
70
71
72 J2KEncoder::~J2KEncoder ()
73 {
74         _server_found_connection.disconnect();
75
76         {
77                 boost::mutex::scoped_lock lm(_threads_mutex);
78                 terminate_threads();
79         }
80
81         delete context_;
82 }
83
84 void
85 J2KEncoder::begin ()
86 {
87         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect(
88                 boost::bind(&J2KEncoder::servers_list_changed, this)
89                 );
90 }
91
92 void
93 J2KEncoder::pause(void)
94 {
95         if (Config::instance()->enable_gpu())
96                 end(false);
97 }
98
99 void
100 J2KEncoder::resume(void)
101 {
102         if (Config::instance()->enable_gpu()) {
103                 context_ = new grk_plugin::GrokContext(dcpomaticContext_);
104                 servers_list_changed();
105         }
106 }
107
108 void
109 J2KEncoder::end(bool isFinal)
110 {
111         if (isFinal) {
112                 boost::mutex::scoped_lock lock(_queue_mutex);
113
114                 LOG_GENERAL(N_("Clearing queue of %1"), _queue.size());
115
116                 /* Keep waking workers until the queue is empty */
117                 while (!_queue.empty()) {
118                         rethrow();
119                         _empty_condition.notify_all();
120                         _full_condition.wait(lock);
121                 }
122                 lock.unlock();
123         }
124
125         LOG_GENERAL_NC (N_("Terminating encoder threads"));
126
127         {
128                 boost::mutex::scoped_lock lm (_threads_mutex);
129                 terminate_threads ();
130         }
131
132         /* Something might have been thrown during terminate_threads */
133         rethrow ();
134
135         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
136
137         /* The following sequence of events can occur in the above code:
138                  1. a remote worker takes the last image off the queue
139                  2. the loop above terminates
140                  3. the remote worker fails to encode the image and puts it back on the queue
141                  4. the remote worker is then terminated by terminate_threads
142
143                  So just mop up anything left in the queue here.
144         */
145         if (isFinal) {
146                 for (auto& i : _queue) {
147                         if (Config::instance()->enable_gpu()) {
148                                 if (!context_->scheduleCompress(i)) {
149                                         LOG_GENERAL(N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index());
150                                         // handle error
151                                 }
152                         } else {
153                                 LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
154                                 try {
155                                         _writer.write(
156                                             make_shared<dcp::ArrayData>(i.encode_locally()),
157                                             i.index(),
158                                             i.eyes());
159                                         frame_done();
160                                 } catch (std::exception& e) {
161                                         LOG_ERROR(N_("Local encode failed (%1)"), e.what());
162                                 }
163                         }
164                 }
165         }
166         delete context_;
167         context_ = nullptr;
168 }
169
170
171 /** @return an estimate of the current number of frames we are encoding per second,
172  *  if known.
173  */
174 optional<float>
175 J2KEncoder::current_encoding_rate () const
176 {
177         return _history.rate ();
178 }
179
180
181 /** @return Number of video frames that have been queued for encoding */
182 int
183 J2KEncoder::video_frames_enqueued () const
184 {
185         if (!_last_player_video_time) {
186                 return 0;
187         }
188
189         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
190 }
191
192
193 /** Should be called when a frame has been encoded successfully */
194 void
195 J2KEncoder::frame_done ()
196 {
197         _history.event ();
198 }
199
200
201 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
202  *  so each time the supplied frame is the one after the previous one.
203  *  pv represents one video frame, and could be empty if there is nothing to encode
204  *  for this DCP frame.
205  *
206  *  @param pv PlayerVideo to encode.
207  *  @param time Time of \p pv within the DCP.
208  */
209 void
210 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
211 {
212         _waker.nudge ();
213
214         size_t threads = 0;
215         {
216                 boost::mutex::scoped_lock lm (_threads_mutex);
217                 if (_threads)
218                         threads = _threads->size();
219                 else
220                         threads = std::thread::hardware_concurrency();
221         }
222
223         boost::mutex::scoped_lock queue_lock (_queue_mutex);
224
225         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
226            when there are no threads.
227         */
228         while (_queue.size() >= (threads * 2) + 1) {
229                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
230                 _full_condition.wait (queue_lock);
231                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
232         }
233
234         _writer.rethrow();
235         /* Re-throw any exception raised by one of our threads.  If more
236            than one has thrown an exception, only one will be rethrown, I think;
237            but then, if that happens something has gone badly wrong.
238         */
239         rethrow ();
240
241         auto const position = time.frames_floor(_film->video_frame_rate());
242
243         if (_writer.can_fake_write(position)) {
244                 /* We can fake-write this frame */
245                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
246                 _writer.fake_write(position, pv->eyes ());
247                 frame_done ();
248         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
249                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
250                 /* This frame already has J2K data, so just write it */
251                 _writer.write(pv->j2k(), position, pv->eyes ());
252                 frame_done ();
253         } else if (_last_player_video[pv->eyes()] && _writer.can_repeat(position) && pv->same(_last_player_video[pv->eyes()])) {
254                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
255                 _writer.repeat(position, pv->eyes());
256         } else {
257                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
258                 /* Queue this new frame for encoding */
259                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
260                 auto dcpv = DCPVideo(
261                     pv,
262                     position,
263                     _film->video_frame_rate(),
264                     _film->j2k_bandwidth(),
265                     _film->resolution());
266                 _queue.push_back(dcpv);
267
268                 /* The queue might not be empty any more, so notify anything which is
269                    waiting on that.
270                 */
271                 _empty_condition.notify_all ();
272         }
273
274         _last_player_video[pv->eyes()] = pv;
275         _last_player_video_time = time;
276 }
277
278
279 /** Caller must hold a lock on _threads_mutex */
280 void
281 J2KEncoder::terminate_threads ()
282 {
283         boost::this_thread::disable_interruption dis;
284
285         if (!_threads) {
286                 return;
287         }
288
289         _threads->interrupt_all ();
290         try {
291                 _threads->join_all ();
292         } catch (exception& e) {
293                 LOG_ERROR ("join() threw an exception: %1", e.what());
294         } catch (...) {
295                 LOG_ERROR_NC ("join() threw an exception");
296         }
297
298         _threads.reset ();
299 }
300
301
302 void
303 J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
304 try
305 {
306         auto config = Config::instance();
307
308         start_of_thread ("J2KEncoder");
309
310         if (server) {
311                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
312         } else {
313                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
314         }
315
316         /* Number of seconds that we currently wait between attempts
317            to connect to the server; not relevant for localhost
318            encodings.
319         */
320         int remote_backoff = 0;
321
322         while (true) {
323
324                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
325                 boost::mutex::scoped_lock lock (_queue_mutex);
326                 while (_queue.empty ()) {
327                         _empty_condition.wait (lock);
328                 }
329
330                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
331                 auto vf = _queue.front ();
332
333                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
334                    so we must not be interrupted until one or other of these things have happened.  This
335                    block has thread interruption disabled.
336                 */
337                 {
338                         boost::this_thread::disable_interruption dis;
339
340                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf.index(), static_cast<int>(vf.eyes()));
341                         _queue.pop_front ();
342
343                         lock.unlock ();
344
345                         shared_ptr<Data> encoded;
346
347                         /* We need to encode this input */
348                         if (server) {
349                                 try {
350                                         encoded = make_shared<dcp::ArrayData>(vf.encode_remotely(server.get()));
351
352                                         if (remote_backoff > 0) {
353                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
354                                         }
355
356                                         /* This job succeeded, so remove any backoff */
357                                         remote_backoff = 0;
358
359                                 } catch (std::exception& e) {
360                                         if (remote_backoff < 60) {
361                                                 /* back off more */
362                                                 remote_backoff += 10;
363                                         }
364                                         LOG_ERROR (
365                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
366                                                 vf.index(), server->host_name(), e.what(), remote_backoff
367                                                 );
368                                 }
369
370                         } else {
371                                 if (context_) {
372                                         if (!context_->launch(vf, config->selected_gpu()) || !context_->scheduleCompress(vf)) {
373                                                 LOG_GENERAL(N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
374                                                 _queue.push_front(vf);
375                                         }
376
377                                 } else {
378                                         try {
379                                                 LOG_TIMING("start-local-encode thread=%1 frame=%2", thread_id(), vf.index());
380                                                 encoded = make_shared<dcp::ArrayData>(vf.encode_locally());
381                                                 LOG_TIMING("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index());
382                                         } catch (std::exception& e) {
383                                                 /* This is very bad, so don't cope with it, just pass it on */
384                                                 LOG_ERROR(N_("Local encode failed (%1)"), e.what());
385                                                 throw;
386                                         }
387                                 }
388                         }
389
390                         if (encoded) {
391                                 _writer.write(encoded, vf.index(), vf.eyes());
392                                 frame_done ();
393                         } else {
394                                 if (!Config::instance()->enable_gpu()) {
395                                         lock.lock();
396                                         LOG_GENERAL(N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
397                                         _queue.push_front(vf);
398                                         lock.unlock();
399                                 }
400                         }
401                 }
402
403                 if (remote_backoff > 0) {
404                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
405                 }
406
407                 /* The queue might not be full any more, so notify anything that is waiting on that */
408                 lock.lock ();
409                 _full_condition.notify_all ();
410         }
411 }
412 catch (boost::thread_interrupted& e) {
413         /* Ignore these and just stop the thread */
414         _full_condition.notify_all ();
415 }
416 catch (...)
417 {
418         store_current ();
419         /* Wake anything waiting on _full_condition so it can see the exception */
420         _full_condition.notify_all ();
421 }
422
423
424 void
425 J2KEncoder::servers_list_changed ()
426 {
427         boost::mutex::scoped_lock lm (_threads_mutex);
428
429         terminate_threads ();
430         _threads = make_shared<boost::thread_group>();
431
432         /* XXX: could re-use threads */
433
434         if (!Config::instance()->only_servers_encode ()) {
435                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
436 #ifdef DCPOMATIC_LINUX
437                         auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
438                         pthread_setname_np (t->native_handle(), "encode-worker");
439 #else
440                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
441 #endif
442                 }
443         }
444
445         for (auto i: EncodeServerFinder::instance()->servers()) {
446                 if (!i.current_link_version()) {
447                         continue;
448                 }
449
450                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
451                 for (int j = 0; j < i.threads(); ++j) {
452                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i));
453                 }
454         }
455
456         _writer.set_encoder_threads(_threads->size());
457 }