Add cross header.
[dcpomatic.git] / src / tools / servomatic.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 #include <iostream>
21 #include <stdexcept>
22 #include <sstream>
23 #include <cstring>
24 #include <vector>
25 #include <unistd.h>
26 #include <errno.h>
27 #ifdef DVDOMATIC_POSIX
28 #include <sys/types.h> 
29 #include <sys/socket.h>
30 #include <netinet/in.h>
31 #endif
32 #include <boost/algorithm/string.hpp>
33 #include <boost/thread.hpp>
34 #include <boost/thread/mutex.hpp>
35 #include <boost/thread/condition.hpp>
36 #include "config.h"
37 #include "dcp_video_frame.h"
38 #include "exceptions.h"
39 #include "util.h"
40 #include "config.h"
41 #include "scaler.h"
42 #include "image.h"
43 #include "log.h"
44
45 #define BACKLOG 8
46
47 using namespace std;
48 using namespace boost;
49
50 static vector<thread *> worker_threads;
51
52 static std::list<int> queue;
53 static mutex worker_mutex;
54 static condition worker_condition;
55 static Log log_ ("servomatic.log");
56
57 int
58 process (int fd)
59 {
60         SocketReader reader (fd);
61         
62         char buffer[128];
63         reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer));
64         reader.consume (strlen (buffer) + 1);
65         
66         stringstream s (buffer);
67         
68         string command;
69         s >> command;
70         if (command != "encode") {
71                 close (fd);
72                 return -1;
73         }
74         
75         Size in_size;
76         int pixel_format_int;
77         Size out_size;
78         int padding;
79         string scaler_id;
80         int frame;
81         float frames_per_second;
82         string post_process;
83         int colour_lut_index;
84         int j2k_bandwidth;
85         
86         s >> in_size.width >> in_size.height
87           >> pixel_format_int
88           >> out_size.width >> out_size.height
89           >> padding
90           >> scaler_id
91           >> frame
92           >> frames_per_second
93           >> post_process
94           >> colour_lut_index
95           >> j2k_bandwidth;
96         
97         PixelFormat pixel_format = (PixelFormat) pixel_format_int;
98         Scaler const * scaler = Scaler::from_id (scaler_id);
99         if (post_process == "none") {
100                 post_process = "";
101         }
102         
103         shared_ptr<SimpleImage> image (new SimpleImage (pixel_format, in_size));
104         
105         for (int i = 0; i < image->components(); ++i) {
106                 int line_size;
107                 s >> line_size;
108                 image->set_line_size (i, line_size);
109         }
110         
111         for (int i = 0; i < image->components(); ++i) {
112                 reader.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i));
113         }
114         
115 #ifdef DEBUG_HASH
116         image->hash ("Image for encoding (as received by server)");
117 #endif          
118         
119         DCPVideoFrame dcp_video_frame (image, out_size, padding, scaler, frame, frames_per_second, post_process, colour_lut_index, j2k_bandwidth, &log_);
120         shared_ptr<EncodedData> encoded = dcp_video_frame.encode_locally ();
121         encoded->send (fd);
122
123 #ifdef DEBUG_HASH
124         encoded->hash ("Encoded image (as made by server and as sent back)");
125 #endif          
126
127         
128         return frame;
129 }
130
131 void
132 worker_thread ()
133 {
134         while (1) {
135                 mutex::scoped_lock lock (worker_mutex);
136                 while (queue.empty ()) {
137                         worker_condition.wait (lock);
138                 }
139
140                 int fd = queue.front ();
141                 queue.pop_front ();
142                 
143                 lock.unlock ();
144
145                 int frame = -1;
146
147                 struct timeval start;
148                 gettimeofday (&start, 0);
149                 
150                 try {
151                         frame = process (fd);
152                 } catch (std::exception& e) {
153                         cerr << "Error: " << e.what() << "\n";
154                 }
155                 
156                 close (fd);
157                 
158                 lock.lock ();
159
160                 if (frame >= 0) {
161                         struct timeval end;
162                         gettimeofday (&end, 0);
163                         cout << "Encoded frame " << frame << " in " << (seconds (end) - seconds (start)) << "\n";
164                 }
165                 
166                 worker_condition.notify_all ();
167         }
168 }
169
170 int
171 main ()
172 {
173         Scaler::setup_scalers ();
174
175         int const num_threads = Config::instance()->num_local_encoding_threads ();
176         
177         for (int i = 0; i < num_threads; ++i) {
178                 worker_threads.push_back (new thread (worker_thread));
179         }
180         
181         int fd = socket (AF_INET, SOCK_STREAM, 0);
182         if (fd < 0) {
183                 throw NetworkError ("could not open socket");
184         }
185
186         int const o = 1;
187         setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &o, sizeof (o));
188
189         struct timeval tv;
190         tv.tv_sec = 20;
191         tv.tv_usec = 0;
192         setsockopt (fd, SOL_SOCKET, SO_RCVTIMEO, (void *) &tv, sizeof (tv));
193         setsockopt (fd, SOL_SOCKET, SO_SNDTIMEO, (void *) &tv, sizeof (tv));
194
195         struct sockaddr_in server_address;
196         memset (&server_address, 0, sizeof (server_address));
197         server_address.sin_family = AF_INET;
198         server_address.sin_addr.s_addr = INADDR_ANY;
199         server_address.sin_port = htons (Config::instance()->server_port ());
200         if (::bind (fd, (struct sockaddr *) &server_address, sizeof (server_address)) < 0) {
201                 stringstream s;
202                 s << "could not bind to port " << Config::instance()->server_port() << " (" << strerror (errno) << ")";
203                 throw NetworkError (s.str());
204         }
205
206         listen (fd, BACKLOG);
207
208         while (1) {
209                 struct sockaddr_in client_address;
210                 socklen_t client_length = sizeof (client_address);
211                 int new_fd = accept (fd, (struct sockaddr *) &client_address, &client_length);
212                 if (new_fd < 0) {
213                         if (errno != EAGAIN && errno != EWOULDBLOCK) {
214                                 throw NetworkError ("accept failed");
215                         }
216
217                         continue;
218                 }
219
220                 mutex::scoped_lock lock (worker_mutex);
221                 
222                 /* Wait until the queue has gone down a bit */
223                 while (int (queue.size()) >= num_threads * 2) {
224                         worker_condition.wait (lock);
225                 }
226
227                 struct timeval tv;
228                 tv.tv_sec = 20;
229                 tv.tv_usec = 0;
230                 setsockopt (new_fd, SOL_SOCKET, SO_RCVTIMEO, (void *) &tv, sizeof (tv));
231                 setsockopt (new_fd, SOL_SOCKET, SO_SNDTIMEO, (void *) &tv, sizeof (tv));
232                 
233                 queue.push_back (new_fd);
234                 worker_condition.notify_all ();
235         }
236         
237         close (fd);
238
239         return 0;
240 }