auto fp = _messenger->retrieve(clientFrameId, success);
if (!success) {
return;
-}
+ }
auto encoded = std::make_shared<dcp::ArrayData>(fp.vf.encode_locally());
_dcpomatic_context.writer_.write(encoded, fp.vf.index(), fp.vf.eyes());
{
if (!_messenger) {
return false;
-}
+ }
if (_launched) {
return true;
-}
+ }
std::unique_lock<std::mutex> lk_global(launchMutex);
if (!_messenger) {
return false;
-}
+ }
if (_launched) {
return true;
-}
+ }
if (MessengerInit::first_launch(true)) {
auto s = dcpv.get_size();
_dcpomatic_context.set_dimensions(s.width, s.height);
auto config = Config::instance();
_messenger->launch_grok(_dcpomatic_context.location_,
- _dcpomatic_context.width_, _dcpomatic_context.width_,
- _dcpomatic_context.height_,
- 3, 12, device,
- _dcpomatic_context.film_->resolution() == Resolution::FOUR_K,
- _dcpomatic_context.film_->video_frame_rate(),
- _dcpomatic_context.film_->j2k_bandwidth(),
- config->gpu_license_server(),
- config->gpu_license_port(),
- config->gpu_license());
+ _dcpomatic_context.width_, _dcpomatic_context.width_,
+ _dcpomatic_context.height_,
+ 3, 12, device,
+ _dcpomatic_context.film_->resolution() == Resolution::FOUR_K,
+ _dcpomatic_context.film_->video_frame_rate(),
+ _dcpomatic_context.film_->j2k_bandwidth(),
+ config->gpu_license_server(),
+ config->gpu_license_port(),
+ config->gpu_license());
}
_launched = _messenger->wait_for_client_init();
{
if (!_messenger) {
return false;
-}
+ }
auto fp = FrameProxy(vf.index(), vf.eyes(), vf);
auto cvt = [this, &fp](BufferSrc src) {
{
if (!_messenger) {
return;
-}
+ }
std::unique_lock<std::mutex> lk_global(launchMutex);
if (!_messenger) {
return;
-}
+ }
if (_launched) {
_messenger->shutdown();
-}
+ }
delete _messenger;
_messenger = nullptr;
}
{
if (first_launch(true)) {
unlink();
-}
+ }
}
void unlink()
// unlink semaphores in case of previous crash
if (MessengerInit::first_launch(true)) {
unlink();
-}
+ }
open();
}
close();
if (MessengerInit::first_launch(true)) {
unlink();
-}
+ }
}
void post(SynchDirection dir)
int rc = sem_post(sem);
if (rc) {
getMessengerLogger()->error("Error posting to semaphore: %s", strerror(errno));
-}
+ }
}
void wait(SynchDirection dir)
int rc = sem_wait(sem);
if (rc) {
getMessengerLogger()->error("Error waiting for semaphore: %s", strerror(errno));
-}
+ }
}
void open()
sentSem_ = sem_open(_sent_sem_name.c_str(), O_CREAT, 0666, 0);
if (!sentSem_) {
getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
-}
+ }
receiveReadySem_ = sem_open(_receive_ready_sem_name.c_str(), O_CREAT, 0666, 1);
if (!receiveReadySem_) {
getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
-}
+ }
}
void close()
if (rc) {
getMessengerLogger()->error("Error closing semaphore %s: %s", _sent_sem_name.c_str(),
strerror(errno));
-}
+ }
rc = sem_close(receiveReadySem_);
if (rc) {
getMessengerLogger()->error("Error closing semaphore %s: %s",
_receive_ready_sem_name.c_str(), strerror(errno));
-}
+ }
}
void unlink()
if (rc == -1 && errno != ENOENT) {
getMessengerLogger()->error("Error unlinking semaphore %s: %s", _sent_sem_name.c_str(),
strerror(errno));
-}
+ }
rc = sem_unlink(_receive_ready_sem_name.c_str());
if (rc == -1 && errno != ENOENT) {
getMessengerLogger()->error("Error unlinking semaphore %s: %s",
_receive_ready_sem_name.c_str(), strerror(errno));
-}
+ }
}
sem_t* sentSem_;
rc = close(*shm_fd);
if (rc) {
getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
-}
+ }
rc = shm_unlink(name.c_str());
if (rc) {
getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
-}
+ }
return false;
}
*buffer = static_cast<char*>(mmap(0, len, PROT_WRITE, MAP_SHARED, *shm_fd, 0));
rc = close(*shm_fd);
if (rc) {
getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
-}
+ }
rc = shm_unlink(name.c_str());
if (rc) {
getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
-}
+ }
}
return *buffer != nullptr;
{
if (!*buffer || !shm_fd) {
return true;
-}
+ }
int rc = munmap(*buffer, len);
*buffer = nullptr;
if (rc) {
getMessengerLogger()->error("Error unmapping shared memory %s: %s", name.c_str(), strerror(errno));
-}
+ }
rc = close(shm_fd);
shm_fd = 0;
if (rc) {
getMessengerLogger()->error("Error closing shared memory %s: %s", name.c_str(), strerror(errno));
-}
+ }
rc = shm_unlink(name.c_str());
if (rc) {
fprintf(stderr, "Error unlinking shared memory %s : %s\n", name.c_str(), strerror(errno));
-}
+ }
return true;
}
_active = false;
while (!_queue.empty()) {
_queue.pop();
-}
+ }
}
// release all waiting threads
}
if (rc) {
_can_pop.notify_one();
-}
+ }
return rc;
}
std::unique_lock<std::mutex> lk(_mutex);
if (!_active) {
return false;
-}
+ }
// in case of spurious wakeup, loop until predicate in lambda
// is satisfied.
_can_push.wait(lk, [this] { return _queue.size() < _max_size || !_active; });
}
if (rc) {
_can_pop.notify_one();
-}
+ }
return rc;
}
}
if (rc) {
_can_push.notify_one();
-}
+ }
return rc;
}
std::unique_lock<std::mutex> lk(_mutex);
if (!_active) {
return false;
-}
+ }
// in case of spurious wakeup, loop until predicate in lambda
// is satisfied.
_can_pop.wait(lk, [this] { return !_queue.empty() || !_active; });
}
if (rc) {
_can_push.notify_one();
-}
+ }
return rc;
}
{
if (_queue.size() == _max_size || !_active) {
return false;
-}
+ }
_queue.push(value);
return true;
{
if (_queue.empty() || !_active) {
return false;
-}
+ }
value = _queue.front();
_queue.pop();
for (auto& p : _processors) {
p.join();
-}
+ }
delete _outbound_synch;
delete _inbound_synch;
for (size_t i = 0; i < init_.numProcessingThreads_; ++i) {
_processors.push_back(std::thread(processorThread, this, init_.processor_));
-}
+ }
}
size_t serialize(const std::string& dir, size_t clientFrameId, uint8_t* compressedPtr,
char fname[512];
if (!compressedPtr || !compressedLength) {
return 0;
-}
+ }
sprintf(fname, "%s/test_%d.j2k", dir.c_str(), (int)clientFrameId);
auto fp = fopen(fname, "wb");
if (!fp) {
return 0;
-}
+ }
size_t written = fwrite(compressedPtr, 1, compressedLength, fp);
if (written != compressedLength) {
fclose(fp);
bool rc = true;
if (init_.uncompressedFrameSize_) {
rc = rc && SharedMemoryManager::init_shm(grokUncompressedBuf,
- init_.uncompressedFrameSize_ * init_.numFrames_,
- &_uncompressed_fd, &_uncompressed_buffer);
+ init_.uncompressedFrameSize_ * init_.numFrames_,
+ &_uncompressed_fd, &_uncompressed_buffer);
}
if (init_.compressedFrameSize_) {
rc = rc && SharedMemoryManager::init_shm(grokCompressedBuf,
- init_.compressedFrameSize_ * init_.numFrames_,
- &_compressed_fd, &_compressed_buffer);
+ init_.compressedFrameSize_ * init_.numFrames_,
+ &_compressed_fd, &_compressed_buffer);
}
return rc;
bool deinit_shm()
{
bool rc = SharedMemoryManager::deinit_shm(grokUncompressedBuf,
- init_.uncompressedFrameSize_ * init_.numFrames_,
- _uncompressed_fd, &_uncompressed_buffer);
+ init_.uncompressedFrameSize_ * init_.numFrames_,
+ _uncompressed_fd, &_uncompressed_buffer);
rc = rc && SharedMemoryManager::deinit_shm(grokCompressedBuf,
- init_.compressedFrameSize_ * init_.numFrames_,
- _compressed_fd, &_compressed_buffer);
+ init_.compressedFrameSize_ * init_.numFrames_,
+ _compressed_fd, &_compressed_buffer);
return rc;
}
auto pgrep = popen(command, "r");
if (!pgrep) {
return -1;
-}
+ }
pid_t pid;
if (fscanf(pgrep, "%d", &pid) != 1) {
pid = -1;
-}
+ }
pclose(pgrep);
return pid;
}
void launch_grok(const std::string& dir, uint32_t width, uint32_t stride,
- uint32_t height, uint32_t samplesPerPixel, uint32_t depth,
- int device, bool is4K, uint32_t fps, uint32_t bandwidth,
- const std::string server, uint32_t port,
- const std::string license)
+ uint32_t height, uint32_t samplesPerPixel, uint32_t depth,
+ int device, bool is4K, uint32_t fps, uint32_t bandwidth,
+ const std::string server, uint32_t port,
+ const std::string license)
{
std::unique_lock<std::mutex> lk(shutdownMutex_);
if (async_result_.valid()) {
return;
-}
+ }
if (MessengerInit::first_launch(true)) {
init_.unlink();
-}
+ }
start_threads();
char _cmd[4096];
auto fullServer = server + ":" + std::to_string(port);
{
if (initialized_) {
return true;
-}
+ }
std::unique_lock<std::mutex> lk(shutdownMutex_);
if (initialized_) {
return true;
} else if (shutdown_) {
return false;
-}
+ }
clientInitializedCondition_.wait(lk, [this] { return initialized_ || shutdown_; });
return initialized_ && !shutdown_;
assert(frameId < init_.numFrames_);
if (frameId >= init_.numFrames_) {
return nullptr;
-}
+ }
return (uint8_t*)(_uncompressed_buffer + frameId * init_.uncompressedFrameSize_);
}
assert(frameId < init_.numFrames_);
if (frameId >= init_.numFrames_) {
return nullptr;
-}
+ }
return (uint8_t*)(_compressed_buffer + frameId * init_.compressedFrameSize_);
}
if (!SharedMemoryManager::init_shm(sendBuf, messageBufferLen, &shm_fd, &send_buffer)) {
return;
-}
+ }
while (messenger->running) {
synch->wait(SYNCH_RECEIVE_READY);
if (!messenger->running) {
break;
-}
+ }
std::string message;
if (!messenger->sendQueue.wait_and_pop(message)) {
break;
-}
+ }
if (!messenger->running) {
break;
-}
+ }
memcpy(send_buffer, message.c_str(), message.size() + 1);
synch->post(SYNCH_SENT);
}
if (!SharedMemoryManager::init_shm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer)) {
return;
-}
+ }
while (messenger->running) {
synch->wait(SYNCH_SENT);
if (!messenger->running) {
break;
-}
+ }
auto message = std::string(receive_buffer);
synch->post(SYNCH_RECEIVE_READY);
messenger->receiveQueue.push(message);
std::string message;
if (!messenger->receiveQueue.wait_and_pop(message)) {
break;
-}
+ }
if (!messenger->running) {
break;
-}
+ }
Msg msg(message);
auto tag = msg.next();
if (tag == GRK_MSGR_BATCH_COMPRESS_INIT) {
auto it = _map.find(val.index());
if (it == _map.end()) {
_map[val.index()] = val;
-}
+ }
}
F retrieve(size_t index, bool& success)
auto it = _map.find(index);
if (it == _map.end()) {
return F();
-}
+ }
success = true;
F val = it->second;
BufferSrc src;
if (!availableBuffers_.wait_and_pop(src)) {
return false;
-}
+ }
converter(src);
_scheduled_frames.store(proxy);
_frames_scheduled++;
auto srcFrame = _scheduled_frames.retrieve(clientFrameId, success);
if (!success) {
return;
-}
+ }
processor(srcFrame, get_compressed_frame(compressedFrameId), compressedFrameLength);
}
++_frames_compressed;
send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId);
if (shutdown_ && _frames_compressed == _frames_scheduled) {
shutdownCondition_.notify_all();
-}
+ }
}
void shutdown()
std::unique_lock<std::mutex> lk(shutdownMutex_);
if (!async_result_.valid()) {
return;
-}
+ }
shutdown_ = true;
if (_frames_scheduled) {
uint32_t scheduled = _frames_scheduled;
int result = async_result_.get();
if (result != 0) {
getMessengerLogger()->error("Accelerator failed with return code: %d\n", result);
-}
+ }
} catch (std::exception& ex) {
getMessengerLogger()->error("%s", ex.what());
}