# Use distro-provided FFmpeg on Arch
deps = []
- deps.append(('libdcp', 'e3fa86ef35f212b14b593dd36dbff66e845d37e4'))
+ deps.append(('libdcp', 'c46f6125c482f2a3361cd33d1e1163927f038e9d'))
deps.append(('libsub', 'v1.6.44'))
deps.append(('leqm-nrt', '30dcaea1373ac62fba050e02ce5b0c1085797a23'))
deps.append(('rtaudio', 'f619b76'))
, _state (NEW)
, _sub_start_time (0)
, _progress (0)
+ , _rate_limit_progress(true)
{
}
{
check_for_interruption_or_pause ();
- if (!force) {
+ if (!force && _rate_limit_progress) {
/* Check for excessively frequent progress reporting */
boost::mutex::scoped_lock lm (_progress_mutex);
struct timeval now;
boost::mutex::scoped_lock lm (_state_mutex);
_message = m;
}
+
+
+void
+Job::set_rate_limit_progress(bool rate_limit)
+{
+ _rate_limit_progress = rate_limit;
+}
+
void when_finished(boost::signals2::connection& connection, std::function<void(Result)> finished);
+ void set_rate_limit_progress(bool rate_limit);
+
boost::signals2::signal<void()> Progress;
/** Emitted from the UI thread when the job is finished */
boost::signals2::signal<void (Result)> Finished;
boost::optional<float> _progress;
boost::optional<struct timeval> _last_progress_update;
+ /** true to limit emissions of the progress signal so that they don't
+ * come too often.
+ */
+ boost::atomic<bool> _rate_limit_progress;
+
/** condition to signal changes to pause/resume so that we know when to wake;
this could be a general _state_change if it made more sense.
*/
return reel;
}
+
+/** @param set_progress Method to call with progress; first parameter is the number of bytes
+ * done, second parameter is the number of bytes in total.
+ */
void
-ReelWriter::calculate_digests (std::function<void (float)> set_progress)
+ReelWriter::calculate_digests(std::function<void (int64_t, int64_t)> set_progress)
try
{
+ vector<shared_ptr<const dcp::Asset>> assets;
+
if (_picture_asset) {
- _picture_asset->hash (set_progress);
+ assets.push_back(_picture_asset);
}
-
if (_sound_asset) {
- _sound_asset->hash (set_progress);
+ assets.push_back(_sound_asset);
}
-
if (_atmos_asset) {
- _atmos_asset->hash (set_progress);
+ assets.push_back(_atmos_asset);
}
+
+ int64_t total_size = 0;
+ for (auto asset: assets) {
+ total_size += asset->file() ? boost::filesystem::file_size(*asset->file()) : 0;
+ }
+
+ int64_t total_done = 0;
+ for (auto asset: assets) {
+ asset->hash([&total_done, total_size, set_progress](int64_t done, int64_t) {
+ set_progress(total_done + done, total_size);
+ });
+ total_done += asset->file() ? boost::filesystem::file_size(*asset->file()) : 0;
+ }
+
} catch (boost::thread_interrupted) {
/* set_progress contains an interruption_point, so any of these methods
* may throw thread_interrupted, at which point we just give up.
bool ensure_subtitles,
std::set<DCPTextTrack> ensure_closed_captions
);
- void calculate_digests (std::function<void (float)> set_progress);
+ void calculate_digests(std::function<void (int64_t, int64_t)> set_progress);
Frame start () const;
pool.create_thread (boost::bind (&boost::asio::io_service::run, &service));
}
- std::function<void (float)> set_progress;
+ std::function<void (int, int64_t, int64_t)> set_progress;
if (job) {
- set_progress = boost::bind (&Writer::set_digest_progress, this, job.get(), _1);
+ set_progress = boost::bind(&Writer::set_digest_progress, this, job.get(), _1, _2, _3);
} else {
- set_progress = [](float) {
+ set_progress = [](int, int64_t, int64_t) {
boost::this_thread::interruption_point();
};
}
+ int index = 0;
+
for (auto& i: _reels) {
- service.post (boost::bind (&ReelWriter::calculate_digests, &i, set_progress));
+ service.post(
+ boost::bind(
+ &ReelWriter::calculate_digests,
+ &i,
+ std::function<void (int64_t, int64_t)>(boost::bind(set_progress, index, _1, _2))
+ ));
+ ++index;
}
- service.post (boost::bind (&Writer::calculate_referenced_digests, this, set_progress));
+ service.post(
+ boost::bind(
+ &Writer::calculate_referenced_digests,
+ this,
+ std::function<void (int64_t, int64_t)>(boost::bind(set_progress, index, _1, _2))
+ ));
work.reset ();
}
+/** Update job progress with information about the progress of a single digest calculation
+ * thread.
+ * @param id Unique identifier for the thread whose progress has changed.
+ * @param done Number of bytes that this thread has processed.
+ * @param size Total number of bytes that this thread must process.
+ */
void
-Writer::set_digest_progress (Job* job, float progress)
+Writer::set_digest_progress(Job* job, int id, int64_t done, int64_t size)
{
boost::mutex::scoped_lock lm (_digest_progresses_mutex);
- _digest_progresses[boost::this_thread::get_id()] = progress;
- float min_progress = FLT_MAX;
+ /* Update the progress for this thread */
+ _digest_progresses[id] = std::make_pair(done, size);
+
+ /* Get the total progress across all threads and use it to set job progress */
+ int64_t total_done = 0;
+ int64_t total_size = 0;
for (auto const& i: _digest_progresses) {
- min_progress = min (min_progress, i.second);
+ total_done += i.second.first;
+ total_size += i.second.second;
}
- job->set_progress (min_progress);
+ job->set_progress(float(total_done) / total_size);
Waker waker;
waker.nudge ();
/** Calculate hashes for any referenced MXF assets which do not already have one */
void
-Writer::calculate_referenced_digests (std::function<void (float)> set_progress)
+Writer::calculate_referenced_digests(std::function<void (int64_t, int64_t)> set_progress)
try
{
+ int64_t total_size = 0;
+ for (auto const& i: _reel_assets) {
+ auto file = dynamic_pointer_cast<dcp::ReelFileAsset>(i.asset);
+ if (file && !file->hash()) {
+ auto filename = file->asset_ref().asset()->file();
+ DCPOMATIC_ASSERT(filename);
+ total_size += boost::filesystem::file_size(*filename);
+ }
+ }
+
+ int64_t total_done = 0;
for (auto const& i: _reel_assets) {
auto file = dynamic_pointer_cast<dcp::ReelFileAsset>(i.asset);
if (file && !file->hash()) {
- file->asset_ref().asset()->hash (set_progress);
+ file->asset_ref().asset()->hash([&total_done, total_size, set_progress](int64_t done, int64_t) {
+ set_progress(total_done + done, total_size);
+ });
+ total_done += boost::filesystem::file_size(*file->asset_ref().asset()->file());
file->set_hash (file->asset_ref().asset()->hash());
}
}
void terminate_thread (bool);
bool have_sequenced_image_at_queue_head ();
size_t video_reel (int frame) const;
- void set_digest_progress (Job* job, float progress);
+ void set_digest_progress(Job* job, int id, int64_t done, int64_t size);
void write_cover_sheet (boost::filesystem::path output_dcp);
- void calculate_referenced_digests (std::function<void (float)> set_progress);
+ void calculate_referenced_digests(std::function<void (int64_t, int64_t)> set_progress);
void write_hanging_text (ReelWriter& reel);
void calculate_digests ();
bool _text_only;
boost::mutex _digest_progresses_mutex;
- std::map<boost::thread::id, float> _digest_progresses;
+ std::map<int, std::pair<int64_t, int64_t>> _digest_progresses;
std::list<ReferencedReelAsset> _reel_assets;
#include "lib/content.h"
#include "lib/content_factory.h"
#include "lib/cross.h"
+#include "lib/dcp_encoder.h"
#include "lib/film.h"
#include "lib/job.h"
#include "lib/video_content.h"
using std::make_shared;
using std::shared_ptr;
+using std::string;
using std::vector;
dcpomatic_sleep_seconds (1);
cl.run ();
}
+
+
+BOOST_AUTO_TEST_CASE(writer_progress_test)
+{
+ class TestJob : public Job
+ {
+ public:
+ explicit TestJob(shared_ptr<const Film> film)
+ : Job(film)
+ {}
+
+ ~TestJob()
+ {
+ stop_thread();
+ }
+
+ std::string name() const override {
+ return "test";
+ }
+ std::string json_name() const override {
+ return "test";
+ }
+ void run() override {};
+ };
+
+ auto picture1 = content_factory("test/data/flat_red.png")[0];
+ auto picture2 = content_factory("test/data/flat_red.png")[0];
+
+ auto film = new_test_film2("writer_progress_test", { picture1, picture2 });
+ film->set_reel_type(ReelType::BY_VIDEO_CONTENT);
+ picture1->video->set_length(240);
+ picture2->video->set_length(240);
+ picture2->set_position(film, dcpomatic::DCPTime::from_seconds(10));
+
+ auto job = std::make_shared<TestJob>(film);
+ job->set_rate_limit_progress(false);
+
+ float last_progress = 0;
+ string last_sub_name;
+ boost::signals2::scoped_connection connection = job->Progress.connect([job, &last_progress, &last_sub_name]() {
+ auto const progress = job->progress().get_value_or(0);
+ BOOST_REQUIRE(job->sub_name() != last_sub_name || progress >= last_progress);
+ last_progress = progress;
+ last_sub_name = job->sub_name();
+ });
+
+ DCPEncoder encoder(film, job);
+ encoder.go();
+}
+