Remove unnecessary using statements.
[dcpomatic.git] / src / lib / grok / context.h
1 /*
2     Copyright (C) 2023 Grok Image Compression Inc.
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 #pragma once
22
23 #include "../config.h"
24 #include "../dcp_video.h"
25 #include "../film.h"
26 #include "../log.h"
27 #include "../dcpomatic_log.h"
28 #include "../writer.h"
29 #include "messenger.h"
30
31 class Film;
32
// Mutex taken by GrokContext::launch() and GrokContext::shutdown() below.
// NOTE(review): `static` at namespace scope in a header has internal linkage, so
// every translation unit that includes this file gets its own launchMutex —
// confirm that a per-translation-unit mutex is what is intended here.
static std::mutex launchMutex;
34
35 namespace grk_plugin
36 {
37
38 struct GrokLogger : public MessengerLogger {
39         explicit GrokLogger(const std::string &preamble) : MessengerLogger(preamble)
40         {}
41         virtual ~GrokLogger() = default;
42         void info(const char* fmt, ...) override{
43                 va_list arg;
44                 va_start(arg, fmt);
45                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_GENERAL);
46                 va_end(arg);
47         }
48         void warn(const char* fmt, ...) override{
49                 va_list arg;
50                 va_start(arg, fmt);
51                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_WARNING);
52                 va_end(arg);
53         }
54         void error(const char* fmt, ...) override{
55                 va_list arg;
56                 va_start(arg, fmt);
57                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_ERROR);
58                 va_end(arg);
59         }
60 };
61
62 struct GrokInitializer {
63         GrokInitializer(void) {
64                 setMessengerLogger(new GrokLogger("[GROK] "));
65         }
66         ~GrokInitializer()  = default;
67 };
68
69 struct FrameProxy {
70         FrameProxy(int index, Eyes eyes, DCPVideo dcpv) : index_(index), eyes_(eyes), vf(dcpv)
71         {}
72         int index() const {
73                 return index_;
74         }
75         Eyes eyes(void) const {
76                 return eyes_;
77         }
78         int index_;
79         Eyes eyes_;
80         DCPVideo vf;
81 };
82
83 struct DcpomaticContext {
84         DcpomaticContext(std::shared_ptr<const Film> film, Writer& writer,
85                                                 EventHistory &history, const std::string &location) :
86                                                                         film_(film), writer_(writer),
87                                                                         history_(history), location_(location),
88                                                                         width_(0), height_(0)
89         {}
90         void setDimensions(uint32_t w, uint32_t h) {
91                 width_ = w;
92                 height_ = h;
93         }
94         std::shared_ptr<const Film> film_;
95         Writer& writer_;
96         EventHistory &history_;
97         std::string location_;
98         uint32_t width_;
99         uint32_t height_;
100 };
101
102 class GrokContext {
103 public:
104         explicit GrokContext(const DcpomaticContext &dcpomaticContext) :
105                                                                 dcpomaticContext_(dcpomaticContext),
106                                                                 messenger_(nullptr),
107                                                                 launched_(false)
108         {
109                 struct CompressedData : public dcp::Data {
110                         explicit CompressedData(int dataLen) : data_(new uint8_t[dataLen]), dataLen_(dataLen)
111                         {}
112                         ~CompressedData(void){
113                                 delete[] data_;
114                         }
115                         uint8_t const * data () const override {
116                                 return data_;
117                         }
118                         uint8_t * data () override {
119                                 return data_;
120                         }
121                         int size () const override {
122                                 return dataLen_;
123                         }
124                         uint8_t *data_;
125                         int dataLen_;
126                 };
127                 if (Config::instance()->enable_gpu ())  {
128                     boost::filesystem::path folder(dcpomaticContext_.location_);
129                     boost::filesystem::path binaryPath = folder / "grk_compress";
130                     if (!boost::filesystem::exists(binaryPath)) {
131                         getMessengerLogger()->error("Invalid binary location %s",
132                                         dcpomaticContext_.location_.c_str());
133                                 return;
134                     }
135                         auto proc = [this](const std::string& str) {
136                                 try {
137                                         Msg msg(str);
138                                         auto tag = msg.next();
139                                         if(tag == GRK_MSGR_BATCH_SUBMIT_COMPRESSED)
140                                         {
141                                                 auto clientFrameId = msg.nextUint();
142                                                 auto compressedFrameId = msg.nextUint();
143                                                 (void)compressedFrameId;
144                                                 auto compressedFrameLength = msg.nextUint();
145                                                 auto  processor =
146                                                                 [this](FrameProxy srcFrame, uint8_t* compressed, uint32_t compressedFrameLength)
147                                                 {
148                                                         auto compressedData = std::make_shared<CompressedData>(compressedFrameLength);
149                                                         memcpy(compressedData->data_,compressed,compressedFrameLength );
150                                                         dcpomaticContext_.writer_.write(compressedData, srcFrame.index(), srcFrame.eyes());
151                                                         frame_done ();
152                                                 };
153                                                 int const minimum_size = 16384;
154                                                 bool needsRecompression = compressedFrameLength < minimum_size;
155                                                 messenger_->processCompressed(str, processor, needsRecompression);
156                                                 if (needsRecompression) {
157                                                         auto fp = messenger_->retrieve(clientFrameId);
158                                                         if (!fp) {
159                                                                 return;
160                                                         }
161
162                                                         auto encoded = std::make_shared<dcp::ArrayData>(fp->vf.encode_locally());
163                                                         dcpomaticContext_.writer_.write(encoded, fp->vf.index(), fp->vf.eyes());
164                                                         frame_done ();
165                                                 }
166                                         }
167                                 } catch (std::exception &ex){
168                                         getMessengerLogger()->error("%s",ex.what());
169                                 }
170                         };
171                         auto clientInit =
172                                 MessengerInit(clientToGrokMessageBuf, clientSentSynch, grokReceiveReadySynch,
173                                                           grokToClientMessageBuf, grokSentSynch, clientReceiveReadySynch, proc,
174                                                           std::thread::hardware_concurrency());
175                         messenger_ = new ScheduledMessenger<FrameProxy>(clientInit);
176                 }
177         }
178         ~GrokContext(void) {
179                 shutdown();
180         }
181         bool launch(DCPVideo dcpv, int device){
182                 if (!messenger_ )
183                         return false;
184                 if (launched_)
185                         return true;
186                 std::unique_lock<std::mutex> lk_global(launchMutex);
187                 if (!messenger_)
188                         return false;
189                 if (launched_)
190                         return true;
191                 if (MessengerInit::firstLaunch(true)) {
192                         auto s = dcpv.get_size();
193                         dcpomaticContext_.setDimensions(s.width, s.height);
194                         auto config = Config::instance();
195                         messenger_->launchGrok(dcpomaticContext_.location_,
196                                         dcpomaticContext_.width_,dcpomaticContext_.width_,
197                                         dcpomaticContext_.height_,
198                                         3, 12, device,
199                                         dcpomaticContext_.film_->resolution() == Resolution::FOUR_K,
200                                         dcpomaticContext_.film_->video_frame_rate(),
201                                         dcpomaticContext_.film_->j2k_bandwidth(),
202                                         config->gpu_license_server(),
203                                         config->gpu_license_port(),
204                                         config->gpu_license());
205                 }
206                 launched_ =  messenger_->waitForClientInit();
207
208                 return launched_;
209         }
210         bool scheduleCompress(DCPVideo const& vf){
211                 if (!messenger_)
212                         return false;
213
214                 auto fp = FrameProxy(vf.index(), vf.eyes(), vf);
215                 auto cvt = [this, &fp](BufferSrc src){
216                         // xyz conversion
217                         fp.vf.convert_to_xyz((uint16_t*)src.framePtr_);
218                 };
219                 return messenger_->scheduleCompress(fp, cvt);
220         }
221         void shutdown(void){
222                 if (!messenger_)
223                         return;
224
225                 std::unique_lock<std::mutex> lk_global(launchMutex);
226                 if (!messenger_)
227                         return;
228                 if (launched_)
229                         messenger_->shutdown();
230                 delete messenger_;
231                 messenger_ = nullptr;
232         }
233         void frame_done () {
234                 dcpomaticContext_.history_.event ();
235         }
236 private:
237         DcpomaticContext dcpomaticContext_;
238         ScheduledMessenger<FrameProxy> *messenger_;
239         bool launched_;
240 };
241
242 }
243