Patch from Aaron.
[dcpomatic.git] / src / lib / grok_context.h
1 /*
2     Copyright (C) 2023 Grok Image Compression Inc.
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 #pragma once
22
23 #include "config.h"
24 #include "log.h"
25 #include "dcpomatic_log.h"
26 #include "writer.h"
27 #include "grok_messenger.h"
28
29 class Film;
30 using dcp::Data;
31 using namespace dcpomatic;
32
33 static std::mutex launchMutex;
34
35 namespace grk_plugin
36 {
37
38 struct GrokLogger : public MessengerLogger {
39         explicit GrokLogger(const std::string &preamble) : MessengerLogger(preamble)
40         {}
41         virtual ~GrokLogger() = default;
42         void info(const char* fmt, ...) override{
43                 va_list arg;
44                 va_start(arg, fmt);
45                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_GENERAL);
46                 va_end(arg);
47         }
48         void warn(const char* fmt, ...) override{
49                 va_list arg;
50                 va_start(arg, fmt);
51                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_WARNING);
52                 va_end(arg);
53         }
54         void error(const char* fmt, ...) override{
55                 va_list arg;
56                 va_start(arg, fmt);
57                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_ERROR);
58                 va_end(arg);
59         }
60 };
61
62 struct GrokInitializer {
63         GrokInitializer(void) {
64                 setMessengerLogger(new GrokLogger("[GROK] "));
65         }
66         ~GrokInitializer()  = default;
67 };
68
69 struct FrameProxy {
70         FrameProxy(void) : FrameProxy(0,Eyes::LEFT,DCPVideo())
71         {}
72         FrameProxy(int index, Eyes eyes, DCPVideo dcpv) : index_(index), eyes_(eyes), vf(dcpv)
73         {}
74         int index() const {
75                 return index_;
76         }
77         Eyes eyes(void) const {
78                 return eyes_;
79         }
80         int index_;
81         Eyes eyes_;
82         DCPVideo vf;
83 };
84
85 struct DcpomaticContext {
86         DcpomaticContext(std::shared_ptr<const Film> film, Writer& writer,
87                                                 EventHistory &history, const std::string &location) :
88                                                                         film_(film), writer_(writer),
89                                                                         history_(history), location_(location),
90                                                                         width_(0), height_(0)
91         {}
92         void setDimensions(uint32_t w, uint32_t h) {
93                 width_ = w;
94                 height_ = h;
95         }
96         std::shared_ptr<const Film> film_;
97         Writer& writer_;
98         EventHistory &history_;
99         std::string location_;
100         uint32_t width_;
101         uint32_t height_;
102 };
103
104 class GrokContext {
105 public:
106         explicit GrokContext(const DcpomaticContext &dcpomaticContext) :
107                                                                 dcpomaticContext_(dcpomaticContext),
108                                                                 messenger_(nullptr),
109                                                                 launched_(false)
110         {
111                 struct CompressedData : public dcp::Data {
112                         explicit CompressedData(int dataLen) : data_(new uint8_t[dataLen]), dataLen_(dataLen)
113                         {}
114                         ~CompressedData(void){
115                                 delete[] data_;
116                         }
117                         uint8_t const * data () const override {
118                                 return data_;
119                         }
120                         uint8_t * data () override {
121                                 return data_;
122                         }
123                         int size () const override {
124                                 return dataLen_;
125                         }
126                         uint8_t *data_;
127                         int dataLen_;
128                 };
129                 if (Config::instance()->enable_gpu ())  {
130                     boost::filesystem::path folder(dcpomaticContext_.location_);
131                     boost::filesystem::path binaryPath = folder / "grk_compress";
132                     if (!boost::filesystem::exists(binaryPath)) {
133                         getMessengerLogger()->error("Invalid binary location %s",
134                                         dcpomaticContext_.location_.c_str());
135                                 return;
136                     }
137                         auto proc = [this](const std::string& str) {
138                                 try {
139                                         Msg msg(str);
140                                         auto tag = msg.next();
141                                         if(tag == GRK_MSGR_BATCH_SUBMIT_COMPRESSED)
142                                         {
143                                                 auto clientFrameId = msg.nextUint();
144                                                 auto compressedFrameId = msg.nextUint();
145                                                 (void)compressedFrameId;
146                                                 auto compressedFrameLength = msg.nextUint();
147                                                 auto  processor =
148                                                                 [this](FrameProxy srcFrame, uint8_t* compressed, uint32_t compressedFrameLength)
149                                                 {
150                                                         auto compressedData = std::make_shared<CompressedData>(compressedFrameLength);
151                                                         memcpy(compressedData->data_,compressed,compressedFrameLength );
152                                                         dcpomaticContext_.writer_.write(compressedData, srcFrame.index(), srcFrame.eyes());
153                                                         frame_done ();
154                                                 };
155                                                 int const minimum_size = 16384;
156                                                 bool needsRecompression = compressedFrameLength < minimum_size;
157                                                 messenger_->processCompressed(str, processor, needsRecompression);
158                                                 if (needsRecompression) {
159                                                         bool success = false;
160                                                         auto fp = messenger_->retrieve(clientFrameId, success);
161                                                         if (!success)
162                                                                 return;
163
164                                                         auto encoded = std::make_shared<dcp::ArrayData>(fp.vf.encode_locally());
165                                                         dcpomaticContext_.writer_.write(encoded, fp.vf.index(), fp.vf.eyes());
166                                                         frame_done ();
167                                                 }
168                                         }
169                                 } catch (std::exception &ex){
170                                         getMessengerLogger()->error("%s",ex.what());
171                                 }
172                         };
173                         auto clientInit =
174                                 MessengerInit(clientToGrokMessageBuf, clientSentSynch, grokReceiveReadySynch,
175                                                           grokToClientMessageBuf, grokSentSynch, clientReceiveReadySynch, proc,
176                                                           std::thread::hardware_concurrency());
177                         messenger_ = new ScheduledMessenger<FrameProxy>(clientInit);
178                 }
179         }
180         ~GrokContext(void) {
181                 shutdown();
182         }
183         bool launch(DCPVideo dcpv, int device){
184                 if (!messenger_ )
185                         return false;
186                 if (launched_)
187                         return true;
188                 std::unique_lock<std::mutex> lk_global(launchMutex);
189                 if (!messenger_)
190                         return false;
191                 if (launched_)
192                         return true;
193                 if (MessengerInit::firstLaunch(true)) {
194                         auto s = dcpv.get_size();
195                         dcpomaticContext_.setDimensions(s.width, s.height);
196                         auto config = Config::instance();
197                         messenger_->launchGrok(dcpomaticContext_.location_,
198                                         dcpomaticContext_.width_,dcpomaticContext_.width_,
199                                         dcpomaticContext_.height_,
200                                         3, 12, device,
201                                         dcpomaticContext_.film_->resolution() == Resolution::FOUR_K,
202                                         dcpomaticContext_.film_->video_frame_rate(),
203                                         dcpomaticContext_.film_->j2k_bandwidth(),
204                                         config->gpu_license_server(),
205                                         config->gpu_license_port(),
206                                         config->gpu_license());
207                 }
208                 launched_ =  messenger_->waitForClientInit();
209
210                 return launched_;
211         }
212         bool scheduleCompress(const DCPVideo &vf){
213                 if (!messenger_)
214                         return false;
215
216                 auto fp = FrameProxy(vf.index(),vf.eyes(),vf);
217                 auto cvt = [this, &fp](BufferSrc src){
218                         // xyz conversion
219                         fp.vf.convert_to_xyz((uint16_t*)src.framePtr_);
220                 };
221                 return messenger_->scheduleCompress(fp, cvt);
222         }
223         void shutdown(void){
224                 if (!messenger_)
225                         return;
226
227                 std::unique_lock<std::mutex> lk_global(launchMutex);
228                 if (!messenger_)
229                         return;
230                 if (launched_)
231                         messenger_->shutdown();
232                 delete messenger_;
233                 messenger_ = nullptr;
234         }
235         void frame_done () {
236                 dcpomaticContext_.history_.event ();
237         }
238 private:
239         DcpomaticContext dcpomaticContext_;
240         ScheduledMessenger<FrameProxy> *messenger_;
241         bool launched_;
242 };
243
244 }
245