Tidying.
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
22 /** @file src/j2k_encoder.cc
23  *  @brief J2K encoder class.
24  */
25
26
27 #include "j2k_encoder_cpu_backend.h"
28 #include "j2k_encoder_remote_backend.h"
29 #include "j2k_encoder.h"
30 #include "util.h"
31 #include "film.h"
32 #include "log.h"
33 #include "dcpomatic_log.h"
34 #include "config.h"
35 #include "dcp_video.h"
36 #include "cross.h"
37 #include "writer.h"
38 #include "encode_server_finder.h"
39 #include "player.h"
40 #include "player_video.h"
41 #include "encode_server_description.h"
42 #include "compose.hpp"
43 #include <libcxml/cxml.h>
44 #include <iostream>
45
46 #include "i18n.h"
47
48
49 using std::cout;
50 using std::exception;
51 using std::shared_ptr;
52 using std::weak_ptr;
53 using std::make_shared;
54 using boost::optional;
55 using namespace dcpomatic;
56
57
58 /** @param film Film that we are encoding.
59  *  @param writer Writer that we are using.
60  */
61 J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
62         : _film (film)
63         , _history (200)
64         , _writer (writer)
65 {
66         servers_list_changed ();
67 }
68
69
70 J2KEncoder::~J2KEncoder ()
71 {
72         boost::mutex::scoped_lock lm (_threads_mutex);
73         terminate_threads ();
74 }
75
76
77 void
78 J2KEncoder::begin ()
79 {
80         auto wp = shared_from_this ();
81         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
82                 boost::bind (&J2KEncoder::call_servers_list_changed, wp)
83                 );
84 }
85
86
87 /* We don't want the servers-list-changed callback trying to do things
88    during destruction of J2KEncoder, and I think this is the neatest way
89    to achieve that.
90 */
91 void
92 J2KEncoder::call_servers_list_changed (weak_ptr<J2KEncoder> encoder)
93 {
94         auto e = encoder.lock ();
95         if (e) {
96                 e->servers_list_changed ();
97         }
98 }
99
100
101 void
102 J2KEncoder::end ()
103 {
104         boost::mutex::scoped_lock lock (_queue_mutex);
105
106         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
107
108         /* Keep waking workers until the queue is empty */
109         while (!_queue.empty ()) {
110                 rethrow ();
111                 _empty_condition.notify_all ();
112                 _full_condition.wait (lock);
113         }
114
115         lock.unlock ();
116
117         LOG_GENERAL_NC (N_("Terminating encoder threads"));
118
119         {
120                 boost::mutex::scoped_lock lm (_threads_mutex);
121                 terminate_threads ();
122         }
123
124         /* Something might have been thrown during terminate_threads */
125         rethrow ();
126
127         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
128
129         /* The following sequence of events can occur in the above code:
130              1. a remote worker takes the last image off the queue
131              2. the loop above terminates
132              3. the remote worker fails to encode the image and puts it back on the queue
133              4. the remote worker is then terminated by terminate_threads
134
135              So just mop up anything left in the queue here.
136         */
137
138         J2KEncoderCPUBackend cpu;
139         for (auto const& i: _queue) {
140                 LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
141                 try {
142                         auto enc = cpu.encode({i});
143                         DCPOMATIC_ASSERT (!enc.empty());
144                         _writer->write (make_shared<dcp::ArrayData>(enc.front()), i.index(), i.eyes());
145                         frame_done ();
146                 } catch (std::exception& e) {
147                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
148                 }
149         }
150 }
151
152
153 /** @return an estimate of the current number of frames we are encoding per second,
154  *  if known.
155  */
156 optional<float>
157 J2KEncoder::current_encoding_rate () const
158 {
159         return _history.rate ();
160 }
161
162
163 /** @return Number of video frames that have been queued for encoding */
164 int
165 J2KEncoder::video_frames_enqueued () const
166 {
167         if (!_last_player_video_time) {
168                 return 0;
169         }
170
171         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
172 }
173
174
175 /** Should be called when a frame has been encoded successfully */
176 void
177 J2KEncoder::frame_done ()
178 {
179         _history.event ();
180 }
181
182
183 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
184  *  so each time the supplied frame is the one after the previous one.
185  *  pv represents one video frame, and could be empty if there is nothing to encode
186  *  for this DCP frame.
187  *
188  *  @param pv PlayerVideo to encode.
189  *  @param time Time of \p pv within the DCP.
190  */
191 void
192 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
193 {
194         _waker.nudge ();
195
196         size_t threads = 0;
197         {
198                 boost::mutex::scoped_lock lm (_threads_mutex);
199                 threads = _threads->size();
200         }
201
202         boost::mutex::scoped_lock queue_lock (_queue_mutex);
203
204         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
205            when there are no threads.
206         */
207         while (static_cast<int>(_queue.size()) >= (_frames_in_parallel * 2) + 1) {
208                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
209                 _full_condition.wait (queue_lock);
210                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
211         }
212
213         _writer->rethrow ();
214         /* Re-throw any exception raised by one of our threads.  If more
215            than one has thrown an exception, only one will be rethrown, I think;
216            but then, if that happens something has gone badly wrong.
217         */
218         rethrow ();
219
220         auto const position = time.frames_floor(_film->video_frame_rate());
221
222         if (_writer->can_fake_write (position)) {
223                 /* We can fake-write this frame */
224                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
225                 _writer->fake_write (position, pv->eyes ());
226                 frame_done ();
227         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
228                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
229                 /* This frame already has J2K data, so just write it */
230                 _writer->write (pv->j2k(), position, pv->eyes ());
231                 frame_done ();
232         } else if (_last_player_video[static_cast<int>(pv->eyes())] && _writer->can_repeat(position) && pv->same (_last_player_video[static_cast<int>(pv->eyes())])) {
233                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
234                 _writer->repeat (position, pv->eyes ());
235         } else {
236                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
237                 /* Queue this new frame for encoding */
238                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
239                 _queue.push_back (DCPVideo(
240                                 pv,
241                                 position,
242                                 _film->video_frame_rate(),
243                                 _film->j2k_bandwidth(),
244                                 _film->resolution()
245                                 ));
246
247                 /* The queue might not be empty any more, so notify anything which is
248                    waiting on that.
249                 */
250                 _empty_condition.notify_all ();
251         }
252
253         _last_player_video[static_cast<int>(pv->eyes())] = pv;
254         _last_player_video_time = time;
255 }
256
257
258 /** Caller must hold a lock on _threads_mutex */
259 void
260 J2KEncoder::terminate_threads ()
261 {
262         boost::this_thread::disable_interruption dis;
263
264         if (!_threads) {
265                 return;
266         }
267
268         _threads->interrupt_all ();
269         try {
270                 _threads->join_all ();
271         } catch (exception& e) {
272                 LOG_ERROR ("join() threw an exception: %1", e.what());
273         } catch (...) {
274                 LOG_ERROR_NC ("join() threw an exception");
275         }
276
277         _threads.reset ();
278 }
279
280
281 void
282 J2KEncoder::encoder_thread (shared_ptr<J2KEncoderBackend> backend)
283 try
284 {
285         start_of_thread ("J2KEncoder");
286
287         LOG_TIMING ("start-encoder-thread thread=%1", thread_id());
288
289         while (true) {
290
291                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
292                 boost::mutex::scoped_lock lock (_queue_mutex);
293                 while (_queue.empty ()) {
294                         _empty_condition.wait (lock);
295                 }
296
297                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
298                 auto end = std::next(_queue.begin(), std::min(static_cast<int>(_queue.size()), backend->quantity()));
299                 std::vector<DCPVideo> vf (_queue.begin(), end);
300
301                 /* We're about to commit to either encoding these frames or putting them back onto the queue,
302                    so we must not be interrupted until one or other of these things have happened.  This
303                    block has thread interruption disabled.
304                 */
305                 {
306                         boost::this_thread::disable_interruption dis;
307
308                         _queue.erase(_queue.begin(), end);
309
310                         lock.unlock ();
311
312                         auto encoded = backend->encode(vf);
313
314                         if (encoded.size() == vf.size()) {
315                                 for (auto i = 0U; i < encoded.size(); ++i) {
316                                         _writer->write (make_shared<dcp::ArrayData>(encoded[i]), vf[i].index(), vf[i].eyes());
317                                         frame_done ();
318                                 }
319                         } else {
320                                 lock.lock ();
321                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes %2 frames onto queue after failure"), thread_id(), vf.size());
322                                 _queue.insert (_queue.begin(), vf.begin(), vf.end());
323                                 lock.unlock ();
324                         }
325                 }
326
327                 /* The queue might not be full any more, so notify anything that is waiting on that */
328                 lock.lock ();
329                 _full_condition.notify_all ();
330         }
331 }
332 catch (boost::thread_interrupted& e) {
333         /* Ignore these and just stop the thread */
334         _full_condition.notify_all ();
335 }
336 catch (...)
337 {
338         store_current ();
339         /* Wake anything waiting on _full_condition so it can see the exception */
340         _full_condition.notify_all ();
341 }
342
343
344 void
345 J2KEncoder::servers_list_changed ()
346 try
347 {
348         boost::mutex::scoped_lock lm (_threads_mutex);
349
350         terminate_threads ();
351         _threads = make_shared<boost::thread_group>();
352
353         _frames_in_parallel = 0;
354
355         /* XXX: could re-use threads */
356
357         if (!Config::instance()->only_servers_encode ()) {
358                 auto backend = std::make_shared<J2KEncoderCPUBackend>();
359                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
360 #ifdef DCPOMATIC_LINUX
361                         auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
362                         pthread_setname_np (t->native_handle(), "encode-worker");
363 #else
364                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
365 #endif
366                         _frames_in_parallel += backend->quantity();
367                 }
368         }
369
370         for (auto i: EncodeServerFinder::instance()->servers()) {
371                 if (!i.current_link_version()) {
372                         continue;
373                 }
374
375                 auto backend = std::make_shared<J2KEncoderRemoteBackend>(i);
376
377                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name());
378                 for (int j = 0; j < i.threads(); ++j) {
379                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
380                         _frames_in_parallel += backend->quantity();
381                 }
382         }
383
384         _writer->set_encoder_threads (_threads->size());
385 }
386 catch (...) {
387         terminate_threads ();
388         throw;
389 }
390
391