bb7ada213dbf3219015a35679a768d6669a48dbd
[dcpomatic.git] / src / lib / grok / messenger.h
1 /*
2     Copyright (C) 2023 Grok Image Compression Inc.
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20 #pragma once
21
#include <atomic>
#include <cassert>
#include <cerrno>
#include <climits>
#include <condition_variable>
#include <cstdarg>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <exception>
#include <functional>
#include <future>
#include <iostream>
#include <limits>
#include <map>
#include <mutex>
#include <queue>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
36
37 #ifdef _WIN32
38 #include <windows.h>
39 #include <direct.h>
40 #include <tlhelp32.h>
41 #pragma warning(disable : 4100)
42 #else
43 #include <unistd.h>
44 #include <fcntl.h>
45 #include <sys/mman.h>
46 #include <semaphore.h>
47 #include <signal.h>
48 #endif
49
50 namespace grk_plugin
51 {
// Names of the shared-memory message buffers. The two message-buffer names are
// also the ones MessengerInit::unlink() removes.
// All names below are const: nothing in this header modifies them, and the
// message tags that follow are already const.
static const std::string grokToClientMessageBuf = "Global\\grok_to_client_message";
// Semaphore names; presumably handed to MessengerInit by the client - verify against caller.
static const std::string grokSentSynch = "Global\\grok_sent";
static const std::string clientReceiveReadySynch = "Global\\client_receive_ready";
static const std::string clientToGrokMessageBuf = "Global\\client_to_grok_message";
static const std::string clientSentSynch = "Global\\client_sent";
static const std::string grokReceiveReadySynch = "Global\\grok_receive_ready";
// Names of the shared-memory regions holding frame data (see Messenger::initBuffers).
static const std::string grokUncompressedBuf = "Global\\grok_uncompressed_buf";
static const std::string grokCompressedBuf = "Global\\grok_compressed_buf";
// Message tags: the first comma-separated field of every message.
static const std::string GRK_MSGR_BATCH_IMAGE = "GRK_MSGR_BATCH_IMAGE";
static const std::string GRK_MSGR_BATCH_COMPRESS_INIT = "GRK_MSGR_BATCH_COMPRESS_INIT";
static const std::string GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED = "GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED";
static const std::string GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED =
	"GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED";
static const std::string GRK_MSGR_BATCH_SUBMIT_COMPRESSED = "GRK_MSGR_BATCH_SUBMIT_COMPRESSED";
// NOTE(review): "PROCESSSED" is spelled with three S's. The value is sent over
// the wire, so it presumably has to match the peer - do not "fix" it here.
static const std::string GRK_MSGR_BATCH_PROCESSSED_COMPRESSED =
	"GRK_MSGR_BATCH_PROCESSSED_COMPRESSED";
static const std::string GRK_MSGR_BATCH_SHUTDOWN = "GRK_MSGR_BATCH_SHUTDOWN";
static const std::string GRK_MSGR_BATCH_FLUSH = "GRK_MSGR_BATCH_FLUSH";
// Size in bytes of each shared-memory message buffer (including the terminating NUL).
static const size_t messageBufferLen = 256;
/**
 * Abstract logging sink used by the messenger code.
 *
 * Implementations supply printf-style info/warn/error entry points.
 */
struct IMessengerLogger
{
	virtual ~IMessengerLogger() = default;
	virtual void info(const char* fmt, ...) = 0;
	virtual void warn(const char* fmt, ...) = 0;
	virtual void error(const char* fmt, ...) = 0;

