summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
authorCarl Hetherington <cth@carlh.net>2012-09-17 22:50:47 +0100
committerCarl Hetherington <cth@carlh.net>2012-09-17 22:50:47 +0100
commit1e8f1be709e8a3fa58f1147db2e58a39396313d8 (patch)
tree1f86e375afbc1ec2a7e81ad48594bef7542bf71d /src/lib
parentd7135bda7b1db2ee2728c90ff4570c350834338f (diff)
Move server code into library; Server -> ServerDescription.
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/config.cc4
-rw-r--r--src/lib/config.h8
-rw-r--r--src/lib/dcp_video_frame.cc2
-rw-r--r--src/lib/dcp_video_frame.h4
-rw-r--r--src/lib/j2k_wav_encoder.cc8
-rw-r--r--src/lib/j2k_wav_encoder.h4
-rw-r--r--src/lib/log.h5
-rw-r--r--src/lib/server.cc163
-rw-r--r--src/lib/server.h32
9 files changed, 203 insertions, 27 deletions
diff --git a/src/lib/config.cc b/src/lib/config.cc
index 53674645d..44d110689 100644
--- a/src/lib/config.cc
+++ b/src/lib/config.cc
@@ -76,7 +76,7 @@ Config::Config ()
} else if (k == "reference_filter") {
_reference_filters.push_back (Filter::from_id (v));
} else if (k == "server") {
- _servers.push_back (Server::create_from_metadata (v));
+ _servers.push_back (ServerDescription::create_from_metadata (v));
} else if (k == "screen") {
_screens.push_back (Screen::create_from_metadata (v));
} else if (k == "tms_ip") {
@@ -131,7 +131,7 @@ Config::write () const
f << "reference_filter " << (*i)->id () << "\n";
}
- for (vector<Server*>::const_iterator i = _servers.begin(); i != _servers.end(); ++i) {
+ for (vector<ServerDescription*>::const_iterator i = _servers.begin(); i != _servers.end(); ++i) {
f << "server " << (*i)->as_metadata () << "\n";
}
diff --git a/src/lib/config.h b/src/lib/config.h
index 14b541ee6..b002da7df 100644
--- a/src/lib/config.h
+++ b/src/lib/config.h
@@ -28,7 +28,7 @@
#include <boost/shared_ptr.hpp>
#include <sigc++/signal.h>
-class Server;
+class ServerDescription;
class Screen;
class Scaler;
class Filter;
@@ -65,7 +65,7 @@ public:
}
/** @return J2K encoding servers to use */
- std::vector<Server*> servers () const {
+ std::vector<ServerDescription*> servers () const {
return _servers;
}
@@ -126,7 +126,7 @@ public:
}
/** @param s New list of servers */
- void set_servers (std::vector<Server*> s) {
+ void set_servers (std::vector<ServerDescription*> s) {
_servers = s;
Changed ();
}
@@ -188,7 +188,7 @@ private:
int _j2k_bandwidth;
/** J2K encoding servers to use */
- std::vector<Server *> _servers;
+ std::vector<ServerDescription *> _servers;
/** Screen definitions */
std::vector<boost::shared_ptr<Screen> > _screens;
diff --git a/src/lib/dcp_video_frame.cc b/src/lib/dcp_video_frame.cc
index 24cdda2e6..91c441543 100644
--- a/src/lib/dcp_video_frame.cc
+++ b/src/lib/dcp_video_frame.cc
@@ -289,7 +289,7 @@ DCPVideoFrame::encode_locally ()
* @return Encoded data.
*/
shared_ptr<EncodedData>
-DCPVideoFrame::encode_remotely (Server const * serv)
+DCPVideoFrame::encode_remotely (ServerDescription const * serv)
{
asio::io_service io_service;
asio::ip::tcp::resolver resolver (io_service);
diff --git a/src/lib/dcp_video_frame.h b/src/lib/dcp_video_frame.h
index 464d48515..ee54bc0f5 100644
--- a/src/lib/dcp_video_frame.h
+++ b/src/lib/dcp_video_frame.h
@@ -27,7 +27,7 @@
class FilmState;
class Options;
-class Server;
+class ServerDescription;
class Scaler;
class Image;
class Log;
@@ -113,7 +113,7 @@ public:
virtual ~DCPVideoFrame ();
boost::shared_ptr<EncodedData> encode_locally ();
- boost::shared_ptr<EncodedData> encode_remotely (Server const *);
+ boost::shared_ptr<EncodedData> encode_remotely (ServerDescription const *);
int frame () const {
return _frame;
diff --git a/src/lib/j2k_wav_encoder.cc b/src/lib/j2k_wav_encoder.cc
index 2f29f9021..ff450d1ad 100644
--- a/src/lib/j2k_wav_encoder.cc
+++ b/src/lib/j2k_wav_encoder.cc
@@ -130,7 +130,7 @@ J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame)
}
void
-J2KWAVEncoder::encoder_thread (Server* server)
+J2KWAVEncoder::encoder_thread (ServerDescription* server)
{
/* Number of seconds that we currently wait between attempts
to connect to the server; not relevant for localhost
@@ -210,12 +210,12 @@ void
J2KWAVEncoder::process_begin ()
{
for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
- _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (Server *) 0)));
+ _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
}
- vector<Server*> servers = Config::instance()->servers ();
+ vector<ServerDescription*> servers = Config::instance()->servers ();
- for (vector<Server*>::iterator i = servers.begin(); i != servers.end(); ++i) {
+ for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
for (int j = 0; j < (*i)->threads (); ++j) {
_worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
}
diff --git a/src/lib/j2k_wav_encoder.h b/src/lib/j2k_wav_encoder.h
index 656af8321..1c2f50065 100644
--- a/src/lib/j2k_wav_encoder.h
+++ b/src/lib/j2k_wav_encoder.h
@@ -29,7 +29,7 @@
#include <sndfile.h>
#include "encoder.h"
-class Server;
+class ServerDescription;
class DCPVideoFrame;
class Image;
class Log;
@@ -50,7 +50,7 @@ public:
private:
- void encoder_thread (Server *);
+ void encoder_thread (ServerDescription *);
void close_sound_files ();
void terminate_worker_threads ();
diff --git a/src/lib/log.h b/src/lib/log.h
index d4de8ebde..d32b368f5 100644
--- a/src/lib/log.h
+++ b/src/lib/log.h
@@ -17,6 +17,9 @@
*/
+#ifndef DVDOMATIC_LOG_H
+#define DVDOMATIC_LOG_H
+
/** @file src/log.h
* @brief A very simple logging class.
*/
@@ -53,3 +56,5 @@ private:
/** level above which to ignore log messages */
Level _level;
};
+
+#endif
diff --git a/src/lib/server.cc b/src/lib/server.cc
index 8a5b5cfca..f4aaa25e1 100644
--- a/src/lib/server.cc
+++ b/src/lib/server.cc
@@ -19,7 +19,7 @@
/** @file src/server.cc
* @brief Class to describe a server to which we can send
- * encoding work.
+ * encoding work, and a class to implement such a server.
*/
#include <string>
@@ -27,16 +27,21 @@
#include <sstream>
#include <boost/algorithm/string.hpp>
#include "server.h"
+#include "util.h"
+#include "scaler.h"
+#include "image.h"
+#include "dcp_video_frame.h"
+#include "config.h"
using namespace std;
using namespace boost;
-/** Create a server from a string of metadata returned from as_metadata().
+/** Create a server description from a string of metadata returned from as_metadata().
* @param v Metadata.
- * @return Server, or 0.
+ * @return ServerDescription, or 0.
*/
-Server *
-Server::create_from_metadata (string v)
+ServerDescription *
+ServerDescription::create_from_metadata (string v)
{
vector<string> b;
split (b, v, is_any_of (" "));
@@ -45,14 +50,158 @@ Server::create_from_metadata (string v)
return 0;
}
- return new Server (b[0], atoi (b[1].c_str ()));
+ return new ServerDescription (b[0], atoi (b[1].c_str ()));
}
/** @return Description of this server as text */
string
-Server::as_metadata () const
+ServerDescription::as_metadata () const
{
stringstream s;
s << _host_name << " " << _threads;
return s.str ();
}
+
+Server::Server ()
+ : _log ("servomatic.log")
+{
+
+}
+
+int
+Server::process (shared_ptr<asio::ip::tcp::socket> socket)
+{
+ SocketReader reader (socket);
+
+ char buffer[128];
+ reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer));
+ reader.consume (strlen (buffer) + 1);
+
+ stringstream s (buffer);
+
+ string command;
+ s >> command;
+ if (command != "encode") {
+ return -1;
+ }
+
+ Size in_size;
+ int pixel_format_int;
+ Size out_size;
+ int padding;
+ string scaler_id;
+ int frame;
+ float frames_per_second;
+ string post_process;
+ int colour_lut_index;
+ int j2k_bandwidth;
+
+ s >> in_size.width >> in_size.height
+ >> pixel_format_int
+ >> out_size.width >> out_size.height
+ >> padding
+ >> scaler_id
+ >> frame
+ >> frames_per_second
+ >> post_process
+ >> colour_lut_index
+ >> j2k_bandwidth;
+
+ PixelFormat pixel_format = (PixelFormat) pixel_format_int;
+ Scaler const * scaler = Scaler::from_id (scaler_id);
+ if (post_process == "none") {
+ post_process = "";
+ }
+
+ shared_ptr<SimpleImage> image (new SimpleImage (pixel_format, in_size));
+
+ for (int i = 0; i < image->components(); ++i) {
+ int line_size;
+ s >> line_size;
+ image->set_line_size (i, line_size);
+ }
+
+ for (int i = 0; i < image->components(); ++i) {
+ reader.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i));
+ }
+
+#ifdef DEBUG_HASH
+ image->hash ("Image for encoding (as received by server)");
+#endif
+
+ DCPVideoFrame dcp_video_frame (image, out_size, padding, scaler, frame, frames_per_second, post_process, colour_lut_index, j2k_bandwidth, &_log);
+ shared_ptr<EncodedData> encoded = dcp_video_frame.encode_locally ();
+ encoded->send (socket);
+
+#ifdef DEBUG_HASH
+ encoded->hash ("Encoded image (as made by server and as sent back)");
+#endif
+
+ return frame;
+}
+
+void
+Server::worker_thread ()
+{
+ while (1) {
+ mutex::scoped_lock lock (_worker_mutex);
+ while (_queue.empty ()) {
+ _worker_condition.wait (lock);
+ }
+
+ shared_ptr<asio::ip::tcp::socket> socket = _queue.front ();
+ _queue.pop_front ();
+
+ lock.unlock ();
+
+ int frame = -1;
+
+ struct timeval start;
+ gettimeofday (&start, 0);
+
+ try {
+ frame = process (socket);
+ } catch (std::exception& e) {
+ cerr << "Error: " << e.what() << "\n";
+ }
+
+ socket.reset ();
+
+ lock.lock ();
+
+ if (frame >= 0) {
+ struct timeval end;
+ gettimeofday (&end, 0);
+ cout << "Encoded frame " << frame << " in " << (seconds (end) - seconds (start)) << "\n";
+ }
+
+ _worker_condition.notify_all ();
+ }
+}
+
+void
+Server::run ()
+{
+ int const num_threads = Config::instance()->num_local_encoding_threads ();
+
+ for (int i = 0; i < num_threads; ++i) {
+ _worker_threads.push_back (new thread (bind (&Server::worker_thread, this)));
+ }
+
+ asio::io_service io_service;
+ asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ()));
+ while (1) {
+ shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service));
+ acceptor.accept (*socket);
+
+ mutex::scoped_lock lock (_worker_mutex);
+
+ /* Wait until the queue has gone down a bit */
+ while (int (_queue.size()) >= num_threads * 2) {
+ _worker_condition.wait (lock);
+ }
+
+ _queue.push_back (socket);
+ _worker_condition.notify_all ();
+ }
+}
diff --git a/src/lib/server.h b/src/lib/server.h
index d06df34e9..8c0f86ebb 100644
--- a/src/lib/server.h
+++ b/src/lib/server.h
@@ -19,21 +19,25 @@
/** @file src/server.h
* @brief Class to describe a server to which we can send
- * encoding work.
+ * encoding work, and a class to implement such a server.
*/
#include <string>
+#include <boost/thread.hpp>
+#include <boost/asio.hpp>
+#include <boost/thread/condition.hpp>
+#include "log.h"
-/** @class Server
+/** @class ServerDescription
* @brief Class to describe a server to which we can send encoding work.
*/
-class Server
+class ServerDescription
{
public:
/** @param h Server host name or IP address in string form.
* @param t Number of threads to use on the server.
*/
- Server (std::string h, int t)
+ ServerDescription (std::string h, int t)
: _host_name (h)
, _threads (t)
{}
@@ -58,7 +62,7 @@ public:
std::string as_metadata () const;
- static Server * create_from_metadata (std::string v);
+ static ServerDescription * create_from_metadata (std::string v);
private:
/** server's host name */
@@ -66,3 +70,21 @@ private:
/** number of threads to use on the server */
int _threads;
};
+
+class Server
+{
+public:
+ Server ();
+
+ void run ();
+
+private:
+ void worker_thread ();
+ int process (boost::shared_ptr<boost::asio::ip::tcp::socket> socket);
+
+ std::vector<boost::thread *> _worker_threads;
+ std::list<boost::shared_ptr<boost::asio::ip::tcp::socket> > _queue;
+ boost::mutex _worker_mutex;
+ boost::condition _worker_condition;
+ Log _log;
+};