summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorCarl Hetherington <cth@carlh.net>2012-09-23 18:46:09 +0100
committerCarl Hetherington <cth@carlh.net>2012-09-23 18:46:09 +0100
commit1e53d56c1488cce07bc8aad7bcc76edd36c19961 (patch)
tree0d5de18847376090e7a653b5734b9b105bd43786 /src
parenta6e3a0589cd19ff11e3cdb5543b4a70b20d385ae (diff)
parent737c3392039740f7a22a9ff922f8492905173b9c (diff)
Fix merge.
Diffstat (limited to 'src')
-rw-r--r--src/lib/dcp_video_frame.cc23
-rw-r--r--src/lib/dcp_video_frame.h2
-rw-r--r--src/lib/server.cc18
-rw-r--r--src/lib/server.h8
-rw-r--r--src/lib/util.cc233
-rw-r--r--src/lib/util.h54
6 files changed, 207 insertions, 131 deletions
diff --git a/src/lib/dcp_video_frame.cc b/src/lib/dcp_video_frame.cc
index b128f6fa0..d8af3462d 100644
--- a/src/lib/dcp_video_frame.cc
+++ b/src/lib/dcp_video_frame.cc
@@ -296,8 +296,9 @@ DCPVideoFrame::encode_remotely (ServerDescription const * serv)
asio::ip::tcp::resolver::query query (serv->host_name(), boost::lexical_cast<string> (Config::instance()->server_port ()));
asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve (query);
- shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service));
- socket->connect (*endpoint_iterator);
+ Socket socket;
+
+ socket.connect (*endpoint_iterator, 30);
#ifdef DEBUG_HASH
_input->hash ("Input for remote encoding (before sending)");
@@ -320,21 +321,19 @@ DCPVideoFrame::encode_remotely (ServerDescription const * serv)
s << _input->line_size()[i] << " ";
}
- asio::write (*socket, asio::buffer (s.str().c_str(), s.str().length() + 1));
+ socket.write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30);
for (int i = 0; i < _input->components(); ++i) {
- asio::write (*socket, asio::buffer (_input->data()[i], _input->line_size()[i] * _input->lines(i)));
+ socket.write (_input->data()[i], _input->line_size()[i] * _input->lines(i), 30);
}
- SocketReader reader (socket);
-
char buffer[32];
- reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer));
- reader.consume (strlen (buffer) + 1);
+ socket.read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30);
+ socket.consume (strlen (buffer) + 1);
shared_ptr<EncodedData> e (new RemotelyEncodedData (atoi (buffer)));
/* now read the rest */
- reader.read_definite_and_consume (e->data(), e->size());
+ socket.read_definite_and_consume (e->data(), e->size(), 30);
#ifdef DEBUG_HASH
e->hash ("Encoded image (after receiving)");
@@ -375,12 +374,12 @@ EncodedData::write (shared_ptr<const Options> opt, int frame)
* @param socket Socket
*/
void
-EncodedData::send (shared_ptr<asio::ip::tcp::socket> socket)
+EncodedData::send (shared_ptr<Socket> socket)
{
stringstream s;
s << _size;
- asio::write (*socket, asio::buffer (s.str().c_str(), s.str().length() + 1));
- asio::write (*socket, asio::buffer (_data, _size));
+ socket->write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30);
+ socket->write (_data, _size, 30);
}
#ifdef DEBUG_HASH
diff --git a/src/lib/dcp_video_frame.h b/src/lib/dcp_video_frame.h
index ee54bc0f5..da4e0c301 100644
--- a/src/lib/dcp_video_frame.h
+++ b/src/lib/dcp_video_frame.h
@@ -48,7 +48,7 @@ public:
virtual ~EncodedData () {}
- void send (boost::shared_ptr<boost::asio::ip::tcp::socket>);
+ void send (boost::shared_ptr<Socket> socket);
void write (boost::shared_ptr<const Options>, int);
#ifdef DEBUG_HASH
diff --git a/src/lib/server.cc b/src/lib/server.cc
index f4e894558..8ca426049 100644
--- a/src/lib/server.cc
+++ b/src/lib/server.cc
@@ -70,13 +70,11 @@ Server::Server (Log* log)
}
int
-Server::process (shared_ptr<asio::ip::tcp::socket> socket)
+Server::process (shared_ptr<Socket> socket)
{
- SocketReader reader (socket);
-
char buffer[128];
- reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer));
- reader.consume (strlen (buffer) + 1);
+ socket->read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30);
+ socket->consume (strlen (buffer) + 1);
stringstream s (buffer);
@@ -123,7 +121,7 @@ Server::process (shared_ptr<asio::ip::tcp::socket> socket)
}
for (int i = 0; i < image->components(); ++i) {
- reader.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i));
+ socket->read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i), 30);
}
#ifdef DEBUG_HASH
@@ -150,7 +148,7 @@ Server::worker_thread ()
_worker_condition.wait (lock);
}
- shared_ptr<asio::ip::tcp::socket> socket = _queue.front ();
+ shared_ptr<Socket> socket = _queue.front ();
_queue.pop_front ();
lock.unlock ();
@@ -192,12 +190,12 @@ Server::run (int num_threads)
for (int i = 0; i < num_threads; ++i) {
_worker_threads.push_back (new thread (bind (&Server::worker_thread, this)));
}
-
+
asio::io_service io_service;
asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ()));
while (1) {
- shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service));
- acceptor.accept (*socket);
+ shared_ptr<Socket> socket (new Socket);
+ acceptor.accept (socket->socket ());
mutex::scoped_lock lock (_worker_mutex);
diff --git a/src/lib/server.h b/src/lib/server.h
index fac440a76..32ba8dc4b 100644
--- a/src/lib/server.h
+++ b/src/lib/server.h
@@ -28,6 +28,8 @@
#include <boost/thread/condition.hpp>
#include "log.h"
+class Socket;
+
/** @class ServerDescription
* @brief Class to describe a server to which we can send encoding work.
*/
@@ -80,10 +82,10 @@ public:
private:
void worker_thread ();
- int process (boost::shared_ptr<boost::asio::ip::tcp::socket> socket);
-
+ int process (boost::shared_ptr<Socket> socket);
+
std::vector<boost::thread *> _worker_threads;
- std::list<boost::shared_ptr<boost::asio::ip::tcp::socket> > _queue;
+ std::list<boost::shared_ptr<Socket> > _queue;
boost::mutex _worker_mutex;
boost::condition _worker_condition;
Log* _log;
diff --git a/src/lib/util.cc b/src/lib/util.cc
index cbf8decb9..d12bd3e77 100644
--- a/src/lib/util.cc
+++ b/src/lib/util.cc
@@ -33,6 +33,8 @@
#include <libssh/libssh.h>
#include <signal.h>
#include <boost/algorithm/string.hpp>
+#include <boost/bind.hpp>
+#include <boost/lambda/lambda.hpp>
#include <openjpeg.h>
#include <openssl/md5.h>
#include <magick/MagickCore.h>
@@ -286,88 +288,6 @@ seconds (struct timeval t)
return t.tv_sec + (double (t.tv_usec) / 1e6);
}
-/** @param socket Socket to read from */
-SocketReader::SocketReader (shared_ptr<asio::ip::tcp::socket> socket)
- : _socket (socket)
- , _buffer_data (0)
-{
-
-}
-
-/** Mark some data as being `consumed', so that it will not be returned
- * as data again.
- * @param size Amount of data to consume, in bytes.
- */
-void
-SocketReader::consume (int size)
-{
- assert (_buffer_data >= size);
-
- _buffer_data -= size;
- if (_buffer_data > 0) {
- /* Shift still-valid data to the start of the buffer */
- memmove (_buffer, _buffer + size, _buffer_data);
- }
-}
-
-/** Read a definite amount of data from our socket, and mark
- * it as consumed.
- * @param data Where to put the data.
- * @param size Number of bytes to read.
- */
-void
-SocketReader::read_definite_and_consume (uint8_t* data, int size)
-{
- int const from_buffer = min (_buffer_data, size);
- if (from_buffer > 0) {
- /* Get data from our buffer */
- memcpy (data, _buffer, from_buffer);
- consume (from_buffer);
- /* Update our output state */
- data += from_buffer;
- size -= from_buffer;
- }
-
- /* read() the rest */
- while (size > 0) {
- int const n = asio::read (*_socket, asio::buffer (data, size));
- if (n <= 0) {
- throw NetworkError ("could not read");
- }
-
- data += n;
- size -= n;
- }
-}
-
-/** Read as much data as is available, up to some limit.
- * @param data Where to put the data.
- * @param size Maximum amount of data to read.
- */
-void
-SocketReader::read_indefinite (uint8_t* data, int size)
-{
- assert (size < int (sizeof (_buffer)));
-
- /* Amount of extra data we need to read () */
- int to_read = size - _buffer_data;
- while (to_read > 0) {
- /* read as much of it as we can (into our buffer) */
- int const n = asio::read (*_socket, asio::buffer (_buffer + _buffer_data, to_read));
- if (n <= 0) {
- throw NetworkError ("could not read");
- }
-
- to_read -= n;
- _buffer_data += n;
- }
-
- assert (_buffer_data >= size);
-
- /* copy data into the output buffer */
- assert (size >= _buffer_data);
- memcpy (data, _buffer, size);
-}
#ifdef DVDOMATIC_POSIX
void
@@ -517,3 +437,152 @@ colour_lut_index_to_name (int index)
assert (false);
return "";
}
+
+Socket::Socket ()
+ : _deadline (_io_service)
+ , _socket (_io_service)
+ , _buffer_data (0)
+{
+ _deadline.expires_at (posix_time::pos_infin);
+ check ();
+}
+
+void
+Socket::check ()
+{
+ if (_deadline.expires_at() <= asio::deadline_timer::traits_type::now ()) {
+ _socket.close ();
+ _deadline.expires_at (posix_time::pos_infin);
+ }
+
+ _deadline.async_wait (boost::bind (&Socket::check, this));
+}
+
+void
+Socket::connect (asio::ip::basic_resolver_entry<asio::ip::tcp> const & endpoint, int timeout)
+{
+ system::error_code ec = asio::error::would_block;
+ _socket.async_connect (endpoint, lambda::var(ec) = lambda::_1);
+ do {
+ _io_service.run_one();
+ } while (ec == asio::error::would_block);
+
+ if (ec || !_socket.is_open ()) {
+ throw NetworkError ("connect timed out");
+ }
+}
+
+void
+Socket::write (uint8_t const * data, int size, int timeout)
+{
+ _deadline.expires_from_now (posix_time::seconds (timeout));
+ system::error_code ec = asio::error::would_block;
+
+ asio::async_write (_socket, asio::buffer (data, size), lambda::var(ec) = lambda::_1);
+ do {
+ _io_service.run_one ();
+ } while (ec == asio::error::would_block);
+
+ if (ec) {
+ throw NetworkError ("write timed out");
+ }
+}
+
+int
+Socket::read (uint8_t* data, int size, int timeout)
+{
+ _deadline.expires_from_now (posix_time::seconds (timeout));
+ system::error_code ec = asio::error::would_block;
+
+ int amount_read = 0;
+
+ _socket.async_read_some (
+ asio::buffer (data, size),
+ (lambda::var(ec) = lambda::_1, lambda::var(amount_read) = lambda::_2)
+ );
+
+ do {
+ _io_service.run_one ();
+ } while (ec == asio::error::would_block);
+
+ if (ec) {
+ amount_read = 0;
+ }
+
+ return amount_read;
+}
+
+/** Mark some data as being `consumed', so that it will not be returned
+ * as data again.
+ * @param size Amount of data to consume, in bytes.
+ */
+void
+Socket::consume (int size)
+{
+ assert (_buffer_data >= size);
+
+ _buffer_data -= size;
+ if (_buffer_data > 0) {
+ /* Shift still-valid data to the start of the buffer */
+ memmove (_buffer, _buffer + size, _buffer_data);
+ }
+}
+
+/** Read a definite amount of data from our socket, and mark
+ * it as consumed.
+ * @param data Where to put the data.
+ * @param size Number of bytes to read.
+ */
+void
+Socket::read_definite_and_consume (uint8_t* data, int size, int timeout)
+{
+ int const from_buffer = min (_buffer_data, size);
+ if (from_buffer > 0) {
+ /* Get data from our buffer */
+ memcpy (data, _buffer, from_buffer);
+ consume (from_buffer);
+ /* Update our output state */
+ data += from_buffer;
+ size -= from_buffer;
+ }
+
+ /* read() the rest */
+ while (size > 0) {
+ int const n = read (data, size, timeout);
+ if (n <= 0) {
+ throw NetworkError ("could not read");
+ }
+
+ data += n;
+ size -= n;
+ }
+}
+
+/** Read as much data as is available, up to some limit.
+ * @param data Where to put the data.
+ * @param size Maximum amount of data to read.
+ */
+void
+Socket::read_indefinite (uint8_t* data, int size, int timeout)
+{
+ assert (size < int (sizeof (_buffer)));
+
+ /* Amount of extra data we need to read () */
+ int to_read = size - _buffer_data;
+ while (to_read > 0) {
+ /* read as much of it as we can (into our buffer) */
+ int const n = read (_buffer + _buffer_data, to_read, timeout);
+ if (n <= 0) {
+ throw NetworkError ("could not read");
+ }
+
+ to_read -= n;
+ _buffer_data += n;
+ }
+
+ assert (_buffer_data >= size);
+
+ /* copy data into the output buffer */
+ assert (size >= _buffer_data);
+ memcpy (data, _buffer, size);
+}
diff --git a/src/lib/util.h b/src/lib/util.h
index 568fe05d0..d7f233003 100644
--- a/src/lib/util.h
+++ b/src/lib/util.h
@@ -56,29 +56,6 @@ enum ContentType {
extern void md5_data (std::string, void const *, int);
#endif
-/** @class SocketReader
- * @brief A helper class from reading from sockets.
- *
- * You can probably do this stuff directly in boost, but I'm not sure how.
- */
-class SocketReader
-{
-public:
- SocketReader (boost::shared_ptr<boost::asio::ip::tcp::socket>);
-
- void read_definite_and_consume (uint8_t *, int);
- void read_indefinite (uint8_t *, int);
- void consume (int);
-
-private:
- /** socket we are reading from */
- boost::shared_ptr<boost::asio::ip::tcp::socket> _socket;
- /** a buffer for small reads */
- uint8_t _buffer[256];
- /** amount of valid data in the buffer */
- int _buffer_data;
-};
-
/** @class Size
* @brief Representation of the size of something */
struct Size
@@ -136,4 +113,35 @@ extern std::string crop_string (Position, Size);
extern int dcp_audio_sample_rate (int);
extern std::string colour_lut_index_to_name (int index);
+class Socket
+{
+public:
+ Socket ();
+
+ boost::asio::ip::tcp::socket& socket () {
+ return _socket;
+ }
+
+ void connect (boost::asio::ip::basic_resolver_entry<boost::asio::ip::tcp> const & endpoint, int timeout);
+ void write (uint8_t const * data, int size, int timeout);
+
+ void read_definite_and_consume (uint8_t* data, int size, int timeout);
+ void read_indefinite (uint8_t* data, int size, int timeout);
+ void consume (int amount);
+
+private:
+ void check ();
+ int read (uint8_t* data, int size, int timeout);
+
+ Socket (Socket const &);
+
+ boost::asio::io_service _io_service;
+ boost::asio::deadline_timer _deadline;
+ boost::asio::ip::tcp::socket _socket;
+ /** a buffer for small reads */
+ uint8_t _buffer[256];
+ /** amount of valid data in the buffer */
+ int _buffer_data;
+};
+
#endif