summaryrefslogtreecommitdiff
path: root/src/lib/j2k_encoder.cc
diff options
context:
space:
mode:
authorCarl Hetherington <cth@carlh.net>2021-11-21 14:29:21 +0100
committerCarl Hetherington <cth@carlh.net>2021-11-22 23:59:43 +0100
commit33acff363c978914cdcbf21fb8fa3d1c4c6991b7 (patch)
tree4bd8de953c0692d816e48e59d036a1625b9d3ff2 /src/lib/j2k_encoder.cc
parent0a49cc2ebbfc3809313f252208a0050a3fce1e97 (diff)
Allow J2K encode backends to accept more than one frame at once.
Diffstat (limited to 'src/lib/j2k_encoder.cc')
-rw-r--r--src/lib/j2k_encoder.cc32
1 files changed, 19 insertions, 13 deletions
diff --git a/src/lib/j2k_encoder.cc b/src/lib/j2k_encoder.cc
index a003a6ea2..0f4109c14 100644
--- a/src/lib/j2k_encoder.cc
+++ b/src/lib/j2k_encoder.cc
@@ -141,9 +141,9 @@ J2KEncoder::end ()
for (auto const& i: _queue) {
LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
try {
- auto enc = cpu.encode(i);
- DCPOMATIC_ASSERT (enc);
- _writer->write (make_shared<dcp::ArrayData>(*enc), i.index(), i.eyes());
+ auto enc = cpu.encode({i});
+ DCPOMATIC_ASSERT (!enc.empty());
+ _writer->write (make_shared<dcp::ArrayData>(enc.front()), i.index(), i.eyes());
frame_done ();
} catch (std::exception& e) {
LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
@@ -206,7 +206,7 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
/* Wait until the queue has gone down a bit. Allow one thing in the queue even
when there are no threads.
*/
- while (_queue.size() >= (threads * 2) + 1) {
+ while (static_cast<int>(_queue.size()) >= (_frames_in_parallel * 2) + 1) {
LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
_full_condition.wait (queue_lock);
LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
@@ -297,29 +297,31 @@ try
}
LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
- auto vf = _queue.front ();
+ auto end = std::next(_queue.begin(), std::min(static_cast<int>(_queue.size()), backend->quantity()));
+ std::vector<DCPVideo> vf (_queue.begin(), end);
- /* We're about to commit to either encoding this frame or putting it back onto the queue,
+ /* We're about to commit to either encoding these frames or putting them back onto the queue,
so we must not be interrupted until one or other of these things have happened. This
block has thread interruption disabled.
*/
{
boost::this_thread::disable_interruption dis;
- LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf.index(), static_cast<int>(vf.eyes()));
- _queue.pop_front ();
+ _queue.erase(_queue.begin(), end);
lock.unlock ();
auto encoded = backend->encode(vf);
- if (encoded) {
- _writer->write (make_shared<dcp::ArrayData>(*encoded), vf.index(), vf.eyes());
- frame_done ();
+ if (encoded.size() == vf.size()) {
+ for (auto i = 0U; i < encoded.size(); ++i) {
+ _writer->write (make_shared<dcp::ArrayData>(encoded[i]), vf[i].index(), vf[i].eyes());
+ frame_done ();
+ }
} else {
lock.lock ();
- LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
- _queue.push_front (vf);
+ LOG_GENERAL (N_("[%1] J2KEncoder thread pushes %2 frames onto queue after failure"), thread_id(), vf.size());
+ _queue.insert (_queue.begin(), vf.begin(), vf.end());
lock.unlock ();
}
}
@@ -349,6 +351,8 @@ J2KEncoder::servers_list_changed ()
terminate_threads ();
_threads = make_shared<boost::thread_group>();
+ _frames_in_parallel = 0;
+
/* XXX: could re-use threads */
if (!Config::instance()->only_servers_encode ()) {
@@ -360,6 +364,7 @@ J2KEncoder::servers_list_changed ()
#else
_threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
#endif
+ _frames_in_parallel += backend->quantity();
}
}
@@ -373,6 +378,7 @@ J2KEncoder::servers_list_changed ()
LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name());
for (int j = 0; j < i.threads(); ++j) {
_threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
+ _frames_in_parallel += backend->quantity();
}
}