Try to fix up paths for video MXFs, hashes and temporarily-stored frames.
[dcpomatic.git] / src / lib / encoder.cc
index 0d5fc1c6efbca6dd8a843a284b7dfde905d3959e..978d787e88985c97513b2dc425b635bd7bd54c97 100644 (file)
@@ -37,6 +37,7 @@
 #include "server.h"
 #include "format.h"
 #include "cross.h"
+#include "writer.h"
 
 using std::pair;
 using std::string;
@@ -48,51 +49,29 @@ using std::make_pair;
 using namespace boost;
 
 int const Encoder::_history_size = 25;
-unsigned int const Encoder::_maximum_frames_in_memory = 8;
 
 /** @param f Film that we are encoding.
  *  @param o Options.
  */
-Encoder::Encoder (shared_ptr<const Film> f)
+Encoder::Encoder (shared_ptr<Film> f)
        : _film (f)
-       , _just_skipped (false)
        , _video_frames_in (0)
-       , _audio_frames_in (0)
        , _video_frames_out (0)
-       , _audio_frames_out (0)
 #ifdef HAVE_SWRESAMPLE   
        , _swr_context (0)
 #endif
        , _have_a_real_frame (false)
        , _terminate_encoder (false)
-       , _writer_thread (0)
-       , _finish_writer (false)
-       , _last_written_frame (-1)
 {
-       if (_film->audio_stream()) {
-               /* Create sound output files with .tmp suffixes; we will rename
-                  them if and when we complete.
-               */
-               for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
-                       SF_INFO sf_info;
-                       sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
-                       /* We write mono files */
-                       sf_info.channels = 1;
-                       sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
-                       SNDFILE* f = sf_open (_film->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
-                       if (f == 0) {
-                               throw CreateFileError (_film->multichannel_audio_out_path (i, true));
-                       }
-                       _sound_files.push_back (f);
-               }
-       }
+       
 }
 
 Encoder::~Encoder ()
 {
-       close_sound_files ();
        terminate_worker_threads ();
-       finish_writer_thread ();
+       if (_writer) {
+               _writer->finish ();
+       }
 }
 
 void
@@ -139,18 +118,7 @@ Encoder::process_begin ()
                }
        }
 
-       /* XXX! */
-       _picture_asset.reset (
-               new libdcp::MonoPictureAsset (
-                       _film->dir (_film->dcp_name()),
-                       String::compose ("video_%1.mxf", 0),
-                       DCPFrameRate (_film->frames_per_second()).frames_per_second,
-                       _film->format()->dcp_size()
-                       )
-               );
-       
-       _picture_asset_writer = _picture_asset->start_write ();
-       _writer_thread = new boost::thread (boost::bind (&Encoder::writer_thread, this));
+       _writer.reset (new Writer (_film));
 }
 
 
@@ -174,25 +142,13 @@ Encoder::process_end ()
                        }
 
                        out->set_frames (frames);
-                       write_audio (out);
+                       _writer->write (out);
                }
 
                swr_free (&_swr_context);
        }
 #endif
 
-       if (_film->audio_stream()) {
-               close_sound_files ();
-               
-               /* Rename .wav.tmp files to .wav */
-               for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
-                       if (boost::filesystem::exists (_film->multichannel_audio_out_path (i, false))) {
-                               boost::filesystem::remove (_film->multichannel_audio_out_path (i, false));
-                       }
-                       boost::filesystem::rename (_film->multichannel_audio_out_path (i, true), _film->multichannel_audio_out_path (i, false));
-               }
-       }
-
        boost::mutex::scoped_lock lock (_worker_mutex);
 
        _film->log()->log ("Clearing queue of " + lexical_cast<string> (_encode_queue.size ()));
@@ -222,20 +178,15 @@ Encoder::process_end ()
        for (list<shared_ptr<DCPVideoFrame> >::iterator i = _encode_queue.begin(); i != _encode_queue.end(); ++i) {
                _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
                try {
-                       shared_ptr<EncodedData> e = (*i)->encode_locally ();
-                       {
-                               boost::mutex::scoped_lock lock2 (_writer_mutex);
-                               _write_queue.push_back (make_pair (e, (*i)->frame ()));
-                               _writer_condition.notify_all ();
-                       }
+                       _writer->write ((*i)->encode_locally(), (*i)->frame ());
                        frame_done ();
                } catch (std::exception& e) {
                        _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
                }
        }
 
