Hacks to multi-thread GPU encoding.
[dcpomatic.git] / src / lib / j2k_encoder.cc
index 82e9ba76c7b28a12868f98f4cbe6faab09dd34a5..b40f5c95010255d3b1f6daf6052a460429c96443 100644 (file)
 #include "encode_server_finder.h"
 #include "player.h"
 #include "player_video.h"
+#include "timer.h"
 #include "encode_server_description.h"
 #include "compose.hpp"
 #include <libcxml/cxml.h>
 #include <boost/foreach.hpp>
+#include <boost/thread/barrier.hpp>
 #include <iostream>
 
 #include "i18n.h"
 
+using std::min;
 using std::list;
 using std::cout;
 using std::exception;
@@ -56,6 +59,9 @@ using boost::optional;
 using dcp::Data;
 using namespace dcpomatic;
 
+/* XXX: number of GPU encoder threads (also the _gpu_barrier count); hard-coded for now */
+#define GPU_THREADS 4
+
 /** @param film Film that we are encoding.
  *  @param writer Writer that we are using.
  */
@@ -64,6 +70,7 @@ J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
        , _history (200)
        , _cpu_backend (new J2KEncoderCPUBackend())
        , _writer (writer)
+       , _gpu_barrier (GPU_THREADS)
 {
        servers_list_changed ();
 }
@@ -194,7 +201,8 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
        /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
           when there are no threads.
        */
-       while (_queue.size() >= (threads * 2) + 1) {
+       /* XXX: needs correct handling for GPU */
+       while (_queue.size() >= (threads * 2) + 1 + 48) {
                LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
                _full_condition.wait (queue_lock);
                LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
@@ -218,11 +226,14 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
                LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
                /* This frame already has J2K data, so just write it */
                _writer->write (pv->j2k(), position, pv->eyes ());
+               /*
        } else if (_last_player_video[pv->eyes()] && _writer->can_repeat(position) && pv->same (_last_player_video[pv->eyes()])) {
                LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
                _writer->repeat (position, pv->eyes ());
+               */
        } else {
                LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
+               std::cout << "queue " << to_string(time) << "\n";
                /* Queue this new frame for encoding */
                LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
                _queue.push_back (shared_ptr<DCPVideo> (
@@ -276,13 +287,14 @@ try
 
                LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
                boost::mutex::scoped_lock lock (_queue_mutex);
-               while (static_cast<int>(_queue.size()) < backend->quantity()) {
+               while (_queue.empty()) {
                        _empty_condition.wait (lock);
                }
 
                LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
                vector<shared_ptr<DCPVideo> > video;
-               for (int i = 0; i < backend->quantity(); ++i) {
+               int to_do = min(static_cast<int>(_queue.size()), backend->quantity());
+               for (int i = 0; i < to_do; ++i) {
                        video.push_back (_queue.front());
                        _queue.pop_front();
                }
@@ -292,7 +304,8 @@ try
                   block has thread interruption disabled.
                */
                {
-                       boost::this_thread::disable_interruption dis;
+                       // XXX HACK: thread interruption is no longer disabled for this block
+                       //boost::this_thread::disable_interruption dis;
 
                        LOG_TIMING ("encoder-pop thread=%1 frames=%2 eyes=%3", thread_id(), video.size());
 
@@ -301,11 +314,11 @@ try
                        vector<Data> encoded = backend->encode (video);
 
                        if (!encoded.empty()) {
-                               DCPOMATIC_ASSERT (static_cast<int>(encoded.size()) == backend->quantity());
-                               for (int i = 0; i < backend->quantity(); ++i) {
+                               DCPOMATIC_ASSERT (encoded.size() == video.size());
+                               for (size_t i = 0; i < video.size(); ++i) {
                                        _writer->write (encoded[i], video[i]->index(), video[i]->frame()->eyes());
                                }
-                               frames_done (backend->quantity());
+                               frames_done (video.size());
                        } else {
                                lock.lock ();
                                LOG_GENERAL (N_("[%1] J2KEncoder thread pushes %2 frames back onto queue after failure"), thread_id(), video.size());
@@ -325,8 +338,16 @@ catch (boost::thread_interrupted& e) {
        /* Ignore these and just stop the thread */
        _full_condition.notify_all ();
 }
+catch (std::exception& e)
+{
+       std::cout << "encoder thread crashed " << e.what() << "\n";
+       store_current ();
+       /* Wake anything waiting on _full_condition so it can see the exception */
+       _full_condition.notify_all ();
+}
 catch (...)
 {
+       std::cout << "encoder thread crashed.\n";
        store_current ();
        /* Wake anything waiting on _full_condition so it can see the exception */
        _full_condition.notify_all ();
@@ -363,8 +384,10 @@ J2KEncoder::servers_list_changed ()
                }
        }
 
-       shared_ptr<J2KEncoderFastvideoBackend> fastvideo(new J2KEncoderFastvideoBackend());
-       _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, fastvideo));
+       for (int i = 0; i < GPU_THREADS; ++i) {
+               shared_ptr<J2KEncoderFastvideoBackend> fastvideo(new J2KEncoderFastvideoBackend(_gpu_barrier));
+               _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, fastvideo));
+       }
 
        _writer->set_encoder_threads (_threads->size());
 }