  protected:
	/**
	 * Format @p args according to the printf-style @p format.
	 *
	 * The result is built in a 512-byte buffer, so longer output is truncated.
	 */
	template<typename... Args>
	std::string log_message(char const* const format, Args&... args) noexcept
	{
		constexpr size_t capacity = 512;
		char text[capacity];

		std::snprintf(text, capacity, format, args...);
		std::string formatted(text);
		return formatted;
	}
};
89 struct MessengerLogger : public IMessengerLogger
90 {
91         explicit MessengerLogger(const std::string &preamble) : preamble_(preamble) {}
92         virtual ~MessengerLogger() = default;
93         virtual void info(const char* fmt, ...) override
94         {
95                 va_list args;
96                 std::string new_fmt = preamble_ + fmt + "\n";
97                 va_start(args, fmt);
98                 vfprintf(stdout, new_fmt.c_str(), args);
99                 va_end(args);
100         }
101         virtual void warn(const char* fmt, ...) override
102         {
103                 va_list args;
104                 std::string new_fmt = preamble_ + fmt + "\n";
105                 va_start(args, fmt);
106                 vfprintf(stdout, new_fmt.c_str(), args);
107                 va_end(args);
108         }
109         virtual void error(const char* fmt, ...) override
110         {
111                 va_list args;
112                 std::string new_fmt = preamble_ + fmt + "\n";
113                 va_start(args, fmt);
114                 vfprintf(stderr, new_fmt.c_str(), args);
115                 va_end(args);
116         }
117
118   protected:
119         std::string preamble_;
120 };
121
122 static IMessengerLogger* sLogger = nullptr;
123 #if defined(__GNUC__) || defined(__clang__)
124 #pragma GCC diagnostic push
125 #pragma GCC diagnostic ignored "-Wunused-function"
126 #endif
127 static void setMessengerLogger(IMessengerLogger* logger)
128 {
129         delete sLogger;
130         sLogger = logger;
131 }
132 #if defined(__GNUC__) || defined(__clang__)
133 #pragma GCC diagnostic pop
134 #endif
135 static IMessengerLogger* getMessengerLogger(void)
136 {
137         return sLogger;
138 }
139 struct MessengerInit
140 {
141         MessengerInit(const std::string &outBuf, const std::string &outSent,
142                                   const std::string &outReceiveReady, const std::string &inBuf,
143                                   const std::string &inSent,
144                                   const std::string &inReceiveReady,
145                                   std::function<void(std::string)> processor,
146                                   size_t numProcessingThreads)
147                 : outboundMessageBuf(outBuf), outboundSentSynch(outSent),
148                   outboundReceiveReadySynch(outReceiveReady), inboundMessageBuf(inBuf),
149                   inboundSentSynch(inSent), inboundReceiveReadySynch(inReceiveReady), processor_(processor),
150                   numProcessingThreads_(numProcessingThreads),
151                   uncompressedFrameSize_(0), compressedFrameSize_(0),
152                   numFrames_(0)
153         {
154                 if(firstLaunch(true))
155                         unlink();
156         }
157         void unlink(void)
158         {
159 #ifndef _WIN32
160                 shm_unlink(grokToClientMessageBuf.c_str());
161                 shm_unlink(clientToGrokMessageBuf.c_str());
162 #endif
163         }
164         static bool firstLaunch(bool isClient)
165         {
166                 bool debugGrok = false;
167                 return debugGrok != isClient;
168         }
169         std::string outboundMessageBuf;
170         std::string outboundSentSynch;
171         std::string outboundReceiveReadySynch;
172
173         std::string inboundMessageBuf;
174         std::string inboundSentSynch;
175         std::string inboundReceiveReadySynch;
176
177         std::function<void(std::string)> processor_;
178         size_t numProcessingThreads_;
179
180         size_t uncompressedFrameSize_;
181         size_t compressedFrameSize_;
182         size_t numFrames_;
183 };
184
185 /*************************** Synchronization *******************************/
/** Selects which of a Synch's two semaphores an operation targets. */
enum SynchDirection
{
	SYNCH_SENT = 0,
	SYNCH_RECEIVE_READY = 1
};

