X-Git-Url: https://git.carlh.net/gitweb/?a=blobdiff_plain;f=src%2Flib%2Fjob_manager.cc;h=86230db2b77a7653f86e46bf7165a51ccad84c82;hb=HEAD;hp=d8c0b02f2f210a58c863fe81d072dfecdd883468;hpb=8963f0007af1a312017b9627c18b82ec2a577591;p=dcpomatic.git diff --git a/src/lib/job_manager.cc b/src/lib/job_manager.cc index d8c0b02f2..86230db2b 100644 --- a/src/lib/job_manager.cc +++ b/src/lib/job_manager.cc @@ -30,6 +30,7 @@ #include "film.h" #include "job.h" #include "job_manager.h" +#include "util.h" #include @@ -74,7 +75,7 @@ JobManager::~JobManager () { boost::mutex::scoped_lock lm (_mutex); _terminate = true; - _empty_condition.notify_all (); + _schedule_condition.notify_all(); } try { @@ -89,7 +90,7 @@ JobManager::add (shared_ptr j) { boost::mutex::scoped_lock lm (_mutex); _jobs.push_back (j); - _empty_condition.notify_all (); + _schedule_condition.notify_all(); } emit (boost::bind(boost::ref(JobAdded), weak_ptr(j))); @@ -106,7 +107,7 @@ JobManager::add_after (shared_ptr after, shared_ptr j) auto i = find (_jobs.begin(), _jobs.end(), after); DCPOMATIC_ASSERT (i != _jobs.end()); _jobs.insert (i, j); - _empty_condition.notify_all (); + _schedule_condition.notify_all(); } emit (boost::bind(boost::ref(JobAdded), weak_ptr(j))); @@ -159,39 +160,32 @@ JobManager::scheduler () boost::mutex::scoped_lock lm (_mutex); - while (true) { - bool have_new = false; - bool have_running = false; - for (auto i: _jobs) { - if (i->running()) { - have_running = true; - } - if (i->is_new()) { - have_new = true; - } - } - - if ((!have_running && have_new) || _terminate) { - break; - } - - _empty_condition.wait (lm); - } - if (_terminate) { break; } + bool have_running = false; for (auto i: _jobs) { - if (i->is_new()) { - _connections.push_back (i->FinishedImmediate.connect(bind(&JobManager::job_finished, this))); - i->start (); + if ((have_running || _paused) && i->running()) { + /* We already have a running job, or are totally paused, so this job should not be running */ + i->pause_by_priority(); + } else if (!have_running && !_paused && (i->is_new() || i->paused_by_priority())) { + /* We don't have a running job, and we should have one, so start/resume this */ + if (i->is_new()) { + _connections.push_back (i->FinishedImmediate.connect(bind(&JobManager::job_finished, this))); + i->start (); + } else { + i->resume (); + } emit (boost::bind (boost::ref (ActiveJobsChanged), _last_active_job, i->json_name())); _last_active_job = i->json_name (); - /* Only start one job at once */ - break; + have_running = true; + } else if (!have_running && i->running()) { + have_running = true; } } + + _schedule_condition.wait(lm); } } @@ -205,7 +199,7 @@ JobManager::job_finished () _last_active_job = optional(); } - _empty_condition.notify_all (); + _schedule_condition.notify_all(); } @@ -235,7 +229,7 @@ JobManager::analyse_audio ( shared_ptr playlist, bool from_zero, boost::signals2::connection& connection, - function ready + function ready ) { { @@ -258,7 +252,7 @@ JobManager::analyse_audio ( job = make_shared (film, playlist, from_zero); connection = job->Finished.connect (ready); _jobs.push_back (job); - _empty_condition.notify_all (); + _schedule_condition.notify_all (); } emit (boost::bind (boost::ref (JobAdded), weak_ptr (job))); @@ -270,7 +264,7 @@ JobManager::analyse_subtitles ( shared_ptr film, shared_ptr content, boost::signals2::connection& connection, - function ready + function ready ) { { @@ -278,7 +272,7 @@ JobManager::analyse_subtitles ( for (auto i: _jobs) { auto a = dynamic_pointer_cast (i); - if (a && a->path() == film->subtitle_analysis_path(content)) { + if (a && a->path() == film->subtitle_analysis_path(content) && !i->finished_cancelled()) { i->when_finished (connection, ready); return; } @@ -293,7 +287,7 @@ JobManager::analyse_subtitles ( job = make_shared(film, content); connection = job->Finished.connect (ready); _jobs.push_back (job); - _empty_condition.notify_all (); + _schedule_condition.notify_all (); } emit (boost::bind(boost::ref(JobAdded), weak_ptr(job))); @@ -302,110 +296,53 @@ JobManager::analyse_subtitles ( void JobManager::increase_priority (shared_ptr job) -{ - bool changed = false; - - { - boost::mutex::scoped_lock lm (_mutex); - auto last = _jobs.end (); - for (auto i = _jobs.begin(); i != _jobs.end(); ++i) { - if (*i == job && last != _jobs.end()) { - swap (*i, *last); - changed = true; - break; - } - last = i; - } - } - - if (changed) { - priority_changed (); - } -} - - -void -JobManager::priority_changed () { { boost::mutex::scoped_lock lm (_mutex); - - bool first = true; - for (auto i: _jobs) { - if (first) { - if (i->is_new ()) { - i->start (); - } else if (i->paused_by_priority ()) { - i->resume (); - } - first = false; - } else { - if (i->running ()) { - i->pause_by_priority (); - } - } + auto iter = std::find(_jobs.begin(), _jobs.end(), job); + if (iter == _jobs.begin() || iter == _jobs.end()) { + return; } + swap(*iter, *std::prev(iter)); } - emit (boost::bind(boost::ref(JobsReordered))); + _schedule_condition.notify_all(); + emit(boost::bind(boost::ref(JobsReordered))); } void JobManager::decrease_priority (shared_ptr job) { - bool changed = false; - { boost::mutex::scoped_lock lm (_mutex); - for (auto i = _jobs.begin(); i != _jobs.end(); ++i) { - auto next = i; - ++next; - if (*i == job && next != _jobs.end()) { - swap (*i, *next); - changed = true; - break; - } + auto iter = std::find(_jobs.begin(), _jobs.end(), job); + if (iter == _jobs.end() || std::next(iter) == _jobs.end()) { + return; } + swap(*iter, *std::next(iter)); } - if (changed) { - priority_changed (); - } + _schedule_condition.notify_all(); + emit(boost::bind(boost::ref(JobsReordered))); } +/** Pause all job processing */ void JobManager::pause () { boost::mutex::scoped_lock lm (_mutex); - - if (_paused) { - return; - } - - for (auto i: _jobs) { - if (i->pause_by_user()) { - _paused_job = i; - } - } - _paused = true; + _schedule_condition.notify_all(); } +/** Resume processing jobs after a previous pause() */ void JobManager::resume () { boost::mutex::scoped_lock lm (_mutex); - if (!_paused) { - return; - } - - if (_paused_job) { - _paused_job->resume (); - } - - _paused_job.reset (); _paused = false; + _schedule_condition.notify_all(); }