From 1e8f1be709e8a3fa58f1147db2e58a39396313d8 Mon Sep 17 00:00:00 2001 From: Carl Hetherington Date: Mon, 17 Sep 2012 22:50:47 +0100 Subject: Move server code into library; Server -> ServerDescription. --- src/lib/server.cc | 163 +++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 156 insertions(+), 7 deletions(-) (limited to 'src/lib/server.cc') diff --git a/src/lib/server.cc b/src/lib/server.cc index 8a5b5cfca..f4aaa25e1 100644 --- a/src/lib/server.cc +++ b/src/lib/server.cc @@ -19,7 +19,7 @@ /** @file src/server.cc * @brief Class to describe a server to which we can send - * encoding work. + * encoding work, and a class to implement such a server. */ #include @@ -27,16 +27,21 @@ #include #include #include "server.h" +#include "util.h" +#include "scaler.h" +#include "image.h" +#include "dcp_video_frame.h" +#include "config.h" using namespace std; using namespace boost; -/** Create a server from a string of metadata returned from as_metadata(). +/** Create a server description from a string of metadata returned from as_metadata(). * @param v Metadata. - * @return Server, or 0. + * @return ServerDescription, or 0. */ -Server * -Server::create_from_metadata (string v) +ServerDescription * +ServerDescription::create_from_metadata (string v) { vector b; split (b, v, is_any_of (" ")); @@ -45,14 +50,158 @@ Server::create_from_metadata (string v) return 0; } - return new Server (b[0], atoi (b[1].c_str ())); + return new ServerDescription (b[0], atoi (b[1].c_str ())); } /** @return Description of this server as text */ string -Server::as_metadata () const +ServerDescription::as_metadata () const { stringstream s; s << _host_name << " " << _threads; return s.str (); } + +Server::Server () + : _log ("servomatic.log") +{ + +} + +int +Server::process (shared_ptr socket) +{ + SocketReader reader (socket); + + char buffer[128]; + reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer)); + reader.consume (strlen (buffer) + 1); + + stringstream s (buffer); + + string command; + s >> command; + if (command != "encode") { + return -1; + } + + Size in_size; + int pixel_format_int; + Size out_size; + int padding; + string scaler_id; + int frame; + float frames_per_second; + string post_process; + int colour_lut_index; + int j2k_bandwidth; + + s >> in_size.width >> in_size.height + >> pixel_format_int + >> out_size.width >> out_size.height + >> padding + >> scaler_id + >> frame + >> frames_per_second + >> post_process + >> colour_lut_index + >> j2k_bandwidth; + + PixelFormat pixel_format = (PixelFormat) pixel_format_int; + Scaler const * scaler = Scaler::from_id (scaler_id); + if (post_process == "none") { + post_process = ""; + } + + shared_ptr image (new SimpleImage (pixel_format, in_size)); + + for (int i = 0; i < image->components(); ++i) { + int line_size; + s >> line_size; + image->set_line_size (i, line_size); + } + + for (int i = 0; i < image->components(); ++i) { + reader.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i)); + } + +#ifdef DEBUG_HASH + image->hash ("Image for encoding (as received by server)"); +#endif + + DCPVideoFrame dcp_video_frame (image, out_size, padding, scaler, frame, frames_per_second, post_process, colour_lut_index, j2k_bandwidth, &_log); + shared_ptr encoded = dcp_video_frame.encode_locally (); + encoded->send (socket); + +#ifdef DEBUG_HASH + encoded->hash ("Encoded image (as made by server and as sent back)"); +#endif + + return frame; +} + +void +Server::worker_thread () +{ + while (1) { + mutex::scoped_lock lock (_worker_mutex); + while (_queue.empty ()) { + _worker_condition.wait (lock); + } + + shared_ptr socket = _queue.front (); + _queue.pop_front (); + + lock.unlock (); + + int frame = -1; + + struct timeval start; + gettimeofday (&start, 0); + + try { + frame = process (socket); + } catch (std::exception& e) { + cerr << "Error: " << e.what() << "\n"; + } + + socket.reset (); + + lock.lock (); + + if (frame >= 0) { + struct timeval end; + gettimeofday (&end, 0); + cout << "Encoded frame " << frame << " in " << (seconds (end) - seconds (start)) << "\n"; + } + + _worker_condition.notify_all (); + } +} + +void +Server::run () +{ + int const num_threads = Config::instance()->num_local_encoding_threads (); + + for (int i = 0; i < num_threads; ++i) { + _worker_threads.push_back (new thread (bind (&Server::worker_thread, this))); + } + + asio::io_service io_service; + asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ())); + while (1) { + shared_ptr socket (new asio::ip::tcp::socket (io_service)); + acceptor.accept (*socket); + + mutex::scoped_lock lock (_worker_mutex); + + /* Wait until the queue has gone down a bit */ + while (int (_queue.size()) >= num_threads * 2) { + _worker_condition.wait (lock); + } + + _queue.push_back (socket); + _worker_condition.notify_all (); + } +} -- cgit v1.2.3 From 3c1b239453936128d1711ffa063ad4e1617b3e40 Mon Sep 17 00:00:00 2001 From: Carl Hetherington Date: Tue, 18 Sep 2012 02:07:59 +0100 Subject: Sort of working log window. --- src/lib/dcp_video_frame.cc | 4 ++-- src/lib/film.cc | 2 +- src/lib/log.cc | 29 +++++++++++++++++------- src/lib/log.h | 27 ++++++++++++++-------- src/lib/server.cc | 6 ++--- src/lib/server.h | 4 ++-- src/tools/servomatic_cli.cc | 3 ++- src/tools/servomatic_gui.cc | 55 +++++++++++++++++++++++++++++++++++++-------- src/wx/config_dialog.cc | 4 ++++ 9 files changed, 99 insertions(+), 35 deletions(-) (limited to 'src/lib/server.cc') diff --git a/src/lib/dcp_video_frame.cc b/src/lib/dcp_video_frame.cc index 91c441543..b128f6fa0 100644 --- a/src/lib/dcp_video_frame.cc +++ b/src/lib/dcp_video_frame.cc @@ -277,7 +277,7 @@ DCPVideoFrame::encode_locally () { stringstream s; - s << "Finished locally-encoded frame " << _frame << " length " << cio_tell (_cio); + s << "Finished locally-encoded frame " << _frame; _log->log (s.str ()); } @@ -342,7 +342,7 @@ DCPVideoFrame::encode_remotely (ServerDescription const * serv) { stringstream s; - s << "Finished remotely-encoded frame " << _frame << " length " << e->size(); + s << "Finished remotely-encoded frame " << _frame; _log->log (s.str ()); } diff --git a/src/lib/film.cc b/src/lib/film.cc index f8a3b192d..3b74f1888 100644 --- a/src/lib/film.cc +++ b/src/lib/film.cc @@ -88,7 +88,7 @@ Film::Film (string d, bool must_exist) read_metadata (); - _log = new Log (_state.file ("log")); + _log = new FileLog (_state.file ("log")); } /** Copy constructor */ diff --git a/src/lib/log.cc b/src/lib/log.cc index accf3694d..7f1eea206 100644 --- a/src/lib/log.cc +++ b/src/lib/log.cc @@ -27,10 +27,8 @@ using namespace std; -/** @param f Filename to write log to */ -Log::Log (string f) - : _file (f) - , _level (VERBOSE) +Log::Log () + : _level (VERBOSE) { } @@ -45,13 +43,13 @@ Log::log (string m, Level l) return; } - ofstream f (_file.c_str(), fstream::app); - time_t t; time (&t); string a = ctime (&t); - - f << a.substr (0, a.length() - 1) << ": " << m << "\n"; + + stringstream s; + s << a.substr (0, a.length() - 1) << ": " << m; + do_log (s.str ()); } void @@ -61,3 +59,18 @@ Log::set_level (Level l) _level = l; } + +/** @param file Filename to write log to */ +FileLog::FileLog (string file) + : _file (file) +{ + +} + +void +FileLog::do_log (string m) +{ + ofstream f (_file.c_str(), fstream::app); + f << m << "\n"; +} + diff --git a/src/lib/log.h b/src/lib/log.h index d32b368f5..2a242e24c 100644 --- a/src/lib/log.h +++ b/src/lib/log.h @@ -29,15 +29,11 @@ /** @class Log * @brief A very simple logging class. - * - * This class simply accepts log messages and writes them to a file. - * Its single nod to complexity is that it has a mutex to prevent - * multi-thread logging from clashing. */ class Log { public: - Log (std::string f); + Log (); enum Level { STANDARD = 0, @@ -48,13 +44,26 @@ public: void set_level (Level l); -private: - /** mutex to prevent simultaneous writes to the file */ +protected: + /** mutex to protect the log */ boost::mutex _mutex; - /** filename to write to */ - std::string _file; + +private: + virtual void do_log (std::string m) = 0; + /** level above which to ignore log messages */ Level _level; }; +class FileLog : public Log +{ +public: + FileLog (std::string file); + +private: + void do_log (std::string m); + /** filename to write to */ + std::string _file; +}; + #endif diff --git a/src/lib/server.cc b/src/lib/server.cc index f4aaa25e1..9e43601c4 100644 --- a/src/lib/server.cc +++ b/src/lib/server.cc @@ -62,8 +62,8 @@ ServerDescription::as_metadata () const return s.str (); } -Server::Server () - : _log ("servomatic.log") +Server::Server (Log* log) + : _log (log) { } @@ -129,7 +129,7 @@ Server::process (shared_ptr socket) image->hash ("Image for encoding (as received by server)"); #endif - DCPVideoFrame dcp_video_frame (image, out_size, padding, scaler, frame, frames_per_second, post_process, colour_lut_index, j2k_bandwidth, &_log); + DCPVideoFrame dcp_video_frame (image, out_size, padding, scaler, frame, frames_per_second, post_process, colour_lut_index, j2k_bandwidth, _log); shared_ptr encoded = dcp_video_frame.encode_locally (); encoded->send (socket); diff --git a/src/lib/server.h b/src/lib/server.h index 8c0f86ebb..58cfe0b3f 100644 --- a/src/lib/server.h +++ b/src/lib/server.h @@ -74,7 +74,7 @@ private: class Server { public: - Server (); + Server (Log* log); void run (); @@ -86,5 +86,5 @@ private: std::list > _queue; boost::mutex _worker_mutex; boost::condition _worker_condition; - Log _log; + Log* _log; }; diff --git a/src/tools/servomatic_cli.cc b/src/tools/servomatic_cli.cc index 1fcd02117..3ad73faf9 100644 --- a/src/tools/servomatic_cli.cc +++ b/src/tools/servomatic_cli.cc @@ -44,7 +44,8 @@ int main () { Scaler::setup_scalers (); - Server server; + FileLog log ("servomatic.log"); + Server server (&log); server.run (); return 0; } diff --git a/src/tools/servomatic_gui.cc b/src/tools/servomatic_gui.cc index d89bd91ad..1d95f498a 100644 --- a/src/tools/servomatic_gui.cc +++ b/src/tools/servomatic_gui.cc @@ -24,28 +24,65 @@ #include "lib/util.h" #include "lib/server.h" +using namespace std; using namespace boost; enum { ID_status = 1, - ID_quit + ID_quit, + ID_timer }; +class MemoryLog : public Log +{ +public: + + string get () const { + boost::mutex::scoped_lock (_mutex); + return _log; + } + +private: + void do_log (string m) + { + _log = m; + } + + string _log; +}; + +static MemoryLog memory_log; + class StatusDialog : public wxDialog { public: StatusDialog () - : wxDialog (0, wxID_ANY, _("DVD-o-matic encode server"), wxDefaultPosition, wxDefaultSize, wxDEFAULT_DIALOG_STYLE) + : wxDialog (0, wxID_ANY, _("DVD-o-matic encode server"), wxDefaultPosition, wxSize (600, 40), wxDEFAULT_DIALOG_STYLE | wxRESIZE_BORDER) + , _timer (this, ID_timer) { - wxFlexGridSizer* table = new wxFlexGridSizer (2, 6, 6); - table->AddGrowableCol (1, 1); + _sizer = new wxFlexGridSizer (1, 6, 6); + _sizer->AddGrowableCol (0, 1); - add_label_to_sizer (table, this, "Hello"); + _text = new wxTextCtrl (this, wxID_ANY); + _sizer->Add (_text, 1, wxEXPAND); - SetSizer (table); - table->Layout (); - table->SetSizeHints (this); + SetSizer (_sizer); + _sizer->Layout (); + + Connect (ID_timer, wxEVT_TIMER, wxTimerEventHandler (StatusDialog::update)); + _timer.Start (1000); } + +private: + void update (wxTimerEvent &) + { + _text->ChangeValue (std_to_wx (memory_log.get ())); + _sizer->Layout (); + } + + wxFlexGridSizer* _sizer; + wxTextCtrl* _text; + wxTimer _timer; }; class TaskBarIcon : public wxTaskBarIcon @@ -103,7 +140,7 @@ private: void main_thread () { - Server server; + Server server (&memory_log); server.run (); } diff --git a/src/wx/config_dialog.cc b/src/wx/config_dialog.cc index c53eeddf0..c5d9be41f 100644 --- a/src/wx/config_dialog.cc +++ b/src/wx/config_dialog.cc @@ -287,6 +287,10 @@ ConfigDialog::remove_server_clicked (wxCommandEvent &) if (i >= 0) { _servers->DeleteItem (i); } + + vector o = Config::instance()->servers (); + o.erase (o.begin() + i); + Config::instance()->set_servers (o); } void -- cgit v1.2.3 From 9e92b5193719e72072d903d10c1e71ee6447561d Mon Sep 17 00:00:00 2001 From: Carl Hetherington Date: Sat, 22 Sep 2012 13:38:38 +0100 Subject: Add missing include. --- src/lib/server.cc | 1 + 1 file changed, 1 insertion(+) (limited to 'src/lib/server.cc') diff --git a/src/lib/server.cc b/src/lib/server.cc index 9e43601c4..ff784db5a 100644 --- a/src/lib/server.cc +++ b/src/lib/server.cc @@ -25,6 +25,7 @@ #include #include #include +#include #include #include "server.h" #include "util.h" -- cgit v1.2.3 From c2709fbe5438da124b2d493cb714a6c58720cf5b Mon Sep 17 00:00:00 2001 From: Carl Hetherington Date: Sun, 23 Sep 2012 12:39:27 +0100 Subject: Command-line option to specify servomatic_cli threads. --- src/lib/server.cc | 4 ++-- src/lib/server.h | 2 +- src/lib/util.cc | 8 ++++++-- src/tools/servomatic_cli.cc | 47 +++++++++++++++++++++++++++++++++++++++++++-- src/tools/servomatic_gui.cc | 3 ++- 5 files changed, 56 insertions(+), 8 deletions(-) (limited to 'src/lib/server.cc') diff --git a/src/lib/server.cc b/src/lib/server.cc index ff784db5a..a62763447 100644 --- a/src/lib/server.cc +++ b/src/lib/server.cc @@ -181,9 +181,9 @@ Server::worker_thread () } void -Server::run () +Server::run (int num_threads) { - int const num_threads = Config::instance()->num_local_encoding_threads (); + cout << "Server starting with " << num_threads << " threads.\n"; for (int i = 0; i < num_threads; ++i) { _worker_threads.push_back (new thread (bind (&Server::worker_thread, this))); diff --git a/src/lib/server.h b/src/lib/server.h index 58cfe0b3f..fac440a76 100644 --- a/src/lib/server.h +++ b/src/lib/server.h @@ -76,7 +76,7 @@ class Server public: Server (Log* log); - void run (); + void run (int num_threads); private: void worker_thread (); diff --git a/src/lib/util.cc b/src/lib/util.cc index 1478bab2e..e79c7cd1c 100644 --- a/src/lib/util.cc +++ b/src/lib/util.cc @@ -518,5 +518,9 @@ colour_lut_index_to_name (int index) return ""; } - - +int +read_with_timeout (boost::asio::ip::tcp::socket* socket, uint8_t* data, int size) +{ + + return asio::read (socket, asio::buffer (data, size)); +} diff --git a/src/tools/servomatic_cli.cc b/src/tools/servomatic_cli.cc index 3ad73faf9..f8e713193 100644 --- a/src/tools/servomatic_cli.cc +++ b/src/tools/servomatic_cli.cc @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -39,13 +40,55 @@ #include "scaler.h" #include "image.h" #include "log.h" +#include "version.h" + +using namespace std; + +static void +help (string n) +{ + cerr << "Syntax: " << n << " [OPTION]\n" + << " -v, --version show DVD-o-matic version\n" + << " -h, --help show this help\n" + << " -t, --threads number of parallel encoding threads to use\n"; +} int -main () +main (int argc, char* argv[]) { + int num_threads = Config::instance()->num_local_encoding_threads (); + + int option_index = 0; + while (1) { + static struct option long_options[] = { + { "version", no_argument, 0, 'v'}, + { "help", no_argument, 0, 'h'}, + { "threads", required_argument, 0, 't'}, + { 0, 0, 0, 0 } + }; + + int c = getopt_long (argc, argv, "vht:", long_options, &option_index); + + if (c == -1) { + break; + } + + switch (c) { + case 'v': + cout << "dvdomatic version " << dvdomatic_version << " " << dvdomatic_git_commit << "\n"; + exit (EXIT_SUCCESS); + case 'h': + help (argv[0]); + exit (EXIT_SUCCESS); + case 't': + num_threads = atoi (optarg); + break; + } + } + Scaler::setup_scalers (); FileLog log ("servomatic.log"); Server server (&log); - server.run (); + server.run (num_threads); return 0; } diff --git a/src/tools/servomatic_gui.cc b/src/tools/servomatic_gui.cc index a151658f5..610ba8005 100644 --- a/src/tools/servomatic_gui.cc +++ b/src/tools/servomatic_gui.cc @@ -23,6 +23,7 @@ #include "wx_util.h" #include "lib/util.h" #include "lib/server.h" +#include "lib/config.h" using namespace std; using namespace boost; @@ -141,7 +142,7 @@ private: void main_thread () { Server server (&memory_log); - server.run (); + server.run (Config::instance()->num_local_encoding_threads ()); } boost::thread* _thread; -- cgit v1.2.3 From 93c3365a547fbb7467b6c47571c5a68e17b31e0c Mon Sep 17 00:00:00 2001 From: Carl Hetherington Date: Sun, 23 Sep 2012 14:02:01 +0100 Subject: Untested first cut. --- src/lib/dcp_video_frame.cc | 19 ++-- src/lib/server.cc | 14 +-- src/lib/server.h | 3 +- src/lib/util.cc | 242 +++++++++++++++++++++++++++++---------------- src/lib/util.h | 50 +++++----- 5 files changed, 205 insertions(+), 123 deletions(-) (limited to 'src/lib/server.cc') diff --git a/src/lib/dcp_video_frame.cc b/src/lib/dcp_video_frame.cc index b128f6fa0..ade615bfb 100644 --- a/src/lib/dcp_video_frame.cc +++ b/src/lib/dcp_video_frame.cc @@ -293,11 +293,16 @@ DCPVideoFrame::encode_remotely (ServerDescription const * serv) { asio::io_service io_service; asio::ip::tcp::resolver resolver (io_service); + asio::ip::tcp::resolver::query query (serv->host_name(), boost::lexical_cast (Config::instance()->server_port ())); asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve (query); shared_ptr socket (new asio::ip::tcp::socket (io_service)); - socket->connect (*endpoint_iterator); + + DeadlineWrapper wrapper (io_service); + wrapper.set_socket (socket); + + wrapper.connect (*endpoint_iterator, 30); #ifdef DEBUG_HASH _input->hash ("Input for remote encoding (before sending)"); @@ -320,21 +325,19 @@ DCPVideoFrame::encode_remotely (ServerDescription const * serv) s << _input->line_size()[i] << " "; } - asio::write (*socket, asio::buffer (s.str().c_str(), s.str().length() + 1)); + wrapper.write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 10); for (int i = 0; i < _input->components(); ++i) { - asio::write (*socket, asio::buffer (_input->data()[i], _input->line_size()[i] * _input->lines(i))); + wrapper.write (_input->data()[i], _input->line_size()[i] * _input->lines(i), 10); } - SocketReader reader (socket); - char buffer[32]; - reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer)); - reader.consume (strlen (buffer) + 1); + wrapper.read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30); + wrapper.consume (strlen (buffer) + 1); shared_ptr e (new RemotelyEncodedData (atoi (buffer))); /* now read the rest */ - reader.read_definite_and_consume (e->data(), e->size()); + wrapper.read_definite_and_consume (e->data(), e->size(), 30); #ifdef DEBUG_HASH e->hash ("Encoded image (after receiving)"); diff --git a/src/lib/server.cc b/src/lib/server.cc index a62763447..395786b67 100644 --- a/src/lib/server.cc +++ b/src/lib/server.cc @@ -72,11 +72,12 @@ Server::Server (Log* log) int Server::process (shared_ptr socket) { - SocketReader reader (socket); + DeadlineWrapper wrapper (_io_service); + wrapper.set_socket (socket); char buffer[128]; - reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer)); - reader.consume (strlen (buffer) + 1); + wrapper.read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30); + wrapper.consume (strlen (buffer) + 1); stringstream s (buffer); @@ -123,7 +124,7 @@ Server::process (shared_ptr socket) } for (int i = 0; i < image->components(); ++i) { - reader.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i)); + wrapper.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i), 30); } #ifdef DEBUG_HASH @@ -189,10 +190,9 @@ Server::run (int num_threads) _worker_threads.push_back (new thread (bind (&Server::worker_thread, this))); } - asio::io_service io_service; - asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ())); + asio::ip::tcp::acceptor acceptor (_io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ())); while (1) { - shared_ptr socket (new asio::ip::tcp::socket (io_service)); + shared_ptr socket (new asio::ip::tcp::socket (_io_service)); acceptor.accept (*socket); mutex::scoped_lock lock (_worker_mutex); diff --git a/src/lib/server.h b/src/lib/server.h index fac440a76..747081443 100644 --- a/src/lib/server.h +++ b/src/lib/server.h @@ -81,7 +81,8 @@ public: private: void worker_thread (); int process (boost::shared_ptr socket); - + + boost::asio::io_service _io_service; std::vector _worker_threads; std::list > _queue; boost::mutex _worker_mutex; diff --git a/src/lib/util.cc b/src/lib/util.cc index e79c7cd1c..deab5d639 100644 --- a/src/lib/util.cc +++ b/src/lib/util.cc @@ -33,6 +33,8 @@ #include #include #include +#include +#include #include #include #include @@ -286,88 +288,6 @@ seconds (struct timeval t) return t.tv_sec + (double (t.tv_usec) / 1e6); } -/** @param socket Socket to read from */ -SocketReader::SocketReader (shared_ptr socket) - : _socket (socket) - , _buffer_data (0) -{ - -} - -/** Mark some data as being `consumed', so that it will not be returned - * as data again. - * @param size Amount of data to consume, in bytes. - */ -void -SocketReader::consume (int size) -{ - assert (_buffer_data >= size); - - _buffer_data -= size; - if (_buffer_data > 0) { - /* Shift still-valid data to the start of the buffer */ - memmove (_buffer, _buffer + size, _buffer_data); - } -} - -/** Read a definite amount of data from our socket, and mark - * it as consumed. - * @param data Where to put the data. - * @param size Number of bytes to read. - */ -void -SocketReader::read_definite_and_consume (uint8_t* data, int size) -{ - int const from_buffer = min (_buffer_data, size); - if (from_buffer > 0) { - /* Get data from our buffer */ - memcpy (data, _buffer, from_buffer); - consume (from_buffer); - /* Update our output state */ - data += from_buffer; - size -= from_buffer; - } - - /* read() the rest */ - while (size > 0) { - int const n = asio::read (*_socket, asio::buffer (data, size)); - if (n <= 0) { - throw NetworkError ("could not read"); - } - - data += n; - size -= n; - } -} - -/** Read as much data as is available, up to some limit. - * @param data Where to put the data. - * @param size Maximum amount of data to read. - */ -void -SocketReader::read_indefinite (uint8_t* data, int size) -{ - assert (size < int (sizeof (_buffer))); - - /* Amount of extra data we need to read () */ - int to_read = size - _buffer_data; - while (to_read > 0) { - /* read as much of it as we can (into our buffer) */ - int const n = asio::read (*_socket, asio::buffer (_buffer + _buffer_data, to_read)); - if (n <= 0) { - throw NetworkError ("could not read"); - } - - to_read -= n; - _buffer_data += n; - } - - assert (_buffer_data >= size); - - /* copy data into the output buffer */ - assert (size >= _buffer_data); - memcpy (data, _buffer, size); -} #ifdef DVDOMATIC_POSIX void @@ -518,9 +438,163 @@ colour_lut_index_to_name (int index) return ""; } +DeadlineWrapper::DeadlineWrapper (asio::io_service& io_service) + : _io_service (io_service) + , _deadline (io_service) + , _buffer_data (0) +{ + _deadline.expires_at (posix_time::pos_infin); + check (); +} + +void +DeadlineWrapper::set_socket (shared_ptr socket) +{ + _socket = socket; +} + +void +DeadlineWrapper::check () +{ + if (_deadline.expires_at() <= asio::deadline_timer::traits_type::now ()) { + if (_socket) { + _socket->close (); + } + _deadline.expires_at (posix_time::pos_infin); + } + + _deadline.async_wait (boost::bind (&DeadlineWrapper::check, this)); +} + +void +DeadlineWrapper::connect (asio::ip::basic_resolver_entry const & endpoint, int timeout) +{ + assert (_socket); + + system::error_code ec = asio::error::would_block; + _socket->async_connect (endpoint, lambda::var(ec) = lambda::_1); + do { + _io_service.run_one(); + } while (ec == asio::error::would_block); + + if (ec || !_socket->is_open ()) { + throw NetworkError ("connect timed out"); + } +} + +void +DeadlineWrapper::write (uint8_t const * data, int size, int timeout) +{ + assert (_socket); + + _deadline.expires_from_now (posix_time::seconds (timeout)); + system::error_code ec = asio::error::would_block; + + asio::async_write (*_socket, asio::buffer (data, size), lambda::var(ec) = lambda::_1); + do { + _io_service.run_one (); + } while (ec == asio::error::would_block); + + if (ec) { + throw NetworkError ("write timed out"); + } +} + int -read_with_timeout (boost::asio::ip::tcp::socket* socket, uint8_t* data, int size) +DeadlineWrapper::read (uint8_t* data, int size, int timeout) { + assert (_socket); + + _deadline.expires_from_now (posix_time::seconds (timeout)); + system::error_code ec = asio::error::would_block; + + int amount_read = 0; + + _socket->async_read_some ( + asio::buffer (data, size), + (lambda::var(ec) = lambda::_1, lambda::var(amount_read) = lambda::_2) + ); + + _io_service.run (); + + if (ec) { + amount_read = 0; + } + + return amount_read; +} + +/** Mark some data as being `consumed', so that it will not be returned + * as data again. + * @param size Amount of data to consume, in bytes. + */ +void +DeadlineWrapper::consume (int size) +{ + assert (_buffer_data >= size); - return asio::read (socket, asio::buffer (data, size)); + _buffer_data -= size; + if (_buffer_data > 0) { + /* Shift still-valid data to the start of the buffer */ + memmove (_buffer, _buffer + size, _buffer_data); + } +} + +/** Read a definite amount of data from our socket, and mark + * it as consumed. + * @param data Where to put the data. + * @param size Number of bytes to read. + */ +void +DeadlineWrapper::read_definite_and_consume (uint8_t* data, int size, int timeout) +{ + int const from_buffer = min (_buffer_data, size); + if (from_buffer > 0) { + /* Get data from our buffer */ + memcpy (data, _buffer, from_buffer); + consume (from_buffer); + /* Update our output state */ + data += from_buffer; + size -= from_buffer; + } + + /* read() the rest */ + while (size > 0) { + int const n = read (data, size, timeout); + if (n <= 0) { + throw NetworkError ("could not read"); + } + + data += n; + size -= n; + } +} + +/** Read as much data as is available, up to some limit. + * @param data Where to put the data. + * @param size Maximum amount of data to read. + */ +void +DeadlineWrapper::read_indefinite (uint8_t* data, int size, int timeout) +{ + assert (size < int (sizeof (_buffer))); + + /* Amount of extra data we need to read () */ + int to_read = size - _buffer_data; + while (to_read > 0) { + /* read as much of it as we can (into our buffer) */ + int const n = read (_buffer + _buffer_data, to_read, timeout); + if (n <= 0) { + throw NetworkError ("could not read"); + } + + to_read -= n; + _buffer_data += n; + } + + assert (_buffer_data >= size); + + /* copy data into the output buffer */ + assert (size >= _buffer_data); + memcpy (data, _buffer, size); } diff --git a/src/lib/util.h b/src/lib/util.h index 568fe05d0..c3a42e448 100644 --- a/src/lib/util.h +++ b/src/lib/util.h @@ -56,29 +56,6 @@ enum ContentType { extern void md5_data (std::string, void const *, int); #endif -/** @class SocketReader - * @brief A helper class from reading from sockets. - * - * You can probably do this stuff directly in boost, but I'm not sure how. - */ -class SocketReader -{ -public: - SocketReader (boost::shared_ptr); - - void read_definite_and_consume (uint8_t *, int); - void read_indefinite (uint8_t *, int); - void consume (int); - -private: - /** socket we are reading from */ - boost::shared_ptr _socket; - /** a buffer for small reads */ - uint8_t _buffer[256]; - /** amount of valid data in the buffer */ - int _buffer_data; -}; - /** @class Size * @brief Representation of the size of something */ struct Size @@ -136,4 +113,31 @@ extern std::string crop_string (Position, Size); extern int dcp_audio_sample_rate (int); extern std::string colour_lut_index_to_name (int index); +class DeadlineWrapper +{ +public: + DeadlineWrapper (boost::asio::io_service& io_service); + + void set_socket (boost::shared_ptr socket); + + void connect (boost::asio::ip::basic_resolver_entry const & endpoint, int timeout); + void write (uint8_t const * data, int size, int timeout); + int read (uint8_t* data, int size, int timeout); + + void read_definite_and_consume (uint8_t* data, int size, int timeout); + void read_indefinite (uint8_t* data, int size, int timeout); + void consume (int amount); + +private: + void check (); + + boost::asio::io_service& _io_service; + boost::asio::deadline_timer _deadline; + boost::shared_ptr _socket; + /** a buffer for small reads */ + uint8_t _buffer[256]; + /** amount of valid data in the buffer */ + int _buffer_data; +}; + #endif -- cgit v1.2.3 From f62909cfe63e6a2e238e655ea0536a5f627e0ddf Mon Sep 17 00:00:00 2001 From: Carl Hetherington Date: Sun, 23 Sep 2012 14:48:58 +0100 Subject: Use log rather the cout in server. --- src/lib/server.cc | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'src/lib/server.cc') diff --git a/src/lib/server.cc b/src/lib/server.cc index 395786b67..d92fdf2b6 100644 --- a/src/lib/server.cc +++ b/src/lib/server.cc @@ -174,7 +174,9 @@ Server::worker_thread () if (frame >= 0) { struct timeval end; gettimeofday (&end, 0); - cout << "Encoded frame " << frame << " in " << (seconds (end) - seconds (start)) << "\n"; + stringstream s; + s << "Encoded frame " << frame << " in " << (seconds (end) - seconds (start)); + _log->log (s.str ()); } _worker_condition.notify_all (); @@ -184,7 +186,9 @@ Server::worker_thread () void Server::run (int num_threads) { - cout << "Server starting with " << num_threads << " threads.\n"; + stringstream s; + s << "Server starting with " << num_threads << " threads."; + _log->log (s.str ()); for (int i = 0; i < num_threads; ++i) { _worker_threads.push_back (new thread (bind (&Server::worker_thread, this))); -- cgit v1.2.3 From 46f1b3106ab959e21946fe1f035efc6dc1743b49 Mon Sep 17 00:00:00 2001 From: Carl Hetherington Date: Sun, 23 Sep 2012 15:07:30 +0100 Subject: Missing use of wrapper. --- src/lib/dcp_video_frame.cc | 6 +++--- src/lib/dcp_video_frame.h | 2 +- src/lib/server.cc | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) (limited to 'src/lib/server.cc') diff --git a/src/lib/dcp_video_frame.cc b/src/lib/dcp_video_frame.cc index d202109d0..ee29d8601 100644 --- a/src/lib/dcp_video_frame.cc +++ b/src/lib/dcp_video_frame.cc @@ -378,12 +378,12 @@ EncodedData::write (shared_ptr opt, int frame) * @param socket Socket */ void -EncodedData::send (shared_ptr socket) +EncodedData::send (DeadlineWrapper& wrapper) { stringstream s; s << _size; - asio::write (*socket, asio::buffer (s.str().c_str(), s.str().length() + 1)); - asio::write (*socket, asio::buffer (_data, _size)); + wrapper.write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30); + wrapper.write (_data, _size, 30); } #ifdef DEBUG_HASH diff --git a/src/lib/dcp_video_frame.h b/src/lib/dcp_video_frame.h index ee54bc0f5..d82aee367 100644 --- a/src/lib/dcp_video_frame.h +++ b/src/lib/dcp_video_frame.h @@ -48,7 +48,7 @@ public: virtual ~EncodedData () {} - void send (boost::shared_ptr); + void send (DeadlineWrapper& wrapper); void write (boost::shared_ptr, int); #ifdef DEBUG_HASH diff --git a/src/lib/server.cc b/src/lib/server.cc index d92fdf2b6..9e61c2282 100644 --- a/src/lib/server.cc +++ b/src/lib/server.cc @@ -133,7 +133,7 @@ Server::process (shared_ptr socket) DCPVideoFrame dcp_video_frame (image, out_size, padding, scaler, frame, frames_per_second, post_process, colour_lut_index, j2k_bandwidth, _log); shared_ptr encoded = dcp_video_frame.encode_locally (); - encoded->send (socket); + encoded->send (wrapper); #ifdef DEBUG_HASH encoded->hash ("Encoded image (as made by server and as sent back)"); -- cgit v1.2.3 From e89fb9d81358b51ed0e231725f7fb6eb63f96c5b Mon Sep 17 00:00:00 2001 From: Carl Hetherington Date: Sun, 23 Sep 2012 17:50:31 +0100 Subject: Use io_service per thread. --- src/lib/dcp_video_frame.cc | 11 ++++------- src/lib/dcp_video_frame.h | 2 +- src/lib/server.cc | 28 +++++++++++++--------------- src/lib/server.h | 7 ++++--- src/lib/util.cc | 30 ++++++++---------------------- src/lib/util.h | 10 ++++++---- test/test.cc | 17 ++++++++--------- 7 files changed, 44 insertions(+), 61 deletions(-) (limited to 'src/lib/server.cc') diff --git a/src/lib/dcp_video_frame.cc b/src/lib/dcp_video_frame.cc index ee29d8601..5c0ec6a6a 100644 --- a/src/lib/dcp_video_frame.cc +++ b/src/lib/dcp_video_frame.cc @@ -297,10 +297,7 @@ DCPVideoFrame::encode_remotely (ServerDescription const * serv) asio::ip::tcp::resolver::query query (serv->host_name(), boost::lexical_cast (Config::instance()->server_port ())); asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve (query); - shared_ptr socket (new asio::ip::tcp::socket (io_service)); - - DeadlineWrapper wrapper (io_service); - wrapper.set_socket (socket); + DeadlineWrapper wrapper; wrapper.connect (*endpoint_iterator, 30); @@ -378,12 +375,12 @@ EncodedData::write (shared_ptr opt, int frame) * @param socket Socket */ void -EncodedData::send (DeadlineWrapper& wrapper) +EncodedData::send (shared_ptr wrapper) { stringstream s; s << _size; - wrapper.write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30); - wrapper.write (_data, _size, 30); + wrapper->write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30); + wrapper->write (_data, _size, 30); } #ifdef DEBUG_HASH diff --git a/src/lib/dcp_video_frame.h b/src/lib/dcp_video_frame.h index d82aee367..752f0dda7 100644 --- a/src/lib/dcp_video_frame.h +++ b/src/lib/dcp_video_frame.h @@ -48,7 +48,7 @@ public: virtual ~EncodedData () {} - void send (DeadlineWrapper& wrapper); + void send (boost::shared_ptr wrapper); void write (boost::shared_ptr, int); #ifdef DEBUG_HASH diff --git a/src/lib/server.cc b/src/lib/server.cc index 9e61c2282..1f860d254 100644 --- a/src/lib/server.cc +++ b/src/lib/server.cc @@ -70,14 +70,11 @@ Server::Server (Log* log) } int -Server::process (shared_ptr socket) +Server::process (shared_ptr wrapper) { - DeadlineWrapper wrapper (_io_service); - wrapper.set_socket (socket); - char buffer[128]; - wrapper.read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30); - wrapper.consume (strlen (buffer) + 1); + wrapper->read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30); + wrapper->consume (strlen (buffer) + 1); stringstream s (buffer); @@ -124,7 +121,7 @@ Server::process (shared_ptr socket) } for (int i = 0; i < image->components(); ++i) { - wrapper.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i), 30); + wrapper->read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i), 30); } #ifdef DEBUG_HASH @@ -151,7 +148,7 @@ Server::worker_thread () _worker_condition.wait (lock); } - shared_ptr socket = _queue.front (); + shared_ptr wrapper = _queue.front (); _queue.pop_front (); lock.unlock (); @@ -162,12 +159,12 @@ Server::worker_thread () gettimeofday (&start, 0); try { - frame = process (socket); + frame = process (wrapper); } catch (std::exception& e) { cerr << "Error: " << e.what() << "\n"; } - socket.reset (); + wrapper.reset (); lock.lock (); @@ -193,11 +190,12 @@ Server::run (int num_threads) for (int i = 0; i < num_threads; ++i) { _worker_threads.push_back (new thread (bind (&Server::worker_thread, this))); } - - asio::ip::tcp::acceptor acceptor (_io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ())); + + asio::io_service io_service; + asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ())); while (1) { - shared_ptr socket (new asio::ip::tcp::socket (_io_service)); - acceptor.accept (*socket); + shared_ptr wrapper (new DeadlineWrapper); + acceptor.accept (wrapper->socket ()); mutex::scoped_lock lock (_worker_mutex); @@ -206,7 +204,7 @@ Server::run (int num_threads) _worker_condition.wait (lock); } - _queue.push_back (socket); + _queue.push_back (wrapper); _worker_condition.notify_all (); } } diff --git a/src/lib/server.h b/src/lib/server.h index 747081443..4cb6f2563 100644 --- a/src/lib/server.h +++ b/src/lib/server.h @@ -28,6 +28,8 @@ #include #include "log.h" +class DeadlineWrapper; + /** @class ServerDescription * @brief Class to describe a server to which we can send encoding work. */ @@ -80,11 +82,10 @@ public: private: void worker_thread (); - int process (boost::shared_ptr socket); + int process (boost::shared_ptr wrapper); - boost::asio::io_service _io_service; std::vector _worker_threads; - std::list > _queue; + std::list > _queue; boost::mutex _worker_mutex; boost::condition _worker_condition; Log* _log; diff --git a/src/lib/util.cc b/src/lib/util.cc index 3f48d696b..8713c5922 100644 --- a/src/lib/util.cc +++ b/src/lib/util.cc @@ -438,28 +438,20 @@ colour_lut_index_to_name (int index) return ""; } -DeadlineWrapper::DeadlineWrapper (asio::io_service& io_service) - : _io_service (io_service) - , _deadline (io_service) +DeadlineWrapper::DeadlineWrapper () + : _deadline (_io_service) + , _socket (_io_service) , _buffer_data (0) { _deadline.expires_at (posix_time::pos_infin); check (); } -void -DeadlineWrapper::set_socket (shared_ptr socket) -{ - _socket = socket; -} - void DeadlineWrapper::check () { if (_deadline.expires_at() <= asio::deadline_timer::traits_type::now ()) { - if (_socket) { - _socket->close (); - } + _socket.close (); _deadline.expires_at (posix_time::pos_infin); } @@ -469,15 +461,13 @@ DeadlineWrapper::check () void DeadlineWrapper::connect (asio::ip::basic_resolver_entry const & endpoint, int timeout) { - assert (_socket); - system::error_code ec = asio::error::would_block; - _socket->async_connect (endpoint, lambda::var(ec) = lambda::_1); + _socket.async_connect (endpoint, lambda::var(ec) = lambda::_1); do { _io_service.run_one(); } while (ec == asio::error::would_block); - if (ec || !_socket->is_open ()) { + if (ec || !_socket.is_open ()) { throw NetworkError ("connect timed out"); } } @@ -485,12 +475,10 @@ DeadlineWrapper::connect (asio::ip::basic_resolver_entry const & void DeadlineWrapper::write (uint8_t const * data, int size, int timeout) { - assert (_socket); - _deadline.expires_from_now (posix_time::seconds (timeout)); system::error_code ec = asio::error::would_block; - asio::async_write (*_socket, asio::buffer (data, size), lambda::var(ec) = lambda::_1); + asio::async_write (_socket, asio::buffer (data, size), lambda::var(ec) = lambda::_1); do { _io_service.run_one (); } while (ec == asio::error::would_block); @@ -503,14 +491,12 @@ DeadlineWrapper::write (uint8_t const * data, int size, int timeout) int DeadlineWrapper::read (uint8_t* data, int size, int timeout) { - assert (_socket); - _deadline.expires_from_now (posix_time::seconds (timeout)); system::error_code ec = asio::error::would_block; int amount_read = 0; - _socket->async_read_some ( + _socket.async_read_some ( asio::buffer (data, size), (lambda::var(ec) = lambda::_1, lambda::var(amount_read) = lambda::_2) ); diff --git a/src/lib/util.h b/src/lib/util.h index 2785a5dc1..8d6e2f541 100644 --- a/src/lib/util.h +++ b/src/lib/util.h @@ -116,9 +116,11 @@ extern std::string colour_lut_index_to_name (int index); class DeadlineWrapper { public: - DeadlineWrapper (boost::asio::io_service& io_service); + DeadlineWrapper (); - void set_socket (boost::shared_ptr socket); + boost::asio::ip::tcp::socket& socket () { + return _socket; + } void connect (boost::asio::ip::basic_resolver_entry const & endpoint, int timeout); void write (uint8_t const * data, int size, int timeout); @@ -133,9 +135,9 @@ private: DeadlineWrapper (DeadlineWrapper const &); - boost::asio::io_service& _io_service; + boost::asio::io_service _io_service; boost::asio::deadline_timer _deadline; - boost::shared_ptr _socket; + boost::asio::ip::tcp::socket _socket; /** a buffer for small reads */ uint8_t _buffer[256]; /** amount of valid data in the buffer */ diff --git a/test/test.cc b/test/test.cc index b77eb2b51..638d526e0 100644 --- a/test/test.cc +++ b/test/test.cc @@ -306,13 +306,12 @@ BOOST_AUTO_TEST_CASE (client_server_test) ServerDescription description ("localhost", 2); - thread* a = new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded)); - thread* b = new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded)); - thread* c = new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded)); - thread* d = new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded)); - - a->join (); - b->join (); - c->join (); - d->join (); + list threads; + for (int i = 0; i < 8; ++i) { + threads.push_back (new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded))); + } + + for (list::iterator i = threads.begin(); i != threads.end(); ++i) { + (*i)->join (); + } } -- cgit v1.2.3 From 737c3392039740f7a22a9ff922f8492905173b9c Mon Sep 17 00:00:00 2001 From: Carl Hetherington Date: Sun, 23 Sep 2012 17:56:08 +0100 Subject: Tidy up naming. --- src/lib/dcp_video_frame.cc | 21 ++++++++++----------- src/lib/dcp_video_frame.h | 2 +- src/lib/server.cc | 22 +++++++++++----------- src/lib/server.h | 6 +++--- src/lib/util.cc | 18 +++++++++--------- src/lib/util.h | 8 ++++---- 6 files changed, 38 insertions(+), 39 deletions(-) (limited to 'src/lib/server.cc') diff --git a/src/lib/dcp_video_frame.cc b/src/lib/dcp_video_frame.cc index 5c0ec6a6a..d8af3462d 100644 --- a/src/lib/dcp_video_frame.cc +++ b/src/lib/dcp_video_frame.cc @@ -293,13 +293,12 @@ DCPVideoFrame::encode_remotely (ServerDescription const * serv) { asio::io_service io_service; asio::ip::tcp::resolver resolver (io_service); - asio::ip::tcp::resolver::query query (serv->host_name(), boost::lexical_cast (Config::instance()->server_port ())); asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve (query); - DeadlineWrapper wrapper; + Socket socket; - wrapper.connect (*endpoint_iterator, 30); + socket.connect (*endpoint_iterator, 30); #ifdef DEBUG_HASH _input->hash ("Input for remote encoding (before sending)"); @@ -322,19 +321,19 @@ DCPVideoFrame::encode_remotely (ServerDescription const * serv) s << _input->line_size()[i] << " "; } - wrapper.write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30); + socket.write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30); for (int i = 0; i < _input->components(); ++i) { - wrapper.write (_input->data()[i], _input->line_size()[i] * _input->lines(i), 30); + socket.write (_input->data()[i], _input->line_size()[i] * _input->lines(i), 30); } char buffer[32]; - wrapper.read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30); - wrapper.consume (strlen (buffer) + 1); + socket.read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30); + socket.consume (strlen (buffer) + 1); shared_ptr e (new RemotelyEncodedData (atoi (buffer))); /* now read the rest */ - wrapper.read_definite_and_consume (e->data(), e->size(), 30); + socket.read_definite_and_consume (e->data(), e->size(), 30); #ifdef DEBUG_HASH e->hash ("Encoded image (after receiving)"); @@ -375,12 +374,12 @@ EncodedData::write (shared_ptr opt, int frame) * @param socket Socket */ void -EncodedData::send (shared_ptr wrapper) +EncodedData::send (shared_ptr socket) { stringstream s; s << _size; - wrapper->write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30); - wrapper->write (_data, _size, 30); + socket->write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30); + socket->write (_data, _size, 30); } #ifdef DEBUG_HASH diff --git a/src/lib/dcp_video_frame.h b/src/lib/dcp_video_frame.h index 752f0dda7..da4e0c301 100644 --- a/src/lib/dcp_video_frame.h +++ b/src/lib/dcp_video_frame.h @@ -48,7 +48,7 @@ public: virtual ~EncodedData () {} - void send (boost::shared_ptr wrapper); + void send (boost::shared_ptr socket); void write (boost::shared_ptr, int); #ifdef DEBUG_HASH diff --git a/src/lib/server.cc b/src/lib/server.cc index 1f860d254..8ca426049 100644 --- a/src/lib/server.cc +++ b/src/lib/server.cc @@ -70,11 +70,11 @@ Server::Server (Log* log) } int -Server::process (shared_ptr wrapper) +Server::process (shared_ptr socket) { char buffer[128]; - wrapper->read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30); - wrapper->consume (strlen (buffer) + 1); + socket->read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30); + socket->consume (strlen (buffer) + 1); stringstream s (buffer); @@ -121,7 +121,7 @@ Server::process (shared_ptr wrapper) } for (int i = 0; i < image->components(); ++i) { - wrapper->read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i), 30); + socket->read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i), 30); } #ifdef DEBUG_HASH @@ -130,7 +130,7 @@ Server::process (shared_ptr wrapper) DCPVideoFrame dcp_video_frame (image, out_size, padding, scaler, frame, frames_per_second, post_process, colour_lut_index, j2k_bandwidth, _log); shared_ptr encoded = dcp_video_frame.encode_locally (); - encoded->send (wrapper); + encoded->send (socket); #ifdef DEBUG_HASH encoded->hash ("Encoded image (as made by server and as sent back)"); @@ -148,7 +148,7 @@ Server::worker_thread () _worker_condition.wait (lock); } - shared_ptr wrapper = _queue.front (); + shared_ptr socket = _queue.front (); _queue.pop_front (); lock.unlock (); @@ -159,12 +159,12 @@ Server::worker_thread () gettimeofday (&start, 0); try { - frame = process (wrapper); + frame = process (socket); } catch (std::exception& e) { cerr << "Error: " << e.what() << "\n"; } - wrapper.reset (); + socket.reset (); lock.lock (); @@ -194,8 +194,8 @@ Server::run (int num_threads) asio::io_service io_service; asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ())); while (1) { - shared_ptr wrapper (new DeadlineWrapper); - acceptor.accept (wrapper->socket ()); + shared_ptr socket (new Socket); + acceptor.accept (socket->socket ()); mutex::scoped_lock lock (_worker_mutex); @@ -204,7 +204,7 @@ Server::run (int num_threads) _worker_condition.wait (lock); } - _queue.push_back (wrapper); + _queue.push_back (socket); _worker_condition.notify_all (); } } diff --git a/src/lib/server.h b/src/lib/server.h index 4cb6f2563..32ba8dc4b 100644 --- a/src/lib/server.h +++ b/src/lib/server.h @@ -28,7 +28,7 @@ #include #include "log.h" -class DeadlineWrapper; +class Socket; /** @class ServerDescription * @brief Class to describe a server to which we can send encoding work. @@ -82,10 +82,10 @@ public: private: void worker_thread (); - int process (boost::shared_ptr wrapper); + int process (boost::shared_ptr socket); std::vector _worker_threads; - std::list > _queue; + std::list > _queue; boost::mutex _worker_mutex; boost::condition _worker_condition; Log* _log; diff --git a/src/lib/util.cc b/src/lib/util.cc index 8713c5922..d12bd3e77 100644 --- a/src/lib/util.cc +++ b/src/lib/util.cc @@ -438,7 +438,7 @@ colour_lut_index_to_name (int index) return ""; } -DeadlineWrapper::DeadlineWrapper () +Socket::Socket () : _deadline (_io_service) , _socket (_io_service) , _buffer_data (0) @@ -448,18 +448,18 @@ DeadlineWrapper::DeadlineWrapper () } void -DeadlineWrapper::check () +Socket::check () { if (_deadline.expires_at() <= asio::deadline_timer::traits_type::now ()) { _socket.close (); _deadline.expires_at (posix_time::pos_infin); } - _deadline.async_wait (boost::bind (&DeadlineWrapper::check, this)); + _deadline.async_wait (boost::bind (&Socket::check, this)); } void -DeadlineWrapper::connect (asio::ip::basic_resolver_entry const & endpoint, int timeout) +Socket::connect (asio::ip::basic_resolver_entry const & endpoint, int timeout) { system::error_code ec = asio::error::would_block; _socket.async_connect (endpoint, lambda::var(ec) = lambda::_1); @@ -473,7 +473,7 @@ DeadlineWrapper::connect (asio::ip::basic_resolver_entry const & } void -DeadlineWrapper::write (uint8_t const * data, int size, int timeout) +Socket::write (uint8_t const * data, int size, int timeout) { _deadline.expires_from_now (posix_time::seconds (timeout)); system::error_code ec = asio::error::would_block; @@ -489,7 +489,7 @@ DeadlineWrapper::write (uint8_t const * data, int size, int timeout) } int -DeadlineWrapper::read (uint8_t* data, int size, int timeout) +Socket::read (uint8_t* data, int size, int timeout) { _deadline.expires_from_now (posix_time::seconds (timeout)); system::error_code ec = asio::error::would_block; @@ -517,7 +517,7 @@ DeadlineWrapper::read (uint8_t* data, int size, int timeout) * @param size Amount of data to consume, in bytes. */ void -DeadlineWrapper::consume (int size) +Socket::consume (int size) { assert (_buffer_data >= size); @@ -534,7 +534,7 @@ DeadlineWrapper::consume (int size) * @param size Number of bytes to read. */ void -DeadlineWrapper::read_definite_and_consume (uint8_t* data, int size, int timeout) +Socket::read_definite_and_consume (uint8_t* data, int size, int timeout) { int const from_buffer = min (_buffer_data, size); if (from_buffer > 0) { @@ -563,7 +563,7 @@ DeadlineWrapper::read_definite_and_consume (uint8_t* data, int size, int timeout * @param size Maximum amount of data to read. */ void -DeadlineWrapper::read_indefinite (uint8_t* data, int size, int timeout) +Socket::read_indefinite (uint8_t* data, int size, int timeout) { assert (size < int (sizeof (_buffer))); diff --git a/src/lib/util.h b/src/lib/util.h index 8d6e2f541..d7f233003 100644 --- a/src/lib/util.h +++ b/src/lib/util.h @@ -113,10 +113,10 @@ extern std::string crop_string (Position, Size); extern int dcp_audio_sample_rate (int); extern std::string colour_lut_index_to_name (int index); -class DeadlineWrapper +class Socket { public: - DeadlineWrapper (); + Socket (); boost::asio::ip::tcp::socket& socket () { return _socket; @@ -124,7 +124,6 @@ public: void connect (boost::asio::ip::basic_resolver_entry const & endpoint, int timeout); void write (uint8_t const * data, int size, int timeout); - int read (uint8_t* data, int size, int timeout); void read_definite_and_consume (uint8_t* data, int size, int timeout); void read_indefinite (uint8_t* data, int size, int timeout); @@ -132,8 +131,9 @@ public: private: void check (); + int read (uint8_t* data, int size, int timeout); - DeadlineWrapper (DeadlineWrapper const &); + Socket (Socket const &); boost::asio::io_service _io_service; boost::asio::deadline_timer _deadline; -- cgit v1.2.3 From 4fd257106009b2db170dafddece06ee3c190fceb Mon Sep 17 00:00:00 2001 From: Carl Hetherington Date: Mon, 24 Sep 2012 23:49:53 +0100 Subject: Remove long-since disused hash debugging. --- src/lib/dcp_video_frame.cc | 30 ------------------------------ src/lib/dcp_video_frame.h | 4 ---- src/lib/image.cc | 32 +------------------------------- src/lib/image.h | 4 ---- src/lib/server.cc | 8 -------- src/lib/util.cc | 24 +++++++++++------------- src/lib/util.h | 4 +--- src/lib/wscript | 8 ++------ src/tools/servomatictest.cc | 9 --------- wscript | 2 -- 10 files changed, 15 insertions(+), 110 deletions(-) (limited to 'src/lib/server.cc') diff --git a/src/lib/dcp_video_frame.cc b/src/lib/dcp_video_frame.cc index d8af3462d..96c40358a 100644 --- a/src/lib/dcp_video_frame.cc +++ b/src/lib/dcp_video_frame.cc @@ -55,10 +55,6 @@ #include "image.h" #include "log.h" -#ifdef DEBUG_HASH -#include -#endif - using namespace std; using namespace boost; @@ -255,12 +251,6 @@ DCPVideoFrame::encode_locally () /* Set event manager to null (openjpeg 1.3 bug) */ _cinfo->event_mgr = 0; -#ifdef DEBUG_HASH - md5_data ("J2K in X frame " + lexical_cast (_frame), _image->comps[0].data, size * sizeof (int)); - md5_data ("J2K in Y frame " + lexical_cast (_frame), _image->comps[1].data, size * sizeof (int)); - md5_data ("J2K in Z frame " + lexical_cast (_frame), _image->comps[2].data, size * sizeof (int)); -#endif - /* Setup the encoder parameters using the current image and user parameters */ opj_setup_encoder (_cinfo, _parameters, _image); @@ -271,10 +261,6 @@ DCPVideoFrame::encode_locally () throw EncodeError ("jpeg2000 encoding failed"); } -#ifdef DEBUG_HASH - md5_data ("J2K out frame " + lexical_cast (_frame), _cio->buffer, cio_tell (_cio)); -#endif - { stringstream s; s << "Finished locally-encoded frame " << _frame; @@ -300,10 +286,6 @@ DCPVideoFrame::encode_remotely (ServerDescription const * serv) socket.connect (*endpoint_iterator, 30); -#ifdef DEBUG_HASH - _input->hash ("Input for remote encoding (before sending)"); -#endif - stringstream s; s << "encode " << _input->size().width << " " << _input->size().height << " " @@ -335,10 +317,6 @@ DCPVideoFrame::encode_remotely (ServerDescription const * serv) /* now read the rest */ socket.read_definite_and_consume (e->data(), e->size(), 30); -#ifdef DEBUG_HASH - e->hash ("Encoded image (after receiving)"); -#endif - { stringstream s; s << "Finished remotely-encoded frame " << _frame; @@ -382,14 +360,6 @@ EncodedData::send (shared_ptr socket) socket->write (_data, _size, 30); } -#ifdef DEBUG_HASH -void -EncodedData::hash (string n) const -{ - md5_data (n, _data, _size); -} -#endif - /** @param s Size of data in bytes */ RemotelyEncodedData::RemotelyEncodedData (int s) : EncodedData (new uint8_t[s], s) diff --git a/src/lib/dcp_video_frame.h b/src/lib/dcp_video_frame.h index da4e0c301..72f885e45 100644 --- a/src/lib/dcp_video_frame.h +++ b/src/lib/dcp_video_frame.h @@ -51,10 +51,6 @@ public: void send (boost::shared_ptr socket); void write (boost::shared_ptr, int); -#ifdef DEBUG_HASH - void hash (std::string) const; -#endif - /** @return data */ uint8_t* data () const { return _data; diff --git a/src/lib/image.cc b/src/lib/image.cc index f16bb9f77..89536da33 100644 --- a/src/lib/image.cc +++ b/src/lib/image.cc @@ -27,6 +27,7 @@ #include #include #include +#include extern "C" { #include #include @@ -39,10 +40,6 @@ extern "C" { #include "exceptions.h" #include "scaler.h" -#ifdef DEBUG_HASH -#include -#endif - using namespace std; using namespace boost; @@ -85,33 +82,6 @@ Image::components () const return 0; } -#ifdef DEBUG_HASH -/** Write a MD5 hash of the image's data to stdout. - * @param n Title to give the output. - */ -void -Image::hash (string n) const -{ - MHASH ht = mhash_init (MHASH_MD5); - if (ht == MHASH_FAILED) { - throw EncodeError ("could not create hash thread"); - } - - for (int i = 0; i < components(); ++i) { - mhash (ht, data()[i], line_size()[i] * lines(i)); - } - - uint8_t hash[16]; - mhash_deinit (ht, hash); - - printf ("%s: ", n.c_str ()); - for (int i = 0; i < int (mhash_get_block_size (MHASH_MD5)); ++i) { - printf ("%.2x", hash[i]); - } - printf ("\n"); -} -#endif - /** Scale this image to a given size and convert it to RGB. * @param out_size Output image size in pixels. * @param scaler Scaler to use. diff --git a/src/lib/image.h b/src/lib/image.h index 97ab1d5ff..0161d2b01 100644 --- a/src/lib/image.h +++ b/src/lib/image.h @@ -68,10 +68,6 @@ public: boost::shared_ptr scale_and_convert_to_rgb (Size, int, Scaler const *) const; boost::shared_ptr post_process (std::string) const; -#ifdef DEBUG_HASH - void hash (std::string) const; -#endif - void make_black (); PixelFormat pixel_format () const { diff --git a/src/lib/server.cc b/src/lib/server.cc index 8ca426049..f8c4425d9 100644 --- a/src/lib/server.cc +++ b/src/lib/server.cc @@ -124,17 +124,9 @@ Server::process (shared_ptr socket) socket->read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i), 30); } -#ifdef DEBUG_HASH - image->hash ("Image for encoding (as received by server)"); -#endif - DCPVideoFrame dcp_video_frame (image, out_size, padding, scaler, frame, frames_per_second, post_process, colour_lut_index, j2k_bandwidth, _log); shared_ptr encoded = dcp_video_frame.encode_locally (); encoded->send (socket); - -#ifdef DEBUG_HASH - encoded->hash ("Encoded image (as made by server and as sent back)"); -#endif return frame; } diff --git a/src/lib/util.cc b/src/lib/util.cc index 73222083a..c779268e2 100644 --- a/src/lib/util.cc +++ b/src/lib/util.cc @@ -40,6 +40,7 @@ #include #include #include +#include extern "C" { #include #include @@ -61,10 +62,6 @@ extern "C" { #include "player_manager.h" #endif -#ifdef DEBUG_HASH -#include -#endif - using namespace std; using namespace boost; @@ -347,9 +344,8 @@ split_at_spaces_considering_quotes (string s) return out; } -#ifdef DEBUG_HASH -void -md5_data (string title, void const * data, int size) +string +md5_hash (void const * data, int size) { MHASH ht = mhash_init (MHASH_MD5); if (ht == MHASH_FAILED) { @@ -360,14 +356,16 @@ md5_data (string title, void const * data, int size) uint8_t hash[16]; mhash_deinit (ht, hash); - - printf ("%s [%d]: ", title.c_str (), size); - for (int i = 0; i < int (mhash_get_block_size (MHASH_MD5)); ++i) { - printf ("%.2x", hash[i]); + + int const N = mhash_get_block_size (MHASH_MD5); + stringstream s; + s << hex << setfill('0') << setw(2); + for (int i = 0; i < N; ++i) { + s << ((int) hash[i]); } - printf ("\n"); + + return s.str (); } -#endif /** @param file File name. * @return MD5 digest of file's contents. diff --git a/src/lib/util.h b/src/lib/util.h index 63d492e60..03d04b852 100644 --- a/src/lib/util.h +++ b/src/lib/util.h @@ -52,9 +52,7 @@ enum ContentType { VIDEO }; -#ifdef DEBUG_HASH -extern void md5_data (std::string, void const *, int); -#endif +extern std::string md5_hash (void const *, int); /** @class Size * @brief Representation of the size of something */ diff --git a/src/lib/wscript b/src/lib/wscript index 71a2b23f4..26740a7e9 100644 --- a/src/lib/wscript +++ b/src/lib/wscript @@ -1,17 +1,13 @@ def configure(conf): - if conf.options.debug_hash: - conf.env.append_value('CXXFLAGS', '-DDEBUG_HASH') - conf.check_cc(msg = 'Checking for library libmhash', function_name = 'mhash_init', header_name = 'mhash.h', lib = 'mhash', uselib_store = 'MHASH') + conf.check_cc(msg = 'Checking for library libmhash', function_name = 'mhash_init', header_name = 'mhash.h', lib = 'mhash', uselib_store = 'MHASH') def build(bld): obj = bld(features = 'cxx cxxshlib') obj.name = 'libdvdomatic' obj.export_includes = ['.'] - obj.uselib = 'AVCODEC AVUTIL AVFORMAT AVFILTER SWSCALE SWRESAMPLE SNDFILE BOOST_FILESYSTEM BOOST_THREAD OPENJPEG POSTPROC TIFF SIGC++ MAGICK SSH DCP GLIB' + obj.uselib = 'AVCODEC AVUTIL AVFORMAT AVFILTER SWSCALE SWRESAMPLE SNDFILE BOOST_FILESYSTEM BOOST_THREAD OPENJPEG POSTPROC TIFF SIGC++ MAGICK SSH DCP GLIB MHASH' if bld.env.TARGET_WINDOWS: obj.uselib += ' WINSOCK2' - if bld.env.DEBUG_HASH: - obj.uselib += ' MHASH' obj.source = """ ab_transcode_job.cc ab_transcoder.cc diff --git a/src/tools/servomatictest.cc b/src/tools/servomatictest.cc index 0f37e73a5..d6804c981 100644 --- a/src/tools/servomatictest.cc +++ b/src/tools/servomatictest.cc @@ -47,12 +47,8 @@ process_video (shared_ptr image, int frame) shared_ptr local (new DCPVideoFrame (image, Size (1024, 1024), 0, Scaler::from_id ("bicubic"), frame, 24, "", 0, 250000000, &log_)); shared_ptr remote (new DCPVideoFrame (image, Size (1024, 1024), 0, Scaler::from_id ("bicubic"), frame, 24, "", 0, 250000000, &log_)); -#if defined(DEBUG_HASH) - cout << "Frame " << frame << ":\n"; -#else cout << "Frame " << frame << ": "; cout.flush (); -#endif shared_ptr local_encoded = local->encode_locally (); shared_ptr remote_encoded; @@ -64,11 +60,6 @@ process_video (shared_ptr image, int frame) remote_error = e.what (); } -#if defined(DEBUG_HASH) - cout << "Frame " << frame << ": "; - cout.flush (); -#endif - if (!remote_error.empty ()) { cout << "\033[0;31mnetwork problem: " << remote_error << "\033[0m\n"; return; diff --git a/wscript b/wscript index 71a89dfd3..69d68a35c 100644 --- a/wscript +++ b/wscript @@ -9,7 +9,6 @@ def options(opt): opt.load('compiler_cxx') opt.load('winres') - opt.add_option('--debug-hash', action='store_true', default = False, help = 'print hashes of data at various points') opt.add_option('--enable-debug', action='store_true', default = False, help = 'build with debugging information and without optimisation') opt.add_option('--disable-gui', action='store_true', default = False, help = 'disable building of GUI tools') opt.add_option('--disable-player', action='store_true', default = False, help = 'disable building of the player components') @@ -37,7 +36,6 @@ def configure(conf): boost_lib_suffix = '' boost_thread = 'boost_thread' - conf.env.DEBUG_HASH = conf.options.debug_hash conf.env.TARGET_WINDOWS = conf.options.target_windows conf.env.DISABLE_GUI = conf.options.disable_gui conf.env.DISABLE_PLAYER = conf.options.disable_player -- cgit v1.2.3