45ee752e5a0d9b636f7f44427d6894898a70c442
[dcpomatic.git] / src / lib / grok / messenger.h
1 /*
2     Copyright (C) 2023 Grok Image Compression Inc.
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20 #pragma once
21
22 #include <iostream>
23 #include <string>
24 #include <cstring>
25 #include <atomic>
26 #include <functional>
27 #include <sstream>
28 #include <future>
29 #include <map>
30 #include <thread>
31 #include <mutex>
32 #include <condition_variable>
33 #include <queue>
34 #include <cassert>
35 #include <cstdarg>
36
37 #ifdef _WIN32
38 #include <windows.h>
39 #include <direct.h>
40 #include <tlhelp32.h>
41 #pragma warning(disable : 4100)
42 #else
43 #include <unistd.h>
44 #include <fcntl.h>
45 #include <sys/mman.h>
46 #include <semaphore.h>
47 #include <signal.h>
48 #endif
49
50 namespace grk_plugin
51 {
// Names of the two shared-memory message buffers (one per direction) and of
// the synchronization objects that go with them. The *Synch names are only
// handed to MessengerInit in this header -- presumably they end up as the
// semaphore names used by Synch; confirm against the caller.
// They are fixed identifiers, so they are const like the message tags below
// (a mutable static in a header would silently be one copy per translation unit).
static const std::string grokToClientMessageBuf = "Global\\grok_to_client_message";
static const std::string grokSentSynch = "Global\\grok_sent";
static const std::string clientReceiveReadySynch = "Global\\client_receive_ready";
static const std::string clientToGrokMessageBuf = "Global\\client_to_grok_message";
static const std::string clientSentSynch = "Global\\client_sent";
static const std::string grokReceiveReadySynch = "Global\\grok_receive_ready";
// Names of the shared-memory regions that hold the frame data itself.
static const std::string grokUncompressedBuf = "Global\\grok_uncompressed_buf";
static const std::string grokCompressedBuf = "Global\\grok_compressed_buf";
// Message tags: the first comma-separated field of a message.
// NOTE(review): the spelling "PROCESSSED" is part of the runtime string and
// presumably has to match the peer process -- do not "fix" it here alone.
static const std::string GRK_MSGR_BATCH_IMAGE = "GRK_MSGR_BATCH_IMAGE";
static const std::string GRK_MSGR_BATCH_COMPRESS_INIT = "GRK_MSGR_BATCH_COMPRESS_INIT";
static const std::string GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED = "GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED";
static const std::string GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED =
	"GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED";
static const std::string GRK_MSGR_BATCH_SUBMIT_COMPRESSED = "GRK_MSGR_BATCH_SUBMIT_COMPRESSED";
static const std::string GRK_MSGR_BATCH_PROCESSSED_COMPRESSED =
	"GRK_MSGR_BATCH_PROCESSSED_COMPRESSED";
static const std::string GRK_MSGR_BATCH_SHUTDOWN = "GRK_MSGR_BATCH_SHUTDOWN";
static const std::string GRK_MSGR_BATCH_FLUSH = "GRK_MSGR_BATCH_FLUSH";
// Size in bytes of each shared-memory message buffer.
static const size_t messageBufferLen = 256;
/**
 * Logging interface used by the messenger code.
 *
 * Implementations receive printf-style format strings with C varargs and
 * decide where each severity ends up.
 */
struct IMessengerLogger
{
	virtual ~IMessengerLogger(void) = default;
	virtual void info(const char* fmt, ...) = 0;
	virtual void warn(const char* fmt, ...) = 0;
	virtual void error(const char* fmt, ...) = 0;

