cb913f58db5a2b2b0bb0143ec7b06deffe0e1089
[dcpomatic.git] / src / lib / grok / context.h
1 /*
2     Copyright (C) 2023 Grok Image Compression Inc.
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 #pragma once
22
23 #include "../config.h"
24 #include "../dcp_video.h"
25 #include "../log.h"
26 #include "../dcpomatic_log.h"
27 #include "../writer.h"
28 #include "messenger.h"
29
/* Forward declaration of Film, which DcpomaticContext below holds via std::shared_ptr */
class Film;
/* NOTE(review): these using-directives sit at header scope, so they apply to every
   file that includes this header - confirm that is intended.
*/
using dcp::Data;
using namespace dcpomatic;

/* Locked by GrokContext::launch() and GrokContext::shutdown().
   NOTE(review): a namespace-scope `static` in a header gives each including
   translation unit its own mutex object - confirm that is intended.
*/
static std::mutex launchMutex;
35
36 namespace grk_plugin
37 {
38
39 struct GrokLogger : public MessengerLogger {
40         explicit GrokLogger(const std::string &preamble) : MessengerLogger(preamble)
41         {}
42         virtual ~GrokLogger() = default;
43         void info(const char* fmt, ...) override{
44                 va_list arg;
45                 va_start(arg, fmt);
46                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_GENERAL);
47                 va_end(arg);
48         }
49         void warn(const char* fmt, ...) override{
50                 va_list arg;
51                 va_start(arg, fmt);
52                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_WARNING);
53                 va_end(arg);
54         }
55         void error(const char* fmt, ...) override{
56                 va_list arg;
57                 va_start(arg, fmt);
58                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_ERROR);
59                 va_end(arg);
60         }
61 };
62
63 struct GrokInitializer {
64         GrokInitializer(void) {
65                 setMessengerLogger(new GrokLogger("[GROK] "));
66         }
67         ~GrokInitializer()  = default;
68 };
69
70 struct FrameProxy {
71         FrameProxy(int index, Eyes eyes, DCPVideo dcpv) : index_(index), eyes_(eyes), vf(dcpv)
72         {}
73         int index() const {
74                 return index_;
75         }
76         Eyes eyes(void) const {
77                 return eyes_;
78         }
79         int index_;
80         Eyes eyes_;
81         DCPVideo vf;
82 };
83
84 struct DcpomaticContext {
85         DcpomaticContext(std::shared_ptr<const Film> film, Writer& writer,
86                                                 EventHistory &history, const std::string &location) :
87                                                                         film_(film), writer_(writer),
88                                                                         history_(history), location_(location),
89                                                                         width_(0), height_(0)
90         {}
91         void setDimensions(uint32_t w, uint32_t h) {
92                 width_ = w;
93                 height_ = h;
94         }
95         std::shared_ptr<const Film> film_;
96         Writer& writer_;
97         EventHistory &history_;
98         std::string location_;
99         uint32_t width_;
100         uint32_t height_;
101 };
102
103 class GrokContext {
104 public:
105         explicit GrokContext(const DcpomaticContext &dcpomaticContext) :
106                                                                 dcpomaticContext_(dcpomaticContext),
107                                                                 messenger_(nullptr),
108                                                                 launched_(false)
109         {
110                 struct CompressedData : public dcp::Data {
111                         explicit CompressedData(int dataLen) : data_(new uint8_t[dataLen]), dataLen_(dataLen)
112                         {}
113                         ~CompressedData(void){
114                                 delete[] data_;
115                         }
116                         uint8_t const * data () const override {
117                                 return data_;
118                         }
119                         uint8_t * data () override {
120                                 return data_;
121                         }
122                         int size () const override {
123                                 return dataLen_;
124                         }
125                         uint8_t *data_;
126                         int dataLen_;
127                 };
128                 if (Config::instance()->enable_gpu ())  {
129                     boost::filesystem::path folder(dcpomaticContext_.location_);
130                     boost::filesystem::path binaryPath = folder / "grk_compress";
131                     if (!boost::filesystem::exists(binaryPath)) {
132                         getMessengerLogger()->error("Invalid binary location %s",
133                                         dcpomaticContext_.location_.c_str());
134                                 return;
135                     }
136                         auto proc = [this](const std::string& str) {
137                                 try {
138                                         Msg msg(str);
139                                         auto tag = msg.next();
140                                         if(tag == GRK_MSGR_BATCH_SUBMIT_COMPRESSED)
141                                         {
142                                                 auto clientFrameId = msg.nextUint();
143                                                 auto compressedFrameId = msg.nextUint();
144                                                 (void)compressedFrameId;
145                                                 auto compressedFrameLength = msg.nextUint();
146                                                 auto  processor =
147                                                                 [this](FrameProxy srcFrame, uint8_t* compressed, uint32_t compressedFrameLength)
148                                                 {
149                                                         auto compressedData = std::make_shared<CompressedData>(compressedFrameLength);
150                                                         memcpy(compressedData->data_,compressed,compressedFrameLength );
151                                                         dcpomaticContext_.writer_.write(compressedData, srcFrame.index(), srcFrame.eyes());
152                                                         frame_done ();
153                                                 };
154                                                 int const minimum_size = 16384;
155                                                 bool needsRecompression = compressedFrameLength < minimum_size;
156                                                 messenger_->processCompressed(str, processor, needsRecompression);
157                                                 if (needsRecompression) {
158                                                         auto fp = messenger_->retrieve(clientFrameId);
159                                                         if (!fp) {
160                                                                 return;
161                                                         }
162
163                                                         auto encoded = std::make_shared<dcp::ArrayData>(fp->vf.encode_locally());
164                                                         dcpomaticContext_.writer_.write(encoded, fp->vf.index(), fp->vf.eyes());
165                                                         frame_done ();
166                                                 }
167                                         }
168                                 } catch (std::exception &ex){
169                                         getMessengerLogger()->error("%s",ex.what());
170                                 }
171                         };
172                         auto clientInit =
173                                 MessengerInit(clientToGrokMessageBuf, clientSentSynch, grokReceiveReadySynch,
174                                                           grokToClientMessageBuf, grokSentSynch, clientReceiveReadySynch, proc,
175                                                           std::thread::hardware_concurrency());
176                         messenger_ = new ScheduledMessenger<FrameProxy>(clientInit);
177                 }
178         }
179         ~GrokContext(void) {
180                 shutdown();
181         }
182         bool launch(DCPVideo dcpv, int device){
183                 if (!messenger_ )
184                         return false;
185                 if (launched_)
186                         return true;
187                 std::unique_lock<std::mutex> lk_global(launchMutex);
188                 if (!messenger_)
189                         return false;
190                 if (launched_)
191                         return true;
192                 if (MessengerInit::firstLaunch(true)) {
193                         auto s = dcpv.get_size();
194                         dcpomaticContext_.setDimensions(s.width, s.height);
195                         auto config = Config::instance();
196                         messenger_->launchGrok(dcpomaticContext_.location_,
197                                         dcpomaticContext_.width_,dcpomaticContext_.width_,
198                                         dcpomaticContext_.height_,
199                                         3, 12, device,
200                                         dcpomaticContext_.film_->resolution() == Resolution::FOUR_K,
201                                         dcpomaticContext_.film_->video_frame_rate(),
202                                         dcpomaticContext_.film_->j2k_bandwidth(),
203                                         config->gpu_license_server(),
204                                         config->gpu_license_port(),
205                                         config->gpu_license());
206                 }
207                 launched_ =  messenger_->waitForClientInit();
208
209                 return launched_;
210         }
211         bool scheduleCompress(DCPVideo const& vf){
212                 if (!messenger_)
213                         return false;
214
215                 auto fp = FrameProxy(vf.index(), vf.eyes(), vf);
216                 auto cvt = [this, &fp](BufferSrc src){
217                         // xyz conversion
218                         fp.vf.convert_to_xyz((uint16_t*)src.framePtr_);
219                 };
220                 return messenger_->scheduleCompress(fp, cvt);
221         }
222         void shutdown(void){
223                 if (!messenger_)
224                         return;
225
226                 std::unique_lock<std::mutex> lk_global(launchMutex);
227                 if (!messenger_)
228                         return;
229                 if (launched_)
230                         messenger_->shutdown();
231                 delete messenger_;
232                 messenger_ = nullptr;
233         }
234         void frame_done () {
235                 dcpomaticContext_.history_.event ();
236         }
237 private:
238         DcpomaticContext dcpomaticContext_;
239         ScheduledMessenger<FrameProxy> *messenger_;
240         bool launched_;
241 };
242
243 }
244