diff options
| author | Carl Hetherington <cth@carlh.net> | 2012-09-17 22:50:47 +0100 |
|---|---|---|
| committer | Carl Hetherington <cth@carlh.net> | 2012-09-17 22:50:47 +0100 |
| commit | 1e8f1be709e8a3fa58f1147db2e58a39396313d8 (patch) | |
| tree | 1f86e375afbc1ec2a7e81ad48594bef7542bf71d /src/lib/server.cc | |
| parent | d7135bda7b1db2ee2728c90ff4570c350834338f (diff) | |
Move server code into library; Server -> ServerDescription.
Diffstat (limited to 'src/lib/server.cc')
| -rw-r--r-- | src/lib/server.cc | 163 |
1 files changed, 156 insertions, 7 deletions
diff --git a/src/lib/server.cc b/src/lib/server.cc index 8a5b5cfca..f4aaa25e1 100644 --- a/src/lib/server.cc +++ b/src/lib/server.cc @@ -19,7 +19,7 @@ /** @file src/server.cc * @brief Class to describe a server to which we can send - * encoding work. + * encoding work, and a class to implement such a server. */ #include <string> @@ -27,16 +27,21 @@ #include <sstream> #include <boost/algorithm/string.hpp> #include "server.h" +#include "util.h" +#include "scaler.h" +#include "image.h" +#include "dcp_video_frame.h" +#include "config.h" using namespace std; using namespace boost; -/** Create a server from a string of metadata returned from as_metadata(). +/** Create a server description from a string of metadata returned from as_metadata(). * @param v Metadata. - * @return Server, or 0. + * @return ServerDescription, or 0. */ -Server * -Server::create_from_metadata (string v) +ServerDescription * +ServerDescription::create_from_metadata (string v) { vector<string> b; split (b, v, is_any_of (" ")); @@ -45,14 +50,158 @@ Server::create_from_metadata (string v) return 0; } - return new Server (b[0], atoi (b[1].c_str ())); + return new ServerDescription (b[0], atoi (b[1].c_str ())); } /** @return Description of this server as text */ string -Server::as_metadata () const +ServerDescription::as_metadata () const { stringstream s; s << _host_name << " " << _threads; return s.str (); } + +Server::Server () + : _log ("servomatic.log") +{ + +} + +int +Server::process (shared_ptr<asio::ip::tcp::socket> socket) +{ + SocketReader reader (socket); + + char buffer[128]; + reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer)); + reader.consume (strlen (buffer) + 1); + + stringstream s (buffer); + + string command; + s >> command; + if (command != "encode") { + return -1; + } + + Size in_size; + int pixel_format_int; + Size out_size; + int padding; + string scaler_id; + int frame; + float frames_per_second; + string post_process; + int colour_lut_index; + int j2k_bandwidth; + + s >> in_size.width >> in_size.height + >> pixel_format_int + >> out_size.width >> out_size.height + >> padding + >> scaler_id + >> frame + >> frames_per_second + >> post_process + >> colour_lut_index + >> j2k_bandwidth; + + PixelFormat pixel_format = (PixelFormat) pixel_format_int; + Scaler const * scaler = Scaler::from_id (scaler_id); + if (post_process == "none") { + post_process = ""; + } + + shared_ptr<SimpleImage> image (new SimpleImage (pixel_format, in_size)); + + for (int i = 0; i < image->components(); ++i) { + int line_size; + s >> line_size; + image->set_line_size (i, line_size); + } + + for (int i = 0; i < image->components(); ++i) { + reader.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i)); + } + +#ifdef DEBUG_HASH + image->hash ("Image for encoding (as received by server)"); +#endif + + DCPVideoFrame dcp_video_frame (image, out_size, padding, scaler, frame, frames_per_second, post_process, colour_lut_index, j2k_bandwidth, &_log); + shared_ptr<EncodedData> encoded = dcp_video_frame.encode_locally (); + encoded->send (socket); + +#ifdef DEBUG_HASH + encoded->hash ("Encoded image (as made by server and as sent back)"); +#endif + + return frame; +} + +void +Server::worker_thread () +{ + while (1) { + mutex::scoped_lock lock (_worker_mutex); + while (_queue.empty ()) { + _worker_condition.wait (lock); + } + + shared_ptr<asio::ip::tcp::socket> socket = _queue.front (); + _queue.pop_front (); + + lock.unlock (); + + int frame = -1; + + struct timeval start; + gettimeofday (&start, 0); + + try { + frame = process (socket); + } catch (std::exception& e) { + cerr << "Error: " << e.what() << "\n"; + } + + socket.reset (); + + lock.lock (); + + if (frame >= 0) { + struct timeval end; + gettimeofday (&end, 0); + cout << "Encoded frame " << frame << " in " << (seconds (end) - seconds (start)) << "\n"; + } + + _worker_condition.notify_all (); + } +} + +void +Server::run () +{ + int const num_threads = Config::instance()->num_local_encoding_threads (); + + for (int i = 0; i < num_threads; ++i) { + _worker_threads.push_back (new thread (bind (&Server::worker_thread, this))); + } + + asio::io_service io_service; + asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ())); + while (1) { + shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service)); + acceptor.accept (*socket); + + mutex::scoped_lock lock (_worker_mutex); + + /* Wait until the queue has gone down a bit */ + while (int (_queue.size()) >= num_threads * 2) { + _worker_condition.wait (lock); + } + + _queue.push_back (socket); + _worker_condition.notify_all (); + } +} |