/** Handle type used for shared-memory file descriptors. */
using grk_handle = int;
193 struct Synch
194 {
195         Synch(const std::string &sentSemName, const std::string &receiveReadySemName)
196                 : sentSemName_(sentSemName), receiveReadySemName_(receiveReadySemName)
197         {
198                 // unlink semaphores in case of previous crash
199                 if(MessengerInit::firstLaunch(true))
200                         unlink();
201                 open();
202         }
203         ~Synch()
204         {
205                 close();
206                 if(MessengerInit::firstLaunch(true))
207                         unlink();
208         }
209         void post(SynchDirection dir)
210         {
211                 auto sem = (dir == SYNCH_SENT ? sentSem_ : receiveReadySem_);
212                 int rc = sem_post(sem);
213                 if(rc)
214                         getMessengerLogger()->error("Error posting to semaphore: %s", strerror(errno));
215         }
216         void wait(SynchDirection dir)
217         {
218                 auto sem = dir == SYNCH_SENT ? sentSem_ : receiveReadySem_;
219                 int rc = sem_wait(sem);
220                 if(rc)
221                         getMessengerLogger()->error("Error waiting for semaphore: %s", strerror(errno));
222         }
223         void open(void)
224         {
225                 sentSem_ = sem_open(sentSemName_.c_str(), O_CREAT, 0666, 0);
226                 if(!sentSem_)
227                         getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
228                 receiveReadySem_ = sem_open(receiveReadySemName_.c_str(), O_CREAT, 0666, 1);
229                 if(!receiveReadySem_)
230                         getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
231         }
232         void close(void)
233         {
234                 int rc = sem_close(sentSem_);
235                 if(rc)
236                         getMessengerLogger()->error("Error closing semaphore %s: %s", sentSemName_.c_str(),
237                                                                                 strerror(errno));
238                 rc = sem_close(receiveReadySem_);
239                 if(rc)
240                         getMessengerLogger()->error("Error closing semaphore %s: %s",
241                                                                                 receiveReadySemName_.c_str(), strerror(errno));
242         }
243         void unlink(void)
244         {
245                 int rc = sem_unlink(sentSemName_.c_str());
246                 if(rc == -1 && errno != ENOENT)
247                         getMessengerLogger()->error("Error unlinking semaphore %s: %s", sentSemName_.c_str(),
248                                                                                 strerror(errno));
249                 rc = sem_unlink(receiveReadySemName_.c_str());
250                 if(rc == -1 && errno != ENOENT)
251                         getMessengerLogger()->error("Error unlinking semaphore %s: %s",
252                                                                                 receiveReadySemName_.c_str(), strerror(errno));
253         }
254         sem_t* sentSem_;
255         sem_t* receiveReadySem_;
256
257   private:
258         std::string sentSemName_;
259         std::string receiveReadySemName_;
260 };
261 struct SharedMemoryManager
262 {
263         static bool initShm(const std::string &name, size_t len, grk_handle* shm_fd, char** buffer)
264         {
265                 *shm_fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0666);
266                 if(*shm_fd < 0)
267                 {
268                         getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
269                         return false;
270                 }
271                 int rc = ftruncate(*shm_fd, sizeof(char) * len);
272                 if(rc)
273                 {
274                         getMessengerLogger()->error("Error truncating shared memory: %s", strerror(errno));
275                         rc = close(*shm_fd);
276                         if(rc)
277                                 getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
278                         rc = shm_unlink(name.c_str());
279                         // 2 == No such file or directory
280                         if(rc && errno != 2)
281                                 getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
282                         return false;
283                 }
284                 *buffer = static_cast<char*>(mmap(0, len, PROT_WRITE, MAP_SHARED, *shm_fd, 0));
285                 if(!*buffer)
286                 {
287                         getMessengerLogger()->error("Error mapping shared memory: %s", strerror(errno));
288                         rc = close(*shm_fd);
289                         if(rc)
290                                 getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
291                         rc = shm_unlink(name.c_str());
292                         // 2 == No such file or directory
293                         if(rc && errno != 2)
294                                 getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
295                 }
296
297                 return *buffer != nullptr;
298         }
299         static bool deinitShm(const std::string &name, size_t len, grk_handle &shm_fd, char** buffer)
300         {
301                 if (!*buffer || !shm_fd)
302                         return true;
303
304                 int rc = munmap(*buffer, len);
305                 *buffer = nullptr;
306                 if(rc)
307                         getMessengerLogger()->error("Error unmapping shared memory %s: %s", name.c_str(), strerror(errno));
308                 rc = close(shm_fd);
309                 shm_fd = 0;
310                 if(rc)
311                         getMessengerLogger()->error("Error closing shared memory %s: %s", name.c_str(), strerror(errno));
312                 rc = shm_unlink(name.c_str());
313                 // 2 == No such file or directory
314                 if(rc && errno != 2)
315                         fprintf(stderr,"Error unlinking shared memory %s : %s\n", name.c_str(), strerror(errno));
316
317                 return true;
318         }
319 };
320
/**
 * Bounded FIFO queue protected by a mutex, with blocking and non-blocking
 * push/pop.
 *
 * The queue can be deactivated: that empties it, wakes every blocked thread,
 * and makes all push/pop operations fail until activate() is called.
 */
