b2513b72e4515172ed44414722eab9783206032c
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
22 /** @file src/j2k_encoder.cc
23  *  @brief J2K encoder class.
24  */
25
26
27 #include "j2k_encoder_cpu_backend.h"
28 #include "j2k_encoder_remote_backend.h"
29 #include "j2k_encoder.h"
30 #include "util.h"
31 #include "film.h"
32 #include "log.h"
33 #include "dcpomatic_log.h"
34 #include "config.h"
35 #include "dcp_video.h"
36 #include "cross.h"
37 #include "writer.h"
38 #include "encode_server_finder.h"
39 #include "player.h"
40 #include "player_video.h"
41 #include "encode_server_description.h"
42 #include "compose.hpp"
43 #include <libcxml/cxml.h>
44 #include <iostream>
45
46 #include "i18n.h"
47
48
49 using std::list;
50 using std::cout;
51 using std::exception;
52 using std::shared_ptr;
53 using std::weak_ptr;
54 using std::make_shared;
55 using boost::optional;
56 using dcp::Data;
57 using namespace dcpomatic;
58
59
60 /** @param film Film that we are encoding.
61  *  @param writer Writer that we are using.
62  */
63 J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
64         : _film (film)
65         , _history (200)
66         , _writer (writer)
67 {
68         servers_list_changed ();
69 }
70
71
72 J2KEncoder::~J2KEncoder ()
73 {
74         boost::mutex::scoped_lock lm (_threads_mutex);
75         terminate_threads ();
76 }
77
78
79 void
80 J2KEncoder::begin ()
81 {
82         auto wp = shared_from_this ();
83         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
84                 boost::bind (&J2KEncoder::call_servers_list_changed, wp)
85                 );
86 }
87
88
89 /* We don't want the servers-list-changed callback trying to do things
90    during destruction of J2KEncoder, and I think this is the neatest way
91    to achieve that.
92 */
93 void
94 J2KEncoder::call_servers_list_changed (weak_ptr<J2KEncoder> encoder)
95 {
96         auto e = encoder.lock ();
97         if (e) {
98                 e->servers_list_changed ();
99         }
100 }
101
102
103 void
104 J2KEncoder::end ()
105 {
106         boost::mutex::scoped_lock lock (_queue_mutex);
107
108         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
109
110         /* Keep waking workers until the queue is empty */
111         while (!_queue.empty ()) {
112                 rethrow ();
113                 _empty_condition.notify_all ();
114                 _full_condition.wait (lock);
115         }
116
117         lock.unlock ();
118
119         LOG_GENERAL_NC (N_("Terminating encoder threads"));
120
121         {
122                 boost::mutex::scoped_lock lm (_threads_mutex);
123                 terminate_threads ();
124         }
125
126         /* Something might have been thrown during terminate_threads */
127         rethrow ();
128
129         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
130
131         /* The following sequence of events can occur in the above code:
132              1. a remote worker takes the last image off the queue
133              2. the loop above terminates
134              3. the remote worker fails to encode the image and puts it back on the queue
135              4. the remote worker is then terminated by terminate_threads
136
137              So just mop up anything left in the queue here.
138         */
139
140         J2KEncoderCPUBackend cpu;
141         for (auto const& i: _queue) {
142                 LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
143                 try {
144                         auto enc = cpu.encode({i});
145                         DCPOMATIC_ASSERT (!enc.empty());
146                         _writer->write (make_shared<dcp::ArrayData>(enc.front()), i.index(), i.eyes());
147                         frame_done ();
148                 } catch (std::exception& e) {
149                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
150                 }
151         }
152 }
153
154
155 /** @return an estimate of the current number of frames we are encoding per second,
156  *  if known.
157  */
158 optional<float>
159 J2KEncoder::current_encoding_rate () const
160 {
161         return _history.rate ();
162 }
163
164
165 /** @return Number of video frames that have been queued for encoding */
166 int
167 J2KEncoder::video_frames_enqueued () const
168 {
169         if (!_last_player_video_time) {
170                 return 0;
171         }
172
173         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
174 }
175
176
177 /** Should be called when a frame has been encoded successfully */
178 void
179 J2KEncoder::frame_done ()
180 {
181         _history.event ();
182 }
183
184
185 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
186  *  so each time the supplied frame is the one after the previous one.
187  *  pv represents one video frame, and could be empty if there is nothing to encode
188  *  for this DCP frame.
189  *
190  *  @param pv PlayerVideo to encode.
191  *  @param time Time of \p pv within the DCP.
192  */
193 void
194 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
195 {
196         _waker.nudge ();
197
198         size_t threads = 0;
199         {
200                 boost::mutex::scoped_lock lm (_threads_mutex);
201                 threads = _threads->size();
202         }
203
204         boost::mutex::scoped_lock queue_lock (_queue_mutex);
205
206         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
207            when there are no threads.
208         */
209         while (static_cast<int>(_queue.size()) >= (_frames_in_parallel * 2) + 1) {
210                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
211                 _full_condition.wait (queue_lock);
212                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
213         }
214
215         _writer->rethrow ();
216         /* Re-throw any exception raised by one of our threads.  If more
217            than one has thrown an exception, only one will be rethrown, I think;
218            but then, if that happens something has gone badly wrong.
219         */
220         rethrow ();
221
222         auto const position = time.frames_floor(_film->video_frame_rate());
223
224         if (_writer->can_fake_write (position)) {
225                 /* We can fake-write this frame */
226                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
227                 _writer->fake_write (position, pv->eyes ());
228                 frame_done ();
229         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
230                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
231                 /* This frame already has J2K data, so just write it */
232                 _writer->write (pv->j2k(), position, pv->eyes ());
233                 frame_done ();
234         } else if (_last_player_video[static_cast<int>(pv->eyes())] && _writer->can_repeat(position) && pv->same (_last_player_video[static_cast<int>(pv->eyes())])) {
235                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
236                 _writer->repeat (position, pv->eyes ());
237         } else {
238                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
239                 /* Queue this new frame for encoding */
240                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
241                 _queue.push_back (DCPVideo(
242                                 pv,
243                                 position,
244                                 _film->video_frame_rate(),
245                                 _film->j2k_bandwidth(),
246                                 _film->resolution()
247                                 ));
248
249                 /* The queue might not be empty any more, so notify anything which is
250                    waiting on that.
251                 */
252                 _empty_condition.notify_all ();
253         }
254
255         _last_player_video[static_cast<int>(pv->eyes())] = pv;
256         _last_player_video_time = time;
257 }
258
259
260 /** Caller must hold a lock on _threads_mutex */
261 void
262 J2KEncoder::terminate_threads ()
263 {
264         boost::this_thread::disable_interruption dis;
265
266         if (!_threads) {
267                 return;
268         }
269
270         _threads->interrupt_all ();
271         try {
272                 _threads->join_all ();
273         } catch (exception& e) {
274                 LOG_ERROR ("join() threw an exception: %1", e.what());
275         } catch (...) {
276                 LOG_ERROR_NC ("join() threw an exception");
277         }
278
279         _threads.reset ();
280 }
281
282
283 void
284 J2KEncoder::encoder_thread (shared_ptr<J2KEncoderBackend> backend)
285 try
286 {
287         start_of_thread ("J2KEncoder");
288
289         LOG_TIMING ("start-encoder-thread thread=%1", thread_id());
290
291         while (true) {
292
293                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
294                 boost::mutex::scoped_lock lock (_queue_mutex);
295                 while (_queue.empty ()) {
296                         _empty_condition.wait (lock);
297                 }
298
299                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
300                 auto end = std::next(_queue.begin(), std::min(static_cast<int>(_queue.size()), backend->quantity()));
301                 std::vector<DCPVideo> vf (_queue.begin(), end);
302
303                 /* We're about to commit to either encoding these frames or putting them back onto the queue,
304                    so we must not be interrupted until one or other of these things have happened.  This
305                    block has thread interruption disabled.
306                 */
307                 {
308                         boost::this_thread::disable_interruption dis;
309
310                         _queue.erase(_queue.begin(), end);
311
312                         lock.unlock ();
313
314                         auto encoded = backend->encode(vf);
315
316                         if (encoded.size() == vf.size()) {
317                                 for (auto i = 0U; i < encoded.size(); ++i) {
318                                         _writer->write (make_shared<dcp::ArrayData>(encoded[i]), vf[i].index(), vf[i].eyes());
319                                         frame_done ();
320                                 }
321                         } else {
322                                 lock.lock ();
323                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes %2 frames onto queue after failure"), thread_id(), vf.size());
324                                 _queue.insert (_queue.begin(), vf.begin(), vf.end());
325                                 lock.unlock ();
326                         }
327                 }
328
329                 /* The queue might not be full any more, so notify anything that is waiting on that */
330                 lock.lock ();
331                 _full_condition.notify_all ();
332         }
333 }
334 catch (boost::thread_interrupted& e) {
335         /* Ignore these and just stop the thread */
336         _full_condition.notify_all ();
337 }
338 catch (...)
339 {
340         store_current ();
341         /* Wake anything waiting on _full_condition so it can see the exception */
342         _full_condition.notify_all ();
343 }
344
345
346 void
347 J2KEncoder::servers_list_changed ()
348 try
349 {
350         boost::mutex::scoped_lock lm (_threads_mutex);
351
352         terminate_threads ();
353         _threads = make_shared<boost::thread_group>();
354
355         _frames_in_parallel = 0;
356
357         /* XXX: could re-use threads */
358
359         if (!Config::instance()->only_servers_encode ()) {
360                 auto backend = std::make_shared<J2KEncoderCPUBackend>();
361                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
362 #ifdef DCPOMATIC_LINUX
363                         auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
364                         pthread_setname_np (t->native_handle(), "encode-worker");
365 #else
366                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
367 #endif
368                         _frames_in_parallel += backend->quantity();
369                 }
370         }
371
372         for (auto i: EncodeServerFinder::instance()->servers()) {
373                 if (!i.current_link_version()) {
374                         continue;
375                 }
376
377                 auto backend = std::make_shared<J2KEncoderRemoteBackend>(i);
378
379                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name());
380                 for (int j = 0; j < i.threads(); ++j) {
381                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
382                         _frames_in_parallel += backend->quantity();
383                 }
384         }
385
386         _writer->set_encoder_threads (_threads->size());
387 }
388 catch (...) {
389         terminate_threads ();
390         throw;
391 }
392
393