diff options
| author | Carl Hetherington <cth@carlh.net> | 2013-02-20 11:51:12 +0000 |
|---|---|---|
| committer | Carl Hetherington <cth@carlh.net> | 2013-02-20 11:51:12 +0000 |
| commit | dc08d2da6bf14fd469005ea3512992c66b041da9 (patch) | |
| tree | 0bd7a60cfa4cfe21732f648ed25299db79e7920a /src/lib | |
| parent | 490af0bac5ec51120f6fed9c5b8b1a0c01427e45 (diff) | |
Fix servomatic build. Hopefully resolve confusion wrt linesize and
stride for FilterBufferImage; the linesize can apparently sometimes
be (slightly) larger than the width for byte-per-pixel images (e.g.
YUV420P). Remove grotty peek-style socket communication and use a
hopefully more robust send of the length of data as a binary word
before the data itself. Should fix #62.
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/dcp_video_frame.cc | 22 | ||||
| -rw-r--r-- | src/lib/image.cc | 19 | ||||
| -rw-r--r-- | src/lib/image.h | 1 | ||||
| -rw-r--r-- | src/lib/server.cc | 10 | ||||
| -rw-r--r-- | src/lib/util.cc | 128 | ||||
| -rw-r--r-- | src/lib/util.h | 22 |
6 files changed, 66 insertions, 136 deletions
diff --git a/src/lib/dcp_video_frame.cc b/src/lib/dcp_video_frame.cc index 4f3fda44a..9b96724b0 100644 --- a/src/lib/dcp_video_frame.cc +++ b/src/lib/dcp_video_frame.cc @@ -316,7 +316,7 @@ DCPVideoFrame::encode_remotely (ServerDescription const * serv) shared_ptr<Socket> socket (new Socket); - socket->connect (*endpoint_iterator, 30); + socket->connect (*endpoint_iterator); stringstream s; s << "encode please\n" @@ -352,21 +352,17 @@ DCPVideoFrame::encode_remotely (ServerDescription const * serv) _input->lines(0), _input->lines(1), _input->lines(2), _input->line_size()[0], _input->line_size()[1], _input->line_size()[2] )); - - socket->write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30); + + socket->write (s.str().length() + 1); + socket->write ((uint8_t *) s.str().c_str(), s.str().length() + 1); _input->write_to_socket (socket); if (_subtitle) { _subtitle->image()->write_to_socket (socket); } - char buffer[32]; - socket->read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30); - socket->consume (strlen (buffer) + 1); - shared_ptr<EncodedData> e (new RemotelyEncodedData (atoi (buffer))); - - /* now read the rest */ - socket->read_definite_and_consume (e->data(), e->size(), 30); + shared_ptr<EncodedData> e (new RemotelyEncodedData (socket->read_uint32 ())); + socket->read (e->data(), e->size()); _log->log (String::compose ("Finished remotely-encoded frame %1", _frame)); @@ -438,10 +434,8 @@ EncodedData::write_info (shared_ptr<const Film> film, int frame, libdcp::FrameIn void EncodedData::send (shared_ptr<Socket> socket) { - stringstream s; - s << _size; - socket->write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30); - socket->write (_data, _size, 30); + socket->write (_size); + socket->write (_data, _size); } LocallyEncodedData::LocallyEncodedData (uint8_t* d, int s) diff --git a/src/lib/image.cc b/src/lib/image.cc index 0ec6bd26c..73d499fe8 100644 --- a/src/lib/image.cc +++ b/src/lib/image.cc @@ -323,7 +323,7 @@ Image::read_from_socket (shared_ptr<Socket> socket) for (int i = 0; i < components(); ++i) { uint8_t* p = data()[i]; for (int y = 0; y < lines(i); ++y) { - socket->read_definite_and_consume (p, line_size()[i], 30); + socket->read (p, line_size()[i]); p += stride()[i]; } } @@ -335,7 +335,7 @@ Image::write_to_socket (shared_ptr<Socket> socket) const for (int i = 0; i < components(); ++i) { uint8_t* p = data()[i]; for (int y = 0; y < lines(i); ++y) { - socket->write (p, line_size()[i], 30); + socket->write (p, line_size()[i]); p += stride()[i]; } } @@ -503,12 +503,18 @@ FilterBufferImage::FilterBufferImage (AVPixelFormat p, AVFilterBufferRef* b) : Image (p) , _buffer (b) { - + _line_size = (int *) av_malloc (4 * sizeof (int)); + _line_size[0] = _line_size[1] = _line_size[2] = _line_size[3] = 0; + + for (int i = 0; i < components(); ++i) { + _line_size[i] = size().width * bytes_per_pixel(i); + } } FilterBufferImage::~FilterBufferImage () { avfilter_unref_buffer (_buffer); + av_free (_line_size); } uint8_t ** @@ -520,13 +526,16 @@ FilterBufferImage::data () const int * FilterBufferImage::line_size () const { - return _buffer->linesize; + return _line_size; } int * FilterBufferImage::stride () const { - /* XXX? */ + /* I've seen images where the _buffer->linesize is larger than the width + (by a small amount), suggesting that _buffer->linesize is what we call + stride. But I'm not sure. + */ return _buffer->linesize; } diff --git a/src/lib/image.h b/src/lib/image.h index 23f13a648..f40ea9280 100644 --- a/src/lib/image.h +++ b/src/lib/image.h @@ -117,6 +117,7 @@ private: FilterBufferImage& operator= (FilterBufferImage const &); AVFilterBufferRef* _buffer; + int* _line_size; }; /** @class SimpleImage diff --git a/src/lib/server.cc b/src/lib/server.cc index d75ab0fb6..3614ed9e4 100644 --- a/src/lib/server.cc +++ b/src/lib/server.cc @@ -28,6 +28,7 @@ #include <iostream> #include <boost/algorithm/string.hpp> #include <boost/lexical_cast.hpp> +#include <boost/scoped_array.hpp> #include "server.h" #include "util.h" #include "scaler.h" @@ -45,6 +46,7 @@ using boost::algorithm::is_any_of; using boost::algorithm::split; using boost::thread; using boost::bind; +using boost::scoped_array; using libdcp::Size; /** Create a server description from a string of metadata returned from as_metadata(). @@ -82,11 +84,11 @@ Server::Server (Log* log) int Server::process (shared_ptr<Socket> socket) { - char buffer[512]; - socket->read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30); - socket->consume (strlen (buffer) + 1); + uint32_t length = socket->read_uint32 (); + scoped_array<char> buffer (new char[length]); + socket->read (reinterpret_cast<uint8_t*> (buffer.get()), length); - stringstream s (buffer); + stringstream s (buffer.get()); multimap<string, string> kv = read_key_value (s); if (get_required_string (kv, "encode") != "please") { diff --git a/src/lib/util.cc b/src/lib/util.cc index c0c8be984..4ee304600 100644 --- a/src/lib/util.cc +++ b/src/lib/util.cc @@ -485,10 +485,10 @@ colour_lut_index_to_name (int index) return ""; } -Socket::Socket () +Socket::Socket (int timeout) : _deadline (_io_service) , _socket (_io_service) - , _buffer_data (0) + , _timeout (timeout) { _deadline.expires_at (posix_time::pos_infin); check (); @@ -505,14 +505,13 @@ Socket::check () _deadline.async_wait (boost::bind (&Socket::check, this)); } -/** Blocking connect with timeout. +/** Blocking connect. * @param endpoint End-point to connect to. - * @param timeout Time-out in seconds. */ void -Socket::connect (asio::ip::basic_resolver_entry<asio::ip::tcp> const & endpoint, int timeout) +Socket::connect (asio::ip::basic_resolver_entry<asio::ip::tcp> const & endpoint) { - _deadline.expires_from_now (posix_time::seconds (timeout)); + _deadline.expires_from_now (posix_time::seconds (_timeout)); system::error_code ec = asio::error::would_block; _socket.async_connect (endpoint, lambda::var(ec) = lambda::_1); do { @@ -524,132 +523,61 @@ Socket::connect (asio::ip::basic_resolver_entry<asio::ip::tcp> const & endpoint, } } -/** Blocking write with timeout. +/** Blocking write. * @param data Buffer to write. * @param size Number of bytes to write. - * @param timeout Time-out, in seconds. */ void -Socket::write (uint8_t const * data, int size, int timeout) +Socket::write (uint8_t const * data, int size) { - _deadline.expires_from_now (posix_time::seconds (timeout)); + _deadline.expires_from_now (posix_time::seconds (_timeout)); system::error_code ec = asio::error::would_block; asio::async_write (_socket, asio::buffer (data, size), lambda::var(ec) = lambda::_1); + do { _io_service.run_one (); } while (ec == asio::error::would_block); if (ec) { - throw NetworkError ("write timed out"); + throw NetworkError (ec.message ()); } } -/** Blocking read with timeout. +void +Socket::write (uint32_t v) +{ + v = htonl (v); + write (reinterpret_cast<uint8_t*> (&v), 4); +} + +/** Blocking read. * @param data Buffer to read to. * @param size Number of bytes to read. - * @param timeout Time-out, in seconds. */ -int -Socket::read (uint8_t* data, int size, int timeout) +void +Socket::read (uint8_t* data, int size) { - _deadline.expires_from_now (posix_time::seconds (timeout)); + _deadline.expires_from_now (posix_time::seconds (_timeout)); system::error_code ec = asio::error::would_block; - int amount_read = 0; - - _socket.async_read_some ( - asio::buffer (data, size), - (lambda::var(ec) = lambda::_1, lambda::var(amount_read) = lambda::_2) - ); + asio::async_read (_socket, asio::buffer (data, size), lambda::var(ec) = lambda::_1); do { _io_service.run_one (); } while (ec == asio::error::would_block); if (ec) { - amount_read = 0; - } - - return amount_read; -} - -/** Mark some data as being `consumed', so that it will not be returned - * as data again. - * @param size Amount of data to consume, in bytes. - */ -void -Socket::consume (int size) -{ - assert (_buffer_data >= size); - - _buffer_data -= size; - if (_buffer_data > 0) { - /* Shift still-valid data to the start of the buffer */ - memmove (_buffer, _buffer + size, _buffer_data); + throw NetworkError (ec.message ()); } } -/** Read a definite amount of data from our socket, and mark - * it as consumed. - * @param data Where to put the data. - * @param size Number of bytes to read. - */ -void -Socket::read_definite_and_consume (uint8_t* data, int size, int timeout) -{ - int const from_buffer = min (_buffer_data, size); - if (from_buffer > 0) { - /* Get data from our buffer */ - memcpy (data, _buffer, from_buffer); - consume (from_buffer); - /* Update our output state */ - data += from_buffer; - size -= from_buffer; - } - - /* read() the rest */ - while (size > 0) { - int const n = read (data, size, timeout); - if (n <= 0) { - throw NetworkError ("could not read"); - } - - data += n; - size -= n; - } -} - -/** Read as much data as is available, up to some limit. - * @param data Where to put the data. - * @param size Maximum amount of data to read. - * - * XXX This method assumes that there is always lots of data to read(); - * if there isn't, it will hang waiting for data that will never arrive. - */ -void -Socket::read_indefinite (uint8_t* data, int size, int timeout) +uint32_t +Socket::read_uint32 () { - assert (size < int (sizeof (_buffer))); - - /* Amount of extra data we need to read () */ - int to_read = size - _buffer_data; - while (to_read > 0) { - /* read as much of it as we can (into our buffer) */ - int const n = read (_buffer + _buffer_data, to_read, timeout); - if (n <= 0) { - throw NetworkError ("could not read"); - } - - to_read -= n; - _buffer_data += n; - } - - assert (_buffer_data >= size); - - /* copy data into the output buffer */ - assert (size >= _buffer_data); - memcpy (data, _buffer, size); + uint32_t v; + read (reinterpret_cast<uint8_t *> (&v), 4); + return ntohl (v); } /** @param other A Rect. diff --git a/src/lib/util.h b/src/lib/util.h index c4940a5d7..87735ea8e 100644 --- a/src/lib/util.h +++ b/src/lib/util.h @@ -194,39 +194,35 @@ extern std::string get_optional_string (std::multimap<std::string, std::string> * that are useful for DVD-o-matic. * * This class wraps some things that I could not work out how to do with boost; - * most notably, sync read/write calls with timeouts, and the ability to peek into - * data being read. + * most notably, sync read/write calls with timeouts. */ class Socket { public: - Socket (); + Socket (int timeout = 30); /** @return Our underlying socket */ boost::asio::ip::tcp::socket& socket () { return _socket; } - void connect (boost::asio::ip::basic_resolver_entry<boost::asio::ip::tcp> const & endpoint, int timeout); - void write (uint8_t const * data, int size, int timeout); + void connect (boost::asio::ip::basic_resolver_entry<boost::asio::ip::tcp> const & endpoint); + + void write (uint32_t n); + void write (uint8_t const * data, int size); - void read_definite_and_consume (uint8_t* data, int size, int timeout); - void read_indefinite (uint8_t* data, int size, int timeout); - void consume (int amount); + void read (uint8_t* data, int size); + uint32_t read_uint32 (); private: void check (); - int read (uint8_t* data, int size, int timeout); Socket (Socket const &); boost::asio::io_service _io_service; boost::asio::deadline_timer _deadline; boost::asio::ip::tcp::socket _socket; - /** a buffer for small reads */ - uint8_t _buffer[1024]; - /** amount of valid data in the buffer */ - int _buffer_data; + int _timeout; }; /** @class AudioBuffers |
