Run clang-format on the previous commit.
[dcpomatic.git] / src / lib / grok_messenger.h
1 /*
2     Copyright (C) 2023 Grok Image Compression Inc.
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20 #pragma once
21
22 #include <atomic>
23 #include <cassert>
24 #include <condition_variable>
25 #include <cstdarg>
26 #include <cstring>
27 #include <functional>
28 #include <future>
29 #include <iostream>
30 #include <map>
31 #include <mutex>
32 #include <queue>
33 #include <sstream>
34 #include <string>
35 #include <thread>
36
37 #ifdef _WIN32
38 #include <direct.h>
39 #include <tlhelp32.h>
40 #include <windows.h>
41 #pragma warning(disable : 4100)
42 #else
43 #include <fcntl.h>
44 #include <semaphore.h>
45 #include <signal.h>
46 #include <sys/mman.h>
47 #include <unistd.h>
48 #endif
49
50 namespace grk_plugin {
51 static std::string grokToClientMessageBuf = "Global\\grok_to_client_message";
52 static std::string grokSentSynch = "Global\\grok_sent";
53 static std::string clientReceiveReadySynch = "Global\\client_receive_ready";
54 static std::string clientToGrokMessageBuf = "Global\\client_to_grok_message";
55 static std::string clientSentSynch = "Global\\client_sent";
56 static std::string grokReceiveReadySynch = "Global\\grok_receive_ready";
57 static std::string grokUncompressedBuf = "Global\\grok_uncompressed_buf";
58 static std::string grokCompressedBuf = "Global\\grok_compressed_buf";
59 static const std::string GRK_MSGR_BATCH_IMAGE = "GRK_MSGR_BATCH_IMAGE";
60 static const std::string GRK_MSGR_BATCH_COMPRESS_INIT = "GRK_MSGR_BATCH_COMPRESS_INIT";
61 static const std::string GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED = "GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED";
62 static const std::string GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED =
63     "GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED";
64 static const std::string GRK_MSGR_BATCH_SUBMIT_COMPRESSED = "GRK_MSGR_BATCH_SUBMIT_COMPRESSED";
65 static const std::string GRK_MSGR_BATCH_PROCESSSED_COMPRESSED =
66     "GRK_MSGR_BATCH_PROCESSSED_COMPRESSED";
67 static const std::string GRK_MSGR_BATCH_SHUTDOWN = "GRK_MSGR_BATCH_SHUTDOWN";
68 static const std::string GRK_MSGR_BATCH_FLUSH = "GRK_MSGR_BATCH_FLUSH";
69 static const size_t messageBufferLen = 256;
70
71 struct IMessengerLogger
72 {
73         virtual ~IMessengerLogger() = default;
74         virtual void info(const char* fmt, ...) = 0;
75         virtual void warn(const char* fmt, ...) = 0;
76         virtual void error(const char* fmt, ...) = 0;
77
78 protected:
79         template <typename... Args>
80         std::string log_message(char const* const format, Args&... args) noexcept
81         {
82                 constexpr size_t message_size = 512;
83                 char message[message_size];
84
85                 std::snprintf(message, message_size, format, args...);
86                 return std::string(message);
87         }
88 };
89
90 struct MessengerLogger : public IMessengerLogger
91 {
92         explicit MessengerLogger(const std::string& preamble)
93                 : preamble_(preamble)
94         {
95         }
96
97         virtual ~MessengerLogger() = default;
98
99         virtual void info(const char* fmt, ...) override
100         {
101                 va_list args;
102                 std::string new_fmt = preamble_ + fmt + "\n";
103                 va_start(args, fmt);
104                 vfprintf(stdout, new_fmt.c_str(), args);
105                 va_end(args);
106         }
107
108         virtual void warn(const char* fmt, ...) override
109         {
110                 va_list args;
111                 std::string new_fmt = preamble_ + fmt + "\n";
112                 va_start(args, fmt);
113                 vfprintf(stdout, new_fmt.c_str(), args);
114                 va_end(args);
115         }
116
117         virtual void error(const char* fmt, ...) override
118         {
119                 va_list args;
120                 std::string new_fmt = preamble_ + fmt + "\n";
121                 va_start(args, fmt);
122                 vfprintf(stderr, new_fmt.c_str(), args);
123                 va_end(args);
124         }
125
126 protected:
127         std::string preamble_;
128 };
129
130 static IMessengerLogger* sLogger = nullptr;
131 #if defined(__GNUC__) || defined(__clang__)
132 #pragma GCC diagnostic push
133 #pragma GCC diagnostic ignored "-Wunused-function"
134 #endif
135 static void
136 setMessengerLogger(IMessengerLogger* logger)
137 {
138         delete sLogger;
139         sLogger = logger;
140 }
141 #if defined(__GNUC__) || defined(__clang__)
142 #pragma GCC diagnostic pop
143 #endif
144 static IMessengerLogger*
145 getMessengerLogger()
146 {
147         return sLogger;
148 }
149
150 struct MessengerInit
151 {
152         MessengerInit(const std::string& outBuf, const std::string& outSent,
153                       const std::string& outReceiveReady, const std::string& inBuf,
154                       const std::string& inSent,
155                       const std::string& inReceiveReady,
156                       std::function<void(std::string)> processor,
157                       size_t numProcessingThreads)
158                 : outboundMessageBuf(outBuf)
159                 , outboundSentSynch(outSent)
160                 , outboundReceiveReadySynch(outReceiveReady)
161                 , inboundMessageBuf(inBuf)
162                 , inboundSentSynch(inSent)
163                 , inboundReceiveReadySynch(inReceiveReady)
164                 , processor_(processor)
165                 , numProcessingThreads_(numProcessingThreads)
166                 , uncompressedFrameSize_(0)
167                 , compressedFrameSize_(0)
168                 , numFrames_(0)
169         {
170                 if (first_launch(true)) {
171                         unlink();
172                 }
173         }
174
175         void unlink()
176         {
177 #ifndef _WIN32
178                 shm_unlink(grokToClientMessageBuf.c_str());
179                 shm_unlink(clientToGrokMessageBuf.c_str());
180 #endif
181         }
182
183         static bool first_launch(bool isClient)
184         {
185                 bool debugGrok = false;
186                 return debugGrok != isClient;
187         }
188
189         std::string outboundMessageBuf;
190         std::string outboundSentSynch;
191         std::string outboundReceiveReadySynch;
192
193         std::string inboundMessageBuf;
194         std::string inboundSentSynch;
195         std::string inboundReceiveReadySynch;
196
197         std::function<void(std::string)> processor_;
198         size_t numProcessingThreads_;
199
200         size_t uncompressedFrameSize_;
201         size_t compressedFrameSize_;
202         size_t numFrames_;
203 };
204
205 /*************************** Synchronization *******************************/
206 enum SynchDirection
207 {
208         SYNCH_SENT,
209         SYNCH_RECEIVE_READY
210 };
211
212 typedef int grk_handle;
213
214 struct Synch
215 {
216         Synch(const std::string& sentSemName, const std::string& receiveReadySemName)
217                 : _sent_sem_name(sentSemName)
218                 , _receive_ready_sem_name(receiveReadySemName)
219         {
220                 // unlink semaphores in case of previous crash
221                 if (MessengerInit::first_launch(true)) {
222                         unlink();
223                 }
224                 open();
225         }
226
227         ~Synch()
228         {
229                 close();
230                 if (MessengerInit::first_launch(true)) {
231                         unlink();
232                 }
233         }
234
235         void post(SynchDirection dir)
236         {
237                 auto sem = (dir == SYNCH_SENT ? sentSem_ : receiveReadySem_);
238                 int rc = sem_post(sem);
239                 if (rc) {
240                         getMessengerLogger()->error("Error posting to semaphore: %s", strerror(errno));
241                 }
242         }
243
244         void wait(SynchDirection dir)
245         {
246                 auto sem = dir == SYNCH_SENT ? sentSem_ : receiveReadySem_;
247                 int rc = sem_wait(sem);
248                 if (rc) {
249                         getMessengerLogger()->error("Error waiting for semaphore: %s", strerror(errno));
250                 }
251         }
252
253         void open()
254         {
255                 sentSem_ = sem_open(_sent_sem_name.c_str(), O_CREAT, 0666, 0);
256                 if (!sentSem_) {
257                         getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
258                 }
259                 receiveReadySem_ = sem_open(_receive_ready_sem_name.c_str(), O_CREAT, 0666, 1);
260                 if (!receiveReadySem_) {
261                         getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
262                 }
263         }
264
265         void close()
266         {
267                 int rc = sem_close(sentSem_);
268                 if (rc) {
269                         getMessengerLogger()->error("Error closing semaphore %s: %s", _sent_sem_name.c_str(),
270                                                     strerror(errno));
271                 }
272                 rc = sem_close(receiveReadySem_);
273                 if (rc) {
274                         getMessengerLogger()->error("Error closing semaphore %s: %s",
275                                                     _receive_ready_sem_name.c_str(), strerror(errno));
276                 }
277         }
278
279         void unlink()
280         {
281                 int rc = sem_unlink(_sent_sem_name.c_str());
282                 if (rc == -1 && errno != ENOENT) {
283                         getMessengerLogger()->error("Error unlinking semaphore %s: %s", _sent_sem_name.c_str(),
284                                                     strerror(errno));
285                 }
286                 rc = sem_unlink(_receive_ready_sem_name.c_str());
287                 if (rc == -1 && errno != ENOENT) {
288                         getMessengerLogger()->error("Error unlinking semaphore %s: %s",
289                                                     _receive_ready_sem_name.c_str(), strerror(errno));
290                 }
291         }
292
293         sem_t* sentSem_;
294         sem_t* receiveReadySem_;
295
296 private:
297         std::string _sent_sem_name;
298         std::string _receive_ready_sem_name;
299 };
300
301 struct SharedMemoryManager
302 {
303         static bool init_shm(const std::string& name, size_t len, grk_handle* shm_fd, char** buffer)
304         {
305                 *shm_fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0666);
306                 if (*shm_fd < 0) {
307                         getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
308                         return false;
309                 }
310                 int rc = ftruncate(*shm_fd, sizeof(char) * len);
311                 if (rc) {
312                         getMessengerLogger()->error("Error truncating shared memory: %s", strerror(errno));
313                         rc = close(*shm_fd);
314                         if (rc) {
315                                 getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
316                         }
317                         rc = shm_unlink(name.c_str());
318                         if (rc) {
319                                 getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
320                         }
321                         return false;
322                 }
323                 *buffer = static_cast<char*>(mmap(0, len, PROT_WRITE, MAP_SHARED, *shm_fd, 0));
324                 if (!*buffer) {
325                         getMessengerLogger()->error("Error mapping shared memory: %s", strerror(errno));
326                         rc = close(*shm_fd);
327                         if (rc) {
328                                 getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
329                         }
330                         rc = shm_unlink(name.c_str());
331                         if (rc) {
332                                 getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
333                         }
334                 }
335
336                 return *buffer != nullptr;
337         }
338
339         static bool deinit_shm(const std::string& name, size_t len, grk_handle& shm_fd, char** buffer)
340         {
341                 if (!*buffer || !shm_fd) {
342                         return true;
343                 }
344
345                 int rc = munmap(*buffer, len);
346                 *buffer = nullptr;
347                 if (rc) {
348                         getMessengerLogger()->error("Error unmapping shared memory %s: %s", name.c_str(), strerror(errno));
349                 }
350                 rc = close(shm_fd);
351                 shm_fd = 0;
352                 if (rc) {
353                         getMessengerLogger()->error("Error closing shared memory %s: %s", name.c_str(), strerror(errno));
354                 }
355                 rc = shm_unlink(name.c_str());
356                 if (rc) {
357                         fprintf(stderr, "Error unlinking shared memory %s : %s\n", name.c_str(), strerror(errno));
358                 }
359
360                 return true;
361         }
362 };
363
364 template <typename Data>
365 class MessengerBlockingQueue
366 {
367 public:
368         explicit MessengerBlockingQueue(size_t max)
369                 : _active(true)
370                 , _max_size(max)
371         {
372         }
373
374         MessengerBlockingQueue()
375                 : MessengerBlockingQueue(UINT_MAX)
376         {
377         }
378
379         size_t size() const
380         {
381                 return _queue.size();
382         }
383
384         // deactivate and clear queue
385         void deactivate()
386         {
387                 {
388                         std::lock_guard<std::mutex> lk(_mutex);
389                         _active = false;
390                         while (!_queue.empty()) {
391                                 _queue.pop();
392                         }
393                 }
394
395                 // release all waiting threads
396                 _can_pop.notify_all();
397                 _can_push.notify_all();
398         }
399
400         void activate()
401         {
402                 std::lock_guard<std::mutex> lk(_mutex);
403                 _active = true;
404         }
405
406         bool push(Data const& value)
407         {
408                 bool rc;
409                 {
410                         std::unique_lock<std::mutex> lk(_mutex);
411                         rc = _push(value);
412                 }
413                 if (rc) {
414                         _can_pop.notify_one();
415                 }
416
417                 return rc;
418         }
419
420         bool wait_and_push(Data& value)
421         {
422                 bool rc;
423                 {
424                         std::unique_lock<std::mutex> lk(_mutex);
425                         if (!_active) {
426                                 return false;
427                         }
428                         // in case of spurious wakeup, loop until predicate in lambda
429                         // is satisfied.
430                         _can_push.wait(lk, [this] { return _queue.size() < _max_size || !_active; });
431                         rc = _push(value);
432                 }
433                 if (rc) {
434                         _can_pop.notify_one();
435                 }
436
437                 return rc;
438         }
439
440         bool pop(Data& value)
441         {
442                 bool rc;
443                 {
444                         std::unique_lock<std::mutex> lk(_mutex);
445                         rc = _pop(value);
446                 }
447                 if (rc) {
448                         _can_push.notify_one();
449                 }
450
451                 return rc;
452         }
453
454         bool wait_and_pop(Data& value)
455         {
456                 bool rc;
457                 {
458                         std::unique_lock<std::mutex> lk(_mutex);
459                         if (!_active) {
460                                 return false;
461                         }
462                         // in case of spurious wakeup, loop until predicate in lambda
463                         // is satisfied.
464                         _can_pop.wait(lk, [this] { return !_queue.empty() || !_active; });
465                         rc = _pop(value);
466                 }
467                 if (rc) {
468                         _can_push.notify_one();
469                 }
470
471                 return rc;
472         }
473
474 private:
475         bool _push(Data const& value)
476         {
477                 if (_queue.size() == _max_size || !_active) {
478                         return false;
479                 }
480                 _queue.push(value);
481
482                 return true;
483         }
484
485         bool _pop(Data& value)
486         {
487                 if (_queue.empty() || !_active) {
488                         return false;
489                 }
490                 value = _queue.front();
491                 _queue.pop();
492
493                 return true;
494         }
495
496         std::queue<Data> _queue;
497         mutable std::mutex _mutex;
498         std::condition_variable _can_pop;
499         std::condition_variable _can_push;
500         bool _active;
501         size_t _max_size;
502 };
503
504 struct BufferSrc
505 {
506         BufferSrc()
507                 : BufferSrc("")
508         {
509         }
510
511         explicit BufferSrc(const std::string& file)
512                 : file_(file)
513                 , clientFrameId_(0)
514                 , frameId_(0)
515                 , framePtr_(nullptr)
516         {
517         }
518
519         BufferSrc(size_t clientFrameId, size_t frameId, uint8_t* framePtr)
520                 : file_("")
521                 , clientFrameId_(clientFrameId)
522                 , frameId_(frameId)
523                 , framePtr_(framePtr)
524         {
525         }
526
527         bool from_disk()
528         {
529                 return !file_.empty() && framePtr_ == nullptr;
530         }
531
532         size_t index() const
533         {
534                 return clientFrameId_;
535         }
536
537         std::string file_;
538         size_t clientFrameId_;
539         size_t frameId_;
540         uint8_t* framePtr_;
541 };
542
543 struct Messenger;
544 static void
545 outboundThread(Messenger* messenger, const std::string& sendBuf, Synch* synch);
546 static void
547 inboundThread(Messenger* messenger, const std::string& receiveBuf, Synch* synch);
548 static void
549 processorThread(Messenger* messenger, std::function<void(std::string)> processor);
550
551 struct Messenger
552 {
553         explicit Messenger(MessengerInit init)
554                 : running(true)
555                 , initialized_(false)
556                 , shutdown_(false)
557                 , init_(init)
558                 , _outbound_synch(nullptr)
559                 , _inbound_synch(nullptr)
560                 , _uncompressed_buffer(nullptr)
561                 , _compressed_buffer(nullptr)
562                 , _uncompressed_fd(0)
563                 , _compressed_fd(0)
564         {
565         }
566
567         virtual ~Messenger()
568         {
569                 running = false;
570                 sendQueue.deactivate();
571                 receiveQueue.deactivate();
572
573                 if (_outbound_synch) {
574                         _outbound_synch->post(SYNCH_RECEIVE_READY);
575                         _outbound.join();
576                 }
577
578                 if (_inbound_synch) {
579                         _inbound_synch->post(SYNCH_SENT);
580                         _inbound.join();
581                 }
582
583                 for (auto& p : _processors) {
584                         p.join();
585                 }
586
587                 delete _outbound_synch;
588                 delete _inbound_synch;
589
590                 deinit_shm();
591         }
592
593         void start_threads()
594         {
595                 _outbound_synch =
596                     new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch);
597                 _outbound = std::thread(outboundThread, this, init_.outboundMessageBuf, _outbound_synch);
598
599                 _inbound_synch =
600                     new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch);
601                 _inbound = std::thread(inboundThread, this, init_.inboundMessageBuf, _inbound_synch);
602
603                 for (size_t i = 0; i < init_.numProcessingThreads_; ++i) {
604                         _processors.push_back(std::thread(processorThread, this, init_.processor_));
605                 }
606         }
607
608         size_t serialize(const std::string& dir, size_t clientFrameId, uint8_t* compressedPtr,
609                          size_t compressedLength)
610         {
611                 char fname[512];
612                 if (!compressedPtr || !compressedLength) {
613                         return 0;
614                 }
615                 sprintf(fname, "%s/test_%d.j2k", dir.c_str(), (int)clientFrameId);
616                 auto fp = fopen(fname, "wb");
617                 if (!fp) {
618                         return 0;
619                 }
620                 size_t written = fwrite(compressedPtr, 1, compressedLength, fp);
621                 if (written != compressedLength) {
622                         fclose(fp);
623                         return 0;
624                 }
625                 fflush(fp);
626                 fclose(fp);
627
628                 return written;
629         }
630
631         bool init_buffers()
632         {
633                 bool rc = true;
634                 if (init_.uncompressedFrameSize_) {
635                         rc = rc && SharedMemoryManager::init_shm(grokUncompressedBuf,
636                                                                  init_.uncompressedFrameSize_ * init_.numFrames_,
637                                                                  &_uncompressed_fd, &_uncompressed_buffer);
638                 }
639                 if (init_.compressedFrameSize_) {
640                         rc = rc && SharedMemoryManager::init_shm(grokCompressedBuf,
641                                                                  init_.compressedFrameSize_ * init_.numFrames_,
642                                                                  &_compressed_fd, &_compressed_buffer);
643                 }
644
645                 return rc;
646         }
647
648         bool deinit_shm()
649         {
650                 bool rc = SharedMemoryManager::deinit_shm(grokUncompressedBuf,
651                                                           init_.uncompressedFrameSize_ * init_.numFrames_,
652                                                           _uncompressed_fd, &_uncompressed_buffer);
653                 rc = rc && SharedMemoryManager::deinit_shm(grokCompressedBuf,
654                                                            init_.compressedFrameSize_ * init_.numFrames_,
655                                                            _compressed_fd, &_compressed_buffer);
656
657                 return rc;
658         }
659
660         template <typename... Args>
661         void send(const std::string& str, Args... args)
662         {
663                 std::ostringstream oss;
664                 oss << str;
665                 int dummy[] = {0, ((void)(oss << ',' << args), 0)...};
666                 static_cast<void>(dummy);
667
668                 sendQueue.push(oss.str());
669         }
670
671         static pid_t get_pid_by_process_name(const char* name)
672         {
673                 char command[256];
674                 snprintf(command, sizeof(command), "pgrep %s", name);
675                 auto pgrep = popen(command, "r");
676                 if (!pgrep) {
677                         return -1;
678                 }
679                 pid_t pid;
680                 if (fscanf(pgrep, "%d", &pid) != 1) {
681                         pid = -1;
682                 }
683                 pclose(pgrep);
684
685                 return pid;
686         }
687
688         static bool terminate_process(const char* name)
689         {
690                 auto pid = get_pid_by_process_name(name);
691
692                 return (pid != -1 && kill(pid, SIGTERM) != -1);
693         }
694
695         static bool kill_process(const char* name)
696         {
697                 auto pid = get_pid_by_process_name(name);
698
699                 return (pid != -1 && kill(pid, SIGKILL) != -1);
700         }
701
702         void launch_grok(const std::string& dir, uint32_t width, uint32_t stride,
703                          uint32_t height, uint32_t samplesPerPixel, uint32_t depth,
704                          int device, bool is4K, uint32_t fps, uint32_t bandwidth,
705                          const std::string server, uint32_t port,
706                          const std::string license)
707         {
708
709                 std::unique_lock<std::mutex> lk(shutdownMutex_);
710                 if (async_result_.valid()) {
711                         return;
712                 }
713                 if (MessengerInit::first_launch(true)) {
714                         init_.unlink();
715                 }
716                 start_threads();
717                 char _cmd[4096];
718                 auto fullServer = server + ":" + std::to_string(port);
719                 sprintf(_cmd,
720                         "./grk_compress -batch_src %s,%d,%d,%d,%d,%d -out_fmt j2k -out_dir - -k 1 "
721                         "-G %d -%s %d,%d -j %s -J %s",
722                         GRK_MSGR_BATCH_IMAGE.c_str(), width, stride, height, samplesPerPixel, depth,
723                         device, is4K ? "cinema4K" : "cinema2K", fps, bandwidth,
724                         license.c_str(), fullServer.c_str());
725                 launch(_cmd, dir);
726         }
727
728         void init_client(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames)
729         {
730                 // client fills queue with pending uncompressed buffers
731                 init_.uncompressedFrameSize_ = uncompressedFrameSize;
732                 init_.compressedFrameSize_ = compressedFrameSize;
733                 init_.numFrames_ = numFrames;
734                 init_buffers();
735                 auto ptr = _uncompressed_buffer;
736                 for (size_t i = 0; i < init_.numFrames_; ++i) {
737                         availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr));
738                         ptr += init_.uncompressedFrameSize_;
739                 }
740
741                 std::unique_lock<std::mutex> lk(shutdownMutex_);
742                 initialized_ = true;
743                 clientInitializedCondition_.notify_all();
744         }
745
746         bool wait_for_client_init()
747         {
748                 if (initialized_) {
749                         return true;
750                 }
751
752                 std::unique_lock<std::mutex> lk(shutdownMutex_);
753                 if (initialized_) {
754                         return true;
755                 } else if (shutdown_) {
756                         return false;
757                 }
758                 clientInitializedCondition_.wait(lk, [this] { return initialized_ || shutdown_; });
759
760                 return initialized_ && !shutdown_;
761         }
762
763         static size_t uncompressed_frame_size(uint32_t w, uint32_t h, uint32_t samplesPerPixel)
764         {
765                 return sizeof(uint16_t) * w * h * samplesPerPixel;
766         }
767
768         void reclaim_compressed(size_t frameId)
769         {
770                 availableBuffers_.push(BufferSrc(0, frameId, get_compressed_frame(frameId)));
771         }
772
773         void reclaim_uncompressed(size_t frameId)
774         {
775                 availableBuffers_.push(BufferSrc(0, frameId, get_uncompressed_frame(frameId)));
776         }
777
778         uint8_t* get_uncompressed_frame(size_t frameId)
779         {
780                 assert(frameId < init_.numFrames_);
781                 if (frameId >= init_.numFrames_) {
782                         return nullptr;
783                 }
784
785                 return (uint8_t*)(_uncompressed_buffer + frameId * init_.uncompressedFrameSize_);
786         }
787
788         uint8_t* get_compressed_frame(size_t frameId)
789         {
790                 assert(frameId < init_.numFrames_);
791                 if (frameId >= init_.numFrames_) {
792                         return nullptr;
793                 }
794
795                 return (uint8_t*)(_compressed_buffer + frameId * init_.compressedFrameSize_);
796         }
797
798         std::atomic_bool running;
799         bool initialized_;
800         bool shutdown_;
801         MessengerBlockingQueue<std::string> sendQueue;
802         MessengerBlockingQueue<std::string> receiveQueue;
803         MessengerBlockingQueue<BufferSrc> availableBuffers_;
804         MessengerInit init_;
805         std::string cmd_;
806         std::future<int> async_result_;
807         std::mutex shutdownMutex_;
808         std::condition_variable shutdownCondition_;
809
810 protected:
811         std::condition_variable clientInitializedCondition_;
812
813 private:
814         void launch(const std::string& cmd, const std::string& dir)
815         {
816                 // Change the working directory
817                 if (!dir.empty()) {
818                         if (chdir(dir.c_str()) != 0) {
819                                 getMessengerLogger()->error("Error: failed to change the working directory");
820                                 return;
821                         }
822                 }
823                 // Execute the command using std::async and std::system
824                 cmd_ = cmd;
825                 async_result_ = std::async(std::launch::async, [this]() { return std::system(cmd_.c_str()); });
826         }
827
828         std::thread _outbound;
829         Synch* _outbound_synch;
830
831         std::thread _inbound;
832         Synch* _inbound_synch;
833
834         std::vector<std::thread> _processors;
835         char* _uncompressed_buffer;
836         char* _compressed_buffer;
837
838         grk_handle _uncompressed_fd;
839         grk_handle _compressed_fd;
840 };
841
842 static void
843 outboundThread(Messenger* messenger, const std::string& sendBuf, Synch* synch)
844 {
845         grk_handle shm_fd = 0;
846         char* send_buffer = nullptr;
847
848         if (!SharedMemoryManager::init_shm(sendBuf, messageBufferLen, &shm_fd, &send_buffer)) {
849                 return;
850         }
851         while (messenger->running) {
852                 synch->wait(SYNCH_RECEIVE_READY);
853                 if (!messenger->running) {
854                         break;
855                 }
856                 std::string message;
857                 if (!messenger->sendQueue.wait_and_pop(message)) {
858                         break;
859                 }
860                 if (!messenger->running) {
861                         break;
862                 }
863                 memcpy(send_buffer, message.c_str(), message.size() + 1);
864                 synch->post(SYNCH_SENT);
865         }
866         SharedMemoryManager::deinit_shm(sendBuf, messageBufferLen, shm_fd, &send_buffer);
867 }
868
869 static void
870 inboundThread(Messenger* messenger, const std::string& receiveBuf, Synch* synch)
871 {
872         grk_handle shm_fd = 0;
873         char* receive_buffer = nullptr;
874
875         if (!SharedMemoryManager::init_shm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer)) {
876                 return;
877         }
878         while (messenger->running) {
879                 synch->wait(SYNCH_SENT);
880                 if (!messenger->running) {
881                         break;
882                 }
883                 auto message = std::string(receive_buffer);
884                 synch->post(SYNCH_RECEIVE_READY);
885                 messenger->receiveQueue.push(message);
886         }
887         SharedMemoryManager::deinit_shm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer);
888 }
889
890 struct Msg
891 {
892         explicit Msg(const std::string& msg)
893                 : ct_(0)
894         {
895                 std::stringstream ss(msg);
896                 while (ss.good()) {
897                         std::string substr;
898                         std::getline(ss, substr, ',');
899                         cs_.push_back(substr);
900                 }
901         }
902
903         std::string next()
904         {
905                 if (ct_ == cs_.size()) {
906                         getMessengerLogger()->error("Msg: comma separated list exhausted. returning empty.");
907                         return "";
908                 }
909                 return cs_[ct_++];
910         }
911
912         uint32_t next_uint()
913         {
914                 return (uint32_t)std::stoi(next());
915         }
916
917         std::vector<std::string> cs_;
918         size_t ct_;
919 };
920
921 static void
922 processorThread(Messenger* messenger, std::function<void(std::string)> processor)
923 {
924         while (messenger->running) {
925                 std::string message;
926                 if (!messenger->receiveQueue.wait_and_pop(message)) {
927                         break;
928                 }
929                 if (!messenger->running) {
930                         break;
931                 }
932                 Msg msg(message);
933                 auto tag = msg.next();
934                 if (tag == GRK_MSGR_BATCH_COMPRESS_INIT) {
935                         auto width = msg.next_uint();
936                         auto stride = msg.next_uint();
937                         (void)stride;
938                         auto height = msg.next_uint();
939                         auto samplesPerPixel = msg.next_uint();
940                         auto depth = msg.next_uint();
941                         (void)depth;
942                         messenger->init_.uncompressedFrameSize_ =
943                             Messenger::uncompressed_frame_size(width, height, samplesPerPixel);
944                         auto compressedFrameSize = msg.next_uint();
945                         auto numFrames = msg.next_uint();
946                         messenger->init_client(compressedFrameSize, compressedFrameSize, numFrames);
947                 } else if (tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED) {
948                         messenger->reclaim_uncompressed(msg.next_uint());
949                 } else if (tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED) {
950                         messenger->reclaim_compressed(msg.next_uint());
951                 }
952                 processor(message);
953         }
954 }
955
956 template <typename F>
957 struct ScheduledFrames
958 {
959         void store(F& val)
960         {
961                 std::unique_lock<std::mutex> lk(_map_mutex);
962                 auto it = _map.find(val.index());
963                 if (it == _map.end()) {
964                         _map[val.index()] = val;
965                 }
966         }
967
968         F retrieve(size_t index, bool& success)
969         {
970                 std::unique_lock<std::mutex> lk(_map_mutex);
971                 success = false;
972                 auto it = _map.find(index);
973                 if (it == _map.end()) {
974                         return F();
975                 }
976
977                 success = true;
978                 F val = it->second;
979                 _map.erase(index);
980
981                 return val;
982         }
983
984 private:
985         std::mutex _map_mutex;
986         std::map<size_t, F> _map;
987 };
988
989 template <typename F>
990 struct ScheduledMessenger : public Messenger
991 {
992         explicit ScheduledMessenger(MessengerInit init)
993                 : Messenger(init)
994                 , _frames_scheduled(0)
995                 , _frames_compressed(0)
996         {
997         }
998
999         ~ScheduledMessenger()
1000         {
1001                 shutdown();
1002         }
1003
1004         bool schedule_compress(F proxy, std::function<void(BufferSrc)> converter)
1005         {
1006                 size_t frameSize = init_.uncompressedFrameSize_;
1007                 assert(frameSize >= init_.uncompressedFrameSize_);
1008                 BufferSrc src;
1009                 if (!availableBuffers_.wait_and_pop(src)) {
1010                         return false;
1011                 }
1012                 converter(src);
1013                 _scheduled_frames.store(proxy);
1014                 _frames_scheduled++;
1015                 send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_);
1016
1017                 return true;
1018         }
1019
1020         void process_compressed(const std::string& message, std::function<void(F, uint8_t*, uint32_t)> processor, bool needsRecompression)
1021         {
1022                 Msg msg(message);
1023                 msg.next();
1024                 auto clientFrameId = msg.next_uint();
1025                 auto compressedFrameId = msg.next_uint();
1026                 auto compressedFrameLength = msg.next_uint();
1027                 if (!needsRecompression) {
1028                         bool success = false;
1029                         auto srcFrame = _scheduled_frames.retrieve(clientFrameId, success);
1030                         if (!success) {
1031                                 return;
1032                         }
1033                         processor(srcFrame, get_compressed_frame(compressedFrameId), compressedFrameLength);
1034                 }
1035                 ++_frames_compressed;
1036                 send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId);
1037                 if (shutdown_ && _frames_compressed == _frames_scheduled) {
1038                         shutdownCondition_.notify_all();
1039                 }
1040         }
1041
1042         void shutdown()
1043         {
1044                 try {
1045                         std::unique_lock<std::mutex> lk(shutdownMutex_);
1046                         if (!async_result_.valid()) {
1047                                 return;
1048                         }
1049                         shutdown_ = true;
1050                         if (_frames_scheduled) {
1051                                 uint32_t scheduled = _frames_scheduled;
1052                                 send(GRK_MSGR_BATCH_FLUSH, scheduled);
1053                                 shutdownCondition_.wait(lk, [this] { return _frames_scheduled == _frames_compressed; });
1054                         }
1055                         availableBuffers_.deactivate();
1056                         send(GRK_MSGR_BATCH_SHUTDOWN);
1057                         int result = async_result_.get();
1058                         if (result != 0) {
1059                                 getMessengerLogger()->error("Accelerator failed with return code: %d\n", result);
1060                         }
1061                 } catch (std::exception& ex) {
1062                         getMessengerLogger()->error("%s", ex.what());
1063                 }
1064         }
1065
1066         F retrieve(size_t index, bool& success)
1067         {
1068                 return _scheduled_frames.retrieve(index, success);
1069         }
1070
1071         void store(F& val)
1072         {
1073                 _scheduled_frames.store(val);
1074         }
1075
1076 private:
1077         ScheduledFrames<F> _scheduled_frames;
1078         std::atomic<uint32_t> _frames_scheduled;
1079         std::atomic<uint32_t> _frames_compressed;
1080 };
1081
1082 } // namespace grk_plugin