*/
#include "config.h"
-#include "server.h"
#include "filter.h"
#include "ratio.h"
#include "types.h"
#include <boost/filesystem.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/foreach.hpp>
+#include <boost/thread.hpp>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include "dcp_video.h"
#include "config.h"
#include "exceptions.h"
-#include "server_description.h"
+#include "encode_server_description.h"
#include "dcpomatic_socket.h"
#include "image.h"
#include "log.h"
* @return Encoded data.
*/
Data
-DCPVideo::encode_remotely (ServerDescription serv, int timeout)
+DCPVideo::encode_remotely (EncodeServerDescription serv, int timeout)
{
boost::asio::io_service io_service;
boost::asio::ip::tcp::resolver resolver (io_service);
*/
#include "types.h"
-#include "server_description.h"
+#include "encode_server_description.h"
#include <libcxml/cxml.h>
#include <dcp/data.h>
DCPVideo (boost::shared_ptr<const PlayerVideo>, cxml::ConstNodePtr, boost::shared_ptr<Log>);
dcp::Data encode_locally (dcp::NoteHandler note);
- dcp::Data encode_remotely (ServerDescription, int timeout = 30);
+ dcp::Data encode_remotely (EncodeServerDescription, int timeout = 30);
int index () const {
return _index;
--- /dev/null
+/*
+ Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+*/
+
+/** @file src/encode_server.cc
+ * @brief Class to describe a server to which we can send
+ * encoding work, and a class to implement such a server.
+ */
+
+#include "encode_server.h"
+#include "util.h"
+#include "dcpomatic_socket.h"
+#include "image.h"
+#include "dcp_video.h"
+#include "config.h"
+#include "cross.h"
+#include "player_video.h"
+#include "safe_stringstream.h"
+#include "raw_convert.h"
+#include "compose.hpp"
+#include "log.h"
+#include "encoded_log_entry.h"
+#include <libcxml/cxml.h>
+#include <libxml++/libxml++.h>
+#include <boost/algorithm/string.hpp>
+#include <boost/scoped_array.hpp>
+#include <boost/foreach.hpp>
+#include <string>
+#include <vector>
+#include <iostream>
+
+#include "i18n.h"
+
+#define LOG_GENERAL(...) _log->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
+#define LOG_GENERAL_NC(...) _log->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
+#define LOG_ERROR(...) _log->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
+#define LOG_ERROR_NC(...) _log->log (__VA_ARGS__, LogEntry::TYPE_ERROR);
+
+using std::string;
+using std::vector;
+using std::list;
+using std::cout;
+using std::cerr;
+using std::fixed;
+using boost::shared_ptr;
+using boost::thread;
+using boost::bind;
+using boost::scoped_array;
+using boost::optional;
+using dcp::Size;
+using dcp::Data;
+
+EncodeServer::EncodeServer (shared_ptr<Log> log, bool verbose)
+ : _terminate (false)
+ , _log (log)
+ , _verbose (verbose)
+ , _acceptor (_io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port_base()))
+{
+
+}
+
+EncodeServer::~EncodeServer ()
+{
+ {
+ boost::mutex::scoped_lock lm (_worker_mutex);
+ _terminate = true;
+ _empty_condition.notify_all ();
+ _full_condition.notify_all ();
+ }
+
+ BOOST_FOREACH (boost::thread* i, _worker_threads) {
+ DCPOMATIC_ASSERT (i->joinable ());
+ i->join ();
+ delete i;
+ }
+
+ _io_service.stop ();
+
+ _broadcast.io_service.stop ();
+ if (_broadcast.thread) {
+ DCPOMATIC_ASSERT (_broadcast.thread->joinable ());
+ _broadcast.thread->join ();
+ }
+}
+
+/** @param after_read Filled in with gettimeofday() after reading the input from the network.
+ * @param after_encode Filled in with gettimeofday() after encoding the image.
+ */
+int
+EncodeServer::process (shared_ptr<Socket> socket, struct timeval& after_read, struct timeval& after_encode)
+{
+ uint32_t length = socket->read_uint32 ();
+ scoped_array<char> buffer (new char[length]);
+ socket->read (reinterpret_cast<uint8_t*> (buffer.get()), length);
+
+ string s (buffer.get());
+ shared_ptr<cxml::Document> xml (new cxml::Document ("EncodingRequest"));
+ xml->read_string (s);
+ /* This is a double-check; the server shouldn't even be on the candidate list
+ if it is the wrong version, but it doesn't hurt to make sure here.
+ */
+ if (xml->number_child<int> ("Version") != SERVER_LINK_VERSION) {
+ cerr << "Mismatched server/client versions\n";
+ LOG_ERROR_NC ("Mismatched server/client versions");
+ return -1;
+ }
+
+ shared_ptr<PlayerVideo> pvf (new PlayerVideo (xml, socket));
+
+ DCPVideo dcp_video_frame (pvf, xml, _log);
+
+ gettimeofday (&after_read, 0);
+
+ Data encoded = dcp_video_frame.encode_locally (boost::bind (&Log::dcp_log, _log.get(), _1, _2));
+
+ gettimeofday (&after_encode, 0);
+
+ try {
+ socket->write (encoded.size());
+ socket->write (encoded.data().get(), encoded.size());
+ } catch (std::exception& e) {
+ cerr << "Send failed; frame " << dcp_video_frame.index() << "\n";
+ LOG_ERROR ("Send failed; frame %1", dcp_video_frame.index());
+ throw;
+ }
+
+ return dcp_video_frame.index ();
+}
+
+void
+EncodeServer::worker_thread ()
+{
+ while (true) {
+ boost::mutex::scoped_lock lock (_worker_mutex);
+ while (_queue.empty () && !_terminate) {
+ _empty_condition.wait (lock);
+ }
+
+ if (_terminate) {
+ return;
+ }
+
+ shared_ptr<Socket> socket = _queue.front ();
+ _queue.pop_front ();
+
+ lock.unlock ();
+
+ int frame = -1;
+ string ip;
+
+ struct timeval start;
+ struct timeval after_read;
+ struct timeval after_encode;
+ struct timeval end;
+
+ gettimeofday (&start, 0);
+
+ try {
+ frame = process (socket, after_read, after_encode);
+ ip = socket->socket().remote_endpoint().address().to_string();
+ } catch (std::exception& e) {
+ cerr << "Error: " << e.what() << "\n";
+ LOG_ERROR ("Error: %1", e.what());
+ }
+
+ gettimeofday (&end, 0);
+
+ socket.reset ();
+
+ lock.lock ();
+
+ if (frame >= 0) {
+ struct timeval end;
+ gettimeofday (&end, 0);
+
+ shared_ptr<EncodedLogEntry> e (
+ new EncodedLogEntry (
+ frame, ip,
+ seconds(after_read) - seconds(start),
+ seconds(after_encode) - seconds(after_read),
+ seconds(end) - seconds(after_encode)
+ )
+ );
+
+ if (_verbose) {
+ cout << e->get() << "\n";
+ }
+
+ _log->log (e);
+ }
+
+ _full_condition.notify_all ();
+ }
+}
+
+void
+EncodeServer::run (int num_threads)
+{
+ LOG_GENERAL ("Server starting with %1 threads", num_threads);
+ if (_verbose) {
+ cout << "DCP-o-matic server starting with " << num_threads << " threads.\n";
+ }
+
+ for (int i = 0; i < num_threads; ++i) {
+ _worker_threads.push_back (new thread (bind (&EncodeServer::worker_thread, this)));
+ }
+
+ _broadcast.thread = new thread (bind (&EncodeServer::broadcast_thread, this));
+
+ start_accept ();
+ _io_service.run ();
+}
+
+void
+EncodeServer::broadcast_thread ()
+try
+{
+ boost::asio::ip::address address = boost::asio::ip::address_v4::any ();
+ boost::asio::ip::udp::endpoint listen_endpoint (address, Config::instance()->server_port_base() + 1);
+
+ _broadcast.socket = new boost::asio::ip::udp::socket (_broadcast.io_service);
+ _broadcast.socket->open (listen_endpoint.protocol ());
+ _broadcast.socket->bind (listen_endpoint);
+
+ _broadcast.socket->async_receive_from (
+ boost::asio::buffer (_broadcast.buffer, sizeof (_broadcast.buffer)),
+ _broadcast.send_endpoint,
+ boost::bind (&EncodeServer::broadcast_received, this)
+ );
+
+ _broadcast.io_service.run ();
+}
+catch (...)
+{
+ store_current ();
+}
+
+void
+EncodeServer::broadcast_received ()
+{
+ _broadcast.buffer[sizeof(_broadcast.buffer) - 1] = '\0';
+
+ if (strcmp (_broadcast.buffer, DCPOMATIC_HELLO) == 0) {
+ /* Reply to the client saying what we can do */
+ xmlpp::Document doc;
+ xmlpp::Element* root = doc.create_root_node ("ServerAvailable");
+ root->add_child("Threads")->add_child_text (raw_convert<string> (_worker_threads.size ()));
+ root->add_child("Version")->add_child_text (raw_convert<string> (SERVER_LINK_VERSION));
+ string xml = doc.write_to_string ("UTF-8");
+
+ if (_verbose) {
+ cout << "Offering services to master " << _broadcast.send_endpoint.address().to_string () << "\n";
+ }
+ shared_ptr<Socket> socket (new Socket);
+ try {
+ socket->connect (boost::asio::ip::tcp::endpoint (_broadcast.send_endpoint.address(), Config::instance()->server_port_base() + 1));
+ socket->write (xml.length() + 1);
+ socket->write ((uint8_t *) xml.c_str(), xml.length() + 1);
+ } catch (...) {
+
+ }
+ }
+
+ _broadcast.socket->async_receive_from (
+ boost::asio::buffer (_broadcast.buffer, sizeof (_broadcast.buffer)),
+ _broadcast.send_endpoint, boost::bind (&EncodeServer::broadcast_received, this)
+ );
+}
+
+void
+EncodeServer::start_accept ()
+{
+ if (_terminate) {
+ return;
+ }
+
+ shared_ptr<Socket> socket (new Socket);
+ _acceptor.async_accept (socket->socket (), boost::bind (&EncodeServer::handle_accept, this, socket, boost::asio::placeholders::error));
+}
+
+void
+EncodeServer::handle_accept (shared_ptr<Socket> socket, boost::system::error_code const & error)
+{
+ if (error) {
+ return;
+ }
+
+ boost::mutex::scoped_lock lock (_worker_mutex);
+
+ /* Wait until the queue has gone down a bit */
+ while (_queue.size() >= _worker_threads.size() * 2 && !_terminate) {
+ _full_condition.wait (lock);
+ }
+
+ _queue.push_back (socket);
+ _empty_condition.notify_all ();
+
+ start_accept ();
+}
--- /dev/null
+/*
+ Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+*/
+
+#ifndef DCPOMATIC_ENCODE_SERVER_H
+#define DCPOMATIC_ENCODE_SERVER_H
+
+/** @file src/encode_server.h
+ * @brief Server class.
+ */
+
+#include "exception_store.h"
+#include <boost/thread.hpp>
+#include <boost/asio.hpp>
+#include <boost/thread/condition.hpp>
+#include <string>
+
+class Socket;
+class Log;
+
+/** @class EncodeServer
+ * @brief A class to run a server which can accept requests to perform JPEG2000
+ * encoding work.
+ */
+class EncodeServer : public ExceptionStore, public boost::noncopyable
+{
+public:
+ EncodeServer (boost::shared_ptr<Log> log, bool verbose);
+ ~EncodeServer ();
+
+ void run (int num_threads);
+
+private:
+ void worker_thread ();
+ int process (boost::shared_ptr<Socket> socket, struct timeval &, struct timeval &);
+ void broadcast_thread ();
+ void broadcast_received ();
+ void start_accept ();
+ void handle_accept (boost::shared_ptr<Socket>, boost::system::error_code const &);
+
+ bool _terminate;
+
+ std::vector<boost::thread *> _worker_threads;
+ std::list<boost::shared_ptr<Socket> > _queue;
+ boost::mutex _worker_mutex;
+ boost::condition _full_condition;
+ boost::condition _empty_condition;
+ boost::shared_ptr<Log> _log;
+ bool _verbose;
+
+ boost::asio::io_service _io_service;
+ boost::asio::ip::tcp::acceptor _acceptor;
+
+ struct Broadcast {
+
+ Broadcast ()
+ : thread (0)
+ , socket (0)
+ {}
+
+ boost::thread* thread;
+ boost::asio::ip::udp::socket* socket;
+ char buffer[64];
+ boost::asio::ip::udp::endpoint send_endpoint;
+ boost::asio::io_service io_service;
+
+ } _broadcast;
+};
+
+#endif
--- /dev/null
+/*
+ Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+*/
+
+#ifndef DCPOMATIC_ENCODE_SERVER_DESCRIPTION_H
+#define DCPOMATIC_ENCODE_SERVER_DESCRIPTION_H
+
+/** @class EncodeServerDescription
+ * @brief Class to describe a server to which we can send encoding work.
+ */
+class EncodeServerDescription
+{
+public:
+ EncodeServerDescription ()
+ : _host_name ("")
+ , _threads (1)
+ {}
+
+ /** @param h Server host name or IP address in string form.
+ * @param t Number of threads to use on the server.
+ */
+ EncodeServerDescription (std::string h, int t)
+ : _host_name (h)
+ , _threads (t)
+ {}
+
+ /* Default copy constructor is fine */
+
+ /** @return server's host name or IP address in string form */
+ std::string host_name () const {
+ return _host_name;
+ }
+
+ /** @return number of threads to use on the server */
+ int threads () const {
+ return _threads;
+ }
+
+ void set_host_name (std::string n) {
+ _host_name = n;
+ }
+
+ void set_threads (int t) {
+ _threads = t;
+ }
+
+private:
+ /** server's host name */
+ std::string _host_name;
+ /** number of threads to use on the server */
+ int _threads;
+};
+
+#endif
--- /dev/null
+/*
+ Copyright (C) 2013-2015 Carl Hetherington <cth@carlh.net>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+*/
+
+#include "encode_server_finder.h"
+#include "exceptions.h"
+#include "util.h"
+#include "config.h"
+#include "cross.h"
+#include "encode_server_description.h"
+#include "dcpomatic_socket.h"
+#include "raw_convert.h"
+#include <libcxml/cxml.h>
+#include <boost/lambda/lambda.hpp>
+#include <iostream>
+
+#include "i18n.h"
+
+using std::string;
+using std::list;
+using std::vector;
+using std::cout;
+using boost::shared_ptr;
+using boost::scoped_array;
+using boost::weak_ptr;
+
+EncodeServerFinder* EncodeServerFinder::_instance = 0;
+
+EncodeServerFinder::EncodeServerFinder ()
+ : _disabled (false)
+ , _search_thread (0)
+ , _listen_thread (0)
+ , _stop (false)
+{
+ Config::instance()->Changed.connect (boost::bind (&EncodeServerFinder::config_changed, this, _1));
+}
+
+void
+EncodeServerFinder::start ()
+{
+ _search_thread = new boost::thread (boost::bind (&EncodeServerFinder::search_thread, this));
+ _listen_thread = new boost::thread (boost::bind (&EncodeServerFinder::listen_thread, this));
+}
+
+EncodeServerFinder::~EncodeServerFinder ()
+{
+ _stop = true;
+
+ _search_condition.notify_all ();
+ if (_search_thread) {
+ DCPOMATIC_ASSERT (_search_thread->joinable ());
+ _search_thread->join ();
+ }
+
+ _listen_io_service.stop ();
+ if (_listen_thread) {
+ DCPOMATIC_ASSERT (_listen_thread->joinable ());
+ _listen_thread->join ();
+ }
+}
+
+void
+EncodeServerFinder::search_thread ()
+try
+{
+ boost::system::error_code error;
+ boost::asio::io_service io_service;
+ boost::asio::ip::udp::socket socket (io_service);
+ socket.open (boost::asio::ip::udp::v4(), error);
+ if (error) {
+ throw NetworkError ("failed to set up broadcast socket");
+ }
+
+ socket.set_option (boost::asio::ip::udp::socket::reuse_address (true));
+ socket.set_option (boost::asio::socket_base::broadcast (true));
+
+ string const data = DCPOMATIC_HELLO;
+
+ while (!_stop) {
+ if (Config::instance()->use_any_servers ()) {
+ /* Broadcast to look for servers */
+ try {
+ boost::asio::ip::udp::endpoint end_point (boost::asio::ip::address_v4::broadcast(), Config::instance()->server_port_base() + 1);
+ socket.send_to (boost::asio::buffer (data.c_str(), data.size() + 1), end_point);
+ } catch (...) {
+
+ }
+ }
+
+ /* Query our `definite' servers (if there are any) */
+ vector<string> servers = Config::instance()->servers ();
+ for (vector<string>::const_iterator i = servers.begin(); i != servers.end(); ++i) {
+ if (server_found (*i)) {
+ /* Don't bother asking a server that we already know about */
+ continue;
+ }
+ try {
+ boost::asio::ip::udp::resolver resolver (io_service);
+ boost::asio::ip::udp::resolver::query query (*i, raw_convert<string> (Config::instance()->server_port_base() + 1));
+ boost::asio::ip::udp::endpoint end_point (*resolver.resolve (query));
+ socket.send_to (boost::asio::buffer (data.c_str(), data.size() + 1), end_point);
+ } catch (...) {
+
+ }
+ }
+
+ boost::mutex::scoped_lock lm (_search_condition_mutex);
+ _search_condition.timed_wait (lm, boost::get_system_time() + boost::posix_time::seconds (10));
+ }
+}
+catch (...)
+{
+ store_current ();
+}
+
+void
+EncodeServerFinder::listen_thread ()
+try {
+ using namespace boost::asio::ip;
+
+ try {
+ _listen_acceptor.reset (new tcp::acceptor (_listen_io_service, tcp::endpoint (tcp::v4(), Config::instance()->server_port_base() + 1)));
+ } catch (...) {
+ boost::throw_exception (NetworkError (_("Could not listen for remote encode servers. Perhaps another instance of DCP-o-matic is running.")));
+ }
+
+ start_accept ();
+ _listen_io_service.run ();
+}
+catch (...)
+{
+ store_current ();
+}
+
+void
+EncodeServerFinder::start_accept ()
+{
+ shared_ptr<Socket> socket (new Socket ());
+ _listen_acceptor->async_accept (
+ socket->socket(),
+ boost::bind (&EncodeServerFinder::handle_accept, this, boost::asio::placeholders::error, socket)
+ );
+}
+
+void
+EncodeServerFinder::handle_accept (boost::system::error_code ec, shared_ptr<Socket> socket)
+{
+ if (ec) {
+ start_accept ();
+ return;
+ }
+
+ uint32_t length;
+ socket->read (reinterpret_cast<uint8_t*> (&length), sizeof (uint32_t));
+ length = ntohl (length);
+
+ scoped_array<char> buffer (new char[length]);
+ socket->read (reinterpret_cast<uint8_t*> (buffer.get()), length);
+
+ string s (buffer.get());
+ shared_ptr<cxml::Document> xml (new cxml::Document ("ServerAvailable"));
+ xml->read_string (s);
+
+ string const ip = socket->socket().remote_endpoint().address().to_string ();
+ if (!server_found (ip) && xml->optional_number_child<int>("Version").get_value_or (0) == SERVER_LINK_VERSION) {
+ EncodeServerDescription sd (ip, xml->number_child<int> ("Threads"));
+ {
+ boost::mutex::scoped_lock lm (_servers_mutex);
+ _servers.push_back (sd);
+ }
+ emit (boost::bind (boost::ref (ServersListChanged)));
+ }
+
+ start_accept ();
+}
+
+bool
+EncodeServerFinder::server_found (string ip) const
+{
+ boost::mutex::scoped_lock lm (_servers_mutex);
+ list<EncodeServerDescription>::const_iterator i = _servers.begin();
+ while (i != _servers.end() && i->host_name() != ip) {
+ ++i;
+ }
+
+ return i != _servers.end ();
+}
+
+EncodeServerFinder*
+EncodeServerFinder::instance ()
+{
+ if (!_instance) {
+ _instance = new EncodeServerFinder ();
+ _instance->start ();
+ }
+
+ return _instance;
+}
+
+void
+EncodeServerFinder::drop ()
+{
+ delete _instance;
+ _instance = 0;
+}
+
+list<EncodeServerDescription>
+EncodeServerFinder::servers () const
+{
+ boost::mutex::scoped_lock lm (_servers_mutex);
+ return _servers;
+}
+
+void
+EncodeServerFinder::config_changed (Config::Property what)
+{
+ if (what == Config::USE_ANY_SERVERS || what == Config::SERVERS) {
+ {
+ boost::mutex::scoped_lock lm (_servers_mutex);
+ _servers.clear ();
+ }
+ ServersListChanged ();
+ _search_condition.notify_all ();
+ }
+}
--- /dev/null
+/*
+ Copyright (C) 2013-2015 Carl Hetherington <cth@carlh.net>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+*/
+
+/** @file src/lib/encode_server_finder.h
+ * @brief EncodeServerFinder class.
+ */
+
+#include "signaller.h"
+#include "encode_server_description.h"
+#include "config.h"
+#include "exception_store.h"
+#include <boost/signals2.hpp>
+#include <boost/thread/condition.hpp>
+
+class Socket;
+
+class EncodeServerFinder : public Signaller, public ExceptionStore
+{
+public:
+ static EncodeServerFinder* instance ();
+ static void drop ();
+
+ void disable () {
+ _disabled = true;
+ }
+
+ bool disabled () const {
+ return _disabled;
+ }
+
+ std::list<EncodeServerDescription> servers () const;
+
+ /** Emitted whenever the list of servers changes */
+ boost::signals2::signal<void ()> ServersListChanged;
+
+private:
+ EncodeServerFinder ();
+ ~EncodeServerFinder ();
+
+ void start ();
+
+ void search_thread ();
+ void listen_thread ();
+
+ bool server_found (std::string) const;
+ void start_accept ();
+ void handle_accept (boost::system::error_code ec, boost::shared_ptr<Socket> socket);
+
+ void config_changed (Config::Property what);
+
+ bool _disabled;
+
+ /** Thread to periodically issue broadcasts and requests to find encoding servers */
+ boost::thread* _search_thread;
+ /** Thread to listen to the responses from servers */
+ boost::thread* _listen_thread;
+
+ std::list<EncodeServerDescription> _servers;
+ mutable boost::mutex _servers_mutex;
+
+ boost::asio::io_service _listen_io_service;
+ boost::shared_ptr<boost::asio::ip::tcp::acceptor> _listen_acceptor;
+ bool _stop;
+
+ boost::condition _search_condition;
+ boost::mutex _search_condition_mutex;
+
+ static EncodeServerFinder* _instance;
+};
#include "log.h"
#include "config.h"
#include "dcp_video.h"
-#include "server.h"
#include "cross.h"
#include "writer.h"
-#include "server_finder.h"
+#include "encode_server_finder.h"
#include "player.h"
#include "player_video.h"
-#include "server_description.h"
+#include "encode_server_description.h"
#include "compose.hpp"
#include <libcxml/cxml.h>
#include <boost/foreach.hpp>
void
Encoder::begin ()
{
- if (!ServerFinder::instance()->disabled ()) {
- _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
+ if (!EncodeServerFinder::instance()->disabled ()) {
+ _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
}
}
}
void
-Encoder::encoder_thread (optional<ServerDescription> server)
+Encoder::encoder_thread (optional<EncodeServerDescription> server)
try
{
if (server) {
if (!Config::instance()->only_servers_encode ()) {
for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
- _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
+ _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<EncodeServerDescription> ())));
}
}
- BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
+ BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
for (int j = 0; j < i.threads(); ++j) {
_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
#include <stdint.h>
class Film;
-class ServerDescription;
+class EncodeServerDescription;
class DCPVideo;
class Writer;
class Job;
void enqueue (boost::shared_ptr<PlayerVideo> f);
void frame_done ();
- void encoder_thread (boost::optional<ServerDescription>);
+ void encoder_thread (boost::optional<EncodeServerDescription>);
void terminate_threads ();
void servers_list_changed ();
+++ /dev/null
-/*
- Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-
-*/
-
-/** @file src/server.cc
- * @brief Class to describe a server to which we can send
- * encoding work, and a class to implement such a server.
- */
-
-#include "server.h"
-#include "util.h"
-#include "dcpomatic_socket.h"
-#include "image.h"
-#include "dcp_video.h"
-#include "config.h"
-#include "cross.h"
-#include "player_video.h"
-#include "safe_stringstream.h"
-#include "raw_convert.h"
-#include "compose.hpp"
-#include "log.h"
-#include "encoded_log_entry.h"
-#include <libcxml/cxml.h>
-#include <libxml++/libxml++.h>
-#include <boost/algorithm/string.hpp>
-#include <boost/scoped_array.hpp>
-#include <boost/foreach.hpp>
-#include <string>
-#include <vector>
-#include <iostream>
-
-#include "i18n.h"
-
-#define LOG_GENERAL(...) _log->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
-#define LOG_GENERAL_NC(...) _log->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
-#define LOG_ERROR(...) _log->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
-#define LOG_ERROR_NC(...) _log->log (__VA_ARGS__, LogEntry::TYPE_ERROR);
-
-using std::string;
-using std::vector;
-using std::list;
-using std::cout;
-using std::cerr;
-using std::fixed;
-using boost::shared_ptr;
-using boost::thread;
-using boost::bind;
-using boost::scoped_array;
-using boost::optional;
-using dcp::Size;
-using dcp::Data;
-
-Server::Server (shared_ptr<Log> log, bool verbose)
- : _terminate (false)
- , _log (log)
- , _verbose (verbose)
- , _acceptor (_io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port_base()))
-{
-
-}
-
-Server::~Server ()
-{
- {
- boost::mutex::scoped_lock lm (_worker_mutex);
- _terminate = true;
- _empty_condition.notify_all ();
- _full_condition.notify_all ();
- }
-
- BOOST_FOREACH (boost::thread* i, _worker_threads) {
- DCPOMATIC_ASSERT (i->joinable ());
- i->join ();
- delete i;
- }
-
- _io_service.stop ();
-
- _broadcast.io_service.stop ();
- if (_broadcast.thread) {
- DCPOMATIC_ASSERT (_broadcast.thread->joinable ());
- _broadcast.thread->join ();
- }
-}
-
-/** @param after_read Filled in with gettimeofday() after reading the input from the network.
- * @param after_encode Filled in with gettimeofday() after encoding the image.
- */
-int
-Server::process (shared_ptr<Socket> socket, struct timeval& after_read, struct timeval& after_encode)
-{
- uint32_t length = socket->read_uint32 ();
- scoped_array<char> buffer (new char[length]);
- socket->read (reinterpret_cast<uint8_t*> (buffer.get()), length);
-
- string s (buffer.get());
- shared_ptr<cxml::Document> xml (new cxml::Document ("EncodingRequest"));
- xml->read_string (s);
- /* This is a double-check; the server shouldn't even be on the candidate list
- if it is the wrong version, but it doesn't hurt to make sure here.
- */
- if (xml->number_child<int> ("Version") != SERVER_LINK_VERSION) {
- cerr << "Mismatched server/client versions\n";
- LOG_ERROR_NC ("Mismatched server/client versions");
- return -1;
- }
-
- shared_ptr<PlayerVideo> pvf (new PlayerVideo (xml, socket));
-
- DCPVideo dcp_video_frame (pvf, xml, _log);
-
- gettimeofday (&after_read, 0);
-
- Data encoded = dcp_video_frame.encode_locally (boost::bind (&Log::dcp_log, _log.get(), _1, _2));
-
- gettimeofday (&after_encode, 0);
-
- try {
- socket->write (encoded.size());
- socket->write (encoded.data().get(), encoded.size());
- } catch (std::exception& e) {
- cerr << "Send failed; frame " << dcp_video_frame.index() << "\n";
- LOG_ERROR ("Send failed; frame %1", dcp_video_frame.index());
- throw;
- }
-
- return dcp_video_frame.index ();
-}
-
-void
-Server::worker_thread ()
-{
- while (true) {
- boost::mutex::scoped_lock lock (_worker_mutex);
- while (_queue.empty () && !_terminate) {
- _empty_condition.wait (lock);
- }
-
- if (_terminate) {
- return;
- }
-
- shared_ptr<Socket> socket = _queue.front ();
- _queue.pop_front ();
-
- lock.unlock ();
-
- int frame = -1;
- string ip;
-
- struct timeval start;
- struct timeval after_read;
- struct timeval after_encode;
- struct timeval end;
-
- gettimeofday (&start, 0);
-
- try {
- frame = process (socket, after_read, after_encode);
- ip = socket->socket().remote_endpoint().address().to_string();
- } catch (std::exception& e) {
- cerr << "Error: " << e.what() << "\n";
- LOG_ERROR ("Error: %1", e.what());
- }
-
- gettimeofday (&end, 0);
-
- socket.reset ();
-
- lock.lock ();
-
- if (frame >= 0) {
- struct timeval end;
- gettimeofday (&end, 0);
-
- shared_ptr<EncodedLogEntry> e (
- new EncodedLogEntry (
- frame, ip,
- seconds(after_read) - seconds(start),
- seconds(after_encode) - seconds(after_read),
- seconds(end) - seconds(after_encode)
- )
- );
-
- if (_verbose) {
- cout << e->get() << "\n";
- }
-
- _log->log (e);
- }
-
- _full_condition.notify_all ();
- }
-}
-
-void
-Server::run (int num_threads)
-{
- LOG_GENERAL ("Server starting with %1 threads", num_threads);
- if (_verbose) {
- cout << "DCP-o-matic server starting with " << num_threads << " threads.\n";
- }
-
- for (int i = 0; i < num_threads; ++i) {
- _worker_threads.push_back (new thread (bind (&Server::worker_thread, this)));
- }
-
- _broadcast.thread = new thread (bind (&Server::broadcast_thread, this));
-
- start_accept ();
- _io_service.run ();
-}
-
-void
-Server::broadcast_thread ()
-try
-{
- boost::asio::ip::address address = boost::asio::ip::address_v4::any ();
- boost::asio::ip::udp::endpoint listen_endpoint (address, Config::instance()->server_port_base() + 1);
-
- _broadcast.socket = new boost::asio::ip::udp::socket (_broadcast.io_service);
- _broadcast.socket->open (listen_endpoint.protocol ());
- _broadcast.socket->bind (listen_endpoint);
-
- _broadcast.socket->async_receive_from (
- boost::asio::buffer (_broadcast.buffer, sizeof (_broadcast.buffer)),
- _broadcast.send_endpoint,
- boost::bind (&Server::broadcast_received, this)
- );
-
- _broadcast.io_service.run ();
-}
-catch (...)
-{
- store_current ();
-}
-
-void
-Server::broadcast_received ()
-{
- _broadcast.buffer[sizeof(_broadcast.buffer) - 1] = '\0';
-
- if (strcmp (_broadcast.buffer, DCPOMATIC_HELLO) == 0) {
- /* Reply to the client saying what we can do */
- xmlpp::Document doc;
- xmlpp::Element* root = doc.create_root_node ("ServerAvailable");
- root->add_child("Threads")->add_child_text (raw_convert<string> (_worker_threads.size ()));
- root->add_child("Version")->add_child_text (raw_convert<string> (SERVER_LINK_VERSION));
- string xml = doc.write_to_string ("UTF-8");
-
- if (_verbose) {
- cout << "Offering services to master " << _broadcast.send_endpoint.address().to_string () << "\n";
- }
- shared_ptr<Socket> socket (new Socket);
- try {
- socket->connect (boost::asio::ip::tcp::endpoint (_broadcast.send_endpoint.address(), Config::instance()->server_port_base() + 1));
- socket->write (xml.length() + 1);
- socket->write ((uint8_t *) xml.c_str(), xml.length() + 1);
- } catch (...) {
-
- }
- }
-
- _broadcast.socket->async_receive_from (
- boost::asio::buffer (_broadcast.buffer, sizeof (_broadcast.buffer)),
- _broadcast.send_endpoint, boost::bind (&Server::broadcast_received, this)
- );
-}
-
-void
-Server::start_accept ()
-{
- if (_terminate) {
- return;
- }
-
- shared_ptr<Socket> socket (new Socket);
- _acceptor.async_accept (socket->socket (), boost::bind (&Server::handle_accept, this, socket, boost::asio::placeholders::error));
-}
-
-void
-Server::handle_accept (shared_ptr<Socket> socket, boost::system::error_code const & error)
-{
- if (error) {
- return;
- }
-
- boost::mutex::scoped_lock lock (_worker_mutex);
-
- /* Wait until the queue has gone down a bit */
- while (_queue.size() >= _worker_threads.size() * 2 && !_terminate) {
- _full_condition.wait (lock);
- }
-
- _queue.push_back (socket);
- _empty_condition.notify_all ();
-
- start_accept ();
-}
+++ /dev/null
-/*
- Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-
-*/
-
-#ifndef DCPOMATIC_SERVER_H
-#define DCPOMATIC_SERVER_H
-
-/** @file src/server.h
- * @brief Server class.
- */
-
-#include "exception_store.h"
-#include <boost/thread.hpp>
-#include <boost/asio.hpp>
-#include <boost/thread/condition.hpp>
-#include <string>
-
-class Socket;
-class Log;
-
-/** @class Server
- * @brief A class to run a server which can accept requests to perform JPEG2000
- * encoding work.
- */
-class Server : public ExceptionStore, public boost::noncopyable
-{
-public:
- Server (boost::shared_ptr<Log> log, bool verbose);
- ~Server ();
-
- void run (int num_threads);
-
-private:
- void worker_thread ();
- int process (boost::shared_ptr<Socket> socket, struct timeval &, struct timeval &);
- void broadcast_thread ();
- void broadcast_received ();
- void start_accept ();
- void handle_accept (boost::shared_ptr<Socket>, boost::system::error_code const &);
-
- bool _terminate;
-
- std::vector<boost::thread *> _worker_threads;
- std::list<boost::shared_ptr<Socket> > _queue;
- boost::mutex _worker_mutex;
- boost::condition _full_condition;
- boost::condition _empty_condition;
- boost::shared_ptr<Log> _log;
- bool _verbose;
-
- boost::asio::io_service _io_service;
- boost::asio::ip::tcp::acceptor _acceptor;
-
- struct Broadcast {
-
- Broadcast ()
- : thread (0)
- , socket (0)
- {}
-
- boost::thread* thread;
- boost::asio::ip::udp::socket* socket;
- char buffer[64];
- boost::asio::ip::udp::endpoint send_endpoint;
- boost::asio::io_service io_service;
-
- } _broadcast;
-};
-
-#endif
+++ /dev/null
-/*
- Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-
-*/
-
-#ifndef DCPOMATIC_SERVER_DESCRIPTION_H
-#define DCPOMATIC_SERVER_DESCRIPTION_H
-
-/** @class ServerDescription
- * @brief Class to describe a server to which we can send encoding work.
- */
-class ServerDescription
-{
-public:
- ServerDescription ()
- : _host_name ("")
- , _threads (1)
- {}
-
- /** @param h Server host name or IP address in string form.
- * @param t Number of threads to use on the server.
- */
- ServerDescription (std::string h, int t)
- : _host_name (h)
- , _threads (t)
- {}
-
- /* Default copy constructor is fine */
-
- /** @return server's host name or IP address in string form */
- std::string host_name () const {
- return _host_name;
- }
-
- /** @return number of threads to use on the server */
- int threads () const {
- return _threads;
- }
-
- void set_host_name (std::string n) {
- _host_name = n;
- }
-
- void set_threads (int t) {
- _threads = t;
- }
-
-private:
- /** server's host name */
- std::string _host_name;
- /** number of threads to use on the server */
- int _threads;
-};
-
-#endif
+++ /dev/null
-/*
- Copyright (C) 2013-2015 Carl Hetherington <cth@carlh.net>
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-
-*/
-
-#include "server_finder.h"
-#include "exceptions.h"
-#include "util.h"
-#include "config.h"
-#include "cross.h"
-#include "server_description.h"
-#include "dcpomatic_socket.h"
-#include "raw_convert.h"
-#include <libcxml/cxml.h>
-#include <boost/lambda/lambda.hpp>
-#include <iostream>
-
-#include "i18n.h"
-
-using std::string;
-using std::list;
-using std::vector;
-using std::cout;
-using boost::shared_ptr;
-using boost::scoped_array;
-using boost::weak_ptr;
-
-ServerFinder* ServerFinder::_instance = 0;
-
-ServerFinder::ServerFinder ()
- : _disabled (false)
- , _search_thread (0)
- , _listen_thread (0)
- , _stop (false)
-{
- Config::instance()->Changed.connect (boost::bind (&ServerFinder::config_changed, this, _1));
-}
-
-void
-ServerFinder::start ()
-{
- _search_thread = new boost::thread (boost::bind (&ServerFinder::search_thread, this));
- _listen_thread = new boost::thread (boost::bind (&ServerFinder::listen_thread, this));
-}
-
-ServerFinder::~ServerFinder ()
-{
- _stop = true;
-
- _search_condition.notify_all ();
- if (_search_thread) {
- DCPOMATIC_ASSERT (_search_thread->joinable ());
- _search_thread->join ();
- }
-
- _listen_io_service.stop ();
- if (_listen_thread) {
- DCPOMATIC_ASSERT (_listen_thread->joinable ());
- _listen_thread->join ();
- }
-}
-
-void
-ServerFinder::search_thread ()
-try
-{
- boost::system::error_code error;
- boost::asio::io_service io_service;
- boost::asio::ip::udp::socket socket (io_service);
- socket.open (boost::asio::ip::udp::v4(), error);
- if (error) {
- throw NetworkError ("failed to set up broadcast socket");
- }
-
- socket.set_option (boost::asio::ip::udp::socket::reuse_address (true));
- socket.set_option (boost::asio::socket_base::broadcast (true));
-
- string const data = DCPOMATIC_HELLO;
-
- while (!_stop) {
- if (Config::instance()->use_any_servers ()) {
- /* Broadcast to look for servers */
- try {
- boost::asio::ip::udp::endpoint end_point (boost::asio::ip::address_v4::broadcast(), Config::instance()->server_port_base() + 1);
- socket.send_to (boost::asio::buffer (data.c_str(), data.size() + 1), end_point);
- } catch (...) {
-
- }
- }
-
- /* Query our `definite' servers (if there are any) */
- vector<string> servers = Config::instance()->servers ();
- for (vector<string>::const_iterator i = servers.begin(); i != servers.end(); ++i) {
- if (server_found (*i)) {
- /* Don't bother asking a server that we already know about */
- continue;
- }
- try {
- boost::asio::ip::udp::resolver resolver (io_service);
- boost::asio::ip::udp::resolver::query query (*i, raw_convert<string> (Config::instance()->server_port_base() + 1));
- boost::asio::ip::udp::endpoint end_point (*resolver.resolve (query));
- socket.send_to (boost::asio::buffer (data.c_str(), data.size() + 1), end_point);
- } catch (...) {
-
- }
- }
-
- boost::mutex::scoped_lock lm (_search_condition_mutex);
- _search_condition.timed_wait (lm, boost::get_system_time() + boost::posix_time::seconds (10));
- }
-}
-catch (...)
-{
- store_current ();
-}
-
-void
-ServerFinder::listen_thread ()
-try {
- using namespace boost::asio::ip;
-
- try {
- _listen_acceptor.reset (new tcp::acceptor (_listen_io_service, tcp::endpoint (tcp::v4(), Config::instance()->server_port_base() + 1)));
- } catch (...) {
- boost::throw_exception (NetworkError (_("Could not listen for remote encode servers. Perhaps another instance of DCP-o-matic is running.")));
- }
-
- start_accept ();
- _listen_io_service.run ();
-}
-catch (...)
-{
- store_current ();
-}
-
-void
-ServerFinder::start_accept ()
-{
- shared_ptr<Socket> socket (new Socket ());
- _listen_acceptor->async_accept (
- socket->socket(),
- boost::bind (&ServerFinder::handle_accept, this, boost::asio::placeholders::error, socket)
- );
-}
-
-void
-ServerFinder::handle_accept (boost::system::error_code ec, shared_ptr<Socket> socket)
-{
- if (ec) {
- start_accept ();
- return;
- }
-
- uint32_t length;
- socket->read (reinterpret_cast<uint8_t*> (&length), sizeof (uint32_t));
- length = ntohl (length);
-
- scoped_array<char> buffer (new char[length]);
- socket->read (reinterpret_cast<uint8_t*> (buffer.get()), length);
-
- string s (buffer.get());
- shared_ptr<cxml::Document> xml (new cxml::Document ("ServerAvailable"));
- xml->read_string (s);
-
- string const ip = socket->socket().remote_endpoint().address().to_string ();
- if (!server_found (ip) && xml->optional_number_child<int>("Version").get_value_or (0) == SERVER_LINK_VERSION) {
- ServerDescription sd (ip, xml->number_child<int> ("Threads"));
- {
- boost::mutex::scoped_lock lm (_servers_mutex);
- _servers.push_back (sd);
- }
- emit (boost::bind (boost::ref (ServersListChanged)));
- }
-
- start_accept ();
-}
-
-bool
-ServerFinder::server_found (string ip) const
-{
- boost::mutex::scoped_lock lm (_servers_mutex);
- list<ServerDescription>::const_iterator i = _servers.begin();
- while (i != _servers.end() && i->host_name() != ip) {
- ++i;
- }
-
- return i != _servers.end ();
-}
-
-ServerFinder*
-ServerFinder::instance ()
-{
- if (!_instance) {
- _instance = new ServerFinder ();
- _instance->start ();
- }
-
- return _instance;
-}
-
-void
-ServerFinder::drop ()
-{
- delete _instance;
- _instance = 0;
-}
-
-list<ServerDescription>
-ServerFinder::servers () const
-{
- boost::mutex::scoped_lock lm (_servers_mutex);
- return _servers;
-}
-
-void
-ServerFinder::config_changed (Config::Property what)
-{
- if (what == Config::USE_ANY_SERVERS || what == Config::SERVERS) {
- {
- boost::mutex::scoped_lock lm (_servers_mutex);
- _servers.clear ();
- }
- ServersListChanged ();
- _search_condition.notify_all ();
- }
-}
+++ /dev/null
-/*
- Copyright (C) 2013-2015 Carl Hetherington <cth@carlh.net>
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-
-*/
-
-/** @file src/lib/server_finder.h
- * @brief ServerFinder class.
- */
-
-#include "signaller.h"
-#include "server_description.h"
-#include "config.h"
-#include "exception_store.h"
-#include <boost/signals2.hpp>
-#include <boost/thread/condition.hpp>
-
-class Socket;
-
-class ServerFinder : public Signaller, public ExceptionStore
-{
-public:
- static ServerFinder* instance ();
- static void drop ();
-
- void disable () {
- _disabled = true;
- }
-
- bool disabled () const {
- return _disabled;
- }
-
- std::list<ServerDescription> servers () const;
-
- /** Emitted whenever the list of servers changes */
- boost::signals2::signal<void ()> ServersListChanged;
-
-private:
- ServerFinder ();
- ~ServerFinder ();
-
- void start ();
-
- void search_thread ();
- void listen_thread ();
-
- bool server_found (std::string) const;
- void start_accept ();
- void handle_accept (boost::system::error_code ec, boost::shared_ptr<Socket> socket);
-
- void config_changed (Config::Property what);
-
- bool _disabled;
-
- /** Thread to periodically issue broadcasts and requests to find encoding servers */
- boost::thread* _search_thread;
- /** Thread to listen to the responses from servers */
- boost::thread* _listen_thread;
-
- std::list<ServerDescription> _servers;
- mutable boost::mutex _servers_mutex;
-
- boost::asio::io_service _listen_io_service;
- boost::shared_ptr<boost::asio::ip::tcp::acceptor> _listen_acceptor;
- bool _stop;
-
- boost::condition _search_condition;
- boost::mutex _search_condition_mutex;
-
- static ServerFinder* _instance;
-};
dolby_cp750.cc
emailer.cc
encoder.cc
+ encode_server.cc
+ encode_server_finder.cc
encoded_log_entry.cc
environment_info.cc
examine_content_job.cc
screen_kdm.cc
send_kdm_email_job.cc
send_problem_report_job.cc
- server.cc
- server_finder.cc
single_stream_audio_content.cc
sndfile_base.cc
sndfile_content.cc
#include "lib/cinema.h"
#include "lib/screen_kdm.h"
#include "lib/send_kdm_email_job.h"
-#include "lib/server_finder.h"
+#include "lib/encode_server_finder.h"
#include "lib/update_checker.h"
#include "lib/cross.h"
#include "lib/content_factory.h"
void check ()
{
try {
- ServerFinder::instance()->rethrow ();
+ EncodeServerFinder::instance()->rethrow ();
} catch (exception& e) {
error_dialog (0, std_to_wx (e.what ()));
}
#include "lib/config.h"
#include "lib/log.h"
#include "lib/signal_manager.h"
-#include "lib/server_finder.h"
+#include "lib/encode_server_finder.h"
#include "lib/json_server.h"
#include "lib/ratio.h"
#include "lib/video_content.h"
{
while (true) {
int N = 0;
- list<ServerDescription> servers = ServerFinder::instance()->servers ();
+ list<EncodeServerDescription> servers = EncodeServerFinder::instance()->servers ();
if (Config::instance()->use_any_servers ()) {
if (servers.empty ()) {
} else {
cout << std::left << setw(24) << "Host" << " Threads\n";
++N;
- BOOST_FOREACH (ServerDescription const & i, servers) {
+ BOOST_FOREACH (EncodeServerDescription const & i, servers) {
cout << std::left << setw(24) << i.host_name() << " " << i.threads() << "\n";
++N;
}
BOOST_FOREACH (string const & i, Config::instance()->servers()) {
cout << std::left << setw(24) << i << " ";
optional<int> threads;
- BOOST_FOREACH (ServerDescription const & j, servers) {
+ BOOST_FOREACH (EncodeServerDescription const & j, servers) {
if (i == j.host_name()) {
threads = j.threads();
}
signal_manager = new SignalManager ();
if (no_remote) {
- ServerFinder::instance()->disable ();
+ EncodeServerFinder::instance()->disable ();
}
if (json_port) {
*/
JobManager::drop ();
- ServerFinder::drop ();
+ EncodeServerFinder::drop ();
return error ? EXIT_FAILURE : EXIT_SUCCESS;
}
#include "wx/wx_signal_manager.h"
#include "lib/util.h"
#include "lib/encoded_log_entry.h"
-#include "lib/server.h"
+#include "lib/encode_server.h"
#include "lib/config.h"
#include "lib/log.h"
#include "lib/raw_convert.h"
void main_thread ()
try {
- Server server (server_log, false);
+ EncodeServer server (server_log, false);
server.run (Config::instance()->num_local_encoding_threads ());
} catch (...) {
store_current ();
#include "lib/file_log.h"
#include "lib/null_log.h"
#include "lib/version.h"
-#include "lib/server.h"
+#include "lib/encode_server.h"
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/algorithm/string.hpp>
log.reset (new NullLog);
}
- Server server (log, verbose);
+ EncodeServer server (log, verbose);
try {
server.run (num_threads);
#include "lib/film.h"
#include "lib/filter.h"
#include "lib/util.h"
-#include "lib/server.h"
+#include "lib/encode_server.h"
#include "lib/dcp_video.h"
#include "lib/decoder.h"
#include "lib/exceptions.h"
#include "lib/video_decoder.h"
#include "lib/player.h"
#include "lib/player_video.h"
-#include "lib/server_description.h"
+#include "lib/encode_server_description.h"
#include <getopt.h>
#include <iostream>
#include <iomanip>
using dcp::Data;
static shared_ptr<Film> film;
-static ServerDescription* server;
+static EncodeServerDescription* server;
static shared_ptr<FileLog> log_ (new FileLog ("servomatictest.log"));
static int frame_count = 0;
dcpomatic_setup ();
try {
- server = new ServerDescription (server_host, 1);
+ server = new EncodeServerDescription (server_host, 1);
film.reset (new Film (film_dir));
film->read_metadata ();
*/
-#include "lib/server.h"
+#include "lib/encode_server.h"
#include "server_dialog.h"
#include "wx_util.h"
#include "servers_list_dialog.h"
#include "wx_util.h"
-#include "lib/server_finder.h"
-#include "lib/server_description.h"
+#include "lib/encode_server_finder.h"
+#include "lib/encode_server_description.h"
#include <boost/lexical_cast.hpp>
#include <boost/foreach.hpp>
s->Layout ();
s->SetSizeHints (this);
- _server_finder_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&ServersListDialog::servers_list_changed, this));
+ _server_finder_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
+ boost::bind (&ServersListDialog::servers_list_changed, this)
+ );
servers_list_changed ();
}
_list->DeleteAllItems ();
int n = 0;
- BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
+ BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
wxListItem list_item;
list_item.SetId (n);
_list->InsertItem (list_item);
*/
-#include "lib/server.h"
+#include "lib/encode_server.h"
#include <wx/wx.h>
#include <wx/listctrl.h>
#include <boost/signals2.hpp>
* @brief Test the server class.
*
* Create a test image and then encode it using the standard mechanism
- * and also using a Server object running on localhost. Compare the resulting
+ * and also using a EncodeServer object running on localhost. Compare the resulting
* encoded data to check that they are the same.
*/
-#include "lib/server.h"
+#include "lib/encode_server.h"
#include "lib/image.h"
#include "lib/cross.h"
#include "lib/dcp_video.h"
#include "lib/player_video.h"
#include "lib/raw_image_proxy.h"
#include "lib/j2k_image_proxy.h"
-#include "lib/server_description.h"
+#include "lib/encode_server_description.h"
#include "lib/file_log.h"
#include <boost/test/unit_test.hpp>
#include <boost/thread.hpp>
using dcp::Data;
void
-do_remote_encode (shared_ptr<DCPVideo> frame, ServerDescription description, Data locally_encoded)
+do_remote_encode (shared_ptr<DCPVideo> frame, EncodeServerDescription description, Data locally_encoded)
{
Data remotely_encoded;
BOOST_CHECK_NO_THROW (remotely_encoded = frame->encode_remotely (description, 60));
Data locally_encoded = frame->encode_locally (boost::bind (&Log::dcp_log, log.get(), _1, _2));
- Server* server = new Server (log, true);
+ EncodeServer* server = new EncodeServer (log, true);
- new thread (boost::bind (&Server::run, server, 2));
+ new thread (boost::bind (&EncodeServer::run, server, 2));
/* Let the server get itself ready */
dcpomatic_sleep (1);
- ServerDescription description ("localhost", 2);
+ EncodeServerDescription description ("localhost", 2);
list<thread*> threads;
for (int i = 0; i < 8; ++i) {
Data locally_encoded = frame->encode_locally (boost::bind (&Log::dcp_log, log.get(), _1, _2));
- Server* server = new Server (log, true);
+ EncodeServer* server = new EncodeServer (log, true);
- new thread (boost::bind (&Server::run, server, 2));
+ new thread (boost::bind (&EncodeServer::run, server, 2));
/* Let the server get itself ready */
dcpomatic_sleep (1);
- ServerDescription description ("localhost", 2);
+ EncodeServerDescription description ("localhost", 2);
list<thread*> threads;
for (int i = 0; i < 8; ++i) {
Data j2k_locally_encoded = j2k_frame->encode_locally (boost::bind (&Log::dcp_log, log.get(), _1, _2));
- Server* server = new Server (log, true);
+ EncodeServer* server = new EncodeServer (log, true);
- new thread (boost::bind (&Server::run, server, 2));
+ new thread (boost::bind (&EncodeServer::run, server, 2));
/* Let the server get itself ready */
dcpomatic_sleep (1);
- ServerDescription description ("localhost", 2);
+ EncodeServerDescription description ("localhost", 2);
list<thread*> threads;
for (int i = 0; i < 8; ++i) {
#include "lib/job_manager.h"
#include "lib/job.h"
#include "lib/cross.h"
-#include "lib/server_finder.h"
+#include "lib/encode_server_finder.h"
#include "lib/image.h"
#include "lib/ratio.h"
#include "lib/log_entry.h"
Config::instance()->set_default_j2k_bandwidth (100000000);
Config::instance()->set_log_types (LogEntry::TYPE_GENERAL | LogEntry::TYPE_WARNING | LogEntry::TYPE_ERROR);
- ServerFinder::instance()->disable ();
+ EncodeServerFinder::instance()->disable ();
signal_manager = new TestSignalManager ();
}