* @param o Options.
* @Param l A log that we can write to.
*/
-ABTranscodeJob::ABTranscodeJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
- : Job (s, o, l)
+ABTranscodeJob::ABTranscodeJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l, shared_ptr<Job> req)
+ : Job (s, o, l, req)
{
_fs_b.reset (new FilmState (*_fs));
_fs_b->scaler = Config::instance()->reference_scaler ();
class ABTranscodeJob : public Job
{
public:
- ABTranscodeJob (boost::shared_ptr<const FilmState> s, boost::shared_ptr<const Options> o, Log* l);
+ ABTranscodeJob (boost::shared_ptr<const FilmState> s, boost::shared_ptr<const Options> o, Log* l, boost::shared_ptr<Job> req);
std::string name () const;
void run ();
using namespace std;
using namespace boost;
-CheckHashesJob::CheckHashesJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
- : Job (s, o, l)
+CheckHashesJob::CheckHashesJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l, shared_ptr<Job> req)
+ : Job (s, o, l, req)
, _bad (0)
{
shared_ptr<Job> tc;
if (_fs->dcp_ab) {
- tc.reset (new ABTranscodeJob (_fs, _opt, _log));
+ tc.reset (new ABTranscodeJob (_fs, _opt, _log, shared_from_this()));
} else {
- tc.reset (new TranscodeJob (_fs, _opt, _log));
+ tc.reset (new TranscodeJob (_fs, _opt, _log, shared_from_this()));
}
JobManager::instance()->add_after (shared_from_this(), tc);
- JobManager::instance()->add_after (tc, shared_ptr<Job> (new CheckHashesJob (_fs, _opt, _log)));
+ JobManager::instance()->add_after (tc, shared_ptr<Job> (new CheckHashesJob (_fs, _opt, _log, tc)));
}
set_progress (1);
class CheckHashesJob : public Job
{
public:
- CheckHashesJob (boost::shared_ptr<const FilmState> s, boost::shared_ptr<const Options> o, Log* l);
+ CheckHashesJob (boost::shared_ptr<const FilmState> s, boost::shared_ptr<const Options> o, Log* l, boost::shared_ptr<Job> req);
std::string name () const;
void run ();
/** @param fs FilmState for the film to write DVD data into.
* @param l Log that we can write to.
*/
-CopyFromDVDJob::CopyFromDVDJob (shared_ptr<const FilmState> fs, Log* l)
- : Job (fs, shared_ptr<Options> (), l)
+CopyFromDVDJob::CopyFromDVDJob (shared_ptr<const FilmState> fs, Log* l, shared_ptr<Job> req)
+ : Job (fs, shared_ptr<Options> (), l, req)
{
}
class CopyFromDVDJob : public Job
{
public:
- CopyFromDVDJob (boost::shared_ptr<const FilmState>, Log *);
+ CopyFromDVDJob (boost::shared_ptr<const FilmState>, Log *, boost::shared_ptr<Job> req);
std::string name () const;
void run ();
using namespace std;
using namespace boost;
-ExamineContentJob::ExamineContentJob (shared_ptr<const FilmState> fs, Log* l)
- : Job (fs, shared_ptr<Options> (), l)
+ExamineContentJob::ExamineContentJob (shared_ptr<const FilmState> fs, Log* l, shared_ptr<Job> req)
+ : Job (fs, shared_ptr<Options> (), l, req)
{
}
class ExamineContentJob : public Job
{
public:
- ExamineContentJob (boost::shared_ptr<const FilmState>, Log *);
+ ExamineContentJob (boost::shared_ptr<const FilmState>, Log *, boost::shared_ptr<Job> req);
~ExamineContentJob ();
std::string name () const;
o->padding = format()->dcp_padding (this);
o->ratio = format()->ratio_as_float (this);
+ shared_ptr<Job> r;
+
if (transcode) {
if (_state.dcp_ab) {
- JobManager::instance()->add (shared_ptr<Job> (new ABTranscodeJob (fs, o, log ())));
+ r = JobManager::instance()->add (shared_ptr<Job> (new ABTranscodeJob (fs, o, log(), shared_ptr<Job> ())));
} else {
- JobManager::instance()->add (shared_ptr<Job> (new TranscodeJob (fs, o, log ())));
+ r = JobManager::instance()->add (shared_ptr<Job> (new TranscodeJob (fs, o, log(), shared_ptr<Job> ())));
}
}
- JobManager::instance()->add (shared_ptr<Job> (new CheckHashesJob (fs, o, log ())));
- JobManager::instance()->add (shared_ptr<Job> (new MakeDCPJob (fs, o, log ())));
+ r = JobManager::instance()->add (shared_ptr<Job> (new CheckHashesJob (fs, o, log(), r)));
+ JobManager::instance()->add (shared_ptr<Job> (new MakeDCPJob (fs, o, log(), r)));
}
shared_ptr<FilmState>
return;
}
- _examine_content_job.reset (new ExamineContentJob (state_copy (), log ()));
+ _examine_content_job.reset (new ExamineContentJob (state_copy (), log(), shared_ptr<Job> ()));
_examine_content_job->Finished.connect (sigc::mem_fun (*this, &Film::examine_content_post_gui));
JobManager::instance()->add (_examine_content_job);
}
void
Film::send_dcp_to_tms ()
{
- shared_ptr<Job> j (new SCPDCPJob (state_copy (), log ()));
+ shared_ptr<Job> j (new SCPDCPJob (state_copy (), log(), shared_ptr<Job> ()));
JobManager::instance()->add (j);
}
void
Film::copy_from_dvd ()
{
- shared_ptr<Job> j (new CopyFromDVDJob (state_copy (), log ()));
+ shared_ptr<Job> j (new CopyFromDVDJob (state_copy (), log(), shared_ptr<Job> ()));
j->Finished.connect (sigc::mem_fun (*this, &Film::copy_from_dvd_post_gui));
JobManager::instance()->add (j);
}
* @param o Options.
* @param l A log that we can write to.
*/
-Job::Job (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
+Job::Job (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l, shared_ptr<Job> req)
: _fs (s)
, _opt (o)
, _log (l)
+ , _required (req)
, _state (NEW)
, _start_time (0)
, _progress_unknown (false)
}
}
+bool
+Job::is_new () const
+{
+ boost::mutex::scoped_lock lm (_state_mutex);
+ return _state == NEW;
+}
+
/** @return true if the job is running */
bool
Job::running () const
class Job : public boost::enable_shared_from_this<Job>
{
public:
- Job (boost::shared_ptr<const FilmState> s, boost::shared_ptr<const Options> o, Log* l);
+ Job (boost::shared_ptr<const FilmState> s, boost::shared_ptr<const Options> o, Log* l, boost::shared_ptr<Job> req);
/** @return user-readable name of this job */
virtual std::string name () const = 0;
void start ();
+ bool is_new () const;
bool running () const;
bool finished () const;
bool finished_ok () const;
void emit_finished ();
+ boost::shared_ptr<Job> required () const {
+ return _required;
+ }
+
/** Emitted from the GUI thread */
sigc::signal0<void> Finished;
void run_wrapper ();
+ boost::shared_ptr<Job> _required;
+
/** mutex for _state and _error */
mutable boost::mutex _state_mutex;
/** current state of the job */
boost::thread (boost::bind (&JobManager::scheduler, this));
}
-void
+shared_ptr<Job>
JobManager::add (shared_ptr<Job> j)
{
boost::mutex::scoped_lock lm (_mutex);
_jobs.push_back (j);
+ return j;
}
void
while (1) {
{
boost::mutex::scoped_lock lm (_mutex);
- int running = 0;
- shared_ptr<Job> first_new;
for (list<shared_ptr<Job> >::iterator i = _jobs.begin(); i != _jobs.end(); ++i) {
- if ((*i)->running ()) {
- ++running;
- } else if (!(*i)->finished () && first_new == 0) {
- first_new = *i;
- }
-
- if (running == 0 && first_new) {
- first_new->start ();
- break;
+ if ((*i)->is_new()) {
+ shared_ptr<Job> r = (*i)->required ();
+ if (!r || r->finished_ok ()) {
+ (*i)->start ();
+
+ /* Only start one job at once */
+ break;
+ }
}
}
}
/** @class JobManager
* @brief A simple scheduler for jobs.
- *
- * JobManager simply keeps a list of pending jobs, and assumes that all the jobs
- * are sufficiently CPU intensive that there is no point running them in parallel;
- * so jobs are just run one after the other.
*/
class JobManager
{
public:
- void add (boost::shared_ptr<Job>);
+ boost::shared_ptr<Job> add (boost::shared_ptr<Job>);
void add_after (boost::shared_ptr<Job> after, boost::shared_ptr<Job> j);
std::list<boost::shared_ptr<Job> > get () const;
bool work_to_do () const;
* @param o Options.
* @param l Log.
*/
-MakeDCPJob::MakeDCPJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
- : Job (s, o, l)
+MakeDCPJob::MakeDCPJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l, shared_ptr<Job> req)
+ : Job (s, o, l, req)
{
}
class MakeDCPJob : public Job
{
public:
- MakeDCPJob (boost::shared_ptr<const FilmState>, boost::shared_ptr<const Options>, Log *);
+ MakeDCPJob (boost::shared_ptr<const FilmState>, boost::shared_ptr<const Options>, Log *, boost::shared_ptr<Job> req);
std::string name () const;
void run ();
};
-SCPDCPJob::SCPDCPJob (shared_ptr<const FilmState> s, Log* l)
- : Job (s, shared_ptr<const Options> (), l)
+SCPDCPJob::SCPDCPJob (shared_ptr<const FilmState> s, Log* l, shared_ptr<Job> req)
+ : Job (s, shared_ptr<const Options> (), l, req)
, _status ("Waiting")
{
class SCPDCPJob : public Job
{
public:
- SCPDCPJob (boost::shared_ptr<const FilmState>, Log *);
+ SCPDCPJob (boost::shared_ptr<const FilmState>, Log *, boost::shared_ptr<Job> req);
std::string name () const;
void run ();
* @param o Options.
* @param l A log that we can write to.
*/
-ThumbsJob::ThumbsJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
- : Job (s, o, l)
+ThumbsJob::ThumbsJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l, shared_ptr<Job> req)
+ : Job (s, o, l, req)
{
}
class ThumbsJob : public Job
{
public:
- ThumbsJob (boost::shared_ptr<const FilmState> s, boost::shared_ptr<const Options> o, Log* l);
+ ThumbsJob (boost::shared_ptr<const FilmState> s, boost::shared_ptr<const Options> o, Log* l, boost::shared_ptr<Job> req);
std::string name () const;
void run ();
};
* @param o Options.
* @param l A log that we can write to.
*/
-TranscodeJob::TranscodeJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
- : Job (s, o, l)
+TranscodeJob::TranscodeJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l, shared_ptr<Job> req)
+ : Job (s, o, l, req)
{
}
class TranscodeJob : public Job
{
public:
- TranscodeJob (boost::shared_ptr<const FilmState> s, boost::shared_ptr<const Options> o, Log* l);
+ TranscodeJob (boost::shared_ptr<const FilmState> s, boost::shared_ptr<const Options> o, Log* l, boost::shared_ptr<Job> req);
std::string name () const;
void run ();
o->decode_audio = false;
o->decode_video_frequency = 128;
- shared_ptr<Job> j (new ThumbsJob (s, o, _film->log ()));
+ shared_ptr<Job> j (new ThumbsJob (s, o, _film->log(), shared_ptr<Job> ()));
j->Finished.connect (sigc::mem_fun (_film, &Film::update_thumbs_post_gui));
JobManager::instance()->add (j);
}
#include "config.h"
#include "server.h"
#include "cross.h"
+#include "job.h"
#define BOOST_TEST_DYN_LINK
#define BOOST_TEST_MODULE dvdomatic_test
#include <boost/test/unit_test.hpp>
new thread (boost::bind (&Server::run, server, 2));
+ /* Let the server get itself ready */
+ dvdomatic_sleep (1);
+
ServerDescription description ("localhost", 2);
list<thread*> threads;
film.set_dcp_frames (42);
film.make_dcp (true);
- while (JobManager::instance()->work_to_do ()) {
+ while (JobManager::instance()->work_to_do() && !JobManager::instance()->errors()) {
dvdomatic_sleep (1);
}
fs.audio_sample_rate = 48000;
BOOST_CHECK_EQUAL (fs.target_sample_rate(), 47952);
}
+
+class TestJob : public Job
+{
+public:
+ TestJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l, shared_ptr<Job> req)
+ : Job (s, o, l, req)
+ {
+
+ }
+
+ void set_finished_ok () {
+ set_state (FINISHED_OK);
+ }
+
+ void set_finished_error () {
+ set_state (FINISHED_ERROR);
+ }
+
+ void run ()
+ {
+ while (1) {
+ if (finished ()) {
+ return;
+ }
+ }
+ }
+
+ string name () const {
+ return "";
+ }
+};
+
+BOOST_AUTO_TEST_CASE (job_manager_test)
+{
+ shared_ptr<const FilmState> s;
+ shared_ptr<const Options> o;
+ FileLog log ("build/test/job_manager_test.log");
+
+ /* Single job, no dependency */
+ shared_ptr<TestJob> a (new TestJob (s, o, &log, shared_ptr<Job> ()));
+
+ JobManager::instance()->add (a);
+ dvdomatic_sleep (1);
+ BOOST_CHECK_EQUAL (a->running (), true);
+ a->set_finished_ok ();
+ dvdomatic_sleep (2);
+ BOOST_CHECK_EQUAL (a->finished_ok(), true);
+
+ /* Two jobs, no dependency */
+ a.reset (new TestJob (s, o, &log, shared_ptr<Job> ()));
+ shared_ptr<TestJob> b (new TestJob (s, o, &log, shared_ptr<Job> ()));
+
+ JobManager::instance()->add (a);
+ JobManager::instance()->add (b);
+ dvdomatic_sleep (2);
+ BOOST_CHECK_EQUAL (a->running (), true);
+ BOOST_CHECK_EQUAL (b->running (), true);
+ a->set_finished_ok ();
+ b->set_finished_ok ();
+ dvdomatic_sleep (2);
+ BOOST_CHECK_EQUAL (a->finished_ok (), true);
+ BOOST_CHECK_EQUAL (b->finished_ok (), true);
+
+ /* Two jobs, dependency */
+ a.reset (new TestJob (s, o, &log, shared_ptr<Job> ()));
+ b.reset (new TestJob (s, o, &log, a));
+
+ JobManager::instance()->add (a);
+ JobManager::instance()->add (b);
+ dvdomatic_sleep (2);
+ BOOST_CHECK_EQUAL (a->running(), true);
+ BOOST_CHECK_EQUAL (b->running(), false);
+ a->set_finished_ok ();
+ dvdomatic_sleep (2);
+ BOOST_CHECK_EQUAL (a->finished_ok(), true);
+ BOOST_CHECK_EQUAL (b->running(), true);
+ b->set_finished_ok ();
+ dvdomatic_sleep (2);
+ BOOST_CHECK_EQUAL (b->finished_ok(), true);
+
+ /* Two jobs, dependency, first fails */
+ a.reset (new TestJob (s, o, &log, shared_ptr<Job> ()));
+ b.reset (new TestJob (s, o, &log, a));
+
+ JobManager::instance()->add (a);
+ JobManager::instance()->add (b);
+ dvdomatic_sleep (2);
+ BOOST_CHECK_EQUAL (a->running(), true);
+ BOOST_CHECK_EQUAL (b->running(), false);
+ a->set_finished_error ();
+ dvdomatic_sleep (2);
+ BOOST_CHECK_EQUAL (a->finished_in_error(), true);
+ BOOST_CHECK_EQUAL (b->running(), false);
+}