diff options
| author | Carl Hetherington <cth@carlh.net> | 2013-11-05 22:43:34 +0000 |
|---|---|---|
| committer | Carl Hetherington <cth@carlh.net> | 2013-11-05 22:43:34 +0000 |
| commit | f660fe3f1be97f373318806a77b3ce3fcd53cb73 (patch) | |
| tree | 3d862060843ad00aae4c23008912ba7d21a38ca2 /src | |
| parent | 5698918140d640b3477634504a83da0d8d71a1cf (diff) | |
Various work on server discovery; works on localhost.
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/config.cc | 13 | ||||
| -rw-r--r-- | src/lib/config.h | 16 | ||||
| -rw-r--r-- | src/lib/cross.cc | 1 | ||||
| -rw-r--r-- | src/lib/dcp_video_frame.cc | 2 | ||||
| -rw-r--r-- | src/lib/encoder.cc | 134 | ||||
| -rw-r--r-- | src/lib/encoder.h | 11 | ||||
| -rw-r--r-- | src/lib/server.cc | 37 | ||||
| -rw-r--r-- | src/lib/util.cc | 41 | ||||
| -rw-r--r-- | src/lib/util.h | 5 |
9 files changed, 226 insertions, 34 deletions
diff --git a/src/lib/config.cc b/src/lib/config.cc index 0c6aed4a8..02feecce8 100644 --- a/src/lib/config.cc +++ b/src/lib/config.cc @@ -54,7 +54,7 @@ Config* Config::_instance = 0; /** Construct default configuration */ Config::Config () : _num_local_encoding_threads (max (2U, boost::thread::hardware_concurrency())) - , _server_port (6192) + , _server_port_base (6192) , _tms_path (".") , _sound_processor (SoundProcessor::from_id (N_("dolby_cp750"))) , _default_still_length (10) @@ -95,7 +95,12 @@ Config::read () _num_local_encoding_threads = f.number_child<int> ("NumLocalEncodingThreads"); _default_directory = f.string_child ("DefaultDirectory"); - _server_port = f.number_child<int> ("ServerPort"); + + boost::optional<int> b = f.optional_number_child<int> ("ServerPort"); + if (!b) { + b = f.optional_number_child<int> ("ServerPortBase"); + } + _server_port_base = b.get (); list<shared_ptr<cxml::Node> > servers = f.node_children ("Server"); for (list<shared_ptr<cxml::Node> >::iterator i = servers.begin(); i != servers.end(); ++i) { @@ -191,7 +196,7 @@ Config::read_old_metadata () } else if (k == N_("default_directory")) { _default_directory = v; } else if (k == N_("server_port")) { - _server_port = atoi (v.c_str ()); + _server_port_base = atoi (v.c_str ()); } else if (k == N_("server")) { optional<ServerDescription> server = ServerDescription::create_from_metadata (v); if (server) { @@ -287,7 +292,7 @@ Config::write () const root->add_child("Version")->add_child_text ("1"); root->add_child("NumLocalEncodingThreads")->add_child_text (lexical_cast<string> (_num_local_encoding_threads)); root->add_child("DefaultDirectory")->add_child_text (_default_directory.string ()); - root->add_child("ServerPort")->add_child_text (lexical_cast<string> (_server_port)); + root->add_child("ServerPortBase")->add_child_text (lexical_cast<string> (_server_port_base)); for (vector<ServerDescription>::const_iterator i = _servers.begin(); i != _servers.end(); ++i) { i->as_xml (root->add_child ("Server")); diff --git a/src/lib/config.h b/src/lib/config.h index 7dd5abd17..07b3d6891 100644 --- a/src/lib/config.h +++ b/src/lib/config.h @@ -59,9 +59,9 @@ public: boost::filesystem::path default_directory_or (boost::filesystem::path a) const; - /** @return port to use for J2K encoding servers */ - int server_port () const { - return _server_port; + /** @return base port number to use for J2K encoding servers */ + int server_port_base () const { + return _server_port_base; } /** @return J2K encoding servers to use */ @@ -156,8 +156,8 @@ public: } /** @param p New server port */ - void set_server_port (int p) { - _server_port = p; + void set_server_port_base (int p) { + _server_port_base = p; } /** @param s New list of servers */ @@ -270,8 +270,10 @@ private: int _num_local_encoding_threads; /** default directory to put new films in */ boost::filesystem::path _default_directory; - /** port to use for J2K encoding servers */ - int _server_port; + /** base port number to use for J2K encoding servers; + * this port and the one above it will be used. + */ + int _server_port_base; /** J2K encoding servers to use */ std::vector<ServerDescription> _servers; diff --git a/src/lib/cross.cc b/src/lib/cross.cc index 94edb688b..45c38da2b 100644 --- a/src/lib/cross.cc +++ b/src/lib/cross.cc @@ -51,6 +51,7 @@ using std::wstring; using std::make_pair; using boost::shared_ptr; +/** @param s Number of seconds to sleep for */ void dcpomatic_sleep (int s) { diff --git a/src/lib/dcp_video_frame.cc b/src/lib/dcp_video_frame.cc index 464cf672f..25abd6f0d 100644 --- a/src/lib/dcp_video_frame.cc +++ b/src/lib/dcp_video_frame.cc @@ -256,7 +256,7 @@ DCPVideoFrame::encode_remotely (ServerDescription serv) { boost::asio::io_service io_service; boost::asio::ip::tcp::resolver resolver (io_service); - boost::asio::ip::tcp::resolver::query query (serv.host_name(), boost::lexical_cast<string> (Config::instance()->server_port ())); + boost::asio::ip::tcp::resolver::query query (serv.host_name(), boost::lexical_cast<string> (Config::instance()->server_port_base ())); boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve (query); shared_ptr<Socket> socket (new Socket); diff --git a/src/lib/encoder.cc b/src/lib/encoder.cc index 924a91439..f1d2375b6 100644 --- a/src/lib/encoder.cc +++ b/src/lib/encoder.cc @@ -22,6 +22,8 @@ */ #include <iostream> +#include <boost/lambda/lambda.hpp> +#include <libcxml/cxml.h> #include "encoder.h" #include "util.h" #include "film.h" @@ -44,6 +46,7 @@ using std::min; using std::make_pair; using boost::shared_ptr; using boost::optional; +using boost::scoped_array; int const Encoder::_history_size = 25; @@ -53,6 +56,8 @@ Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<Job> j) , _job (j) , _video_frames_out (0) , _terminate (false) + , _broadcast_thread (0) + , _listen_thread (0) { _have_a_real_frame[EYES_BOTH] = false; _have_a_real_frame[EYES_LEFT] = false; @@ -67,21 +72,41 @@ Encoder::~Encoder () } } +/** Add a worker thread for a remote server. Caller must hold + * a lock on _mutex, or know that one is not currently required to + * safely modify _threads. + */ +void +Encoder::add_worker_thread (ServerDescription d) +{ + _threads.push_back ( + make_pair (d, new boost::thread (boost::bind (&Encoder::encoder_thread, this, d))) + ); +} + void Encoder::process_begin () { for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) { - _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ()))); + _threads.push_back ( + make_pair ( + optional<ServerDescription> (), + new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())) + ) + ); } vector<ServerDescription> servers = Config::instance()->servers (); for (vector<ServerDescription>::iterator i = servers.begin(); i != servers.end(); ++i) { for (int j = 0; j < i->threads (); ++j) { - _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i))); + add_worker_thread (*i); } } + _broadcast_thread = new boost::thread (boost::bind (&Encoder::broadcast_thread, this)); + _listen_thread = new boost::thread (boost::bind (&Encoder::listen_thread, this)); + _writer.reset (new Writer (_film, _job)); } @@ -228,19 +253,30 @@ Encoder::process_audio (shared_ptr<const AudioBuffers> data) void Encoder::terminate_threads () { - boost::mutex::scoped_lock lock (_mutex); - _terminate = true; - _condition.notify_all (); - lock.unlock (); + { + boost::mutex::scoped_lock lock (_mutex); + _terminate = true; + _condition.notify_all (); + } - for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) { - if ((*i)->joinable ()) { - (*i)->join (); + for (ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i) { + if (i->second->joinable ()) { + i->second->join (); } - delete *i; + delete i->second; } _threads.clear (); + + if (_broadcast_thread && _broadcast_thread->joinable ()) { + _broadcast_thread->join (); + } + delete _broadcast_thread; + + if (_listen_thread && _listen_thread->joinable ()) { + _listen_thread->join (); + } + delete _listen_thread; } void @@ -326,3 +362,81 @@ Encoder::encoder_thread (optional<ServerDescription> server) _condition.notify_all (); } } + +void +Encoder::broadcast_thread () +{ + boost::system::error_code error; + boost::asio::io_service io_service; + boost::asio::ip::udp::socket socket (io_service); + socket.open (boost::asio::ip::udp::v4(), error); + if (error) { + throw NetworkError ("failed to set up broadcast socket"); + } + + socket.set_option (boost::asio::ip::udp::socket::reuse_address (true)); + socket.set_option (boost::asio::socket_base::broadcast (true)); + + boost::asio::ip::udp::endpoint end_point (boost::asio::ip::address_v4::broadcast(), Config::instance()->server_port_base() + 1); + + while (1) { + boost::mutex::scoped_lock lm (_mutex); + if (_terminate) { + socket.close (error); + return; + } + + string data = DCPOMATIC_HELLO; + socket.send_to (boost::asio::buffer (data.c_str(), data.size() + 1), end_point); + + lm.unlock (); + dcpomatic_sleep (10); + } +} + +void +Encoder::listen_thread () +{ + while (1) { + { + /* See if we need to stop */ + boost::mutex::scoped_lock lm (_mutex); + if (_terminate) { + return; + } + } + + shared_ptr<Socket> sock (new Socket (10)); + + try { + sock->accept (Config::instance()->server_port_base() + 1); + } catch (std::exception& e) { + continue; + } + + uint32_t length = sock->read_uint32 (); + scoped_array<char> buffer (new char[length]); + sock->read (reinterpret_cast<uint8_t*> (buffer.get()), length); + + stringstream s (buffer.get()); + shared_ptr<cxml::Document> xml (new cxml::Document ("ServerAvailable")); + xml->read_stream (s); + + { + /* See if we already know about this server */ + string const ip = sock->socket().remote_endpoint().address().to_string (); + boost::mutex::scoped_lock lm (_mutex); + ThreadList::iterator i = _threads.begin(); + while (i != _threads.end() && (!i->first || i->first->host_name() != ip)) { + ++i; + } + + if (i == _threads.end ()) { + cout << "Adding a thread for " << ip << "\n"; + add_worker_thread (ServerDescription (ip, xml->number_child<int> ("Threads"))); + } else { + cout << "Already know about " << ip << "\n"; + } + } + } +} diff --git a/src/lib/encoder.h b/src/lib/encoder.h index ab3f40762..e799c8469 100644 --- a/src/lib/encoder.h +++ b/src/lib/encoder.h @@ -36,6 +36,7 @@ extern "C" { #include <libswresample/swresample.h> } #include "util.h" +#include "config.h" class Image; class AudioBuffers; @@ -83,6 +84,9 @@ private: void encoder_thread (boost::optional<ServerDescription>); void terminate_threads (); + void broadcast_thread (); + void listen_thread (); + void add_worker_thread (ServerDescription); /** Film that we are encoding */ boost::shared_ptr<const Film> _film; @@ -103,11 +107,16 @@ private: bool _have_a_real_frame[EYES_COUNT]; bool _terminate; std::list<boost::shared_ptr<DCPVideoFrame> > _queue; - std::list<boost::thread *> _threads; + typedef std::list<std::pair<boost::optional<ServerDescription>, boost::thread *> > ThreadList; + ThreadList _threads; mutable boost::mutex _mutex; boost::condition _condition; boost::shared_ptr<Writer> _writer; + + /** A thread to periodically issue broadcasts to find encoding servers */ + boost::thread* _broadcast_thread; + boost::thread* _listen_thread; }; #endif diff --git a/src/lib/server.cc b/src/lib/server.cc index 19f27ab6a..5010a2051 100644 --- a/src/lib/server.cc +++ b/src/lib/server.cc @@ -53,6 +53,7 @@ using boost::thread; using boost::bind; using boost::scoped_array; using boost::optional; +using boost::lexical_cast; using libdcp::Size; ServerDescription::ServerDescription (shared_ptr<const cxml::Node> node) @@ -76,7 +77,7 @@ optional<ServerDescription> ServerDescription::create_from_metadata (string v) { vector<string> b; - split (b, v, is_any_of (N_(" "))); + split (b, v, is_any_of (" ")); if (b.size() != 2) { return optional<ServerDescription> (); @@ -152,7 +153,7 @@ Server::worker_thread () try { frame = process (socket); } catch (std::exception& e) { - _log->log (String::compose (N_("Error: %1"), e.what())); + _log->log (String::compose ("Error: %1", e.what())); } socket.reset (); @@ -162,7 +163,8 @@ Server::worker_thread () if (frame >= 0) { struct timeval end; gettimeofday (&end, 0); - _log->log (String::compose (N_("Encoded frame %1 in %2"), frame, seconds (end) - seconds (start))); + cout << String::compose ("Encoded frame %1 in %2", frame, seconds (end) - seconds (start)) << "\n"; + _log->log (String::compose ("Encoded frame %1 in %2", frame, seconds (end) - seconds (start))); } _worker_condition.notify_all (); @@ -181,7 +183,12 @@ Server::run (int num_threads) _broadcast.thread = new thread (bind (&Server::broadcast_thread, this)); boost::asio::io_service io_service; - boost::asio::ip::tcp::acceptor acceptor (io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port ())); + + boost::asio::ip::tcp::acceptor acceptor ( + io_service, + boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port_base ()) + ); + while (1) { shared_ptr<Socket> socket (new Socket); acceptor.accept (socket->socket ()); @@ -204,7 +211,7 @@ Server::broadcast_thread () boost::asio::io_service io_service; boost::asio::ip::address address = boost::asio::ip::address_v4::any (); - boost::asio::ip::udp::endpoint listen_endpoint (address, Config::instance()->server_port ()); + boost::asio::ip::udp::endpoint listen_endpoint (address, Config::instance()->server_port_base() + 1); _broadcast.socket = new boost::asio::ip::udp::socket (io_service); _broadcast.socket->open (listen_endpoint.protocol ()); @@ -224,8 +231,24 @@ Server::broadcast_received () { _broadcast.buffer[sizeof(_broadcast.buffer) - 1] = '\0'; - cout << _broadcast.buffer << "\n"; - + if (strcmp (_broadcast.buffer, DCPOMATIC_HELLO) == 0) { + /* Reply to the client saying what we can do */ + xmlpp::Document doc; + xmlpp::Element* root = doc.create_root_node ("ServerAvailable"); + root->add_child("Threads")->add_child_text (lexical_cast<string> (_worker_threads.size ())); + stringstream xml; + doc.write_to_stream (xml, "UTF-8"); + + shared_ptr<Socket> socket (new Socket); + try { + socket->connect (boost::asio::ip::tcp::endpoint (_broadcast.send_endpoint.address(), Config::instance()->server_port_base() + 1)); + socket->write (xml.str().length() + 1); + socket->write ((uint8_t *) xml.str().c_str(), xml.str().length() + 1); + } catch (...) { + + } + } + _broadcast.socket->async_receive_from ( boost::asio::buffer (_broadcast.buffer, sizeof (_broadcast.buffer)), _broadcast.send_endpoint, boost::bind (&Server::broadcast_received, this) diff --git a/src/lib/util.cc b/src/lib/util.cc index 96b834fcc..e2ce94dd9 100644 --- a/src/lib/util.cc +++ b/src/lib/util.cc @@ -90,6 +90,7 @@ using std::istream; using std::numeric_limits; using std::pair; using std::ofstream; +using std::cout; using boost::shared_ptr; using boost::thread; using boost::lexical_cast; @@ -518,17 +519,27 @@ dcp_audio_frame_rate (int fs) Socket::Socket (int timeout) : _deadline (_io_service) , _socket (_io_service) + , _acceptor (0) , _timeout (timeout) { _deadline.expires_at (boost::posix_time::pos_infin); check (); } +Socket::~Socket () +{ + delete _acceptor; +} + void Socket::check () { if (_deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now ()) { - _socket.close (); + if (_acceptor) { + _acceptor->cancel (); + } else { + _socket.close (); + } _deadline.expires_at (boost::posix_time::pos_infin); } @@ -539,7 +550,7 @@ Socket::check () * @param endpoint End-point to connect to. */ void -Socket::connect (boost::asio::ip::basic_resolver_entry<boost::asio::ip::tcp> const & endpoint) +Socket::connect (boost::asio::ip::tcp::endpoint endpoint) { _deadline.expires_from_now (boost::posix_time::seconds (_timeout)); boost::system::error_code ec = boost::asio::error::would_block; @@ -548,11 +559,35 @@ Socket::connect (boost::asio::ip::basic_resolver_entry<boost::asio::ip::tcp> con _io_service.run_one(); } while (ec == boost::asio::error::would_block); - if (ec || !_socket.is_open ()) { + if (ec) { + throw NetworkError (ec.message ()); + } + + if (!_socket.is_open ()) { throw NetworkError (_("connect timed out")); } } +void +Socket::accept (int port) +{ + _acceptor = new boost::asio::ip::tcp::acceptor (_io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), port)); + + _deadline.expires_from_now (boost::posix_time::seconds (_timeout)); + boost::system::error_code ec = boost::asio::error::would_block; + _acceptor->async_accept (_socket, boost::lambda::var(ec) = boost::lambda::_1); + do { + _io_service.run_one (); + } while (ec == boost::asio::error::would_block ); + + delete _acceptor; + _acceptor = 0; + + if (ec) { + throw NetworkError (ec.message ()); + } +} + /** Blocking write. * @param data Buffer to write. * @param size Number of bytes to write. diff --git a/src/lib/util.h b/src/lib/util.h index 351c4c4d9..5e568cc27 100644 --- a/src/lib/util.h +++ b/src/lib/util.h @@ -133,13 +133,15 @@ class Socket { public: Socket (int timeout = 30); + ~Socket (); /** @return Our underlying socket */ boost::asio::ip::tcp::socket& socket () { return _socket; } - void connect (boost::asio::ip::basic_resolver_entry<boost::asio::ip::tcp> const & endpoint); + void connect (boost::asio::ip::tcp::endpoint); + void accept (int); void write (uint32_t n); void write (uint8_t const * data, int size); @@ -155,6 +157,7 @@ private: boost::asio::io_service _io_service; boost::asio::deadline_timer _deadline; boost::asio::ip::tcp::socket _socket; + boost::asio::ip::tcp::acceptor* _acceptor; int _timeout; }; |
