Untested first cut.
authorCarl Hetherington <cth@carlh.net>
Sun, 23 Sep 2012 13:02:01 +0000 (14:02 +0100)
committerCarl Hetherington <cth@carlh.net>
Sun, 23 Sep 2012 13:02:01 +0000 (14:02 +0100)
src/lib/dcp_video_frame.cc
src/lib/server.cc
src/lib/server.h
src/lib/util.cc
src/lib/util.h

index b128f6fa068d629f603fd92f0c60f3cc7f4384e3..ade615bfb113056220e7fed5681e19def30f5008 100644 (file)
@@ -293,11 +293,16 @@ DCPVideoFrame::encode_remotely (ServerDescription const * serv)
 {
        asio::io_service io_service;
        asio::ip::tcp::resolver resolver (io_service);
+
        asio::ip::tcp::resolver::query query (serv->host_name(), boost::lexical_cast<string> (Config::instance()->server_port ()));
        asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve (query);
 
        shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service));
-       socket->connect (*endpoint_iterator);
+
+       DeadlineWrapper wrapper (io_service);
+       wrapper.set_socket (socket);
+
+       wrapper.connect (*endpoint_iterator, 30);
 
 #ifdef DEBUG_HASH
        _input->hash ("Input for remote encoding (before sending)");
@@ -320,21 +325,19 @@ DCPVideoFrame::encode_remotely (ServerDescription const * serv)
                s << _input->line_size()[i] << " ";
        }
 
-       asio::write (*socket, asio::buffer (s.str().c_str(), s.str().length() + 1));
+       wrapper.write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 10);
 
        for (int i = 0; i < _input->components(); ++i) {
-               asio::write (*socket, asio::buffer (_input->data()[i], _input->line_size()[i] * _input->lines(i)));
+               wrapper.write (_input->data()[i], _input->line_size()[i] * _input->lines(i), 10);
        }
 
-       SocketReader reader (socket);
-
        char buffer[32];
-       reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer));
-       reader.consume (strlen (buffer) + 1);
+       wrapper.read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30);
+       wrapper.consume (strlen (buffer) + 1);
        shared_ptr<EncodedData> e (new RemotelyEncodedData (atoi (buffer)));
 
        /* now read the rest */
-       reader.read_definite_and_consume (e->data(), e->size());
+       wrapper.read_definite_and_consume (e->data(), e->size(), 30);
 
 #ifdef DEBUG_HASH
        e->hash ("Encoded image (after receiving)");
index a627634471e2af032a9c668c86aa88bdff550fbb..395786b67ff0cc18c05e69f3130265f602da309b 100644 (file)
@@ -72,11 +72,12 @@ Server::Server (Log* log)
 int
 Server::process (shared_ptr<asio::ip::tcp::socket> socket)
 {
-       SocketReader reader (socket);
+       DeadlineWrapper wrapper (_io_service);
+       wrapper.set_socket (socket);
        
        char buffer[128];
-       reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer));
-       reader.consume (strlen (buffer) + 1);
+       wrapper.read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30);
+       wrapper.consume (strlen (buffer) + 1);
        
        stringstream s (buffer);
        
@@ -123,7 +124,7 @@ Server::process (shared_ptr<asio::ip::tcp::socket> socket)
        }
        
        for (int i = 0; i < image->components(); ++i) {
-               reader.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i));
+               wrapper.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i), 30);
        }
        
 #ifdef DEBUG_HASH
@@ -189,10 +190,9 @@ Server::run (int num_threads)
                _worker_threads.push_back (new thread (bind (&Server::worker_thread, this)));
        }
        
-       asio::io_service io_service;
-       asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ()));
+       asio::ip::tcp::acceptor acceptor (_io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ()));
        while (1) {
-               shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service));
+               shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (_io_service));
                acceptor.accept (*socket);
 
                mutex::scoped_lock lock (_worker_mutex);
index fac440a76134e3f90a308a2bea7fc8c7c458e7a3..7470814431e1c53822d8fd18e443006885506b4e 100644 (file)
@@ -81,7 +81,8 @@ public:
 private:
        void worker_thread ();
        int process (boost::shared_ptr<boost::asio::ip::tcp::socket> socket);
-       
+
+       boost::asio::io_service _io_service;
        std::vector<boost::thread *> _worker_threads;
        std::list<boost::shared_ptr<boost::asio::ip::tcp::socket> > _queue;
        boost::mutex _worker_mutex;
index e79c7cd1cb6d1c1cb2ae412f3f1c3a4823296469..deab5d6396af1e26e0341a0ab03abe2b6c984088 100644 (file)
@@ -33,6 +33,8 @@
 #include <libssh/libssh.h>
 #include <signal.h>
 #include <boost/algorithm/string.hpp>
+#include <boost/bind.hpp>
+#include <boost/lambda/lambda.hpp>
 #include <openjpeg.h>
 #include <openssl/md5.h>
 #include <magick/MagickCore.h>
