From: Carl Hetherington Date: Thu, 17 Jan 2013 23:42:59 +0000 (+0000) Subject: Sort of works to a first-order approximation. X-Git-Tag: v2.0.48~1337^2~742 X-Git-Url: https://git.carlh.net/gitweb/?a=commitdiff_plain;h=cadd50fe2609a1ad9963389d65d8e91f85226752;p=dcpomatic.git Sort of works to a first-order approximation. --- diff --git a/src/lib/dcp_video_frame.cc b/src/lib/dcp_video_frame.cc index 1c408270e..da864ad9f 100644 --- a/src/lib/dcp_video_frame.cc +++ b/src/lib/dcp_video_frame.cc @@ -380,6 +380,21 @@ EncodedData::EncodedData (int s) } +EncodedData::EncodedData (string file) +{ + _size = boost::filesystem::file_size (file); + _data = new uint8_t[_size]; + + FILE* f = fopen (file.c_str(), "rb"); + if (!f) { + throw FileError ("could not open file for reading", file); + } + + fread (_data, 1, _size, f); + fclose (f); +} + + EncodedData::~EncodedData () { delete[] _data; diff --git a/src/lib/dcp_video_frame.h b/src/lib/dcp_video_frame.h index e311724d8..b446352c7 100644 --- a/src/lib/dcp_video_frame.h +++ b/src/lib/dcp_video_frame.h @@ -43,6 +43,8 @@ public: */ EncodedData (int s); + EncodedData (std::string f); + virtual ~EncodedData (); void send (boost::shared_ptr socket); diff --git a/src/lib/encoder.cc b/src/lib/encoder.cc index 0f48310ef..c6960d0d1 100644 --- a/src/lib/encoder.cc +++ b/src/lib/encoder.cc @@ -24,6 +24,7 @@ #include #include #include +#include #include "encoder.h" #include "util.h" #include "options.h" @@ -47,6 +48,7 @@ using std::make_pair; using namespace boost; int const Encoder::_history_size = 25; +unsigned int const Encoder::_maximum_frames_in_memory = 8; /** @param f Film that we are encoding. * @param o Options. @@ -60,10 +62,12 @@ Encoder::Encoder (shared_ptr f) , _audio_frames_out (0) #ifdef HAVE_SWRESAMPLE , _swr_context (0) -#endif +#endif + , _have_a_real_frame (false) , _terminate_encoder (false) , _writer_thread (0) - , _terminate_writer (false) + , _finish_writer (false) + , _last_written_frame (-1) { if (_film->audio_stream()) { /* Create sound output files with .tmp suffixes; we will rename @@ -88,7 +92,7 @@ Encoder::~Encoder () { close_sound_files (); terminate_worker_threads (); - terminate_writer_thread (); + finish_writer_thread (); } void @@ -135,6 +139,17 @@ Encoder::process_begin () } } + /* XXX! */ + _picture_asset.reset ( + new libdcp::MonoPictureAsset ( + _film->dir (_film->dcp_name()), + String::compose ("video_%1.mxf", 0), + DCPFrameRate (_film->frames_per_second()).frames_per_second, + _film->format()->dcp_size() + ) + ); + + _picture_asset_writer = _picture_asset->start_write (); _writer_thread = new boost::thread (boost::bind (&Encoder::writer_thread, this)); } @@ -192,7 +207,6 @@ Encoder::process_end () lock.unlock (); terminate_worker_threads (); - terminate_writer_thread (); _film->log()->log ("Mopping up " + lexical_cast (_encode_queue.size())); @@ -209,23 +223,19 @@ Encoder::process_end () _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ())); try { shared_ptr e = (*i)->encode_locally (); - e->write (_film, (*i)->frame ()); + { + boost::mutex::scoped_lock lock2 (_writer_mutex); + _write_queue.push_back (make_pair (e, (*i)->frame ())); + _writer_condition.notify_all (); + } frame_done (); } catch (std::exception& e) { _film->log()->log (String::compose ("Local encode failed (%1)", e.what ())); } } - /* Mop up any unwritten things in the writer's queue */ - for (list, int> >::iterator i = _write_queue.begin(); i != _write_queue.end(); ++i) { - i->first->write (_opt, i->second); - } - - /* Now do links (or copies on windows) to duplicate frames */ - for (list >::iterator i = _links_required.begin(); i != _links_required.end(); ++i) { - link (_film->frame_out_path (i->first, false), _film->frame_out_path (i->second, false)); - link (_film->hash_out_path (i->first, false), _film->hash_out_path (i->second, false)); - } + finish_writer_thread (); + _picture_asset_writer->finalize (); } /** @return an estimate of the current number of frames we are encoding per second, @@ -317,12 +327,13 @@ Encoder::process_video (shared_ptr image, bool same, boost::shared_ptr (), _video_frames_out)); } else { /* Queue this new frame for encoding */ pair const s = Filter::ffmpeg_strings (_film->filters()); @@ -338,14 +349,15 @@ Encoder::process_video (shared_ptr image, bool same, boost::shared_ptr (), _video_frames_out)); ++_video_frames_out; } } @@ -434,14 +446,14 @@ Encoder::terminate_worker_threads () } void -Encoder::terminate_writer_thread () +Encoder::finish_writer_thread () { if (!_writer_thread) { return; } boost::mutex::scoped_lock lock (_writer_mutex); - _terminate_writer = true; + _finish_writer = true; _writer_condition.notify_all (); lock.unlock (); @@ -550,27 +562,95 @@ Encoder::link (string a, string b) const #endif } +struct WriteQueueSorter +{ + bool operator() (pair, int> const & a, pair, int> const & b) { + return a.second < b.second; + } +}; + void Encoder::writer_thread () { while (1) { boost::mutex::scoped_lock lock (_writer_mutex); - TIMING ("writer sleeps with a queue of %1", _write_queue.size()); - while (_write_queue.empty() && !_terminate_writer) { - _writer_condition.wait (lock); + + while (1) { + if (_finish_writer || + _write_queue.size() > _maximum_frames_in_memory || + (!_write_queue.empty() && _write_queue.front().second == (_last_written_frame + 1))) { + + break; + } + + TIMING ("writer sleeps with a queue of %1; %2 pending", _write_queue.size(), _pending.size()); + _writer_condition.wait (lock); + TIMING ("writer wakes with a queue of %1", _write_queue.size()); + + _write_queue.sort (WriteQueueSorter ()); } - TIMING ("writer wakes with a queue of %1", _write_queue.size()); - if (_terminate_writer) { + if (_finish_writer && _write_queue.empty() && _pending.empty()) { return; } - pair, int> encoded = _write_queue.front (); - _write_queue.pop_front (); + /* Write any frames that we can write; i.e. those that are in sequence */ + while (!_write_queue.empty() && _write_queue.front().second == (_last_written_frame + 1)) { + pair, int> encoded = _write_queue.front (); + _write_queue.pop_front (); - lock.unlock (); - encoded.first->write (_opt, encoded.second); - lock.lock (); + lock.unlock (); + /* XXX: write to mxf */ + _film->log()->log (String::compose ("Writer writes %1 to MXF", encoded.second)); + if (encoded.first) { + _picture_asset_writer->write (encoded.first->data(), encoded.first->size()); + _last_written = encoded.first; + } else { + _picture_asset_writer->write (_last_written->data(), _last_written->size()); + } + lock.lock (); + + ++_last_written_frame; + } + + while (_write_queue.size() > _maximum_frames_in_memory) { + /* Too many frames in memory which can't yet be written to the stream. + Put some to disk. + */ + + pair, int> encoded = _write_queue.front (); + _write_queue.pop_back (); + if (!encoded.first) { + /* This is a `repeat-last' frame, so no need to write it to disk */ + continue; + } + + lock.unlock (); + _film->log()->log (String::compose ("Writer full; pushes %1 to disk", encoded.second)); + encoded.first->write (_film, encoded.second); + lock.lock (); + + _pending.push_back (encoded.second); + } + + while (_write_queue.size() < _maximum_frames_in_memory && !_pending.empty()) { + /* We have some space in memory. Fetch some frames back off disk. */ + + _pending.sort (); + int const fetch = _pending.front (); + + lock.unlock (); + _film->log()->log (String::compose ("Writer pulls %1 back from disk", fetch)); + shared_ptr encoded; + if (boost::filesystem::exists (_film->frame_out_path (fetch, false))) { + /* It's an actual frame (not a repeat-last); load it in */ + encoded.reset (new EncodedData (_film->frame_out_path (fetch, false))); + } + lock.lock (); + + _write_queue.push_back (make_pair (encoded, fetch)); + _pending.remove (fetch); + } } } diff --git a/src/lib/encoder.h b/src/lib/encoder.h index 96e7a1d25..3e2b5d957 100644 --- a/src/lib/encoder.h +++ b/src/lib/encoder.h @@ -52,6 +52,11 @@ class ServerDescription; class DCPVideoFrame; class EncodedData; +namespace libdcp { + class MonoPictureAsset; + class MonoPictureAssetWriter; +} + /** @class Encoder * @brief Encoder to J2K and WAV for DCP. * @@ -121,22 +126,15 @@ private: int64_t _audio_frames_out; void writer_thread (); - void terminate_writer_thread (); + void finish_writer_thread (); #if HAVE_SWRESAMPLE SwrContext* _swr_context; #endif - /** List of links that we need to create when all frames have been processed; - * such that we need to call link (first, second) for each member of this list. - * In other words, `first' is a `real' frame and `second' should be a link to `first'. - * Frames are DCP frames. - */ - std::list > _links_required; - std::vector _sound_files; - boost::optional _last_real_frame; + bool _have_a_real_frame; bool _terminate_encoder; std::list > _encode_queue; std::list _worker_threads; @@ -144,10 +142,17 @@ private: boost::condition _worker_condition; boost::thread* _writer_thread; - bool _terminate_writer; + bool _finish_writer; std::list, int> > _write_queue; mutable boost::mutex _writer_mutex; boost::condition _writer_condition; + boost::shared_ptr _last_written; + std::list _pending; + int _last_written_frame; + static const unsigned int _maximum_frames_in_memory; + + boost::shared_ptr _picture_asset; + boost::shared_ptr _picture_asset_writer; }; #endif