Run clang-format on the previous commit.
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
22 /** @file src/j2k_encoder.cc
23  *  @brief J2K encoder class.
24  */
25
26
27 #include "compose.hpp"
28 #include "config.h"
29 #include "cross.h"
30 #include "dcp_video.h"
31 #include "dcpomatic_log.h"
32 #include "encode_server_description.h"
33 #include "encode_server_finder.h"
34 #include "film.h"
35 #include "j2k_encoder.h"
36 #include "log.h"
37 #include "player_video.h"
38 #include "util.h"
39 #include "writer.h"
40 #include <libcxml/cxml.h>
41 #include <iostream>
42
43 #include "i18n.h"
44
45
46 using std::cout;
47 using std::exception;
48 using std::list;
49 using std::make_shared;
50 using std::shared_ptr;
51 using std::weak_ptr;
52 using boost::optional;
53 using dcp::Data;
54 using namespace dcpomatic;
55
56 static grk_plugin::GrokInitializer grokInitializer;
57
58 /** @param film Film that we are encoding.
59  *  @param writer Writer that we are using.
60  */
61 J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer)
62         : _film(film)
63         , _history(200)
64         , _writer(writer)
65         , _dcpomatic_context(film, writer, _history, Config::instance()->gpu_binary_location())
66         , _context(Config::instance()->enable_gpu() ? new grk_plugin::GrokContext(_dcpomatic_context) : nullptr)
67 {
68         servers_list_changed ();
69 }
70
71
72 J2KEncoder::~J2KEncoder ()
73 {
74         _server_found_connection.disconnect();
75
76         {
77                 boost::mutex::scoped_lock lm(_threads_mutex);
78                 terminate_threads();
79         }
80
81         delete _context;
82 }
83
84 void
85 J2KEncoder::begin ()
86 {
87         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect(
88                 boost::bind(&J2KEncoder::servers_list_changed, this)
89                 );
90 }
91
92 void
93 J2KEncoder::pause()
94 {
95         if (Config::instance()->enable_gpu()) {
96                 end(false);
97         }
98 }
99
100 void
101 J2KEncoder::resume()
102 {
103         if (Config::instance()->enable_gpu()) {
104                 _context = new grk_plugin::GrokContext(_dcpomatic_context);
105                 servers_list_changed();
106         }
107 }
108
109 void
110 J2KEncoder::end(bool isFinal)
111 {
112         if (isFinal) {
113                 boost::mutex::scoped_lock lock(_queue_mutex);
114
115                 LOG_GENERAL(N_("Clearing queue of %1"), _queue.size());
116
117                 /* Keep waking workers until the queue is empty */
118                 while (!_queue.empty()) {
119                         rethrow();
120                         _empty_condition.notify_all();
121                         _full_condition.wait(lock);
122                 }
123                 lock.unlock();
124         }
125
126         LOG_GENERAL_NC (N_("Terminating encoder threads"));
127
128         {
129                 boost::mutex::scoped_lock lm (_threads_mutex);
130                 terminate_threads ();
131         }
132
133         /* Something might have been thrown during terminate_threads */
134         rethrow ();
135
136         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
137
138         /* The following sequence of events can occur in the above code:
139                  1. a remote worker takes the last image off the queue
140                  2. the loop above terminates
141                  3. the remote worker fails to encode the image and puts it back on the queue
142                  4. the remote worker is then terminated by terminate_threads
143
144                  So just mop up anything left in the queue here.
145         */
146         if (isFinal) {
147                 for (auto& i : _queue) {
148                         if (Config::instance()->enable_gpu()) {
149                                 if (!_context->schedule_compress(i)) {
150                                         LOG_GENERAL(N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index());
151                                         // handle error
152                                 }
153                         } else {
154                                 LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
155                                 try {
156                                         _writer.write(
157                                             make_shared<dcp::ArrayData>(i.encode_locally()),
158                                             i.index(),
159                                             i.eyes());
160                                         frame_done();
161                                 } catch (std::exception& e) {
162                                         LOG_ERROR(N_("Local encode failed (%1)"), e.what());
163                                 }
164                         }
165                 }
166         }
167         delete _context;
168         _context = nullptr;
169 }
170
171
172 /** @return an estimate of the current number of frames we are encoding per second,
173  *  if known.
174  */
175 optional<float>
176 J2KEncoder::current_encoding_rate () const
177 {
178         return _history.rate ();
179 }
180
181
182 /** @return Number of video frames that have been queued for encoding */
183 int
184 J2KEncoder::video_frames_enqueued () const
185 {
186         if (!_last_player_video_time) {
187                 return 0;
188         }
189
190         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
191 }
192
193
194 /** Should be called when a frame has been encoded successfully */
195 void
196 J2KEncoder::frame_done ()
197 {
198         _history.event ();
199 }
200
201
202 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
203  *  so each time the supplied frame is the one after the previous one.
204  *  pv represents one video frame, and could be empty if there is nothing to encode
205  *  for this DCP frame.
206  *
207  *  @param pv PlayerVideo to encode.
208  *  @param time Time of \p pv within the DCP.
209  */
210 void
211 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
212 {
213         _waker.nudge ();
214
215         size_t threads = 0;
216         {
217                 boost::mutex::scoped_lock lm (_threads_mutex);
218                 if (_threads) {
219                         threads = _threads->size();
220                 } else {
221                         threads = std::thread::hardware_concurrency();
222                 }
223         }
224
225         boost::mutex::scoped_lock queue_lock (_queue_mutex);
226
227         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
228            when there are no threads.
229         */
230         while (_queue.size() >= (threads * 2) + 1) {
231                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
232                 _full_condition.wait (queue_lock);
233                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
234         }
235
236         _writer.rethrow();
237         /* Re-throw any exception raised by one of our threads.  If more
238            than one has thrown an exception, only one will be rethrown, I think;
239            but then, if that happens something has gone badly wrong.
240         */
241         rethrow ();
242
243         auto const position = time.frames_floor(_film->video_frame_rate());
244
245         if (_writer.can_fake_write(position)) {
246                 /* We can fake-write this frame */
247                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
248                 _writer.fake_write(position, pv->eyes ());
249                 frame_done ();
250         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
251                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
252                 /* This frame already has J2K data, so just write it */
253                 _writer.write(pv->j2k(), position, pv->eyes ());
254                 frame_done ();
255         } else if (_last_player_video[pv->eyes()] && _writer.can_repeat(position) && pv->same(_last_player_video[pv->eyes()])) {
256                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
257                 _writer.repeat(position, pv->eyes());
258         } else {
259                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
260                 /* Queue this new frame for encoding */
261                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
262                 auto dcpv = DCPVideo(
263                     pv,
264                     position,
265                     _film->video_frame_rate(),
266                     _film->j2k_bandwidth(),
267                     _film->resolution());
268                 _queue.push_back(dcpv);
269
270                 /* The queue might not be empty any more, so notify anything which is
271                    waiting on that.
272                 */
273                 _empty_condition.notify_all ();
274         }
275
276         _last_player_video[pv->eyes()] = pv;
277         _last_player_video_time = time;
278 }
279
280
281 /** Caller must hold a lock on _threads_mutex */
282 void
283 J2KEncoder::terminate_threads ()
284 {
285         boost::this_thread::disable_interruption dis;
286
287         if (!_threads) {
288                 return;
289         }
290
291         _threads->interrupt_all ();
292         try {
293                 _threads->join_all ();
294         } catch (exception& e) {
295                 LOG_ERROR ("join() threw an exception: %1", e.what());
296         } catch (...) {
297                 LOG_ERROR_NC ("join() threw an exception");
298         }
299
300         _threads.reset ();
301 }
302
303
304 void
305 J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
306 try
307 {
308         auto config = Config::instance();
309
310         start_of_thread ("J2KEncoder");
311
312         if (server) {
313                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
314         } else {
315                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
316         }
317
318         /* Number of seconds that we currently wait between attempts
319            to connect to the server; not relevant for localhost
320            encodings.
321         */
322         int remote_backoff = 0;
323
324         while (true) {
325
326                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
327                 boost::mutex::scoped_lock lock (_queue_mutex);
328                 while (_queue.empty ()) {
329                         _empty_condition.wait (lock);
330                 }
331
332                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
333                 auto vf = _queue.front ();
334
335                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
336                    so we must not be interrupted until one or other of these things have happened.  This
337                    block has thread interruption disabled.
338                 */
339                 {
340                         boost::this_thread::disable_interruption dis;
341
342                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf.index(), static_cast<int>(vf.eyes()));
343                         _queue.pop_front ();
344
345                         lock.unlock ();
346
347                         shared_ptr<Data> encoded;
348
349                         /* We need to encode this input */
350                         if (server) {
351                                 try {
352                                         encoded = make_shared<dcp::ArrayData>(vf.encode_remotely(server.get()));
353
354                                         if (remote_backoff > 0) {
355                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
356                                         }
357
358                                         /* This job succeeded, so remove any backoff */
359                                         remote_backoff = 0;
360
361                                 } catch (std::exception& e) {
362                                         if (remote_backoff < 60) {
363                                                 /* back off more */
364                                                 remote_backoff += 10;
365                                         }
366                                         LOG_ERROR (
367                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
368                                                 vf.index(), server->host_name(), e.what(), remote_backoff
369                                                 );
370                                 }
371
372                         } else {
373                                 if (_context) {
374                                         if (!_context->launch(vf, config->selected_gpu()) || !_context->schedule_compress(vf)) {
375                                                 LOG_GENERAL(N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
376                                                 _queue.push_front(vf);
377                                         }
378
379                                 } else {
380                                         try {
381                                                 LOG_TIMING("start-local-encode thread=%1 frame=%2", thread_id(), vf.index());
382                                                 encoded = make_shared<dcp::ArrayData>(vf.encode_locally());
383                                                 LOG_TIMING("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index());
384                                         } catch (std::exception& e) {
385                                                 /* This is very bad, so don't cope with it, just pass it on */
386                                                 LOG_ERROR(N_("Local encode failed (%1)"), e.what());
387                                                 throw;
388                                         }
389                                 }
390                         }
391
392                         if (encoded) {
393                                 _writer.write(encoded, vf.index(), vf.eyes());
394                                 frame_done ();
395                         } else {
396                                 if (!Config::instance()->enable_gpu()) {
397                                         lock.lock();
398                                         LOG_GENERAL(N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
399                                         _queue.push_front(vf);
400                                         lock.unlock();
401                                 }
402                         }
403                 }
404
405                 if (remote_backoff > 0) {
406                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
407                 }
408
409                 /* The queue might not be full any more, so notify anything that is waiting on that */
410                 lock.lock ();
411                 _full_condition.notify_all ();
412         }
413 }
414 catch (boost::thread_interrupted& e) {
415         /* Ignore these and just stop the thread */
416         _full_condition.notify_all ();
417 }
418 catch (...)
419 {
420         store_current ();
421         /* Wake anything waiting on _full_condition so it can see the exception */
422         _full_condition.notify_all ();
423 }
424
425
426 void
427 J2KEncoder::servers_list_changed ()
428 {
429         boost::mutex::scoped_lock lm (_threads_mutex);
430
431         terminate_threads ();
432         _threads = make_shared<boost::thread_group>();
433
434         /* XXX: could re-use threads */
435
436         if (!Config::instance()->only_servers_encode ()) {
437                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
438 #ifdef DCPOMATIC_LINUX
439                         auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
440                         pthread_setname_np (t->native_handle(), "encode-worker");
441 #else
442                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
443 #endif
444                 }
445         }
446
447         for (auto i: EncodeServerFinder::instance()->servers()) {
448                 if (!i.current_link_version()) {
449                         continue;
450                 }
451
452                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
453                 for (int j = 0; j < i.threads(); ++j) {
454                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i));
455                 }
456         }
457
458         _writer.set_encoder_threads(_threads->size());
459 }