2 Copyright (C) 2023 Grok Image Compression Inc.
4 This file is part of DCP-o-matic.
6 DCP-o-matic is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 DCP-o-matic is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
24 #include <condition_variable>
41 #pragma warning(disable : 4100)
44 #include <semaphore.h>
50 namespace grk_plugin {
// Names of the message buffer and the two synchronization objects used for
// messages travelling from grok to the client.
51 static std::string grokToClientMessageBuf = "Global\\grok_to_client_message";
52 static std::string grokSentSynch = "Global\\grok_sent";
53 static std::string clientReceiveReadySynch = "Global\\client_receive_ready";
// The same three names for messages travelling from the client to grok.
54 static std::string clientToGrokMessageBuf = "Global\\client_to_grok_message";
55 static std::string clientSentSynch = "Global\\client_sent";
56 static std::string grokReceiveReadySynch = "Global\\grok_receive_ready";
// Names of the shared-memory regions holding uncompressed and compressed frames.
57 static std::string grokUncompressedBuf = "Global\\grok_uncompressed_buf";
58 static std::string grokCompressedBuf = "Global\\grok_compressed_buf";
// Message tags: the first comma-separated field of every message.
59 static const std::string GRK_MSGR_BATCH_IMAGE = "GRK_MSGR_BATCH_IMAGE";
60 static const std::string GRK_MSGR_BATCH_COMPRESS_INIT = "GRK_MSGR_BATCH_COMPRESS_INIT";
61 static const std::string GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED = "GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED";
62 static const std::string GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED =
63 "GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED";
64 static const std::string GRK_MSGR_BATCH_SUBMIT_COMPRESSED = "GRK_MSGR_BATCH_SUBMIT_COMPRESSED";
// NOTE(review): "PROCESSSED" has three S's in both the identifier and the
// string; presumably the peer process expects exactly this text -- confirm
// before "correcting" it.
65 static const std::string GRK_MSGR_BATCH_PROCESSSED_COMPRESSED =
66 "GRK_MSGR_BATCH_PROCESSSED_COMPRESSED";
67 static const std::string GRK_MSGR_BATCH_SHUTDOWN = "GRK_MSGR_BATCH_SHUTDOWN";
68 static const std::string GRK_MSGR_BATCH_FLUSH = "GRK_MSGR_BATCH_FLUSH";
// Size in bytes of each shared-memory message buffer (see the inbound and
// outbound thread functions, which pass it to init_shm/deinit_shm).
69 static const size_t messageBufferLen = 256;
/**
 * Logging interface used by the messenger code.
 *
 * Implementations provide three printf-style sinks, one per severity.
 */
struct IMessengerLogger
{
	virtual ~IMessengerLogger() = default;
	virtual void info(const char* fmt, ...) = 0;
	virtual void warn(const char* fmt, ...) = 0;
	virtual void error(const char* fmt, ...) = 0;

protected:
	/**
	 * Format @p args according to the printf-style @p format.
	 *
	 * The result is built in a 512-byte scratch buffer, so longer output is
	 * truncated by snprintf.
	 *
	 * @return the formatted text
	 */
	template <typename... Args>
	std::string log_message(char const* const format, Args&... args) noexcept
	{
		char scratch[512];

		std::snprintf(scratch, sizeof(scratch), format, args...);
		return std::string(scratch);
	}
};
90 struct MessengerLogger : public IMessengerLogger
92 explicit MessengerLogger(const std::string& preamble)
97 virtual ~MessengerLogger() = default;
99 virtual void info(const char* fmt, ...) override
102 std::string new_fmt = preamble_ + fmt + "\n";
104 vfprintf(stdout, new_fmt.c_str(), args);
108 virtual void warn(const char* fmt, ...) override
111 std::string new_fmt = preamble_ + fmt + "\n";
113 vfprintf(stdout, new_fmt.c_str(), args);
117 virtual void error(const char* fmt, ...) override
120 std::string new_fmt = preamble_ + fmt + "\n";
122 vfprintf(stderr, new_fmt.c_str(), args);
127 std::string preamble_;
// Logger pointer used by the messenger code in this file; null until a
// logger has been installed.
130 static IMessengerLogger* sLogger = nullptr;
// Suppress GCC/Clang's unused-function warning for the setter below.
131 #if defined(__GNUC__) || defined(__clang__)
132 #pragma GCC diagnostic push
133 #pragma GCC diagnostic ignored "-Wunused-function"
// Setter taking the logger to install.  Its return type and body are on lines
// not shown here -- NOTE(review): confirm whether it takes ownership of `logger`.
136 setMessengerLogger(IMessengerLogger* logger)
141 #if defined(__GNUC__) || defined(__clang__)
142 #pragma GCC diagnostic pop
// Return type of the accessor that the rest of this file calls as
// getMessengerLogger(); its name and body are on lines not shown here.
// NOTE(review): callers dereference the result without a null check.
144 static IMessengerLogger*
// Construct the messenger configuration.
//   outBuf / outSent / outReceiveReady : names of the outbound message buffer
//       and of its "sent" / "receive ready" synchronization objects
//   inBuf / inSent / inReceiveReady    : the same three names for the inbound
//       direction
//   processor            : callback invoked with a received message string
//   numProcessingThreads : number of processor threads to start
// Both frame sizes start at zero.
152 MessengerInit(const std::string& outBuf, const std::string& outSent,
153 const std::string& outReceiveReady, const std::string& inBuf,
154 const std::string& inSent,
155 const std::string& inReceiveReady,
156 std::function<void(std::string)> processor,
157 size_t numProcessingThreads)
158 : outboundMessageBuf(outBuf)
159 , outboundSentSynch(outSent)
160 , outboundReceiveReadySynch(outReceiveReady)
161 , inboundMessageBuf(inBuf)
162 , inboundSentSynch(inSent)
163 , inboundReceiveReadySynch(inReceiveReady)
164 , processor_(processor)
165 , numProcessingThreads_(numProcessingThreads)
166 , uncompressedFrameSize_(0)
167 , compressedFrameSize_(0)
// Extra work is done only on a "first launch" (see first_launch()); the guarded
// statement is on a line not shown here.
170 if (first_launch(true)) {
// Remove the names of both message buffers' shared-memory objects.  Return
// values are ignored, so a name that does not exist is not an error here.
178 shm_unlink(grokToClientMessageBuf.c_str());
179 shm_unlink(clientToGrokMessageBuf.c_str());
/**
 * Decide whether the caller should behave as the first process launched.
 *
 * With the built-in debug switch left at false this is simply @p isClient;
 * flipping the switch inverts the answer.
 */
static bool first_launch(bool isClient)
{
	constexpr bool debugGrok = false;
	return isClient != debugGrok;
}
// Outbound direction: message buffer name and its two synchronization names.
189 std::string outboundMessageBuf;
190 std::string outboundSentSynch;
191 std::string outboundReceiveReadySynch;
// Inbound direction: message buffer name and its two synchronization names.
193 std::string inboundMessageBuf;
194 std::string inboundSentSynch;
195 std::string inboundReceiveReadySynch;
// Callback handed to each processor thread, and how many such threads to start.
197 std::function<void(std::string)> processor_;
198 size_t numProcessingThreads_;
// Size in bytes of one uncompressed / one compressed frame slot; zero until
// set (the constructor initializes both to 0).
200 size_t uncompressedFrameSize_;
201 size_t compressedFrameSize_;
205 /*************************** Synchronization *******************************/
// Handle type for a shared-memory object: it receives the return value of
// shm_open() and is later passed to ftruncate() and mmap() in this file.
212 typedef int grk_handle;
// Construct a pair of named semaphores: one signalling "message sent", the
// other "receiver ready".  Only the names are stored by the initializer list.
216 Synch(const std::string& sentSemName, const std::string& receiveReadySemName)
217 : _sent_sem_name(sentSemName)
218 , _receive_ready_sem_name(receiveReadySemName)
220 // unlink semaphores in case of previous crash
221 if (MessengerInit::first_launch(true)) {
// A second first-launch guard; the statements it controls, and the function it
// belongs to, are on lines not shown here (presumably the destructor -- confirm).
230 if (MessengerInit::first_launch(true)) {
235 void post(SynchDirection dir)
237 auto sem = (dir == SYNCH_SENT ? sentSem_ : receiveReadySem_);
238 int rc = sem_post(sem);
240 getMessengerLogger()->error("Error posting to semaphore: %s", strerror(errno));
244 void wait(SynchDirection dir)
246 auto sem = dir == SYNCH_SENT ? sentSem_ : receiveReadySem_;
247 int rc = sem_wait(sem);
249 getMessengerLogger()->error("Error waiting for semaphore: %s", strerror(errno));
255 sentSem_ = sem_open(_sent_sem_name.c_str(), O_CREAT, 0666, 0);
257 getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
259 receiveReadySem_ = sem_open(_receive_ready_sem_name.c_str(), O_CREAT, 0666, 1);
260 if (!receiveReadySem_) {
261 getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
// Close this process's handle to each semaphore in turn.  A failure is
// reported through the messenger logger with the semaphore's name.
267 int rc = sem_close(sentSem_);
269 getMessengerLogger()->error("Error closing semaphore %s: %s", _sent_sem_name.c_str(),
272 rc = sem_close(receiveReadySem_);
274 getMessengerLogger()->error("Error closing semaphore %s: %s",
275 _receive_ready_sem_name.c_str(), strerror(errno));
// Remove both semaphore names.  ENOENT (the name does not exist) is expected
// and stays silent; any other failure is logged.
281 int rc = sem_unlink(_sent_sem_name.c_str());
282 if (rc == -1 && errno != ENOENT) {
283 getMessengerLogger()->error("Error unlinking semaphore %s: %s", _sent_sem_name.c_str(),
286 rc = sem_unlink(_receive_ready_sem_name.c_str());
287 if (rc == -1 && errno != ENOENT) {
288 getMessengerLogger()->error("Error unlinking semaphore %s: %s",
289 _receive_ready_sem_name.c_str(), strerror(errno));
// Handle of the "receive ready" semaphore, as returned by sem_open().
294 sem_t* receiveReadySem_;
// Names under which the two semaphores are opened and unlinked.
297 std::string _sent_sem_name;
298 std::string _receive_ready_sem_name;
301 struct SharedMemoryManager
303 static bool init_shm(const std::string& name, size_t len, grk_handle* shm_fd, char** buffer)
305 *shm_fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0666);
307 getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
310 int rc = ftruncate(*shm_fd, sizeof(char) * len);
312 getMessengerLogger()->error("Error truncating shared memory: %s", strerror(errno));
315 getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
317 rc = shm_unlink(name.c_str());
319 getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
323 *buffer = static_cast<char*>(mmap(0, len, PROT_WRITE, MAP_SHARED, *shm_fd, 0));
325 getMessengerLogger()->error("Error mapping shared memory: %s", strerror(errno));
328 getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
330 rc = shm_unlink(name.c_str());
332 getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
336 return *buffer != nullptr;
339 static bool deinit_shm(const std::string& name, size_t len, grk_handle& shm_fd, char** buffer)
341 if (!*buffer || !shm_fd) {
345 int rc = munmap(*buffer, len);
348 getMessengerLogger()->error("Error unmapping shared memory %s: %s", name.c_str(), strerror(errno));
353 getMessengerLogger()->error("Error closing shared memory %s: %s", name.c_str(), strerror(errno));
355 rc = shm_unlink(name.c_str());
357 fprintf(stderr, "Error unlinking shared memory %s : %s\n", name.c_str(), strerror(errno));
/**
 * Bounded FIFO queue shared between threads.
 *
 * The queue can be deactivated, which empties it, makes every operation fail
 * and wakes all blocked callers; activate() re-enables it.
 */
template <typename Data>
class MessengerBlockingQueue
{
public:
	/** @param max maximum number of queued elements */
	explicit MessengerBlockingQueue(size_t max)
		: _active(true)
		, _max_size(max)
	{
	}

	MessengerBlockingQueue()
		: MessengerBlockingQueue(UINT_MAX)
	{
	}

	/** @return current number of queued elements */
	size_t size() const
	{
		// Take the lock: std::queue may be modified concurrently by push/pop.
		std::lock_guard<std::mutex> lk(_mutex);
		return _queue.size();
	}

	// deactivate and clear queue
	void deactivate()
	{
		{
			std::lock_guard<std::mutex> lk(_mutex);
			_active = false;
			while (!_queue.empty()) {
				_queue.pop();
			}
		}

		// release all waiting threads
		_can_pop.notify_all();
		_can_push.notify_all();
	}

	/** Re-enable a queue that was deactivated. */
	void activate()
	{
		std::lock_guard<std::mutex> lk(_mutex);
		_active = true;
	}

	/** Append without blocking. @return false if the queue is full or inactive */
	bool push(Data const& value)
	{
		bool rc;
		{
			std::unique_lock<std::mutex> lk(_mutex);
			rc = _push(value);
		}
		if (rc) {
			_can_pop.notify_one();
		}

		return rc;
	}

	/** Append, blocking while the queue is full. @return false if the queue is inactive */
	bool wait_and_push(Data& value)
	{
		bool rc;
		{
			std::unique_lock<std::mutex> lk(_mutex);
			if (!_active) {
				return false;
			}
			// in case of spurious wakeup, loop until predicate in lambda
			// is satisfied.
			_can_push.wait(lk, [this] { return _queue.size() < _max_size || !_active; });
			rc = _push(value);
		}
		if (rc) {
			_can_pop.notify_one();
		}

		return rc;
	}

	/** Remove the oldest element without blocking. @return false if empty or inactive */
	bool pop(Data& value)
	{
		bool rc;
		{
			std::unique_lock<std::mutex> lk(_mutex);
			rc = _pop(value);
		}
		if (rc) {
			_can_push.notify_one();
		}

		return rc;
	}

	/** Remove the oldest element, blocking while empty. @return false if the queue is inactive */
	bool wait_and_pop(Data& value)
	{
		bool rc;
		{
			std::unique_lock<std::mutex> lk(_mutex);
			if (!_active) {
				return false;
			}
			// in case of spurious wakeup, loop until predicate in lambda
			// is satisfied.
			_can_pop.wait(lk, [this] { return !_queue.empty() || !_active; });
			rc = _pop(value);
		}
		if (rc) {
			_can_push.notify_one();
		}

		return rc;
	}

private:
	// Both helpers require _mutex to be held by the caller.
	bool _push(Data const& value)
	{
		if (_queue.size() == _max_size || !_active) {
			return false;
		}
		_queue.push(value);

		return true;
	}

	bool _pop(Data& value)
	{
		if (_queue.empty() || !_active) {
			return false;
		}
		value = _queue.front();
		_queue.pop();

		return true;
	}

	std::queue<Data> _queue;
	mutable std::mutex _mutex;
	std::condition_variable _can_pop;
	std::condition_variable _can_push;
	bool _active;
	size_t _max_size;
};
// Construct a source identified by a file name.
511 explicit BufferSrc(const std::string& file)
// Construct a source backed by an in-memory frame: the client's frame id, the
// id of the buffer slot, and a pointer to the frame data.
519 BufferSrc(size_t clientFrameId, size_t frameId, uint8_t* framePtr)
521 , clientFrameId_(clientFrameId)
523 , framePtr_(framePtr)
// True when a file name is set and there is no in-memory frame pointer.
529 return !file_.empty() && framePtr_ == nullptr;
// The client frame id is the value returned here (this file's ScheduledFrames
// keys its map on an index() result).
534 return clientFrameId_;
538 size_t clientFrameId_;
// Forward declarations of the three worker-thread entry points defined later
// in this file (each return type is on a line not shown here).
545 outboundThread(Messenger* messenger, const std::string& sendBuf, Synch* synch);
547 inboundThread(Messenger* messenger, const std::string& receiveBuf, Synch* synch);
549 processorThread(Messenger* messenger, std::function<void(std::string)> processor);
// Construct a messenger from its configuration.  No threads or shared memory
// are set up by the initializers visible here: the synchronization objects
// and both frame buffers start out null.
553 explicit Messenger(MessengerInit init)
555 , initialized_(false)
558 , _outbound_synch(nullptr)
559 , _inbound_synch(nullptr)
560 , _uncompressed_buffer(nullptr)
561 , _compressed_buffer(nullptr)
// 0 is this file's "not open" descriptor value (deinit_shm skips a zero fd).
562 , _uncompressed_fd(0)
// Tear-down sequence (the enclosing function header is not shown here).
// Deactivating the queues wakes any thread blocked in wait_and_pop().
570 sendQueue.deactivate();
571 receiveQueue.deactivate();
// The outbound thread blocks on SYNCH_RECEIVE_READY and the inbound thread on
// SYNCH_SENT; posting each semaphore once lets them wake and observe shutdown.
573 if (_outbound_synch) {
574 _outbound_synch->post(SYNCH_RECEIVE_READY);
578 if (_inbound_synch) {
579 _inbound_synch->post(SYNCH_SENT);
// Visit every processor thread (the loop body is on a line not shown here).
583 for (auto& p : _processors) {
// The Synch objects are owned by this class (allocated with new when the
// threads are started).
587 delete _outbound_synch;
588 delete _inbound_synch;
// Thread start-up (the enclosing function header is not shown here): create
// the Synch for each direction and start the thread that services it.
596 new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch);
597 _outbound = std::thread(outboundThread, this, init_.outboundMessageBuf, _outbound_synch);
600 new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch);
601 _inbound = std::thread(inboundThread, this, init_.inboundMessageBuf, _inbound_synch);
// Start the configured number of processor threads, each with a copy of the
// processing callback.
603 for (size_t i = 0; i < init_.numProcessingThreads_; ++i) {
604 _processors.push_back(std::thread(processorThread, this, init_.processor_));
/**
 * Write a compressed frame to "<dir>/test_<clientFrameId>.j2k".
 *
 * @param dir              directory to write into
 * @param clientFrameId    frame id used in the file name (formatted as int)
 * @param compressedPtr    start of the compressed data
 * @param compressedLength number of bytes to write
 * @return number of bytes written, or 0 if there was nothing to write or the
 *         file could not be created, fully written or closed
 *
 * The path is assembled in a std::string so that a long @p dir cannot overrun
 * a fixed-size buffer.
 */
size_t serialize(const std::string& dir, size_t clientFrameId, uint8_t* compressedPtr,
		 size_t compressedLength)
{
	if (!compressedPtr || !compressedLength) {
		return 0;
	}
	const std::string fname =
		dir + "/test_" + std::to_string(static_cast<int>(clientFrameId)) + ".j2k";
	auto fp = fopen(fname.c_str(), "wb");
	if (!fp) {
		return 0;
	}
	const size_t written = fwrite(compressedPtr, 1, compressedLength, fp);
	// fclose() flushes; a failure there means buffered data was lost.
	const bool closed = (fclose(fp) == 0);
	if (written != compressedLength || !closed) {
		return 0;
	}

	return written;
}
// Map the frame buffers (the enclosing function header is not shown here).
// Each region holds numFrames_ slots of the configured frame size and is
// only created when that frame size is non-zero.  `rc && ...` short-circuits,
// so the second mapping is not attempted once rc is false.
634 if (init_.uncompressedFrameSize_) {
635 rc = rc && SharedMemoryManager::init_shm(grokUncompressedBuf,
636 init_.uncompressedFrameSize_ * init_.numFrames_,
637 &_uncompressed_fd, &_uncompressed_buffer);
639 if (init_.compressedFrameSize_) {
640 rc = rc && SharedMemoryManager::init_shm(grokCompressedBuf,
641 init_.compressedFrameSize_ * init_.numFrames_,
642 &_compressed_fd, &_compressed_buffer);
// Release both frame buffers (the enclosing function header is not shown
// here), passing the same lengths used when they were mapped.
// NOTE(review): because of `rc && ...`, the compressed buffer is skipped if
// releasing the uncompressed one reports failure -- confirm that is intended.
650 bool rc = SharedMemoryManager::deinit_shm(grokUncompressedBuf,
651 init_.uncompressedFrameSize_ * init_.numFrames_,
652 _uncompressed_fd, &_uncompressed_buffer);
653 rc = rc && SharedMemoryManager::deinit_shm(grokCompressedBuf,
654 init_.compressedFrameSize_ * init_.numFrames_,
655 _compressed_fd, &_compressed_buffer);
// Queue an outbound message built from the tag `str` and the comma-separated
// arguments `args`.
660 template <typename... Args>
661 void send(const std::string& str, Args... args)
663 std::ostringstream oss;
// Pack-expansion idiom: the array initializer evaluates
// `oss << ',' << arg` once per argument, in order; the array is otherwise unused.
665 int dummy[] = {0, ((void)(oss << ',' << args), 0)...};
666 static_cast<void>(dummy);
// Non-blocking push; its result (false if the queue is full or inactive) is
// not checked.
668 sendQueue.push(oss.str());
// Look up a process id by name by running `pgrep <name>` and reading the
// first integer it prints.
// NOTE(review): `name` is interpolated into a shell command line unescaped;
// confirm that callers only pass trusted, fixed names.
671 static pid_t get_pid_by_process_name(const char* name)
674 snprintf(command, sizeof(command), "pgrep %s", name);
675 auto pgrep = popen(command, "r");
// fscanf returns the number of items converted; anything other than 1 means no
// pid could be read.
680 if (fscanf(pgrep, "%d", &pid) != 1) {
688 static bool terminate_process(const char* name)
690 auto pid = get_pid_by_process_name(name);
692 return (pid != -1 && kill(pid, SIGTERM) != -1);
695 static bool kill_process(const char* name)
697 auto pid = get_pid_by_process_name(name);
699 return (pid != -1 && kill(pid, SIGKILL) != -1);
// Build the grk_compress command line from the image geometry, device,
// cinema profile and licensing parameters.  `dir` is presumably the directory
// the command is run from -- confirm against the lines not shown here.
702 void launch_grok(const std::string& dir, uint32_t width, uint32_t stride,
703 uint32_t height, uint32_t samplesPerPixel, uint32_t depth,
704 int device, bool is4K, uint32_t fps, uint32_t bandwidth,
705 const std::string server, uint32_t port,
706 const std::string license)
// Serialized with shutdown via shutdownMutex_.
709 std::unique_lock<std::mutex> lk(shutdownMutex_);
// A valid future means a command has already been launched.
710 if (async_result_.valid()) {
713 if (MessengerInit::first_launch(true)) {
// The license server is passed as "host:port".
718 auto fullServer = server + ":" + std::to_string(port);
// -batch_src carries tag,width,stride,height,samplesPerPixel,depth; the
// profile switch is -cinema4K or -cinema2K followed by fps,bandwidth.
720 "./grk_compress -batch_src %s,%d,%d,%d,%d,%d -out_fmt j2k -out_dir - -k 1 "
721 "-G %d -%s %d,%d -j %s -J %s",
722 GRK_MSGR_BATCH_IMAGE.c_str(), width, stride, height, samplesPerPixel, depth,
723 device, is4K ? "cinema4K" : "cinema2K", fps, bandwidth,
724 license.c_str(), fullServer.c_str());
// Record the frame geometry, fill the pool of available uncompressed buffers
// and wake any thread waiting for client initialization.
728 void init_client(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames)
730 // client fills queue with pending uncompressed buffers
731 init_.uncompressedFrameSize_ = uncompressedFrameSize;
732 init_.compressedFrameSize_ = compressedFrameSize;
733 init_.numFrames_ = numFrames;
// One BufferSrc per slot: client frame id 0, slot index i, and a pointer to
// the start of slot i within the uncompressed region.
735 auto ptr = _uncompressed_buffer;
736 for (size_t i = 0; i < init_.numFrames_; ++i) {
737 availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr));
738 ptr += init_.uncompressedFrameSize_;
// Notify under shutdownMutex_, the mutex wait_for_client_init() waits with.
741 std::unique_lock<std::mutex> lk(shutdownMutex_);
743 clientInitializedCondition_.notify_all();
// Block until the client has been initialized or shutdown has begun.
// Returns true only if initialization completed and shutdown has not started.
746 bool wait_for_client_init()
752 std::unique_lock<std::mutex> lk(shutdownMutex_);
755 } else if (shutdown_) {
// The predicate form of wait() re-checks the condition after spurious wakeups.
758 clientInitializedCondition_.wait(lk, [this] { return initialized_ || shutdown_; });
760 return initialized_ && !shutdown_;
/**
 * Size in bytes of one uncompressed frame stored as 16-bit samples.
 *
 * @param w               width in pixels
 * @param h               height in pixels
 * @param samplesPerPixel number of samples per pixel
 * @return w * h * samplesPerPixel * sizeof(uint16_t), computed in size_t
 */
static size_t uncompressed_frame_size(uint32_t w, uint32_t h, uint32_t samplesPerPixel)
{
	const size_t sample_count = static_cast<size_t>(w) * h * samplesPerPixel;
	return sample_count * sizeof(uint16_t);
}
768 void reclaim_compressed(size_t frameId)
770 availableBuffers_.push(BufferSrc(0, frameId, get_compressed_frame(frameId)));
773 void reclaim_uncompressed(size_t frameId)
775 availableBuffers_.push(BufferSrc(0, frameId, get_uncompressed_frame(frameId)));
778 uint8_t* get_uncompressed_frame(size_t frameId)
780 assert(frameId < init_.numFrames_);
781 if (frameId >= init_.numFrames_) {
785 return (uint8_t*)(_uncompressed_buffer + frameId * init_.uncompressedFrameSize_);
788 uint8_t* get_compressed_frame(size_t frameId)
790 assert(frameId < init_.numFrames_);
791 if (frameId >= init_.numFrames_) {
795 return (uint8_t*)(_compressed_buffer + frameId * init_.compressedFrameSize_);
// Loop condition of all three worker threads; they exit once it is false.
798 std::atomic_bool running;
// Outbound messages awaiting the outbound thread, inbound messages awaiting a
// processor thread, and the pool of frame buffers free for reuse.
801 MessengerBlockingQueue<std::string> sendQueue;
802 MessengerBlockingQueue<std::string> receiveQueue;
803 MessengerBlockingQueue<BufferSrc> availableBuffers_;
// Result of the asynchronously executed command (see launch()); valid() is
// used as the "has been launched" test.
806 std::future<int> async_result_;
// Guards launch, client initialization and shutdown; the two condition
// variables below are waited on with this mutex.
807 std::mutex shutdownMutex_;
808 std::condition_variable shutdownCondition_;
811 std::condition_variable clientInitializedCondition_;
// Run `cmd` asynchronously after switching the working directory to `dir`.
814 void launch(const std::string& cmd, const std::string& dir)
816 // Change the working directory
// NOTE(review): chdir() changes the working directory of the whole process,
// not just of the launched command.
818 if (chdir(dir.c_str()) != 0) {
819 getMessengerLogger()->error("Error: failed to change the working directory");
823 // Execute the command using std::async and std::system
// The lambda captures `this` and reads cmd_, so this object must outlive the
// asynchronous task; the future yields std::system's return value.
825 async_result_ = std::async(std::launch::async, [this]() { return std::system(cmd_.c_str()); });
// Outbound worker thread and the semaphore pair it uses (heap-allocated and
// deleted by this class).
828 std::thread _outbound;
829 Synch* _outbound_synch;
// Inbound worker thread and its semaphore pair.
831 std::thread _inbound;
832 Synch* _inbound_synch;
// Threads that consume receiveQueue.
834 std::vector<std::thread> _processors;
// Mapped base addresses of the frame regions; null while unmapped.
835 char* _uncompressed_buffer;
836 char* _compressed_buffer;
// Descriptors of the shared-memory objects backing the two regions.
838 grk_handle _uncompressed_fd;
839 grk_handle _compressed_fd;
843 outboundThread(Messenger* messenger, const std::string& sendBuf, Synch* synch)
845 grk_handle shm_fd = 0;
846 char* send_buffer = nullptr;
848 if (!SharedMemoryManager::init_shm(sendBuf, messageBufferLen, &shm_fd, &send_buffer)) {
851 while (messenger->running) {
852 synch->wait(SYNCH_RECEIVE_READY);
853 if (!messenger->running) {
857 if (!messenger->sendQueue.wait_and_pop(message)) {
860 if (!messenger->running) {
863 memcpy(send_buffer, message.c_str(), message.size() + 1);
864 synch->post(SYNCH_SENT);
866 SharedMemoryManager::deinit_shm(sendBuf, messageBufferLen, shm_fd, &send_buffer);
870 inboundThread(Messenger* messenger, const std::string& receiveBuf, Synch* synch)
872 grk_handle shm_fd = 0;
873 char* receive_buffer = nullptr;
875 if (!SharedMemoryManager::init_shm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer)) {
878 while (messenger->running) {
879 synch->wait(SYNCH_SENT);
880 if (!messenger->running) {
883 auto message = std::string(receive_buffer);
884 synch->post(SYNCH_RECEIVE_READY);
885 messenger->receiveQueue.push(message);
887 SharedMemoryManager::deinit_shm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer);
// Split a comma-separated message into its fields, which are then read one at
// a time with next() / next_uint().
892 explicit Msg(const std::string& msg)
895 std::stringstream ss(msg);
898 std::getline(ss, substr, ',');
899 cs_.push_back(substr);
// Reading past the last field is reported and, per the log text, yields an
// empty string.
905 if (ct_ == cs_.size()) {
906 getMessengerLogger()->error("Msg: comma separated list exhausted. returning empty.");
// NOTE(review): std::stoi throws std::invalid_argument on an empty or
// non-numeric field (including the "exhausted" case above) and
// std::out_of_range on overflow; values above INT_MAX cannot be parsed even
// though the result is cast to uint32_t.  Callers run on worker threads --
// confirm that an exception there is acceptable.
914 return (uint32_t)std::stoi(next());
// The parsed fields, in message order.
917 std::vector<std::string> cs_;
922 processorThread(Messenger* messenger, std::function<void(std::string)> processor)
924 while (messenger->running) {
926 if (!messenger->receiveQueue.wait_and_pop(message)) {
929 if (!messenger->running) {
933 auto tag = msg.next();
934 if (tag == GRK_MSGR_BATCH_COMPRESS_INIT) {
935 auto width = msg.next_uint();
936 auto stride = msg.next_uint();
938 auto height = msg.next_uint();
939 auto samplesPerPixel = msg.next_uint();
940 auto depth = msg.next_uint();
942 messenger->init_.uncompressedFrameSize_ =
943 Messenger::uncompressed_frame_size(width, height, samplesPerPixel);
944 auto compressedFrameSize = msg.next_uint();
945 auto numFrames = msg.next_uint();
946 messenger->init_client(compressedFrameSize, compressedFrameSize, numFrames);
947 } else if (tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED) {
948 messenger->reclaim_uncompressed(msg.next_uint());
949 } else if (tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED) {
950 messenger->reclaim_compressed(msg.next_uint());
/**
 * Thread-safe map of frames awaiting a result, keyed by F::index().
 *
 * F must be default-constructible, copyable and provide index().
 */
template <typename F>
struct ScheduledFrames
{
	/** Remember @p val unless a frame with the same index is already stored. */
	void store(F& val)
	{
		std::lock_guard<std::mutex> guard(_map_mutex);
		if (_map.count(val.index()) == 0) {
			_map[val.index()] = val;
		}
	}

	/**
	 * Remove and return the frame stored under @p index.
	 *
	 * @param index   key to look up
	 * @param success set to true if a frame was found, false otherwise
	 * @return the stored frame, or a default-constructed F if none was found
	 */
	F retrieve(size_t index, bool& success)
	{
		std::lock_guard<std::mutex> guard(_map_mutex);
		const auto found = _map.find(index);
		success = (found != _map.end());
		if (!success) {
			return F();
		}
		F frame = found->second;
		_map.erase(found);

		return frame;
	}

private:
	std::mutex _map_mutex;
	std::map<size_t, F> _map;
};
// Messenger that tracks frames submitted for compression, so that each
// compressed result can be matched to the frame (of type F) it came from.
// F provides index(), used as the client frame id.
989 template <typename F>
990 struct ScheduledMessenger : public Messenger
992 explicit ScheduledMessenger(MessengerInit init)
994 , _frames_scheduled(0)
995 , _frames_compressed(0)
999 ~ScheduledMessenger()
// Submit one frame: take a free buffer slot (blocking until one is available),
// record the frame and tell the peer which slot holds it.  `converter`
// receives the slot; presumably it fills it with the frame's pixels -- the
// call is on a line not shown here.
1004 bool schedule_compress(F proxy, std::function<void(BufferSrc)> converter)
1006 size_t frameSize = init_.uncompressedFrameSize_;
// NOTE(review): frameSize was just assigned from this same field, so this
// assertion cannot fail.
1007 assert(frameSize >= init_.uncompressedFrameSize_);
// wait_and_pop() returns false once the buffer pool has been deactivated.
1009 if (!availableBuffers_.wait_and_pop(src)) {
1013 _scheduled_frames.store(proxy);
1014 _frames_scheduled++;
1015 send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_);
// Handle a "compressed frame ready" message.  The three fields read are the
// client frame id, the slot id in the compressed region and the compressed
// length.  Unless the frame needs recompression, the stored frame is looked
// up and handed to `processor` together with the compressed data.
1020 void process_compressed(const std::string& message, std::function<void(F, uint8_t*, uint32_t)> processor, bool needsRecompression)
1024 auto clientFrameId = msg.next_uint();
1025 auto compressedFrameId = msg.next_uint();
1026 auto compressedFrameLength = msg.next_uint();
1027 if (!needsRecompression) {
1028 bool success = false;
1029 auto srcFrame = _scheduled_frames.retrieve(clientFrameId, success);
1033 processor(srcFrame, get_compressed_frame(compressedFrameId), compressedFrameLength);
// Count the frame and tell the peer the compressed slot can be reused.
1035 ++_frames_compressed;
1036 send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId);
// Wake shutdown() once every scheduled frame has been compressed.
// NOTE(review): the counters change and this notification is issued without
// holding shutdownMutex_, so a waiter that has just evaluated its predicate
// but not yet blocked could miss it -- confirm whether the mutex should be
// taken before notifying.
1037 if (shutdown_ && _frames_compressed == _frames_scheduled) {
1038 shutdownCondition_.notify_all();
// Shutdown sequence (the function header is not shown here).
1045 std::unique_lock<std::mutex> lk(shutdownMutex_);
// Nothing to do if no command was launched, or its result was already taken.
1046 if (!async_result_.valid()) {
// With frames outstanding, ask the peer to flush and wait until all of them
// have come back compressed.
1050 if (_frames_scheduled) {
1051 uint32_t scheduled = _frames_scheduled;
1052 send(GRK_MSGR_BATCH_FLUSH, scheduled);
1053 shutdownCondition_.wait(lk, [this] { return _frames_scheduled == _frames_compressed; });
// Stop handing out buffers, tell the peer to shut down, then block until the
// launched command finishes (shutdownMutex_ is held throughout).
1055 availableBuffers_.deactivate();
1056 send(GRK_MSGR_BATCH_SHUTDOWN);
1057 int result = async_result_.get();
1059 getMessengerLogger()->error("Accelerator failed with return code: %d\n", result);
// Exceptions are logged rather than propagated.
1061 } catch (std::exception& ex) {
1062 getMessengerLogger()->error("%s", ex.what());
// Remove and return the scheduled frame stored under `index`; `success`
// reports whether one was found.
1066 F retrieve(size_t index, bool& success)
1068 return _scheduled_frames.retrieve(index, success);
// Forwarded to the scheduled-frame map (the function header is not shown here).
1073 _scheduled_frames.store(val);
// Frames awaiting their compressed result, and running totals of frames
// submitted and frames returned.
1077 ScheduledFrames<F> _scheduled_frames;
1078 std::atomic<uint32_t> _frames_scheduled;
1079 std::atomic<uint32_t> _frames_compressed;
1082 } // namespace grk_plugin