Separate thread for handling the CUDA parts seems a little better.
author Carl Hetherington <cth@carlh.net>
Mon, 23 May 2022 21:03:45 +0000 (23:03 +0200)
committer Carl Hetherington <cth@carlh.net>
Mon, 23 May 2022 21:03:45 +0000 (23:03 +0200)
src/lib/cuda_j2k_frame_encoder.cc
src/lib/cuda_j2k_frame_encoder.h

index 445548558bb931c9a00dff114c6c8767d3440bc7..95b742484aac81037fadc661ba543e7ac2e0bdb3 100644 (file)
 #include "dcp_video.h"
 #include "exceptions.h"
 #include "player_video.h"
+#include <dcp/array_data.h>
 #include <dcp/openjpeg_image.h>
 #include <nvjpeg2k.h>
+#include <thread>
 #include <vector>
 
 
 using std::make_pair;
+using std::thread;
 using std::vector;
 using boost::optional;
 
 
+vector<thread> CUDAJ2KFrameEncoder::_cuda_threads;
+std::queue<CUDAJ2KFrameEncoder::Input> CUDAJ2KFrameEncoder::_input;
+std::map<std::pair<int, Eyes>, dcp::ArrayData> CUDAJ2KFrameEncoder::_output;
+boost::condition CUDAJ2KFrameEncoder::_input_condition;
+boost::mutex CUDAJ2KFrameEncoder::_input_mutex;
+boost::condition CUDAJ2KFrameEncoder::_output_condition;
+boost::mutex CUDAJ2KFrameEncoder::_output_mutex;
+
+
 CUDAJ2KFrameEncoder::CUDAJ2KFrameEncoder()
 {
-       nvjpeg2kEncoderCreateSimple(&_encoder_handle);
-       nvjpeg2kEncodeStateCreate(_encoder_handle, &_encoder_state);
-       nvjpeg2kEncodeParamsCreate(&_encoder_params);
-
-       cudaStreamCreateWithFlags(&_stream, cudaStreamNonBlocking);
+       if (_cuda_threads.empty()) {
+               for (int i = 0; i < 8; ++i) {
+                       _cuda_threads.push_back(std::thread(&CUDAJ2KFrameEncoder::cuda_thread));
+               }
+       }
 }
 
 
