00814dcb0fb07bdc2fdaf0e0be5d34684f2d1cd3
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
22 /** @file src/j2k_encoder.cc
23  *  @brief J2K encoder class.
24  */
25
26
27 #include "compose.hpp"
28 #include "config.h"
29 #include "cpu_j2k_frame_encoder.h"
30 #include "cuda_j2k_frame_encoder.h"
31 #include "cross.h"
32 #include "dcp_video.h"
33 #include "dcpomatic_log.h"
34 #include "encode_server_description.h"
35 #include "encode_server_finder.h"
36 #include "film.h"
37 #include "j2k_encoder.h"
38 #include "log.h"
39 #include "player.h"
40 #include "player_video.h"
41 #include "remote_j2k_frame_encoder.h"
42 #include "util.h"
43 #include "writer.h"
44 #include <libcxml/cxml.h>
45 #include <iostream>
46
47 #include "i18n.h"
48
49
50 using std::cout;
51 using std::exception;
52 using std::list;
53 using std::make_shared;
54 using std::shared_ptr;
55 using std::weak_ptr;
56 using boost::optional;
57 using dcp::Data;
58 using namespace dcpomatic;
59
60
61 /** @param film Film that we are encoding.
62  *  @param writer Writer that we are using.
63  */
64 J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
65         : _film (film)
66         , _history (200)
67         , _writer (writer)
68 {
69         servers_list_changed ();
70 }
71
72
73 J2KEncoder::~J2KEncoder ()
74 {
75         boost::mutex::scoped_lock lm (_threads_mutex);
76         terminate_threads ();
77 }
78
79
80 void
81 J2KEncoder::begin ()
82 {
83         auto wp = shared_from_this ();
84         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
85                 boost::bind (&J2KEncoder::call_servers_list_changed, wp)
86                 );
87 }
88
89
90 /* We don't want the servers-list-changed callback trying to do things
91    during destruction of J2KEncoder, and I think this is the neatest way
92    to achieve that.
93 */
94 void
95 J2KEncoder::call_servers_list_changed (weak_ptr<J2KEncoder> encoder)
96 {
97         auto e = encoder.lock ();
98         if (e) {
99                 e->servers_list_changed ();
100         }
101 }
102
103
104 void
105 J2KEncoder::end ()
106 {
107         boost::mutex::scoped_lock lock (_queue_mutex);
108
109         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
110
111         /* Keep waking workers until the queue is empty */
112         while (!_queue.empty ()) {
113                 rethrow ();
114                 _empty_condition.notify_all ();
115                 _full_condition.wait (lock);
116         }
117
118         for (auto& worker: _workers) {
119                 worker->flush();
120         }
121
122         lock.unlock ();
123
124         LOG_GENERAL_NC (N_("Terminating encoder threads"));
125
126         {
127                 boost::mutex::scoped_lock lm (_threads_mutex);
128                 terminate_threads ();
129         }
130
131         /* Something might have been thrown during terminate_threads */
132         rethrow ();
133
134         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
135
136         /* The following sequence of events can occur in the above code:
137              1. a remote worker takes the last image off the queue
138              2. the loop above terminates
139              3. the remote worker fails to encode the image and puts it back on the queue
140              4. the remote worker is then terminated by terminate_threads
141
142              So just mop up anything left in the queue here.
143         */
144
145         CPUJ2KFrameEncoder cpu;
146         for (auto const& i: _queue) {
147                 LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
148                 try {
149                         _writer->write(make_shared<dcp::ArrayData>(*cpu.encode(i)), i.index(), i.eyes());
150                         frame_done ();
151                 } catch (std::exception& e) {
152                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
153                 }
154         }
155 }
156
157
158 /** @return an estimate of the current number of frames we are encoding per second,
159  *  if known.
160  */
161 optional<float>
162 J2KEncoder::current_encoding_rate () const
163 {
164         return _history.rate ();
165 }
166
167
168 /** @return Number of video frames that have been queued for encoding */
169 int
170 J2KEncoder::video_frames_enqueued () const
171 {
172         if (!_last_player_video_time) {
173                 return 0;
174         }
175
176         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
177 }
178
179
180 /** Should be called when a frame has been encoded successfully */
181 void
182 J2KEncoder::frame_done ()
183 {
184         _history.event ();
185 }
186
187
188 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
189  *  so each time the supplied frame is the one after the previous one.
190  *  pv represents one video frame, and could be empty if there is nothing to encode
191  *  for this DCP frame.
192  *
193  *  @param pv PlayerVideo to encode.
194  *  @param time Time of \p pv within the DCP.
195  */
196 void
197 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
198 {
199         _waker.nudge ();
200
201         size_t threads = 0;
202         {
203                 boost::mutex::scoped_lock lm (_threads_mutex);
204                 threads = _threads->size();
205         }
206
207         boost::mutex::scoped_lock queue_lock (_queue_mutex);
208
209         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
210            when there are no threads.
211         */
212         while (_queue.size() >= (threads * 2) + 1) {
213                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
214                 _full_condition.wait (queue_lock);
215                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
216         }
217
218         _writer->rethrow ();
219         /* Re-throw any exception raised by one of our threads.  If more
220            than one has thrown an exception, only one will be rethrown, I think;
221            but then, if that happens something has gone badly wrong.
222         */
223         rethrow ();
224
225         auto const position = time.frames_floor(_film->video_frame_rate());
226
227         if (_writer->can_fake_write (position)) {
228                 /* We can fake-write this frame */
229                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
230                 _writer->fake_write (position, pv->eyes ());
231                 frame_done ();
232         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
233                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
234                 /* This frame already has J2K data, so just write it */
235                 _writer->write (pv->j2k(), position, pv->eyes ());
236                 frame_done ();
237         } else if (_last_player_video[static_cast<int>(pv->eyes())] && _writer->can_repeat(position) && pv->same (_last_player_video[static_cast<int>(pv->eyes())])) {
238                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
239                 _writer->repeat (position, pv->eyes ());
240         } else {
241                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
242                 /* Queue this new frame for encoding */
243                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
244                 _queue.push_back (DCPVideo(
245                                 pv,
246                                 position,
247                                 _film->video_frame_rate(),
248                                 _film->j2k_bandwidth(),
249                                 _film->resolution()
250                                 ));
251
252                 /* The queue might not be empty any more, so notify anything which is
253                    waiting on that.
254                 */
255                 _empty_condition.notify_all ();
256         }
257
258         _last_player_video[static_cast<int>(pv->eyes())] = pv;
259         _last_player_video_time = time;
260 }
261
262
263 /** Caller must hold a lock on _threads_mutex */
264 void
265 J2KEncoder::terminate_threads ()
266 {
267         boost::this_thread::disable_interruption dis;
268
269         if (!_threads) {
270                 return;
271         }
272
273         _threads->interrupt_all ();
274         try {
275                 _threads->join_all ();
276         } catch (exception& e) {
277                 LOG_ERROR ("join() threw an exception: %1", e.what());
278         } catch (...) {
279                 LOG_ERROR_NC ("join() threw an exception");
280         }
281
282         _threads.reset ();
283 }
284
285
286 void
287 J2KEncoder::encoder_thread (weak_ptr<J2KFrameEncoder> weak_worker)
288 try
289 {
290         auto worker = weak_worker.lock();
291         if (!worker) {
292                 return;
293         }
294
295         while (true) {
296
297                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
298                 boost::mutex::scoped_lock lock (_queue_mutex);
299                 while (_queue.empty ()) {
300                         _empty_condition.wait (lock);
301                 }
302
303                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
304                 auto vf = _queue.front ();
305
306                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
307                    so we must not be interrupted until one or other of these things have happened.  This
308                    block has thread interruption disabled.
309                 */
310                 {
311                         boost::this_thread::disable_interruption dis;
312
313                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf.index(), static_cast<int>(vf.eyes()));
314                         _queue.pop_front ();
315
316                         lock.unlock ();
317
318                         auto encoded = worker->encode(vf);
319
320                         if (encoded) {
321                                 _writer->write (make_shared<dcp::ArrayData>(*encoded), vf.index(), vf.eyes());
322                                 frame_done ();
323                         } else {
324                                 lock.lock ();
325                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
326                                 _queue.push_front (vf);
327                                 lock.unlock ();
328                         }
329                 }
330
331                 /* The queue might not be full any more, so notify anything that is waiting on that */
332                 lock.lock ();
333                 _full_condition.notify_all ();
334         }
335 }
336 catch (boost::thread_interrupted& e) {
337         /* Ignore these and just stop the thread */
338         _full_condition.notify_all ();
339 }
340 catch (...)
341 {
342         store_current ();
343         /* Wake anything waiting on _full_condition so it can see the exception */
344         _full_condition.notify_all ();
345 }
346
347
348 void
349 J2KEncoder::servers_list_changed ()
350 {
351         boost::mutex::scoped_lock lm (_threads_mutex);
352
353         terminate_threads ();
354         _threads = make_shared<boost::thread_group>();
355
356         /* XXX: could re-use threads */
357
358 #if 0
359         if (!Config::instance()->only_servers_encode ()) {
360                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
361                         auto worker = make_shared<CPUJ2KFrameEncoder>();
362                         _workers.push_back(worker);
363 #ifdef DCPOMATIC_LINUX
364                         auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, worker));
365                         pthread_setname_np (t->native_handle(), "encode-worker");
366 #else
367                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, worker));
368 #endif
369                 }
370         }
371
372         for (auto i: EncodeServerFinder::instance()->servers()) {
373                 if (!i.current_link_version()) {
374                         continue;
375                 }
376
377                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
378                 for (int j = 0; j < i.threads(); ++j) {
379                        auto worker = make_shared<RemoteJ2KFrameEncoder>(i);
380                        _workers.push_back(worker);
381                        _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, worker));
382                 }
383         }
384 #endif
385         for (int i = 0; i < 16; ++i) {
386                 auto worker = make_shared<CUDAJ2KFrameEncoder>();
387                 _workers.push_back(worker);
388                 _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, worker));
389         }
390
391         _writer->set_encoder_threads (_threads->size());
392 }