template<typename Data>
class MessengerBlockingQueue
{
  public:
	/** @param max maximum number of queued elements */
	explicit MessengerBlockingQueue(size_t max) : active_(true), max_size_(max) {}
	MessengerBlockingQueue() : MessengerBlockingQueue(UINT_MAX) {}
	/** @return number of queued elements */
	size_t size() const
	{
		// queue_ is modified under mutex_ by other threads, so reading it
		// without the lock would be a data race
		std::lock_guard<std::mutex> lk(mutex_);
		return queue_.size();
	}
	// deactivate and clear queue
	void deactivate()
	{
		{
			std::lock_guard<std::mutex> lk(mutex_);
			active_ = false;
			while(!queue_.empty())
				queue_.pop();
		}

		// release all waiting threads
		can_pop_.notify_all();
		can_push_.notify_all();
	}
	/** Allow pushes and pops again after deactivate(). */
	void activate()
	{
		std::lock_guard<std::mutex> lk(mutex_);
		active_ = true;
	}
	/**
	 * Append @p value without blocking.
	 * @return false if the queue is full or deactivated
	 */
	bool push(Data const& value)
	{
		bool rc;
		{
			std::unique_lock<std::mutex> lk(mutex_);
			rc = push_(value);
		}
		if(rc)
			can_pop_.notify_one();

		return rc;
	}
	/**
	 * Append @p value, blocking while the queue is full.
	 * @return false if the queue is, or becomes, deactivated
	 */
	bool waitAndPush(Data& value)
	{
		bool rc;
		{
			std::unique_lock<std::mutex> lk(mutex_);
			if(!active_)
				return false;
			// in case of spurious wakeup, loop until predicate in lambda
			// is satisfied.
			can_push_.wait(lk, [this] { return queue_.size() < max_size_ || !active_; });
			rc = push_(value);
		}
		if(rc)
			can_pop_.notify_one();

		return rc;
	}
	/**
	 * Remove the oldest element into @p value without blocking.
	 * @return false if the queue is empty or deactivated (@p value untouched)
	 */
	bool pop(Data& value)
	{
		bool rc;
		{
			std::unique_lock<std::mutex> lk(mutex_);
			rc = pop_(value);
		}
		if(rc)
			can_push_.notify_one();

		return rc;
	}
	/**
	 * Remove the oldest element into @p value, blocking while the queue is empty.
	 * @return false if the queue is, or becomes, deactivated
	 */
	bool waitAndPop(Data& value)
	{
		bool rc;
		{
			std::unique_lock<std::mutex> lk(mutex_);
			if(!active_)
				return false;
			// in case of spurious wakeup, loop until predicate in lambda
			// is satisfied.
			can_pop_.wait(lk, [this] { return !queue_.empty() || !active_; });
			rc = pop_(value);
		}
		if(rc)
			can_push_.notify_one();

		return rc;
	}

  private:
	// caller must hold mutex_
	bool push_(Data const& value)
	{
		if(queue_.size() == max_size_ || !active_)
			return false;
		queue_.push(value);

		return true;
	}
	// caller must hold mutex_
	bool pop_(Data& value)
	{
		if(queue_.empty() || !active_)
			return false;
		value = queue_.front();
		queue_.pop();

		return true;
	}
	std::queue<Data> queue_;
	mutable std::mutex mutex_;
	std::condition_variable can_pop_;
	std::condition_variable can_push_;
	bool active_;
	size_t max_size_;
};
/**
 * Describes where a frame comes from: either a file name, or a pointer to a
 * frame in memory together with its client-side and internal frame ids.
 */
