1 #ifndef AUDIOGRAPHER_THREADER_H
2 #define AUDIOGRAPHER_THREADER_H
4 #include <glibmm/threadpool.h>
5 #include <glibmm/timeval.h>
6 #include <sigc++/slot.h>
7 #include <boost/format.hpp>
13 #include "audiographer/visibility.h"
14 #include "audiographer/source.h"
15 #include "audiographer/sink.h"
16 #include "audiographer/exception.h"
18 namespace AudioGrapher
21 /// Class that stores exceptions thrown from different threads
22 class /*LIBAUDIOGRAPHER_API*/ ThreaderException : public Exception
26 ThreaderException (T const & thrower, std::exception const & e)
28 boost::str ( boost::format
29 ("\n\t- Dynamic type: %1%\n\t- what(): %2%")
30 % DebugUtils::demangled_name (e) % e.what() ))
34 /// Class for distributing processing across several threads
35 template <typename T = DefaultSampleType>
36 class /*LIBAUDIOGRAPHER_API*/ Threader : public Source<T>, public Sink<T>
39 typedef std::vector<typename Source<T>::SinkPtr> OutputVec;
45 * \param thread_pool a thread pool from which all tasks are scheduled
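46 * \param wait_timeout_milliseconds timeout, in milliseconds, of each individual wait for the worker threads; the wait is repeated until all scheduled outputs have finished, so this is not a limit on the total processing time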
48 Threader (Glib::ThreadPool & thread_pool, long wait_timeout_milliseconds = 500)
49 : thread_pool (thread_pool)
51 , wait_timeout (wait_timeout_milliseconds)
/// Virtual destructor with an empty body; no explicit cleanup is performed here.
54 virtual ~Threader () {}
56 /// Adds output \n RT safe
/// \param output sink appended to the end of the output list; no check for duplicates is made
/// NOTE(review): outputs appears to be an OutputVec (std::vector); push_back may reallocate
/// when capacity is exhausted, so confirm that the "RT safe" remark holds for all callers.
57 void add_output (typename Source<T>::SinkPtr output) { outputs.push_back (output); }
59 /// Clears outputs \n RT safe
/// Empties the output list by calling clear() on the container.
/// NOTE(review): clearing destroys the stored SinkPtr elements; whether that can release
/// the sinks themselves depends on SinkPtr's ownership semantics -- confirm.
60 void clear_outputs () { outputs.clear (); }
62 /// Removes a specific output \n RT safe
63 void remove_output (typename Source<T>::SinkPtr output) {
64 typename OutputVec::iterator new_end = std::remove(outputs.begin(), outputs.end(), output);
65 outputs.erase (new_end, outputs.end());
68 /// Processes context concurrently by scheduling each output separately to the given thread pool
69 void process (ProcessContext<T> const & c)
75 unsigned int outs = outputs.size();
76 g_atomic_int_add (&readers, outs);
77 for (unsigned int i = 0; i < outs; ++i) {
78 thread_pool.push (sigc::bind (sigc::mem_fun (this, &Threader::process_output), c, i));
84 using Sink<T>::process;
90 while (g_atomic_int_get (&readers) != 0) {
91 gint64 end_time = g_get_monotonic_time () + (wait_timeout * G_TIME_SPAN_MILLISECOND);
92 wait_cond.wait_until(wait_mutex, end_time);
102 void process_output(ProcessContext<T> const & c, unsigned int output)
105 outputs[output]->process (c);
106 } catch (std::exception const & e) {
107 // Only first exception will be passed on
108 exception_mutex.lock();
109 if(!exception) { exception.reset (new ThreaderException (*this, e)); }
110 exception_mutex.unlock();
113 if (g_atomic_int_dec_and_test (&readers)) {
120 Glib::ThreadPool & thread_pool;
121 Glib::Threads::Mutex wait_mutex;
122 Glib::Threads::Cond wait_cond;
126 Glib::Threads::Mutex exception_mutex;
127 boost::shared_ptr<ThreaderException> exception;
133 #endif //AUDIOGRAPHER_THREADER_H