-       finish_writer_thread ();
-       _picture_asset_writer->finalize ();
+       _writer->finish ();
+       _writer.reset ();
 }      
 
 /** @return an estimate of the current number of frames we are encoding per second,
@@ -255,14 +206,6 @@ Encoder::current_frames_per_second () const
        return _history_size / (seconds (now) - seconds (_time_history.back ()));
 }
 
-/** @return true if the last frame to be processed was skipped as it already existed */
-bool
-Encoder::skipping () const
-{
-       boost::mutex::scoped_lock (_history_mutex);
-       return _just_skipped;
-}
-
 /** @return Number of video frames that have been sent out */
 int
 Encoder::video_frames_out () const
@@ -278,7 +221,6 @@ void
 Encoder::frame_done ()
 {
        boost::mutex::scoped_lock lock (_history_mutex);
-       _just_skipped = false;
        
        struct timeval tv;
        gettimeofday (&tv, 0);
@@ -288,16 +230,6 @@ Encoder::frame_done ()
        }
 }
 
-/** Called by a subclass when it has just skipped the processing
-    of a frame because it has already been done.
-*/
-void
-Encoder::frame_skipped ()
-{
-       boost::mutex::scoped_lock lock (_history_mutex);
-       _just_skipped = true;
-}
-
 void
 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
 {
@@ -321,19 +253,10 @@ Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Su
                return;
        }
 
-       /* Only do the processing if we don't already have a file for this frame */
-       if (boost::filesystem::exists (_film->frame_out_path (_video_frames_out, false))) {
-               frame_skipped ();
-               return;
-       }
-
        if (same && _have_a_real_frame) {
-               /* Use the last frame that we encoded.  We do this by putting a null encoded
-                  frame straight onto the writer's queue.  It will know to duplicate the previous frame
-                  in this case.
-               */
-               boost::mutex::scoped_lock lock2 (_writer_mutex);
-               _write_queue.push_back (make_pair (shared_ptr<EncodedData> (), _video_frames_out));
+               /* Use the last frame that we encoded. */
+               _writer->repeat (_video_frames_out);
+               frame_done ();
        } else {
                /* Queue this new frame for encoding */
                pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
@@ -356,9 +279,9 @@ Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Su
        ++_video_frames_out;
 
        if (dfr.repeat) {
-               boost::mutex::scoped_lock lock2 (_writer_mutex);
-               _write_queue.push_back (make_pair (shared_ptr<EncodedData> (), _video_frames_out));
+               _writer->repeat (_video_frames_out);
                ++_video_frames_out;
+               frame_done ();
        }
 }
 
@@ -406,31 +329,9 @@ Encoder::process_audio (shared_ptr<AudioBuffers> data)
                data = b;
        }
 
-       write_audio (data);
-       
-       _audio_frames_in += data->frames ();
-}
-
-void
-Encoder::write_audio (shared_ptr<const AudioBuffers> audio)
-{
-       for (int i = 0; i < audio->channels(); ++i) {
-               sf_write_float (_sound_files[i], audio->data(i), audio->frames());
-       }
-
-       _audio_frames_out += audio->frames ();
+       _writer->write (data);
 }
 
-void
-Encoder::close_sound_files ()
-{
-       for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
-               sf_close (*i);
-       }
-
-       _sound_files.clear ();
-}      
-
 void
 Encoder::terminate_worker_threads ()
 {
@@ -445,23 +346,6 @@ Encoder::terminate_worker_threads ()
        }
 }
 
-void
-Encoder::finish_writer_thread ()
-{
-       if (!_writer_thread) {
-               return;
-       }
-       
-       boost::mutex::scoped_lock lock (_writer_mutex);
-       _finish_writer = true;
-       _writer_condition.notify_all ();
-       lock.unlock ();
-
-       _writer_thread->join ();
-       delete _writer_thread;
-       _writer_thread = 0;
-}
-
 void
 Encoder::encoder_thread (ServerDescription* server)
 {
@@ -526,9 +410,8 @@ Encoder::encoder_thread (ServerDescription* server)
                }
 
                if (encoded) {
-                       boost::mutex::scoped_lock lock2 (_writer_mutex);
-                       _write_queue.push_back (make_pair (encoded, vf->frame ()));
-                       _writer_condition.notify_all ();
+                       _writer->write (encoded, vf->frame ());
+                       frame_done ();
                } else {
                        lock.lock ();
                        _film->log()->log (
@@ -546,110 +429,3 @@ Encoder::encoder_thread (ServerDescription* server)
                _worker_condition.notify_all ();
        }
 }
