X-Git-Url: https://git.carlh.net/gitweb/?a=blobdiff_plain;f=src%2Flib%2Fencode_server_finder.cc;h=e01019a8c92f9c1747bef9d6bba7d0f77fa1d19a;hb=d83a98096b8675905d78c6fd4e6af1091c2f663e;hp=06a6a396b1ae75161d6cbe940b585ae18b335083;hpb=6dd6676700f830547e9e7c38781f09de5f2a1a00;p=dcpomatic.git diff --git a/src/lib/encode_server_finder.cc b/src/lib/encode_server_finder.cc index 06a6a396b..e01019a8c 100644 --- a/src/lib/encode_server_finder.cc +++ b/src/lib/encode_server_finder.cc @@ -1,5 +1,5 @@ /* - Copyright (C) 2013-2018 Carl Hetherington + Copyright (C) 2013-2021 Carl Hetherington This file is part of DCP-o-matic. @@ -18,6 +18,7 @@ */ + #include "encode_server_finder.h" #include "exceptions.h" #include "util.h" @@ -27,39 +28,46 @@ #include "dcpomatic_socket.h" #include #include +#include #include #include #include "i18n.h" -using std::string; + +using std::cout; using std::list; +using std::make_shared; +using std::shared_ptr; +using std::string; using std::vector; -using std::cout; -using boost::shared_ptr; -using boost::scoped_array; -using boost::weak_ptr; +using std::weak_ptr; using boost::optional; +using boost::scoped_array; +#if BOOST_VERSION >= 106100 +using namespace boost::placeholders; +#endif using dcp::raw_convert; + EncodeServerFinder* EncodeServerFinder::_instance = 0; + EncodeServerFinder::EncodeServerFinder () - : _search_thread (0) - , _listen_thread (0) - , _stop (false) + : _stop (false) { Config::instance()->Changed.connect (boost::bind (&EncodeServerFinder::config_changed, this, _1)); } + void EncodeServerFinder::start () { - _search_thread = new boost::thread (boost::bind (&EncodeServerFinder::search_thread, this)); - _listen_thread = new boost::thread (boost::bind (&EncodeServerFinder::listen_thread, this)); + _search_thread = boost::thread (boost::bind(&EncodeServerFinder::search_thread, this)); + _listen_thread = boost::thread (boost::bind(&EncodeServerFinder::listen_thread, this)); #ifdef DCPOMATIC_LINUX - pthread_setname_np (_search_thread->native_handle(), "encode-server-search"); - pthread_setname_np (_listen_thread->native_handle(), "encode-server-listen"); + pthread_setname_np (_search_thread.native_handle(), "encode-server-search"); + pthread_setname_np (_listen_thread.native_handle(), "encode-server-listen"); #endif } @@ -69,63 +77,35 @@ EncodeServerFinder::~EncodeServerFinder () stop (); } + void EncodeServerFinder::stop () { + boost::this_thread::disable_interruption dis; + _stop = true; _search_condition.notify_all (); - if (_search_thread) { - /* Ideally this would be a DCPOMATIC_ASSERT(_search_thread->joinable()) but we - can't throw exceptions from a destructor. - */ - if (_search_thread->joinable ()) { - _search_thread->join (); - } - } - delete _search_thread; - _search_thread = 0; + try { + _search_thread.join(); + } catch (...) {} _listen_io_service.stop (); - if (_listen_thread) { - /* Ideally this would be a DCPOMATIC_ASSERT(_listen_thread->joinable()) but we - can't throw exceptions from a destructor. - */ - if (_listen_thread->joinable ()) { - _listen_thread->join (); - } - } - delete _listen_thread; - _listen_thread = 0; + try { + _listen_thread.join (); + } catch (...) {} boost::mutex::scoped_lock lm (_servers_mutex); - _good_servers.clear (); - _bad_servers.clear (); + _servers.clear (); } -static bool -remove_missing (list& servers, int since) -{ - bool removed = false; - list::iterator i = servers.begin(); - while (i != servers.end()) { - if (i->last_seen_seconds() > since) { - list::iterator j = i; - ++j; - servers.erase (i); - i = j; - removed = true; - } else { - ++i; - } - } - return removed; -} void EncodeServerFinder::search_thread () try { + start_of_thread ("EncodeServerFinder-search"); + boost::system::error_code error; boost::asio::io_service io_service; boost::asio::ip::udp::socket socket (io_service); @@ -134,52 +114,60 @@ try throw NetworkError ("failed to set up broadcast socket"); } - socket.set_option (boost::asio::ip::udp::socket::reuse_address (true)); - socket.set_option (boost::asio::socket_base::broadcast (true)); + socket.set_option (boost::asio::ip::udp::socket::reuse_address(true)); + socket.set_option (boost::asio::socket_base::broadcast(true)); string const data = DCPOMATIC_HELLO; int const interval = 10; while (!_stop) { - if (Config::instance()->use_any_servers ()) { + if (Config::instance()->use_any_servers()) { /* Broadcast to look for servers */ try { boost::asio::ip::udp::endpoint end_point (boost::asio::ip::address_v4::broadcast(), HELLO_PORT); - socket.send_to (boost::asio::buffer (data.c_str(), data.size() + 1), end_point); + socket.send_to (boost::asio::buffer(data.c_str(), data.size() + 1), end_point); } catch (...) { } } /* Query our `definite' servers (if there are any) */ - vector servers = Config::instance()->servers (); - for (vector::const_iterator i = servers.begin(); i != servers.end(); ++i) { - if (server_found (*i)) { - /* Don't bother asking a server that we already know about */ - continue; - } + for (auto const& i: Config::instance()->servers()) { try { boost::asio::ip::udp::resolver resolver (io_service); - boost::asio::ip::udp::resolver::query query (*i, raw_convert (HELLO_PORT)); - boost::asio::ip::udp::endpoint end_point (*resolver.resolve (query)); - socket.send_to (boost::asio::buffer (data.c_str(), data.size() + 1), end_point); + boost::asio::ip::udp::resolver::query query (i, raw_convert(HELLO_PORT)); + boost::asio::ip::udp::endpoint end_point (*resolver.resolve(query)); + socket.send_to (boost::asio::buffer(data.c_str(), data.size() + 1), end_point); } catch (...) { } } /* Discard servers that we haven't seen for a while */ + bool removed = false; { boost::mutex::scoped_lock lm (_servers_mutex); - bool g = remove_missing(_good_servers, 2 * interval); - bool b = remove_missing(_bad_servers, 2 * interval); - if (g || b) { - emit (boost::bind (boost::ref (ServersListChanged))); + + auto i = _servers.begin(); + while (i != _servers.end()) { + if (i->last_seen_seconds() > 2 * interval) { + auto j = i; + ++j; + _servers.erase (i); + i = j; + removed = true; + } else { + ++i; + } } } + if (removed) { + emit (boost::bind(boost::ref(ServersListChanged))); + } + boost::mutex::scoped_lock lm (_search_condition_mutex); - _search_condition.timed_wait (lm, boost::get_system_time() + boost::posix_time::seconds (interval)); + _search_condition.timed_wait (lm, boost::get_system_time() + boost::posix_time::seconds(interval)); } } catch (...) @@ -187,14 +175,17 @@ catch (...) store_current (); } + void EncodeServerFinder::listen_thread () try { + start_of_thread ("EncodeServerFinder-listen"); + using namespace boost::asio::ip; try { _listen_acceptor.reset ( - new tcp::acceptor (_listen_io_service, tcp::endpoint (tcp::v4(), is_batch_converter ? BATCH_SERVER_PRESENCE_PORT : MAIN_SERVER_PRESENCE_PORT)) + new tcp::acceptor (_listen_io_service, tcp::endpoint(tcp::v4(), is_batch_converter ? BATCH_SERVER_PRESENCE_PORT : MAIN_SERVER_PRESENCE_PORT)) ); } catch (...) { boost::throw_exception (NetworkError (_("Could not listen for remote encode servers. Perhaps another instance of DCP-o-matic is running."))); @@ -208,79 +199,72 @@ catch (...) store_current (); } + void EncodeServerFinder::start_accept () { - shared_ptr socket (new Socket ()); + _accept_socket = make_shared(); + _listen_acceptor->async_accept ( - socket->socket(), - boost::bind (&EncodeServerFinder::handle_accept, this, boost::asio::placeholders::error, socket) + _accept_socket->socket(), + boost::bind(&EncodeServerFinder::handle_accept, this, boost::asio::placeholders::error) ); } + void -EncodeServerFinder::handle_accept (boost::system::error_code ec, shared_ptr socket) +EncodeServerFinder::handle_accept (boost::system::error_code ec) { if (ec) { start_accept (); return; } - uint32_t length; - socket->read (reinterpret_cast (&length), sizeof (uint32_t)); - length = ntohl (length); + string server_available; - scoped_array buffer (new char[length]); - socket->read (reinterpret_cast (buffer.get()), length); - - string s (buffer.get()); - shared_ptr xml (new cxml::Document ("ServerAvailable")); - xml->read_string (s); - - string const ip = socket->socket().remote_endpoint().address().to_string (); - optional::iterator> found = server_found (ip); - if (found) { - (*found)->set_seen (); - } else { - EncodeServerDescription sd (ip, xml->number_child("Threads"), xml->optional_number_child("Version").get_value_or(0)); - if (sd.link_version() == SERVER_LINK_VERSION) { - boost::mutex::scoped_lock lm (_servers_mutex); - _good_servers.push_back (sd); - } else { - boost::mutex::scoped_lock lm (_servers_mutex); - _bad_servers.push_back (sd); - } - emit (boost::bind (boost::ref (ServersListChanged))); + try { + uint32_t length; + _accept_socket->read (reinterpret_cast(&length), sizeof(uint32_t)); + length = ntohl (length); + + scoped_array buffer(new char[length]); + _accept_socket->read (reinterpret_cast(buffer.get()), length); + server_available = buffer.get(); + } catch (NetworkError&) { + /* Maybe the server went away; let's just try again */ + start_accept(); + return; } - start_accept (); -} + auto xml = make_shared("ServerAvailable"); + xml->read_string(server_available); -optional::iterator> -EncodeServerFinder::server_found (string ip) -{ - boost::mutex::scoped_lock lm (_servers_mutex); - list::iterator i = _good_servers.begin(); - while (i != _good_servers.end() && i->host_name() != ip) { - ++i; - } - - if (i != _good_servers.end()) { - return i; - } + auto const ip = _accept_socket->socket().remote_endpoint().address().to_string(); + bool changed = false; + { + boost::mutex::scoped_lock lm (_servers_mutex); + auto i = _servers.begin(); + while (i != _servers.end() && i->host_name() != ip) { + ++i; + } - i = _bad_servers.begin(); - while (i != _bad_servers.end() && i->host_name() != ip) { - ++i; + if (i != _servers.end()) { + i->set_seen(); + } else { + EncodeServerDescription sd (ip, xml->number_child("Threads"), xml->optional_number_child("Version").get_value_or(0)); + _servers.push_back (sd); + changed = true; + } } - if (i != _bad_servers.end()) { - return i; + if (changed) { + emit (boost::bind(boost::ref (ServersListChanged))); } - return optional::iterator>(); + start_accept (); } + EncodeServerFinder* EncodeServerFinder::instance () { @@ -292,35 +276,30 @@ EncodeServerFinder::instance () return _instance; } + void EncodeServerFinder::drop () { delete _instance; - _instance = 0; + _instance = nullptr; } -list -EncodeServerFinder::good_servers () const -{ - boost::mutex::scoped_lock lm (_servers_mutex); - return _good_servers; -} list -EncodeServerFinder::bad_servers () const +EncodeServerFinder::servers () const { boost::mutex::scoped_lock lm (_servers_mutex); - return _bad_servers; + return _servers; } + void EncodeServerFinder::config_changed (Config::Property what) { if (what == Config::USE_ANY_SERVERS || what == Config::SERVERS) { { boost::mutex::scoped_lock lm (_servers_mutex); - _good_servers.clear (); - _bad_servers.clear (); + _servers.clear (); } ServersListChanged (); _search_condition.notify_all ();