diff options
| author | Carl Hetherington <cth@carlh.net> | 2020-04-09 00:58:42 +0200 |
|---|---|---|
| committer | Carl Hetherington <cth@carlh.net> | 2020-04-13 00:23:41 +0200 |
| commit | 350afcbc40fffd8c8780180e153a2ee91088f562 (patch) | |
| tree | ccbabb8b91239555ed01ca762d2f56b02858a8cf /src/lib/nanomsg.cc | |
| parent | a16523af5f70b60f4890f198f6214177077a9c1d (diff) | |
Tidy up nanomsg class API; add unmounting for Linux.
Diffstat (limited to 'src/lib/nanomsg.cc')
| -rw-r--r-- | src/lib/nanomsg.cc | 54 |
1 files changed, 20 insertions, 34 deletions
diff --git a/src/lib/nanomsg.cc b/src/lib/nanomsg.cc index 57220cd54..0fc0dd357 100644 --- a/src/lib/nanomsg.cc +++ b/src/lib/nanomsg.cc @@ -20,6 +20,7 @@ #include "nanomsg.h" #include "dcpomatic_log.h" +#include "exceptions.h" #include <nanomsg/nn.h> #include <nanomsg/pair.h> #include <stdexcept> @@ -48,23 +49,16 @@ Nanomsg::Nanomsg (bool server) } } -void -Nanomsg::blocking_send (string s) +bool +Nanomsg::send (string s, int timeout) { - int const r = nn_send (_socket, s.c_str(), s.length(), 0); - if (r < 0) { - throw runtime_error(String::compose("Could not send to nanomsg socket (%1)", errno)); - } else if (r != int(s.length())) { - throw runtime_error("Could not send to nanomsg socket (message too big)"); + if (timeout != 0) { + nn_setsockopt (_socket, NN_SOL_SOCKET, NN_SNDTIMEO, &timeout, sizeof(int)); } -} -bool -Nanomsg::nonblocking_send (string s) -{ - int const r = nn_send (_socket, s.c_str(), s.length(), NN_DONTWAIT); + int const r = nn_send (_socket, s.c_str(), s.length(), timeout ? 0 : NN_DONTWAIT); if (r < 0) { - if (errno == EAGAIN) { + if (errno == ETIMEDOUT || errno == EAGAIN) { return false; } throw runtime_error(String::compose("Could not send to nanomsg socket (%1)", errno)); @@ -88,17 +82,17 @@ Nanomsg::get_from_pending () } void -Nanomsg::recv_and_parse (bool blocking) +Nanomsg::recv_and_parse (int flags) { char* buf = 0; - int const received = nn_recv (_socket, &buf, NN_MSG, blocking ? 0 : NN_DONTWAIT); + int const received = nn_recv (_socket, &buf, NN_MSG, flags); if (received < 0) { - if (!blocking && errno == EAGAIN) { + if (errno == ETIMEDOUT || errno == EAGAIN) { return; } - throw runtime_error ("Could not communicate with subprocess"); + throw CommunicationFailedError (); } char* p = buf; @@ -114,32 +108,24 @@ Nanomsg::recv_and_parse (bool blocking) nn_freemsg (buf); } -string -Nanomsg::blocking_get () +optional<string> +Nanomsg::receive (int timeout) { + if (timeout != 0) { + nn_setsockopt (_socket, NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, sizeof(int)); + } + optional<string> l = get_from_pending (); if (l) { return *l; } - recv_and_parse (true); + recv_and_parse (timeout ? 0 : NN_DONTWAIT); - l = get_from_pending (); + return get_from_pending (); if (!l) { - throw runtime_error ("Could not communicate with subprocess"); + throw CommunicationFailedError (); } return *l; } - -optional<string> -Nanomsg::nonblocking_get () -{ - optional<string> l = get_from_pending (); - if (l) { - return *l; - } - - recv_and_parse (false); - return get_from_pending (); -} |
