allow dynamic process thread count resets
[ardour.git] / libs / ardour / graph.cc
index 9549a9dcc98bdb5c8f658028f253c154d2433923..29e95b67fdf7b4864fb29657591fa0c48b8863de 100644 (file)
@@ -22,6 +22,7 @@
 
 #include "pbd/compose.h"
 #include "pbd/cpus.h"
+#include "pbd/debug_rt_alloc.h"
 
 #include "ardour/debug.h"
 #include "ardour/graph.h"
@@ -39,9 +40,21 @@ using namespace ARDOUR;
 using namespace PBD;
 using namespace std;
 
+#ifdef DEBUG_RT_ALLOC
+static Graph* graph = 0;
+extern "C" {
+
+int alloc_allowed ()
+{
+       return !graph->in_process_thread ();
+}
+
+}
+#endif
 
 Graph::Graph (Session & session) 
         : SessionHandleRef (session) 
+        , _quit_threads (false)
        , _execution_sem ("graph_execution", 0)
        , _callback_start_sem ("graph_start", 0)
        , _callback_done_sem ("graph_done", 0)
@@ -49,6 +62,11 @@ Graph::Graph (Session & session)
 {
         pthread_mutex_init( &_trigger_mutex, NULL);
 
+       /* XXX: rather hacky `fix' to stop _trigger_queue.push_back() allocating
+          memory in the RT thread.
+       */
+       _trigger_queue.reserve (8192);
+
         _execution_tokens = 0;
 
         _current_chain = 0;
@@ -56,16 +74,44 @@ Graph::Graph (Session & session)
         _setup_chain   = 1;
         _quit_threads = false;
         _graph_empty = true;
+        
+        reset_thread_list ();
+
+        Config->ParameterChanged.connect_same_thread (processor_usage_connection, boost::bind (&Graph::parameter_changed, this, _1));
 
+#ifdef DEBUG_RT_ALLOC  
+       graph = this;
+       pbd_alloc_allowed = &::alloc_allowed;
+#endif 
+}
+
+void
+Graph::parameter_changed (std::string param)
+{
+        if (param == X_("processor-usage")) {
+                reset_thread_list ();
+        }
+}
+
+void
+Graph::reset_thread_list ()
+{
         int num_cpu = hardware_concurrency();
-        int num_threads = num_cpu;
+        uint32_t num_threads = num_cpu;
         int pu = Config->get_processor_usage ();
+       pthread_t a_thread;
+
+        Glib::Mutex::Lock lm (_session.engine().process_lock());
+
+        if (!_thread_list.empty()) {
+                drop_threads ();
+        }
 
         if (pu < 0) {
                 /* pu is negative: use "pu" less cores for DSP than appear to be available
                  */
 
-                if (-pu < num_threads) {
+                if ((uint32_t) -pu < num_threads) {
                         num_threads += pu; 
                 } else {
                         num_threads = 1;
@@ -73,49 +119,58 @@ Graph::Graph (Session & session)
         } else {
                 /* use "pu" cores, if available
                  */
-
-                if (pu <= num_threads) {
+                
+                if ((uint32_t) pu <= num_threads) {
                         num_threads = pu;
                 } 
         }
 
-        info << string_compose (_("Using %2 threads on %1 CPUs"), num_cpu, num_threads) << endmsg;
-
-       pthread_t a_thread;
-
        if (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::main_thread, this), &a_thread, 100000) == 0) {
                _thread_list.push_back (a_thread);
        }
 
-        for (int i = 1; i < num_threads; ++i) {
+        for (uint32_t i = 1; i < num_threads; ++i) {
                if (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::helper_thread, this), &a_thread, 100000) == 0) {
                        _thread_list.push_back (a_thread);
                }
         }
+
+        info << string_compose (_("Using %2 threads on %1 CPUs"), _thread_list.size(), num_threads) << endmsg;
+        cerr << string_compose (_("Using %2 threads on %1 CPUs"), _thread_list.size(), num_threads) << endl;
 }
 
 void
 Graph::session_going_away()
