Fix typo in log message.
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
22 /** @file src/lib/j2k_encoder.cc
23  *  @brief J2K encoder class.
24  */
25
26
27 #include "compose.hpp"
28 #include "config.h"
29 #include "cross.h"
30 #include "dcp_video.h"
31 #include "dcpomatic_log.h"
32 #include "encode_server_description.h"
33 #include "encode_server_finder.h"
34 #include "film.h"
35 #include "j2k_encoder.h"
36 #include "log.h"
37 #include "player_video.h"
38 #include "util.h"
39 #include "writer.h"
40 #include <libcxml/cxml.h>
41 #include <iostream>
42
43 #include "i18n.h"
44
45
46 using std::cout;
47 using std::exception;
48 using std::list;
49 using std::make_shared;
50 using std::shared_ptr;
51 using std::weak_ptr;
52 using boost::optional;
53 using dcp::Data;
54 using namespace dcpomatic;
55
56
57 /** @param film Film that we are encoding.
58  *  @param writer Writer that we are using.
59  */
60 J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
61         : _film (film)
62         , _history (200)
63         , _writer (writer)
64 {
65         servers_list_changed ();
66 }
67
68
69 J2KEncoder::~J2KEncoder ()
70 {
71         boost::mutex::scoped_lock lm (_threads_mutex);
72         terminate_threads ();
73 }
74
75
76 void
77 J2KEncoder::begin ()
78 {
79         weak_ptr<J2KEncoder> wp = shared_from_this ();
80         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
81                 boost::bind (&J2KEncoder::call_servers_list_changed, wp)
82                 );
83 }
84
85
86 /* We don't want the servers-list-changed callback trying to do things
87    during destruction of J2KEncoder, and I think this is the neatest way
88    to achieve that.
89 */
90 void
91 J2KEncoder::call_servers_list_changed (weak_ptr<J2KEncoder> encoder)
92 {
93         auto e = encoder.lock ();
94         if (e) {
95                 e->servers_list_changed ();
96         }
97 }
98
99
100 void
101 J2KEncoder::end ()
102 {
103         boost::mutex::scoped_lock lock (_queue_mutex);
104
105         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
106
107         /* Keep waking workers until the queue is empty */
108         while (!_queue.empty ()) {
109                 rethrow ();
110                 _empty_condition.notify_all ();
111                 _full_condition.wait (lock);
112         }
113
114         lock.unlock ();
115
116         LOG_GENERAL_NC (N_("Terminating encoder threads"));
117
118         {
119                 boost::mutex::scoped_lock lm (_threads_mutex);
120                 terminate_threads ();
121         }
122
123         /* Something might have been thrown during terminate_threads */
124         rethrow ();
125
126         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
127
128         /* The following sequence of events can occur in the above code:
129              1. a remote worker takes the last image off the queue
130              2. the loop above terminates
131              3. the remote worker fails to encode the image and puts it back on the queue
132              4. the remote worker is then terminated by terminate_threads
133
134              So just mop up anything left in the queue here.
135         */
136
137         for (auto const& i: _queue) {
138                 LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
139                 try {
140                         _writer->write (
141                                 make_shared<dcp::ArrayData>(i.encode_locally()),
142                                 i.index(),
143                                 i.eyes()
144                                 );
145                         frame_done ();
146                 } catch (std::exception& e) {
147                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
148                 }
149         }
150 }
151
152
153 /** @return an estimate of the current number of frames we are encoding per second,
154  *  if known.
155  */
156 optional<float>
157 J2KEncoder::current_encoding_rate () const
158 {
159         return _history.rate ();
160 }
161
162
163 /** @return Number of video frames that have been queued for encoding */
164 int
165 J2KEncoder::video_frames_enqueued () const
166 {
167         if (!_last_player_video_time) {
168                 return 0;
169         }
170
171         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
172 }
173
174
175 /** Should be called when a frame has been encoded successfully */
176 void
177 J2KEncoder::frame_done ()
178 {
179         _history.event ();
180 }
181
182
183 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
184  *  so each time the supplied frame is the one after the previous one.
185  *  pv represents one video frame, and could be empty if there is nothing to encode
186  *  for this DCP frame.
187  *
188  *  @param pv PlayerVideo to encode.
189  *  @param time Time of \p pv within the DCP.
190  */
191 void
192 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
193 {
194         _waker.nudge ();
195
196         size_t threads = 0;
197         {
198                 boost::mutex::scoped_lock lm (_threads_mutex);
199                 threads = _threads->size();
200         }
201
202         boost::mutex::scoped_lock queue_lock (_queue_mutex);
203
204         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
205            when there are no threads.
206         */
207         while (_queue.size() >= (threads * 2) + 1) {
208                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
209                 _full_condition.wait (queue_lock);
210                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
211         }
212
213         _writer->rethrow ();
214         /* Re-throw any exception raised by one of our threads.  If more
215            than one has thrown an exception, only one will be rethrown, I think;
216            but then, if that happens something has gone badly wrong.
217         */
218         rethrow ();
219
220         auto const position = time.frames_floor(_film->video_frame_rate());
221
222         if (_writer->can_fake_write (position)) {
223                 /* We can fake-write this frame */
224                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
225                 _writer->fake_write (position, pv->eyes ());
226                 frame_done ();
227         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
228                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
229                 /* This frame already has J2K data, so just write it */
230                 _writer->write (pv->j2k(), position, pv->eyes ());
231                 frame_done ();
232         } else if (_last_player_video[static_cast<int>(pv->eyes())] && _writer->can_repeat(position) && pv->same (_last_player_video[static_cast<int>(pv->eyes())])) {
233                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
234                 _writer->repeat (position, pv->eyes ());
235         } else {
236                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
237                 /* Queue this new frame for encoding */
238                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
239                 _queue.push_back (DCPVideo(
240                                 pv,
241                                 position,
242                                 _film->video_frame_rate(),
243                                 _film->j2k_bandwidth(),
244                                 _film->resolution()
245                                 ));
246
247                 /* The queue might not be empty any more, so notify anything which is
248                    waiting on that.
249                 */
250                 _empty_condition.notify_all ();
251         }
252
253         _last_player_video[static_cast<int>(pv->eyes())] = pv;
254         _last_player_video_time = time;
255 }
256
257
258 /** Caller must hold a lock on _threads_mutex */
259 void
260 J2KEncoder::terminate_threads ()
261 {
262         boost::this_thread::disable_interruption dis;
263
264         if (!_threads) {
265                 return;
266         }
267
268         _threads->interrupt_all ();
269         try {
270                 _threads->join_all ();
271         } catch (exception& e) {
272                 LOG_ERROR ("join() threw an exception: %1", e.what());
273         } catch (...) {
274                 LOG_ERROR_NC ("join() threw an exception");
275         }
276
277         _threads.reset ();
278 }
279
280
281 void
282 J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
283 try
284 {
285         start_of_thread ("J2KEncoder");
286
287         if (server) {
288                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
289         } else {
290                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
291         }
292
293         /* Number of seconds that we currently wait between attempts
294            to connect to the server; not relevant for localhost
295            encodings.
296         */
297         int remote_backoff = 0;
298
299         while (true) {
300
301                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
302                 boost::mutex::scoped_lock lock (_queue_mutex);
303                 while (_queue.empty ()) {
304                         _empty_condition.wait (lock);
305                 }
306
307                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
308                 auto vf = _queue.front ();
309
310                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
311                    so we must not be interrupted until one or other of these things have happened.  This
312                    block has thread interruption disabled.
313                 */
314                 {
315                         boost::this_thread::disable_interruption dis;
316
317                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf.index(), static_cast<int>(vf.eyes()));
318                         _queue.pop_front ();
319
320                         lock.unlock ();
321
322                         shared_ptr<Data> encoded;
323
324                         /* We need to encode this input */
325                         if (server) {
326                                 try {
327                                         encoded = make_shared<dcp::ArrayData>(vf.encode_remotely(server.get()));
328
329                                         if (remote_backoff > 0) {
330                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
331                                         }
332
333                                         /* This job succeeded, so remove any backoff */
334                                         remote_backoff = 0;
335
336                                 } catch (std::exception& e) {
337                                         if (remote_backoff < 60) {
338                                                 /* back off more */
339                                                 remote_backoff += 10;
340                                         }
341                                         LOG_ERROR (
342                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
343                                                 vf.index(), server->host_name(), e.what(), remote_backoff
344                                                 );
345                                 }
346
347                         } else {
348                                 try {
349                                         LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf.index());
350                                         encoded = make_shared<dcp::ArrayData>(vf.encode_locally());
351                                         LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index());
352                                 } catch (std::exception& e) {
353                                         /* This is very bad, so don't cope with it, just pass it on */
354                                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
355                                         throw;
356                                 }
357                         }
358
359                         if (encoded) {
360                                 _writer->write (encoded, vf.index(), vf.eyes());
361                                 frame_done ();
362                         } else {
363                                 lock.lock ();
364                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
365                                 _queue.push_front (vf);
366                                 lock.unlock ();
367                         }
368                 }
369
370                 if (remote_backoff > 0) {
371                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
372                 }
373
374                 /* The queue might not be full any more, so notify anything that is waiting on that */
375                 lock.lock ();
376                 _full_condition.notify_all ();
377         }
378 }
379 catch (boost::thread_interrupted& e) {
380         /* Ignore these and just stop the thread */
381         _full_condition.notify_all ();
382 }
383 catch (...)
384 {
385         store_current ();
386         /* Wake anything waiting on _full_condition so it can see the exception */
387         _full_condition.notify_all ();
388 }
389
390
391 void
392 J2KEncoder::servers_list_changed ()
393 {
394         boost::mutex::scoped_lock lm (_threads_mutex);
395
396         terminate_threads ();
397         _threads = make_shared<boost::thread_group>();
398
399         /* XXX: could re-use threads */
400
401         if (!Config::instance()->only_servers_encode ()) {
402                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
403 #ifdef DCPOMATIC_LINUX
404                         auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
405                         pthread_setname_np (t->native_handle(), "encode-worker");
406 #else
407                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
408 #endif
409                 }
410         }
411
412         for (auto i: EncodeServerFinder::instance()->servers()) {
413                 if (!i.current_link_version()) {
414                         continue;
415                 }
416
417                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
418                 for (int j = 0; j < i.threads(); ++j) {
419                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i));
420                 }
421         }
422
423         _writer->set_encoder_threads (_threads->size());
424 }