summaryrefslogtreecommitdiff
path: root/src/lib/server.cc
diff options
context:
space:
mode:
authorCarl Hetherington <cth@carlh.net>2012-09-28 23:09:15 +0100
committerCarl Hetherington <cth@carlh.net>2012-09-28 23:09:15 +0100
commitc0ed407fb02891f0dd364e78b6192f0e6dbe1d8d (patch)
treecfa1abb09f891f220f15886b4cad2d1562f4a79c /src/lib/server.cc
parentd50fe6707c973d4a1397aa40b67ae753744ce748 (diff)
parentc252cb33a3ca8088fbe091af903a77ad8a098969 (diff)
Merge branch 'master' of /home/carl/git/dvdomatic
Diffstat (limited to 'src/lib/server.cc')
-rw-r--r--src/lib/server.cc158
1 files changed, 151 insertions, 7 deletions
diff --git a/src/lib/server.cc b/src/lib/server.cc
index 8a5b5cfca..f8c4425d9 100644
--- a/src/lib/server.cc
+++ b/src/lib/server.cc
@@ -19,24 +19,30 @@
/** @file src/server.cc
* @brief Class to describe a server to which we can send
- * encoding work.
+ * encoding work, and a class to implement such a server.
*/
#include <string>
#include <vector>
#include <sstream>
+#include <iostream>
#include <boost/algorithm/string.hpp>
#include "server.h"
+#include "util.h"
+#include "scaler.h"
+#include "image.h"
+#include "dcp_video_frame.h"
+#include "config.h"
using namespace std;
using namespace boost;
-/** Create a server from a string of metadata returned from as_metadata().
+/** Create a server description from a string of metadata returned from as_metadata().
* @param v Metadata.
- * @return Server, or 0.
+ * @return ServerDescription, or 0.
*/
-Server *
-Server::create_from_metadata (string v)
+ServerDescription *
+ServerDescription::create_from_metadata (string v)
{
vector<string> b;
split (b, v, is_any_of (" "));
@@ -45,14 +51,152 @@ Server::create_from_metadata (string v)
return 0;
}
- return new Server (b[0], atoi (b[1].c_str ()));
+ return new ServerDescription (b[0], atoi (b[1].c_str ()));
}
/** @return Description of this server as text */
string
-Server::as_metadata () const
+ServerDescription::as_metadata () const
{
stringstream s;
s << _host_name << " " << _threads;
return s.str ();
}
+
+Server::Server (Log* log)
+ : _log (log)
+{
+
+}
+
+int
+Server::process (shared_ptr<Socket> socket)
+{
+ char buffer[128];
+ socket->read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30);
+ socket->consume (strlen (buffer) + 1);
+
+ stringstream s (buffer);
+
+ string command;
+ s >> command;
+ if (command != "encode") {
+ return -1;
+ }
+
+ Size in_size;
+ int pixel_format_int;
+ Size out_size;
+ int padding;
+ string scaler_id;
+ int frame;
+ float frames_per_second;
+ string post_process;
+ int colour_lut_index;
+ int j2k_bandwidth;
+
+ s >> in_size.width >> in_size.height
+ >> pixel_format_int
+ >> out_size.width >> out_size.height
+ >> padding
+ >> scaler_id
+ >> frame
+ >> frames_per_second
+ >> post_process
+ >> colour_lut_index
+ >> j2k_bandwidth;
+
+ PixelFormat pixel_format = (PixelFormat) pixel_format_int;
+ Scaler const * scaler = Scaler::from_id (scaler_id);
+ if (post_process == "none") {
+ post_process = "";
+ }
+
+ shared_ptr<SimpleImage> image (new SimpleImage (pixel_format, in_size));
+
+ for (int i = 0; i < image->components(); ++i) {
+ int line_size;
+ s >> line_size;
+ image->set_line_size (i, line_size);
+ }
+
+ for (int i = 0; i < image->components(); ++i) {
+ socket->read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i), 30);
+ }
+
+ DCPVideoFrame dcp_video_frame (image, out_size, padding, scaler, frame, frames_per_second, post_process, colour_lut_index, j2k_bandwidth, _log);
+ shared_ptr<EncodedData> encoded = dcp_video_frame.encode_locally ();
+ encoded->send (socket);
+
+ return frame;
+}
+
+void
+Server::worker_thread ()
+{
+ while (1) {
+ mutex::scoped_lock lock (_worker_mutex);
+ while (_queue.empty ()) {
+ _worker_condition.wait (lock);
+ }
+
+ shared_ptr<Socket> socket = _queue.front ();
+ _queue.pop_front ();
+
+ lock.unlock ();
+
+ int frame = -1;
+
+ struct timeval start;
+ gettimeofday (&start, 0);
+
+ try {
+ frame = process (socket);
+ } catch (std::exception& e) {
+ cerr << "Error: " << e.what() << "\n";
+ }
+
+ socket.reset ();
+
+ lock.lock ();
+
+ if (frame >= 0) {
+ struct timeval end;
+ gettimeofday (&end, 0);
+ stringstream s;
+ s << "Encoded frame " << frame << " in " << (seconds (end) - seconds (start));
+ _log->log (s.str ());
+ }
+
+ _worker_condition.notify_all ();
+ }
+}
+
+void
+Server::run (int num_threads)
+{
+ stringstream s;
+ s << "Server starting with " << num_threads << " threads.";
+ _log->log (s.str ());
+
+ for (int i = 0; i < num_threads; ++i) {
+ _worker_threads.push_back (new thread (bind (&Server::worker_thread, this)));
+ }
+
+ asio::io_service io_service;
+ asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ()));
+ while (1) {
+ shared_ptr<Socket> socket (new Socket);
+ acceptor.accept (socket->socket ());
+
+ mutex::scoped_lock lock (_worker_mutex);
+
+ /* Wait until the queue has gone down a bit */
+ while (int (_queue.size()) >= num_threads * 2) {
+ _worker_condition.wait (lock);
+ }
+
+ _queue.push_back (socket);
+ _worker_condition.notify_all ();
+ }
+}