add DEBUG::ProcessThreads as a debug tag for parallelization (probably to be renamed)
[ardour.git] / libs / ardour / graph.cc
1 /*
2     Copyright (C) 2010 Paul Davis
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 #include "pbd/compose.h"
21
22 #include "ardour/debug.h"
23 #include "ardour/graph.h"
24 #include "ardour/types.h"
25 #include "ardour/session.h"
26 #include "ardour/route.h"
27 #include "ardour/process_thread.h"
28 #include "ardour/audioengine.h"
29
30 #include <jack/thread.h>
31
32 #ifdef __linux__
33 #include <unistd.h>
34 #elif defined(__APPLE__) || defined(__FreeBSD__)
35 #include <sys/types.h>
36 #include <sys/sysctl.h>
37 #endif
38
39
40
41 #include <stdio.h>
42 #include <cmath>
43
44 using namespace ARDOUR;
45 using namespace PBD;
46
/**
 * Report how many logical processors are currently online.
 *
 * @return the processor count, or 0 when the platform offers no supported
 *         query or the query itself fails.
 */
static unsigned int hardware_concurrency()
{
#if defined(PTW32_VERSION) || defined(__hpux)
    return pthread_num_processors_np();
#elif defined(__APPLE__) || defined(__FreeBSD__)
    int count = 0;
    size_t size = sizeof(count);
    if (sysctlbyname("hw.ncpu", &count, &size, NULL, 0) != 0) {
        return 0;
    }
    return count;
#elif (defined(__linux__) || defined(HAVE_UNISTD)) && defined(_SC_NPROCESSORS_ONLN)
    // <unistd.h> is pulled in at the top of this file whenever __linux__ is
    // defined, so accept that macro as well as HAVE_UNISTD; otherwise a Linux
    // build without HAVE_UNISTD would silently report 0 processors.
    long const count = sysconf(_SC_NPROCESSORS_ONLN);
    return (count > 0) ? static_cast<unsigned int>(count) : 0;
#else
    return 0;
#endif
}
62 // ========================================== Graph
63
64 Graph::Graph( Session & session ) 
65     : SessionHandleRef( session ) 
66 {
67     pthread_mutex_init( &_trigger_mutex, NULL );
68     sem_init( &_execution_sem, 0, 0 );
69
70     sem_init( &_callback_start_sem, 0, 0 );
71     sem_init( &_callback_done_sem,  0, 0 );
72
73     _execution_tokens = 0;
74
75     pthread_mutex_init( &_swap_mutex, NULL );
76     _current_chain = 0;
77     _pending_chain = 0;
78     _setup_chain   = 1;
79     _quit_threads = false;
80     _graph_empty = true;
81
82     int num_cpu = hardware_concurrency();
83     DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("Using %1 CPUs via %1 threads\n", num_cpu));
84     _thread_list.push_back( Glib::Thread::create( sigc::mem_fun( *this, &Graph::main_thread ), 100000, true, true, Glib::THREAD_PRIORITY_NORMAL ) );
85     for (int i=1; i<num_cpu; i++)
86         _thread_list.push_back( Glib::Thread::create( sigc::mem_fun( *this, &Graph::helper_thread ), 100000, true, true, Glib::THREAD_PRIORITY_NORMAL ) );
87 }
88
89 void
90 Graph::session_going_away()
91 {
92     _quit_threads = true;
93
94     for (unsigned int i=0; i<_thread_list.size(); i++)
95         sem_post( &_execution_sem );
96
97     sem_post( &_callback_start_sem );
98
99     for (std::list<Glib::Thread *>::iterator i=_thread_list.begin(); i!=_thread_list.end(); i++)
100     {
101         (*i)->join();
102     }
103
104     // now drop all references on the nodes.
105     _nodes.clear();
106     _nodes_rt[0].clear();
107     _nodes_rt[1].clear();
108     _init_trigger_list[0].clear();
109     _init_trigger_list[1].clear();
110     _trigger_queue.clear();
111 }
112
113 void
114 Graph::prep()
115 {
116     node_list_t::iterator i;
117     int chain;
118
119     if (pthread_mutex_trylock (&_swap_mutex) == 0)
120     {
121         // we got the swap mutex.
122         if (_current_chain != _pending_chain)
123         {
124             //printf ("chain swap ! %d -> %d\n", _current_chain, _pending_chain );
125             _setup_chain = _current_chain;
126             _current_chain = _pending_chain;
127         }
128         pthread_mutex_unlock (&_swap_mutex);
129     }
130
131     chain = _current_chain;
132
133     _graph_empty = true;
134     for (i=_nodes_rt[chain].begin(); i!=_nodes_rt[chain].end(); i++)
135     {
136         (*i)->prep( chain );
137         _graph_empty = false;
138     }
139     _finished_refcount = _init_finished_refcount[chain];
140
141     for (i=_init_trigger_list[chain].begin(); i!=_init_trigger_list[chain].end(); i++)
142         this->trigger( i->get() );
143 }
144
145 void
146 Graph::trigger( GraphNode * n )
147 {
148     pthread_mutex_lock( &_trigger_mutex );
149     _trigger_queue.push_back( n );
150     pthread_mutex_unlock( &_trigger_mutex );
151 }
152
153 void
154 Graph::dec_ref()
155 {
156     if (g_atomic_int_dec_and_test( &_finished_refcount ))
157     {
158         // ok... this cycle is finished now.
159         // we are the only thread alive.
160         
161         this->restart_cycle();
162
163     }
164 }
165
166 void
167 Graph::restart_cycle()
168 {
169     //printf( "cycle_done chain: %d\n", _current_chain );
170
171     // we are through. wakeup our caller.
172 again:
173     sem_post( &_callback_done_sem );
174
175     // block until we are triggered.
176     sem_wait( &_callback_start_sem );
177     if (_quit_threads)
178         return;
179
180     //printf( "cycle_start\n" );
181
182     this->prep();
183     if (_graph_empty)
184         goto again;
185     //printf( "cycle_start chain: %d\n", _current_chain );
186
187     // returning will restart the cycle.
188     //  starting with waking up the others.
189 }
190
191 static bool
192 is_feedback( boost::shared_ptr<RouteList> routelist, Route * from, boost::shared_ptr<Route> to )
193 {
194     for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++)
195     {
196         if ((*ri).get() == from)
197             return false;
198         if ((*ri) == to)
199             return true;
200     }
201     assert(0);
202     return false;
203 }
204
205 static bool
206 is_feedback( boost::shared_ptr<RouteList> routelist, boost::shared_ptr<Route> from, Route * to )
207 {
208     for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++)
209     {
210         if ((*ri).get() == to)
211             return true;
212         if ((*ri) == from)
213             return false;
214     }
215     assert(0);
216     return false;
217 }
218
219 void
220 Graph::rechain( boost::shared_ptr<RouteList> routelist )
221 {
222     node_list_t::iterator ni;
223
224     pthread_mutex_lock (&_swap_mutex);
225     int chain = _setup_chain;
226     printf( "============== setup %d\n", chain );
227     // set all refcounts to 0;
228
229     _init_finished_refcount[chain] = 0;
230     _init_trigger_list[chain].clear();
231
232     _nodes_rt[chain].clear();
233
234     for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++)
235     {
236         node_ptr_t n = boost::dynamic_pointer_cast<GraphNode> (*ri);
237
238         n->_init_refcount[chain] = 0;
239         n->_activation_set[chain].clear();
240         _nodes_rt[chain].push_back(n);
241     }
242
243     // now add refs for the connections.
244
245     for (ni=_nodes_rt[chain].begin(); ni!=_nodes_rt[chain].end(); ni++)
246     {
247         bool has_input  = false;
248         bool has_output = false;
249
250         boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route>( *ni );
251
252         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++)
253         {
254             if (rp->direct_feeds( *ri ))
255             {
256                 if (is_feedback( routelist, rp.get(), *ri ))
257                    continue; 
258                     
259                 has_output = true;
260                 (*ni)->_activation_set[chain].insert( boost::dynamic_pointer_cast<GraphNode> (*ri) );
261             }
262         }
263
264         for (Route::FedBy::iterator fi=rp->fed_by().begin(); fi!=rp->fed_by().end(); fi++)
265         {
266             if (boost::shared_ptr<Route> r = fi->r.lock())
267                 if (!is_feedback( routelist, r, rp.get() ))
268                     has_input = true;
269         }
270
271         for (node_set_t::iterator ai=(*ni)->_activation_set[chain].begin(); ai!=(*ni)->_activation_set[chain].end(); ai++)
272         {
273             (*ai)->_init_refcount[chain] += 1;
274         }
275
276         if (!has_input)
277             _init_trigger_list[chain].push_back( *ni );
278
279         if (!has_output)
280             _init_finished_refcount[chain] += 1;
281     } 
282
283     _pending_chain = chain;
284     dump(chain);
285     pthread_mutex_unlock (&_swap_mutex);
286 }
287
288
289 bool
290 Graph::run_one()
291 {
292     GraphNode * to_run;
293
294     pthread_mutex_lock( &_trigger_mutex );
295     if (_trigger_queue.size()) {
296         to_run = _trigger_queue.back();
297         _trigger_queue.pop_back();
298     }
299     else
300         to_run = 0;
301
302     int wakeup = std::min( (int) _execution_tokens, (int) _trigger_queue.size() );
303     _execution_tokens -= wakeup;
304
305     for( int i=0; i<wakeup; i++ )
306         sem_post( &_execution_sem );
307
308     while (to_run == 0)
309     {
310         _execution_tokens += 1;
311         pthread_mutex_unlock( &_trigger_mutex );
312         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 goes to sleep\n", pthread_self()));
313         sem_wait( &_execution_sem );
314         if (_quit_threads)
315             return true;
316         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 is awake\n", pthread_self()));
317         pthread_mutex_lock( &_trigger_mutex );
318         if (_trigger_queue.size())
319         {
320             to_run = _trigger_queue.back();
321             _trigger_queue.pop_back();
322         }
323     }
324     pthread_mutex_unlock( &_trigger_mutex );
325
326     to_run->process();
327     to_run->finish( _current_chain );
328
329     return false;
330 }
331
332 static void get_rt()
333 {
334     int priority = jack_client_real_time_priority( AudioEngine::instance()->jack() );
335
336     if (priority)
337     {
338         struct sched_param rtparam;
339         
340         memset (&rtparam, 0, sizeof (rtparam));
341         rtparam.sched_priority = priority;
342         
343         pthread_setschedparam (pthread_self(), SCHED_FIFO, &rtparam);
344     }
345 }
346
347 void
348 Graph::helper_thread()
349 {
350     ProcessThread *pt = new ProcessThread;
351
352     pt->get_buffers();
353     get_rt();
354
355
356     while(1)
357     {
358         if (run_one())
359             break;
360     }
361     pt->drop_buffers();
362 }
363
364 void
365 Graph::main_thread()
366 {
367     ProcessThread *pt = new ProcessThread;
368
369     pt->get_buffers();
370     get_rt();
371
372 again:
373     sem_wait( &_callback_start_sem );
374
375     this->prep();
376
377     if (_graph_empty) {
378         sem_post( &_callback_done_sem );
379         goto again;
380     }
381
382     while(1)
383     {
384         if (run_one())
385             break;
386     }
387     pt->drop_buffers();
388 }
389
390 void
391 Graph::dump( int chain )
392 {
393 #ifndef NDEBUG
394     node_list_t::iterator ni;
395     node_set_t::iterator ai;
396
397     chain = _pending_chain;
398
399     printf( "--------------------------------------------Graph dump:\n" );
400     for (ni=_nodes_rt[chain].begin(); ni!=_nodes_rt[chain].end(); ni++)
401     {
402         boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route>( *ni );
403         printf( "GraphNode: %s  refcount: %d\n", rp->name().c_str(), (*ni)->_init_refcount[chain] );
404         for (ai=(*ni)->_activation_set[chain].begin(); ai!=(*ni)->_activation_set[chain].end(); ai++)
405         {
406             printf( "  triggers: %s\n", boost::dynamic_pointer_cast<Route>(*ai)->name().c_str() );
407         }
408     }
409
410     printf( "------------- trigger list:\n" );
411     for (ni=_init_trigger_list[chain].begin(); ni!=_init_trigger_list[chain].end(); ni++)
412     {
413         printf( "GraphNode: %s  refcount: %d\n", boost::dynamic_pointer_cast<Route>(*ni)->name().c_str(), (*ni)->_init_refcount[chain] );
414     }
415
416     printf( "final activation refcount: %d\n", _init_finished_refcount[chain] );
417 #endif
418 }
419
420 int
421 Graph::silent_process_routes (nframes_t nframes, sframes_t start_frame, sframes_t end_frame,
422                     bool can_record, bool rec_monitors_input, bool& need_butler )
423 {
424     _process_nframes = nframes;
425     _process_start_frame = start_frame;
426     _process_end_frame = end_frame;
427     _process_can_record = can_record;
428     _process_rec_monitors_input = rec_monitors_input;
429
430     _process_silent = true;
431     _process_noroll = false;
432     _process_retval = 0;
433     _process_need_butler = false;
434
435     if (!_graph_empty) 
436     {
437         sem_post( &_callback_start_sem );
438         sem_wait( &_callback_done_sem );
439     }
440
441     need_butler = _process_need_butler;
442
443     return _process_retval;
444 }
445
446 int
447 Graph::process_routes (nframes_t nframes, sframes_t start_frame, sframes_t end_frame, int declick,
448                     bool can_record, bool rec_monitors_input, bool& need_butler )
449 {
450     _process_nframes = nframes;
451     _process_start_frame = start_frame;
452     _process_end_frame = end_frame;
453     _process_can_record = can_record;
454     _process_rec_monitors_input = rec_monitors_input;
455     _process_declick = declick;
456
457     _process_silent = false;
458     _process_noroll = false;
459     _process_retval = 0;
460     _process_need_butler = false;
461
462     sem_post( &_callback_start_sem );
463     sem_wait( &_callback_done_sem );
464
465     need_butler = _process_need_butler;
466
467     return _process_retval;
468 }
469
470 int
471 Graph::routes_no_roll (nframes_t nframes, sframes_t start_frame, sframes_t end_frame, 
472                     bool non_rt_pending, bool can_record, int declick)
473 {
474     _process_nframes = nframes;
475     _process_start_frame = start_frame;
476     _process_end_frame = end_frame;
477     _process_can_record = can_record;
478     _process_declick = declick;
479     _process_non_rt_pending = non_rt_pending;
480
481     _process_silent = false;
482     _process_noroll = true;
483     _process_retval = 0;
484     _process_need_butler = false;
485
486     sem_post( &_callback_start_sem );
487     sem_wait( &_callback_done_sem );
488
489     return _process_retval;
490 }
491 void
492 Graph::process_one_route( Route * route )
493 {
494     bool need_butler = false;
495     int retval;
496
497     assert( route );
498
499     DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 runs route %2\n", pthread_self(), route->name()));
500
501     if (_process_silent)
502         retval = route->silent_roll (_process_nframes, _process_start_frame, _process_end_frame, _process_can_record, _process_rec_monitors_input, need_butler);
503     else if (_process_noroll)
504     {
505         route->set_pending_declick (_process_declick);
506         retval = route->no_roll (_process_nframes, _process_start_frame, _process_end_frame, _process_non_rt_pending, _process_can_record, _process_declick);
507     }
508     else
509     {
510         route->set_pending_declick (_process_declick);
511         retval = route->roll (_process_nframes, _process_start_frame, _process_end_frame, _process_declick, _process_can_record, _process_rec_monitors_input, need_butler);
512     }
513
514     if (retval)
515         _process_retval = retval;
516     
517     if (need_butler)
518         _process_need_butler = true;
519 }
520
521
522