Compiles.
[dcpomatic.git] / src / tools / servomatic.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 #include <iostream>
21 #include <stdexcept>
22 #include <sstream>
23 #include <cstring>
24 #include <vector>
25 #include <unistd.h>
26 #include <errno.h>
27 #include <boost/array.hpp>
28 #include <boost/asio.hpp>
29 #include <boost/algorithm/string.hpp>
30 #include <boost/thread.hpp>
31 #include <boost/thread/mutex.hpp>
32 #include <boost/thread/condition.hpp>
33 #include "config.h"
34 #include "dcp_video_frame.h"
35 #include "exceptions.h"
36 #include "util.h"
37 #include "config.h"
38 #include "scaler.h"
39 #include "image.h"
40 #include "log.h"
41
42 #define BACKLOG 8
43
44 using namespace std;
45 using namespace boost;
46
47 static vector<thread *> worker_threads;
48
49 static std::list<shared_ptr<asio::ip::tcp::socket> > queue;
50 static mutex worker_mutex;
51 static condition worker_condition;
52 static Log log_ ("servomatic.log");
53
54 int
55 process (shared_ptr<asio::ip::tcp::socket> socket)
56 {
57         SocketReader reader (socket);
58         
59         char buffer[128];
60         reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer));
61         reader.consume (strlen (buffer) + 1);
62         
63         stringstream s (buffer);
64         
65         string command;
66         s >> command;
67         if (command != "encode") {
68                 return -1;
69         }
70         
71         Size in_size;
72         int pixel_format_int;
73         Size out_size;
74         int padding;
75         string scaler_id;
76         int frame;
77         float frames_per_second;
78         string post_process;
79         int colour_lut_index;
80         int j2k_bandwidth;
81         
82         s >> in_size.width >> in_size.height
83           >> pixel_format_int
84           >> out_size.width >> out_size.height
85           >> padding
86           >> scaler_id
87           >> frame
88           >> frames_per_second
89           >> post_process
90           >> colour_lut_index
91           >> j2k_bandwidth;
92         
93         PixelFormat pixel_format = (PixelFormat) pixel_format_int;
94         Scaler const * scaler = Scaler::from_id (scaler_id);
95         if (post_process == "none") {
96                 post_process = "";
97         }
98         
99         shared_ptr<SimpleImage> image (new SimpleImage (pixel_format, in_size));
100         
101         for (int i = 0; i < image->components(); ++i) {
102                 int line_size;
103                 s >> line_size;
104                 image->set_line_size (i, line_size);
105         }
106         
107         for (int i = 0; i < image->components(); ++i) {
108                 reader.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i));
109         }
110         
111 #ifdef DEBUG_HASH
112         image->hash ("Image for encoding (as received by server)");
113 #endif          
114         
115         DCPVideoFrame dcp_video_frame (image, out_size, padding, scaler, frame, frames_per_second, post_process, colour_lut_index, j2k_bandwidth, &log_);
116         shared_ptr<EncodedData> encoded = dcp_video_frame.encode_locally ();
117         encoded->send (fd);
118
119 #ifdef DEBUG_HASH
120         encoded->hash ("Encoded image (as made by server and as sent back)");
121 #endif          
122         
123         return frame;
124 }
125
126 void
127 worker_thread ()
128 {
129         while (1) {
130                 mutex::scoped_lock lock (worker_mutex);
131                 while (queue.empty ()) {
132                         worker_condition.wait (lock);
133                 }
134
135                 shared_ptr<asio::ip::tcp::socket> socket = queue.front ();
136                 queue.pop_front ();
137                 
138                 lock.unlock ();
139
140                 int frame = -1;
141
142                 struct timeval start;
143                 gettimeofday (&start, 0);
144                 
145                 try {
146                         frame = process (socket);
147                 } catch (std::exception& e) {
148                         cerr << "Error: " << e.what() << "\n";
149                 }
150                 
151                 socket.reset ();
152                 
153                 lock.lock ();
154
155                 if (frame >= 0) {
156                         struct timeval end;
157                         gettimeofday (&end, 0);
158                         cout << "Encoded frame " << frame << " in " << (seconds (end) - seconds (start)) << "\n";
159                 }
160                 
161                 worker_condition.notify_all ();
162         }
163 }
164
165 int
166 main ()
167 {
168         Scaler::setup_scalers ();
169
170         int const num_threads = Config::instance()->num_local_encoding_threads ();
171         
172         for (int i = 0; i < num_threads; ++i) {
173                 worker_threads.push_back (new thread (worker_thread));
174         }
175         
176         asio::io_service io_service;
177         asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ()));
178         while (1) {
179                 shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service));
180                 acceptor.accept (*socket);
181
182                 mutex::scoped_lock lock (worker_mutex);
183                 
184                 /* Wait until the queue has gone down a bit */
185                 while (int (queue.size()) >= num_threads * 2) {
186                         worker_condition.wait (lock);
187                 }
188
189                 struct timeval tv;
190                 tv.tv_sec = 20;
191                 tv.tv_usec = 0;
192                 setsockopt (new_fd, SOL_SOCKET, SO_RCVTIMEO, (void *) &tv, sizeof (tv));
193                 setsockopt (new_fd, SOL_SOCKET, SO_SNDTIMEO, (void *) &tv, sizeof (tv));
194                 
195                 queue.push_back (socket);
196                 worker_condition.notify_all ();
197         }
198         
199         close (fd);
200
201         return 0;
202 }