{
asio::io_service io_service;
asio::ip::tcp::resolver resolver (io_service);
+
asio::ip::tcp::resolver::query query (serv->host_name(), boost::lexical_cast<string> (Config::instance()->server_port ()));
asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve (query);
shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service));
- socket->connect (*endpoint_iterator);
+
+ DeadlineWrapper wrapper (io_service);
+ wrapper.set_socket (socket);
+
+ wrapper.connect (*endpoint_iterator, 30);
#ifdef DEBUG_HASH
_input->hash ("Input for remote encoding (before sending)");
s << _input->line_size()[i] << " ";
}
- asio::write (*socket, asio::buffer (s.str().c_str(), s.str().length() + 1));
+ wrapper.write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 10);
for (int i = 0; i < _input->components(); ++i) {
- asio::write (*socket, asio::buffer (_input->data()[i], _input->line_size()[i] * _input->lines(i)));
+ wrapper.write (_input->data()[i], _input->line_size()[i] * _input->lines(i), 10);
}
- SocketReader reader (socket);
-
char buffer[32];
- reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer));
- reader.consume (strlen (buffer) + 1);
+ wrapper.read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30);
+ wrapper.consume (strlen (buffer) + 1);
shared_ptr<EncodedData> e (new RemotelyEncodedData (atoi (buffer)));
/* now read the rest */
- reader.read_definite_and_consume (e->data(), e->size());
+ wrapper.read_definite_and_consume (e->data(), e->size(), 30);
#ifdef DEBUG_HASH
e->hash ("Encoded image (after receiving)");
int
Server::process (shared_ptr<asio::ip::tcp::socket> socket)
{
- SocketReader reader (socket);
+ DeadlineWrapper wrapper (_io_service);
+ wrapper.set_socket (socket);
char buffer[128];
- reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer));
- reader.consume (strlen (buffer) + 1);
+ wrapper.read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30);
+ wrapper.consume (strlen (buffer) + 1);
stringstream s (buffer);
}
for (int i = 0; i < image->components(); ++i) {
- reader.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i));
+ wrapper.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i), 30);
}
#ifdef DEBUG_HASH
_worker_threads.push_back (new thread (bind (&Server::worker_thread, this)));
}
- asio::io_service io_service;
- asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ()));
+ asio::ip::tcp::acceptor acceptor (_io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ()));
while (1) {
- shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service));
+ shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (_io_service));
acceptor.accept (*socket);
mutex::scoped_lock lock (_worker_mutex);
#include <libssh/libssh.h>
#include <signal.h>
#include <boost/algorithm/string.hpp>
+#include <boost/bind.hpp>
+#include <boost/lambda/lambda.hpp>
#include <openjpeg.h>
#include <openssl/md5.h>
#include <magick/MagickCore.h>
return t.tv_sec + (double (t.tv_usec) / 1e6);
}
-/** @param socket Socket to read from */
-SocketReader::SocketReader (shared_ptr<asio::ip::tcp::socket> socket)
- : _socket (socket)
- , _buffer_data (0)
-{
-
-}
-
-/** Mark some data as being `consumed', so that it will not be returned
- * as data again.
- * @param size Amount of data to consume, in bytes.
- */
-void
-SocketReader::consume (int size)
-{
- assert (_buffer_data >= size);
-
- _buffer_data -= size;
- if (_buffer_data > 0) {
- /* Shift still-valid data to the start of the buffer */
- memmove (_buffer, _buffer + size, _buffer_data);
- }
-}
-
-/** Read a definite amount of data from our socket, and mark
- * it as consumed.
- * @param data Where to put the data.
- * @param size Number of bytes to read.
- */
-void
-SocketReader::read_definite_and_consume (uint8_t* data, int size)
-{
- int const from_buffer = min (_buffer_data, size);
- if (from_buffer > 0) {
- /* Get data from our buffer */
- memcpy (data, _buffer, from_buffer);
- consume (from_buffer);
- /* Update our output state */
- data += from_buffer;
- size -= from_buffer;
- }
-
- /* read() the rest */
- while (size > 0) {
- int const n = asio::read (*_socket, asio::buffer (data, size));
- if (n <= 0) {
- throw NetworkError ("could not read");
- }
-
- data += n;
- size -= n;
- }
-}
-
-/** Read as much data as is available, up to some limit.
- * @param data Where to put the data.
- * @param size Maximum amount of data to read.
- */
-void
-SocketReader::read_indefinite (uint8_t* data, int size)
-{
- assert (size < int (sizeof (_buffer)));
-
- /* Amount of extra data we need to read () */
- int to_read = size - _buffer_data;
- while (to_read > 0) {
- /* read as much of it as we can (into our buffer) */
- int const n = asio::read (*_socket, asio::buffer (_buffer + _buffer_data, to_read));
- if (n <= 0) {
- throw NetworkError ("could not read");
- }
-
- to_read -= n;
- _buffer_data += n;
- }
-
- assert (_buffer_data >= size);
-
- /* copy data into the output buffer */
- assert (size >= _buffer_data);
- memcpy (data, _buffer, size);
-}
#ifdef DVDOMATIC_POSIX
void
return "";
}
+DeadlineWrapper::DeadlineWrapper (asio::io_service& io_service)
+ : _io_service (io_service)
+ , _deadline (io_service)
+ , _buffer_data (0)
+{
+ _deadline.expires_at (posix_time::pos_infin);
+ check ();
+}
+
+void
+DeadlineWrapper::set_socket (shared_ptr<asio::ip::tcp::socket> socket)
+{
+ _socket = socket;
+}
+
+void
+DeadlineWrapper::check ()
+{
+ if (_deadline.expires_at() <= asio::deadline_timer::traits_type::now ()) {
+ if (_socket) {
+ _socket->close ();
+ }
+ _deadline.expires_at (posix_time::pos_infin);
+ }
+
+ _deadline.async_wait (boost::bind (&DeadlineWrapper::check, this));
+}
+
+void
+DeadlineWrapper::connect (asio::ip::basic_resolver_entry<asio::ip::tcp> const & endpoint, int timeout)
+{
+ assert (_socket);
+
+ system::error_code ec = asio::error::would_block;
+ _socket->async_connect (endpoint, lambda::var(ec) = lambda::_1);
+ do {
+ _io_service.run_one();
+ } while (ec == asio::error::would_block);
+
+ if (ec || !_socket->is_open ()) {
+ throw NetworkError ("connect timed out");
+ }
+}
+
+void
+DeadlineWrapper::write (uint8_t const * data, int size, int timeout)
+{
+ assert (_socket);
+
+ _deadline.expires_from_now (posix_time::seconds (timeout));
+ system::error_code ec = asio::error::would_block;
+
+ asio::async_write (*_socket, asio::buffer (data, size), lambda::var(ec) = lambda::_1);
+ do {
+ _io_service.run_one ();
+ } while (ec == asio::error::would_block);
+
+ if (ec) {
+ throw NetworkError ("write timed out");
+ }
+}
+
int
-read_with_timeout (boost::asio::ip::tcp::socket* socket, uint8_t* data, int size)
+DeadlineWrapper::read (uint8_t* data, int size, int timeout)
{
+ assert (_socket);
+
+ _deadline.expires_from_now (posix_time::seconds (timeout));
+ system::error_code ec = asio::error::would_block;
+
+ int amount_read = 0;
+
+ _socket->async_read_some (
+ asio::buffer (data, size),
+ (lambda::var(ec) = lambda::_1, lambda::var(amount_read) = lambda::_2)
+ );
+
+ _io_service.run ();
+
+ if (ec) {
+ amount_read = 0;
+ }
+
+ return amount_read;
+}
+
+/** Mark some data as being `consumed', so that it will not be returned
+ * as data again.
+ * @param size Amount of data to consume, in bytes.
+ */
+void
+DeadlineWrapper::consume (int size)
+{
+ assert (_buffer_data >= size);
- return asio::read (socket, asio::buffer (data, size));
+ _buffer_data -= size;
+ if (_buffer_data > 0) {
+ /* Shift still-valid data to the start of the buffer */
+ memmove (_buffer, _buffer + size, _buffer_data);
+ }
+}
+
+/** Read a definite amount of data from our socket, and mark
+ * it as consumed.
+ * @param data Where to put the data.
+ * @param size Number of bytes to read.
+ */
+void
+DeadlineWrapper::read_definite_and_consume (uint8_t* data, int size, int timeout)
+{
+ int const from_buffer = min (_buffer_data, size);
+ if (from_buffer > 0) {
+ /* Get data from our buffer */
+ memcpy (data, _buffer, from_buffer);
+ consume (from_buffer);
+ /* Update our output state */
+ data += from_buffer;
+ size -= from_buffer;
+ }
+
+ /* read() the rest */
+ while (size > 0) {
+ int const n = read (data, size, timeout);
+ if (n <= 0) {
+ throw NetworkError ("could not read");
+ }
+
+ data += n;
+ size -= n;
+ }
+}
+
+/** Read as much data as is available, up to some limit.
+ * @param data Where to put the data.
+ * @param size Maximum amount of data to read.
+ */
+void
+DeadlineWrapper::read_indefinite (uint8_t* data, int size, int timeout)
+{
+ assert (size < int (sizeof (_buffer)));
+
+ /* Amount of extra data we need to read () */
+ int to_read = size - _buffer_data;
+ while (to_read > 0) {
+ /* read as much of it as we can (into our buffer) */
+ int const n = read (_buffer + _buffer_data, to_read, timeout);
+ if (n <= 0) {
+ throw NetworkError ("could not read");
+ }
+
+ to_read -= n;
+ _buffer_data += n;
+ }
+
+ assert (_buffer_data >= size);
+
+ /* copy data into the output buffer */
+ assert (size >= _buffer_data);
+ memcpy (data, _buffer, size);
}
extern void md5_data (std::string, void const *, int);
#endif
-/** @class SocketReader
- * @brief A helper class from reading from sockets.
- *
- * You can probably do this stuff directly in boost, but I'm not sure how.
- */
-class SocketReader
-{
-public:
- SocketReader (boost::shared_ptr<boost::asio::ip::tcp::socket>);
-
- void read_definite_and_consume (uint8_t *, int);
- void read_indefinite (uint8_t *, int);
- void consume (int);
-
-private:
- /** socket we are reading from */
- boost::shared_ptr<boost::asio::ip::tcp::socket> _socket;
- /** a buffer for small reads */
- uint8_t _buffer[256];
- /** amount of valid data in the buffer */
- int _buffer_data;
-};
-
/** @class Size
* @brief Representation of the size of something */
struct Size
extern int dcp_audio_sample_rate (int);
extern std::string colour_lut_index_to_name (int index);
+class DeadlineWrapper
+{
+public:
+ DeadlineWrapper (boost::asio::io_service& io_service);
+
+ void set_socket (boost::shared_ptr<boost::asio::ip::tcp::socket> socket);
+
+ void connect (boost::asio::ip::basic_resolver_entry<boost::asio::ip::tcp> const & endpoint, int timeout);
+ void write (uint8_t const * data, int size, int timeout);
+ int read (uint8_t* data, int size, int timeout);
+
+ void read_definite_and_consume (uint8_t* data, int size, int timeout);
+ void read_indefinite (uint8_t* data, int size, int timeout);
+ void consume (int amount);
+
+private:
+ void check ();
+
+ boost::asio::io_service& _io_service;
+ boost::asio::deadline_timer _deadline;
+ boost::shared_ptr<boost::asio::ip::tcp::socket> _socket;
+ /** a buffer for small reads */
+ uint8_t _buffer[256];
+ /** amount of valid data in the buffer */
+ int _buffer_data;
+};
+
#endif