summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorCarl Hetherington <cth@carlh.net>2020-03-28 22:39:28 +0100
committerCarl Hetherington <cth@carlh.net>2020-03-28 22:39:28 +0100
commitcdfe1a402500dfd48d4d6b7e63196b5e13f871f4 (patch)
treeb80b0f76b84e94c4b90e01291e0ebdb9eeb70880 /src
parentad7158be9a8caec264f873ce5e68265dce1489e1 (diff)
Try to improve nanomsg EAGAIN handling.
Diffstat (limited to 'src')
-rw-r--r--src/lib/copy_to_drive_job.cc4
-rw-r--r--src/lib/dist_writer_messages.h9
-rw-r--r--src/lib/nanomsg.cc18
-rw-r--r--src/lib/nanomsg.h7
-rw-r--r--src/tools/dcpomatic_dist.cc3
-rw-r--r--src/tools/dcpomatic_dist_writer.cc14
6 files changed, 43 insertions, 12 deletions
diff --git a/src/lib/copy_to_drive_job.cc b/src/lib/copy_to_drive_job.cc
index 34bffe0e8..1aeb4bfc2 100644
--- a/src/lib/copy_to_drive_job.cc
+++ b/src/lib/copy_to_drive_job.cc
@@ -63,7 +63,9 @@ CopyToDriveJob::json_name () const
void
CopyToDriveJob::run ()
{
- _nanomsg.send(String::compose("W\n%1\n%2\n", _dcp.string(), _drive.internal_name()));
+ if (!_nanomsg.nonblocking_send(String::compose("W\n%1\n%2\n", _dcp.string(), _drive.internal_name()))) {
+ throw CopyError ("Could not communicate with writer process", 0);
+ }
while (true) {
string s = _nanomsg.blocking_get ();
diff --git a/src/lib/dist_writer_messages.h b/src/lib/dist_writer_messages.h
index 53ddcd1d9..fedbc8b35 100644
--- a/src/lib/dist_writer_messages.h
+++ b/src/lib/dist_writer_messages.h
@@ -58,3 +58,12 @@ P\n
*/
#define DIST_WRITER_PROGRESS "P"
+
+/** dcpomatic_dist_writer may also receive
+
+Q\n
+
+as a request to quit.
+*/
+#define DIST_WRITER_QUIT "Q"
+
diff --git a/src/lib/nanomsg.cc b/src/lib/nanomsg.cc
index 591ff4bf8..57220cd54 100644
--- a/src/lib/nanomsg.cc
+++ b/src/lib/nanomsg.cc
@@ -49,14 +49,30 @@ Nanomsg::Nanomsg (bool server)
}
void
-Nanomsg::send (string s)
+Nanomsg::blocking_send (string s)
+{
+ int const r = nn_send (_socket, s.c_str(), s.length(), 0);
+ if (r < 0) {
+ throw runtime_error(String::compose("Could not send to nanomsg socket (%1)", errno));
+ } else if (r != int(s.length())) {
+ throw runtime_error("Could not send to nanomsg socket (message too big)");
+ }
+}
+
+bool
+Nanomsg::nonblocking_send (string s)
{
int const r = nn_send (_socket, s.c_str(), s.length(), NN_DONTWAIT);
if (r < 0) {
+ if (errno == EAGAIN) {
+ return false;
+ }
throw runtime_error(String::compose("Could not send to nanomsg socket (%1)", errno));
} else if (r != int(s.length())) {
throw runtime_error("Could not send to nanomsg socket (message too big)");
}
+
+ return true;
}
optional<string>
diff --git a/src/lib/nanomsg.h b/src/lib/nanomsg.h
index 26db2d2e3..dc84a6ce7 100644
--- a/src/lib/nanomsg.h
+++ b/src/lib/nanomsg.h
@@ -28,7 +28,12 @@ class Nanomsg : public boost::noncopyable
public:
explicit Nanomsg (bool server);
- void send (std::string s);
+ void blocking_send (std::string s);
+ /** Try to send a message, returning true if successful, false
+ * if we should try again (EAGAIN) or throwing an exception on any other
+ * error.
+ */
+ bool nonblocking_send (std::string s);
std::string blocking_get ();
boost::optional<std::string> nonblocking_get ();
diff --git a/src/tools/dcpomatic_dist.cc b/src/tools/dcpomatic_dist.cc
index 2098eb1a5..6284487ef 100644
--- a/src/tools/dcpomatic_dist.cc
+++ b/src/tools/dcpomatic_dist.cc
@@ -30,6 +30,7 @@
#include "lib/cross.h"
#include "lib/copy_to_drive_job.h"
#include "lib/job_manager.h"
+#include "lib/dist_writer_messages.h"
#include <wx/wx.h>
#include <boost/process.hpp>
#ifdef DCPOMATIC_WINDOWS
@@ -126,7 +127,7 @@ public:
~DOMFrame ()
{
- _nanomsg.send("Q\n");
+ _nanomsg.blocking_send(DIST_WRITER_QUIT "\n");
}
private:
diff --git a/src/tools/dcpomatic_dist_writer.cc b/src/tools/dcpomatic_dist_writer.cc
index 49e7bfd36..8da3a14a7 100644
--- a/src/tools/dcpomatic_dist_writer.cc
+++ b/src/tools/dcpomatic_dist_writer.cc
@@ -143,7 +143,7 @@ write (boost::filesystem::path from, boost::filesystem::path to, uint64_t& total
}
remaining -= this_time;
total_remaining -= this_time;
- nanomsg->send(String::compose(DIST_WRITER_PROGRESS "\n%1\n", (1 - float(total_remaining) / total)));
+ nanomsg->blocking_send(String::compose(DIST_WRITER_PROGRESS "\n%1\n", (1 - float(total_remaining) / total)));
}
fclose (in);
@@ -182,7 +182,7 @@ read (boost::filesystem::path from, boost::filesystem::path to, uint64_t& total_
digester.add (buffer, this_time);
remaining -= this_time;
total_remaining -= this_time;
- nanomsg->send(String::compose(DIST_WRITER_PROGRESS "\n%1\n", (1 - float(total_remaining) / total)));
+ nanomsg->blocking_send(String::compose(DIST_WRITER_PROGRESS "\n%1\n", (1 - float(total_remaining) / total)));
}
ext4_fclose (&in);
@@ -232,8 +232,6 @@ try
{
// ext4_dmask_set (DEBUG_ALL);
- nanomsg->send("U\n");
-
/* We rely on static initialization for these */
static struct ext4_fs fs;
static struct ext4_mkfs_info info;
@@ -338,16 +336,16 @@ try
throw CopyError ("Failed to unmount device", r);
}
- nanomsg->send(DIST_WRITER_OK "\n");
+ nanomsg->blocking_send(DIST_WRITER_OK "\n");
} catch (CopyError& e) {
LOG_DIST("CopyError: %1 %2", e.message(), e.number());
- nanomsg->send(String::compose(DIST_WRITER_ERROR "\n%1\n%2\n", e.message(), e.number()));
+ nanomsg->blocking_send(String::compose(DIST_WRITER_ERROR "\n%1\n%2\n", e.message(), e.number()));
} catch (VerifyError& e) {
LOG_DIST("VerifyError: %1 %2", e.message(), e.number());
- nanomsg->send(String::compose(DIST_WRITER_ERROR "\n%1\n%2\n", e.message(), e.number()));
+ nanomsg->blocking_send(String::compose(DIST_WRITER_ERROR "\n%1\n%2\n", e.message(), e.number()));
} catch (exception& e) {
LOG_DIST("Exception: %1", e.what());
- nanomsg->send(String::compose(DIST_WRITER_ERROR "\n%1\n0\n", e.what()));
+ nanomsg->blocking_send(String::compose(DIST_WRITER_ERROR "\n%1\n0\n", e.what()));
}
#ifdef DCPOMATIC_LINUX