diff options
| author | Carl Hetherington <cth@carlh.net> | 2012-09-23 18:46:09 +0100 |
|---|---|---|
| committer | Carl Hetherington <cth@carlh.net> | 2012-09-23 18:46:09 +0100 |
| commit | 1e53d56c1488cce07bc8aad7bcc76edd36c19961 (patch) | |
| tree | 0d5de18847376090e7a653b5734b9b105bd43786 /src | |
| parent | a6e3a0589cd19ff11e3cdb5543b4a70b20d385ae (diff) | |
| parent | 737c3392039740f7a22a9ff922f8492905173b9c (diff) | |
Fix merge.
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/dcp_video_frame.cc | 23 | ||||
| -rw-r--r-- | src/lib/dcp_video_frame.h | 2 | ||||
| -rw-r--r-- | src/lib/server.cc | 18 | ||||
| -rw-r--r-- | src/lib/server.h | 8 | ||||
| -rw-r--r-- | src/lib/util.cc | 233 | ||||
| -rw-r--r-- | src/lib/util.h | 54 |
6 files changed, 207 insertions, 131 deletions
diff --git a/src/lib/dcp_video_frame.cc b/src/lib/dcp_video_frame.cc index b128f6fa0..d8af3462d 100644 --- a/src/lib/dcp_video_frame.cc +++ b/src/lib/dcp_video_frame.cc @@ -296,8 +296,9 @@ DCPVideoFrame::encode_remotely (ServerDescription const * serv) asio::ip::tcp::resolver::query query (serv->host_name(), boost::lexical_cast<string> (Config::instance()->server_port ())); asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve (query); - shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service)); - socket->connect (*endpoint_iterator); + Socket socket; + + socket.connect (*endpoint_iterator, 30); #ifdef DEBUG_HASH _input->hash ("Input for remote encoding (before sending)"); @@ -320,21 +321,19 @@ DCPVideoFrame::encode_remotely (ServerDescription const * serv) s << _input->line_size()[i] << " "; } - asio::write (*socket, asio::buffer (s.str().c_str(), s.str().length() + 1)); + socket.write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30); for (int i = 0; i < _input->components(); ++i) { - asio::write (*socket, asio::buffer (_input->data()[i], _input->line_size()[i] * _input->lines(i))); + socket.write (_input->data()[i], _input->line_size()[i] * _input->lines(i), 30); } - SocketReader reader (socket); - char buffer[32]; - reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer)); - reader.consume (strlen (buffer) + 1); + socket.read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30); + socket.consume (strlen (buffer) + 1); shared_ptr<EncodedData> e (new RemotelyEncodedData (atoi (buffer))); /* now read the rest */ - reader.read_definite_and_consume (e->data(), e->size()); + socket.read_definite_and_consume (e->data(), e->size(), 30); #ifdef DEBUG_HASH e->hash ("Encoded image (after receiving)"); @@ -375,12 +374,12 @@ EncodedData::write (shared_ptr<const Options> opt, int frame) * @param socket Socket */ void -EncodedData::send (shared_ptr<asio::ip::tcp::socket> socket) +EncodedData::send (shared_ptr<Socket> socket) { stringstream s; s << _size; - asio::write (*socket, asio::buffer (s.str().c_str(), s.str().length() + 1)); - asio::write (*socket, asio::buffer (_data, _size)); + socket->write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30); + socket->write (_data, _size, 30); } #ifdef DEBUG_HASH diff --git a/src/lib/dcp_video_frame.h b/src/lib/dcp_video_frame.h index ee54bc0f5..da4e0c301 100644 --- a/src/lib/dcp_video_frame.h +++ b/src/lib/dcp_video_frame.h @@ -48,7 +48,7 @@ public: virtual ~EncodedData () {} - void send (boost::shared_ptr<boost::asio::ip::tcp::socket>); + void send (boost::shared_ptr<Socket> socket); void write (boost::shared_ptr<const Options>, int); #ifdef DEBUG_HASH diff --git a/src/lib/server.cc b/src/lib/server.cc index f4e894558..8ca426049 100644 --- a/src/lib/server.cc +++ b/src/lib/server.cc @@ -70,13 +70,11 @@ Server::Server (Log* log) } int -Server::process (shared_ptr<asio::ip::tcp::socket> socket) +Server::process (shared_ptr<Socket> socket) { - SocketReader reader (socket); - char buffer[128]; - reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer)); - reader.consume (strlen (buffer) + 1); + socket->read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30); + socket->consume (strlen (buffer) + 1); stringstream s (buffer); @@ -123,7 +121,7 @@ Server::process (shared_ptr<asio::ip::tcp::socket> socket) } for (int i = 0; i < image->components(); ++i) { - reader.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i)); + socket->read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i), 30); } #ifdef DEBUG_HASH @@ -150,7 +148,7 @@ Server::worker_thread () _worker_condition.wait (lock); } - shared_ptr<asio::ip::tcp::socket> socket = _queue.front (); + shared_ptr<Socket> socket = _queue.front (); _queue.pop_front (); lock.unlock (); @@ -192,12 +190,12 @@ Server::run (int num_threads) for (int i = 0; i < num_threads; ++i) { _worker_threads.push_back (new thread (bind (&Server::worker_thread, this))); } - + asio::io_service io_service; asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ())); while (1) { - shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service)); - acceptor.accept (*socket); + shared_ptr<Socket> socket (new Socket); + acceptor.accept (socket->socket ()); mutex::scoped_lock lock (_worker_mutex); diff --git a/src/lib/server.h b/src/lib/server.h index fac440a76..32ba8dc4b 100644 --- a/src/lib/server.h +++ b/src/lib/server.h @@ -28,6 +28,8 @@ #include <boost/thread/condition.hpp> #include "log.h" +class Socket; + /** @class ServerDescription * @brief Class to describe a server to which we can send encoding work. */ @@ -80,10 +82,10 @@ public: private: void worker_thread (); - int process (boost::shared_ptr<boost::asio::ip::tcp::socket> socket); - + int process (boost::shared_ptr<Socket> socket); + std::vector<boost::thread *> _worker_threads; - std::list<boost::shared_ptr<boost::asio::ip::tcp::socket> > _queue; + std::list<boost::shared_ptr<Socket> > _queue; boost::mutex _worker_mutex; boost::condition _worker_condition; Log* _log; diff --git a/src/lib/util.cc b/src/lib/util.cc index cbf8decb9..d12bd3e77 100644 --- a/src/lib/util.cc +++ b/src/lib/util.cc @@ -33,6 +33,8 @@ #include <libssh/libssh.h> #include <signal.h> #include <boost/algorithm/string.hpp> +#include <boost/bind.hpp> +#include <boost/lambda/lambda.hpp> #include <openjpeg.h> #include <openssl/md5.h> #include <magick/MagickCore.h> @@ -286,88 +288,6 @@ seconds (struct timeval t) return t.tv_sec + (double (t.tv_usec) / 1e6); } -/** @param socket Socket to read from */ -SocketReader::SocketReader (shared_ptr<asio::ip::tcp::socket> socket) - : _socket (socket) - , _buffer_data (0) -{ - -} - -/** Mark some data as being `consumed', so that it will not be returned - * as data again. - * @param size Amount of data to consume, in bytes. - */ -void -SocketReader::consume (int size) -{ - assert (_buffer_data >= size); - - _buffer_data -= size; - if (_buffer_data > 0) { - /* Shift still-valid data to the start of the buffer */ - memmove (_buffer, _buffer + size, _buffer_data); - } -} - -/** Read a definite amount of data from our socket, and mark - * it as consumed. - * @param data Where to put the data. - * @param size Number of bytes to read. - */ -void -SocketReader::read_definite_and_consume (uint8_t* data, int size) -{ - int const from_buffer = min (_buffer_data, size); - if (from_buffer > 0) { - /* Get data from our buffer */ - memcpy (data, _buffer, from_buffer); - consume (from_buffer); - /* Update our output state */ - data += from_buffer; - size -= from_buffer; - } - - /* read() the rest */ - while (size > 0) { - int const n = asio::read (*_socket, asio::buffer (data, size)); - if (n <= 0) { - throw NetworkError ("could not read"); - } - - data += n; - size -= n; - } -} - -/** Read as much data as is available, up to some limit. - * @param data Where to put the data. - * @param size Maximum amount of data to read. - */ -void -SocketReader::read_indefinite (uint8_t* data, int size) -{ - assert (size < int (sizeof (_buffer))); - - /* Amount of extra data we need to read () */ - int to_read = size - _buffer_data; - while (to_read > 0) { - /* read as much of it as we can (into our buffer) */ - int const n = asio::read (*_socket, asio::buffer (_buffer + _buffer_data, to_read)); - if (n <= 0) { - throw NetworkError ("could not read"); - } - - to_read -= n; - _buffer_data += n; - } - - assert (_buffer_data >= size); - - /* copy data into the output buffer */ - assert (size >= _buffer_data); - memcpy (data, _buffer, size); -} #ifdef DVDOMATIC_POSIX void @@ -517,3 +437,152 @@ colour_lut_index_to_name (int index) assert (false); return ""; } + +Socket::Socket () + : _deadline (_io_service) + , _socket (_io_service) + , _buffer_data (0) +{ + _deadline.expires_at (posix_time::pos_infin); + check (); +} + +void +Socket::check () +{ + if (_deadline.expires_at() <= asio::deadline_timer::traits_type::now ()) { + _socket.close (); + _deadline.expires_at (posix_time::pos_infin); + } + + _deadline.async_wait (boost::bind (&Socket::check, this)); +} + +void +Socket::connect (asio::ip::basic_resolver_entry<asio::ip::tcp> const & endpoint, int timeout) +{ + system::error_code ec = asio::error::would_block; + _socket.async_connect (endpoint, lambda::var(ec) = lambda::_1); + do { + _io_service.run_one(); + } while (ec == asio::error::would_block); + + if (ec || !_socket.is_open ()) { + throw NetworkError ("connect timed out"); + } +} + +void +Socket::write (uint8_t const * data, int size, int timeout) +{ + _deadline.expires_from_now (posix_time::seconds (timeout)); + system::error_code ec = asio::error::would_block; + + asio::async_write (_socket, asio::buffer (data, size), lambda::var(ec) = lambda::_1); + do { + _io_service.run_one (); + } while (ec == asio::error::would_block); + + if (ec) { + throw NetworkError ("write timed out"); + } +} + +int +Socket::read (uint8_t* data, int size, int timeout) +{ + _deadline.expires_from_now (posix_time::seconds (timeout)); + system::error_code ec = asio::error::would_block; + + int amount_read = 0; + + _socket.async_read_some ( + asio::buffer (data, size), + (lambda::var(ec) = lambda::_1, lambda::var(amount_read) = lambda::_2) + ); + + do { + _io_service.run_one (); + } while (ec == asio::error::would_block); + + if (ec) { + amount_read = 0; + } + + return amount_read; +} + +/** Mark some data as being `consumed', so that it will not be returned + * as data again. + * @param size Amount of data to consume, in bytes. + */ +void +Socket::consume (int size) +{ + assert (_buffer_data >= size); + + _buffer_data -= size; + if (_buffer_data > 0) { + /* Shift still-valid data to the start of the buffer */ + memmove (_buffer, _buffer + size, _buffer_data); + } +} + +/** Read a definite amount of data from our socket, and mark + * it as consumed. + * @param data Where to put the data. + * @param size Number of bytes to read. + */ +void +Socket::read_definite_and_consume (uint8_t* data, int size, int timeout) +{ + int const from_buffer = min (_buffer_data, size); + if (from_buffer > 0) { + /* Get data from our buffer */ + memcpy (data, _buffer, from_buffer); + consume (from_buffer); + /* Update our output state */ + data += from_buffer; + size -= from_buffer; + } + + /* read() the rest */ + while (size > 0) { + int const n = read (data, size, timeout); + if (n <= 0) { + throw NetworkError ("could not read"); + } + + data += n; + size -= n; + } +} + +/** Read as much data as is available, up to some limit. + * @param data Where to put the data. + * @param size Maximum amount of data to read. + */ +void +Socket::read_indefinite (uint8_t* data, int size, int timeout) +{ + assert (size < int (sizeof (_buffer))); + + /* Amount of extra data we need to read () */ + int to_read = size - _buffer_data; + while (to_read > 0) { + /* read as much of it as we can (into our buffer) */ + int const n = read (_buffer + _buffer_data, to_read, timeout); + if (n <= 0) { + throw NetworkError ("could not read"); + } + + to_read -= n; + _buffer_data += n; + } + + assert (_buffer_data >= size); + + /* copy data into the output buffer */ + assert (size >= _buffer_data); + memcpy (data, _buffer, size); +} diff --git a/src/lib/util.h b/src/lib/util.h index 568fe05d0..d7f233003 100644 --- a/src/lib/util.h +++ b/src/lib/util.h @@ -56,29 +56,6 @@ enum ContentType { extern void md5_data (std::string, void const *, int); #endif -/** @class SocketReader - * @brief A helper class from reading from sockets. - * - * You can probably do this stuff directly in boost, but I'm not sure how. - */ -class SocketReader -{ -public: - SocketReader (boost::shared_ptr<boost::asio::ip::tcp::socket>); - - void read_definite_and_consume (uint8_t *, int); - void read_indefinite (uint8_t *, int); - void consume (int); - -private: - /** socket we are reading from */ - boost::shared_ptr<boost::asio::ip::tcp::socket> _socket; - /** a buffer for small reads */ - uint8_t _buffer[256]; - /** amount of valid data in the buffer */ - int _buffer_data; -}; - /** @class Size * @brief Representation of the size of something */ struct Size @@ -136,4 +113,35 @@ extern std::string crop_string (Position, Size); extern int dcp_audio_sample_rate (int); extern std::string colour_lut_index_to_name (int index); +class Socket +{ +public: + Socket (); + + boost::asio::ip::tcp::socket& socket () { + return _socket; + } + + void connect (boost::asio::ip::basic_resolver_entry<boost::asio::ip::tcp> const & endpoint, int timeout); + void write (uint8_t const * data, int size, int timeout); + + void read_definite_and_consume (uint8_t* data, int size, int timeout); + void read_indefinite (uint8_t* data, int size, int timeout); + void consume (int amount); + +private: + void check (); + int read (uint8_t* data, int size, int timeout); + + Socket (Socket const &); + + boost::asio::io_service _io_service; + boost::asio::deadline_timer _deadline; + boost::asio::ip::tcp::socket _socket; + /** a buffer for small reads */ + uint8_t _buffer[256]; + /** amount of valid data in the buffer */ + int _buffer_data; +}; + #endif |
