Fix encoding, perhaps.
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2020 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
/** @file src/lib/j2k_encoder.cc
 *  @brief J2K encoder class.
 */
24
25 #include "j2k_encoder.h"
26 #include "j2k_encoder_backend.h"
27 #include "j2k_encoder_cpu_backend.h"
28 #include "j2k_encoder_remote_backend.h"
29 #include "j2k_encoder_fastvideo_backend.h"
30 #include "util.h"
31 #include "film.h"
32 #include "log.h"
33 #include "dcpomatic_log.h"
34 #include "config.h"
35 #include "dcp_video.h"
36 #include "cross.h"
37 #include "writer.h"
38 #include "encode_server_finder.h"
39 #include "player.h"
40 #include "player_video.h"
41 #include "encode_server_description.h"
42 #include "compose.hpp"
43 #include <libcxml/cxml.h>
44 #include <boost/foreach.hpp>
45 #include <iostream>
46
47 #include "i18n.h"
48
49 using std::list;
50 using std::cout;
51 using std::exception;
52 using std::vector;
53 using boost::shared_ptr;
54 using boost::weak_ptr;
55 using boost::optional;
56 using dcp::Data;
57 using namespace dcpomatic;
58
59 /** @param film Film that we are encoding.
60  *  @param writer Writer that we are using.
61  */
62 J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
63         : _film (film)
64         , _history (200)
65         , _cpu_backend (new J2KEncoderCPUBackend())
66         , _writer (writer)
67 {
68         servers_list_changed ();
69 }
70
71 J2KEncoder::~J2KEncoder ()
72 {
73         terminate_threads ();
74 }
75
76 void
77 J2KEncoder::begin ()
78 {
79         weak_ptr<J2KEncoder> wp = shared_from_this ();
80         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
81                 boost::bind (&J2KEncoder::call_servers_list_changed, wp)
82                 );
83 }
84
85 /* We don't want the servers-list-changed callback trying to do things
86    during destruction of J2KEncoder, and I think this is the neatest way
87    to achieve that.
88 */
89 void
90 J2KEncoder::call_servers_list_changed (weak_ptr<J2KEncoder> encoder)
91 {
92         shared_ptr<J2KEncoder> e = encoder.lock ();
93         if (e) {
94                 e->servers_list_changed ();
95         }
96 }
97
98 void
99 J2KEncoder::end ()
100 {
101         boost::mutex::scoped_lock lock (_queue_mutex);
102
103         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
104
105         /* Keep waking workers until the queue is empty */
106         while (!_queue.empty ()) {
107                 rethrow ();
108                 _empty_condition.notify_all ();
109                 _full_condition.wait (lock);
110         }
111
112         lock.unlock ();
113
114         LOG_GENERAL_NC (N_("Terminating encoder threads"));
115
116         terminate_threads ();
117
118         /* Something might have been thrown during terminate_threads */
119         rethrow ();
120
121         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
122
123         /* The following sequence of events can occur in the above code:
124              1. a remote worker takes the last image off the queue
125              2. the loop above terminates
126              3. the remote worker fails to encode the image and puts it back on the queue
127              4. the remote worker is then terminated by terminate_threads
128
129              So just mop up anything left in the queue here.
130         */
131
132         BOOST_FOREACH (shared_ptr<DCPVideo> i, _queue) {
133                 LOG_GENERAL (N_("Encode left-over frame %1"), i->index());
134                 try {
135                         /* XXX: ewww */
136                         vector<shared_ptr<DCPVideo> > frames;
137                         frames.push_back (i);
138                         _writer->write (
139                                 _cpu_backend->encode(frames).front(),
140                                 i->index(),
141                                 i->frame()->eyes()
142                                 );
143                         frames_done (1);
144                 } catch (std::exception& e) {
145                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
146                 }
147         }
148 }
149
150 /** @return an estimate of the current number of frames we are encoding per second,
151  *  if known.
152  */
153 optional<float>
154 J2KEncoder::current_encoding_rate () const
155 {
156         return _history.rate ();
157 }
158
159 /** @return Number of video frames that have been queued for encoding */
160 int
161 J2KEncoder::video_frames_enqueued () const
162 {
163         if (!_last_player_video_time) {
164                 return 0;
165         }
166
167         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
168 }
169
170 /** Should be called when a frame has been encoded successfully */
171 void
172 J2KEncoder::frames_done (int number)
173 {
174         _history.event (number);
175 }
176
177 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
178  *  so each time the supplied frame is the one after the previous one.
179  *  pv represents one video frame, and could be empty if there is nothing to encode
180  *  for this DCP frame.
181  *
182  *  @param pv PlayerVideo to encode.
183  *  @param time Time of \p pv within the DCP.
184  */
185 void
186 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
187 {
188         _waker.nudge ();
189
190         size_t threads = _threads->size();
191
192         boost::mutex::scoped_lock queue_lock (_queue_mutex);
193
194         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
195            when there are no threads.
196         */
197         while (_queue.size() >= (threads * 2) + 1) {
198                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
199                 _full_condition.wait (queue_lock);
200                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
201         }
202
203         _writer->rethrow ();
204         /* Re-throw any exception raised by one of our threads.  If more
205            than one has thrown an exception, only one will be rethrown, I think;
206            but then, if that happens something has gone badly wrong.
207         */
208         rethrow ();
209
210         Frame const position = time.frames_floor(_film->video_frame_rate());
211
212         if (_writer->can_fake_write (position)) {
213                 /* We can fake-write this frame */
214                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
215                 _writer->fake_write (position, pv->eyes ());
216                 frames_done (1);
217         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
218                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
219                 /* This frame already has J2K data, so just write it */
220                 _writer->write (pv->j2k(), position, pv->eyes ());
221         } else if (_last_player_video[pv->eyes()] && _writer->can_repeat(position) && pv->same (_last_player_video[pv->eyes()])) {
222                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
223                 _writer->repeat (position, pv->eyes ());
224         } else {
225                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
226                 /* Queue this new frame for encoding */
227                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
228                 _queue.push_back (shared_ptr<DCPVideo> (
229                                           new DCPVideo (
230                                                   pv,
231                                                   position,
232                                                   _film->video_frame_rate(),
233                                                   _film->j2k_bandwidth(),
234                                                   _film->resolution()
235                                                   )
236                                           ));
237
238                 /* The queue might not be empty any more, so notify anything which is
239                    waiting on that.
240                 */
241                 _empty_condition.notify_all ();
242         }
243
244         _last_player_video[pv->eyes()] = pv;
245         _last_player_video_time = time;
246 }
247
248 void
249 J2KEncoder::terminate_threads ()
250 {
251         boost::this_thread::disable_interruption dis;
252
253         if (!_threads) {
254                 return;
255         }
256
257         _threads->interrupt_all ();
258         try {
259                 _threads->join_all ();
260         } catch (exception& e) {
261                 LOG_ERROR ("join() threw an exception: %1", e.what());
262         } catch (...) {
263                 LOG_ERROR_NC ("join() threw an exception");
264         }
265
266         _threads.reset ();
267 }
268
269 void
270 J2KEncoder::encoder_thread (shared_ptr<J2KEncoderBackend> backend)
271 try
272 {
273         LOG_TIMING ("start-encoder-thread thread=%1", thread_id());
274
275         while (true) {
276
277                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
278                 boost::mutex::scoped_lock lock (_queue_mutex);
279                 while (static_cast<int>(_queue.size()) < backend->quantity()) {
280                         _empty_condition.wait (lock);
281                 }
282
283                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
284                 vector<shared_ptr<DCPVideo> > video;
285                 for (int i = 0; i < backend->quantity(); ++i) {
286                         video.push_back (_queue.front());
287                         _queue.pop_front();
288                 }
289
290                 /* We're about to commit to either encoding these frame(s) or putting them back onto the queue,
291                    so we must not be interrupted until one or other of these things have happened.  This
292                    block has thread interruption disabled.
293                 */
294                 {
295                         boost::this_thread::disable_interruption dis;
296
297                         LOG_TIMING ("encoder-pop thread=%1 frames=%2 eyes=%3", thread_id(), video.size());
298
299                         lock.unlock ();
300
301                         vector<Data> encoded = backend->encode (video);
302
303                         if (!encoded.empty()) {
304                                 DCPOMATIC_ASSERT (static_cast<int>(encoded.size()) == backend->quantity());
305                                 for (int i = 0; i < backend->quantity(); ++i) {
306                                         _writer->write (encoded[i], video[i]->index(), video[i]->frame()->eyes());
307                                 }
308                                 frames_done (backend->quantity());
309                         } else {
310                                 lock.lock ();
311                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes %2 frames back onto queue after failure"), thread_id(), video.size());
312                                 for (vector<shared_ptr<DCPVideo> >::const_reverse_iterator i = video.rbegin(); i != video.rend(); ++i) {
313                                         _queue.push_front (*i);
314                                 }
315                                 lock.unlock ();
316                         }
317                 }
318
319                 /* The queue might not be full any more, so notify anything that is waiting on that */
320                 lock.lock ();
321                 _full_condition.notify_all ();
322         }
323 }
324 catch (boost::thread_interrupted& e) {
325         /* Ignore these and just stop the thread */
326         _full_condition.notify_all ();
327 }
328 catch (...)
329 {
330         store_current ();
331         /* Wake anything waiting on _full_condition so it can see the exception */
332         _full_condition.notify_all ();
333 }
334
335 void
336 J2KEncoder::servers_list_changed ()
337 {
338         terminate_threads ();
339         _threads.reset (new boost::thread_group());
340
341         /* XXX: could re-use threads */
342
343         if (!Config::instance()->only_servers_encode ()) {
344                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
345 #ifdef DCPOMATIC_LINUX
346                         boost::thread* t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, _cpu_backend));
347                         pthread_setname_np (t->native_handle(), "encode-worker");
348 #else
349                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, _cpu_backend));
350 #endif
351                 }
352         }
353
354         BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers()) {
355                 if (!i.current_link_version()) {
356                         continue;
357                 }
358
359                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
360                 shared_ptr<J2KEncoderRemoteBackend> backend (new J2KEncoderRemoteBackend(i));
361                 for (int j = 0; j < i.threads(); ++j) {
362                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
363                 }
364         }
365
366         shared_ptr<J2KEncoderFastvideoBackend> fastvideo(new J2KEncoderFastvideoBackend());
367         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, fastvideo));
368
369         _writer->set_encoder_threads (_threads->size());
370 }