  protected:
	/**
	 * Render a printf-style format into a string.
	 *
	 * The text is produced in a 512-byte stack buffer, so anything longer is
	 * cut off by snprintf. Arguments are taken by lvalue reference and are
	 * forwarded to snprintf unchanged, so they must be printf-compatible.
	 */
	template<typename... Args>
	std::string log_message(char const* const format, Args&... args) noexcept
	{
		char text[512];

		std::snprintf(text, sizeof(text), format, args...);
		return text;
	}
};
89 struct MessengerLogger : public IMessengerLogger
90 {
91         explicit MessengerLogger(const std::string &preamble) : preamble_(preamble) {}
92         virtual ~MessengerLogger() = default;
93         virtual void info(const char* fmt, ...) override
94         {
95                 va_list args;
96                 std::string new_fmt = preamble_ + fmt + "\n";
97                 va_start(args, fmt);
98                 vfprintf(stdout, new_fmt.c_str(), args);
99                 va_end(args);
100         }
101         virtual void warn(const char* fmt, ...) override
102         {
103                 va_list args;
104                 std::string new_fmt = preamble_ + fmt + "\n";
105                 va_start(args, fmt);
106                 vfprintf(stdout, new_fmt.c_str(), args);
107                 va_end(args);
108         }
109         virtual void error(const char* fmt, ...) override
110         {
111                 va_list args;
112                 std::string new_fmt = preamble_ + fmt + "\n";
113                 va_start(args, fmt);
114                 vfprintf(stderr, new_fmt.c_str(), args);
115                 va_end(args);
116         }
117
118   protected:
119         std::string preamble_;
120 };
121
122 static IMessengerLogger* sLogger = nullptr;
123 #if defined(__GNUC__) || defined(__clang__)
124 #pragma GCC diagnostic push
125 #pragma GCC diagnostic ignored "-Wunused-function"
126 #endif
127 static void setMessengerLogger(IMessengerLogger* logger)
128 {
129         delete sLogger;
130         sLogger = logger;
131 }
132 #if defined(__GNUC__) || defined(__clang__)
133 #pragma GCC diagnostic pop
134 #endif
135 static IMessengerLogger* getMessengerLogger(void)
136 {
137         return sLogger;
138 }
139 struct MessengerInit
140 {
141         MessengerInit(const std::string &outBuf, const std::string &outSent,
142                                   const std::string &outReceiveReady, const std::string &inBuf,
143                                   const std::string &inSent,
144                                   const std::string &inReceiveReady,
145                                   std::function<void(std::string)> processor,
146                                   size_t numProcessingThreads)
147                 : outboundMessageBuf(outBuf), outboundSentSynch(outSent),
148                   outboundReceiveReadySynch(outReceiveReady), inboundMessageBuf(inBuf),
149                   inboundSentSynch(inSent), inboundReceiveReadySynch(inReceiveReady), processor_(processor),
150                   numProcessingThreads_(numProcessingThreads),
151                   uncompressedFrameSize_(0), compressedFrameSize_(0),
152                   numFrames_(0)
153         {
154                 if(firstLaunch(true))
155                         unlink();
156         }
157         void unlink(void)
158         {
159 #ifndef _WIN32
160                 shm_unlink(grokToClientMessageBuf.c_str());
161                 shm_unlink(clientToGrokMessageBuf.c_str());
162 #endif
163         }
164         static bool firstLaunch(bool isClient)
165         {
166                 bool debugGrok = false;
167                 return debugGrok != isClient;
168         }
169         std::string outboundMessageBuf;
170         std::string outboundSentSynch;
171         std::string outboundReceiveReadySynch;
172
173         std::string inboundMessageBuf;
174         std::string inboundSentSynch;
175         std::string inboundReceiveReadySynch;
176
177         std::function<void(std::string)> processor_;
178         size_t numProcessingThreads_;
179
180         size_t uncompressedFrameSize_;
181         size_t compressedFrameSize_;
182         size_t numFrames_;
183 };
184
185 /*************************** Synchronization *******************************/
/** Selects which of a Synch's two semaphores an operation applies to. */
enum SynchDirection
{
	SYNCH_SENT = 0,
	SYNCH_RECEIVE_READY = 1
};

