2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27 #include <boost/array.hpp>
28 #include <boost/asio.hpp>
29 #include <boost/algorithm/string.hpp>
30 #include <boost/thread.hpp>
31 #include <boost/thread/mutex.hpp>
32 #include <boost/thread/condition.hpp>
34 #include "dcp_video_frame.h"
35 #include "exceptions.h"
/* File-scope state shared between the accepting code and the worker threads. */
45 using namespace boost;
/* Threads created with `new thread (worker_thread)` are stored here. */
47 static vector<thread *> worker_threads;
/* Accepted client sockets: pushed at the back after accept(), read from the
   front by the worker code. */
49 static std::list<shared_ptr<asio::ip::tcp::socket> > queue;
/* Locked (via mutex::scoped_lock) before `queue` is examined or modified. */
50 static mutex worker_mutex;
/* Waited on both while the queue is empty and while the queue is too long;
   notify_all() is called after a socket is queued and after a frame has
   been handled. */
51 static condition worker_condition;
/* Log object constructed with "servomatic.log"; its address is handed to
   DCPVideoFrame in process(). */
52 static Log log_ ("servomatic.log");
/* Handle one request arriving on `socket`: read a text header, read the raw
   image planes that follow it, and encode the image with
   DCPVideoFrame::encode_locally().

   The caller assigns the return value to `frame` and prints it as the
   number of the encoded frame.

   NOTE(review): this excerpt is missing lines (return type, braces, several
   local declarations, the bodies of both `if` branches and the return
   statement), so the comments below only describe the statements that are
   visible. */
55 process (shared_ptr<asio::ip::tcp::socket> socket)
57 SocketReader reader (socket);
/* Read the header text into `buffer`, then consume strlen (buffer) + 1
   bytes from the reader, i.e. the text plus one terminating byte.
   NOTE(review): strlen() needs a NUL inside `buffer`; confirm that the
   (not visible) buffer setup or read_indefinite() guarantees one. */
60 reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer));
61 reader.consume (strlen (buffer) + 1);
/* The header is parsed as whitespace-separated fields from a stringstream. */
63 stringstream s (buffer);
/* "encode" is the only command compared against here; what happens for any
   other command is in the branch body, which is not visible. */
67 if (command != "encode") {
77 float frames_per_second;
/* Extract the header fields, including the input and output image sizes. */
82 s >> in_size.width >> in_size.height
84 >> out_size.width >> out_size.height
/* The pixel format is carried as an int and cast to PixelFormat; no range
   check is visible in this excerpt. */
93 PixelFormat pixel_format = (PixelFormat) pixel_format_int;
/* The scaler is looked up from the id carried in the header. */
94 Scaler const * scaler = Scaler::from_id (scaler_id);
/* A post_process value of "none" is special-cased (branch body not visible). */
95 if (post_process == "none") {
/* Image object that will receive the pixel data, created from the parsed
   pixel format and input size. */
99 shared_ptr<SimpleImage> image (new SimpleImage (pixel_format, in_size));
/* First pass over the components: set each component's line size
   (`line_size` is defined on a line not visible here). */
101 for (int i = 0; i < image->components(); ++i) {
104 image->set_line_size (i, line_size);
/* Second pass: read line_size * lines bytes for each component from the
   socket straight into that component's data pointer. */
107 for (int i = 0; i < image->components(); ++i) {
108 reader.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i));
/* Labelled hash of the received image; presumably a debugging aid for
   comparing against the sending side -- TODO confirm what hash() emits. */
112 image->hash ("Image for encoding (as received by server)");
/* Build the frame from the parsed parameters and encode it in this process;
   the file-level log_ is passed by address. */
115 DCPVideoFrame dcp_video_frame (image, out_size, padding, scaler, frame, frames_per_second, post_process, colour_lut_index, j2k_bandwidth, &log_);
116 shared_ptr<EncodedData> encoded = dcp_video_frame.encode_locally ();
/* Labelled hash of the encoded result (same caveat as the hash above). */
120 encoded->hash ("Encoded image (as made by server and as sent back)");
/* Fragment of a worker routine, presumably the body of worker_thread (the
   function passed to `new thread (...)` elsewhere in this file); its header,
   braces and several statements are not visible in this excerpt.

   Visible behaviour: wait for a queued socket, run process() on it while
   timing the call, report the result, then notify worker_condition. */
/* Take worker_mutex before looking at the shared queue. */
130 mutex::scoped_lock lock (worker_mutex);
/* Sleep on the condition until there is at least one queued socket; the
   loop re-checks the queue after every wake-up. */
131 while (queue.empty ()) {
132 worker_condition.wait (lock);
/* Copy the shared_ptr at the front of the queue.
   NOTE(review): removal of the entry from the queue and release of the
   lock before the encode are not visible here -- confirm. */
135 shared_ptr<asio::ip::tcp::socket> socket = queue.front ();
/* Record the start time so the duration can be reported below. */
142 struct timeval start;
143 gettimeofday (&start, 0);
/* process() returns the value printed below as the encoded frame number. */
146 frame = process (socket);
/* Any std::exception from the code above is reported on cerr rather than
   propagated out of this handler. */
147 } catch (std::exception& e) {
148 cerr << "Error: " << e.what() << "\n";
/* Record the end time and print the frame number with the difference of
   seconds (end) and seconds (start). */
157 gettimeofday (&end, 0);
158 cout << "Encoded frame " << frame << " in " << (seconds (end) - seconds (start)) << "\n";
/* Wake everything waiting on worker_condition; the accepting code waits on
   it while the queue is too long. */
161 worker_condition.notify_all ();
/* Fragment of the server start-up and accept code, presumably the body of
   main (the function header, loop structure and braces are not visible in
   this excerpt).

   Visible behaviour: set up scalers, start the worker threads, accept TCP
   connections on the configured port and hand each socket to the workers
   through `queue`. */
168 Scaler::setup_scalers ();
/* The worker count comes from the configuration. */
170 int const num_threads = Config::instance()->num_local_encoding_threads ();
/* Start the workers. Each thread object is allocated with `new` and kept in
   worker_threads; no matching join or delete is visible in this excerpt. */
172 for (int i = 0; i < num_threads; ++i) {
173 worker_threads.push_back (new thread (worker_thread));
/* Acceptor bound to an IPv4 endpoint on the configured server port. */
176 asio::io_service io_service;
177 asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ()));
/* A new socket object is created for the incoming connection and filled in
   by the synchronous accept() call. */
179 shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service));
180 acceptor.accept (*socket);
/* Take worker_mutex before examining or modifying the shared queue. */
182 mutex::scoped_lock lock (worker_mutex);
184 /* Wait until the queue has gone down a bit */
/* The wait continues while the queue holds at least 2 * num_threads
   sockets; the worker code calls notify_all() after handling a frame. */
185 while (int (queue.size()) >= num_threads * 2) {
186 worker_condition.wait (lock);
/* Apply receive and send timeouts to the descriptor `new_fd` (`new_fd` and
   `tv` are defined on lines not visible here). The return values of
   setsockopt() are not checked. */
192 setsockopt (new_fd, SOL_SOCKET, SO_RCVTIMEO, (void *) &tv, sizeof (tv));
193 setsockopt (new_fd, SOL_SOCKET, SO_SNDTIMEO, (void *) &tv, sizeof (tv));
/* Queue the socket for the workers and wake any that are waiting. */
195 queue.push_back (socket);
196 worker_condition.notify_all ();