/* Copyright (C) 2023 Grok Image Compression Inc. This file is part of DCP-o-matic. DCP-o-matic is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. DCP-o-matic is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with DCP-o-matic. If not, see . */ #include "context.h" #include "shm.h" #include "synch.h" #include "messenger.h" using namespace grk_plugin; auto constexpr CLIENT_TO_GROK_MESSAGE = "Global\\client_to_grok_message"; auto constexpr CLIENT_SENT = "Global\\client_sent"; auto constexpr GROK_RECEIVE_READY = "Global\\grok_receive_ready"; auto constexpr GROK_TO_CLIENT_MESSAGE = "Global\\grok_to_client_message"; auto constexpr GROK_SENT = "Global\\grok_sent"; auto constexpr CLIENT_RECEIVE_READY = "Global\\client_receive_ready"; auto constexpr GROK_UNCOMPRESSED_BUFFER = "Global\\grok_uncompressed_buf"; auto constexpr GROK_COMPRESSED_BUFFER = "Global\\grok_compressed_buf"; Messenger::Messenger(std::function processor, size_t numProcessingThreads) : _running(true) , _frames_scheduled(0) , _frames_compressed(0) , _processor(processor) , _num_processing_threads(numProcessingThreads) , _uncompressed_frame_size(0) , _compressed_frame_size(0) , _num_frames(0) { shm_unlink(GROK_TO_CLIENT_MESSAGE); shm_unlink(CLIENT_TO_GROK_MESSAGE); } Messenger::~Messenger() { shutdown(); _running = false; _send_queue.deactivate(); _receive_queue.deactivate(); if (_outbound_synch) { _outbound_synch->post(SYNCH_RECEIVE_READY); _outbound.join(); } if (_inbound_synch) { _inbound_synch->post(SYNCH_SENT); _inbound.join(); } for(auto& p: _processors) { p.join(); } delete _outbound_synch; delete _inbound_synch; deinit_shm(); } /* One of these is created for each core, to receive messages (e.g. "frame is encoded") * from the grok process and handle them. */ void Messenger::processor_thread() { while (_running) { std::string message; if (!_receive_queue.wait_and_pop(message)) { break; } if (!_running) { break; } Message msg(message); auto tag = msg.next(); if (tag == GRK_MSGR_BATCH_COMPRESS_INIT) { auto width = msg.nextUint(); msg.nextUint(); // stride auto height = msg.nextUint(); auto samples_per_pixel = msg.nextUint(); msg.nextUint(); // depth _uncompressed_frame_size = Messenger::uncompressedFrameSize(width, height, samples_per_pixel); auto compressed_frame_size = msg.nextUint(); auto num_frames = msg.nextUint(); initClient(_uncompressed_frame_size, compressed_frame_size, num_frames); } else if (tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED) { reclaimUncompressed(msg.nextUint()); } else if (tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED) { reclaimCompressed(msg.nextUint()); } /* Handle writing J2K data to disk */ _processor(message); } } void Messenger::outbound_thread() { int shm_fd = 0; char* send_buffer = nullptr; if (!init_shm(CLIENT_TO_GROK_MESSAGE, messageBufferLen, &shm_fd, &send_buffer)) { return; } while (_running) { _outbound_synch->wait(SYNCH_RECEIVE_READY); if (!_running) { break; } std::string message; if (!_send_queue.wait_and_pop(message)) { break; } if (!_running) { break; } memcpy(send_buffer, message.c_str(), message.size() + 1); _outbound_synch->post(SYNCH_SENT); } ::deinit_shm(CLIENT_TO_GROK_MESSAGE, messageBufferLen, shm_fd, &send_buffer); } void Messenger::inbound_thread() { int shm_fd = 0; char* receive_buffer = nullptr; if (!init_shm(GROK_TO_CLIENT_MESSAGE, messageBufferLen, &shm_fd, &receive_buffer)) { return; } while (_running) { _inbound_synch->wait(SYNCH_SENT); if (!_running) { break; } auto message = std::string(receive_buffer); _inbound_synch->post(SYNCH_RECEIVE_READY); _receive_queue.push(message); } ::deinit_shm(GROK_TO_CLIENT_MESSAGE, messageBufferLen, shm_fd, &receive_buffer); } void Messenger::startThreads() { _outbound_synch = new Synch(CLIENT_SENT, GROK_RECEIVE_READY); _outbound = std::thread(&Messenger::outbound_thread, this); _inbound_synch = new Synch(GROK_SENT, CLIENT_RECEIVE_READY); _inbound = std::thread(&Messenger::inbound_thread, this); for (size_t i = 0; i < _num_processing_threads; ++i) { _processors.push_back(std::thread(&Messenger::processor_thread, this)); } } bool Messenger::initBuffers() { bool rc = true; if (_uncompressed_frame_size) { rc = rc && init_shm( GROK_UNCOMPRESSED_BUFFER, _uncompressed_frame_size * _num_frames, &_uncompressed_fd, &_uncompressed_buffer ); } if (_compressed_frame_size) { rc = rc && init_shm( GROK_COMPRESSED_BUFFER, _compressed_frame_size * _num_frames, &_compressed_fd, &_compressed_buffer ); } return rc; } bool Messenger::deinit_shm() { bool rc = ::deinit_shm( GROK_UNCOMPRESSED_BUFFER, _uncompressed_frame_size * _num_frames, _uncompressed_fd, &_uncompressed_buffer ); rc = rc && ::deinit_shm( GROK_COMPRESSED_BUFFER, _compressed_frame_size * _num_frames, _compressed_fd, &_compressed_buffer ); return rc; } bool Messenger::launch_grok( boost::filesystem::path const& dir, uint32_t width, uint32_t stride, uint32_t height, uint32_t samplesPerPixel, uint32_t depth, int device, bool is4K, uint32_t fps, uint32_t bandwidth, const std::string server, const std::string license ) { std::unique_lock lk(_shutdown_mutex); if (_async_result.valid()) { return true; } shm_unlink(GROK_TO_CLIENT_MESSAGE); shm_unlink(CLIENT_TO_GROK_MESSAGE); startThreads(); char cmd[4096]; snprintf(cmd, sizeof(cmd), "./grk_compress -batch_src %s,%d,%d,%d,%d,%d -out_fmt j2k -k 1 " "-G %d -%s %d,%d -j %s -J %s -v", GRK_MSGR_BATCH_IMAGE.c_str(), width, stride, height, samplesPerPixel, depth, device, is4K ? "cinema4K" : "cinema2K", fps, bandwidth, license.c_str(), server.c_str()); return launch(cmd, dir); } void Messenger::initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames) { // client fills queue with pending uncompressed buffers _uncompressed_frame_size = uncompressedFrameSize; _compressed_frame_size = compressedFrameSize; _num_frames = numFrames; initBuffers(); auto ptr = _uncompressed_buffer; for(size_t i = 0; i < _num_frames; ++i) { _available_buffers.push(BufferSrc(i, (uint8_t*)ptr)); ptr += _uncompressed_frame_size; } std::unique_lock lk(_shutdown_mutex); _initialized = true; _client_initialized_condition.notify_all(); } bool Messenger::waitForClientInit() { if (_initialized) { return true; } else if (_shutdown) { return false; } std::unique_lock lk(_shutdown_mutex); if (_initialized) { return true; } else if (_shutdown) { return false; } while (true) { if (_client_initialized_condition.wait_for(lk, std::chrono::seconds(1), [this]{ return _initialized || _shutdown; })) { break; } auto status = _async_result.wait_for(std::chrono::milliseconds(100)); if (status == std::future_status::ready) { getGrokLogger()->error("Grok exited unexpectedly during initialization"); return false; } } return _initialized && !_shutdown; } size_t Messenger::uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel) { return sizeof(uint16_t) * w * h * samplesPerPixel; } void Messenger::reclaimCompressed(size_t frameId) { _available_buffers.push(BufferSrc(frameId, getCompressedFrame(frameId))); } void Messenger::reclaimUncompressed(size_t frameId) { _available_buffers.push(BufferSrc(frameId, getUncompressedFrame(frameId))); } uint8_t* Messenger::getUncompressedFrame(size_t frameId) { assert(frameId < _num_frames); if (frameId >= _num_frames) { return nullptr; } return (uint8_t*)(_uncompressed_buffer + frameId * _uncompressed_frame_size); } uint8_t* Messenger::getCompressedFrame(size_t frameId) { assert(frameId < _num_frames); if (frameId >= _num_frames) { return nullptr; } return (uint8_t*)(_compressed_buffer + frameId * _compressed_frame_size); } bool Messenger::launch(std::string const& cmd, boost::filesystem::path const& dir) { // Execute the command using std::async and std::system auto const cmd_with_dir = dir / cmd; getGrokLogger()->info(cmd_with_dir.string().c_str()); _async_result = std::async(std::launch::async, [cmd_with_dir]() { return std::system(cmd_with_dir.string().c_str()); }); auto const success = _async_result.valid(); if (!success) { getGrokLogger()->error("Grok launch failed"); } return success; } bool Messenger::schedule_compress(DCPVideo const& proxy, std::function converter) { BufferSrc src; if (!_available_buffers.wait_and_pop(src)) { return false; } converter(src); _scheduled_frames.store(proxy); _frames_scheduled++; send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frame_id); return true; } /** @param processor function taking compressed J2K data and sending it to the Writer */ void Messenger::process_compressed(std::string const& message, std::function processor, bool needsRecompression) { Message msg(message); msg.next(); auto const clientFrameId = msg.nextUint(); auto const compressedFrameId = msg.nextUint(); auto const compressedFrameLength = msg.nextUint(); if (!needsRecompression) { auto src_frame = _scheduled_frames.retrieve(clientFrameId); if (!src_frame) { return; } /* Write the compressed J2K data out */ processor(*src_frame, getCompressedFrame(compressedFrameId), compressedFrameLength); } ++_frames_compressed; send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId); if (_shutdown && _frames_compressed == _frames_scheduled) { _shutdown_condition.notify_all(); } } void Messenger::shutdown() { try { std::unique_lock lk(_shutdown_mutex); if (!_async_result.valid()) { return; } _shutdown = true; if (_frames_scheduled) { uint32_t scheduled = _frames_scheduled; send(GRK_MSGR_BATCH_FLUSH, scheduled); _shutdown_condition.wait(lk, [this] { return _frames_scheduled == _frames_compressed; }); } _available_buffers.deactivate(); send(GRK_MSGR_BATCH_SHUTDOWN); int result = _async_result.get(); if(result != 0) { getGrokLogger()->error("Accelerator failed with return code: %d\n",result); } } catch (std::exception &ex) { getGrokLogger()->error("%s",ex.what()); } }