#include "data.h"
#include <libcxml/cxml.h>
#include <boost/lambda/lambda.hpp>
+#include <boost/foreach.hpp>
#include <iostream>
#include "i18n.h"
int const Encoder::_history_size = 25;
/** @param f Film that we are encoding */
-Encoder::Encoder (shared_ptr<const Film> f, weak_ptr<Job> j, shared_ptr<Writer> writer)
- : _film (f)
+Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
+ : _film (film)
, _job (j)
, _video_frames_enqueued (0)
+ , _left_done (false)
+ , _right_done (false)
, _terminate (false)
, _writer (writer)
{
-
+ servers_list_changed ();
}
Encoder::~Encoder ()
_writer->set_encoder_threads (_threads.size ());
if (!ServerFinder::instance()->disabled ()) {
- _server_found_connection = ServerFinder::instance()->connect (boost::bind (&Encoder::server_found, this, _1));
+ _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
}
}
}
lock.unlock ();
-
+
terminate_threads ();
LOG_GENERAL (N_("Mopping up %1"), _queue.size());
LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
}
}
-}
+}
/** @return an estimate of the current number of frames we are encoding per second,
* or 0 if not known.
Encoder::frame_done ()
{
boost::mutex::scoped_lock lock (_state_mutex);
-
+
struct timeval tv;
gettimeofday (&tv, 0);
_time_history.push_front (tv);
Encoder::enqueue (shared_ptr<PlayerVideo> pv)
{
_waker.nudge ();
-
+
boost::mutex::scoped_lock lock (_mutex);
/* XXX: discard 3D here if required */
/* We can fake-write this frame */
_writer->fake_write (_video_frames_enqueued, pv->eyes ());
frame_done ();
- } else if (_last_player_video && pv->same (_last_player_video)) {
- _writer->repeat (_video_frames_enqueued, pv->eyes ());
} else if (pv->has_j2k ()) {
/* This frame already has JPEG2000 data, so just write it */
_writer->write (pv->j2k(), _video_frames_enqueued, pv->eyes ());
+ } else if (_last_player_video && pv->same (_last_player_video)) {
+ _writer->repeat (_video_frames_enqueued, pv->eyes ());
} else {
/* Queue this new frame for encoding */
LOG_TIMING ("adding to queue of %1", _queue.size ());
_film->video_frame_rate(),
_film->j2k_bandwidth(),
_film->resolution(),
- _film->burn_subtitles(),
_film->log()
)
));
_empty_condition.notify_all ();
}
- if (pv->eyes() != EYES_LEFT) {
+ switch (pv->eyes ()) {
+ case EYES_BOTH:
+ ++_video_frames_enqueued;
+ break;
+ case EYES_LEFT:
+ _left_done = true;
+ break;
+ case EYES_RIGHT:
+ _right_done = true;
+ break;
+ default:
+ break;
+ }
+
+ if (_left_done && _right_done) {
++_video_frames_enqueued;
+ _left_done = _right_done = false;
}
_last_player_video = pv;
}
_threads.clear ();
+ _terminate = false;
}
void
encodings.
*/
int remote_backoff = 0;
-
+
while (true) {
LOG_TIMING ("[%1] encoder thread sleeps", boost::this_thread::get_id());
shared_ptr<DCPVideo> vf = _queue.front ();
LOG_TIMING ("[%1] encoder thread pops frame %2 (%3) from queue", boost::this_thread::get_id(), vf->index(), vf->eyes ());
_queue.pop_front ();
-
+
lock.unlock ();
optional<Data> encoded;
if (server) {
try {
encoded = vf->encode_remotely (server.get ());
-
+
if (remote_backoff > 0) {
LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
}
-
+
/* This job succeeded, so remove any backoff */
remote_backoff = 0;
-
+
} catch (std::exception& e) {
if (remote_backoff < 60) {
/* back off more */
vf->index(), server->host_name(), e.what(), remote_backoff
);
}
-
+
} else {
try {
LOG_TIMING ("[%1] encoder thread begins local encode of %2", boost::this_thread::get_id(), vf->index());
}
void
-Encoder::server_found (ServerDescription s)
+Encoder::servers_list_changed ()
{
- add_worker_threads (s);
+ terminate_threads ();
+ BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
+ add_worker_threads (i);
+ }
}