1 #ifndef AUDIOGRAPHER_THREADER_H
2 #define AUDIOGRAPHER_THREADER_H
4 #include <glibmm/threadpool.h>
5 #include <glibmm/timeval.h>
6 #include <sigc++/slot.h>
7 #include <boost/format.hpp>
13 #include "audiographer/source.h"
14 #include "audiographer/sink.h"
15 #include "audiographer/exception.h"
17 namespace AudioGrapher
20 /// Class that stores exceptions thrown from different threads
21 class ThreaderException : public Exception
25 ThreaderException (T const & thrower, std::exception const & e)
27 boost::str ( boost::format
28 ("\n\t- Dynamic type: %1%\n\t- what(): %2%")
29 % DebugUtils::demangled_name (e) % e.what() ))
33 /// Class for distributing processing across several threads
34 template <typename T = DefaultSampleType>
35 class Threader : public Source<T>, public Sink<T>
38 typedef std::vector<typename Source<T>::SinkPtr> OutputVec;
44 * \param thread_pool a thread pool from which all tasks are scheduled
45 * \param wait_timeout_milliseconds maximum time allowed for threads to use in processing
47 Threader (Glib::ThreadPool & thread_pool, long wait_timeout_milliseconds = 500)
48 : thread_pool (thread_pool)
50 , wait_timeout (wait_timeout_milliseconds)
53 virtual ~Threader () {}
55 /// Adds output \n RT safe
56 void add_output (typename Source<T>::SinkPtr output) { outputs.push_back (output); }
58 /// Clears outputs \n RT safe
59 void clear_outputs () { outputs.clear (); }
61 /// Removes a specific output \n RT safe
62 void remove_output (typename Source<T>::SinkPtr output) {
63 typename OutputVec::iterator new_end = std::remove(outputs.begin(), outputs.end(), output);
64 outputs.erase (new_end, outputs.end());
67 /// Processes context concurrently by scheduling each output separately to the given thread pool
68 void process (ProcessContext<T> const & c)
74 unsigned int outs = outputs.size();
75 g_atomic_int_add (&readers, outs);
76 for (unsigned int i = 0; i < outs; ++i) {
77 thread_pool.push (sigc::bind (sigc::mem_fun (this, &Threader::process_output), c, i));
83 using Sink<T>::process;
89 while (g_atomic_int_get (&readers) != 0) {
90 gint64 end_time = g_get_monotonic_time () + (wait_timeout * G_TIME_SPAN_MILLISECOND);
91 wait_cond.wait_until(wait_mutex, end_time);
101 void process_output(ProcessContext<T> const & c, unsigned int output)
104 outputs[output]->process (c);
105 } catch (std::exception const & e) {
106 // Only first exception will be passed on
107 exception_mutex.lock();
108 if(!exception) { exception.reset (new ThreaderException (*this, e)); }
109 exception_mutex.unlock();
112 if (g_atomic_int_dec_and_test (&readers)) {
119 Glib::ThreadPool & thread_pool;
120 Glib::Threads::Mutex wait_mutex;
121 Glib::Threads::Cond wait_cond;
125 Glib::Threads::Mutex exception_mutex;
126 boost::shared_ptr<ThreaderException> exception;
132 #endif //AUDIOGRAPHER_THREADER_H