Allow J2K encode backends to accept more than one frame at once.
authorCarl Hetherington <cth@carlh.net>
Sun, 21 Nov 2021 13:29:21 +0000 (14:29 +0100)
committerCarl Hetherington <cth@carlh.net>
Mon, 22 Nov 2021 22:59:43 +0000 (23:59 +0100)
src/lib/encode_server.cc
src/lib/j2k_encoder.cc
src/lib/j2k_encoder.h
src/lib/j2k_encoder_backend.h
src/lib/j2k_encoder_cpu_backend.cc
src/lib/j2k_encoder_cpu_backend.h
src/lib/j2k_encoder_remote_backend.cc
src/lib/j2k_encoder_remote_backend.h
src/tools/server_test.cc
test/client_server_test.cc
test/low_bitrate_test.cc

index 56774a5ca76e4cc50298f5030257a010608715b1..6cec9883839c1ed6b35af93e3ea23965d97d706c 100644 (file)
@@ -153,15 +153,15 @@ EncodeServer::process (shared_ptr<Socket> socket, struct timeval& after_read, st
        gettimeofday (&after_read, 0);
 
        J2KEncoderCPUBackend cpu;
-       auto encoded = cpu.encode (dcp_video_frame);
-       DCPOMATIC_ASSERT (encoded);
+       auto encoded = cpu.encode ({dcp_video_frame});
+       DCPOMATIC_ASSERT (!encoded.empty());
 
        gettimeofday (&after_encode, 0);
 
        try {
                Socket::WriteDigestScope ds (socket);
-               socket->write (encoded->size());
-               socket->write (encoded->data(), encoded->size());
+               socket->write (encoded[0].size());
+               socket->write (encoded[0].data(), encoded[0].size());
        } catch (std::exception& e) {
                cerr << "Send failed; frame " << dcp_video_frame.index() << "\n";
                LOG_ERROR ("Send failed; frame %1", dcp_video_frame.index());
index a003a6ea2b0399f972b33010a34b9276e096dea8..0f4109c1472148ab3bbe30b1ade1b41f97f5791c 100644 (file)
@@ -141,9 +141,9 @@ J2KEncoder::end ()
        for (auto const& i: _queue) {
                LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
                try {
-                       auto enc = cpu.encode(i);
-                       DCPOMATIC_ASSERT (enc);
-                       _writer->write (make_shared<dcp::ArrayData>(*enc), i.index(), i.eyes());
+                       auto enc = cpu.encode({i});
+                       DCPOMATIC_ASSERT (!enc.empty());
+                       _writer->write (make_shared<dcp::ArrayData>(enc.front()), i.index(), i.eyes());
                        frame_done ();
                } catch (std::exception& e) {
                        LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
@@ -206,7 +206,7 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
        /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
           when there are no threads.
        */
-       while (_queue.size() >= (threads * 2) + 1) {
+       while (static_cast<int>(_queue.size()) >= (_frames_in_parallel * 2) + 1) {
                LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
                _full_condition.wait (queue_lock);
                LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
@@ -297,29 +297,31 @@ try
                }
 
                LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
-               auto vf = _queue.front ();
+               auto end = std::next(_queue.begin(), std::min(static_cast<int>(_queue.size()), backend->quantity()));
+               std::vector<DCPVideo> vf (_queue.begin(), end);
 
-               /* We're about to commit to either encoding this frame or putting it back onto the queue,
+               /* We're about to commit to either encoding these frames or putting them back onto the queue,
                   so we must not be interrupted until one or other of these things have happened.  This
                   block has thread interruption disabled.
                */
                {
                        boost::this_thread::disable_interruption dis;
 
-                       LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf.index(), static_cast<int>(vf.eyes()));
-                       _queue.pop_front ();
+                       _queue.erase(_queue.begin(), end);
 
                        lock.unlock ();
 
                        auto encoded = backend->encode(vf);
 
-                       if (encoded) {
-                               _writer->write (make_shared<dcp::ArrayData>(*encoded), vf.index(), vf.eyes());
-                               frame_done ();
+                       if (encoded.size() == vf.size()) {
+                               for (auto i = 0U; i < encoded.size(); ++i) {
+                                       _writer->write (make_shared<dcp::ArrayData>(encoded[i]), vf[i].index(), vf[i].eyes());
+                                       frame_done ();
+                               }
                        } else {
                                lock.lock ();
-                               LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
-                               _queue.push_front (vf);
+                               LOG_GENERAL (N_("[%1] J2KEncoder thread pushes %2 frames onto queue after failure"), thread_id(), vf.size());
+                               _queue.insert (_queue.begin(), vf.begin(), vf.end());
                                lock.unlock ();
                        }
                }
@@ -349,6 +351,8 @@ J2KEncoder::servers_list_changed ()
        terminate_threads ();
        _threads = make_shared<boost::thread_group>();
 
+       _frames_in_parallel = 0;
+
        /* XXX: could re-use threads */
 
        if (!Config::instance()->only_servers_encode ()) {
@@ -360,6 +364,7 @@ J2KEncoder::servers_list_changed ()
 #else
                        _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
 #endif
+                       _frames_in_parallel += backend->quantity();
                }
        }
 
@@ -373,6 +378,7 @@ J2KEncoder::servers_list_changed ()
                LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name());
                for (int j = 0; j < i.threads(); ++j) {
                        _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
+                       _frames_in_parallel += backend->quantity();
                }
        }
 
index df98d1a7b48e332495996b92e81140a8572e3ea6..fdb3217527ae66d7982277d43ed28f554336a7d4 100644 (file)
@@ -98,6 +98,8 @@ private:
        boost::mutex _threads_mutex;
        std::shared_ptr<boost::thread_group> _threads;
 
+       int _frames_in_parallel = 0;
+
        mutable boost::mutex _queue_mutex;
        std::list<DCPVideo> _queue;
        /** condition to manage thread wakeups when we have nothing to do */
index 295f81e8486f09ef87fbff8888b7d3a17876b8ac..ae25187799a6d8906b4a7db4421826603e2d8be3 100644 (file)
@@ -38,7 +38,10 @@ public:
        J2KEncoderBackend (J2KEncoderBackend const&) = delete;
        J2KEncoderBackend& operator= (J2KEncoderBackend const&) = delete;
 
-       virtual boost::optional<dcp::ArrayData> encode (DCPVideo video) = 0;
+       virtual std::vector<dcp::ArrayData> encode (std::vector<DCPVideo> const& video) = 0;
+       virtual int quantity () const {
+               return 1;
+       }
 };
 
 
index 99963c103cefa150eeb366359adcf4ab2e2967de..1474da1bc1bdcf8f3ab07d3ae2f31752daa7e09a 100644 (file)
 
 
 using std::shared_ptr;
+using std::vector;
 using boost::optional;
 #if BOOST_VERSION >= 106100
 using namespace boost::placeholders;
 #endif
 
 
-optional<dcp::ArrayData>
-J2KEncoderCPUBackend::encode (DCPVideo video)
+vector<dcp::ArrayData>
+J2KEncoderCPUBackend::encode (vector<DCPVideo> const& all_video)
 {
+       DCPOMATIC_ASSERT (all_video.size() == 1);
+       auto video = all_video.front();
+
        try {
                auto const comment = Config::instance()->dcp_j2k_comment();
 
@@ -63,7 +67,7 @@ J2KEncoderCPUBackend::encode (DCPVideo video)
 
                        if (enc.size() >= minimum_size) {
                                LOG_GENERAL (N_("Frame %1 encoded size was OK (%2)"), video.index(), enc.size());
-                               return enc;
+                               return { enc };
                        }
 
                        LOG_GENERAL (N_("Frame %1 encoded size was small (%2); adding noise at level %3 with pixel skip %4"), video.index(), enc.size(), noise_amount, pixel_skip);
index 41ea8d2e50a8f4f3bca0091234319b0fec43ae32..82b8aeb489e45422ef97dd28191b447b03ea4b40 100644 (file)
@@ -32,7 +32,7 @@ public:
        J2KEncoderCPUBackend () = default;
        J2KEncoderCPUBackend (J2KEncoderCPUBackend&& other) = default;
 
-       boost::optional<dcp::ArrayData> encode (DCPVideo video) override;
+       std::vector<dcp::ArrayData> encode (std::vector<DCPVideo> const& video) override;
 };
 
 
index 8bf1a0caf5a88660a46c6659723c9b8faea01d16..df5d6e30d621da4e2aadb6b32bbd06991dd98594 100644 (file)
@@ -41,6 +41,7 @@ using std::make_shared;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
+using std::vector;
 using boost::optional;
 using dcp::raw_convert;
 
@@ -53,9 +54,12 @@ J2KEncoderRemoteBackend::J2KEncoderRemoteBackend (J2KEncoderRemoteBackend&& othe
 }
 
 
-optional<dcp::ArrayData>
-J2KEncoderRemoteBackend::encode (DCPVideo video)
+vector<dcp::ArrayData>
+J2KEncoderRemoteBackend::encode (vector<DCPVideo> const& all_video)
 {
+       DCPOMATIC_ASSERT (all_video.size() == 1);
+       auto video = all_video.front();
+
        try {
                boost::asio::io_service io_service;
                boost::asio::ip::tcp::resolver resolver (io_service);
@@ -103,7 +107,7 @@ J2KEncoderRemoteBackend::encode (DCPVideo video)
                LOG_DEBUG_ENCODE (N_("Finished remotely-encoded frame %1"), video.index());
 
                _backoff = 0;
-               return enc;
+               return { enc };
 
        } catch (std::exception& e) {
                if (_backoff < 60) {
index a8f1aa401fee95dfe301bbe7f9928d1affb537ed..962944bbeca20754804dfdd43a68718aef8c9036 100644 (file)
@@ -37,7 +37,7 @@ public:
 
        J2KEncoderRemoteBackend (J2KEncoderRemoteBackend&& other);
 
-       boost::optional<dcp::ArrayData> encode (DCPVideo video) override;
+       std::vector<dcp::ArrayData> encode (std::vector<DCPVideo> const& video) override;
 
 private:
        EncodeServerDescription _server;
index 39363cc1eb30a837450ccccf223ceee12aade9f1..baae6ac40257cf1a0d4bdd00603817c318909e4c 100644 (file)
@@ -45,6 +45,7 @@ using std::cout;
 using std::make_shared;
 using std::shared_ptr;
 using std::string;
+using std::vector;
 using boost::optional;
 using boost::bind;
 #if BOOST_VERSION >= 106100
@@ -71,32 +72,33 @@ process_video (shared_ptr<PlayerVideo> pvf)
        ++frame_count;
 
        J2KEncoderCPUBackend cpu_backend;
-       auto local_encoded = cpu_backend.encode(local);
-       optional<ArrayData> remote_encoded;
+       auto local_encoded = cpu_backend.encode({local});
+       vector<ArrayData> remote_encoded;
 
        string remote_error;
        J2KEncoderRemoteBackend remote_backend(*server);
        try {
-               remote_encoded = remote_backend.encode(remote);
+               remote_encoded = remote_backend.encode({remote});
        } catch (NetworkError& e) {
                remote_error = e.what ();
        }
 
-       DCPOMATIC_ASSERT (remote_encoded);
+       DCPOMATIC_ASSERT (!local_encoded.empty());
+       DCPOMATIC_ASSERT (!remote_encoded.empty());
 
        if (!remote_error.empty()) {
                cout << "\033[0;31mnetwork problem: " << remote_error << "\033[0m\n";
                return;
        }
 
-       if (local_encoded->size() != remote_encoded->size()) {
+       if (local_encoded[0].size() != remote_encoded[0].size()) {
                cout << "\033[0;31msizes differ\033[0m\n";
                return;
        }
 
-       auto p = local_encoded->data();
-       auto q = remote_encoded->data();
-       for (int i = 0; i < local_encoded->size(); ++i) {
+       auto p = local_encoded[0].data();
+       auto q = remote_encoded[0].data();
+       for (int i = 0; i < local_encoded[0].size(); ++i) {
                if (*p++ != *q++) {
                        cout << "\033[0;31mdata differ\033[0m at byte " << i << "\n";
                        return;
index 2355c08c09ea98b995ebf278dd7a5f9966ac9960..8de12effda3b69766822acd8cf71647fc58e915d 100644 (file)
@@ -50,6 +50,7 @@ using std::list;
 using std::make_shared;
 using std::shared_ptr;
 using std::weak_ptr;
+using std::vector;
 using boost::thread;
 using boost::optional;
 using dcp::ArrayData;
@@ -59,13 +60,13 @@ using namespace dcpomatic;
 void
 do_remote_encode (DCPVideo frame, EncodeServerDescription description, ArrayData locally_encoded)
 {
-       optional<ArrayData> remotely_encoded;
+       vector<ArrayData> remotely_encoded;
        J2KEncoderRemoteBackend backend (description, 1200);
-       BOOST_REQUIRE_NO_THROW (remotely_encoded = backend.encode(frame));
-       BOOST_REQUIRE (static_cast<bool>(remotely_encoded));
+       BOOST_REQUIRE_NO_THROW (remotely_encoded = backend.encode({frame}));
+       BOOST_REQUIRE (!remotely_encoded.empty());
 
-       BOOST_REQUIRE_EQUAL (locally_encoded.size(), remotely_encoded->size());
-       BOOST_CHECK_EQUAL (memcmp (locally_encoded.data(), remotely_encoded->data(), locally_encoded.size()), 0);
+       BOOST_REQUIRE_EQUAL (locally_encoded.size(), remotely_encoded[0].size());
+       BOOST_CHECK_EQUAL (memcmp(locally_encoded.data(), remotely_encoded[0].data(), locally_encoded.size()), 0);
 }
 
 
@@ -125,8 +126,8 @@ BOOST_AUTO_TEST_CASE (client_server_test_rgb)
                );
 
        J2KEncoderCPUBackend cpu;
-       auto locally_encoded = cpu.encode (frame);
-       BOOST_REQUIRE (static_cast<bool>(locally_encoded));
+       auto locally_encoded = cpu.encode ({frame});
+       BOOST_REQUIRE (!locally_encoded.empty());
 
        auto server = new EncodeServer (true, 2);
 
@@ -140,7 +141,7 @@ BOOST_AUTO_TEST_CASE (client_server_test_rgb)
 
        list<thread*> threads;
        for (int i = 0; i < 8; ++i) {
-               threads.push_back (new thread(boost::bind(do_remote_encode, frame, description, *locally_encoded)));
+               threads.push_back (new thread(boost::bind(do_remote_encode, frame, description, locally_encoded[0])));
        }
 
        for (auto i: threads) {
@@ -210,8 +211,8 @@ BOOST_AUTO_TEST_CASE (client_server_test_yuv)
                );
 
        J2KEncoderCPUBackend cpu;
-       auto locally_encoded = cpu.encode (frame);
-       BOOST_REQUIRE (static_cast<bool>(locally_encoded));
+       auto locally_encoded = cpu.encode ({frame});
+       BOOST_REQUIRE (!locally_encoded.empty());
 
        auto server = new EncodeServer (true, 2);
 
@@ -225,7 +226,7 @@ BOOST_AUTO_TEST_CASE (client_server_test_yuv)
 
        list<thread*> threads;
        for (int i = 0; i < 8; ++i) {
-               threads.push_back (new thread(boost::bind(do_remote_encode, frame, description, *locally_encoded)));
+               threads.push_back (new thread(boost::bind(do_remote_encode, frame, description, locally_encoded[0])));
        }
 
        for (auto i: threads) {
@@ -280,11 +281,11 @@ BOOST_AUTO_TEST_CASE (client_server_test_j2k)
                );
 
        J2KEncoderCPUBackend cpu;
-       auto raw_locally_encoded = cpu.encode (raw_frame);
-       BOOST_REQUIRE (static_cast<bool>(raw_locally_encoded));
+       auto raw_locally_encoded = cpu.encode ({raw_frame});
+       BOOST_REQUIRE (!raw_locally_encoded.empty());
 
        auto j2k_pvf = std::make_shared<PlayerVideo> (
-               std::make_shared<J2KImageProxy>(*raw_locally_encoded, dcp::Size(1998, 1080), AV_PIX_FMT_XYZ12LE),
+               std::make_shared<J2KImageProxy>(raw_locally_encoded[0], dcp::Size(1998, 1080), AV_PIX_FMT_XYZ12LE),
                Crop(),
                optional<double>(),
                dcp::Size(1998, 1080),
@@ -306,8 +307,8 @@ BOOST_AUTO_TEST_CASE (client_server_test_j2k)
                Resolution::TWO_K
                );
 
-       auto j2k_locally_encoded = cpu.encode(j2k_frame);
-       BOOST_REQUIRE (static_cast<bool>(j2k_locally_encoded));
+       auto j2k_locally_encoded = cpu.encode({j2k_frame});
+       BOOST_REQUIRE (!j2k_locally_encoded.empty());
 
        auto server = new EncodeServer (true, 2);
 
@@ -321,7 +322,7 @@ BOOST_AUTO_TEST_CASE (client_server_test_j2k)
 
        list<thread*> threads;
        for (int i = 0; i < 8; ++i) {
-               threads.push_back (new thread(boost::bind(do_remote_encode, j2k_frame, description, *j2k_locally_encoded)));
+               threads.push_back (new thread(boost::bind(do_remote_encode, j2k_frame, description, j2k_locally_encoded[0])));
        }
 
        for (auto i: threads) {
index 8f16f56f171701ef1961e70cdeb36aa25339b87c..7fe963447a402abf6a4d0067b2d37fb762d00896 100644 (file)
@@ -59,9 +59,9 @@ BOOST_AUTO_TEST_CASE (low_bitrate_test)
 
        DCPVideo dcp_video(frame, 0, 24, 100000000, Resolution::TWO_K);
        J2KEncoderCPUBackend cpu;
-       auto j2k = cpu.encode(dcp_video);
-       BOOST_REQUIRE (static_cast<bool>(j2k));
-       BOOST_REQUIRE (j2k->size() >= 16536);
+       auto j2k = cpu.encode({dcp_video});
+       BOOST_REQUIRE (!j2k.empty());
+       BOOST_REQUIRE (j2k[0].size() >= 16536);
 }