X-Git-Url: https://git.carlh.net/gitweb/?a=blobdiff_plain;f=src%2Flib%2Fnanomsg.cc;h=61e6c08ce9e66fa738300b9a417267356f71a8b6;hb=7bc591abc86ed4742f21f45ca1d6151cb14bc100;hp=57220cd546347d2fa05e718efd54c32a23771641;hpb=a1f7bf2d9e5610075fbd898cdf52f4f8373741f2;p=dcpomatic.git diff --git a/src/lib/nanomsg.cc b/src/lib/nanomsg.cc index 57220cd54..61e6c08ce 100644 --- a/src/lib/nanomsg.cc +++ b/src/lib/nanomsg.cc @@ -1,5 +1,5 @@ /* - Copyright (C) 2020 Carl Hetherington + Copyright (C) 2020-2021 Carl Hetherington This file is part of DCP-o-matic. @@ -18,19 +18,24 @@ */ + #include "nanomsg.h" #include "dcpomatic_log.h" +#include "exceptions.h" #include #include #include #include + using std::string; using std::runtime_error; using boost::optional; + #define NANOMSG_URL "ipc:///tmp/dcpomatic.ipc" + Nanomsg::Nanomsg (bool server) { _socket = nn_socket (AF_SP, NN_PAIR); @@ -38,33 +43,34 @@ Nanomsg::Nanomsg (bool server) throw runtime_error("Could not set up nanomsg socket"); } if (server) { - if (nn_bind(_socket, NANOMSG_URL) < 0) { + if ((_endpoint = nn_bind(_socket, NANOMSG_URL)) < 0) { throw runtime_error(String::compose("Could not bind nanomsg socket (%1)", errno)); } } else { - if (nn_connect(_socket, NANOMSG_URL) < 0) { + if ((_endpoint = nn_connect(_socket, NANOMSG_URL)) < 0) { throw runtime_error(String::compose("Could not connect nanomsg socket (%1)", errno)); } } } -void -Nanomsg::blocking_send (string s) + +Nanomsg::~Nanomsg () { - int const r = nn_send (_socket, s.c_str(), s.length(), 0); - if (r < 0) { - throw runtime_error(String::compose("Could not send to nanomsg socket (%1)", errno)); - } else if (r != int(s.length())) { - throw runtime_error("Could not send to nanomsg socket (message too big)"); - } + nn_shutdown (_socket, _endpoint); + nn_close (_socket); } + bool -Nanomsg::nonblocking_send (string s) +Nanomsg::send (string s, int timeout) { - int const r = nn_send (_socket, s.c_str(), s.length(), NN_DONTWAIT); + if (timeout != 0) { + nn_setsockopt (_socket, NN_SOL_SOCKET, NN_SNDTIMEO, &timeout, sizeof(int)); + } + + int const r = nn_send (_socket, s.c_str(), s.length(), timeout ? 0 : NN_DONTWAIT); if (r < 0) { - if (errno == EAGAIN) { + if (errno == ETIMEDOUT || errno == EAGAIN) { return false; } throw runtime_error(String::compose("Could not send to nanomsg socket (%1)", errno)); @@ -75,30 +81,33 @@ Nanomsg::nonblocking_send (string s) return true; } + optional Nanomsg::get_from_pending () { if (_pending.empty()) { - return optional(); + return {}; } - string const l = _pending.back(); + auto const l = _pending.back(); _pending.pop_back(); return l; } + void -Nanomsg::recv_and_parse (bool blocking) +Nanomsg::recv_and_parse (int flags) { char* buf = 0; - int const received = nn_recv (_socket, &buf, NN_MSG, blocking ? 0 : NN_DONTWAIT); + int const received = nn_recv (_socket, &buf, NN_MSG, flags); if (received < 0) { - if (!blocking && errno == EAGAIN) { + if (errno == ETIMEDOUT || errno == EAGAIN) { return; } - throw runtime_error ("Could not communicate with subprocess"); + LOG_DISK_NC("nn_recv failed"); + throw CommunicationFailedError (); } char* p = buf; @@ -114,32 +123,20 @@ Nanomsg::recv_and_parse (bool blocking) nn_freemsg (buf); } -string -Nanomsg::blocking_get () -{ - optional l = get_from_pending (); - if (l) { - return *l; - } - - recv_and_parse (true); - - l = get_from_pending (); - if (!l) { - throw runtime_error ("Could not communicate with subprocess"); - } - - return *l; -} optional -Nanomsg::nonblocking_get () +Nanomsg::receive (int timeout) { - optional l = get_from_pending (); + if (timeout != 0) { + nn_setsockopt (_socket, NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, sizeof(int)); + } + + auto l = get_from_pending (); if (l) { return *l; } - recv_and_parse (false); + recv_and_parse (timeout ? 0 : NN_DONTWAIT); + return get_from_pending (); }