From 59602b67d0637817a156b7bd0fc05f96fe41dee5 Mon Sep 17 00:00:00 2001 From: Carl Hetherington Date: Wed, 6 Nov 2013 16:43:01 +0000 Subject: [PATCH] Various bits of server tidying up. --- src/lib/encoder.cc | 33 +++--------- src/lib/encoder.h | 4 +- src/lib/server.cc | 7 ++- src/lib/server.h | 3 +- src/lib/server_finder.cc | 83 ++++++++++++++++--------------- src/lib/server_finder.h | 16 ++++-- src/tools/dcpomatic_server_cli.cc | 10 +++- src/wx/servers_list_dialog.cc | 12 +---- src/wx/servers_list_dialog.h | 2 - 9 files changed, 82 insertions(+), 88 deletions(-) diff --git a/src/lib/encoder.cc b/src/lib/encoder.cc index 2ec32deb7..ccaeab18c 100644 --- a/src/lib/encoder.cc +++ b/src/lib/encoder.cc @@ -79,9 +79,7 @@ void Encoder::add_worker_threads (ServerDescription d) { for (int i = 0; i < d.threads(); ++i) { - _threads.push_back ( - make_pair (d, new boost::thread (boost::bind (&Encoder::encoder_thread, this, d))) - ); + _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, d))); } } @@ -89,12 +87,7 @@ void Encoder::process_begin () { for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) { - _threads.push_back ( - make_pair ( - optional (), - new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional ())) - ) - ); + _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional ()))); } vector servers = Config::instance()->servers (); @@ -104,8 +97,7 @@ Encoder::process_begin () } _writer.reset (new Writer (_film, _job)); - _server_finder.reset (new ServerFinder ()); - _server_finder->ServerFound.connect (boost::bind (&Encoder::server_found, this, _1)); + ServerFinder::instance()->connect (boost::bind (&Encoder::server_found, this, _1)); } @@ -257,11 +249,11 @@ Encoder::terminate_threads () _condition.notify_all (); } - for (ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i) { - if (i->second->joinable ()) { - i->second->join (); + for (list::iterator i = _threads.begin(); i != _threads.end(); ++i) { + if ((*i)->joinable ()) { + (*i)->join (); } - delete i->second; + delete *i; } _threads.clear (); @@ -354,14 +346,5 @@ Encoder::encoder_thread (optional server) void Encoder::server_found (ServerDescription s) { - /* See if we already know about this server */ - boost::mutex::scoped_lock lm (_mutex); - ThreadList::iterator i = _threads.begin(); - while (i != _threads.end() && (!i->first || i->first.get().host_name() != s.host_name())) { - ++i; - } - - if (i == _threads.end ()) { - add_worker_threads (s); - } + add_worker_threads (s); } diff --git a/src/lib/encoder.h b/src/lib/encoder.h index 902dae2d5..9875a179b 100644 --- a/src/lib/encoder.h +++ b/src/lib/encoder.h @@ -107,13 +107,11 @@ private: bool _have_a_real_frame[EYES_COUNT]; bool _terminate; std::list > _queue; - typedef std::list, boost::thread *> > ThreadList; - ThreadList _threads; + std::list _threads; mutable boost::mutex _mutex; boost::condition _condition; boost::shared_ptr _writer; - boost::shared_ptr _server_finder; }; #endif diff --git a/src/lib/server.cc b/src/lib/server.cc index bad7ad893..2930e3c4b 100644 --- a/src/lib/server.cc +++ b/src/lib/server.cc @@ -209,7 +209,7 @@ Server::run (int num_threads) { _log->log (String::compose ("Server starting with %1 threads", num_threads)); if (_verbose) { - cout << "DCP-o-matic server started with " << num_threads << " threads.\n"; + cout << "DCP-o-matic server starting with " << num_threads << " threads.\n"; } for (int i = 0; i < num_threads; ++i) { @@ -243,6 +243,7 @@ Server::run (int num_threads) void Server::broadcast_thread () +try { boost::asio::io_service io_service; @@ -261,6 +262,10 @@ Server::broadcast_thread () io_service.run (); } +catch (...) +{ + store_current (); +} void Server::broadcast_received () diff --git a/src/lib/server.h b/src/lib/server.h index 68de3c2f0..9be47bd94 100644 --- a/src/lib/server.h +++ b/src/lib/server.h @@ -32,6 +32,7 @@ #include #include #include "log.h" +#include "exceptions.h" class Socket; @@ -91,7 +92,7 @@ private: int _threads; }; -class Server : public boost::noncopyable +class Server : public ExceptionStore, public boost::noncopyable { public: Server (boost::shared_ptr log, bool verbose); diff --git a/src/lib/server_finder.cc b/src/lib/server_finder.cc index c0b554eee..56f52b7fc 100644 --- a/src/lib/server_finder.cc +++ b/src/lib/server_finder.cc @@ -27,36 +27,20 @@ using std::string; using std::stringstream; +using std::list; using boost::shared_ptr; using boost::scoped_array; +ServerFinder* ServerFinder::_instance = 0; + ServerFinder::ServerFinder () : _broadcast_thread (0) , _listen_thread (0) - , _terminate (false) { _broadcast_thread = new boost::thread (boost::bind (&ServerFinder::broadcast_thread, this)); _listen_thread = new boost::thread (boost::bind (&ServerFinder::listen_thread, this)); } -ServerFinder::~ServerFinder () -{ - { - boost::mutex::scoped_lock lm (_mutex); - _terminate = true; - } - - if (_broadcast_thread && _broadcast_thread->joinable ()) { - _broadcast_thread->join (); - } - delete _broadcast_thread; - - if (_listen_thread && _listen_thread->joinable ()) { - _listen_thread->join (); - } - delete _listen_thread; -} - void ServerFinder::broadcast_thread () { @@ -74,16 +58,8 @@ ServerFinder::broadcast_thread () boost::asio::ip::udp::endpoint end_point (boost::asio::ip::address_v4::broadcast(), Config::instance()->server_port_base() + 1); while (1) { - boost::mutex::scoped_lock lm (_mutex); - if (_terminate) { - socket.close (error); - return; - } - - string data = DCPOMATIC_HELLO; + string const data = DCPOMATIC_HELLO; socket.send_to (boost::asio::buffer (data.c_str(), data.size() + 1), end_point); - - lm.unlock (); dcpomatic_sleep (10); } } @@ -92,14 +68,6 @@ void ServerFinder::listen_thread () { while (1) { - { - /* See if we need to stop */ - boost::mutex::scoped_lock lm (_mutex); - if (_terminate) { - return; - } - } - shared_ptr sock (new Socket (10)); try { @@ -116,9 +84,44 @@ ServerFinder::listen_thread () shared_ptr xml (new cxml::Document ("ServerAvailable")); xml->read_stream (s); - ui_signaller->emit (boost::bind (boost::ref (ServerFound), ServerDescription ( - sock->socket().remote_endpoint().address().to_string (), - xml->number_child ("Threads") - ))); + boost::mutex::scoped_lock lm (_mutex); + + string const ip = sock->socket().remote_endpoint().address().to_string (); + list::const_iterator i = _servers.begin(); + while (i != _servers.end() && i->host_name() != ip) { + ++i; + } + + if (i == _servers.end ()) { + ServerDescription sd (ip, xml->number_child ("Threads")); + _servers.push_back (sd); + ui_signaller->emit (boost::bind (boost::ref (ServerFound), sd)); + } } } + +void +ServerFinder::connect (boost::function fn) +{ + boost::mutex::scoped_lock lm (_mutex); + + /* Emit the current list of servers */ + for (list::iterator i = _servers.begin(); i != _servers.end(); ++i) { + fn (*i); + } + + ServerFound.connect (fn); +} + +ServerFinder* +ServerFinder::instance () +{ + if (!_instance) { + _instance = new ServerFinder (); + } + + return _instance; +} + + + diff --git a/src/lib/server_finder.h b/src/lib/server_finder.h index 8b56022de..04a7786cf 100644 --- a/src/lib/server_finder.h +++ b/src/lib/server_finder.h @@ -23,19 +23,25 @@ class ServerFinder { public: - ServerFinder (); - ~ServerFinder (); + void connect (boost::function); - boost::signals2::signal ServerFound; + static ServerFinder* instance (); private: + ServerFinder (); + void broadcast_thread (); void listen_thread (); + + boost::signals2::signal ServerFound; - /** A thread to periodically issue broadcasts to find encoding servers */ + /** Thread to periodically issue broadcasts to find encoding servers */ boost::thread* _broadcast_thread; + /** Thread to listen to the responses from servers */ boost::thread* _listen_thread; - bool _terminate; + std::list _servers; boost::mutex _mutex; + + static ServerFinder* _instance; }; diff --git a/src/tools/dcpomatic_server_cli.cc b/src/tools/dcpomatic_server_cli.cc index e9540ff70..1ec985b4b 100644 --- a/src/tools/dcpomatic_server_cli.cc +++ b/src/tools/dcpomatic_server_cli.cc @@ -98,6 +98,14 @@ main (int argc, char* argv[]) Scaler::setup_scalers (); shared_ptr log (new FileLog ("dcpomatic_server_cli.log")); Server server (log, verbose); - server.run (num_threads); + try { + server.run (num_threads); + } catch (boost::system::system_error e) { + if (e.code() == boost::system::errc::address_in_use) { + cerr << argv[0] << ": address already in use. Is another DCP-o-matic server instance already running?\n"; + exit (EXIT_FAILURE); + } + cerr << argv[0] << ": " << e.what() << "\n"; + } return 0; } diff --git a/src/wx/servers_list_dialog.cc b/src/wx/servers_list_dialog.cc index 49d91fca4..be69a14ed 100644 --- a/src/wx/servers_list_dialog.cc +++ b/src/wx/servers_list_dialog.cc @@ -18,6 +18,7 @@ */ #include +#include "lib/server_finder.h" #include "servers_list_dialog.h" #include "wx_util.h" @@ -60,21 +61,12 @@ ServersListDialog::ServersListDialog (wxWindow* parent) s->Layout (); s->SetSizeHints (this); - _server_finder.ServerFound.connect (boost::bind (&ServersListDialog::server_found, this, _1)); + ServerFinder::instance()->connect (boost::bind (&ServersListDialog::server_found, this, _1)); } void ServersListDialog::server_found (ServerDescription s) { - list::const_iterator i = _servers.begin(); - while (i != _servers.end() && i->host_name() != s.host_name()) { - ++i; - } - - if (i != _servers.end ()) { - return; - } - wxListItem list_item; int const n = _list->GetItemCount (); list_item.SetId (n); diff --git a/src/wx/servers_list_dialog.h b/src/wx/servers_list_dialog.h index 0662a141d..63f1cd6a9 100644 --- a/src/wx/servers_list_dialog.h +++ b/src/wx/servers_list_dialog.h @@ -19,7 +19,6 @@ #include #include -#include "lib/server_finder.h" class ServersListDialog : public wxDialog { @@ -29,7 +28,6 @@ public: private: void server_found (ServerDescription); - ServerFinder _server_finder; std::list _servers; wxListCtrl* _list; }; -- 2.30.2