3e4144d0778099da8313250b74a0bb78a087a930
[dcpomatic.git] / src / lib / grok / context.h
1 /*
2     Copyright (C) 2023 Grok Image Compression Inc.
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 #pragma once
22
23 #include "../config.h"
24 #include "../dcp_video.h"
25 #include "../film.h"
26 #include "../log.h"
27 #include "../dcpomatic_log.h"
28 #include "../writer.h"
29 #include "messenger.h"
30 #include <dcp/array_data.h>
31
32
// Mutex taken by GrokContext::launch() and GrokContext::shutdown() below.
// NOTE(review): this is declared `static` at namespace scope in a header, so
// every translation unit that includes this file gets its own launchMutex
// object; confirm that is acceptable for the intended cross-thread locking.
// NOTE(review): <mutex> is not included directly here; presumably it arrives
// through one of the headers above.
33 static std::mutex launchMutex;
34
35 namespace grk_plugin
36 {
37
38 struct GrokLogger : public MessengerLogger {
39         explicit GrokLogger(const std::string &preamble) : MessengerLogger(preamble)
40         {}
41         virtual ~GrokLogger() = default;
42         void info(const char* fmt, ...) override{
43                 va_list arg;
44                 va_start(arg, fmt);
45                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_GENERAL);
46                 va_end(arg);
47         }
48         void warn(const char* fmt, ...) override{
49                 va_list arg;
50                 va_start(arg, fmt);
51                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_WARNING);
52                 va_end(arg);
53         }
54         void error(const char* fmt, ...) override{
55                 va_list arg;
56                 va_start(arg, fmt);
57                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_ERROR);
58                 va_end(arg);
59         }
60 };
61
62 struct FrameProxy {
63         FrameProxy(int index, Eyes eyes, DCPVideo dcpv) : index_(index), eyes_(eyes), vf(dcpv)
64         {}
65         int index() const {
66                 return index_;
67         }
68         Eyes eyes(void) const {
69                 return eyes_;
70         }
71         int index_;
72         Eyes eyes_;
73         DCPVideo vf;
74 };
75
76 struct DcpomaticContext {
77         DcpomaticContext(
78                 std::shared_ptr<const Film> film,
79                 Writer& writer,
80                 EventHistory& history,
81                 boost::filesystem::path const& location
82                 )
83                 : film_(film)
84                 , writer_(writer)
85                 , history_(history)
86                 , _location(location)
87                 , width_(0)
88                 , height_(0)
89         {
90
91         }
92
93         void setDimensions(uint32_t w, uint32_t h) {
94                 width_ = w;
95                 height_ = h;
96         }
97         std::shared_ptr<const Film> film_;
98         Writer& writer_;
99         EventHistory &history_;
100         boost::filesystem::path _location;
101         uint32_t width_;
102         uint32_t height_;
103 };
104
105 class GrokContext {
106 public:
107         explicit GrokContext(const DcpomaticContext &dcpomaticContext) :
108                                                                 dcpomaticContext_(dcpomaticContext),
109                                                                 messenger_(nullptr),
110                                                                 launched_(false)
111         {
112                 if (Config::instance()->enable_gpu ())  {
113                     boost::filesystem::path folder(dcpomaticContext_._location);
114                     boost::filesystem::path binaryPath = folder / "grk_compress";
115                     if (!boost::filesystem::exists(binaryPath)) {
116                             getMessengerLogger()->error(
117                                     "Invalid binary location %s", dcpomaticContext_._location.c_str()
118                                     );
119                             return;
120                     }
121                         auto proc = [this](const std::string& str) {
122                                 try {
123                                         Msg msg(str);
124                                         auto tag = msg.next();
125                                         if(tag == GRK_MSGR_BATCH_SUBMIT_COMPRESSED)
126                                         {
127                                                 auto clientFrameId = msg.nextUint();
128                                                 auto compressedFrameId = msg.nextUint();
129                                                 (void)compressedFrameId;
130                                                 auto compressedFrameLength = msg.nextUint();
131                                                 auto  processor =
132                                                                 [this](FrameProxy srcFrame, uint8_t* compressed, uint32_t compressedFrameLength)
133                                                 {
134                                                         auto compressed_data = std::make_shared<dcp::ArrayData>(compressed, compressedFrameLength);
135                                                         dcpomaticContext_.writer_.write(compressed_data, srcFrame.index(), srcFrame.eyes());
136                                                         frame_done ();
137                                                 };
138                                                 int const minimum_size = 16384;
139                                                 bool needsRecompression = compressedFrameLength < minimum_size;
140                                                 messenger_->processCompressed(str, processor, needsRecompression);
141                                                 if (needsRecompression) {
142                                                         auto fp = messenger_->retrieve(clientFrameId);
143                                                         if (!fp) {
144                                                                 return;
145                                                         }
146
147                                                         auto encoded = std::make_shared<dcp::ArrayData>(fp->vf.encode_locally());
148                                                         dcpomaticContext_.writer_.write(encoded, fp->vf.index(), fp->vf.eyes());
149                                                         frame_done ();
150                                                 }
151                                         }
152                                 } catch (std::exception &ex){
153                                         getMessengerLogger()->error("%s",ex.what());
154                                 }
155                         };
156                         auto clientInit =
157                                 MessengerInit(clientToGrokMessageBuf, clientSentSynch, grokReceiveReadySynch,
158                                                           grokToClientMessageBuf, grokSentSynch, clientReceiveReadySynch, proc,
159                                                           std::thread::hardware_concurrency());
160                         messenger_ = new ScheduledMessenger<FrameProxy>(clientInit);
161                 }
162         }
163         ~GrokContext(void) {
164                 shutdown();
165         }
166         bool launch(DCPVideo dcpv, int device){
167                 if (!messenger_ )
168                         return false;
169                 if (launched_)
170                         return true;
171                 std::unique_lock<std::mutex> lk_global(launchMutex);
172                 if (!messenger_)
173                         return false;
174                 if (launched_)
175                         return true;
176                 if (MessengerInit::firstLaunch(true)) {
177                         auto s = dcpv.get_size();
178                         dcpomaticContext_.setDimensions(s.width, s.height);
179                         auto config = Config::instance();
180                         messenger_->launchGrok(
181                                 dcpomaticContext_._location,
182                                 dcpomaticContext_.width_,dcpomaticContext_.width_,
183                                 dcpomaticContext_.height_,
184                                 3, 12, device,
185                                 dcpomaticContext_.film_->resolution() == Resolution::FOUR_K,
186                                 dcpomaticContext_.film_->video_frame_rate(),
187                                 dcpomaticContext_.film_->j2k_bandwidth(),
188                                 config->gpu_license_server(),
189                                 config->gpu_license_port(),
190                                 config->gpu_license()
191                                 );
192                 }
193                 launched_ =  messenger_->waitForClientInit();
194
195                 return launched_;
196         }
197         bool scheduleCompress(DCPVideo const& vf){
198                 if (!messenger_)
199                         return false;
200
201                 auto fp = FrameProxy(vf.index(), vf.eyes(), vf);
202                 auto cvt = [this, &fp](BufferSrc src){
203                         // xyz conversion
204                         fp.vf.convert_to_xyz((uint16_t*)src.framePtr_);
205                 };
206                 return messenger_->scheduleCompress(fp, cvt);
207         }
208         void shutdown(void){
209                 if (!messenger_)
210                         return;
211
212                 std::unique_lock<std::mutex> lk_global(launchMutex);
213                 if (!messenger_)
214                         return;
215                 if (launched_)
216                         messenger_->shutdown();
217                 delete messenger_;
218                 messenger_ = nullptr;
219         }
220         void frame_done () {
221                 dcpomaticContext_.history_.event ();
222         }
223 private:
224         DcpomaticContext dcpomaticContext_;
225         ScheduledMessenger<FrameProxy> *messenger_;
226         bool launched_;
227 };
228
229 }
230