47d8b129559b173d7cf23acf8245d1f83221eac3
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
22 /** @file src/j2k_encoder.cc
23  *  @brief J2K encoder class.
24  */
25
26
27 #include "compose.hpp"
28 #include "config.h"
29 #include "cpu_j2k_frame_encoder.h"
30 #include "cross.h"
31 #include "dcp_video.h"
32 #include "dcpomatic_log.h"
33 #include "encode_server_description.h"
34 #include "encode_server_finder.h"
35 #include "film.h"
36 #include "j2k_encoder.h"
37 #include "log.h"
38 #include "player.h"
39 #include "player_video.h"
40 #include "remote_j2k_frame_encoder.h"
41 #include "util.h"
42 #include "writer.h"
43 #include <libcxml/cxml.h>
44 #include <iostream>
45
46 #include "i18n.h"
47
48
49 using std::cout;
50 using std::exception;
51 using std::list;
52 using std::make_shared;
53 using std::shared_ptr;
54 using std::weak_ptr;
55 using boost::optional;
56 using dcp::Data;
57 using namespace dcpomatic;
58
59
60 /** @param film Film that we are encoding.
61  *  @param writer Writer that we are using.
62  */
63 J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
64         : _film (film)
65         , _history (200)
66         , _writer (writer)
67 {
68         servers_list_changed ();
69 }
70
71
72 J2KEncoder::~J2KEncoder ()
73 {
74         boost::mutex::scoped_lock lm (_threads_mutex);
75         terminate_threads ();
76 }
77
78
79 void
80 J2KEncoder::begin ()
81 {
82         auto wp = shared_from_this ();
83         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
84                 boost::bind (&J2KEncoder::call_servers_list_changed, wp)
85                 );
86 }
87
88
89 /* We don't want the servers-list-changed callback trying to do things
90    during destruction of J2KEncoder, and I think this is the neatest way
91    to achieve that.
92 */
93 void
94 J2KEncoder::call_servers_list_changed (weak_ptr<J2KEncoder> encoder)
95 {
96         auto e = encoder.lock ();
97         if (e) {
98                 e->servers_list_changed ();
99         }
100 }
101
102
103 void
104 J2KEncoder::end ()
105 {
106         boost::mutex::scoped_lock lock (_queue_mutex);
107
108         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
109
110         /* Keep waking workers until the queue is empty */
111         while (!_queue.empty ()) {
112                 rethrow ();
113                 _empty_condition.notify_all ();
114                 _full_condition.wait (lock);
115         }
116
117         lock.unlock ();
118
119         LOG_GENERAL_NC (N_("Terminating encoder threads"));
120
121         {
122                 boost::mutex::scoped_lock lm (_threads_mutex);
123                 terminate_threads ();
124         }
125
126         /* Something might have been thrown during terminate_threads */
127         rethrow ();
128
129         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
130
131         /* The following sequence of events can occur in the above code:
132              1. a remote worker takes the last image off the queue
133              2. the loop above terminates
134              3. the remote worker fails to encode the image and puts it back on the queue
135              4. the remote worker is then terminated by terminate_threads
136
137              So just mop up anything left in the queue here.
138         */
139
140         for (auto const& i: _queue) {
141                 LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
142                 try {
143                         _writer->write (
144                                 make_shared<dcp::ArrayData>(i.encode_locally()),
145                                 i.index(),
146                                 i.eyes()
147                                 );
148                         frame_done ();
149                 } catch (std::exception& e) {
150                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
151                 }
152         }
153 }
154
155
156 /** @return an estimate of the current number of frames we are encoding per second,
157  *  if known.
158  */
159 optional<float>
160 J2KEncoder::current_encoding_rate () const
161 {
162         return _history.rate ();
163 }
164
165
166 /** @return Number of video frames that have been queued for encoding */
167 int
168 J2KEncoder::video_frames_enqueued () const
169 {
170         if (!_last_player_video_time) {
171                 return 0;
172         }
173
174         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
175 }
176
177
178 /** Should be called when a frame has been encoded successfully */
179 void
180 J2KEncoder::frame_done ()
181 {
182         _history.event ();
183 }
184
185
186 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
187  *  so each time the supplied frame is the one after the previous one.
188  *  pv represents one video frame, and could be empty if there is nothing to encode
189  *  for this DCP frame.
190  *
191  *  @param pv PlayerVideo to encode.
192  *  @param time Time of \p pv within the DCP.
193  */
194 void
195 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
196 {
197         _waker.nudge ();
198
199         size_t threads = 0;
200         {
201                 boost::mutex::scoped_lock lm (_threads_mutex);
202                 threads = _threads->size();
203         }
204
205         boost::mutex::scoped_lock queue_lock (_queue_mutex);
206
207         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
208            when there are no threads.
209         */
210         while (_queue.size() >= (threads * 2) + 1) {
211                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
212                 _full_condition.wait (queue_lock);
213                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
214         }
215
216         _writer->rethrow ();
217         /* Re-throw any exception raised by one of our threads.  If more
218            than one has thrown an exception, only one will be rethrown, I think;
219            but then, if that happens something has gone badly wrong.
220         */
221         rethrow ();
222
223         auto const position = time.frames_floor(_film->video_frame_rate());
224
225         if (_writer->can_fake_write (position)) {
226                 /* We can fake-write this frame */
227                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
228                 _writer->fake_write (position, pv->eyes ());
229                 frame_done ();
230         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
231                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
232                 /* This frame already has J2K data, so just write it */
233                 _writer->write (pv->j2k(), position, pv->eyes ());
234                 frame_done ();
235         } else if (_last_player_video[static_cast<int>(pv->eyes())] && _writer->can_repeat(position) && pv->same (_last_player_video[static_cast<int>(pv->eyes())])) {
236                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
237                 _writer->repeat (position, pv->eyes ());
238         } else {
239                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
240                 /* Queue this new frame for encoding */
241                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
242                 _queue.push_back (DCPVideo(
243                                 pv,
244                                 position,
245                                 _film->video_frame_rate(),
246                                 _film->j2k_bandwidth(),
247                                 _film->resolution()
248                                 ));
249
250                 /* The queue might not be empty any more, so notify anything which is
251                    waiting on that.
252                 */
253                 _empty_condition.notify_all ();
254         }
255
256         _last_player_video[static_cast<int>(pv->eyes())] = pv;
257         _last_player_video_time = time;
258 }
259
260
261 /** Caller must hold a lock on _threads_mutex */
262 void
263 J2KEncoder::terminate_threads ()
264 {
265         boost::this_thread::disable_interruption dis;
266
267         if (!_threads) {
268                 return;
269         }
270
271         _threads->interrupt_all ();
272         try {
273                 _threads->join_all ();
274         } catch (exception& e) {
275                 LOG_ERROR ("join() threw an exception: %1", e.what());
276         } catch (...) {
277                 LOG_ERROR_NC ("join() threw an exception");
278         }
279
280         _threads.reset ();
281 }
282
283
284 void
285 J2KEncoder::encoder_thread (weak_ptr<J2KFrameEncoder> weak_worker)
286 try
287 {
288         auto worker = weak_worker.lock();
289         if (!worker) {
290                 return;
291         }
292
293         /* Number of seconds that we currently wait between attempts
294            to connect to the server; not relevant for localhost
295            encodings.
296         */
297         int remote_backoff = 0;
298
299         while (true) {
300
301                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
302                 boost::mutex::scoped_lock lock (_queue_mutex);
303                 while (_queue.empty ()) {
304                         _empty_condition.wait (lock);
305                 }
306
307                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
308                 auto vf = _queue.front ();
309
310                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
311                    so we must not be interrupted until one or other of these things have happened.  This
312                    block has thread interruption disabled.
313                 */
314                 {
315                         boost::this_thread::disable_interruption dis;
316
317                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf.index(), static_cast<int>(vf.eyes()));
318                         _queue.pop_front ();
319
320                         lock.unlock ();
321
322                         auto encoded = worker->encode(vf);
323
324                         if (encoded) {
325                                 _writer->write (make_shared<dcp::ArrayData>(*encoded), vf.index(), vf.eyes());
326                                 frame_done ();
327                         } else {
328                                 lock.lock ();
329                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
330                                 _queue.push_front (vf);
331                                 lock.unlock ();
332                         }
333                 }
334
335                 if (remote_backoff > 0) {
336                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
337                 }
338
339                 /* The queue might not be full any more, so notify anything that is waiting on that */
340                 lock.lock ();
341                 _full_condition.notify_all ();
342         }
343 }
344 catch (boost::thread_interrupted& e) {
345         /* Ignore these and just stop the thread */
346         _full_condition.notify_all ();
347 }
348 catch (...)
349 {
350         store_current ();
351         /* Wake anything waiting on _full_condition so it can see the exception */
352         _full_condition.notify_all ();
353 }
354
355
356 void
357 J2KEncoder::servers_list_changed ()
358 {
359         boost::mutex::scoped_lock lm (_threads_mutex);
360
361         terminate_threads ();
362         _threads = make_shared<boost::thread_group>();
363
364         /* XXX: could re-use threads */
365
366         if (!Config::instance()->only_servers_encode ()) {
367                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
368                         auto worker = make_shared<CPUJ2KFrameEncoder>();
369                         _workers.push_back(worker);
370 #ifdef DCPOMATIC_LINUX
371                         auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, worker));
372                         pthread_setname_np (t->native_handle(), "encode-worker");
373 #else
374                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, worker));
375 #endif
376                 }
377         }
378
379         for (auto i: EncodeServerFinder::instance()->servers()) {
380                 if (!i.current_link_version()) {
381                         continue;
382                 }
383
384                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
385                 for (int j = 0; j < i.threads(); ++j) {
386                        auto worker = make_shared<RemoteJ2KFrameEncoder>(i);
387                        _workers.push_back(worker);
388                        _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, worker));
389                 }
390         }
391
392         _writer->set_encoder_threads (_threads->size());
393 }