-CUDAJ2KFrameEncoder::~CUDAJ2KFrameEncoder()
+/** Entry point for each worker thread started by the constructor.
+ *
+ *  Sets up a private nvjpeg2k encoder (handle, state, params) and CUDA stream,
+ *  then loops forever: take the next Input from _input, waiting on
+ *  _input_condition while the queue is empty; encode it; store the bitstream
+ *  in _output keyed by (index, eyes); notify _output_condition.
+ *
+ *  @throws CUDAError if any nvjpeg2k or CUDA call reports failure.
+ *  NOTE(review): the exception is thrown on the worker thread and nothing in
+ *  this function catches it - confirm how a failure is meant to reach the
+ *  thread that is waiting in encode().
+ */
+void
+CUDAJ2KFrameEncoder::cuda_thread()
 {
-       cudaStreamDestroy(_stream);
+       nvjpeg2kEncoder_t encoder_handle;
+       nvjpeg2kEncodeState_t encoder_state;
+       nvjpeg2kEncodeParams_t encoder_params;
+
+       /* Check every setup call: carrying on with a handle that was never
+        * created would only fail later, with a less useful error.
+        */
+       auto setup_status = nvjpeg2kEncoderCreateSimple(&encoder_handle);
+       if (setup_status != NVJPEG2K_STATUS_SUCCESS) {
+               throw CUDAError("nvjpeg2kEncoderCreateSimple", setup_status);
+       }
+
+       setup_status = nvjpeg2kEncodeStateCreate(encoder_handle, &encoder_state);
+       if (setup_status != NVJPEG2K_STATUS_SUCCESS) {
+               throw CUDAError("nvjpeg2kEncodeStateCreate", setup_status);
+       }
+
+       setup_status = nvjpeg2kEncodeParamsCreate(&encoder_params);
+       if (setup_status != NVJPEG2K_STATUS_SUCCESS) {
+               throw CUDAError("nvjpeg2kEncodeParamsCreate", setup_status);
+       }
+
+       cudaStream_t stream;
+       auto const stream_status = cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking);
+       if (stream_status != cudaSuccess) {
+               throw CUDAError("cudaStreamCreateWithFlags", stream_status);
+       }
+
+       while (true) {
+               /* Hold _input_mutex only for long enough to take one item off the queue */
+               boost::mutex::scoped_lock lm(_input_mutex);
+               while (_input.empty()) {
+                       std::cout << "gpu starved.\n";
+                       _input_condition.wait(lm);
+               }
+
+               auto input = std::move(_input.front());
+               _input.pop();
+               lm.unlock();
+
+               /* All three components share the frame's size and are 12-bit unsigned */
+               nvjpeg2kImageComponentInfo_t info[3];
+               for (int i = 0; i < 3; ++i) {
+                       info[i].component_width = input.size().width;
+                       info[i].component_height = input.size().height;
+                       info[i].precision = 12;
+                       info[i].sgn = 0;
+               }
+
+               nvjpeg2kEncodeConfig_t config;
+               memset(&config, 0, sizeof(config));
+               config.stream_type = NVJPEG2K_STREAM_J2K;
+               config.color_space = NVJPEG2K_COLORSPACE_SRGB;
+               config.image_width = input.size().width;
+               config.image_height = input.size().height;
+               config.num_components = 3;
+               config.image_comp_info = reinterpret_cast<nvjpeg2kImageComponentInfo_t*>(&info);
+               config.code_block_w = 32;
+               config.code_block_h = 32;
+               config.irreversible = 0;
+               config.mct_mode = 1;
+               config.prog_order = NVJPEG2K_CPRL;
+               config.num_resolutions = input.resolution() == Resolution::FOUR_K ? 7 : 6;
+
+               auto status = nvjpeg2kEncodeParamsSetEncodeConfig(encoder_params, &config);
+               if (status != NVJPEG2K_STATUS_SUCCESS) {
+                       throw CUDAError("nvjpeg2kEncodeParamsSetEncodeConfig", status);
+               }
+
+               // XXX: quality
+               status = nvjpeg2kEncodeParamsSetQuality(encoder_params, 30);
+               if (status != NVJPEG2K_STATUS_SUCCESS) {
+                       throw CUDAError("nvjpeg2kEncodeParamsSetQuality", status);
+               }
+
+               status = nvjpeg2kEncode(encoder_handle, encoder_state, encoder_params, input.device_image(), stream);
+               if (status != NVJPEG2K_STATUS_SUCCESS) {
+                       throw CUDAError("nvjpeg2kEncode", status);
+               }
+
+               /* First call, with a null buffer, only asks for the bitstream size.
+                * Its status must be checked too, otherwise a failure here would
+                * leave compressed_size unset when it is used to size the buffer.
+                */
+               size_t compressed_size = 0;
+               status = nvjpeg2kEncodeRetrieveBitstream(encoder_handle, encoder_state, nullptr, &compressed_size, stream);
+               if (status != NVJPEG2K_STATUS_SUCCESS) {
+                       throw CUDAError("nvjpeg2kEncodeRetrieveBitstream", status);
+               }
+
+               /* Second call copies the bitstream into our buffer */
+               dcp::ArrayData output(compressed_size);
+               status = nvjpeg2kEncodeRetrieveBitstream(encoder_handle, encoder_state, output.data(), &compressed_size, stream);
+               if (status != NVJPEG2K_STATUS_SUCCESS) {
+                       throw CUDAError("nvjpeg2kEncodeRetrieveBitstream", status);
+               }
+
+               /* Publish the result and wake any thread waiting on _output_condition */
+               boost::mutex::scoped_lock lm2(_output_mutex);
+               _output[make_pair(input.index(), input.eyes())] = output;
+               _output_condition.notify_all();
+       }
+
+       /* Not reached as written: the loop above has no exit */
+       cudaStreamDestroy(stream);
 }
 
 
-CUDAJ2KFrameEncoder::Input::Input(DCPVideo const& vf, cudaStream_t stream)
+CUDAJ2KFrameEncoder::Input::Input(DCPVideo const& vf)
        : _index(vf.index())
        , _eyes(vf.eyes())
