#include <iostream>
#include <unistd.h>
#include <errno.h>
-#ifdef DVDOMATIC_POSIX
-#include <netinet/in.h>
-#include <netdb.h>
-#endif
+#include <boost/array.hpp>
+#include <boost/asio.hpp>
#include <boost/filesystem.hpp>
+#include <boost/lexical_cast.hpp>
#include "film.h"
#include "dcp_video_frame.h"
#include "lut.h"
shared_ptr<EncodedData>
DCPVideoFrame::encode_remotely (Server const * serv)
{
-#ifdef DVDOMATIC_POSIX
- int const fd = socket (AF_INET, SOCK_STREAM, 0);
- if (fd < 0) {
- throw NetworkError ("could not create socket");
- }
-
- struct timeval tv;
- tv.tv_sec = 20;
- tv.tv_usec = 0;
-
- if (setsockopt (fd, SOL_SOCKET, SO_RCVTIMEO, (void *) &tv, sizeof (tv)) < 0) {
- close (fd);
- throw NetworkError ("setsockopt failed");
- }
+ asio::io_service io_service;
+ asio::ip::tcp::resolver resolver (io_service);
+ asio::ip::tcp::resolver::query query (serv->host_name(), boost::lexical_cast<string> (Config::instance()->server_port ()));
+ asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve (query);
- if (setsockopt (fd, SOL_SOCKET, SO_SNDTIMEO, (void *) &tv, sizeof (tv)) < 0) {
- close (fd);
- throw NetworkError ("setsockopt failed");
- }
-
- struct hostent* server = gethostbyname (serv->host_name().c_str ());
- if (server == 0) {
- close (fd);
- throw NetworkError ("gethostbyname failed");
- }
-
- struct sockaddr_in server_address;
- memset (&server_address, 0, sizeof (server_address));
- server_address.sin_family = AF_INET;
- memcpy (&server_address.sin_addr.s_addr, server->h_addr, server->h_length);
- server_address.sin_port = htons (Config::instance()->server_port ());
- if (connect (fd, (struct sockaddr *) &server_address, sizeof (server_address)) < 0) {
- close (fd);
- stringstream s;
- s << "could not connect (" << strerror (errno) << ")";
- throw NetworkError (s.str());
- }
+ shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service));
+ socket->connect (*endpoint_iterator);
#ifdef DEBUG_HASH
_input->hash ("Input for remote encoding (before sending)");
s << _input->line_size()[i] << " ";
}
- socket_write (fd, (uint8_t *) s.str().c_str(), s.str().length() + 1);
+ asio::write (*socket, asio::buffer (s.str().c_str(), s.str().length() + 1));
for (int i = 0; i < _input->components(); ++i) {
- socket_write (fd, _input->data()[i], _input->line_size()[i] * _input->lines(i));
+ asio::write (*socket, asio::buffer (_input->data()[i], _input->line_size()[i] * _input->lines(i)));
}
- SocketReader reader (fd);
+ SocketReader reader (socket);
char buffer[32];
reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer));
_log->log (s.str ());
}
- close (fd);
return e;
-#endif
-
-#ifdef DVDOMATIC_WINDOWS
- return shared_ptr<EncodedData> ();
-#endif
}
/** Write this data to a J2K file.
filesystem::rename (tmp_j2k, opt->frame_out_path (frame, false));
}
-#ifdef DVDOMATIC_POSIX
-/** Send this data to a file descriptor.
- * @param fd File descriptor.
+/** Send this data to a socket.
+ * @param socket Socket
*/
void
-EncodedData::send (int fd)
+EncodedData::send (shared_ptr<asio::ip::tcp::socket> socket)
{
stringstream s;
s << _size;
- socket_write (fd, (uint8_t *) s.str().c_str(), s.str().length() + 1);
- socket_write (fd, _data, _size);
+ asio::write (*socket, asio::buffer (s.str().c_str(), s.str().length() + 1));
+ asio::write (*socket, asio::buffer (_data, _size));
}
-#endif
#ifdef DEBUG_HASH
void
virtual ~EncodedData () {}
-#ifdef DVDOMATIC_POSIX
- void send (int);
-#endif
+ void send (boost::shared_ptr<boost::asio::ip::tcp::socket>);
void write (boost::shared_ptr<const Options>, int);
#ifdef DEBUG_HASH
return s.str ();
}
-#ifdef DVDOMATIC_POSIX
-/** Write some data to a socket.
- * @param fd Socket file descriptor.
- * @param data Data.
- * @param size Amount to write, in bytes.
- */
-void
-socket_write (int fd, uint8_t const * data, int size)
-{
- uint8_t const * p = data;
- while (size) {
- int const n = send (fd, p, size, MSG_NOSIGNAL);
- if (n < 0) {
- stringstream s;
- s << "could not write (" << strerror (errno) << ")";
- throw NetworkError (s.str ());
- }
-
- size -= n;
- p += n;
- }
-}
-#endif
-
double
seconds (struct timeval t)
{
return t.tv_sec + (double (t.tv_usec) / 1e6);
}
-#ifdef DVDOMATIC_POSIX
-/** @param fd File descriptor to read from */
-SocketReader::SocketReader (int fd)
- : _fd (fd)
+/** @param socket Socket to read from */
+SocketReader::SocketReader (shared_ptr<asio::ip::tcp::socket> socket)
+ : _socket (socket)
, _buffer_data (0)
{
/* read() the rest */
while (size > 0) {
- int const n = ::read (_fd, data, size);
+ int const n = asio::read (*_socket, asio::buffer (data, size));
if (n <= 0) {
throw NetworkError ("could not read");
}
int to_read = size - _buffer_data;
while (to_read > 0) {
/* read as much of it as we can (into our buffer) */
- int const n = ::read (_fd, _buffer + _buffer_data, to_read);
+ int const n = asio::read (*_socket, asio::buffer (_buffer + _buffer_data, to_read));
if (n <= 0) {
throw NetworkError ("could not read");
}
assert (size >= _buffer_data);
memcpy (data, _buffer, size);
}
-#endif
#ifdef DVDOMATIC_POSIX
void
#include <string>
#include <vector>
#include <boost/shared_ptr.hpp>
+#include <boost/asio.hpp>
extern "C" {
#include <libavcodec/avcodec.h>
#include <libavfilter/avfilter.h>
extern std::string audio_sample_format_to_string (AVSampleFormat);
extern AVSampleFormat audio_sample_format_from_string (std::string);
extern std::string dependency_version_summary ();
-extern void socket_write (int, uint8_t const *, int);
extern double seconds (struct timeval);
extern void dvdomatic_setup ();
extern std::vector<std::string> split_at_spaces_considering_quotes (std::string);
/** @class SocketReader
* @brief A helper class from reading from sockets.
+ *
+ * You can probably do this stuff directly in boost, but I'm not sure how.
*/
class SocketReader
{
public:
- SocketReader (int);
+ SocketReader (boost::shared_ptr<boost::asio::ip::tcp::socket>);
void read_definite_and_consume (uint8_t *, int);
void read_indefinite (uint8_t *, int);
void consume (int);
private:
- /** file descriptor we are reading from */
- int _fd;
+ /** socket we are reading from */
+ boost::shared_ptr<boost::asio::ip::tcp::socket> _socket;
/** a buffer for small reads */
uint8_t _buffer[256];
/** amount of valid data in the buffer */
#include <vector>
#include <unistd.h>
#include <errno.h>
-#ifdef DVDOMATIC_POSIX
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#endif
+#include <boost/array.hpp>
+#include <boost/asio.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
static vector<thread *> worker_threads;
-static std::list<int> queue;
+static std::list<shared_ptr<asio::ip::tcp::socket> > queue;
static mutex worker_mutex;
static condition worker_condition;
static Log log_ ("servomatic.log");
int
-process (int fd)
+process (shared_ptr<asio::ip::tcp::socket> socket)
{
- SocketReader reader (fd);
+ SocketReader reader (socket);
char buffer[128];
reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer));
string command;
s >> command;
if (command != "encode") {
- close (fd);
return -1;
}
#ifdef DEBUG_HASH
encoded->hash ("Encoded image (as made by server and as sent back)");
#endif
-
return frame;
}
worker_condition.wait (lock);
}
- int fd = queue.front ();
+ shared_ptr<asio::ip::tcp::socket> socket = queue.front ();
queue.pop_front ();
lock.unlock ();
gettimeofday (&start, 0);
try {
- frame = process (fd);
+ frame = process (socket);
} catch (std::exception& e) {
cerr << "Error: " << e.what() << "\n";
}
- close (fd);
+ socket.reset ();
lock.lock ();
worker_threads.push_back (new thread (worker_thread));
}
- int fd = socket (AF_INET, SOCK_STREAM, 0);
- if (fd < 0) {
- throw NetworkError ("could not open socket");
- }
-
- int const o = 1;
- setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &o, sizeof (o));
-
- struct timeval tv;
- tv.tv_sec = 20;
- tv.tv_usec = 0;
- setsockopt (fd, SOL_SOCKET, SO_RCVTIMEO, (void *) &tv, sizeof (tv));
- setsockopt (fd, SOL_SOCKET, SO_SNDTIMEO, (void *) &tv, sizeof (tv));
-
- struct sockaddr_in server_address;
- memset (&server_address, 0, sizeof (server_address));
- server_address.sin_family = AF_INET;
- server_address.sin_addr.s_addr = INADDR_ANY;
- server_address.sin_port = htons (Config::instance()->server_port ());
- if (::bind (fd, (struct sockaddr *) &server_address, sizeof (server_address)) < 0) {
- stringstream s;
- s << "could not bind to port " << Config::instance()->server_port() << " (" << strerror (errno) << ")";
- throw NetworkError (s.str());
- }
-
- listen (fd, BACKLOG);
-
+ asio::io_service io_service;
+ asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ()));
while (1) {
- struct sockaddr_in client_address;
- socklen_t client_length = sizeof (client_address);
- int new_fd = accept (fd, (struct sockaddr *) &client_address, &client_length);
- if (new_fd < 0) {
- if (errno != EAGAIN && errno != EWOULDBLOCK) {
- throw NetworkError ("accept failed");
- }
-
- continue;
- }
+ shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service));
+ acceptor.accept (*socket);
mutex::scoped_lock lock (worker_mutex);
setsockopt (new_fd, SOL_SOCKET, SO_RCVTIMEO, (void *) &tv, sizeof (tv));
setsockopt (new_fd, SOL_SOCKET, SO_SNDTIMEO, (void *) &tv, sizeof (tv));
- queue.push_back (new_fd);
+ queue.push_back (socket);
worker_condition.notify_all ();
}