diff options
| author | Carl Hetherington <cth@carlh.net> | 2013-11-05 22:43:34 +0000 |
|---|---|---|
| committer | Carl Hetherington <cth@carlh.net> | 2013-11-05 22:43:34 +0000 |
| commit | f660fe3f1be97f373318806a77b3ce3fcd53cb73 (patch) | |
| tree | 3d862060843ad00aae4c23008912ba7d21a38ca2 /src/lib/encoder.cc | |
| parent | 5698918140d640b3477634504a83da0d8d71a1cf (diff) | |
Various work on server discovery; works on localhost.
Diffstat (limited to 'src/lib/encoder.cc')
| -rw-r--r-- | src/lib/encoder.cc | 134 |
1 files changed, 124 insertions, 10 deletions
diff --git a/src/lib/encoder.cc b/src/lib/encoder.cc index 924a91439..f1d2375b6 100644 --- a/src/lib/encoder.cc +++ b/src/lib/encoder.cc @@ -22,6 +22,8 @@ */ #include <iostream> +#include <boost/lambda/lambda.hpp> +#include <libcxml/cxml.h> #include "encoder.h" #include "util.h" #include "film.h" @@ -44,6 +46,7 @@ using std::min; using std::make_pair; using boost::shared_ptr; using boost::optional; +using boost::scoped_array; int const Encoder::_history_size = 25; @@ -53,6 +56,8 @@ Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<Job> j) , _job (j) , _video_frames_out (0) , _terminate (false) + , _broadcast_thread (0) + , _listen_thread (0) { _have_a_real_frame[EYES_BOTH] = false; _have_a_real_frame[EYES_LEFT] = false; @@ -67,21 +72,41 @@ Encoder::~Encoder () } } +/** Add a worker thread for a remote server. Caller must hold + * a lock on _mutex, or know that one is not currently required to + * safely modify _threads. + */ +void +Encoder::add_worker_thread (ServerDescription d) +{ + _threads.push_back ( + make_pair (d, new boost::thread (boost::bind (&Encoder::encoder_thread, this, d))) + ); +} + void Encoder::process_begin () { for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) { - _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ()))); + _threads.push_back ( + make_pair ( + optional<ServerDescription> (), + new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())) + ) + ); } vector<ServerDescription> servers = Config::instance()->servers (); for (vector<ServerDescription>::iterator i = servers.begin(); i != servers.end(); ++i) { for (int j = 0; j < i->threads (); ++j) { - _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i))); + add_worker_thread (*i); } } + _broadcast_thread = new boost::thread (boost::bind (&Encoder::broadcast_thread, this)); + _listen_thread = new boost::thread (boost::bind (&Encoder::listen_thread, this)); + _writer.reset (new Writer (_film, _job)); } @@ -228,19 +253,30 @@ Encoder::process_audio (shared_ptr<const AudioBuffers> data) void Encoder::terminate_threads () { - boost::mutex::scoped_lock lock (_mutex); - _terminate = true; - _condition.notify_all (); - lock.unlock (); + { + boost::mutex::scoped_lock lock (_mutex); + _terminate = true; + _condition.notify_all (); + } - for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) { - if ((*i)->joinable ()) { - (*i)->join (); + for (ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i) { + if (i->second->joinable ()) { + i->second->join (); } - delete *i; + delete i->second; } _threads.clear (); + + if (_broadcast_thread && _broadcast_thread->joinable ()) { + _broadcast_thread->join (); + } + delete _broadcast_thread; + + if (_listen_thread && _listen_thread->joinable ()) { + _listen_thread->join (); + } + delete _listen_thread; } void @@ -326,3 +362,81 @@ Encoder::encoder_thread (optional<ServerDescription> server) _condition.notify_all (); } } + +void +Encoder::broadcast_thread () +{ + boost::system::error_code error; + boost::asio::io_service io_service; + boost::asio::ip::udp::socket socket (io_service); + socket.open (boost::asio::ip::udp::v4(), error); + if (error) { + throw NetworkError ("failed to set up broadcast socket"); + } + + socket.set_option (boost::asio::ip::udp::socket::reuse_address (true)); + socket.set_option (boost::asio::socket_base::broadcast (true)); + + boost::asio::ip::udp::endpoint end_point (boost::asio::ip::address_v4::broadcast(), Config::instance()->server_port_base() + 1); + + while (1) { + boost::mutex::scoped_lock lm (_mutex); + if (_terminate) { + socket.close (error); + return; + } + + string data = DCPOMATIC_HELLO; + socket.send_to (boost::asio::buffer (data.c_str(), data.size() + 1), end_point); + + lm.unlock (); + dcpomatic_sleep (10); + } +} + +void +Encoder::listen_thread () +{ + while (1) { + { + /* See if we need to stop */ + boost::mutex::scoped_lock lm (_mutex); + if (_terminate) { + return; + } + } + + shared_ptr<Socket> sock (new Socket (10)); + + try { + sock->accept (Config::instance()->server_port_base() + 1); + } catch (std::exception& e) { + continue; + } + + uint32_t length = sock->read_uint32 (); + scoped_array<char> buffer (new char[length]); + sock->read (reinterpret_cast<uint8_t*> (buffer.get()), length); + + stringstream s (buffer.get()); + shared_ptr<cxml::Document> xml (new cxml::Document ("ServerAvailable")); + xml->read_stream (s); + + { + /* See if we already know about this server */ + string const ip = sock->socket().remote_endpoint().address().to_string (); + boost::mutex::scoped_lock lm (_mutex); + ThreadList::iterator i = _threads.begin(); + while (i != _threads.end() && (!i->first || i->first->host_name() != ip)) { + ++i; + } + + if (i == _threads.end ()) { + cout << "Adding a thread for " << ip << "\n"; + add_worker_thread (ServerDescription (ip, xml->number_child<int> ("Threads"))); + } else { + cout << "Already know about " << ip << "\n"; + } + } + } +} |
