ac6dd296ceb9297737d3dea409eb7ee28563e52d
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2019 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 /** @file src/j2k_encoder.cc
22  *  @brief J2K encoder class.
23  */
24
25 #include "j2k_encoder.h"
26 #include "cpu_j2k_encode_worker.h"
27 #include "gpu_j2k_encode_worker.h"
28 #include "remote_j2k_encode_worker.h"
29 #include "util.h"
30 #include "film.h"
31 #include "log.h"
32 #include "dcpomatic_log.h"
33 #include "config.h"
34 #include "dcp_video.h"
35 #include "cross.h"
36 #include "writer.h"
37 #include "encode_server_finder.h"
38 #include "player.h"
39 #include "player_video.h"
40 #include "encode_server_description.h"
41 #include "compose.hpp"
42 #include <libcxml/cxml.h>
43 #include <boost/foreach.hpp>
44 #include <iostream>
45
46 #include "i18n.h"
47
48 using std::list;
49 using std::cout;
50 using std::exception;
51 using boost::shared_ptr;
52 using boost::weak_ptr;
53 using boost::optional;
54 using dcp::Data;
55 using namespace dcpomatic;
56
57 /** @param film Film that we are encoding.
58  *  @param writer Writer that we are using.
59  */
60 J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
61         : _film (film)
62         , _history (200)
63         , _writer (writer)
64 {
65         servers_list_changed ();
66 }
67
68 J2KEncoder::~J2KEncoder ()
69 {
70         terminate_threads ();
71 }
72
73 void
74 J2KEncoder::begin ()
75 {
76         weak_ptr<J2KEncoder> wp = shared_from_this ();
77         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
78                 boost::bind (&J2KEncoder::call_servers_list_changed, wp)
79                 );
80 }
81
82 /* We don't want the servers-list-changed callback trying to do things
83    during destruction of J2KEncoder, and I think this is the neatest way
84    to achieve that.
85 */
86 void
87 J2KEncoder::call_servers_list_changed (weak_ptr<J2KEncoder> encoder)
88 {
89         shared_ptr<J2KEncoder> e = encoder.lock ();
90         if (e) {
91                 e->servers_list_changed ();
92         }
93 }
94
95 void
96 J2KEncoder::end ()
97 {
98         boost::mutex::scoped_lock lock (_queue_mutex);
99
100         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
101
102         /* Keep waking workers until the queue is empty */
103         while (!_queue.empty ()) {
104                 rethrow ();
105                 _empty_condition.notify_all ();
106                 _full_condition.wait (lock);
107         }
108
109         lock.unlock ();
110
111         LOG_GENERAL_NC (N_("Terminating encoder threads"));
112
113         terminate_threads ();
114
115         /* Something might have been thrown during terminate_threads */
116         rethrow ();
117
118         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
119
120         /* The following sequence of events can occur in the above code:
121              1. a remote worker takes the last image off the queue
122              2. the loop above terminates
123              3. the remote worker fails to encode the image and puts it back on the queue
124              4. the remote worker is then terminated by terminate_threads
125
126              So just mop up anything left in the queue here.
127         */
128
129         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
130                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
131                 try {
132                         _writer->write (
133                                 (*i)->encode_locally(),
134                                 (*i)->index(),
135                                 (*i)->eyes()
136                                 );
137                         frame_done ();
138                 } catch (std::exception& e) {
139                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
140                 }
141         }
142 }
143
144 /** @return an estimate of the current number of frames we are encoding per second,
145  *  if known.
146  */
147 optional<float>
148 J2KEncoder::current_encoding_rate () const
149 {
150         return _history.rate ();
151 }
152
153 /** @return Number of video frames that have been queued for encoding */
154 int
155 J2KEncoder::video_frames_enqueued () const
156 {
157         if (!_last_player_video_time) {
158                 return 0;
159         }
160
161         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
162 }
163
164 /** Should be called when a frame has been encoded successfully */
165 void
166 J2KEncoder::frame_done ()
167 {
168         _history.event ();
169 }
170
171 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
172  *  so each time the supplied frame is the one after the previous one.
173  *  pv represents one video frame, and could be empty if there is nothing to encode
174  *  for this DCP frame.
175  *
176  *  @param pv PlayerVideo to encode.
177  *  @param time Time of \p pv within the DCP.
178  */
179 void
180 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
181 {
182         _waker.nudge ();
183
184         size_t threads = _threads->size();
185
186         boost::mutex::scoped_lock queue_lock (_queue_mutex);
187
188         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
189            when there are no threads.
190         */
191         while (_queue.size() >= (threads * 2) + 1) {
192                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
193                 _full_condition.wait (queue_lock);
194                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
195         }
196
197         _writer->rethrow ();
198         /* Re-throw any exception raised by one of our threads.  If more
199            than one has thrown an exception, only one will be rethrown, I think;
200            but then, if that happens something has gone badly wrong.
201         */
202         rethrow ();
203
204         Frame const position = time.frames_floor(_film->video_frame_rate());
205
206         if (_writer->can_fake_write (position)) {
207                 /* We can fake-write this frame */
208                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
209                 _writer->fake_write (position, pv->eyes ());
210                 frame_done ();
211         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
212                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
213                 /* This frame already has J2K data, so just write it */
214                 _writer->write (pv->j2k(), position, pv->eyes ());
215         } else if (_last_player_video[pv->eyes()] && _writer->can_repeat(position) && pv->same (_last_player_video[pv->eyes()])) {
216                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
217                 _writer->repeat (position, pv->eyes ());
218         } else {
219                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
220                 /* Queue this new frame for encoding */
221                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
222                 _queue.push_back (shared_ptr<DCPVideo> (
223                                           new DCPVideo (
224                                                   pv,
225                                                   position,
226                                                   _film->video_frame_rate(),
227                                                   _film->j2k_bandwidth(),
228                                                   _film->resolution()
229                                                   )
230                                           ));
231
232                 /* The queue might not be empty any more, so notify anything which is
233                    waiting on that.
234                 */
235                 _empty_condition.notify_all ();
236         }
237
238         _last_player_video[pv->eyes()] = pv;
239         _last_player_video_time = time;
240 }
241
242 void
243 J2KEncoder::terminate_threads ()
244 {
245         boost::this_thread::disable_interruption dis;
246
247         if (!_threads) {
248                 return;
249         }
250
251         _threads->interrupt_all ();
252         try {
253                 _threads->join_all ();
254         } catch (exception& e) {
255                 LOG_ERROR ("join() threw an exception: %1", e.what());
256         } catch (...) {
257                 LOG_ERROR_NC ("join() threw an exception");
258         }
259
260         _threads.reset ();
261 }
262
263 void
264 J2KEncoder::encoder_thread (weak_ptr<J2KEncodeWorker> weak_worker)
265 try
266 {
267         shared_ptr<J2KEncodeWorker> worker = weak_worker.lock ();
268         if (!worker) {
269                 return;
270         }
271
272         worker->log_thread_start ();
273
274         /* Number of seconds that we currently wait between attempts
275            to connect to the server; not relevant for localhost
276            encodings.
277         */
278         int remote_backoff = 0;
279
280         while (true) {
281
282                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
283                 boost::mutex::scoped_lock lock (_queue_mutex);
284                 while (_queue.empty ()) {
285                         _empty_condition.wait (lock);
286                 }
287
288                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
289                 shared_ptr<DCPVideo> vf = _queue.front ();
290
291                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
292                    so we must not be interrupted until one or other of these things have happened.  This
293                    block has thread interruption disabled.
294                 */
295                 {
296                         boost::this_thread::disable_interruption dis;
297
298                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf->index(), (int) vf->eyes ());
299                         _queue.pop_front ();
300
301                         lock.unlock ();
302
303                         optional<Data> encoded = worker->encode (vf);
304
305                         if (encoded) {
306                                 _writer->write (encoded.get(), vf->index (), vf->eyes ());
307                                 frame_done ();
308                         } else {
309                                 lock.lock ();
310                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf->index());
311                                 _queue.push_front (vf);
312                                 lock.unlock ();
313                         }
314                 }
315
316                 if (remote_backoff > 0) {
317                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
318                 }
319
320                 /* The queue might not be full any more, so notify anything that is waiting on that */
321                 lock.lock ();
322                 _full_condition.notify_all ();
323         }
324 }
325 catch (boost::thread_interrupted& e) {
326         /* Ignore these and just stop the thread */
327         _full_condition.notify_all ();
328 }
329 catch (...)
330 {
331         store_current ();
332         /* Wake anything waiting on _full_condition so it can see the exception */
333         _full_condition.notify_all ();
334 }
335
336 void
337 J2KEncoder::servers_list_changed ()
338 {
339         terminate_threads ();
340         _threads.reset (new boost::thread_group());
341         _workers.clear ();
342
343         /* XXX: could re-use threads */
344
345 #ifdef BOOST_THREAD_PLATFORM_WIN32
346         OSVERSIONINFO info;
347         info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
348         GetVersionEx (&info);
349         bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
350         if (windows_xp) {
351                 LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
352         }
353 #endif
354
355         if (!Config::instance()->only_servers_encode ()) {
356                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
357                         shared_ptr<CPUJ2KEncodeWorker> w (new CPUJ2KEncodeWorker());
358                         _workers.push_back (w);
359 #ifdef DCPOMATIC_LINUX
360                         boost::thread* t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, w));
361                         pthread_setname_np (t->native_handle(), "encode-worker");
362 #endif
363 #ifdef DCPOMATIC_OSX
364                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, w));
365 #endif
366 #ifdef DCPOMATIC_WINDOWS
367                         boost::thread* t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, w));
368                         if (windows_xp) {
369                                 SetThreadAffinityMask (t->native_handle(), 1 << i);
370                         }
371 #endif
372                 }
373         }
374
375         BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers()) {
376                 if (!i.current_link_version()) {
377                         continue;
378                 }
379
380                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
381                 for (int j = 0; j < i.threads(); ++j) {
382                         shared_ptr<RemoteJ2KEncodeWorker> w (new RemoteJ2KEncodeWorker(i));
383                         _workers.push_back (w);
384                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, w));
385                         _threads.push_back (new boost::thread(boost::bind(&J2KEncoder::encoder_thread, this, w)));
386                 }
387         }
388
389         shared_ptr<GPUJ2KEncodeWorker> w (new GPUJ2KEncodeWorker());
390         _workers.push_back (w);
391         _threads.push_back (new boost::thread(boost::bind(&J2KEncoder::encoder_thread, this, w)));
392
393         _writer->set_encoder_threads (_threads.size());
394 }