encoder using batching
authorCarl Hetherington <cth@carlh.net>
Mon, 23 May 2022 13:25:43 +0000 (15:25 +0200)
committerCarl Hetherington <cth@carlh.net>
Mon, 23 May 2022 14:55:28 +0000 (16:55 +0200)
src/lib/cuda_j2k_frame_encoder.cc
src/lib/cuda_j2k_frame_encoder.h
src/lib/j2k_encoder.cc
src/lib/j2k_frame_encoder.h

index 675b26f57f2421e4ad0823a19b339c000cc031a7..3726f63a7c5577be1f976053166b094351d5c809 100644 (file)
 #include <vector>
 
 
+using std::make_pair;
 using std::vector;
 using boost::optional;
 
 
+boost::mutex CUDAJ2KFrameEncoder::_mutex;
+boost::condition CUDAJ2KFrameEncoder::_condition;
+std::vector<CUDAJ2KFrameEncoder::Input> CUDAJ2KFrameEncoder::_queue;
+std::map<std::pair<int, Eyes>, dcp::ArrayData> CUDAJ2KFrameEncoder::_output;
+
+boost::optional<dcp::Size> CUDAJ2KFrameEncoder::_size;
+boost::optional<Resolution> CUDAJ2KFrameEncoder::_resolution;
+
+
 CUDAJ2KFrameEncoder::CUDAJ2KFrameEncoder()
 {
        nvjpeg2kEncoderCreateSimple(&_encoder_handle);
@@ -42,7 +52,9 @@ CUDAJ2KFrameEncoder::CUDAJ2KFrameEncoder()
 }
 
 
-CUDAJ2KFrameEncoder::Frame::Frame(DCPVideo const& vf)
+CUDAJ2KFrameEncoder::Input::Input(DCPVideo const& vf)
+       : _index(vf.index())
+       , _eyes(vf.eyes())
 {
        auto xyz = convert_to_xyz(vf.frame(), boost::bind(&Log::dcp_log, dcpomatic_log.get(), _1, _2));
 
@@ -91,7 +103,9 @@ CUDAJ2KFrameEncoder::Frame::Frame(DCPVideo const& vf)
 }
 
 
