summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCarl Hetherington <cth@carlh.net>2025-05-16 23:16:58 +0200
committerCarl Hetherington <cth@carlh.net>2025-05-28 00:33:55 +0200
commit915b20098427ef8c4732b87af22f901ba6bd5e2a (patch)
treefaf11bf3ff812cd242e7b043e3908f8aa51d2646
parentec66fd6126bc1a29539a65ace35bbae8c03796f6 (diff)
Cleanup: move all of Messenger into ScheduledMessenger.
-rw-r--r--src/lib/grok/messenger.cc46
-rw-r--r--src/lib/grok/messenger.h161
2 files changed, 97 insertions, 110 deletions
diff --git a/src/lib/grok/messenger.cc b/src/lib/grok/messenger.cc
index e8dfcf3df..5c6b050ff 100644
--- a/src/lib/grok/messenger.cc
+++ b/src/lib/grok/messenger.cc
@@ -25,16 +25,20 @@
using namespace grk_plugin;
-Messenger::Messenger(MessengerInit init)
+ScheduledMessenger::ScheduledMessenger(MessengerInit init)
: _running(true)
, _init(init)
+ , _frames_scheduled(0)
+ , _frames_compressed(0)
{
}
-Messenger::~Messenger()
+ScheduledMessenger::~ScheduledMessenger()
{
+ shutdown();
+
_running = false;
_send_queue.deactivate();
_receive_queue.deactivate();
@@ -64,7 +68,7 @@ Messenger::~Messenger()
* from the grok process and handle them.
*/
void
-Messenger::processor_thread()
+ScheduledMessenger::processor_thread()
{
while (_running) {
std::string message;
@@ -84,7 +88,7 @@ Messenger::processor_thread()
auto height = msg.nextUint();
auto samples_per_pixel = msg.nextUint();
msg.nextUint(); // depth
- _init.uncompressedFrameSize_ = Messenger::uncompressedFrameSize(width, height, samples_per_pixel);
+ _init.uncompressedFrameSize_ = ScheduledMessenger::uncompressedFrameSize(width, height, samples_per_pixel);
auto compressed_frame_size = msg.nextUint();
auto num_frames = msg.nextUint();
initClient(compressed_frame_size, compressed_frame_size, num_frames);
@@ -101,7 +105,7 @@ Messenger::processor_thread()
void
-Messenger::outbound_thread()
+ScheduledMessenger::outbound_thread()
{
grk_handle shm_fd = 0;
char* send_buffer = nullptr;
@@ -131,7 +135,7 @@ Messenger::outbound_thread()
void
-Messenger::inbound_thread()
+ScheduledMessenger::inbound_thread()
{
grk_handle shm_fd = 0;
char* receive_buffer = nullptr;
@@ -155,22 +159,22 @@ Messenger::inbound_thread()
void
-Messenger::startThreads()
+ScheduledMessenger::startThreads()
{
_outbound_synch = new Synch(_init.outboundSentSynch, _init.outboundReceiveReadySynch);
- _outbound = std::thread(&Messenger::outbound_thread, this);
+ _outbound = std::thread(&ScheduledMessenger::outbound_thread, this);
_inbound_synch = new Synch(_init.inboundSentSynch, _init.inboundReceiveReadySynch);
- _inbound = std::thread(&Messenger::inbound_thread, this);
+ _inbound = std::thread(&ScheduledMessenger::inbound_thread, this);
for (size_t i = 0; i < _init.numProcessingThreads_; ++i) {
- _processors.push_back(std::thread(&Messenger::processor_thread, this));
+ _processors.push_back(std::thread(&ScheduledMessenger::processor_thread, this));
}
}
bool
-Messenger::initBuffers()
+ScheduledMessenger::initBuffers()
{
bool rc = true;
if (_init.uncompressedFrameSize_) {
@@ -194,7 +198,7 @@ Messenger::initBuffers()
bool
-Messenger::deinit_shm()
+ScheduledMessenger::deinit_shm()
{
bool rc = SharedMemoryManager::deinit_shm(
grokUncompressedBuf,
@@ -213,7 +217,7 @@ Messenger::deinit_shm()
bool
-Messenger::launch_grok(
+ScheduledMessenger::launch_grok(
boost::filesystem::path const& dir,
uint32_t width,
uint32_t stride,
@@ -247,7 +251,7 @@ Messenger::launch_grok(
void
-Messenger::initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames)
+ScheduledMessenger::initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames)
{
// client fills queue with pending uncompressed buffers
_init.uncompressedFrameSize_ = uncompressedFrameSize;
@@ -267,7 +271,7 @@ Messenger::initClient(size_t uncompressedFrameSize, size_t compressedFrameSize,
}
bool
-Messenger::waitForClientInit()
+ScheduledMessenger::waitForClientInit()
{
if (_initialized) {
return true;
@@ -299,28 +303,28 @@ Messenger::waitForClientInit()
size_t
-Messenger::uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel)
+ScheduledMessenger::uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel)
{
return sizeof(uint16_t) * w * h * samplesPerPixel;
}
void
-Messenger::reclaimCompressed(size_t frameId)
+ScheduledMessenger::reclaimCompressed(size_t frameId)
{
_available_buffers.push(BufferSrc(0, frameId, getCompressedFrame(frameId)));
}
void
-Messenger::reclaimUncompressed(size_t frameId)
+ScheduledMessenger::reclaimUncompressed(size_t frameId)
{
_available_buffers.push(BufferSrc(0, frameId, getUncompressedFrame(frameId)));
}
uint8_t*
-Messenger::getUncompressedFrame(size_t frameId)
+ScheduledMessenger::getUncompressedFrame(size_t frameId)
{
assert(frameId < _init.numFrames_);
if(frameId >= _init.numFrames_)
@@ -331,7 +335,7 @@ Messenger::getUncompressedFrame(size_t frameId)
uint8_t*
-Messenger::getCompressedFrame(size_t frameId)
+ScheduledMessenger::getCompressedFrame(size_t frameId)
{
assert(frameId < _init.numFrames_);
if(frameId >= _init.numFrames_)
@@ -341,7 +345,7 @@ Messenger::getCompressedFrame(size_t frameId)
}
bool
-Messenger::launch(std::string const& cmd, boost::filesystem::path const& dir)
+ScheduledMessenger::launch(std::string const& cmd, boost::filesystem::path const& dir)
{
// Change the working directory
if(!dir.empty())
diff --git a/src/lib/grok/messenger.h b/src/lib/grok/messenger.h
index 99c9b35ac..648aa5a63 100644
--- a/src/lib/grok/messenger.h
+++ b/src/lib/grok/messenger.h
@@ -437,87 +437,6 @@ struct BufferSrc
};
-class Messenger
-{
-public:
- explicit Messenger(MessengerInit init);
- virtual ~Messenger();
-
- void startThreads();
- bool initBuffers();
- bool deinit_shm();
-
- template<typename... Args>
- void send(const std::string& str, Args... args)
- {
- std::ostringstream oss;
- oss << str;
- int dummy[] = {0, ((void)(oss << ',' << args), 0)...};
- static_cast<void>(dummy);
-
- _send_queue.push(oss.str());
- }
-
- bool launch_grok(
- boost::filesystem::path const& dir,
- uint32_t width,
- uint32_t stride,
- uint32_t height,
- uint32_t samplesPerPixel,
- uint32_t depth,
- int device,
- bool is4K,
- uint32_t fps,
- uint32_t bandwidth,
- const std::string server,
- const std::string license
- );
-
- void initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames);
- bool waitForClientInit();
-
- void reclaimCompressed(size_t frameId);
- void reclaimUncompressed(size_t frameId);
- uint8_t* getUncompressedFrame(size_t frameId);
- uint8_t* getCompressedFrame(size_t frameId);
-
- static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel);
-
-protected:
- std::condition_variable _client_initialized_condition;
- MessengerBlockingQueue<BufferSrc> _available_buffers;
- std::future<int> _async_result;
- std::condition_variable _shutdown_condition;
- bool _shutdown = false;
- std::mutex _shutdown_mutex;
-
-private:
- bool launch(std::string const& cmd, boost::filesystem::path const& dir);
- void processor_thread();
- void outbound_thread();
- void inbound_thread();
-
- std::atomic_bool _running;
- MessengerInit _init;
- bool _initialized = false;
- MessengerBlockingQueue<std::string> _send_queue;
- MessengerBlockingQueue<std::string> _receive_queue;
-
- std::thread _outbound;
- Synch* _outbound_synch = nullptr;
-
- std::thread _inbound;
- Synch* _inbound_synch = nullptr;
-
- std::vector<std::thread> _processors;
- char* _uncompressed_buffer = nullptr;
- char* _compressed_buffer = nullptr;
-
- grk_handle _uncompressed_fd = 0;
- grk_handle _compressed_fd = 0;
-};
-
-
struct Msg
{
explicit Msg(const std::string &msg) : ct_(0)
@@ -578,19 +497,52 @@ struct ScheduledFrames
};
-struct ScheduledMessenger : public Messenger
+class ScheduledMessenger
{
- explicit ScheduledMessenger(MessengerInit init)
- : Messenger(init)
- , _frames_scheduled(0)
- , _frames_compressed(0)
- {}
+public:
+ explicit ScheduledMessenger(MessengerInit init);
+ ~ScheduledMessenger();
- ~ScheduledMessenger()
+ void startThreads();
+ bool initBuffers();
+ bool deinit_shm();
+
+ template<typename... Args>
+ void send(const std::string& str, Args... args)
{
- shutdown();
+ std::ostringstream oss;
+ oss << str;
+ int dummy[] = {0, ((void)(oss << ',' << args), 0)...};
+ static_cast<void>(dummy);
+
+ _send_queue.push(oss.str());
}
+ bool launch_grok(
+ boost::filesystem::path const& dir,
+ uint32_t width,
+ uint32_t stride,
+ uint32_t height,
+ uint32_t samplesPerPixel,
+ uint32_t depth,
+ int device,
+ bool is4K,
+ uint32_t fps,
+ uint32_t bandwidth,
+ const std::string server,
+ const std::string license
+ );
+
+ void initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames);
+ bool waitForClientInit();
+
+ void reclaimCompressed(size_t frameId);
+ void reclaimUncompressed(size_t frameId);
+ uint8_t* getUncompressedFrame(size_t frameId);
+ uint8_t* getCompressedFrame(size_t frameId);
+
+ static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel);
+
bool schedule_compress(DCPVideo const& proxy, std::function<void(BufferSrc const&)> converter)
{
BufferSrc src;
@@ -663,6 +615,37 @@ struct ScheduledMessenger : public Messenger
}
private:
+ bool launch(std::string const& cmd, boost::filesystem::path const& dir);
+ void processor_thread();
+ void outbound_thread();
+ void inbound_thread();
+
+ std::atomic_bool _running;
+ MessengerInit _init;
+ bool _initialized = false;
+ MessengerBlockingQueue<std::string> _send_queue;
+ MessengerBlockingQueue<std::string> _receive_queue;
+
+ std::thread _outbound;
+ Synch* _outbound_synch = nullptr;
+
+ std::thread _inbound;
+ Synch* _inbound_synch = nullptr;
+
+ std::vector<std::thread> _processors;
+ char* _uncompressed_buffer = nullptr;
+ char* _compressed_buffer = nullptr;
+
+ grk_handle _uncompressed_fd = 0;
+ grk_handle _compressed_fd = 0;
+
+ std::condition_variable _client_initialized_condition;
+ MessengerBlockingQueue<BufferSrc> _available_buffers;
+ std::future<int> _async_result;
+ std::condition_variable _shutdown_condition;
+ bool _shutdown = false;
+ std::mutex _shutdown_mutex;
+
ScheduledFrames _scheduled_frames;
std::atomic<uint32_t> _frames_scheduled;
std::atomic<uint32_t> _frames_compressed;