Add abstraction of io_{context,service} and use it as appropriate.
authorCarl Hetherington <cth@carlh.net>
Mon, 17 Mar 2025 00:25:02 +0000 (01:25 +0100)
committerCarl Hetherington <cth@carlh.net>
Sat, 22 Mar 2025 12:29:58 +0000 (13:29 +0100)
19 files changed:
src/lib/butler.cc
src/lib/butler.h
src/lib/dcpomatic_socket.cc
src/lib/dcpomatic_socket.h
src/lib/encode_server.cc
src/lib/encode_server.h
src/lib/encode_server_finder.cc
src/lib/encode_server_finder.h
src/lib/io_context.cc [new file with mode: 0644]
src/lib/io_context.h [new file with mode: 0644]
src/lib/json_server.cc
src/lib/server.cc
src/lib/server.h
src/lib/signal_manager.h
src/lib/writer.cc
src/lib/wscript
test/disk_writer_test.cc
test/socket_test.cc
wscript

index dd98745878a1aadac6fb8699c6ee4ab8ceabb5bf..d4d5ae0a23a4b636713ce36a88472c65cf369c82 100644 (file)
@@ -74,7 +74,7 @@ Butler::Butler (
        )
        : _film (film)
        , _player (player)
-       , _prepare_work (new boost::asio::io_service::work(_prepare_service))
+       , _prepare_work(dcpomatic::make_work_guard(_prepare_context))
        , _pending_seek_accurate (false)
        , _suspended (0)
        , _finished (false)
@@ -108,7 +108,7 @@ Butler::Butler (
        LOG_TIMING("start-prepare-threads %1", boost::thread::hardware_concurrency() * 2);
 
        for (size_t i = 0; i < boost::thread::hardware_concurrency() * 2; ++i) {
-               _prepare_pool.create_thread (bind (&boost::asio::io_service::run, &_prepare_service));
+               _prepare_pool.create_thread(bind(&dcpomatic::io_context::run, &_prepare_context));
        }
 }
 
@@ -124,7 +124,7 @@ Butler::~Butler ()
 
        _prepare_work.reset ();
        _prepare_pool.join_all ();
-       _prepare_service.stop ();
+       _prepare_context.stop();
 
        _thread.interrupt ();
        try {
@@ -355,7 +355,7 @@ Butler::video (shared_ptr<PlayerVideo> video, DCPTime time)
                return;
        }
 
-       _prepare_service.post (bind(&Butler::prepare, this, weak_ptr<PlayerVideo>(video)));
+       dcpomatic::post(_prepare_context, bind(&Butler::prepare, this, weak_ptr<PlayerVideo>(video)));
 
        _video.put (video, time);
 }
index 6bb0467af1f526ed5670dde6237511c47cc0f8df..82b95220794e6018a8d7a2927d7f3f1b86d7337f 100644 (file)
@@ -27,6 +27,7 @@
 #include "audio_ring_buffers.h"
 #include "change_signaller.h"
 #include "exception_store.h"
+#include "io_context.h"
 #include "text_ring_buffers.h"
 #include "text_type.h"
 #include "video_ring_buffers.h"
@@ -114,8 +115,8 @@ private:
        TextRingBuffers _closed_caption;
 
        boost::thread_group _prepare_pool;
-       boost::asio::io_service _prepare_service;
-       std::shared_ptr<boost::asio::io_service::work> _prepare_work;
+       dcpomatic::io_context _prepare_context;
+       boost::optional<dcpomatic::work_guard> _prepare_work;
 
        /** mutex to protect _pending_seek_position, _pending_seek_accurate, _finished, _died, _stop_thread */
        boost::mutex _mutex;
index f03536cb7fcd6e25bed84a262096391bb4ae0df6..f34d6fb53e627aed6cae6a0623fafb1bb42ca4ef 100644 (file)
@@ -39,8 +39,8 @@ using std::weak_ptr;
 
 /** @param timeout Timeout in seconds */
 Socket::Socket (int timeout)