/** Handle for a shared-memory object; SharedMemoryManager stores a file descriptor in it. */
using grk_handle = int;
193 struct Synch
194 {
195         Synch(const std::string &sentSemName, const std::string &receiveReadySemName)
196                 : sentSemName_(sentSemName), receiveReadySemName_(receiveReadySemName)
197         {
198                 // unlink semaphores in case of previous crash
199                 if(MessengerInit::firstLaunch(true))
200                         unlink();
201                 open();
202         }
203         ~Synch()
204         {
205                 close();
206                 if(MessengerInit::firstLaunch(true))
207                         unlink();
208         }
209         void post(SynchDirection dir)
210         {
211                 auto sem = (dir == SYNCH_SENT ? sentSem_ : receiveReadySem_);
212                 int rc = sem_post(sem);
213                 if(rc)
214                         getMessengerLogger()->error("Error posting to semaphore: %s", strerror(errno));
215         }
216         void wait(SynchDirection dir)
217         {
218                 auto sem = dir == SYNCH_SENT ? sentSem_ : receiveReadySem_;
219                 int rc = sem_wait(sem);
220                 if(rc)
221                         getMessengerLogger()->error("Error waiting for semaphore: %s", strerror(errno));
222         }
223         void open(void)
224         {
225                 sentSem_ = sem_open(sentSemName_.c_str(), O_CREAT, 0666, 0);
226                 if(!sentSem_)
227                         getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
228                 receiveReadySem_ = sem_open(receiveReadySemName_.c_str(), O_CREAT, 0666, 1);
229                 if(!receiveReadySem_)
230                         getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
231         }
232         void close(void)
233         {
234                 int rc = sem_close(sentSem_);
235                 if(rc)
236                         getMessengerLogger()->error("Error closing semaphore %s: %s", sentSemName_.c_str(),
237                                                                                 strerror(errno));
238                 rc = sem_close(receiveReadySem_);
239                 if(rc)
240                         getMessengerLogger()->error("Error closing semaphore %s: %s",
241                                                                                 receiveReadySemName_.c_str(), strerror(errno));
242         }
243         void unlink(void)
244         {
245                 int rc = sem_unlink(sentSemName_.c_str());
246                 if(rc == -1 && errno != ENOENT)
247                         getMessengerLogger()->error("Error unlinking semaphore %s: %s", sentSemName_.c_str(),
248                                                                                 strerror(errno));
249                 rc = sem_unlink(receiveReadySemName_.c_str());
250                 if(rc == -1 && errno != ENOENT)
251                         getMessengerLogger()->error("Error unlinking semaphore %s: %s",
252                                                                                 receiveReadySemName_.c_str(), strerror(errno));
253         }
254         sem_t* sentSem_;
255         sem_t* receiveReadySem_;
256
257   private:
258         std::string sentSemName_;
259         std::string receiveReadySemName_;
260 };
261 struct SharedMemoryManager
262 {
263         static bool initShm(const std::string &name, size_t len, grk_handle* shm_fd, char** buffer)
264         {
265                 *shm_fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0666);
266                 if(*shm_fd < 0)
267                 {
268                         getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
269                         return false;
270                 }
271                 int rc = ftruncate(*shm_fd, sizeof(char) * len);
272                 if(rc)
273                 {
274                         getMessengerLogger()->error("Error truncating shared memory: %s", strerror(errno));
275                         rc = close(*shm_fd);
276                         if(rc)
277                                 getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
278                         rc = shm_unlink(name.c_str());
279                         if(rc)
280                                 getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
281                         return false;
282                 }
283                 *buffer = static_cast<char*>(mmap(0, len, PROT_WRITE, MAP_SHARED, *shm_fd, 0));
284                 if(!*buffer)
285                 {
286                         getMessengerLogger()->error("Error mapping shared memory: %s", strerror(errno));
287                         rc = close(*shm_fd);
288                         if(rc)
289                                 getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
290                         rc = shm_unlink(name.c_str());
291                         if(rc)
292                                 getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
293                 }
294
295                 return *buffer != nullptr;
296         }
297         static bool deinitShm(const std::string &name, size_t len, grk_handle &shm_fd, char** buffer)
298         {
299                 if (!*buffer || !shm_fd)
300                         return true;
301
302                 int rc = munmap(*buffer, len);
303                 *buffer = nullptr;
304                 if(rc)
305                         getMessengerLogger()->error("Error unmapping shared memory %s: %s", name.c_str(), strerror(errno));
306                 rc = close(shm_fd);
307                 shm_fd = 0;
308                 if(rc)
309                         getMessengerLogger()->error("Error closing shared memory %s: %s", name.c_str(), strerror(errno));
310                 rc = shm_unlink(name.c_str());
311                 if(rc)
312                         fprintf(stderr,"Error unlinking shared memory %s : %s\n", name.c_str(), strerror(errno));
313
314                 return true;
315         }
316 };
317
/**
 * Bounded FIFO protected by a mutex, with blocking and non-blocking
 * push/pop.
 *
 * The queue can be deactivated: that empties it, wakes every blocked caller
 * and makes all push/pop operations return false until activate() is called.
 */
