Merge branch 'cairocanvas'
[ardour.git] / libs / audiographer / audiographer / general / threader.h
1 #ifndef AUDIOGRAPHER_THREADER_H
2 #define AUDIOGRAPHER_THREADER_H
3
4 #include <glibmm/threadpool.h>
5 #include <glibmm/timeval.h>
6 #include <sigc++/slot.h>
7 #include <boost/format.hpp>
8
9 #include <glib.h>
10 #include <vector>
11 #include <algorithm>
12
13 #include "audiographer/visibility.h"
14 #include "audiographer/source.h"
15 #include "audiographer/sink.h"
16 #include "audiographer/exception.h"
17
18 namespace AudioGrapher
19 {
20
21 /// Class that stores exceptions thrown from different threads
22 class /*LIBAUDIOGRAPHER_API*/ ThreaderException : public Exception
23 {
24   public:
25         template<typename T>
26         ThreaderException (T const & thrower, std::exception const & e)
27                 : Exception (thrower,
28                         boost::str ( boost::format
29                         ("\n\t- Dynamic type: %1%\n\t- what(): %2%")
30                         % DebugUtils::demangled_name (e) % e.what() ))
31         { }
32 };
33
34 /// Class for distributing processing across several threads
35 template <typename T = DefaultSampleType>
36 class /*LIBAUDIOGRAPHER_API*/ Threader : public Source<T>, public Sink<T>
37 {
38   private:
39         typedef std::vector<typename Source<T>::SinkPtr> OutputVec;
40
41   public:
42         
43         /** Constructor
44           * \n RT safe
45           * \param thread_pool a thread pool from which all tasks are scheduled
46           * \param wait_timeout_milliseconds maximum time allowed for threads to use in processing
47           */
48         Threader (Glib::ThreadPool & thread_pool, long wait_timeout_milliseconds = 500)
49           : thread_pool (thread_pool)
50           , readers (0)
51           , wait_timeout (wait_timeout_milliseconds)
52         { }
53         
54         virtual ~Threader () {}
55         
56         /// Adds output \n RT safe
57         void add_output (typename Source<T>::SinkPtr output) { outputs.push_back (output); }
58         
59         /// Clears outputs \n RT safe
60         void clear_outputs () { outputs.clear (); }
61         
62         /// Removes a specific output \n RT safe
63         void remove_output (typename Source<T>::SinkPtr output) {
64                 typename OutputVec::iterator new_end = std::remove(outputs.begin(), outputs.end(), output);
65                 outputs.erase (new_end, outputs.end());
66         }
67         
68         /// Processes context concurrently by scheduling each output separately to the given thread pool
69         void process (ProcessContext<T> const & c)
70         {
71                 wait_mutex.lock();
72                 
73                 exception.reset();
74                 
75                 unsigned int outs = outputs.size();
76                 g_atomic_int_add (&readers, outs);
77                 for (unsigned int i = 0; i < outs; ++i) {
78                         thread_pool.push (sigc::bind (sigc::mem_fun (this, &Threader::process_output), c, i));
79                 }
80                 
81                 wait();
82         }
83         
84         using Sink<T>::process;
85         
86   private:
87
88         void wait()
89         {
90                 while (g_atomic_int_get (&readers) != 0) {
91                         gint64 end_time = g_get_monotonic_time () + (wait_timeout * G_TIME_SPAN_MILLISECOND);
92                         wait_cond.wait_until(wait_mutex, end_time);
93                 }
94
95                 wait_mutex.unlock();
96                 
97                 if (exception) {
98                         throw *exception;
99                 }
100         }
101         
102         void process_output(ProcessContext<T> const & c, unsigned int output)
103         {
104                 try {
105                         outputs[output]->process (c);
106                 } catch (std::exception const & e) {
107                         // Only first exception will be passed on
108                         exception_mutex.lock();
109                         if(!exception) { exception.reset (new ThreaderException (*this, e)); }
110                         exception_mutex.unlock();
111                 }
112                 
113                 if (g_atomic_int_dec_and_test (&readers)) {
114                         wait_cond.signal();
115                 }
116         }
117
118         OutputVec outputs;
119
120         Glib::ThreadPool & thread_pool;
121         Glib::Threads::Mutex wait_mutex;
122         Glib::Threads::Cond  wait_cond;
123         gint        readers;
124         long        wait_timeout;
125         
126         Glib::Threads::Mutex exception_mutex;
127         boost::shared_ptr<ThreaderException> exception;
128
129 };
130
131 } // namespace
132
133 #endif //AUDIOGRAPHER_THREADER_H