Separate out local/remote encode code from DCPVideo.
authorCarl Hetherington <cth@carlh.net>
Sun, 21 Nov 2021 13:51:09 +0000 (14:51 +0100)
committerCarl Hetherington <cth@carlh.net>
Mon, 22 Nov 2021 22:59:43 +0000 (23:59 +0100)
Now we have a J2KEncoderCPUBackend and a J2KEncoderRemoteBackend.

16 files changed:
src/lib/dcp_video.cc
src/lib/dcp_video.h
src/lib/encode_server.cc
src/lib/j2k_encoder.cc
src/lib/j2k_encoder.h
src/lib/j2k_encoder_backend.h [new file with mode: 0644]
src/lib/j2k_encoder_cpu_backend.cc [new file with mode: 0644]
src/lib/j2k_encoder_cpu_backend.h [new file with mode: 0644]
src/lib/j2k_encoder_remote_backend.cc [new file with mode: 0644]
src/lib/j2k_encoder_remote_backend.h [new file with mode: 0644]
src/lib/writer.cc
src/lib/wscript
src/tools/server_test.cc
test/client_server_test.cc
test/low_bitrate_test.cc
test/writer_test.cc

index 4a41e17b9c514fe728a081d0e9f7c453fb05c76b..9b0c76ceb990bd40d86bf534dd4dd1eee7ae521b 100644 (file)
@@ -121,138 +121,6 @@ DCPVideo::convert_to_xyz (shared_ptr<const PlayerVideo> frame, dcp::NoteHandler
        return xyz;
 }
 
-/** J2K-encode this frame on the local host.
- *  @return Encoded data.
- */
-ArrayData
-DCPVideo::encode_locally () const
-{
-       auto const comment = Config::instance()->dcp_j2k_comment();
-
-       ArrayData enc = {};
-       /* This was empirically derived by a user: see #1902 */
-       int const minimum_size = 16384;
-       LOG_GENERAL ("Using minimum frame size %1", minimum_size);
-
-       auto xyz = convert_to_xyz (_frame, boost::bind(&Log::dcp_log, dcpomatic_log.get(), _1, _2));
-       int noise_amount = 2;
-       int pixel_skip = 16;
-       while (true) {
-               enc = dcp::compress_j2k (
-                       xyz,
-                       _j2k_bandwidth,
-                       _frames_per_second,
-                       _frame->eyes() == Eyes::LEFT || _frame->eyes() == Eyes::RIGHT,
-                       _resolution == Resolution::FOUR_K,
-                       comment.empty() ? "libdcp" : comment
-               );
-
-               if (enc.size() >= minimum_size) {
-                       LOG_GENERAL (N_("Frame %1 encoded size was OK (%2)"), _index, enc.size());
-                       break;
-               }
-
-               LOG_GENERAL (N_("Frame %1 encoded size was small (%2); adding noise at level %3 with pixel skip %4"), _index, enc.size(), noise_amount, pixel_skip);
-
-               /* The JPEG2000 is too low-bitrate for some decoders <cough>DSS200</cough> so add some noise
-                * and try again.  This is slow but hopefully won't happen too often.  We have to do
-                * convert_to_xyz() again because compress_j2k() corrupts its xyz parameter.
-                */
-
-               xyz = convert_to_xyz (_frame, boost::bind(&Log::dcp_log, dcpomatic_log.get(), _1, _2));
-               auto size = xyz->size ();
-               auto pixels = size.width * size.height;
-               dcpomatic::RNG rng(42);
-               for (auto c = 0; c < 3; ++c) {
-                       auto p = xyz->data(c);
-                       auto e = xyz->data(c) + pixels;
-                       while (p < e) {
-                               *p = std::min(4095, std::max(0, *p + (rng.get() % noise_amount)));
-                               p += pixel_skip;
-                       }
-               }
-
-               if (pixel_skip > 1) {
-                       --pixel_skip;
-               } else {
-                       ++noise_amount;
-               }
-               /* Something's gone badly wrong if this much noise doesn't help */
-               DCPOMATIC_ASSERT (noise_amount < 16);
-       }
-
-       switch (_frame->eyes()) {
-       case Eyes::BOTH:
-               LOG_DEBUG_ENCODE (N_("Finished locally-encoded frame %1 for mono"), _index);
-               break;
-       case Eyes::LEFT:
-               LOG_DEBUG_ENCODE (N_("Finished locally-encoded frame %1 for L"), _index);
-               break;
-       case Eyes::RIGHT:
-               LOG_DEBUG_ENCODE (N_("Finished locally-encoded frame %1 for R"), _index);
-               break;
-       default:
-               break;
-       }
-
-       return enc;
-}
-
-/** Send this frame to a remote server for J2K encoding, then read the result.
- *  @param serv Server to send to.
- *  @param timeout timeout in seconds.
- *  @return Encoded data.
- */
-ArrayData
-DCPVideo::encode_remotely (EncodeServerDescription serv, int timeout) const
-{
-       boost::asio::io_service io_service;
-       boost::asio::ip::tcp::resolver resolver (io_service);
-       boost::asio::ip::tcp::resolver::query query (serv.host_name(), raw_convert<string> (ENCODE_FRAME_PORT));
-       boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve (query);
-
-       auto socket = make_shared<Socket>(timeout);
-
-       socket->connect (*endpoint_iterator);
-
-       /* Collect all XML metadata */
-       xmlpp::Document doc;
-       auto root = doc.create_root_node ("EncodingRequest");
-       root->add_child("Version")->add_child_text (raw_convert<string> (SERVER_LINK_VERSION));
-       add_metadata (root);
-
-       LOG_DEBUG_ENCODE (N_("Sending frame %1 to remote"), _index);
-
-       {
-               Socket::WriteDigestScope ds (socket);
-
-               /* Send XML metadata */
-               auto xml = doc.write_to_string ("UTF-8");
-               socket->write (xml.length() + 1);
-               socket->write ((uint8_t *) xml.c_str(), xml.bytes() + 1);
-
-               /* Send binary data */
-               LOG_TIMING("start-remote-send thread=%1", thread_id ());
-               _frame->write_to_socket (socket);
-       }
-
-       /* Read the response (JPEG2000-encoded data); this blocks until the data
-          is ready and sent back.
-       */
-       Socket::ReadDigestScope ds (socket);
-       LOG_TIMING("start-remote-encode thread=%1", thread_id ());
-       ArrayData e (socket->read_uint32 ());
-       LOG_TIMING("start-remote-receive thread=%1", thread_id ());
-       socket->read (e.data(), e.size());
-       LOG_TIMING("finish-remote-receive thread=%1", thread_id ());
-       if (!ds.check()) {
-               throw NetworkError ("Checksums do not match");
-       }
-
-       LOG_DEBUG_ENCODE (N_("Finished remotely-encoded frame %1"), _index);
-
-       return e;
-}
 
 void
 DCPVideo::add_metadata (xmlpp::Element* el) const
