*/
#pragma once
-#include <iostream>
-#include <string>
-#include <cstring>
#include <atomic>
+#include <cassert>
+#include <condition_variable>
+#include <cstdarg>
+#include <cstring>
#include <functional>
-#include <sstream>
#include <future>
+#include <iostream>
#include <map>
-#include <thread>
#include <mutex>
-#include <condition_variable>
#include <queue>
-#include <cassert>
-#include <cstdarg>
+#include <sstream>
+#include <string>
+#include <thread>
#ifdef _WIN32
-#include <windows.h>
#include <direct.h>
#include <tlhelp32.h>
+#include <windows.h>
#pragma warning(disable : 4100)
#else
-#include <unistd.h>
#include <fcntl.h>
-#include <sys/mman.h>
#include <semaphore.h>
#include <signal.h>
+#include <sys/mman.h>
+#include <unistd.h>
#endif
-namespace grk_plugin
-{
+namespace grk_plugin {
static std::string grokToClientMessageBuf = "Global\\grok_to_client_message";
static std::string grokSentSynch = "Global\\grok_sent";
static std::string clientReceiveReadySynch = "Global\\client_receive_ready";
static const std::string GRK_MSGR_BATCH_COMPRESS_INIT = "GRK_MSGR_BATCH_COMPRESS_INIT";
static const std::string GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED = "GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED";
static const std::string GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED =
- "GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED";
+ "GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED";
static const std::string GRK_MSGR_BATCH_SUBMIT_COMPRESSED = "GRK_MSGR_BATCH_SUBMIT_COMPRESSED";
static const std::string GRK_MSGR_BATCH_PROCESSSED_COMPRESSED =
- "GRK_MSGR_BATCH_PROCESSSED_COMPRESSED";
+ "GRK_MSGR_BATCH_PROCESSSED_COMPRESSED";
static const std::string GRK_MSGR_BATCH_SHUTDOWN = "GRK_MSGR_BATCH_SHUTDOWN";
static const std::string GRK_MSGR_BATCH_FLUSH = "GRK_MSGR_BATCH_FLUSH";
static const size_t messageBufferLen = 256;
+
struct IMessengerLogger
{
- virtual ~IMessengerLogger(void) = default;
+ virtual ~IMessengerLogger() = default;
virtual void info(const char* fmt, ...) = 0;
virtual void warn(const char* fmt, ...) = 0;
virtual void error(const char* fmt, ...) = 0;
- protected:
- template<typename... Args>
+protected:
+ template <typename... Args>
std::string log_message(char const* const format, Args&... args) noexcept
{
constexpr size_t message_size = 512;
return std::string(message);
}
};
+
struct MessengerLogger : public IMessengerLogger
{
- explicit MessengerLogger(const std::string &preamble) : preamble_(preamble) {}
+ explicit MessengerLogger(const std::string& preamble)
+ : preamble_(preamble)
+ {
+ }
+
virtual ~MessengerLogger() = default;
+
virtual void info(const char* fmt, ...) override
{
va_list args;
vfprintf(stdout, new_fmt.c_str(), args);
va_end(args);
}
+
virtual void warn(const char* fmt, ...) override
{
va_list args;
vfprintf(stdout, new_fmt.c_str(), args);
va_end(args);
}
+
virtual void error(const char* fmt, ...) override
{
va_list args;
va_end(args);
}
- protected:
+protected:
std::string preamble_;
};
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-function"
#endif
-static void setMessengerLogger(IMessengerLogger* logger)
+static void
+setMessengerLogger(IMessengerLogger* logger)
{
- delete sLogger;
- sLogger = logger;
+ delete sLogger;
+ sLogger = logger;
}
#if defined(__GNUC__) || defined(__clang__)
#pragma GCC diagnostic pop
#endif
-static IMessengerLogger* getMessengerLogger(void)
+static IMessengerLogger*
+getMessengerLogger()
{
return sLogger;
}
+
struct MessengerInit
{
- MessengerInit(const std::string &outBuf, const std::string &outSent,
- const std::string &outReceiveReady, const std::string &inBuf,
- const std::string &inSent,
- const std::string &inReceiveReady,
- std::function<void(std::string)> processor,
- size_t numProcessingThreads)
- : outboundMessageBuf(outBuf), outboundSentSynch(outSent),
- outboundReceiveReadySynch(outReceiveReady), inboundMessageBuf(inBuf),
- inboundSentSynch(inSent), inboundReceiveReadySynch(inReceiveReady), processor_(processor),
- numProcessingThreads_(numProcessingThreads),
- uncompressedFrameSize_(0), compressedFrameSize_(0),
- numFrames_(0)
- {
- if(firstLaunch(true))
+ MessengerInit(const std::string& outBuf, const std::string& outSent,
+ const std::string& outReceiveReady, const std::string& inBuf,
+ const std::string& inSent,
+ const std::string& inReceiveReady,
+ std::function<void(std::string)> processor,
+ size_t numProcessingThreads)
+ : outboundMessageBuf(outBuf)
+ , outboundSentSynch(outSent)
+ , outboundReceiveReadySynch(outReceiveReady)
+ , inboundMessageBuf(inBuf)
+ , inboundSentSynch(inSent)
+ , inboundReceiveReadySynch(inReceiveReady)
+ , processor_(processor)
+ , numProcessingThreads_(numProcessingThreads)
+ , uncompressedFrameSize_(0)
+ , compressedFrameSize_(0)
+ , numFrames_(0)
+ {
+ if (first_launch(true)) {
unlink();
+}
}
- void unlink(void)
+
+ void unlink()
{
#ifndef _WIN32
shm_unlink(grokToClientMessageBuf.c_str());
shm_unlink(clientToGrokMessageBuf.c_str());
#endif
}
- static bool firstLaunch(bool isClient)
+
+ static bool first_launch(bool isClient)
{
bool debugGrok = false;
return debugGrok != isClient;
}
+
std::string outboundMessageBuf;
std::string outboundSentSynch;
std::string outboundReceiveReadySynch;
};
typedef int grk_handle;
+
struct Synch
{
- Synch(const std::string &sentSemName, const std::string &receiveReadySemName)
- : sentSemName_(sentSemName), receiveReadySemName_(receiveReadySemName)
+ Synch(const std::string& sentSemName, const std::string& receiveReadySemName)
+ : _sent_sem_name(sentSemName)
+ , _receive_ready_sem_name(receiveReadySemName)
{
// unlink semaphores in case of previous crash
- if(MessengerInit::firstLaunch(true))
+ if (MessengerInit::first_launch(true)) {
unlink();
+}
open();
}
+
~Synch()
{
close();
- if(MessengerInit::firstLaunch(true))
+ if (MessengerInit::first_launch(true)) {
unlink();
+}
}
+
void post(SynchDirection dir)
{
auto sem = (dir == SYNCH_SENT ? sentSem_ : receiveReadySem_);
int rc = sem_post(sem);
- if(rc)
+ if (rc) {
getMessengerLogger()->error("Error posting to semaphore: %s", strerror(errno));
+}
}
+
void wait(SynchDirection dir)
{
auto sem = dir == SYNCH_SENT ? sentSem_ : receiveReadySem_;
int rc = sem_wait(sem);
- if(rc)
+ if (rc) {
getMessengerLogger()->error("Error waiting for semaphore: %s", strerror(errno));
+}
}
- void open(void)
+
+ void open()
{
- sentSem_ = sem_open(sentSemName_.c_str(), O_CREAT, 0666, 0);
- if(!sentSem_)
+ sentSem_ = sem_open(_sent_sem_name.c_str(), O_CREAT, 0666, 0);
+ if (!sentSem_) {
getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
- receiveReadySem_ = sem_open(receiveReadySemName_.c_str(), O_CREAT, 0666, 1);
- if(!receiveReadySem_)
+}
+ receiveReadySem_ = sem_open(_receive_ready_sem_name.c_str(), O_CREAT, 0666, 1);
+ if (!receiveReadySem_) {
getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
+}
}
- void close(void)
+
+ void close()
{
int rc = sem_close(sentSem_);
- if(rc)
- getMessengerLogger()->error("Error closing semaphore %s: %s", sentSemName_.c_str(),
- strerror(errno));
+ if (rc) {
+ getMessengerLogger()->error("Error closing semaphore %s: %s", _sent_sem_name.c_str(),
+ strerror(errno));
+}
rc = sem_close(receiveReadySem_);
- if(rc)
+ if (rc) {
getMessengerLogger()->error("Error closing semaphore %s: %s",
- receiveReadySemName_.c_str(), strerror(errno));
+ _receive_ready_sem_name.c_str(), strerror(errno));
+}
}
- void unlink(void)
+
+ void unlink()
{
- int rc = sem_unlink(sentSemName_.c_str());
- if(rc == -1 && errno != ENOENT)
- getMessengerLogger()->error("Error unlinking semaphore %s: %s", sentSemName_.c_str(),
- strerror(errno));
- rc = sem_unlink(receiveReadySemName_.c_str());
- if(rc == -1 && errno != ENOENT)
+ int rc = sem_unlink(_sent_sem_name.c_str());
+ if (rc == -1 && errno != ENOENT) {
+ getMessengerLogger()->error("Error unlinking semaphore %s: %s", _sent_sem_name.c_str(),
+ strerror(errno));
+}
+ rc = sem_unlink(_receive_ready_sem_name.c_str());
+ if (rc == -1 && errno != ENOENT) {
getMessengerLogger()->error("Error unlinking semaphore %s: %s",
- receiveReadySemName_.c_str(), strerror(errno));
+ _receive_ready_sem_name.c_str(), strerror(errno));
+}
}
+
sem_t* sentSem_;
sem_t* receiveReadySem_;
- private:
- std::string sentSemName_;
- std::string receiveReadySemName_;
+private:
+ std::string _sent_sem_name;
+ std::string _receive_ready_sem_name;
};
+
struct SharedMemoryManager
{
- static bool initShm(const std::string &name, size_t len, grk_handle* shm_fd, char** buffer)
+ static bool init_shm(const std::string& name, size_t len, grk_handle* shm_fd, char** buffer)
{
*shm_fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0666);
- if(*shm_fd < 0)
- {
+ if (*shm_fd < 0) {
getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
return false;
}
int rc = ftruncate(*shm_fd, sizeof(char) * len);
- if(rc)
- {
+ if (rc) {
getMessengerLogger()->error("Error truncating shared memory: %s", strerror(errno));
rc = close(*shm_fd);
- if(rc)
+ if (rc) {
getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
+}
rc = shm_unlink(name.c_str());
- if(rc)
+ if (rc) {
getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
+}
return false;
}
*buffer = static_cast<char*>(mmap(0, len, PROT_WRITE, MAP_SHARED, *shm_fd, 0));
- if(!*buffer)
- {
+ if (!*buffer) {
getMessengerLogger()->error("Error mapping shared memory: %s", strerror(errno));
rc = close(*shm_fd);
- if(rc)
+ if (rc) {
getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
+}
rc = shm_unlink(name.c_str());
- if(rc)
+ if (rc) {
getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
+}
}
return *buffer != nullptr;
}
- static bool deinitShm(const std::string &name, size_t len, grk_handle &shm_fd, char** buffer)
+
+ static bool deinit_shm(const std::string& name, size_t len, grk_handle& shm_fd, char** buffer)
{
- if (!*buffer || !shm_fd)
+ if (!*buffer || !shm_fd) {
return true;
+}
int rc = munmap(*buffer, len);
*buffer = nullptr;
- if(rc)
+ if (rc) {
getMessengerLogger()->error("Error unmapping shared memory %s: %s", name.c_str(), strerror(errno));
+}
rc = close(shm_fd);
shm_fd = 0;
- if(rc)
+ if (rc) {
getMessengerLogger()->error("Error closing shared memory %s: %s", name.c_str(), strerror(errno));
+}
rc = shm_unlink(name.c_str());
- if(rc)
- fprintf(stderr,"Error unlinking shared memory %s : %s\n", name.c_str(), strerror(errno));
+ if (rc) {
+ fprintf(stderr, "Error unlinking shared memory %s : %s\n", name.c_str(), strerror(errno));
+}
return true;
}
};
-template<typename Data>
+template <typename Data>
class MessengerBlockingQueue
{
- public:
- explicit MessengerBlockingQueue(size_t max) : active_(true), max_size_(max) {}
- MessengerBlockingQueue() : MessengerBlockingQueue(UINT_MAX) {}
+public:
+ explicit MessengerBlockingQueue(size_t max)
+ : _active(true)
+ , _max_size(max)
+ {
+ }
+
+ MessengerBlockingQueue()
+ : MessengerBlockingQueue(UINT_MAX)
+ {
+ }
+
size_t size() const
{
- return queue_.size();
+ return _queue.size();
}
+
// deactivate and clear queue
void deactivate()
{
{
- std::lock_guard<std::mutex> lk(mutex_);
- active_ = false;
- while(!queue_.empty())
- queue_.pop();
+ std::lock_guard<std::mutex> lk(_mutex);
+ _active = false;
+ while (!_queue.empty()) {
+ _queue.pop();
+}
}
// release all waiting threads
- can_pop_.notify_all();
- can_push_.notify_all();
+ _can_pop.notify_all();
+ _can_push.notify_all();
}
+
void activate()
{
- std::lock_guard<std::mutex> lk(mutex_);
- active_ = true;
+ std::lock_guard<std::mutex> lk(_mutex);
+ _active = true;
}
+
bool push(Data const& value)
{
bool rc;
{
- std::unique_lock<std::mutex> lk(mutex_);
- rc = push_(value);
+ std::unique_lock<std::mutex> lk(_mutex);
+ rc = _push(value);
}
- if(rc)
- can_pop_.notify_one();
+ if (rc) {
+ _can_pop.notify_one();
+}
return rc;
}
- bool waitAndPush(Data& value)
+
+ bool wait_and_push(Data& value)
{
bool rc;
{
- std::unique_lock<std::mutex> lk(mutex_);
- if(!active_)
+ std::unique_lock<std::mutex> lk(_mutex);
+ if (!_active) {
return false;
+}
// in case of spurious wakeup, loop until predicate in lambda
// is satisfied.
- can_push_.wait(lk, [this] { return queue_.size() < max_size_ || !active_; });
- rc = push_(value);
+ _can_push.wait(lk, [this] { return _queue.size() < _max_size || !_active; });
+ rc = _push(value);
}
- if(rc)
- can_pop_.notify_one();
+ if (rc) {
+ _can_pop.notify_one();
+}
return rc;
}
+
bool pop(Data& value)
{
bool rc;
{
- std::unique_lock<std::mutex> lk(mutex_);
- rc = pop_(value);
+ std::unique_lock<std::mutex> lk(_mutex);
+ rc = _pop(value);
}
- if(rc)
- can_push_.notify_one();
+ if (rc) {
+ _can_push.notify_one();
+}
return rc;
}
- bool waitAndPop(Data& value)
+
+ bool wait_and_pop(Data& value)
{
bool rc;
{
- std::unique_lock<std::mutex> lk(mutex_);
- if(!active_)
+ std::unique_lock<std::mutex> lk(_mutex);
+ if (!_active) {
return false;
+}
// in case of spurious wakeup, loop until predicate in lambda
// is satisfied.
- can_pop_.wait(lk, [this] { return !queue_.empty() || !active_; });
- rc = pop_(value);
+ _can_pop.wait(lk, [this] { return !_queue.empty() || !_active; });
+ rc = _pop(value);
}
- if(rc)
- can_push_.notify_one();
+ if (rc) {
+ _can_push.notify_one();
+}
return rc;
}
- private:
- bool push_(Data const& value)
+private:
+ bool _push(Data const& value)
{
- if(queue_.size() == max_size_ || !active_)
+ if (_queue.size() == _max_size || !_active) {
return false;
- queue_.push(value);
+}
+ _queue.push(value);
return true;
}
- bool pop_(Data& value)
+
+ bool _pop(Data& value)
{
- if(queue_.empty() || !active_)
+ if (_queue.empty() || !_active) {
return false;
- value = queue_.front();
- queue_.pop();
+}
+ value = _queue.front();
+ _queue.pop();
return true;
}
- std::queue<Data> queue_;
- mutable std::mutex mutex_;
- std::condition_variable can_pop_;
- std::condition_variable can_push_;
- bool active_;
- size_t max_size_;
+
+ std::queue<Data> _queue;
+ mutable std::mutex _mutex;
+ std::condition_variable _can_pop;
+ std::condition_variable _can_push;
+ bool _active;
+ size_t _max_size;
};
+
struct BufferSrc
{
- BufferSrc(void) : BufferSrc("") {}
- explicit BufferSrc(const std::string &file) : file_(file), clientFrameId_(0), frameId_(0), framePtr_(nullptr)
- {}
+ BufferSrc()
+ : BufferSrc("")
+ {
+ }
+
+ explicit BufferSrc(const std::string& file)
+ : file_(file)
+ , clientFrameId_(0)
+ , frameId_(0)
+ , framePtr_(nullptr)
+ {
+ }
+
BufferSrc(size_t clientFrameId, size_t frameId, uint8_t* framePtr)
- : file_(""), clientFrameId_(clientFrameId), frameId_(frameId), framePtr_(framePtr)
- {}
- bool fromDisk(void)
+ : file_("")
+ , clientFrameId_(clientFrameId)
+ , frameId_(frameId)
+ , framePtr_(framePtr)
+ {
+ }
+
+ bool from_disk()
{
return !file_.empty() && framePtr_ == nullptr;
}
+
size_t index() const
{
return clientFrameId_;
}
+
std::string file_;
size_t clientFrameId_;
size_t frameId_;
};
struct Messenger;
-static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch);
-static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch);
-static void processorThread(Messenger* messenger, std::function<void(std::string)> processor);
+static void
+outboundThread(Messenger* messenger, const std::string& sendBuf, Synch* synch);
+static void
+inboundThread(Messenger* messenger, const std::string& receiveBuf, Synch* synch);
+static void
+processorThread(Messenger* messenger, std::function<void(std::string)> processor);
struct Messenger
{
explicit Messenger(MessengerInit init)
- : running(true), initialized_(false), shutdown_(false), init_(init),
- outboundSynch_(nullptr),
- inboundSynch_(nullptr), uncompressed_buffer_(nullptr), compressed_buffer_(nullptr),
- uncompressed_fd_(0), compressed_fd_(0)
- {}
- virtual ~Messenger(void)
+ : running(true)
+ , initialized_(false)
+ , shutdown_(false)
+ , init_(init)
+ , _outbound_synch(nullptr)
+ , _inbound_synch(nullptr)
+ , _uncompressed_buffer(nullptr)
+ , _compressed_buffer(nullptr)
+ , _uncompressed_fd(0)
+ , _compressed_fd(0)
+ {
+ }
+
+ virtual ~Messenger()
{
running = false;
sendQueue.deactivate();
receiveQueue.deactivate();
- if (outboundSynch_) {
- outboundSynch_->post(SYNCH_RECEIVE_READY);
- outbound.join();
+ if (_outbound_synch) {
+ _outbound_synch->post(SYNCH_RECEIVE_READY);
+ _outbound.join();
}
- if (inboundSynch_) {
- inboundSynch_->post(SYNCH_SENT);
- inbound.join();
+ if (_inbound_synch) {
+ _inbound_synch->post(SYNCH_SENT);
+ _inbound.join();
}
- for(auto& p : processors_)
+ for (auto& p : _processors) {
p.join();
+}
- delete outboundSynch_;
- delete inboundSynch_;
+ delete _outbound_synch;
+ delete _inbound_synch;
- deinitShm();
+ deinit_shm();
}
- void startThreads(void) {
- outboundSynch_ =
- new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch);
- outbound = std::thread(outboundThread, this, init_.outboundMessageBuf, outboundSynch_);
- inboundSynch_ =
- new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch);
- inbound = std::thread(inboundThread, this, init_.inboundMessageBuf, inboundSynch_);
+ void start_threads()
+ {
+ _outbound_synch =
+ new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch);
+ _outbound = std::thread(outboundThread, this, init_.outboundMessageBuf, _outbound_synch);
+
+ _inbound_synch =
+ new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch);
+ _inbound = std::thread(inboundThread, this, init_.inboundMessageBuf, _inbound_synch);
- for(size_t i = 0; i < init_.numProcessingThreads_; ++i)
- processors_.push_back(std::thread(processorThread, this, init_.processor_));
+ for (size_t i = 0; i < init_.numProcessingThreads_; ++i) {
+ _processors.push_back(std::thread(processorThread, this, init_.processor_));
+}
}
- size_t serialize(const std::string &dir, size_t clientFrameId, uint8_t* compressedPtr,
- size_t compressedLength)
+
+ size_t serialize(const std::string& dir, size_t clientFrameId, uint8_t* compressedPtr,
+ size_t compressedLength)
{
char fname[512];
- if(!compressedPtr || !compressedLength)
+ if (!compressedPtr || !compressedLength) {
return 0;
+}
sprintf(fname, "%s/test_%d.j2k", dir.c_str(), (int)clientFrameId);
auto fp = fopen(fname, "wb");
- if(!fp)
+ if (!fp) {
return 0;
+}
size_t written = fwrite(compressedPtr, 1, compressedLength, fp);
- if(written != compressedLength)
- {
+ if (written != compressedLength) {
fclose(fp);
return 0;
}
return written;
}
- bool initBuffers(void)
+
+ bool init_buffers()
{
bool rc = true;
- if(init_.uncompressedFrameSize_)
- {
- rc = rc && SharedMemoryManager::initShm(grokUncompressedBuf,
- init_.uncompressedFrameSize_ * init_.numFrames_,
- &uncompressed_fd_, &uncompressed_buffer_);
+ if (init_.uncompressedFrameSize_) {
+ rc = rc && SharedMemoryManager::init_shm(grokUncompressedBuf,
+ init_.uncompressedFrameSize_ * init_.numFrames_,
+ &_uncompressed_fd, &_uncompressed_buffer);
}
- if(init_.compressedFrameSize_)
- {
- rc = rc && SharedMemoryManager::initShm(grokCompressedBuf,
- init_.compressedFrameSize_ * init_.numFrames_,
- &compressed_fd_, &compressed_buffer_);
+ if (init_.compressedFrameSize_) {
+ rc = rc && SharedMemoryManager::init_shm(grokCompressedBuf,
+ init_.compressedFrameSize_ * init_.numFrames_,
+ &_compressed_fd, &_compressed_buffer);
}
return rc;
}
- bool deinitShm(void)
+ bool deinit_shm()
{
- bool rc = SharedMemoryManager::deinitShm(grokUncompressedBuf,
- init_.uncompressedFrameSize_ * init_.numFrames_,
- uncompressed_fd_, &uncompressed_buffer_);
- rc = rc && SharedMemoryManager::deinitShm(grokCompressedBuf,
- init_.compressedFrameSize_ * init_.numFrames_,
- compressed_fd_, &compressed_buffer_);
+ bool rc = SharedMemoryManager::deinit_shm(grokUncompressedBuf,
+ init_.uncompressedFrameSize_ * init_.numFrames_,
+ _uncompressed_fd, &_uncompressed_buffer);
+ rc = rc && SharedMemoryManager::deinit_shm(grokCompressedBuf,
+ init_.compressedFrameSize_ * init_.numFrames_,
+ _compressed_fd, &_compressed_buffer);
return rc;
}
- template<typename... Args>
+
+ template <typename... Args>
void send(const std::string& str, Args... args)
{
std::ostringstream oss;
sendQueue.push(oss.str());
}
+
static pid_t get_pid_by_process_name(const char* name)
{
char command[256];
snprintf(command, sizeof(command), "pgrep %s", name);
auto pgrep = popen(command, "r");
- if(!pgrep)
+ if (!pgrep) {
return -1;
+}
pid_t pid;
- if(fscanf(pgrep, "%d", &pid) != 1)
+ if (fscanf(pgrep, "%d", &pid) != 1) {
pid = -1;
+}
pclose(pgrep);
return pid;
}
+
static bool terminate_process(const char* name)
{
auto pid = get_pid_by_process_name(name);
return (pid != -1 && kill(pid, SIGTERM) != -1);
}
+
static bool kill_process(const char* name)
{
auto pid = get_pid_by_process_name(name);
return (pid != -1 && kill(pid, SIGKILL) != -1);
}
- void launchGrok(const std::string &dir, uint32_t width, uint32_t stride,
- uint32_t height, uint32_t samplesPerPixel, uint32_t depth,
- int device, bool is4K, uint32_t fps, uint32_t bandwidth,
- const std::string server, uint32_t port,
- const std::string license)
+
+ void launch_grok(const std::string& dir, uint32_t width, uint32_t stride,
+ uint32_t height, uint32_t samplesPerPixel, uint32_t depth,
+ int device, bool is4K, uint32_t fps, uint32_t bandwidth,
+ const std::string server, uint32_t port,
+ const std::string license)
{
std::unique_lock<std::mutex> lk(shutdownMutex_);
- if (async_result_.valid())
+ if (async_result_.valid()) {
return;
- if(MessengerInit::firstLaunch(true))
+}
+ if (MessengerInit::first_launch(true)) {
init_.unlink();
- startThreads();
+}
+ start_threads();
char _cmd[4096];
auto fullServer = server + ":" + std::to_string(port);
sprintf(_cmd,
- "./grk_compress -batch_src %s,%d,%d,%d,%d,%d -out_fmt j2k -out_dir - -k 1 "
- "-G %d -%s %d,%d -j %s -J %s",
- GRK_MSGR_BATCH_IMAGE.c_str(), width, stride, height, samplesPerPixel, depth,
- device, is4K ? "cinema4K" : "cinema2K", fps, bandwidth,
- license.c_str(), fullServer.c_str());
+ "./grk_compress -batch_src %s,%d,%d,%d,%d,%d -out_fmt j2k -out_dir - -k 1 "
+ "-G %d -%s %d,%d -j %s -J %s",
+ GRK_MSGR_BATCH_IMAGE.c_str(), width, stride, height, samplesPerPixel, depth,
+ device, is4K ? "cinema4K" : "cinema2K", fps, bandwidth,
+ license.c_str(), fullServer.c_str());
launch(_cmd, dir);
}
- void initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames)
+
+ void init_client(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames)
{
// client fills queue with pending uncompressed buffers
init_.uncompressedFrameSize_ = uncompressedFrameSize;
init_.compressedFrameSize_ = compressedFrameSize;
init_.numFrames_ = numFrames;
- initBuffers();
- auto ptr = uncompressed_buffer_;
- for(size_t i = 0; i < init_.numFrames_; ++i)
- {
+ init_buffers();
+ auto ptr = _uncompressed_buffer;
+ for (size_t i = 0; i < init_.numFrames_; ++i) {
availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr));
ptr += init_.uncompressedFrameSize_;
}
initialized_ = true;
clientInitializedCondition_.notify_all();
}
- bool waitForClientInit(void)
+
+ bool wait_for_client_init()
{
- if(initialized_)
+ if (initialized_) {
return true;
+}
std::unique_lock<std::mutex> lk(shutdownMutex_);
- if(initialized_)
+ if (initialized_) {
return true;
- else if (shutdown_)
+ } else if (shutdown_) {
return false;
- clientInitializedCondition_.wait(lk, [this]{return initialized_ || shutdown_;});
+}
+ clientInitializedCondition_.wait(lk, [this] { return initialized_ || shutdown_; });
return initialized_ && !shutdown_;
}
- static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel)
+
+ static size_t uncompressed_frame_size(uint32_t w, uint32_t h, uint32_t samplesPerPixel)
{
return sizeof(uint16_t) * w * h * samplesPerPixel;
}
- void reclaimCompressed(size_t frameId)
+
+ void reclaim_compressed(size_t frameId)
{
- availableBuffers_.push(BufferSrc(0, frameId, getCompressedFrame(frameId)));
+ availableBuffers_.push(BufferSrc(0, frameId, get_compressed_frame(frameId)));
}
- void reclaimUncompressed(size_t frameId)
+
+ void reclaim_uncompressed(size_t frameId)
{
- availableBuffers_.push(BufferSrc(0, frameId, getUncompressedFrame(frameId)));
+ availableBuffers_.push(BufferSrc(0, frameId, get_uncompressed_frame(frameId)));
}
- uint8_t* getUncompressedFrame(size_t frameId)
+
+ uint8_t* get_uncompressed_frame(size_t frameId)
{
assert(frameId < init_.numFrames_);
- if(frameId >= init_.numFrames_)
+ if (frameId >= init_.numFrames_) {
return nullptr;
+}
- return (uint8_t*)(uncompressed_buffer_ + frameId * init_.uncompressedFrameSize_);
+ return (uint8_t*)(_uncompressed_buffer + frameId * init_.uncompressedFrameSize_);
}
- uint8_t* getCompressedFrame(size_t frameId)
+
+ uint8_t* get_compressed_frame(size_t frameId)
{
assert(frameId < init_.numFrames_);
- if(frameId >= init_.numFrames_)
+ if (frameId >= init_.numFrames_) {
return nullptr;
+}
- return (uint8_t*)(compressed_buffer_ + frameId * init_.compressedFrameSize_);
+ return (uint8_t*)(_compressed_buffer + frameId * init_.compressedFrameSize_);
}
+
std::atomic_bool running;
bool initialized_;
bool shutdown_;
std::mutex shutdownMutex_;
std::condition_variable shutdownCondition_;
- protected:
+protected:
std::condition_variable clientInitializedCondition_;
- private:
- void launch(const std::string &cmd, const std::string &dir)
+
+private:
+ void launch(const std::string& cmd, const std::string& dir)
{
// Change the working directory
- if(!dir.empty())
- {
- if(chdir(dir.c_str()) != 0)
- {
+ if (!dir.empty()) {
+ if (chdir(dir.c_str()) != 0) {
getMessengerLogger()->error("Error: failed to change the working directory");
return;
}
cmd_ = cmd;
async_result_ = std::async(std::launch::async, [this]() { return std::system(cmd_.c_str()); });
}
- std::thread outbound;
- Synch* outboundSynch_;
- std::thread inbound;
- Synch* inboundSynch_;
+ std::thread _outbound;
+ Synch* _outbound_synch;
- std::vector<std::thread> processors_;
- char* uncompressed_buffer_;
- char* compressed_buffer_;
+ std::thread _inbound;
+ Synch* _inbound_synch;
- grk_handle uncompressed_fd_;
- grk_handle compressed_fd_;
+ std::vector<std::thread> _processors;
+ char* _uncompressed_buffer;
+ char* _compressed_buffer;
+
+ grk_handle _uncompressed_fd;
+ grk_handle _compressed_fd;
};
-static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch)
+static void
+outboundThread(Messenger* messenger, const std::string& sendBuf, Synch* synch)
{
grk_handle shm_fd = 0;
char* send_buffer = nullptr;
- if(!SharedMemoryManager::initShm(sendBuf, messageBufferLen, &shm_fd, &send_buffer))
+ if (!SharedMemoryManager::init_shm(sendBuf, messageBufferLen, &shm_fd, &send_buffer)) {
return;
- while(messenger->running)
- {
+}
+ while (messenger->running) {
synch->wait(SYNCH_RECEIVE_READY);
- if(!messenger->running)
+ if (!messenger->running) {
break;
+}
std::string message;
- if(!messenger->sendQueue.waitAndPop(message))
+ if (!messenger->sendQueue.wait_and_pop(message)) {
break;
- if(!messenger->running)
+}
+ if (!messenger->running) {
break;
+}
memcpy(send_buffer, message.c_str(), message.size() + 1);
synch->post(SYNCH_SENT);
}
- SharedMemoryManager::deinitShm(sendBuf, messageBufferLen, shm_fd, &send_buffer);
+ SharedMemoryManager::deinit_shm(sendBuf, messageBufferLen, shm_fd, &send_buffer);
}
-static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch)
+static void
+inboundThread(Messenger* messenger, const std::string& receiveBuf, Synch* synch)
{
grk_handle shm_fd = 0;
char* receive_buffer = nullptr;
- if(!SharedMemoryManager::initShm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer))
+ if (!SharedMemoryManager::init_shm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer)) {
return;
- while(messenger->running)
- {
+}
+ while (messenger->running) {
synch->wait(SYNCH_SENT);
- if(!messenger->running)
+ if (!messenger->running) {
break;
+}
auto message = std::string(receive_buffer);
synch->post(SYNCH_RECEIVE_READY);
messenger->receiveQueue.push(message);
}
- SharedMemoryManager::deinitShm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer);
+ SharedMemoryManager::deinit_shm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer);
}
+
struct Msg
{
- explicit Msg(const std::string &msg) : ct_(0)
+ explicit Msg(const std::string& msg)
+ : ct_(0)
{
std::stringstream ss(msg);
- while(ss.good())
- {
+ while (ss.good()) {
std::string substr;
std::getline(ss, substr, ',');
cs_.push_back(substr);
}
}
+
std::string next()
{
- if(ct_ == cs_.size())
- {
+ if (ct_ == cs_.size()) {
getMessengerLogger()->error("Msg: comma separated list exhausted. returning empty.");
return "";
}
return cs_[ct_++];
}
- uint32_t nextUint(void)
+ uint32_t next_uint()
{
return (uint32_t)std::stoi(next());
}
std::vector<std::string> cs_;
size_t ct_;
};
-static void processorThread(Messenger* messenger, std::function<void(std::string)> processor)
+
+static void
+processorThread(Messenger* messenger, std::function<void(std::string)> processor)
{
- while(messenger->running)
- {
+ while (messenger->running) {
std::string message;
- if(!messenger->receiveQueue.waitAndPop(message))
+ if (!messenger->receiveQueue.wait_and_pop(message)) {
break;
- if(!messenger->running)
+}
+ if (!messenger->running) {
break;
+}
Msg msg(message);
auto tag = msg.next();
- if(tag == GRK_MSGR_BATCH_COMPRESS_INIT)
- {
- auto width = msg.nextUint();
- auto stride = msg.nextUint();
+ if (tag == GRK_MSGR_BATCH_COMPRESS_INIT) {
+ auto width = msg.next_uint();
+ auto stride = msg.next_uint();
(void)stride;
- auto height = msg.nextUint();
- auto samplesPerPixel = msg.nextUint();
- auto depth = msg.nextUint();
+ auto height = msg.next_uint();
+ auto samplesPerPixel = msg.next_uint();
+ auto depth = msg.next_uint();
(void)depth;
messenger->init_.uncompressedFrameSize_ =
- Messenger::uncompressedFrameSize(width, height, samplesPerPixel);
- auto compressedFrameSize = msg.nextUint();
- auto numFrames = msg.nextUint();
- messenger->initClient(compressedFrameSize, compressedFrameSize, numFrames);
- }
- else if(tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED)
- {
- messenger->reclaimUncompressed(msg.nextUint());
- }
- else if(tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED)
- {
- messenger->reclaimCompressed(msg.nextUint());
+ Messenger::uncompressed_frame_size(width, height, samplesPerPixel);
+ auto compressedFrameSize = msg.next_uint();
+ auto numFrames = msg.next_uint();
+ messenger->init_client(compressedFrameSize, compressedFrameSize, numFrames);
+ } else if (tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED) {
+ messenger->reclaim_uncompressed(msg.next_uint());
+ } else if (tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED) {
+ messenger->reclaim_compressed(msg.next_uint());
}
processor(message);
}
}
-template<typename F>
+template <typename F>
struct ScheduledFrames
{
void store(F& val)
{
- std::unique_lock<std::mutex> lk(mapMutex_);
- auto it = map_.find(val.index());
- if (it == map_.end())
- map_[val.index()] = val;
+ std::unique_lock<std::mutex> lk(_map_mutex);
+ auto it = _map.find(val.index());
+ if (it == _map.end()) {
+ _map[val.index()] = val;
+}
}
- F retrieve(size_t index, bool &success)
+
+ F retrieve(size_t index, bool& success)
{
- std::unique_lock<std::mutex> lk(mapMutex_);
+ std::unique_lock<std::mutex> lk(_map_mutex);
success = false;
- auto it = map_.find(index);
- if(it == map_.end())
+ auto it = _map.find(index);
+ if (it == _map.end()) {
return F();
+}
success = true;
F val = it->second;
- map_.erase(index);
+ _map.erase(index);
return val;
}
- private:
- std::mutex mapMutex_;
- std::map<size_t, F> map_;
+private:
+ std::mutex _map_mutex;
+ std::map<size_t, F> _map;
};
-template<typename F>
+template <typename F>
struct ScheduledMessenger : public Messenger
{
- explicit ScheduledMessenger(MessengerInit init) : Messenger(init),
- framesScheduled_(0),
- framesCompressed_(0)
- {}
- ~ScheduledMessenger(void) {
+ explicit ScheduledMessenger(MessengerInit init)
+ : Messenger(init)
+ , _frames_scheduled(0)
+ , _frames_compressed(0)
+ {
+ }
+
+ ~ScheduledMessenger()
+ {
shutdown();
}
- bool scheduleCompress(F proxy, std::function<void(BufferSrc)> converter){
+
+ bool schedule_compress(F proxy, std::function<void(BufferSrc)> converter)
+ {
size_t frameSize = init_.uncompressedFrameSize_;
assert(frameSize >= init_.uncompressedFrameSize_);
BufferSrc src;
- if(!availableBuffers_.waitAndPop(src))
+ if (!availableBuffers_.wait_and_pop(src)) {
return false;
+}
converter(src);
- scheduledFrames_.store(proxy);
- framesScheduled_++;
+ _scheduled_frames.store(proxy);
+ _frames_scheduled++;
send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_);
return true;
}
- void processCompressed(const std::string &message, std::function<void(F,uint8_t*,uint32_t)> processor, bool needsRecompression) {
+
+ void process_compressed(const std::string& message, std::function<void(F, uint8_t*, uint32_t)> processor, bool needsRecompression)
+ {
Msg msg(message);
msg.next();
- auto clientFrameId = msg.nextUint();
- auto compressedFrameId = msg.nextUint();
- auto compressedFrameLength = msg.nextUint();
+ auto clientFrameId = msg.next_uint();
+ auto compressedFrameId = msg.next_uint();
+ auto compressedFrameLength = msg.next_uint();
if (!needsRecompression) {
bool success = false;
- auto srcFrame = scheduledFrames_.retrieve(clientFrameId,success);
- if (!success)
+ auto srcFrame = _scheduled_frames.retrieve(clientFrameId, success);
+ if (!success) {
return;
- processor(srcFrame, getCompressedFrame(compressedFrameId),compressedFrameLength);
+}
+ processor(srcFrame, get_compressed_frame(compressedFrameId), compressedFrameLength);
}
- ++framesCompressed_;
+ ++_frames_compressed;
send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId);
- if (shutdown_ && framesCompressed_ == framesScheduled_)
+ if (shutdown_ && _frames_compressed == _frames_scheduled) {
shutdownCondition_.notify_all();
+}
}
- void shutdown(void){
+
+ void shutdown()
+ {
try {
std::unique_lock<std::mutex> lk(shutdownMutex_);
- if (!async_result_.valid())
+ if (!async_result_.valid()) {
return;
+}
shutdown_ = true;
- if (framesScheduled_) {
- uint32_t scheduled = framesScheduled_;
+ if (_frames_scheduled) {
+ uint32_t scheduled = _frames_scheduled;
send(GRK_MSGR_BATCH_FLUSH, scheduled);
- shutdownCondition_.wait(lk, [this] { return framesScheduled_ == framesCompressed_; });
+ shutdownCondition_.wait(lk, [this] { return _frames_scheduled == _frames_compressed; });
}
availableBuffers_.deactivate();
send(GRK_MSGR_BATCH_SHUTDOWN);
int result = async_result_.get();
- if(result != 0)
- getMessengerLogger()->error("Accelerator failed with return code: %d\n",result);
- } catch (std::exception &ex) {
- getMessengerLogger()->error("%s",ex.what());
+ if (result != 0) {
+ getMessengerLogger()->error("Accelerator failed with return code: %d\n", result);
+}
+ } catch (std::exception& ex) {
+ getMessengerLogger()->error("%s", ex.what());
}
-
}
- F retrieve(size_t index, bool &success) {
- return scheduledFrames_.retrieve(index, success);
+
+ F retrieve(size_t index, bool& success)
+ {
+ return _scheduled_frames.retrieve(index, success);
}
- void store(F& val) {
- scheduledFrames_.store(val);
+
+ void store(F& val)
+ {
+ _scheduled_frames.store(val);
}
private:
- ScheduledFrames<F> scheduledFrames_;
- std::atomic<uint32_t> framesScheduled_;
- std::atomic<uint32_t> framesCompressed_;
+ ScheduledFrames<F> _scheduled_frames;
+ std::atomic<uint32_t> _frames_scheduled;
+ std::atomic<uint32_t> _frames_compressed;
};
} // namespace grk_plugin