diff options
| author | Carl Hetherington <cth@carlh.net> | 2023-07-07 16:50:03 +0200 |
|---|---|---|
| committer | Carl Hetherington <cth@carlh.net> | 2023-07-07 16:50:03 +0200 |
| commit | 5c6bd7a9be5f2bf16fc887272a4bc02413c3f5fd (patch) | |
| tree | 98fe1865a784679b422d9363c442fc9894906f3b /src/lib | |
| parent | 05ca80a67cc99324ee6360b53d79b721ac0047d6 (diff) | |
Run clang-tidy.
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/dcp_encoder.cc | 4 | ||||
| -rw-r--r-- | src/lib/dcp_encoder.h | 4 | ||||
| -rw-r--r-- | src/lib/dcp_video.cc | 2 | ||||
| -rw-r--r-- | src/lib/dcp_video.h | 4 | ||||
| -rw-r--r-- | src/lib/encoder.h | 4 | ||||
| -rw-r--r-- | src/lib/grok_context.h | 105 | ||||
| -rw-r--r-- | src/lib/grok_messenger.h | 487 | ||||
| -rw-r--r-- | src/lib/j2k_encoder.cc | 30 | ||||
| -rw-r--r-- | src/lib/j2k_encoder.h | 8 |
9 files changed, 357 insertions, 291 deletions
diff --git a/src/lib/dcp_encoder.cc b/src/lib/dcp_encoder.cc index bf6691a8a..1f47dee01 100644 --- a/src/lib/dcp_encoder.cc +++ b/src/lib/dcp_encoder.cc @@ -119,13 +119,13 @@ DCPEncoder::go () } void -DCPEncoder::pause(void) +DCPEncoder::pause() { _j2k_encoder.pause(); } void -DCPEncoder::resume(void) +DCPEncoder::resume() { _j2k_encoder.resume(); } diff --git a/src/lib/dcp_encoder.h b/src/lib/dcp_encoder.h index 7ddb1dfba..771679a98 100644 --- a/src/lib/dcp_encoder.h +++ b/src/lib/dcp_encoder.h @@ -53,8 +53,8 @@ public: return _finishing; } - void pause(void) override; - void resume(void) override; + void pause() override; + void resume() override; private: diff --git a/src/lib/dcp_video.cc b/src/lib/dcp_video.cc index ecf350ff0..456f62c7c 100644 --- a/src/lib/dcp_video.cc +++ b/src/lib/dcp_video.cc @@ -119,7 +119,7 @@ DCPVideo::convert_to_xyz (shared_ptr<const PlayerVideo> frame, dcp::NoteHandler } dcp::Size -DCPVideo::get_size(void) +DCPVideo::get_size() { auto image = _frame->image(bind(&PlayerVideo::keep_xyz_or_rgb, _1), VideoRange::FULL, false); return image->size(); diff --git a/src/lib/dcp_video.h b/src/lib/dcp_video.h index 3af657e17..d6f62d2bb 100644 --- a/src/lib/dcp_video.h +++ b/src/lib/dcp_video.h @@ -49,7 +49,7 @@ class PlayerVideo; class DCPVideo { public: - DCPVideo(void) + DCPVideo() : DCPVideo(nullptr, 0, 0, 0, Resolution::TWO_K) { } @@ -73,7 +73,7 @@ public: static std::shared_ptr<dcp::OpenJPEGImage> convert_to_xyz (std::shared_ptr<const PlayerVideo> frame, dcp::NoteHandler note); void convert_to_xyz(uint16_t* dst); - dcp::Size get_size(void); + dcp::Size get_size(); private: diff --git a/src/lib/encoder.h b/src/lib/encoder.h index 902190166..0c0578d83 100644 --- a/src/lib/encoder.h +++ b/src/lib/encoder.h @@ -59,9 +59,9 @@ public: virtual Frame frames_done () const = 0; virtual bool finishing() const = 0; - virtual void pause(void) {} + virtual void pause() {} - virtual void resume(void) {} + virtual void resume() {} protected: std::shared_ptr<const Film> _film; diff --git a/src/lib/grok_context.h b/src/lib/grok_context.h index 1e1850717..411e2f79f 100644 --- a/src/lib/grok_context.h +++ b/src/lib/grok_context.h @@ -70,7 +70,7 @@ struct GrokLogger : public MessengerLogger struct GrokInitializer { - GrokInitializer(void) + GrokInitializer() { setMessengerLogger(new GrokLogger("[GROK] ")); } @@ -80,7 +80,7 @@ struct GrokInitializer struct FrameProxy { - FrameProxy(void) + FrameProxy() : FrameProxy(0, Eyes::LEFT, DCPVideo()) { } @@ -97,7 +97,7 @@ struct FrameProxy return index_; } - Eyes eyes(void) const + Eyes eyes() const { return eyes_; } @@ -120,7 +120,7 @@ struct DcpomaticContext { } - void setDimensions(uint32_t w, uint32_t h) + void set_dimensions(uint32_t w, uint32_t h) { width_ = w; height_ = h; @@ -138,9 +138,9 @@ class GrokContext { public: explicit GrokContext(const DcpomaticContext& dcpomaticContext) - : dcpomaticContext_(dcpomaticContext) - , messenger_(nullptr) - , launched_(false) + : _dcpomatic_context(dcpomaticContext) + , _messenger(nullptr) + , _launched(false) { struct CompressedData : public dcp::Data { @@ -150,7 +150,7 @@ public: { } - ~CompressedData(void) + ~CompressedData() { delete[] data_; } @@ -175,11 +175,11 @@ public: }; if (Config::instance()->enable_gpu()) { - boost::filesystem::path folder(dcpomaticContext_.location_); + boost::filesystem::path folder(_dcpomatic_context.location_); boost::filesystem::path binaryPath = folder / "grk_compress"; if (!boost::filesystem::exists(binaryPath)) { getMessengerLogger()->error("Invalid binary location %s", - dcpomaticContext_.location_.c_str()); + _dcpomatic_context.location_.c_str()); return; } auto proc = [this](const std::string& str) { @@ -187,28 +187,29 @@ public: Msg msg(str); auto tag = msg.next(); if (tag == GRK_MSGR_BATCH_SUBMIT_COMPRESSED) { - auto clientFrameId = msg.nextUint(); - auto compressedFrameId = msg.nextUint(); + auto clientFrameId = msg.next_uint(); + auto compressedFrameId = msg.next_uint(); (void)compressedFrameId; - auto compressedFrameLength = msg.nextUint(); + auto compressedFrameLength = msg.next_uint(); auto processor = [this](FrameProxy srcFrame, uint8_t* compressed, uint32_t compressedFrameLength) { auto compressedData = std::make_shared<CompressedData>(compressedFrameLength); memcpy(compressedData->data_, compressed, compressedFrameLength); - dcpomaticContext_.writer_.write(compressedData, srcFrame.index(), srcFrame.eyes()); + _dcpomatic_context.writer_.write(compressedData, srcFrame.index(), srcFrame.eyes()); frame_done(); }; int const minimum_size = 16384; bool needsRecompression = compressedFrameLength < minimum_size; - messenger_->processCompressed(str, processor, needsRecompression); + _messenger->process_compressed(str, processor, needsRecompression); if (needsRecompression) { bool success = false; - auto fp = messenger_->retrieve(clientFrameId, success); - if (!success) + auto fp = _messenger->retrieve(clientFrameId, success); + if (!success) { return; +} auto encoded = std::make_shared<dcp::ArrayData>(fp.vf.encode_locally()); - dcpomaticContext_.writer_.write(encoded, fp.vf.index(), fp.vf.eyes()); + _dcpomatic_context.writer_.write(encoded, fp.vf.index(), fp.vf.eyes()); frame_done(); } } @@ -220,82 +221,90 @@ public: MessengerInit(clientToGrokMessageBuf, clientSentSynch, grokReceiveReadySynch, grokToClientMessageBuf, grokSentSynch, clientReceiveReadySynch, proc, std::thread::hardware_concurrency()); - messenger_ = new ScheduledMessenger<FrameProxy>(clientInit); + _messenger = new ScheduledMessenger<FrameProxy>(clientInit); } } - ~GrokContext(void) + ~GrokContext() { shutdown(); } bool launch(DCPVideo dcpv, int device) { - if (!messenger_) + if (!_messenger) { return false; - if (launched_) +} + if (_launched) { return true; +} std::unique_lock<std::mutex> lk_global(launchMutex); - if (!messenger_) + if (!_messenger) { return false; - if (launched_) +} + if (_launched) { return true; - if (MessengerInit::firstLaunch(true)) { +} + if (MessengerInit::first_launch(true)) { auto s = dcpv.get_size(); - dcpomaticContext_.setDimensions(s.width, s.height); + _dcpomatic_context.set_dimensions(s.width, s.height); auto config = Config::instance(); - messenger_->launchGrok(dcpomaticContext_.location_, - dcpomaticContext_.width_, dcpomaticContext_.width_, - dcpomaticContext_.height_, + _messenger->launch_grok(_dcpomatic_context.location_, + _dcpomatic_context.width_, _dcpomatic_context.width_, + _dcpomatic_context.height_, 3, 12, device, - dcpomaticContext_.film_->resolution() == Resolution::FOUR_K, - dcpomaticContext_.film_->video_frame_rate(), - dcpomaticContext_.film_->j2k_bandwidth(), + _dcpomatic_context.film_->resolution() == Resolution::FOUR_K, + _dcpomatic_context.film_->video_frame_rate(), + _dcpomatic_context.film_->j2k_bandwidth(), config->gpu_license_server(), config->gpu_license_port(), config->gpu_license()); } - launched_ = messenger_->waitForClientInit(); + _launched = _messenger->wait_for_client_init(); - return launched_; + return _launched; } - bool scheduleCompress(const DCPVideo& vf) + bool schedule_compress(const DCPVideo& vf) { - if (!messenger_) + if (!_messenger) { return false; +} auto fp = FrameProxy(vf.index(), vf.eyes(), vf); auto cvt = [this, &fp](BufferSrc src) { // xyz conversion fp.vf.convert_to_xyz((uint16_t*)src.framePtr_); }; - return messenger_->scheduleCompress(fp, cvt); + return _messenger->schedule_compress(fp, cvt); } - void shutdown(void) + void shutdown() { - if (!messenger_) + if (!_messenger) { return; +} std::unique_lock<std::mutex> lk_global(launchMutex); - if (!messenger_) + if (!_messenger) { return; - if (launched_) - messenger_->shutdown(); - delete messenger_; - messenger_ = nullptr; +} + if (_launched) { + _messenger->shutdown(); +} + delete _messenger; + _messenger = nullptr; } void frame_done() { - dcpomaticContext_.history_.event(); + _dcpomatic_context.history_.event(); } private: - DcpomaticContext dcpomaticContext_; - ScheduledMessenger<FrameProxy>* messenger_; - bool launched_; + DcpomaticContext _dcpomatic_context; + ScheduledMessenger<FrameProxy>* _messenger; + bool _launched; }; } // namespace grk_plugin diff --git a/src/lib/grok_messenger.h b/src/lib/grok_messenger.h index bd5b6da6c..6b475aefd 100644 --- a/src/lib/grok_messenger.h +++ b/src/lib/grok_messenger.h @@ -70,7 +70,7 @@ static const size_t messageBufferLen = 256; struct IMessengerLogger { - virtual ~IMessengerLogger(void) = default; + virtual ~IMessengerLogger() = default; virtual void info(const char* fmt, ...) = 0; virtual void warn(const char* fmt, ...) = 0; virtual void error(const char* fmt, ...) = 0; @@ -142,7 +142,7 @@ setMessengerLogger(IMessengerLogger* logger) #pragma GCC diagnostic pop #endif static IMessengerLogger* -getMessengerLogger(void) +getMessengerLogger() { return sLogger; } @@ -167,11 +167,12 @@ struct MessengerInit , compressedFrameSize_(0) , numFrames_(0) { - if (firstLaunch(true)) + if (first_launch(true)) { unlink(); +} } - void unlink(void) + void unlink() { #ifndef _WIN32 shm_unlink(grokToClientMessageBuf.c_str()); @@ -179,7 +180,7 @@ struct MessengerInit #endif } - static bool firstLaunch(bool isClient) + static bool first_launch(bool isClient) { bool debugGrok = false; return debugGrok != isClient; @@ -213,83 +214,93 @@ typedef int grk_handle; struct Synch { Synch(const std::string& sentSemName, const std::string& receiveReadySemName) - : sentSemName_(sentSemName) - , receiveReadySemName_(receiveReadySemName) + : _sent_sem_name(sentSemName) + , _receive_ready_sem_name(receiveReadySemName) { // unlink semaphores in case of previous crash - if (MessengerInit::firstLaunch(true)) + if (MessengerInit::first_launch(true)) { unlink(); +} open(); } ~Synch() { close(); - if (MessengerInit::firstLaunch(true)) + if (MessengerInit::first_launch(true)) { unlink(); +} } void post(SynchDirection dir) { auto sem = (dir == SYNCH_SENT ? sentSem_ : receiveReadySem_); int rc = sem_post(sem); - if (rc) + if (rc) { getMessengerLogger()->error("Error posting to semaphore: %s", strerror(errno)); +} } void wait(SynchDirection dir) { auto sem = dir == SYNCH_SENT ? sentSem_ : receiveReadySem_; int rc = sem_wait(sem); - if (rc) + if (rc) { getMessengerLogger()->error("Error waiting for semaphore: %s", strerror(errno)); +} } - void open(void) + void open() { - sentSem_ = sem_open(sentSemName_.c_str(), O_CREAT, 0666, 0); - if (!sentSem_) + sentSem_ = sem_open(_sent_sem_name.c_str(), O_CREAT, 0666, 0); + if (!sentSem_) { getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno)); - receiveReadySem_ = sem_open(receiveReadySemName_.c_str(), O_CREAT, 0666, 1); - if (!receiveReadySem_) +} + receiveReadySem_ = sem_open(_receive_ready_sem_name.c_str(), O_CREAT, 0666, 1); + if (!receiveReadySem_) { getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno)); +} } - void close(void) + void close() { int rc = sem_close(sentSem_); - if (rc) - getMessengerLogger()->error("Error closing semaphore %s: %s", sentSemName_.c_str(), + if (rc) { + getMessengerLogger()->error("Error closing semaphore %s: %s", _sent_sem_name.c_str(), strerror(errno)); +} rc = sem_close(receiveReadySem_); - if (rc) + if (rc) { getMessengerLogger()->error("Error closing semaphore %s: %s", - receiveReadySemName_.c_str(), strerror(errno)); + _receive_ready_sem_name.c_str(), strerror(errno)); +} } - void unlink(void) + void unlink() { - int rc = sem_unlink(sentSemName_.c_str()); - if (rc == -1 && errno != ENOENT) - getMessengerLogger()->error("Error unlinking semaphore %s: %s", sentSemName_.c_str(), + int rc = sem_unlink(_sent_sem_name.c_str()); + if (rc == -1 && errno != ENOENT) { + getMessengerLogger()->error("Error unlinking semaphore %s: %s", _sent_sem_name.c_str(), strerror(errno)); - rc = sem_unlink(receiveReadySemName_.c_str()); - if (rc == -1 && errno != ENOENT) +} + rc = sem_unlink(_receive_ready_sem_name.c_str()); + if (rc == -1 && errno != ENOENT) { getMessengerLogger()->error("Error unlinking semaphore %s: %s", - receiveReadySemName_.c_str(), strerror(errno)); + _receive_ready_sem_name.c_str(), strerror(errno)); +} } sem_t* sentSem_; sem_t* receiveReadySem_; private: - std::string sentSemName_; - std::string receiveReadySemName_; + std::string _sent_sem_name; + std::string _receive_ready_sem_name; }; struct SharedMemoryManager { - static bool initShm(const std::string& name, size_t len, grk_handle* shm_fd, char** buffer) + static bool init_shm(const std::string& name, size_t len, grk_handle* shm_fd, char** buffer) { *shm_fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0666); if (*shm_fd < 0) { @@ -300,43 +311,51 @@ struct SharedMemoryManager if (rc) { getMessengerLogger()->error("Error truncating shared memory: %s", strerror(errno)); rc = close(*shm_fd); - if (rc) + if (rc) { getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno)); +} rc = shm_unlink(name.c_str()); - if (rc) + if (rc) { getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno)); +} return false; } *buffer = static_cast<char*>(mmap(0, len, PROT_WRITE, MAP_SHARED, *shm_fd, 0)); if (!*buffer) { getMessengerLogger()->error("Error mapping shared memory: %s", strerror(errno)); rc = close(*shm_fd); - if (rc) + if (rc) { getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno)); +} rc = shm_unlink(name.c_str()); - if (rc) + if (rc) { getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno)); +} } return *buffer != nullptr; } - static bool deinitShm(const std::string& name, size_t len, grk_handle& shm_fd, char** buffer) + static bool deinit_shm(const std::string& name, size_t len, grk_handle& shm_fd, char** buffer) { - if (!*buffer || !shm_fd) + if (!*buffer || !shm_fd) { return true; +} int rc = munmap(*buffer, len); *buffer = nullptr; - if (rc) + if (rc) { getMessengerLogger()->error("Error unmapping shared memory %s: %s", name.c_str(), strerror(errno)); +} rc = close(shm_fd); shm_fd = 0; - if (rc) + if (rc) { getMessengerLogger()->error("Error closing shared memory %s: %s", name.c_str(), strerror(errno)); +} rc = shm_unlink(name.c_str()); - if (rc) + if (rc) { fprintf(stderr, "Error unlinking shared memory %s : %s\n", name.c_str(), strerror(errno)); +} return true; } @@ -347,8 +366,8 @@ class MessengerBlockingQueue { public: explicit MessengerBlockingQueue(size_t max) - : active_(true) - , max_size_(max) + : _active(true) + , _max_size(max) { } @@ -359,57 +378,61 @@ public: size_t size() const { - return queue_.size(); + return _queue.size(); } // deactivate and clear queue void deactivate() { { - std::lock_guard<std::mutex> lk(mutex_); - active_ = false; - while (!queue_.empty()) - queue_.pop(); + std::lock_guard<std::mutex> lk(_mutex); + _active = false; + while (!_queue.empty()) { + _queue.pop(); +} } // release all waiting threads - can_pop_.notify_all(); - can_push_.notify_all(); + _can_pop.notify_all(); + _can_push.notify_all(); } void activate() { - std::lock_guard<std::mutex> lk(mutex_); - active_ = true; + std::lock_guard<std::mutex> lk(_mutex); + _active = true; } bool push(Data const& value) { bool rc; { - std::unique_lock<std::mutex> lk(mutex_); - rc = push_(value); + std::unique_lock<std::mutex> lk(_mutex); + rc = _push(value); } - if (rc) - can_pop_.notify_one(); + if (rc) { + _can_pop.notify_one(); +} return rc; } - bool waitAndPush(Data& value) + bool wait_and_push(Data& value) { bool rc; { - std::unique_lock<std::mutex> lk(mutex_); - if (!active_) + std::unique_lock<std::mutex> lk(_mutex); + if (!_active) { return false; +} // in case of spurious wakeup, loop until predicate in lambda // is satisfied. - can_push_.wait(lk, [this] { return queue_.size() < max_size_ || !active_; }); - rc = push_(value); + _can_push.wait(lk, [this] { return _queue.size() < _max_size || !_active; }); + rc = _push(value); } - if (rc) - can_pop_.notify_one(); + if (rc) { + _can_pop.notify_one(); +} return rc; } @@ -418,64 +441,69 @@ public: { bool rc; { - std::unique_lock<std::mutex> lk(mutex_); - rc = pop_(value); + std::unique_lock<std::mutex> lk(_mutex); + rc = _pop(value); } - if (rc) - can_push_.notify_one(); + if (rc) { + _can_push.notify_one(); +} return rc; } - bool waitAndPop(Data& value) + bool wait_and_pop(Data& value) { bool rc; { - std::unique_lock<std::mutex> lk(mutex_); - if (!active_) + std::unique_lock<std::mutex> lk(_mutex); + if (!_active) { return false; +} // in case of spurious wakeup, loop until predicate in lambda // is satisfied. - can_pop_.wait(lk, [this] { return !queue_.empty() || !active_; }); - rc = pop_(value); + _can_pop.wait(lk, [this] { return !_queue.empty() || !_active; }); + rc = _pop(value); } - if (rc) - can_push_.notify_one(); + if (rc) { + _can_push.notify_one(); +} return rc; } private: - bool push_(Data const& value) + bool _push(Data const& value) { - if (queue_.size() == max_size_ || !active_) + if (_queue.size() == _max_size || !_active) { return false; - queue_.push(value); +} + _queue.push(value); return true; } - bool pop_(Data& value) + bool _pop(Data& value) { - if (queue_.empty() || !active_) + if (_queue.empty() || !_active) { return false; - value = queue_.front(); - queue_.pop(); +} + value = _queue.front(); + _queue.pop(); return true; } - std::queue<Data> queue_; - mutable std::mutex mutex_; - std::condition_variable can_pop_; - std::condition_variable can_push_; - bool active_; - size_t max_size_; + std::queue<Data> _queue; + mutable std::mutex _mutex; + std::condition_variable _can_pop; + std::condition_variable _can_push; + bool _active; + size_t _max_size; }; struct BufferSrc { - BufferSrc(void) + BufferSrc() : BufferSrc("") { } @@ -496,7 +524,7 @@ struct BufferSrc { } - bool fromDisk(void) + bool from_disk() { return !file_.empty() && framePtr_ == nullptr; } @@ -527,64 +555,68 @@ struct Messenger , initialized_(false) , shutdown_(false) , init_(init) - , outboundSynch_(nullptr) - , inboundSynch_(nullptr) - , uncompressed_buffer_(nullptr) - , compressed_buffer_(nullptr) - , uncompressed_fd_(0) - , compressed_fd_(0) + , _outbound_synch(nullptr) + , _inbound_synch(nullptr) + , _uncompressed_buffer(nullptr) + , _compressed_buffer(nullptr) + , _uncompressed_fd(0) + , _compressed_fd(0) { } - virtual ~Messenger(void) + virtual ~Messenger() { running = false; sendQueue.deactivate(); receiveQueue.deactivate(); - if (outboundSynch_) { - outboundSynch_->post(SYNCH_RECEIVE_READY); - outbound.join(); + if (_outbound_synch) { + _outbound_synch->post(SYNCH_RECEIVE_READY); + _outbound.join(); } - if (inboundSynch_) { - inboundSynch_->post(SYNCH_SENT); - inbound.join(); + if (_inbound_synch) { + _inbound_synch->post(SYNCH_SENT); + _inbound.join(); } - for (auto& p : processors_) + for (auto& p : _processors) { p.join(); +} - delete outboundSynch_; - delete inboundSynch_; + delete _outbound_synch; + delete _inbound_synch; - deinitShm(); + deinit_shm(); } - void startThreads(void) + void start_threads() { - outboundSynch_ = + _outbound_synch = new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch); - outbound = std::thread(outboundThread, this, init_.outboundMessageBuf, outboundSynch_); + _outbound = std::thread(outboundThread, this, init_.outboundMessageBuf, _outbound_synch); - inboundSynch_ = + _inbound_synch = new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch); - inbound = std::thread(inboundThread, this, init_.inboundMessageBuf, inboundSynch_); + _inbound = std::thread(inboundThread, this, init_.inboundMessageBuf, _inbound_synch); - for (size_t i = 0; i < init_.numProcessingThreads_; ++i) - processors_.push_back(std::thread(processorThread, this, init_.processor_)); + for (size_t i = 0; i < init_.numProcessingThreads_; ++i) { + _processors.push_back(std::thread(processorThread, this, init_.processor_)); +} } size_t serialize(const std::string& dir, size_t clientFrameId, uint8_t* compressedPtr, size_t compressedLength) { char fname[512]; - if (!compressedPtr || !compressedLength) + if (!compressedPtr || !compressedLength) { return 0; +} sprintf(fname, "%s/test_%d.j2k", dir.c_str(), (int)clientFrameId); auto fp = fopen(fname, "wb"); - if (!fp) + if (!fp) { return 0; +} size_t written = fwrite(compressedPtr, 1, compressedLength, fp); if (written != compressedLength) { fclose(fp); @@ -596,31 +628,31 @@ struct Messenger return written; } - bool initBuffers(void) + bool init_buffers() { bool rc = true; if (init_.uncompressedFrameSize_) { - rc = rc && SharedMemoryManager::initShm(grokUncompressedBuf, + rc = rc && SharedMemoryManager::init_shm(grokUncompressedBuf, init_.uncompressedFrameSize_ * init_.numFrames_, - &uncompressed_fd_, &uncompressed_buffer_); + &_uncompressed_fd, &_uncompressed_buffer); } if (init_.compressedFrameSize_) { - rc = rc && SharedMemoryManager::initShm(grokCompressedBuf, + rc = rc && SharedMemoryManager::init_shm(grokCompressedBuf, init_.compressedFrameSize_ * init_.numFrames_, - &compressed_fd_, &compressed_buffer_); + &_compressed_fd, &_compressed_buffer); } return rc; } - bool deinitShm(void) + bool deinit_shm() { - bool rc = SharedMemoryManager::deinitShm(grokUncompressedBuf, + bool rc = SharedMemoryManager::deinit_shm(grokUncompressedBuf, init_.uncompressedFrameSize_ * init_.numFrames_, - uncompressed_fd_, &uncompressed_buffer_); - rc = rc && SharedMemoryManager::deinitShm(grokCompressedBuf, + _uncompressed_fd, &_uncompressed_buffer); + rc = rc && SharedMemoryManager::deinit_shm(grokCompressedBuf, init_.compressedFrameSize_ * init_.numFrames_, - compressed_fd_, &compressed_buffer_); + _compressed_fd, &_compressed_buffer); return rc; } @@ -641,11 +673,13 @@ struct Messenger char command[256]; snprintf(command, sizeof(command), "pgrep %s", name); auto pgrep = popen(command, "r"); - if (!pgrep) + if (!pgrep) { return -1; +} pid_t pid; - if (fscanf(pgrep, "%d", &pid) != 1) + if (fscanf(pgrep, "%d", &pid) != 1) { pid = -1; +} pclose(pgrep); return pid; @@ -665,7 +699,7 @@ struct Messenger return (pid != -1 && kill(pid, SIGKILL) != -1); } - void launchGrok(const std::string& dir, uint32_t width, uint32_t stride, + void launch_grok(const std::string& dir, uint32_t width, uint32_t stride, uint32_t height, uint32_t samplesPerPixel, uint32_t depth, int device, bool is4K, uint32_t fps, uint32_t bandwidth, const std::string server, uint32_t port, @@ -673,11 +707,13 @@ struct Messenger { std::unique_lock<std::mutex> lk(shutdownMutex_); - if (async_result_.valid()) + if (async_result_.valid()) { return; - if (MessengerInit::firstLaunch(true)) +} + if (MessengerInit::first_launch(true)) { init_.unlink(); - startThreads(); +} + start_threads(); char _cmd[4096]; auto fullServer = server + ":" + std::to_string(port); sprintf(_cmd, @@ -689,14 +725,14 @@ struct Messenger launch(_cmd, dir); } - void initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames) + void init_client(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames) { // client fills queue with pending uncompressed buffers init_.uncompressedFrameSize_ = uncompressedFrameSize; init_.compressedFrameSize_ = compressedFrameSize; init_.numFrames_ = numFrames; - initBuffers(); - auto ptr = uncompressed_buffer_; + init_buffers(); + auto ptr = _uncompressed_buffer; for (size_t i = 0; i < init_.numFrames_; ++i) { availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr)); ptr += init_.uncompressedFrameSize_; @@ -707,52 +743,56 @@ struct Messenger clientInitializedCondition_.notify_all(); } - bool waitForClientInit(void) + bool wait_for_client_init() { - if (initialized_) + if (initialized_) { return true; +} std::unique_lock<std::mutex> lk(shutdownMutex_); - if (initialized_) + if (initialized_) { return true; - else if (shutdown_) + } else if (shutdown_) { return false; +} clientInitializedCondition_.wait(lk, [this] { return initialized_ || shutdown_; }); return initialized_ && !shutdown_; } - static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel) + static size_t uncompressed_frame_size(uint32_t w, uint32_t h, uint32_t samplesPerPixel) { return sizeof(uint16_t) * w * h * samplesPerPixel; } - void reclaimCompressed(size_t frameId) + void reclaim_compressed(size_t frameId) { - availableBuffers_.push(BufferSrc(0, frameId, getCompressedFrame(frameId))); + availableBuffers_.push(BufferSrc(0, frameId, get_compressed_frame(frameId))); } - void reclaimUncompressed(size_t frameId) + void reclaim_uncompressed(size_t frameId) { - availableBuffers_.push(BufferSrc(0, frameId, getUncompressedFrame(frameId))); + availableBuffers_.push(BufferSrc(0, frameId, get_uncompressed_frame(frameId))); } - uint8_t* getUncompressedFrame(size_t frameId) + uint8_t* get_uncompressed_frame(size_t frameId) { assert(frameId < init_.numFrames_); - if (frameId >= init_.numFrames_) + if (frameId >= init_.numFrames_) { return nullptr; +} - return (uint8_t*)(uncompressed_buffer_ + frameId * init_.uncompressedFrameSize_); + return (uint8_t*)(_uncompressed_buffer + frameId * init_.uncompressedFrameSize_); } - uint8_t* getCompressedFrame(size_t frameId) + uint8_t* get_compressed_frame(size_t frameId) { assert(frameId < init_.numFrames_); - if (frameId >= init_.numFrames_) + if (frameId >= init_.numFrames_) { return nullptr; +} - return (uint8_t*)(compressed_buffer_ + frameId * init_.compressedFrameSize_); + return (uint8_t*)(_compressed_buffer + frameId * init_.compressedFrameSize_); } std::atomic_bool running; @@ -785,18 +825,18 @@ private: async_result_ = std::async(std::launch::async, [this]() { return std::system(cmd_.c_str()); }); } - std::thread outbound; - Synch* outboundSynch_; + std::thread _outbound; + Synch* _outbound_synch; - std::thread inbound; - Synch* inboundSynch_; + std::thread _inbound; + Synch* _inbound_synch; - std::vector<std::thread> processors_; - char* uncompressed_buffer_; - char* compressed_buffer_; + std::vector<std::thread> _processors; + char* _uncompressed_buffer; + char* _compressed_buffer; - grk_handle uncompressed_fd_; - grk_handle compressed_fd_; + grk_handle _uncompressed_fd; + grk_handle _compressed_fd; }; static void @@ -805,21 +845,25 @@ outboundThread(Messenger* messenger, const std::string& sendBuf, Synch* synch) grk_handle shm_fd = 0; char* send_buffer = nullptr; - if (!SharedMemoryManager::initShm(sendBuf, messageBufferLen, &shm_fd, &send_buffer)) + if (!SharedMemoryManager::init_shm(sendBuf, messageBufferLen, &shm_fd, &send_buffer)) { return; +} while (messenger->running) { synch->wait(SYNCH_RECEIVE_READY); - if (!messenger->running) + if (!messenger->running) { break; +} std::string message; - if (!messenger->sendQueue.waitAndPop(message)) + if (!messenger->sendQueue.wait_and_pop(message)) { break; - if (!messenger->running) +} + if (!messenger->running) { break; +} memcpy(send_buffer, message.c_str(), message.size() + 1); synch->post(SYNCH_SENT); } - SharedMemoryManager::deinitShm(sendBuf, messageBufferLen, shm_fd, &send_buffer); + SharedMemoryManager::deinit_shm(sendBuf, messageBufferLen, shm_fd, &send_buffer); } static void @@ -828,17 +872,19 @@ inboundThread(Messenger* messenger, const std::string& receiveBuf, Synch* synch) grk_handle shm_fd = 0; char* receive_buffer = nullptr; - if (!SharedMemoryManager::initShm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer)) + if (!SharedMemoryManager::init_shm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer)) { return; +} while (messenger->running) { synch->wait(SYNCH_SENT); - if (!messenger->running) + if (!messenger->running) { break; +} auto message = std::string(receive_buffer); synch->post(SYNCH_RECEIVE_READY); messenger->receiveQueue.push(message); } - SharedMemoryManager::deinitShm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer); + SharedMemoryManager::deinit_shm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer); } struct Msg @@ -863,7 +909,7 @@ struct Msg return cs_[ct_++]; } - uint32_t nextUint(void) + uint32_t next_uint() { return (uint32_t)std::stoi(next()); } @@ -877,29 +923,31 @@ processorThread(Messenger* messenger, std::function<void(std::string)> processor { while (messenger->running) { std::string message; - if (!messenger->receiveQueue.waitAndPop(message)) + if (!messenger->receiveQueue.wait_and_pop(message)) { break; - if (!messenger->running) +} + if (!messenger->running) { break; +} Msg msg(message); auto tag = msg.next(); if (tag == GRK_MSGR_BATCH_COMPRESS_INIT) { - auto width = msg.nextUint(); - auto stride = msg.nextUint(); + auto width = msg.next_uint(); + auto stride = msg.next_uint(); (void)stride; - auto height = msg.nextUint(); - auto samplesPerPixel = msg.nextUint(); - auto depth = msg.nextUint(); + auto height = msg.next_uint(); + auto samplesPerPixel = msg.next_uint(); + auto depth = msg.next_uint(); (void)depth; messenger->init_.uncompressedFrameSize_ = - Messenger::uncompressedFrameSize(width, height, samplesPerPixel); - auto compressedFrameSize = msg.nextUint(); - auto numFrames = msg.nextUint(); - messenger->initClient(compressedFrameSize, compressedFrameSize, numFrames); + Messenger::uncompressed_frame_size(width, height, samplesPerPixel); + auto compressedFrameSize = msg.next_uint(); + auto numFrames = msg.next_uint(); + messenger->init_client(compressedFrameSize, compressedFrameSize, numFrames); } else if (tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED) { - messenger->reclaimUncompressed(msg.nextUint()); + messenger->reclaim_uncompressed(msg.next_uint()); } else if (tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED) { - messenger->reclaimCompressed(msg.nextUint()); + messenger->reclaim_compressed(msg.next_uint()); } processor(message); } @@ -910,30 +958,32 @@ struct ScheduledFrames { void store(F& val) { - std::unique_lock<std::mutex> lk(mapMutex_); - auto it = map_.find(val.index()); - if (it == map_.end()) - map_[val.index()] = val; + std::unique_lock<std::mutex> lk(_map_mutex); + auto it = _map.find(val.index()); + if (it == _map.end()) { + _map[val.index()] = val; +} } F retrieve(size_t index, bool& success) { - std::unique_lock<std::mutex> lk(mapMutex_); + std::unique_lock<std::mutex> lk(_map_mutex); success = false; - auto it = map_.find(index); - if (it == map_.end()) + auto it = _map.find(index); + if (it == _map.end()) { return F(); +} success = true; F val = it->second; - map_.erase(index); + _map.erase(index); return val; } private: - std::mutex mapMutex_; - std::map<size_t, F> map_; + std::mutex _map_mutex; + std::map<size_t, F> _map; }; template <typename F> @@ -941,68 +991,73 @@ struct ScheduledMessenger : public Messenger { explicit ScheduledMessenger(MessengerInit init) : Messenger(init) - , framesScheduled_(0) - , framesCompressed_(0) + , _frames_scheduled(0) + , _frames_compressed(0) { } - ~ScheduledMessenger(void) + ~ScheduledMessenger() { shutdown(); } - bool scheduleCompress(F proxy, std::function<void(BufferSrc)> converter) + bool schedule_compress(F proxy, std::function<void(BufferSrc)> converter) { size_t frameSize = init_.uncompressedFrameSize_; assert(frameSize >= init_.uncompressedFrameSize_); BufferSrc src; - if (!availableBuffers_.waitAndPop(src)) + if (!availableBuffers_.wait_and_pop(src)) { return false; +} converter(src); - scheduledFrames_.store(proxy); - framesScheduled_++; + _scheduled_frames.store(proxy); + _frames_scheduled++; send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_); return true; } - void processCompressed(const std::string& message, std::function<void(F, uint8_t*, uint32_t)> processor, bool needsRecompression) + void process_compressed(const std::string& message, std::function<void(F, uint8_t*, uint32_t)> processor, bool needsRecompression) { Msg msg(message); msg.next(); - auto clientFrameId = msg.nextUint(); - auto compressedFrameId = msg.nextUint(); - auto compressedFrameLength = msg.nextUint(); + auto clientFrameId = msg.next_uint(); + auto compressedFrameId = msg.next_uint(); + auto compressedFrameLength = msg.next_uint(); if (!needsRecompression) { bool success = false; - auto srcFrame = scheduledFrames_.retrieve(clientFrameId, success); - if (!success) + auto srcFrame = _scheduled_frames.retrieve(clientFrameId, success); + if (!success) { return; - processor(srcFrame, getCompressedFrame(compressedFrameId), compressedFrameLength); +} + processor(srcFrame, get_compressed_frame(compressedFrameId), compressedFrameLength); } - ++framesCompressed_; + ++_frames_compressed; send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId); - if (shutdown_ && framesCompressed_ == framesScheduled_) + if (shutdown_ && _frames_compressed == _frames_scheduled) { shutdownCondition_.notify_all(); +} } - void shutdown(void) + void shutdown() { try { std::unique_lock<std::mutex> lk(shutdownMutex_); - if (!async_result_.valid()) + if (!async_result_.valid()) { return; +} shutdown_ = true; - if (framesScheduled_) { - uint32_t scheduled = framesScheduled_; + if (_frames_scheduled) { + uint32_t scheduled = _frames_scheduled; send(GRK_MSGR_BATCH_FLUSH, scheduled); - shutdownCondition_.wait(lk, [this] { return framesScheduled_ == framesCompressed_; }); + shutdownCondition_.wait(lk, [this] { return _frames_scheduled == _frames_compressed; }); } availableBuffers_.deactivate(); send(GRK_MSGR_BATCH_SHUTDOWN); int result = async_result_.get(); - if (result != 0) + if (result != 0) { getMessengerLogger()->error("Accelerator failed with return code: %d\n", result); +} } catch (std::exception& ex) { getMessengerLogger()->error("%s", ex.what()); } @@ -1010,18 +1065,18 @@ struct ScheduledMessenger : public Messenger F retrieve(size_t index, bool& success) { - return scheduledFrames_.retrieve(index, success); + return _scheduled_frames.retrieve(index, success); } void store(F& val) { - scheduledFrames_.store(val); + _scheduled_frames.store(val); } private: - ScheduledFrames<F> scheduledFrames_; - std::atomic<uint32_t> framesScheduled_; - std::atomic<uint32_t> framesCompressed_; + ScheduledFrames<F> _scheduled_frames; + std::atomic<uint32_t> _frames_scheduled; + std::atomic<uint32_t> _frames_compressed; }; } // namespace grk_plugin diff --git a/src/lib/j2k_encoder.cc b/src/lib/j2k_encoder.cc index dbb7f4f7b..7d2b95b1a 100644 --- a/src/lib/j2k_encoder.cc +++ b/src/lib/j2k_encoder.cc @@ -62,8 +62,8 @@ J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer) : _film(film) , _history(200) , _writer(writer) - , dcpomaticContext_(film, writer, _history, Config::instance()->gpu_binary_location()) - , context_(Config::instance()->enable_gpu() ? new grk_plugin::GrokContext(dcpomaticContext_) : nullptr) + , _dcpomatic_context(film, writer, _history, Config::instance()->gpu_binary_location()) + , _context(Config::instance()->enable_gpu() ? new grk_plugin::GrokContext(_dcpomatic_context) : nullptr) { servers_list_changed (); } @@ -78,7 +78,7 @@ J2KEncoder::~J2KEncoder () terminate_threads(); } - delete context_; + delete _context; } void @@ -90,17 +90,18 @@ J2KEncoder::begin () } void -J2KEncoder::pause(void) +J2KEncoder::pause() { - if (Config::instance()->enable_gpu()) + if (Config::instance()->enable_gpu()) { end(false); } +} void -J2KEncoder::resume(void) +J2KEncoder::resume() { if (Config::instance()->enable_gpu()) { - context_ = new grk_plugin::GrokContext(dcpomaticContext_); + _context = new grk_plugin::GrokContext(_dcpomatic_context); servers_list_changed(); } } @@ -145,7 +146,7 @@ J2KEncoder::end(bool isFinal) if (isFinal) { for (auto& i : _queue) { if (Config::instance()->enable_gpu()) { - if (!context_->scheduleCompress(i)) { + if (!_context->schedule_compress(i)) { LOG_GENERAL(N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index()); // handle error } @@ -163,8 +164,8 @@ J2KEncoder::end(bool isFinal) } } } - delete context_; - context_ = nullptr; + delete _context; + _context = nullptr; } @@ -214,10 +215,11 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time) size_t threads = 0; { boost::mutex::scoped_lock lm (_threads_mutex); - if (_threads) + if (_threads) { threads = _threads->size(); - else + } else { threads = std::thread::hardware_concurrency(); +} } boost::mutex::scoped_lock queue_lock (_queue_mutex); @@ -368,8 +370,8 @@ try } } else { - if (context_) { - if (!context_->launch(vf, config->selected_gpu()) || !context_->scheduleCompress(vf)) { + if (_context) { + if (!_context->launch(vf, config->selected_gpu()) || !_context->schedule_compress(vf)) { LOG_GENERAL(N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index()); _queue.push_front(vf); } diff --git a/src/lib/j2k_encoder.h b/src/lib/j2k_encoder.h index 2f30c5f80..d91aadf4f 100644 --- a/src/lib/j2k_encoder.h +++ b/src/lib/j2k_encoder.h @@ -71,8 +71,8 @@ public: /** Called to pass a bit of video to be encoded as the next DCP frame */ void encode (std::shared_ptr<PlayerVideo> pv, dcpomatic::DCPTime time); - void pause(void); - void resume(void); + void pause(); + void resume(); /** Called when a processing run has finished */ void end(bool isFinal); @@ -112,8 +112,8 @@ private: boost::signals2::scoped_connection _server_found_connection; - grk_plugin::DcpomaticContext dcpomaticContext_; - grk_plugin::GrokContext* context_; + grk_plugin::DcpomaticContext _dcpomatic_context; + grk_plugin::GrokContext* _context; }; |