template<typename Data>
class MessengerBlockingQueue
{
  public:
	/** @param max maximum number of queued elements. */
	explicit MessengerBlockingQueue(size_t max) : active_(true), max_size_(max) {}
	MessengerBlockingQueue() : MessengerBlockingQueue(UINT_MAX) {}
	/** Current number of queued elements (taken under the lock, like every other access). */
	size_t size() const
	{
		std::lock_guard<std::mutex> lk(mutex_);
		return queue_.size();
	}
	// deactivate and clear queue
	void deactivate()
	{
		{
			std::lock_guard<std::mutex> lk(mutex_);
			active_ = false;
			while(!queue_.empty())
				queue_.pop();
		}

		// release all waiting threads
		can_pop_.notify_all();
		can_push_.notify_all();
	}
	void activate()
	{
		std::lock_guard<std::mutex> lk(mutex_);
		active_ = true;
	}
	/** Append @p value without blocking. @return false if full or deactivated. */
	bool push(Data const& value)
	{
		bool rc;
		{
			std::unique_lock<std::mutex> lk(mutex_);
			rc = push_(value);
		}
		if(rc)
			can_pop_.notify_one();

		return rc;
	}
	/** Append @p value, blocking while the queue is full. @return false if deactivated. */
	bool waitAndPush(Data& value)
	{
		bool rc;
		{
			std::unique_lock<std::mutex> lk(mutex_);
			if(!active_)
				return false;
			// in case of spurious wakeup, loop until predicate in lambda
			// is satisfied.
			can_push_.wait(lk, [this] { return queue_.size() < max_size_ || !active_; });
			rc = push_(value);
		}
		if(rc)
			can_pop_.notify_one();

		return rc;
	}
	/** Remove the oldest element into @p value without blocking. @return false if empty or deactivated. */
	bool pop(Data& value)
	{
		bool rc;
		{
			std::unique_lock<std::mutex> lk(mutex_);
			rc = pop_(value);
		}
		if(rc)
			can_push_.notify_one();

		return rc;
	}
	/** Remove the oldest element into @p value, blocking while empty. @return false if deactivated. */
	bool waitAndPop(Data& value)
	{
		bool rc;
		{
			std::unique_lock<std::mutex> lk(mutex_);
			if(!active_)
				return false;
			// in case of spurious wakeup, loop until predicate in lambda
			// is satisfied.
			can_pop_.wait(lk, [this] { return !queue_.empty() || !active_; });
			rc = pop_(value);
		}
		if(rc)
			can_push_.notify_one();

		return rc;
	}

  private:
	// Both helpers expect mutex_ to be held by the caller.
	bool push_(Data const& value)
	{
		if(!active_ || queue_.size() >= max_size_)
			return false;
		queue_.push(value);

		return true;
	}
	bool pop_(Data& value)
	{
		if(!active_ || queue_.empty())
			return false;
		value = queue_.front();
		queue_.pop();

		return true;
	}
	std::queue<Data> queue_;
	mutable std::mutex mutex_;
	std::condition_variable can_pop_;
	std::condition_variable can_push_;
	bool active_;
	size_t max_size_;
};
/**
 * Describes where the data of one frame comes from: either a file name, or
 * a frame slot (frame id plus pointer) together with the client's own id
 * for the frame.
 */
