wip: encoding; crashes on startup.
[dcpomatic.git] / src / lib / j2k_encoder.cc
index e5f1271006b81953979f903b1ca86ae3413c5a47..8c915b567364deb25d4a919a8dcd0565a7128fef 100644 (file)
@@ -1,5 +1,5 @@
 /*
-    Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
+    Copyright (C) 2012-2020 Carl Hetherington <cth@carlh.net>
 
     This file is part of DCP-o-matic.
 
  */
 
 #include "j2k_encoder.h"
+#include "j2k_encoder_backend.h"
+#include "j2k_encoder_cpu_backend.h"
+#include "j2k_encoder_remote_backend.h"
+#include "j2k_encoder_fastvideo_backend.h"
 #include "util.h"
 #include "film.h"
 #include "log.h"
@@ -44,6 +48,8 @@
 
 using std::list;
 using std::cout;
+using std::exception;
+using std::vector;
 using boost::shared_ptr;
 using boost::weak_ptr;
 using boost::optional;
@@ -56,6 +62,7 @@ using namespace dcpomatic;
 J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
        : _film (film)
        , _history (200)
+       , _cpu_backend (new J2KEncoderCPUBackend())
        , _writer (writer)
 {
        servers_list_changed ();
@@ -63,14 +70,7 @@ J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
 
 J2KEncoder::~J2KEncoder ()
 {
-       try {
-               terminate_threads ();
-       } catch (...) {
-               /* Destructors must not throw exceptions; anything bad
-                  happening now is too late to worry about anyway,
-                  I think.
-               */
-       }
+       terminate_threads ();
 }
 
 void
@@ -115,6 +115,9 @@ J2KEncoder::end ()
 
        terminate_threads ();
 
+       /* Something might have been thrown during terminate_threads */
+       rethrow ();
+
        LOG_GENERAL (N_("Mopping up %1"), _queue.size());
 
        /* The following sequence of events can occur in the above code:
@@ -126,15 +129,18 @@ J2KEncoder::end ()
             So just mop up anything left in the queue here.
        */
 
-       for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
-               LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
+       BOOST_FOREACH (shared_ptr<DCPVideo> i, _queue) {
+               LOG_GENERAL (N_("Encode left-over frame %1"), i->index());
                try {
+                       /* XXX: ewww */
+                       vector<shared_ptr<DCPVideo> > frames;
+                       frames.push_back (i);
                        _writer->write (
-                               (*i)->encode_locally(),
-                               (*i)->index(),
-                               (*i)->eyes()
+                               _cpu_backend->encode(frames).front(),
+                               i->index(),
+                               i->frame()->eyes()
                                );
-                       frame_done ();
+                       frames_done (1);
                } catch (std::exception& e) {
                        LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
                }
@@ -163,9 +169,9 @@ J2KEncoder::video_frames_enqueued () const
 
 /** Should be called when a frame has been encoded successfully */
 void
-J2KEncoder::frame_done ()
+J2KEncoder::frames_done (int number)
 {
-       _history.event ();
+       _history.event (number);
 }
 
 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
@@ -181,11 +187,7 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
 {
        _waker.nudge ();
 
-       size_t threads = 0;
-       {
-               boost::mutex::scoped_lock threads_lock (_threads_mutex);
-               threads = _threads.size ();
-       }
+       size_t threads = _threads->size();
 
        boost::mutex::scoped_lock queue_lock (_queue_mutex);
 
@@ -211,7 +213,7 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
                /* We can fake-write this frame */
                LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
                _writer->fake_write (position, pv->eyes ());
-               frame_done ();
+               frames_done (1);
        } else if (pv->has_j2k() && !_film->reencode_j2k()) {
                LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
                /* This frame already has J2K data, so just write it */
@@ -246,117 +248,76 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
 void
 J2KEncoder::terminate_threads ()
 {
-       boost::mutex::scoped_lock threads_lock (_threads_mutex);
+       boost::this_thread::disable_interruption dis;
 
-       int n = 0;
-       for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
-               LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
-               (*i)->interrupt ();
-               DCPOMATIC_ASSERT ((*i)->joinable ());
-               try {
-                       (*i)->join ();
-               } catch (boost::thread_interrupted& e) {
-                       /* This is to be expected */
-               }
-               delete *i;
-               LOG_GENERAL_NC ("Thread terminated");
-               ++n;
+       if (!_threads) {
+               return;
+       }
+
+       _threads->interrupt_all ();
+       try {
+               _threads->join_all ();
+       } catch (exception& e) {
+               LOG_ERROR ("join() threw an exception: %1", e.what());
+       } catch (...) {
+               LOG_ERROR_NC ("join() threw an exception");
        }
 
-       _threads.clear ();
+       _threads.reset ();
 }
 
 void
-J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
+J2KEncoder::encoder_thread (shared_ptr<J2KEncoderBackend> backend)
 try
 {
-       if (server) {
-               LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
-       } else {
-               LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
-       }
-
-       /* Number of seconds that we currently wait between attempts
-          to connect to the server; not relevant for localhost
-          encodings.
-       */
-       int remote_backoff = 0;
+       LOG_TIMING ("start-encoder-thread thread=%1", thread_id());
 
        while (true) {
 
                LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
                boost::mutex::scoped_lock lock (_queue_mutex);
-               while (_queue.empty ()) {
+               while (static_cast<int>(_queue.size()) < backend->quantity()) {
                        _empty_condition.wait (lock);
                }
 
                LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
-               shared_ptr<DCPVideo> vf = _queue.front ();
+               vector<shared_ptr<DCPVideo> > video;
+               for (int i = 0; i < backend->quantity(); ++i) {
+                       video.push_back (_queue.front());
+               }
 
-               /* We're about to commit to either encoding this frame or putting it back onto the queue,
+               /* We're about to commit to either encoding these frame(s) or putting them back onto the queue,
                   so we must not be interrupted until one or other of these things have happened.  This
                   block has thread interruption disabled.
                */
                {
                        boost::this_thread::disable_interruption dis;
 
-                       LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf->index(), (int) vf->eyes ());
-                       _queue.pop_front ();
+                       LOG_TIMING ("encoder-pop thread=%1 frames=%2 eyes=%3", thread_id(), video.size());
+                       for (int i = 0; i < backend->quantity(); ++i) {
+                               _queue.pop_front ();
+                       }
 
                        lock.unlock ();
 
-                       optional<Data> encoded;
-
-                       /* We need to encode this input */
-                       if (server) {
-                               try {
-                                       encoded = vf->encode_remotely (server.get ());
-
-                                       if (remote_backoff > 0) {
-                                               LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
-                                       }
-
-                                       /* This job succeeded, so remove any backoff */
-                                       remote_backoff = 0;
-
-                               } catch (std::exception& e) {
-                                       if (remote_backoff < 60) {
-                                               /* back off more */
-                                               remote_backoff += 10;
-                                       }
-                                       LOG_ERROR (
-                                               N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
-                                               vf->index(), server->host_name(), e.what(), remote_backoff
-                                               );
-                               }
+                       vector<Data> encoded = backend->encode (video);
 
-                       } else {
-                               try {
-                                       LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf->index());
-                                       encoded = vf->encode_locally ();
-                                       LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf->index());
-                               } catch (std::exception& e) {
-                                       /* This is very bad, so don't cope with it, just pass it on */
-                                       LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
-                                       throw;
+                       if (!encoded.empty()) {
+                               DCPOMATIC_ASSERT (static_cast<int>(encoded.size()) == backend->quantity());
+                               for (int i = 0; i < backend->quantity(); ++i) {
+                                       _writer->write (encoded[i], video[i]->index(), video[i]->frame()->eyes());
                                }
-                       }
-
-                       if (encoded) {
-                               _writer->write (encoded.get(), vf->index (), vf->eyes ());
-                               frame_done ();
+                               frames_done (backend->quantity());
                        } else {
                                lock.lock ();
-                               LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf->index());
-                               _queue.push_front (vf);
+                               LOG_GENERAL (N_("[%1] J2KEncoder thread pushes %2 frames back onto queue after failure"), thread_id(), video.size());
+                               for (vector<shared_ptr<DCPVideo> >::const_reverse_iterator i = video.rbegin(); i != video.rend(); ++i) {
+                                       _queue.push_front (*i);
+                               }
                                lock.unlock ();
                        }
                }
 
-               if (remote_backoff > 0) {
-                       boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
-               }
-
                /* The queue might not be full any more, so notify anything that is waiting on that */
                lock.lock ();
                _full_condition.notify_all ();
@@ -377,32 +338,17 @@ void
 J2KEncoder::servers_list_changed ()
 {
        terminate_threads ();
+       _threads.reset (new boost::thread_group());
 
        /* XXX: could re-use threads */
 
-       boost::mutex::scoped_lock lm (_threads_mutex);
-
-#ifdef BOOST_THREAD_PLATFORM_WIN32
-       OSVERSIONINFO info;
-       info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
-       GetVersionEx (&info);
-       bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
-       if (windows_xp) {
-               LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
-       }
-#endif
-
        if (!Config::instance()->only_servers_encode ()) {
                for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
-                       boost::thread* t = new boost::thread (boost::bind (&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription> ()));
 #ifdef DCPOMATIC_LINUX
+                       boost::thread* t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, _cpu_backend));
                        pthread_setname_np (t->native_handle(), "encode-worker");
-#endif
-                       _threads.push_back (t);
-#ifdef BOOST_THREAD_PLATFORM_WIN32
-                       if (windows_xp) {
-                               SetThreadAffinityMask (t->native_handle(), 1 << i);
-                       }
+#else
+                       _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, _cpu_backend));
 #endif
                }
        }
@@ -413,10 +359,14 @@ J2KEncoder::servers_list_changed ()
                }
 
                LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
+               shared_ptr<J2KEncoderRemoteBackend> backend (new J2KEncoderRemoteBackend(i));
                for (int j = 0; j < i.threads(); ++j) {
-                       _threads.push_back (new boost::thread (boost::bind (&J2KEncoder::encoder_thread, this, i)));
+                       _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, backend));
                }
        }
 
-       _writer->set_encoder_threads (_threads.size ());
+       shared_ptr<J2KEncoderFastvideoBackend> fastvideo(new J2KEncoderFastvideoBackend());
+       _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, fastvideo));
+
+       _writer->set_encoder_threads (_threads->size());
 }