summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorCarl Hetherington <cth@carlh.net>2021-11-21 14:29:21 +0100
committerCarl Hetherington <cth@carlh.net>2021-11-22 23:59:43 +0100
commit33acff363c978914cdcbf21fb8fa3d1c4c6991b7 (patch)
tree4bd8de953c0692d816e48e59d036a1625b9d3ff2 /src
parent0a49cc2ebbfc3809313f252208a0050a3fce1e97 (diff)
Allow J2K encode backends to accept more than one frame at once.
Diffstat (limited to 'src')
-rw-r--r--src/lib/encode_server.cc8
-rw-r--r--src/lib/j2k_encoder.cc32
-rw-r--r--src/lib/j2k_encoder.h2
-rw-r--r--src/lib/j2k_encoder_backend.h5
-rw-r--r--src/lib/j2k_encoder_cpu_backend.cc10
-rw-r--r--src/lib/j2k_encoder_cpu_backend.h2
-rw-r--r--src/lib/j2k_encoder_remote_backend.cc10
-rw-r--r--src/lib/j2k_encoder_remote_backend.h2
-rw-r--r--src/tools/server_test.cc18
9 files changed, 55 insertions, 34 deletions
diff --git a/src/lib/encode_server.cc b/src/lib/encode_server.cc
index 56774a5ca..6cec98838 100644
--- a/src/lib/encode_server.cc
+++ b/src/lib/encode_server.cc
@@ -153,15 +153,15 @@ EncodeServer::process (shared_ptr<Socket> socket, struct timeval& after_read, st
gettimeofday (&after_read, 0);
J2KEncoderCPUBackend cpu;
- auto encoded = cpu.encode (dcp_video_frame);
- DCPOMATIC_ASSERT (encoded);
+ auto encoded = cpu.encode ({dcp_video_frame});
+ DCPOMATIC_ASSERT (!encoded.empty());
gettimeofday (&after_encode, 0);
try {
Socket::WriteDigestScope ds (socket);
- socket->write (encoded->size());
- socket->write (encoded->data(), encoded->size());
+ socket->write (encoded[0].size());
+ socket->write (encoded[0].data(), encoded[0].size());
} catch (std::exception& e) {
cerr << "Send failed; frame " << dcp_video_frame.index() << "\n";
LOG_ERROR ("Send failed; frame %1", dcp_video_frame.index());
diff --git a/src/lib/j2k_encoder.cc b/src/lib/j2k_encoder.cc
index a003a6ea2..0f4109c14 100644
--- a/src/lib/j2k_encoder.cc
+++ b/src/lib/j2k_encoder.cc
@@ -141,9 +141,9 @@ J2KEncoder::end ()
for (auto const& i: _queue) {
LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
try {
- auto enc = cpu.encode(i);
- DCPOMATIC_ASSERT (enc);
- _writer->write (make_shared<dcp::ArrayData>(*enc), i.index(), i.eyes());
+ auto enc = cpu.encode({i});
+ DCPOMATIC_ASSERT (!enc.empty());
+ _writer->write (make_shared<dcp::ArrayData>(enc.front()), i.index(), i.eyes());
frame_done ();
} catch (std::exception& e) {
LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
@@ -206,7 +206,7 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
/* Wait until the queue has gone down a bit. Allow one thing in the queue even
when there are no threads.
*/
- while (_queue.size() >= (threads * 2) + 1) {
+ while (static_cast<int>(_queue.size()) >= (_frames_in_parallel * 2) + 1) {
LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
_full_condition.wait (queue_lock);
LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
@@ -297,29 +297,31 @@ try
}
LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
- auto vf = _queue.front ();
+ auto end = std::next(_queue.begin(), std::min(static_cast<int>(_queue.size()), backend->quantity()));
+ std::vector<DCPVideo> vf (_queue.begin(), end);
- /* We're about to commit to either encoding this frame or putting it back onto the queue,
+ /* We're about to commit to either encoding these frames or putting them back onto the queue,
so we must not be interrupted until one or other of these things have happened. This
block has thread interruption disabled.
*/
{
boost::this_thread::disable_interruption dis;
- LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf.index(), static_cast<int>(vf.eyes()));
- _queue.pop_front ();
+ _queue.erase(_queue.begin(), end);
lock.unlock ();
auto encoded = backend->encode(vf);
- if (encoded) {
- _writer->write (make_shared<dcp::ArrayData>(*encoded), vf.index(), vf.eyes());
- frame_done ();
+ if (encoded.size() == vf.size()) {
+ for (auto i = 0U; i < encoded.size(); ++i) {
+ _writer->write (make_shared<dcp::ArrayData>(encoded[i]), vf[i].index(), vf[i].eyes());
+ frame_done ();
+ }
} else {
lock.lock ();
- LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
- _queue.push_front (vf);
+ LOG_GENERAL (N_("[%1] J2KEncoder thread pushes %2 frames onto queue after failure"), thread_id(), vf.size());
+ _queue.insert (_queue.begin(), vf.begin(), vf.end());
lock.unlock ();
}
}
@@ -349,6 +351,8 @@ J2KEncoder::servers_list_changed ()
terminate_threads ();
_threads = make_shared<boost::thread_group>();
+ _frames_in_parallel = 0;
+
/* XXX: could re-use threads */
if (!Config::instance()->only_servers_encode ()) {
@@ -360,6 +364,7 @@ J2KEncoder::servers_list_changed ()
#else
_threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
#endif
+ _frames_in_parallel += backend->quantity();
}
}
@@ -373,6 +378,7 @@ J2KEncoder::servers_list_changed ()
LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name());
for (int j = 0; j < i.threads(); ++j) {
_threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
+ _frames_in_parallel += backend->quantity();
}
}
diff --git a/src/lib/j2k_encoder.h b/src/lib/j2k_encoder.h
index df98d1a7b..fdb321752 100644
--- a/src/lib/j2k_encoder.h
+++ b/src/lib/j2k_encoder.h
@@ -98,6 +98,8 @@ private:
boost::mutex _threads_mutex;
std::shared_ptr<boost::thread_group> _threads;
+ int _frames_in_parallel = 0;
+
mutable boost::mutex _queue_mutex;
std::list<DCPVideo> _queue;
/** condition to manage thread wakeups when we have nothing to do */
diff --git a/src/lib/j2k_encoder_backend.h b/src/lib/j2k_encoder_backend.h
index 295f81e84..ae2518779 100644
--- a/src/lib/j2k_encoder_backend.h
+++ b/src/lib/j2k_encoder_backend.h
@@ -38,7 +38,10 @@ public:
J2KEncoderBackend (J2KEncoderBackend const&) = delete;
J2KEncoderBackend& operator= (J2KEncoderBackend const&) = delete;
- virtual boost::optional<dcp::ArrayData> encode (DCPVideo video) = 0;
+ virtual std::vector<dcp::ArrayData> encode (std::vector<DCPVideo> const& video) = 0;
+ virtual int quantity () const {
+ return 1;
+ }
};
diff --git a/src/lib/j2k_encoder_cpu_backend.cc b/src/lib/j2k_encoder_cpu_backend.cc
index 99963c103..1474da1bc 100644
--- a/src/lib/j2k_encoder_cpu_backend.cc
+++ b/src/lib/j2k_encoder_cpu_backend.cc
@@ -32,15 +32,19 @@
using std::shared_ptr;
+using std::vector;
using boost::optional;
#if BOOST_VERSION >= 106100
using namespace boost::placeholders;
#endif
-optional<dcp::ArrayData>
-J2KEncoderCPUBackend::encode (DCPVideo video)
+vector<dcp::ArrayData>
+J2KEncoderCPUBackend::encode (vector<DCPVideo> const& all_video)
{
+ DCPOMATIC_ASSERT (all_video.size() == 1);
+ auto video = all_video.front();
+
try {
auto const comment = Config::instance()->dcp_j2k_comment();
@@ -63,7 +67,7 @@ J2KEncoderCPUBackend::encode (DCPVideo video)
if (enc.size() >= minimum_size) {
LOG_GENERAL (N_("Frame %1 encoded size was OK (%2)"), video.index(), enc.size());
- return enc;
+ return { enc };
}
LOG_GENERAL (N_("Frame %1 encoded size was small (%2); adding noise at level %3 with pixel skip %4"), video.index(), enc.size(), noise_amount, pixel_skip);
diff --git a/src/lib/j2k_encoder_cpu_backend.h b/src/lib/j2k_encoder_cpu_backend.h
index 41ea8d2e5..82b8aeb48 100644
--- a/src/lib/j2k_encoder_cpu_backend.h
+++ b/src/lib/j2k_encoder_cpu_backend.h
@@ -32,7 +32,7 @@ public:
J2KEncoderCPUBackend () = default;
J2KEncoderCPUBackend (J2KEncoderCPUBackend&& other) = default;
- boost::optional<dcp::ArrayData> encode (DCPVideo video) override;
+ std::vector<dcp::ArrayData> encode (std::vector<DCPVideo> const& video) override;
};
diff --git a/src/lib/j2k_encoder_remote_backend.cc b/src/lib/j2k_encoder_remote_backend.cc
index 8bf1a0caf..df5d6e30d 100644
--- a/src/lib/j2k_encoder_remote_backend.cc
+++ b/src/lib/j2k_encoder_remote_backend.cc
@@ -41,6 +41,7 @@ using std::make_shared;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
+using std::vector;
using boost::optional;
using dcp::raw_convert;
@@ -53,9 +54,12 @@ J2KEncoderRemoteBackend::J2KEncoderRemoteBackend (J2KEncoderRemoteBackend&& othe
}
-optional<dcp::ArrayData>
-J2KEncoderRemoteBackend::encode (DCPVideo video)
+vector<dcp::ArrayData>
+J2KEncoderRemoteBackend::encode (vector<DCPVideo> const& all_video)
{
+ DCPOMATIC_ASSERT (all_video.size() == 1);
+ auto video = all_video.front();
+
try {
boost::asio::io_service io_service;
boost::asio::ip::tcp::resolver resolver (io_service);
@@ -103,7 +107,7 @@ J2KEncoderRemoteBackend::encode (DCPVideo video)
LOG_DEBUG_ENCODE (N_("Finished remotely-encoded frame %1"), video.index());
_backoff = 0;
- return enc;
+ return { enc };
} catch (std::exception& e) {
if (_backoff < 60) {
diff --git a/src/lib/j2k_encoder_remote_backend.h b/src/lib/j2k_encoder_remote_backend.h
index a8f1aa401..962944bbe 100644
--- a/src/lib/j2k_encoder_remote_backend.h
+++ b/src/lib/j2k_encoder_remote_backend.h
@@ -37,7 +37,7 @@ public:
J2KEncoderRemoteBackend (J2KEncoderRemoteBackend&& other);
- boost::optional<dcp::ArrayData> encode (DCPVideo video) override;
+ std::vector<dcp::ArrayData> encode (std::vector<DCPVideo> const& video) override;
private:
EncodeServerDescription _server;
diff --git a/src/tools/server_test.cc b/src/tools/server_test.cc
index 39363cc1e..baae6ac40 100644
--- a/src/tools/server_test.cc
+++ b/src/tools/server_test.cc
@@ -45,6 +45,7 @@ using std::cout;
using std::make_shared;
using std::shared_ptr;
using std::string;
+using std::vector;
using boost::optional;
using boost::bind;
#if BOOST_VERSION >= 106100
@@ -71,32 +72,33 @@ process_video (shared_ptr<PlayerVideo> pvf)
++frame_count;
J2KEncoderCPUBackend cpu_backend;
- auto local_encoded = cpu_backend.encode(local);
- optional<ArrayData> remote_encoded;
+ auto local_encoded = cpu_backend.encode({local});
+ vector<ArrayData> remote_encoded;
string remote_error;
J2KEncoderRemoteBackend remote_backend(*server);
try {
- remote_encoded = remote_backend.encode(remote);
+ remote_encoded = remote_backend.encode({remote});
} catch (NetworkError& e) {
remote_error = e.what ();
}
- DCPOMATIC_ASSERT (remote_encoded);
+ DCPOMATIC_ASSERT (!local_encoded.empty());
+ DCPOMATIC_ASSERT (!remote_encoded.empty());
if (!remote_error.empty()) {
cout << "\033[0;31mnetwork problem: " << remote_error << "\033[0m\n";
return;
}
- if (local_encoded->size() != remote_encoded->size()) {
+ if (local_encoded[0].size() != remote_encoded[0].size()) {
cout << "\033[0;31msizes differ\033[0m\n";
return;
}
- auto p = local_encoded->data();
- auto q = remote_encoded->data();
- for (int i = 0; i < local_encoded->size(); ++i) {
+ auto p = local_encoded[0].data();
+ auto q = remote_encoded[0].data();
+ for (int i = 0; i < local_encoded[0].size(); ++i) {
if (*p++ != *q++) {
cout << "\033[0;31mdata differ\033[0m at byte " << i << "\n";
return;