Stop thought-to-be-safe alloc in RT thread from triggering the checker.
[ardour.git] / libs / ardour / graph.cc
1 /*
2   Copyright (C) 2010 Paul Davis
3   Author: Torben Hohn
4
5   This program is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published by
7   the Free Software Foundation; either version 2 of the License, or
8   (at your option) any later version.
9
10   This program is distributed in the hope that it will be useful,
11   but WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   GNU General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with this program; if not, write to the Free Software
17   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18
19 */
20 #include <stdio.h>
21 #include <cmath>
22
23 #include "pbd/compose.h"
24 #include "pbd/cpus.h"
25 #include "pbd/debug_rt_alloc.h"
26
27 #include "ardour/debug.h"
28 #include "ardour/graph.h"
29 #include "ardour/types.h"
30 #include "ardour/session.h"
31 #include "ardour/route.h"
32 #include "ardour/process_thread.h"
33 #include "ardour/audioengine.h"
34
35 #include <jack/thread.h>
36
37 #include "i18n.h"
38
39 using namespace ARDOUR;
40 using namespace PBD;
41 using namespace std;
42
#ifdef DEBUG_RT_ALLOC
/** The Graph whose process threads are consulted by alloc_allowed().
 *  Assigned by the Graph constructor; null until a Graph exists.
 */
static Graph* graph = 0;

