| flow_graph.h | | flow_graph.h | |
| | | | |
| skipping to change at line 41 | | skipping to change at line 41 | |
| | | | |
| #include "tbb_stddef.h" | | #include "tbb_stddef.h" | |
| #include "atomic.h" | | #include "atomic.h" | |
| #include "spin_mutex.h" | | #include "spin_mutex.h" | |
| #include "null_mutex.h" | | #include "null_mutex.h" | |
| #include "spin_rw_mutex.h" | | #include "spin_rw_mutex.h" | |
| #include "null_rw_mutex.h" | | #include "null_rw_mutex.h" | |
| #include "task.h" | | #include "task.h" | |
| #include "concurrent_vector.h" | | #include "concurrent_vector.h" | |
| #include "internal/_aggregator_impl.h" | | #include "internal/_aggregator_impl.h" | |
|
| | | #include "tbb_profiling.h" | |
| | | | |
| #if TBB_DEPRECATED_FLOW_ENQUEUE | | #if TBB_DEPRECATED_FLOW_ENQUEUE | |
| #define FLOW_SPAWN(a) tbb::task::enqueue((a)) | | #define FLOW_SPAWN(a) tbb::task::enqueue((a)) | |
| #else | | #else | |
| #define FLOW_SPAWN(a) tbb::task::spawn((a)) | | #define FLOW_SPAWN(a) tbb::task::spawn((a)) | |
| #endif | | #endif | |
| | | | |
| // use the VC10 or gcc version of tuple if it is available. | | // use the VC10 or gcc version of tuple if it is available. | |
| #if __TBB_CPP11_TUPLE_PRESENT | | #if __TBB_CPP11_TUPLE_PRESENT | |
| #include <tuple> | | #include <tuple> | |
| | | | |
| skipping to change at line 274 | | skipping to change at line 275 | |
| } | | } | |
| | | | |
| //! Does whatever should happen when the threshold is reached | | //! Does whatever should happen when the threshold is reached | |
| /** This should be very fast or else spawn a task. This is | | /** This should be very fast or else spawn a task. This is | |
| called while the sender is blocked in the try_put(). */ | | called while the sender is blocked in the try_put(). */ | |
| virtual task * execute() = 0; | | virtual task * execute() = 0; | |
| template<typename TT, typename M> | | template<typename TT, typename M> | |
| friend class internal::successor_cache; | | friend class internal::successor_cache; | |
| /*override*/ bool is_continue_receiver() { return true; } | | /*override*/ bool is_continue_receiver() { return true; } | |
| }; | | }; | |
|
| | | } // interface7 | |
| | | } // flow | |
| | | } // tbb | |
| | | | |
| | | #include "internal/_flow_graph_trace_impl.h" | |
| | | | |
| | | namespace tbb { | |
| | | namespace flow { | |
| | | namespace interface7 { | |
| | | | |
| #include "internal/_flow_graph_impl.h" | | #include "internal/_flow_graph_impl.h" | |
| using namespace internal::graph_policy_namespace; | | using namespace internal::graph_policy_namespace; | |
| | | | |
| class graph; | | class graph; | |
| class graph_node; | | class graph_node; | |
| | | | |
| template <typename GraphContainerType, typename GraphNodeType> | | template <typename GraphContainerType, typename GraphNodeType> | |
| class graph_iterator { | | class graph_iterator { | |
| friend class graph; | | friend class graph; | |
| | | | |
| skipping to change at line 389 | | skipping to change at line 399 | |
| public: | | public: | |
| //! Constructs a graph with isolated task_group_context | | //! Constructs a graph with isolated task_group_context | |
| explicit graph() : my_nodes(NULL), my_nodes_last(NULL) | | explicit graph() : my_nodes(NULL), my_nodes_last(NULL) | |
| { | | { | |
| own_context = true; | | own_context = true; | |
| cancelled = false; | | cancelled = false; | |
| caught_exception = false; | | caught_exception = false; | |
| my_context = new task_group_context(); | | my_context = new task_group_context(); | |
| my_root_task = ( new ( task::allocate_root(*my_context) ) empty_tas
k ); | | my_root_task = ( new ( task::allocate_root(*my_context) ) empty_tas
k ); | |
| my_root_task->set_ref_count(1); | | my_root_task->set_ref_count(1); | |
|
| | | tbb::internal::fgt_graph( this ); | |
| } | | } | |
| | | | |
| //! Constructs a graph with use_this_context as context | | //! Constructs a graph with use_this_context as context | |
| explicit graph(task_group_context& use_this_context) : | | explicit graph(task_group_context& use_this_context) : | |
| my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL) | | my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL) | |
| { | | { | |
| own_context = false; | | own_context = false; | |
| my_root_task = ( new ( task::allocate_root(*my_context) ) empty_tas
k ); | | my_root_task = ( new ( task::allocate_root(*my_context) ) empty_tas
k ); | |
| my_root_task->set_ref_count(1); | | my_root_task->set_ref_count(1); | |
|
| | | tbb::internal::fgt_graph( this ); | |
| } | | } | |
| | | | |
| //! Destroys the graph. | | //! Destroys the graph. | |
| /** Calls wait_for_all, then destroys the root task and context. */ | | /** Calls wait_for_all, then destroys the root task and context. */ | |
| ~graph() { | | ~graph() { | |
| wait_for_all(); | | wait_for_all(); | |
| my_root_task->set_ref_count(0); | | my_root_task->set_ref_count(0); | |
| task::destroy( *my_root_task ); | | task::destroy( *my_root_task ); | |
| if (own_context) delete my_context; | | if (own_context) delete my_context; | |
| } | | } | |
| | | | |
|
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | void set_name( const char *name ) { | |
| | | tbb::internal::fgt_graph_desc( this, name ); | |
| | | } | |
| | | #endif | |
| | | | |
| //! Used to register that an external entity may still interact with th
e graph. | | //! Used to register that an external entity may still interact with th
e graph. | |
| /** The graph will not return from wait_for_all until a matching number
of decrement_wait_count calls | | /** The graph will not return from wait_for_all until a matching number
of decrement_wait_count calls | |
| is made. */ | | is made. */ | |
| void increment_wait_count() { | | void increment_wait_count() { | |
| if (my_root_task) | | if (my_root_task) | |
| my_root_task->increment_ref_count(); | | my_root_task->increment_ref_count(); | |
| } | | } | |
| | | | |
| //! Deregisters an external entity that may have interacted with the gr
aph. | | //! Deregisters an external entity that may have interacted with the gr
aph. | |
| /** The graph will not return from wait_for_all until all the number of
decrement_wait_count calls | | /** The graph will not return from wait_for_all until all the number of
decrement_wait_count calls | |
| | | | |
| skipping to change at line 556 | | skipping to change at line 574 | |
| graph& my_graph; | | graph& my_graph; | |
| graph_node *next, *prev; | | graph_node *next, *prev; | |
| public: | | public: | |
| graph_node(graph& g) : my_graph(g) { | | graph_node(graph& g) : my_graph(g) { | |
| my_graph.register_node(this); | | my_graph.register_node(this); | |
| } | | } | |
| virtual ~graph_node() { | | virtual ~graph_node() { | |
| my_graph.remove_node(this); | | my_graph.remove_node(this); | |
| } | | } | |
| | | | |
|
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | virtual void set_name( const char *name ) = 0; | |
| | | #endif | |
| | | | |
| protected: | | protected: | |
| virtual void reset() = 0; | | virtual void reset() = 0; | |
| }; | | }; | |
| | | | |
| inline void graph::register_node(graph_node *n) { | | inline void graph::register_node(graph_node *n) { | |
| n->next = NULL; | | n->next = NULL; | |
| { | | { | |
| spin_mutex::scoped_lock lock(nodelist_mutex); | | spin_mutex::scoped_lock lock(nodelist_mutex); | |
| n->prev = my_nodes_last; | | n->prev = my_nodes_last; | |
| if (my_nodes_last) my_nodes_last->next = n; | | if (my_nodes_last) my_nodes_last->next = n; | |
| | | | |
| skipping to change at line 620 | | skipping to change at line 642 | |
| typedef receiver< Output > successor_type; | | typedef receiver< Output > successor_type; | |
| | | | |
| //! Constructor for a node with a successor | | //! Constructor for a node with a successor | |
| template< typename Body > | | template< typename Body > | |
| source_node( graph &g, Body body, bool is_active = true ) | | source_node( graph &g, Body body, bool is_active = true ) | |
| : graph_node(g), my_active(is_active), init_my_active(is_active), | | : graph_node(g), my_active(is_active), init_my_active(is_active), | |
| my_body( new internal::source_body_leaf< output_type, Body>(body) )
, | | my_body( new internal::source_body_leaf< output_type, Body>(body) )
, | |
| my_reserved(false), my_has_cached_item(false) | | my_reserved(false), my_has_cached_item(false) | |
| { | | { | |
| my_successors.set_owner(this); | | my_successors.set_owner(this); | |
|
| | | tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, | |
| | | &this->my_graph, | |
| | | static_cast<sender<output_type> | |
| | | *>(this), this->my_body ); | |
| } | | } | |
| | | | |
| //! Copy constructor | | //! Copy constructor | |
| source_node( const source_node& src ) : | | source_node( const source_node& src ) : | |
| graph_node(src.my_graph), sender<Output>(), | | graph_node(src.my_graph), sender<Output>(), | |
| my_active(src.init_my_active), | | my_active(src.init_my_active), | |
| init_my_active(src.init_my_active), my_body( src.my_body->clone() )
, | | init_my_active(src.init_my_active), my_body( src.my_body->clone() )
, | |
| my_reserved(false), my_has_cached_item(false) | | my_reserved(false), my_has_cached_item(false) | |
| { | | { | |
| my_successors.set_owner(this); | | my_successors.set_owner(this); | |
|
| | | tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, | |
| | | &this->my_graph, | |
| | | static_cast<sender<output_type> | |
| | | *>(this), this->my_body ); | |
| } | | } | |
| | | | |
| //! The destructor | | //! The destructor | |
| ~source_node() { delete my_body; } | | ~source_node() { delete my_body; } | |
| | | | |
|
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | /* override */ void set_name( const char *name ) { | |
| | | tbb::internal::fgt_node_desc( this, name ); | |
| | | } | |
| | | #endif | |
| | | | |
| //! Add a new successor to this node | | //! Add a new successor to this node | |
| /* override */ bool register_successor( receiver<output_type> &r ) { | | /* override */ bool register_successor( receiver<output_type> &r ) { | |
| spin_mutex::scoped_lock lock(my_mutex); | | spin_mutex::scoped_lock lock(my_mutex); | |
| my_successors.register_successor(r); | | my_successors.register_successor(r); | |
| if ( my_active ) | | if ( my_active ) | |
| spawn_put(); | | spawn_put(); | |
| return true; | | return true; | |
| } | | } | |
| | | | |
| //! Removes a successor from this node | | //! Removes a successor from this node | |
| | | | |
| skipping to change at line 800 | | skipping to change at line 832 | |
| typedef Input input_type; | | typedef Input input_type; | |
| typedef Output output_type; | | typedef Output output_type; | |
| typedef sender< input_type > predecessor_type; | | typedef sender< input_type > predecessor_type; | |
| typedef receiver< output_type > successor_type; | | typedef receiver< output_type > successor_type; | |
| typedef internal::function_input<input_type,output_type,Allocator> fInp
ut_type; | | typedef internal::function_input<input_type,output_type,Allocator> fInp
ut_type; | |
| typedef internal::function_output<output_type> fOutput_type; | | typedef internal::function_output<output_type> fOutput_type; | |
| | | | |
| //! Constructor | | //! Constructor | |
| template< typename Body > | | template< typename Body > | |
| function_node( graph &g, size_t concurrency, Body body ) : | | function_node( graph &g, size_t concurrency, Body body ) : | |
|
| graph_node(g), internal::function_input<input_type,output_type,Allo | | graph_node(g), internal::function_input<input_type,output_type,Allo | |
| cator>(g, concurrency, body) | | cator>(g, concurrency, body) { | |
| {} | | tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NOD | |
| | | E, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this), | |
| | | static_cast<sender<output_type> | |
| | | *>(this), this->my_body ); | |
| | | } | |
| | | | |
| //! Copy constructor | | //! Copy constructor | |
| function_node( const function_node& src ) : | | function_node( const function_node& src ) : | |
| graph_node(src.my_graph), internal::function_input<input_type,outpu
t_type,Allocator>( src ), | | graph_node(src.my_graph), internal::function_input<input_type,outpu
t_type,Allocator>( src ), | |
|
| fOutput_type() | | fOutput_type() { | |
| {} | | tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NOD | |
| | | E, &this->my_graph, static_cast<receiver<input_type> *>(this), | |
| | | static_cast<sender<output_type> | |
| | | *>(this), this->my_body ); | |
| | | } | |
| | | | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | /* override */ void set_name( const char *name ) { | |
| | | tbb::internal::fgt_node_desc( this, name ); | |
| | | } | |
| | | #endif | |
| | | | |
| protected: | | protected: | |
| template< typename R, typename B > friend class run_and_put_task; | | template< typename R, typename B > friend class run_and_put_task; | |
| template<typename X, typename Y> friend class internal::broadcast_cache
; | | template<typename X, typename Y> friend class internal::broadcast_cache
; | |
| template<typename X, typename Y> friend class internal::round_robin_cac
he; | | template<typename X, typename Y> friend class internal::round_robin_cac
he; | |
| using fInput_type::try_put_task; | | using fInput_type::try_put_task; | |
| | | | |
| // override of graph_node's reset. | | // override of graph_node's reset. | |
| /*override*/void reset() {fInput_type::reset_function_input(); } | | /*override*/void reset() {fInput_type::reset_function_input(); } | |
| | | | |
| | | | |
| skipping to change at line 838 | | skipping to change at line 880 | |
| typedef Output output_type; | | typedef Output output_type; | |
| typedef sender< input_type > predecessor_type; | | typedef sender< input_type > predecessor_type; | |
| typedef receiver< output_type > successor_type; | | typedef receiver< output_type > successor_type; | |
| typedef internal::function_input<input_type,output_type,Allocator> fInp
ut_type; | | typedef internal::function_input<input_type,output_type,Allocator> fInp
ut_type; | |
| typedef internal::function_input_queue<input_type, Allocator> queue_typ
e; | | typedef internal::function_input_queue<input_type, Allocator> queue_typ
e; | |
| typedef internal::function_output<output_type> fOutput_type; | | typedef internal::function_output<output_type> fOutput_type; | |
| | | | |
| //! Constructor | | //! Constructor | |
| template< typename Body > | | template< typename Body > | |
| function_node( graph &g, size_t concurrency, Body body ) : | | function_node( graph &g, size_t concurrency, Body body ) : | |
|
| graph_node(g), fInput_type( g, concurrency, body, new queue_type() | | graph_node(g), fInput_type( g, concurrency, body, new queue_type() | |
| ) | | ) { | |
| {} | | tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NOD | |
| | | E, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this), | |
| | | static_cast<sender<output_type> | |
| | | *>(this), this->my_body ); | |
| | | } | |
| | | | |
| //! Copy constructor | | //! Copy constructor | |
| function_node( const function_node& src ) : | | function_node( const function_node& src ) : | |
|
| graph_node(src.graph_node::my_graph), fInput_type( src, new queue_t | | graph_node(src.graph_node::my_graph), fInput_type( src, new queue_t | |
| ype() ), fOutput_type() | | ype() ), fOutput_type() { | |
| {} | | tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NOD | |
| | | E, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this), | |
| | | static_cast<sender<output_type> | |
| | | *>(this), this->my_body ); | |
| | | } | |
| | | | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | /* override */ void set_name( const char *name ) { | |
| | | tbb::internal::fgt_node_desc( this, name ); | |
| | | } | |
| | | #endif | |
| | | | |
| protected: | | protected: | |
| template< typename R, typename B > friend class run_and_put_task; | | template< typename R, typename B > friend class run_and_put_task; | |
| template<typename X, typename Y> friend class internal::broadcast_cache
; | | template<typename X, typename Y> friend class internal::broadcast_cache
; | |
| template<typename X, typename Y> friend class internal::round_robin_cac
he; | | template<typename X, typename Y> friend class internal::round_robin_cac
he; | |
| using fInput_type::try_put_task; | | using fInput_type::try_put_task; | |
| | | | |
| /*override*/void reset() { fInput_type::reset_function_input(); } | | /*override*/void reset() { fInput_type::reset_function_input(); } | |
| | | | |
| /* override */ internal::broadcast_cache<output_type> &successors () {
return fOutput_type::my_successors; } | | /* override */ internal::broadcast_cache<output_type> &successors () {
return fOutput_type::my_successors; } | |
| | | | |
| skipping to change at line 887 | | skipping to change at line 939 | |
| static const int N = tbb::flow::tuple_size<Output>::value; | | static const int N = tbb::flow::tuple_size<Output>::value; | |
| public: | | public: | |
| typedef Input input_type; | | typedef Input input_type; | |
| typedef typename internal::wrap_tuple_elements<N,internal::multifunctio
n_output, Output>::type output_ports_type; | | typedef typename internal::wrap_tuple_elements<N,internal::multifunctio
n_output, Output>::type output_ports_type; | |
| private: | | private: | |
| typedef typename internal::multifunction_input<input_type, output_ports
_type, Allocator> base_type; | | typedef typename internal::multifunction_input<input_type, output_ports
_type, Allocator> base_type; | |
| typedef typename internal::function_input_queue<input_type,Allocator> q
ueue_type; | | typedef typename internal::function_input_queue<input_type,Allocator> q
ueue_type; | |
| public: | | public: | |
| template<typename Body> | | template<typename Body> | |
| multifunction_node( graph &g, size_t concurrency, Body body ) : | | multifunction_node( graph &g, size_t concurrency, Body body ) : | |
|
| graph_node(g), base_type(g,concurrency, body) | | graph_node(g), base_type(g,concurrency, body) { | |
| {} | | tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::inter | |
| | | nal::FLOW_MULTIFUNCTION_NODE, | |
| | | &this->gra | |
| | | ph_node::my_graph, static_cast<receiver<input_type> *>(this), | |
| | | this->outp | |
| | | ut_ports(), this->my_body ); | |
| | | } | |
| | | | |
| multifunction_node( const multifunction_node &other) : | | multifunction_node( const multifunction_node &other) : | |
|
| graph_node(other.graph_node::my_graph), base_type(other) | | graph_node(other.graph_node::my_graph), base_type(other) { | |
| {} | | tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::inter | |
| | | nal::FLOW_MULTIFUNCTION_NODE, | |
| | | &this->gra | |
| | | ph_node::my_graph, static_cast<receiver<input_type> *>(this), | |
| | | this->outp | |
| | | ut_ports(), this->my_body ); | |
| | | } | |
| | | | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | /* override */ void set_name( const char *name ) { | |
| | | tbb::internal::fgt_multioutput_node_desc( this, name ); | |
| | | } | |
| | | #endif | |
| | | | |
| // all the guts are in multifunction_input... | | // all the guts are in multifunction_input... | |
| protected: | | protected: | |
| /*override*/void reset() { base_type::reset(); } | | /*override*/void reset() { base_type::reset(); } | |
| }; // multifunction_node | | }; // multifunction_node | |
| | | | |
| template < typename Input, typename Output, typename Allocator > | | template < typename Input, typename Output, typename Allocator > | |
| class multifunction_node<Input,Output,queueing,Allocator> : public graph_no
de, public internal::multifunction_input<Input, | | class multifunction_node<Input,Output,queueing,Allocator> : public graph_no
de, public internal::multifunction_input<Input, | |
| typename internal::wrap_tuple_elements<tbb::flow::tuple_size<Output>::v
alue, internal::multifunction_output, Output>::type, Allocator> { | | typename internal::wrap_tuple_elements<tbb::flow::tuple_size<Output>::v
alue, internal::multifunction_output, Output>::type, Allocator> { | |
| protected: | | protected: | |
| using graph_node::my_graph; | | using graph_node::my_graph; | |
| static const int N = tbb::flow::tuple_size<Output>::value; | | static const int N = tbb::flow::tuple_size<Output>::value; | |
| public: | | public: | |
| typedef Input input_type; | | typedef Input input_type; | |
| typedef typename internal::wrap_tuple_elements<N, internal::multifuncti
on_output, Output>::type output_ports_type; | | typedef typename internal::wrap_tuple_elements<N, internal::multifuncti
on_output, Output>::type output_ports_type; | |
| private: | | private: | |
| typedef typename internal::multifunction_input<input_type, output_ports
_type, Allocator> base_type; | | typedef typename internal::multifunction_input<input_type, output_ports
_type, Allocator> base_type; | |
| typedef typename internal::function_input_queue<input_type,Allocator> q
ueue_type; | | typedef typename internal::function_input_queue<input_type,Allocator> q
ueue_type; | |
| public: | | public: | |
| template<typename Body> | | template<typename Body> | |
| multifunction_node( graph &g, size_t concurrency, Body body) : | | multifunction_node( graph &g, size_t concurrency, Body body) : | |
|
| graph_node(g), base_type(g,concurrency, body, new queue_type()) | | graph_node(g), base_type(g,concurrency, body, new queue_type()) { | |
| {} | | tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::inter | |
| | | nal::FLOW_MULTIFUNCTION_NODE, | |
| | | &this->gra | |
| | | ph_node::my_graph, static_cast<receiver<input_type> *>(this), | |
| | | this->outp | |
| | | ut_ports(), this->my_body ); | |
| | | } | |
| | | | |
| multifunction_node( const multifunction_node &other) : | | multifunction_node( const multifunction_node &other) : | |
|
| graph_node(other.graph_node::my_graph), base_type(other, new queue_ | | graph_node(other.graph_node::my_graph), base_type(other, new queue_ | |
| type()) | | type()) { | |
| {} | | tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::inter | |
| | | nal::FLOW_MULTIFUNCTION_NODE, | |
| | | &this->gra | |
| | | ph_node::my_graph, static_cast<receiver<input_type> *>(this), | |
| | | this->outp | |
| | | ut_ports(), this->my_body ); | |
| | | } | |
| | | | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | /* override */ void set_name( const char *name ) { | |
| | | tbb::internal::fgt_multioutput_node_desc( this, name ); | |
| | | } | |
| | | #endif | |
| | | | |
| // all the guts are in multifunction_input... | | // all the guts are in multifunction_input... | |
| protected: | | protected: | |
| /*override*/void reset() { base_type::reset(); } | | /*override*/void reset() { base_type::reset(); } | |
| }; // multifunction_node | | }; // multifunction_node | |
| | | | |
| //! split_node: accepts a tuple as input, forwards each element of the tupl
e to its | | //! split_node: accepts a tuple as input, forwards each element of the tupl
e to its | |
| // successors. The node has unlimited concurrency, so though it is marked
as | | // successors. The node has unlimited concurrency, so though it is marked
as | |
| // "rejecting" it does not reject inputs. | | // "rejecting" it does not reject inputs. | |
| template<typename TupleType, typename Allocator=cache_aligned_allocator<Tup
leType> > | | template<typename TupleType, typename Allocator=cache_aligned_allocator<Tup
leType> > | |
| class split_node : public multifunction_node<TupleType, TupleType, rejectin
g, Allocator> { | | class split_node : public multifunction_node<TupleType, TupleType, rejectin
g, Allocator> { | |
| | | | |
| skipping to change at line 940 | | skipping to change at line 1020 | |
| typedef typename base_type::output_ports_type output_ports_type; | | typedef typename base_type::output_ports_type output_ports_type; | |
| private: | | private: | |
| struct splitting_body { | | struct splitting_body { | |
| void operator()(const TupleType& t, output_ports_type &p) { | | void operator()(const TupleType& t, output_ports_type &p) { | |
| internal::emit_element<N>::emit_this(t, p); | | internal::emit_element<N>::emit_this(t, p); | |
| } | | } | |
| }; | | }; | |
| public: | | public: | |
| typedef TupleType input_type; | | typedef TupleType input_type; | |
| typedef Allocator allocator_type; | | typedef Allocator allocator_type; | |
|
| split_node(graph &g) : base_type(g, unlimited, splitting_body()) {} | | split_node(graph &g) : base_type(g, unlimited, splitting_body()) { | |
| split_node( const split_node & other) : base_type(other) {} | | tbb::internal::fgt_multioutput_node<TupleType,N>( tbb::internal::FL | |
| | | OW_SPLIT_NODE, &this->graph_node::my_graph, | |
| | | static_cast<recei | |
| | | ver<input_type> *>(this), this->output_ports() ); | |
| | | } | |
| | | | |
| | | split_node( const split_node & other) : base_type(other) { | |
| | | tbb::internal::fgt_multioutput_node<TupleType,N>( tbb::internal::FL | |
| | | OW_SPLIT_NODE, &this->graph_node::my_graph, | |
| | | static_cast<recei | |
| | | ver<input_type> *>(this), this->output_ports() ); | |
| | | } | |
| | | | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | /* override */ void set_name( const char *name ) { | |
| | | tbb::internal::fgt_multioutput_node_desc( this, name ); | |
| | | } | |
| | | #endif | |
| | | | |
| }; | | }; | |
| | | | |
| //! Implements an executable node that supports continue_msg -> Output | | //! Implements an executable node that supports continue_msg -> Output | |
| template <typename Output> | | template <typename Output> | |
| class continue_node : public graph_node, public internal::continue_input<Ou
tput>, public internal::function_output<Output> { | | class continue_node : public graph_node, public internal::continue_input<Ou
tput>, public internal::function_output<Output> { | |
| protected: | | protected: | |
| using graph_node::my_graph; | | using graph_node::my_graph; | |
| public: | | public: | |
| typedef continue_msg input_type; | | typedef continue_msg input_type; | |
| typedef Output output_type; | | typedef Output output_type; | |
| typedef sender< input_type > predecessor_type; | | typedef sender< input_type > predecessor_type; | |
| typedef receiver< output_type > successor_type; | | typedef receiver< output_type > successor_type; | |
| typedef internal::continue_input<Output> fInput_type; | | typedef internal::continue_input<Output> fInput_type; | |
| typedef internal::function_output<output_type> fOutput_type; | | typedef internal::function_output<output_type> fOutput_type; | |
| | | | |
| //! Constructor for executable node with continue_msg -> Output | | //! Constructor for executable node with continue_msg -> Output | |
| template <typename Body > | | template <typename Body > | |
| continue_node( graph &g, Body body ) : | | continue_node( graph &g, Body body ) : | |
|
| graph_node(g), internal::continue_input<output_type>( g, body ) | | graph_node(g), internal::continue_input<output_type>( g, body ) { | |
| {} | | tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NOD | |
| | | E, &this->my_graph, | |
| | | static_cast<receiver<input_type> | |
| | | *>(this), | |
| | | static_cast<sender<output_type> | |
| | | *>(this), this->my_body ); | |
| | | } | |
| | | | |
| //! Constructor for executable node with continue_msg -> Output | | //! Constructor for executable node with continue_msg -> Output | |
| template <typename Body > | | template <typename Body > | |
| continue_node( graph &g, int number_of_predecessors, Body body ) : | | continue_node( graph &g, int number_of_predecessors, Body body ) : | |
|
| graph_node(g), internal::continue_input<output_type>( g, number_of_ | | graph_node(g), internal::continue_input<output_type>( g, number_of_ | |
| predecessors, body ) | | predecessors, body ) { | |
| {} | | tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NOD | |
| | | E, &this->my_graph, | |
| | | static_cast<receiver<input_type> | |
| | | *>(this), | |
| | | static_cast<sender<output_type> | |
| | | *>(this), this->my_body ); | |
| | | } | |
| | | | |
| //! Copy constructor | | //! Copy constructor | |
| continue_node( const continue_node& src ) : | | continue_node( const continue_node& src ) : | |
| graph_node(src.graph_node::my_graph), internal::continue_input<outp
ut_type>(src), | | graph_node(src.graph_node::my_graph), internal::continue_input<outp
ut_type>(src), | |
|
| internal::function_output<Output>() | | internal::function_output<Output>() { | |
| {} | | tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NOD | |
| | | E, &this->my_graph, | |
| | | static_cast<receiver<input_type> | |
| | | *>(this), | |
| | | static_cast<sender<output_type> | |
| | | *>(this), this->my_body ); | |
| | | } | |
| | | | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | /* override */ void set_name( const char *name ) { | |
| | | tbb::internal::fgt_node_desc( this, name ); | |
| | | } | |
| | | #endif | |
| | | | |
| protected: | | protected: | |
| template< typename R, typename B > friend class run_and_put_task; | | template< typename R, typename B > friend class run_and_put_task; | |
| template<typename X, typename Y> friend class internal::broadcast_cache
; | | template<typename X, typename Y> friend class internal::broadcast_cache
; | |
| template<typename X, typename Y> friend class internal::round_robin_cac
he; | | template<typename X, typename Y> friend class internal::round_robin_cac
he; | |
| using fInput_type::try_put_task; | | using fInput_type::try_put_task; | |
| | | | |
| /*override*/void reset() { internal::continue_input<Output>::reset_rece
iver(); } | | /*override*/void reset() { internal::continue_input<Output>::reset_rece
iver(); } | |
| | | | |
| /* override */ internal::broadcast_cache<output_type> &successors () {
return fOutput_type::my_successors; } | | /* override */ internal::broadcast_cache<output_type> &successors () {
return fOutput_type::my_successors; } | |
| | | | |
| skipping to change at line 998 | | skipping to change at line 1107 | |
| protected: | | protected: | |
| using graph_node::my_graph; | | using graph_node::my_graph; | |
| public: | | public: | |
| typedef T input_type; | | typedef T input_type; | |
| typedef T output_type; | | typedef T output_type; | |
| typedef sender< input_type > predecessor_type; | | typedef sender< input_type > predecessor_type; | |
| typedef receiver< output_type > successor_type; | | typedef receiver< output_type > successor_type; | |
| | | | |
| overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) { | | overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) { | |
| my_successors.set_owner( this ); | | my_successors.set_owner( this ); | |
|
| | | tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this- | |
| | | >my_graph, | |
| | | static_cast<receiver<input_type> *>(this), | |
| | | static_cast<sender<output_type> *>(this) ); | |
| } | | } | |
| | | | |
| // Copy constructor; doesn't take anything from src; default won't work | | // Copy constructor; doesn't take anything from src; default won't work | |
| overwrite_node( const overwrite_node& src ) : | | overwrite_node( const overwrite_node& src ) : | |
| graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_
valid(false) | | graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_
valid(false) | |
| { | | { | |
| my_successors.set_owner( this ); | | my_successors.set_owner( this ); | |
|
| | | tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this- | |
| | | >my_graph, | |
| | | static_cast<receiver<input_type> *>(this), | |
| | | static_cast<sender<output_type> *>(this) ); | |
| } | | } | |
| | | | |
| ~overwrite_node() {} | | ~overwrite_node() {} | |
| | | | |
|
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | /* override */ void set_name( const char *name ) { | |
| | | tbb::internal::fgt_node_desc( this, name ); | |
| | | } | |
| | | #endif | |
| | | | |
| /* override */ bool register_successor( successor_type &s ) { | | /* override */ bool register_successor( successor_type &s ) { | |
| spin_mutex::scoped_lock l( my_mutex ); | | spin_mutex::scoped_lock l( my_mutex ); | |
| if ( my_buffer_is_valid ) { | | if ( my_buffer_is_valid ) { | |
| // We have a valid value that must be forwarded immediately. | | // We have a valid value that must be forwarded immediately. | |
| if ( s.try_put( my_buffer ) || !s.register_predecessor( *this
) ) { | | if ( s.try_put( my_buffer ) || !s.register_predecessor( *this
) ) { | |
| // We add the successor: it accepted our put or it rejected
it but won't let us become a predecessor | | // We add the successor: it accepted our put or it rejected
it but won't let us become a predecessor | |
| my_successors.register_successor( s ); | | my_successors.register_successor( s ); | |
| return true; | | return true; | |
| } else { | | } else { | |
| // We don't add the successor: it rejected our put and we b
ecame its predecessor instead | | // We don't add the successor: it rejected our put and we b
ecame its predecessor instead | |
| | | | |
| skipping to change at line 1085 | | skipping to change at line 1204 | |
| | | | |
| template< typename T > | | template< typename T > | |
| class write_once_node : public overwrite_node<T> { | | class write_once_node : public overwrite_node<T> { | |
| public: | | public: | |
| typedef T input_type; | | typedef T input_type; | |
| typedef T output_type; | | typedef T output_type; | |
| typedef sender< input_type > predecessor_type; | | typedef sender< input_type > predecessor_type; | |
| typedef receiver< output_type > successor_type; | | typedef receiver< output_type > successor_type; | |
| | | | |
| //! Constructor | | //! Constructor | |
|
| write_once_node(graph& g) : overwrite_node<T>(g) {} | | write_once_node(graph& g) : overwrite_node<T>(g) { | |
| | | tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(thi | |
| | | s->my_graph), | |
| | | static_cast<receiver<input_type> *>(this), | |
| | | static_cast<sender<output_type> *>(this) ) | |
| | | ; | |
| | | } | |
| | | | |
| //! Copy constructor: call base class copy constructor | | //! Copy constructor: call base class copy constructor | |
|
| write_once_node( const write_once_node& src ) : overwrite_node<T>(src) | | write_once_node( const write_once_node& src ) : overwrite_node<T>(src) | |
| {} | | { | |
| | | tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(thi | |
| | | s->my_graph), | |
| | | static_cast<receiver<input_type> *>(this), | |
| | | static_cast<sender<output_type> *>(this) ) | |
| | | ; | |
| | | } | |
| | | | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | /* override */ void set_name( const char *name ) { | |
| | | tbb::internal::fgt_node_desc( this, name ); | |
| | | } | |
| | | #endif | |
| | | | |
| protected: | | protected: | |
| template< typename R, typename B > friend class run_and_put_task; | | template< typename R, typename B > friend class run_and_put_task; | |
| template<typename X, typename Y> friend class internal::broadcast_cache
; | | template<typename X, typename Y> friend class internal::broadcast_cache
; | |
| template<typename X, typename Y> friend class internal::round_robin_cac
he; | | template<typename X, typename Y> friend class internal::round_robin_cac
he; | |
| /* override */ task *try_put_task( const T &v ) { | | /* override */ task *try_put_task( const T &v ) { | |
| spin_mutex::scoped_lock l( this->my_mutex ); | | spin_mutex::scoped_lock l( this->my_mutex ); | |
| if ( this->my_buffer_is_valid ) { | | if ( this->my_buffer_is_valid ) { | |
| return NULL; | | return NULL; | |
| } else { | | } else { | |
| | | | |
| skipping to change at line 1123 | | skipping to change at line 1256 | |
| private: | | private: | |
| internal::broadcast_cache<T> my_successors; | | internal::broadcast_cache<T> my_successors; | |
| public: | | public: | |
| typedef T input_type; | | typedef T input_type; | |
| typedef T output_type; | | typedef T output_type; | |
| typedef sender< input_type > predecessor_type; | | typedef sender< input_type > predecessor_type; | |
| typedef receiver< output_type > successor_type; | | typedef receiver< output_type > successor_type; | |
| | | | |
| broadcast_node(graph& g) : graph_node(g) { | | broadcast_node(graph& g) : graph_node(g) { | |
| my_successors.set_owner( this ); | | my_successors.set_owner( this ); | |
|
| | | tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this- | |
| | | >my_graph, | |
| | | static_cast<receiver<input_type> *>(this), | |
| | | static_cast<sender<output_type> *>(this) ); | |
| } | | } | |
| | | | |
| // Copy constructor | | // Copy constructor | |
| broadcast_node( const broadcast_node& src ) : | | broadcast_node( const broadcast_node& src ) : | |
| graph_node(src.my_graph), receiver<T>(), sender<T>() | | graph_node(src.my_graph), receiver<T>(), sender<T>() | |
| { | | { | |
| my_successors.set_owner( this ); | | my_successors.set_owner( this ); | |
|
| | | tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this- | |
| | | >my_graph, | |
| | | static_cast<receiver<input_type> *>(this), | |
| | | static_cast<sender<output_type> *>(this) ); | |
| } | | } | |
| | | | |
|
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | /* override */ void set_name( const char *name ) { | |
| | | tbb::internal::fgt_node_desc( this, name ); | |
| | | } | |
| | | #endif | |
| | | | |
| //! Adds a successor | | //! Adds a successor | |
| virtual bool register_successor( receiver<T> &r ) { | | virtual bool register_successor( receiver<T> &r ) { | |
| my_successors.register_successor( r ); | | my_successors.register_successor( r ); | |
| return true; | | return true; | |
| } | | } | |
| | | | |
| //! Removes s as a successor | | //! Removes s as a successor | |
| virtual bool remove_successor( receiver<T> &r ) { | | virtual bool remove_successor( receiver<T> &r ) { | |
| my_successors.remove_successor( r ); | | my_successors.remove_successor( r ); | |
| return true; | | return true; | |
| | | | |
| skipping to change at line 1338 | | skipping to change at line 1481 | |
| this->release_front(); | | this->release_front(); | |
| __TBB_store_with_release(op->status, SUCCEEDED); | | __TBB_store_with_release(op->status, SUCCEEDED); | |
| } | | } | |
| | | | |
| public: | | public: | |
| //! Constructor | | //! Constructor | |
| buffer_node( graph &g ) : graph_node(g), reservable_item_buffer<T>(), | | buffer_node( graph &g ) : graph_node(g), reservable_item_buffer<T>(), | |
| forwarder_busy(false) { | | forwarder_busy(false) { | |
| my_successors.set_owner(this); | | my_successors.set_owner(this); | |
| my_aggregator.initialize_handler(my_handler(this)); | | my_aggregator.initialize_handler(my_handler(this)); | |
|
| | | tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my | |
| | | _graph, | |
| | | static_cast<receiver<input_type> *>(this), | |
| | | static_cast<sender<output_type> *>(this) ); | |
| } | | } | |
| | | | |
| //! Copy constructor | | //! Copy constructor | |
| buffer_node( const buffer_node& src ) : graph_node(src.my_graph), | | buffer_node( const buffer_node& src ) : graph_node(src.my_graph), | |
| reservable_item_buffer<T>(), receiver<T>(), sender<T>() { | | reservable_item_buffer<T>(), receiver<T>(), sender<T>() { | |
| forwarder_busy = false; | | forwarder_busy = false; | |
| my_successors.set_owner(this); | | my_successors.set_owner(this); | |
| my_aggregator.initialize_handler(my_handler(this)); | | my_aggregator.initialize_handler(my_handler(this)); | |
|
| | | tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my | |
| | | _graph, | |
| | | static_cast<receiver<input_type> *>(this), | |
| | | static_cast<sender<output_type> *>(this) ); | |
| } | | } | |
| | | | |
| virtual ~buffer_node() {} | | virtual ~buffer_node() {} | |
| | | | |
|
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | /* override */ void set_name( const char *name ) { | |
| | | tbb::internal::fgt_node_desc( this, name ); | |
| | | } | |
| | | #endif | |
| | | | |
| // | | // | |
| // message sender implementation | | // message sender implementation | |
| // | | // | |
| | | | |
| //! Adds a new successor. | | //! Adds a new successor. | |
| /** Adds successor r to the list of successors; may forward tasks. */ | | /** Adds successor r to the list of successors; may forward tasks. */ | |
| /* override */ bool register_successor( receiver<output_type> &r ) { | | /* override */ bool register_successor( receiver<output_type> &r ) { | |
| buffer_operation op_data(reg_succ); | | buffer_operation op_data(reg_succ); | |
| op_data.r = &r; | | op_data.r = &r; | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| | | | |
| skipping to change at line 1516 | | skipping to change at line 1669 | |
| __TBB_store_with_release(op->status, SUCCEEDED); | | __TBB_store_with_release(op->status, SUCCEEDED); | |
| } | | } | |
| | | | |
| public: | | public: | |
| typedef T input_type; | | typedef T input_type; | |
| typedef T output_type; | | typedef T output_type; | |
| typedef sender< input_type > predecessor_type; | | typedef sender< input_type > predecessor_type; | |
| typedef receiver< output_type > successor_type; | | typedef receiver< output_type > successor_type; | |
| | | | |
| //! Constructor | | //! Constructor | |
|
| queue_node( graph &g ) : buffer_node<T, A>(g) {} | | queue_node( graph &g ) : buffer_node<T, A>(g) { | |
| | | tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my | |
| | | _graph), | |
| | | static_cast<receiver<input_type> *>(this), | |
| | | static_cast<sender<output_type> *>(this) ) | |
| | | ; | |
| | | } | |
| | | | |
| //! Copy constructor | | //! Copy constructor | |
|
| queue_node( const queue_node& src) : buffer_node<T, A>(src) {} | | queue_node( const queue_node& src) : buffer_node<T, A>(src) { | |
| | | tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my | |
| | | _graph), | |
| | | static_cast<receiver<input_type> *>(this), | |
| | | static_cast<sender<output_type> *>(this) ) | |
| | | ; | |
| | | } | |
| | | | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | /* override */ void set_name( const char *name ) { | |
| | | tbb::internal::fgt_node_desc( this, name ); | |
| | | } | |
| | | #endif | |
| | | | |
| }; | | }; | |
| | | | |
| //! Forwards messages in sequence order | | //! Forwards messages in sequence order | |
| template< typename T, typename A=cache_aligned_allocator<T> > | | template< typename T, typename A=cache_aligned_allocator<T> > | |
| class sequencer_node : public queue_node<T, A> { | | class sequencer_node : public queue_node<T, A> { | |
| internal::function_body< T, size_t > *my_sequencer; | | internal::function_body< T, size_t > *my_sequencer; | |
| public: | | public: | |
| typedef T input_type; | | typedef T input_type; | |
| typedef T output_type; | | typedef T output_type; | |
| typedef sender< input_type > predecessor_type; | | typedef sender< input_type > predecessor_type; | |
| typedef receiver< output_type > successor_type; | | typedef receiver< output_type > successor_type; | |
| | | | |
| //! Constructor | | //! Constructor | |
| template< typename Sequencer > | | template< typename Sequencer > | |
| sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g), | | sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g), | |
|
| my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer | | my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer | |
| >(s) ) {} | | >(s) ) { | |
| | | tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this | |
| | | ->my_graph), | |
| | | static_cast<receiver<input_type> *>(this), | |
| | | static_cast<sender<output_type> *>(this) ) | |
| | | ; | |
| | | } | |
| | | | |
| //! Copy constructor | | //! Copy constructor | |
| sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src), | | sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src), | |
|
| my_sequencer( src.my_sequencer->clone() ) {} | | my_sequencer( src.my_sequencer->clone() ) { | |
| | | tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this | |
| | | ->my_graph), | |
| | | static_cast<receiver<input_type> *>(this), | |
| | | static_cast<sender<output_type> *>(this) ) | |
| | | ; | |
| | | } | |
| | | | |
| //! Destructor | | //! Destructor | |
| ~sequencer_node() { delete my_sequencer; } | | ~sequencer_node() { delete my_sequencer; } | |
|
| | | | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | /* override */ void set_name( const char *name ) { | |
| | | tbb::internal::fgt_node_desc( this, name ); | |
| | | } | |
| | | #endif | |
| | | | |
| protected: | | protected: | |
| typedef typename buffer_node<T, A>::size_type size_type; | | typedef typename buffer_node<T, A>::size_type size_type; | |
| typedef typename buffer_node<T, A>::buffer_operation sequencer_operatio
n; | | typedef typename buffer_node<T, A>::buffer_operation sequencer_operatio
n; | |
| | | | |
| enum op_stat {WAIT=0, SUCCEEDED, FAILED}; | | enum op_stat {WAIT=0, SUCCEEDED, FAILED}; | |
| | | | |
| private: | | private: | |
| /* override */ void internal_push(sequencer_operation *op) { | | /* override */ void internal_push(sequencer_operation *op) { | |
| size_type tag = (*my_sequencer)(*(op->elem)); | | size_type tag = (*my_sequencer)(*(op->elem)); | |
| | | | |
| | | | |
| skipping to change at line 1573 | | skipping to change at line 1756 | |
| template< typename T, typename Compare = std::less<T>, typename A=cache_ali
gned_allocator<T> > | | template< typename T, typename Compare = std::less<T>, typename A=cache_ali
gned_allocator<T> > | |
| class priority_queue_node : public buffer_node<T, A> { | | class priority_queue_node : public buffer_node<T, A> { | |
| public: | | public: | |
| typedef T input_type; | | typedef T input_type; | |
| typedef T output_type; | | typedef T output_type; | |
| typedef buffer_node<T,A> base_type; | | typedef buffer_node<T,A> base_type; | |
| typedef sender< input_type > predecessor_type; | | typedef sender< input_type > predecessor_type; | |
| typedef receiver< output_type > successor_type; | | typedef receiver< output_type > successor_type; | |
| | | | |
| //! Constructor | | //! Constructor | |
|
| priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {} | | priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) { | |
| | | tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, & | |
| | | (this->my_graph), | |
| | | static_cast<receiver<input_type> *>(this), | |
| | | static_cast<sender<output_type> *>(this) ) | |
| | | ; | |
| | | } | |
| | | | |
| //! Copy constructor | | //! Copy constructor | |
|
| priority_queue_node( const priority_queue_node &src ) : buffer_node<T, | | priority_queue_node( const priority_queue_node &src ) : buffer_node<T, | |
| A>(src), mark(0) {} | | A>(src), mark(0) { | |
| | | tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, & | |
| | | (this->my_graph), | |
| | | static_cast<receiver<input_type> *>(this), | |
| | | static_cast<sender<output_type> *>(this) ) | |
| | | ; | |
| | | } | |
| | | | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | /* override */ void set_name( const char *name ) { | |
| | | tbb::internal::fgt_node_desc( this, name ); | |
| | | } | |
| | | #endif | |
| | | | |
| protected: | | protected: | |
| | | | |
| /*override*/void reset() { | | /*override*/void reset() { | |
| mark = 0; | | mark = 0; | |
| base_type::reset(); | | base_type::reset(); | |
| } | | } | |
| | | | |
| typedef typename buffer_node<T, A>::size_type size_type; | | typedef typename buffer_node<T, A>::size_type size_type; | |
| typedef typename buffer_node<T, A>::item_type item_type; | | typedef typename buffer_node<T, A>::item_type item_type; | |
| | | | |
| skipping to change at line 1777 | | skipping to change at line 1974 | |
| protected: | | protected: | |
| using graph_node::my_graph; | | using graph_node::my_graph; | |
| public: | | public: | |
| typedef T input_type; | | typedef T input_type; | |
| typedef T output_type; | | typedef T output_type; | |
| typedef sender< input_type > predecessor_type; | | typedef sender< input_type > predecessor_type; | |
| typedef receiver< output_type > successor_type; | | typedef receiver< output_type > successor_type; | |
| | | | |
| private: | | private: | |
| size_t my_threshold; | | size_t my_threshold; | |
|
| size_t my_count; | | size_t my_count; //number of successful puts | |
| internal::predecessor_cache< T > my_predecessors; | | size_t my_tries; //number of active put attempts | |
| | | internal::reservable_predecessor_cache< T > my_predecessors; | |
| spin_mutex my_mutex; | | spin_mutex my_mutex; | |
| internal::broadcast_cache< T > my_successors; | | internal::broadcast_cache< T > my_successors; | |
| int init_decrement_predecessors; | | int init_decrement_predecessors; | |
| | | | |
| friend class internal::forward_task_bypass< limiter_node<T> >; | | friend class internal::forward_task_bypass< limiter_node<T> >; | |
| | | | |
| // Let decrementer call decrement_counter() | | // Let decrementer call decrement_counter() | |
| friend class internal::decrementer< limiter_node<T> >; | | friend class internal::decrementer< limiter_node<T> >; | |
| | | | |
|
| | | bool check_conditions() { | |
| | | return ( my_count + my_tries < my_threshold && !my_predecessors.emp | |
| | | ty() && !my_successors.empty() ); | |
| | | } | |
| | | | |
| // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEU
ED | | // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEU
ED | |
|
| task * decrement_counter() { | | task *forward_task() { | |
| input_type v; | | input_type v; | |
| task *rval = NULL; | | task *rval = NULL; | |
|
| | | bool reserved = false; | |
| | | { | |
| | | spin_mutex::scoped_lock lock(my_mutex); | |
| | | if ( check_conditions() ) | |
| | | ++my_tries; | |
| | | else | |
| | | return NULL; | |
| | | } | |
| | | | |
|
| // If we can't get / put an item immediately then drop the count | | //SUCCESS | |
| if ( my_predecessors.get_item( v ) == false | | // if we can reserve and can put, we consume the reservation | |
| || (rval = my_successors.try_put_task(v)) == NULL ) { | | // we increment the count and decrement the tries | |
| | | if ( (my_predecessors.try_reserve(v)) == true ){ | |
| | | reserved=true; | |
| | | if ( (rval = my_successors.try_put_task(v)) != NULL ){ | |
| | | { | |
| | | spin_mutex::scoped_lock lock(my_mutex); | |
| | | ++my_count; | |
| | | --my_tries; | |
| | | my_predecessors.try_consume(); | |
| | | if ( check_conditions() ) { | |
| | | task* tp = this->my_graph.root_task(); | |
| | | if ( tp ) { | |
| | | task *rtask = new ( task::allocate_additional_c | |
| | | hild_of( *tp ) ) | |
| | | internal::forward_task_bypass< limiter_node | |
| | | <T> >( *this ); | |
| | | FLOW_SPAWN (*rtask); | |
| | | } | |
| | | } | |
| | | } | |
| | | return rval; | |
| | | } | |
| | | } | |
| | | //FAILURE | |
| | | //if we can't reserve, we decrement the tries | |
| | | //if we can reserve but can't put, we decrement the tries and relea | |
| | | se the reservation | |
| | | { | |
| spin_mutex::scoped_lock lock(my_mutex); | | spin_mutex::scoped_lock lock(my_mutex); | |
|
| if(my_count) --my_count; | | --my_tries; | |
| task* tp = this->my_graph.root_task(); | | if (reserved) my_predecessors.try_release(); | |
| if ( !my_predecessors.empty() && tp ) { | | if ( check_conditions() ) { | |
| task *rtask = new ( task::allocate_additional_child_of( *tp | | task* tp = this->my_graph.root_task(); | |
| ) ) | | if ( tp ) { | |
| internal::forward_task_bypass< limiter_node<T> >( *this | | task *rtask = new ( task::allocate_additional_child_of( | |
| ); | | *tp ) ) | |
| __TBB_ASSERT(!rval, "Have two tasks to handle"); | | internal::forward_task_bypass< limiter_node<T> >( * | |
| return rtask; | | this ); | |
| | | __TBB_ASSERT(!rval, "Have two tasks to handle"); | |
| | | return rtask; | |
| | | } | |
| } | | } | |
|
| | | return rval; | |
| } | | } | |
|
| return rval; | | | |
| } | | } | |
| | | | |
| void forward() { | | void forward() { | |
|
| { | | __TBB_ASSERT(false, "Should never be called"); | |
| spin_mutex::scoped_lock lock(my_mutex); | | return; | |
| if ( my_count < my_threshold ) | | | |
| ++my_count; | | | |
| else | | | |
| return; | | | |
| } | | | |
| task * rtask = decrement_counter(); | | | |
| if(rtask) FLOW_SPAWN(*rtask); | | | |
| } | | } | |
| | | | |
|
| task *forward_task() { | | task * decrement_counter() { | |
| { | | { | |
| spin_mutex::scoped_lock lock(my_mutex); | | spin_mutex::scoped_lock lock(my_mutex); | |
|
| if ( my_count >= my_threshold ) | | if(my_count) --my_count; | |
| return NULL; | | | |
| ++my_count; | | | |
| } | | } | |
|
| task * rtask = decrement_counter(); | | return forward_task(); | |
| return rtask; | | | |
| } | | } | |
| | | | |
| public: | | public: | |
| //! The internal receiver< continue_msg > that decrements the count | | //! The internal receiver< continue_msg > that decrements the count | |
| internal::decrementer< limiter_node<T> > decrement; | | internal::decrementer< limiter_node<T> > decrement; | |
| | | | |
| //! Constructor | | //! Constructor | |
| limiter_node(graph &g, size_t threshold, int num_decrement_predecessors
=0) : | | limiter_node(graph &g, size_t threshold, int num_decrement_predecessors
=0) : | |
|
| graph_node(g), my_threshold(threshold), my_count(0), | | graph_node(g), my_threshold(threshold), my_count(0), my_tries(0), | |
| init_decrement_predecessors(num_decrement_predecessors), | | init_decrement_predecessors(num_decrement_predecessors), | |
| decrement(num_decrement_predecessors) | | decrement(num_decrement_predecessors) | |
| { | | { | |
| my_predecessors.set_owner(this); | | my_predecessors.set_owner(this); | |
| my_successors.set_owner(this); | | my_successors.set_owner(this); | |
| decrement.set_owner(this); | | decrement.set_owner(this); | |
|
| | | tbb::internal::fgt_node( tbb::internal::FLOW_LIMITER_NODE, &this->m | |
| | | y_graph, | |
| | | static_cast<receiver<input_type> *>(this), | |
| | | static_cast<receiver<continue_msg> *>(&decrement), | |
| | | static_cast<sender<output_type> *>(this) ) | |
| | | ; | |
| } | | } | |
| | | | |
| //! Copy constructor | | //! Copy constructor | |
| limiter_node( const limiter_node& src ) : | | limiter_node( const limiter_node& src ) : | |
| graph_node(src.my_graph), receiver<T>(), sender<T>(), | | graph_node(src.my_graph), receiver<T>(), sender<T>(), | |
|
| my_threshold(src.my_threshold), my_count(0), | | my_threshold(src.my_threshold), my_count(0), my_tries(0), | |
| init_decrement_predecessors(src.init_decrement_predecessors), | | init_decrement_predecessors(src.init_decrement_predecessors), | |
| decrement(src.init_decrement_predecessors) | | decrement(src.init_decrement_predecessors) | |
| { | | { | |
| my_predecessors.set_owner(this); | | my_predecessors.set_owner(this); | |
| my_successors.set_owner(this); | | my_successors.set_owner(this); | |
| decrement.set_owner(this); | | decrement.set_owner(this); | |
|
| | | tbb::internal::fgt_node( tbb::internal::FLOW_LIMITER_NODE, &this->m | |
| | | y_graph, | |
| | | static_cast<receiver<input_type> *>(this), | |
| | | static_cast<receiver<continue_msg> *>(&decrement), | |
| | | static_cast<sender<output_type> *>(this) ) | |
| | | ; | |
| } | | } | |
| | | | |
|
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | /* override */ void set_name( const char *name ) { | |
| | | tbb::internal::fgt_node_desc( this, name ); | |
| | | } | |
| | | #endif | |
| | | | |
| //! Replace the current successor with this new successor | | //! Replace the current successor with this new successor | |
| /* override */ bool register_successor( receiver<output_type> &r ) { | | /* override */ bool register_successor( receiver<output_type> &r ) { | |
|
| | | spin_mutex::scoped_lock lock(my_mutex); | |
| | | bool was_empty = my_successors.empty(); | |
| my_successors.register_successor(r); | | my_successors.register_successor(r); | |
|
| | | //spawn a forward task if this is the only successor | |
| | | if ( was_empty && !my_predecessors.empty() && my_count + my_tries < | |
| | | my_threshold ) { | |
| | | task* tp = this->my_graph.root_task(); | |
| | | if ( tp ) { | |
| | | FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *t | |
| | | p ) ) | |
| | | internal::forward_task_bypass < limiter_node<T> | |
| | | >( *this ) ) ); | |
| | | } | |
| | | } | |
| return true; | | return true; | |
| } | | } | |
| | | | |
| //! Removes a successor from this node | | //! Removes a successor from this node | |
| /** r.remove_predecessor(*this) is also called. */ | | /** r.remove_predecessor(*this) is also called. */ | |
| /* override */ bool remove_successor( receiver<output_type> &r ) { | | /* override */ bool remove_successor( receiver<output_type> &r ) { | |
| r.remove_predecessor(*this); | | r.remove_predecessor(*this); | |
| my_successors.remove_successor(r); | | my_successors.remove_successor(r); | |
| return true; | | return true; | |
| } | | } | |
| | | | |
|
| //! Removes src from the list of cached predecessors. | | //! Adds src to the list of cached predecessors. | |
| /* override */ bool register_predecessor( predecessor_type &src ) { | | /* override */ bool register_predecessor( predecessor_type &src ) { | |
| spin_mutex::scoped_lock lock(my_mutex); | | spin_mutex::scoped_lock lock(my_mutex); | |
| my_predecessors.add( src ); | | my_predecessors.add( src ); | |
| task* tp = this->my_graph.root_task(); | | task* tp = this->my_graph.root_task(); | |
|
| if ( my_count < my_threshold && !my_successors.empty() && tp ) { | | if ( my_count + my_tries < my_threshold && !my_successors.empty() &
& tp ) { | |
| FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp )
) | | FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp )
) | |
| internal::forward_task_bypass < limiter_node<T> >(
*this ) ) ); | | internal::forward_task_bypass < limiter_node<T> >(
*this ) ) ); | |
| } | | } | |
| return true; | | return true; | |
| } | | } | |
| | | | |
| //! Removes src from the list of cached predecessors. | | //! Removes src from the list of cached predecessors. | |
| /* override */ bool remove_predecessor( predecessor_type &src ) { | | /* override */ bool remove_predecessor( predecessor_type &src ) { | |
| my_predecessors.remove( src ); | | my_predecessors.remove( src ); | |
| return true; | | return true; | |
| | | | |
| skipping to change at line 1900 | | skipping to change at line 2149 | |
| | | | |
| protected: | | protected: | |
| | | | |
| template< typename R, typename B > friend class run_and_put_task; | | template< typename R, typename B > friend class run_and_put_task; | |
| template<typename X, typename Y> friend class internal::broadcast_cache
; | | template<typename X, typename Y> friend class internal::broadcast_cache
; | |
| template<typename X, typename Y> friend class internal::round_robin_cac
he; | | template<typename X, typename Y> friend class internal::round_robin_cac
he; | |
| //! Puts an item to this receiver | | //! Puts an item to this receiver | |
| /* override */ task *try_put_task( const T &t ) { | | /* override */ task *try_put_task( const T &t ) { | |
| { | | { | |
| spin_mutex::scoped_lock lock(my_mutex); | | spin_mutex::scoped_lock lock(my_mutex); | |
|
| if ( my_count >= my_threshold ) | | if ( my_count + my_tries >= my_threshold ) | |
| return NULL; | | return NULL; | |
| else | | else | |
|
| ++my_count; | | ++my_tries; | |
| } | | } | |
| | | | |
| task * rtask = my_successors.try_put_task(t); | | task * rtask = my_successors.try_put_task(t); | |
| | | | |
| if ( !rtask ) { // try_put_task failed. | | if ( !rtask ) { // try_put_task failed. | |
| spin_mutex::scoped_lock lock(my_mutex); | | spin_mutex::scoped_lock lock(my_mutex); | |
|
| --my_count; | | --my_tries; | |
| task* tp = this->my_graph.root_task(); | | task* tp = this->my_graph.root_task(); | |
|
| if ( !my_predecessors.empty() && tp ) { | | if ( check_conditions() && tp ) { | |
| rtask = new ( task::allocate_additional_child_of( *tp ) ) | | rtask = new ( task::allocate_additional_child_of( *tp ) ) | |
| internal::forward_task_bypass< limiter_node<T> >( *this
); | | internal::forward_task_bypass< limiter_node<T> >( *this
); | |
| } | | } | |
| } | | } | |
|
| | | else { | |
| | | spin_mutex::scoped_lock lock(my_mutex); | |
| | | ++my_count; | |
| | | --my_tries; | |
| | | } | |
| return rtask; | | return rtask; | |
| } | | } | |
| | | | |
| /*override*/void reset() { | | /*override*/void reset() { | |
| my_count = 0; | | my_count = 0; | |
| my_predecessors.reset(); | | my_predecessors.reset(); | |
| decrement.reset_receiver(); | | decrement.reset_receiver(); | |
| } | | } | |
| | | | |
| /*override*/void reset_receiver() { my_predecessors.reset(); } | | /*override*/void reset_receiver() { my_predecessors.reset(); } | |
| | | | |
| skipping to change at line 1948 | | skipping to change at line 2202 | |
| template<typename OutputTuple, graph_buffer_policy JP=queueing> class join_
node; | | template<typename OutputTuple, graph_buffer_policy JP=queueing> class join_
node; | |
| | | | |
| template<typename OutputTuple> | | template<typename OutputTuple> | |
| class join_node<OutputTuple,reserving>: public internal::unfolded_join_node
<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, re
serving> { | | class join_node<OutputTuple,reserving>: public internal::unfolded_join_node
<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, re
serving> { | |
| private: | | private: | |
| static const int N = tbb::flow::tuple_size<OutputTuple>::value; | | static const int N = tbb::flow::tuple_size<OutputTuple>::value; | |
| typedef typename internal::unfolded_join_node<N, reserving_port, Output
Tuple, reserving> unfolded_type; | | typedef typename internal::unfolded_join_node<N, reserving_port, Output
Tuple, reserving> unfolded_type; | |
| public: | | public: | |
| typedef OutputTuple output_type; | | typedef OutputTuple output_type; | |
| typedef typename unfolded_type::input_ports_type input_ports_type; | | typedef typename unfolded_type::input_ports_type input_ports_type; | |
|
| join_node(graph &g) : unfolded_type(g) { } | | join_node(graph &g) : unfolded_type(g) { | |
| join_node(const join_node &other) : unfolded_type(other) {} | | tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::F | |
| | | LOW_JOIN_NODE_RESERVING, &this->my_graph, | |
| | | this->input_ports(), static_cas | |
| | | t< sender< output_type > *>(this) ); | |
| | | } | |
| | | join_node(const join_node &other) : unfolded_type(other) { | |
| | | tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::F | |
| | | LOW_JOIN_NODE_RESERVING, &this->my_graph, | |
| | | this->input_ports(), static_cas | |
| | | t< sender< output_type > *>(this) ); | |
| | | } | |
| | | | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | /* override */ void set_name( const char *name ) { | |
| | | tbb::internal::fgt_node_desc( this, name ); | |
| | | } | |
| | | #endif | |
| | | | |
| }; | | }; | |
| | | | |
| template<typename OutputTuple> | | template<typename OutputTuple> | |
| class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<
tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queu
eing> { | | class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<
tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queu
eing> { | |
| private: | | private: | |
| static const int N = tbb::flow::tuple_size<OutputTuple>::value; | | static const int N = tbb::flow::tuple_size<OutputTuple>::value; | |
| typedef typename internal::unfolded_join_node<N, queueing_port, OutputT
uple, queueing> unfolded_type; | | typedef typename internal::unfolded_join_node<N, queueing_port, OutputT
uple, queueing> unfolded_type; | |
| public: | | public: | |
| typedef OutputTuple output_type; | | typedef OutputTuple output_type; | |
| typedef typename unfolded_type::input_ports_type input_ports_type; | | typedef typename unfolded_type::input_ports_type input_ports_type; | |
|
| join_node(graph &g) : unfolded_type(g) { } | | join_node(graph &g) : unfolded_type(g) { | |
| join_node(const join_node &other) : unfolded_type(other) {} | | tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::F | |
| | | LOW_JOIN_NODE_QUEUEING, &this->my_graph, | |
| | | this->input_ports(), static_cas | |
| | | t< sender< output_type > *>(this) ); | |
| | | } | |
| | | join_node(const join_node &other) : unfolded_type(other) { | |
| | | tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::F | |
| | | LOW_JOIN_NODE_QUEUEING, &this->my_graph, | |
| | | this->input_ports(), static_cas | |
| | | t< sender< output_type > *>(this) ); | |
| | | } | |
| | | | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | /* override */ void set_name( const char *name ) { | |
| | | tbb::internal::fgt_node_desc( this, name ); | |
| | | } | |
| | | #endif | |
| | | | |
| }; | | }; | |
| | | | |
| // template for tag_matching join_node | | // template for tag_matching join_node | |
| template<typename OutputTuple> | | template<typename OutputTuple> | |
| class join_node<OutputTuple, tag_matching> : public internal::unfolded_join
_node<tbb::flow::tuple_size<OutputTuple>::value, | | class join_node<OutputTuple, tag_matching> : public internal::unfolded_join
_node<tbb::flow::tuple_size<OutputTuple>::value, | |
| tag_matching_port, OutputTuple, tag_matching> { | | tag_matching_port, OutputTuple, tag_matching> { | |
| private: | | private: | |
| static const int N = tbb::flow::tuple_size<OutputTuple>::value; | | static const int N = tbb::flow::tuple_size<OutputTuple>::value; | |
| typedef typename internal::unfolded_join_node<N, tag_matching_port, Out
putTuple, tag_matching> unfolded_type; | | typedef typename internal::unfolded_join_node<N, tag_matching_port, Out
putTuple, tag_matching> unfolded_type; | |
| public: | | public: | |
| typedef OutputTuple output_type; | | typedef OutputTuple output_type; | |
| typedef typename unfolded_type::input_ports_type input_ports_type; | | typedef typename unfolded_type::input_ports_type input_ports_type; | |
|
| | | | |
| template<typename __TBB_B0, typename __TBB_B1> | | template<typename __TBB_B0, typename __TBB_B1> | |
|
| join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1 | | join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1 | |
| ) { } | | ) { | |
| | | tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::F | |
| | | LOW_JOIN_NODE_TAG_MATCHING, &this->my_graph, | |
| | | this->input_port | |
| | | s(), static_cast< sender< output_type > *>(this) ); | |
| | | } | |
| template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2> | | template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2> | |
|
| join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_t | | join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_t | |
| ype(g, b0, b1, b2) { } | | ype(g, b0, b1, b2) { | |
| | | tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::F | |
| | | LOW_JOIN_NODE_TAG_MATCHING, &this->my_graph, | |
| | | this->input_port | |
| | | s(), static_cast< sender< output_type > *>(this) ); | |
| | | } | |
| template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typen
ame __TBB_B3> | | template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typen
ame __TBB_B3> | |
|
| join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) | | join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) | |
| : unfolded_type(g, b0, b1, b2, b3) { } | | : unfolded_type(g, b0, b1, b2, b3) { | |
| | | tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::F | |
| | | LOW_JOIN_NODE_TAG_MATCHING, &this->my_graph, | |
| | | this->input_port | |
| | | s(), static_cast< sender< output_type > *>(this) ); | |
| | | } | |
| template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typen
ame __TBB_B3, typename __TBB_B4> | | template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typen
ame __TBB_B3, typename __TBB_B4> | |
| join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3,
__TBB_B4 b4) : | | join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3,
__TBB_B4 b4) : | |
|
| unfolded_type(g, b0, b1, b2, b3, b4) { } | | unfolded_type(g, b0, b1, b2, b3, b4) { | |
| | | tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::F | |
| | | LOW_JOIN_NODE_TAG_MATCHING, &this->my_graph, | |
| | | this->input_port | |
| | | s(), static_cast< sender< output_type > *>(this) ); | |
| | | } | |
| #if __TBB_VARIADIC_MAX >= 6 | | #if __TBB_VARIADIC_MAX >= 6 | |
| template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typen
ame __TBB_B3, typename __TBB_B4, | | template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typen
ame __TBB_B3, typename __TBB_B4, | |
| typename __TBB_B5> | | typename __TBB_B5> | |
| join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3,
__TBB_B4 b4, __TBB_B5 b5) : | | join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3,
__TBB_B4 b4, __TBB_B5 b5) : | |
|
| unfolded_type(g, b0, b1, b2, b3, b4, b5) { } | | unfolded_type(g, b0, b1, b2, b3, b4, b5) { | |
| | | tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::F | |
| | | LOW_JOIN_NODE_TAG_MATCHING, &this->my_graph, | |
| | | this->input_port | |
| | | s(), static_cast< sender< output_type > *>(this) ); | |
| | | } | |
| #endif | | #endif | |
| #if __TBB_VARIADIC_MAX >= 7 | | #if __TBB_VARIADIC_MAX >= 7 | |
| template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typen
ame __TBB_B3, typename __TBB_B4, | | template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typen
ame __TBB_B3, typename __TBB_B4, | |
| typename __TBB_B5, typename __TBB_B6> | | typename __TBB_B5, typename __TBB_B6> | |
| join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3,
__TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) : | | join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3,
__TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) : | |
|
| unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) { } | | unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) { | |
| | | tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::F | |
| | | LOW_JOIN_NODE_TAG_MATCHING, &this->my_graph, | |
| | | this->input_port | |
| | | s(), static_cast< sender< output_type > *>(this) ); | |
| | | } | |
| #endif | | #endif | |
| #if __TBB_VARIADIC_MAX >= 8 | | #if __TBB_VARIADIC_MAX >= 8 | |
| template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typen
ame __TBB_B3, typename __TBB_B4, | | template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typen
ame __TBB_B3, typename __TBB_B4, | |
| typename __TBB_B5, typename __TBB_B6, typename __TBB_B7> | | typename __TBB_B5, typename __TBB_B6, typename __TBB_B7> | |
| join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3,
__TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, | | join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3,
__TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, | |
|
| __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) | | __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) | |
| { } | | { | |
| | | tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::F | |
| | | LOW_JOIN_NODE_TAG_MATCHING, &this->my_graph, | |
| | | this->input_port | |
| | | s(), static_cast< sender< output_type > *>(this) ); | |
| | | } | |
| #endif | | #endif | |
| #if __TBB_VARIADIC_MAX >= 9 | | #if __TBB_VARIADIC_MAX >= 9 | |
| template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typen
ame __TBB_B3, typename __TBB_B4, | | template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typen
ame __TBB_B3, typename __TBB_B4, | |
| typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename _
_TBB_B8> | | typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename _
_TBB_B8> | |
| join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3,
__TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, | | join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3,
__TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, | |
|
| __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4 | | __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4 | |
| , b5, b6, b7, b8) { } | | , b5, b6, b7, b8) { | |
| | | tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::F | |
| | | LOW_JOIN_NODE_TAG_MATCHING, &this->my_graph, | |
| | | this->input_port | |
| | | s(), static_cast< sender< output_type > *>(this) ); | |
| | | } | |
| #endif | | #endif | |
| #if __TBB_VARIADIC_MAX >= 10 | | #if __TBB_VARIADIC_MAX >= 10 | |
| template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typen
ame __TBB_B3, typename __TBB_B4, | | template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typen
ame __TBB_B3, typename __TBB_B4, | |
| typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename _
_TBB_B8, typename __TBB_B9> | | typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename _
_TBB_B8, typename __TBB_B9> | |
| join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3,
__TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, | | join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3,
__TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, | |
|
| __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b | | __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b | |
| 1, b2, b3, b4, b5, b6, b7, b8, b9) { } | | 1, b2, b3, b4, b5, b6, b7, b8, b9) { | |
| | | tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::F | |
| | | LOW_JOIN_NODE_TAG_MATCHING, &this->my_graph, | |
| | | this->input_port | |
| | | s(), static_cast< sender< output_type > *>(this) ); | |
| | | } | |
| #endif | | #endif | |
|
| join_node(const join_node &other) : unfolded_type(other) {} | | join_node(const join_node &other) : unfolded_type(other) { | |
| | | tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::F | |
| | | LOW_JOIN_NODE_TAG_MATCHING, &this->my_graph, | |
| | | this->input_port | |
| | | s(), static_cast< sender< output_type > *>(this) ); | |
| | | } | |
| | | | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | /* override */ void set_name( const char *name ) { | |
| | | tbb::internal::fgt_node_desc( this, name ); | |
| | | } | |
| | | #endif | |
| | | | |
| }; | | }; | |
| | | | |
| #if TBB_PREVIEW_GRAPH_NODES | | #if TBB_PREVIEW_GRAPH_NODES | |
| // or node | | // or node | |
| #include "internal/_flow_graph_or_impl.h" | | #include "internal/_flow_graph_or_impl.h" | |
| | | | |
| template<typename InputTuple> | | template<typename InputTuple> | |
| class or_node : public internal::unfolded_or_node<InputTuple> { | | class or_node : public internal::unfolded_or_node<InputTuple> { | |
| private: | | private: | |
| static const int N = tbb::flow::tuple_size<InputTuple>::value; | | static const int N = tbb::flow::tuple_size<InputTuple>::value; | |
| public: | | public: | |
| typedef typename internal::or_output_type<InputTuple>::type output_type
; | | typedef typename internal::or_output_type<InputTuple>::type output_type
; | |
| typedef typename internal::unfolded_or_node<InputTuple> unfolded_type; | | typedef typename internal::unfolded_or_node<InputTuple> unfolded_type; | |
|
| or_node(graph& g) : unfolded_type(g) { } | | or_node(graph& g) : unfolded_type(g) { | |
| | | tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FL | |
| | | OW_OR_NODE, &this->my_graph, | |
| | | this->input_ports(), static_cast | |
| | | < sender< output_type > *>(this) ); | |
| | | } | |
| // Copy constructor | | // Copy constructor | |
|
| or_node( const or_node& other ) : unfolded_type(other) { } | | or_node( const or_node& other ) : unfolded_type(other) { | |
| | | tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FL | |
| | | OW_OR_NODE, &this->my_graph, | |
| | | this->input_ports(), static_cast | |
| | | < sender< output_type > *>(this) ); | |
| | | } | |
| | | | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | /* override */ void set_name( const char *name ) { | |
| | | tbb::internal::fgt_node_desc( this, name ); | |
| | | } | |
| | | #endif | |
| | | | |
| }; | | }; | |
| #endif // TBB_PREVIEW_GRAPH_NODES | | #endif // TBB_PREVIEW_GRAPH_NODES | |
| | | | |
| //! Makes an edge between a single predecessor and a single successor | | //! Makes an edge between a single predecessor and a single successor | |
| template< typename T > | | template< typename T > | |
| inline void make_edge( sender<T> &p, receiver<T> &s ) { | | inline void make_edge( sender<T> &p, receiver<T> &s ) { | |
| p.register_successor( s ); | | p.register_successor( s ); | |
|
| | | tbb::internal::fgt_make_edge( &p, &s ); | |
| } | | } | |
| | | | |
| //! Makes an edge between a single predecessor and a single successor | | //! Makes an edge between a single predecessor and a single successor | |
| template< typename T > | | template< typename T > | |
| inline void remove_edge( sender<T> &p, receiver<T> &s ) { | | inline void remove_edge( sender<T> &p, receiver<T> &s ) { | |
| p.remove_successor( s ); | | p.remove_successor( s ); | |
|
| | | tbb::internal::fgt_remove_edge( &p, &s ); | |
| } | | } | |
| | | | |
| //! Returns a copy of the body from a function or continue node | | //! Returns a copy of the body from a function or continue node | |
| template< typename Body, typename Node > | | template< typename Body, typename Node > | |
| Body copy_body( Node &n ) { | | Body copy_body( Node &n ) { | |
| return n.template copy_function_object<Body>(); | | return n.template copy_function_object<Body>(); | |
| } | | } | |
| | | | |
| } // interface7 | | } // interface7 | |
| | | | |
| | | | |
End of changes. 82 change blocks. |
| 101 lines changed or deleted | | 544 lines changed or added | |
|