#include <iostream>
#include <boost/filesystem.hpp>
#include <boost/lexical_cast.hpp>
+#include <libdcp/picture_asset.h>
#include "encoder.h"
#include "util.h"
#include "options.h"
using namespace boost;
int const Encoder::_history_size = 25;
+unsigned int const Encoder::_maximum_frames_in_memory = 8;
/** @param f Film that we are encoding.
* @param o Options.
, _audio_frames_out (0)
#ifdef HAVE_SWRESAMPLE
, _swr_context (0)
-#endif
+#endif
+ , _have_a_real_frame (false)
, _terminate_encoder (false)
, _writer_thread (0)
- , _terminate_writer (false)
+ , _finish_writer (false)
+ , _last_written_frame (-1)
{
if (_film->audio_stream()) {
/* Create sound output files with .tmp suffixes; we will rename
{
close_sound_files ();
terminate_worker_threads ();
- terminate_writer_thread ();
+ finish_writer_thread ();
}
void
}
}
+ /* XXX! */
+ _picture_asset.reset (
+ new libdcp::MonoPictureAsset (
+ _film->dir (_film->dcp_name()),
+ String::compose ("video_%1.mxf", 0),
+ DCPFrameRate (_film->frames_per_second()).frames_per_second,
+ _film->format()->dcp_size()
+ )
+ );
+
+ _picture_asset_writer = _picture_asset->start_write ();
_writer_thread = new boost::thread (boost::bind (&Encoder::writer_thread, this));
}
lock.unlock ();
terminate_worker_threads ();
- terminate_writer_thread ();
_film->log()->log ("Mopping up " + lexical_cast<string> (_encode_queue.size()));
_film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
try {
shared_ptr<EncodedData> e = (*i)->encode_locally ();
- e->write (_film, (*i)->frame ());
+ {
+ boost::mutex::scoped_lock lock2 (_writer_mutex);
+ _write_queue.push_back (make_pair (e, (*i)->frame ()));
+ _writer_condition.notify_all ();
+ }
frame_done ();
} catch (std::exception& e) {
_film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
}
}
- /* Mop up any unwritten things in the writer's queue */
- for (list<pair<shared_ptr<EncodedData>, int> >::iterator i = _write_queue.begin(); i != _write_queue.end(); ++i) {
- i->first->write (_opt, i->second);
- }
-
- /* Now do links (or copies on windows) to duplicate frames */
- for (list<pair<int, int> >::iterator i = _links_required.begin(); i != _links_required.end(); ++i) {
- link (_film->frame_out_path (i->first, false), _film->frame_out_path (i->second, false));
- link (_film->hash_out_path (i->first, false), _film->hash_out_path (i->second, false));
- }
+ finish_writer_thread ();
+ _picture_asset_writer->finalize ();
}
/** @return an estimate of the current number of frames we are encoding per second,
return;
}
- if (same && _last_real_frame) {
- /* Use the last frame that we encoded. We need to postpone doing the actual link,
- as on windows the link is really a copy and the reference frame might not have
- finished encoding yet.
+ if (same && _have_a_real_frame) {
+ /* Use the last frame that we encoded. We do this by putting a null encoded
+ frame straight onto the writer's queue. It will know to duplicate the previous frame
+ in this case.
*/
- _links_required.push_back (make_pair (_last_real_frame.get(), _video_frames_out));
+ boost::mutex::scoped_lock lock2 (_writer_mutex);
+ _write_queue.push_back (make_pair (shared_ptr<EncodedData> (), _video_frames_out));
} else {
/* Queue this new frame for encoding */
pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
));
_worker_condition.notify_all ();
- _last_real_frame = _video_frames_out;
+ _have_a_real_frame = true;
}
++_video_frames_in;
++_video_frames_out;
if (dfr.repeat) {
- _links_required.push_back (make_pair (_video_frames_out, _video_frames_out - 1));
+ boost::mutex::scoped_lock lock2 (_writer_mutex);
+ _write_queue.push_back (make_pair (shared_ptr<EncodedData> (), _video_frames_out));
++_video_frames_out;
}
}
}
void
-Encoder::terminate_writer_thread ()
+Encoder::finish_writer_thread ()
{
if (!_writer_thread) {
return;
}
boost::mutex::scoped_lock lock (_writer_mutex);
- _terminate_writer = true;
+ _finish_writer = true;
_writer_condition.notify_all ();
lock.unlock ();
#endif
}
+struct WriteQueueSorter
+{
+ bool operator() (pair<shared_ptr<EncodedData>, int> const & a, pair<shared_ptr<EncodedData>, int> const & b) {
+ return a.second < b.second;
+ }
+};
+
void
Encoder::writer_thread ()
{
while (1)
{
boost::mutex::scoped_lock lock (_writer_mutex);
- TIMING ("writer sleeps with a queue of %1", _write_queue.size());
- while (_write_queue.empty() && !_terminate_writer) {
- _writer_condition.wait (lock);
+
+ while (1) {
+ if (_finish_writer ||
+ _write_queue.size() > _maximum_frames_in_memory ||
+ (!_write_queue.empty() && _write_queue.front().second == (_last_written_frame + 1))) {
+
+ break;
+ }
+
+ TIMING ("writer sleeps with a queue of %1; %2 pending", _write_queue.size(), _pending.size());
+ _writer_condition.wait (lock);
+ TIMING ("writer wakes with a queue of %1", _write_queue.size());
+
+ _write_queue.sort (WriteQueueSorter ());
}
- TIMING ("writer wakes with a queue of %1", _write_queue.size());
- if (_terminate_writer) {
+ if (_finish_writer && _write_queue.empty() && _pending.empty()) {
return;
}
- pair<boost::shared_ptr<EncodedData>, int> encoded = _write_queue.front ();
- _write_queue.pop_front ();
+ /* Write any frames that we can write; i.e. those that are in sequence */
+ while (!_write_queue.empty() && _write_queue.front().second == (_last_written_frame + 1)) {
+ pair<boost::shared_ptr<EncodedData>, int> encoded = _write_queue.front ();
+ _write_queue.pop_front ();
- lock.unlock ();
- encoded.first->write (_opt, encoded.second);
- lock.lock ();
+ lock.unlock ();
+ /* XXX: write to mxf */
+ _film->log()->log (String::compose ("Writer writes %1 to MXF", encoded.second));
+ if (encoded.first) {
+ _picture_asset_writer->write (encoded.first->data(), encoded.first->size());
+ _last_written = encoded.first;
+ } else {
+ _picture_asset_writer->write (_last_written->data(), _last_written->size());
+ }
+ lock.lock ();
+
+ ++_last_written_frame;
+ }
+
+ while (_write_queue.size() > _maximum_frames_in_memory) {
+ /* Too many frames in memory which can't yet be written to the stream.
+ Put some to disk.
+ */
+
+ pair<boost::shared_ptr<EncodedData>, int> encoded = _write_queue.front ();
+ _write_queue.pop_back ();
+ if (!encoded.first) {
+ /* This is a `repeat-last' frame, so no need to write it to disk */
+ continue;
+ }
+
+ lock.unlock ();
+ _film->log()->log (String::compose ("Writer full; pushes %1 to disk", encoded.second));
+ encoded.first->write (_film, encoded.second);
+ lock.lock ();
+
+ _pending.push_back (encoded.second);
+ }
+
+ while (_write_queue.size() < _maximum_frames_in_memory && !_pending.empty()) {
+ /* We have some space in memory. Fetch some frames back off disk. */
+
+ _pending.sort ();
+ int const fetch = _pending.front ();
+
+ lock.unlock ();
+ _film->log()->log (String::compose ("Writer pulls %1 back from disk", fetch));
+ shared_ptr<EncodedData> encoded;
+ if (boost::filesystem::exists (_film->frame_out_path (fetch, false))) {
+ /* It's an actual frame (not a repeat-last); load it in */
+ encoded.reset (new EncodedData (_film->frame_out_path (fetch, false)));
+ }
+ lock.lock ();
+
+ _write_queue.push_back (make_pair (encoded, fetch));
+ _pending.remove (fetch);
+ }
}
}
class DCPVideoFrame;
class EncodedData;
+namespace libdcp {
+ class MonoPictureAsset;
+ class MonoPictureAssetWriter;
+}
+
/** @class Encoder
* @brief Encoder to J2K and WAV for DCP.
*
int64_t _audio_frames_out;
void writer_thread ();
- void terminate_writer_thread ();
+ void finish_writer_thread ();
#if HAVE_SWRESAMPLE
SwrContext* _swr_context;
#endif
- /** List of links that we need to create when all frames have been processed;
- * such that we need to call link (first, second) for each member of this list.
- * In other words, `first' is a `real' frame and `second' should be a link to `first'.
- * Frames are DCP frames.
- */
- std::list<std::pair<int, int> > _links_required;
-
std::vector<SNDFILE*> _sound_files;
- boost::optional<int> _last_real_frame;
+ bool _have_a_real_frame;
bool _terminate_encoder;
std::list<boost::shared_ptr<DCPVideoFrame> > _encode_queue;
std::list<boost::thread *> _worker_threads;
boost::condition _worker_condition;
boost::thread* _writer_thread;
- bool _terminate_writer;
+ bool _finish_writer;
std::list<std::pair<boost::shared_ptr<EncodedData>, int> > _write_queue;
mutable boost::mutex _writer_mutex;
boost::condition _writer_condition;
+ boost::shared_ptr<EncodedData> _last_written;
+ std::list<int> _pending;
+ int _last_written_frame;
+ static const unsigned int _maximum_frames_in_memory;
+
+ boost::shared_ptr<libdcp::MonoPictureAsset> _picture_asset;
+ boost::shared_ptr<libdcp::MonoPictureAssetWriter> _picture_asset_writer;
};
#endif