8c7a1ef1be60c8c69babcc68d9bb03a450eb315c
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
22 /** @file src/j2k_encoder.cc
23  *  @brief J2K encoder class.
24  */
25
26
27 #include "compose.hpp"
28 #include "config.h"
29 #include "cross.h"
30 #include "dcp_video.h"
31 #include "dcpomatic_log.h"
32 #include "encode_server_description.h"
33 #include "encode_server_finder.h"
34 #include "film.h"
35 #include "cpu_j2k_encoder_thread.h"
36 #include "grok_j2k_encoder_thread.h"
37 #include "remote_j2k_encoder_thread.h"
38 #include "j2k_encoder.h"
39 #include "log.h"
40 #include "player_video.h"
41 #include "util.h"
42 #include "writer.h"
43 #include <libcxml/cxml.h>
44 #include <iostream>
45
46 #include "i18n.h"
47
48
49 using std::cout;
50 using std::dynamic_pointer_cast;
51 using std::exception;
52 using std::list;
53 using std::make_shared;
54 using std::shared_ptr;
55 using std::weak_ptr;
56 using boost::optional;
57 using dcp::Data;
58 using namespace dcpomatic;
59
60
61 static grk_plugin::GrokInitializer grokInitializer;
62
63 /** @param film Film that we are encoding.
64  *  @param writer Writer that we are using.
65  */
66 J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer)
67         : _film (film)
68         , _history (200)
69         , _writer (writer)
70         , _dcpomatic_context(film, writer, _history, Config::instance()->gpu_binary_location())
71         , _context(Config::instance()->enable_gpu() ? new grk_plugin::GrokContext(_dcpomatic_context) : nullptr)
72 {
73 }
74
75
76 J2KEncoder::~J2KEncoder ()
77 {
78         _server_found_connection.disconnect();
79
80         terminate_threads();
81
82         delete _context;
83 }
84
85
86 void
87 J2KEncoder::servers_list_changed()
88 {
89         auto config = Config::instance();
90
91         auto const cpu = (config->enable_gpu() || config->only_servers_encode()) ? 0 : config->master_encoding_threads();
92         auto const gpu = config->enable_gpu() ? config->master_encoding_threads() : 0;
93
94         remake_threads(cpu, gpu, EncodeServerFinder::instance()->servers());
95 }
96
97
98 void
99 J2KEncoder::begin ()
100 {
101         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect(
102                 boost::bind(&J2KEncoder::servers_list_changed, this)
103                 );
104         servers_list_changed ();
105 }
106
107
108 void
109 J2KEncoder::pause()
110 {
111         if (!Config::instance()->enable_gpu()) {
112                 return;
113         }
114
115         terminate_threads ();
116
117         /* Something might have been thrown during terminate_threads */
118         rethrow ();
119
120         delete _context;
121         _context = nullptr;
122 }
123
124
125 void J2KEncoder::resume()
126 {
127         if (!Config::instance()->enable_gpu()) {
128                 return;
129         }
130
131         _context = new grk_plugin::GrokContext(_dcpomatic_context);
132         servers_list_changed();
133 }
134
135
136 void
137 J2KEncoder::end()
138 {
139         boost::mutex::scoped_lock lock (_queue_mutex);
140
141         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
142
143         /* Keep waking workers until the queue is empty */
144         while (!_queue.empty ()) {
145                 rethrow ();
146                 _full_condition.wait (lock);
147         }
148         lock.unlock ();
149
150         LOG_GENERAL_NC (N_("Terminating encoder threads"));
151
152         terminate_threads ();
153
154         /* Something might have been thrown during terminate_threads */
155         rethrow ();
156
157         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
158
159         /* The following sequence of events can occur in the above code:
160              1. a remote worker takes the last image off the queue
161              2. the loop above terminates
162              3. the remote worker fails to encode the image and puts it back on the queue
163              4. the remote worker is then terminated by terminate_threads
164
165              So just mop up anything left in the queue here.
166         */
167         for (auto & i: _queue) {
168                 if (Config::instance()->enable_gpu ()) {
169                         if (!_context->scheduleCompress(i)){
170                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index());
171                                 // handle error
172                         }
173                 }
174                 else {
175                         LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
176                         try {
177                                 _writer.write(
178                                         make_shared<dcp::ArrayData>(i.encode_locally()),
179                                         i.index(),
180                                         i.eyes()
181                                         );
182                                 frame_done ();
183                         } catch (std::exception& e) {
184                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
185                         }
186                 }
187         }
188
189         delete _context;
190         _context = nullptr;
191 }
192
193
194 /** @return an estimate of the current number of frames we are encoding per second,
195  *  if known.
196  */
197 optional<float>
198 J2KEncoder::current_encoding_rate () const
199 {
200         return _history.rate ();
201 }
202
203
204 /** @return Number of video frames that have been queued for encoding */
205 int
206 J2KEncoder::video_frames_enqueued () const
207 {
208         if (!_last_player_video_time) {
209                 return 0;
210         }
211
212         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
213 }
214
215
216 /** Should be called when a frame has been encoded successfully */
217 void
218 J2KEncoder::frame_done ()
219 {
220         _history.event ();
221 }
222
223
224 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
225  *  so each time the supplied frame is the one after the previous one.
226  *  pv represents one video frame, and could be empty if there is nothing to encode
227  *  for this DCP frame.
228  *
229  *  @param pv PlayerVideo to encode.
230  *  @param time Time of \p pv within the DCP.
231  */
232 void
233 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
234 {
235         _waker.nudge ();
236
237         size_t threads = 0;
238         {
239                 boost::mutex::scoped_lock lm (_threads_mutex);
240                 threads = _threads.size();
241         }
242
243         boost::mutex::scoped_lock queue_lock (_queue_mutex);
244
245         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
246            when there are no threads.
247         */
248         while (_queue.size() >= (threads * 2) + 1) {
249                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
250                 _full_condition.wait (queue_lock);
251                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
252         }
253
254         _writer.rethrow();
255         /* Re-throw any exception raised by one of our threads.  If more
256            than one has thrown an exception, only one will be rethrown, I think;
257            but then, if that happens something has gone badly wrong.
258         */
259         rethrow ();
260
261         auto const position = time.frames_floor(_film->video_frame_rate());
262
263         if (_writer.can_fake_write(position)) {
264                 /* We can fake-write this frame */
265                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
266                 _writer.fake_write(position, pv->eyes ());
267                 frame_done ();
268         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
269                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
270                 /* This frame already has J2K data, so just write it */
271                 _writer.write(pv->j2k(), position, pv->eyes ());
272                 frame_done ();
273         } else if (_last_player_video[pv->eyes()] && _writer.can_repeat(position) && pv->same(_last_player_video[pv->eyes()])) {
274                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
275                 _writer.repeat(position, pv->eyes());
276         } else {
277                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
278                 /* Queue this new frame for encoding */
279                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
280                 auto dcpv = DCPVideo(
281                                 pv,
282                                 position,
283                                 _film->video_frame_rate(),
284                                 _film->j2k_bandwidth(),
285                                 _film->resolution()
286                                 );
287                 _queue.push_back (dcpv);
288
289                 /* The queue might not be empty any more, so notify anything which is
290                    waiting on that.
291                 */
292                 _empty_condition.notify_all ();
293         }
294
295         _last_player_video[pv->eyes()] = pv;
296         _last_player_video_time = time;
297 }
298
299
300 void
301 J2KEncoder::terminate_threads ()
302 {
303         boost::mutex::scoped_lock lm(_threads_mutex);
304         boost::this_thread::disable_interruption dis;
305
306         for (auto& thread: _threads) {
307                 thread->stop();
308         }
309
310         _threads.clear();
311         _ending = true;
312 }
313
314
315 void
316 J2KEncoder::remake_threads(int cpu, int gpu, list<EncodeServerDescription> servers)
317 {
318         boost::mutex::scoped_lock lm (_threads_mutex);
319         if (_ending) {
320                 return;
321         }
322
323         auto remove_threads = [this](int wanted, int current, std::function<bool (shared_ptr<J2KEncoderThread>)> predicate) {
324                 for (auto i = wanted; i < current; ++i) {
325                         auto iter = std::find_if(_threads.begin(), _threads.end(), predicate);
326                         if (iter != _threads.end()) {
327                                 (*iter)->stop();
328                                 _threads.erase(iter);
329                         }
330                 }
331         };
332
333
334         /* CPU */
335
336         auto const is_cpu_thread = [](shared_ptr<J2KEncoderThread> thread) {
337                 return static_cast<bool>(dynamic_pointer_cast<CPUJ2KEncoderThread>(thread));
338         };
339
340         auto const current_cpu_threads = std::count_if(_threads.begin(), _threads.end(), is_cpu_thread);
341
342         for (auto i = current_cpu_threads; i < cpu; ++i) {
343                 auto thread = make_shared<CPUJ2KEncoderThread>(*this);
344                 thread->start();
345                 _threads.push_back(thread);
346         }
347
348         remove_threads(cpu, current_cpu_threads, is_cpu_thread);
349
350
351         /* GPU */
352
353         auto const is_grok_thread = [](shared_ptr<J2KEncoderThread> thread) {
354                 return static_cast<bool>(dynamic_pointer_cast<GrokJ2KEncoderThread>(thread));
355         };
356
357         auto const current_gpu_threads = std::count_if(_threads.begin(), _threads.end(), is_grok_thread);
358
359         for (auto i = current_gpu_threads; i < gpu; ++i) {
360                 auto thread = make_shared<GrokJ2KEncoderThread>(*this, _context);
361                 thread->start();
362                 _threads.push_back(thread);
363         }
364
365         remove_threads(gpu, current_gpu_threads, is_grok_thread);
366
367
368         /* Remote */
369
370         for (auto const& server: servers) {
371                 if (!server.current_link_version()) {
372                         continue;
373                 }
374
375                 auto is_remote_thread = [server](shared_ptr<J2KEncoderThread> thread) {
376                         auto remote = dynamic_pointer_cast<RemoteJ2KEncoderThread>(thread);
377                         return remote && remote->server().host_name() == server.host_name();
378                 };
379
380                 auto const current_threads = std::count_if(_threads.begin(), _threads.end(), is_remote_thread);
381
382                 auto const wanted_threads = server.threads();
383
384                 if (wanted_threads > current_threads) {
385                         LOG_GENERAL(N_("Adding %1 worker threads for remote %2"), wanted_threads - current_threads, server.host_name());
386                 } else if (wanted_threads < current_threads) {
387                         LOG_GENERAL(N_("Removing %1 worker threads for remote %2"), current_threads - wanted_threads, server.host_name());
388                 }
389
390                 for (auto i = current_threads; i < wanted_threads; ++i) {
391                         auto thread = make_shared<RemoteJ2KEncoderThread>(*this, server);
392                         thread->start();
393                         _threads.push_back(thread);
394                 }
395
396                 remove_threads(wanted_threads, current_threads, is_remote_thread);
397         }
398
399         _writer.set_encoder_threads(_threads.size());
400 }
401
402
403 DCPVideo
404 J2KEncoder::pop()
405 {
406         boost::mutex::scoped_lock lock(_queue_mutex);
407         while (_queue.empty()) {
408                 _empty_condition.wait (lock);
409         }
410
411         LOG_TIMING("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
412
413         auto vf = _queue.front();
414         _queue.pop_front();
415
416         _full_condition.notify_all();
417         return vf;
418 }
419
420
421 void
422 J2KEncoder::retry(DCPVideo video)
423 {
424         boost::mutex::scoped_lock lock(_queue_mutex);
425         _queue.push_front(video);
426         _empty_condition.notify_all();
427 }
428
429
430 void
431 J2KEncoder::write(shared_ptr<const dcp::Data> data, int index, Eyes eyes)
432 {
433         _writer.write(data, index, eyes);
434         frame_done();
435 }