diff options
| author | Carl Hetherington <cth@carlh.net> | 2012-09-17 22:50:47 +0100 |
|---|---|---|
| committer | Carl Hetherington <cth@carlh.net> | 2012-09-17 22:50:47 +0100 |
| commit | 1e8f1be709e8a3fa58f1147db2e58a39396313d8 (patch) | |
| tree | 1f86e375afbc1ec2a7e81ad48594bef7542bf71d /src/lib | |
| parent | d7135bda7b1db2ee2728c90ff4570c350834338f (diff) | |
Move server code into library; Server -> ServerDescription.
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/config.cc | 4 | ||||
| -rw-r--r-- | src/lib/config.h | 8 | ||||
| -rw-r--r-- | src/lib/dcp_video_frame.cc | 2 | ||||
| -rw-r--r-- | src/lib/dcp_video_frame.h | 4 | ||||
| -rw-r--r-- | src/lib/j2k_wav_encoder.cc | 8 | ||||
| -rw-r--r-- | src/lib/j2k_wav_encoder.h | 4 | ||||
| -rw-r--r-- | src/lib/log.h | 5 | ||||
| -rw-r--r-- | src/lib/server.cc | 163 | ||||
| -rw-r--r-- | src/lib/server.h | 32 |
9 files changed, 203 insertions, 27 deletions
diff --git a/src/lib/config.cc b/src/lib/config.cc index 53674645d..44d110689 100644 --- a/src/lib/config.cc +++ b/src/lib/config.cc @@ -76,7 +76,7 @@ Config::Config () } else if (k == "reference_filter") { _reference_filters.push_back (Filter::from_id (v)); } else if (k == "server") { - _servers.push_back (Server::create_from_metadata (v)); + _servers.push_back (ServerDescription::create_from_metadata (v)); } else if (k == "screen") { _screens.push_back (Screen::create_from_metadata (v)); } else if (k == "tms_ip") { @@ -131,7 +131,7 @@ Config::write () const f << "reference_filter " << (*i)->id () << "\n"; } - for (vector<Server*>::const_iterator i = _servers.begin(); i != _servers.end(); ++i) { + for (vector<ServerDescription*>::const_iterator i = _servers.begin(); i != _servers.end(); ++i) { f << "server " << (*i)->as_metadata () << "\n"; } diff --git a/src/lib/config.h b/src/lib/config.h index 14b541ee6..b002da7df 100644 --- a/src/lib/config.h +++ b/src/lib/config.h @@ -28,7 +28,7 @@ #include <boost/shared_ptr.hpp> #include <sigc++/signal.h> -class Server; +class ServerDescription; class Screen; class Scaler; class Filter; @@ -65,7 +65,7 @@ public: } /** @return J2K encoding servers to use */ - std::vector<Server*> servers () const { + std::vector<ServerDescription*> servers () const { return _servers; } @@ -126,7 +126,7 @@ public: } /** @param s New list of servers */ - void set_servers (std::vector<Server*> s) { + void set_servers (std::vector<ServerDescription*> s) { _servers = s; Changed (); } @@ -188,7 +188,7 @@ private: int _j2k_bandwidth; /** J2K encoding servers to use */ - std::vector<Server *> _servers; + std::vector<ServerDescription *> _servers; /** Screen definitions */ std::vector<boost::shared_ptr<Screen> > _screens; diff --git a/src/lib/dcp_video_frame.cc b/src/lib/dcp_video_frame.cc index 24cdda2e6..91c441543 100644 --- a/src/lib/dcp_video_frame.cc +++ b/src/lib/dcp_video_frame.cc @@ -289,7 +289,7 @@ DCPVideoFrame::encode_locally () * @return Encoded data. */ shared_ptr<EncodedData> -DCPVideoFrame::encode_remotely (Server const * serv) +DCPVideoFrame::encode_remotely (ServerDescription const * serv) { asio::io_service io_service; asio::ip::tcp::resolver resolver (io_service); diff --git a/src/lib/dcp_video_frame.h b/src/lib/dcp_video_frame.h index 464d48515..ee54bc0f5 100644 --- a/src/lib/dcp_video_frame.h +++ b/src/lib/dcp_video_frame.h @@ -27,7 +27,7 @@ class FilmState; class Options; -class Server; +class ServerDescription; class Scaler; class Image; class Log; @@ -113,7 +113,7 @@ public: virtual ~DCPVideoFrame (); boost::shared_ptr<EncodedData> encode_locally (); - boost::shared_ptr<EncodedData> encode_remotely (Server const *); + boost::shared_ptr<EncodedData> encode_remotely (ServerDescription const *); int frame () const { return _frame; diff --git a/src/lib/j2k_wav_encoder.cc b/src/lib/j2k_wav_encoder.cc index 2f29f9021..ff450d1ad 100644 --- a/src/lib/j2k_wav_encoder.cc +++ b/src/lib/j2k_wav_encoder.cc @@ -130,7 +130,7 @@ J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame) } void -J2KWAVEncoder::encoder_thread (Server* server) +J2KWAVEncoder::encoder_thread (ServerDescription* server) { /* Number of seconds that we currently wait between attempts to connect to the server; not relevant for localhost @@ -210,12 +210,12 @@ void J2KWAVEncoder::process_begin () { for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) { - _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (Server *) 0))); + _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0))); } - vector<Server*> servers = Config::instance()->servers (); + vector<ServerDescription*> servers = Config::instance()->servers (); - for (vector<Server*>::iterator i = servers.begin(); i != servers.end(); ++i) { + for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) { for (int j = 0; j < (*i)->threads (); ++j) { _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i))); } diff --git a/src/lib/j2k_wav_encoder.h b/src/lib/j2k_wav_encoder.h index 656af8321..1c2f50065 100644 --- a/src/lib/j2k_wav_encoder.h +++ b/src/lib/j2k_wav_encoder.h @@ -29,7 +29,7 @@ #include <sndfile.h> #include "encoder.h" -class Server; +class ServerDescription; class DCPVideoFrame; class Image; class Log; @@ -50,7 +50,7 @@ public: private: - void encoder_thread (Server *); + void encoder_thread (ServerDescription *); void close_sound_files (); void terminate_worker_threads (); diff --git a/src/lib/log.h b/src/lib/log.h index d4de8ebde..d32b368f5 100644 --- a/src/lib/log.h +++ b/src/lib/log.h @@ -17,6 +17,9 @@ */ +#ifndef DVDOMATIC_LOG_H +#define DVDOMATIC_LOG_H + /** @file src/log.h * @brief A very simple logging class. */ @@ -53,3 +56,5 @@ private: /** level above which to ignore log messages */ Level _level; }; + +#endif diff --git a/src/lib/server.cc b/src/lib/server.cc index 8a5b5cfca..f4aaa25e1 100644 --- a/src/lib/server.cc +++ b/src/lib/server.cc @@ -19,7 +19,7 @@ /** @file src/server.cc * @brief Class to describe a server to which we can send - * encoding work. + * encoding work, and a class to implement such a server. */ #include <string> @@ -27,16 +27,21 @@ #include <sstream> #include <boost/algorithm/string.hpp> #include "server.h" +#include "util.h" +#include "scaler.h" +#include "image.h" +#include "dcp_video_frame.h" +#include "config.h" using namespace std; using namespace boost; -/** Create a server from a string of metadata returned from as_metadata(). +/** Create a server description from a string of metadata returned from as_metadata(). * @param v Metadata. - * @return Server, or 0. + * @return ServerDescription, or 0. */ -Server * -Server::create_from_metadata (string v) +ServerDescription * +ServerDescription::create_from_metadata (string v) { vector<string> b; split (b, v, is_any_of (" ")); @@ -45,14 +50,158 @@ Server::create_from_metadata (string v) return 0; } - return new Server (b[0], atoi (b[1].c_str ())); + return new ServerDescription (b[0], atoi (b[1].c_str ())); } /** @return Description of this server as text */ string -Server::as_metadata () const +ServerDescription::as_metadata () const { stringstream s; s << _host_name << " " << _threads; return s.str (); } + +Server::Server () + : _log ("servomatic.log") +{ + +} + +int +Server::process (shared_ptr<asio::ip::tcp::socket> socket) +{ + SocketReader reader (socket); + + char buffer[128]; + reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer)); + reader.consume (strlen (buffer) + 1); + + stringstream s (buffer); + + string command; + s >> command; + if (command != "encode") { + return -1; + } + + Size in_size; + int pixel_format_int; + Size out_size; + int padding; + string scaler_id; + int frame; + float frames_per_second; + string post_process; + int colour_lut_index; + int j2k_bandwidth; + + s >> in_size.width >> in_size.height + >> pixel_format_int + >> out_size.width >> out_size.height + >> padding + >> scaler_id + >> frame + >> frames_per_second + >> post_process + >> colour_lut_index + >> j2k_bandwidth; + + PixelFormat pixel_format = (PixelFormat) pixel_format_int; + Scaler const * scaler = Scaler::from_id (scaler_id); + if (post_process == "none") { + post_process = ""; + } + + shared_ptr<SimpleImage> image (new SimpleImage (pixel_format, in_size)); + + for (int i = 0; i < image->components(); ++i) { + int line_size; + s >> line_size; + image->set_line_size (i, line_size); + } + + for (int i = 0; i < image->components(); ++i) { + reader.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i)); + } + +#ifdef DEBUG_HASH + image->hash ("Image for encoding (as received by server)"); +#endif + + DCPVideoFrame dcp_video_frame (image, out_size, padding, scaler, frame, frames_per_second, post_process, colour_lut_index, j2k_bandwidth, &_log); + shared_ptr<EncodedData> encoded = dcp_video_frame.encode_locally (); + encoded->send (socket); + +#ifdef DEBUG_HASH + encoded->hash ("Encoded image (as made by server and as sent back)"); +#endif + + return frame; +} + +void +Server::worker_thread () +{ + while (1) { + mutex::scoped_lock lock (_worker_mutex); + while (_queue.empty ()) { + _worker_condition.wait (lock); + } + + shared_ptr<asio::ip::tcp::socket> socket = _queue.front (); + _queue.pop_front (); + + lock.unlock (); + + int frame = -1; + + struct timeval start; + gettimeofday (&start, 0); + + try { + frame = process (socket); + } catch (std::exception& e) { + cerr << "Error: " << e.what() << "\n"; + } + + socket.reset (); + + lock.lock (); + + if (frame >= 0) { + struct timeval end; + gettimeofday (&end, 0); + cout << "Encoded frame " << frame << " in " << (seconds (end) - seconds (start)) << "\n"; + } + + _worker_condition.notify_all (); + } +} + +void +Server::run () +{ + int const num_threads = Config::instance()->num_local_encoding_threads (); + + for (int i = 0; i < num_threads; ++i) { + _worker_threads.push_back (new thread (bind (&Server::worker_thread, this))); + } + + asio::io_service io_service; + asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ())); + while (1) { + shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service)); + acceptor.accept (*socket); + + mutex::scoped_lock lock (_worker_mutex); + + /* Wait until the queue has gone down a bit */ + while (int (_queue.size()) >= num_threads * 2) { + _worker_condition.wait (lock); + } + + _queue.push_back (socket); + _worker_condition.notify_all (); + } +} diff --git a/src/lib/server.h b/src/lib/server.h index d06df34e9..8c0f86ebb 100644 --- a/src/lib/server.h +++ b/src/lib/server.h @@ -19,21 +19,25 @@ /** @file src/server.h * @brief Class to describe a server to which we can send - * encoding work. + * encoding work, and a class to implement such a server. */ #include <string> +#include <boost/thread.hpp> +#include <boost/asio.hpp> +#include <boost/thread/condition.hpp> +#include "log.h" -/** @class Server +/** @class ServerDescription * @brief Class to describe a server to which we can send encoding work. */ -class Server +class ServerDescription { public: /** @param h Server host name or IP address in string form. * @param t Number of threads to use on the server. */ - Server (std::string h, int t) + ServerDescription (std::string h, int t) : _host_name (h) , _threads (t) {} @@ -58,7 +62,7 @@ public: std::string as_metadata () const; - static Server * create_from_metadata (std::string v); + static ServerDescription * create_from_metadata (std::string v); private: /** server's host name */ @@ -66,3 +70,21 @@ private: /** number of threads to use on the server */ int _threads; }; + +class Server +{ +public: + Server (); + + void run (); + +private: + void worker_thread (); + int process (boost::shared_ptr<boost::asio::ip::tcp::socket> socket); + + std::vector<boost::thread *> _worker_threads; + std::list<boost::shared_ptr<boost::asio::ip::tcp::socket> > _queue; + boost::mutex _worker_mutex; + boost::condition _worker_condition; + Log _log; +}; |
