From 9dd38ef2f05b24ba669acb9805e0914ac227fff2 Mon Sep 17 00:00:00 2001 From: Carl Hetherington Date: Wed, 10 Oct 2012 12:53:06 +0100 Subject: [PATCH] Try to not start jobs if a dependant fails. --- src/lib/ab_transcode_job.cc | 4 +- src/lib/ab_transcode_job.h | 2 +- src/lib/check_hashes_job.cc | 10 ++-- src/lib/check_hashes_job.h | 2 +- src/lib/copy_from_dvd_job.cc | 4 +- src/lib/copy_from_dvd_job.h | 2 +- src/lib/examine_content_job.cc | 4 +- src/lib/examine_content_job.h | 2 +- src/lib/film.cc | 16 +++--- src/lib/job.cc | 10 +++- src/lib/job.h | 9 ++- src/lib/job_manager.cc | 22 ++++---- src/lib/job_manager.h | 6 +- src/lib/make_dcp_job.cc | 4 +- src/lib/make_dcp_job.h | 2 +- src/lib/scp_dcp_job.cc | 4 +- src/lib/scp_dcp_job.h | 2 +- src/lib/thumbs_job.cc | 4 +- src/lib/thumbs_job.h | 2 +- src/lib/transcode_job.cc | 4 +- src/lib/transcode_job.h | 2 +- src/wx/film_viewer.cc | 2 +- test/test.cc | 100 ++++++++++++++++++++++++++++++++- 23 files changed, 164 insertions(+), 55 deletions(-) diff --git a/src/lib/ab_transcode_job.cc b/src/lib/ab_transcode_job.cc index d94f56d0a..fd8236bf0 100644 --- a/src/lib/ab_transcode_job.cc +++ b/src/lib/ab_transcode_job.cc @@ -35,8 +35,8 @@ using namespace boost; * @param o Options. * @Param l A log that we can write to. */ -ABTranscodeJob::ABTranscodeJob (shared_ptr s, shared_ptr o, Log* l) - : Job (s, o, l) +ABTranscodeJob::ABTranscodeJob (shared_ptr s, shared_ptr o, Log* l, shared_ptr req) + : Job (s, o, l, req) { _fs_b.reset (new FilmState (*_fs)); _fs_b->scaler = Config::instance()->reference_scaler (); diff --git a/src/lib/ab_transcode_job.h b/src/lib/ab_transcode_job.h index 478049068..4b80593f4 100644 --- a/src/lib/ab_transcode_job.h +++ b/src/lib/ab_transcode_job.h @@ -34,7 +34,7 @@ class ABTranscodeJob : public Job { public: - ABTranscodeJob (boost::shared_ptr s, boost::shared_ptr o, Log* l); + ABTranscodeJob (boost::shared_ptr s, boost::shared_ptr o, Log* l, boost::shared_ptr req); std::string name () const; void run (); diff --git a/src/lib/check_hashes_job.cc b/src/lib/check_hashes_job.cc index f60a2d40d..f07a5ab2a 100644 --- a/src/lib/check_hashes_job.cc +++ b/src/lib/check_hashes_job.cc @@ -31,8 +31,8 @@ using namespace std; using namespace boost; -CheckHashesJob::CheckHashesJob (shared_ptr s, shared_ptr o, Log* l) - : Job (s, o, l) +CheckHashesJob::CheckHashesJob (shared_ptr s, shared_ptr o, Log* l, shared_ptr req) + : Job (s, o, l, req) , _bad (0) { @@ -73,13 +73,13 @@ CheckHashesJob::run () shared_ptr tc; if (_fs->dcp_ab) { - tc.reset (new ABTranscodeJob (_fs, _opt, _log)); + tc.reset (new ABTranscodeJob (_fs, _opt, _log, shared_from_this())); } else { - tc.reset (new TranscodeJob (_fs, _opt, _log)); + tc.reset (new TranscodeJob (_fs, _opt, _log, shared_from_this())); } JobManager::instance()->add_after (shared_from_this(), tc); - JobManager::instance()->add_after (tc, shared_ptr (new CheckHashesJob (_fs, _opt, _log))); + JobManager::instance()->add_after (tc, shared_ptr (new CheckHashesJob (_fs, _opt, _log, tc))); } set_progress (1); diff --git a/src/lib/check_hashes_job.h b/src/lib/check_hashes_job.h index b59cf031b..6a68e936c 100644 --- a/src/lib/check_hashes_job.h +++ b/src/lib/check_hashes_job.h @@ -22,7 +22,7 @@ class CheckHashesJob : public Job { public: - CheckHashesJob (boost::shared_ptr s, boost::shared_ptr o, Log* l); + CheckHashesJob (boost::shared_ptr s, boost::shared_ptr o, Log* l, boost::shared_ptr req); std::string name () const; void run (); diff --git a/src/lib/copy_from_dvd_job.cc b/src/lib/copy_from_dvd_job.cc index d1000f54c..f7281fc10 100644 --- a/src/lib/copy_from_dvd_job.cc +++ b/src/lib/copy_from_dvd_job.cc @@ -35,8 +35,8 @@ using namespace boost; /** @param fs FilmState for the film to write DVD data into. * @param l Log that we can write to. */ -CopyFromDVDJob::CopyFromDVDJob (shared_ptr fs, Log* l) - : Job (fs, shared_ptr (), l) +CopyFromDVDJob::CopyFromDVDJob (shared_ptr fs, Log* l, shared_ptr req) + : Job (fs, shared_ptr (), l, req) { } diff --git a/src/lib/copy_from_dvd_job.h b/src/lib/copy_from_dvd_job.h index 6b56f6f0a..ce3837100 100644 --- a/src/lib/copy_from_dvd_job.h +++ b/src/lib/copy_from_dvd_job.h @@ -29,7 +29,7 @@ class CopyFromDVDJob : public Job { public: - CopyFromDVDJob (boost::shared_ptr, Log *); + CopyFromDVDJob (boost::shared_ptr, Log *, boost::shared_ptr req); std::string name () const; void run (); diff --git a/src/lib/examine_content_job.cc b/src/lib/examine_content_job.cc index d77ede2f9..36b4cbabc 100644 --- a/src/lib/examine_content_job.cc +++ b/src/lib/examine_content_job.cc @@ -30,8 +30,8 @@ using namespace std; using namespace boost; -ExamineContentJob::ExamineContentJob (shared_ptr fs, Log* l) - : Job (fs, shared_ptr (), l) +ExamineContentJob::ExamineContentJob (shared_ptr fs, Log* l, shared_ptr req) + : Job (fs, shared_ptr (), l, req) { } diff --git a/src/lib/examine_content_job.h b/src/lib/examine_content_job.h index d149341b4..3bbd673a8 100644 --- a/src/lib/examine_content_job.h +++ b/src/lib/examine_content_job.h @@ -31,7 +31,7 @@ class Decoder; class ExamineContentJob : public Job { public: - ExamineContentJob (boost::shared_ptr, Log *); + ExamineContentJob (boost::shared_ptr, Log *, boost::shared_ptr req); ~ExamineContentJob (); std::string name () const; diff --git a/src/lib/film.cc b/src/lib/film.cc index e2b3d4bc3..00d37c097 100644 --- a/src/lib/film.cc +++ b/src/lib/film.cc @@ -534,16 +534,18 @@ Film::make_dcp (bool transcode, int freq) o->padding = format()->dcp_padding (this); o->ratio = format()->ratio_as_float (this); + shared_ptr r; + if (transcode) { if (_state.dcp_ab) { - JobManager::instance()->add (shared_ptr (new ABTranscodeJob (fs, o, log ()))); + r = JobManager::instance()->add (shared_ptr (new ABTranscodeJob (fs, o, log(), shared_ptr ()))); } else { - JobManager::instance()->add (shared_ptr (new TranscodeJob (fs, o, log ()))); + r = JobManager::instance()->add (shared_ptr (new TranscodeJob (fs, o, log(), shared_ptr ()))); } } - JobManager::instance()->add (shared_ptr (new CheckHashesJob (fs, o, log ()))); - JobManager::instance()->add (shared_ptr (new MakeDCPJob (fs, o, log ()))); + r = JobManager::instance()->add (shared_ptr (new CheckHashesJob (fs, o, log(), r))); + JobManager::instance()->add (shared_ptr (new MakeDCPJob (fs, o, log(), r))); } shared_ptr @@ -582,7 +584,7 @@ Film::examine_content () return; } - _examine_content_job.reset (new ExamineContentJob (state_copy (), log ())); + _examine_content_job.reset (new ExamineContentJob (state_copy (), log(), shared_ptr ())); _examine_content_job->Finished.connect (sigc::mem_fun (*this, &Film::examine_content_post_gui)); JobManager::instance()->add (_examine_content_job); } @@ -631,14 +633,14 @@ Film::set_still_duration (int d) void Film::send_dcp_to_tms () { - shared_ptr j (new SCPDCPJob (state_copy (), log ())); + shared_ptr j (new SCPDCPJob (state_copy (), log(), shared_ptr ())); JobManager::instance()->add (j); } void Film::copy_from_dvd () { - shared_ptr j (new CopyFromDVDJob (state_copy (), log ())); + shared_ptr j (new CopyFromDVDJob (state_copy (), log(), shared_ptr ())); j->Finished.connect (sigc::mem_fun (*this, &Film::copy_from_dvd_post_gui)); JobManager::instance()->add (j); } diff --git a/src/lib/job.cc b/src/lib/job.cc index 39ce4173a..d3871bf72 100644 --- a/src/lib/job.cc +++ b/src/lib/job.cc @@ -34,10 +34,11 @@ using namespace boost; * @param o Options. * @param l A log that we can write to. */ -Job::Job (shared_ptr s, shared_ptr o, Log* l) +Job::Job (shared_ptr s, shared_ptr o, Log* l, shared_ptr req) : _fs (s) , _opt (o) , _log (l) + , _required (req) , _state (NEW) , _start_time (0) , _progress_unknown (false) @@ -80,6 +81,13 @@ Job::run_wrapper () } } +bool +Job::is_new () const +{ + boost::mutex::scoped_lock lm (_state_mutex); + return _state == NEW; +} + /** @return true if the job is running */ bool Job::running () const diff --git a/src/lib/job.h b/src/lib/job.h index 802bf468d..f50ed0784 100644 --- a/src/lib/job.h +++ b/src/lib/job.h @@ -39,7 +39,7 @@ class Options; class Job : public boost::enable_shared_from_this { public: - Job (boost::shared_ptr s, boost::shared_ptr o, Log* l); + Job (boost::shared_ptr s, boost::shared_ptr o, Log* l, boost::shared_ptr req); /** @return user-readable name of this job */ virtual std::string name () const = 0; @@ -48,6 +48,7 @@ public: void start (); + bool is_new () const; bool running () const; bool finished () const; bool finished_ok () const; @@ -66,6 +67,10 @@ public: void emit_finished (); + boost::shared_ptr required () const { + return _required; + } + /** Emitted from the GUI thread */ sigc::signal0 Finished; @@ -95,6 +100,8 @@ private: void run_wrapper (); + boost::shared_ptr _required; + /** mutex for _state and _error */ mutable boost::mutex _state_mutex; /** current state of the job */ diff --git a/src/lib/job_manager.cc b/src/lib/job_manager.cc index 76fcc6c5d..562c887de 100644 --- a/src/lib/job_manager.cc +++ b/src/lib/job_manager.cc @@ -37,11 +37,12 @@ JobManager::JobManager () boost::thread (boost::bind (&JobManager::scheduler, this)); } -void +shared_ptr JobManager::add (shared_ptr j) { boost::mutex::scoped_lock lm (_mutex); _jobs.push_back (j); + return j; } void @@ -93,18 +94,15 @@ JobManager::scheduler () while (1) { { boost::mutex::scoped_lock lm (_mutex); - int running = 0; - shared_ptr first_new; for (list >::iterator i = _jobs.begin(); i != _jobs.end(); ++i) { - if ((*i)->running ()) { - ++running; - } else if (!(*i)->finished () && first_new == 0) { - first_new = *i; - } - - if (running == 0 && first_new) { - first_new->start (); - break; + if ((*i)->is_new()) { + shared_ptr r = (*i)->required (); + if (!r || r->finished_ok ()) { + (*i)->start (); + + /* Only start one job at once */ + break; + } } } } diff --git a/src/lib/job_manager.h b/src/lib/job_manager.h index 8b79fd67d..4b70738f0 100644 --- a/src/lib/job_manager.h +++ b/src/lib/job_manager.h @@ -28,16 +28,12 @@ class Job; /** @class JobManager * @brief A simple scheduler for jobs. - * - * JobManager simply keeps a list of pending jobs, and assumes that all the jobs - * are sufficiently CPU intensive that there is no point running them in parallel; - * so jobs are just run one after the other. */ class JobManager { public: - void add (boost::shared_ptr); + boost::shared_ptr add (boost::shared_ptr); void add_after (boost::shared_ptr after, boost::shared_ptr j); std::list > get () const; bool work_to_do () const; diff --git a/src/lib/make_dcp_job.cc b/src/lib/make_dcp_job.cc index ae4bb4fbe..b42a38429 100644 --- a/src/lib/make_dcp_job.cc +++ b/src/lib/make_dcp_job.cc @@ -43,8 +43,8 @@ using namespace boost; * @param o Options. * @param l Log. */ -MakeDCPJob::MakeDCPJob (shared_ptr s, shared_ptr o, Log* l) - : Job (s, o, l) +MakeDCPJob::MakeDCPJob (shared_ptr s, shared_ptr o, Log* l, shared_ptr req) + : Job (s, o, l, req) { } diff --git a/src/lib/make_dcp_job.h b/src/lib/make_dcp_job.h index 677bed424..c350a819c 100644 --- a/src/lib/make_dcp_job.h +++ b/src/lib/make_dcp_job.h @@ -29,7 +29,7 @@ class MakeDCPJob : public Job { public: - MakeDCPJob (boost::shared_ptr, boost::shared_ptr, Log *); + MakeDCPJob (boost::shared_ptr, boost::shared_ptr, Log *, boost::shared_ptr req); std::string name () const; void run (); diff --git a/src/lib/scp_dcp_job.cc b/src/lib/scp_dcp_job.cc index dac4a602c..90122cea7 100644 --- a/src/lib/scp_dcp_job.cc +++ b/src/lib/scp_dcp_job.cc @@ -91,8 +91,8 @@ public: }; -SCPDCPJob::SCPDCPJob (shared_ptr s, Log* l) - : Job (s, shared_ptr (), l) +SCPDCPJob::SCPDCPJob (shared_ptr s, Log* l, shared_ptr req) + : Job (s, shared_ptr (), l, req) , _status ("Waiting") { diff --git a/src/lib/scp_dcp_job.h b/src/lib/scp_dcp_job.h index 1c795be47..b457fdf5b 100644 --- a/src/lib/scp_dcp_job.h +++ b/src/lib/scp_dcp_job.h @@ -26,7 +26,7 @@ class SCPDCPJob : public Job { public: - SCPDCPJob (boost::shared_ptr, Log *); + SCPDCPJob (boost::shared_ptr, Log *, boost::shared_ptr req); std::string name () const; void run (); diff --git a/src/lib/thumbs_job.cc b/src/lib/thumbs_job.cc index f6ed75ff7..779a1d5d1 100644 --- a/src/lib/thumbs_job.cc +++ b/src/lib/thumbs_job.cc @@ -35,8 +35,8 @@ using namespace boost; * @param o Options. * @param l A log that we can write to. */ -ThumbsJob::ThumbsJob (shared_ptr s, shared_ptr o, Log* l) - : Job (s, o, l) +ThumbsJob::ThumbsJob (shared_ptr s, shared_ptr o, Log* l, shared_ptr req) + : Job (s, o, l, req) { } diff --git a/src/lib/thumbs_job.h b/src/lib/thumbs_job.h index 1dd69a0f9..f7e30d576 100644 --- a/src/lib/thumbs_job.h +++ b/src/lib/thumbs_job.h @@ -31,7 +31,7 @@ class FilmState; class ThumbsJob : public Job { public: - ThumbsJob (boost::shared_ptr s, boost::shared_ptr o, Log* l); + ThumbsJob (boost::shared_ptr s, boost::shared_ptr o, Log* l, boost::shared_ptr req); std::string name () const; void run (); }; diff --git a/src/lib/transcode_job.cc b/src/lib/transcode_job.cc index e1ba82359..a53a4b6ad 100644 --- a/src/lib/transcode_job.cc +++ b/src/lib/transcode_job.cc @@ -39,8 +39,8 @@ using namespace boost; * @param o Options. * @param l A log that we can write to. */ -TranscodeJob::TranscodeJob (shared_ptr s, shared_ptr o, Log* l) - : Job (s, o, l) +TranscodeJob::TranscodeJob (shared_ptr s, shared_ptr o, Log* l, shared_ptr req) + : Job (s, o, l, req) { } diff --git a/src/lib/transcode_job.h b/src/lib/transcode_job.h index 737f10de9..fe68a4910 100644 --- a/src/lib/transcode_job.h +++ b/src/lib/transcode_job.h @@ -32,7 +32,7 @@ class Encoder; class TranscodeJob : public Job { public: - TranscodeJob (boost::shared_ptr s, boost::shared_ptr o, Log* l); + TranscodeJob (boost::shared_ptr s, boost::shared_ptr o, Log* l, boost::shared_ptr req); std::string name () const; void run (); diff --git a/src/wx/film_viewer.cc b/src/wx/film_viewer.cc index 3c7d76bce..6ffe4e66a 100644 --- a/src/wx/film_viewer.cc +++ b/src/wx/film_viewer.cc @@ -266,7 +266,7 @@ FilmViewer::update_thumbs () o->decode_audio = false; o->decode_video_frequency = 128; - shared_ptr j (new ThumbsJob (s, o, _film->log ())); + shared_ptr j (new ThumbsJob (s, o, _film->log(), shared_ptr ())); j->Finished.connect (sigc::mem_fun (_film, &Film::update_thumbs_post_gui)); JobManager::instance()->add (j); } diff --git a/test/test.cc b/test/test.cc index 26bca33e5..c801d538e 100644 --- a/test/test.cc +++ b/test/test.cc @@ -35,6 +35,7 @@ #include "config.h" #include "server.h" #include "cross.h" +#include "job.h" #define BOOST_TEST_DYN_LINK #define BOOST_TEST_MODULE dvdomatic_test #include @@ -305,6 +306,9 @@ BOOST_AUTO_TEST_CASE (client_server_test) new thread (boost::bind (&Server::run, server, 2)); + /* Let the server get itself ready */ + dvdomatic_sleep (1); + ServerDescription description ("localhost", 2); list threads; @@ -357,7 +361,7 @@ BOOST_AUTO_TEST_CASE (make_dcp_with_range_test) film.set_dcp_frames (42); film.make_dcp (true); - while (JobManager::instance()->work_to_do ()) { + while (JobManager::instance()->work_to_do() && !JobManager::instance()->errors()) { dvdomatic_sleep (1); } @@ -386,3 +390,97 @@ BOOST_AUTO_TEST_CASE (audio_sampling_rate_test) fs.audio_sample_rate = 48000; BOOST_CHECK_EQUAL (fs.target_sample_rate(), 47952); } + +class TestJob : public Job +{ +public: + TestJob (shared_ptr s, shared_ptr o, Log* l, shared_ptr req) + : Job (s, o, l, req) + { + + } + + void set_finished_ok () { + set_state (FINISHED_OK); + } + + void set_finished_error () { + set_state (FINISHED_ERROR); + } + + void run () + { + while (1) { + if (finished ()) { + return; + } + } + } + + string name () const { + return ""; + } +}; + +BOOST_AUTO_TEST_CASE (job_manager_test) +{ + shared_ptr s; + shared_ptr o; + FileLog log ("build/test/job_manager_test.log"); + + /* Single job, no dependency */ + shared_ptr a (new TestJob (s, o, &log, shared_ptr ())); + + JobManager::instance()->add (a); + dvdomatic_sleep (1); + BOOST_CHECK_EQUAL (a->running (), true); + a->set_finished_ok (); + dvdomatic_sleep (2); + BOOST_CHECK_EQUAL (a->finished_ok(), true); + + /* Two jobs, no dependency */ + a.reset (new TestJob (s, o, &log, shared_ptr ())); + shared_ptr b (new TestJob (s, o, &log, shared_ptr ())); + + JobManager::instance()->add (a); + JobManager::instance()->add (b); + dvdomatic_sleep (2); + BOOST_CHECK_EQUAL (a->running (), true); + BOOST_CHECK_EQUAL (b->running (), true); + a->set_finished_ok (); + b->set_finished_ok (); + dvdomatic_sleep (2); + BOOST_CHECK_EQUAL (a->finished_ok (), true); + BOOST_CHECK_EQUAL (b->finished_ok (), true); + + /* Two jobs, dependency */ + a.reset (new TestJob (s, o, &log, shared_ptr ())); + b.reset (new TestJob (s, o, &log, a)); + + JobManager::instance()->add (a); + JobManager::instance()->add (b); + dvdomatic_sleep (2); + BOOST_CHECK_EQUAL (a->running(), true); + BOOST_CHECK_EQUAL (b->running(), false); + a->set_finished_ok (); + dvdomatic_sleep (2); + BOOST_CHECK_EQUAL (a->finished_ok(), true); + BOOST_CHECK_EQUAL (b->running(), true); + b->set_finished_ok (); + dvdomatic_sleep (2); + BOOST_CHECK_EQUAL (b->finished_ok(), true); + + /* Two jobs, dependency, first fails */ + a.reset (new TestJob (s, o, &log, shared_ptr ())); + b.reset (new TestJob (s, o, &log, a)); + + JobManager::instance()->add (a); + JobManager::instance()->add (b); + dvdomatic_sleep (2); + BOOST_CHECK_EQUAL (a->running(), true); + BOOST_CHECK_EQUAL (b->running(), false); + a->set_finished_error (); + dvdomatic_sleep (2); + BOOST_CHECK_EQUAL (a->finished_in_error(), true); + BOOST_CHECK_EQUAL (b->running(), false); +} -- 2.30.2