resolve merge with master (?)
[ardour.git] / libs / audiographer / audiographer / general / threader.h
1 #ifndef AUDIOGRAPHER_THREADER_H
2 #define AUDIOGRAPHER_THREADER_H
3
4 #include <glibmm/threadpool.h>
5 #include <glibmm/timeval.h>
6 #include <sigc++/slot.h>
7 #include <boost/format.hpp>
8
9 #include <glib.h>
10 #include <vector>
11 #include <algorithm>
12
13 #include "audiographer/source.h"
14 #include "audiographer/sink.h"
15 #include "audiographer/exception.h"
16
17 namespace AudioGrapher
18 {
19
20 /// Class that stores exceptions thrown from different threads
21 class ThreaderException : public Exception
22 {
23   public:
24         template<typename T>
25         ThreaderException (T const & thrower, std::exception const & e)
26                 : Exception (thrower,
27                         boost::str ( boost::format
28                         ("\n\t- Dynamic type: %1%\n\t- what(): %2%")
29                         % DebugUtils::demangled_name (e) % e.what() ))
30         { }
31 };
32
33 /// Class for distributing processing across several threads
34 template <typename T = DefaultSampleType>
35 class Threader : public Source<T>, public Sink<T>
36 {
37   private:
38         typedef std::vector<typename Source<T>::SinkPtr> OutputVec;
39
40   public:
41         
42         /** Constructor
43           * \n RT safe
44           * \param thread_pool a thread pool from which all tasks are scheduled
45           * \param wait_timeout_milliseconds maximum time allowed for threads to use in processing
46           */
47         Threader (Glib::ThreadPool & thread_pool, long wait_timeout_milliseconds = 500)
48           : thread_pool (thread_pool)
49           , readers (0)
50           , wait_timeout (wait_timeout_milliseconds)
51         { }
52         
53         virtual ~Threader () {}
54         
55         /// Adds output \n RT safe
56         void add_output (typename Source<T>::SinkPtr output) { outputs.push_back (output); }
57         
58         /// Clears outputs \n RT safe
59         void clear_outputs () { outputs.clear (); }
60         
61         /// Removes a specific output \n RT safe
62         void remove_output (typename Source<T>::SinkPtr output) {
63                 typename OutputVec::iterator new_end = std::remove(outputs.begin(), outputs.end(), output);
64                 outputs.erase (new_end, outputs.end());
65         }
66         
67         /// Processes context concurrently by scheduling each output separately to the given thread pool
68         void process (ProcessContext<T> const & c)
69         {
70                 wait_mutex.lock();
71                 
72                 exception.reset();
73                 
74                 unsigned int outs = outputs.size();
75                 g_atomic_int_add (&readers, outs);
76                 for (unsigned int i = 0; i < outs; ++i) {
77                         thread_pool.push (sigc::bind (sigc::mem_fun (this, &Threader::process_output), c, i));
78                 }
79                 
80                 wait();
81         }
82         
83         using Sink<T>::process;
84         
85   private:
86
87         void wait()
88         {
89                 while (g_atomic_int_get (&readers) != 0) {
90                         gint64 end_time = g_get_monotonic_time () + (wait_timeout * G_TIME_SPAN_MILLISECOND);
91                         wait_cond.wait_until(wait_mutex, end_time);
92                 }
93
94                 wait_mutex.unlock();
95                 
96                 if (exception) {
97                         throw *exception;
98                 }
99         }
100         
101         void process_output(ProcessContext<T> const & c, unsigned int output)
102         {
103                 try {
104                         outputs[output]->process (c);
105                 } catch (std::exception const & e) {
106                         // Only first exception will be passed on
107                         exception_mutex.lock();
108                         if(!exception) { exception.reset (new ThreaderException (*this, e)); }
109                         exception_mutex.unlock();
110                 }
111                 
112                 if (g_atomic_int_dec_and_test (&readers)) {
113                         wait_cond.signal();
114                 }
115         }
116
117         OutputVec outputs;
118
119         Glib::ThreadPool & thread_pool;
120         Glib::Threads::Mutex wait_mutex;
121         Glib::Threads::Cond  wait_cond;
122         gint        readers;
123         long        wait_timeout;
124         
125         Glib::Threads::Mutex exception_mutex;
126         boost::shared_ptr<ThreaderException> exception;
127
128 };
129
130 } // namespace
131
132 #endif //AUDIOGRAPHER_THREADER_H