index 3bd516ccd4ea676567e80ac70880d39b382568c6..2508988fd96b6df60636520aa6069fc4c7eae4e2 100644 (file)
@@ -48,23 +48,35 @@ public:
        DCPVideo (DCPVideo const&) = default;
        DCPVideo& operator= (DCPVideo const&) = default;
 
-       dcp::ArrayData encode_locally () const;
-       dcp::ArrayData encode_remotely (EncodeServerDescription, int timeout = 30) const;
+       std::shared_ptr<const PlayerVideo> frame () const {
+               return _frame;
+       }
 
        int index () const {
                return _index;
        }
 
+       int frames_per_second () const {
+               return _frames_per_second;
+       }
+
+       int j2k_bandwidth () const {
+               return _j2k_bandwidth;
+       }
+
+       Resolution resolution () const {
+               return _resolution;
+       }
+
        Eyes eyes () const;
 
        bool same (std::shared_ptr<const DCPVideo> other) const;
 
+       void add_metadata (xmlpp::Element *) const;
+
        static std::shared_ptr<dcp::OpenJPEGImage> convert_to_xyz (std::shared_ptr<const PlayerVideo> frame, dcp::NoteHandler note);
 
 private:
-
-       void add_metadata (xmlpp::Element *) const;
-
        std::shared_ptr<const PlayerVideo> _frame;
        int _index;                      ///< frame index within the DCP's intrinsic duration
        int _frames_per_second;          ///< Frames per second that we will use for the DCP
index 24ba5bc451449bc59ab63d2c71d8ed182eb56a13..56774a5ca76e4cc50298f5030257a010608715b1 100644 (file)
  */
 
 
-#include "encode_server.h"
-#include "util.h"
-#include "dcpomatic_socket.h"
-#include "image.h"
-#include "dcp_video.h"
+#include "compose.hpp"
 #include "config.h"
 #include "cross.h"
-#include "player_video.h"
-#include "compose.hpp"
-#include "log.h"
+#include "dcp_video.h"
 #include "dcpomatic_log.h"
+#include "dcpomatic_socket.h"
+#include "encode_server.h"
 #include "encoded_log_entry.h"
+#include "image.h"
+#include "j2k_encoder_cpu_backend.h"
+#include "log.h"
+#include "player_video.h"
+#include "util.h"
 #include "version.h"
 #include "warnings.h"
 #include <dcp/raw_convert.h>
