summaryrefslogtreecommitdiff
path: root/src/lib/server.cc
diff options
context:
space:
mode:
authorCarl Hetherington <cth@carlh.net>2012-09-17 22:50:47 +0100
committerCarl Hetherington <cth@carlh.net>2012-09-17 22:50:47 +0100
commit1e8f1be709e8a3fa58f1147db2e58a39396313d8 (patch)
tree1f86e375afbc1ec2a7e81ad48594bef7542bf71d /src/lib/server.cc
parentd7135bda7b1db2ee2728c90ff4570c350834338f (diff)
Move server code into library; Server -> ServerDescription.
Diffstat (limited to 'src/lib/server.cc')
-rw-r--r--src/lib/server.cc163
1 files changed, 156 insertions, 7 deletions
diff --git a/src/lib/server.cc b/src/lib/server.cc
index 8a5b5cfca..f4aaa25e1 100644
--- a/src/lib/server.cc
+++ b/src/lib/server.cc
@@ -19,7 +19,7 @@
/** @file src/server.cc
* @brief Class to describe a server to which we can send
- * encoding work.
+ * encoding work, and a class to implement such a server.
*/
#include <string>
@@ -27,16 +27,21 @@
#include <sstream>
#include <boost/algorithm/string.hpp>
#include "server.h"
+#include "util.h"
+#include "scaler.h"
+#include "image.h"
+#include "dcp_video_frame.h"
+#include "config.h"
using namespace std;
using namespace boost;
-/** Create a server from a string of metadata returned from as_metadata().
+/** Create a server description from a string of metadata returned from as_metadata().
* @param v Metadata.
- * @return Server, or 0.
+ * @return ServerDescription, or 0.
*/
-Server *
-Server::create_from_metadata (string v)
+ServerDescription *
+ServerDescription::create_from_metadata (string v)
{
vector<string> b;
split (b, v, is_any_of (" "));
@@ -45,14 +50,158 @@ Server::create_from_metadata (string v)
return 0;
}
- return new Server (b[0], atoi (b[1].c_str ()));
+ return new ServerDescription (b[0], atoi (b[1].c_str ()));
}
/** @return Description of this server as text */
string
-Server::as_metadata () const
+ServerDescription::as_metadata () const
{
stringstream s;
s << _host_name << " " << _threads;
return s.str ();
}
+
+Server::Server ()
+ : _log ("servomatic.log")
+{
+
+}
+
+int
+Server::process (shared_ptr<asio::ip::tcp::socket> socket)
+{
+ SocketReader reader (socket);
+
+ char buffer[128];
+ reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer));
+ reader.consume (strlen (buffer) + 1);
+
+ stringstream s (buffer);
+
+ string command;
+ s >> command;
+ if (command != "encode") {
+ return -1;
+ }
+
+ Size in_size;
+ int pixel_format_int;
+ Size out_size;
+ int padding;
+ string scaler_id;
+ int frame;
+ float frames_per_second;
+ string post_process;
+ int colour_lut_index;
+ int j2k_bandwidth;
+
+ s >> in_size.width >> in_size.height
+ >> pixel_format_int
+ >> out_size.width >> out_size.height
+ >> padding
+ >> scaler_id
+ >> frame
+ >> frames_per_second
+ >> post_process
+ >> colour_lut_index
+ >> j2k_bandwidth;
+
+ PixelFormat pixel_format = (PixelFormat) pixel_format_int;
+ Scaler const * scaler = Scaler::from_id (scaler_id);
+ if (post_process == "none") {
+ post_process = "";
+ }
+
+ shared_ptr<SimpleImage> image (new SimpleImage (pixel_format, in_size));
+
+ for (int i = 0; i < image->components(); ++i) {
+ int line_size;
+ s >> line_size;
+ image->set_line_size (i, line_size);
+ }
+
+ for (int i = 0; i < image->components(); ++i) {
+ reader.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i));
+ }
+
+#ifdef DEBUG_HASH
+ image->hash ("Image for encoding (as received by server)");
+#endif
+
+ DCPVideoFrame dcp_video_frame (image, out_size, padding, scaler, frame, frames_per_second, post_process, colour_lut_index, j2k_bandwidth, &_log);
+ shared_ptr<EncodedData> encoded = dcp_video_frame.encode_locally ();
+ encoded->send (socket);
+
+#ifdef DEBUG_HASH
+ encoded->hash ("Encoded image (as made by server and as sent back)");
+#endif
+
+ return frame;
+}
+
+void
+Server::worker_thread ()
+{
+ while (1) {
+ mutex::scoped_lock lock (_worker_mutex);
+ while (_queue.empty ()) {
+ _worker_condition.wait (lock);
+ }
+
+ shared_ptr<asio::ip::tcp::socket> socket = _queue.front ();
+ _queue.pop_front ();
+
+ lock.unlock ();
+
+ int frame = -1;
+
+ struct timeval start;
+ gettimeofday (&start, 0);
+
+ try {
+ frame = process (socket);
+ } catch (std::exception& e) {
+ cerr << "Error: " << e.what() << "\n";
+ }
+
+ socket.reset ();
+
+ lock.lock ();
+
+ if (frame >= 0) {
+ struct timeval end;
+ gettimeofday (&end, 0);
+ cout << "Encoded frame " << frame << " in " << (seconds (end) - seconds (start)) << "\n";
+ }
+
+ _worker_condition.notify_all ();
+ }
+}
+
+void
+Server::run ()
+{
+ int const num_threads = Config::instance()->num_local_encoding_threads ();
+
+ for (int i = 0; i < num_threads; ++i) {
+ _worker_threads.push_back (new thread (bind (&Server::worker_thread, this)));
+ }
+
+ asio::io_service io_service;
+ asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ()));
+ while (1) {
+ shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service));
+ acceptor.accept (*socket);
+
+ mutex::scoped_lock lock (_worker_mutex);
+
+ /* Wait until the queue has gone down a bit */
+ while (int (_queue.size()) >= num_threads * 2) {
+ _worker_condition.wait (lock);
+ }
+
+ _queue.push_back (socket);
+ _worker_condition.notify_all ();
+ }
+}