ecb9d1f98ac3b36e498a6ba46c03321a7183a824
[dcpomatic.git] / src / lib / grok / context.h
1 /*
2     Copyright (C) 2023 Grok Image Compression Inc.
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 #pragma once
22
23
24 #include "../config.h"
25 #include "../dcp_video.h"
26 #include "../film.h"
27 #include "../log.h"
28 #include "../dcpomatic_log.h"
29 #include "../writer.h"
30 #include "messenger.h"
31 #include <dcp/array_data.h>
32 #include <boost/filesystem.hpp>
33
34
/** Mutex taken by GrokContext::launch() and GrokContext::shutdown().
 *  NOTE(review): as a `static` object defined in a header, each translation
 *  unit that includes this file gets its own separate mutex - confirm that
 *  this is acceptable for the way the header is used.
 */
static std::mutex launchMutex;
36
37 namespace grk_plugin
38 {
39
40 struct GrokLogger : public MessengerLogger {
41         explicit GrokLogger(const std::string &preamble) : MessengerLogger(preamble)
42         {}
43         virtual ~GrokLogger() = default;
44         void info(const char* fmt, ...) override{
45                 va_list arg;
46                 va_start(arg, fmt);
47                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_GENERAL);
48                 va_end(arg);
49         }
50         void warn(const char* fmt, ...) override{
51                 va_list arg;
52                 va_start(arg, fmt);
53                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_WARNING);
54                 va_end(arg);
55         }
56         void error(const char* fmt, ...) override{
57                 va_list arg;
58                 va_start(arg, fmt);
59                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_ERROR);
60                 va_end(arg);
61         }
62 };
63
64 struct FrameProxy {
65         FrameProxy(int index, Eyes eyes, DCPVideo dcpv) : index_(index), eyes_(eyes), vf(dcpv)
66         {}
67         int index() const {
68                 return index_;
69         }
70         Eyes eyes(void) const {
71                 return eyes_;
72         }
73         int index_;
74         Eyes eyes_;
75         DCPVideo vf;
76 };
77
78 struct DcpomaticContext {
79         DcpomaticContext(
80                 std::shared_ptr<const Film> film,
81                 Writer& writer,
82                 EventHistory& history,
83                 boost::filesystem::path const& location
84                 )
85                 : film_(film)
86                 , writer_(writer)
87                 , history_(history)
88                 , _location(location)
89                 , width_(0)
90                 , height_(0)
91         {
92
93         }
94
95         void setDimensions(uint32_t w, uint32_t h) {
96                 width_ = w;
97                 height_ = h;
98         }
99         std::shared_ptr<const Film> film_;
100         Writer& writer_;
101         EventHistory &history_;
102         boost::filesystem::path _location;
103         uint32_t width_;
104         uint32_t height_;
105 };
106
107
108 class GrokContext
109 {
110 public:
111         explicit GrokContext(DcpomaticContext* dcpomatic_context)
112                 : _dcpomatic_context(dcpomatic_context)
113                 , messenger_(nullptr)
114                 , launched_(false)
115                 , launchFailed_(false)
116         {
117                 if (Config::instance()->enable_gpu ())  {
118                     boost::filesystem::path folder(_dcpomatic_context->_location);
119                     boost::filesystem::path binaryPath = folder / "grk_compress";
120                     if (!boost::filesystem::exists(binaryPath)) {
121                             getMessengerLogger()->error(
122                                     "Invalid binary location %s", _dcpomatic_context->_location.c_str()
123                                     );
124                             return;
125                     }
126                         auto proc = [this](const std::string& str) {
127                                 try {
128                                         Msg msg(str);
129                                         auto tag = msg.next();
130                                         if(tag == GRK_MSGR_BATCH_SUBMIT_COMPRESSED)
131                                         {
132                                                 auto clientFrameId = msg.nextUint();
133                                                 auto compressedFrameId = msg.nextUint();
134                                                 (void)compressedFrameId;
135                                                 auto compressedFrameLength = msg.nextUint();
136                                                 auto  processor =
137                                                                 [this](FrameProxy srcFrame, uint8_t* compressed, uint32_t compressedFrameLength)
138                                                 {
139                                                         auto compressed_data = std::make_shared<dcp::ArrayData>(compressed, compressedFrameLength);
140                                                         _dcpomatic_context->writer_.write(compressed_data, srcFrame.index(), srcFrame.eyes());
141                                                         frame_done ();
142                                                 };
143                                                 int const minimum_size = 16384;
144                                                 bool needsRecompression = compressedFrameLength < minimum_size;
145                                                 messenger_->processCompressed(str, processor, needsRecompression);
146                                                 if (needsRecompression) {
147                                                         auto fp = messenger_->retrieve(clientFrameId);
148                                                         if (!fp) {
149                                                                 return;
150                                                         }
151
152                                                         auto encoded = std::make_shared<dcp::ArrayData>(fp->vf.encode_locally());
153                                                         _dcpomatic_context->writer_.write(encoded, fp->vf.index(), fp->vf.eyes());
154                                                         frame_done ();
155                                                 }
156                                         }
157                                 } catch (std::exception &ex){
158                                         getMessengerLogger()->error("%s",ex.what());
159                                 }
160                         };
161                         auto clientInit =
162                                 MessengerInit(clientToGrokMessageBuf, clientSentSynch, grokReceiveReadySynch,
163                                                           grokToClientMessageBuf, grokSentSynch, clientReceiveReadySynch, proc,
164                                                           std::thread::hardware_concurrency());
165                         messenger_ = new ScheduledMessenger<FrameProxy>(clientInit);
166                 }
167         }
168         ~GrokContext(void) {
169                 shutdown();
170         }
171         bool launch(DCPVideo dcpv, int device){
172                 namespace fs = boost::filesystem;
173
174                 if (!messenger_ )
175                         return false;
176                 if (launched_)
177                         return true;
178                 if (launchFailed_)
179                         return false;
180                 std::unique_lock<std::mutex> lk_global(launchMutex);
181                 if (!messenger_)
182                         return false;
183                 if (launched_)
184                         return true;
185                 if (launchFailed_)
186                         return false;
187                 if (MessengerInit::firstLaunch(true)) {
188
189                     if (!fs::exists(_dcpomatic_context->_location) || !fs::is_directory(_dcpomatic_context->_location)) {
190                         getMessengerLogger()->error("Invalid directory %s", _dcpomatic_context->_location.c_str());
191                                 return false;
192                     }
193                         auto s = dcpv.get_size();
194                         _dcpomatic_context->setDimensions(s.width, s.height);
195                         auto config = Config::instance();
196                         if (!messenger_->launchGrok(
197                                         _dcpomatic_context->_location,
198                                         _dcpomatic_context->width_,_dcpomatic_context->width_,
199                                         _dcpomatic_context->height_,
200                                         3, 12, device,
201                                         _dcpomatic_context->film_->resolution() == Resolution::FOUR_K,
202                                         _dcpomatic_context->film_->video_frame_rate(),
203                                         _dcpomatic_context->film_->j2k_bandwidth(),
204                                         config->gpu_license_server(),
205                                         config->gpu_license_port(),
206                                         config->gpu_license())) {
207                                 launchFailed_ = true;
208                                 return false;
209                         }
210                 }
211                 launched_ =  messenger_->waitForClientInit();
212                 launchFailed_ = launched_;
213
214                 return launched_;
215         }
216         bool scheduleCompress(DCPVideo const& vf){
217                 if (!messenger_)
218                         return false;
219
220                 auto fp = FrameProxy(vf.index(), vf.eyes(), vf);
221                 auto cvt = [this, &fp](BufferSrc src){
222                         // xyz conversion
223                         fp.vf.convert_to_xyz((uint16_t*)src.framePtr_);
224                 };
225                 return messenger_->scheduleCompress(fp, cvt);
226         }
227         void shutdown(void){
228                 if (!messenger_)
229                         return;
230
231                 std::unique_lock<std::mutex> lk_global(launchMutex);
232                 if (!messenger_)
233                         return;
234                 if (launched_)
235                         messenger_->shutdown();
236                 delete messenger_;
237                 messenger_ = nullptr;
238         }
239         void frame_done () {
240                 _dcpomatic_context->history_.event();
241         }
242 private:
243         DcpomaticContext* _dcpomatic_context;
244         ScheduledMessenger<FrameProxy> *messenger_;
245         bool launched_;
246         bool launchFailed_;
247 };
248
249 }
250