summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
authorCarl Hetherington <cth@carlh.net>2014-06-20 00:53:40 +0100
committerCarl Hetherington <cth@carlh.net>2014-06-20 00:53:40 +0100
commit0da7c88a1afb221f97e2e96c159b1a984e4e2f71 (patch)
tree558590d5ad0a2ced262c2142915b738d898ddbf0 /src/lib
parent4a677ee9e750efc330c29df69f19dae27f09aa4d (diff)
parentda38ed791a5a46a97b26947409cf837b7939fd54 (diff)
Merge master; fix destruction of Server; some test cleanups.
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/config.h1
-rw-r--r--src/lib/encoder.h1
-rw-r--r--src/lib/image_content.cc2
-rw-r--r--src/lib/server.cc90
-rw-r--r--src/lib/server.h12
5 files changed, 77 insertions, 29 deletions
diff --git a/src/lib/config.h b/src/lib/config.h
index f0d2630d0..d82f52046 100644
--- a/src/lib/config.h
+++ b/src/lib/config.h
@@ -31,7 +31,6 @@
#include <dcp/metadata.h>
#include "isdcf_metadata.h"
#include "colour_conversion.h"
-#include "server.h"
class ServerDescription;
class Scaler;
diff --git a/src/lib/encoder.h b/src/lib/encoder.h
index ac1d74c57..678cdf04e 100644
--- a/src/lib/encoder.h
+++ b/src/lib/encoder.h
@@ -38,6 +38,7 @@ extern "C" {
#include "util.h"
#include "config.h"
#include "cross.h"
+#include "exceptions.h"
class Image;
class AudioBuffers;
diff --git a/src/lib/image_content.cc b/src/lib/image_content.cc
index 8909240dc..acaedf050 100644
--- a/src/lib/image_content.cc
+++ b/src/lib/image_content.cc
@@ -20,11 +20,11 @@
#include <libcxml/cxml.h>
#include "image_content.h"
#include "image_examiner.h"
-#include "config.h"
#include "compose.hpp"
#include "film.h"
#include "job.h"
#include "frame_rate_change.h"
+#include "exceptions.h"
#include "i18n.h"
diff --git a/src/lib/server.cc b/src/lib/server.cc
index 59364fadd..66ee2b0e3 100644
--- a/src/lib/server.cc
+++ b/src/lib/server.cc
@@ -66,12 +66,33 @@ using dcp::Size;
using dcp::raw_convert;
Server::Server (shared_ptr<Log> log, bool verbose)
- : _log (log)
+ : _terminate (false)
+ , _log (log)
, _verbose (verbose)
+ , _acceptor (_io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port_base()))
{
}
+Server::~Server ()
+{
+ {
+ boost::mutex::scoped_lock lm (_worker_mutex);
+ _terminate = true;
+ _worker_condition.notify_all ();
+ }
+
+ for (vector<boost::thread*>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
+ (*i)->join ();
+ delete *i;
+ }
+
+ _io_service.stop ();
+
+ _broadcast.io_service.stop ();
+ _broadcast.thread->join ();
+}
+
/** @param after_read Filled in with gettimeofday() after reading the input from the network.
* @param after_encode Filled in with gettimeofday() after encoding the image.
*/
@@ -117,10 +138,14 @@ Server::worker_thread ()
{
while (1) {
boost::mutex::scoped_lock lock (_worker_mutex);
- while (_queue.empty ()) {
+ while (_queue.empty () && !_terminate) {
_worker_condition.wait (lock);
}
+ if (_terminate) {
+ return;
+ }
+
shared_ptr<Socket> socket = _queue.front ();
_queue.pop_front ();
@@ -187,39 +212,18 @@ Server::run (int num_threads)
_broadcast.thread = new thread (bind (&Server::broadcast_thread, this));
- boost::asio::io_service io_service;
-
- boost::asio::ip::tcp::acceptor acceptor (
- io_service,
- boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port_base ())
- );
-
- while (1) {
- shared_ptr<Socket> socket (new Socket);
- acceptor.accept (socket->socket ());
-
- boost::mutex::scoped_lock lock (_worker_mutex);
-
- /* Wait until the queue has gone down a bit */
- while (int (_queue.size()) >= num_threads * 2) {
- _worker_condition.wait (lock);
- }
-
- _queue.push_back (socket);
- _worker_condition.notify_all ();
- }
+ start_accept ();
+ _io_service.run ();
}
void
Server::broadcast_thread ()
try
{
- boost::asio::io_service io_service;
-
boost::asio::ip::address address = boost::asio::ip::address_v4::any ();
boost::asio::ip::udp::endpoint listen_endpoint (address, Config::instance()->server_port_base() + 1);
- _broadcast.socket = new boost::asio::ip::udp::socket (io_service);
+ _broadcast.socket = new boost::asio::ip::udp::socket (_broadcast.io_service);
_broadcast.socket->open (listen_endpoint.protocol ());
_broadcast.socket->bind (listen_endpoint);
@@ -229,7 +233,7 @@ try
boost::bind (&Server::broadcast_received, this)
);
- io_service.run ();
+ _broadcast.io_service.run ();
}
catch (...)
{
@@ -264,3 +268,35 @@ Server::broadcast_received ()
_broadcast.send_endpoint, boost::bind (&Server::broadcast_received, this)
);
}
+
+void
+Server::start_accept ()
+{
+ if (_terminate) {
+ return;
+ }
+
+ shared_ptr<Socket> socket (new Socket);
+ _acceptor.async_accept (socket->socket (), boost::bind (&Server::handle_accept, this, socket, boost::asio::placeholders::error));
+}
+
+void
+Server::handle_accept (shared_ptr<Socket> socket, boost::system::error_code const & error)
+{
+ if (error) {
+ return;
+ }
+
+ boost::mutex::scoped_lock lock (_worker_mutex);
+
+ /* Wait until the queue has gone down a bit */
+ while (_queue.size() >= _worker_threads.size() * 2 && !_terminate) {
+ _worker_condition.wait (lock);
+ }
+
+ _queue.push_back (socket);
+ _worker_condition.notify_all ();
+
+ start_accept ();
+}
+
diff --git a/src/lib/server.h b/src/lib/server.h
index a9b4b1c1c..9f3e99f9c 100644
--- a/src/lib/server.h
+++ b/src/lib/server.h
@@ -90,6 +90,7 @@ class Server : public ExceptionStore, public boost::noncopyable
{
public:
Server (boost::shared_ptr<Log> log, bool verbose);
+ ~Server ();
void run (int num_threads);
@@ -98,14 +99,24 @@ private:
int process (boost::shared_ptr<Socket> socket, struct timeval &, struct timeval &);
void broadcast_thread ();
void broadcast_received ();
+ void start_accept ();
+ void handle_accept (boost::shared_ptr<Socket>, boost::system::error_code const &);
+
+ bool _terminate;
std::vector<boost::thread *> _worker_threads;
std::list<boost::shared_ptr<Socket> > _queue;
boost::mutex _worker_mutex;
boost::condition _worker_condition;
+
boost::shared_ptr<Log> _log;
bool _verbose;
+ boost::asio::io_service _io_service;
+ boost::asio::ip::tcp::acceptor _acceptor;
+
+ int _num_threads;
+
struct Broadcast {
Broadcast ()
@@ -117,6 +128,7 @@ private:
boost::asio::ip::udp::socket* socket;
char buffer[64];
boost::asio::ip::udp::endpoint send_endpoint;
+ boost::asio::io_service io_service;
} _broadcast;
};