/*
- Copyright (C) 2020 Carl Hetherington <cth@carlh.net>
+ Copyright (C) 2020-2021 Carl Hetherington <cth@carlh.net>
This file is part of DCP-o-matic.
*/
-#include "nanomsg.h"
+
#include "dcpomatic_log.h"
+#include "exceptions.h"
+#include "nanomsg.h"
#include <nanomsg/nn.h>
#include <nanomsg/pair.h>
-#include <stdexcept>
#include <cerrno>
+#include <stdexcept>
+
-using std::string;
using std::runtime_error;
+using std::string;
using boost::optional;
+
#define NANOMSG_URL "ipc:///tmp/dcpomatic.ipc"
+
Nanomsg::Nanomsg (bool server)
{
_socket = nn_socket (AF_SP, NN_PAIR);
throw runtime_error("Could not set up nanomsg socket");
}
if (server) {
- if (nn_bind(_socket, NANOMSG_URL) < 0) {
+ if ((_endpoint = nn_bind(_socket, NANOMSG_URL)) < 0) {
throw runtime_error(String::compose("Could not bind nanomsg socket (%1)", errno));
}
} else {
- if (nn_connect(_socket, NANOMSG_URL) < 0) {
+ if ((_endpoint = nn_connect(_socket, NANOMSG_URL)) < 0) {
throw runtime_error(String::compose("Could not connect nanomsg socket (%1)", errno));
}
}
}
-void
-Nanomsg::blocking_send (string s)
+
+Nanomsg::~Nanomsg ()
{
- int const r = nn_send (_socket, s.c_str(), s.length(), 0);
- if (r < 0) {
- throw runtime_error(String::compose("Could not send to nanomsg socket (%1)", errno));
- } else if (r != int(s.length())) {
- throw runtime_error("Could not send to nanomsg socket (message too big)");
- }
+ nn_shutdown (_socket, _endpoint);
+ nn_close (_socket);
}
+
bool
-Nanomsg::nonblocking_send (string s)
+Nanomsg::send (string s, int timeout)
{
- int const r = nn_send (_socket, s.c_str(), s.length(), NN_DONTWAIT);
+ if (timeout != 0) {
+ nn_setsockopt (_socket, NN_SOL_SOCKET, NN_SNDTIMEO, &timeout, sizeof(int));
+ }
+
+ int const r = nn_send (_socket, s.c_str(), s.length(), timeout ? 0 : NN_DONTWAIT);
if (r < 0) {
- if (errno == EAGAIN) {
+ if (errno == ETIMEDOUT || errno == EAGAIN) {
return false;
}
throw runtime_error(String::compose("Could not send to nanomsg socket (%1)", errno));
return true;
}
+
optional<string>
Nanomsg::get_from_pending ()
{
if (_pending.empty()) {
- return optional<string>();
+ return {};
}
- string const l = _pending.back();
+ auto const l = _pending.back();
_pending.pop_back();
return l;
}
+
void
-Nanomsg::recv_and_parse (bool blocking)
+Nanomsg::recv_and_parse (int flags)
{
char* buf = 0;
- int const received = nn_recv (_socket, &buf, NN_MSG, blocking ? 0 : NN_DONTWAIT);
+ int const received = nn_recv (_socket, &buf, NN_MSG, flags);
if (received < 0)
{
- if (!blocking && errno == EAGAIN) {
+ if (errno == ETIMEDOUT || errno == EAGAIN) {
return;
}
- throw runtime_error ("Could not communicate with subprocess");
+ LOG_DISK_NC("nn_recv failed");
+ throw CommunicationFailedError ();
}
char* p = buf;
nn_freemsg (buf);
}
-string
-Nanomsg::blocking_get ()
-{
- optional<string> l = get_from_pending ();
- if (l) {
- return *l;
- }
-
- recv_and_parse (true);
-
- l = get_from_pending ();
- if (!l) {
- throw runtime_error ("Could not communicate with subprocess");
- }
-
- return *l;
-}
optional<string>
-Nanomsg::nonblocking_get ()
+Nanomsg::receive (int timeout)
{
- optional<string> l = get_from_pending ();
+ if (timeout != 0) {
+ nn_setsockopt (_socket, NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, sizeof(int));
+ }
+
+ auto l = get_from_pending ();
if (l) {
return *l;
}
- recv_and_parse (false);
+ recv_and_parse (timeout ? 0 : NN_DONTWAIT);
+
return get_from_pending ();
}