@@ -151,14 +152,16 @@ EncodeServer::process (shared_ptr<Socket> socket, struct timeval& after_read, st
 
        gettimeofday (&after_read, 0);
 
-       auto encoded = dcp_video_frame.encode_locally ();
+       J2KEncoderCPUBackend cpu;
+       auto encoded = cpu.encode (dcp_video_frame);
+       DCPOMATIC_ASSERT (encoded);
 
        gettimeofday (&after_encode, 0);
 
        try {
                Socket::WriteDigestScope ds (socket);
-               socket->write (encoded.size());
-               socket->write (encoded.data(), encoded.size());
+               socket->write (encoded->size());
+               socket->write (encoded->data(), encoded->size());
        } catch (std::exception& e) {
                cerr << "Send failed; frame " << dcp_video_frame.index() << "\n";
                LOG_ERROR ("Send failed; frame %1", dcp_video_frame.index());
index 8e00f355613b08ec0a55756e25df13ad37a2bc0b..a003a6ea2b0399f972b33010a34b9276e096dea8 100644 (file)
@@ -24,6 +24,8 @@
  */
 
 
+#include "j2k_encoder_cpu_backend.h"
+#include "j2k_encoder_remote_backend.h"
 #include "j2k_encoder.h"
 #include "util.h"
 #include "film.h"
@@ -135,14 +137,13 @@ J2KEncoder::end ()
             So just mop up anything left in the queue here.
        */
 
+       J2KEncoderCPUBackend cpu;
        for (auto const& i: _queue) {
                LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
                try {
-                       _writer->write (
-                               make_shared<dcp::ArrayData>(i.encode_locally()),
-                               i.index(),
-                               i.eyes()
-                               );
+                       auto enc = cpu.encode(i);
+                       DCPOMATIC_ASSERT (enc);
+                       _writer->write (make_shared<dcp::ArrayData>(*enc), i.index(), i.eyes());
                        frame_done ();
                } catch (std::exception& e) {
                        LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
@@ -280,22 +281,12 @@ J2KEncoder::terminate_threads ()
 
 
 void
-J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
+J2KEncoder::encoder_thread (shared_ptr<J2KEncoderBackend> backend)
 try
 {
        start_of_thread ("J2KEncoder");
 
-       if (server) {
-               LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
-       } else {
-               LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
-       }
-
-       /* Number of seconds that we currently wait between attempts
-          to connect to the server; not relevant for localhost
-          encodings.
-       */
-       int remote_backoff = 0;
+       LOG_TIMING ("start-encoder-thread thread=%1", thread_id());
 
        while (true) {
 
@@ -320,45 +311,10 @@ try
 
                        lock.unlock ();
 
-                       shared_ptr<Data> encoded;
-
-                       /* We need to encode this input */
-                       if (server) {
-                               try {
-                                       encoded = make_shared<dcp::ArrayData>(vf.encode_remotely(server.get()));
-
-                                       if (remote_backoff > 0) {
-                                               LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
-                                       }
-
-                                       /* This job succeeded, so remove any backoff */
-                                       remote_backoff = 0;
-
-                               } catch (std::exception& e) {
-                                       if (remote_backoff < 60) {
-                                               /* back off more */
-                                               remote_backoff += 10;
-                                       }
-                                       LOG_ERROR (
-                                               N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
-                                               vf.index(), server->host_name(), e.what(), remote_backoff
-                                               );
-                               }
-
-                       } else {
-                               try {
-                                       LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf.index());
-                                       encoded = make_shared<dcp::ArrayData>(vf.encode_locally());
-                                       LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index());
-                               } catch (std::exception& e) {
-                                       /* This is very bad, so don't cope with it, just pass it on */
-                                       LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
-                                       throw;
-                               }
-                       }
+                       auto encoded = backend->encode(vf);
 
                        if (encoded) {
-                               _writer->write (encoded, vf.index(), vf.eyes());
+                               _writer->write (make_shared<dcp::ArrayData>(*encoded), vf.index(), vf.eyes());
                                frame_done ();
                        } else {
                                lock.lock ();
@@ -368,10 +324,6 @@ try
                        }
                }
 
-               if (remote_backoff > 0) {
-                       boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
-               }
-
                /* The queue might not be full any more, so notify anything that is waiting on that */
                lock.lock ();
                _full_condition.notify_all ();
@@ -400,12 +352,13 @@ J2KEncoder::servers_list_changed ()
        /* XXX: could re-use threads */
 
        if (!Config::instance()->only_servers_encode ()) {
+               auto backend = std::make_shared<J2KEncoderCPUBackend>();
                for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
 #ifdef DCPOMATIC_LINUX
-                       auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
+                       auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
                        pthread_setname_np (t->native_handle(), "encode-worker");
 #else
-                       _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
+                       _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
 #endif
                }
        }
@@ -415,11 +368,14 @@ J2KEncoder::servers_list_changed ()
                        continue;
                }
 
-               LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
+               auto backend = std::make_shared<J2KEncoderRemoteBackend>(i);
+
+               LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name());
                for (int j = 0; j < i.threads(); ++j) {
-                       _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i));
+                       _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
                }
        }
 
        _writer->set_encoder_threads (_threads->size());
 }
+
index ea0a2bef89bfbe873a83cbad11be48a1a0788985..df98d1a7b48e332495996b92e81140a8572e3ea6 100644 (file)
@@ -32,6 +32,8 @@
 #include "cross.h"
 #include "event_history.h"
 #include "exception_store.h"
+#include "j2k_encoder_cpu_backend.h"
+#include "j2k_encoder_remote_backend.h"
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/condition.hpp>
 #include <boost/thread.hpp>
