| _flow_graph_join_impl.h | | _flow_graph_join_impl.h | |
| | | | |
| skipping to change at line 127 | | skipping to change at line 127 | |
| tbb::flow::get<N-1>(my_inputs).set_my_original_tag_func(tbb
::flow::get<N-1>(other_inputs).my_original_func()->clone()); | | tbb::flow::get<N-1>(my_inputs).set_my_original_tag_func(tbb
::flow::get<N-1>(other_inputs).my_original_func()->clone()); | |
| } | | } | |
| join_helper<N-1>::copy_tag_functors(my_inputs, other_inputs); | | join_helper<N-1>::copy_tag_functors(my_inputs, other_inputs); | |
| } | | } | |
| | | | |
| template<typename InputTuple> | | template<typename InputTuple> | |
| static inline void reset_inputs(InputTuple &my_input __TBB_PFG_RESE
T_ARG(__TBB_COMMA reset_flags f)) { | | static inline void reset_inputs(InputTuple &my_input __TBB_PFG_RESE
T_ARG(__TBB_COMMA reset_flags f)) { | |
| join_helper<N-1>::reset_inputs(my_input __TBB_PFG_RESET_ARG(__T
BB_COMMA f)); | | join_helper<N-1>::reset_inputs(my_input __TBB_PFG_RESET_ARG(__T
BB_COMMA f)); | |
| tbb::flow::get<N-1>(my_input).reset_receiver(__TBB_PFG_RESET_AR
G(f)); | | tbb::flow::get<N-1>(my_input).reset_receiver(__TBB_PFG_RESET_AR
G(f)); | |
| } | | } | |
|
| }; | | | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| | | template<typename InputTuple> | |
| | | static inline void extract_inputs(InputTuple &my_input) { | |
| | | join_helper<N-1>::extract_inputs(my_input); | |
| | | tbb::flow::get<N-1>(my_input).extract_receiver(); | |
| | | } | |
| | | #endif | |
| | | }; // join_helper<N> | |
| | | | |
| template< > | | template< > | |
| struct join_helper<1> { | | struct join_helper<1> { | |
| | | | |
| template< typename TupleType, typename PortType > | | template< typename TupleType, typename PortType > | |
| static inline void set_join_node_pointer(TupleType &my_input, PortT
ype *port) { | | static inline void set_join_node_pointer(TupleType &my_input, PortT
ype *port) { | |
| tbb::flow::get<0>( my_input ).set_join_node_pointer(port); | | tbb::flow::get<0>( my_input ).set_join_node_pointer(port); | |
| } | | } | |
| | | | |
| template< typename TupleType > | | template< typename TupleType > | |
| | | | |
| skipping to change at line 195 | | skipping to change at line 203 | |
| static inline void copy_tag_functors(TagFuncTuple1 &my_inputs, TagF
uncTuple2 &other_inputs) { | | static inline void copy_tag_functors(TagFuncTuple1 &my_inputs, TagF
uncTuple2 &other_inputs) { | |
| if(tbb::flow::get<0>(other_inputs).my_original_func()) { | | if(tbb::flow::get<0>(other_inputs).my_original_func()) { | |
| tbb::flow::get<0>(my_inputs).set_my_tag_func(tbb::flow::get
<0>(other_inputs).my_original_func()->clone()); | | tbb::flow::get<0>(my_inputs).set_my_tag_func(tbb::flow::get
<0>(other_inputs).my_original_func()->clone()); | |
| tbb::flow::get<0>(my_inputs).set_my_original_tag_func(tbb::
flow::get<0>(other_inputs).my_original_func()->clone()); | | tbb::flow::get<0>(my_inputs).set_my_original_tag_func(tbb::
flow::get<0>(other_inputs).my_original_func()->clone()); | |
| } | | } | |
| } | | } | |
| template<typename InputTuple> | | template<typename InputTuple> | |
| static inline void reset_inputs(InputTuple &my_input __TBB_PFG_RESE
T_ARG(__TBB_COMMA reset_flags f)) { | | static inline void reset_inputs(InputTuple &my_input __TBB_PFG_RESE
T_ARG(__TBB_COMMA reset_flags f)) { | |
| tbb::flow::get<0>(my_input).reset_receiver(__TBB_PFG_RESET_ARG(
f)); | | tbb::flow::get<0>(my_input).reset_receiver(__TBB_PFG_RESET_ARG(
f)); | |
| } | | } | |
|
| }; | | | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| | | template<typename InputTuple> | |
| | | static inline void extract_inputs(InputTuple &my_input) { | |
| | | tbb::flow::get<0>(my_input).extract_receiver(); | |
| | | } | |
| | | #endif | |
| | | }; // join_helper<1> | |
| | | | |
| //! The two-phase join port | | //! The two-phase join port | |
| template< typename T > | | template< typename T > | |
| class reserving_port : public receiver<T> { | | class reserving_port : public receiver<T> { | |
| public: | | public: | |
| typedef T input_type; | | typedef T input_type; | |
| typedef sender<T> predecessor_type; | | typedef sender<T> predecessor_type; | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| typedef typename receiver<input_type>::predecessor_list_type predec
essor_list_type; | | typedef typename receiver<input_type>::predecessor_list_type predec
essor_list_type; | |
|
| | | typedef typename receiver<input_type>::built_predecessors_type buil
t_predecessors_type; | |
| #endif | | #endif | |
| private: | | private: | |
| // ----------- Aggregator ------------ | | // ----------- Aggregator ------------ | |
| enum op_type { reg_pred, rem_pred, res_item, rel_res, con_res | | enum op_type { reg_pred, rem_pred, res_item, rel_res, con_res | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy | | , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy | |
| #endif | | #endif | |
| }; | | }; | |
| enum op_stat {WAIT=0, SUCCEEDED, FAILED}; | | enum op_stat {WAIT=0, SUCCEEDED, FAILED}; | |
| typedef reserving_port<T> my_class; | | typedef reserving_port<T> my_class; | |
| | | | |
| skipping to change at line 367 | | skipping to change at line 383 | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| } | | } | |
| | | | |
| //! Complete use of the port | | //! Complete use of the port | |
| void consume( ) { | | void consume( ) { | |
| reserving_port_operation op_data(con_res); | | reserving_port_operation op_data(con_res); | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| } | | } | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| | | /*override*/ built_predecessors_type &built_predecessors() { return
my_predecessors.built_predecessors(); } | |
| /*override*/void internal_add_built_predecessor(predecessor_type &s
rc) { | | /*override*/void internal_add_built_predecessor(predecessor_type &s
rc) { | |
| reserving_port_operation op_data(src, add_blt_pred); | | reserving_port_operation op_data(src, add_blt_pred); | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| } | | } | |
| | | | |
| /*override*/void internal_delete_built_predecessor(predecessor_type
&src) { | | /*override*/void internal_delete_built_predecessor(predecessor_type
&src) { | |
| reserving_port_operation op_data(src, del_blt_pred); | | reserving_port_operation op_data(src, del_blt_pred); | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| } | | } | |
| | | | |
| | | | |
| skipping to change at line 388 | | skipping to change at line 405 | |
| reserving_port_operation op_data(blt_pred_cnt); | | reserving_port_operation op_data(blt_pred_cnt); | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| return op_data.cnt_val; | | return op_data.cnt_val; | |
| } | | } | |
| | | | |
| /*override*/void copy_predecessors(predecessor_list_type &l) { | | /*override*/void copy_predecessors(predecessor_list_type &l) { | |
| reserving_port_operation op_data(blt_pred_cpy); | | reserving_port_operation op_data(blt_pred_cpy); | |
| op_data.plist = &l; | | op_data.plist = &l; | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| } | | } | |
|
| | | | |
| | | void extract_receiver() { | |
| | | my_predecessors.built_predecessors().receiver_extract(*this); | |
| | | } | |
| | | | |
| #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | | #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | |
| | | | |
| /*override*/void reset_receiver( __TBB_PFG_RESET_ARG(reset_flags f)
) { | | /*override*/void reset_receiver( __TBB_PFG_RESET_ARG(reset_flags f)
) { | |
|
| my_predecessors.reset(__TBB_PFG_RESET_ARG(f)); | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| | | if(f & rf_clear_edges) my_predecessors.clear(); | |
| | | else | |
| | | #endif | |
| | | my_predecessors.reset(); | |
| reserved = false; | | reserved = false; | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| __TBB_ASSERT(!(f&rf_extract) || my_predecessors.empty(), "port
edges not removed"); | | __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(), "p
ort edges not removed"); | |
| #endif | | #endif | |
| } | | } | |
| | | | |
| private: | | private: | |
| forwarding_base *my_join; | | forwarding_base *my_join; | |
| reservable_predecessor_cache< T, null_mutex > my_predecessors; | | reservable_predecessor_cache< T, null_mutex > my_predecessors; | |
| bool reserved; | | bool reserved; | |
|
| }; | | }; // reserving_port | |
| | | | |
| //! queueing join_port | | //! queueing join_port | |
| template<typename T> | | template<typename T> | |
| class queueing_port : public receiver<T>, public item_buffer<T> { | | class queueing_port : public receiver<T>, public item_buffer<T> { | |
| public: | | public: | |
| typedef T input_type; | | typedef T input_type; | |
| typedef sender<T> predecessor_type; | | typedef sender<T> predecessor_type; | |
| typedef queueing_port<T> my_node_type; | | typedef queueing_port<T> my_node_type; | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| | | typedef typename receiver<input_type>::built_predecessors_type buil
t_predecessors_type; | |
| typedef typename receiver<input_type>::predecessor_list_type predec
essor_list_type; | | typedef typename receiver<input_type>::predecessor_list_type predec
essor_list_type; | |
| #endif | | #endif | |
| | | | |
| // ----------- Aggregator ------------ | | // ----------- Aggregator ------------ | |
| private: | | private: | |
| enum op_type { get__item, res_port, try__put_task | | enum op_type { get__item, res_port, try__put_task | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy | | , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy | |
| #endif | | #endif | |
| }; | | }; | |
| | | | |
| skipping to change at line 560 | | skipping to change at line 587 | |
| | | | |
| // reset_port is called when item is accepted by successor, but | | // reset_port is called when item is accepted by successor, but | |
| // is initiated by join_node. | | // is initiated by join_node. | |
| void reset_port() { | | void reset_port() { | |
| queueing_port_operation op_data(res_port); | | queueing_port_operation op_data(res_port); | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| return; | | return; | |
| } | | } | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| | | /*override*/ built_predecessors_type &built_predecessors() { return | |
| | | my_built_predecessors; } | |
| | | | |
| /*override*/void internal_add_built_predecessor(sender<T> &p) { | | /*override*/void internal_add_built_predecessor(sender<T> &p) { | |
| queueing_port_operation op_data(add_blt_pred); | | queueing_port_operation op_data(add_blt_pred); | |
| op_data.pred = &p; | | op_data.pred = &p; | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| } | | } | |
| | | | |
| /*override*/void internal_delete_built_predecessor(sender<T> &p) { | | /*override*/void internal_delete_built_predecessor(sender<T> &p) { | |
| queueing_port_operation op_data(del_blt_pred); | | queueing_port_operation op_data(del_blt_pred); | |
| op_data.pred = &p; | | op_data.pred = &p; | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| | | | |
| skipping to change at line 584 | | skipping to change at line 613 | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| return op_data.cnt_val; | | return op_data.cnt_val; | |
| } | | } | |
| | | | |
| /*override*/void copy_predecessors(predecessor_list_type &l) { | | /*override*/void copy_predecessors(predecessor_list_type &l) { | |
| queueing_port_operation op_data(blt_pred_cpy); | | queueing_port_operation op_data(blt_pred_cpy); | |
| op_data.plist = &l; | | op_data.plist = &l; | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| } | | } | |
| | | | |
|
| | | void extract_receiver() { | |
| | | item_buffer<T>::reset(); | |
| | | my_built_predecessors.receiver_extract(*this); | |
| | | } | |
| | | | |
| /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f))
{ | | /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f))
{ | |
| item_buffer<T>::reset(); | | item_buffer<T>::reset(); | |
|
| if (f & rf_extract) | | if (f & rf_clear_edges) | |
| my_built_predecessors.receiver_extract(*this); | | my_built_predecessors.clear(); | |
| } | | } | |
| #else | | #else | |
| /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f
*/)) { item_buffer<T>::reset(); } | | /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f
*/)) { item_buffer<T>::reset(); } | |
| #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | | #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | |
| | | | |
| private: | | private: | |
| forwarding_base *my_join; | | forwarding_base *my_join; | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| edge_container<sender<T> > my_built_predecessors; | | edge_container<sender<T> > my_built_predecessors; | |
| #endif | | #endif | |
|
| }; | | }; // queueing_port | |
| | | | |
| #include "_flow_graph_tagged_buffer_impl.h" | | #include "_flow_graph_tagged_buffer_impl.h" | |
| | | | |
| template< typename T > | | template< typename T > | |
| class tag_matching_port : public receiver<T>, public tagged_buffer< tag
_value, T, NO_TAG > { | | class tag_matching_port : public receiver<T>, public tagged_buffer< tag
_value, T, NO_TAG > { | |
| public: | | public: | |
| typedef T input_type; | | typedef T input_type; | |
| typedef sender<T> predecessor_type; | | typedef sender<T> predecessor_type; | |
| typedef tag_matching_port<T> my_node_type; // for forwarding, if n
eeded | | typedef tag_matching_port<T> my_node_type; // for forwarding, if n
eeded | |
| typedef function_body<input_type, tag_value> my_tag_func_type; | | typedef function_body<input_type, tag_value> my_tag_func_type; | |
| typedef tagged_buffer<tag_value,T,NO_TAG> my_buffer_type; | | typedef tagged_buffer<tag_value,T,NO_TAG> my_buffer_type; | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| | | typedef typename receiver<input_type>::built_predecessors_type buil
t_predecessors_type; | |
| typedef typename receiver<input_type>::predecessor_list_type predec
essor_list_type; | | typedef typename receiver<input_type>::predecessor_list_type predec
essor_list_type; | |
| #endif | | #endif | |
| private: | | private: | |
| // ----------- Aggregator ------------ | | // ----------- Aggregator ------------ | |
| private: | | private: | |
| enum op_type { try__put, get__item, res_port, | | enum op_type { try__put, get__item, res_port, | |
| add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy | | add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy | |
| }; | | }; | |
| enum op_stat {WAIT=0, SUCCEEDED, FAILED}; | | enum op_stat {WAIT=0, SUCCEEDED, FAILED}; | |
| typedef tag_matching_port<T> my_class; | | typedef tag_matching_port<T> my_class; | |
| | | | |
| skipping to change at line 752 | | skipping to change at line 787 | |
| my_tag_func = f; | | my_tag_func = f; | |
| } | | } | |
| | | | |
| bool get_item( T &v ) { | | bool get_item( T &v ) { | |
| tag_matching_port_operation op_data(&v, get__item); | | tag_matching_port_operation op_data(&v, get__item); | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| return op_data.status == SUCCEEDED; | | return op_data.status == SUCCEEDED; | |
| } | | } | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| | | /*override*/built_predecessors_type &built_predecessors() { return | |
| | | my_built_predecessors; } | |
| | | | |
| /*override*/void internal_add_built_predecessor(sender<T> &p) { | | /*override*/void internal_add_built_predecessor(sender<T> &p) { | |
| tag_matching_port_operation op_data(add_blt_pred); | | tag_matching_port_operation op_data(add_blt_pred); | |
| op_data.pred = &p; | | op_data.pred = &p; | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| } | | } | |
| | | | |
| /*override*/void internal_delete_built_predecessor(sender<T> &p) { | | /*override*/void internal_delete_built_predecessor(sender<T> &p) { | |
| tag_matching_port_operation op_data(del_blt_pred); | | tag_matching_port_operation op_data(del_blt_pred); | |
| op_data.pred = &p; | | op_data.pred = &p; | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| | | | |
| skipping to change at line 789 | | skipping to change at line 826 | |
| void reset_port() { | | void reset_port() { | |
| tag_matching_port_operation op_data(res_port); | | tag_matching_port_operation op_data(res_port); | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| return; | | return; | |
| } | | } | |
| | | | |
| my_tag_func_type *my_func() { return my_tag_func; } | | my_tag_func_type *my_func() { return my_tag_func; } | |
| my_tag_func_type *my_original_func() { return my_original_tag_func;
} | | my_tag_func_type *my_original_func() { return my_original_tag_func;
} | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| | | void extract_receiver() { | |
| | | my_buffer_type::reset(); | |
| | | my_built_predecessors.receiver_extract(*this); | |
| | | } | |
| | | | |
| /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f))
{ | | /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f))
{ | |
| my_buffer_type::reset(); | | my_buffer_type::reset(); | |
|
| if (f & rf_extract) | | if (f & rf_clear_edges) | |
| my_built_predecessors.receiver_extract(*this); | | my_built_predecessors.clear(); | |
| } | | } | |
| #else | | #else | |
| /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f
*/)) { my_buffer_type::reset(); } | | /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f
*/)) { my_buffer_type::reset(); } | |
| #endif | | #endif | |
| | | | |
| private: | | private: | |
| // need map of tags to values | | // need map of tags to values | |
| forwarding_base *my_join; | | forwarding_base *my_join; | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| edge_container<predecessor_type> my_built_predecessors; | | edge_container<predecessor_type> my_built_predecessors; | |
| | | | |
| skipping to change at line 865 | | skipping to change at line 907 | |
| input_type &input_ports() { return my_inputs; } | | input_type &input_ports() { return my_inputs; } | |
| | | | |
| protected: | | protected: | |
| | | | |
| void reset( __TBB_PFG_RESET_ARG( reset_flags f)) { | | void reset( __TBB_PFG_RESET_ARG( reset_flags f)) { | |
| // called outside of parallel contexts | | // called outside of parallel contexts | |
| ports_with_no_inputs = N; | | ports_with_no_inputs = N; | |
| join_helper<N>::reset_inputs(my_inputs __TBB_PFG_RESET_ARG( __T
BB_COMMA f)); | | join_helper<N>::reset_inputs(my_inputs __TBB_PFG_RESET_ARG( __T
BB_COMMA f)); | |
| } | | } | |
| | | | |
|
| | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| | | void extract( ) { | |
| | | // called outside of parallel contexts | |
| | | ports_with_no_inputs = N; | |
| | | join_helper<N>::extract_inputs(my_inputs); | |
| | | } | |
| | | #endif | |
| | | | |
| // all methods on input ports should be called under mutual exclusi
on from join_node_base. | | // all methods on input ports should be called under mutual exclusi
on from join_node_base. | |
| | | | |
| bool tuple_build_may_succeed() { | | bool tuple_build_may_succeed() { | |
| return !ports_with_no_inputs; | | return !ports_with_no_inputs; | |
| } | | } | |
| | | | |
| bool try_to_make_tuple(output_type &out) { | | bool try_to_make_tuple(output_type &out) { | |
| if(ports_with_no_inputs) return false; | | if(ports_with_no_inputs) return false; | |
| return join_helper<N>::reserve(my_inputs, out); | | return join_helper<N>::reserve(my_inputs, out); | |
| } | | } | |
| | | | |
| skipping to change at line 886 | | skipping to change at line 936 | |
| void tuple_accepted() { | | void tuple_accepted() { | |
| join_helper<N>::consume_reservations(my_inputs); | | join_helper<N>::consume_reservations(my_inputs); | |
| } | | } | |
| void tuple_rejected() { | | void tuple_rejected() { | |
| join_helper<N>::release_reservations(my_inputs); | | join_helper<N>::release_reservations(my_inputs); | |
| } | | } | |
| | | | |
| input_type my_inputs; | | input_type my_inputs; | |
| my_node_type *my_node; | | my_node_type *my_node; | |
| atomic<size_t> ports_with_no_inputs; | | atomic<size_t> ports_with_no_inputs; | |
|
| }; | | }; // join_node_FE<reserving, ... > | |
| | | | |
| template<typename InputTuple, typename OutputTuple> | | template<typename InputTuple, typename OutputTuple> | |
| class join_node_FE<queueing, InputTuple, OutputTuple> : public forwardi
ng_base { | | class join_node_FE<queueing, InputTuple, OutputTuple> : public forwardi
ng_base { | |
| public: | | public: | |
| static const int N = tbb::flow::tuple_size<OutputTuple>::value; | | static const int N = tbb::flow::tuple_size<OutputTuple>::value; | |
| typedef OutputTuple output_type; | | typedef OutputTuple output_type; | |
| typedef InputTuple input_type; | | typedef InputTuple input_type; | |
| typedef join_node_base<queueing, InputTuple, OutputTuple> my_node_t
ype; // for forwarding | | typedef join_node_base<queueing, InputTuple, OutputTuple> my_node_t
ype; // for forwarding | |
| | | | |
| join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) { | | join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) { | |
| | | | |
| skipping to change at line 939 | | skipping to change at line 989 | |
| | | | |
| input_type &input_ports() { return my_inputs; } | | input_type &input_ports() { return my_inputs; } | |
| | | | |
| protected: | | protected: | |
| | | | |
| void reset( __TBB_PFG_RESET_ARG( reset_flags f)) { | | void reset( __TBB_PFG_RESET_ARG( reset_flags f)) { | |
| reset_port_count(); | | reset_port_count(); | |
| join_helper<N>::reset_inputs(my_inputs __TBB_PFG_RESET_ARG( __T
BB_COMMA f) ); | | join_helper<N>::reset_inputs(my_inputs __TBB_PFG_RESET_ARG( __T
BB_COMMA f) ); | |
| } | | } | |
| | | | |
|
| | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| | | void extract() { | |
| | | reset_port_count(); | |
| | | join_helper<N>::extract_inputs(my_inputs); | |
| | | } | |
| | | #endif | |
| // all methods on input ports should be called under mutual exclusi
on from join_node_base. | | // all methods on input ports should be called under mutual exclusi
on from join_node_base. | |
| | | | |
| bool tuple_build_may_succeed() { | | bool tuple_build_may_succeed() { | |
| return !ports_with_no_items; | | return !ports_with_no_items; | |
| } | | } | |
| | | | |
| bool try_to_make_tuple(output_type &out) { | | bool try_to_make_tuple(output_type &out) { | |
| if(ports_with_no_items) return false; | | if(ports_with_no_items) return false; | |
| return join_helper<N>::get_items(my_inputs, out); | | return join_helper<N>::get_items(my_inputs, out); | |
| } | | } | |
| | | | |
| skipping to change at line 961 | | skipping to change at line 1017 | |
| reset_port_count(); | | reset_port_count(); | |
| join_helper<N>::reset_ports(my_inputs); | | join_helper<N>::reset_ports(my_inputs); | |
| } | | } | |
| void tuple_rejected() { | | void tuple_rejected() { | |
| // nothing to do. | | // nothing to do. | |
| } | | } | |
| | | | |
| input_type my_inputs; | | input_type my_inputs; | |
| my_node_type *my_node; | | my_node_type *my_node; | |
| atomic<size_t> ports_with_no_items; | | atomic<size_t> ports_with_no_items; | |
|
| }; | | }; // join_node_FE<queueing, ...> | |
| | | | |
| // tag_matching join input port. | | // tag_matching join input port. | |
| template<typename InputTuple, typename OutputTuple> | | template<typename InputTuple, typename OutputTuple> | |
| class join_node_FE<tag_matching, InputTuple, OutputTuple> : public forw
arding_base, | | class join_node_FE<tag_matching, InputTuple, OutputTuple> : public forw
arding_base, | |
| // buffer of tag value counts buffer
of output items | | // buffer of tag value counts buffer
of output items | |
| public tagged_buffer<tag_value, size_t, NO_TAG>, public item_b
uffer<OutputTuple> { | | public tagged_buffer<tag_value, size_t, NO_TAG>, public item_b
uffer<OutputTuple> { | |
| public: | | public: | |
| static const int N = tbb::flow::tuple_size<OutputTuple>::value; | | static const int N = tbb::flow::tuple_size<OutputTuple>::value; | |
| typedef OutputTuple output_type; | | typedef OutputTuple output_type; | |
| typedef InputTuple input_type; | | typedef InputTuple input_type; | |
| | | | |
| skipping to change at line 1135 | | skipping to change at line 1191 | |
| | | | |
| void reset( __TBB_PFG_RESET_ARG( reset_flags f )) { | | void reset( __TBB_PFG_RESET_ARG( reset_flags f )) { | |
| // called outside of parallel contexts | | // called outside of parallel contexts | |
| join_helper<N>::reset_inputs(my_inputs __TBB_PFG_RESET_ARG( __T
BB_COMMA f)); | | join_helper<N>::reset_inputs(my_inputs __TBB_PFG_RESET_ARG( __T
BB_COMMA f)); | |
| | | | |
| my_tag_buffer::reset(); // have to reset the tag counts | | my_tag_buffer::reset(); // have to reset the tag counts | |
| output_buffer_type::reset(); // also the queue of outputs | | output_buffer_type::reset(); // also the queue of outputs | |
| my_node->current_tag = NO_TAG; | | my_node->current_tag = NO_TAG; | |
| } | | } | |
| | | | |
|
| | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| | | void extract() { | |
| | | // called outside of parallel contexts | |
| | | join_helper<N>::extract_inputs(my_inputs); | |
| | | my_tag_buffer::reset(); // have to reset the tag counts | |
| | | output_buffer_type::reset(); // also the queue of outputs | |
| | | my_node->current_tag = NO_TAG; | |
| | | } | |
| | | #endif | |
| // all methods on input ports should be called under mutual exclusi
on from join_node_base. | | // all methods on input ports should be called under mutual exclusi
on from join_node_base. | |
| | | | |
| bool tuple_build_may_succeed() { // called from back-end | | bool tuple_build_may_succeed() { // called from back-end | |
| tag_matching_FE_operation op_data(may_succeed); | | tag_matching_FE_operation op_data(may_succeed); | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| return op_data.status == SUCCEEDED; | | return op_data.status == SUCCEEDED; | |
| } | | } | |
| | | | |
| // cannot lock while calling back to input_ports. current_tag will
only be set | | // cannot lock while calling back to input_ports. current_tag will
only be set | |
| // and reset under the aggregator, so it will remain consistent. | | // and reset under the aggregator, so it will remain consistent. | |
| | | | |
| skipping to change at line 1179 | | skipping to change at line 1244 | |
| public: | | public: | |
| typedef OutputTuple output_type; | | typedef OutputTuple output_type; | |
| | | | |
| typedef receiver<output_type> successor_type; | | typedef receiver<output_type> successor_type; | |
| typedef join_node_FE<JP, InputTuple, OutputTuple> input_ports_type; | | typedef join_node_FE<JP, InputTuple, OutputTuple> input_ports_type; | |
| using input_ports_type::tuple_build_may_succeed; | | using input_ports_type::tuple_build_may_succeed; | |
| using input_ports_type::try_to_make_tuple; | | using input_ports_type::try_to_make_tuple; | |
| using input_ports_type::tuple_accepted; | | using input_ports_type::tuple_accepted; | |
| using input_ports_type::tuple_rejected; | | using input_ports_type::tuple_rejected; | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| | | typedef typename sender<output_type>::built_successors_type built_s
uccessors_type; | |
| typedef typename sender<output_type>::successor_list_type successor
_list_type; | | typedef typename sender<output_type>::successor_list_type successor
_list_type; | |
| #endif | | #endif | |
| | | | |
| private: | | private: | |
| // ----------- Aggregator ------------ | | // ----------- Aggregator ------------ | |
| enum op_type { reg_succ, rem_succ, try__get, do_fwrd, do_fwrd_bypas
s | | enum op_type { reg_succ, rem_succ, try__get, do_fwrd, do_fwrd_bypas
s | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| , add_blt_succ, del_blt_succ, blt_succ_cnt, blt_succ_cpy | | , add_blt_succ, del_blt_succ, blt_succ_cnt, blt_succ_cpy | |
| #endif | | #endif | |
| }; | | }; | |
| | | | |
| skipping to change at line 1337 | | skipping to change at line 1403 | |
| return op_data.status == SUCCEEDED; | | return op_data.status == SUCCEEDED; | |
| } | | } | |
| | | | |
| bool try_get( output_type &v) { | | bool try_get( output_type &v) { | |
| join_node_base_operation op_data(v, try__get); | | join_node_base_operation op_data(v, try__get); | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| return op_data.status == SUCCEEDED; | | return op_data.status == SUCCEEDED; | |
| } | | } | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| | | /*override*/built_successors_type &built_successors() { return my_s | |
| | | uccessors.built_successors(); } | |
| | | | |
| /*override*/void internal_add_built_successor( successor_type &r) { | | /*override*/void internal_add_built_successor( successor_type &r) { | |
| join_node_base_operation op_data(r, add_blt_succ); | | join_node_base_operation op_data(r, add_blt_succ); | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| } | | } | |
| | | | |
| /*override*/void internal_delete_built_successor( successor_type &r
) { | | /*override*/void internal_delete_built_successor( successor_type &r
) { | |
| join_node_base_operation op_data(r, del_blt_succ); | | join_node_base_operation op_data(r, del_blt_succ); | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| } | | } | |
| | | | |
| | | | |
| skipping to change at line 1360 | | skipping to change at line 1428 | |
| return op_data.cnt_val; | | return op_data.cnt_val; | |
| } | | } | |
| | | | |
| /*override*/ void copy_successors(successor_list_type &l) { | | /*override*/ void copy_successors(successor_list_type &l) { | |
| join_node_base_operation op_data(blt_succ_cpy); | | join_node_base_operation op_data(blt_succ_cpy); | |
| op_data.slist = &l; | | op_data.slist = &l; | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| } | | } | |
| #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | | #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | |
| | | | |
|
| | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| | | /*override*/void extract() { | |
| | | input_ports_type::extract(); | |
| | | my_successors.built_successors().sender_extract(*this); | |
| | | } | |
| | | #endif | |
| | | | |
| protected: | | protected: | |
| | | | |
|
| /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { | | /*override*/void reset_node(__TBB_PFG_RESET_ARG(reset_flags f)) { | |
| input_ports_type::reset(__TBB_PFG_RESET_ARG(f)); | | input_ports_type::reset(__TBB_PFG_RESET_ARG(f)); | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| my_successors.reset(f); | | if(f & rf_clear_edges) my_successors.clear(); | |
| #endif | | #endif | |
| } | | } | |
| | | | |
| private: | | private: | |
| broadcast_cache<output_type, null_rw_mutex> my_successors; | | broadcast_cache<output_type, null_rw_mutex> my_successors; | |
| | | | |
| friend class forward_task_bypass< join_node_base<JP, InputTuple, Ou
tputTuple> >; | | friend class forward_task_bypass< join_node_base<JP, InputTuple, Ou
tputTuple> >; | |
| task *forward_task() { | | task *forward_task() { | |
| join_node_base_operation op_data(do_fwrd_bypass); | | join_node_base_operation op_data(do_fwrd_bypass); | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| | | | |
End of changes. 27 change blocks. |
| 14 lines changed or deleted | | 92 lines changed or added | |
|
| _flow_graph_node_impl.h | | _flow_graph_node_impl.h | |
| | | | |
| skipping to change at line 68 | | skipping to change at line 68 | |
| blt_pred_cnt, blt_pred_cpy // create vector copies of preds a
nd succs | | blt_pred_cnt, blt_pred_cpy // create vector copies of preds a
nd succs | |
| #endif | | #endif | |
| }; | | }; | |
| typedef function_input_base<Input, A, ImplType> my_class; | | typedef function_input_base<Input, A, ImplType> my_class; | |
| | | | |
| public: | | public: | |
| | | | |
| //! The input type of this receiver | | //! The input type of this receiver | |
| typedef Input input_type; | | typedef Input input_type; | |
| typedef sender<Input> predecessor_type; | | typedef sender<Input> predecessor_type; | |
|
| | | typedef predecessor_cache<input_type, null_mutex > predecessor_cach | |
| | | e_type; | |
| | | typedef function_input_queue<input_type, A> input_queue_type; | |
| | | typedef typename A::template rebind< input_queue_type >::other queu | |
| | | e_allocator_type; | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| | | typedef typename predecessor_cache_type::built_predecessors_type bu
ilt_predecessors_type; | |
| typedef typename receiver<input_type>::predecessor_list_type predec
essor_list_type; | | typedef typename receiver<input_type>::predecessor_list_type predec
essor_list_type; | |
| #endif | | #endif | |
| | | | |
| //! Constructor for function_input_base | | //! Constructor for function_input_base | |
|
| function_input_base( graph &g, size_t max_concurrency, function_inp
ut_queue<input_type,A> *q = NULL ) | | function_input_base( graph &g, size_t max_concurrency, input_queue_
type *q = NULL) | |
| : my_graph(g), my_max_concurrency(max_concurrency), my_concurre
ncy(0), | | : my_graph(g), my_max_concurrency(max_concurrency), my_concurre
ncy(0), | |
| my_queue(q), forwarder_busy(false) { | | my_queue(q), forwarder_busy(false) { | |
| my_predecessors.set_owner(this); | | my_predecessors.set_owner(this); | |
| my_aggregator.initialize_handler(my_handler(this)); | | my_aggregator.initialize_handler(my_handler(this)); | |
| } | | } | |
| | | | |
| //! Copy constructor | | //! Copy constructor | |
|
| function_input_base( const function_input_base& src, function_input
_queue<input_type,A> *q = NULL ) : | | function_input_base( const function_input_base& src, input_queue_ty
pe *q = NULL) : | |
| receiver<Input>(), tbb::internal::no_assign(), | | receiver<Input>(), tbb::internal::no_assign(), | |
| my_graph(src.my_graph), my_max_concurrency(src.my_max_concurren
cy), | | my_graph(src.my_graph), my_max_concurrency(src.my_max_concurren
cy), | |
| my_concurrency(0), my_queue(q), forwarder_busy(false) | | my_concurrency(0), my_queue(q), forwarder_busy(false) | |
| { | | { | |
| my_predecessors.set_owner(this); | | my_predecessors.set_owner(this); | |
| my_aggregator.initialize_handler(my_handler(this)); | | my_aggregator.initialize_handler(my_handler(this)); | |
| } | | } | |
| | | | |
| //! Destructor | | //! Destructor | |
|
| | | // The queue is allocated by the constructor for {multi}function_no | |
| | | de. | |
| | | // TODO: pass the graph_buffer_policy to the base so it can allocat | |
| | | e the queue instead. | |
| | | // This would be an interface-breaking change. | |
| virtual ~function_input_base() { | | virtual ~function_input_base() { | |
| if ( my_queue ) delete my_queue; | | if ( my_queue ) delete my_queue; | |
| } | | } | |
| | | | |
| //! Put to the node, returning a task if available | | //! Put to the node, returning a task if available | |
| virtual task * try_put_task( const input_type &t ) { | | virtual task * try_put_task( const input_type &t ) { | |
| if ( my_max_concurrency == 0 ) { | | if ( my_max_concurrency == 0 ) { | |
| return create_body_task( t ); | | return create_body_task( t ); | |
| } else { | | } else { | |
| my_operation op_data(t, tryput_bypass); | | my_operation op_data(t, tryput_bypass); | |
| | | | |
| skipping to change at line 152 | | skipping to change at line 159 | |
| my_operation op_data(blt_pred_cnt); | | my_operation op_data(blt_pred_cnt); | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| return op_data.cnt_val; | | return op_data.cnt_val; | |
| } | | } | |
| | | | |
| /*override*/ void copy_predecessors(predecessor_list_type &v) { | | /*override*/ void copy_predecessors(predecessor_list_type &v) { | |
| my_operation op_data(blt_pred_cpy); | | my_operation op_data(blt_pred_cpy); | |
| op_data.predv = &v; | | op_data.predv = &v; | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| } | | } | |
|
| | | | |
| | | /*override*/built_predecessors_type &built_predecessors() { | |
| | | return my_predecessors.built_predecessors(); | |
| | | } | |
| #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | | #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | |
| | | | |
| protected: | | protected: | |
| | | | |
| void reset_function_input_base( __TBB_PFG_RESET_ARG(reset_flags f))
{ | | void reset_function_input_base( __TBB_PFG_RESET_ARG(reset_flags f))
{ | |
| my_concurrency = 0; | | my_concurrency = 0; | |
| if(my_queue) { | | if(my_queue) { | |
| my_queue->reset(); | | my_queue->reset(); | |
| } | | } | |
| reset_receiver(__TBB_PFG_RESET_ARG(f)); | | reset_receiver(__TBB_PFG_RESET_ARG(f)); | |
| forwarder_busy = false; | | forwarder_busy = false; | |
| } | | } | |
| | | | |
| graph& my_graph; | | graph& my_graph; | |
| const size_t my_max_concurrency; | | const size_t my_max_concurrency; | |
| size_t my_concurrency; | | size_t my_concurrency; | |
|
| function_input_queue<input_type, A> *my_queue; | | input_queue_type *my_queue; | |
| predecessor_cache<input_type, null_mutex > my_predecessors; | | predecessor_cache<input_type, null_mutex > my_predecessors; | |
| | | | |
| /*override*/void reset_receiver( __TBB_PFG_RESET_ARG(reset_flags f)
) { | | /*override*/void reset_receiver( __TBB_PFG_RESET_ARG(reset_flags f)
) { | |
|
| my_predecessors.reset(__TBB_PFG_RESET_ARG(f)); | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| __TBB_ASSERT(!(f & rf_extract) || my_predecessors.empty(), "fun | | if( f & rf_clear_edges) my_predecessors.clear(); | |
| ction_input_base reset failed"); | | else | |
| | | #endif | |
| | | my_predecessors.reset(); | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| | | __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(), | |
| | | "function_input_base reset failed"); | |
| #endif | | #endif | |
| } | | } | |
| | | | |
| private: | | private: | |
| | | | |
| friend class apply_body_task_bypass< my_class, input_type >; | | friend class apply_body_task_bypass< my_class, input_type >; | |
| friend class forward_task_bypass< my_class >; | | friend class forward_task_bypass< my_class >; | |
| | | | |
| class my_operation : public aggregated_operation< my_operation > { | | class my_operation : public aggregated_operation< my_operation > { | |
| public: | | public: | |
| | | | |
| skipping to change at line 409 | | skipping to change at line 424 | |
| class function_input : public function_input_base<Input, A, function_in
put<Input,Output,A> > { | | class function_input : public function_input_base<Input, A, function_in
put<Input,Output,A> > { | |
| public: | | public: | |
| typedef Input input_type; | | typedef Input input_type; | |
| typedef Output output_type; | | typedef Output output_type; | |
| typedef function_input<Input,Output,A> my_class; | | typedef function_input<Input,Output,A> my_class; | |
| typedef function_input_base<Input, A, my_class> base_type; | | typedef function_input_base<Input, A, my_class> base_type; | |
| typedef function_input_queue<input_type, A> input_queue_type; | | typedef function_input_queue<input_type, A> input_queue_type; | |
| | | | |
| // constructor | | // constructor | |
| template<typename Body> | | template<typename Body> | |
|
| function_input( graph &g, size_t max_concurrency, Body& body, funct
ion_input_queue<input_type,A> *q = NULL ) : | | function_input( graph &g, size_t max_concurrency, Body& body, input
_queue_type *q = NULL ) : | |
| base_type(g, max_concurrency, q), | | base_type(g, max_concurrency, q), | |
| my_body( new internal::function_body_leaf< input_type, output_t
ype, Body>(body) ) { | | my_body( new internal::function_body_leaf< input_type, output_t
ype, Body>(body) ) { | |
| } | | } | |
| | | | |
| //! Copy constructor | | //! Copy constructor | |
| function_input( const function_input& src, input_queue_type *q = NU
LL ) : | | function_input( const function_input& src, input_queue_type *q = NU
LL ) : | |
| base_type(src, q), | | base_type(src, q), | |
| my_body( src.my_body->clone() ) { | | my_body( src.my_body->clone() ) { | |
| } | | } | |
| | | | |
| | | | |
| skipping to change at line 459 | | skipping to change at line 474 | |
| if(f & rf_reset_bodies) my_body->reset_body(); | | if(f & rf_reset_bodies) my_body->reset_body(); | |
| #endif | | #endif | |
| } | | } | |
| | | | |
| function_body<input_type, output_type> *my_body; | | function_body<input_type, output_type> *my_body; | |
| virtual broadcast_cache<output_type > &successors() = 0; | | virtual broadcast_cache<output_type > &successors() = 0; | |
| | | | |
| }; // function_input | | }; // function_input | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| // helper templates to reset the successor edges of the output ports of | | // helper templates to clear the successor edges of the output ports of | |
| an multifunction_node | | an multifunction_node | |
| template<int N> | | template<int N> struct clear_element { | |
| struct reset_element { | | template<typename P> static void clear_this(P &p) { | |
| template<typename P> | | (void)tbb::flow::get<N-1>(p).successors().clear(); | |
| static void reset_this(P &p, reset_flags f) { | | clear_element<N-1>::clear_this(p); | |
| (void)tbb::flow::get<N-1>(p).successors().reset(f); | | | |
| reset_element<N-1>::reset_this(p, f); | | | |
| } | | } | |
|
| template<typename P> | | template<typename P> static bool this_empty(P &p) { | |
| static bool this_empty(P &p) { | | | |
| if(tbb::flow::get<N-1>(p).successors().empty()) | | if(tbb::flow::get<N-1>(p).successors().empty()) | |
|
| return reset_element<N-1>::this_empty(p); | | return clear_element<N-1>::this_empty(p); | |
| return false; | | return false; | |
| } | | } | |
| }; | | }; | |
| | | | |
|
| template<> | | template<> struct clear_element<1> { | |
| struct reset_element<1> { | | template<typename P> static void clear_this(P &p) { | |
| template<typename P> | | (void)tbb::flow::get<0>(p).successors().clear(); | |
| static void reset_this(P &p, reset_flags f) { | | | |
| (void)tbb::flow::get<0>(p).successors().reset(f); | | | |
| } | | } | |
|
| template<typename P> | | template<typename P> static bool this_empty(P &p) { | |
| static bool this_empty(P &p) { | | | |
| return tbb::flow::get<0>(p).successors().empty(); | | return tbb::flow::get<0>(p).successors().empty(); | |
| } | | } | |
| }; | | }; | |
|
| | | | |
| | | // helper templates to extract the output ports of an multifunction_nod | |
| | | e from graph | |
| | | template<int N> struct extract_element { | |
| | | template<typename P> static void extract_this(P &p) { | |
| | | (void)tbb::flow::get<N-1>(p).successors().built_successors().se | |
| | | nder_extract(tbb::flow::get<N-1>(p)); | |
| | | extract_element<N-1>::extract_this(p); | |
| | | } | |
| | | }; | |
| | | | |
| | | template<> struct extract_element<1> { | |
| | | template<typename P> static void extract_this(P &p) { | |
| | | (void)tbb::flow::get<0>(p).successors().built_successors().send | |
| | | er_extract(tbb::flow::get<0>(p)); | |
| | | } | |
| | | }; | |
| #endif | | #endif | |
| | | | |
| //! Implements methods for a function node that takes a type Input as i
nput | | //! Implements methods for a function node that takes a type Input as i
nput | |
| // and has a tuple of output ports specified. | | // and has a tuple of output ports specified. | |
| template< typename Input, typename OutputPortSet, typename A> | | template< typename Input, typename OutputPortSet, typename A> | |
| class multifunction_input : public function_input_base<Input, A, multif
unction_input<Input,OutputPortSet,A> > { | | class multifunction_input : public function_input_base<Input, A, multif
unction_input<Input,OutputPortSet,A> > { | |
| public: | | public: | |
| static const int N = tbb::flow::tuple_size<OutputPortSet>::value; | | static const int N = tbb::flow::tuple_size<OutputPortSet>::value; | |
| typedef Input input_type; | | typedef Input input_type; | |
| typedef OutputPortSet output_ports_type; | | typedef OutputPortSet output_ports_type; | |
| typedef multifunction_input<Input,OutputPortSet,A> my_class; | | typedef multifunction_input<Input,OutputPortSet,A> my_class; | |
| typedef function_input_base<Input, A, my_class> base_type; | | typedef function_input_base<Input, A, my_class> base_type; | |
| typedef function_input_queue<input_type, A> input_queue_type; | | typedef function_input_queue<input_type, A> input_queue_type; | |
| | | | |
| // constructor | | // constructor | |
| template<typename Body> | | template<typename Body> | |
| multifunction_input( | | multifunction_input( | |
| graph &g, | | graph &g, | |
| size_t max_concurrency, | | size_t max_concurrency, | |
| Body& body, | | Body& body, | |
|
| function_input_queue<input_type,A> *q = NULL ) : | | input_queue_type *q = NULL ) : | |
| base_type(g, max_concurrency, q), | | base_type(g, max_concurrency, q), | |
| my_body( new internal::multifunction_body_leaf<input_type, outp
ut_ports_type, Body>(body) ) { | | my_body( new internal::multifunction_body_leaf<input_type, outp
ut_ports_type, Body>(body) ) { | |
| } | | } | |
| | | | |
| //! Copy constructor | | //! Copy constructor | |
| multifunction_input( const multifunction_input& src, input_queue_ty
pe *q = NULL ) : | | multifunction_input( const multifunction_input& src, input_queue_ty
pe *q = NULL ) : | |
| base_type(src, q), | | base_type(src, q), | |
| my_body( src.my_body->clone() ) { | | my_body( src.my_body->clone() ) { | |
| } | | } | |
| | | | |
| | | | |
| skipping to change at line 540 | | skipping to change at line 563 | |
| tbb::internal::fgt_begin_body( my_body ); | | tbb::internal::fgt_begin_body( my_body ); | |
| (*my_body)(i, my_output_ports); | | (*my_body)(i, my_output_ports); | |
| tbb::internal::fgt_end_body( my_body ); | | tbb::internal::fgt_end_body( my_body ); | |
| task * new_task = SUCCESSFULLY_ENQUEUED; | | task * new_task = SUCCESSFULLY_ENQUEUED; | |
| return new_task; | | return new_task; | |
| } | | } | |
| | | | |
| output_ports_type &output_ports(){ return my_output_ports; } | | output_ports_type &output_ports(){ return my_output_ports; } | |
| | | | |
| protected: | | protected: | |
|
| | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| | | /*override*/void extract() { | |
| | | extract_element<N>::extract_this(my_output_ports); | |
| | | } | |
| | | #endif | |
| | | | |
| /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { | | /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { | |
| base_type::reset_function_input_base(__TBB_PFG_RESET_ARG(f)); | | base_type::reset_function_input_base(__TBB_PFG_RESET_ARG(f)); | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| reset_element<N>::reset_this(my_output_ports, f); | | if(f & rf_clear_edges)clear_element<N>::clear_this(my_output_po
rts); | |
| if(f & rf_reset_bodies) my_body->reset_body(); | | if(f & rf_reset_bodies) my_body->reset_body(); | |
|
| __TBB_ASSERT(!(f & rf_extract) || reset_element<N>::this_empty(
my_output_ports), "multifunction_node reset failed"); | | __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_em
pty(my_output_ports), "multifunction_node reset failed"); | |
| #endif | | #endif | |
| } | | } | |
| | | | |
| multifunction_body<input_type, output_ports_type> *my_body; | | multifunction_body<input_type, output_ports_type> *my_body; | |
| output_ports_type my_output_ports; | | output_ports_type my_output_ports; | |
| | | | |
| }; // multifunction_input | | }; // multifunction_input | |
| | | | |
| // template to refer to an output port of a multifunction_node | | // template to refer to an output port of a multifunction_node | |
| template<size_t N, typename MOP> | | template<size_t N, typename MOP> | |
| | | | |
| skipping to change at line 654 | | skipping to change at line 682 | |
| /* override */ task *execute( ) { | | /* override */ task *execute( ) { | |
| task* tp = my_graph_ptr->root_task(); | | task* tp = my_graph_ptr->root_task(); | |
| return (tp) ? | | return (tp) ? | |
| new ( task::allocate_additional_child_of( *tp ) ) | | new ( task::allocate_additional_child_of( *tp ) ) | |
| apply_body_task_bypass< continue_input< Output >, conti
nue_msg >( *this, continue_msg() ) : | | apply_body_task_bypass< continue_input< Output >, conti
nue_msg >( *this, continue_msg() ) : | |
| NULL; | | NULL; | |
| } | | } | |
| | | | |
| }; // continue_input | | }; // continue_input | |
| | | | |
|
| | | #if __TBB_PREVIEW_ASYNC_NODE | |
| | | | |
| | | //! Implements methods for a async node that takes a type Input as inpu | |
| | | t and | |
| | | // submit it to Asynchronous activity | |
| | | template < typename Input, typename A, typename AsyncGatewayType > | |
| | | class async_input : public function_input_base<Input, A, async_input<In | |
| | | put, A, AsyncGatewayType> > { | |
| | | public: | |
| | | typedef Input input_type; | |
| | | typedef AsyncGatewayType async_gateway_type; | |
| | | typedef async_input< Input, A, async_gateway_type > my_class; | |
| | | typedef function_input_base<Input, A, my_class> base_type; | |
| | | | |
| | | // constructor | |
| | | template<typename Body> | |
| | | async_input( graph &g, Body& body ) : | |
| | | base_type( g, unlimited ), | |
| | | my_body( new internal::async_body_leaf< input_type, Body, async | |
| | | _gateway_type >(body) ){ | |
| | | } | |
| | | | |
| | | //! Copy constructor | |
| | | async_input( const async_input& src ) : | |
| | | base_type( src ), | |
| | | my_body( src.my_body->clone() ) { | |
| | | } | |
| | | | |
| | | ~async_input() { | |
| | | delete my_body; | |
| | | } | |
| | | | |
| | | template< typename Body > | |
| | | Body copy_function_object() { | |
| | | internal::async_body<input_type, async_gateway_type> &body_ref | |
| | | = *this->my_body; | |
| | | return dynamic_cast< internal::async_body_leaf<input_type, Body | |
| | | , async_gateway_type> & >(body_ref).get_body(); | |
| | | } | |
| | | | |
| | | task * apply_body_impl_bypass( const input_type &i) { | |
| | | // TODO: This FGT instrumentation only captures the submission | |
| | | of the work | |
| | | // but not the async thread activity. | |
| | | // We will have to think about the best way to capture that. | |
| | | tbb::internal::fgt_begin_body( my_body ); | |
| | | (*my_body)( i, async_gateway() ); | |
| | | tbb::internal::fgt_end_body( my_body ); | |
| | | return NULL; | |
| | | } | |
| | | | |
| | | virtual async_gateway_type& async_gateway() = 0; | |
| | | | |
| | | protected: | |
| | | void reset_async_input(__TBB_PFG_RESET_ARG(reset_flags f)) { | |
| | | base_type::reset_function_input_base(__TBB_PFG_RESET_ARG(f)); | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| | | if(f & rf_reset_bodies) my_body->reset_body(); | |
| | | #endif | |
| | | } | |
| | | | |
| | | async_body< input_type, async_gateway_type > *my_body; | |
| | | }; | |
| | | #endif // __TBB_PREVIEW_ASYNC_NODE | |
| | | | |
| //! Implements methods for both executable and function nodes that puts
Output to its successors | | //! Implements methods for both executable and function nodes that puts
Output to its successors | |
| template< typename Output > | | template< typename Output > | |
| class function_output : public sender<Output> { | | class function_output : public sender<Output> { | |
| public: | | public: | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| template<int N> friend struct reset_element; | | template<int N> friend struct clear_element; | |
| #endif | | #endif | |
| typedef Output output_type; | | typedef Output output_type; | |
| typedef receiver<output_type> successor_type; | | typedef receiver<output_type> successor_type; | |
| typedef broadcast_cache<output_type> broadcast_cache_type; | | typedef broadcast_cache<output_type> broadcast_cache_type; | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| | | typedef typename sender<output_type>::built_successors_type built_s
uccessors_type; | |
| typedef typename sender<output_type>::successor_list_type successor
_list_type; | | typedef typename sender<output_type>::successor_list_type successor
_list_type; | |
| #endif | | #endif | |
| | | | |
| function_output() { my_successors.set_owner(this); } | | function_output() { my_successors.set_owner(this); } | |
| function_output(const function_output & /*other*/) : sender<output_
type>() { | | function_output(const function_output & /*other*/) : sender<output_
type>() { | |
| my_successors.set_owner(this); | | my_successors.set_owner(this); | |
| } | | } | |
| | | | |
| //! Adds a new successor to this node | | //! Adds a new successor to this node | |
| /* override */ bool register_successor( receiver<output_type> &r )
{ | | /* override */ bool register_successor( receiver<output_type> &r )
{ | |
| | | | |
| skipping to change at line 687 | | skipping to change at line 775 | |
| return true; | | return true; | |
| } | | } | |
| | | | |
| //! Removes a successor from this node | | //! Removes a successor from this node | |
| /* override */ bool remove_successor( receiver<output_type> &r ) { | | /* override */ bool remove_successor( receiver<output_type> &r ) { | |
| successors().remove_successor( r ); | | successors().remove_successor( r ); | |
| return true; | | return true; | |
| } | | } | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| | | built_successors_type &built_successors() { return successors().bui | |
| | | lt_successors(); } | |
| | | | |
| /*override*/ void internal_add_built_successor( receiver<output_typ
e> &r) { | | /*override*/ void internal_add_built_successor( receiver<output_typ
e> &r) { | |
| successors().internal_add_built_successor( r ); | | successors().internal_add_built_successor( r ); | |
| } | | } | |
| | | | |
| /*override*/ void internal_delete_built_successor( receiver<output_
type> &r) { | | /*override*/ void internal_delete_built_successor( receiver<output_
type> &r) { | |
| successors().internal_delete_built_successor( r ); | | successors().internal_delete_built_successor( r ); | |
| } | | } | |
| | | | |
| /*override*/ size_t successor_count() { | | /*override*/ size_t successor_count() { | |
| return successors().successor_count(); | | return successors().successor_count(); | |
| | | | |
| skipping to change at line 710 | | skipping to change at line 800 | |
| successors().copy_successors(v); | | successors().copy_successors(v); | |
| } | | } | |
| #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | | #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | |
| | | | |
| // for multifunction_node. The function_body that implements | | // for multifunction_node. The function_body that implements | |
| // the node will have an input and an output tuple of ports. To pu
t | | // the node will have an input and an output tuple of ports. To pu
t | |
| // an item to a successor, the body should | | // an item to a successor, the body should | |
| // | | // | |
| // get<I>(output_ports).try_put(output_value); | | // get<I>(output_ports).try_put(output_value); | |
| // | | // | |
|
| | | // if task pointer is returned will always spawn and return true, e
lse | |
| // return value will be bool returned from successors.try_put. | | // return value will be bool returned from successors.try_put. | |
| task *try_put_task(const output_type &i) { return my_successors.try
_put_task(i); } | | task *try_put_task(const output_type &i) { return my_successors.try
_put_task(i); } | |
| | | | |
|
| | | broadcast_cache_type &successors() { return my_successors; } | |
| protected: | | protected: | |
| broadcast_cache_type my_successors; | | broadcast_cache_type my_successors; | |
|
| broadcast_cache_type &successors() { return my_successors; } | | | |
| | | | |
| }; // function_output | | }; // function_output | |
| | | | |
| template< typename Output > | | template< typename Output > | |
| class multifunction_output : public function_output<Output> { | | class multifunction_output : public function_output<Output> { | |
| public: | | public: | |
| typedef Output output_type; | | typedef Output output_type; | |
| typedef function_output<output_type> base_type; | | typedef function_output<output_type> base_type; | |
| using base_type::my_successors; | | using base_type::my_successors; | |
| | | | |
| | | | |
End of changes. 27 change blocks. |
| 30 lines changed or deleted | | 135 lines changed or added | |
|
| flow_graph.h | | flow_graph.h | |
| | | | |
| skipping to change at line 131 | | skipping to change at line 131 | |
| virtual bool try_reserve( T & ) { return false; } | | virtual bool try_reserve( T & ) { return false; } | |
| | | | |
| //! Releases the reserved item | | //! Releases the reserved item | |
| virtual bool try_release( ) { return false; } | | virtual bool try_release( ) { return false; } | |
| | | | |
| //! Consumes the reserved item | | //! Consumes the reserved item | |
| virtual bool try_consume( ) { return false; } | | virtual bool try_consume( ) { return false; } | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| //! interface to record edges for traversal & deletion | | //! interface to record edges for traversal & deletion | |
|
| typedef typename internal::edge_container<successor_type>::edge_list_t | | typedef typename internal::edge_container<successor_type> built_succes | |
| ype successor_list_type; | | sors_type; | |
| | | typedef typename built_successors_type::edge_list_type successor_list_ | |
| | | type; | |
| | | virtual built_successors_type &built_successors() = 0 | |
| | | ; | |
| virtual void internal_add_built_successor( successor_type & ) = 0
; | | virtual void internal_add_built_successor( successor_type & ) = 0
; | |
| virtual void internal_delete_built_successor( successor_type & ) = 0
; | | virtual void internal_delete_built_successor( successor_type & ) = 0
; | |
| virtual void copy_successors( successor_list_type &) = 0
; | | virtual void copy_successors( successor_list_type &) = 0
; | |
| virtual size_t successor_count() = 0
; | | virtual size_t successor_count() = 0
; | |
| #endif | | #endif | |
|
| }; | | }; // class sender<T> | |
| | | | |
| template< typename T > class limiter_node; // needed for resetting decreme
nter | | template< typename T > class limiter_node; // needed for resetting decreme
nter | |
| template< typename R, typename B > class run_and_put_task; | | template< typename R, typename B > class run_and_put_task; | |
| | | | |
| static tbb::task * const SUCCESSFULLY_ENQUEUED = (task *)-1; | | static tbb::task * const SUCCESSFULLY_ENQUEUED = (task *)-1; | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| // flags to modify the behavior of the graph reset(). Can be combined. | | // flags to modify the behavior of the graph reset(). Can be combined. | |
| enum reset_flags { | | enum reset_flags { | |
| rf_reset_protocol = 0, | | rf_reset_protocol = 0, | |
| rf_reset_bodies = 1<<0, // delete the current node body, reset to
a copy of the initial node body. | | rf_reset_bodies = 1<<0, // delete the current node body, reset to
a copy of the initial node body. | |
|
| rf_extract = 1<<1 // delete edges (extract() for single node
, reset() for graph.) | | rf_clear_edges = 1<<1 // delete edges | |
| }; | | }; | |
| | | | |
| #define __TBB_PFG_RESET_ARG(exp) exp | | #define __TBB_PFG_RESET_ARG(exp) exp | |
| #define __TBB_COMMA , | | #define __TBB_COMMA , | |
| #else | | #else | |
| #define __TBB_PFG_RESET_ARG(exp) /* nothing */ | | #define __TBB_PFG_RESET_ARG(exp) /* nothing */ | |
| #define __TBB_COMMA /* nothing */ | | #define __TBB_COMMA /* nothing */ | |
|
| #endif | | #endif // TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| | | | |
| // enqueue left task if necessary. Returns the non-enqueued task if there
is one. | | // enqueue left task if necessary. Returns the non-enqueued task if there
is one. | |
| static inline tbb::task *combine_tasks( tbb::task * left, tbb::task * right
) { | | static inline tbb::task *combine_tasks( tbb::task * left, tbb::task * right
) { | |
| // if no RHS task, don't change left. | | // if no RHS task, don't change left. | |
| if(right == NULL) return left; | | if(right == NULL) return left; | |
| // right != NULL | | // right != NULL | |
| if(left == NULL) return right; | | if(left == NULL) return right; | |
| if(left == SUCCESSFULLY_ENQUEUED) return right; | | if(left == SUCCESSFULLY_ENQUEUED) return right; | |
| // left contains a task | | // left contains a task | |
| if(right != SUCCESSFULLY_ENQUEUED) { | | if(right != SUCCESSFULLY_ENQUEUED) { | |
| | | | |
| skipping to change at line 211 | | skipping to change at line 213 | |
| virtual task *try_put_task(const T& t) = 0; | | virtual task *try_put_task(const T& t) = 0; | |
| public: | | public: | |
| | | | |
| //! Add a predecessor to the node | | //! Add a predecessor to the node | |
| virtual bool register_predecessor( predecessor_type & ) { return false;
} | | virtual bool register_predecessor( predecessor_type & ) { return false;
} | |
| | | | |
| //! Remove a predecessor from the node | | //! Remove a predecessor from the node | |
| virtual bool remove_predecessor( predecessor_type & ) { return false; } | | virtual bool remove_predecessor( predecessor_type & ) { return false; } | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| typedef typename internal::edge_container<predecessor_type>::edge_list_ | | typedef typename internal::edge_container<predecessor_type> built_prede | |
| type predecessor_list_type; | | cessors_type; | |
| | | typedef typename built_predecessors_type::edge_list_type predecessor_li | |
| | | st_type; | |
| | | virtual built_predecessors_type &built_predecessors() | |
| | | = 0; | |
| virtual void internal_add_built_predecessor( predecessor_type & )
= 0; | | virtual void internal_add_built_predecessor( predecessor_type & )
= 0; | |
| virtual void internal_delete_built_predecessor( predecessor_type & )
= 0; | | virtual void internal_delete_built_predecessor( predecessor_type & )
= 0; | |
| virtual void copy_predecessors( predecessor_list_type & )
= 0; | | virtual void copy_predecessors( predecessor_list_type & )
= 0; | |
| virtual size_t predecessor_count()
= 0; | | virtual size_t predecessor_count()
= 0; | |
| #endif | | #endif | |
| | | | |
| protected: | | protected: | |
| //! put receiver back in initial state | | //! put receiver back in initial state | |
| template<typename U> friend class limiter_node; | | template<typename U> friend class limiter_node; | |
| virtual void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f = rf_rese
t_protocol ) ) = 0; | | virtual void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f = rf_rese
t_protocol ) ) = 0; | |
| | | | |
| template<typename TT, typename M> | | template<typename TT, typename M> | |
| friend class internal::successor_cache; | | friend class internal::successor_cache; | |
| virtual bool is_continue_receiver() { return false; } | | virtual bool is_continue_receiver() { return false; } | |
|
| }; | | }; // class receiver<T> | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| //* holder of edges both for caches and for those nodes which do not have p
redecessor caches. | | //* holder of edges both for caches and for those nodes which do not have p
redecessor caches. | |
| // C == receiver< ... > or sender< ... >, depending. | | // C == receiver< ... > or sender< ... >, depending. | |
| namespace internal { | | namespace internal { | |
| template<typename C> | | template<typename C> | |
| class edge_container { | | class edge_container { | |
| | | | |
| public: | | public: | |
| typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type; | | typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type; | |
| | | | |
| skipping to change at line 263 | | skipping to change at line 267 | |
| } | | } | |
| | | | |
| size_t edge_count() { | | size_t edge_count() { | |
| return (size_t)(built_edges.size()); | | return (size_t)(built_edges.size()); | |
| } | | } | |
| | | | |
| void clear() { | | void clear() { | |
| built_edges.clear(); | | built_edges.clear(); | |
| } | | } | |
| | | | |
|
| | | // methods remove the statement from all predecessors/successors liste | |
| | | in the edge | |
| | | // container. | |
| template< typename S > void sender_extract( S &s ); | | template< typename S > void sender_extract( S &s ); | |
| template< typename R > void receiver_extract( R &r ); | | template< typename R > void receiver_extract( R &r ); | |
| | | | |
| private: | | private: | |
| edge_list_type built_edges; | | edge_list_type built_edges; | |
|
| }; | | }; // class edge_container | |
| } // namespace internal | | } // namespace internal | |
| #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | | #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | |
| | | | |
| //! Base class for receivers of completion messages | | //! Base class for receivers of completion messages | |
| /** These receivers automatically reset, but cannot be explicitly waited on
*/ | | /** These receivers automatically reset, but cannot be explicitly waited on
*/ | |
| class continue_receiver : public receiver< continue_msg > { | | class continue_receiver : public receiver< continue_msg > { | |
| public: | | public: | |
| | | | |
| //! The input type | | //! The input type | |
| typedef continue_msg input_type; | | typedef continue_msg input_type; | |
| | | | |
| skipping to change at line 316 | | skipping to change at line 322 | |
| /** Does not check to see if the removal of the predecessor now makes t
he current count | | /** Does not check to see if the removal of the predecessor now makes t
he current count | |
| exceed the new threshold. So removing a predecessor while the grap
h is active can cause | | exceed the new threshold. So removing a predecessor while the grap
h is active can cause | |
| unexpected results. */ | | unexpected results. */ | |
| /* override */ bool remove_predecessor( predecessor_type & ) { | | /* override */ bool remove_predecessor( predecessor_type & ) { | |
| spin_mutex::scoped_lock l(my_mutex); | | spin_mutex::scoped_lock l(my_mutex); | |
| --my_predecessor_count; | | --my_predecessor_count; | |
| return true; | | return true; | |
| } | | } | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| typedef receiver<input_type>::predecessor_list_type predecessor_list_ty | | typedef internal::edge_container<predecessor_type> built_predecessors_t | |
| pe; | | ype; | |
| | | typedef built_predecessors_type::edge_list_type predecessor_list_type; | |
| | | /*override*/ built_predecessors_type &built_predecessors() { return my_ | |
| | | built_predecessors; } | |
| | | | |
| /*override*/ void internal_add_built_predecessor( predecessor_type &s)
{ | | /*override*/ void internal_add_built_predecessor( predecessor_type &s)
{ | |
| spin_mutex::scoped_lock l(my_mutex); | | spin_mutex::scoped_lock l(my_mutex); | |
| my_built_predecessors.add_edge( s ); | | my_built_predecessors.add_edge( s ); | |
| } | | } | |
| | | | |
| /*override*/ void internal_delete_built_predecessor( predecessor_type &
s) { | | /*override*/ void internal_delete_built_predecessor( predecessor_type &
s) { | |
| spin_mutex::scoped_lock l(my_mutex); | | spin_mutex::scoped_lock l(my_mutex); | |
| my_built_predecessors.delete_edge(s); | | my_built_predecessors.delete_edge(s); | |
| } | | } | |
| | | | |
| /*override*/ void copy_predecessors( predecessor_list_type &v) { | | /*override*/ void copy_predecessors( predecessor_list_type &v) { | |
| spin_mutex::scoped_lock l(my_mutex); | | spin_mutex::scoped_lock l(my_mutex); | |
| my_built_predecessors.copy_edges(v); | | my_built_predecessors.copy_edges(v); | |
| } | | } | |
| | | | |
| /*override*/ size_t predecessor_count() { | | /*override*/ size_t predecessor_count() { | |
| spin_mutex::scoped_lock l(my_mutex); | | spin_mutex::scoped_lock l(my_mutex); | |
| return my_built_predecessors.edge_count(); | | return my_built_predecessors.edge_count(); | |
| } | | } | |
|
| | | | |
| #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | | #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | |
| | | | |
| protected: | | protected: | |
| template< typename R, typename B > friend class run_and_put_task; | | template< typename R, typename B > friend class run_and_put_task; | |
| template<typename X, typename Y> friend class internal::broadcast_cache
; | | template<typename X, typename Y> friend class internal::broadcast_cache
; | |
| template<typename X, typename Y> friend class internal::round_robin_cac
he; | | template<typename X, typename Y> friend class internal::round_robin_cac
he; | |
| // execute body is supposed to be too small to create a task for. | | // execute body is supposed to be too small to create a task for. | |
| /* override */ task *try_put_task( const input_type & ) { | | /* override */ task *try_put_task( const input_type & ) { | |
| { | | { | |
| spin_mutex::scoped_lock l(my_mutex); | | spin_mutex::scoped_lock l(my_mutex); | |
| | | | |
| skipping to change at line 358 | | skipping to change at line 367 | |
| return SUCCESSFULLY_ENQUEUED; | | return SUCCESSFULLY_ENQUEUED; | |
| else | | else | |
| my_current_count = 0; | | my_current_count = 0; | |
| } | | } | |
| task * res = execute(); | | task * res = execute(); | |
| if(!res) return SUCCESSFULLY_ENQUEUED; | | if(!res) return SUCCESSFULLY_ENQUEUED; | |
| return res; | | return res; | |
| } | | } | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| internal::edge_container<predecessor_type> my_built_predecessors; | | // continue_receiver must contain its own built_predecessors because it | |
| | | does | |
| | | // not have a node_cache. | |
| | | built_predecessors_type my_built_predecessors; | |
| #endif | | #endif | |
| spin_mutex my_mutex; | | spin_mutex my_mutex; | |
| int my_predecessor_count; | | int my_predecessor_count; | |
| int my_current_count; | | int my_current_count; | |
| int my_initial_predecessor_count; | | int my_initial_predecessor_count; | |
| // the friend declaration in the base class did not eliminate the "prot
ected class" | | // the friend declaration in the base class did not eliminate the "prot
ected class" | |
| // error in gcc 4.1.2 | | // error in gcc 4.1.2 | |
| template<typename U> friend class limiter_node; | | template<typename U> friend class limiter_node; | |
|
| | | | |
| /*override*/void reset_receiver( __TBB_PFG_RESET_ARG(reset_flags f) ) | | /*override*/void reset_receiver( __TBB_PFG_RESET_ARG(reset_flags f) ) | |
| { | | { | |
| my_current_count = 0; | | my_current_count = 0; | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| if(f & rf_extract) { | | if(f & rf_clear_edges) { | |
| my_built_predecessors.receiver_extract(*this); | | my_built_predecessors.clear(); | |
| my_predecessor_count = my_initial_predecessor_count; | | my_predecessor_count = my_initial_predecessor_count; | |
| } | | } | |
| #endif | | #endif | |
| } | | } | |
| | | | |
| //! Does whatever should happen when the threshold is reached | | //! Does whatever should happen when the threshold is reached | |
| /** This should be very fast or else spawn a task. This is | | /** This should be very fast or else spawn a task. This is | |
| called while the sender is blocked in the try_put(). */ | | called while the sender is blocked in the try_put(). */ | |
| virtual task * execute() = 0; | | virtual task * execute() = 0; | |
| template<typename TT, typename M> | | template<typename TT, typename M> | |
| friend class internal::successor_cache; | | friend class internal::successor_cache; | |
| /*override*/ bool is_continue_receiver() { return true; } | | /*override*/ bool is_continue_receiver() { return true; } | |
|
| }; | | | |
| | | }; // class continue_receiver | |
| } // interface7 | | } // interface7 | |
| } // flow | | } // flow | |
| } // tbb | | } // tbb | |
| | | | |
| #include "internal/_flow_graph_trace_impl.h" | | #include "internal/_flow_graph_trace_impl.h" | |
| | | | |
| namespace tbb { | | namespace tbb { | |
| namespace flow { | | namespace flow { | |
| namespace interface7 { | | namespace interface7 { | |
| | | | |
| | | | |
| skipping to change at line 468 | | skipping to change at line 481 | |
| | | | |
| private: | | private: | |
| // the graph over which we are iterating | | // the graph over which we are iterating | |
| GraphContainerType *my_graph; | | GraphContainerType *my_graph; | |
| // pointer into my_graph's my_nodes list | | // pointer into my_graph's my_nodes list | |
| pointer current_node; | | pointer current_node; | |
| | | | |
| //! Private initializing constructor for begin() and end() iterators | | //! Private initializing constructor for begin() and end() iterators | |
| graph_iterator(GraphContainerType *g, bool begin); | | graph_iterator(GraphContainerType *g, bool begin); | |
| void internal_forward(); | | void internal_forward(); | |
|
| }; | | }; // class graph_iterator | |
| | | | |
| //! The graph class | | //! The graph class | |
| /** This class serves as a handle to the graph */ | | /** This class serves as a handle to the graph */ | |
| class graph : tbb::internal::no_copy { | | class graph : tbb::internal::no_copy { | |
| friend class graph_node; | | friend class graph_node; | |
| | | | |
| template< typename Body > | | template< typename Body > | |
| class run_task : public task { | | class run_task : public task { | |
| public: | | public: | |
| run_task( Body& body ) : my_body(body) {} | | run_task( Body& body ) : my_body(body) {} | |
| | | | |
| skipping to change at line 715 | | skipping to change at line 728 | |
| } | | } | |
| virtual ~graph_node() { | | virtual ~graph_node() { | |
| my_graph.remove_node(this); | | my_graph.remove_node(this); | |
| } | | } | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_TRACE | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| virtual void set_name( const char *name ) = 0; | | virtual void set_name( const char *name ) = 0; | |
| #endif | | #endif | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| virtual void extract( reset_flags f=rf_extract ) { | | virtual void extract( ) = 0; | |
| bool a = my_graph.is_active(); | | | |
| my_graph.set_active(false); | | | |
| reset((reset_flags)(f|rf_extract)); | | | |
| my_graph.set_active(a); | | | |
| } | | | |
| #endif | | #endif | |
| | | | |
| protected: | | protected: | |
|
| virtual void reset(__TBB_PFG_RESET_ARG(reset_flags f=rf_reset_protocol) | | // performs the reset on an individual node. | |
| ) = 0; | | virtual void reset_node(__TBB_PFG_RESET_ARG(reset_flags f=rf_reset_prot | |
| }; | | ocol)) = 0; | |
| | | }; // class graph_node | |
| | | | |
| inline void graph::register_node(graph_node *n) { | | inline void graph::register_node(graph_node *n) { | |
| n->next = NULL; | | n->next = NULL; | |
| { | | { | |
| spin_mutex::scoped_lock lock(nodelist_mutex); | | spin_mutex::scoped_lock lock(nodelist_mutex); | |
| n->prev = my_nodes_last; | | n->prev = my_nodes_last; | |
| if (my_nodes_last) my_nodes_last->next = n; | | if (my_nodes_last) my_nodes_last->next = n; | |
| my_nodes_last = n; | | my_nodes_last = n; | |
| if (!my_nodes) my_nodes = n; | | if (!my_nodes) my_nodes = n; | |
| } | | } | |
| | | | |
| skipping to change at line 760 | | skipping to change at line 769 | |
| inline void graph::reset( __TBB_PFG_RESET_ARG( reset_flags f )) { | | inline void graph::reset( __TBB_PFG_RESET_ARG( reset_flags f )) { | |
| // reset context | | // reset context | |
| task *saved_my_root_task = my_root_task; | | task *saved_my_root_task = my_root_task; | |
| my_root_task = NULL; | | my_root_task = NULL; | |
| if(my_context) my_context->reset(); | | if(my_context) my_context->reset(); | |
| cancelled = false; | | cancelled = false; | |
| caught_exception = false; | | caught_exception = false; | |
| // reset all the nodes comprising the graph | | // reset all the nodes comprising the graph | |
| for(iterator ii = begin(); ii != end(); ++ii) { | | for(iterator ii = begin(); ii != end(); ++ii) { | |
| graph_node *my_p = &(*ii); | | graph_node *my_p = &(*ii); | |
|
| my_p->reset(__TBB_PFG_RESET_ARG(f)); | | my_p->reset_node(__TBB_PFG_RESET_ARG(f)); | |
| } | | } | |
| my_root_task = saved_my_root_task; | | my_root_task = saved_my_root_task; | |
| } | | } | |
| | | | |
| #include "internal/_flow_graph_node_impl.h" | | #include "internal/_flow_graph_node_impl.h" | |
| | | | |
| //! An executable node that acts as a source, i.e. it has no predecessors | | //! An executable node that acts as a source, i.e. it has no predecessors | |
| template < typename Output > | | template < typename Output > | |
| class source_node : public graph_node, public sender< Output > { | | class source_node : public graph_node, public sender< Output > { | |
| protected: | | protected: | |
| | | | |
| skipping to change at line 783 | | skipping to change at line 792 | |
| //! The type of the output message, which is complete | | //! The type of the output message, which is complete | |
| typedef Output output_type; | | typedef Output output_type; | |
| | | | |
| //! The type of successors of this node | | //! The type of successors of this node | |
| typedef receiver< Output > successor_type; | | typedef receiver< Output > successor_type; | |
| | | | |
| //Source node has no input type | | //Source node has no input type | |
| typedef null_type input_type; | | typedef null_type input_type; | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| | | typedef typename sender<output_type>::built_successors_type built_succe
ssors_type; | |
| typedef typename sender<output_type>::successor_list_type successor_lis
t_type; | | typedef typename sender<output_type>::successor_list_type successor_lis
t_type; | |
| #endif | | #endif | |
| | | | |
| //! Constructor for a node with a successor | | //! Constructor for a node with a successor | |
| template< typename Body > | | template< typename Body > | |
| source_node( graph &g, Body body, bool is_active = true ) | | source_node( graph &g, Body body, bool is_active = true ) | |
| : graph_node(g), my_active(is_active), init_my_active(is_active), | | : graph_node(g), my_active(is_active), init_my_active(is_active), | |
| my_body( new internal::source_body_leaf< output_type, Body>(body) )
, | | my_body( new internal::source_body_leaf< output_type, Body>(body) )
, | |
| my_reserved(false), my_has_cached_item(false) | | my_reserved(false), my_has_cached_item(false) | |
| { | | { | |
| | | | |
| skipping to change at line 836 | | skipping to change at line 846 | |
| } | | } | |
| | | | |
| //! Removes a successor from this node | | //! Removes a successor from this node | |
| /* override */ bool remove_successor( successor_type &r ) { | | /* override */ bool remove_successor( successor_type &r ) { | |
| spin_mutex::scoped_lock lock(my_mutex); | | spin_mutex::scoped_lock lock(my_mutex); | |
| my_successors.remove_successor(r); | | my_successors.remove_successor(r); | |
| return true; | | return true; | |
| } | | } | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| | | | |
| | | /*override*/ built_successors_type &built_successors() { return my_succ | |
| | | essors.built_successors(); } | |
| | | | |
| /*override*/void internal_add_built_successor( successor_type &r) { | | /*override*/void internal_add_built_successor( successor_type &r) { | |
| spin_mutex::scoped_lock lock(my_mutex); | | spin_mutex::scoped_lock lock(my_mutex); | |
| my_successors.internal_add_built_successor(r); | | my_successors.internal_add_built_successor(r); | |
| } | | } | |
| | | | |
| /*override*/void internal_delete_built_successor( successor_type &r) { | | /*override*/void internal_delete_built_successor( successor_type &r) { | |
| spin_mutex::scoped_lock lock(my_mutex); | | spin_mutex::scoped_lock lock(my_mutex); | |
| my_successors.internal_delete_built_successor(r); | | my_successors.internal_delete_built_successor(r); | |
| } | | } | |
| | | | |
| | | | |
| skipping to change at line 927 | | skipping to change at line 940 | |
| if ( !my_successors.empty() ) | | if ( !my_successors.empty() ) | |
| spawn_put(); | | spawn_put(); | |
| } | | } | |
| | | | |
| template<typename Body> | | template<typename Body> | |
| Body copy_function_object() { | | Body copy_function_object() { | |
| internal::source_body<output_type> &body_ref = *this->my_body; | | internal::source_body<output_type> &body_ref = *this->my_body; | |
| return dynamic_cast< internal::source_body_leaf<output_type, Body>
& >(body_ref).get_body(); | | return dynamic_cast< internal::source_body_leaf<output_type, Body>
& >(body_ref).get_body(); | |
| } | | } | |
| | | | |
|
| | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| | | /*override*/void extract( ) { | |
| | | my_successors.built_successors().sender_extract(*this); // remove | |
| | | s "my_owner" == this from each successor | |
| | | my_active = init_my_active; | |
| | | my_reserved = false; | |
| | | if(my_has_cached_item) my_has_cached_item = false; | |
| | | } | |
| | | #endif | |
| | | | |
| protected: | | protected: | |
| | | | |
| //! resets the source_node to its initial state | | //! resets the source_node to its initial state | |
|
| void reset( __TBB_PFG_RESET_ARG(reset_flags f)) { | | /*override*/void reset_node( __TBB_PFG_RESET_ARG(reset_flags f)) { | |
| my_active = init_my_active; | | my_active = init_my_active; | |
| my_reserved =false; | | my_reserved =false; | |
| if(my_has_cached_item) { | | if(my_has_cached_item) { | |
| my_has_cached_item = false; | | my_has_cached_item = false; | |
| } | | } | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| my_successors.reset(f); | | if(f & rf_clear_edges) my_successors.clear(); | |
| if(f & rf_reset_bodies) my_body->reset_body(); | | if(f & rf_reset_bodies) my_body->reset_body(); | |
| #endif | | #endif | |
| } | | } | |
| | | | |
| private: | | private: | |
| spin_mutex my_mutex; | | spin_mutex my_mutex; | |
| bool my_active; | | bool my_active; | |
| bool init_my_active; | | bool init_my_active; | |
| internal::source_body<output_type> *my_body; | | internal::source_body<output_type> *my_body; | |
| internal::broadcast_cache< output_type > my_successors; | | internal::broadcast_cache< output_type > my_successors; | |
| | | | |
| skipping to change at line 998 | | skipping to change at line 1020 | |
| if ( !try_reserve_apply_body(v) ) | | if ( !try_reserve_apply_body(v) ) | |
| return NULL; | | return NULL; | |
| | | | |
| task *last_task = my_successors.try_put_task(v); | | task *last_task = my_successors.try_put_task(v); | |
| if ( last_task ) | | if ( last_task ) | |
| try_consume(); | | try_consume(); | |
| else | | else | |
| try_release(); | | try_release(); | |
| return last_task; | | return last_task; | |
| } | | } | |
|
| }; // source_node | | }; // class source_node | |
| | | | |
| //! Implements a function node that supports Input -> Output | | //! Implements a function node that supports Input -> Output | |
|
| template < typename Input, typename Output = continue_msg, graph_buffer_pol
icy = queueing, typename Allocator=cache_aligned_allocator<Input> > | | template < typename Input, typename Output = continue_msg, graph_buffer_pol
icy G = queueing, typename Allocator=cache_aligned_allocator<Input> > | |
| class function_node : public graph_node, public internal::function_input<In
put,Output,Allocator>, public internal::function_output<Output> { | | class function_node : public graph_node, public internal::function_input<In
put,Output,Allocator>, public internal::function_output<Output> { | |
|
| protected: | | | |
| using graph_node::my_graph; | | | |
| public: | | public: | |
| typedef Input input_type; | | typedef Input input_type; | |
| typedef Output output_type; | | typedef Output output_type; | |
| typedef sender< input_type > predecessor_type; | | typedef sender< input_type > predecessor_type; | |
| typedef receiver< output_type > successor_type; | | typedef receiver< output_type > successor_type; | |
| typedef internal::function_input<input_type,output_type,Allocator> fInp
ut_type; | | typedef internal::function_input<input_type,output_type,Allocator> fInp
ut_type; | |
|
| | | typedef internal::function_input_queue<input_type, Allocator> input_que
ue_type; | |
| typedef internal::function_output<output_type> fOutput_type; | | typedef internal::function_output<output_type> fOutput_type; | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| using typename internal::function_input<Input,Output,Allocator>::predec | | using typename fInput_type::predecessor_list_type; | |
| essor_list_type; | | using typename fOutput_type::successor_list_type; | |
| using typename internal::function_output<Output>::successor_list_type; | | using fInput_type::my_predecessors; | |
| #endif | | #endif | |
| | | | |
| //! Constructor | | //! Constructor | |
|
| | | // input_queue_type is allocated here, but destroyed in the function_in | |
| | | put_base. | |
| | | // TODO: pass the graph_buffer_policy to the function_input_base so it | |
| | | can all | |
| | | // be done in one place. This would be an interface-breaking change. | |
| template< typename Body > | | template< typename Body > | |
| function_node( graph &g, size_t concurrency, Body body ) : | | function_node( graph &g, size_t concurrency, Body body ) : | |
|
| graph_node(g), internal::function_input<input_type,output_type,Allo | | graph_node(g), fInput_type(g, concurrency, body, G == queueing ? | |
| cator>(g, concurrency, body) { | | new input_queue_type( ) : NULL ) { | |
| tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NOD | | tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NOD | |
| E, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this), | | E, &this->graph_node::my_graph, | |
| static_cast<sender<output_type> | | static_cast<receiver<input_type> *>(this), static_cast<send | |
| *>(this), this->my_body ); | | er<output_type> *>(this), this->my_body ); | |
| } | | } | |
| | | | |
| //! Copy constructor | | //! Copy constructor | |
| function_node( const function_node& src ) : | | function_node( const function_node& src ) : | |
|
| graph_node(src.my_graph), internal::function_input<input_type,outpu | | graph_node(src.graph_node::my_graph), | |
| t_type,Allocator>( src ), | | fInput_type(src, G == queueing ? new input_queue_type : NULL), | |
| fOutput_type() { | | fOutput_type() { | |
|
| tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NOD | | tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NOD | |
| E, &this->my_graph, static_cast<receiver<input_type> *>(this), | | E, &this->graph_node::my_graph, | |
| static_cast<sender<output_type> | | static_cast<receiver<input_type> *>(this), static_cast<send | |
| *>(this), this->my_body ); | | er<output_type> *>(this), this->my_body ); | |
| } | | } | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_TRACE | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| /* override */ void set_name( const char *name ) { | | /* override */ void set_name( const char *name ) { | |
| tbb::internal::fgt_node_desc( this, name ); | | tbb::internal::fgt_node_desc( this, name ); | |
| } | | } | |
| #endif | | #endif | |
| | | | |
|
| protected: | | | |
| template< typename R, typename B > friend class run_and_put_task; | | | |
| template<typename X, typename Y> friend class internal::broadcast_cache | | | |
| ; | | | |
| template<typename X, typename Y> friend class internal::round_robin_cac | | | |
| he; | | | |
| using fInput_type::try_put_task; | | | |
| | | | |
| // override of graph_node's reset. | | | |
| /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { | | | |
| fInput_type::reset_function_input(__TBB_PFG_RESET_ARG(f)); | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| successors().reset(f); | | /*override*/void extract( ) { | |
| __TBB_ASSERT(!(f & rf_extract) || successors().empty(), "function_n | | my_predecessors.built_predecessors().receiver_extract(*this); | |
| ode successors not empty"); | | successors().built_successors().sender_extract(*this); | |
| __TBB_ASSERT(this->my_predecessors.empty(), "function_node predeces | | | |
| sors not empty"); | | | |
| #endif | | | |
| } | | | |
| | | | |
| /* override */ internal::broadcast_cache<output_type> &successors () { | | | |
| return fOutput_type::my_successors; } | | | |
| }; | | | |
| | | | |
| //! Implements a function node that supports Input -> Output | | | |
| template < typename Input, typename Output, typename Allocator > | | | |
| class function_node<Input,Output,queueing,Allocator> : public graph_node, p | | | |
| ublic internal::function_input<Input,Output,Allocator>, public internal::fu | | | |
| nction_output<Output> { | | | |
| protected: | | | |
| using graph_node::my_graph; | | | |
| public: | | | |
| typedef Input input_type; | | | |
| typedef Output output_type; | | | |
| typedef sender< input_type > predecessor_type; | | | |
| typedef receiver< output_type > successor_type; | | | |
| typedef internal::function_input<input_type,output_type,Allocator> fInp | | | |
| ut_type; | | | |
| typedef internal::function_input_queue<input_type, Allocator> queue_typ | | | |
| e; | | | |
| typedef internal::function_output<output_type> fOutput_type; | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | | |
| using typename internal::function_input<Input,Output,Allocator>::predec | | | |
| essor_list_type; | | | |
| using typename internal::function_output<Output>::successor_list_type; | | | |
| #endif | | | |
| | | | |
| //! Constructor | | | |
| template< typename Body > | | | |
| function_node( graph &g, size_t concurrency, Body body ) : | | | |
| graph_node(g), fInput_type( g, concurrency, body, new queue_type() | | | |
| ) { | | | |
| tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NOD | | | |
| E, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this), | | | |
| static_cast<sender<output_type> | | | |
| *>(this), this->my_body ); | | | |
| } | | | |
| | | | |
| //! Copy constructor | | | |
| function_node( const function_node& src ) : | | | |
| graph_node(src.graph_node::my_graph), fInput_type( src, new queue_t | | | |
| ype() ), fOutput_type() { | | | |
| tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NOD | | | |
| E, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this), | | | |
| static_cast<sender<output_type> | | | |
| *>(this), this->my_body ); | | | |
| } | | | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_TRACE | | | |
| /* override */ void set_name( const char *name ) { | | | |
| tbb::internal::fgt_node_desc( this, name ); | | | |
| } | | } | |
| #endif | | #endif | |
| | | | |
| protected: | | protected: | |
| template< typename R, typename B > friend class run_and_put_task; | | template< typename R, typename B > friend class run_and_put_task; | |
| template<typename X, typename Y> friend class internal::broadcast_cache
; | | template<typename X, typename Y> friend class internal::broadcast_cache
; | |
| template<typename X, typename Y> friend class internal::round_robin_cac
he; | | template<typename X, typename Y> friend class internal::round_robin_cac
he; | |
| using fInput_type::try_put_task; | | using fInput_type::try_put_task; | |
| | | | |
|
| /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) { | | /* override */ internal::broadcast_cache<output_type> &successors () { | |
| | | return fOutput_type::my_successors; } | |
| | | | |
| | | // override of graph_node's reset. | |
| | | /*override*/void reset_node(__TBB_PFG_RESET_ARG(reset_flags f)) { | |
| fInput_type::reset_function_input(__TBB_PFG_RESET_ARG(f)); | | fInput_type::reset_function_input(__TBB_PFG_RESET_ARG(f)); | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| successors().reset(f); | | // TODO: use clear() instead. | |
| __TBB_ASSERT(!(f & rf_extract) || successors().empty(), "function_n | | if(f & rf_clear_edges) { | |
| ode successors not empty"); | | successors().clear(); | |
| __TBB_ASSERT(!(f & rf_extract) || this->my_predecessors.empty(), "f | | my_predecessors.clear(); | |
| unction_node predecessors not empty"); | | } | |
| | | __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "functi | |
| | | on_node successors not empty"); | |
| | | __TBB_ASSERT(this->my_predecessors.empty(), "function_node predeces | |
| | | sors not empty"); | |
| #endif | | #endif | |
|
| | | | |
| } | | } | |
| | | | |
|
| /* override */ internal::broadcast_cache<output_type> &successors () { | | }; // class function_node | |
| return fOutput_type::my_successors; } | | | |
| }; | | | |
| | | | |
| //! implements a function node that supports Input -> (set of outputs) | | //! implements a function node that supports Input -> (set of outputs) | |
| // Output is a tuple of output types. | | // Output is a tuple of output types. | |
|
| template < typename Input, typename Output, graph_buffer_policy = queueing,
typename Allocator=cache_aligned_allocator<Input> > | | template < typename Input, typename Output, graph_buffer_policy G = queuein
g, typename Allocator=cache_aligned_allocator<Input> > | |
| class multifunction_node : | | class multifunction_node : | |
| public graph_node, | | public graph_node, | |
| public internal::multifunction_input | | public internal::multifunction_input | |
| < | | < | |
| Input, | | Input, | |
| typename internal::wrap_tuple_elements< | | typename internal::wrap_tuple_elements< | |
| tbb::flow::tuple_size<Output>::value, // #elements in tuple | | tbb::flow::tuple_size<Output>::value, // #elements in tuple | |
| internal::multifunction_output, // wrap this around each eleme
nt | | internal::multifunction_output, // wrap this around each eleme
nt | |
| Output // the tuple providing the types | | Output // the tuple providing the types | |
| >::type, | | >::type, | |
| Allocator | | Allocator | |
| > { | | > { | |
| protected: | | protected: | |
| using graph_node::my_graph; | | using graph_node::my_graph; | |
|
| private: | | | |
| static const int N = tbb::flow::tuple_size<Output>::value; | | static const int N = tbb::flow::tuple_size<Output>::value; | |
| public: | | public: | |
| typedef Input input_type; | | typedef Input input_type; | |
| typedef null_type output_type; | | typedef null_type output_type; | |
| typedef typename internal::wrap_tuple_elements<N,internal::multifunctio
n_output, Output>::type output_ports_type; | | typedef typename internal::wrap_tuple_elements<N,internal::multifunctio
n_output, Output>::type output_ports_type; | |
|
| | | typedef internal::multifunction_input<input_type, output_ports_type, Al | |
| | | locator> fInput_type; | |
| | | typedef internal::function_input_queue<input_type, Allocator> input_que | |
| | | ue_type; | |
| private: | | private: | |
| typedef typename internal::multifunction_input<input_type, output_ports
_type, Allocator> base_type; | | typedef typename internal::multifunction_input<input_type, output_ports
_type, Allocator> base_type; | |
|
| typedef typename internal::function_input_queue<input_type,Allocator> q
ueue_type; | | using fInput_type::my_predecessors; | |
| public: | | public: | |
| template<typename Body> | | template<typename Body> | |
| multifunction_node( graph &g, size_t concurrency, Body body ) : | | multifunction_node( graph &g, size_t concurrency, Body body ) : | |
|
| graph_node(g), base_type(g,concurrency, body) { | | graph_node(g), base_type(g,concurrency, body, G == queueing ? new
input_queue_type : NULL) { | |
| tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::inter
nal::FLOW_MULTIFUNCTION_NODE, | | tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::inter
nal::FLOW_MULTIFUNCTION_NODE, | |
|
| &this->gra | | &this->graph_node::my_graph, static_cast<receiver<input_typ | |
| ph_node::my_graph, static_cast<receiver<input_type> *>(this), | | e> *>(this), | |
| this->outp | | this->output_ports(), this->my_body ); | |
| ut_ports(), this->my_body ); | | | |
| } | | } | |
| | | | |
| multifunction_node( const multifunction_node &other) : | | multifunction_node( const multifunction_node &other) : | |
|
| graph_node(other.graph_node::my_graph), base_type(other) { | | graph_node(other.graph_node::my_graph), base_type(other, G == queu
eing ? new input_queue_type : NULL) { | |
| tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::inter
nal::FLOW_MULTIFUNCTION_NODE, | | tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::inter
nal::FLOW_MULTIFUNCTION_NODE, | |
|
| &this->gra | | &this->graph_node::my_graph, static_cast<receiver<input_typ | |
| ph_node::my_graph, static_cast<receiver<input_type> *>(this), | | e> *>(this), | |
| this->outp | | this->output_ports(), this->my_body ); | |
| ut_ports(), this->my_body ); | | | |
| } | | } | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_TRACE | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| /* override */ void set_name( const char *name ) { | | /* override */ void set_name( const char *name ) { | |
| tbb::internal::fgt_multioutput_node_desc( this, name ); | | tbb::internal::fgt_multioutput_node_desc( this, name ); | |
| } | | } | |
| #endif | | #endif | |
| | | | |
|
| // all the guts are in multifunction_input... | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| protected: | | void extract( ) { | |
| /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { base_type: | | my_predecessors.built_predecessors().receiver_extract(*this); | |
| :reset(__TBB_PFG_RESET_ARG(f)); } | | base_type::extract(); | |
| }; // multifunction_node | | | |
| | | | |
| template < typename Input, typename Output, typename Allocator > | | | |
| class multifunction_node<Input,Output,queueing,Allocator> : public graph_no | | | |
| de, public internal::multifunction_input<Input, | | | |
| typename internal::wrap_tuple_elements<tbb::flow::tuple_size<Output>::v | | | |
| alue, internal::multifunction_output, Output>::type, Allocator> { | | | |
| protected: | | | |
| using graph_node::my_graph; | | | |
| static const int N = tbb::flow::tuple_size<Output>::value; | | | |
| public: | | | |
| typedef Input input_type; | | | |
| typedef null_type output_type; | | | |
| typedef typename internal::wrap_tuple_elements<N, internal::multifuncti | | | |
| on_output, Output>::type output_ports_type; | | | |
| private: | | | |
| typedef typename internal::multifunction_input<input_type, output_ports | | | |
| _type, Allocator> base_type; | | | |
| typedef typename internal::function_input_queue<input_type,Allocator> q | | | |
| ueue_type; | | | |
| public: | | | |
| template<typename Body> | | | |
| multifunction_node( graph &g, size_t concurrency, Body body) : | | | |
| graph_node(g), base_type(g,concurrency, body, new queue_type()) { | | | |
| tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::inter | | | |
| nal::FLOW_MULTIFUNCTION_NODE, | | | |
| &this->gra | | | |
| ph_node::my_graph, static_cast<receiver<input_type> *>(this), | | | |
| this->outp | | | |
| ut_ports(), this->my_body ); | | | |
| } | | | |
| | | | |
| multifunction_node( const multifunction_node &other) : | | | |
| graph_node(other.graph_node::my_graph), base_type(other, new queue_ | | | |
| type()) { | | | |
| tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::inter | | | |
| nal::FLOW_MULTIFUNCTION_NODE, | | | |
| &this->gra | | | |
| ph_node::my_graph, static_cast<receiver<input_type> *>(this), | | | |
| this->outp | | | |
| ut_ports(), this->my_body ); | | | |
| } | | | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_TRACE | | | |
| /* override */ void set_name( const char *name ) { | | | |
| tbb::internal::fgt_multioutput_node_desc( this, name ); | | | |
| } | | } | |
| #endif | | #endif | |
|
| | | | |
| // all the guts are in multifunction_input... | | // all the guts are in multifunction_input... | |
| protected: | | protected: | |
|
| /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { base_type:
:reset(__TBB_PFG_RESET_ARG(f)); } | | /*override*/void reset_node(__TBB_PFG_RESET_ARG(reset_flags f)) { base_
type::reset(__TBB_PFG_RESET_ARG(f)); } | |
| }; // multifunction_node | | }; // multifunction_node | |
| | | | |
| //! split_node: accepts a tuple as input, forwards each element of the tupl
e to its | | //! split_node: accepts a tuple as input, forwards each element of the tupl
e to its | |
| // successors. The node has unlimited concurrency, so though it is marked
as | | // successors. The node has unlimited concurrency, so though it is marked
as | |
| // "rejecting" it does not reject inputs. | | // "rejecting" it does not reject inputs. | |
| template<typename TupleType, typename Allocator=cache_aligned_allocator<Tup
leType> > | | template<typename TupleType, typename Allocator=cache_aligned_allocator<Tup
leType> > | |
| class split_node : public multifunction_node<TupleType, TupleType, rejectin
g, Allocator> { | | class split_node : public multifunction_node<TupleType, TupleType, rejectin
g, Allocator> { | |
| static const int N = tbb::flow::tuple_size<TupleType>::value; | | static const int N = tbb::flow::tuple_size<TupleType>::value; | |
| typedef multifunction_node<TupleType,TupleType,rejecting,Allocator> bas
e_type; | | typedef multifunction_node<TupleType,TupleType,rejecting,Allocator> bas
e_type; | |
| public: | | public: | |
| | | | |
| skipping to change at line 1292 | | skipping to change at line 1240 | |
| static_cast<receiver<input_type>
*>(this), | | static_cast<receiver<input_type>
*>(this), | |
| static_cast<sender<output_type>
*>(this), this->my_body ); | | static_cast<sender<output_type>
*>(this), this->my_body ); | |
| } | | } | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_TRACE | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| /* override */ void set_name( const char *name ) { | | /* override */ void set_name( const char *name ) { | |
| tbb::internal::fgt_node_desc( this, name ); | | tbb::internal::fgt_node_desc( this, name ); | |
| } | | } | |
| #endif | | #endif | |
| | | | |
|
| | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| | | /*override graph_node*/ void extract() { | |
| | | fInput_type::my_built_predecessors.receiver_extract(*this); | |
| | | successors().built_successors().sender_extract(*this); | |
| | | } | |
| | | #endif | |
| | | | |
| protected: | | protected: | |
| template< typename R, typename B > friend class run_and_put_task; | | template< typename R, typename B > friend class run_and_put_task; | |
| template<typename X, typename Y> friend class internal::broadcast_cache
; | | template<typename X, typename Y> friend class internal::broadcast_cache
; | |
| template<typename X, typename Y> friend class internal::round_robin_cac
he; | | template<typename X, typename Y> friend class internal::round_robin_cac
he; | |
| using fInput_type::try_put_task; | | using fInput_type::try_put_task; | |
|
| | | /* override */ internal::broadcast_cache<output_type> &successors () {
return fOutput_type::my_successors; } | |
| | | | |
|
| /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { | | /*override*/void reset_node(__TBB_PFG_RESET_ARG(reset_flags f)) { | |
| fInput_type::reset_receiver(__TBB_PFG_RESET_ARG(f)); | | fInput_type::reset_receiver(__TBB_PFG_RESET_ARG(f)); | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| successors().reset(f); | | if(f & rf_clear_edges)successors().clear(); | |
| __TBB_ASSERT(!(f & rf_extract) || successors().empty(), "continue_n | | __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "contin | |
| ode not reset"); | | ue_node not reset"); | |
| #endif | | #endif | |
| } | | } | |
| | | | |
|
| /* override */ internal::broadcast_cache<output_type> &successors () {
return fOutput_type::my_successors; } | | | |
| }; // continue_node | | }; // continue_node | |
| | | | |
| template< typename T > | | template< typename T > | |
| class overwrite_node : public graph_node, public receiver<T>, public sender
<T> { | | class overwrite_node : public graph_node, public receiver<T>, public sender
<T> { | |
| protected: | | protected: | |
| using graph_node::my_graph; | | using graph_node::my_graph; | |
| public: | | public: | |
| typedef T input_type; | | typedef T input_type; | |
| typedef T output_type; | | typedef T output_type; | |
| typedef sender< input_type > predecessor_type; | | typedef sender< input_type > predecessor_type; | |
| typedef receiver< output_type > successor_type; | | typedef receiver< output_type > successor_type; | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| | | typedef typename receiver<input_type>::built_predecessors_type built_pr | |
| | | edecessors_type; | |
| | | typedef typename sender<output_type>::built_successors_type built_succe | |
| | | ssors_type; | |
| typedef typename receiver<input_type>::predecessor_list_type predecesso
r_list_type; | | typedef typename receiver<input_type>::predecessor_list_type predecesso
r_list_type; | |
| typedef typename sender<output_type>::successor_list_type successor_lis
t_type; | | typedef typename sender<output_type>::successor_list_type successor_lis
t_type; | |
| #endif | | #endif | |
| | | | |
| overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) { | | overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) { | |
| my_successors.set_owner( this ); | | my_successors.set_owner( this ); | |
| tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this-
>my_graph, | | tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this-
>my_graph, | |
| static_cast<receiver<input_type> *>(this),
static_cast<sender<output_type> *>(this) ); | | static_cast<receiver<input_type> *>(this),
static_cast<sender<output_type> *>(this) ); | |
| } | | } | |
| | | | |
| | | | |
| skipping to change at line 1372 | | skipping to change at line 1329 | |
| return true; | | return true; | |
| } | | } | |
| | | | |
| /* override */ bool remove_successor( successor_type &s ) { | | /* override */ bool remove_successor( successor_type &s ) { | |
| spin_mutex::scoped_lock l( my_mutex ); | | spin_mutex::scoped_lock l( my_mutex ); | |
| my_successors.remove_successor(s); | | my_successors.remove_successor(s); | |
| return true; | | return true; | |
| } | | } | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| | | /*override*/built_predecessors_type &built_predecessors() { return my_b | |
| | | uilt_predecessors; } | |
| | | /*override*/built_successors_type &built_successors() { return my_s | |
| | | uccessors.built_successors(); } | |
| | | | |
| /*override*/void internal_add_built_successor( successor_type &s) { | | /*override*/void internal_add_built_successor( successor_type &s) { | |
| spin_mutex::scoped_lock l( my_mutex ); | | spin_mutex::scoped_lock l( my_mutex ); | |
| my_successors.internal_add_built_successor(s); | | my_successors.internal_add_built_successor(s); | |
| } | | } | |
| | | | |
| /*override*/void internal_delete_built_successor( successor_type &s) { | | /*override*/void internal_delete_built_successor( successor_type &s) { | |
| spin_mutex::scoped_lock l( my_mutex ); | | spin_mutex::scoped_lock l( my_mutex ); | |
| my_successors.internal_delete_built_successor(s); | | my_successors.internal_delete_built_successor(s); | |
| } | | } | |
| | | | |
| | | | |
| skipping to change at line 1411 | | skipping to change at line 1371 | |
| | | | |
| /*override*/size_t predecessor_count() { | | /*override*/size_t predecessor_count() { | |
| spin_mutex::scoped_lock l( my_mutex ); | | spin_mutex::scoped_lock l( my_mutex ); | |
| return my_built_predecessors.edge_count(); | | return my_built_predecessors.edge_count(); | |
| } | | } | |
| | | | |
| /*override*/void copy_predecessors(predecessor_list_type &v) { | | /*override*/void copy_predecessors(predecessor_list_type &v) { | |
| spin_mutex::scoped_lock l( my_mutex ); | | spin_mutex::scoped_lock l( my_mutex ); | |
| my_built_predecessors.copy_edges(v); | | my_built_predecessors.copy_edges(v); | |
| } | | } | |
|
| | | | |
| | | /*override*/ void extract() { | |
| | | my_buffer_is_valid = false; | |
| | | built_successors().sender_extract(*this); | |
| | | built_predecessors().receiver_extract(*this); | |
| | | } | |
| | | | |
| #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | | #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | |
| | | | |
| /* override */ bool try_get( input_type &v ) { | | /* override */ bool try_get( input_type &v ) { | |
| spin_mutex::scoped_lock l( my_mutex ); | | spin_mutex::scoped_lock l( my_mutex ); | |
| if ( my_buffer_is_valid ) { | | if ( my_buffer_is_valid ) { | |
| v = my_buffer; | | v = my_buffer; | |
| return true; | | return true; | |
| } | | } | |
| return false; | | return false; | |
| } | | } | |
| | | | |
| skipping to change at line 1445 | | skipping to change at line 1412 | |
| template<typename X, typename Y> friend class internal::round_robin_cac
he; | | template<typename X, typename Y> friend class internal::round_robin_cac
he; | |
| /* override */ task * try_put_task( const input_type &v ) { | | /* override */ task * try_put_task( const input_type &v ) { | |
| spin_mutex::scoped_lock l( my_mutex ); | | spin_mutex::scoped_lock l( my_mutex ); | |
| my_buffer = v; | | my_buffer = v; | |
| my_buffer_is_valid = true; | | my_buffer_is_valid = true; | |
| task * rtask = my_successors.try_put_task(v); | | task * rtask = my_successors.try_put_task(v); | |
| if(!rtask) rtask = SUCCESSFULLY_ENQUEUED; | | if(!rtask) rtask = SUCCESSFULLY_ENQUEUED; | |
| return rtask; | | return rtask; | |
| } | | } | |
| | | | |
|
| /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) { | | | |
| my_buffer_is_valid = false; | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | | |
| my_successors.reset(f); | | | |
| if (f&rf_extract) { | | | |
| my_built_predecessors.receiver_extract(*this); | | | |
| } | | | |
| #endif | | | |
| } | | | |
| | | | |
| spin_mutex my_mutex; | | spin_mutex my_mutex; | |
| internal::broadcast_cache< input_type, null_rw_mutex > my_successors; | | internal::broadcast_cache< input_type, null_rw_mutex > my_successors; | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| internal::edge_container<predecessor_type> my_built_predecessors; | | internal::edge_container<predecessor_type> my_built_predecessors; | |
| #endif | | #endif | |
| input_type my_buffer; | | input_type my_buffer; | |
| bool my_buffer_is_valid; | | bool my_buffer_is_valid; | |
| /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/))
{} | | /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/))
{} | |
|
| | | | |
| | | /*override*/void reset_node( __TBB_PFG_RESET_ARG(reset_flags f)) { | |
| | | my_buffer_is_valid = false; | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| | | if (f&rf_clear_edges) { | |
| | | my_successors.clear(); | |
| | | my_built_predecessors.receiver_extract(*this); | |
| | | } | |
| | | #endif | |
| | | } | |
| }; // overwrite_node | | }; // overwrite_node | |
| | | | |
| template< typename T > | | template< typename T > | |
| class write_once_node : public overwrite_node<T> { | | class write_once_node : public overwrite_node<T> { | |
| public: | | public: | |
| typedef T input_type; | | typedef T input_type; | |
| typedef T output_type; | | typedef T output_type; | |
| typedef sender< input_type > predecessor_type; | | typedef sender< input_type > predecessor_type; | |
| typedef receiver< output_type > successor_type; | | typedef receiver< output_type > successor_type; | |
| | | | |
| | | | |
| skipping to change at line 1529 | | skipping to change at line 1496 | |
| typedef sender< input_type > predecessor_type; | | typedef sender< input_type > predecessor_type; | |
| typedef receiver< output_type > successor_type; | | typedef receiver< output_type > successor_type; | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| typedef typename receiver<input_type>::predecessor_list_type predecesso
r_list_type; | | typedef typename receiver<input_type>::predecessor_list_type predecesso
r_list_type; | |
| typedef typename sender<output_type>::successor_list_type successor_lis
t_type; | | typedef typename sender<output_type>::successor_list_type successor_lis
t_type; | |
| #endif | | #endif | |
| private: | | private: | |
| internal::broadcast_cache<input_type> my_successors; | | internal::broadcast_cache<input_type> my_successors; | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| internal::edge_container<predecessor_type> my_built_predecessors; | | internal::edge_container<predecessor_type> my_built_predecessors; | |
|
| spin_mutex pred_mutex; | | spin_mutex pred_mutex; // serialize accesses on edge_container | |
| #endif | | #endif | |
| public: | | public: | |
| | | | |
| broadcast_node(graph& g) : graph_node(g) { | | broadcast_node(graph& g) : graph_node(g) { | |
| my_successors.set_owner( this ); | | my_successors.set_owner( this ); | |
| tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this-
>my_graph, | | tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this-
>my_graph, | |
| static_cast<receiver<input_type> *>(this),
static_cast<sender<output_type> *>(this) ); | | static_cast<receiver<input_type> *>(this),
static_cast<sender<output_type> *>(this) ); | |
| } | | } | |
| | | | |
| // Copy constructor | | // Copy constructor | |
| | | | |
| skipping to change at line 1567 | | skipping to change at line 1534 | |
| return true; | | return true; | |
| } | | } | |
| | | | |
| //! Removes s as a successor | | //! Removes s as a successor | |
| virtual bool remove_successor( receiver<T> &r ) { | | virtual bool remove_successor( receiver<T> &r ) { | |
| my_successors.remove_successor( r ); | | my_successors.remove_successor( r ); | |
| return true; | | return true; | |
| } | | } | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| /*override*/ void internal_add_built_successor(successor_type &r) { | | typedef typename sender<T>::built_successors_type built_successors_type | |
| | | ; | |
| | | | |
| | | /*override sender*/ built_successors_type &built_successors() { return | |
| | | my_successors.built_successors(); } | |
| | | | |
| | | /*override sender*/ void internal_add_built_successor(successor_type &r | |
| | | ) { | |
| my_successors.internal_add_built_successor(r); | | my_successors.internal_add_built_successor(r); | |
| } | | } | |
| | | | |
|
| /*override*/ void internal_delete_built_successor(successor_type &r) { | | /*override sender*/ void internal_delete_built_successor(successor_type
&r) { | |
| my_successors.internal_delete_built_successor(r); | | my_successors.internal_delete_built_successor(r); | |
| } | | } | |
| | | | |
|
| /*override*/ size_t successor_count() { | | /*override sender*/ size_t successor_count() { | |
| return my_successors.successor_count(); | | return my_successors.successor_count(); | |
| } | | } | |
| | | | |
| /*override*/ void copy_successors(successor_list_type &v) { | | /*override*/ void copy_successors(successor_list_type &v) { | |
| my_successors.copy_successors(v); | | my_successors.copy_successors(v); | |
| } | | } | |
| | | | |
|
| | | typedef typename receiver<T>::built_predecessors_type built_predecessor | |
| | | s_type; | |
| | | | |
| | | /*override receiver*/ built_predecessors_type &built_predecessors() { r | |
| | | eturn my_built_predecessors; } | |
| | | | |
| /*override*/ void internal_add_built_predecessor( predecessor_type &p)
{ | | /*override*/ void internal_add_built_predecessor( predecessor_type &p)
{ | |
|
| | | spin_mutex::scoped_lock l(pred_mutex); | |
| my_built_predecessors.add_edge(p); | | my_built_predecessors.add_edge(p); | |
| } | | } | |
| | | | |
| /*override*/ void internal_delete_built_predecessor( predecessor_type &
p) { | | /*override*/ void internal_delete_built_predecessor( predecessor_type &
p) { | |
|
| | | spin_mutex::scoped_lock l(pred_mutex); | |
| my_built_predecessors.delete_edge(p); | | my_built_predecessors.delete_edge(p); | |
| } | | } | |
| | | | |
| /*override*/ size_t predecessor_count() { | | /*override*/ size_t predecessor_count() { | |
|
| | | spin_mutex::scoped_lock l(pred_mutex); | |
| return my_built_predecessors.edge_count(); | | return my_built_predecessors.edge_count(); | |
| } | | } | |
| | | | |
| /*override*/ void copy_predecessors(predecessor_list_type &v) { | | /*override*/ void copy_predecessors(predecessor_list_type &v) { | |
|
| | | spin_mutex::scoped_lock l(pred_mutex); | |
| my_built_predecessors.copy_edges(v); | | my_built_predecessors.copy_edges(v); | |
| } | | } | |
|
| | | | |
| | | /*override graph_node*/ void extract() { | |
| | | my_built_predecessors.receiver_extract(*this); | |
| | | my_successors.built_successors().sender_extract(*this); | |
| | | } | |
| #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | | #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | |
| | | | |
| protected: | | protected: | |
| template< typename R, typename B > friend class run_and_put_task; | | template< typename R, typename B > friend class run_and_put_task; | |
| template<typename X, typename Y> friend class internal::broadcast_cache
; | | template<typename X, typename Y> friend class internal::broadcast_cache
; | |
| template<typename X, typename Y> friend class internal::round_robin_cac
he; | | template<typename X, typename Y> friend class internal::round_robin_cac
he; | |
| //! build a task to run the successor if possible. Default is old beha
vior. | | //! build a task to run the successor if possible. Default is old beha
vior. | |
| /*override*/ task *try_put_task(const T& t) { | | /*override*/ task *try_put_task(const T& t) { | |
| task *new_task = my_successors.try_put_task(t); | | task *new_task = my_successors.try_put_task(t); | |
| if(!new_task) new_task = SUCCESSFULLY_ENQUEUED; | | if(!new_task) new_task = SUCCESSFULLY_ENQUEUED; | |
| return new_task; | | return new_task; | |
| } | | } | |
| | | | |
|
| /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { | | /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/)) | |
| | | {} | |
| | | | |
| | | /*override*/void reset_node(__TBB_PFG_RESET_ARG(reset_flags f)) { | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| my_successors.reset(f); | | if (f&rf_clear_edges) { | |
| if (f&rf_extract) { | | my_successors.clear(); | |
| my_built_predecessors.receiver_extract(*this); | | my_built_predecessors.clear(); | |
| } | | } | |
|
| __TBB_ASSERT(!(f & rf_extract) || my_successors.empty(), "Error res
etting broadcast_node"); | | __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error
resetting broadcast_node"); | |
| #endif | | #endif | |
| } | | } | |
|
| /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/))
{} | | | |
| }; // broadcast_node | | }; // broadcast_node | |
| | | | |
| //! Forwards messages in arbitrary order | | //! Forwards messages in arbitrary order | |
| template <typename T, typename A=cache_aligned_allocator<T> > | | template <typename T, typename A=cache_aligned_allocator<T> > | |
| class buffer_node : public graph_node, public internal::reservable_item_buf
fer<T, A>, public receiver<T>, public sender<T> { | | class buffer_node : public graph_node, public internal::reservable_item_buf
fer<T, A>, public receiver<T>, public sender<T> { | |
| protected: | | protected: | |
| using graph_node::my_graph; | | using graph_node::my_graph; | |
| public: | | public: | |
| typedef T input_type; | | typedef T input_type; | |
| typedef T output_type; | | typedef T output_type; | |
| | | | |
| skipping to change at line 1734 | | skipping to change at line 1719 | |
| forwarder_busy = true; | | forwarder_busy = true; | |
| task *new_task = new(task::allocate_additional_child_of(*tp
)) internal:: | | task *new_task = new(task::allocate_additional_child_of(*tp
)) internal:: | |
| forward_task_bypass | | forward_task_bypass | |
| < buffer_node<input_type, A> >(*this); | | < buffer_node<input_type, A> >(*this); | |
| // tmp should point to the last item handled by the aggrega
tor. This is the operation | | // tmp should point to the last item handled by the aggrega
tor. This is the operation | |
| // the handling thread enqueued. So modifying that record
will be okay. | | // the handling thread enqueued. So modifying that record
will be okay. | |
| tbb::task *z = tmp->ltask; | | tbb::task *z = tmp->ltask; | |
| tmp->ltask = combine_tasks(z, new_task); // in case the op
generated a task | | tmp->ltask = combine_tasks(z, new_task); // in case the op
generated a task | |
| } | | } | |
| } | | } | |
|
| } | | } // handle_operations | |
| | | | |
| inline task *grab_forwarding_task( buffer_operation &op_data) { | | inline task *grab_forwarding_task( buffer_operation &op_data) { | |
| return op_data.ltask; | | return op_data.ltask; | |
| } | | } | |
| | | | |
| inline bool enqueue_forwarding_task(buffer_operation &op_data) { | | inline bool enqueue_forwarding_task(buffer_operation &op_data) { | |
| task *ft = grab_forwarding_task(op_data); | | task *ft = grab_forwarding_task(op_data); | |
| if(ft) { | | if(ft) { | |
| FLOW_SPAWN(*ft); | | FLOW_SPAWN(*ft); | |
| return true; | | return true; | |
| | | | |
| skipping to change at line 1776 | | skipping to change at line 1761 | |
| __TBB_store_with_release(op->status, SUCCEEDED); | | __TBB_store_with_release(op->status, SUCCEEDED); | |
| } | | } | |
| | | | |
| //! Remove successor | | //! Remove successor | |
| virtual void internal_rem_succ(buffer_operation *op) { | | virtual void internal_rem_succ(buffer_operation *op) { | |
| my_successors.remove_successor(*(op->r)); | | my_successors.remove_successor(*(op->r)); | |
| __TBB_store_with_release(op->status, SUCCEEDED); | | __TBB_store_with_release(op->status, SUCCEEDED); | |
| } | | } | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| | | typedef typename sender<T>::built_successors_type built_successors_type | |
| | | ; | |
| | | | |
| | | /*override sender*/ built_successors_type &built_successors() { return | |
| | | my_successors.built_successors(); } | |
| | | | |
| virtual void internal_add_built_succ(buffer_operation *op) { | | virtual void internal_add_built_succ(buffer_operation *op) { | |
| my_successors.internal_add_built_successor(*(op->r)); | | my_successors.internal_add_built_successor(*(op->r)); | |
| __TBB_store_with_release(op->status, SUCCEEDED); | | __TBB_store_with_release(op->status, SUCCEEDED); | |
| } | | } | |
| | | | |
| virtual void internal_del_built_succ(buffer_operation *op) { | | virtual void internal_del_built_succ(buffer_operation *op) { | |
| my_successors.internal_delete_built_successor(*(op->r)); | | my_successors.internal_delete_built_successor(*(op->r)); | |
| __TBB_store_with_release(op->status, SUCCEEDED); | | __TBB_store_with_release(op->status, SUCCEEDED); | |
| } | | } | |
| | | | |
|
| | | typedef typename receiver<T>::built_predecessors_type built_predecessor | |
| | | s_type; | |
| | | | |
| | | /*override receiver*/ built_predecessors_type &built_predecessors() { r | |
| | | eturn my_built_predecessors; } | |
| | | | |
| virtual void internal_add_built_pred(buffer_operation *op) { | | virtual void internal_add_built_pred(buffer_operation *op) { | |
| my_built_predecessors.add_edge(*(op->p)); | | my_built_predecessors.add_edge(*(op->p)); | |
| __TBB_store_with_release(op->status, SUCCEEDED); | | __TBB_store_with_release(op->status, SUCCEEDED); | |
| } | | } | |
| | | | |
| virtual void internal_del_built_pred(buffer_operation *op) { | | virtual void internal_del_built_pred(buffer_operation *op) { | |
| my_built_predecessors.delete_edge(*(op->p)); | | my_built_predecessors.delete_edge(*(op->p)); | |
| __TBB_store_with_release(op->status, SUCCEEDED); | | __TBB_store_with_release(op->status, SUCCEEDED); | |
| } | | } | |
| | | | |
| | | | |
| skipping to change at line 1815 | | skipping to change at line 1808 | |
| | | | |
| virtual void internal_copy_succs(buffer_operation *op) { | | virtual void internal_copy_succs(buffer_operation *op) { | |
| my_successors.copy_successors(*(op->svec)); | | my_successors.copy_successors(*(op->svec)); | |
| __TBB_store_with_release(op->status, SUCCEEDED); | | __TBB_store_with_release(op->status, SUCCEEDED); | |
| } | | } | |
| | | | |
| virtual void internal_copy_preds(buffer_operation *op) { | | virtual void internal_copy_preds(buffer_operation *op) { | |
| my_built_predecessors.copy_edges(*(op->pvec)); | | my_built_predecessors.copy_edges(*(op->pvec)); | |
| __TBB_store_with_release(op->status, SUCCEEDED); | | __TBB_store_with_release(op->status, SUCCEEDED); | |
| } | | } | |
|
| | | | |
| #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | | #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | |
| | | | |
| //! Tries to forward valid items to successors | | //! Tries to forward valid items to successors | |
| virtual void internal_forward_task(buffer_operation *op) { | | virtual void internal_forward_task(buffer_operation *op) { | |
| if (this->my_reserved || !this->my_item_valid(this->my_tail-1)) { | | if (this->my_reserved || !this->my_item_valid(this->my_tail-1)) { | |
| __TBB_store_with_release(op->status, FAILED); | | __TBB_store_with_release(op->status, FAILED); | |
| this->forwarder_busy = false; | | this->forwarder_busy = false; | |
| return; | | return; | |
| } | | } | |
| T i_copy; | | T i_copy; | |
| | | | |
| skipping to change at line 1970 | | skipping to change at line 1964 | |
| buffer_operation op_data(blt_pred_cpy); | | buffer_operation op_data(blt_pred_cpy); | |
| op_data.pvec = &v; | | op_data.pvec = &v; | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| } | | } | |
| | | | |
| /*override*/ void copy_successors( successor_list_type &v ) { | | /*override*/ void copy_successors( successor_list_type &v ) { | |
| buffer_operation op_data(blt_succ_cpy); | | buffer_operation op_data(blt_succ_cpy); | |
| op_data.svec = &v; | | op_data.svec = &v; | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| } | | } | |
|
| | | | |
| #endif | | #endif | |
| | | | |
| //! Removes a successor. | | //! Removes a successor. | |
| /** Removes successor r from the list of successors. | | /** Removes successor r from the list of successors. | |
| It also calls r.remove_predecessor(*this) to remove this node as a
predecessor. */ | | It also calls r.remove_predecessor(*this) to remove this node as a
predecessor. */ | |
| /* override */ bool remove_successor( successor_type &r ) { | | /* override */ bool remove_successor( successor_type &r ) { | |
| r.remove_predecessor(*this); | | r.remove_predecessor(*this); | |
| buffer_operation op_data(rem_succ); | | buffer_operation op_data(rem_succ); | |
| op_data.r = &r; | | op_data.r = &r; | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| | | | |
| skipping to change at line 2052 | | skipping to change at line 2047 | |
| // call returned a task (if another request resulted in a succe
ssful | | // call returned a task (if another request resulted in a succe
ssful | |
| // forward this could happen.) Queue the task and reset the po
inter. | | // forward this could happen.) Queue the task and reset the po
inter. | |
| FLOW_SPAWN(*ft); ft = NULL; | | FLOW_SPAWN(*ft); ft = NULL; | |
| } | | } | |
| else if(!ft && op_data.status == SUCCEEDED) { | | else if(!ft && op_data.status == SUCCEEDED) { | |
| ft = SUCCESSFULLY_ENQUEUED; | | ft = SUCCESSFULLY_ENQUEUED; | |
| } | | } | |
| return ft; | | return ft; | |
| } | | } | |
| | | | |
|
| /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) { | | /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/)) | |
| | | { } | |
| | | | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| | | public: | |
| | | /* override*/ void extract() { | |
| | | my_built_predecessors.receiver_extract(*this); | |
| | | my_successors.built_successors().sender_extract(*this); | |
| | | } | |
| | | #endif | |
| | | | |
| | | protected: | |
| | | /*override*/void reset_node( __TBB_PFG_RESET_ARG(reset_flags f)) { | |
| internal::reservable_item_buffer<T, A>::reset(); | | internal::reservable_item_buffer<T, A>::reset(); | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| my_successors.reset(f); | | // TODO: just clear structures | |
| if (f&rf_extract) { | | if (f&rf_clear_edges) { | |
| my_built_predecessors.receiver_extract(*this); | | my_successors.clear(); | |
| | | my_built_predecessors.clear(); | |
| } | | } | |
| #endif | | #endif | |
| forwarder_busy = false; | | forwarder_busy = false; | |
| } | | } | |
| | | | |
|
| /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/)) | | | |
| { } | | | |
| | | | |
| }; // buffer_node | | }; // buffer_node | |
| | | | |
| //! Forwards messages in FIFO order | | //! Forwards messages in FIFO order | |
| template <typename T, typename A=cache_aligned_allocator<T> > | | template <typename T, typename A=cache_aligned_allocator<T> > | |
| class queue_node : public buffer_node<T, A> { | | class queue_node : public buffer_node<T, A> { | |
| protected: | | protected: | |
| typedef buffer_node<T, A> base_type; | | typedef buffer_node<T, A> base_type; | |
| typedef typename base_type::size_type size_type; | | typedef typename base_type::size_type size_type; | |
| typedef typename base_type::buffer_operation queue_operation; | | typedef typename base_type::buffer_operation queue_operation; | |
| | | | |
| | | | |
| skipping to change at line 2154 | | skipping to change at line 2159 | |
| static_cast<receiver<input_type> *>(this), | | static_cast<receiver<input_type> *>(this), | |
| static_cast<sender<output_type> *>(this) )
; | | static_cast<sender<output_type> *>(this) )
; | |
| } | | } | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_TRACE | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| /* override */ void set_name( const char *name ) { | | /* override */ void set_name( const char *name ) { | |
| tbb::internal::fgt_node_desc( this, name ); | | tbb::internal::fgt_node_desc( this, name ); | |
| } | | } | |
| #endif | | #endif | |
| | | | |
|
| /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) { | | protected: | |
| base_type::reset(__TBB_PFG_RESET_ARG(f)); | | /*override*/void reset_node( __TBB_PFG_RESET_ARG(reset_flags f)) { | |
| | | base_type::reset_node(__TBB_PFG_RESET_ARG(f)); | |
| } | | } | |
| }; // queue_node | | }; // queue_node | |
| | | | |
| //! Forwards messages in sequence order | | //! Forwards messages in sequence order | |
| template< typename T, typename A=cache_aligned_allocator<T> > | | template< typename T, typename A=cache_aligned_allocator<T> > | |
| class sequencer_node : public queue_node<T, A> { | | class sequencer_node : public queue_node<T, A> { | |
| internal::function_body< T, size_t > *my_sequencer; | | internal::function_body< T, size_t > *my_sequencer; | |
| // my_sequencer should be a benign function and must be callable | | // my_sequencer should be a benign function and must be callable | |
| // from a parallel context. Does this mean it needn't be reset? | | // from a parallel context. Does this mean it needn't be reset? | |
| public: | | public: | |
| | | | |
| skipping to change at line 2262 | | skipping to change at line 2268 | |
| } | | } | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_TRACE | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| /* override */ void set_name( const char *name ) { | | /* override */ void set_name( const char *name ) { | |
| tbb::internal::fgt_node_desc( this, name ); | | tbb::internal::fgt_node_desc( this, name ); | |
| } | | } | |
| #endif | | #endif | |
| | | | |
| protected: | | protected: | |
| | | | |
|
| /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) { | | /*override*/void reset_node( __TBB_PFG_RESET_ARG(reset_flags f)) { | |
| mark = 0; | | mark = 0; | |
|
| base_type::reset(__TBB_PFG_RESET_ARG(f)); | | base_type::reset_node(__TBB_PFG_RESET_ARG(f)); | |
| } | | } | |
| | | | |
| typedef typename buffer_node<T, A>::size_type size_type; | | typedef typename buffer_node<T, A>::size_type size_type; | |
| typedef typename buffer_node<T, A>::item_type item_type; | | typedef typename buffer_node<T, A>::item_type item_type; | |
| typedef typename buffer_node<T, A>::buffer_operation prio_operation; | | typedef typename buffer_node<T, A>::buffer_operation prio_operation; | |
| | | | |
| enum op_stat {WAIT=0, SUCCEEDED, FAILED}; | | enum op_stat {WAIT=0, SUCCEEDED, FAILED}; | |
| | | | |
| /* override */ void handle_operations(prio_operation *op_list) { | | /* override */ void handle_operations(prio_operation *op_list) { | |
| prio_operation *tmp = op_list /*, *pop_list*/ ; | | prio_operation *tmp = op_list /*, *pop_list*/ ; | |
| | | | |
| skipping to change at line 2505 | | skipping to change at line 2511 | |
| template< typename T > | | template< typename T > | |
| class limiter_node : public graph_node, public receiver< T >, public sender
< T > { | | class limiter_node : public graph_node, public receiver< T >, public sender
< T > { | |
| protected: | | protected: | |
| using graph_node::my_graph; | | using graph_node::my_graph; | |
| public: | | public: | |
| typedef T input_type; | | typedef T input_type; | |
| typedef T output_type; | | typedef T output_type; | |
| typedef sender< input_type > predecessor_type; | | typedef sender< input_type > predecessor_type; | |
| typedef receiver< output_type > successor_type; | | typedef receiver< output_type > successor_type; | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| | | typedef typename receiver<input_type>::built_predecessors_type built_pr | |
| | | edecessors_type; | |
| | | typedef typename sender<output_type>::built_successors_type built_succe | |
| | | ssors_type; | |
| typedef typename receiver<input_type>::predecessor_list_type predecesso
r_list_type; | | typedef typename receiver<input_type>::predecessor_list_type predecesso
r_list_type; | |
| typedef typename sender<output_type>::successor_list_type successor_lis
t_type; | | typedef typename sender<output_type>::successor_list_type successor_lis
t_type; | |
| #endif | | #endif | |
| | | | |
| private: | | private: | |
| size_t my_threshold; | | size_t my_threshold; | |
| size_t my_count; //number of successful puts | | size_t my_count; //number of successful puts | |
| size_t my_tries; //number of active put attempts | | size_t my_tries; //number of active put attempts | |
| internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors
; | | internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors
; | |
| spin_mutex my_mutex; | | spin_mutex my_mutex; | |
| | | | |
| skipping to change at line 2660 | | skipping to change at line 2668 | |
| | | | |
| //! Removes a successor from this node | | //! Removes a successor from this node | |
| /** r.remove_predecessor(*this) is also called. */ | | /** r.remove_predecessor(*this) is also called. */ | |
| /* override */ bool remove_successor( receiver<output_type> &r ) { | | /* override */ bool remove_successor( receiver<output_type> &r ) { | |
| r.remove_predecessor(*this); | | r.remove_predecessor(*this); | |
| my_successors.remove_successor(r); | | my_successors.remove_successor(r); | |
| return true; | | return true; | |
| } | | } | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| | | /*override*/ built_successors_type &built_successors() { return my_succ | |
| | | essors.built_successors(); } | |
| | | /*override*/ built_predecessors_type &built_predecessors() { return my_ | |
| | | predecessors.built_predecessors(); } | |
| | | | |
| /*override*/void internal_add_built_successor(receiver<output_type> &sr
c) { | | /*override*/void internal_add_built_successor(receiver<output_type> &sr
c) { | |
| my_successors.internal_add_built_successor(src); | | my_successors.internal_add_built_successor(src); | |
| } | | } | |
| | | | |
| /*override*/void internal_delete_built_successor(receiver<output_type>
&src) { | | /*override*/void internal_delete_built_successor(receiver<output_type>
&src) { | |
| my_successors.internal_delete_built_successor(src); | | my_successors.internal_delete_built_successor(src); | |
| } | | } | |
| | | | |
| /*override*/size_t successor_count() { return my_successors.successor_c
ount(); } | | /*override*/size_t successor_count() { return my_successors.successor_c
ount(); } | |
| | | | |
| | | | |
| skipping to change at line 2687 | | skipping to change at line 2698 | |
| | | | |
| /*override*/void internal_delete_built_predecessor(sender<output_type>
&src) { | | /*override*/void internal_delete_built_predecessor(sender<output_type>
&src) { | |
| my_predecessors.internal_delete_built_predecessor(src); | | my_predecessors.internal_delete_built_predecessor(src); | |
| } | | } | |
| | | | |
| /*override*/size_t predecessor_count() { return my_predecessors.predece
ssor_count(); } | | /*override*/size_t predecessor_count() { return my_predecessors.predece
ssor_count(); } | |
| | | | |
| /*override*/ void copy_predecessors(predecessor_list_type &v) { | | /*override*/ void copy_predecessors(predecessor_list_type &v) { | |
| my_predecessors.copy_predecessors(v); | | my_predecessors.copy_predecessors(v); | |
| } | | } | |
|
| | | | |
| | | /*override*/void extract() { | |
| | | my_count = 0; | |
| | | my_successors.built_successors().sender_extract(*this); | |
| | | my_predecessors.built_predecessors().receiver_extract(*this); | |
| | | decrement.built_predecessors().receiver_extract(decrement); | |
| | | } | |
| #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | | #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | |
| | | | |
| //! Adds src to the list of cached predecessors. | | //! Adds src to the list of cached predecessors. | |
| /* override */ bool register_predecessor( predecessor_type &src ) { | | /* override */ bool register_predecessor( predecessor_type &src ) { | |
| spin_mutex::scoped_lock lock(my_mutex); | | spin_mutex::scoped_lock lock(my_mutex); | |
| my_predecessors.add( src ); | | my_predecessors.add( src ); | |
| task* tp = this->my_graph.root_task(); | | task* tp = this->my_graph.root_task(); | |
| if ( my_count + my_tries < my_threshold && !my_successors.empty() &
& tp ) { | | if ( my_count + my_tries < my_threshold && !my_successors.empty() &
& tp ) { | |
| FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp )
) | | FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp )
) | |
| internal::forward_task_bypass < limiter_node<T> >(
*this ) ) ); | | internal::forward_task_bypass < limiter_node<T> >(
*this ) ) ); | |
| | | | |
| skipping to change at line 2741 | | skipping to change at line 2759 | |
| } | | } | |
| } | | } | |
| else { | | else { | |
| spin_mutex::scoped_lock lock(my_mutex); | | spin_mutex::scoped_lock lock(my_mutex); | |
| ++my_count; | | ++my_count; | |
| --my_tries; | | --my_tries; | |
| } | | } | |
| return rtask; | | return rtask; | |
| } | | } | |
| | | | |
|
| /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) { | | /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/)) | |
| | | { | |
| | | __TBB_ASSERT(false,NULL); // should never be called | |
| | | } | |
| | | | |
| | | /*override*/void reset_node( __TBB_PFG_RESET_ARG(reset_flags f)) { | |
| my_count = 0; | | my_count = 0; | |
|
| my_predecessors.reset(__TBB_PFG_RESET_ARG(f)); | | | |
| decrement.reset_receiver(__TBB_PFG_RESET_ARG(f)); | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
|
| my_successors.reset(f); | | if(f & rf_clear_edges) { | |
| | | my_predecessors.clear(); | |
| | | my_successors.clear(); | |
| | | } | |
| | | else | |
| #endif | | #endif | |
|
| | | { | |
| | | my_predecessors.reset( ); | |
| | | } | |
| | | decrement.reset_receiver(__TBB_PFG_RESET_ARG(f)); | |
| } | | } | |
|
| | | | |
| /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f)) { m | | | |
| y_predecessors.reset(__TBB_PFG_RESET_ARG(f)); } | | | |
| }; // limiter_node | | }; // limiter_node | |
| | | | |
| #include "internal/_flow_graph_join_impl.h" | | #include "internal/_flow_graph_join_impl.h" | |
| | | | |
| using internal::reserving_port; | | using internal::reserving_port; | |
| using internal::queueing_port; | | using internal::queueing_port; | |
| using internal::tag_matching_port; | | using internal::tag_matching_port; | |
| using internal::input_port; | | using internal::input_port; | |
| using internal::tag_value; | | using internal::tag_value; | |
| using internal::NO_TAG; | | using internal::NO_TAG; | |
| | | | |
| skipping to change at line 3266 | | skipping to change at line 3292 | |
| } | | } | |
| | | | |
| template<typename C > | | template<typename C > | |
| template< typename R > | | template< typename R > | |
| void internal::edge_container<C>::receiver_extract( R &r ) { | | void internal::edge_container<C>::receiver_extract( R &r ) { | |
| edge_list_type e = built_edges; | | edge_list_type e = built_edges; | |
| for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++
i ) { | | for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++
i ) { | |
| remove_edge(**i, r); | | remove_edge(**i, r); | |
| } | | } | |
| } | | } | |
|
| #endif | | #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ | |
| | | | |
| //! Returns a copy of the body from a function or continue node | | //! Returns a copy of the body from a function or continue node | |
| template< typename Body, typename Node > | | template< typename Body, typename Node > | |
| Body copy_body( Node &n ) { | | Body copy_body( Node &n ) { | |
| return n.template copy_function_object<Body>(); | | return n.template copy_function_object<Body>(); | |
| } | | } | |
| | | | |
| #if __TBB_PREVIEW_COMPOSITE_NODE | | #if __TBB_PREVIEW_COMPOSITE_NODE | |
| | | | |
| //composite_node | | //composite_node | |
| | | | |
| skipping to change at line 3329 | | skipping to change at line 3355 | |
| else | | else | |
| tbb::internal::itt_relation_add( tbb::internal::ITT_DOMAIN_
FLOW, addr, tbb::internal::FLOW_NODE, tbb::internal::__itt_relation_is_chil
d_of, this, tbb::internal::FLOW_NODE ); | | tbb::internal::itt_relation_add( tbb::internal::ITT_DOMAIN_
FLOW, addr, tbb::internal::FLOW_NODE, tbb::internal::__itt_relation_is_chil
d_of, this, tbb::internal::FLOW_NODE ); | |
| return add_nodes_impl(visible, n...); | | return add_nodes_impl(visible, n...); | |
| } else { | | } else { | |
| return false; | | return false; | |
| } | | } | |
| } | | } | |
| #endif | | #endif | |
| | | | |
| protected: | | protected: | |
|
| /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags)) {} | | /*override*/void reset_node(__TBB_PFG_RESET_ARG(reset_flags)) {} | |
| | | | |
| public: | | public: | |
| composite_node( graph &g, const char *my_type_name = " ") : graph_node(
g), type_name(my_type_name) { | | composite_node( graph &g, const char *my_type_name = " ") : graph_node(
g), type_name(my_type_name) { | |
| my_input_ports = NULL; | | my_input_ports = NULL; | |
| my_output_ports = NULL; | | my_output_ports = NULL; | |
| #if TBB_PREVIEW_FLOW_GRAPH_TRACE | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| tbb::internal::itt_make_task_group( tbb::internal::ITT_DOMAIN_FLOW,
this, tbb::internal::FLOW_NODE, &g, tbb::internal::FLOW_GRAPH, tbb::intern
al::FLOW_COMPOSITE_NODE ); | | tbb::internal::itt_make_task_group( tbb::internal::ITT_DOMAIN_FLOW,
this, tbb::internal::FLOW_NODE, &g, tbb::internal::FLOW_GRAPH, tbb::intern
al::FLOW_COMPOSITE_NODE ); | |
| tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_nam
e ); | | tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_nam
e ); | |
| #endif | | #endif | |
| } | | } | |
| | | | |
| skipping to change at line 3402 | | skipping to change at line 3428 | |
| | | | |
| output_ports_type output_ports() { | | output_ports_type output_ports() { | |
| __TBB_ASSERT(my_output_ports, "output ports not set, call set_exte
rnal_ports to set output ports"); | | __TBB_ASSERT(my_output_ports, "output ports not set, call set_exte
rnal_ports to set output ports"); | |
| return *my_output_ports; | | return *my_output_ports; | |
| } | | } | |
| | | | |
| virtual ~composite_node() { | | virtual ~composite_node() { | |
| if(my_input_ports) delete my_input_ports; | | if(my_input_ports) delete my_input_ports; | |
| if(my_output_ports) delete my_output_ports; | | if(my_output_ports) delete my_output_ports; | |
| } | | } | |
|
| }; | | | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| | | /*override*/void extract() { | |
| | | __TBB_ASSERT(false, "Current composite_node implementation does not | |
| | | support extract"); | |
| | | } | |
| | | #endif | |
| | | | |
| | | }; // class composite_node | |
| | | | |
| //composite_node with only input ports | | //composite_node with only input ports | |
| //TODO: trim specializations | | //TODO: trim specializations | |
| | | | |
| template< typename... InputTypes> | | template< typename... InputTypes> | |
| class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<> >
: public graph_node { | | class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<> >
: public graph_node { | |
| public: | | public: | |
| typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type; | | typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type; | |
| | | | |
| private: | | private: | |
| | | | |
| skipping to change at line 3451 | | skipping to change at line 3484 | |
| else | | else | |
| tbb::internal::itt_relation_add( tbb::internal::ITT_DOMAIN_
FLOW, addr, tbb::internal::FLOW_NODE, tbb::internal::__itt_relation_is_chil
d_of, this, tbb::internal::FLOW_NODE ); | | tbb::internal::itt_relation_add( tbb::internal::ITT_DOMAIN_
FLOW, addr, tbb::internal::FLOW_NODE, tbb::internal::__itt_relation_is_chil
d_of, this, tbb::internal::FLOW_NODE ); | |
| return add_nodes_impl(visible, n...); | | return add_nodes_impl(visible, n...); | |
| } else { | | } else { | |
| return false; | | return false; | |
| } | | } | |
| } | | } | |
| #endif | | #endif | |
| | | | |
| protected: | | protected: | |
|
| /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags)) {} | | /*override*/void reset_node(__TBB_PFG_RESET_ARG(reset_flags)) {} | |
| | | | |
| public: | | public: | |
| composite_node( graph &g, const char *my_type_name = " ") : graph_node(
g), type_name(my_type_name) { | | composite_node( graph &g, const char *my_type_name = " ") : graph_node(
g), type_name(my_type_name) { | |
| my_input_ports = NULL; | | my_input_ports = NULL; | |
| #if TBB_PREVIEW_FLOW_GRAPH_TRACE | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| tbb::internal::itt_make_task_group( tbb::internal::ITT_DOMAIN_FLOW,
this, tbb::internal::FLOW_NODE, &g, tbb::internal::FLOW_GRAPH, tbb::intern
al::FLOW_COMPOSITE_NODE ); | | tbb::internal::itt_make_task_group( tbb::internal::ITT_DOMAIN_FLOW,
this, tbb::internal::FLOW_NODE, &g, tbb::internal::FLOW_GRAPH, tbb::intern
al::FLOW_COMPOSITE_NODE ); | |
| tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_nam
e ); | | tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_nam
e ); | |
| #endif | | #endif | |
| } | | } | |
| | | | |
| | | | |
| skipping to change at line 3511 | | skipping to change at line 3544 | |
| #endif | | #endif | |
| | | | |
| input_ports_type input_ports() { | | input_ports_type input_ports() { | |
| __TBB_ASSERT(my_input_ports, "input ports not set, call set_extern
al_ports to set input ports"); | | __TBB_ASSERT(my_input_ports, "input ports not set, call set_extern
al_ports to set input ports"); | |
| return *my_input_ports; | | return *my_input_ports; | |
| } | | } | |
| | | | |
| virtual ~composite_node() { | | virtual ~composite_node() { | |
| if(my_input_ports) delete my_input_ports; | | if(my_input_ports) delete my_input_ports; | |
| } | | } | |
|
| }; | | | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| | | /*override*/void extract() { | |
| | | __TBB_ASSERT(false, "Current composite_node implementation does not | |
| | | support extract"); | |
| | | } | |
| | | #endif | |
| | | | |
| | | }; // class composite_node | |
| | | | |
| //composite_nodes with only output_ports | | //composite_nodes with only output_ports | |
| template<typename... OutputTypes> | | template<typename... OutputTypes> | |
| class composite_node <tbb::flow::tuple<>, tbb::flow::tuple<OutputTypes...>
> : public graph_node { | | class composite_node <tbb::flow::tuple<>, tbb::flow::tuple<OutputTypes...>
> : public graph_node { | |
| public: | | public: | |
| typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type; | | typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type; | |
| | | | |
| private: | | private: | |
| output_ports_type *my_output_ports; | | output_ports_type *my_output_ports; | |
| static const size_t NUM_OUTPUTS = sizeof...(OutputTypes); | | static const size_t NUM_OUTPUTS = sizeof...(OutputTypes); | |
| | | | |
| skipping to change at line 3558 | | skipping to change at line 3598 | |
| else | | else | |
| tbb::internal::itt_relation_add( tbb::internal::ITT_DOMAIN_
FLOW, addr, tbb::internal::FLOW_NODE, tbb::internal::__itt_relation_is_chil
d_of, this, tbb::internal::FLOW_NODE ); | | tbb::internal::itt_relation_add( tbb::internal::ITT_DOMAIN_
FLOW, addr, tbb::internal::FLOW_NODE, tbb::internal::__itt_relation_is_chil
d_of, this, tbb::internal::FLOW_NODE ); | |
| return add_nodes_impl(visible, n...); | | return add_nodes_impl(visible, n...); | |
| } else { | | } else { | |
| return false; | | return false; | |
| } | | } | |
| } | | } | |
| #endif | | #endif | |
| | | | |
| protected: | | protected: | |
|
| /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags)) {} | | /*override*/void reset_node(__TBB_PFG_RESET_ARG(reset_flags)) {} | |
| | | | |
| public: | | public: | |
| composite_node( graph &g, const char *my_type_name = " ") : graph_node(
g), type_name(my_type_name) { | | composite_node( graph &g, const char *my_type_name = " ") : graph_node(
g), type_name(my_type_name) { | |
| my_output_ports = NULL; | | my_output_ports = NULL; | |
| #if TBB_PREVIEW_FLOW_GRAPH_TRACE | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| tbb::internal::itt_make_task_group( tbb::internal::ITT_DOMAIN_FLOW,
this, tbb::internal::FLOW_NODE, &g, tbb::internal::FLOW_GRAPH, tbb::intern
al::FLOW_COMPOSITE_NODE ); | | tbb::internal::itt_make_task_group( tbb::internal::ITT_DOMAIN_FLOW,
this, tbb::internal::FLOW_NODE, &g, tbb::internal::FLOW_GRAPH, tbb::intern
al::FLOW_COMPOSITE_NODE ); | |
| tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_nam
e ); | | tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_nam
e ); | |
| #endif | | #endif | |
| } | | } | |
| | | | |
| | | | |
| skipping to change at line 3618 | | skipping to change at line 3658 | |
| #endif | | #endif | |
| | | | |
| output_ports_type output_ports() { | | output_ports_type output_ports() { | |
| __TBB_ASSERT(my_output_ports, "output ports not set, call set_exte
rnal_ports to set output ports"); | | __TBB_ASSERT(my_output_ports, "output ports not set, call set_exte
rnal_ports to set output ports"); | |
| return *my_output_ports; | | return *my_output_ports; | |
| } | | } | |
| | | | |
| virtual ~composite_node() { | | virtual ~composite_node() { | |
| if(my_output_ports) delete my_output_ports; | | if(my_output_ports) delete my_output_ports; | |
| } | | } | |
|
| }; | | | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| | | /*override*/void extract() { | |
| | | __TBB_ASSERT(false, "Current composite_node implementation does not | |
| | | support extract"); | |
| | | } | |
| | | #endif | |
| | | | |
| | | }; // class composite_node | |
| | | | |
| #endif // __TBB_PREVIEW_COMPOSITE_NODE | | #endif // __TBB_PREVIEW_COMPOSITE_NODE | |
| | | | |
|
| | | #if __TBB_PREVIEW_ASYNC_NODE | |
| | | namespace internal { | |
| | | //! Pure virtual template class that defines interface for async communicat | |
| | | ion | |
| | | template < typename Output > | |
| | | class async_gateway { | |
| | | public: | |
| | | typedef Output output_type; | |
| | | | |
| | | //! Submit signal from Async Activity to FG | |
| | | virtual bool async_try_put(const output_type &i ) = 0; | |
| | | | |
| | | virtual void async_reserve() = 0; | |
| | | | |
| | | virtual void async_commit() = 0; | |
| | | | |
| | | virtual ~async_gateway() {} | |
| | | }; | |
| | | } | |
| | | | |
| | | //! Implements a async node | |
| | | template < typename Input, typename Output, typename Allocator=cache_aligne | |
| | | d_allocator<Input> > | |
| | | class async_node : public graph_node, public internal::async_input<Input, A | |
| | | llocator, internal::async_gateway<Output> >, public internal::function_outp | |
| | | ut<Output>, public internal::async_gateway<Output> { | |
| | | protected: | |
| | | using graph_node::my_graph; | |
| | | public: | |
| | | typedef Input input_type; | |
| | | typedef Output output_type; | |
| | | typedef async_node< input_type, output_type, Allocator > my_class; | |
| | | typedef sender< input_type > predecessor_type; | |
| | | typedef receiver< output_type > successor_type; | |
| | | typedef internal::async_gateway< output_type > async_gateway_type; | |
| | | typedef internal::async_input<input_type, Allocator, async_gateway_type | |
| | | > async_input_type; | |
| | | typedef internal::function_output<output_type> async_output_type; | |
| | | | |
| | | //! Constructor | |
| | | template< typename Body > | |
| | | async_node( graph &g, Body body ) : | |
| | | graph_node( g ), async_input_type( g, body ) { | |
| | | tbb::internal::fgt_node_with_body( tbb::internal::FLOW_ASYNC_NODE, | |
| | | &this->graph_node::my_graph, | |
| | | static_cast<receiver<input_type> | |
| | | *>(this), | |
| | | static_cast<sender<output_type> | |
| | | *>(this), this->my_body ); | |
| | | } | |
| | | | |
| | | //! Copy constructor | |
| | | async_node( const async_node& src ) : | |
| | | graph_node(src.graph_node::my_graph), async_input_type( src ), asyn | |
| | | c_output_type(){ | |
| | | tbb::internal::fgt_node_with_body( tbb::internal::FLOW_ASYNC_NODE, | |
| | | &this->graph_node::my_graph, | |
| | | static_cast<receiver<input_type> | |
| | | *>(this), | |
| | | static_cast<sender<output_type> | |
| | | *>(this), this->my_body ); | |
| | | } | |
| | | | |
| | | /* override */ async_gateway_type& async_gateway() { | |
| | | return static_cast< async_gateway_type& >(*this); | |
| | | } | |
| | | | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_TRACE | |
| | | /* override */ void set_name( const char *name ) { | |
| | | tbb::internal::fgt_node_desc( this, name ); | |
| | | } | |
| | | #endif | |
| | | | |
| | | protected: | |
| | | template< typename R, typename B > friend class run_and_put_task; | |
| | | template<typename X, typename Y> friend class internal::broadcast_cache | |
| | | ; | |
| | | template<typename X, typename Y> friend class internal::round_robin_cac | |
| | | he; | |
| | | using async_input_type::try_put_task; | |
| | | | |
| | | /*override*/void reset_node( __TBB_PFG_RESET_ARG(reset_flags f)) { | |
| | | async_input_type::reset_async_input(__TBB_PFG_RESET_ARG(f)); | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| | | if(f & rf_clear_edges) successors().clear(); | |
| | | __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "functi | |
| | | on_node successors not empty"); | |
| | | __TBB_ASSERT(!(f & rf_clear_edges) || this->my_predecessors.empty() | |
| | | , "function_node predecessors not empty"); | |
| | | #endif | |
| | | } | |
| | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| | | /*override*/void extract() { | |
| | | this->my_predecessors.built_predecessors().receiver_extract(*this); | |
| | | successors().built_successors().sender_extract(*this); | |
| | | } | |
| | | #endif | |
| | | | |
| | | internal::broadcast_cache<output_type> &successors () { return async_ou | |
| | | tput_type::my_successors; } | |
| | | | |
| | | //! Submit signal from Async Activity to FG | |
| | | /*override*/ bool async_try_put(const output_type &i ) { | |
| | | // TODO: enqueue a task to a FG arena | |
| | | task *res = successors().try_put_task(i); | |
| | | if(!res) return false; | |
| | | if (res != SUCCESSFULLY_ENQUEUED) FLOW_SPAWN(*res); | |
| | | return true; | |
| | | } | |
| | | | |
| | | /*override*/ void async_reserve() { | |
| | | my_graph.increment_wait_count(); | |
| | | } | |
| | | | |
| | | /*override*/ void async_commit() { | |
| | | my_graph.decrement_wait_count(); | |
| | | } | |
| | | }; | |
| | | | |
| | | #endif // __TBB_PREVIEW_ASYNC_NODE | |
| | | | |
| } // interface7 | | } // interface7 | |
| | | | |
| #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | | #if TBB_PREVIEW_FLOW_GRAPH_FEATURES | |
| using interface7::reset_flags; | | using interface7::reset_flags; | |
| using interface7::rf_reset_protocol; | | using interface7::rf_reset_protocol; | |
| using interface7::rf_reset_bodies; | | using interface7::rf_reset_bodies; | |
|
| using interface7::rf_extract; | | using interface7::rf_clear_edges; | |
| #endif | | #endif | |
| | | | |
| using interface7::graph; | | using interface7::graph; | |
| using interface7::graph_node; | | using interface7::graph_node; | |
| using interface7::continue_msg; | | using interface7::continue_msg; | |
| using interface7::sender; | | using interface7::sender; | |
| using interface7::receiver; | | using interface7::receiver; | |
| using interface7::continue_receiver; | | using interface7::continue_receiver; | |
| | | | |
| using interface7::source_node; | | using interface7::source_node; | |
| | | | |
| skipping to change at line 3667 | | skipping to change at line 3818 | |
| using interface7::join_node; | | using interface7::join_node; | |
| using interface7::input_port; | | using interface7::input_port; | |
| using interface7::copy_body; | | using interface7::copy_body; | |
| using interface7::make_edge; | | using interface7::make_edge; | |
| using interface7::remove_edge; | | using interface7::remove_edge; | |
| using interface7::internal::NO_TAG; | | using interface7::internal::NO_TAG; | |
| using interface7::internal::tag_value; | | using interface7::internal::tag_value; | |
| #if __TBB_PREVIEW_COMPOSITE_NODE | | #if __TBB_PREVIEW_COMPOSITE_NODE | |
| using interface7::composite_node; | | using interface7::composite_node; | |
| #endif | | #endif | |
|
| | | #if __TBB_PREVIEW_ASYNC_NODE | |
| | | using interface7::async_node; | |
| | | #endif | |
| } // flow | | } // flow | |
| } // tbb | | } // tbb | |
| | | | |
| #undef __TBB_PFG_RESET_ARG | | #undef __TBB_PFG_RESET_ARG | |
| #undef __TBB_COMMA | | #undef __TBB_COMMA | |
| | | | |
| #endif // __TBB_flow_graph_H | | #endif // __TBB_flow_graph_H | |
| | | | |
End of changes. 102 change blocks. |
| 244 lines changed or deleted | | 415 lines changed or added | |
|