From db4fde2e8983eaa0b76c49a189e059d6c9f5720d Mon Sep 17 00:00:00 2001 From: Carl Hetherington Date: Sun, 12 Nov 2023 22:09:48 +0100 Subject: [PATCH] Improve progress reporting of digest calculations (might help with #2643). --- cscript | 2 +- src/lib/job.cc | 11 +++++++- src/lib/job.h | 7 +++++ src/lib/reel_writer.cc | 30 ++++++++++++++++---- src/lib/reel_writer.h | 2 +- src/lib/writer.cc | 62 ++++++++++++++++++++++++++++++++++-------- src/lib/writer.h | 6 ++-- test/writer_test.cc | 52 +++++++++++++++++++++++++++++++++++ 8 files changed, 148 insertions(+), 24 deletions(-) diff --git a/cscript b/cscript index 795507e18..9d5a72eaf 100644 --- a/cscript +++ b/cscript @@ -508,7 +508,7 @@ def dependencies(target, options): # Use distro-provided FFmpeg on Arch deps = [] - deps.append(('libdcp', 'e3fa86ef35f212b14b593dd36dbff66e845d37e4')) + deps.append(('libdcp', 'c46f6125c482f2a3361cd33d1e1163927f038e9d')) deps.append(('libsub', 'v1.6.44')) deps.append(('leqm-nrt', '30dcaea1373ac62fba050e02ce5b0c1085797a23')) deps.append(('rtaudio', 'f619b76')) diff --git a/src/lib/job.cc b/src/lib/job.cc index 28bdde7fc..2a4c891ed 100644 --- a/src/lib/job.cc +++ b/src/lib/job.cc @@ -59,6 +59,7 @@ Job::Job (shared_ptr film) , _state (NEW) , _sub_start_time (0) , _progress (0) + , _rate_limit_progress(true) { } @@ -432,7 +433,7 @@ Job::set_progress (float p, bool force) { check_for_interruption_or_pause (); - if (!force) { + if (!force && _rate_limit_progress) { /* Check for excessively frequent progress reporting */ boost::mutex::scoped_lock lm (_progress_mutex); struct timeval now; @@ -735,3 +736,11 @@ Job::set_message (string m) boost::mutex::scoped_lock lm (_state_mutex); _message = m; } + + +void +Job::set_rate_limit_progress(bool rate_limit) +{ + _rate_limit_progress = rate_limit; +} + diff --git a/src/lib/job.h b/src/lib/job.h index dc5f7bc34..7814e0c2d 100644 --- a/src/lib/job.h +++ b/src/lib/job.h @@ -99,6 +99,8 @@ public: void when_finished(boost::signals2::connection& connection, std::function finished); + void set_rate_limit_progress(bool rate_limit); + boost::signals2::signal Progress; /** Emitted from the UI thread when the job is finished */ boost::signals2::signal Finished; @@ -159,6 +161,11 @@ private: boost::optional _progress; boost::optional _last_progress_update; + /** true to limit emissions of the progress signal so that they don't + * come too often. + */ + boost::atomic _rate_limit_progress; + /** condition to signal changes to pause/resume so that we know when to wake; this could be a general _state_change if it made more sense. */ diff --git a/src/lib/reel_writer.cc b/src/lib/reel_writer.cc index ca4a2dbb1..1b33cae85 100644 --- a/src/lib/reel_writer.cc +++ b/src/lib/reel_writer.cc @@ -773,21 +773,39 @@ ReelWriter::create_reel ( return reel; } + +/** @param set_progress Method to call with progress; first parameter is the number of bytes + * done, second parameter is the number of bytes in total. + */ void -ReelWriter::calculate_digests (std::function set_progress) +ReelWriter::calculate_digests(std::function set_progress) try { + vector> assets; + if (_picture_asset) { - _picture_asset->hash (set_progress); + assets.push_back(_picture_asset); } - if (_sound_asset) { - _sound_asset->hash (set_progress); + assets.push_back(_sound_asset); } - if (_atmos_asset) { - _atmos_asset->hash (set_progress); + assets.push_back(_atmos_asset); } + + int64_t total_size = 0; + for (auto asset: assets) { + total_size += asset->file() ? boost::filesystem::file_size(*asset->file()) : 0; + } + + int64_t total_done = 0; + for (auto asset: assets) { + asset->hash([&total_done, total_size, set_progress](int64_t done, int64_t) { + set_progress(total_done + done, total_size); + }); + total_done += asset->file() ? boost::filesystem::file_size(*asset->file()) : 0; + } + } catch (boost::thread_interrupted) { /* set_progress contains an interruption_point, so any of these methods * may throw thread_interrupted, at which point we just give up. diff --git a/src/lib/reel_writer.h b/src/lib/reel_writer.h index fff298cb7..c9052c832 100644 --- a/src/lib/reel_writer.h +++ b/src/lib/reel_writer.h @@ -82,7 +82,7 @@ public: bool ensure_subtitles, std::set ensure_closed_captions ); - void calculate_digests (std::function set_progress); + void calculate_digests(std::function set_progress); Frame start () const; diff --git a/src/lib/writer.cc b/src/lib/writer.cc index 9ab3d4e1e..8863816e8 100644 --- a/src/lib/writer.cc +++ b/src/lib/writer.cc @@ -531,19 +531,32 @@ Writer::calculate_digests () pool.create_thread (boost::bind (&boost::asio::io_service::run, &service)); } - std::function set_progress; + std::function set_progress; if (job) { - set_progress = boost::bind (&Writer::set_digest_progress, this, job.get(), _1); + set_progress = boost::bind(&Writer::set_digest_progress, this, job.get(), _1, _2, _3); } else { - set_progress = [](float) { + set_progress = [](int, int64_t, int64_t) { boost::this_thread::interruption_point(); }; } + int index = 0; + for (auto& i: _reels) { - service.post (boost::bind (&ReelWriter::calculate_digests, &i, set_progress)); + service.post( + boost::bind( + &ReelWriter::calculate_digests, + &i, + std::function(boost::bind(set_progress, index, _1, _2)) + )); + ++index; } - service.post (boost::bind (&Writer::calculate_referenced_digests, this, set_progress)); + service.post( + boost::bind( + &Writer::calculate_referenced_digests, + this, + std::function(boost::bind(set_progress, index, _1, _2)) + )); work.reset (); @@ -934,18 +947,29 @@ Writer::video_reel (int frame) const } +/** Update job progress with information about the progress of a single digest calculation + * thread. + * @param id Unique identifier for the thread whose progress has changed. + * @param done Number of bytes that this thread has processed. + * @param size Total number of bytes that this thread must process. + */ void -Writer::set_digest_progress (Job* job, float progress) +Writer::set_digest_progress(Job* job, int id, int64_t done, int64_t size) { boost::mutex::scoped_lock lm (_digest_progresses_mutex); - _digest_progresses[boost::this_thread::get_id()] = progress; - float min_progress = FLT_MAX; + /* Update the progress for this thread */ + _digest_progresses[id] = std::make_pair(done, size); + + /* Get the total progress across all threads and use it to set job progress */ + int64_t total_done = 0; + int64_t total_size = 0; for (auto const& i: _digest_progresses) { - min_progress = min (min_progress, i.second); + total_done += i.second.first; + total_size += i.second.second; } - job->set_progress (min_progress); + job->set_progress(float(total_done) / total_size); Waker waker; waker.nudge (); @@ -956,13 +980,27 @@ Writer::set_digest_progress (Job* job, float progress) /** Calculate hashes for any referenced MXF assets which do not already have one */ void -Writer::calculate_referenced_digests (std::function set_progress) +Writer::calculate_referenced_digests(std::function set_progress) try { + int64_t total_size = 0; + for (auto const& i: _reel_assets) { + auto file = dynamic_pointer_cast(i.asset); + if (file && !file->hash()) { + auto filename = file->asset_ref().asset()->file(); + DCPOMATIC_ASSERT(filename); + total_size += boost::filesystem::file_size(*filename); + } + } + + int64_t total_done = 0; for (auto const& i: _reel_assets) { auto file = dynamic_pointer_cast(i.asset); if (file && !file->hash()) { - file->asset_ref().asset()->hash (set_progress); + file->asset_ref().asset()->hash([&total_done, total_size, set_progress](int64_t done, int64_t) { + set_progress(total_done + done, total_size); + }); + total_done += boost::filesystem::file_size(*file->asset_ref().asset()->file()); file->set_hash (file->asset_ref().asset()->hash()); } } diff --git a/src/lib/writer.h b/src/lib/writer.h index 1fbf7bbd5..efb6a17d8 100644 --- a/src/lib/writer.h +++ b/src/lib/writer.h @@ -134,9 +134,9 @@ private: void terminate_thread (bool); bool have_sequenced_image_at_queue_head (); size_t video_reel (int frame) const; - void set_digest_progress (Job* job, float progress); + void set_digest_progress(Job* job, int id, int64_t done, int64_t size); void write_cover_sheet (boost::filesystem::path output_dcp); - void calculate_referenced_digests (std::function set_progress); + void calculate_referenced_digests(std::function set_progress); void write_hanging_text (ReelWriter& reel); void calculate_digests (); @@ -204,7 +204,7 @@ private: bool _text_only; boost::mutex _digest_progresses_mutex; - std::map _digest_progresses; + std::map> _digest_progresses; std::list _reel_assets; diff --git a/test/writer_test.cc b/test/writer_test.cc index 76e9ddb28..86b60818f 100644 --- a/test/writer_test.cc +++ b/test/writer_test.cc @@ -23,6 +23,7 @@ #include "lib/content.h" #include "lib/content_factory.h" #include "lib/cross.h" +#include "lib/dcp_encoder.h" #include "lib/film.h" #include "lib/job.h" #include "lib/video_content.h" @@ -36,6 +37,7 @@ using std::make_shared; using std::shared_ptr; +using std::string; using std::vector; @@ -101,3 +103,53 @@ BOOST_AUTO_TEST_CASE (interrupt_writer) dcpomatic_sleep_seconds (1); cl.run (); } + + +BOOST_AUTO_TEST_CASE(writer_progress_test) +{ + class TestJob : public Job + { + public: + explicit TestJob(shared_ptr film) + : Job(film) + {} + + ~TestJob() + { + stop_thread(); + } + + std::string name() const override { + return "test"; + } + std::string json_name() const override { + return "test"; + } + void run() override {}; + }; + + auto picture1 = content_factory("test/data/flat_red.png")[0]; + auto picture2 = content_factory("test/data/flat_red.png")[0]; + + auto film = new_test_film2("writer_progress_test", { picture1, picture2 }); + film->set_reel_type(ReelType::BY_VIDEO_CONTENT); + picture1->video->set_length(240); + picture2->video->set_length(240); + picture2->set_position(film, dcpomatic::DCPTime::from_seconds(10)); + + auto job = std::make_shared(film); + job->set_rate_limit_progress(false); + + float last_progress = 0; + string last_sub_name; + boost::signals2::scoped_connection connection = job->Progress.connect([job, &last_progress, &last_sub_name]() { + auto const progress = job->progress().get_value_or(0); + BOOST_REQUIRE(job->sub_name() != last_sub_name || progress >= last_progress); + last_progress = progress; + last_sub_name = job->sub_name(); + }); + + DCPEncoder encoder(film, job); + encoder.go(); +} + -- 2.30.2