-CUDAJ2KFrameEncoder::Frame::Frame(Frame&& other)
+CUDAJ2KFrameEncoder::Input::Input(Input&& other)
+       : _index(other._index)
+       , _eyes(other._eyes)
 {
        for (int i = 0; i < 3; ++i) {
                _pixel_data_d[i] = other._pixel_data_d[i];
@@ -106,7 +120,7 @@ CUDAJ2KFrameEncoder::Frame::Frame(Frame&& other)
 }
 
 
-CUDAJ2KFrameEncoder::Frame::~Frame()
+CUDAJ2KFrameEncoder::Input::~Input()
 {
        cudaFree(_pixel_data_d[0]);
        cudaFree(_pixel_data_d[1]);
@@ -114,10 +128,12 @@ CUDAJ2KFrameEncoder::Frame::~Frame()
 }
 
 
-vector<dcp::ArrayData>
+optional<dcp::ArrayData>
 CUDAJ2KFrameEncoder::encode(DCPVideo const& vf)
 {
-       int constexpr BATCH_SIZE = 128;
+       auto input = Input(vf);
+
+       boost::mutex::scoped_lock lm(_mutex);
 
        auto const size = vf.frame()->out_size();
        DCPOMATIC_ASSERT(!_size || size == *_size);
@@ -126,22 +142,27 @@ CUDAJ2KFrameEncoder::encode(DCPVideo const& vf)
        DCPOMATIC_ASSERT(!_resolution || vf.resolution() == *_resolution);
        _resolution = vf.resolution();
 
-       if (_batch.size() < BATCH_SIZE) {
-               _batch.push_back(Frame(vf));
+       _queue.push_back(std::move(input));
+       if (_queue.size() < batch_size) {
+               std::cout << "queue is " << _queue.size() << " - waiting\n";
+               _condition.wait(lm);
+       } else {
+               encode_queue();
+               _condition.notify_all();
+       }
+
+       auto output = _output.find(make_pair(vf.index(), vf.eyes()));
+       if (output == _output.end()) {
                return {};
        }
 
-       return flush();
+       return output->second;
 }
 
 
-vector<dcp::ArrayData>
-CUDAJ2KFrameEncoder::flush()
+void
+CUDAJ2KFrameEncoder::encode_queue()
 {
-       if (_batch.empty()) {
-               return {};
-       }
-
        nvjpeg2kImageComponentInfo_t info[3];
        for (int i = 0; i < 3; ++i) {
                info[i].component_width = _size->width;
@@ -176,11 +197,8 @@ CUDAJ2KFrameEncoder::flush()
                throw CUDAError("nvjpeg2kEncodeParamsSetQuality", status);
        }
 
-       vector<dcp::ArrayData> output;
-
-       for (auto const& frame: _batch) {
-
-               auto x = frame.device_image();
+       std::cout << "encoding queue of " << _queue.size() << "\n";
+       for (auto const& frame: _queue) {
 
                status = nvjpeg2kEncode(_encoder_handle, _encoder_state, _encoder_params, frame.device_image(), 0);
                if (status != NVJPEG2K_STATUS_SUCCESS) {
@@ -196,13 +214,11 @@ CUDAJ2KFrameEncoder::flush()
                        throw CUDAError("nvjpeg2kEncodeRetrieveBitstream", status);
                }
 
-               output.push_back(this_output);
+               _output[make_pair(frame.index(), frame.eyes())] = this_output;
                cudaStreamSynchronize(0);
        }
 
-       _batch.clear();
-
-       return output;
+       _queue.clear();
 }
 
 
@@ -211,3 +227,12 @@ CUDAJ2KFrameEncoder::log_thread_start ()
 {
        LOG_TIMING("start-encoder-thread thread=%1", thread_id());
 }
+
+
+void
+CUDAJ2KFrameEncoder::flush()
+{
+       boost::mutex::scoped_lock lm(_mutex);
+       encode_queue();
+       _condition.notify_all();
+}
index 28e8a3c764bfbc225778818c0a8a5a1bdf264b5f..9a8666cf6c5b37f7f2014909a68af11bb3990088 100644 (file)
@@ -27,6 +27,9 @@
 #include "types.h"
 #include <dcp/types.h>
 #include <nvjpeg2k.h>
+#include <boost/thread/condition.hpp>
+#include <boost/thread/mutex.hpp>
+#include <map>
 #include <vector>
 
 
@@ -35,38 +38,56 @@ class CUDAJ2KFrameEncoder : public J2KFrameEncoder
 public:
        CUDAJ2KFrameEncoder();
 
-       std::vector<dcp::ArrayData> encode(DCPVideo const &) override;
-       std::vector<dcp::ArrayData> flush() override;
+       boost::optional<dcp::ArrayData> encode(DCPVideo const &) override;
+       void flush() override;
        void log_thread_start() override;
 
+       static int constexpr batch_size = 1;
+
 private:
+       void encode_queue();
+
        nvjpeg2kEncoder_t _encoder_handle;
        nvjpeg2kEncodeState_t _encoder_state;
        nvjpeg2kEncodeParams_t _encoder_params;
 
-       class Frame
+       class Input
        {
        public:
-               Frame(DCPVideo const& vf);
-               Frame(Frame const& other) = delete;
-               Frame(Frame&& other);
-               ~Frame();
+               Input(DCPVideo const& vf);
+               Input(Input const& other) = delete;
+               Input(Input&& other);
+               ~Input();
 
-               Frame& operator=(Frame const& other) = delete;
+               Input& operator=(Input const& other) = delete;
 
                nvjpeg2kImage_t const* const device_image() const {
                        return &_device_image;
                }
 
+               int index() const {
+                       return _index;
+               }
+
+               Eyes eyes() const {
+                       return _eyes;
+               }
+
        private:
                uint8_t* _pixel_data_d[3];
                size_t _pitch_in_bytes[3];
                nvjpeg2kImage_t _device_image;
+               int _index;
+               Eyes _eyes;
        };
 
-       std::vector<Frame> _batch;
-       boost::optional<dcp::Size> _size;
-       boost::optional<Resolution> _resolution;
+       static boost::mutex _mutex;
+       static boost::condition _condition;
+       static std::vector<Input> _queue;
+       static std::map<std::pair<int, Eyes>, dcp::ArrayData> _output;
+
+       static boost::optional<dcp::Size> _size;
+       static boost::optional<Resolution> _resolution;
 };
 
 
index 11b360b349e189ae226eb3d76a7d0a320d672cbf..49f11377cdff0f267f65c0c9e52b8e7cb3e4a4f7 100644 (file)
@@ -115,6 +115,10 @@ J2KEncoder::end ()
                _full_condition.wait (lock);
        }
 
+       for (auto& worker: _workers) {
+               worker->flush();
+       }
+
        lock.unlock ();
 
        LOG_GENERAL_NC (N_("Terminating encoder threads"));
@@ -378,9 +382,11 @@ J2KEncoder::servers_list_changed ()
                }
        }
 #endif
-       auto worker = make_shared<CUDAJ2KFrameEncoder>();
-       _workers.push_back(worker);
-       _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, worker));
+       for (int i = 0; i < CUDAJ2KFrameEncoder::batch_size; ++i) {
+               auto worker = make_shared<CUDAJ2KFrameEncoder>();
+               _workers.push_back(worker);
+               _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, worker));
+       }
 
        _writer->set_encoder_threads (_threads->size());
 }
index 33f9876536b65c6edbfe79a8be02629a47269853..c2779e23858b0d3e16cbdf994cb508cd841da14e 100644 (file)
@@ -37,6 +37,7 @@ public:
        virtual ~J2KFrameEncoder() {}
 
        virtual boost::optional<dcp::ArrayData> encode (DCPVideo const &) = 0;
+       virtual void flush () {}
        virtual void log_thread_start () = 0;
 };