From 0a49cc2ebbfc3809313f252208a0050a3fce1e97 Mon Sep 17 00:00:00 2001 From: Carl Hetherington Date: Sun, 21 Nov 2021 14:51:09 +0100 Subject: Separate out local/remote encode code from DCPVideo. Now we have a J2KEncoderCPUBackend and a J2KEncoderRemoteBackend. --- src/lib/dcp_video.cc | 132 ---------------------------------- src/lib/dcp_video.h | 22 ++++-- src/lib/encode_server.cc | 25 ++++--- src/lib/j2k_encoder.cc | 80 +++++---------------- src/lib/j2k_encoder.h | 7 +- src/lib/j2k_encoder_backend.h | 45 ++++++++++++ src/lib/j2k_encoder_cpu_backend.cc | 118 ++++++++++++++++++++++++++++++ src/lib/j2k_encoder_cpu_backend.h | 39 ++++++++++ src/lib/j2k_encoder_remote_backend.cc | 121 +++++++++++++++++++++++++++++++ src/lib/j2k_encoder_remote_backend.h | 50 +++++++++++++ src/lib/writer.cc | 2 +- src/lib/wscript | 2 + src/tools/server_test.cc | 24 ++++--- test/client_server_test.cc | 45 +++++++----- test/low_bitrate_test.cc | 9 ++- test/writer_test.cc | 5 +- 16 files changed, 481 insertions(+), 245 deletions(-) create mode 100644 src/lib/j2k_encoder_backend.h create mode 100644 src/lib/j2k_encoder_cpu_backend.cc create mode 100644 src/lib/j2k_encoder_cpu_backend.h create mode 100644 src/lib/j2k_encoder_remote_backend.cc create mode 100644 src/lib/j2k_encoder_remote_backend.h diff --git a/src/lib/dcp_video.cc b/src/lib/dcp_video.cc index 4a41e17b9..9b0c76ceb 100644 --- a/src/lib/dcp_video.cc +++ b/src/lib/dcp_video.cc @@ -121,138 +121,6 @@ DCPVideo::convert_to_xyz (shared_ptr frame, dcp::NoteHandler return xyz; } -/** J2K-encode this frame on the local host. - * @return Encoded data. - */ -ArrayData -DCPVideo::encode_locally () const -{ - auto const comment = Config::instance()->dcp_j2k_comment(); - - ArrayData enc = {}; - /* This was empirically derived by a user: see #1902 */ - int const minimum_size = 16384; - LOG_GENERAL ("Using minimum frame size %1", minimum_size); - - auto xyz = convert_to_xyz (_frame, boost::bind(&Log::dcp_log, dcpomatic_log.get(), _1, _2)); - int noise_amount = 2; - int pixel_skip = 16; - while (true) { - enc = dcp::compress_j2k ( - xyz, - _j2k_bandwidth, - _frames_per_second, - _frame->eyes() == Eyes::LEFT || _frame->eyes() == Eyes::RIGHT, - _resolution == Resolution::FOUR_K, - comment.empty() ? "libdcp" : comment - ); - - if (enc.size() >= minimum_size) { - LOG_GENERAL (N_("Frame %1 encoded size was OK (%2)"), _index, enc.size()); - break; - } - - LOG_GENERAL (N_("Frame %1 encoded size was small (%2); adding noise at level %3 with pixel skip %4"), _index, enc.size(), noise_amount, pixel_skip); - - /* The JPEG2000 is too low-bitrate for some decoders DSS200 so add some noise - * and try again. This is slow but hopefully won't happen too often. We have to do - * convert_to_xyz() again because compress_j2k() corrupts its xyz parameter. - */ - - xyz = convert_to_xyz (_frame, boost::bind(&Log::dcp_log, dcpomatic_log.get(), _1, _2)); - auto size = xyz->size (); - auto pixels = size.width * size.height; - dcpomatic::RNG rng(42); - for (auto c = 0; c < 3; ++c) { - auto p = xyz->data(c); - auto e = xyz->data(c) + pixels; - while (p < e) { - *p = std::min(4095, std::max(0, *p + (rng.get() % noise_amount))); - p += pixel_skip; - } - } - - if (pixel_skip > 1) { - --pixel_skip; - } else { - ++noise_amount; - } - /* Something's gone badly wrong if this much noise doesn't help */ - DCPOMATIC_ASSERT (noise_amount < 16); - } - - switch (_frame->eyes()) { - case Eyes::BOTH: - LOG_DEBUG_ENCODE (N_("Finished locally-encoded frame %1 for mono"), _index); - break; - case Eyes::LEFT: - LOG_DEBUG_ENCODE (N_("Finished locally-encoded frame %1 for L"), _index); - break; - case Eyes::RIGHT: - LOG_DEBUG_ENCODE (N_("Finished locally-encoded frame %1 for R"), _index); - break; - default: - break; - } - - return enc; -} - -/** Send this frame to a remote server for J2K encoding, then read the result. - * @param serv Server to send to. - * @param timeout timeout in seconds. - * @return Encoded data. - */ -ArrayData -DCPVideo::encode_remotely (EncodeServerDescription serv, int timeout) const -{ - boost::asio::io_service io_service; - boost::asio::ip::tcp::resolver resolver (io_service); - boost::asio::ip::tcp::resolver::query query (serv.host_name(), raw_convert (ENCODE_FRAME_PORT)); - boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve (query); - - auto socket = make_shared(timeout); - - socket->connect (*endpoint_iterator); - - /* Collect all XML metadata */ - xmlpp::Document doc; - auto root = doc.create_root_node ("EncodingRequest"); - root->add_child("Version")->add_child_text (raw_convert (SERVER_LINK_VERSION)); - add_metadata (root); - - LOG_DEBUG_ENCODE (N_("Sending frame %1 to remote"), _index); - - { - Socket::WriteDigestScope ds (socket); - - /* Send XML metadata */ - auto xml = doc.write_to_string ("UTF-8"); - socket->write (xml.length() + 1); - socket->write ((uint8_t *) xml.c_str(), xml.bytes() + 1); - - /* Send binary data */ - LOG_TIMING("start-remote-send thread=%1", thread_id ()); - _frame->write_to_socket (socket); - } - - /* Read the response (JPEG2000-encoded data); this blocks until the data - is ready and sent back. - */ - Socket::ReadDigestScope ds (socket); - LOG_TIMING("start-remote-encode thread=%1", thread_id ()); - ArrayData e (socket->read_uint32 ()); - LOG_TIMING("start-remote-receive thread=%1", thread_id ()); - socket->read (e.data(), e.size()); - LOG_TIMING("finish-remote-receive thread=%1", thread_id ()); - if (!ds.check()) { - throw NetworkError ("Checksums do not match"); - } - - LOG_DEBUG_ENCODE (N_("Finished remotely-encoded frame %1"), _index); - - return e; -} void DCPVideo::add_metadata (xmlpp::Element* el) const diff --git a/src/lib/dcp_video.h b/src/lib/dcp_video.h index 3bd516ccd..2508988fd 100644 --- a/src/lib/dcp_video.h +++ b/src/lib/dcp_video.h @@ -48,23 +48,35 @@ public: DCPVideo (DCPVideo const&) = default; DCPVideo& operator= (DCPVideo const&) = default; - dcp::ArrayData encode_locally () const; - dcp::ArrayData encode_remotely (EncodeServerDescription, int timeout = 30) const; + std::shared_ptr frame () const { + return _frame; + } int index () const { return _index; } + int frames_per_second () const { + return _frames_per_second; + } + + int j2k_bandwidth () const { + return _j2k_bandwidth; + } + + Resolution resolution () const { + return _resolution; + } + Eyes eyes () const; bool same (std::shared_ptr other) const; + void add_metadata (xmlpp::Element *) const; + static std::shared_ptr convert_to_xyz (std::shared_ptr frame, dcp::NoteHandler note); private: - - void add_metadata (xmlpp::Element *) const; - std::shared_ptr _frame; int _index; ///< frame index within the DCP's intrinsic duration int _frames_per_second; ///< Frames per second that we will use for the DCP diff --git a/src/lib/encode_server.cc b/src/lib/encode_server.cc index 24ba5bc45..56774a5ca 100644 --- a/src/lib/encode_server.cc +++ b/src/lib/encode_server.cc @@ -25,18 +25,19 @@ */ -#include "encode_server.h" -#include "util.h" -#include "dcpomatic_socket.h" -#include "image.h" -#include "dcp_video.h" +#include "compose.hpp" #include "config.h" #include "cross.h" -#include "player_video.h" -#include "compose.hpp" -#include "log.h" +#include "dcp_video.h" #include "dcpomatic_log.h" +#include "dcpomatic_socket.h" +#include "encode_server.h" #include "encoded_log_entry.h" +#include "image.h" +#include "j2k_encoder_cpu_backend.h" +#include "log.h" +#include "player_video.h" +#include "util.h" #include "version.h" #include "warnings.h" #include @@ -151,14 +152,16 @@ EncodeServer::process (shared_ptr socket, struct timeval& after_read, st gettimeofday (&after_read, 0); - auto encoded = dcp_video_frame.encode_locally (); + J2KEncoderCPUBackend cpu; + auto encoded = cpu.encode (dcp_video_frame); + DCPOMATIC_ASSERT (encoded); gettimeofday (&after_encode, 0); try { Socket::WriteDigestScope ds (socket); - socket->write (encoded.size()); - socket->write (encoded.data(), encoded.size()); + socket->write (encoded->size()); + socket->write (encoded->data(), encoded->size()); } catch (std::exception& e) { cerr << "Send failed; frame " << dcp_video_frame.index() << "\n"; LOG_ERROR ("Send failed; frame %1", dcp_video_frame.index()); diff --git a/src/lib/j2k_encoder.cc b/src/lib/j2k_encoder.cc index 8e00f3556..a003a6ea2 100644 --- a/src/lib/j2k_encoder.cc +++ b/src/lib/j2k_encoder.cc @@ -24,6 +24,8 @@ */ +#include "j2k_encoder_cpu_backend.h" +#include "j2k_encoder_remote_backend.h" #include "j2k_encoder.h" #include "util.h" #include "film.h" @@ -135,14 +137,13 @@ J2KEncoder::end () So just mop up anything left in the queue here. */ + J2KEncoderCPUBackend cpu; for (auto const& i: _queue) { LOG_GENERAL(N_("Encode left-over frame %1"), i.index()); try { - _writer->write ( - make_shared(i.encode_locally()), - i.index(), - i.eyes() - ); + auto enc = cpu.encode(i); + DCPOMATIC_ASSERT (enc); + _writer->write (make_shared(*enc), i.index(), i.eyes()); frame_done (); } catch (std::exception& e) { LOG_ERROR (N_("Local encode failed (%1)"), e.what ()); @@ -280,22 +281,12 @@ J2KEncoder::terminate_threads () void -J2KEncoder::encoder_thread (optional server) +J2KEncoder::encoder_thread (shared_ptr backend) try { start_of_thread ("J2KEncoder"); - if (server) { - LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ()); - } else { - LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ()); - } - - /* Number of seconds that we currently wait between attempts - to connect to the server; not relevant for localhost - encodings. - */ - int remote_backoff = 0; + LOG_TIMING ("start-encoder-thread thread=%1", thread_id()); while (true) { @@ -320,45 +311,10 @@ try lock.unlock (); - shared_ptr encoded; - - /* We need to encode this input */ - if (server) { - try { - encoded = make_shared(vf.encode_remotely(server.get())); - - if (remote_backoff > 0) { - LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ()); - } - - /* This job succeeded, so remove any backoff */ - remote_backoff = 0; - - } catch (std::exception& e) { - if (remote_backoff < 60) { - /* back off more */ - remote_backoff += 10; - } - LOG_ERROR ( - N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"), - vf.index(), server->host_name(), e.what(), remote_backoff - ); - } - - } else { - try { - LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf.index()); - encoded = make_shared(vf.encode_locally()); - LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index()); - } catch (std::exception& e) { - /* This is very bad, so don't cope with it, just pass it on */ - LOG_ERROR (N_("Local encode failed (%1)"), e.what ()); - throw; - } - } + auto encoded = backend->encode(vf); if (encoded) { - _writer->write (encoded, vf.index(), vf.eyes()); + _writer->write (make_shared(*encoded), vf.index(), vf.eyes()); frame_done (); } else { lock.lock (); @@ -368,10 +324,6 @@ try } } - if (remote_backoff > 0) { - boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff)); - } - /* The queue might not be full any more, so notify anything that is waiting on that */ lock.lock (); _full_condition.notify_all (); @@ -400,12 +352,13 @@ J2KEncoder::servers_list_changed () /* XXX: could re-use threads */ if (!Config::instance()->only_servers_encode ()) { + auto backend = std::make_shared(); for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) { #ifdef DCPOMATIC_LINUX - auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional())); + auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend)); pthread_setname_np (t->native_handle(), "encode-worker"); #else - _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional())); + _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend)); #endif } } @@ -415,11 +368,14 @@ J2KEncoder::servers_list_changed () continue; } - LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ()); + auto backend = std::make_shared(i); + + LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name()); for (int j = 0; j < i.threads(); ++j) { - _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i)); + _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend)); } } _writer->set_encoder_threads (_threads->size()); } + diff --git a/src/lib/j2k_encoder.h b/src/lib/j2k_encoder.h index ea0a2bef8..df98d1a7b 100644 --- a/src/lib/j2k_encoder.h +++ b/src/lib/j2k_encoder.h @@ -32,6 +32,8 @@ #include "cross.h" #include "event_history.h" #include "exception_store.h" +#include "j2k_encoder_cpu_backend.h" +#include "j2k_encoder_remote_backend.h" #include #include #include @@ -45,6 +47,7 @@ class Film; class EncodeServerDescription; class DCPVideo; class Writer; +class J2KEncoderBackend; class Job; class PlayerVideo; @@ -84,12 +87,12 @@ private: void frame_done (); - void encoder_thread (boost::optional); + void encoder_thread (std::shared_ptr backend); void terminate_threads (); - /** Film that we are encoding */ std::shared_ptr _film; + EventHistory _history; boost::mutex _threads_mutex; diff --git a/src/lib/j2k_encoder_backend.h b/src/lib/j2k_encoder_backend.h new file mode 100644 index 000000000..295f81e84 --- /dev/null +++ b/src/lib/j2k_encoder_backend.h @@ -0,0 +1,45 @@ +/* + Copyright (C) 2021 Carl Hetherington + + This file is part of DCP-o-matic. + + DCP-o-matic is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + DCP-o-matic is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with DCP-o-matic. If not, see . + +*/ + + +#ifndef DCPOMATIC_J2K_ENCODER_BACKEND_H +#define DCPOMATIC_J2K_ENCODER_BACKEND_H + + +#include +#include + + +class DCPVideo; + + +class J2KEncoderBackend +{ +public: + J2KEncoderBackend () {} + + J2KEncoderBackend (J2KEncoderBackend const&) = delete; + J2KEncoderBackend& operator= (J2KEncoderBackend const&) = delete; + + virtual boost::optional encode (DCPVideo video) = 0; +}; + + +#endif diff --git a/src/lib/j2k_encoder_cpu_backend.cc b/src/lib/j2k_encoder_cpu_backend.cc new file mode 100644 index 000000000..99963c103 --- /dev/null +++ b/src/lib/j2k_encoder_cpu_backend.cc @@ -0,0 +1,118 @@ +/* + Copyright (C) 2021 Carl Hetherington + + This file is part of DCP-o-matic. + + DCP-o-matic is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + DCP-o-matic is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with DCP-o-matic. If not, see . + +*/ + + +#include "config.h" +#include "dcp_video.h" +#include "dcpomatic_log.h" +#include "j2k_encoder_cpu_backend.h" +#include "player_video.h" +#include "rng.h" +#include +#include + +#include "i18n.h" + + +using std::shared_ptr; +using boost::optional; +#if BOOST_VERSION >= 106100 +using namespace boost::placeholders; +#endif + + +optional +J2KEncoderCPUBackend::encode (DCPVideo video) +{ + try { + auto const comment = Config::instance()->dcp_j2k_comment(); + + /* This was empirically derived by a user: see #1902 */ + int const minimum_size = 16384; + LOG_GENERAL ("Using minimum frame size %1", minimum_size); + + auto xyz = DCPVideo::convert_to_xyz (video.frame(), boost::bind(&Log::dcp_log, dcpomatic_log.get(), _1, _2)); + int noise_amount = 2; + int pixel_skip = 16; + while (true) { + auto enc = dcp::compress_j2k ( + xyz, + video.j2k_bandwidth(), + video.frames_per_second(), + video.frame()->eyes() == Eyes::LEFT || video.frame()->eyes() == Eyes::RIGHT, + video.resolution() == Resolution::FOUR_K, + comment.empty() ? "libdcp" : comment + ); + + if (enc.size() >= minimum_size) { + LOG_GENERAL (N_("Frame %1 encoded size was OK (%2)"), video.index(), enc.size()); + return enc; + } + + LOG_GENERAL (N_("Frame %1 encoded size was small (%2); adding noise at level %3 with pixel skip %4"), video.index(), enc.size(), noise_amount, pixel_skip); + + /* The JPEG2000 is too low-bitrate for some decoders DSS200 so add some noise + * and try again. This is slow but hopefully won't happen too often. We have to do + * convert_to_xyz() again because compress_j2k() corrupts its xyz parameter. + */ + + xyz = DCPVideo::convert_to_xyz (video.frame(), boost::bind(&Log::dcp_log, dcpomatic_log.get(), _1, _2)); + auto const size = xyz->size (); + auto const pixels = size.width * size.height; + dcpomatic::RNG rng(42); + for (auto c = 0; c < 3; ++c) { + auto p = xyz->data(c); + auto e = xyz->data(c) + pixels; + while (p < e) { + *p = std::min(4095, std::max(0, *p + (rng.get() % noise_amount))); + p += pixel_skip; + } + } + + if (pixel_skip > 1) { + --pixel_skip; + } else { + ++noise_amount; + } + /* Something's gone badly wrong if this much noise doesn't help */ + DCPOMATIC_ASSERT (noise_amount < 16); + } + + switch (video.frame()->eyes()) { + case Eyes::BOTH: + LOG_DEBUG_ENCODE (N_("Finished locally-encoded frame %1 for mono"), video.index()); + break; + case Eyes::LEFT: + LOG_DEBUG_ENCODE (N_("Finished locally-encoded frame %1 for L"), video.index()); + break; + case Eyes::RIGHT: + LOG_DEBUG_ENCODE (N_("Finished locally-encoded frame %1 for R"), video.index()); + break; + default: + break; + } + } catch (std::exception& e) { + LOG_ERROR (N_("Local encode failed (%1)"), e.what()); + } + + return {}; +} + + diff --git a/src/lib/j2k_encoder_cpu_backend.h b/src/lib/j2k_encoder_cpu_backend.h new file mode 100644 index 000000000..41ea8d2e5 --- /dev/null +++ b/src/lib/j2k_encoder_cpu_backend.h @@ -0,0 +1,39 @@ +/* + Copyright (C) 2021 Carl Hetherington + + This file is part of DCP-o-matic. + + DCP-o-matic is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + DCP-o-matic is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with DCP-o-matic. If not, see . + +*/ + + +#ifndef DCPOMATIC_J2K_ENCODER_CPU_BACKEND_H +#define DCPOMATIC_J2K_ENCODER_CPU_BACKEND_H + + +#include "j2k_encoder_backend.h" + + +class J2KEncoderCPUBackend : public J2KEncoderBackend +{ +public: + J2KEncoderCPUBackend () = default; + J2KEncoderCPUBackend (J2KEncoderCPUBackend&& other) = default; + + boost::optional encode (DCPVideo video) override; +}; + + +#endif diff --git a/src/lib/j2k_encoder_remote_backend.cc b/src/lib/j2k_encoder_remote_backend.cc new file mode 100644 index 000000000..8bf1a0caf --- /dev/null +++ b/src/lib/j2k_encoder_remote_backend.cc @@ -0,0 +1,121 @@ +/* + Copyright (C) 2021 Carl Hetherington + + This file is part of DCP-o-matic. + + DCP-o-matic is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + DCP-o-matic is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with DCP-o-matic. If not, see . + +*/ + + +#include "config.h" +#include "cross.h" +#include "dcp_video.h" +#include "dcpomatic_log.h" +#include "dcpomatic_socket.h" +#include "j2k_encoder_remote_backend.h" +#include "player_video.h" +#include "warnings.h" +#include +DCPOMATIC_DISABLE_WARNINGS +#include +DCPOMATIC_ENABLE_WARNINGS +#include + + +#include "i18n.h" + + +using std::make_shared; +using std::shared_ptr; +using std::string; +using std::unique_ptr; +using boost::optional; +using dcp::raw_convert; + + +J2KEncoderRemoteBackend::J2KEncoderRemoteBackend (J2KEncoderRemoteBackend&& other) + : _server (other._server) + , _backoff (other._backoff) +{ + +} + + +optional +J2KEncoderRemoteBackend::encode (DCPVideo video) +{ + try { + boost::asio::io_service io_service; + boost::asio::ip::tcp::resolver resolver (io_service); + boost::asio::ip::tcp::resolver::query query (_server.host_name(), raw_convert(ENCODE_FRAME_PORT)); + auto endpoint_iterator = resolver.resolve (query); + + auto socket = make_shared(_timeout); + + socket->connect (*endpoint_iterator); + + /* Collect all XML metadata */ + xmlpp::Document doc; + auto root = doc.create_root_node ("EncodingRequest"); + root->add_child("Version")->add_child_text(raw_convert(SERVER_LINK_VERSION)); + video.add_metadata (root); + + LOG_DEBUG_ENCODE (N_("Sending frame %1 to remote"), video.index()); + + { + Socket::WriteDigestScope ds (socket); + + /* Send XML metadata */ + auto xml = doc.write_to_string ("UTF-8"); + socket->write (xml.length() + 1); + socket->write (reinterpret_cast(xml.c_str()), xml.bytes() + 1); + + /* Send binary data */ + LOG_TIMING("start-remote-send thread=%1", thread_id()); + video.frame()->write_to_socket(socket); + } + + /* Read the response (JPEG2000-encoded data); this blocks until the data + is ready and sent back. + */ + Socket::ReadDigestScope ds (socket); + LOG_TIMING("start-remote-encode thread=%1", thread_id()); + dcp::ArrayData enc(socket->read_uint32()); + LOG_TIMING("start-remote-receive thread=%1", thread_id()); + socket->read (enc.data(), enc.size()); + LOG_TIMING("finish-remote-receive thread=%1", thread_id()); + if (!ds.check()) { + throw NetworkError ("Checksums do not match"); + } + + LOG_DEBUG_ENCODE (N_("Finished remotely-encoded frame %1"), video.index()); + + _backoff = 0; + return enc; + + } catch (std::exception& e) { + if (_backoff < 60) { + /* back off more */ + _backoff += 10; + } + LOG_ERROR ( + N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"), + video.index(), _server.host_name(), e.what(), _backoff + ); + return {}; + } + +} + diff --git a/src/lib/j2k_encoder_remote_backend.h b/src/lib/j2k_encoder_remote_backend.h new file mode 100644 index 000000000..a8f1aa401 --- /dev/null +++ b/src/lib/j2k_encoder_remote_backend.h @@ -0,0 +1,50 @@ +/* + Copyright (C) 2021 Carl Hetherington + + This file is part of DCP-o-matic. + + DCP-o-matic is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + DCP-o-matic is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with DCP-o-matic. If not, see . + +*/ + + +#ifndef DCPOMATIC_J2K_ENCODER_REMOTE_BACKEND_H +#define DCPOMATIC_J2K_ENCODER_REMOTE_BACKEND_H + + +#include "encode_server_description.h" +#include "j2k_encoder_backend.h" + + +class J2KEncoderRemoteBackend : public J2KEncoderBackend +{ +public: + J2KEncoderRemoteBackend (EncodeServerDescription server, int timeout = 30) + : _server (server) + , _timeout (timeout) + {} + + J2KEncoderRemoteBackend (J2KEncoderRemoteBackend&& other); + + boost::optional encode (DCPVideo video) override; + +private: + EncodeServerDescription _server; + int _timeout; + int _backoff = 0; +}; + + +#endif + diff --git a/src/lib/writer.cc b/src/lib/writer.cc index d85be9eff..9674fc3d5 100644 --- a/src/lib/writer.cc +++ b/src/lib/writer.cc @@ -437,7 +437,7 @@ try case QueueItem::Type::FULL: LOG_DEBUG_ENCODE (N_("Writer FULL-writes %1 (%2)"), qi.frame, (int) qi.eyes); if (!qi.encoded) { - qi.encoded.reset (new ArrayData(film()->j2c_path(qi.reel, qi.frame, qi.eyes, false))); + qi.encoded = make_shared(film()->j2c_path(qi.reel, qi.frame, qi.eyes, false)); } reel.write (qi.encoded, qi.frame, qi.eyes); ++_full_written; diff --git a/src/lib/wscript b/src/lib/wscript index fd243db68..9d7e1d11b 100644 --- a/src/lib/wscript +++ b/src/lib/wscript @@ -122,6 +122,8 @@ sources = """ image_examiner.cc image_filename_sorter.cc image_proxy.cc + j2k_encoder_cpu_backend.cc + j2k_encoder_remote_backend.cc j2k_image_proxy.cc job.cc job_manager.cc diff --git a/src/tools/server_test.cc b/src/tools/server_test.cc index aaeb4115a..39363cc1e 100644 --- a/src/tools/server_test.cc +++ b/src/tools/server_test.cc @@ -29,6 +29,8 @@ #include "lib/filter.h" #include "lib/player.h" #include "lib/player_video.h" +#include "lib/j2k_encoder_cpu_backend.h" +#include "lib/j2k_encoder_remote_backend.h" #include "lib/ratio.h" #include "lib/util.h" #include "lib/video_decoder.h" @@ -60,37 +62,41 @@ static int frame_count = 0; void process_video (shared_ptr pvf) { - auto local = make_shared(pvf, frame_count, film->video_frame_rate(), 250000000, Resolution::TWO_K); - auto remote = make_shared(pvf, frame_count, film->video_frame_rate(), 250000000, Resolution::TWO_K); + DCPVideo local(pvf, frame_count, film->video_frame_rate(), 250000000, Resolution::TWO_K); + DCPVideo remote(pvf, frame_count, film->video_frame_rate(), 250000000, Resolution::TWO_K); cout << "Frame " << frame_count << ": "; cout.flush (); ++frame_count; - auto local_encoded = local->encode_locally (); - ArrayData remote_encoded; + J2KEncoderCPUBackend cpu_backend; + auto local_encoded = cpu_backend.encode(local); + optional remote_encoded; string remote_error; + J2KEncoderRemoteBackend remote_backend(*server); try { - remote_encoded = remote->encode_remotely (*server); + remote_encoded = remote_backend.encode(remote); } catch (NetworkError& e) { remote_error = e.what (); } + DCPOMATIC_ASSERT (remote_encoded); + if (!remote_error.empty()) { cout << "\033[0;31mnetwork problem: " << remote_error << "\033[0m\n"; return; } - if (local_encoded.size() != remote_encoded.size()) { + if (local_encoded->size() != remote_encoded->size()) { cout << "\033[0;31msizes differ\033[0m\n"; return; } - auto p = local_encoded.data(); - auto q = remote_encoded.data(); - for (int i = 0; i < local_encoded.size(); ++i) { + auto p = local_encoded->data(); + auto q = remote_encoded->data(); + for (int i = 0; i < local_encoded->size(); ++i) { if (*p++ != *q++) { cout << "\033[0;31mdata differ\033[0m at byte " << i << "\n"; return; diff --git a/test/client_server_test.cc b/test/client_server_test.cc index d6fe4b948..2355c08c0 100644 --- a/test/client_server_test.cc +++ b/test/client_server_test.cc @@ -36,6 +36,8 @@ #include "lib/encode_server_description.h" #include "lib/file_log.h" #include "lib/image.h" +#include "lib/j2k_encoder_cpu_backend.h" +#include "lib/j2k_encoder_remote_backend.h" #include "lib/j2k_image_proxy.h" #include "lib/player_video.h" #include "lib/raw_image_proxy.h" @@ -55,13 +57,15 @@ using namespace dcpomatic; void -do_remote_encode (shared_ptr frame, EncodeServerDescription description, ArrayData locally_encoded) +do_remote_encode (DCPVideo frame, EncodeServerDescription description, ArrayData locally_encoded) { - ArrayData remotely_encoded; - BOOST_REQUIRE_NO_THROW (remotely_encoded = frame->encode_remotely (description, 1200)); + optional remotely_encoded; + J2KEncoderRemoteBackend backend (description, 1200); + BOOST_REQUIRE_NO_THROW (remotely_encoded = backend.encode(frame)); + BOOST_REQUIRE (static_cast(remotely_encoded)); - BOOST_REQUIRE_EQUAL (locally_encoded.size(), remotely_encoded.size()); - BOOST_CHECK_EQUAL (memcmp (locally_encoded.data(), remotely_encoded.data(), locally_encoded.size()), 0); + BOOST_REQUIRE_EQUAL (locally_encoded.size(), remotely_encoded->size()); + BOOST_CHECK_EQUAL (memcmp (locally_encoded.data(), remotely_encoded->data(), locally_encoded.size()), 0); } @@ -112,7 +116,7 @@ BOOST_AUTO_TEST_CASE (client_server_test_rgb) pvf->set_text (PositionImage(sub_image, Position(50, 60))); - auto frame = make_shared ( + DCPVideo frame( pvf, 0, 24, @@ -120,7 +124,9 @@ BOOST_AUTO_TEST_CASE (client_server_test_rgb) Resolution::TWO_K ); - auto locally_encoded = frame->encode_locally (); + J2KEncoderCPUBackend cpu; + auto locally_encoded = cpu.encode (frame); + BOOST_REQUIRE (static_cast(locally_encoded)); auto server = new EncodeServer (true, 2); @@ -134,7 +140,7 @@ BOOST_AUTO_TEST_CASE (client_server_test_rgb) list threads; for (int i = 0; i < 8; ++i) { - threads.push_back (new thread (boost::bind (do_remote_encode, frame, description, locally_encoded))); + threads.push_back (new thread(boost::bind(do_remote_encode, frame, description, *locally_encoded))); } for (auto i: threads) { @@ -195,7 +201,7 @@ BOOST_AUTO_TEST_CASE (client_server_test_yuv) pvf->set_text (PositionImage(sub_image, Position(50, 60))); - auto frame = make_shared( + DCPVideo frame( pvf, 0, 24, @@ -203,7 +209,9 @@ BOOST_AUTO_TEST_CASE (client_server_test_yuv) Resolution::TWO_K ); - auto locally_encoded = frame->encode_locally (); + J2KEncoderCPUBackend cpu; + auto locally_encoded = cpu.encode (frame); + BOOST_REQUIRE (static_cast(locally_encoded)); auto server = new EncodeServer (true, 2); @@ -217,7 +225,7 @@ BOOST_AUTO_TEST_CASE (client_server_test_yuv) list threads; for (int i = 0; i < 8; ++i) { - threads.push_back (new thread(boost::bind(do_remote_encode, frame, description, locally_encoded))); + threads.push_back (new thread(boost::bind(do_remote_encode, frame, description, *locally_encoded))); } for (auto i: threads) { @@ -263,7 +271,7 @@ BOOST_AUTO_TEST_CASE (client_server_test_j2k) false ); - auto raw_frame = make_shared ( + DCPVideo raw_frame( raw_pvf, 0, 24, @@ -271,10 +279,12 @@ BOOST_AUTO_TEST_CASE (client_server_test_j2k) Resolution::TWO_K ); - auto raw_locally_encoded = raw_frame->encode_locally (); + J2KEncoderCPUBackend cpu; + auto raw_locally_encoded = cpu.encode (raw_frame); + BOOST_REQUIRE (static_cast(raw_locally_encoded)); auto j2k_pvf = std::make_shared ( - std::make_shared(raw_locally_encoded, dcp::Size(1998, 1080), AV_PIX_FMT_XYZ12LE), + std::make_shared(*raw_locally_encoded, dcp::Size(1998, 1080), AV_PIX_FMT_XYZ12LE), Crop(), optional(), dcp::Size(1998, 1080), @@ -288,7 +298,7 @@ BOOST_AUTO_TEST_CASE (client_server_test_j2k) false ); - auto j2k_frame = make_shared ( + DCPVideo j2k_frame( j2k_pvf, 0, 24, @@ -296,7 +306,8 @@ BOOST_AUTO_TEST_CASE (client_server_test_j2k) Resolution::TWO_K ); - auto j2k_locally_encoded = j2k_frame->encode_locally (); + auto j2k_locally_encoded = cpu.encode(j2k_frame); + BOOST_REQUIRE (static_cast(j2k_locally_encoded)); auto server = new EncodeServer (true, 2); @@ -310,7 +321,7 @@ BOOST_AUTO_TEST_CASE (client_server_test_j2k) list threads; for (int i = 0; i < 8; ++i) { - threads.push_back (new thread(boost::bind(do_remote_encode, j2k_frame, description, j2k_locally_encoded))); + threads.push_back (new thread(boost::bind(do_remote_encode, j2k_frame, description, *j2k_locally_encoded))); } for (auto i: threads) { diff --git a/test/low_bitrate_test.cc b/test/low_bitrate_test.cc index 356013b2b..8f16f56f1 100644 --- a/test/low_bitrate_test.cc +++ b/test/low_bitrate_test.cc @@ -22,6 +22,7 @@ #include "lib/dcp_video.h" #include "lib/image.h" #include "lib/player_video.h" +#include "lib/j2k_encoder_cpu_backend.h" #include "lib/raw_image_proxy.h" extern "C" { #include @@ -56,9 +57,11 @@ BOOST_AUTO_TEST_CASE (low_bitrate_test) false ); - auto dcp_video = make_shared(frame, 0, 24, 100000000, Resolution::TWO_K); - auto j2k = dcp_video->encode_locally(); - BOOST_REQUIRE (j2k.size() >= 16536); + DCPVideo dcp_video(frame, 0, 24, 100000000, Resolution::TWO_K); + J2KEncoderCPUBackend cpu; + auto j2k = cpu.encode(dcp_video); + BOOST_REQUIRE (static_cast(j2k)); + BOOST_REQUIRE (j2k->size() >= 16536); } diff --git a/test/writer_test.cc b/test/writer_test.cc index 1dfc75bfc..43b0a4945 100644 --- a/test/writer_test.cc +++ b/test/writer_test.cc @@ -75,15 +75,14 @@ BOOST_AUTO_TEST_CASE (interrupt_writer) } /* Write some data */ - auto video = dcp::compress_j2k(image, 100000000, 24, false, false); - auto video_ptr = make_shared(video.data(), video.size()); + auto video = make_shared(dcp::compress_j2k(image, 100000000, 24, false, false)); auto audio = make_shared(6, 48000 / 24); auto writer = make_shared(film, shared_ptr()); writer->start (); for (int i = 0; i < frames; ++i) { - writer->write (video_ptr, i, Eyes::BOTH); + writer->write (video, i, Eyes::BOTH); writer->write (audio, dcpomatic::DCPTime::from_frames(i, 24)); } -- cgit v1.2.3