@@ -286,88 +288,6 @@ seconds (struct timeval t)
        return t.tv_sec + (double (t.tv_usec) / 1e6);
 }
 
-/** @param socket Socket to read from */
-SocketReader::SocketReader (shared_ptr<asio::ip::tcp::socket> socket)
-       : _socket (socket)
-       , _buffer_data (0)
-{
-
-}
-
-/** Mark some data as being `consumed', so that it will not be returned
- *  as data again.
- *  @param size Amount of data to consume, in bytes.
- */
-void
-SocketReader::consume (int size)
-{
-       assert (_buffer_data >= size);
-       
-       _buffer_data -= size;
-       if (_buffer_data > 0) {
-               /* Shift still-valid data to the start of the buffer */
-               memmove (_buffer, _buffer + size, _buffer_data);
-       }
-}
-
-/** Read a definite amount of data from our socket, and mark
- *  it as consumed.
- *  @param data Where to put the data.
- *  @param size Number of bytes to read.
- */
-void
-SocketReader::read_definite_and_consume (uint8_t* data, int size)
-{
-       int const from_buffer = min (_buffer_data, size);
-       if (from_buffer > 0) {
-               /* Get data from our buffer */
-               memcpy (data, _buffer, from_buffer);
-               consume (from_buffer);
-               /* Update our output state */
-               data += from_buffer;
-               size -= from_buffer;
-       }
-
-       /* read() the rest */
-       while (size > 0) {
-               int const n = asio::read (*_socket, asio::buffer (data, size));
-               if (n <= 0) {
-                       throw NetworkError ("could not read");
-               }
-
-               data += n;
-               size -= n;
-       }
-}
-
-/** Read as much data as is available, up to some limit.
- *  @param data Where to put the data.
- *  @param size Maximum amount of data to read.
- */
-void
-SocketReader::read_indefinite (uint8_t* data, int size)
-{
-       assert (size < int (sizeof (_buffer)));
-
-       /* Amount of extra data we need to read () */
-       int to_read = size - _buffer_data;
-       while (to_read > 0) {
-               /* read as much of it as we can (into our buffer) */
-               int const n = asio::read (*_socket, asio::buffer (_buffer + _buffer_data, to_read));
-               if (n <= 0) {
-                       throw NetworkError ("could not read");
-               }
-
-               to_read -= n;
-               _buffer_data += n;
-       }
-
-       assert (_buffer_data >= size);
-
-       /* copy data into the output buffer */
-       assert (size >= _buffer_data);
-       memcpy (data, _buffer, size);
-}
 
 #ifdef DVDOMATIC_POSIX
 void
@@ -518,9 +438,163 @@ colour_lut_index_to_name (int index)
        return "";
 }
 
+DeadlineWrapper::DeadlineWrapper (asio::io_service& io_service)
+       : _io_service (io_service)
+       , _deadline (io_service)
+       , _buffer_data (0)
+{
+       _deadline.expires_at (posix_time::pos_infin);
+       check ();
+}
+
+void
+DeadlineWrapper::set_socket (shared_ptr<asio::ip::tcp::socket> socket)
+{
+       _socket = socket;
+}
+
+void
+DeadlineWrapper::check ()
+{
+       if (_deadline.expires_at() <= asio::deadline_timer::traits_type::now ()) {
+               if (_socket) {
+                       _socket->close ();
+               }
+               _deadline.expires_at (posix_time::pos_infin);
+       }
+
+       _deadline.async_wait (boost::bind (&DeadlineWrapper::check, this));
+}
+
+void
+DeadlineWrapper::connect (asio::ip::basic_resolver_entry<asio::ip::tcp> const & endpoint, int timeout)
+{
+       assert (_socket);
+       
+       system::error_code ec = asio::error::would_block;
+       _socket->async_connect (endpoint, lambda::var(ec) = lambda::_1);
+       do {
+               _io_service.run_one();
+       } while (ec == asio::error::would_block);
+
+       if (ec || !_socket->is_open ()) {
+               throw NetworkError ("connect timed out");
+       }
+}
+
+void
+DeadlineWrapper::write (uint8_t const * data, int size, int timeout)
+{
+       assert (_socket);
+       
+       _deadline.expires_from_now (posix_time::seconds (timeout));
+       system::error_code ec = asio::error::would_block;
+
+       asio::async_write (*_socket, asio::buffer (data, size), lambda::var(ec) = lambda::_1);
+       do {
+               _io_service.run_one ();
+       } while (ec == asio::error::would_block);
+
+       if (ec) {
+               throw NetworkError ("write timed out");
+       }
+}
+
 int
