diff options
| author | Carl Hetherington <cth@carlh.net> | 2020-03-23 01:35:51 +0100 |
|---|---|---|
| committer | Carl Hetherington <cth@carlh.net> | 2020-03-28 19:47:28 +0100 |
| commit | df0417115b86c714600e40493a999675e69c1f47 (patch) | |
| tree | 38814aaeee452cee079c32bcabc8ef1285ad346a /src/tools | |
| parent | c3663fa9de96b99eac21f3aa736bd6f2e9bbc3c0 (diff) | |
Use nanomsg instead of std{in,out} for communication between dist and writer.
Diffstat (limited to 'src/tools')
| -rw-r--r-- | src/tools/dcpomatic_dist.cc | 13 | ||||
| -rw-r--r-- | src/tools/dcpomatic_dist_writer.cc | 101 | ||||
| -rw-r--r-- | src/tools/wscript | 7 |
3 files changed, 51 insertions, 70 deletions
diff --git a/src/tools/dcpomatic_dist.cc b/src/tools/dcpomatic_dist.cc index 433dcd209..a9d8f9bda 100644 --- a/src/tools/dcpomatic_dist.cc +++ b/src/tools/dcpomatic_dist.cc @@ -30,6 +30,8 @@ #include "lib/cross.h" #include "lib/copy_to_drive_job.h" #include "lib/job_manager.h" +#include <nanomsg/nn.h> +#include <nanomsg/pair.h> #include <wx/wx.h> #include <boost/process.hpp> #ifdef __WXOSX__ @@ -40,6 +42,7 @@ using std::string; using std::exception; using std::cout; using std::cerr; +using std::runtime_error; using boost::shared_ptr; class DOMFrame : public wxFrame @@ -47,6 +50,7 @@ class DOMFrame : public wxFrame public: explicit DOMFrame (wxString const & title) : wxFrame (0, -1, title) + , _nanomsg (true) , _sizer (new wxBoxSizer(wxVERTICAL)) { /* Use a panel as the only child of the Frame so that we avoid @@ -96,7 +100,7 @@ public: _sizer->Add (grid, 1, wxALL | wxEXPAND, DCPOMATIC_DIALOG_BORDER); overall_panel->SetSizer (_sizer); Fit (); - SetSize (512, GetSize().GetHeight() + 32); + SetSize (768, GetSize().GetHeight() + 32); /* XXX: this is a hack, but I expect we'll need logs and I'm not sure if there's * a better place to put them. @@ -109,7 +113,7 @@ public: Bind (wxEVT_SIZE, boost::bind (&DOMFrame::sized, this, _1)); - _writer = new boost::process::child ("dcpomatic2_dist_writer", boost::process::std_in < _to_writer, boost::process::std_out > _from_writer); + _writer = new boost::process::child ("dcpomatic2_dist_writer"); } private: @@ -148,7 +152,7 @@ private: return; } - JobManager::instance()->add(shared_ptr<Job>(new CopyToDriveJob(*_dcp_path, _drives[_drive->GetSelection()], _to_writer, _from_writer))); + JobManager::instance()->add(shared_ptr<Job>(new CopyToDriveJob(*_dcp_path, _drives[_drive->GetSelection()], _nanomsg))); } void drive_refresh () @@ -193,8 +197,7 @@ private: boost::optional<boost::filesystem::path> _dcp_path; std::vector<Drive> _drives; boost::process::child* _writer; - boost::process::opstream _to_writer; - boost::process::ipstream _from_writer; + Nanomsg _nanomsg; wxSizer* _sizer; }; diff --git a/src/tools/dcpomatic_dist_writer.cc b/src/tools/dcpomatic_dist_writer.cc index 7f76c9ba4..c965666e6 100644 --- a/src/tools/dcpomatic_dist_writer.cc +++ b/src/tools/dcpomatic_dist_writer.cc @@ -25,6 +25,7 @@ #include "lib/digester.h" #include "lib/file_log.h" #include "lib/dcpomatic_log.h" +#include "lib/nanomsg.h" extern "C" { #include <lwext4/ext4_mbr.h> #include <lwext4/ext4_fs.h> @@ -65,10 +66,11 @@ extern "C" { #include <boost/filesystem.hpp> #include <iostream> -using std::cout; using std::cin; using std::min; using std::string; +using std::runtime_error; +using boost::optional; #ifdef DCPOMATIC_LINUX static PolkitAuthority* polkit_authority = 0; @@ -76,6 +78,7 @@ static PolkitAuthority* polkit_authority = 0; static boost::filesystem::path dcp_path; static std::string device; static uint64_t const block_size = 4096; +static Nanomsg* nanomsg = 0; static void @@ -139,8 +142,7 @@ write (boost::filesystem::path from, boost::filesystem::path to, uint64_t& total } remaining -= this_time; total_remaining -= this_time; - cout << DIST_WRITER_PROGRESS "\n" << (1 - float(total_remaining) / total) << "\n"; - cout.flush (); + nanomsg->send(String::compose(DIST_WRITER_PROGRESS "\n%1\n", (1 - float(total_remaining) / total))); } fclose (in); @@ -177,8 +179,7 @@ read (boost::filesystem::path from, boost::filesystem::path to, uint64_t& total_ digester.add (buffer, this_time); remaining -= this_time; total_remaining -= this_time; - cout << DIST_WRITER_PROGRESS "\n" << (1 - float(total_remaining) / total) << "\n"; - cout.flush (); + nanomsg->send(String::compose(DIST_WRITER_PROGRESS "\n%1\n", (1 - float(total_remaining) / total))); } ext4_fclose (&in); @@ -195,13 +196,9 @@ static void copy (boost::filesystem::path from, boost::filesystem::path to, uint64_t& total_remaining, uint64_t total) { - using namespace boost::filesystem; + LOG_DIST ("Copy %1 -> %2", from.string(), to.string()); - /* XXX: this is a hack. We are going to "treat" every byte twice; write it, and then verify it. Double the - * bytes totals so that progress works itself out (assuming write is the same speed as read). - */ - total_remaining *= 2; - total *= 2; + using namespace boost::filesystem; path const cr = to / from.filename(); @@ -216,7 +213,9 @@ copy (boost::filesystem::path from, boost::filesystem::path to, uint64_t& total_ } } else { string const write_digest = write (from, cr, total_remaining, total); + LOG_DIST ("Wrote %1 %2 with %3", from.string(), cr.string(), write_digest); string const read_digest = read (from, cr, total_remaining, total); + LOG_DIST ("Read %1 %2 with %3", from.string(), cr.string(), write_digest); if (write_digest != read_digest) { throw VerifyError ("Hash of written data is incorrect", 0); } @@ -230,7 +229,7 @@ try { // ext4_dmask_set (DEBUG_ALL); - cout << "U\n"; + nanomsg->send("U\n"); /* We rely on static initialization for these */ static struct ext4_fs fs; @@ -250,6 +249,7 @@ try if (!bd) { throw CopyError ("Failed to open drive", 0); } + LOG_DIST_NC ("Opened drive"); struct ext4_mbr_parts parts; parts.division[0] = 100; @@ -267,6 +267,7 @@ try if (r) { throw CopyError ("Failed to write MBR", r); } + LOG_DIST_NC ("Wrote MBR"); #ifdef DCPOMATIC_WINDOWS struct ext4_mbr_bdevs bdevs; @@ -300,25 +301,33 @@ try if (!bd) { throw CopyError ("Failed to open partition", 0); } + LOG_DIST_NC ("Opened partition"); r = ext4_mkfs(&fs, bd, &info, F_SET_EXT4); if (r != EOK) { throw CopyError ("Failed to make filesystem", r); } + LOG_DIST_NC ("Made filesystem"); r = ext4_device_register(bd, "ext4_fs"); if (r != EOK) { throw CopyError ("Failed to register device", r); } + LOG_DIST_NC ("Registered device"); r = ext4_mount("ext4_fs", "/mp/", false); if (r != EOK) { throw CopyError ("Failed to mount device", r); } + LOG_DIST_NC ("Mounted device"); uint64_t total_bytes = 0; count (dcp_path, total_bytes); + /* XXX: this is a hack. We are going to "treat" every byte twice; write it, and then verify it. Double the + * bytes totals so that progress works itself out (assuming write is the same speed as read). + */ + total_bytes *= 2; copy (dcp_path, "/mp", total_bytes, total_bytes); r = ext4_umount("/mp/"); @@ -326,14 +335,11 @@ try throw CopyError ("Failed to unmount device", r); } - cout << DIST_WRITER_OK "\n"; - cout.flush (); + nanomsg->send(DIST_WRITER_OK "\n"); } catch (CopyError& e) { - cout << DIST_WRITER_ERROR "\n" << e.message() << "\n" << e.number() << "\n"; - cout.flush (); + nanomsg->send(String::compose(DIST_WRITER_ERROR "\n%1\n%2\n", e.message(), e.number())); } catch (VerifyError& e) { - cout << DIST_WRITER_ERROR "\n" << e.message() << "\n" << e.number() << "\n"; - cout.flush (); + nanomsg->send(String::compose(DIST_WRITER_ERROR "\n%1\n%2\n", e.message(), e.number())); } #ifdef DCPOMATIC_LINUX @@ -354,41 +360,25 @@ polkit_callback (GObject *, GAsyncResult* res, gpointer) bool idle () { -#ifdef DCPOMATIC_POSIX - struct pollfd input[1] = { { fd: 0, events: POLLIN, revents: 0 } }; - int const r = poll (input, 1, 0); - if (r > 0 && (input[0].revents & POLLIN)) { -#else - DWORD num; - LOG_DIST ("now handle in is %1", reinterpret_cast<uint64_t>(GetStdHandle(STD_INPUT_HANDLE))); - if (!GetNumberOfConsoleInputEvents(GetStdHandle(STD_INPUT_HANDLE), &num)) { - LOG_DIST ("Could not check console: %1", GetLastError()); + optional<string> s = nanomsg->nonblocking_get (); + if (!s) { + return true; } - LOG_DIST ("%1 console events", num); - if (num) { -#endif - string s; - getline (cin, s); - if (s.empty()) { - return true; - } - dcp_path = s; - getline (cin, s); - device = "/dev/" + s; + dcp_path = *s; + device = "/dev/" + nanomsg->blocking_get (); - LOG_DIST ("Here we go writing %1 to %2", dcp_path, device); + LOG_DIST ("Here we go writing %1 to %2", dcp_path, device); #ifdef DCPOMATIC_LINUX - polkit_authority = polkit_authority_get_sync (0, 0); - PolkitSubject* subject = polkit_unix_process_new (getppid()); - polkit_authority_check_authorization ( + polkit_authority = polkit_authority_get_sync (0, 0); + PolkitSubject* subject = polkit_unix_process_new (getppid()); + polkit_authority_check_authorization ( polkit_authority, subject, "com.dcpomatic.write-drive", 0, POLKIT_CHECK_AUTHORIZATION_FLAGS_ALLOW_USER_INTERACTION, 0, polkit_callback, 0 ); #else - write (); + write (); #endif - } return true; } @@ -402,23 +392,12 @@ main () dcpomatic_log->set_types (dcpomatic_log->types() | LogEntry::TYPE_DIST); LOG_DIST_NC("dcpomatic_dist_writer started"); -#ifdef DCPOMATIC_WINDOWS - FreeConsole (); - AllocConsole (); - - HANDLE handle_out = GetStdHandle(STD_OUTPUT_HANDLE); - int hCrt = _open_osfhandle((intptr_t) handle_out, _O_TEXT); - FILE* hf_out = _fdopen(hCrt, "w"); - setvbuf(hf_out, NULL, _IONBF, 1); - *stdout = *hf_out; - - HANDLE handle_in = GetStdHandle(STD_INPUT_HANDLE); - LOG_DIST ("handle_in is %1", reinterpret_cast<uint64_t>(handle_in)); - hCrt = _open_osfhandle((intptr_t) handle_in, _O_TEXT); - FILE* hf_in = _fdopen(hCrt, "r"); - setvbuf(hf_in, NULL, _IONBF, 128); - *stdin = *hf_in; -#endif + try { + nanomsg = new Nanomsg (false); + } catch (runtime_error& e) { + LOG_DIST_NC("Could not set up nanomsg socket"); + exit (EXIT_FAILURE); + } Glib::RefPtr<Glib::MainLoop> ml = Glib::MainLoop::create (); Glib::signal_timeout().connect(sigc::ptr_fun(&idle), 500); diff --git a/src/tools/wscript b/src/tools/wscript index 8a6bc60c5..8ab583112 100644 --- a/src/tools/wscript +++ b/src/tools/wscript @@ -32,11 +32,10 @@ def build(bld): uselib += 'AVUTIL SWSCALE SWRESAMPLE POSTPROC CURL BOOST_FILESYSTEM SSH ZIP CAIROMM FONTCONFIG PANGOMM SUB ' uselib += 'SNDFILE SAMPLERATE BOOST_REGEX ICU NETTLE RTAUDIO PNG ' - if bld.env.TARGET_LINUX: - uselib += 'POLKIT ' - if bld.env.ENABLE_DIST: - uselib += 'LWEXT4 ' + if bld.env.TARGET_LINUX: + uselib += 'POLKIT ' + uselib += 'LWEXT4 NANOMSG ' if bld.env.TARGET_WINDOWS: uselib += 'WINSOCK2 DBGHELP SHLWAPI MSWSOCK BOOST_LOCALE WINSOCK2 OLE32 DSOUND WINMM KSUSER ' |