struct BufferSrc
{
	BufferSrc(void) : BufferSrc("") {}
	/** File-backed source; ids are 0 and there is no frame pointer. */
	explicit BufferSrc(const std::string &file)
		: file_(file), clientFrameId_(0), frameId_(0), framePtr_(nullptr)
	{}
	/** Memory-backed source; the file name is empty. The pointer is not owned. */
	BufferSrc(size_t clientFrameId, size_t frameId, uint8_t* framePtr)
		: file_(""), clientFrameId_(clientFrameId), frameId_(frameId), framePtr_(framePtr)
	{}
	/** True when a file name is set and no frame pointer is; callable on const objects. */
	bool fromDisk(void) const
	{
		return !file_.empty() && framePtr_ == nullptr;
	}
	/** The client's id for this frame. */
	size_t index() const
	{
		return clientFrameId_;
	}
	std::string file_;
	size_t clientFrameId_;
	size_t frameId_;
	uint8_t* framePtr_;
};
452
453 struct Messenger;
454 static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch);
455 static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch);
456 static void processorThread(Messenger* messenger, std::function<void(std::string)> processor);
457
458 struct Messenger
459 {
460         explicit Messenger(MessengerInit init)
461                 : running(true), initialized_(false), shutdown_(false), init_(init),
462                   outboundSynch_(nullptr),
463                   inboundSynch_(nullptr), uncompressed_buffer_(nullptr), compressed_buffer_(nullptr),
464                   uncompressed_fd_(0), compressed_fd_(0)
465         {}
466         virtual ~Messenger(void)
467         {
468                 running = false;
469                 sendQueue.deactivate();
470                 receiveQueue.deactivate();
471
472                 if (outboundSynch_) {
473                         outboundSynch_->post(SYNCH_RECEIVE_READY);
474                         outbound.join();
475                 }
476
477                 if (inboundSynch_) {
478                         inboundSynch_->post(SYNCH_SENT);
479                         inbound.join();
480                 }
481
482                 for(auto& p : processors_)
483                         p.join();
484
485                 delete outboundSynch_;
486                 delete inboundSynch_;
487
488                 deinitShm();
489         }
490         void startThreads(void) {
491                 outboundSynch_ =
492                         new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch);
493                 outbound = std::thread(outboundThread, this, init_.outboundMessageBuf, outboundSynch_);
494
495                 inboundSynch_ =
496                         new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch);
497                 inbound = std::thread(inboundThread, this, init_.inboundMessageBuf, inboundSynch_);
498
499                 for(size_t i = 0; i < init_.numProcessingThreads_; ++i)
500                         processors_.push_back(std::thread(processorThread, this, init_.processor_));
501         }
502         size_t serialize(const std::string &dir, size_t clientFrameId, uint8_t* compressedPtr,
503                                          size_t compressedLength)
504         {
505                 char fname[512];
506                 if(!compressedPtr || !compressedLength)
507                         return 0;
508                 sprintf(fname, "%s/test_%d.j2k", dir.c_str(), (int)clientFrameId);
509                 auto fp = fopen(fname, "wb");
510                 if(!fp)
511                         return 0;
512                 size_t written = fwrite(compressedPtr, 1, compressedLength, fp);
513                 if(written != compressedLength)
514                 {
515                         fclose(fp);
516                         return 0;
517                 }
518                 fflush(fp);
519                 fclose(fp);
520
521                 return written;
522         }
523         bool initBuffers(void)
524         {
525                 bool rc = true;
526                 if(init_.uncompressedFrameSize_)
527                 {
528                         rc = rc && SharedMemoryManager::initShm(grokUncompressedBuf,
529                                                                                                         init_.uncompressedFrameSize_ * init_.numFrames_,
530                                                                                                         &uncompressed_fd_, &uncompressed_buffer_);
531                 }
532                 if(init_.compressedFrameSize_)
533                 {
534                         rc = rc && SharedMemoryManager::initShm(grokCompressedBuf,
535                                                                                                         init_.compressedFrameSize_ * init_.numFrames_,
536                                                                                                         &compressed_fd_, &compressed_buffer_);
537                 }
538
539                 return rc;
540         }
541
542         bool deinitShm(void)
543         {
544                 bool rc = SharedMemoryManager::deinitShm(grokUncompressedBuf,
545                                                                                                  init_.uncompressedFrameSize_ * init_.numFrames_,
546                                                                                                  uncompressed_fd_, &uncompressed_buffer_);
547                 rc = rc && SharedMemoryManager::deinitShm(grokCompressedBuf,
548                                                                                                   init_.compressedFrameSize_ * init_.numFrames_,
549                                                                                                   compressed_fd_, &compressed_buffer_);
550
551                 return rc;
552         }
553         template<typename... Args>
554         void send(const std::string& str, Args... args)
555         {
556                 std::ostringstream oss;
557                 oss << str;
558                 int dummy[] = {0, ((void)(oss << ',' << args), 0)...};
559                 static_cast<void>(dummy);
560
561                 sendQueue.push(oss.str());
562         }
563         static pid_t get_pid_by_process_name(const char* name)
564         {
565                 char command[256];
566                 snprintf(command, sizeof(command), "pgrep %s", name);
567                 auto pgrep = popen(command, "r");
568                 if(!pgrep)
569                         return -1;
570                 pid_t pid;
571                 if(fscanf(pgrep, "%d", &pid) != 1)
572                         pid = -1;
573                 pclose(pgrep);
574
575                 return pid;
576         }
577         static bool terminate_process(const char* name)
578         {
579                 auto pid = get_pid_by_process_name(name);
580
581                 return (pid != -1 && kill(pid, SIGTERM) != -1);
582         }
583         static bool kill_process(const char* name)
584         {
585                 auto pid = get_pid_by_process_name(name);
586
587                 return (pid != -1 && kill(pid, SIGKILL) != -1);
588         }
589         void launchGrok(const std::string &dir, uint32_t width, uint32_t stride,
590                                                                 uint32_t height, uint32_t samplesPerPixel, uint32_t depth,
591                                                                 int device, bool is4K, uint32_t fps, uint32_t bandwidth,
592                                                                 const std::string server, uint32_t port,
593                                                                 const std::string license)
594         {
595
596                 std::unique_lock<std::mutex> lk(shutdownMutex_);
597                 if (async_result_.valid())
598                         return;
599                 if(MessengerInit::firstLaunch(true))
600                         init_.unlink();
601                 startThreads();
602                 char _cmd[4096];
603                 auto fullServer = server + ":" + std::to_string(port);
604                 sprintf(_cmd,
605                                 "./grk_compress -batch_src %s,%d,%d,%d,%d,%d -out_fmt j2k -out_dir - -k 1 "
606                                 "-G %d -%s %d,%d -j %s -J %s",
607                                 GRK_MSGR_BATCH_IMAGE.c_str(), width, stride, height, samplesPerPixel, depth,
608                                 device, is4K ? "cinema4K" : "cinema2K", fps, bandwidth,
609                                 license.c_str(), fullServer.c_str());
610                 launch(_cmd, dir);
611         }
612         void initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames)
613         {
614                 // client fills queue with pending uncompressed buffers
615                 init_.uncompressedFrameSize_ = uncompressedFrameSize;
616                 init_.compressedFrameSize_ = compressedFrameSize;
617                 init_.numFrames_ = numFrames;
618                 initBuffers();
619                 auto ptr = uncompressed_buffer_;
620                 for(size_t i = 0; i < init_.numFrames_; ++i)
621                 {
622                         availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr));
623                         ptr += init_.uncompressedFrameSize_;
624                 }
625
626                 std::unique_lock<std::mutex> lk(shutdownMutex_);
627                 initialized_ = true;
628                 clientInitializedCondition_.notify_all();
629         }
630         bool waitForClientInit(void)
631         {
632                 if(initialized_)
633                         return true;
634
635                 std::unique_lock<std::mutex> lk(shutdownMutex_);
636                 if(initialized_)
637                         return true;
638                 else if (shutdown_)
639                         return false;
640                 clientInitializedCondition_.wait(lk, [this]{return initialized_ || shutdown_;});
641
642                 return initialized_ && !shutdown_;
643         }
644         static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel)
645         {
646                 return sizeof(uint16_t) * w * h * samplesPerPixel;
647         }
648         void reclaimCompressed(size_t frameId)
649         {
650                 availableBuffers_.push(BufferSrc(0, frameId, getCompressedFrame(frameId)));
651         }
652         void reclaimUncompressed(size_t frameId)
653         {
654                 availableBuffers_.push(BufferSrc(0, frameId, getUncompressedFrame(frameId)));
655         }
656         uint8_t* getUncompressedFrame(size_t frameId)
657         {
658                 assert(frameId < init_.numFrames_);
659                 if(frameId >= init_.numFrames_)
660                         return nullptr;
661
662                 return (uint8_t*)(uncompressed_buffer_ + frameId * init_.uncompressedFrameSize_);
663         }
664         uint8_t* getCompressedFrame(size_t frameId)
665         {
666                 assert(frameId < init_.numFrames_);
667                 if(frameId >= init_.numFrames_)
668                         return nullptr;
669
670                 return (uint8_t*)(compressed_buffer_ + frameId * init_.compressedFrameSize_);
671         }
672         std::atomic_bool running;
673         bool initialized_;
674         bool shutdown_;
675         MessengerBlockingQueue<std::string> sendQueue;
676         MessengerBlockingQueue<std::string> receiveQueue;
677         MessengerBlockingQueue<BufferSrc> availableBuffers_;
678         MessengerInit init_;
679         std::string cmd_;
680         std::future<int> async_result_;
681         std::mutex shutdownMutex_;
682         std::condition_variable shutdownCondition_;
683
684   protected:
685         std::condition_variable clientInitializedCondition_;
686   private:
687         void launch(const std::string &cmd, const std::string &dir)
688         {
689                 // Change the working directory
690                 if(!dir.empty())
691                 {
692                         if(chdir(dir.c_str()) != 0)
693                         {
694                                 getMessengerLogger()->error("Error: failed to change the working directory");
695                                 return;
696                         }
697                 }
698                 // Execute the command using std::async and std::system
699                 cmd_ = cmd;
700                 async_result_ = std::async(std::launch::async, [this]() { return std::system(cmd_.c_str()); });
701         }
702         std::thread outbound;
703         Synch* outboundSynch_;
704
705         std::thread inbound;
706         Synch* inboundSynch_;
707
708         std::vector<std::thread> processors_;
709         char* uncompressed_buffer_;
710         char* compressed_buffer_;
711
712         grk_handle uncompressed_fd_;
713         grk_handle compressed_fd_;
714 };
715
716 static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch)
717 {
718         grk_handle shm_fd = 0;
719         char* send_buffer = nullptr;
720
721         if(!SharedMemoryManager::initShm(sendBuf, messageBufferLen, &shm_fd, &send_buffer))
722                 return;
723         while(messenger->running)
724         {
725                 synch->wait(SYNCH_RECEIVE_READY);
726                 if(!messenger->running)
727                         break;
728                 std::string message;
729                 if(!messenger->sendQueue.waitAndPop(message))
730                         break;
731                 if(!messenger->running)
732                         break;
733                 memcpy(send_buffer, message.c_str(), message.size() + 1);
734                 synch->post(SYNCH_SENT);
735         }
736         SharedMemoryManager::deinitShm(sendBuf, messageBufferLen, shm_fd, &send_buffer);
737 }
738
739 static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch)
740 {
741         grk_handle shm_fd = 0;
742         char* receive_buffer = nullptr;
743
744         if(!SharedMemoryManager::initShm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer))
745                 return;
746         while(messenger->running)
747         {
748                 synch->wait(SYNCH_SENT);
749                 if(!messenger->running)
750                         break;
751                 auto message = std::string(receive_buffer);
752                 synch->post(SYNCH_RECEIVE_READY);
753                 messenger->receiveQueue.push(message);
754         }
755         SharedMemoryManager::deinitShm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer);
756 }
757 struct Msg
758 {
759         explicit Msg(const std::string &msg) : ct_(0)
760         {
761                 std::stringstream ss(msg);
762                 while(ss.good())
763                 {
764                         std::string substr;
765                         std::getline(ss, substr, ',');
766                         cs_.push_back(substr);
767                 }
768         }
769         std::string next()
770         {
771                 if(ct_ == cs_.size())
772                 {
773                         getMessengerLogger()->error("Msg: comma separated list exhausted. returning empty.");
774                         return "";
775                 }
776                 return cs_[ct_++];
777         }
778
779         uint32_t nextUint(void)
780         {
781                 return (uint32_t)std::stoi(next());
782         }
783
784         std::vector<std::string> cs_;
785         size_t ct_;
786 };
787 static void processorThread(Messenger* messenger, std::function<void(std::string)> processor)
788 {
789         while(messenger->running)
790         {
791                 std::string message;
792                 if(!messenger->receiveQueue.waitAndPop(message))
793                         break;
794                 if(!messenger->running)
795                         break;
796                 Msg msg(message);
797                 auto tag = msg.next();
798                 if(tag == GRK_MSGR_BATCH_COMPRESS_INIT)
799                 {
800                         auto width = msg.nextUint();
801                         auto stride = msg.nextUint();
802                         (void)stride;
803                         auto height = msg.nextUint();
804                         auto samplesPerPixel = msg.nextUint();
805                         auto depth = msg.nextUint();
806                         (void)depth;
807                         messenger->init_.uncompressedFrameSize_ =
808                                 Messenger::uncompressedFrameSize(width, height, samplesPerPixel);
809                         auto compressedFrameSize = msg.nextUint();
810                         auto numFrames = msg.nextUint();
811                         messenger->initClient(compressedFrameSize, compressedFrameSize, numFrames);
812                 }
813                 else if(tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED)
814                 {
815                         messenger->reclaimUncompressed(msg.nextUint());
816                 }
817                 else if(tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED)
818                 {
819                         messenger->reclaimCompressed(msg.nextUint());
820                 }
821                 processor(message);
822         }
823 }
824
/**
 * Mutex-protected map of frames keyed by the value of their index() method.
 *
 * F must be default-constructible and copy-constructible, and must provide an
 * index() method whose result converts to size_t.
 */