-read_with_timeout (boost::asio::ip::tcp::socket* socket, uint8_t* data, int size)
+DeadlineWrapper::read (uint8_t* data, int size, int timeout)
 {
+       assert (_socket);
+
+       _deadline.expires_from_now (posix_time::seconds (timeout));
+       system::error_code ec = asio::error::would_block;
+
+       int amount_read = 0;
+
+       _socket->async_read_some (
+               asio::buffer (data, size),
+               (lambda::var(ec) = lambda::_1, lambda::var(amount_read) = lambda::_2)
+               );
+
+       _io_service.run ();
+
+       if (ec) {
+               amount_read = 0;
+       }
+
+       return amount_read;
+}
+
+/** Mark some data as being `consumed', so that it will not be returned
+ *  as data again.
+ *  @param size Amount of data to consume, in bytes.
+ */
+void
+DeadlineWrapper::consume (int size)
+{
+       assert (_buffer_data >= size);
        
-       return asio::read (socket, asio::buffer (data, size));
+       _buffer_data -= size;
+       if (_buffer_data > 0) {
+               /* Shift still-valid data to the start of the buffer */
+               memmove (_buffer, _buffer + size, _buffer_data);
+       }
+}
+
+/** Read a definite amount of data from our socket, and mark
+ *  it as consumed.
+ *  @param data Where to put the data.
+ *  @param size Number of bytes to read.
+ */
+void
+DeadlineWrapper::read_definite_and_consume (uint8_t* data, int size, int timeout)
+{
+       int const from_buffer = min (_buffer_data, size);
+       if (from_buffer > 0) {
+               /* Get data from our buffer */
+               memcpy (data, _buffer, from_buffer);
+               consume (from_buffer);
+               /* Update our output state */
+               data += from_buffer;
+               size -= from_buffer;
+       }
+
+       /* read() the rest */
+       while (size > 0) {
+               int const n = read (data, size, timeout);
+               if (n <= 0) {
+                       throw NetworkError ("could not read");
+               }
+
+               data += n;
+               size -= n;
+       }
+}
+
+/** Read as much data as is available, up to some limit.
+ *  @param data Where to put the data.
+ *  @param size Maximum amount of data to read.
+ */
+void
+DeadlineWrapper::read_indefinite (uint8_t* data, int size, int timeout)
+{
+       assert (size < int (sizeof (_buffer)));
+
+       /* Amount of extra data we need to read () */
+       int to_read = size - _buffer_data;
+       while (to_read > 0) {
+               /* read as much of it as we can (into our buffer) */
+               int const n = read (_buffer + _buffer_data, to_read, timeout);
+               if (n <= 0) {
+                       throw NetworkError ("could not read");
+               }
+
+               to_read -= n;
+               _buffer_data += n;
+       }
+
+       assert (_buffer_data >= size);
+
+       /* copy data into the output buffer */
+       assert (size >= _buffer_data);
+       memcpy (data, _buffer, size);
 }
index 568fe05d035ea56589be3e98639323da74df903b..c3a42e448e55dbc75f425f8b10676fc69f494611 100644 (file)
@@ -56,29 +56,6 @@ enum ContentType {
 extern void md5_data (std::string, void const *, int);
 #endif
 
-/** @class SocketReader
- *  @brief A helper class from reading from sockets.
- *
- *  You can probably do this stuff directly in boost, but I'm not sure how.
- */
-class SocketReader
-{
-public:
-       SocketReader (boost::shared_ptr<boost::asio::ip::tcp::socket>);
-
-       void read_definite_and_consume (uint8_t *, int);
-       void read_indefinite (uint8_t *, int);
-       void consume (int);
-
-private:
-       /** socket we are reading from */
-       boost::shared_ptr<boost::asio::ip::tcp::socket> _socket;
-       /** a buffer for small reads */
-       uint8_t _buffer[256];
-       /** amount of valid data in the buffer */
-       int _buffer_data;
-};
-
 /** @class Size
  *  @brief Representation of the size of something */
 struct Size
@@ -136,4 +113,31 @@ extern std::string crop_string (Position, Size);
 extern int dcp_audio_sample_rate (int);
 extern std::string colour_lut_index_to_name (int index);
 
+class DeadlineWrapper
+{
+public:
+       DeadlineWrapper (boost::asio::io_service& io_service);
+
+       void set_socket (boost::shared_ptr<boost::asio::ip::tcp::socket> socket);
+
+       void connect (boost::asio::ip::basic_resolver_entry<boost::asio::ip::tcp> const & endpoint, int timeout);
+       void write (uint8_t const * data, int size, int timeout);
+       int read (uint8_t* data, int size, int timeout);
+       
+       void read_definite_and_consume (uint8_t* data, int size, int timeout);
+       void read_indefinite (uint8_t* data, int size, int timeout);
+       void consume (int amount);
+       
+private:
+       void check ();
+
+       boost::asio::io_service& _io_service;
+       boost::asio::deadline_timer _deadline;
+       boost::shared_ptr<boost::asio::ip::tcp::socket> _socket;
+       /** a buffer for small reads */
+       uint8_t _buffer[256];
+       /** amount of valid data in the buffer */
+       int _buffer_data;
+};
+
 #endif