+       , _resolution(vf.resolution())
 {
        _xyz = convert_to_xyz(vf.frame(), boost::bind(&Log::dcp_log, dcpomatic_log.get(), _1, _2));
 
@@ -61,7 +150,9 @@ CUDAJ2KFrameEncoder::Input::Input(DCPVideo const& vf, cudaStream_t stream)
                _pixel_data_h[i] = reinterpret_cast<uint8_t*>(_xyz->data(i));
        }
 
-       auto const pitch = _xyz->size().width * 2;
+       _size = _xyz->size();
+
+       auto const pitch = _size.width * 2;
 
        for (int i = 0; i < 3; ++i) {
                _pitch_in_bytes[i] = pitch;
@@ -69,22 +160,21 @@ CUDAJ2KFrameEncoder::Input::Input(DCPVideo const& vf, cudaStream_t stream)
                        reinterpret_cast<void**>(&_pixel_data_d[i]),
                        &_pitch_in_bytes[i],
                        pitch,
-                       _xyz->size().height
+                       _size.height
                        );
 
                if (status != cudaSuccess) {
                        throw CUDAError("cudaMallocPitch", status);
                }
 
-               status = cudaMemcpy2DAsync(
+               status = cudaMemcpy2D(
                        _pixel_data_d[i],
                        _pitch_in_bytes[i],
                        _pixel_data_h[i],
                        _pitch_in_bytes[i],
                        pitch,
-                       _xyz->size().height,
-                       cudaMemcpyHostToDevice,
-                       stream
+                       _size.height,
+                       cudaMemcpyHostToDevice
                        );
 
                if (status != cudaSuccess) {
@@ -102,6 +192,8 @@ CUDAJ2KFrameEncoder::Input::Input(DCPVideo const& vf, cudaStream_t stream)
 CUDAJ2KFrameEncoder::Input::Input(Input&& other)
        : _index(other._index)
        , _eyes(other._eyes)
+       , _size(other._size)
+       , _resolution(other._resolution)
 {
        for (int i = 0; i < 3; ++i) {
                _pixel_data_d[i] = other._pixel_data_d[i];
@@ -127,7 +219,7 @@ CUDAJ2KFrameEncoder::Input::~Input()
 optional<dcp::ArrayData>
 CUDAJ2KFrameEncoder::encode(DCPVideo const& vf)
 {
-       auto input = Input(vf, _stream);
+       auto input = Input(vf);
 
        auto const size = vf.frame()->out_size();
        DCPOMATIC_ASSERT(!_size || size == *_size);
@@ -136,58 +228,26 @@ CUDAJ2KFrameEncoder::encode(DCPVideo const& vf)
        DCPOMATIC_ASSERT(!_resolution || vf.resolution() == *_resolution);
        _resolution = vf.resolution();
 
-       nvjpeg2kImageComponentInfo_t info[3];
-       for (int i = 0; i < 3; ++i) {
-               info[i].component_width = _size->width;
-               info[i].component_height = _size->height;
-               info[i].precision = 12;
-               info[i].sgn = 0;
-       }
-
-       nvjpeg2kEncodeConfig_t config;
-       memset(&config, 0, sizeof(config));
-       config.stream_type = NVJPEG2K_STREAM_J2K;
-       config.color_space = NVJPEG2K_COLORSPACE_SRGB;
-       config.image_width = _size->width;
-       config.image_height = _size->height;
-       config.num_components = 3;
-       config.image_comp_info = reinterpret_cast<nvjpeg2kImageComponentInfo_t*>(&info);
-       config.code_block_w = 32;
-       config.code_block_h = 32;
-       config.irreversible = 0;
-       config.mct_mode = 1;
-       config.prog_order = NVJPEG2K_CPRL;
-       config.num_resolutions = *_resolution == Resolution::FOUR_K ? 7 : 6;
-
-       auto status = nvjpeg2kEncodeParamsSetEncodeConfig(_encoder_params, &config);
-       if (status != NVJPEG2K_STATUS_SUCCESS) {
-               throw CUDAError("nvjpeg2kEncodeParamsSetEncodeConfig", status);
-       }
-
-       // XXX: quality
-       status = nvjpeg2kEncodeParamsSetQuality(_encoder_params, 30);
-       if (status != NVJPEG2K_STATUS_SUCCESS) {
-               throw CUDAError("nvjpeg2kEncodeParamsSetQuality", status);
+       {
+               boost::mutex::scoped_lock lm (_input_mutex);
+               _input.push(std::move(input));
+               std::cout << "push input: " << _input.size() << "\n";
+               _input_condition.notify_all();
        }
 
-       status = nvjpeg2kEncode(_encoder_handle, _encoder_state, _encoder_params, input.device_image(), _stream);
-       if (status != NVJPEG2K_STATUS_SUCCESS) {
-               throw CUDAError("nvjpeg2kEncode", status);
+       boost::mutex::scoped_lock lm(_output_mutex);
+       while (_output.find(make_pair(vf.index(), vf.eyes())) == _output.end()) {
+               _output_condition.wait(lm);
        }
 
-       size_t compressed_size;
-       status = nvjpeg2kEncodeRetrieveBitstream(_encoder_handle, _encoder_state, nullptr, &compressed_size, _stream);
-
-       dcp::ArrayData output(compressed_size);
-       status = nvjpeg2kEncodeRetrieveBitstream(_encoder_handle, _encoder_state, output.data(), &compressed_size, _stream);
-       if (status != NVJPEG2K_STATUS_SUCCESS) {
-               throw CUDAError("nvjpeg2kEncodeRetrieveBitstream", status);
-       }
-
-       return output;
+       auto iter = _output.find(make_pair(vf.index(), vf.eyes()));
+       auto data = iter->second;
+       _output.erase(iter);
+       return data;
 }
 
 
+
 void
 CUDAJ2KFrameEncoder::log_thread_start ()
 {
index 36539a05e601f0c9369738a53749531e4bd62fc2..34d8a600b44aad6ba7570f1762569d6607e7816e 100644 (file)
 #include "types.h"
 #include <dcp/types.h>
 #include <nvjpeg2k.h>
+#include <boost/lockfree/spsc_queue.hpp>
 #include <boost/thread/condition.hpp>
 #include <boost/thread/mutex.hpp>
 #include <map>
+#include <queue>
+#include <thread>
 #include <vector>
 
 
@@ -37,7 +40,6 @@ class CUDAJ2KFrameEncoder : public J2KFrameEncoder
 {
 public:
        CUDAJ2KFrameEncoder();
-       ~CUDAJ2KFrameEncoder();
 
        boost::optional<dcp::ArrayData> encode(DCPVideo const &) override;
        void flush() override;
@@ -46,14 +48,10 @@ public:
 private:
        void encode_queue();
 
-       nvjpeg2kEncoder_t _encoder_handle;
-       nvjpeg2kEncodeState_t _encoder_state;
-       nvjpeg2kEncodeParams_t _encoder_params;
-
        class Input
        {
        public:
-               Input(DCPVideo const& vf, cudaStream_t stream);
+               Input(DCPVideo const& vf);
                Input(Input const& other) = delete;
                Input(Input&& other);
                ~Input();
@@ -72,6 +70,14 @@ private:
                        return _eyes;
                }
 
+               dcp::Size size() const {
+                       return _size;
+               }
+
+               Resolution resolution() const {
+                       return _resolution;
+               }
+
        private:
                std::shared_ptr<dcp::OpenJPEGImage> _xyz;
                uint8_t* _pixel_data_h[3];
@@ -80,11 +86,23 @@ private:
                nvjpeg2kImage_t _device_image;
                int _index;
                Eyes _eyes;
+               dcp::Size _size;
+               Resolution _resolution;
        };
 
        boost::optional<dcp::Size> _size;
        boost::optional<Resolution> _resolution;
-       cudaStream_t _stream;
+
+       static void cuda_thread();
+
+       static std::vector<std::thread> _cuda_threads;
+       static std::queue<Input> _input;
+       static boost::condition _input_condition;
+       static boost::mutex _input_mutex;
+
+       static std::map<std::pair<int, Eyes>, dcp::ArrayData> _output;
+       static boost::condition _output_condition;
+       static boost::mutex _output_mutex;
 };