template<typename F>
struct ScheduledFrames
{
        /**
         * Remember a copy of val under val.index().
         * If a frame with the same index is already stored it is kept and val is ignored.
         */
        void store(F& val)
        {
                std::lock_guard<std::mutex> guard(mapMutex_);
                const size_t key = val.index();
                // One search serves both the "already present?" test and the insertion hint,
                // and val is only copied when it is really inserted.
                auto pos = map_.lower_bound(key);
                if(pos == map_.end() || pos->first != key)
                        map_.emplace_hint(pos, key, val);
        }

        /**
         * Remove and return the frame stored under index.
         *
         * @param index   key the frame was stored under
         * @param success set to true if a frame was found, false otherwise
         * @return the stored frame, or a default-constructed F when none was found
         */
        F retrieve(size_t index, bool &success)
        {
                std::lock_guard<std::mutex> guard(mapMutex_);
                auto it = map_.find(index);
                success = (it != map_.end());
                if(!success)
                        return F();

                // The entry is erased straight away, so its value can be moved out.
                F val = std::move(it->second);
                map_.erase(it);

                return val;
        }

 private:
        std::mutex mapMutex_;     // guards map_
        std::map<size_t, F> map_; // stored frames by index
};
854
855 template<typename F>
856 struct ScheduledMessenger : public Messenger
857 {
858         explicit ScheduledMessenger(MessengerInit init) : Messenger(init),
859                                                                                         framesScheduled_(0),
860                                                                                         framesCompressed_(0)
861         {}
862         ~ScheduledMessenger(void) {
863                 shutdown();
864         }
865         bool scheduleCompress(F proxy, std::function<void(BufferSrc)> converter){
866                 size_t frameSize = init_.uncompressedFrameSize_;
867                 assert(frameSize >= init_.uncompressedFrameSize_);
868                 BufferSrc src;
869                 if(!availableBuffers_.waitAndPop(src))
870                         return false;
871                 converter(src);
872                 scheduledFrames_.store(proxy);
873                 framesScheduled_++;
874                 send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_);
875
876                 return true;
877         }
878         void processCompressed(const std::string &message, std::function<void(F,uint8_t*,uint32_t)> processor, bool needsRecompression) {
879                 Msg msg(message);
880                 msg.next();
881                 auto clientFrameId = msg.nextUint();
882                 auto compressedFrameId = msg.nextUint();
883                 auto compressedFrameLength = msg.nextUint();
884                 if (!needsRecompression) {
885                         bool success = false;
886                         auto srcFrame = scheduledFrames_.retrieve(clientFrameId,success);
887                         if (!success)
888                                 return;
889                         processor(srcFrame, getCompressedFrame(compressedFrameId),compressedFrameLength);
890                 }
891                 ++framesCompressed_;
892                 send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId);
893                 if (shutdown_ && framesCompressed_ == framesScheduled_)
894                         shutdownCondition_.notify_all();
895         }
896         void shutdown(void){
897                 try {
898                         std::unique_lock<std::mutex> lk(shutdownMutex_);
899                         if (!async_result_.valid())
900                                 return;
901                         shutdown_ = true;
902                         if (framesScheduled_) {
903                                 uint32_t scheduled = framesScheduled_;
904                                 send(GRK_MSGR_BATCH_FLUSH, scheduled);
905                                 shutdownCondition_.wait(lk, [this] { return framesScheduled_ == framesCompressed_; });
906                         }
907                         availableBuffers_.deactivate();
908                         send(GRK_MSGR_BATCH_SHUTDOWN);
909                         int result = async_result_.get();
910                         if(result != 0)
911                                 getMessengerLogger()->error("Accelerator failed with return code: %d\n",result);
912                 } catch (std::exception &ex) {
913                         getMessengerLogger()->error("%s",ex.what());
914                 }
915
916         }
917         F retrieve(size_t index, bool &success) {
918                 return scheduledFrames_.retrieve(index, success);
919         }
920         void store(F& val) {
921                 scheduledFrames_.store(val);
922         }
923
924 private:
925         ScheduledFrames<F> scheduledFrames_;
926         std::atomic<uint32_t> framesScheduled_;
927         std::atomic<uint32_t> framesCompressed_;
928 };
929
930 } // namespace grk_plugin