gettimeofday (&after_read, 0);
J2KEncoderCPUBackend cpu;
- auto encoded = cpu.encode (dcp_video_frame);
- DCPOMATIC_ASSERT (encoded);
+ auto encoded = cpu.encode ({dcp_video_frame});
+ DCPOMATIC_ASSERT (!encoded.empty());
gettimeofday (&after_encode, 0);
try {
Socket::WriteDigestScope ds (socket);
- socket->write (encoded->size());
- socket->write (encoded->data(), encoded->size());
+ socket->write (encoded[0].size());
+ socket->write (encoded[0].data(), encoded[0].size());
} catch (std::exception& e) {
cerr << "Send failed; frame " << dcp_video_frame.index() << "\n";
LOG_ERROR ("Send failed; frame %1", dcp_video_frame.index());
for (auto const& i: _queue) {
LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
try {
- auto enc = cpu.encode(i);
- DCPOMATIC_ASSERT (enc);
- _writer->write (make_shared<dcp::ArrayData>(*enc), i.index(), i.eyes());
+ auto enc = cpu.encode({i});
+ DCPOMATIC_ASSERT (!enc.empty());
+ _writer->write (make_shared<dcp::ArrayData>(enc.front()), i.index(), i.eyes());
frame_done ();
} catch (std::exception& e) {
LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
/* Wait until the queue has gone down a bit. Allow one thing in the queue even
when there are no threads.
*/
- while (_queue.size() >= (threads * 2) + 1) {
+ while (static_cast<int>(_queue.size()) >= (_frames_in_parallel * 2) + 1) {
LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
_full_condition.wait (queue_lock);
LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
}
LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
- auto vf = _queue.front ();
+ auto end = std::next(_queue.begin(), std::min(static_cast<int>(_queue.size()), backend->quantity()));
+ std::vector<DCPVideo> vf (_queue.begin(), end);
- /* We're about to commit to either encoding this frame or putting it back onto the queue,
+ /* We're about to commit to either encoding these frames or putting them back onto the queue,
so we must not be interrupted until one or other of these things have happened. This
block has thread interruption disabled.
*/
{
boost::this_thread::disable_interruption dis;
- LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf.index(), static_cast<int>(vf.eyes()));
- _queue.pop_front ();
+ _queue.erase(_queue.begin(), end);
lock.unlock ();
auto encoded = backend->encode(vf);
- if (encoded) {
- _writer->write (make_shared<dcp::ArrayData>(*encoded), vf.index(), vf.eyes());
- frame_done ();
+ if (encoded.size() == vf.size()) {
+ for (auto i = 0U; i < encoded.size(); ++i) {
+ _writer->write (make_shared<dcp::ArrayData>(encoded[i]), vf[i].index(), vf[i].eyes());
+ frame_done ();
+ }
} else {
lock.lock ();
- LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
- _queue.push_front (vf);
+ LOG_GENERAL (N_("[%1] J2KEncoder thread pushes %2 frames onto queue after failure"), thread_id(), vf.size());
+ _queue.insert (_queue.begin(), vf.begin(), vf.end());
lock.unlock ();
}
}
terminate_threads ();
_threads = make_shared<boost::thread_group>();
+ _frames_in_parallel = 0;
+
/* XXX: could re-use threads */
if (!Config::instance()->only_servers_encode ()) {
#else
_threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
#endif
+ _frames_in_parallel += backend->quantity();
}
}
LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name());
for (int j = 0; j < i.threads(); ++j) {
_threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
+ _frames_in_parallel += backend->quantity();
}
}
boost::mutex _threads_mutex;
std::shared_ptr<boost::thread_group> _threads;
+ int _frames_in_parallel = 0;
+
mutable boost::mutex _queue_mutex;
std::list<DCPVideo> _queue;
/** condition to manage thread wakeups when we have nothing to do */
J2KEncoderBackend (J2KEncoderBackend const&) = delete;
J2KEncoderBackend& operator= (J2KEncoderBackend const&) = delete;
- virtual boost::optional<dcp::ArrayData> encode (DCPVideo video) = 0;
+ virtual std::vector<dcp::ArrayData> encode (std::vector<DCPVideo> const& video) = 0;
+ virtual int quantity () const {
+ return 1;
+ }
};
using std::shared_ptr;
+using std::vector;
using boost::optional;
#if BOOST_VERSION >= 106100
using namespace boost::placeholders;
#endif
-optional<dcp::ArrayData>
-J2KEncoderCPUBackend::encode (DCPVideo video)
+vector<dcp::ArrayData>
+J2KEncoderCPUBackend::encode (vector<DCPVideo> const& all_video)
{
+ DCPOMATIC_ASSERT (all_video.size() == 1);
+ auto video = all_video.front();
+
try {
auto const comment = Config::instance()->dcp_j2k_comment();
if (enc.size() >= minimum_size) {
LOG_GENERAL (N_("Frame %1 encoded size was OK (%2)"), video.index(), enc.size());
- return enc;
+ return { enc };
}
LOG_GENERAL (N_("Frame %1 encoded size was small (%2); adding noise at level %3 with pixel skip %4"), video.index(), enc.size(), noise_amount, pixel_skip);
J2KEncoderCPUBackend () = default;
J2KEncoderCPUBackend (J2KEncoderCPUBackend&& other) = default;
- boost::optional<dcp::ArrayData> encode (DCPVideo video) override;
+ std::vector<dcp::ArrayData> encode (std::vector<DCPVideo> const& video) override;
};
using std::shared_ptr;
using std::string;
using std::unique_ptr;
+using std::vector;
using boost::optional;
using dcp::raw_convert;
}
-optional<dcp::ArrayData>
-J2KEncoderRemoteBackend::encode (DCPVideo video)
+vector<dcp::ArrayData>
+J2KEncoderRemoteBackend::encode (vector<DCPVideo> const& all_video)
{
+ DCPOMATIC_ASSERT (all_video.size() == 1);
+ auto video = all_video.front();
+
try {
boost::asio::io_service io_service;
boost::asio::ip::tcp::resolver resolver (io_service);
LOG_DEBUG_ENCODE (N_("Finished remotely-encoded frame %1"), video.index());
_backoff = 0;
- return enc;
+ return { enc };
} catch (std::exception& e) {
if (_backoff < 60) {
J2KEncoderRemoteBackend (J2KEncoderRemoteBackend&& other);
- boost::optional<dcp::ArrayData> encode (DCPVideo video) override;
+ std::vector<dcp::ArrayData> encode (std::vector<DCPVideo> const& video) override;
private:
EncodeServerDescription _server;
using std::make_shared;
using std::shared_ptr;
using std::string;
+using std::vector;
using boost::optional;
using boost::bind;
#if BOOST_VERSION >= 106100
++frame_count;
J2KEncoderCPUBackend cpu_backend;
- auto local_encoded = cpu_backend.encode(local);
- optional<ArrayData> remote_encoded;
+ auto local_encoded = cpu_backend.encode({local});
+ vector<ArrayData> remote_encoded;
string remote_error;
J2KEncoderRemoteBackend remote_backend(*server);
try {
- remote_encoded = remote_backend.encode(remote);
+ remote_encoded = remote_backend.encode({remote});
} catch (NetworkError& e) {
remote_error = e.what ();
}
- DCPOMATIC_ASSERT (remote_encoded);
+ DCPOMATIC_ASSERT (!local_encoded.empty());
+ DCPOMATIC_ASSERT (!remote_encoded.empty());
if (!remote_error.empty()) {
cout << "\033[0;31mnetwork problem: " << remote_error << "\033[0m\n";
return;
}
- if (local_encoded->size() != remote_encoded->size()) {
+ if (local_encoded[0].size() != remote_encoded[0].size()) {
cout << "\033[0;31msizes differ\033[0m\n";
return;
}
- auto p = local_encoded->data();
- auto q = remote_encoded->data();
- for (int i = 0; i < local_encoded->size(); ++i) {
+ auto p = local_encoded[0].data();
+ auto q = remote_encoded[0].data();
+ for (int i = 0; i < local_encoded[0].size(); ++i) {
if (*p++ != *q++) {
cout << "\033[0;31mdata differ\033[0m at byte " << i << "\n";
return;
using std::make_shared;
using std::shared_ptr;
using std::weak_ptr;
+using std::vector;
using boost::thread;
using boost::optional;
using dcp::ArrayData;
void
do_remote_encode (DCPVideo frame, EncodeServerDescription description, ArrayData locally_encoded)
{
- optional<ArrayData> remotely_encoded;
+ vector<ArrayData> remotely_encoded;
J2KEncoderRemoteBackend backend (description, 1200);
- BOOST_REQUIRE_NO_THROW (remotely_encoded = backend.encode(frame));
- BOOST_REQUIRE (static_cast<bool>(remotely_encoded));
+ BOOST_REQUIRE_NO_THROW (remotely_encoded = backend.encode({frame}));
+ BOOST_REQUIRE (!remotely_encoded.empty());
- BOOST_REQUIRE_EQUAL (locally_encoded.size(), remotely_encoded->size());
- BOOST_CHECK_EQUAL (memcmp (locally_encoded.data(), remotely_encoded->data(), locally_encoded.size()), 0);
+ BOOST_REQUIRE_EQUAL (locally_encoded.size(), remotely_encoded[0].size());
+ BOOST_CHECK_EQUAL (memcmp(locally_encoded.data(), remotely_encoded[0].data(), locally_encoded.size()), 0);
}
);
J2KEncoderCPUBackend cpu;
- auto locally_encoded = cpu.encode (frame);
- BOOST_REQUIRE (static_cast<bool>(locally_encoded));
+ auto locally_encoded = cpu.encode ({frame});
+ BOOST_REQUIRE (!locally_encoded.empty());
auto server = new EncodeServer (true, 2);
list<thread*> threads;
for (int i = 0; i < 8; ++i) {
- threads.push_back (new thread(boost::bind(do_remote_encode, frame, description, *locally_encoded)));
+ threads.push_back (new thread(boost::bind(do_remote_encode, frame, description, locally_encoded[0])));
}
for (auto i: threads) {
);
J2KEncoderCPUBackend cpu;
- auto locally_encoded = cpu.encode (frame);
- BOOST_REQUIRE (static_cast<bool>(locally_encoded));
+ auto locally_encoded = cpu.encode ({frame});
+ BOOST_REQUIRE (!locally_encoded.empty());
auto server = new EncodeServer (true, 2);
list<thread*> threads;
for (int i = 0; i < 8; ++i) {
- threads.push_back (new thread(boost::bind(do_remote_encode, frame, description, *locally_encoded)));
+ threads.push_back (new thread(boost::bind(do_remote_encode, frame, description, locally_encoded[0])));
}
for (auto i: threads) {
);
J2KEncoderCPUBackend cpu;
- auto raw_locally_encoded = cpu.encode (raw_frame);
- BOOST_REQUIRE (static_cast<bool>(raw_locally_encoded));
+ auto raw_locally_encoded = cpu.encode ({raw_frame});
+ BOOST_REQUIRE (!raw_locally_encoded.empty());
auto j2k_pvf = std::make_shared<PlayerVideo> (
- std::make_shared<J2KImageProxy>(*raw_locally_encoded, dcp::Size(1998, 1080), AV_PIX_FMT_XYZ12LE),
+ std::make_shared<J2KImageProxy>(raw_locally_encoded[0], dcp::Size(1998, 1080), AV_PIX_FMT_XYZ12LE),
Crop(),
optional<double>(),
dcp::Size(1998, 1080),
Resolution::TWO_K
);
- auto j2k_locally_encoded = cpu.encode(j2k_frame);
- BOOST_REQUIRE (static_cast<bool>(j2k_locally_encoded));
+ auto j2k_locally_encoded = cpu.encode({j2k_frame});
+ BOOST_REQUIRE (!j2k_locally_encoded.empty());
auto server = new EncodeServer (true, 2);
list<thread*> threads;
for (int i = 0; i < 8; ++i) {
- threads.push_back (new thread(boost::bind(do_remote_encode, j2k_frame, description, *j2k_locally_encoded)));
+ threads.push_back (new thread(boost::bind(do_remote_encode, j2k_frame, description, j2k_locally_encoded[0])));
}
for (auto i: threads) {
DCPVideo dcp_video(frame, 0, 24, 100000000, Resolution::TWO_K);
J2KEncoderCPUBackend cpu;
- auto j2k = cpu.encode(dcp_video);
- BOOST_REQUIRE (static_cast<bool>(j2k));
- BOOST_REQUIRE (j2k->size() >= 16536);
+ auto j2k = cpu.encode({dcp_video});
+ BOOST_REQUIRE (!j2k.empty());
+ BOOST_REQUIRE (j2k[0].size() >= 16536);
}