Move grok headers into src/lib/grok
[dcpomatic.git] / src / lib / grok / context.h
1 /*
2     Copyright (C) 2023 Grok Image Compression Inc.
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 #pragma once
22
23 #include "../config.h"
24 #include "../dcp_video.h"
25 #include "../log.h"
26 #include "../dcpomatic_log.h"
27 #include "../writer.h"
28 #include "messenger.h"
29
30 class Film;
31 using dcp::Data;
32 using namespace dcpomatic;
33
34 static std::mutex launchMutex;
35
36 namespace grk_plugin
37 {
38
39 struct GrokLogger : public MessengerLogger {
40         explicit GrokLogger(const std::string &preamble) : MessengerLogger(preamble)
41         {}
42         virtual ~GrokLogger() = default;
43         void info(const char* fmt, ...) override{
44                 va_list arg;
45                 va_start(arg, fmt);
46                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_GENERAL);
47                 va_end(arg);
48         }
49         void warn(const char* fmt, ...) override{
50                 va_list arg;
51                 va_start(arg, fmt);
52                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_WARNING);
53                 va_end(arg);
54         }
55         void error(const char* fmt, ...) override{
56                 va_list arg;
57                 va_start(arg, fmt);
58                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_ERROR);
59                 va_end(arg);
60         }
61 };
62
63 struct GrokInitializer {
64         GrokInitializer(void) {
65                 setMessengerLogger(new GrokLogger("[GROK] "));
66         }
67         ~GrokInitializer()  = default;
68 };
69
70 struct FrameProxy {
71         FrameProxy(void) : FrameProxy(0,Eyes::LEFT,DCPVideo())
72         {}
73         FrameProxy(int index, Eyes eyes, DCPVideo dcpv) : index_(index), eyes_(eyes), vf(dcpv)
74         {}
75         int index() const {
76                 return index_;
77         }
78         Eyes eyes(void) const {
79                 return eyes_;
80         }
81         int index_;
82         Eyes eyes_;
83         DCPVideo vf;
84 };
85
86 struct DcpomaticContext {
87         DcpomaticContext(std::shared_ptr<const Film> film, Writer& writer,
88                                                 EventHistory &history, const std::string &location) :
89                                                                         film_(film), writer_(writer),
90                                                                         history_(history), location_(location),
91                                                                         width_(0), height_(0)
92         {}
93         void setDimensions(uint32_t w, uint32_t h) {
94                 width_ = w;
95                 height_ = h;
96         }
97         std::shared_ptr<const Film> film_;
98         Writer& writer_;
99         EventHistory &history_;
100         std::string location_;
101         uint32_t width_;
102         uint32_t height_;
103 };
104
105 class GrokContext {
106 public:
107         explicit GrokContext(const DcpomaticContext &dcpomaticContext) :
108                                                                 dcpomaticContext_(dcpomaticContext),
109                                                                 messenger_(nullptr),
110                                                                 launched_(false)
111         {
112                 struct CompressedData : public dcp::Data {
113                         explicit CompressedData(int dataLen) : data_(new uint8_t[dataLen]), dataLen_(dataLen)
114                         {}
115                         ~CompressedData(void){
116                                 delete[] data_;
117                         }
118                         uint8_t const * data () const override {
119                                 return data_;
120                         }
121                         uint8_t * data () override {
122                                 return data_;
123                         }
124                         int size () const override {
125                                 return dataLen_;
126                         }
127                         uint8_t *data_;
128                         int dataLen_;
129                 };
130                 if (Config::instance()->enable_gpu ())  {
131                     boost::filesystem::path folder(dcpomaticContext_.location_);
132                     boost::filesystem::path binaryPath = folder / "grk_compress";
133                     if (!boost::filesystem::exists(binaryPath)) {
134                         getMessengerLogger()->error("Invalid binary location %s",
135                                         dcpomaticContext_.location_.c_str());
136                                 return;
137                     }
138                         auto proc = [this](const std::string& str) {
139                                 try {
140                                         Msg msg(str);
141                                         auto tag = msg.next();
142                                         if(tag == GRK_MSGR_BATCH_SUBMIT_COMPRESSED)
143                                         {
144                                                 auto clientFrameId = msg.nextUint();
145                                                 auto compressedFrameId = msg.nextUint();
146                                                 (void)compressedFrameId;
147                                                 auto compressedFrameLength = msg.nextUint();
148                                                 auto  processor =
149                                                                 [this](FrameProxy srcFrame, uint8_t* compressed, uint32_t compressedFrameLength)
150                                                 {
151                                                         auto compressedData = std::make_shared<CompressedData>(compressedFrameLength);
152                                                         memcpy(compressedData->data_,compressed,compressedFrameLength );
153                                                         dcpomaticContext_.writer_.write(compressedData, srcFrame.index(), srcFrame.eyes());
154                                                         frame_done ();
155                                                 };
156                                                 int const minimum_size = 16384;
157                                                 bool needsRecompression = compressedFrameLength < minimum_size;
158                                                 messenger_->processCompressed(str, processor, needsRecompression);
159                                                 if (needsRecompression) {
160                                                         bool success = false;
161                                                         auto fp = messenger_->retrieve(clientFrameId, success);
162                                                         if (!success)
163                                                                 return;
164
165                                                         auto encoded = std::make_shared<dcp::ArrayData>(fp.vf.encode_locally());
166                                                         dcpomaticContext_.writer_.write(encoded, fp.vf.index(), fp.vf.eyes());
167                                                         frame_done ();
168                                                 }
169                                         }
170                                 } catch (std::exception &ex){
171                                         getMessengerLogger()->error("%s",ex.what());
172                                 }
173                         };
174                         auto clientInit =
175                                 MessengerInit(clientToGrokMessageBuf, clientSentSynch, grokReceiveReadySynch,
176                                                           grokToClientMessageBuf, grokSentSynch, clientReceiveReadySynch, proc,
177                                                           std::thread::hardware_concurrency());
178                         messenger_ = new ScheduledMessenger<FrameProxy>(clientInit);
179                 }
180         }
181         ~GrokContext(void) {
182                 shutdown();
183         }
184         bool launch(DCPVideo dcpv, int device){
185                 if (!messenger_ )
186                         return false;
187                 if (launched_)
188                         return true;
189                 std::unique_lock<std::mutex> lk_global(launchMutex);
190                 if (!messenger_)
191                         return false;
192                 if (launched_)
193                         return true;
194                 if (MessengerInit::firstLaunch(true)) {
195                         auto s = dcpv.get_size();
196                         dcpomaticContext_.setDimensions(s.width, s.height);
197                         auto config = Config::instance();
198                         messenger_->launchGrok(dcpomaticContext_.location_,
199                                         dcpomaticContext_.width_,dcpomaticContext_.width_,
200                                         dcpomaticContext_.height_,
201                                         3, 12, device,
202                                         dcpomaticContext_.film_->resolution() == Resolution::FOUR_K,
203                                         dcpomaticContext_.film_->video_frame_rate(),
204                                         dcpomaticContext_.film_->j2k_bandwidth(),
205                                         config->gpu_license_server(),
206                                         config->gpu_license_port(),
207                                         config->gpu_license());
208                 }
209                 launched_ =  messenger_->waitForClientInit();
210
211                 return launched_;
212         }
213         bool scheduleCompress(const DCPVideo &vf){
214                 if (!messenger_)
215                         return false;
216
217                 auto fp = FrameProxy(vf.index(),vf.eyes(),vf);
218                 auto cvt = [this, &fp](BufferSrc src){
219                         // xyz conversion
220                         fp.vf.convert_to_xyz((uint16_t*)src.framePtr_);
221                 };
222                 return messenger_->scheduleCompress(fp, cvt);
223         }
224         void shutdown(void){
225                 if (!messenger_)
226                         return;
227
228                 std::unique_lock<std::mutex> lk_global(launchMutex);
229                 if (!messenger_)
230                         return;
231                 if (launched_)
232                         messenger_->shutdown();
233                 delete messenger_;
234                 messenger_ = nullptr;
235         }
236         void frame_done () {
237                 dcpomaticContext_.history_.event ();
238         }
239 private:
240         DcpomaticContext dcpomaticContext_;
241         ScheduledMessenger<FrameProxy> *messenger_;
242         bool launched_;
243 };
244
245 }
246