@@ -45,6 +47,7 @@ class Film;
 class EncodeServerDescription;
 class DCPVideo;
 class Writer;
+class J2KEncoderBackend;
 class Job;
 class PlayerVideo;
 
@@ -84,12 +87,12 @@ private:
 
        void frame_done ();
 
-       void encoder_thread (boost::optional<EncodeServerDescription>);
+       void encoder_thread (std::shared_ptr<J2KEncoderBackend> backend);
        void terminate_threads ();
 
-       /** Film that we are encoding */
        std::shared_ptr<const Film> _film;
 
+
        EventHistory _history;
 
        boost::mutex _threads_mutex;
diff --git a/src/lib/j2k_encoder_backend.h b/src/lib/j2k_encoder_backend.h
new file mode 100644 (file)
index 0000000..295f81e
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+    Copyright (C) 2021 Carl Hetherington <cth@carlh.net>
+
+    This file is part of DCP-o-matic.
+
+    DCP-o-matic is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    DCP-o-matic is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
+
+*/
+
+
+#ifndef DCPOMATIC_J2K_ENCODER_BACKEND_H
+#define DCPOMATIC_J2K_ENCODER_BACKEND_H
+
+
+#include <boost/optional.hpp>
+#include <dcp/array_data.h>
+
+
+class DCPVideo;
+
+
+class J2KEncoderBackend
+{
+public:
+       J2KEncoderBackend () {}
+
+       J2KEncoderBackend (J2KEncoderBackend const&) = delete;
+       J2KEncoderBackend& operator= (J2KEncoderBackend const&) = delete;
+
+       virtual boost::optional<dcp::ArrayData> encode (DCPVideo video) = 0;
+};
+
+
+#endif
diff --git a/src/lib/j2k_encoder_cpu_backend.cc b/src/lib/j2k_encoder_cpu_backend.cc
new file mode 100644 (file)
index 0000000..99963c1
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+    Copyright (C) 2021 Carl Hetherington <cth@carlh.net>
+
+    This file is part of DCP-o-matic.
+
+    DCP-o-matic is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    DCP-o-matic is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
+
+*/
+
+
+#include "config.h"
+#include "dcp_video.h"
+#include "dcpomatic_log.h"
+#include "j2k_encoder_cpu_backend.h"
+#include "player_video.h"
+#include "rng.h"
+#include <dcp/openjpeg_image.h>
+#include <dcp/j2k_transcode.h>
+
+#include "i18n.h"
+
+
+using std::shared_ptr;
+using boost::optional;
+#if BOOST_VERSION >= 106100
+using namespace boost::placeholders;
+#endif
+
+
+optional<dcp::ArrayData>
+J2KEncoderCPUBackend::encode (DCPVideo video)
+{
+       try {
+               auto const comment = Config::instance()->dcp_j2k_comment();
+
+               /* This was empirically derived by a user: see #1902 */
+               int const minimum_size = 16384;
+               LOG_GENERAL ("Using minimum frame size %1", minimum_size);
+
+               auto xyz = DCPVideo::convert_to_xyz (video.frame(), boost::bind(&Log::dcp_log, dcpomatic_log.get(), _1, _2));
+               int noise_amount = 2;
+               int pixel_skip = 16;
+               while (true) {
+                       auto enc = dcp::compress_j2k (
+                               xyz,
+                               video.j2k_bandwidth(),
+                               video.frames_per_second(),
+                               video.frame()->eyes() == Eyes::LEFT || video.frame()->eyes() == Eyes::RIGHT,
+                               video.resolution() == Resolution::FOUR_K,
+                               comment.empty() ? "libdcp" : comment
+                       );
+
+                       if (enc.size() >= minimum_size) {
+                               LOG_GENERAL (N_("Frame %1 encoded size was OK (%2)"), video.index(), enc.size());
+                               return enc;
+                       }
+
+                       LOG_GENERAL (N_("Frame %1 encoded size was small (%2); adding noise at level %3 with pixel skip %4"), video.index(), enc.size(), noise_amount, pixel_skip);
+
+                       /* The JPEG2000 is too low-bitrate for some decoders <cough>DSS200</cough> so add some noise
+                        * and try again.  This is slow but hopefully won't happen too often.  We have to do
+                        * convert_to_xyz() again because compress_j2k() corrupts its xyz parameter.
+                        */
+
+                       xyz = DCPVideo::convert_to_xyz (video.frame(), boost::bind(&Log::dcp_log, dcpomatic_log.get(), _1, _2));
+                       auto const size = xyz->size ();
+                       auto const pixels = size.width * size.height;
+                       dcpomatic::RNG rng(42);
+                       for (auto c = 0; c < 3; ++c) {
+                               auto p = xyz->data(c);
+                               auto e = xyz->data(c) + pixels;
+                               while (p < e) {
+                                       *p = std::min(4095, std::max(0, *p + (rng.get() % noise_amount)));
+                                       p += pixel_skip;
+                               }
+                       }
+
+                       if (pixel_skip > 1) {
+                               --pixel_skip;
+                       } else {
+                               ++noise_amount;
+                       }
+                       /* Something's gone badly wrong if this much noise doesn't help */
+                       DCPOMATIC_ASSERT (noise_amount < 16);
+               }
+
+               switch (video.frame()->eyes()) {
+               case Eyes::BOTH:
+                       LOG_DEBUG_ENCODE (N_("Finished locally-encoded frame %1 for mono"), video.index());
+                       break;
+               case Eyes::LEFT:
+                       LOG_DEBUG_ENCODE (N_("Finished locally-encoded frame %1 for L"), video.index());
+                       break;
+               case Eyes::RIGHT:
+                       LOG_DEBUG_ENCODE (N_("Finished locally-encoded frame %1 for R"), video.index());
+                       break;
+               default:
+                       break;
+               }
+       } catch (std::exception& e) {
+               LOG_ERROR (N_("Local encode failed (%1)"), e.what());
+       }
+
+       return {};
+}
+
+
diff --git a/src/lib/j2k_encoder_cpu_backend.h b/src/lib/j2k_encoder_cpu_backend.h
new file mode 100644 (file)
index 0000000..41ea8d2
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+    Copyright (C) 2021 Carl Hetherington <cth@carlh.net>
+
+    This file is part of DCP-o-matic.
+
+    DCP-o-matic is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    DCP-o-matic is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
+
+*/
+
+
+#ifndef DCPOMATIC_J2K_ENCODER_CPU_BACKEND_H
+#define DCPOMATIC_J2K_ENCODER_CPU_BACKEND_H
+
+
+#include "j2k_encoder_backend.h"
+
+
+class J2KEncoderCPUBackend : public J2KEncoderBackend
+{
+public:
+       J2KEncoderCPUBackend () = default;
+       J2KEncoderCPUBackend (J2KEncoderCPUBackend&& other) = default;
+
+       boost::optional<dcp::ArrayData> encode (DCPVideo video) override;
+};
+
+
+#endif
diff --git a/src/lib/j2k_encoder_remote_backend.cc b/src/lib/j2k_encoder_remote_backend.cc
new file mode 100644 (file)
index 0000000..8bf1a0c
--- /dev/null
@@ -0,0 +1,121 @@
+/*
+    Copyright (C) 2021 Carl Hetherington <cth@carlh.net>
+
+    This file is part of DCP-o-matic.
+
+    DCP-o-matic is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    DCP-o-matic is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
+
+*/
+
+
+#include "config.h"
+#include "cross.h"
+#include "dcp_video.h"
+#include "dcpomatic_log.h"
+#include "dcpomatic_socket.h"
+#include "j2k_encoder_remote_backend.h"
+#include "player_video.h"
+#include "warnings.h"
+#include <dcp/raw_convert.h>
+DCPOMATIC_DISABLE_WARNINGS
+#include <libxml++/libxml++.h>
+DCPOMATIC_ENABLE_WARNINGS
+#include <boost/asio.hpp>
+
+
+#include "i18n.h"
+
+
+using std::make_shared;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using boost::optional;
+using dcp::raw_convert;
+
+
+J2KEncoderRemoteBackend::J2KEncoderRemoteBackend (J2KEncoderRemoteBackend&& other)
+       : _server (other._server)
+       , _backoff (other._backoff)
+{
+
+}
+
+
+optional<dcp::ArrayData>
+J2KEncoderRemoteBackend::encode (DCPVideo video)
+{
+       try {
+               boost::asio::io_service io_service;
+               boost::asio::ip::tcp::resolver resolver (io_service);
+               boost::asio::ip::tcp::resolver::query query (_server.host_name(), raw_convert<string>(ENCODE_FRAME_PORT));
+               auto endpoint_iterator = resolver.resolve (query);
+
+               auto socket = make_shared<Socket>(_timeout);
+
+               socket->connect (*endpoint_iterator);
+
+               /* Collect all XML metadata */
+               xmlpp::Document doc;
+               auto root = doc.create_root_node ("EncodingRequest");
+               root->add_child("Version")->add_child_text(raw_convert<string>(SERVER_LINK_VERSION));
+               video.add_metadata (root);
+
+               LOG_DEBUG_ENCODE (N_("Sending frame %1 to remote"), video.index());
+
+               {
+                       Socket::WriteDigestScope ds (socket);
+
+                       /* Send XML metadata */
+                       auto xml = doc.write_to_string ("UTF-8");
+                       socket->write (xml.length() + 1);
+                       socket->write (reinterpret_cast<uint8_t const*>(xml.c_str()), xml.bytes() + 1);
+
+                       /* Send binary data */
+                       LOG_TIMING("start-remote-send thread=%1", thread_id());
+                       video.frame()->write_to_socket(socket);
+               }
+
+               /* Read the response (JPEG2000-encoded data); this blocks until the data
+                  is ready and sent back.
+               */
+               Socket::ReadDigestScope ds (socket);
+               LOG_TIMING("start-remote-encode thread=%1", thread_id());
+               dcp::ArrayData enc(socket->read_uint32());
+               LOG_TIMING("start-remote-receive thread=%1", thread_id());
+               socket->read (enc.data(), enc.size());
+               LOG_TIMING("finish-remote-receive thread=%1", thread_id());
+               if (!ds.check()) {
+                       throw NetworkError ("Checksums do not match");
+               }
+
+               LOG_DEBUG_ENCODE (N_("Finished remotely-encoded frame %1"), video.index());
+
+               _backoff = 0;
+               return enc;
+
+       } catch (std::exception& e) {
+               if (_backoff < 60) {
+                       /* back off more */
+                       _backoff += 10;
+               }
+               LOG_ERROR (
+                       N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
+                       video.index(), _server.host_name(), e.what(), _backoff
+                       );
+               return {};
+       }
+
+}
+
diff --git a/src/lib/j2k_encoder_remote_backend.h b/src/lib/j2k_encoder_remote_backend.h
new file mode 100644 (file)
index 0000000..a8f1aa4
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+    Copyright (C) 2021 Carl Hetherington <cth@carlh.net>
+
+    This file is part of DCP-o-matic.
+
+    DCP-o-matic is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    DCP-o-matic is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
+
+*/
+
+
+#ifndef DCPOMATIC_J2K_ENCODER_REMOTE_BACKEND_H
+#define DCPOMATIC_J2K_ENCODER_REMOTE_BACKEND_H
+
+
+#include "encode_server_description.h"
+#include "j2k_encoder_backend.h"
+
+
+class J2KEncoderRemoteBackend : public J2KEncoderBackend
+{
+public:
+       J2KEncoderRemoteBackend (EncodeServerDescription server, int timeout = 30)
+               : _server (server)
+               , _timeout (timeout)
+       {}
+
+       J2KEncoderRemoteBackend (J2KEncoderRemoteBackend&& other);
+
+       boost::optional<dcp::ArrayData> encode (DCPVideo video) override;
+
+private:
+       EncodeServerDescription _server;
+       int _timeout;
+       int _backoff = 0;
+};
+
+
+#endif
+
index d85be9eff2b741adebbb9165d5b3ff01938c16a8..9674fc3d5c5a209702f07156aba71b62c13f83aa 100644 (file)
@@ -437,7 +437,7 @@ try
                        case QueueItem::Type::FULL:
                                LOG_DEBUG_ENCODE (N_("Writer FULL-writes %1 (%2)"), qi.frame, (int) qi.eyes);
                                if (!qi.encoded) {
-                                       qi.encoded.reset (new ArrayData(film()->j2c_path(qi.reel, qi.frame, qi.eyes, false)));
+                                       qi.encoded = make_shared<ArrayData>(film()->j2c_path(qi.reel, qi.frame, qi.eyes, false));
                                }
                                reel.write (qi.encoded, qi.frame, qi.eyes);
                                ++_full_written;
