diff options
| author | Carl Hetherington <cth@carlh.net> | 2020-03-28 22:39:28 +0100 |
|---|---|---|
| committer | Carl Hetherington <cth@carlh.net> | 2020-03-28 22:39:28 +0100 |
| commit | cdfe1a402500dfd48d4d6b7e63196b5e13f871f4 (patch) | |
| tree | b80b0f76b84e94c4b90e01291e0ebdb9eeb70880 /src | |
| parent | ad7158be9a8caec264f873ce5e68265dce1489e1 (diff) | |
Try to improve nanomsg EAGAIN handling.
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/copy_to_drive_job.cc | 4 | ||||
| -rw-r--r-- | src/lib/dist_writer_messages.h | 9 | ||||
| -rw-r--r-- | src/lib/nanomsg.cc | 18 | ||||
| -rw-r--r-- | src/lib/nanomsg.h | 7 | ||||
| -rw-r--r-- | src/tools/dcpomatic_dist.cc | 3 | ||||
| -rw-r--r-- | src/tools/dcpomatic_dist_writer.cc | 14 |
6 files changed, 43 insertions, 12 deletions
diff --git a/src/lib/copy_to_drive_job.cc b/src/lib/copy_to_drive_job.cc index 34bffe0e8..1aeb4bfc2 100644 --- a/src/lib/copy_to_drive_job.cc +++ b/src/lib/copy_to_drive_job.cc @@ -63,7 +63,9 @@ CopyToDriveJob::json_name () const void CopyToDriveJob::run () { - _nanomsg.send(String::compose("W\n%1\n%2\n", _dcp.string(), _drive.internal_name())); + if (!_nanomsg.nonblocking_send(String::compose("W\n%1\n%2\n", _dcp.string(), _drive.internal_name()))) { + throw CopyError ("Could not communicate with writer process", 0); + } while (true) { string s = _nanomsg.blocking_get (); diff --git a/src/lib/dist_writer_messages.h b/src/lib/dist_writer_messages.h index 53ddcd1d9..fedbc8b35 100644 --- a/src/lib/dist_writer_messages.h +++ b/src/lib/dist_writer_messages.h @@ -58,3 +58,12 @@ P\n */ #define DIST_WRITER_PROGRESS "P" + +/** dcpomatic_dist_writer may also receive + +Q\n + +as a request to quit. +*/ +#define DIST_WRITER_QUIT "Q" + diff --git a/src/lib/nanomsg.cc b/src/lib/nanomsg.cc index 591ff4bf8..57220cd54 100644 --- a/src/lib/nanomsg.cc +++ b/src/lib/nanomsg.cc @@ -49,14 +49,30 @@ Nanomsg::Nanomsg (bool server) } void -Nanomsg::send (string s) +Nanomsg::blocking_send (string s) +{ + int const r = nn_send (_socket, s.c_str(), s.length(), 0); + if (r < 0) { + throw runtime_error(String::compose("Could not send to nanomsg socket (%1)", errno)); + } else if (r != int(s.length())) { + throw runtime_error("Could not send to nanomsg socket (message too big)"); + } +} + +bool +Nanomsg::nonblocking_send (string s) { int const r = nn_send (_socket, s.c_str(), s.length(), NN_DONTWAIT); if (r < 0) { + if (errno == EAGAIN) { + return false; + } throw runtime_error(String::compose("Could not send to nanomsg socket (%1)", errno)); } else if (r != int(s.length())) { throw runtime_error("Could not send to nanomsg socket (message too big)"); } + + return true; } optional<string> diff --git a/src/lib/nanomsg.h b/src/lib/nanomsg.h index 26db2d2e3..dc84a6ce7 100644 --- a/src/lib/nanomsg.h +++ b/src/lib/nanomsg.h @@ -28,7 +28,12 @@ class Nanomsg : public boost::noncopyable public: explicit Nanomsg (bool server); - void send (std::string s); + void blocking_send (std::string s); + /** Try to send a message, returning true if successful, false + * if we should try again (EAGAIN) or throwing an exception on any other + * error. + */ + bool nonblocking_send (std::string s); std::string blocking_get (); boost::optional<std::string> nonblocking_get (); diff --git a/src/tools/dcpomatic_dist.cc b/src/tools/dcpomatic_dist.cc index 2098eb1a5..6284487ef 100644 --- a/src/tools/dcpomatic_dist.cc +++ b/src/tools/dcpomatic_dist.cc @@ -30,6 +30,7 @@ #include "lib/cross.h" #include "lib/copy_to_drive_job.h" #include "lib/job_manager.h" +#include "lib/dist_writer_messages.h" #include <wx/wx.h> #include <boost/process.hpp> #ifdef DCPOMATIC_WINDOWS @@ -126,7 +127,7 @@ public: ~DOMFrame () { - _nanomsg.send("Q\n"); + _nanomsg.blocking_send(DIST_WRITER_QUIT "\n"); } private: diff --git a/src/tools/dcpomatic_dist_writer.cc b/src/tools/dcpomatic_dist_writer.cc index 49e7bfd36..8da3a14a7 100644 --- a/src/tools/dcpomatic_dist_writer.cc +++ b/src/tools/dcpomatic_dist_writer.cc @@ -143,7 +143,7 @@ write (boost::filesystem::path from, boost::filesystem::path to, uint64_t& total } remaining -= this_time; total_remaining -= this_time; - nanomsg->send(String::compose(DIST_WRITER_PROGRESS "\n%1\n", (1 - float(total_remaining) / total))); + nanomsg->blocking_send(String::compose(DIST_WRITER_PROGRESS "\n%1\n", (1 - float(total_remaining) / total))); } fclose (in); @@ -182,7 +182,7 @@ read (boost::filesystem::path from, boost::filesystem::path to, uint64_t& total_ digester.add (buffer, this_time); remaining -= this_time; total_remaining -= this_time; - nanomsg->send(String::compose(DIST_WRITER_PROGRESS "\n%1\n", (1 - float(total_remaining) / total))); + nanomsg->blocking_send(String::compose(DIST_WRITER_PROGRESS "\n%1\n", (1 - float(total_remaining) / total))); } ext4_fclose (&in); @@ -232,8 +232,6 @@ try { // ext4_dmask_set (DEBUG_ALL); - nanomsg->send("U\n"); - /* We rely on static initialization for these */ static struct ext4_fs fs; static struct ext4_mkfs_info info; @@ -338,16 +336,16 @@ try throw CopyError ("Failed to unmount device", r); } - nanomsg->send(DIST_WRITER_OK "\n"); + nanomsg->blocking_send(DIST_WRITER_OK "\n"); } catch (CopyError& e) { LOG_DIST("CopyError: %1 %2", e.message(), e.number()); - nanomsg->send(String::compose(DIST_WRITER_ERROR "\n%1\n%2\n", e.message(), e.number())); + nanomsg->blocking_send(String::compose(DIST_WRITER_ERROR "\n%1\n%2\n", e.message(), e.number())); } catch (VerifyError& e) { LOG_DIST("VerifyError: %1 %2", e.message(), e.number()); - nanomsg->send(String::compose(DIST_WRITER_ERROR "\n%1\n%2\n", e.message(), e.number())); + nanomsg->blocking_send(String::compose(DIST_WRITER_ERROR "\n%1\n%2\n", e.message(), e.number())); } catch (exception& e) { LOG_DIST("Exception: %1", e.what()); - nanomsg->send(String::compose(DIST_WRITER_ERROR "\n%1\n0\n", e.what())); + nanomsg->blocking_send(String::compose(DIST_WRITER_ERROR "\n%1\n0\n", e.what())); } #ifdef DCPOMATIC_LINUX |