-       : _deadline (_io_service)
-       , _socket (_io_service)
+       : _deadline(_io_context)
+       , _socket(_io_context)
        , _timeout (timeout)
 {
        _deadline.expires_at (boost::posix_time::pos_infin);
@@ -69,7 +69,7 @@ Socket::connect (boost::asio::ip::tcp::endpoint endpoint)
        boost::system::error_code ec = boost::asio::error::would_block;
        _socket.async_connect (endpoint, boost::lambda::var(ec) = boost::lambda::_1);
        do {
-               _io_service.run_one();
+               _io_context.run_one();
        } while (ec == boost::asio::error::would_block);
 
        if (ec) {
@@ -93,7 +93,7 @@ Socket::connect (boost::asio::ip::tcp::endpoint endpoint)
 void
 Socket::connect(string host_name, int port)
 {
-       boost::asio::ip::tcp::resolver resolver(_io_service);
+       boost::asio::ip::tcp::resolver resolver(_io_context);
        boost::asio::ip::tcp::resolver::query query(host_name, fmt::to_string(port));
        connect(*resolver.resolve(query));
 }
@@ -119,7 +119,7 @@ Socket::write (uint8_t const * data, int size)
        boost::asio::async_write (_socket, boost::asio::buffer (data, size), boost::lambda::var(ec) = boost::lambda::_1);
 
        do {
-               _io_service.run_one ();
+               _io_context.run_one();
        } while (ec == boost::asio::error::would_block);
 
        if (ec) {
@@ -160,7 +160,7 @@ Socket::read (uint8_t* data, int size)
        boost::asio::async_read (_socket, boost::asio::buffer (data, size), boost::lambda::var(ec) = boost::lambda::_1);
 
        do {
-               _io_service.run_one ();
+               _io_context.run_one();
        } while (ec == boost::asio::error::would_block);
 
        if (ec) {
@@ -294,7 +294,7 @@ Socket::set_deadline_from_now(int seconds)
 void
 Socket::run()
 {
-       _io_service.run_one();
+       _io_context.run_one();
 }
 
 void
index ed2d8b728c6148fcd4a2f4704c207e77cfe66b49..37b7ba830103ead17d2d3df97f013fc33304ca34 100644 (file)
 
 */
 
+
 #include "digester.h"
+#include "io_context.h"
 #include <boost/asio.hpp>
 #include <boost/scoped_ptr.hpp>
 
+
 /** @class Socket
  *  @brief A class to wrap a boost::asio::ip::tcp::socket with some things
  *  that are useful for DCP-o-matic.
@@ -93,7 +96,7 @@ private:
        void finish_write_digest ();
        void connect(boost::asio::ip::tcp::endpoint);
 
-       boost::asio::io_service _io_service;
+       dcpomatic::io_context _io_context;
        boost::asio::deadline_timer _deadline;
        boost::asio::ip::tcp::socket _socket;
        int _timeout;
index 9fd330a12e849140aaa2f1a6dc71d4fda6d36b2b..7598ebcf6124141e8e7072629314228974e110da 100644 (file)
@@ -112,7 +112,7 @@ EncodeServer::~EncodeServer ()
                }
        }
 
-       _broadcast.io_service.stop ();
+       _broadcast.io_context.stop();
        try {
                _broadcast.thread.join ();
        } catch (...) {}
@@ -274,7 +274,7 @@ try
 {
        boost::asio::ip::udp::endpoint listen_endpoint(boost::asio::ip::udp::v4(), HELLO_PORT);
 
-       _broadcast.socket = new boost::asio::ip::udp::socket(_broadcast.io_service, listen_endpoint);
+       _broadcast.socket = new boost::asio::ip::udp::socket(_broadcast.io_context, listen_endpoint);
 
        _broadcast.socket->async_receive_from (
                boost::asio::buffer (_broadcast.buffer, sizeof (_broadcast.buffer)),
@@ -282,7 +282,7 @@ try
                boost::bind (&EncodeServer::broadcast_received, this)
                );
 
-       _broadcast.io_service.run ();
+       _broadcast.io_context.run();
 }
 catch (...)
 {
index 8059abd0f6d5bfe1c3ab04763ffe62ce5fa97051..2762125831f3fe66667247f770bd8c846bdec7fd 100644 (file)
@@ -85,7 +85,7 @@ private:
                boost::asio::ip::udp::socket* socket;
                char buffer[64];
                boost::asio::ip::udp::endpoint send_endpoint;
-               boost::asio::io_service io_service;
+               dcpomatic::io_context io_context;
 
        } _broadcast;
 };
index 22c1d37dd5cc82da2ecb444d668cadd03ba4dc8d..e9636620c40a4aa471f5ebaa970930e02dae6d69 100644 (file)
@@ -91,7 +91,7 @@ EncodeServerFinder::stop ()
                _search_thread.join();
        } catch (...) {}
 
-       _listen_io_service.stop ();
+       _listen_io_context.stop();
        try {
                _listen_thread.join ();
        } catch (...) {}
@@ -108,8 +108,8 @@ try
        start_of_thread ("EncodeServerFinder-search");
 
        boost::system::error_code error;
-       boost::asio::io_service io_service;
-       boost::asio::ip::udp::socket socket (io_service);
+       dcpomatic::io_context io_context;
+       boost::asio::ip::udp::socket socket(io_context);
        socket.open (boost::asio::ip::udp::v4(), error);
        if (error) {
                throw NetworkError ("failed to set up broadcast socket");
@@ -135,7 +135,7 @@ try
                /* Query our `definite' servers (if there are any) */
                for (auto const& i: Config::instance()->servers()) {
                        try {
-                               boost::asio::ip::udp::resolver resolver (io_service);
+                               boost::asio::ip::udp::resolver resolver(io_context);
                                boost::asio::ip::udp::resolver::query query(i, fmt::to_string(HELLO_PORT));
                                boost::asio::ip::udp::endpoint end_point (*resolver.resolve(query));
                                socket.send_to (boost::asio::buffer(data.c_str(), data.size() + 1), end_point);
@@ -186,14 +186,14 @@ try {
 
        try {
                _listen_acceptor.reset (
-                       new tcp::acceptor (_listen_io_service, tcp::endpoint(tcp::v4(), is_batch_converter ? BATCH_SERVER_PRESENCE_PORT : MAIN_SERVER_PRESENCE_PORT))
+                       new tcp::acceptor(_listen_io_context, tcp::endpoint(tcp::v4(), is_batch_converter ? BATCH_SERVER_PRESENCE_PORT : MAIN_SERVER_PRESENCE_PORT))
                        );
        } catch (...) {
                boost::throw_exception(NetworkError(variant::insert_dcpomatic(_("Could not listen for remote encode servers.  Perhaps another instance of %1 is running."))));
        }
 
        start_accept ();
-       _listen_io_service.run ();
+       _listen_io_context.run();
 }
 catch (...)
 {
index c478387f9bbaaaaaf799d4ca237382decb1b904e..722786b77d8a65de4fb649dcc56cceb4b97c7050 100644 (file)
@@ -80,7 +80,7 @@ private:
        /** Mutex for _servers */
        mutable boost::mutex _servers_mutex;
 
-       boost::asio::io_service _listen_io_service;
+       dcpomatic::io_context _listen_io_context;
        std::shared_ptr<boost::asio::ip::tcp::acceptor> _listen_acceptor;
        bool _stop;
 
diff --git a/src/lib/io_context.cc b/src/lib/io_context.cc
new file mode 100644 (file)
index 0000000..fe70e83
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+    Copyright (C) 2025 Carl Hetherington <cth@carlh.net>
+
+    This file is part of DCP-o-matic.
+
+    DCP-o-matic is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    DCP-o-matic is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
+
+*/
+
+
+#include "io_context.h"
+
+
+dcpomatic::work_guard
+dcpomatic::make_work_guard(io_context& context)
+{
+#ifdef DCPOMATIC_HAVE_BOOST_ASIO_IO_CONTEXT
+       return boost::asio::make_work_guard(context);
+#else
+       return boost::asio::io_service::work(context);
+#endif
+}
diff --git a/src/lib/io_context.h b/src/lib/io_context.h
new file mode 100644 (file)
index 0000000..0b56e27
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+    Copyright (C) 2025 Carl Hetherington <cth@carlh.net>
+
+    This file is part of DCP-o-matic.
+
+    DCP-o-matic is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    DCP-o-matic is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
+
+*/
+
+
+#ifndef DCPOMATIC_IO_CONTEXT_H
+#define DCPOMATIC_IO_CONTEXT_H
+
+
+#include <boost/asio.hpp>
+
+
+namespace dcpomatic {
+
+#ifdef DCPOMATIC_HAVE_BOOST_ASIO_IO_CONTEXT
+
+using io_context = boost::asio::io_context;
+using work_guard = boost::asio::executor_work_guard<boost::asio::io_context::executor_type>;
+
+template <typename T>
+void post(io_context& context, T handler)
+{
+       boost::asio::post(context, handler);
+}
+
+work_guard
+make_work_guard(io_context& context);
+
+#else
+
+using io_context = boost::asio::io_service;
+using work_guard = boost::asio::io_service::work;
+
+template <typename T>
+void post(io_context& context, T handler)
+{
+       context.post(handler);
+}
+
+work_guard
+make_work_guard(io_context& context);
+
+#endif
+
+}
+
+
+#endif
+
index bed563f5a6e75b7f74d35436f2119077ee91dcf6..59aeb2c8eeea4f84552c28360accafb682be54a2 100644 (file)
@@ -69,11 +69,11 @@ void
 JSONServer::run (int port)
 try
 {
-       boost::asio::io_service io_service;
-       tcp::acceptor a (io_service, tcp::endpoint (tcp::v4 (), port));
+       dcpomatic::io_context io_context;
+       tcp::acceptor a(io_context, tcp::endpoint(tcp::v4(), port));
        while (true) {
                try {
-                       auto s = make_shared<tcp::socket>(io_service);
+                       auto s = make_shared<tcp::socket>(io_context);
                        a.accept (*s);
                        handle (s);
                }
index 359234044ef36aca659cb8751e7f7db47727922a..06d02e53830a9c8df165988fffa465fd4defec24 100644 (file)
@@ -32,7 +32,7 @@ using std::shared_ptr;
 
 Server::Server (int port, int timeout)
        : _terminate (false)
-       , _acceptor (_io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), port))
+       , _acceptor(_io_context, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), port))
        , _timeout (timeout)
 {
 
@@ -49,7 +49,7 @@ void
 Server::run ()
 {
        start_accept ();
-       _io_service.run ();
+       _io_context.run();
 }
 
 
@@ -94,5 +94,5 @@ Server::stop ()
        if (auto s = _socket.lock()) {
                s->close();
        }
-       _io_service.stop ();
+       _io_context.stop();
 }
index 7e42f6cbdcb48a48795145f7abe910e155d35e25..5025a27605c65f2d261e4884bde0187bf79885f8 100644 (file)
@@ -23,8 +23,9 @@
 #define DCPOMATIC_SERVER_H
 
 
-#include <boost/thread.hpp>
+#include "io_context.h"
 #include <boost/asio.hpp>
+#include <boost/thread.hpp>
 #include <boost/thread/condition.hpp>
 #include <string>
 
@@ -54,7 +55,7 @@ private:
        void start_accept ();
        void handle_accept (std::shared_ptr<Socket>, boost::system::error_code const &);
 
-       boost::asio::io_service _io_service;
+       dcpomatic::io_context _io_context;
        boost::asio::ip::tcp::acceptor _acceptor;
        int _timeout;
        std::weak_ptr<Socket> _socket;
index 99e3b5c52c271277a64fd654d121303eed877505..8138817d3a404639587c4a6be8b1b6823e7deff3 100644 (file)
@@ -24,6 +24,7 @@
 
 
 #include "exception_store.h"
+#include "io_context.h"
 #include <boost/asio.hpp>
 #include <boost/thread.hpp>
 
@@ -39,7 +40,7 @@ class SignalManager : public ExceptionStore
 public:
        /** Create a SignalManager.  Must be called from the UI thread */
        SignalManager ()
-               : _work (_service)
+               : _work(dcpomatic::make_work_guard(_context))
        {
                _ui_thread = boost::this_thread::get_id ();
        }
@@ -52,15 +53,15 @@ public:
        /* Do something next time the UI is idle */
        template <typename T>
        void when_idle (T f) {
-               _service.post (f);
+               dcpomatic::post(_context, f);
        }
 
        /** Call this in the UI when it is idle.
         *  @return Number of idle handlers that were executed.
         */
        size_t ui_idle () {
-               /* This executes one of the functors that has been post()ed to _service */
-               return _service.poll_one ();
+               /* This executes one of the functors that has been post()ed to _context */
+               return _context.poll_one();
        }
 
        /** This should wake the UI and make it call ui_idle() */
@@ -85,18 +86,18 @@ private:
                                store_current ();
                        }
                } else {
-                       /* non-UI thread; post to the service and wake up the UI */
-                       _service.post (f);
+                       /* non-UI thread; post to the context and wake up the UI */
+                       dcpomatic::post(_context, f);
                        wake_ui ();
                }
        }
 
        friend class Signaller;
 
-       /** A io_service which is used as the conduit for messages */
-       boost::asio::io_service _service;
-       /** Object required to keep io_service from stopping when it has nothing to do */
-       boost::asio::io_service::work _work;
+       /** A io_context which is used as the conduit for messages */
+       dcpomatic::io_context _context;
+       /** Object required to keep io_context from stopping when it has nothing to do */
+       dcpomatic::work_guard _work;
        /** The UI thread's ID */
        boost::thread::id _ui_thread;
 };
index 33e4b8f81b33358b73e64958d1a57fb953f97c10..563a86e80eb368ebd2e98c1cbe9ad505a5e93127 100644 (file)
@@ -529,45 +529,43 @@ Writer::calculate_digests ()
                job->sub (_("Computing digests"));
        }
 