index fd243db689afb0fef910ea73e1d7a42f0803fb06..9d7e1d11b06d410e41a93f6ff03f477c64f60d35 100644 (file)
@@ -122,6 +122,8 @@ sources = """
           image_examiner.cc
           image_filename_sorter.cc
           image_proxy.cc
+          j2k_encoder_cpu_backend.cc
+          j2k_encoder_remote_backend.cc
           j2k_image_proxy.cc
           job.cc
           job_manager.cc
index aaeb4115a7e35f78e3819ddde4e379214705687d..39363cc1eb30a837450ccccf223ceee12aade9f1 100644 (file)
@@ -29,6 +29,8 @@
 #include "lib/filter.h"
 #include "lib/player.h"
 #include "lib/player_video.h"
+#include "lib/j2k_encoder_cpu_backend.h"
+#include "lib/j2k_encoder_remote_backend.h"
 #include "lib/ratio.h"
 #include "lib/util.h"
 #include "lib/video_decoder.h"
@@ -60,37 +62,41 @@ static int frame_count = 0;
 void
 process_video (shared_ptr<PlayerVideo> pvf)
 {
-       auto local = make_shared<DCPVideo>(pvf, frame_count, film->video_frame_rate(), 250000000, Resolution::TWO_K);
-       auto remote = make_shared<DCPVideo>(pvf, frame_count, film->video_frame_rate(), 250000000, Resolution::TWO_K);
+       DCPVideo local(pvf, frame_count, film->video_frame_rate(), 250000000, Resolution::TWO_K);
+       DCPVideo remote(pvf, frame_count, film->video_frame_rate(), 250000000, Resolution::TWO_K);
 
        cout << "Frame " << frame_count << ": ";
        cout.flush ();
 
        ++frame_count;
 
-       auto local_encoded = local->encode_locally ();
-       ArrayData remote_encoded;
+       J2KEncoderCPUBackend cpu_backend;
+       auto local_encoded = cpu_backend.encode(local);
+       optional<ArrayData> remote_encoded;
 
        string remote_error;
+       J2KEncoderRemoteBackend remote_backend(*server);
        try {
-               remote_encoded = remote->encode_remotely (*server);
+               remote_encoded = remote_backend.encode(remote);
        } catch (NetworkError& e) {
                remote_error = e.what ();
        }
 
+       DCPOMATIC_ASSERT (remote_encoded);
+
        if (!remote_error.empty()) {
                cout << "\033[0;31mnetwork problem: " << remote_error << "\033[0m\n";
                return;
        }
 
-       if (local_encoded.size() != remote_encoded.size()) {
+       if (local_encoded->size() != remote_encoded->size()) {
                cout << "\033[0;31msizes differ\033[0m\n";
                return;
        }
 
-       auto p = local_encoded.data();
-       auto q = remote_encoded.data();
-       for (int i = 0; i < local_encoded.size(); ++i) {
+       auto p = local_encoded->data();
+       auto q = remote_encoded->data();
+       for (int i = 0; i < local_encoded->size(); ++i) {
                if (*p++ != *q++) {
                        cout << "\033[0;31mdata differ\033[0m at byte " << i << "\n";
                        return;
index d6fe4b94841546b98b140253416a0d12eb001366..2355c08c09ea98b995ebf278dd7a5f9966ac9960 100644 (file)
@@ -36,6 +36,8 @@
 #include "lib/encode_server_description.h"
 #include "lib/file_log.h"
 #include "lib/image.h"
+#include "lib/j2k_encoder_cpu_backend.h"
+#include "lib/j2k_encoder_remote_backend.h"
 #include "lib/j2k_image_proxy.h"
 #include "lib/player_video.h"
 #include "lib/raw_image_proxy.h"
@@ -55,13 +57,15 @@ using namespace dcpomatic;
 
 
 void
-do_remote_encode (shared_ptr<DCPVideo> frame, EncodeServerDescription description, ArrayData locally_encoded)
+do_remote_encode (DCPVideo frame, EncodeServerDescription description, ArrayData locally_encoded)
 {
-       ArrayData remotely_encoded;
-       BOOST_REQUIRE_NO_THROW (remotely_encoded = frame->encode_remotely (description, 1200));
+       optional<ArrayData> remotely_encoded;
+       J2KEncoderRemoteBackend backend (description, 1200);
+       BOOST_REQUIRE_NO_THROW (remotely_encoded = backend.encode(frame));
+       BOOST_REQUIRE (static_cast<bool>(remotely_encoded));
 
-       BOOST_REQUIRE_EQUAL (locally_encoded.size(), remotely_encoded.size());
-       BOOST_CHECK_EQUAL (memcmp (locally_encoded.data(), remotely_encoded.data(), locally_encoded.size()), 0);
+       BOOST_REQUIRE_EQUAL (locally_encoded.size(), remotely_encoded->size());
+       BOOST_CHECK_EQUAL (memcmp (locally_encoded.data(), remotely_encoded->data(), locally_encoded.size()), 0);
 }
 
 
@@ -112,7 +116,7 @@ BOOST_AUTO_TEST_CASE (client_server_test_rgb)
 
        pvf->set_text (PositionImage(sub_image, Position<int>(50, 60)));
 
-       auto frame = make_shared<DCPVideo> (
+       DCPVideo frame(
                pvf,
                0,
                24,
@@ -120,7 +124,9 @@ BOOST_AUTO_TEST_CASE (client_server_test_rgb)
                Resolution::TWO_K
                );
 
-       auto locally_encoded = frame->encode_locally ();
+       J2KEncoderCPUBackend cpu;
+       auto locally_encoded = cpu.encode (frame);
+       BOOST_REQUIRE (static_cast<bool>(locally_encoded));
 
        auto server = new EncodeServer (true, 2);
 
@@ -134,7 +140,7 @@ BOOST_AUTO_TEST_CASE (client_server_test_rgb)
 
        list<thread*> threads;
        for (int i = 0; i < 8; ++i) {
-               threads.push_back (new thread (boost::bind (do_remote_encode, frame, description, locally_encoded)));
+               threads.push_back (new thread(boost::bind(do_remote_encode, frame, description, *locally_encoded)));
        }
 
        for (auto i: threads) {
@@ -195,7 +201,7 @@ BOOST_AUTO_TEST_CASE (client_server_test_yuv)
 
        pvf->set_text (PositionImage(sub_image, Position<int>(50, 60)));
 
-       auto frame = make_shared<DCPVideo>(
+       DCPVideo frame(
                pvf,
                0,
                24,
@@ -203,7 +209,9 @@ BOOST_AUTO_TEST_CASE (client_server_test_yuv)
                Resolution::TWO_K
                );
 
-       auto locally_encoded = frame->encode_locally ();
+       J2KEncoderCPUBackend cpu;
+       auto locally_encoded = cpu.encode (frame);
+       BOOST_REQUIRE (static_cast<bool>(locally_encoded));
 
        auto server = new EncodeServer (true, 2);
 
@@ -217,7 +225,7 @@ BOOST_AUTO_TEST_CASE (client_server_test_yuv)
 
        list<thread*> threads;
        for (int i = 0; i < 8; ++i) {
-               threads.push_back (new thread(boost::bind(do_remote_encode, frame, description, locally_encoded)));
+               threads.push_back (new thread(boost::bind(do_remote_encode, frame, description, *locally_encoded)));
        }
 
        for (auto i: threads) {
@@ -263,7 +271,7 @@ BOOST_AUTO_TEST_CASE (client_server_test_j2k)
                false
                );
 
-       auto raw_frame = make_shared<DCPVideo> (
+       DCPVideo raw_frame(
                raw_pvf,
                0,
                24,
@@ -271,10 +279,12 @@ BOOST_AUTO_TEST_CASE (client_server_test_j2k)
                Resolution::TWO_K
                );
 
-       auto raw_locally_encoded = raw_frame->encode_locally ();
+       J2KEncoderCPUBackend cpu;
+       auto raw_locally_encoded = cpu.encode (raw_frame);
+       BOOST_REQUIRE (static_cast<bool>(raw_locally_encoded));
 
        auto j2k_pvf = std::make_shared<PlayerVideo> (
-               std::make_shared<J2KImageProxy>(raw_locally_encoded, dcp::Size(1998, 1080), AV_PIX_FMT_XYZ12LE),
+               std::make_shared<J2KImageProxy>(*raw_locally_encoded, dcp::Size(1998, 1080), AV_PIX_FMT_XYZ12LE),
                Crop(),
                optional<double>(),
                dcp::Size(1998, 1080),
@@ -288,7 +298,7 @@ BOOST_AUTO_TEST_CASE (client_server_test_j2k)
                false
                );
 
-       auto j2k_frame = make_shared<DCPVideo> (
+       DCPVideo j2k_frame(
                j2k_pvf,
                0,
                24,
@@ -296,7 +306,8 @@ BOOST_AUTO_TEST_CASE (client_server_test_j2k)
                Resolution::TWO_K
                );
 
-       auto j2k_locally_encoded = j2k_frame->encode_locally ();
+       auto j2k_locally_encoded = cpu.encode(j2k_frame);
+       BOOST_REQUIRE (static_cast<bool>(j2k_locally_encoded));
 
        auto server = new EncodeServer (true, 2);
 
@@ -310,7 +321,7 @@ BOOST_AUTO_TEST_CASE (client_server_test_j2k)
 
        list<thread*> threads;
        for (int i = 0; i < 8; ++i) {
-               threads.push_back (new thread(boost::bind(do_remote_encode, j2k_frame, description, j2k_locally_encoded)));
+               threads.push_back (new thread(boost::bind(do_remote_encode, j2k_frame, description, *j2k_locally_encoded)));
        }
 
        for (auto i: threads) {
index 356013b2be9ce2ec45478afd4d21b5045f91ee9a..8f16f56f171701ef1961e70cdeb36aa25339b87c 100644 (file)
@@ -22,6 +22,7 @@
 #include "lib/dcp_video.h"
 #include "lib/image.h"
 #include "lib/player_video.h"
+#include "lib/j2k_encoder_cpu_backend.h"
 #include "lib/raw_image_proxy.h"
 extern "C" {
 #include <libavutil/pixfmt.h>
@@ -56,9 +57,11 @@ BOOST_AUTO_TEST_CASE (low_bitrate_test)
                false
                );
 
-       auto dcp_video = make_shared<DCPVideo>(frame, 0, 24, 100000000, Resolution::TWO_K);
-       auto j2k = dcp_video->encode_locally();
-       BOOST_REQUIRE (j2k.size() >= 16536);
+       DCPVideo dcp_video(frame, 0, 24, 100000000, Resolution::TWO_K);
+       J2KEncoderCPUBackend cpu;
+       auto j2k = cpu.encode(dcp_video);
+       BOOST_REQUIRE (static_cast<bool>(j2k));
+       BOOST_REQUIRE (j2k->size() >= 16536);
 }
 
 
index 1dfc75bfc024c588c7eef1b2db86cde3db64c4d2..43b0a49454978addeeebb16124ee5a14d14a3634 100644 (file)
@@ -75,15 +75,14 @@ BOOST_AUTO_TEST_CASE (interrupt_writer)
        }
 
        /* Write some data */
-       auto video = dcp::compress_j2k(image, 100000000, 24, false, false);
-       auto video_ptr = make_shared<dcp::ArrayData>(video.data(), video.size());
+       auto video = make_shared<dcp::ArrayData>(dcp::compress_j2k(image, 100000000, 24, false, false));
        auto audio = make_shared<AudioBuffers>(6, 48000 / 24);
 
        auto writer = make_shared<Writer>(film, shared_ptr<Job>());
        writer->start ();
 
        for (int i = 0; i < frames; ++i) {
-               writer->write (video_ptr, i, Eyes::BOTH);
+               writer->write (video, i, Eyes::BOTH);
                writer->write (audio, dcpomatic::DCPTime::from_frames(i, 24));
        }