| _flow_graph_join_impl.h | | _flow_graph_join_impl.h | |
| | | | |
| skipping to change at line 44 | | skipping to change at line 44 | |
| #endif | | #endif | |
| | | | |
| #include "tbb/internal/_flow_graph_types_impl.h" | | #include "tbb/internal/_flow_graph_types_impl.h" | |
| | | | |
| namespace internal { | | namespace internal { | |
| | | | |
| typedef size_t tag_value; | | typedef size_t tag_value; | |
| static const tag_value NO_TAG = tag_value(-1); | | static const tag_value NO_TAG = tag_value(-1); | |
| | | | |
| struct forwarding_base { | | struct forwarding_base { | |
|
| forwarding_base(task *rt) : my_root_task(rt), current_tag(NO_TAG) {
} | | forwarding_base(graph &g) : my_graph_ptr(&g), current_tag(NO_TAG) {
} | |
| virtual ~forwarding_base() {} | | virtual ~forwarding_base() {} | |
| // decrement_port_count may create a forwarding task. If we cannot
handle the task | | // decrement_port_count may create a forwarding task. If we cannot
handle the task | |
| // ourselves, ask decrement_port_count to deal with it. | | // ourselves, ask decrement_port_count to deal with it. | |
| virtual task * decrement_port_count(bool handle_task) = 0; | | virtual task * decrement_port_count(bool handle_task) = 0; | |
| virtual void increment_port_count() = 0; | | virtual void increment_port_count() = 0; | |
| virtual task * increment_tag_count(tag_value /*t*/, bool /*handle_t
ask*/) {return NULL;} | | virtual task * increment_tag_count(tag_value /*t*/, bool /*handle_t
ask*/) {return NULL;} | |
| // moved here so input ports can queue tasks | | // moved here so input ports can queue tasks | |
|
| task* my_root_task; | | graph* my_graph_ptr; | |
| tag_value current_tag; // so ports can refer to FE's desired items | | tag_value current_tag; // so ports can refer to FE's desired items | |
| }; | | }; | |
| | | | |
| template< int N > | | template< int N > | |
| struct join_helper { | | struct join_helper { | |
| | | | |
| template< typename TupleType, typename PortType > | | template< typename TupleType, typename PortType > | |
| static inline void set_join_node_pointer(TupleType &my_input, PortT
ype *port) { | | static inline void set_join_node_pointer(TupleType &my_input, PortT
ype *port) { | |
| tbb::flow::get<N-1>( my_input ).set_join_node_pointer(port); | | tbb::flow::get<N-1>( my_input ).set_join_node_pointer(port); | |
| join_helper<N-1>::set_join_node_pointer( my_input, port ); | | join_helper<N-1>::set_join_node_pointer( my_input, port ); | |
| | | | |
| skipping to change at line 669 | | skipping to change at line 669 | |
| class join_node_FE; | | class join_node_FE; | |
| | | | |
| template<typename InputTuple, typename OutputTuple> | | template<typename InputTuple, typename OutputTuple> | |
| class join_node_FE<reserving, InputTuple, OutputTuple> : public forward
ing_base { | | class join_node_FE<reserving, InputTuple, OutputTuple> : public forward
ing_base { | |
| public: | | public: | |
| static const int N = tbb::flow::tuple_size<OutputTuple>::value; | | static const int N = tbb::flow::tuple_size<OutputTuple>::value; | |
| typedef OutputTuple output_type; | | typedef OutputTuple output_type; | |
| typedef InputTuple input_type; | | typedef InputTuple input_type; | |
| typedef join_node_base<reserving, InputTuple, OutputTuple> my_node_
type; // for forwarding | | typedef join_node_base<reserving, InputTuple, OutputTuple> my_node_
type; // for forwarding | |
| | | | |
|
| join_node_FE(graph &g) : forwarding_base(g.root_task()), my_node(NU
LL) { | | join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) { | |
| ports_with_no_inputs = N; | | ports_with_no_inputs = N; | |
| join_helper<N>::set_join_node_pointer(my_inputs, this); | | join_helper<N>::set_join_node_pointer(my_inputs, this); | |
| } | | } | |
| | | | |
|
| join_node_FE(const join_node_FE& other) : forwarding_base(other.my_
root_task), my_node(NULL) { | | join_node_FE(const join_node_FE& other) : forwarding_base(*(other.f
orwarding_base::my_graph_ptr)), my_node(NULL) { | |
| ports_with_no_inputs = N; | | ports_with_no_inputs = N; | |
| join_helper<N>::set_join_node_pointer(my_inputs, this); | | join_helper<N>::set_join_node_pointer(my_inputs, this); | |
| } | | } | |
| | | | |
| void set_my_node(my_node_type *new_my_node) { my_node = new_my_node
; } | | void set_my_node(my_node_type *new_my_node) { my_node = new_my_node
; } | |
| | | | |
| void increment_port_count() { | | void increment_port_count() { | |
| ++ports_with_no_inputs; | | ++ports_with_no_inputs; | |
| } | | } | |
| | | | |
| // if all input_ports have predecessors, spawn forward to try and c
onsume tuples | | // if all input_ports have predecessors, spawn forward to try and c
onsume tuples | |
| task * decrement_port_count(bool handle_task) { | | task * decrement_port_count(bool handle_task) { | |
| if(ports_with_no_inputs.fetch_and_decrement() == 1) { | | if(ports_with_no_inputs.fetch_and_decrement() == 1) { | |
|
| task *rtask = new ( task::allocate_additional_child_of( *(t | | task* tp = this->my_graph_ptr->root_task(); | |
| his->my_root_task) ) ) | | if(tp) { | |
| forward_task_bypass | | task *rtask = new ( task::allocate_additional_child_of( | |
| <my_node_type>(*my_node); | | *tp ) ) | |
| if(!handle_task) return rtask; | | forward_task_bypass<my_node_type>(*my_node); | |
| FLOW_SPAWN(*rtask); | | if(!handle_task) return rtask; | |
| | | FLOW_SPAWN(*rtask); | |
| | | } | |
| } | | } | |
| return NULL; | | return NULL; | |
| } | | } | |
| | | | |
| input_type &input_ports() { return my_inputs; } | | input_type &input_ports() { return my_inputs; } | |
| | | | |
| protected: | | protected: | |
| | | | |
| void reset() { | | void reset() { | |
| // called outside of parallel contexts | | // called outside of parallel contexts | |
| | | | |
| skipping to change at line 738 | | skipping to change at line 740 | |
| }; | | }; | |
| | | | |
| template<typename InputTuple, typename OutputTuple> | | template<typename InputTuple, typename OutputTuple> | |
| class join_node_FE<queueing, InputTuple, OutputTuple> : public forwardi
ng_base { | | class join_node_FE<queueing, InputTuple, OutputTuple> : public forwardi
ng_base { | |
| public: | | public: | |
| static const int N = tbb::flow::tuple_size<OutputTuple>::value; | | static const int N = tbb::flow::tuple_size<OutputTuple>::value; | |
| typedef OutputTuple output_type; | | typedef OutputTuple output_type; | |
| typedef InputTuple input_type; | | typedef InputTuple input_type; | |
| typedef join_node_base<queueing, InputTuple, OutputTuple> my_node_t
ype; // for forwarding | | typedef join_node_base<queueing, InputTuple, OutputTuple> my_node_t
ype; // for forwarding | |
| | | | |
|
| join_node_FE(graph &g) : forwarding_base(g.root_task()), my_node(NU
LL) { | | join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) { | |
| ports_with_no_items = N; | | ports_with_no_items = N; | |
| join_helper<N>::set_join_node_pointer(my_inputs, this); | | join_helper<N>::set_join_node_pointer(my_inputs, this); | |
| } | | } | |
| | | | |
|
| join_node_FE(const join_node_FE& other) : forwarding_base(other.my_
root_task), my_node(NULL) { | | join_node_FE(const join_node_FE& other) : forwarding_base(*(other.f
orwarding_base::my_graph_ptr)), my_node(NULL) { | |
| ports_with_no_items = N; | | ports_with_no_items = N; | |
| join_helper<N>::set_join_node_pointer(my_inputs, this); | | join_helper<N>::set_join_node_pointer(my_inputs, this); | |
| } | | } | |
| | | | |
| // needed for forwarding | | // needed for forwarding | |
| void set_my_node(my_node_type *new_my_node) { my_node = new_my_node
; } | | void set_my_node(my_node_type *new_my_node) { my_node = new_my_node
; } | |
| | | | |
| void reset_port_count() { | | void reset_port_count() { | |
| ports_with_no_items = N; | | ports_with_no_items = N; | |
| } | | } | |
| | | | |
| // if all input_ports have items, spawn forward to try and consume
tuples | | // if all input_ports have items, spawn forward to try and consume
tuples | |
| task * decrement_port_count(bool handle_task) | | task * decrement_port_count(bool handle_task) | |
| { | | { | |
| if(ports_with_no_items.fetch_and_decrement() == 1) { | | if(ports_with_no_items.fetch_and_decrement() == 1) { | |
|
| task *rtask = new ( task::allocate_additional_child_of( *(t | | task* tp = this->my_graph_ptr->root_task(); | |
| his->my_root_task) ) ) | | if(tp) { | |
| forward_task_bypass | | task *rtask = new ( task::allocate_additional_child_of( | |
| <my_node_type>(*my_node); | | *tp ) ) | |
| if(!handle_task) return rtask; | | forward_task_bypass <my_node_type>(*my_node); | |
| FLOW_SPAWN( *rtask); | | if(!handle_task) return rtask; | |
| | | FLOW_SPAWN( *rtask); | |
| | | } | |
| } | | } | |
| return NULL; | | return NULL; | |
| } | | } | |
| | | | |
| void increment_port_count() { __TBB_ASSERT(false, NULL); } // shou
ld never be called | | void increment_port_count() { __TBB_ASSERT(false, NULL); } // shou
ld never be called | |
| | | | |
| input_type &input_ports() { return my_inputs; } | | input_type &input_ports() { return my_inputs; } | |
| | | | |
| protected: | | protected: | |
| | | | |
| | | | |
| skipping to change at line 853 | | skipping to change at line 857 | |
| friend class internal::aggregating_functor<my_class, tag_matching_F
E_operation>; | | friend class internal::aggregating_functor<my_class, tag_matching_F
E_operation>; | |
| aggregator<my_handler, tag_matching_FE_operation> my_aggregator; | | aggregator<my_handler, tag_matching_FE_operation> my_aggregator; | |
| | | | |
| // called from aggregator, so serialized | | // called from aggregator, so serialized | |
| // construct as many output objects as possible. | | // construct as many output objects as possible. | |
| // returns a task pointer if the a task would have been enqueued bu
t we asked that | | // returns a task pointer if the a task would have been enqueued bu
t we asked that | |
| // it be returned. Otherwise returns NULL. | | // it be returned. Otherwise returns NULL. | |
| task * fill_output_buffer(bool should_enqueue, bool handle_task) { | | task * fill_output_buffer(bool should_enqueue, bool handle_task) { | |
| output_type l_out; | | output_type l_out; | |
| task *rtask = NULL; | | task *rtask = NULL; | |
|
| bool do_fwd = should_enqueue && this->buffer_empty(); | | task* tp = this->my_graph_ptr->root_task(); | |
| | | bool do_fwd = should_enqueue && this->buffer_empty() && tp; | |
| while(find_value_tag(this->current_tag,N)) { // while there ar
e completed items | | while(find_value_tag(this->current_tag,N)) { // while there ar
e completed items | |
| this->tagged_delete(this->current_tag); // remove the tag | | this->tagged_delete(this->current_tag); // remove the tag | |
| if(join_helper<N>::get_items(my_inputs, l_out)) { // <==
call back | | if(join_helper<N>::get_items(my_inputs, l_out)) { // <==
call back | |
| this->push_back(l_out); | | this->push_back(l_out); | |
| if(do_fwd) { // we enqueue if receiving an item from p
redecessor, not if successor asks for item | | if(do_fwd) { // we enqueue if receiving an item from p
redecessor, not if successor asks for item | |
|
| rtask = new ( task::allocate_additional_child_of( *
(this->my_root_task) ) ) | | rtask = new ( task::allocate_additional_child_of( *
tp ) ) | |
| forward_task_bypass<my_node_type>(*my_node); | | forward_task_bypass<my_node_type>(*my_node); | |
| if(handle_task) { | | if(handle_task) { | |
| FLOW_SPAWN(*rtask); | | FLOW_SPAWN(*rtask); | |
| rtask = NULL; | | rtask = NULL; | |
| } | | } | |
| do_fwd = false; | | do_fwd = false; | |
| } | | } | |
| // retire the input values | | // retire the input values | |
| join_helper<N>::reset_ports(my_inputs); // <== call b
ack | | join_helper<N>::reset_ports(my_inputs); // <== call b
ack | |
| this->current_tag = NO_TAG; | | this->current_tag = NO_TAG; | |
| | | | |
| skipping to change at line 931 | | skipping to change at line 936 | |
| __TBB_store_with_release(current->status, SUCCEEDED
); | | __TBB_store_with_release(current->status, SUCCEEDED
); | |
| } | | } | |
| break; | | break; | |
| } | | } | |
| } | | } | |
| } | | } | |
| // ------------ End Aggregator --------------- | | // ------------ End Aggregator --------------- | |
| | | | |
| public: | | public: | |
| template<typename FunctionTuple> | | template<typename FunctionTuple> | |
|
| join_node_FE(graph &g, FunctionTuple tag_funcs) : forwarding_base(g
.root_task()), my_node(NULL) { | | join_node_FE(graph &g, FunctionTuple tag_funcs) : forwarding_base(g
), my_node(NULL) { | |
| join_helper<N>::set_join_node_pointer(my_inputs, this); | | join_helper<N>::set_join_node_pointer(my_inputs, this); | |
| join_helper<N>::set_tag_func(my_inputs, tag_funcs); | | join_helper<N>::set_tag_func(my_inputs, tag_funcs); | |
| my_aggregator.initialize_handler(my_handler(this)); | | my_aggregator.initialize_handler(my_handler(this)); | |
| } | | } | |
| | | | |
|
| join_node_FE(const join_node_FE& other) : forwarding_base(other.my_
root_task), my_tag_buffer(), | | join_node_FE(const join_node_FE& other) : forwarding_base(*(other.f
orwarding_base::my_graph_ptr)), my_tag_buffer(), | |
| output_buffer_type() { | | output_buffer_type() { | |
| my_node = NULL; | | my_node = NULL; | |
| join_helper<N>::set_join_node_pointer(my_inputs, this); | | join_helper<N>::set_join_node_pointer(my_inputs, this); | |
| join_helper<N>::copy_tag_functors(my_inputs, const_cast<input_t
ype &>(other.my_inputs)); | | join_helper<N>::copy_tag_functors(my_inputs, const_cast<input_t
ype &>(other.my_inputs)); | |
| my_aggregator.initialize_handler(my_handler(this)); | | my_aggregator.initialize_handler(my_handler(this)); | |
| } | | } | |
| | | | |
| // needed for forwarding | | // needed for forwarding | |
| void set_my_node(my_node_type *new_my_node) { my_node = new_my_node
; } | | void set_my_node(my_node_type *new_my_node) { my_node = new_my_node
; } | |
| | | | |
| | | | |
| skipping to change at line 1055 | | skipping to change at line 1060 | |
| friend class internal::aggregating_functor<my_class, join_node_base
_operation>; | | friend class internal::aggregating_functor<my_class, join_node_base
_operation>; | |
| bool forwarder_busy; | | bool forwarder_busy; | |
| aggregator<my_handler, join_node_base_operation> my_aggregator; | | aggregator<my_handler, join_node_base_operation> my_aggregator; | |
| | | | |
| void handle_operations(join_node_base_operation* op_list) { | | void handle_operations(join_node_base_operation* op_list) { | |
| join_node_base_operation *current; | | join_node_base_operation *current; | |
| while(op_list) { | | while(op_list) { | |
| current = op_list; | | current = op_list; | |
| op_list = op_list->next; | | op_list = op_list->next; | |
| switch(current->type) { | | switch(current->type) { | |
|
| case reg_succ: | | case reg_succ: { | |
| my_successors.register_successor(*(current->my_succ)); | | my_successors.register_successor(*(current->my_succ | |
| if(tuple_build_may_succeed() && !forwarder_busy) { | | )); | |
| task *rtask = new ( task::allocate_additional_child | | task* tp = this->graph_node::my_graph.root_task(); | |
| _of(*(this->my_root_task)) ) | | if(tuple_build_may_succeed() && !forwarder_busy && | |
| forward_task_bypass | | tp) { | |
| <join_node_base<JP,InputTuple,OutputTuple> | | task *rtask = new ( task::allocate_additional_c | |
| >(*this); | | hild_of(*tp) ) | |
| FLOW_SPAWN(*rtask); | | forward_task_bypass | |
| forwarder_busy = true; | | <join_node_base<JP,InputTuple,OutputTup | |
| | | le> >(*this); | |
| | | FLOW_SPAWN(*rtask); | |
| | | forwarder_busy = true; | |
| | | } | |
| | | __TBB_store_with_release(current->status, SUCCEEDED | |
| | | ); | |
| } | | } | |
|
| __TBB_store_with_release(current->status, SUCCEEDED); | | | |
| break; | | break; | |
| case rem_succ: | | case rem_succ: | |
| my_successors.remove_successor(*(current->my_succ)); | | my_successors.remove_successor(*(current->my_succ)); | |
| __TBB_store_with_release(current->status, SUCCEEDED); | | __TBB_store_with_release(current->status, SUCCEEDED); | |
| break; | | break; | |
| case try__get: | | case try__get: | |
| if(tuple_build_may_succeed()) { | | if(tuple_build_may_succeed()) { | |
| if(try_to_make_tuple(*(current->my_arg))) { | | if(try_to_make_tuple(*(current->my_arg))) { | |
| tuple_accepted(); | | tuple_accepted(); | |
| __TBB_store_with_release(current->status, SUCCE
EDED); | | __TBB_store_with_release(current->status, SUCCE
EDED); | |
| | | | |
| skipping to change at line 1117 | | skipping to change at line 1124 | |
| } | | } | |
| // ---------- end aggregator ----------- | | // ---------- end aggregator ----------- | |
| public: | | public: | |
| join_node_base(graph &g) : graph_node(g), input_ports_type(g), forw
arder_busy(false) { | | join_node_base(graph &g) : graph_node(g), input_ports_type(g), forw
arder_busy(false) { | |
| my_successors.set_owner(this); | | my_successors.set_owner(this); | |
| input_ports_type::set_my_node(this); | | input_ports_type::set_my_node(this); | |
| my_aggregator.initialize_handler(my_handler(this)); | | my_aggregator.initialize_handler(my_handler(this)); | |
| } | | } | |
| | | | |
| join_node_base(const join_node_base& other) : | | join_node_base(const join_node_base& other) : | |
|
| graph_node(other.my_graph), input_ports_type(other), | | graph_node(other.graph_node::my_graph), input_ports_type(other)
, | |
| sender<OutputTuple>(), forwarder_busy(false), my_successors() { | | sender<OutputTuple>(), forwarder_busy(false), my_successors() { | |
| my_successors.set_owner(this); | | my_successors.set_owner(this); | |
| input_ports_type::set_my_node(this); | | input_ports_type::set_my_node(this); | |
| my_aggregator.initialize_handler(my_handler(this)); | | my_aggregator.initialize_handler(my_handler(this)); | |
| } | | } | |
| | | | |
| template<typename FunctionTuple> | | template<typename FunctionTuple> | |
| join_node_base(graph &g, FunctionTuple f) : graph_node(g), input_po
rts_type(g, f), forwarder_busy(false) { | | join_node_base(graph &g, FunctionTuple f) : graph_node(g), input_po
rts_type(g, f), forwarder_busy(false) { | |
| my_successors.set_owner(this); | | my_successors.set_owner(this); | |
| input_ports_type::set_my_node(this); | | input_ports_type::set_my_node(this); | |
| | | | |
End of changes. 15 change blocks. |
| 34 lines changed or deleted | | 44 lines changed or added | |
|
| _flow_graph_node_impl.h | | _flow_graph_node_impl.h | |
| | | | |
| skipping to change at line 74 | | skipping to change at line 74 | |
| enum op_type {reg_pred, rem_pred, app_body, try_fwd, tryput_bypass,
app_body_bypass }; | | enum op_type {reg_pred, rem_pred, app_body, try_fwd, tryput_bypass,
app_body_bypass }; | |
| typedef function_input_base<Input, A, ImplType> my_class; | | typedef function_input_base<Input, A, ImplType> my_class; | |
| | | | |
| public: | | public: | |
| | | | |
| //! The input type of this receiver | | //! The input type of this receiver | |
| typedef Input input_type; | | typedef Input input_type; | |
| | | | |
| //! Constructor for function_input_base | | //! Constructor for function_input_base | |
| function_input_base( graph &g, size_t max_concurrency, function_inp
ut_queue<input_type,A> *q = NULL ) | | function_input_base( graph &g, size_t max_concurrency, function_inp
ut_queue<input_type,A> *q = NULL ) | |
|
| : my_root_task(g.root_task()), my_max_concurrency(max_concurren
cy), my_concurrency(0), | | : my_graph(g), my_max_concurrency(max_concurrency), my_concurre
ncy(0), | |
| my_queue(q), forwarder_busy(false) { | | my_queue(q), forwarder_busy(false) { | |
| my_predecessors.set_owner(this); | | my_predecessors.set_owner(this); | |
| my_aggregator.initialize_handler(my_handler(this)); | | my_aggregator.initialize_handler(my_handler(this)); | |
| } | | } | |
| | | | |
| //! Copy constructor | | //! Copy constructor | |
| function_input_base( const function_input_base& src, function_input
_queue<input_type,A> *q = NULL ) : | | function_input_base( const function_input_base& src, function_input
_queue<input_type,A> *q = NULL ) : | |
| receiver<Input>(), tbb::internal::no_assign(), | | receiver<Input>(), tbb::internal::no_assign(), | |
|
| my_root_task( src.my_root_task), my_max_concurrency(src.my_max_
concurrency), | | my_graph(src.my_graph), my_max_concurrency(src.my_max_concurren
cy), | |
| my_concurrency(0), my_queue(q), forwarder_busy(false) | | my_concurrency(0), my_queue(q), forwarder_busy(false) | |
| { | | { | |
| my_predecessors.set_owner(this); | | my_predecessors.set_owner(this); | |
| my_aggregator.initialize_handler(my_handler(this)); | | my_aggregator.initialize_handler(my_handler(this)); | |
| } | | } | |
| | | | |
| //! Destructor | | //! Destructor | |
| virtual ~function_input_base() { | | virtual ~function_input_base() { | |
| if ( my_queue ) delete my_queue; | | if ( my_queue ) delete my_queue; | |
| } | | } | |
| | | | |
| skipping to change at line 136 | | skipping to change at line 136 | |
| | | | |
| void reset_function_input_base() { | | void reset_function_input_base() { | |
| my_concurrency = 0; | | my_concurrency = 0; | |
| if(my_queue) { | | if(my_queue) { | |
| my_queue->reset(); | | my_queue->reset(); | |
| } | | } | |
| my_predecessors.reset(); | | my_predecessors.reset(); | |
| forwarder_busy = false; | | forwarder_busy = false; | |
| } | | } | |
| | | | |
|
| task *my_root_task; | | graph& my_graph; | |
| const size_t my_max_concurrency; | | const size_t my_max_concurrency; | |
| size_t my_concurrency; | | size_t my_concurrency; | |
| function_input_queue<input_type, A> *my_queue; | | function_input_queue<input_type, A> *my_queue; | |
| predecessor_cache<input_type, null_mutex > my_predecessors; | | predecessor_cache<input_type, null_mutex > my_predecessors; | |
| | | | |
| /*override*/void reset_receiver() { | | /*override*/void reset_receiver() { | |
| my_predecessors.reset(); | | my_predecessors.reset(); | |
| } | | } | |
| | | | |
| private: | | private: | |
| | | | |
| skipping to change at line 291 | | skipping to change at line 291 | |
| task * new_task = static_cast<ImplType *>(this)->apply_body_imp
l_bypass(i); | | task * new_task = static_cast<ImplType *>(this)->apply_body_imp
l_bypass(i); | |
| if ( my_max_concurrency != 0 ) { | | if ( my_max_concurrency != 0 ) { | |
| my_operation op_data(app_body_bypass); // tries to pop an
item or get_item, enqueues another apply_body | | my_operation op_data(app_body_bypass); // tries to pop an
item or get_item, enqueues another apply_body | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| tbb::task *ttask = op_data.bypass_t; | | tbb::task *ttask = op_data.bypass_t; | |
| new_task = combine_tasks(new_task, ttask); | | new_task = combine_tasks(new_task, ttask); | |
| } | | } | |
| return new_task; | | return new_task; | |
| } | | } | |
| | | | |
|
| //! allocates a task to call apply_body( input ) | | //! allocates a task to call apply_body( input ) | |
| inline task * create_body_task( const input_type &input ) { | | inline task * create_body_task( const input_type &input ) { | |
| return new(task::allocate_additional_child_of(*my_root_task)) | | | |
| apply_body_task_bypass < my_class, input_type >(*this, input | | task* tp = my_graph.root_task(); | |
| ); | | return (tp) ? | |
| } | | new(task::allocate_additional_child_of(*tp)) | |
| | | apply_body_task_bypass < my_class, input_type >(*this, | |
| | | input) : | |
| | | NULL; | |
| | | } | |
| | | | |
| //! Spawns a task that calls apply_body( input ) | | //! Spawns a task that calls apply_body( input ) | |
| inline void spawn_body_task( const input_type &input ) { | | inline void spawn_body_task( const input_type &input ) { | |
|
| FLOW_SPAWN(*create_body_task(input)); | | task* tp = create_body_task(input); | |
| | | // tp == NULL => g.reset(), which shouldn't occur in concurrent | |
| | | context | |
| | | if(tp) { | |
| | | FLOW_SPAWN(*tp); | |
| | | } | |
| } | | } | |
| | | | |
| //! This is executed by an enqueued task, the "forwarder" | | //! This is executed by an enqueued task, the "forwarder" | |
| task *forward_task() { | | task *forward_task() { | |
| my_operation op_data(try_fwd); | | my_operation op_data(try_fwd); | |
| task *rval = NULL; | | task *rval = NULL; | |
| do { | | do { | |
| op_data.status = WAIT; | | op_data.status = WAIT; | |
| my_aggregator.execute(&op_data); | | my_aggregator.execute(&op_data); | |
| if(op_data.status == SUCCEEDED) { | | if(op_data.status == SUCCEEDED) { | |
| tbb::task *ttask = op_data.bypass_t; | | tbb::task *ttask = op_data.bypass_t; | |
| rval = combine_tasks(rval, ttask); | | rval = combine_tasks(rval, ttask); | |
| } | | } | |
| } while (op_data.status == SUCCEEDED); | | } while (op_data.status == SUCCEEDED); | |
| return rval; | | return rval; | |
| } | | } | |
| | | | |
| inline task *create_forward_task() { | | inline task *create_forward_task() { | |
|
| task *rval = new(task::allocate_additional_child_of(*my_root_tas | | task* tp = my_graph.root_task(); | |
| k)) forward_task_bypass< my_class >(*this); | | return (tp) ? | |
| return rval; | | new(task::allocate_additional_child_of(*tp)) forward_task_by | |
| | | pass< my_class >(*this) : | |
| | | NULL; | |
| } | | } | |
| | | | |
| //! Spawns a task that calls forward() | | //! Spawns a task that calls forward() | |
| inline void spawn_forward_task() { | | inline void spawn_forward_task() { | |
|
| FLOW_SPAWN(*create_forward_task()); | | task* tp = create_forward_task(); | |
| | | if(tp) { | |
| | | FLOW_SPAWN(*tp); | |
| | | } | |
| } | | } | |
| }; // function_input_base | | }; // function_input_base | |
| | | | |
| //! Implements methods for a function node that takes a type Input as i
nput and sends | | //! Implements methods for a function node that takes a type Input as i
nput and sends | |
| // a type Output to its successors. | | // a type Output to its successors. | |
| template< typename Input, typename Output, typename A> | | template< typename Input, typename Output, typename A> | |
| class function_input : public function_input_base<Input, A, function_in
put<Input,Output,A> > { | | class function_input : public function_input_base<Input, A, function_in
put<Input,Output,A> > { | |
| public: | | public: | |
| typedef Input input_type; | | typedef Input input_type; | |
| typedef Output output_type; | | typedef Output output_type; | |
| | | | |
| skipping to change at line 376 | | skipping to change at line 389 | |
| | | | |
| protected: | | protected: | |
| | | | |
| void reset_function_input() { | | void reset_function_input() { | |
| base_type::reset_function_input_base(); | | base_type::reset_function_input_base(); | |
| } | | } | |
| | | | |
| function_body<input_type, output_type> *my_body; | | function_body<input_type, output_type> *my_body; | |
| virtual broadcast_cache<output_type > &successors() = 0; | | virtual broadcast_cache<output_type > &successors() = 0; | |
| | | | |
|
| }; | | }; // function_input | |
| | | | |
| //! Implements methods for a function node that takes a type Input as i
nput | | //! Implements methods for a function node that takes a type Input as i
nput | |
| // and has a tuple of output ports specified. | | // and has a tuple of output ports specified. | |
| template< typename Input, typename OutputPortSet, typename A> | | template< typename Input, typename OutputPortSet, typename A> | |
| class multifunction_input : public function_input_base<Input, A, multif
unction_input<Input,OutputPortSet,A> > { | | class multifunction_input : public function_input_base<Input, A, multif
unction_input<Input,OutputPortSet,A> > { | |
| public: | | public: | |
| typedef Input input_type; | | typedef Input input_type; | |
| typedef OutputPortSet output_ports_type; | | typedef OutputPortSet output_ports_type; | |
| typedef multifunction_input<Input,OutputPortSet,A> my_class; | | typedef multifunction_input<Input,OutputPortSet,A> my_class; | |
| typedef function_input_base<Input, A, my_class> base_type; | | typedef function_input_base<Input, A, my_class> base_type; | |
| | | | |
| skipping to change at line 435 | | skipping to change at line 448 | |
| | | | |
| protected: | | protected: | |
| | | | |
| void reset() { | | void reset() { | |
| base_type::reset_function_input_base(); | | base_type::reset_function_input_base(); | |
| } | | } | |
| | | | |
| multifunction_body<input_type, output_ports_type> *my_body; | | multifunction_body<input_type, output_ports_type> *my_body; | |
| output_ports_type my_output_ports; | | output_ports_type my_output_ports; | |
| | | | |
|
| }; | | }; // multifunction_input | |
| | | | |
| // template to refer to an output port of a multifunction_node | | // template to refer to an output port of a multifunction_node | |
| template<size_t N, typename MOP> | | template<size_t N, typename MOP> | |
| typename tbb::flow::tuple_element<N, typename MOP::output_ports_type>::
type &output_port(MOP &op) { | | typename tbb::flow::tuple_element<N, typename MOP::output_ports_type>::
type &output_port(MOP &op) { | |
| return tbb::flow::get<N>(op.output_ports()); | | return tbb::flow::get<N>(op.output_ports()); | |
| } | | } | |
| | | | |
| // helper structs for split_node | | // helper structs for split_node | |
| template<int N> | | template<int N> | |
| struct emit_element { | | struct emit_element { | |
| | | | |
| skipping to change at line 474 | | skipping to change at line 487 | |
| public: | | public: | |
| | | | |
| //! The input type of this receiver | | //! The input type of this receiver | |
| typedef continue_msg input_type; | | typedef continue_msg input_type; | |
| | | | |
| //! The output type of this receiver | | //! The output type of this receiver | |
| typedef Output output_type; | | typedef Output output_type; | |
| | | | |
| template< typename Body > | | template< typename Body > | |
| continue_input( graph &g, Body& body ) | | continue_input( graph &g, Body& body ) | |
|
| : my_root_task(g.root_task()), | | : my_graph_ptr(&g), | |
| my_body( new internal::function_body_leaf< input_type, output_
type, Body>(body) ) { } | | my_body( new internal::function_body_leaf< input_type, output_
type, Body>(body) ) { } | |
| | | | |
| template< typename Body > | | template< typename Body > | |
| continue_input( graph &g, int number_of_predecessors, Body& body ) | | continue_input( graph &g, int number_of_predecessors, Body& body ) | |
|
| : continue_receiver( number_of_predecessors ), my_root_task(g.r
oot_task()), | | : continue_receiver( number_of_predecessors ), my_graph_ptr(&g)
, | |
| my_body( new internal::function_body_leaf< input_type, output_
type, Body>(body) ) { } | | my_body( new internal::function_body_leaf< input_type, output_
type, Body>(body) ) { } | |
| | | | |
| continue_input( const continue_input& src ) : continue_receiver(src
), | | continue_input( const continue_input& src ) : continue_receiver(src
), | |
|
| my_root_task(src.my_root_task), my_body( src.my_body->clone() )
{} | | my_graph_ptr(src.my_graph_ptr), my_body( src.my_body->clone() )
{} | |
| | | | |
| ~continue_input() { | | ~continue_input() { | |
| delete my_body; | | delete my_body; | |
| } | | } | |
| | | | |
| template< typename Body > | | template< typename Body > | |
| Body copy_function_object() { | | Body copy_function_object() { | |
| internal::function_body<input_type, output_type> &body_ref = *m
y_body; | | internal::function_body<input_type, output_type> &body_ref = *m
y_body; | |
| return dynamic_cast< internal::function_body_leaf<input_type, o
utput_type, Body> & >(body_ref).get_body(); | | return dynamic_cast< internal::function_body_leaf<input_type, o
utput_type, Body> & >(body_ref).get_body(); | |
| } | | } | |
| | | | |
| protected: | | protected: | |
| | | | |
|
| task *my_root_task; | | graph* my_graph_ptr; | |
| function_body<input_type, output_type> *my_body; | | function_body<input_type, output_type> *my_body; | |
| | | | |
| virtual broadcast_cache<output_type > &successors() = 0; | | virtual broadcast_cache<output_type > &successors() = 0; | |
| | | | |
| friend class apply_body_task_bypass< continue_input< Output >, cont
inue_msg >; | | friend class apply_body_task_bypass< continue_input< Output >, cont
inue_msg >; | |
| | | | |
| //! Applies the body to the provided input | | //! Applies the body to the provided input | |
| /* override */ task *apply_body_bypass( input_type ) { | | /* override */ task *apply_body_bypass( input_type ) { | |
| return successors().try_put_task( (*my_body)( continue_msg() )
); | | return successors().try_put_task( (*my_body)( continue_msg() )
); | |
| } | | } | |
| | | | |
| //! Spawns a task that applies the body | | //! Spawns a task that applies the body | |
| /* override */ task *execute( ) { | | /* override */ task *execute( ) { | |
|
| task *res = new ( task::allocate_additional_child_of( *my_root_ | | task* tp = my_graph_ptr->root_task(); | |
| task ) ) | | return (tp) ? | |
| apply_body_task_bypass< continue_input< Output >, continue_ | | new ( task::allocate_additional_child_of( *tp ) ) | |
| msg >( *this, continue_msg() ); | | apply_body_task_bypass< continue_input< Output >, conti | |
| return res; | | nue_msg >( *this, continue_msg() ) : | |
| | | NULL; | |
| } | | } | |
| | | | |
|
| }; | | }; // continue_input | |
| | | | |
| //! Implements methods for both executable and function nodes that puts
Output to its successors | | //! Implements methods for both executable and function nodes that puts
Output to its successors | |
| template< typename Output > | | template< typename Output > | |
| class function_output : public sender<Output> { | | class function_output : public sender<Output> { | |
| public: | | public: | |
| | | | |
| typedef Output output_type; | | typedef Output output_type; | |
| | | | |
| function_output() { my_successors.set_owner(this); } | | function_output() { my_successors.set_owner(this); } | |
| function_output(const function_output & /*other*/) : sender<output_
type>() { | | function_output(const function_output & /*other*/) : sender<output_
type>() { | |
| | | | |
| skipping to change at line 555 | | skipping to change at line 570 | |
| // | | // | |
| // get<I>(output_ports).try_put(output_value); | | // get<I>(output_ports).try_put(output_value); | |
| // | | // | |
| // return value will be bool returned from successors.try_put. | | // return value will be bool returned from successors.try_put. | |
| task *try_put_task(const output_type &i) { return my_successors.try
_put_task(i); } | | task *try_put_task(const output_type &i) { return my_successors.try
_put_task(i); } | |
| | | | |
| protected: | | protected: | |
| broadcast_cache<output_type> my_successors; | | broadcast_cache<output_type> my_successors; | |
| broadcast_cache<output_type > &successors() { return my_successors;
} | | broadcast_cache<output_type > &successors() { return my_successors;
} | |
| | | | |
|
| }; | | }; // function_output | |
| | | | |
| template< typename Output > | | template< typename Output > | |
| class multifunction_output : public function_output<Output> { | | class multifunction_output : public function_output<Output> { | |
| public: | | public: | |
| typedef Output output_type; | | typedef Output output_type; | |
| typedef function_output<output_type> base_type; | | typedef function_output<output_type> base_type; | |
| using base_type::my_successors; | | using base_type::my_successors; | |
| | | | |
| multifunction_output() : base_type() {my_successors.set_owner(this)
;} | | multifunction_output() : base_type() {my_successors.set_owner(this)
;} | |
| multifunction_output( const multifunction_output &/*other*/) : base
_type() { my_successors.set_owner(this); } | | multifunction_output( const multifunction_output &/*other*/) : base
_type() { my_successors.set_owner(this); } | |
| | | | |
| bool try_put(const output_type &i) { | | bool try_put(const output_type &i) { | |
| task *res = my_successors.try_put_task(i); | | task *res = my_successors.try_put_task(i); | |
| if(!res) return false; | | if(!res) return false; | |
| if(res != SUCCESSFULLY_ENQUEUED) FLOW_SPAWN(*res); | | if(res != SUCCESSFULLY_ENQUEUED) FLOW_SPAWN(*res); | |
| return true; | | return true; | |
| } | | } | |
|
| }; | | }; // multifunction_output | |
| | | | |
| } // internal | | } // internal | |
| | | | |
| #endif // __TBB__flow_graph_node_impl_H | | #endif // __TBB__flow_graph_node_impl_H | |
| | | | |
End of changes. 17 change blocks. |
| 28 lines changed or deleted | | 43 lines changed or added | |
|
| flow_graph.h | | flow_graph.h | |
| | | | |
| skipping to change at line 82 | | skipping to change at line 82 | |
| class and its associated node classes can be used to express such | | class and its associated node classes can be used to express such | |
| applcations. | | applcations. | |
| */ | | */ | |
| | | | |
| namespace tbb { | | namespace tbb { | |
| namespace flow { | | namespace flow { | |
| | | | |
| //! An enumeration the provides the two most common concurrency levels: unl
imited and serial | | //! An enumeration the provides the two most common concurrency levels: unl
imited and serial | |
| enum concurrency { unlimited = 0, serial = 1 }; | | enum concurrency { unlimited = 0, serial = 1 }; | |
| | | | |
|
| namespace interface6 { | | namespace interface7 { | |
| | | | |
| namespace internal { | | namespace internal { | |
| template<typename T, typename M> class successor_cache; | | template<typename T, typename M> class successor_cache; | |
| template<typename T, typename M> class broadcast_cache; | | template<typename T, typename M> class broadcast_cache; | |
| template<typename T, typename M> class round_robin_cache; | | template<typename T, typename M> class round_robin_cache; | |
| } | | } | |
| | | | |
| //! An empty class used for messages that mean "I'm done" | | //! An empty class used for messages that mean "I'm done" | |
| class continue_msg {}; | | class continue_msg {}; | |
| | | | |
| | | | |
| skipping to change at line 515 | | skipping to change at line 515 | |
| bool own_context; | | bool own_context; | |
| bool cancelled; | | bool cancelled; | |
| bool caught_exception; | | bool caught_exception; | |
| | | | |
| graph_node *my_nodes, *my_nodes_last; | | graph_node *my_nodes, *my_nodes_last; | |
| | | | |
| spin_mutex nodelist_mutex; | | spin_mutex nodelist_mutex; | |
| void register_node(graph_node *n); | | void register_node(graph_node *n); | |
| void remove_node(graph_node *n); | | void remove_node(graph_node *n); | |
| | | | |
|
| }; | | }; // class graph | |
| | | | |
| template <typename C, typename N> | | template <typename C, typename N> | |
| graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), curren
t_node(NULL) | | graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), curren
t_node(NULL) | |
| { | | { | |
| if (begin) current_node = my_graph->my_nodes; | | if (begin) current_node = my_graph->my_nodes; | |
| //else it is an end iterator by default | | //else it is an end iterator by default | |
| } | | } | |
| | | | |
| template <typename C, typename N> | | template <typename C, typename N> | |
| typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() co
nst { | | typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() co
nst { | |
| | | | |
| skipping to change at line 585 | | skipping to change at line 585 | |
| if (n->prev) n->prev->next = n->next; | | if (n->prev) n->prev->next = n->next; | |
| if (n->next) n->next->prev = n->prev; | | if (n->next) n->next->prev = n->prev; | |
| if (my_nodes_last == n) my_nodes_last = n->prev; | | if (my_nodes_last == n) my_nodes_last = n->prev; | |
| if (my_nodes == n) my_nodes = n->next; | | if (my_nodes == n) my_nodes = n->next; | |
| } | | } | |
| n->prev = n->next = NULL; | | n->prev = n->next = NULL; | |
| } | | } | |
| | | | |
| inline void graph::reset() { | | inline void graph::reset() { | |
| // reset context | | // reset context | |
|
| | | task *saved_my_root_task = my_root_task; | |
| | | my_root_task = NULL; | |
| if(my_context) my_context->reset(); | | if(my_context) my_context->reset(); | |
| cancelled = false; | | cancelled = false; | |
| caught_exception = false; | | caught_exception = false; | |
| // reset all the nodes comprising the graph | | // reset all the nodes comprising the graph | |
| for(iterator ii = begin(); ii != end(); ++ii) { | | for(iterator ii = begin(); ii != end(); ++ii) { | |
| graph_node *my_p = &(*ii); | | graph_node *my_p = &(*ii); | |
| my_p->reset(); | | my_p->reset(); | |
| } | | } | |
|
| | | my_root_task = saved_my_root_task; | |
| } | | } | |
| | | | |
| #include "internal/_flow_graph_node_impl.h" | | #include "internal/_flow_graph_node_impl.h" | |
| | | | |
| //! An executable node that acts as a source, i.e. it has no predecessors | | //! An executable node that acts as a source, i.e. it has no predecessors | |
| template < typename Output > | | template < typename Output > | |
| class source_node : public graph_node, public sender< Output > { | | class source_node : public graph_node, public sender< Output > { | |
| protected: | | protected: | |
| using graph_node::my_graph; | | using graph_node::my_graph; | |
| public: | | public: | |
| //! The type of the output message, which is complete | | //! The type of the output message, which is complete | |
| typedef Output output_type; | | typedef Output output_type; | |
| | | | |
| //! The type of successors of this node | | //! The type of successors of this node | |
| typedef receiver< Output > successor_type; | | typedef receiver< Output > successor_type; | |
| | | | |
| //! Constructor for a node with a successor | | //! Constructor for a node with a successor | |
| template< typename Body > | | template< typename Body > | |
| source_node( graph &g, Body body, bool is_active = true ) | | source_node( graph &g, Body body, bool is_active = true ) | |
|
| : graph_node(g), my_root_task(g.root_task()), my_active(is_active),
init_my_active(is_active), | | : graph_node(g), my_active(is_active), init_my_active(is_active), | |
| my_body( new internal::source_body_leaf< output_type, Body>(body) )
, | | my_body( new internal::source_body_leaf< output_type, Body>(body) )
, | |
| my_reserved(false), my_has_cached_item(false) | | my_reserved(false), my_has_cached_item(false) | |
| { | | { | |
| my_successors.set_owner(this); | | my_successors.set_owner(this); | |
| } | | } | |
| | | | |
| //! Copy constructor | | //! Copy constructor | |
| source_node( const source_node& src ) : | | source_node( const source_node& src ) : | |
| graph_node(src.my_graph), sender<Output>(), | | graph_node(src.my_graph), sender<Output>(), | |
|
| my_root_task( src.my_root_task), my_active(src.init_my_active), | | my_active(src.init_my_active), | |
| init_my_active(src.init_my_active), my_body( src.my_body->clone() )
, | | init_my_active(src.init_my_active), my_body( src.my_body->clone() )
, | |
| my_reserved(false), my_has_cached_item(false) | | my_reserved(false), my_has_cached_item(false) | |
| { | | { | |
| my_successors.set_owner(this); | | my_successors.set_owner(this); | |
| } | | } | |
| | | | |
| //! The destructor | | //! The destructor | |
| ~source_node() { delete my_body; } | | ~source_node() { delete my_body; } | |
| | | | |
| //! Add a new successor to this node | | //! Add a new successor to this node | |
| | | | |
| skipping to change at line 730 | | skipping to change at line 733 | |
| //! resets the node to its initial state | | //! resets the node to its initial state | |
| void reset() { | | void reset() { | |
| my_active = init_my_active; | | my_active = init_my_active; | |
| my_reserved =false; | | my_reserved =false; | |
| if(my_has_cached_item) { | | if(my_has_cached_item) { | |
| my_has_cached_item = false; | | my_has_cached_item = false; | |
| } | | } | |
| } | | } | |
| | | | |
| private: | | private: | |
|
| task *my_root_task; | | | |
| spin_mutex my_mutex; | | spin_mutex my_mutex; | |
| bool my_active; | | bool my_active; | |
| bool init_my_active; | | bool init_my_active; | |
| internal::source_body<output_type> *my_body; | | internal::source_body<output_type> *my_body; | |
| internal::broadcast_cache< output_type > my_successors; | | internal::broadcast_cache< output_type > my_successors; | |
| bool my_reserved; | | bool my_reserved; | |
| bool my_has_cached_item; | | bool my_has_cached_item; | |
| output_type my_cached_item; | | output_type my_cached_item; | |
| | | | |
| // used by apply_body, can invoke body of node. | | // used by apply_body, can invoke body of node. | |
| | | | |
| skipping to change at line 759 | | skipping to change at line 761 | |
| v = my_cached_item; | | v = my_cached_item; | |
| my_reserved = true; | | my_reserved = true; | |
| return true; | | return true; | |
| } else { | | } else { | |
| return false; | | return false; | |
| } | | } | |
| } | | } | |
| | | | |
| //! Spawns a task that applies the body | | //! Spawns a task that applies the body | |
| /* override */ void spawn_put( ) { | | /* override */ void spawn_put( ) { | |
|
| FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *my_root_t | | task* tp = this->my_graph.root_task(); | |
| ask ) ) | | if(tp) { | |
| internal:: source_task_bypass < source_node< output_typ | | FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp ) | |
| e > >( *this ) ) ); | | ) | |
| | | internal:: source_task_bypass < source_node< output | |
| | | _type > >( *this ) ) ); | |
| | | } | |
| } | | } | |
| | | | |
| friend class internal::source_task_bypass< source_node< output_type > >
; | | friend class internal::source_task_bypass< source_node< output_type > >
; | |
| //! Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_ta
sk_bypass will handle it. | | //! Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_ta
sk_bypass will handle it. | |
| /* override */ task * apply_body_bypass( ) { | | /* override */ task * apply_body_bypass( ) { | |
| output_type v; | | output_type v; | |
| if ( !try_reserve_apply_body(v) ) | | if ( !try_reserve_apply_body(v) ) | |
| return NULL; | | return NULL; | |
| | | | |
| task *last_task = my_successors.try_put_task(v); | | task *last_task = my_successors.try_put_task(v); | |
| | | | |
| skipping to change at line 838 | | skipping to change at line 843 | |
| typedef internal::function_output<output_type> fOutput_type; | | typedef internal::function_output<output_type> fOutput_type; | |
| | | | |
| //! Constructor | | //! Constructor | |
| template< typename Body > | | template< typename Body > | |
| function_node( graph &g, size_t concurrency, Body body ) : | | function_node( graph &g, size_t concurrency, Body body ) : | |
| graph_node(g), fInput_type( g, concurrency, body, new queue_type()
) | | graph_node(g), fInput_type( g, concurrency, body, new queue_type()
) | |
| {} | | {} | |
| | | | |
| //! Copy constructor | | //! Copy constructor | |
| function_node( const function_node& src ) : | | function_node( const function_node& src ) : | |
|
| graph_node(src.my_graph), fInput_type( src, new queue_type() ), fOu
tput_type() | | graph_node(src.graph_node::my_graph), fInput_type( src, new queue_t
ype() ), fOutput_type() | |
| {} | | {} | |
| | | | |
| protected: | | protected: | |
| template< typename R, typename B > friend class run_and_put_task; | | template< typename R, typename B > friend class run_and_put_task; | |
| template<typename X, typename Y> friend class internal::broadcast_cache
; | | template<typename X, typename Y> friend class internal::broadcast_cache
; | |
| template<typename X, typename Y> friend class internal::round_robin_cac
he; | | template<typename X, typename Y> friend class internal::round_robin_cac
he; | |
| using fInput_type::try_put_task; | | using fInput_type::try_put_task; | |
| | | | |
| /*override*/void reset() { fInput_type::reset_function_input(); } | | /*override*/void reset() { fInput_type::reset_function_input(); } | |
| | | | |
| | | | |
| skipping to change at line 885 | | skipping to change at line 890 | |
| typedef typename internal::wrap_tuple_elements<N,internal::multifunctio
n_output, Output>::type output_ports_type; | | typedef typename internal::wrap_tuple_elements<N,internal::multifunctio
n_output, Output>::type output_ports_type; | |
| private: | | private: | |
| typedef typename internal::multifunction_input<input_type, output_ports
_type, Allocator> base_type; | | typedef typename internal::multifunction_input<input_type, output_ports
_type, Allocator> base_type; | |
| typedef typename internal::function_input_queue<input_type,Allocator> q
ueue_type; | | typedef typename internal::function_input_queue<input_type,Allocator> q
ueue_type; | |
| public: | | public: | |
| template<typename Body> | | template<typename Body> | |
| multifunction_node( graph &g, size_t concurrency, Body body ) : | | multifunction_node( graph &g, size_t concurrency, Body body ) : | |
| graph_node(g), base_type(g,concurrency, body) | | graph_node(g), base_type(g,concurrency, body) | |
| {} | | {} | |
| multifunction_node( const multifunction_node &other) : | | multifunction_node( const multifunction_node &other) : | |
|
| graph_node(other.my_graph), base_type(other) | | graph_node(other.graph_node::my_graph), base_type(other) | |
| {} | | {} | |
| // all the guts are in multifunction_input... | | // all the guts are in multifunction_input... | |
| protected: | | protected: | |
| /*override*/void reset() { base_type::reset(); } | | /*override*/void reset() { base_type::reset(); } | |
| }; // multifunction_node | | }; // multifunction_node | |
| | | | |
| template < typename Input, typename Output, typename Allocator > | | template < typename Input, typename Output, typename Allocator > | |
| class multifunction_node<Input,Output,queueing,Allocator> : public graph_no
de, public internal::multifunction_input<Input, | | class multifunction_node<Input,Output,queueing,Allocator> : public graph_no
de, public internal::multifunction_input<Input, | |
| typename internal::wrap_tuple_elements<tbb::flow::tuple_size<Output>::v
alue, internal::multifunction_output, Output>::type, Allocator> { | | typename internal::wrap_tuple_elements<tbb::flow::tuple_size<Output>::v
alue, internal::multifunction_output, Output>::type, Allocator> { | |
| protected: | | protected: | |
| | | | |
| skipping to change at line 910 | | skipping to change at line 915 | |
| typedef typename internal::wrap_tuple_elements<N, internal::multifuncti
on_output, Output>::type output_ports_type; | | typedef typename internal::wrap_tuple_elements<N, internal::multifuncti
on_output, Output>::type output_ports_type; | |
| private: | | private: | |
| typedef typename internal::multifunction_input<input_type, output_ports
_type, Allocator> base_type; | | typedef typename internal::multifunction_input<input_type, output_ports
_type, Allocator> base_type; | |
| typedef typename internal::function_input_queue<input_type,Allocator> q
ueue_type; | | typedef typename internal::function_input_queue<input_type,Allocator> q
ueue_type; | |
| public: | | public: | |
| template<typename Body> | | template<typename Body> | |
| multifunction_node( graph &g, size_t concurrency, Body body) : | | multifunction_node( graph &g, size_t concurrency, Body body) : | |
| graph_node(g), base_type(g,concurrency, body, new queue_type()) | | graph_node(g), base_type(g,concurrency, body, new queue_type()) | |
| {} | | {} | |
| multifunction_node( const multifunction_node &other) : | | multifunction_node( const multifunction_node &other) : | |
|
| graph_node(other.my_graph), base_type(other, new queue_type()) | | graph_node(other.graph_node::my_graph), base_type(other, new queue_
type()) | |
| {} | | {} | |
| // all the guts are in multifunction_input... | | // all the guts are in multifunction_input... | |
| protected: | | protected: | |
| /*override*/void reset() { base_type::reset(); } | | /*override*/void reset() { base_type::reset(); } | |
| }; // multifunction_node | | }; // multifunction_node | |
| | | | |
| //! split_node: accepts a tuple as input, forwards each element of the tupl
e to its | | //! split_node: accepts a tuple as input, forwards each element of the tupl
e to its | |
| // successors. The node has unlimited concurrency, so though it is marked
as | | // successors. The node has unlimited concurrency, so though it is marked
as | |
| // "rejecting" it does not reject inputs. | | // "rejecting" it does not reject inputs. | |
| template<typename TupleType, typename Allocator=cache_aligned_allocator<Tup
leType> > | | template<typename TupleType, typename Allocator=cache_aligned_allocator<Tup
leType> > | |
| | | | |
| skipping to change at line 966 | | skipping to change at line 971 | |
| {} | | {} | |
| | | | |
| //! Constructor for executable node with continue_msg -> Output | | //! Constructor for executable node with continue_msg -> Output | |
| template <typename Body > | | template <typename Body > | |
| continue_node( graph &g, int number_of_predecessors, Body body ) : | | continue_node( graph &g, int number_of_predecessors, Body body ) : | |
| graph_node(g), internal::continue_input<output_type>( g, number_of_
predecessors, body ) | | graph_node(g), internal::continue_input<output_type>( g, number_of_
predecessors, body ) | |
| {} | | {} | |
| | | | |
| //! Copy constructor | | //! Copy constructor | |
| continue_node( const continue_node& src ) : | | continue_node( const continue_node& src ) : | |
|
| graph_node(src.my_graph), internal::continue_input<output_type>(src
), | | graph_node(src.graph_node::my_graph), internal::continue_input<outp
ut_type>(src), | |
| internal::function_output<Output>() | | internal::function_output<Output>() | |
| {} | | {} | |
| | | | |
| protected: | | protected: | |
| template< typename R, typename B > friend class run_and_put_task; | | template< typename R, typename B > friend class run_and_put_task; | |
| template<typename X, typename Y> friend class internal::broadcast_cache
; | | template<typename X, typename Y> friend class internal::broadcast_cache
; | |
| template<typename X, typename Y> friend class internal::round_robin_cac
he; | | template<typename X, typename Y> friend class internal::round_robin_cac
he; | |
| using fInput_type::try_put_task; | | using fInput_type::try_put_task; | |
| | | | |
| /*override*/void reset() { internal::continue_input<Output>::reset_rece
iver(); } | | /*override*/void reset() { internal::continue_input<Output>::reset_rece
iver(); } | |
| | | | |
| /* override */ internal::broadcast_cache<output_type> &successors () {
return fOutput_type::my_successors; } | | /* override */ internal::broadcast_cache<output_type> &successors () {
return fOutput_type::my_successors; } | |
|
| }; | | }; // continue_node | |
| | | | |
| template< typename T > | | template< typename T > | |
| class overwrite_node : public graph_node, public receiver<T>, public sender
<T> { | | class overwrite_node : public graph_node, public receiver<T>, public sender
<T> { | |
| protected: | | protected: | |
| using graph_node::my_graph; | | using graph_node::my_graph; | |
| public: | | public: | |
| typedef T input_type; | | typedef T input_type; | |
| typedef T output_type; | | typedef T output_type; | |
| typedef sender< input_type > predecessor_type; | | typedef sender< input_type > predecessor_type; | |
| typedef receiver< output_type > successor_type; | | typedef receiver< output_type > successor_type; | |
| | | | |
| skipping to change at line 1171 | | skipping to change at line 1176 | |
| public: | | public: | |
| typedef T input_type; | | typedef T input_type; | |
| typedef T output_type; | | typedef T output_type; | |
| typedef sender< input_type > predecessor_type; | | typedef sender< input_type > predecessor_type; | |
| typedef receiver< output_type > successor_type; | | typedef receiver< output_type > successor_type; | |
| typedef buffer_node<T, A> my_class; | | typedef buffer_node<T, A> my_class; | |
| protected: | | protected: | |
| typedef size_t size_type; | | typedef size_t size_type; | |
| internal::round_robin_cache< T, null_rw_mutex > my_successors; | | internal::round_robin_cache< T, null_rw_mutex > my_successors; | |
| | | | |
|
| task *my_parent; | | | |
| | | | |
| friend class internal::forward_task_bypass< buffer_node< T, A > >; | | friend class internal::forward_task_bypass< buffer_node< T, A > >; | |
| | | | |
| enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res,
put_item, try_fwd_task }; | | enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res,
put_item, try_fwd_task }; | |
| enum op_stat {WAIT=0, SUCCEEDED, FAILED}; | | enum op_stat {WAIT=0, SUCCEEDED, FAILED}; | |
| | | | |
| // implements the aggregator_operation concept | | // implements the aggregator_operation concept | |
| class buffer_operation : public internal::aggregated_operation< buffer_
operation > { | | class buffer_operation : public internal::aggregated_operation< buffer_
operation > { | |
| public: | | public: | |
| char type; | | char type; | |
| T *elem; | | T *elem; | |
| | | | |
| skipping to change at line 1213 | | skipping to change at line 1216 | |
| case req_item: internal_pop(tmp); break; | | case req_item: internal_pop(tmp); break; | |
| case res_item: internal_reserve(tmp); break; | | case res_item: internal_reserve(tmp); break; | |
| case rel_res: internal_release(tmp); try_forwarding = true; b
reak; | | case rel_res: internal_release(tmp); try_forwarding = true; b
reak; | |
| case con_res: internal_consume(tmp); try_forwarding = true; b
reak; | | case con_res: internal_consume(tmp); try_forwarding = true; b
reak; | |
| case put_item: internal_push(tmp); try_forwarding = true; brea
k; | | case put_item: internal_push(tmp); try_forwarding = true; brea
k; | |
| case try_fwd_task: internal_forward_task(tmp); break; | | case try_fwd_task: internal_forward_task(tmp); break; | |
| } | | } | |
| } | | } | |
| if (try_forwarding && !forwarder_busy) { | | if (try_forwarding && !forwarder_busy) { | |
| forwarder_busy = true; | | forwarder_busy = true; | |
|
| task *new_task = new(task::allocate_additional_child_of(*my_par | | task* tp = this->my_graph.root_task(); | |
| ent)) internal:: | | if(tp) { | |
| forward_task_bypass | | task *new_task = new(task::allocate_additional_child_of(*tp | |
| < buffer_node<input_type, A> >(*this); | | )) internal:: | |
| // tmp should point to the last item handled by the aggregator. | | forward_task_bypass | |
| This is the operation | | < buffer_node<input_type, A> >(*this); | |
| // the handling thread enqueued. So modifying that record will | | // tmp should point to the last item handled by the aggrega | |
| be okay. | | tor. This is the operation | |
| tbb::task *z = tmp->ltask; | | // the handling thread enqueued. So modifying that record | |
| tmp->ltask = combine_tasks(z, new_task); // in case the op gen | | will be okay. | |
| erated a task | | tbb::task *z = tmp->ltask; | |
| | | tmp->ltask = combine_tasks(z, new_task); // in case the op | |
| | | generated a task | |
| | | } | |
| } | | } | |
| } | | } | |
| | | | |
| inline task *grab_forwarding_task( buffer_operation &op_data) { | | inline task *grab_forwarding_task( buffer_operation &op_data) { | |
| return op_data.ltask; | | return op_data.ltask; | |
| } | | } | |
| | | | |
| inline bool enqueue_forwarding_task(buffer_operation &op_data) { | | inline bool enqueue_forwarding_task(buffer_operation &op_data) { | |
| task *ft = grab_forwarding_task(op_data); | | task *ft = grab_forwarding_task(op_data); | |
| if(ft) { | | if(ft) { | |
| | | | |
| skipping to change at line 1329 | | skipping to change at line 1335 | |
| } | | } | |
| | | | |
| virtual void internal_release(buffer_operation *op) { | | virtual void internal_release(buffer_operation *op) { | |
| this->release_front(); | | this->release_front(); | |
| __TBB_store_with_release(op->status, SUCCEEDED); | | __TBB_store_with_release(op->status, SUCCEEDED); | |
| } | | } | |
| | | | |
| public: | | public: | |
| //! Constructor | | //! Constructor | |
| buffer_node( graph &g ) : graph_node(g), reservable_item_buffer<T>(), | | buffer_node( graph &g ) : graph_node(g), reservable_item_buffer<T>(), | |
|
| my_parent( g.root_task() ), forwarder_busy(false) { | | forwarder_busy(false) { | |
| my_successors.set_owner(this); | | my_successors.set_owner(this); | |
| my_aggregator.initialize_handler(my_handler(this)); | | my_aggregator.initialize_handler(my_handler(this)); | |
| } | | } | |
| | | | |
| //! Copy constructor | | //! Copy constructor | |
| buffer_node( const buffer_node& src ) : graph_node(src.my_graph), | | buffer_node( const buffer_node& src ) : graph_node(src.my_graph), | |
|
| reservable_item_buffer<T>(), receiver<T>(), sender<T>(), | | reservable_item_buffer<T>(), receiver<T>(), sender<T>() { | |
| my_parent( src.my_parent ) { | | | |
| forwarder_busy = false; | | forwarder_busy = false; | |
| my_successors.set_owner(this); | | my_successors.set_owner(this); | |
| my_aggregator.initialize_handler(my_handler(this)); | | my_aggregator.initialize_handler(my_handler(this)); | |
| } | | } | |
| | | | |
| virtual ~buffer_node() {} | | virtual ~buffer_node() {} | |
| | | | |
| // | | // | |
| // message sender implementation | | // message sender implementation | |
| // | | // | |
| | | | |
| skipping to change at line 1607 | | skipping to change at line 1612 | |
| case buffer_node<T, A>::rel_res: internal_release(tmp); try_for
warding = true; break; | | case buffer_node<T, A>::rel_res: internal_release(tmp); try_for
warding = true; break; | |
| case buffer_node<T, A>::con_res: internal_consume(tmp); try_for
warding = true; break; | | case buffer_node<T, A>::con_res: internal_consume(tmp); try_for
warding = true; break; | |
| case buffer_node<T, A>::req_item: internal_pop(tmp); break; | | case buffer_node<T, A>::req_item: internal_pop(tmp); break; | |
| case buffer_node<T, A>::res_item: internal_reserve(tmp); break; | | case buffer_node<T, A>::res_item: internal_reserve(tmp); break; | |
| } | | } | |
| } | | } | |
| // process pops! for now, no special pop processing | | // process pops! for now, no special pop processing | |
| if (mark<this->my_tail) heapify(); | | if (mark<this->my_tail) heapify(); | |
| if (try_forwarding && !this->forwarder_busy) { | | if (try_forwarding && !this->forwarder_busy) { | |
| this->forwarder_busy = true; | | this->forwarder_busy = true; | |
|
| task *new_task = new(task::allocate_additional_child_of(*(this- | | task* tp = this->my_graph.root_task(); | |
| >my_parent))) internal:: | | if(tp) { | |
| forward_task_bypass | | task *new_task = new(task::allocate_additional_child_of(*tp | |
| < buffer_node<input_type, A> >(*this); | | )) internal:: | |
| // tmp should point to the last item handled by the aggregator. | | forward_task_bypass | |
| This is the operation | | < buffer_node<input_type, A> >(*this); | |
| // the handling thread enqueued. So modifying that record will | | // tmp should point to the last item handled by the aggrega | |
| be okay. | | tor. This is the operation | |
| tbb::task *tmp1 = tmp->ltask; | | // the handling thread enqueued. So modifying that record | |
| tmp->ltask = combine_tasks(tmp1, new_task); | | will be okay. | |
| | | tbb::task *tmp1 = tmp->ltask; | |
| | | tmp->ltask = combine_tasks(tmp1, new_task); | |
| | | } | |
| } | | } | |
| } | | } | |
| | | | |
| //! Tries to forward valid items to successors | | //! Tries to forward valid items to successors | |
| /* override */ void internal_forward_task(prio_operation *op) { | | /* override */ void internal_forward_task(prio_operation *op) { | |
| T i_copy; | | T i_copy; | |
| task * last_task = NULL; // flagged when a successor accepts | | task * last_task = NULL; // flagged when a successor accepts | |
| size_type counter = this->my_successors.size(); | | size_type counter = this->my_successors.size(); | |
| | | | |
| if (this->my_reserved || this->my_tail == 0) { | | if (this->my_reserved || this->my_tail == 0) { | |
| | | | |
| skipping to change at line 1768 | | skipping to change at line 1776 | |
| class limiter_node : public graph_node, public receiver< T >, public sender
< T > { | | class limiter_node : public graph_node, public receiver< T >, public sender
< T > { | |
| protected: | | protected: | |
| using graph_node::my_graph; | | using graph_node::my_graph; | |
| public: | | public: | |
| typedef T input_type; | | typedef T input_type; | |
| typedef T output_type; | | typedef T output_type; | |
| typedef sender< input_type > predecessor_type; | | typedef sender< input_type > predecessor_type; | |
| typedef receiver< output_type > successor_type; | | typedef receiver< output_type > successor_type; | |
| | | | |
| private: | | private: | |
|
| task *my_root_task; | | | |
| size_t my_threshold; | | size_t my_threshold; | |
| size_t my_count; | | size_t my_count; | |
| internal::predecessor_cache< T > my_predecessors; | | internal::predecessor_cache< T > my_predecessors; | |
| spin_mutex my_mutex; | | spin_mutex my_mutex; | |
| internal::broadcast_cache< T > my_successors; | | internal::broadcast_cache< T > my_successors; | |
| int init_decrement_predecessors; | | int init_decrement_predecessors; | |
| | | | |
| friend class internal::forward_task_bypass< limiter_node<T> >; | | friend class internal::forward_task_bypass< limiter_node<T> >; | |
| | | | |
| // Let decrementer call decrement_counter() | | // Let decrementer call decrement_counter() | |
| | | | |
| skipping to change at line 1790 | | skipping to change at line 1797 | |
| | | | |
| // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEU
ED | | // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEU
ED | |
| task * decrement_counter() { | | task * decrement_counter() { | |
| input_type v; | | input_type v; | |
| task *rval = NULL; | | task *rval = NULL; | |
| | | | |
| // If we can't get / put an item immediately then drop the count | | // If we can't get / put an item immediately then drop the count | |
| if ( my_predecessors.get_item( v ) == false | | if ( my_predecessors.get_item( v ) == false | |
| || (rval = my_successors.try_put_task(v)) == NULL ) { | | || (rval = my_successors.try_put_task(v)) == NULL ) { | |
| spin_mutex::scoped_lock lock(my_mutex); | | spin_mutex::scoped_lock lock(my_mutex); | |
|
| --my_count; | | if(my_count) --my_count; | |
| if ( !my_predecessors.empty() ) { | | task* tp = this->my_graph.root_task(); | |
| task *rtask = new ( task::allocate_additional_child_of( *my | | if ( !my_predecessors.empty() && tp ) { | |
| _root_task ) ) | | task *rtask = new ( task::allocate_additional_child_of( *tp | |
| | | ) ) | |
| internal::forward_task_bypass< limiter_node<T> >( *this
); | | internal::forward_task_bypass< limiter_node<T> >( *this
); | |
| __TBB_ASSERT(!rval, "Have two tasks to handle"); | | __TBB_ASSERT(!rval, "Have two tasks to handle"); | |
| return rtask; | | return rtask; | |
| } | | } | |
| } | | } | |
| return rval; | | return rval; | |
| } | | } | |
| | | | |
| void forward() { | | void forward() { | |
| { | | { | |
| | | | |
| skipping to change at line 1814 | | skipping to change at line 1822 | |
| if ( my_count < my_threshold ) | | if ( my_count < my_threshold ) | |
| ++my_count; | | ++my_count; | |
| else | | else | |
| return; | | return; | |
| } | | } | |
| task * rtask = decrement_counter(); | | task * rtask = decrement_counter(); | |
| if(rtask) FLOW_SPAWN(*rtask); | | if(rtask) FLOW_SPAWN(*rtask); | |
| } | | } | |
| | | | |
| task *forward_task() { | | task *forward_task() { | |
|
| spin_mutex::scoped_lock lock(my_mutex); | | { | |
| if ( my_count >= my_threshold ) | | spin_mutex::scoped_lock lock(my_mutex); | |
| return NULL; | | if ( my_count >= my_threshold ) | |
| ++my_count; | | return NULL; | |
| | | ++my_count; | |
| | | } | |
| task * rtask = decrement_counter(); | | task * rtask = decrement_counter(); | |
| return rtask; | | return rtask; | |
| } | | } | |
| | | | |
| public: | | public: | |
| //! The internal receiver< continue_msg > that decrements the count | | //! The internal receiver< continue_msg > that decrements the count | |
| internal::decrementer< limiter_node<T> > decrement; | | internal::decrementer< limiter_node<T> > decrement; | |
| | | | |
| //! Constructor | | //! Constructor | |
| limiter_node(graph &g, size_t threshold, int num_decrement_predecessors
=0) : | | limiter_node(graph &g, size_t threshold, int num_decrement_predecessors
=0) : | |
|
| graph_node(g), my_root_task(g.root_task()), my_threshold(threshold)
, my_count(0), | | graph_node(g), my_threshold(threshold), my_count(0), | |
| init_decrement_predecessors(num_decrement_predecessors), | | init_decrement_predecessors(num_decrement_predecessors), | |
| decrement(num_decrement_predecessors) | | decrement(num_decrement_predecessors) | |
| { | | { | |
| my_predecessors.set_owner(this); | | my_predecessors.set_owner(this); | |
| my_successors.set_owner(this); | | my_successors.set_owner(this); | |
| decrement.set_owner(this); | | decrement.set_owner(this); | |
| } | | } | |
| | | | |
| //! Copy constructor | | //! Copy constructor | |
| limiter_node( const limiter_node& src ) : | | limiter_node( const limiter_node& src ) : | |
| graph_node(src.my_graph), receiver<T>(), sender<T>(), | | graph_node(src.my_graph), receiver<T>(), sender<T>(), | |
|
| my_root_task(src.my_root_task), my_threshold(src.my_threshold), my_
count(0), | | my_threshold(src.my_threshold), my_count(0), | |
| init_decrement_predecessors(src.init_decrement_predecessors), | | init_decrement_predecessors(src.init_decrement_predecessors), | |
| decrement(src.init_decrement_predecessors) | | decrement(src.init_decrement_predecessors) | |
| { | | { | |
| my_predecessors.set_owner(this); | | my_predecessors.set_owner(this); | |
| my_successors.set_owner(this); | | my_successors.set_owner(this); | |
| decrement.set_owner(this); | | decrement.set_owner(this); | |
| } | | } | |
| | | | |
| //! Replace the current successor with this new successor | | //! Replace the current successor with this new successor | |
| /* override */ bool register_successor( receiver<output_type> &r ) { | | /* override */ bool register_successor( receiver<output_type> &r ) { | |
| | | | |
| skipping to change at line 1867 | | skipping to change at line 1877 | |
| /* override */ bool remove_successor( receiver<output_type> &r ) { | | /* override */ bool remove_successor( receiver<output_type> &r ) { | |
| r.remove_predecessor(*this); | | r.remove_predecessor(*this); | |
| my_successors.remove_successor(r); | | my_successors.remove_successor(r); | |
| return true; | | return true; | |
| } | | } | |
| | | | |
| //! Removes src from the list of cached predecessors. | | //! Removes src from the list of cached predecessors. | |
| /* override */ bool register_predecessor( predecessor_type &src ) { | | /* override */ bool register_predecessor( predecessor_type &src ) { | |
| spin_mutex::scoped_lock lock(my_mutex); | | spin_mutex::scoped_lock lock(my_mutex); | |
| my_predecessors.add( src ); | | my_predecessors.add( src ); | |
|
| if ( my_count < my_threshold && !my_successors.empty() ) { | | task* tp = this->my_graph.root_task(); | |
| FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *my_ro | | if ( my_count < my_threshold && !my_successors.empty() && tp ) { | |
| ot_task ) ) | | FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp ) | |
| | | ) | |
| internal::forward_task_bypass < limiter_node<T> >(
*this ) ) ); | | internal::forward_task_bypass < limiter_node<T> >(
*this ) ) ); | |
| } | | } | |
| return true; | | return true; | |
| } | | } | |
| | | | |
| //! Removes src from the list of cached predecessors. | | //! Removes src from the list of cached predecessors. | |
| /* override */ bool remove_predecessor( predecessor_type &src ) { | | /* override */ bool remove_predecessor( predecessor_type &src ) { | |
| my_predecessors.remove( src ); | | my_predecessors.remove( src ); | |
| return true; | | return true; | |
| } | | } | |
| | | | |
| skipping to change at line 1900 | | skipping to change at line 1911 | |
| return NULL; | | return NULL; | |
| else | | else | |
| ++my_count; | | ++my_count; | |
| } | | } | |
| | | | |
| task * rtask = my_successors.try_put_task(t); | | task * rtask = my_successors.try_put_task(t); | |
| | | | |
| if ( !rtask ) { // try_put_task failed. | | if ( !rtask ) { // try_put_task failed. | |
| spin_mutex::scoped_lock lock(my_mutex); | | spin_mutex::scoped_lock lock(my_mutex); | |
| --my_count; | | --my_count; | |
|
| if ( !my_predecessors.empty() ) { | | task* tp = this->my_graph.root_task(); | |
| rtask = new ( task::allocate_additional_child_of( *my_root_ | | if ( !my_predecessors.empty() && tp ) { | |
| task ) ) | | rtask = new ( task::allocate_additional_child_of( *tp ) ) | |
| internal::forward_task_bypass< limiter_node<T> >( *this
); | | internal::forward_task_bypass< limiter_node<T> >( *this
); | |
| } | | } | |
| } | | } | |
| return rtask; | | return rtask; | |
| } | | } | |
| | | | |
| /*override*/void reset() { | | /*override*/void reset() { | |
| my_count = 0; | | my_count = 0; | |
| my_predecessors.reset(); | | my_predecessors.reset(); | |
| decrement.reset_receiver(); | | decrement.reset_receiver(); | |
| | | | |
| skipping to change at line 2039 | | skipping to change at line 2051 | |
| inline void remove_edge( sender<T> &p, receiver<T> &s ) { | | inline void remove_edge( sender<T> &p, receiver<T> &s ) { | |
| p.remove_successor( s ); | | p.remove_successor( s ); | |
| } | | } | |
| | | | |
| //! Returns a copy of the body from a function or continue node | | //! Returns a copy of the body from a function or continue node | |
| template< typename Body, typename Node > | | template< typename Body, typename Node > | |
| Body copy_body( Node &n ) { | | Body copy_body( Node &n ) { | |
| return n.template copy_function_object<Body>(); | | return n.template copy_function_object<Body>(); | |
| } | | } | |
| | | | |
|
| } // interface6 | | } // interface7 | |
| | | | |
|
| using interface6::graph; | | using interface7::graph; | |
| using interface6::graph_node; | | using interface7::graph_node; | |
| using interface6::continue_msg; | | using interface7::continue_msg; | |
| using interface6::sender; | | using interface7::sender; | |
| using interface6::receiver; | | using interface7::receiver; | |
| using interface6::continue_receiver; | | using interface7::continue_receiver; | |
| | | | |
| using interface6::source_node; | | using interface7::source_node; | |
| using interface6::function_node; | | using interface7::function_node; | |
| using interface6::multifunction_node; | | using interface7::multifunction_node; | |
| using interface6::split_node; | | using interface7::split_node; | |
| using interface6::internal::output_port; | | using interface7::internal::output_port; | |
| #if TBB_PREVIEW_GRAPH_NODES | | #if TBB_PREVIEW_GRAPH_NODES | |
|
| using interface6::or_node; | | using interface7::or_node; | |
| #endif | | #endif | |
|
| using interface6::continue_node; | | using interface7::continue_node; | |
| using interface6::overwrite_node; | | using interface7::overwrite_node; | |
| using interface6::write_once_node; | | using interface7::write_once_node; | |
| using interface6::broadcast_node; | | using interface7::broadcast_node; | |
| using interface6::buffer_node; | | using interface7::buffer_node; | |
| using interface6::queue_node; | | using interface7::queue_node; | |
| using interface6::sequencer_node; | | using interface7::sequencer_node; | |
| using interface6::priority_queue_node; | | using interface7::priority_queue_node; | |
| using interface6::limiter_node; | | using interface7::limiter_node; | |
| using namespace interface6::internal::graph_policy_namespace; | | using namespace interface7::internal::graph_policy_namespace; | |
| using interface6::join_node; | | using interface7::join_node; | |
| using interface6::input_port; | | using interface7::input_port; | |
| using interface6::copy_body; | | using interface7::copy_body; | |
| using interface6::make_edge; | | using interface7::make_edge; | |
| using interface6::remove_edge; | | using interface7::remove_edge; | |
| using interface6::internal::NO_TAG; | | using interface7::internal::NO_TAG; | |
| using interface6::internal::tag_value; | | using interface7::internal::tag_value; | |
| | | | |
| } // flow | | } // flow | |
| } // tbb | | } // tbb | |
| | | | |
| #endif // __TBB_flow_graph_H | | #endif // __TBB_flow_graph_H | |
| | | | |
End of changes. 29 change blocks. |
| 88 lines changed or deleted | | 99 lines changed or added | |
|
| task_arena.h | | task_arena.h | |
| | | | |
| skipping to change at line 34 | | skipping to change at line 34 | |
| the GNU General Public License. This exception does not however | | the GNU General Public License. This exception does not however | |
| invalidate any other reasons why the executable file might be covered b
y | | invalidate any other reasons why the executable file might be covered b
y | |
| the GNU General Public License. | | the GNU General Public License. | |
| */ | | */ | |
| | | | |
| #ifndef __TBB_task_arena_H | | #ifndef __TBB_task_arena_H | |
| #define __TBB_task_arena_H | | #define __TBB_task_arena_H | |
| | | | |
| #include "task.h" | | #include "task.h" | |
| #include "tbb_exception.h" | | #include "tbb_exception.h" | |
|
| | | #if TBB_USE_THREADING_TOOLS | |
| | | #include "atomic.h" // for as_atomic | |
| | | #endif | |
| | | | |
| #if __TBB_TASK_ARENA | | #if __TBB_TASK_ARENA | |
| | | | |
| namespace tbb { | | namespace tbb { | |
| | | | |
| //! @cond INTERNAL | | //! @cond INTERNAL | |
| namespace internal { | | namespace internal { | |
| //! Internal to library. Should not be used by clients. | | //! Internal to library. Should not be used by clients. | |
| /** @ingroup task_scheduling */ | | /** @ingroup task_scheduling */ | |
| class arena; | | class arena; | |
| class task_scheduler_observer_v3; | | class task_scheduler_observer_v3; | |
| } // namespace internal | | } // namespace internal | |
| //! @endcond | | //! @endcond | |
| | | | |
|
| namespace interface6 { | | namespace interface7 { | |
| //! @cond INTERNAL | | //! @cond INTERNAL | |
| namespace internal { | | namespace internal { | |
|
| using namespace tbb::internal; | | using namespace tbb::internal; //e.g. function_task from task.h | |
| | | | |
| template<typename F> | | | |
| class enqueued_function_task : public task { // TODO: reuse from task_group | | | |
| ? | | | |
| F my_func; | | | |
| /*override*/ task* execute() { | | | |
| my_func(); | | | |
| return NULL; | | | |
| } | | | |
| public: | | | |
| enqueued_function_task ( const F& f ) : my_func(f) {} | | | |
| }; | | | |
| | | | |
| class delegate_base : no_assign { | | class delegate_base : no_assign { | |
| public: | | public: | |
| virtual void operator()() const = 0; | | virtual void operator()() const = 0; | |
| virtual ~delegate_base() {} | | virtual ~delegate_base() {} | |
| }; | | }; | |
| | | | |
| template<typename F> | | template<typename F> | |
| class delegated_function : public delegate_base { | | class delegated_function : public delegate_base { | |
| F &my_func; | | F &my_func; | |
| /*override*/ void operator()() const { | | /*override*/ void operator()() const { | |
| my_func(); | | my_func(); | |
| } | | } | |
| public: | | public: | |
| delegated_function ( F& f ) : my_func(f) {} | | delegated_function ( F& f ) : my_func(f) {} | |
| }; | | }; | |
|
| } // namespace internal | | | |
| //! @endcond | | | |
| | | | |
|
| /** 1-to-1 proxy representation class of scheduler's arena | | class task_arena_base { | |
| * Constructors set up settings only, real construction is deferred till th | | protected: | |
| e first method invocation | | //! NULL if not currently initialized. | |
| * TODO: A side effect of this is that it's impossible to create a const ta | | internal::arena* my_arena; | |
| sk_arena object. Rethink? | | | |
| * Destructor only removes one of the references to the inner arena represe | | #if __TBB_TASK_GROUP_CONTEXT | |
| ntation. | | //! default context of the arena | |
| * Final destruction happens when all the references (and the work) are gon | | task_group_context *my_context; | |
| e. | | #endif | |
| */ | | | |
| class task_arena { | | | |
| friend class internal::task_scheduler_observer_v3; | | | |
| //! Concurrency level for deferred initialization | | //! Concurrency level for deferred initialization | |
| int my_max_concurrency; | | int my_max_concurrency; | |
| | | | |
| //! Reserved master slots | | //! Reserved master slots | |
| unsigned my_master_slots; | | unsigned my_master_slots; | |
| | | | |
|
| //! NULL if not currently initialized. | | //! Reserved for future use | |
| internal::arena* my_arena; | | intptr_t my_reserved; | |
| | | | |
|
| // Initialization flag enabling compiler to throw excessive lazy initia | | task_arena_base(int max_concurrency, unsigned reserved_for_masters) | |
| lization checks | | : my_arena(0) | |
| bool my_initialized; | | #if __TBB_TASK_GROUP_CONTEXT | |
| | | , my_context(0) | |
| | | #endif | |
| | | , my_max_concurrency(max_concurrency) | |
| | | , my_master_slots(reserved_for_masters) | |
| | | , my_reserved(0) | |
| | | {} | |
| | | | |
|
| // const methods help to optimize the !my_arena check TODO: check, IDEA
: move to base-class? | | | |
| void __TBB_EXPORTED_METHOD internal_initialize( ); | | void __TBB_EXPORTED_METHOD internal_initialize( ); | |
| void __TBB_EXPORTED_METHOD internal_terminate( ); | | void __TBB_EXPORTED_METHOD internal_terminate( ); | |
| void __TBB_EXPORTED_METHOD internal_enqueue( task&, intptr_t ) const; | | void __TBB_EXPORTED_METHOD internal_enqueue( task&, intptr_t ) const; | |
|
| void __TBB_EXPORTED_METHOD internal_execute( internal::delegate_base& )
const; | | void __TBB_EXPORTED_METHOD internal_execute( delegate_base& ) const; | |
| void __TBB_EXPORTED_METHOD internal_wait() const; | | void __TBB_EXPORTED_METHOD internal_wait() const; | |
|
| | | static int __TBB_EXPORTED_FUNC internal_current_slot(); | |
| public: | | public: | |
| //! Typedef for number of threads that is automatic. | | //! Typedef for number of threads that is automatic. | |
| static const int automatic = -1; // any value < 1 means 'automatic' | | static const int automatic = -1; // any value < 1 means 'automatic' | |
| | | | |
|
| | | }; | |
| | | | |
| | | } // namespace internal | |
| | | //! @endcond | |
| | | | |
| | | /** 1-to-1 proxy representation class of scheduler's arena | |
| | | * Constructors set up settings only, real construction is deferred till th | |
| | | e first method invocation | |
| | | * Destructor only removes one of the references to the inner arena represe | |
| | | ntation. | |
| | | * Final destruction happens when all the references (and the work) are gon | |
| | | e. | |
| | | */ | |
| | | class task_arena : public internal::task_arena_base { | |
| | | friend class tbb::internal::task_scheduler_observer_v3; | |
| | | bool my_initialized; | |
| | | | |
| | | public: | |
| //! Creates task_arena with certain concurrency limits | | //! Creates task_arena with certain concurrency limits | |
|
| /** @arg max_concurrency specifies total number of slots in arena where | | /** Sets up settings only, real construction is deferred till the first | |
| threads work | | method invocation | |
| | | * @arg max_concurrency specifies total number of slots in arena where | |
| | | threads work | |
| * @arg reserved_for_masters specifies number of slots to be used by m
aster threads only. | | * @arg reserved_for_masters specifies number of slots to be used by m
aster threads only. | |
| * Value of 1 is default and reflects behavior of implicit arenas
. | | * Value of 1 is default and reflects behavior of implicit arenas
. | |
| **/ | | **/ | |
| task_arena(int max_concurrency = automatic, unsigned reserved_for_maste
rs = 1) | | task_arena(int max_concurrency = automatic, unsigned reserved_for_maste
rs = 1) | |
|
| : my_max_concurrency(max_concurrency) | | : task_arena_base(max_concurrency, reserved_for_masters) | |
| , my_master_slots(reserved_for_masters) | | | |
| , my_arena(0) | | | |
| , my_initialized(false) | | , my_initialized(false) | |
| {} | | {} | |
| | | | |
| //! Copies settings from another task_arena | | //! Copies settings from another task_arena | |
|
| task_arena(const task_arena &s) | | task_arena(const task_arena &s) // copy settings but not the reference | |
| : my_max_concurrency(s.my_max_concurrency) // copy settings | | or instance | |
| , my_master_slots(s.my_master_slots) | | : task_arena_base(s.my_max_concurrency, s.my_master_slots) | |
| , my_arena(0) // but not the reference or instance | | | |
| , my_initialized(false) | | , my_initialized(false) | |
| {} | | {} | |
| | | | |
|
| | | //! Forces allocation of the resources for the task_arena as specified
in constructor arguments | |
| inline void initialize() { | | inline void initialize() { | |
| if( !my_initialized ) { | | if( !my_initialized ) { | |
| internal_initialize(); | | internal_initialize(); | |
|
| | | #if TBB_USE_THREADING_TOOLS | |
| | | // Threading tools respect lock prefix but report false-positiv | |
| | | e data-race via plain store | |
| | | internal::as_atomic(my_initialized).fetch_and_store<release>(tr | |
| | | ue); | |
| | | #else | |
| my_initialized = true; | | my_initialized = true; | |
|
| | | #endif //TBB_USE_THREADING_TOOLS | |
| } | | } | |
| } | | } | |
| | | | |
| //! Overrides concurrency level and forces initialization of internal r
epresentation | | //! Overrides concurrency level and forces initialization of internal r
epresentation | |
| inline void initialize(int max_concurrency, unsigned reserved_for_maste
rs = 1) { | | inline void initialize(int max_concurrency, unsigned reserved_for_maste
rs = 1) { | |
|
| __TBB_ASSERT( !my_arena, "task_arena was initialized already"); | | __TBB_ASSERT( !my_arena, "Impossible to modify settings of an alrea
dy initialized task_arena"); | |
| if( !my_initialized ) { | | if( !my_initialized ) { | |
| my_max_concurrency = max_concurrency; | | my_max_concurrency = max_concurrency; | |
| my_master_slots = reserved_for_masters; | | my_master_slots = reserved_for_masters; | |
| initialize(); | | initialize(); | |
|
| } // TODO: else throw? | | } | |
| } | | } | |
| | | | |
| //! Removes the reference to the internal arena representation. | | //! Removes the reference to the internal arena representation. | |
| //! Not thread safe wrt concurrent invocations of other methods. | | //! Not thread safe wrt concurrent invocations of other methods. | |
| inline void terminate() { | | inline void terminate() { | |
| if( my_initialized ) { | | if( my_initialized ) { | |
| internal_terminate(); | | internal_terminate(); | |
| my_initialized = false; | | my_initialized = false; | |
| } | | } | |
| } | | } | |
| | | | |
| skipping to change at line 174 | | skipping to change at line 190 | |
| | | | |
| //! Returns true if the arena is active (initialized); false otherwise. | | //! Returns true if the arena is active (initialized); false otherwise. | |
| //! The name was chosen to match a task_scheduler_init method with the
same semantics. | | //! The name was chosen to match a task_scheduler_init method with the
same semantics. | |
| bool is_active() const { return my_initialized; } | | bool is_active() const { return my_initialized; } | |
| | | | |
| //! Enqueues a task into the arena to process a functor, and immediatel
y returns. | | //! Enqueues a task into the arena to process a functor, and immediatel
y returns. | |
| //! Does not require the calling thread to join the arena | | //! Does not require the calling thread to join the arena | |
| template<typename F> | | template<typename F> | |
| void enqueue( const F& f ) { | | void enqueue( const F& f ) { | |
| initialize(); | | initialize(); | |
|
| internal_enqueue( *new( task::allocate_root() ) internal::enqueued_ | | #if __TBB_TASK_GROUP_CONTEXT | |
| function_task<F>(f), 0 ); | | internal_enqueue( *new( task::allocate_root(*my_context) ) internal | |
| | | ::function_task<F>(f), 0 ); | |
| | | #else | |
| | | internal_enqueue( *new( task::allocate_root() ) internal::function_ | |
| | | task<F>(f), 0 ); | |
| | | #endif | |
| } | | } | |
| | | | |
| #if __TBB_TASK_PRIORITY | | #if __TBB_TASK_PRIORITY | |
| //! Enqueues a task with priority p into the arena to process a functor
f, and immediately returns. | | //! Enqueues a task with priority p into the arena to process a functor
f, and immediately returns. | |
| //! Does not require the calling thread to join the arena | | //! Does not require the calling thread to join the arena | |
| template<typename F> | | template<typename F> | |
| void enqueue( const F& f, priority_t p ) { | | void enqueue( const F& f, priority_t p ) { | |
| __TBB_ASSERT( p == priority_low || p == priority_normal || p == pri
ority_high, "Invalid priority level value" ); | | __TBB_ASSERT( p == priority_low || p == priority_normal || p == pri
ority_high, "Invalid priority level value" ); | |
| initialize(); | | initialize(); | |
|
| internal_enqueue( *new( task::allocate_root() ) internal::enqueued_ | | #if __TBB_TASK_GROUP_CONTEXT | |
| function_task<F>(f), (intptr_t)p ); | | internal_enqueue( *new( task::allocate_root(*my_context) ) internal | |
| | | ::function_task<F>(f), (intptr_t)p ); | |
| | | #else | |
| | | internal_enqueue( *new( task::allocate_root() ) internal::function_ | |
| | | task<F>(f), (intptr_t)p ); | |
| | | #endif | |
| } | | } | |
| #endif// __TBB_TASK_PRIORITY | | #endif// __TBB_TASK_PRIORITY | |
| | | | |
| //! Joins the arena and executes a functor, then returns | | //! Joins the arena and executes a functor, then returns | |
| //! If not possible to join, wraps the functor into a task, enqueues it
and waits for task completion | | //! If not possible to join, wraps the functor into a task, enqueues it
and waits for task completion | |
| //! Can decrement the arena demand for workers, causing a worker to lea
ve and free a slot to the calling thread | | //! Can decrement the arena demand for workers, causing a worker to lea
ve and free a slot to the calling thread | |
| template<typename F> | | template<typename F> | |
| void execute(F& f) { | | void execute(F& f) { | |
| initialize(); | | initialize(); | |
| internal::delegated_function<F> d(f); | | internal::delegated_function<F> d(f); | |
| | | | |
| skipping to change at line 208 | | skipping to change at line 232 | |
| //! Joins the arena and executes a functor, then returns | | //! Joins the arena and executes a functor, then returns | |
| //! If not possible to join, wraps the functor into a task, enqueues it
and waits for task completion | | //! If not possible to join, wraps the functor into a task, enqueues it
and waits for task completion | |
| //! Can decrement the arena demand for workers, causing a worker to lea
ve and free a slot to the calling thread | | //! Can decrement the arena demand for workers, causing a worker to lea
ve and free a slot to the calling thread | |
| template<typename F> | | template<typename F> | |
| void execute(const F& f) { | | void execute(const F& f) { | |
| initialize(); | | initialize(); | |
| internal::delegated_function<const F> d(f); | | internal::delegated_function<const F> d(f); | |
| internal_execute( d ); | | internal_execute( d ); | |
| } | | } | |
| | | | |
|
| | | #if __TBB_EXTRA_DEBUG | |
| //! Wait for all work in the arena to be completed | | //! Wait for all work in the arena to be completed | |
| //! Even submitted by other application threads | | //! Even submitted by other application threads | |
| //! Joins arena if/when possible (in the same way as execute()) | | //! Joins arena if/when possible (in the same way as execute()) | |
|
| void wait_until_empty() { | | void debug_wait_until_empty() { | |
| initialize(); | | initialize(); | |
| internal_wait(); | | internal_wait(); | |
| } | | } | |
|
| | | #endif //__TBB_EXTRA_DEBUG | |
| | | | |
| //! Returns the index, aka slot number, of the calling thread in its cu
rrent arena | | //! Returns the index, aka slot number, of the calling thread in its cu
rrent arena | |
|
| static int __TBB_EXPORTED_FUNC current_slot(); | | inline static int current_slot() { | |
| | | return internal_current_slot(); | |
| | | } | |
| }; | | }; | |
| | | | |
| } // namespace interfaceX | | } // namespace interfaceX | |
| | | | |
|
| using interface6::task_arena; | | using interface7::task_arena; | |
| | | | |
| } // namespace tbb | | } // namespace tbb | |
| | | | |
| #endif /* __TBB_TASK_ARENA */ | | #endif /* __TBB_TASK_ARENA */ | |
| | | | |
| #endif /* __TBB_task_arena_H */ | | #endif /* __TBB_task_arena_H */ | |
| | | | |
End of changes. 26 change blocks. |
| 54 lines changed or deleted | | 85 lines changed or added | |
|
| tbb_config.h | | tbb_config.h | |
| | | | |
| skipping to change at line 107 | | skipping to change at line 107 | |
| #elif __TBB_GCC_VERSION >= 40404 && __TBB_GCC_VERSION < 40600 | | #elif __TBB_GCC_VERSION >= 40404 && __TBB_GCC_VERSION < 40600 | |
| #define __TBB_EXCEPTION_PTR_PRESENT (__GXX_EXPERIMENTAL_C
XX0X__ && __INTEL_COMPILER >= 1200) | | #define __TBB_EXCEPTION_PTR_PRESENT (__GXX_EXPERIMENTAL_C
XX0X__ && __INTEL_COMPILER >= 1200) | |
| #elif __TBB_GCC_VERSION >= 40600 | | #elif __TBB_GCC_VERSION >= 40600 | |
| #define __TBB_EXCEPTION_PTR_PRESENT (__GXX_EXPERIMENTAL_C
XX0X__ && __INTEL_COMPILER >= 1300) | | #define __TBB_EXCEPTION_PTR_PRESENT (__GXX_EXPERIMENTAL_C
XX0X__ && __INTEL_COMPILER >= 1300) | |
| #else | | #else | |
| #define __TBB_EXCEPTION_PTR_PRESENT 0 | | #define __TBB_EXCEPTION_PTR_PRESENT 0 | |
| #endif | | #endif | |
| #define __TBB_MAKE_EXCEPTION_PTR_PRESENT (_MSC_VER >= 1700 ||
(__GXX_EXPERIMENTAL_CXX0X__ && __TBB_GCC_VERSION >= 40600)) | | #define __TBB_MAKE_EXCEPTION_PTR_PRESENT (_MSC_VER >= 1700 ||
(__GXX_EXPERIMENTAL_CXX0X__ && __TBB_GCC_VERSION >= 40600)) | |
| #define __TBB_STATIC_ASSERT_PRESENT (__INTEL_CXX11_MODE__
|| _MSC_VER >= 1600) | | #define __TBB_STATIC_ASSERT_PRESENT (__INTEL_CXX11_MODE__
|| _MSC_VER >= 1600) | |
| #define __TBB_CPP11_TUPLE_PRESENT (_MSC_VER >= 1600 ||
(__GXX_EXPERIMENTAL_CXX0X__ && __TBB_GCC_VERSION >= 40300)) | | #define __TBB_CPP11_TUPLE_PRESENT (_MSC_VER >= 1600 ||
(__GXX_EXPERIMENTAL_CXX0X__ && __TBB_GCC_VERSION >= 40300)) | |
|
| /** TODO: re-check for compiler version greater than 12.1 if it support | | /**Intel C++ compiler 14.0 crashes on using __has_include. When it fixe | |
| s initializer lists**/ | | d, condition will need to be updated. **/ | |
| #define __TBB_INITIALIZER_LISTS_PRESENT 0 | | #if (__clang__ && __INTEL_COMPILER > 1400) | |
| #define __TBB_CONSTEXPR_PRESENT 0 | | #if (__has_feature(__cxx_generalized_initializers__) && __has_inclu | |
| #define __TBB_DEFAULTED_AND_DELETED_FUNC_PRESENT __INTEL_CXX11_MODE__ | | de(<initializer_list>)) | |
| | | #define __TBB_INITIALIZER_LISTS_PRESENT 1 | |
| | | #endif | |
| | | #else | |
| | | /** TODO: when MSVC2013 is supported by Intel C++ compiler, it will | |
| | | be enabled silently by compiler, so rule will need to be updated.**/ | |
| | | #define __TBB_INITIALIZER_LISTS_PRESENT __INTEL_CXX11_MODE__ | |
| | | && __INTEL_COMPILER >= 1400 && (_MSC_VER >= 1800 || __TBB_GCC_VERSION >= 40 | |
| | | 400 || _LIBCPP_VERSION) | |
| | | #endif | |
| | | | |
| | | #define __TBB_CONSTEXPR_PRESENT __INTEL_CXX11_MODE__ | |
| | | && __INTEL_COMPILER >= 1400 | |
| | | #define __TBB_DEFAULTED_AND_DELETED_FUNC_PRESENT __INTEL_CXX11_MODE__ | |
| | | && __INTEL_COMPILER >= 1200 | |
| #elif __clang__ | | #elif __clang__ | |
| //TODO: these options need to be rechecked | | //TODO: these options need to be rechecked | |
| /** on OS X* the only way to get C++11 is to use clang. For library feature
s (e.g. exception_ptr) libc++ is also | | /** on OS X* the only way to get C++11 is to use clang. For library feature
s (e.g. exception_ptr) libc++ is also | |
| * required. So there is no need to check GCC version for clang**/ | | * required. So there is no need to check GCC version for clang**/ | |
| #define __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT __has_feature(__cxx_
variadic_templates__) | | #define __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT __has_feature(__cxx_
variadic_templates__) | |
| #define __TBB_CPP11_RVALUE_REF_PRESENT __has_feature(__cxx_
rvalue_references__) | | #define __TBB_CPP11_RVALUE_REF_PRESENT __has_feature(__cxx_
rvalue_references__) | |
| /** TODO: extend exception_ptr related conditions to cover libstdc++ **/ | | /** TODO: extend exception_ptr related conditions to cover libstdc++ **/ | |
| #define __TBB_EXCEPTION_PTR_PRESENT (__cplusplus >= 20110
3L && _LIBCPP_VERSION) | | #define __TBB_EXCEPTION_PTR_PRESENT (__cplusplus >= 20110
3L && _LIBCPP_VERSION) | |
| #define __TBB_MAKE_EXCEPTION_PTR_PRESENT (__cplusplus >= 20110
3L && _LIBCPP_VERSION) | | #define __TBB_MAKE_EXCEPTION_PTR_PRESENT (__cplusplus >= 20110
3L && _LIBCPP_VERSION) | |
| #define __TBB_STATIC_ASSERT_PRESENT __has_feature(__cxx_s
tatic_assert__) | | #define __TBB_STATIC_ASSERT_PRESENT __has_feature(__cxx_s
tatic_assert__) | |
| | | | |
| skipping to change at line 324 | | skipping to change at line 332 | |
| #endif | | #endif | |
| | | | |
| #ifndef __TBB_TASK_GROUP_CONTEXT | | #ifndef __TBB_TASK_GROUP_CONTEXT | |
| #define __TBB_TASK_GROUP_CONTEXT 1 | | #define __TBB_TASK_GROUP_CONTEXT 1 | |
| #endif /* __TBB_TASK_GROUP_CONTEXT */ | | #endif /* __TBB_TASK_GROUP_CONTEXT */ | |
| | | | |
| #ifndef __TBB_SCHEDULER_OBSERVER | | #ifndef __TBB_SCHEDULER_OBSERVER | |
| #define __TBB_SCHEDULER_OBSERVER 1 | | #define __TBB_SCHEDULER_OBSERVER 1 | |
| #endif /* __TBB_SCHEDULER_OBSERVER */ | | #endif /* __TBB_SCHEDULER_OBSERVER */ | |
| | | | |
|
| #if !defined(TBB_PREVIEW_TASK_ARENA) && __TBB_BUILD | | #ifndef __TBB_TASK_ARENA | |
| #define TBB_PREVIEW_TASK_ARENA __TBB_CPF_BUILD | | #define __TBB_TASK_ARENA (__TBB_BUILD||TBB_PREVIEW_TASK_ARENA) | |
| #endif /* TBB_PREVIEW_TASK_ARENA */ | | #endif /* __TBB_TASK_ARENA */ | |
| #define __TBB_TASK_ARENA TBB_PREVIEW_TASK_ARENA | | #if __TBB_TASK_ARENA | |
| #if TBB_PREVIEW_TASK_ARENA | | #define __TBB_RECYCLE_TO_ENQUEUE __TBB_BUILD // keep non-official | |
| #define TBB_PREVIEW_LOCAL_OBSERVER 1 | | | |
| #define __TBB_NO_IMPLICIT_LINKAGE 1 | | | |
| #define __TBB_RECYCLE_TO_ENQUEUE 1 | | | |
| #ifndef __TBB_TASK_PRIORITY | | | |
| #define __TBB_TASK_PRIORITY 0 // TODO: it will be removed in next v | | | |
| ersions | | | |
| #endif | | | |
| #if !__TBB_SCHEDULER_OBSERVER | | #if !__TBB_SCHEDULER_OBSERVER | |
| #error TBB_PREVIEW_TASK_ARENA requires __TBB_SCHEDULER_OBSERVER to
be enabled | | #error TBB_PREVIEW_TASK_ARENA requires __TBB_SCHEDULER_OBSERVER to
be enabled | |
| #endif | | #endif | |
|
| #endif /* TBB_PREVIEW_TASK_ARENA */ | | #endif /* __TBB_TASK_ARENA */ | |
| | | | |
| #if !defined(TBB_PREVIEW_LOCAL_OBSERVER) && __TBB_BUILD && __TBB_SCHEDULER_
OBSERVER | | #if !defined(TBB_PREVIEW_LOCAL_OBSERVER) && __TBB_BUILD && __TBB_SCHEDULER_
OBSERVER | |
| #define TBB_PREVIEW_LOCAL_OBSERVER 1 | | #define TBB_PREVIEW_LOCAL_OBSERVER 1 | |
| #endif /* TBB_PREVIEW_LOCAL_OBSERVER */ | | #endif /* TBB_PREVIEW_LOCAL_OBSERVER */ | |
| | | | |
| #if TBB_USE_EXCEPTIONS && !__TBB_TASK_GROUP_CONTEXT | | #if TBB_USE_EXCEPTIONS && !__TBB_TASK_GROUP_CONTEXT | |
| #error TBB_USE_EXCEPTIONS requires __TBB_TASK_GROUP_CONTEXT to be enabl
ed | | #error TBB_USE_EXCEPTIONS requires __TBB_TASK_GROUP_CONTEXT to be enabl
ed | |
| #endif | | #endif | |
| | | | |
| #ifndef __TBB_TASK_PRIORITY | | #ifndef __TBB_TASK_PRIORITY | |
|
| #define __TBB_TASK_PRIORITY __TBB_TASK_GROUP_CONTEXT | | #define __TBB_TASK_PRIORITY (!__TBB_CPF_BUILD&&__TBB_TASK_GROUP_CONTEXT
) // TODO: it will be enabled for CPF in the next versions | |
| #endif /* __TBB_TASK_PRIORITY */ | | #endif /* __TBB_TASK_PRIORITY */ | |
| | | | |
| #if __TBB_TASK_PRIORITY && !__TBB_TASK_GROUP_CONTEXT | | #if __TBB_TASK_PRIORITY && !__TBB_TASK_GROUP_CONTEXT | |
| #error __TBB_TASK_PRIORITY requires __TBB_TASK_GROUP_CONTEXT to be enab
led | | #error __TBB_TASK_PRIORITY requires __TBB_TASK_GROUP_CONTEXT to be enab
led | |
| #endif | | #endif | |
| | | | |
| #if TBB_PREVIEW_WAITING_FOR_WORKERS || __TBB_BUILD | | #if TBB_PREVIEW_WAITING_FOR_WORKERS || __TBB_BUILD | |
| #define __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE 1 | | #define __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE 1 | |
| #endif | | #endif | |
| | | | |
| | | | |
| skipping to change at line 528 | | skipping to change at line 530 | |
| __GNUC__==4 && (__GNUC_MINOR__==4 ||__GNUC_MINOR__==5 || (__INTEL_COMPI
LER==1300 && (__GNUC_MINOR__==6 ||__GNUC_MINOR__==7))) | | __GNUC__==4 && (__GNUC_MINOR__==4 ||__GNUC_MINOR__==5 || (__INTEL_COMPI
LER==1300 && (__GNUC_MINOR__==6 ||__GNUC_MINOR__==7))) | |
| /* There is an issue for specific GCC toolchain when C++11 is enabled | | /* There is an issue for specific GCC toolchain when C++11 is enabled | |
| and exceptions are disabled: | | and exceptions are disabled: | |
| exceprion_ptr.h/nested_exception.h use throw unconditionally. | | exceprion_ptr.h/nested_exception.h use throw unconditionally. | |
| */ | | */ | |
| #define __TBB_LIBSTDCPP_EXCEPTION_HEADERS_BROKEN 1 | | #define __TBB_LIBSTDCPP_EXCEPTION_HEADERS_BROKEN 1 | |
| #else | | #else | |
| #define __TBB_LIBSTDCPP_EXCEPTION_HEADERS_BROKEN 0 | | #define __TBB_LIBSTDCPP_EXCEPTION_HEADERS_BROKEN 0 | |
| #endif | | #endif | |
| | | | |
|
| #if __TBB_x86_32 && (__linux__ || __APPLE__ || _WIN32 || __sun) && ((defin
ed(__INTEL_COMPILER) && __INTEL_COMPILER <= 1300) || (__GNUC__==3 && __GNUC
_MINOR__==3 ) || defined(__SUNPRO_CC)) | | #if __TBB_x86_32 && (__linux__ || __APPLE__ || _WIN32 || __sun) && ((defin
ed(__INTEL_COMPILER) && __INTEL_COMPILER <= 1400) || (__GNUC__==3 && __GNUC
_MINOR__==3 ) || defined(__SUNPRO_CC)) | |
| // Some compilers for IA-32 fail to provide 8-byte alignment of objects
on the stack, | | // Some compilers for IA-32 fail to provide 8-byte alignment of objects
on the stack, | |
| // even if the object specifies 8-byte alignment. On such platforms, t
he IA-32 implementation | | // even if the object specifies 8-byte alignment. On such platforms, t
he IA-32 implementation | |
| // of 64 bit atomics (e.g. atomic<long long>) use different tactics dep
ending upon | | // of 64 bit atomics (e.g. atomic<long long>) use different tactics dep
ending upon | |
| // whether the object is properly aligned or not. | | // whether the object is properly aligned or not. | |
| #define __TBB_FORCE_64BIT_ALIGNMENT_BROKEN 1 | | #define __TBB_FORCE_64BIT_ALIGNMENT_BROKEN 1 | |
| #else | | #else | |
| #define __TBB_FORCE_64BIT_ALIGNMENT_BROKEN 0 | | #define __TBB_FORCE_64BIT_ALIGNMENT_BROKEN 0 | |
| #endif | | #endif | |
| | | | |
| #if __TBB_DEFAULTED_AND_DELETED_FUNC_PRESENT && __TBB_GCC_VERSION < 40700 &
& !defined(__INTEL_COMPILER) && !defined (__clang__) | | #if __TBB_DEFAULTED_AND_DELETED_FUNC_PRESENT && __TBB_GCC_VERSION < 40700 &
& !defined(__INTEL_COMPILER) && !defined (__clang__) | |
| #define __TBB_ZERO_INIT_WITH_DEFAULTED_CTOR_BROKEN 1 | | #define __TBB_ZERO_INIT_WITH_DEFAULTED_CTOR_BROKEN 1 | |
| #endif | | #endif | |
| /** End of __TBB_XXX_BROKEN macro section **/ | | /** End of __TBB_XXX_BROKEN macro section **/ | |
| | | | |
|
| | | #if defined(_MSC_VER) && _MSC_VER>=1500 && !defined(__INTEL_COMPILER) | |
| | | // A macro to suppress erroneous or benign "unreachable code" MSVC warn | |
| | | ing (4702) | |
| | | #define __TBB_MSVC_UNREACHABLE_CODE_IGNORED 1 | |
| | | #endif | |
| | | | |
| #define __TBB_ATOMIC_CTORS (__TBB_CONSTEXPR_PRESENT && __TBB_DEFAULTED_
AND_DELETED_FUNC_PRESENT && (!__TBB_ZERO_INIT_WITH_DEFAULTED_CTOR_BROKEN)) | | #define __TBB_ATOMIC_CTORS (__TBB_CONSTEXPR_PRESENT && __TBB_DEFAULTED_
AND_DELETED_FUNC_PRESENT && (!__TBB_ZERO_INIT_WITH_DEFAULTED_CTOR_BROKEN)) | |
| | | | |
| #endif /* __TBB_tbb_config_H */ | | #endif /* __TBB_tbb_config_H */ | |
| | | | |
End of changes. 6 change blocks. |
| 20 lines changed or deleted | | 33 lines changed or added | |
|