summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCarl Hetherington <cth@carlh.net>2025-05-16 23:25:34 +0200
committerCarl Hetherington <cth@carlh.net>2025-05-28 00:33:55 +0200
commitd3e29ae53e2ac55dc3ea8bb80cd74802d0aac4e4 (patch)
tree44997b8856e174c526704027ac4b30f321f21de2
parentda99ccd7b3bff2ff0de99331d540e645ce2039fa (diff)
Cleanup: move some methods into messenger.cc.
-rw-r--r--src/lib/grok/messenger.cc68
-rw-r--r--src/lib/grok/messenger.h62
2 files changed, 71 insertions, 59 deletions
diff --git a/src/lib/grok/messenger.cc b/src/lib/grok/messenger.cc
index 38263597b..96d895287 100644
--- a/src/lib/grok/messenger.cc
+++ b/src/lib/grok/messenger.cc
@@ -366,3 +366,71 @@ Messenger::launch(std::string const& cmd, boost::filesystem::path const& dir)
return success;
}
+
+
+bool
+Messenger::schedule_compress(DCPVideo const& proxy, std::function<void(BufferSrc const&)> converter)
+{
+ BufferSrc src;
+ if (!_available_buffers.waitAndPop(src)) {
+ return false;
+ }
+ converter(src);
+ _scheduled_frames.store(proxy);
+ _frames_scheduled++;
+ send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_);
+
+ return true;
+}
+
+
+/** @param processor function taking compressed J2K data and sending it to the Writer */
+void
+Messenger::process_compressed(std::string const& message, std::function<void (DCPVideo, uint8_t*, uint32_t)> processor, bool needsRecompression)
+{
+ Msg msg(message);
+ msg.next();
+ auto const clientFrameId = msg.nextUint();
+ auto const compressedFrameId = msg.nextUint();
+ auto const compressedFrameLength = msg.nextUint();
+ if (!needsRecompression) {
+ auto src_frame = _scheduled_frames.retrieve(clientFrameId);
+ if (!src_frame) {
+ return;
+ }
+ /* Write the compressed J2K data out */
+ processor(*src_frame, getCompressedFrame(compressedFrameId), compressedFrameLength);
+ }
+
+ ++_frames_compressed;
+ send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId);
+
+ if (_shutdown && _frames_compressed == _frames_scheduled) {
+ _shutdown_condition.notify_all();
+ }
+}
+
+void
+Messenger::shutdown()
+{
+ try {
+ std::unique_lock<std::mutex> lk(_shutdown_mutex);
+ if (!_async_result.valid()) {
+ return;
+ }
+ _shutdown = true;
+ if (_frames_scheduled) {
+ uint32_t scheduled = _frames_scheduled;
+ send(GRK_MSGR_BATCH_FLUSH, scheduled);
+ _shutdown_condition.wait(lk, [this] { return _frames_scheduled == _frames_compressed; });
+ }
+ _available_buffers.deactivate();
+ send(GRK_MSGR_BATCH_SHUTDOWN);
+ int result = _async_result.get();
+ if(result != 0) {
+ getMessengerLogger()->error("Accelerator failed with return code: %d\n",result);
+ }
+ } catch (std::exception &ex) {
+ getMessengerLogger()->error("%s",ex.what());
+ }
+}
diff --git a/src/lib/grok/messenger.h b/src/lib/grok/messenger.h
index ce4b74025..c47ef0107 100644
--- a/src/lib/grok/messenger.h
+++ b/src/lib/grok/messenger.h
@@ -543,68 +543,12 @@ public:
static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel);
- bool schedule_compress(DCPVideo const& proxy, std::function<void(BufferSrc const&)> converter)
- {
- BufferSrc src;
- if (!_available_buffers.waitAndPop(src)) {
- return false;
- }
- converter(src);
- _scheduled_frames.store(proxy);
- _frames_scheduled++;
- send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_);
-
- return true;
- }
+ bool schedule_compress(DCPVideo const& proxy, std::function<void(BufferSrc const&)> converter);
/** @param processor function taking compressed J2K data and sending it to the Writer */
- void process_compressed(std::string const& message, std::function<void (DCPVideo, uint8_t*, uint32_t)> processor, bool needsRecompression)
- {
- Msg msg(message);
- msg.next();
- auto const clientFrameId = msg.nextUint();
- auto const compressedFrameId = msg.nextUint();
- auto const compressedFrameLength = msg.nextUint();
- if (!needsRecompression) {
- auto src_frame = _scheduled_frames.retrieve(clientFrameId);
- if (!src_frame) {
- return;
- }
- /* Write the compressed J2K data out */
- processor(*src_frame, getCompressedFrame(compressedFrameId), compressedFrameLength);
- }
+ void process_compressed(std::string const& message, std::function<void (DCPVideo, uint8_t*, uint32_t)> processor, bool needsRecompression);
- ++_frames_compressed;
- send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId);
-
- if (_shutdown && _frames_compressed == _frames_scheduled) {
- _shutdown_condition.notify_all();
- }
- }
-
- void shutdown()
- {
- try {
- std::unique_lock<std::mutex> lk(_shutdown_mutex);
- if (!_async_result.valid()) {
- return;
- }
- _shutdown = true;
- if (_frames_scheduled) {
- uint32_t scheduled = _frames_scheduled;
- send(GRK_MSGR_BATCH_FLUSH, scheduled);
- _shutdown_condition.wait(lk, [this] { return _frames_scheduled == _frames_compressed; });
- }
- _available_buffers.deactivate();
- send(GRK_MSGR_BATCH_SHUTDOWN);
- int result = _async_result.get();
- if(result != 0) {
- getMessengerLogger()->error("Accelerator failed with return code: %d\n",result);
- }
- } catch (std::exception &ex) {
- getMessengerLogger()->error("%s",ex.what());
- }
- }
+ void shutdown();
boost::optional<DCPVideo> retrieve(size_t index) {
return _scheduled_frames.retrieve(index);