+{
+        drop_threads ();
+
+        // now drop all references on the nodes.
+        _nodes_rt[0].clear();
+        _nodes_rt[1].clear();
+        _init_trigger_list[0].clear();
+        _init_trigger_list[1].clear();
+        _trigger_queue.clear();
+}
+
+void
+Graph::drop_threads ()
 {
         _quit_threads = true;
 
-        for (unsigned int i=0; i<_thread_list.size(); i++) {
+        for (unsigned int i=0; i< _thread_list.size(); i++) {
                _execution_sem.signal ();
         }
 
         _callback_start_sem.signal ();
 
-        for (list<pthread_t>::iterator i = _thread_list.begin(); i != _thread_list.end(); i++) {
+        for (list<pthread_t>::iterator i = _thread_list.begin(); i != _thread_list.end(); ++i) {
                 void* status;
                 pthread_join (*i, &status);
         }
 
-        // now drop all references on the nodes.
-        _nodes_rt[0].clear();
-        _nodes_rt[1].clear();
-        _init_trigger_list[0].clear();
-        _init_trigger_list[1].clear();
-        _trigger_queue.clear();
+        _thread_list.clear ();
+
+        _quit_threads = false;
 }
 
 void
@@ -177,7 +232,7 @@ void
 Graph::trigger (GraphNode* n)
 {
         pthread_mutex_lock (&_trigger_mutex);
-        _trigger_queue.push_backn);
+        _trigger_queue.push_back (n);
         pthread_mutex_unlock (&_trigger_mutex);
 }
 
@@ -196,26 +251,26 @@ Graph::dec_ref()
 void
 Graph::restart_cycle()
 {
-        //printf( "cycle_done chain: %d\n", _current_chain);
-
         // we are through. wakeup our caller.
+
   again:
         _callback_done_sem.signal ();
 
         // block until we are triggered.
         _callback_start_sem.wait();
-        if (_quit_threads)
-                return;
 
-        //printf( "cycle_start\n" );
+        if (_quit_threads) {
+                return;
+        }
 
         this->prep();
-        if (_graph_empty)
+
+        if (_graph_empty) {
                 goto again;
-        //printf( "cycle_start chain: %d\n", _current_chain);
+        }
 
         // returning will restart the cycle.
-        //  starting with waking up the others.
+        // starting with waking up the others.
 }
 
 static bool
@@ -328,7 +383,9 @@ Graph::run_one()
         int wakeup = min (et, ts);
         _execution_tokens -= wakeup;
 
-        for (int i=0; i<wakeup; i++ ) {
+        DEBUG_TRACE(DEBUG::ProcessThreads, string_compose ("%1 signals %2\n", pthread_self(), wakeup));
+
+        for (int i = 0; i < wakeup; i++) {
                 _execution_sem.signal ();
         }
 
@@ -337,8 +394,9 @@ Graph::run_one()
                 pthread_mutex_unlock (&_trigger_mutex);
                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 goes to sleep\n", pthread_self()));
                 _execution_sem.wait ();
-                if (_quit_threads)
+                if (_quit_threads) {
                         return true;
+                }
                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 is awake\n", pthread_self()));
                 pthread_mutex_lock (&_trigger_mutex);
                 if (_trigger_queue.size()) {
@@ -351,6 +409,8 @@ Graph::run_one()
         to_run->process();
         to_run->finish (_current_chain);
 
+        DEBUG_TRACE(DEBUG::ProcessThreads, string_compose ("%1 has finished run_one()\n", pthread_self()));
+
         return false;
 }
 
@@ -375,7 +435,9 @@ static void get_rt()
 void
 Graph::helper_thread()
 {
-        ProcessThread *pt = new ProcessThread;
+       suspend_rt_malloc_checks ();
+       ProcessThread* pt = new ProcessThread ();
+       resume_rt_malloc_checks ();
 
         pt->get_buffers();
         get_rt();
@@ -392,7 +454,9 @@ Graph::helper_thread()
 void
 Graph::main_thread()
 {
-        ProcessThread *pt = new ProcessThread;
+       suspend_rt_malloc_checks ();
+       ProcessThread* pt = new ProcessThread ();
+       resume_rt_malloc_checks ();
 
         pt->get_buffers();
         get_rt();
@@ -400,10 +464,16 @@ Graph::main_thread()
   again:
         _callback_start_sem.wait ();
        DEBUG_TRACE(DEBUG::Graph, "main thread is awake\n");
+
+        if (_quit_threads) {
+                return;
+        }
+
         this->prep();
 
         if (_graph_empty && !_quit_threads) {
                 _callback_done_sem.signal ();
+                DEBUG_TRACE(DEBUG::Graph, "main thread sees graph done, goes back to slee\n");
                 goto again;
         }
 
@@ -552,5 +622,13 @@ Graph::process_one_route (Route* route)
         }
 }
 
+bool
+Graph::in_process_thread () const
+{
+       list<pthread_t>::const_iterator i = _thread_list.begin ();
+       while (i != _thread_list.end() && *i != pthread_self ()) {
+               ++i;
+       }
 
-
+       return i != _thread_list.end ();
+}