* encoding work, and a class to implement such a server.
*/
-#include <string>
-#include <vector>
-#include <iostream>
-#include <boost/algorithm/string.hpp>
-#include <boost/scoped_array.hpp>
-#include <libcxml/cxml.h>
-#include <libdcp/raw_convert.h>
#include "server.h"
-#include "util.h"
+#include "dcpomatic_socket.h"
#include "scaler.h"
#include "image.h"
-#include "dcp_video_frame.h"
+#include "dcp_video.h"
#include "config.h"
#include "cross.h"
-#include "player_video_frame.h"
+#include "player_video.h"
+#include "encoded_data.h"
#include "safe_stringstream.h"
+#include <dcp/raw_convert.h>
+#include <libcxml/cxml.h>
+#include <boost/algorithm/string.hpp>
+#include <boost/scoped_array.hpp>
+#include <string>
+#include <vector>
+#include <iostream>
#include "i18n.h"
using boost::bind;
using boost::scoped_array;
using boost::optional;
-using libdcp::Size;
-using libdcp::raw_convert;
+using dcp::Size;
+using dcp::raw_convert;
Server::Server (shared_ptr<Log> log, bool verbose)
- : _log (log)
+ : _terminate (false)
+ , _log (log)
, _verbose (verbose)
+ , _acceptor (_io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port_base()))
{
}
+Server::~Server ()
+{
+ {
+ boost::mutex::scoped_lock lm (_worker_mutex);
+ _terminate = true;
+ _empty_condition.notify_all ();
+ }
+
+ for (vector<boost::thread*>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
+ (*i)->join ();
+ delete *i;
+ }
+
+ _io_service.stop ();
+
+ _broadcast.io_service.stop ();
+ _broadcast.thread->join ();
+}
+
/** @param after_read Filled in with gettimeofday() after reading the input from the network.
* @param after_encode Filled in with gettimeofday() after encoding the image.
*/
return -1;
}
- shared_ptr<PlayerVideoFrame> pvf (new PlayerVideoFrame (xml, socket, _log));
+ shared_ptr<PlayerVideo> pvf (new PlayerVideo (xml, socket));
- DCPVideoFrame dcp_video_frame (pvf, xml, _log);
+ DCPVideo dcp_video_frame (pvf, xml, _log);
gettimeofday (&after_read, 0);
- shared_ptr<EncodedData> encoded = dcp_video_frame.encode_locally ();
+ shared_ptr<EncodedData> encoded = dcp_video_frame.encode_locally (boost::bind (&Log::dcp_log, _log.get(), _1, _2));
gettimeofday (&after_encode, 0);
{
while (true) {
boost::mutex::scoped_lock lock (_worker_mutex);
- while (_queue.empty ()) {
+ while (_queue.empty () && !_terminate) {
_empty_condition.wait (lock);
}
+ if (_terminate) {
+ return;
+ }
+
shared_ptr<Socket> socket = _queue.front ();
_queue.pop_front ();
_broadcast.thread = new thread (bind (&Server::broadcast_thread, this));
- boost::asio::io_service io_service;
-
- boost::asio::ip::tcp::acceptor acceptor (
- io_service,
- boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port_base ())
- );
-
- while (true) {
- shared_ptr<Socket> socket (new Socket);
- acceptor.accept (socket->socket ());
-
- boost::mutex::scoped_lock lock (_worker_mutex);
-
- /* Wait until the queue has gone down a bit */
- while (int (_queue.size()) >= num_threads * 2) {
- _full_condition.wait (lock);
- }
-
- _queue.push_back (socket);
- _empty_condition.notify_all ();
- }
+ start_accept ();
+ _io_service.run ();
}
void
Server::broadcast_thread ()
try
{
- boost::asio::io_service io_service;
-
boost::asio::ip::address address = boost::asio::ip::address_v4::any ();
boost::asio::ip::udp::endpoint listen_endpoint (address, Config::instance()->server_port_base() + 1);
- _broadcast.socket = new boost::asio::ip::udp::socket (io_service);
+ _broadcast.socket = new boost::asio::ip::udp::socket (_broadcast.io_service);
_broadcast.socket->open (listen_endpoint.protocol ());
_broadcast.socket->bind (listen_endpoint);
boost::bind (&Server::broadcast_received, this)
);
- io_service.run ();
+ _broadcast.io_service.run ();
}
catch (...)
{
_broadcast.send_endpoint, boost::bind (&Server::broadcast_received, this)
);
}
+
+void
+Server::start_accept ()
+{
+ if (_terminate) {
+ return;
+ }
+
+ shared_ptr<Socket> socket (new Socket);
+ _acceptor.async_accept (socket->socket (), boost::bind (&Server::handle_accept, this, socket, boost::asio::placeholders::error));
+}
+
+void
+Server::handle_accept (shared_ptr<Socket> socket, boost::system::error_code const & error)
+{
+ if (error) {
+ return;
+ }
+
+ boost::mutex::scoped_lock lock (_worker_mutex);
+
+ /* Wait until the queue has gone down a bit */
+ while (_queue.size() >= _worker_threads.size() * 2 && !_terminate) {
+ _full_condition.wait (lock);
+ }
+
+ _queue.push_back (socket);
+ _empty_condition.notify_all ();
+
+ start_accept ();
+}
+