Merge remote-tracking branch 'origin/main' into v2.17.x
[dcpomatic.git] / src / lib / grok / context.h
1 /*
2     Copyright (C) 2023 Grok Image Compression Inc.
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 #pragma once
22
23
24 #include "../config.h"
25 #include "../dcp_video.h"
26 #include "../film.h"
27 #include "../log.h"
28 #include "../dcpomatic_log.h"
29 #include "../writer.h"
30 #include "messenger.h"
31 #include <dcp/array_data.h>
32 #include <boost/filesystem.hpp>
33
34
// Mutex locked by GrokContext::launch() and GrokContext::shutdown() below.
// NOTE(review): this is `static` at namespace scope in a header, so every
// translation unit that includes this file gets its own separate mutex;
// confirm that is intended (or that only one translation unit includes it).
35 static std::mutex launchMutex;
36
37 namespace grk_plugin
38 {
39
40 struct GrokLogger : public MessengerLogger {
41         explicit GrokLogger(const std::string &preamble) : MessengerLogger(preamble)
42         {}
43         virtual ~GrokLogger() = default;
44         void info(const char* fmt, ...) override{
45                 va_list arg;
46                 va_start(arg, fmt);
47                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_GENERAL);
48                 va_end(arg);
49         }
50         void warn(const char* fmt, ...) override{
51                 va_list arg;
52                 va_start(arg, fmt);
53                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_WARNING);
54                 va_end(arg);
55         }
56         void error(const char* fmt, ...) override{
57                 va_list arg;
58                 va_start(arg, fmt);
59                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_ERROR);
60                 va_end(arg);
61         }
62 };
63
64 struct FrameProxy {
65         FrameProxy(int index, Eyes eyes, DCPVideo dcpv) : index_(index), eyes_(eyes), vf(dcpv)
66         {}
67         int index() const {
68                 return index_;
69         }
70         Eyes eyes(void) const {
71                 return eyes_;
72         }
73         int index_;
74         Eyes eyes_;
75         DCPVideo vf;
76 };
77
78 struct DcpomaticContext
79 {
80         DcpomaticContext(
81                 std::shared_ptr<const Film> film_,
82                 Writer& writer_,
83                 EventHistory& history_,
84                 boost::filesystem::path const& location_
85                 )
86                 : film(film_)
87                 , writer(writer_)
88                 , history(history_)
89                 , location(location_)
90         {
91
92         }
93
94         void set_dimensions(uint32_t w, uint32_t h)
95         {
96                 width = w;
97                 height = h;
98         }
99
100         std::shared_ptr<const Film> film;
101         Writer& writer;
102         EventHistory& history;
103         boost::filesystem::path location;
104         uint32_t width = 0;
105         uint32_t height = 0;
106 };
107
108
109 class GrokContext
110 {
111 public:
112         explicit GrokContext(DcpomaticContext* dcpomatic_context)
113                 : _dcpomatic_context(dcpomatic_context)
114         {
115                 auto grok = Config::instance()->grok().get_value_or({});
116                 if (!grok.enable) {
117                         return;
118                 }
119
120                 boost::filesystem::path folder(_dcpomatic_context->location);
121                 boost::filesystem::path binary_path = folder / "grk_compress";
122                 if (!boost::filesystem::exists(binary_path)) {
123                         getMessengerLogger()->error(
124                                 "Invalid binary location %s", _dcpomatic_context->location.c_str()
125                                 );
126                         return;
127                 }
128
129                 auto proc = [this](const std::string& str) {
130                         try {
131                                 Msg msg(str);
132                                 auto tag = msg.next();
133                                 if (tag == GRK_MSGR_BATCH_SUBMIT_COMPRESSED) {
134                                         auto clientFrameId = msg.nextUint();
135                                         msg.nextUint(); // compressed frame ID
136                                         auto compressedFrameLength = msg.nextUint();
137                                         auto processor = [this](FrameProxy srcFrame, uint8_t* compressed, uint32_t compressedFrameLength) {
138                                                 auto compressed_data = std::make_shared<dcp::ArrayData>(compressed, compressedFrameLength);
139                                                 _dcpomatic_context->writer.write(compressed_data, srcFrame.index(), srcFrame.eyes());
140                                                 frame_done ();
141                                         };
142
143                                         int const minimum_size = 16384;
144
145                                         bool needsRecompression = compressedFrameLength < minimum_size;
146                                         _messenger->processCompressed(str, processor, needsRecompression);
147
148                                         if (needsRecompression) {
149                                                 auto fp = _messenger->retrieve(clientFrameId);
150                                                 if (!fp) {
151                                                         return;
152                                                 }
153
154                                                 auto encoded = std::make_shared<dcp::ArrayData>(fp->vf.encode_locally());
155                                                 _dcpomatic_context->writer.write(encoded, fp->vf.index(), fp->vf.eyes());
156                                                 frame_done ();
157                                         }
158                                 }
159                         } catch (std::exception& ex) {
160                                 getMessengerLogger()->error("%s",ex.what());
161                         }
162                 };
163
164                 auto clientInit = MessengerInit(
165                         clientToGrokMessageBuf,
166                         clientSentSynch,
167                         grokReceiveReadySynch,
168                         grokToClientMessageBuf,
169                         grokSentSynch,
170                         clientReceiveReadySynch,
171                         proc,
172                         std::thread::hardware_concurrency()
173                         );
174
175                 _messenger = new ScheduledMessenger<FrameProxy>(clientInit);
176         }
177
178         ~GrokContext()
179         {
180                 shutdown();
181         }
182
183         bool launch(DCPVideo dcpv, int device)
184         {
185                 namespace fs = boost::filesystem;
186
187                 if (!_messenger) {
188                         return false;
189                 }
190                 if (_launched) {
191                         return true;
192                 }
193                 if (_launch_failed) {
194                         return false;
195                 }
196
197                 std::unique_lock<std::mutex> lk_global(launchMutex);
198
199                 if (!_messenger) {
200                         return false;
201                 }
202                 if (_launched) {
203                         return true;
204                 }
205                 if (_launch_failed) {
206                         return false;
207                 }
208
209                 if (MessengerInit::firstLaunch(true)) {
210
211                         if (!fs::exists(_dcpomatic_context->location) || !fs::is_directory(_dcpomatic_context->location)) {
212                                 getMessengerLogger()->error("Invalid directory %s", _dcpomatic_context->location.c_str());
213                                 return false;
214                         }
215
216                         auto s = dcpv.get_size();
217                         _dcpomatic_context->set_dimensions(s.width, s.height);
218                         auto grok = Config::instance()->grok().get_value_or({});
219                         if (!_messenger->launchGrok(
220                                         _dcpomatic_context->location,
221                                         _dcpomatic_context->width,
222                                         _dcpomatic_context->width,
223                                         _dcpomatic_context->height,
224                                         3,
225                                         12,
226                                         device,
227                                         _dcpomatic_context->film->resolution() == Resolution::FOUR_K,
228                                         _dcpomatic_context->film->video_frame_rate(),
229                                         _dcpomatic_context->film->j2k_bandwidth(),
230                                         grok.licence_server,
231                                         grok.licence_port,
232                                         grok.licence)) {
233                                 _launch_failed = true;
234                                 return false;
235                         }
236                 }
237
238                 _launched = _messenger->waitForClientInit();
239                 _launch_failed = _launched;
240
241                 return _launched;
242         }
243
244         bool scheduleCompress(DCPVideo const& vf)
245         {
246                 if (!_messenger) {
247                         return false;
248                 }
249
250                 auto fp = FrameProxy(vf.index(), vf.eyes(), vf);
251                 auto cvt = [this, &fp](BufferSrc src) {
252                         fp.vf.convert_to_xyz((uint16_t*)src.framePtr_);
253                 };
254
255                 return _messenger->scheduleCompress(fp, cvt);
256         }
257
258         void shutdown()
259         {
260                 if (!_messenger) {
261                         return;
262                 }
263
264                 std::unique_lock<std::mutex> lk_global(launchMutex);
265
266                 if (!_messenger) {
267                         return;
268                 }
269
270                 if (_launched) {
271                         _messenger->shutdown();
272                 }
273
274                 delete _messenger;
275                 _messenger = nullptr;
276         }
277
278         void frame_done()
279         {
280                 _dcpomatic_context->history.event();
281         }
282
283 private:
284         DcpomaticContext* _dcpomatic_context;
285         ScheduledMessenger<FrameProxy>* _messenger = nullptr;
286         bool _launched = false;
287         bool _launch_failed = false;
288 };
289
290 }
291