-       boost::asio::io_service service;
+       dcpomatic::io_context context;
        boost::thread_group pool;
 
-       auto work = make_shared<boost::asio::io_service::work>(service);
+       {
+               auto work = dcpomatic::make_work_guard(context);
 
-       int const threads = max (1, Config::instance()->master_encoding_threads());
+               int const threads = max (1, Config::instance()->master_encoding_threads());
 
-       for (int i = 0; i < threads; ++i) {
-               pool.create_thread (boost::bind (&boost::asio::io_service::run, &service));
-       }
+               for (int i = 0; i < threads; ++i) {
+                       pool.create_thread(boost::bind(&dcpomatic::io_context::run, &context));
+               }
 
-       std::function<void (int, int64_t, int64_t)> set_progress;
-       if (job) {
-               set_progress = boost::bind(&Writer::set_digest_progress, this, job.get(), _1, _2, _3);
-       } else {
-               set_progress = [](int, int64_t, int64_t) {
-                       boost::this_thread::interruption_point();
-               };
-       }
+               std::function<void (int, int64_t, int64_t)> set_progress;
+               if (job) {
+                       set_progress = boost::bind(&Writer::set_digest_progress, this, job.get(), _1, _2, _3);
+               } else {
+                       set_progress = [](int, int64_t, int64_t) {
+                               boost::this_thread::interruption_point();
+                       };
+               }
 
-       int index = 0;
+               int index = 0;
 
-       for (auto& i: _reels) {
-               service.post(
-                       boost::bind(
-                               &ReelWriter::calculate_digests,
-                               &i,
+               for (auto& i: _reels) {
+                       dcpomatic::post(context, boost::bind(
+                                       &ReelWriter::calculate_digests,
+                                       &i,
+                                       std::function<void (int64_t, int64_t)>(boost::bind(set_progress, index, _1, _2))
+                                       ));
+                       ++index;
+               }
+               dcpomatic::post(context, boost::bind(
+                               &Writer::calculate_referenced_digests,
+                               this,
                                std::function<void (int64_t, int64_t)>(boost::bind(set_progress, index, _1, _2))
                                ));
-               ++index;
        }
-       service.post(
-               boost::bind(
-                       &Writer::calculate_referenced_digests,
-                       this,
-                       std::function<void (int64_t, int64_t)>(boost::bind(set_progress, index, _1, _2))
-                       ));
-
-       work.reset ();
 
        try {
                pool.join_all ();
@@ -579,7 +577,7 @@ Writer::calculate_digests ()
                pool.join_all ();
        }
 
-       service.stop ();
+       context.stop();
 }
 
 
index cae78fc4aa2994defbf39dc3ee1245d541cf0dbe..dafd655fe24557b6cf2d53b3b236f18cfbb2b40a 100644 (file)
@@ -144,6 +144,7 @@ sources = """
           image_proxy.cc
           image_store.cc
           internal_player_server.cc
+          io_context.cc
           j2k_image_proxy.cc
           job.cc
           job_manager.cc
index ff39eaaeab6d373d6addfb5de3858e0717f31a39..5004df4d59ad14c5f5330d85e560067e817eba0f 100644 (file)
@@ -21,6 +21,7 @@
 
 #include "lib/cross.h"
 #include "lib/ext.h"
+#include "lib/io_context.h"
 #include "test.h"
 #include <boost/algorithm/string.hpp>
 #include <boost/asio.hpp>
@@ -45,7 +46,7 @@ ext2_ls (vector<string> arguments)
 {
        using namespace boost::process;
 
-       boost::asio::io_service ios;
+       dcpomatic::io_context ios;
        future<string> data;
        child ch (search_path("e2ls"), arguments, std_in.close(), std_out > data, ios);
        ios.run();
@@ -105,7 +106,7 @@ BOOST_AUTO_TEST_CASE (disk_writer_test1)
        BOOST_CHECK_EQUAL (system("/sbin/e2fsck -fn build/test/disk_writer_test1.partition"), 0);
 
        {
-               boost::asio::io_service ios;
+               dcpomatic::io_context ios;
                future<string> data;
                child ch ("/sbin/tune2fs", args({"-l", partition.string()}), std_in.close(), std_out > data, ios);
                ios.run();
index 70df1266b784d5d52832138c7d616a0c834d963c..b85f7b0cb5a4ae619bf4c8ed089ff3da5daae145 100644 (file)
@@ -20,6 +20,7 @@
 
 
 #include "lib/dcpomatic_socket.h"
+#include "lib/io_context.h"
 #include "lib/server.h"
 #include <fmt/format.h>
 #include <boost/test/unit_test.hpp>
diff --git a/wscript b/wscript
index 124bd30aca8ef389e5355497a5fd48bb401f4c4a..b4e165bd21858a5a7522322c539da4999443da8f 100644 (file)
--- a/wscript
+++ b/wscript
@@ -638,6 +638,14 @@ def configure(conf):
                            lib=deps,
                            uselib_store='BOOST_PROCESS')
 
+        conf.check_cxx(fragment="""
+                           #include <boost/asio.hpp>
+                           int main() { boost::asio::io_context context; }
+                           """,
+                       msg='Checking for boost::asio::io_context',
+                       define_name='DCPOMATIC_HAVE_BOOST_ASIO_IO_CONTEXT',
+                       mandatory=False)
+
     # sqlite3
     conf.check_cfg(package="sqlite3", args='--cflags --libs', uselib_store='SQLITE3', mandatory=True)
     conf.check_cxx(fragment="""