Explicitly set up Grok logger rather than relying on a static variable.
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
22 /** @file src/j2k_encoder.cc
23  *  @brief J2K encoder class.
24  */
25
26
27 #include "compose.hpp"
28 #include "config.h"
29 #include "cross.h"
30 #include "dcp_video.h"
31 #include "dcpomatic_log.h"
32 #include "encode_server_description.h"
33 #include "encode_server_finder.h"
34 #include "film.h"
35 #include "cpu_j2k_encoder_thread.h"
36 #include "grok_j2k_encoder_thread.h"
37 #include "remote_j2k_encoder_thread.h"
38 #include "j2k_encoder.h"
39 #include "log.h"
40 #include "player_video.h"
41 #include "util.h"
42 #include "writer.h"
43 #include <libcxml/cxml.h>
44 #include <iostream>
45
46 #include "i18n.h"
47
48
49 using std::cout;
50 using std::dynamic_pointer_cast;
51 using std::exception;
52 using std::list;
53 using std::make_shared;
54 using std::shared_ptr;
55 using std::weak_ptr;
56 using boost::optional;
57 using dcp::Data;
58 using namespace dcpomatic;
59
60
61 /** @param film Film that we are encoding.
62  *  @param writer Writer that we are using.
63  */
64 J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer)
65         : _film (film)
66         , _history (200)
67         , _writer (writer)
68         , _dcpomatic_context(film, writer, _history, Config::instance()->gpu_binary_location())
69         , _context(Config::instance()->enable_gpu() ? new grk_plugin::GrokContext(_dcpomatic_context) : nullptr)
70 {
71 }
72
73
74 J2KEncoder::~J2KEncoder ()
75 {
76         _server_found_connection.disconnect();
77
78         terminate_threads();
79
80         delete _context;
81 }
82
83
84 void
85 J2KEncoder::servers_list_changed()
86 {
87         auto config = Config::instance();
88
89         auto const cpu = (config->enable_gpu() || config->only_servers_encode()) ? 0 : config->master_encoding_threads();
90         auto const gpu = config->enable_gpu() ? config->master_encoding_threads() : 0;
91
92         remake_threads(cpu, gpu, EncodeServerFinder::instance()->servers());
93 }
94
95
96 void
97 J2KEncoder::begin ()
98 {
99         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect(
100                 boost::bind(&J2KEncoder::servers_list_changed, this)
101                 );
102         servers_list_changed ();
103 }
104
105
106 void
107 J2KEncoder::pause()
108 {
109         if (!Config::instance()->enable_gpu()) {
110                 return;
111         }
112
113         terminate_threads ();
114
115         /* Something might have been thrown during terminate_threads */
116         rethrow ();
117
118         delete _context;
119         _context = nullptr;
120 }
121
122
123 void J2KEncoder::resume()
124 {
125         if (!Config::instance()->enable_gpu()) {
126                 return;
127         }
128
129         _context = new grk_plugin::GrokContext(_dcpomatic_context);
130         servers_list_changed();
131 }
132
133
134 void
135 J2KEncoder::end()
136 {
137         boost::mutex::scoped_lock lock (_queue_mutex);
138
139         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
140
141         /* Keep waking workers until the queue is empty */
142         while (!_queue.empty ()) {
143                 rethrow ();
144                 _full_condition.wait (lock);
145         }
146         lock.unlock ();
147
148         LOG_GENERAL_NC (N_("Terminating encoder threads"));
149
150         terminate_threads ();
151
152         /* Something might have been thrown during terminate_threads */
153         rethrow ();
154
155         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
156
157         /* The following sequence of events can occur in the above code:
158              1. a remote worker takes the last image off the queue
159              2. the loop above terminates
160              3. the remote worker fails to encode the image and puts it back on the queue
161              4. the remote worker is then terminated by terminate_threads
162
163              So just mop up anything left in the queue here.
164         */
165         for (auto & i: _queue) {
166                 if (Config::instance()->enable_gpu ()) {
167                         if (!_context->scheduleCompress(i)){
168                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index());
169                                 // handle error
170                         }
171                 }
172                 else {
173                         LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
174                         try {
175                                 _writer.write(
176                                         make_shared<dcp::ArrayData>(i.encode_locally()),
177                                         i.index(),
178                                         i.eyes()
179                                         );
180                                 frame_done ();
181                         } catch (std::exception& e) {
182                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
183                         }
184                 }
185         }
186
187         delete _context;
188         _context = nullptr;
189 }
190
191
192 /** @return an estimate of the current number of frames we are encoding per second,
193  *  if known.
194  */
195 optional<float>
196 J2KEncoder::current_encoding_rate () const
197 {
198         return _history.rate ();
199 }
200
201
202 /** @return Number of video frames that have been queued for encoding */
203 int
204 J2KEncoder::video_frames_enqueued () const
205 {
206         if (!_last_player_video_time) {
207                 return 0;
208         }
209
210         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
211 }
212
213
214 /** Should be called when a frame has been encoded successfully */
215 void
216 J2KEncoder::frame_done ()
217 {
218         _history.event ();
219 }
220
221
222 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
223  *  so each time the supplied frame is the one after the previous one.
224  *  pv represents one video frame, and could be empty if there is nothing to encode
225  *  for this DCP frame.
226  *
227  *  @param pv PlayerVideo to encode.
228  *  @param time Time of \p pv within the DCP.
229  */
230 void
231 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
232 {
233         _waker.nudge ();
234
235         size_t threads = 0;
236         {
237                 boost::mutex::scoped_lock lm (_threads_mutex);
238                 threads = _threads.size();
239         }
240
241         boost::mutex::scoped_lock queue_lock (_queue_mutex);
242
243         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
244            when there are no threads.
245         */
246         while (_queue.size() >= (threads * 2) + 1) {
247                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
248                 _full_condition.wait (queue_lock);
249                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
250         }
251
252         _writer.rethrow();
253         /* Re-throw any exception raised by one of our threads.  If more
254            than one has thrown an exception, only one will be rethrown, I think;
255            but then, if that happens something has gone badly wrong.
256         */
257         rethrow ();
258
259         auto const position = time.frames_floor(_film->video_frame_rate());
260
261         if (_writer.can_fake_write(position)) {
262                 /* We can fake-write this frame */
263                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
264                 _writer.fake_write(position, pv->eyes ());
265                 frame_done ();
266         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
267                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
268                 /* This frame already has J2K data, so just write it */
269                 _writer.write(pv->j2k(), position, pv->eyes ());
270                 frame_done ();
271         } else if (_last_player_video[pv->eyes()] && _writer.can_repeat(position) && pv->same(_last_player_video[pv->eyes()])) {
272                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
273                 _writer.repeat(position, pv->eyes());
274         } else {
275                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
276                 /* Queue this new frame for encoding */
277                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
278                 auto dcpv = DCPVideo(
279                                 pv,
280                                 position,
281                                 _film->video_frame_rate(),
282                                 _film->j2k_bandwidth(),
283                                 _film->resolution()
284                                 );
285                 _queue.push_back (dcpv);
286
287                 /* The queue might not be empty any more, so notify anything which is
288                    waiting on that.
289                 */
290                 _empty_condition.notify_all ();
291         }
292
293         _last_player_video[pv->eyes()] = pv;
294         _last_player_video_time = time;
295 }
296
297
298 void
299 J2KEncoder::terminate_threads ()
300 {
301         boost::mutex::scoped_lock lm(_threads_mutex);
302         boost::this_thread::disable_interruption dis;
303
304         for (auto& thread: _threads) {
305                 thread->stop();
306         }
307
308         _threads.clear();
309         _ending = true;
310 }
311
312
313 void
314 J2KEncoder::remake_threads(int cpu, int gpu, list<EncodeServerDescription> servers)
315 {
316         boost::mutex::scoped_lock lm (_threads_mutex);
317         if (_ending) {
318                 return;
319         }
320
321         auto remove_threads = [this](int wanted, int current, std::function<bool (shared_ptr<J2KEncoderThread>)> predicate) {
322                 for (auto i = wanted; i < current; ++i) {
323                         auto iter = std::find_if(_threads.begin(), _threads.end(), predicate);
324                         if (iter != _threads.end()) {
325                                 (*iter)->stop();
326                                 _threads.erase(iter);
327                         }
328                 }
329         };
330
331
332         /* CPU */
333
334         auto const is_cpu_thread = [](shared_ptr<J2KEncoderThread> thread) {
335                 return static_cast<bool>(dynamic_pointer_cast<CPUJ2KEncoderThread>(thread));
336         };
337
338         auto const current_cpu_threads = std::count_if(_threads.begin(), _threads.end(), is_cpu_thread);
339
340         for (auto i = current_cpu_threads; i < cpu; ++i) {
341                 auto thread = make_shared<CPUJ2KEncoderThread>(*this);
342                 thread->start();
343                 _threads.push_back(thread);
344         }
345
346         remove_threads(cpu, current_cpu_threads, is_cpu_thread);
347
348
349         /* GPU */
350
351         auto const is_grok_thread = [](shared_ptr<J2KEncoderThread> thread) {
352                 return static_cast<bool>(dynamic_pointer_cast<GrokJ2KEncoderThread>(thread));
353         };
354
355         auto const current_gpu_threads = std::count_if(_threads.begin(), _threads.end(), is_grok_thread);
356
357         for (auto i = current_gpu_threads; i < gpu; ++i) {
358                 auto thread = make_shared<GrokJ2KEncoderThread>(*this, _context);
359                 thread->start();
360                 _threads.push_back(thread);
361         }
362
363         remove_threads(gpu, current_gpu_threads, is_grok_thread);
364
365
366         /* Remote */
367
368         for (auto const& server: servers) {
369                 if (!server.current_link_version()) {
370                         continue;
371                 }
372
373                 auto is_remote_thread = [server](shared_ptr<J2KEncoderThread> thread) {
374                         auto remote = dynamic_pointer_cast<RemoteJ2KEncoderThread>(thread);
375                         return remote && remote->server().host_name() == server.host_name();
376                 };
377
378                 auto const current_threads = std::count_if(_threads.begin(), _threads.end(), is_remote_thread);
379
380                 auto const wanted_threads = server.threads();
381
382                 if (wanted_threads > current_threads) {
383                         LOG_GENERAL(N_("Adding %1 worker threads for remote %2"), wanted_threads - current_threads, server.host_name());
384                 } else if (wanted_threads < current_threads) {
385                         LOG_GENERAL(N_("Removing %1 worker threads for remote %2"), current_threads - wanted_threads, server.host_name());
386                 }
387
388                 for (auto i = current_threads; i < wanted_threads; ++i) {
389                         auto thread = make_shared<RemoteJ2KEncoderThread>(*this, server);
390                         thread->start();
391                         _threads.push_back(thread);
392                 }
393
394                 remove_threads(wanted_threads, current_threads, is_remote_thread);
395         }
396
397         _writer.set_encoder_threads(_threads.size());
398 }
399
400
401 DCPVideo
402 J2KEncoder::pop()
403 {
404         boost::mutex::scoped_lock lock(_queue_mutex);
405         while (_queue.empty()) {
406                 _empty_condition.wait (lock);
407         }
408
409         LOG_TIMING("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
410
411         auto vf = _queue.front();
412         _queue.pop_front();
413
414         _full_condition.notify_all();
415         return vf;
416 }
417
418
419 void
420 J2KEncoder::retry(DCPVideo video)
421 {
422         boost::mutex::scoped_lock lock(_queue_mutex);
423         _queue.push_front(video);
424         _empty_condition.notify_all();
425 }
426
427
428 void
429 J2KEncoder::write(shared_ptr<const dcp::Data> data, int index, Eyes eyes)
430 {
431         _writer.write(data, index, eyes);
432         frame_done();
433 }