summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
authorCarl Hetherington <cth@carlh.net>2015-12-06 20:17:47 +0000
committerCarl Hetherington <cth@carlh.net>2015-12-11 11:56:23 +0000
commit6fa353595ce8f784b7d5004a6c38c78bddae94c7 (patch)
treed50c1273d519d97f3c4ad5e6ca4532f3ecbaa6d9 /src/lib
parent17df947ac256397311a11894062070f8069c7e75 (diff)
Split EncodeServer into that and Server.
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/encode_server.cc45
-rw-r--r--src/lib/encode_server.h19
-rw-r--r--src/lib/exception_store.h6
-rw-r--r--src/lib/server.cc71
-rw-r--r--src/lib/server.h52
-rw-r--r--src/lib/wscript1
6 files changed, 146 insertions, 48 deletions
diff --git a/src/lib/encode_server.cc b/src/lib/encode_server.cc
index 3f30a361a..6560bcfec 100644
--- a/src/lib/encode_server.cc
+++ b/src/lib/encode_server.cc
@@ -65,11 +65,11 @@ using boost::optional;
using dcp::Size;
using dcp::Data;
-EncodeServer::EncodeServer (shared_ptr<Log> log, bool verbose)
- : _terminate (false)
+EncodeServer::EncodeServer (shared_ptr<Log> log, bool verbose, int num_threads)
+ : Server (Config::instance()->server_port_base())
, _log (log)
, _verbose (verbose)
- , _acceptor (_io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port_base()))
+ , _num_threads (num_threads)
{
}
@@ -77,8 +77,7 @@ EncodeServer::EncodeServer (shared_ptr<Log> log, bool verbose)
EncodeServer::~EncodeServer ()
{
{
- boost::mutex::scoped_lock lm (_worker_mutex);
- _terminate = true;
+ boost::mutex::scoped_lock lm (_mutex);
_empty_condition.notify_all ();
_full_condition.notify_all ();
}
@@ -89,8 +88,6 @@ EncodeServer::~EncodeServer ()
delete i;
}
- _io_service.stop ();
-
_broadcast.io_service.stop ();
if (_broadcast.thread) {
DCPOMATIC_ASSERT (_broadcast.thread->joinable ());
@@ -146,7 +143,7 @@ void
EncodeServer::worker_thread ()
{
while (true) {
- boost::mutex::scoped_lock lock (_worker_mutex);
+ boost::mutex::scoped_lock lock (_mutex);
while (_queue.empty () && !_terminate) {
_empty_condition.wait (lock);
}
@@ -209,21 +206,20 @@ EncodeServer::worker_thread ()
}
void
-EncodeServer::run (int num_threads)
+EncodeServer::run ()
{
- LOG_GENERAL ("Server starting with %1 threads", num_threads);
+ LOG_GENERAL ("Server starting with %1 threads", _num_threads);
if (_verbose) {
- cout << "DCP-o-matic server starting with " << num_threads << " threads.\n";
+ cout << "DCP-o-matic server starting with " << _num_threads << " threads.\n";
}
- for (int i = 0; i < num_threads; ++i) {
+ for (int i = 0; i < _num_threads; ++i) {
_worker_threads.push_back (new thread (bind (&EncodeServer::worker_thread, this)));
}
_broadcast.thread = new thread (bind (&EncodeServer::broadcast_thread, this));
- start_accept ();
- _io_service.run ();
+ Server::run ();
}
void
@@ -283,24 +279,9 @@ EncodeServer::broadcast_received ()
}
void
-EncodeServer::start_accept ()
-{
- if (_terminate) {
- return;
- }
-
- shared_ptr<Socket> socket (new Socket);
- _acceptor.async_accept (socket->socket (), boost::bind (&EncodeServer::handle_accept, this, socket, boost::asio::placeholders::error));
-}
-
-void
-EncodeServer::handle_accept (shared_ptr<Socket> socket, boost::system::error_code const & error)
+EncodeServer::handle (shared_ptr<Socket> socket)
{
- if (error) {
- return;
- }
-
- boost::mutex::scoped_lock lock (_worker_mutex);
+ boost::mutex::scoped_lock lock (_mutex);
/* Wait until the queue has gone down a bit */
while (_queue.size() >= _worker_threads.size() * 2 && !_terminate) {
@@ -309,6 +290,4 @@ EncodeServer::handle_accept (shared_ptr<Socket> socket, boost::system::error_cod
_queue.push_back (socket);
_empty_condition.notify_all ();
-
- start_accept ();
}
diff --git a/src/lib/encode_server.h b/src/lib/encode_server.h
index f6f1fc9b9..d0c061eb3 100644
--- a/src/lib/encode_server.h
+++ b/src/lib/encode_server.h
@@ -21,9 +21,10 @@
#define DCPOMATIC_ENCODE_SERVER_H
/** @file src/encode_server.h
- * @brief Server class.
+ * @brief EncodeServer class.
*/
+#include "server.h"
#include "exception_store.h"
#include <boost/thread.hpp>
#include <boost/asio.hpp>
@@ -37,34 +38,28 @@ class Log;
* @brief A class to run a server which can accept requests to perform JPEG2000
* encoding work.
*/
-class EncodeServer : public ExceptionStore, public boost::noncopyable
+class EncodeServer : public Server, public ExceptionStore
{
public:
- EncodeServer (boost::shared_ptr<Log> log, bool verbose);
+ EncodeServer (boost::shared_ptr<Log> log, bool verbose, int num_threads);
~EncodeServer ();
- void run (int num_threads);
+ void run ();
private:
+ void handle (boost::shared_ptr<Socket>);
void worker_thread ();
int process (boost::shared_ptr<Socket> socket, struct timeval &, struct timeval &);
void broadcast_thread ();
void broadcast_received ();
- void start_accept ();
- void handle_accept (boost::shared_ptr<Socket>, boost::system::error_code const &);
-
- bool _terminate;
std::vector<boost::thread *> _worker_threads;
std::list<boost::shared_ptr<Socket> > _queue;
- boost::mutex _worker_mutex;
boost::condition _full_condition;
boost::condition _empty_condition;
boost::shared_ptr<Log> _log;
bool _verbose;
-
- boost::asio::io_service _io_service;
- boost::asio::ip::tcp::acceptor _acceptor;
+ int _num_threads;
struct Broadcast {
diff --git a/src/lib/exception_store.h b/src/lib/exception_store.h
index d7c34c0f4..de0bed029 100644
--- a/src/lib/exception_store.h
+++ b/src/lib/exception_store.h
@@ -46,7 +46,7 @@ class ExceptionStore
{
public:
void rethrow () {
- boost::mutex::scoped_lock lm (_mutex);
+ boost::mutex::scoped_lock lm (_exception_mutex);
if (_exception) {
boost::exception_ptr tmp = _exception;
_exception = boost::exception_ptr ();
@@ -57,13 +57,13 @@ public:
protected:
void store_current () {
- boost::mutex::scoped_lock lm (_mutex);
+ boost::mutex::scoped_lock lm (_exception_mutex);
_exception = boost::current_exception ();
}
private:
boost::exception_ptr _exception;
- mutable boost::mutex _mutex;
+ mutable boost::mutex _exception_mutex;
};
#endif
diff --git a/src/lib/server.cc b/src/lib/server.cc
new file mode 100644
index 000000000..f137d07f8
--- /dev/null
+++ b/src/lib/server.cc
@@ -0,0 +1,71 @@
+/*
+ Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+*/
+
+#include "server.h"
+#include "dcpomatic_socket.h"
+
+#include "i18n.h"
+
+using boost::shared_ptr;
+
+Server::Server (int port)
+ : _terminate (false)
+ , _acceptor (_io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), port))
+{
+
+}
+
+Server::~Server ()
+{
+ boost::mutex::scoped_lock lm (_mutex);
+ _terminate = true;
+ _io_service.stop ();
+}
+
+void
+Server::run ()
+{
+ start_accept ();
+ _io_service.run ();
+}
+
+void
+Server::start_accept ()
+{
+ {
+ boost::mutex::scoped_lock lm (_mutex);
+ if (_terminate) {
+ return;
+ }
+ }
+
+ shared_ptr<Socket> socket (new Socket);
+ _acceptor.async_accept (socket->socket (), boost::bind (&Server::handle_accept, this, socket, boost::asio::placeholders::error));
+}
+
+void
+Server::handle_accept (shared_ptr<Socket> socket, boost::system::error_code const & error)
+{
+ if (error) {
+ return;
+ }
+
+ handle (socket);
+ start_accept ();
+}
diff --git a/src/lib/server.h b/src/lib/server.h
new file mode 100644
index 000000000..3e7c7945a
--- /dev/null
+++ b/src/lib/server.h
@@ -0,0 +1,52 @@
+/*
+ Copyright (C) 2015 Carl Hetherington <cth@carlh.net>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+*/
+
+#ifndef DCPOMATIC_SERVER_H
+#define DCPOMATIC_SERVER_H
+
+#include <boost/thread.hpp>
+#include <boost/asio.hpp>
+#include <boost/thread/condition.hpp>
+#include <string>
+
+class Socket;
+
+class Server : public boost::noncopyable
+{
+public:
+ Server (int port);
+ virtual ~Server ();
+
+ virtual void run ();
+
+protected:
+ boost::mutex _mutex;
+ bool _terminate;
+
+private:
+ virtual void handle (boost::shared_ptr<Socket> socket) = 0;
+
+ void start_accept ();
+ void handle_accept (boost::shared_ptr<Socket>, boost::system::error_code const &);
+
+ boost::asio::io_service _io_service;
+ boost::asio::ip::tcp::acceptor _acceptor;
+};
+
+#endif
diff --git a/src/lib/wscript b/src/lib/wscript
index 0f0b0c14f..f61445685 100644
--- a/src/lib/wscript
+++ b/src/lib/wscript
@@ -109,6 +109,7 @@ sources = """
screen_kdm.cc
send_kdm_email_job.cc
send_problem_report_job.cc
+ server.cc
single_stream_audio_content.cc
sndfile_base.cc
sndfile_content.cc