diff options
| author | Carl Hetherington <cth@carlh.net> | 2021-11-21 14:29:21 +0100 |
|---|---|---|
| committer | Carl Hetherington <cth@carlh.net> | 2021-11-22 23:59:43 +0100 |
| commit | 33acff363c978914cdcbf21fb8fa3d1c4c6991b7 (patch) | |
| tree | 4bd8de953c0692d816e48e59d036a1625b9d3ff2 /src/lib/j2k_encoder.cc | |
| parent | 0a49cc2ebbfc3809313f252208a0050a3fce1e97 (diff) | |
Allow J2K encode backends to accept more than one frame at once.
Diffstat (limited to 'src/lib/j2k_encoder.cc')
| -rw-r--r-- | src/lib/j2k_encoder.cc | 32 |
1 files changed, 19 insertions, 13 deletions
diff --git a/src/lib/j2k_encoder.cc b/src/lib/j2k_encoder.cc index a003a6ea2..0f4109c14 100644 --- a/src/lib/j2k_encoder.cc +++ b/src/lib/j2k_encoder.cc @@ -141,9 +141,9 @@ J2KEncoder::end () for (auto const& i: _queue) { LOG_GENERAL(N_("Encode left-over frame %1"), i.index()); try { - auto enc = cpu.encode(i); - DCPOMATIC_ASSERT (enc); - _writer->write (make_shared<dcp::ArrayData>(*enc), i.index(), i.eyes()); + auto enc = cpu.encode({i}); + DCPOMATIC_ASSERT (!enc.empty()); + _writer->write (make_shared<dcp::ArrayData>(enc.front()), i.index(), i.eyes()); frame_done (); } catch (std::exception& e) { LOG_ERROR (N_("Local encode failed (%1)"), e.what ()); @@ -206,7 +206,7 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time) /* Wait until the queue has gone down a bit. Allow one thing in the queue even when there are no threads. */ - while (_queue.size() >= (threads * 2) + 1) { + while (static_cast<int>(_queue.size()) >= (_frames_in_parallel * 2) + 1) { LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads); _full_condition.wait (queue_lock); LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads); @@ -297,29 +297,31 @@ try } LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size()); - auto vf = _queue.front (); + auto end = std::next(_queue.begin(), std::min(static_cast<int>(_queue.size()), backend->quantity())); + std::vector<DCPVideo> vf (_queue.begin(), end); - /* We're about to commit to either encoding this frame or putting it back onto the queue, + /* We're about to commit to either encoding these frames or putting them back onto the queue, so we must not be interrupted until one or other of these things have happened. This block has thread interruption disabled. */ { boost::this_thread::disable_interruption dis; - LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf.index(), static_cast<int>(vf.eyes())); - _queue.pop_front (); + _queue.erase(_queue.begin(), end); lock.unlock (); auto encoded = backend->encode(vf); - if (encoded) { - _writer->write (make_shared<dcp::ArrayData>(*encoded), vf.index(), vf.eyes()); - frame_done (); + if (encoded.size() == vf.size()) { + for (auto i = 0U; i < encoded.size(); ++i) { + _writer->write (make_shared<dcp::ArrayData>(encoded[i]), vf[i].index(), vf[i].eyes()); + frame_done (); + } } else { lock.lock (); - LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index()); - _queue.push_front (vf); + LOG_GENERAL (N_("[%1] J2KEncoder thread pushes %2 frames onto queue after failure"), thread_id(), vf.size()); + _queue.insert (_queue.begin(), vf.begin(), vf.end()); lock.unlock (); } } @@ -349,6 +351,8 @@ J2KEncoder::servers_list_changed () terminate_threads (); _threads = make_shared<boost::thread_group>(); + _frames_in_parallel = 0; + /* XXX: could re-use threads */ if (!Config::instance()->only_servers_encode ()) { @@ -360,6 +364,7 @@ J2KEncoder::servers_list_changed () #else _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend)); #endif + _frames_in_parallel += backend->quantity(); } } @@ -373,6 +378,7 @@ J2KEncoder::servers_list_changed () LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name()); for (int j = 0; j < i.threads(); ++j) { _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend)); + _frames_in_parallel += backend->quantity(); } } |