extern "C" {

/** Callback installed into pbd_alloc_allowed by the Graph constructor.
 *
 *  @return non-zero if the calling thread is NOT one of the graph's
 *  process threads (or if no Graph has been registered yet), zero if the
 *  caller is a process thread.
 */
int alloc_allowed ()
{
        if (!graph) {
                /* no Graph registered, hence no process threads to check against */
                return 1;
        }

        return !graph->in_process_thread ();
}

}
#endif
54
55 Graph::Graph (Session & session) 
56         : SessionHandleRef (session) 
57         , _execution_sem ("graph_execution", 0)
58         , _callback_start_sem ("graph_start", 0)
59         , _callback_done_sem ("graph_done", 0)
60         , _cleanup_sem ("graph_cleanup", 0)
61 {
62         pthread_mutex_init( &_trigger_mutex, NULL);
63
64         /* XXX: rather hacky `fix' to stop _trigger_queue.push_back() allocating
65            memory in the RT thread.
66         */
67         _trigger_queue.reserve (8192);
68
69         _execution_tokens = 0;
70
71         _current_chain = 0;
72         _pending_chain = 0;
73         _setup_chain   = 1;
74         _quit_threads = false;
75         _graph_empty = true;
76
77         int num_cpu = hardware_concurrency();
78         int num_threads = num_cpu;
79         int pu = Config->get_processor_usage ();
80
81         if (pu < 0) {
82                 /* pu is negative: use "pu" less cores for DSP than appear to be available
83                  */
84
85                 if (-pu < num_threads) {
86                         num_threads += pu; 
87                 } else {
88                         num_threads = 1;
89                 }
90         } else {
91                 /* use "pu" cores, if available
92                  */
93
94                 if (pu <= num_threads) {
95                         num_threads = pu;
96                 } 
97         }
98
99         info << string_compose (_("Using %2 threads on %1 CPUs"), num_cpu, num_threads) << endmsg;
100
101         pthread_t a_thread;
102
103         if (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::main_thread, this), &a_thread, 100000) == 0) {
104                 _thread_list.push_back (a_thread);
105         }
106
107         for (int i = 1; i < num_threads; ++i) {
108                 if (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::helper_thread, this), &a_thread, 100000) == 0) {
109                         _thread_list.push_back (a_thread);
110                 }
111         }
112
113 #ifdef DEBUG_RT_ALLOC   
114         graph = this;
115         pbd_alloc_allowed = &::alloc_allowed;
116 #endif  
117 }
118
119 void
120 Graph::session_going_away()
121 {
122         _quit_threads = true;
123
124         for (unsigned int i=0; i<_thread_list.size(); i++) {
125                 _execution_sem.signal ();
126         }
127
128         _callback_start_sem.signal ();
129
130         for (list<pthread_t>::iterator i = _thread_list.begin(); i != _thread_list.end(); i++) {
131                 void* status;
132                 pthread_join (*i, &status);
133         }
134
135         // now drop all references on the nodes.
136         _nodes_rt[0].clear();
137         _nodes_rt[1].clear();
138         _init_trigger_list[0].clear();
139         _init_trigger_list[1].clear();
140         _trigger_queue.clear();
141 }
142
143 void
144 Graph::clear_other_chain ()
145 {
146         Glib::Mutex::Lock ls (_swap_mutex);
147
148         while (1) {
149                 if (_setup_chain != _pending_chain) {
150
151                         for (node_list_t::iterator ni=_nodes_rt[_setup_chain].begin(); ni!=_nodes_rt[_setup_chain].end(); ni++) {
152                                 (*ni)->_activation_set[_setup_chain].clear();
153                         }
154
155                         _nodes_rt[_setup_chain].clear ();
156                         _init_trigger_list[_setup_chain].clear ();
157                         break;
158                 }
159                 /* setup chain == pending chain - we have
160                    to wait till this is no longer true.
161                 */
162                 _cleanup_cond.wait (_swap_mutex);                
163         }
164 }
165
166 void
167 Graph::prep()
168 {
169         node_list_t::iterator i;
170         int chain;
171
172         if (_swap_mutex.trylock()) {
173                 // we got the swap mutex.
174                 if (_current_chain != _pending_chain)
175                 {
176                         // printf ("chain swap ! %d -> %d\n", _current_chain, _pending_chain);
177                         _setup_chain = _current_chain;
178                         _current_chain = _pending_chain;
179                         _cleanup_cond.signal ();
180                 }
181                 _swap_mutex.unlock ();
182         }
183
184         chain = _current_chain;
185
186         _graph_empty = true;
187         for (i=_nodes_rt[chain].begin(); i!=_nodes_rt[chain].end(); i++) {
188                 (*i)->prep( chain);
189                 _graph_empty = false;
190         }
191         _finished_refcount = _init_finished_refcount[chain];
192
193         for (i=_init_trigger_list[chain].begin(); i!=_init_trigger_list[chain].end(); i++) {
194                 this->trigger( i->get() );
195         }
196 }
197
198 void
199 Graph::trigger (GraphNode* n)
200 {
201         pthread_mutex_lock (&_trigger_mutex);
202         _trigger_queue.push_back( n);
203         pthread_mutex_unlock (&_trigger_mutex);
204 }
205
206 void
207 Graph::dec_ref()
208 {
209         if (g_atomic_int_dec_and_test (&_finished_refcount)) {
210
211                 // ok... this cycle is finished now.
212                 // we are the only thread alive.
213         
214                 this->restart_cycle();
215         }
216 }
217
218 void
219 Graph::restart_cycle()
220 {
221         //printf( "cycle_done chain: %d\n", _current_chain);
222
223         // we are through. wakeup our caller.
224   again:
225         _callback_done_sem.signal ();
226
227         // block until we are triggered.
228         _callback_start_sem.wait();
229         if (_quit_threads)
230                 return;
231
232         //printf( "cycle_start\n" );
233
234         this->prep();
235         if (_graph_empty)
236                 goto again;
237         //printf( "cycle_start chain: %d\n", _current_chain);
238
239         // returning will restart the cycle.
240         //  starting with waking up the others.
241 }
242
243 static bool
244 is_feedback (boost::shared_ptr<RouteList> routelist, Route* from, boost::shared_ptr<Route> to)
245 {
246         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
247                 if ((*ri).get() == from)
248                         return false;
249                 if ((*ri) == to)
250                         return true;
251         }
252         assert(0);
253         return false;
254 }
255
256 static bool
257 is_feedback (boost::shared_ptr<RouteList> routelist, boost::shared_ptr<Route> from, Route* to)
258 {
259         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
260                 if ((*ri).get() == to)
261                         return true;
262                 if ((*ri) == from)
263                         return false;
264         }
265         assert(0);
266         return false;
267 }
268
269 void
270 Graph::rechain (boost::shared_ptr<RouteList> routelist)
271 {
272         node_list_t::iterator ni;
273         Glib::Mutex::Lock ls (_swap_mutex);
274
275         int chain = _setup_chain;
276         DEBUG_TRACE (DEBUG::Graph, string_compose ("============== setup %1\n", chain));
277         // set all refcounts to 0;
278
279         _init_finished_refcount[chain] = 0;
280         _init_trigger_list[chain].clear();
281
282         _nodes_rt[chain].clear();
283
284         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
285                 node_ptr_t n = boost::dynamic_pointer_cast<GraphNode> (*ri);
286
287                 n->_init_refcount[chain] = 0;
288                 n->_activation_set[chain].clear();
289                 _nodes_rt[chain].push_back(n);
290         }
291
292         // now add refs for the connections.
293
294         for (ni=_nodes_rt[chain].begin(); ni!=_nodes_rt[chain].end(); ni++) {
295                 bool has_input  = false;
296                 bool has_output = false;
297
298                 boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route>( *ni);
299
300                 for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
301                         if (rp->direct_feeds (*ri)) {
302                                 if (is_feedback (routelist, rp.get(), *ri)) {
303                                         continue; 
304                                 }
305
306                                 has_output = true;
307                                 (*ni)->_activation_set[chain].insert (boost::dynamic_pointer_cast<GraphNode> (*ri) );
308                         }
309                 }
310
311                 for (Route::FedBy::iterator fi=rp->fed_by().begin(); fi!=rp->fed_by().end(); fi++) {
312                         if (boost::shared_ptr<Route> r = fi->r.lock()) {
313                                 if (!is_feedback (routelist, r, rp.get())) {
314                                         has_input = true;
315                                 }
316                         }
317                 }
318
319                 for (node_set_t::iterator ai=(*ni)->_activation_set[chain].begin(); ai!=(*ni)->_activation_set[chain].end(); ai++) {
320                         (*ai)->_init_refcount[chain] += 1;
321                 }
322
323                 if (!has_input)
324                         _init_trigger_list[chain].push_back (*ni);
325
326                 if (!has_output)
327                         _init_finished_refcount[chain] += 1;
328         } 
329
330         _pending_chain = chain;
331         dump(chain);
332 }
333
334 bool
335 Graph::run_one()
336 {
337         GraphNode* to_run;
338
339         pthread_mutex_lock (&_trigger_mutex);
340         if (_trigger_queue.size()) {
341                 to_run = _trigger_queue.back();
342                 _trigger_queue.pop_back();
343         } else {
344                 to_run = 0;
345         }
346
347         int et = _execution_tokens;
348         int ts = _trigger_queue.size();
349
350         int wakeup = min (et, ts);
351         _execution_tokens -= wakeup;
352
353         for (int i=0; i<wakeup; i++ ) {
354                 _execution_sem.signal ();
355         }
356
357         while (to_run == 0) {
358                 _execution_tokens += 1;
359                 pthread_mutex_unlock (&_trigger_mutex);
360                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 goes to sleep\n", pthread_self()));
361                 _execution_sem.wait ();
362                 if (_quit_threads)
363                         return true;
364                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 is awake\n", pthread_self()));
365                 pthread_mutex_lock (&_trigger_mutex);
366                 if (_trigger_queue.size()) {
367                         to_run = _trigger_queue.back();
368                         _trigger_queue.pop_back();
369                 }
370         }
371         pthread_mutex_unlock (&_trigger_mutex);
372
373         to_run->process();
374         to_run->finish (_current_chain);
375
376         return false;
377 }
378
379 static void get_rt()
380 {
381         if (!jack_is_realtime (AudioEngine::instance()->jack())) {
382                 return;
383         }
384
385         int priority = jack_client_real_time_priority (AudioEngine::instance()->jack());
386
387         if (priority) {
388                 struct sched_param rtparam;
389         
390                 memset (&rtparam, 0, sizeof (rtparam));
391                 rtparam.sched_priority = priority;
392         
393                 pthread_setschedparam (pthread_self(), SCHED_FIFO, &rtparam);
394         }
395 }
396
397 void
398 Graph::helper_thread()
399 {
400         suspend_rt_malloc_checks ();
401         ProcessThread* pt = new ProcessThread ();
402         resume_rt_malloc_checks ();
403
404         pt->get_buffers();
405         get_rt();
406
407         while(1) {
408                 if (run_one()) {
409                         break;
410                 }
411         }
412
413         pt->drop_buffers();
414 }
415
416 void
417 Graph::main_thread()
418 {
419         suspend_rt_malloc_checks ();
420         ProcessThread* pt = new ProcessThread ();
421         resume_rt_malloc_checks ();
422
423         pt->get_buffers();
424         get_rt();
425
426   again:
427         _callback_start_sem.wait ();
428         DEBUG_TRACE(DEBUG::Graph, "main thread is awake\n");
429         this->prep();
430
431         if (_graph_empty && !_quit_threads) {
432                 _callback_done_sem.signal ();
433                 goto again;
434         }
435
436         while (1) {
437                 DEBUG_TRACE(DEBUG::Graph, "main thread runs one graph node\n");
438                 if (run_one()) {
439                         break;
440                 }
441         }
442
443         pt->drop_buffers();
444 }
445
446 void
447 Graph::dump (int chain)
448 {
449 #ifndef NDEBUG
450         node_list_t::iterator ni;
451         node_set_t::iterator ai;
452
453         chain = _pending_chain;
454
455         DEBUG_TRACE (DEBUG::Graph, "--------------------------------------------Graph dump:\n");
456         for (ni=_nodes_rt[chain].begin(); ni!=_nodes_rt[chain].end(); ni++) {
457                 boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route>( *ni);
458                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", rp->name().c_str(), (*ni)->_init_refcount[chain]));
459                 for (ai=(*ni)->_activation_set[chain].begin(); ai!=(*ni)->_activation_set[chain].end(); ai++) {
460                         DEBUG_TRACE (DEBUG::Graph, string_compose ("  triggers: %1\n", boost::dynamic_pointer_cast<Route>(*ai)->name().c_str()));
461                 }
462         }
463
464         DEBUG_TRACE (DEBUG::Graph, "------------- trigger list:\n");
465         for (ni=_init_trigger_list[chain].begin(); ni!=_init_trigger_list[chain].end(); ni++) {
466                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", boost::dynamic_pointer_cast<Route>(*ni)->name().c_str(), (*ni)->_init_refcount[chain]));
467         }
468
469         DEBUG_TRACE (DEBUG::Graph, string_compose ("final activation refcount: %1\n", _init_finished_refcount[chain]));
470 #endif
471 }
472
473 int
474 Graph::silent_process_routes (pframes_t nframes, framepos_t start_frame, framepos_t end_frame,
475                               bool can_record, bool rec_monitors_input, bool& need_butler)
476 {
477         _process_nframes = nframes;
478         _process_start_frame = start_frame;
479         _process_end_frame = end_frame;
480         _process_can_record = can_record;
481         _process_rec_monitors_input = rec_monitors_input;
482
483         _process_silent = true;
484         _process_noroll = false;
485         _process_retval = 0;
486         _process_need_butler = false;
487
488         if (!_graph_empty) {
489                 DEBUG_TRACE(DEBUG::Graph, "wake graph for silent process\n");
490                 _callback_start_sem.signal ();
491                 _callback_done_sem.wait ();
492         }
493
494         need_butler = _process_need_butler;
495
496         return _process_retval;
497 }
498
499 int
500 Graph::process_routes (pframes_t nframes, framepos_t start_frame, framepos_t end_frame, int declick,
501                        bool can_record, bool rec_monitors_input, bool& need_butler)
502 {
503         DEBUG_TRACE (DEBUG::Graph, string_compose ("graph execution from %1 to %2 = %3\n", start_frame, end_frame, nframes));
504
505         _process_nframes = nframes;
506         _process_start_frame = start_frame;
507         _process_end_frame = end_frame;
508         _process_can_record = can_record;
509         _process_rec_monitors_input = rec_monitors_input;
510         _process_declick = declick;
511
512         _process_silent = false;
513         _process_noroll = false;
514         _process_retval = 0;
515         _process_need_butler = false;
516
517         DEBUG_TRACE(DEBUG::Graph, "wake graph for non-silent process\n");
518         _callback_start_sem.signal ();
519         _callback_done_sem.wait ();
520
521         DEBUG_TRACE (DEBUG::Graph, "graph execution complete\n");
522
523         need_butler = _process_need_butler;
524
525         return _process_retval;
526 }
527
528 int
529 Graph::routes_no_roll (pframes_t nframes, framepos_t start_frame, framepos_t end_frame, 
530                        bool non_rt_pending, bool can_record, int declick)
531 {
532         DEBUG_TRACE (DEBUG::Graph, string_compose ("no-roll graph execution from %1 to %2 = %3\n", start_frame, end_frame, nframes));
533
534         _process_nframes = nframes;
535         _process_start_frame = start_frame;
536         _process_end_frame = end_frame;
537         _process_can_record = can_record;
538         _process_declick = declick;
539         _process_non_rt_pending = non_rt_pending;
540
541         _process_silent = false;
542         _process_noroll = true;
543         _process_retval = 0;
544         _process_need_butler = false;
545
546         DEBUG_TRACE(DEBUG::Graph, "wake graph for no-roll process\n");
547         _callback_start_sem.signal ();
548         _callback_done_sem.wait ();
549
550         return _process_retval;
551 }
552 void
553 Graph::process_one_route (Route* route)
554 {
555         bool need_butler = false;
556         int retval;
557
558         assert (route);
559
560         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 runs route %2\n", pthread_self(), route->name()));
561
562         if (_process_silent) {
563                 retval = route->silent_roll (_process_nframes, _process_start_frame, _process_end_frame, _process_can_record, _process_rec_monitors_input, need_butler);
564         } else if (_process_noroll) {
565                 route->set_pending_declick (_process_declick);
566                 retval = route->no_roll (_process_nframes, _process_start_frame, _process_end_frame, _process_non_rt_pending, _process_can_record, _process_declick);
567         } else {
568                 route->set_pending_declick (_process_declick);
569                 retval = route->roll (_process_nframes, _process_start_frame, _process_end_frame, _process_declick, _process_can_record, _process_rec_monitors_input, need_butler);
570         }
571
572         if (retval) {
573                 _process_retval = retval;
574         }
575     
576         if (need_butler) {
577                 _process_need_butler = true;
578         }
579 }
580
581 bool
582 Graph::in_process_thread () const
583 {
584         list<pthread_t>::const_iterator i = _thread_list.begin ();
585         while (i != _thread_list.end() && *i != pthread_self ()) {
586                 ++i;
587         }
588
589         return i != _thread_list.end ();
590 }