-
-void
-Encoder::link (string a, string b) const
-{
-#ifdef DVDOMATIC_POSIX                 
-       int const r = symlink (a.c_str(), b.c_str());
-       if (r) {
-               throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b));
-       }
-#endif
-       
-#ifdef DVDOMATIC_WINDOWS
-       boost::filesystem::copy_file (a, b);
-#endif                 
-}
-
-struct WriteQueueSorter
-{
-       bool operator() (pair<shared_ptr<EncodedData>, int> const & a, pair<shared_ptr<EncodedData>, int> const & b) {
-               return a.second < b.second;
-       }
-};
-
-void
-Encoder::writer_thread ()
-{
-       while (1)
-       {
-               boost::mutex::scoped_lock lock (_writer_mutex);
-
-               while (1) {
-                       if (_finish_writer ||
-                           _write_queue.size() > _maximum_frames_in_memory ||
-                           (!_write_queue.empty() && _write_queue.front().second == (_last_written_frame + 1))) {
-                                   
-                                   break;
-                           }
-
-                           TIMING ("writer sleeps with a queue of %1; %2 pending", _write_queue.size(), _pending.size());
-                           _writer_condition.wait (lock);
-                           TIMING ("writer wakes with a queue of %1", _write_queue.size());
-
-                           _write_queue.sort (WriteQueueSorter ());
-               }
-
-               if (_finish_writer && _write_queue.empty() && _pending.empty()) {
-                       return;
-               }
-
-               /* Write any frames that we can write; i.e. those that are in sequence */
-               while (!_write_queue.empty() && _write_queue.front().second == (_last_written_frame + 1)) {
-                       pair<boost::shared_ptr<EncodedData>, int> encoded = _write_queue.front ();
-                       _write_queue.pop_front ();
-
-                       lock.unlock ();
-                       _film->log()->log (String::compose ("Writer writes %1 to MXF", encoded.second));
-                       if (encoded.first) {
-                               _picture_asset_writer->write (encoded.first->data(), encoded.first->size());
-                               _last_written = encoded.first;
-                       } else {
-                               _picture_asset_writer->write (_last_written->data(), _last_written->size());
-                       }
-                       lock.lock ();
-
-                       ++_last_written_frame;
-               }
-
-               while (_write_queue.size() > _maximum_frames_in_memory) {
-                       /* Too many frames in memory which can't yet be written to the stream.
-                          Put some to disk.
-                       */
-
-                       pair<boost::shared_ptr<EncodedData>, int> encoded = _write_queue.back ();
-                       _write_queue.pop_back ();
-                       if (!encoded.first) {
-                               /* This is a `repeat-last' frame, so no need to write it to disk */
-                               continue;
-                       }
-
-                       lock.unlock ();
-                       _film->log()->log (String::compose ("Writer full (awaiting %1); pushes %2 to disk", _last_written_frame + 1, encoded.second));
-                       encoded.first->write (_film, encoded.second);
-                       lock.lock ();
-
-                       _pending.push_back (encoded.second);
-               }
-
-               while (_write_queue.size() < _maximum_frames_in_memory && !_pending.empty()) {
-                       /* We have some space in memory.  Fetch some frames back off disk. */
-
-                       _pending.sort ();
-                       int const fetch = _pending.front ();
-
-                       lock.unlock ();
-                       _film->log()->log (String::compose ("Writer pulls %1 back from disk", fetch));
-                       shared_ptr<EncodedData> encoded;
-                       if (boost::filesystem::exists (_film->frame_out_path (fetch, false))) {
-                               /* It's an actual frame (not a repeat-last); load it in */
-                               encoded.reset (new EncodedData (_film->frame_out_path (fetch, false)));
-                       }
-                       lock.lock ();
-
-                       _write_queue.push_back (make_pair (encoded, fetch));
-                       _pending.remove (fetch);
-               }
-       }
-}