struct BufferSrc
{
	BufferSrc(void) : BufferSrc("") {}
	explicit BufferSrc(const std::string &file) : file_(file), clientFrameId_(0), frameId_(0), framePtr_(nullptr)
	{}
	BufferSrc(size_t clientFrameId, size_t frameId, uint8_t* framePtr)
		: file_(""), clientFrameId_(clientFrameId), frameId_(frameId), framePtr_(framePtr)
	{}
	/** @return true when a file name is set and there is no in-memory frame. */
	bool fromDisk(void) const
	{
		return !file_.empty() && framePtr_ == nullptr;
	}
	/** @return the client frame id. */
	size_t index() const
	{
		return clientFrameId_;
	}
	std::string file_;
	size_t clientFrameId_;
	size_t frameId_;
	uint8_t* framePtr_;
};
455
456 struct Messenger;
457 static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch);
458 static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch);
459 static void processorThread(Messenger* messenger, std::function<void(std::string)> processor);
460
461 struct Messenger
462 {
463         explicit Messenger(MessengerInit init)
464                 : running(true), _initialized(false), _shutdown(false), init_(init),
465                   outboundSynch_(nullptr),
466                   inboundSynch_(nullptr), uncompressed_buffer_(nullptr), compressed_buffer_(nullptr),
467                   uncompressed_fd_(0), compressed_fd_(0)
468         {}
469         virtual ~Messenger(void)
470         {
471                 running = false;
472                 sendQueue.deactivate();
473                 receiveQueue.deactivate();
474
475                 if (outboundSynch_) {
476                         outboundSynch_->post(SYNCH_RECEIVE_READY);
477                         outbound.join();
478                 }
479
480                 if (inboundSynch_) {
481                         inboundSynch_->post(SYNCH_SENT);
482                         inbound.join();
483                 }
484
485                 for(auto& p : processors_)
486                         p.join();
487
488                 delete outboundSynch_;
489                 delete inboundSynch_;
490
491                 deinitShm();
492         }
493         void startThreads(void) {
494                 outboundSynch_ =
495                         new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch);
496                 outbound = std::thread(outboundThread, this, init_.outboundMessageBuf, outboundSynch_);
497
498                 inboundSynch_ =
499                         new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch);
500                 inbound = std::thread(inboundThread, this, init_.inboundMessageBuf, inboundSynch_);
501
502                 for(size_t i = 0; i < init_.numProcessingThreads_; ++i)
503                         processors_.push_back(std::thread(processorThread, this, init_.processor_));
504         }
505         bool initBuffers(void)
506         {
507                 bool rc = true;
508                 if(init_.uncompressedFrameSize_)
509                 {
510                         rc = rc && SharedMemoryManager::initShm(grokUncompressedBuf,
511                                                                                                         init_.uncompressedFrameSize_ * init_.numFrames_,
512                                                                                                         &uncompressed_fd_, &uncompressed_buffer_);
513                 }
514                 if(init_.compressedFrameSize_)
515                 {
516                         rc = rc && SharedMemoryManager::initShm(grokCompressedBuf,
517                                                                                                         init_.compressedFrameSize_ * init_.numFrames_,
518                                                                                                         &compressed_fd_, &compressed_buffer_);
519                 }
520
521                 return rc;
522         }
523
524         bool deinitShm(void)
525         {
526                 bool rc = SharedMemoryManager::deinitShm(grokUncompressedBuf,
527                                                                                                  init_.uncompressedFrameSize_ * init_.numFrames_,
528                                                                                                  uncompressed_fd_, &uncompressed_buffer_);
529                 rc = rc && SharedMemoryManager::deinitShm(grokCompressedBuf,
530                                                                                                   init_.compressedFrameSize_ * init_.numFrames_,
531                                                                                                   compressed_fd_, &compressed_buffer_);
532
533                 return rc;
534         }
535         template<typename... Args>
536         void send(const std::string& str, Args... args)
537         {
538                 std::ostringstream oss;
539                 oss << str;
540                 int dummy[] = {0, ((void)(oss << ',' << args), 0)...};
541                 static_cast<void>(dummy);
542
543                 sendQueue.push(oss.str());
544         }
545
546         void launchGrok(
547                 boost::filesystem::path const& dir,
548                 uint32_t width,
549                 uint32_t stride,
550                 uint32_t height,
551                 uint32_t samplesPerPixel,
552                 uint32_t depth,
553                 int device,
554                 bool is4K,
555                 uint32_t fps,
556                 uint32_t bandwidth,
557                 const std::string server,
558                 uint32_t port,
559                 const std::string license
560                 )
561         {
562
563                 std::unique_lock<std::mutex> lk(shutdownMutex_);
564                 if (async_result_.valid())
565                         return;
566                 if(MessengerInit::firstLaunch(true))
567                         init_.unlink();
568                 startThreads();
569                 char _cmd[4096];
570                 auto fullServer = server + ":" + std::to_string(port);
571                 sprintf(_cmd,
572                                 "./grk_compress -batch_src %s,%d,%d,%d,%d,%d -out_fmt j2k -k 1 "
573                                 "-G %d -%s %d,%d -j %s -J %s",
574                                 GRK_MSGR_BATCH_IMAGE.c_str(), width, stride, height, samplesPerPixel, depth,
575                                 device, is4K ? "cinema4K" : "cinema2K", fps, bandwidth,
576                                 license.c_str(), fullServer.c_str());
577                 launch(_cmd, dir);
578         }
579         void initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames)
580         {
581                 // client fills queue with pending uncompressed buffers
582                 init_.uncompressedFrameSize_ = uncompressedFrameSize;
583                 init_.compressedFrameSize_ = compressedFrameSize;
584                 init_.numFrames_ = numFrames;
585                 initBuffers();
586                 auto ptr = uncompressed_buffer_;
587                 for(size_t i = 0; i < init_.numFrames_; ++i)
588                 {
589                         availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr));
590                         ptr += init_.uncompressedFrameSize_;
591                 }
592
593                 std::unique_lock<std::mutex> lk(shutdownMutex_);
594                 _initialized = true;
595                 clientInitializedCondition_.notify_all();
596         }
597
598         bool waitForClientInit()
599         {
600                 if (_initialized) {
601                         return true;
602                 }
603
604                 std::unique_lock<std::mutex> lk(shutdownMutex_);
605
606                 if (_initialized) {
607                         return true;
608                 } else if (_shutdown) {
609                         return false;
610                 }
611
612                 clientInitializedCondition_.wait(lk, [this] { return _initialized || _shutdown; });
613
614                 return _initialized && !_shutdown;
615         }
616
617         static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel)
618         {
619                 return sizeof(uint16_t) * w * h * samplesPerPixel;
620         }
621         void reclaimCompressed(size_t frameId)
622         {
623                 availableBuffers_.push(BufferSrc(0, frameId, getCompressedFrame(frameId)));
624         }
625         void reclaimUncompressed(size_t frameId)
626         {
627                 availableBuffers_.push(BufferSrc(0, frameId, getUncompressedFrame(frameId)));
628         }
629         uint8_t* getUncompressedFrame(size_t frameId)
630         {
631                 assert(frameId < init_.numFrames_);
632                 if(frameId >= init_.numFrames_)
633                         return nullptr;
634
635                 return (uint8_t*)(uncompressed_buffer_ + frameId * init_.uncompressedFrameSize_);
636         }
637         uint8_t* getCompressedFrame(size_t frameId)
638         {
639                 assert(frameId < init_.numFrames_);
640                 if(frameId >= init_.numFrames_)
641                         return nullptr;
642
643                 return (uint8_t*)(compressed_buffer_ + frameId * init_.compressedFrameSize_);
644         }
645         std::atomic_bool running;
646         bool _initialized;
647         bool _shutdown;
648         MessengerBlockingQueue<std::string> sendQueue;
649         MessengerBlockingQueue<std::string> receiveQueue;
650         MessengerBlockingQueue<BufferSrc> availableBuffers_;
651         MessengerInit init_;
652         std::string cmd_;
653         std::future<int> async_result_;
654         std::mutex shutdownMutex_;
655         std::condition_variable shutdownCondition_;
656
657   protected:
658         std::condition_variable clientInitializedCondition_;
659   private:
660         void launch(std::string const& cmd, boost::filesystem::path const& dir)
661         {
662                 // Change the working directory
663                 if(!dir.empty())
664                 {
665                         boost::system::error_code ec;
666                         boost::filesystem::current_path(dir, ec);
667                         if (ec) {
668                                 getMessengerLogger()->error("Error: failed to change the working directory");
669                                 return;
670                         }
671                 }
672                 // Execute the command using std::async and std::system
673                 cmd_ = cmd;
674                 async_result_ = std::async(std::launch::async, [this]() { return std::system(cmd_.c_str()); });
675         }
676         std::thread outbound;
677         Synch* outboundSynch_;
678
679         std::thread inbound;
680         Synch* inboundSynch_;
681
682         std::vector<std::thread> processors_;
683         char* uncompressed_buffer_;
684         char* compressed_buffer_;
685
686         grk_handle uncompressed_fd_;
687         grk_handle compressed_fd_;
688 };
689
690 static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch)
691 {
692         grk_handle shm_fd = 0;
693         char* send_buffer = nullptr;
694
695         if(!SharedMemoryManager::initShm(sendBuf, messageBufferLen, &shm_fd, &send_buffer))
696                 return;
697         while(messenger->running)
698         {
699                 synch->wait(SYNCH_RECEIVE_READY);
700                 if(!messenger->running)
701                         break;
702                 std::string message;
703                 if(!messenger->sendQueue.waitAndPop(message))
704                         break;
705                 if(!messenger->running)
706                         break;
707                 memcpy(send_buffer, message.c_str(), message.size() + 1);
708                 synch->post(SYNCH_SENT);
709         }
710         SharedMemoryManager::deinitShm(sendBuf, messageBufferLen, shm_fd, &send_buffer);
711 }
712
713 static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch)
714 {
715         grk_handle shm_fd = 0;
716         char* receive_buffer = nullptr;
717
718         if(!SharedMemoryManager::initShm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer))
719                 return;
720         while(messenger->running)
721         {
722                 synch->wait(SYNCH_SENT);
723                 if(!messenger->running)
724                         break;
725                 auto message = std::string(receive_buffer);
726                 synch->post(SYNCH_RECEIVE_READY);
727                 messenger->receiveQueue.push(message);
728         }
729         SharedMemoryManager::deinitShm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer);
730 }
731 struct Msg
732 {
733         explicit Msg(const std::string &msg) : ct_(0)
734         {
735                 std::stringstream ss(msg);
736                 while(ss.good())
737                 {
738                         std::string substr;
739                         std::getline(ss, substr, ',');
740                         cs_.push_back(substr);
741                 }
742         }
743         std::string next()
744         {
745                 if(ct_ == cs_.size())
746                 {
747                         getMessengerLogger()->error("Msg: comma separated list exhausted. returning empty.");
748                         return "";
749                 }
750                 return cs_[ct_++];
751         }
752
753         uint32_t nextUint(void)
754         {
755                 return (uint32_t)std::stoi(next());
756         }
757
758         std::vector<std::string> cs_;
759         size_t ct_;
760 };
761
762 static void processorThread(Messenger* messenger, std::function<void(std::string)> processor)
763 {
764         while (messenger->running) {
765                 std::string message;
766                 if (!messenger->receiveQueue.waitAndPop(message)) {
767                         break;
768                 }
769
770                 if (!messenger->running) {
771                         break;
772                 }
773
774                 Msg msg(message);
775                 auto tag = msg.next();
776                 if (tag == GRK_MSGR_BATCH_COMPRESS_INIT) {
777                         auto width = msg.nextUint();
778                         msg.nextUint(); // stride
779                         auto height = msg.nextUint();
780                         auto samples_per_pixel = msg.nextUint();
781                         msg.nextUint(); // depth
782                         messenger->init_.uncompressedFrameSize_ = Messenger::uncompressedFrameSize(width, height, samples_per_pixel);
783                         auto compressed_frame_size = msg.nextUint();
784                         auto num_frames = msg.nextUint();
785                         messenger->initClient(compressed_frame_size, compressed_frame_size, num_frames);
786                 } else if (tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED) {
787                         messenger->reclaimUncompressed(msg.nextUint());
788                 } else if (tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED) {
789                         messenger->reclaimCompressed(msg.nextUint());
790                 }
791                 processor(message);
792         }
793 }
794
795 template<typename F>
796 struct ScheduledFrames
797 {
798         void store(F const& val)
799         {
800                 std::unique_lock<std::mutex> lk(mapMutex_);
801                 auto it = map_.find(val.index());
802                 if (it == map_.end())
803                         map_.emplace(std::make_pair(val.index(), val));
804         }
805         boost::optional<F> retrieve(size_t index)
806         {
807                 std::unique_lock<std::mutex> lk(mapMutex_);
808                 auto it = map_.find(index);
809                 if(it == map_.end())
810                         return {};
811
812                 F val = it->second;
813                 map_.erase(index);
814
815                 return val;
816         }
817
818  private:
819         std::mutex mapMutex_;
820         std::map<size_t, F> map_;
821 };
822
823 template<typename F>
824 struct ScheduledMessenger : public Messenger
825 {
826         explicit ScheduledMessenger(MessengerInit init) : Messenger(init),
827                                                                                         framesScheduled_(0),
828                                                                                         framesCompressed_(0)
829         {}
830         ~ScheduledMessenger(void) {
831                 shutdown();
832         }
833         bool scheduleCompress(F const& proxy, std::function<void(BufferSrc const&)> converter){
834                 size_t frameSize = init_.uncompressedFrameSize_;
835                 assert(frameSize >= init_.uncompressedFrameSize_);
836                 BufferSrc src;
837                 if(!availableBuffers_.waitAndPop(src))
838                         return false;
839                 converter(src);
840                 scheduledFrames_.store(proxy);
841                 framesScheduled_++;
842                 send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_);
843
844                 return true;
845         }
846         void processCompressed(const std::string &message, std::function<void(F,uint8_t*,uint32_t)> processor, bool needsRecompression) {
847                 Msg msg(message);
848                 msg.next();
849                 auto clientFrameId = msg.nextUint();
850                 auto compressedFrameId = msg.nextUint();
851                 auto compressedFrameLength = msg.nextUint();
852                 if (!needsRecompression) {
853                         auto src_frame = scheduledFrames_.retrieve(clientFrameId);
854                         if (!src_frame) {
855                                 return;
856                         }
857                         processor(*src_frame, getCompressedFrame(compressedFrameId),compressedFrameLength);
858                 }
859                 ++framesCompressed_;
860                 send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId);
861                 if (_shutdown && framesCompressed_ == framesScheduled_)
862                         shutdownCondition_.notify_all();
863         }
864         void shutdown(void){
865                 try {
866                         std::unique_lock<std::mutex> lk(shutdownMutex_);
867                         if (!async_result_.valid())
868                                 return;
869                         _shutdown = true;
870                         if (framesScheduled_) {
871                                 uint32_t scheduled = framesScheduled_;
872                                 send(GRK_MSGR_BATCH_FLUSH, scheduled);
873                                 shutdownCondition_.wait(lk, [this] { return framesScheduled_ == framesCompressed_; });
874                         }
875                         availableBuffers_.deactivate();
876                         send(GRK_MSGR_BATCH_SHUTDOWN);
877                         int result = async_result_.get();
878                         if(result != 0)
879                                 getMessengerLogger()->error("Accelerator failed with return code: %d\n",result);
880                 } catch (std::exception &ex) {
881                         getMessengerLogger()->error("%s",ex.what());
882                 }
883
884         }
885
886         boost::optional<F> retrieve(size_t index) {
887                 return scheduledFrames_.retrieve(index);
888         }
889
890         void store(F& val) {
891                 scheduledFrames_.store(val);
892         }
893
894 private:
895         ScheduledFrames<F> scheduledFrames_;
896         std::atomic<uint32_t> framesScheduled_;
897         std::atomic<uint32_t> framesCompressed_;
898 };
899
900 } // namespace grk_plugin