 _flow_graph_impl.h

 skipping to change at line 528

     typedef M my_mutex_type;
     typedef std::list< receiver<T> * > my_successors_type;

 public:

     broadcast_cache( ) {}

     // as above, but call try_put_task instead, and return the last task we received (if any)
     /*override*/ task * try_put_task( const T &t ) {
         task * last_task = NULL;
-        bool upgraded = false;
-        typename my_mutex_type::scoped_lock l(this->my_mutex, false);
+        bool upgraded = true;
+        typename my_mutex_type::scoped_lock l(this->my_mutex, upgraded);
         typename my_successors_type::iterator i = this->my_successors.begin();
         while ( i != this->my_successors.end() ) {
             task *new_task = (*i)->try_put_task(t);
             last_task = combine_tasks(last_task, new_task);  // enqueue if necessary
             if(new_task) {
                 ++i;
             }
             else {  // failed
                 if ( (*i)->register_predecessor(*this->my_owner) ) {
                     if (!upgraded) {

 skipping to change at line 570

 public:

     round_robin_cache( ) {}

     size_type size() {
         typename my_mutex_type::scoped_lock l(this->my_mutex, false);
         return this->my_successors.size();
     }

     /*override*/task *try_put_task( const T &t ) {
-        bool upgraded = false;
-        typename my_mutex_type::scoped_lock l(this->my_mutex, false);
+        bool upgraded = true;
+        typename my_mutex_type::scoped_lock l(this->my_mutex, upgraded);
         typename my_successors_type::iterator i = this->my_successors.begin();
         while ( i != this->my_successors.end() ) {
             task *new_task = (*i)->try_put_task(t);
             if ( new_task ) {
                 return new_task;
             } else {
                 if ( (*i)->register_predecessor(*this->my_owner) ) {
                     if (!upgraded) {
                         l.upgrade_to_writer();
                         upgraded = true;

 End of changes. 2 change blocks.
 4 lines changed or deleted, 6 lines changed or added
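Both change blocks make the same switch: the lock is now acquired as a writer from the start (upgraded begins as true and is passed to the scoped_lock constructor), so the if (!upgraded) upgrade path is never taken. A plausible motivation is that upgrade_to_writer() may release and reacquire the lock, and when it does, the successor list can change under the iterator being walked. A minimal self-contained sketch of that upgrade idiom and its pitfall, assuming tbb::spin_rw_mutex (the helper and the values are illustrative, not from the headers above):

    #include "tbb/spin_rw_mutex.h"

    tbb::spin_rw_mutex m;
    int cached_value = 0;

    // Illustrative helper: read under a reader lock, upgrade only when a
    // write turns out to be necessary.
    int read_or_initialize() {
        bool upgraded = false;
        tbb::spin_rw_mutex::scoped_lock l(m, /*write=*/false);  // start as reader
        if ( cached_value == 0 ) {                               // need to write
            if ( !upgraded ) {
                // upgrade_to_writer() returns false if it had to release the
                // lock to reacquire it; any state read earlier may be stale.
                bool stayed_locked = l.upgrade_to_writer();
                upgraded = true;
                if ( !stayed_locked && cached_value != 0 ) return cached_value;
            }
            cached_value = 42;  // placeholder initialization
        }
        return cached_value;
    }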


 _flow_graph_join_impl.h

 skipping to change at line 44

 #endif

 #include "tbb/internal/_flow_graph_types_impl.h"

 namespace internal {

     typedef size_t tag_value;
     static const tag_value NO_TAG = tag_value(-1);

     struct forwarding_base {
-        forwarding_base(task *rt) : my_root_task(rt), current_tag(NO_TAG) { }
+        forwarding_base(graph &g) : my_graph_ptr(&g), current_tag(NO_TAG) { }
         virtual ~forwarding_base() {}
         // decrement_port_count may create a forwarding task.  If we cannot handle the task
         // ourselves, ask decrement_port_count to deal with it.
         virtual task * decrement_port_count(bool handle_task) = 0;
         virtual void increment_port_count() = 0;
         virtual task * increment_tag_count(tag_value /*t*/, bool /*handle_task*/) {return NULL;}
         // moved here so input ports can queue tasks
-        task* my_root_task;
+        graph* my_graph_ptr;
         tag_value current_tag; // so ports can refer to FE's desired items
     };

     template< int N >
     struct join_helper {

         template< typename TupleType, typename PortType >
         static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
             tbb::flow::get<N-1>( my_input ).set_join_node_pointer(port);
             join_helper<N-1>::set_join_node_pointer( my_input, port );

 skipping to change at line 669
     class join_node_FE;

     template<typename InputTuple, typename OutputTuple>
     class join_node_FE<reserving, InputTuple, OutputTuple> : public forwarding_base {
     public:
         static const int N = tbb::flow::tuple_size<OutputTuple>::value;
         typedef OutputTuple output_type;
         typedef InputTuple input_type;
         typedef join_node_base<reserving, InputTuple, OutputTuple> my_node_type; // for forwarding

-        join_node_FE(graph &g) : forwarding_base(g.root_task()), my_node(NULL) {
+        join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) {
             ports_with_no_inputs = N;
             join_helper<N>::set_join_node_pointer(my_inputs, this);
         }

-        join_node_FE(const join_node_FE& other) : forwarding_base(other.my_root_task), my_node(NULL) {
+        join_node_FE(const join_node_FE& other) : forwarding_base(*(other.forwarding_base::my_graph_ptr)), my_node(NULL) {
             ports_with_no_inputs = N;
             join_helper<N>::set_join_node_pointer(my_inputs, this);
         }

         void set_my_node(my_node_type *new_my_node) { my_node = new_my_node; }

         void increment_port_count() {
             ++ports_with_no_inputs;
         }

         // if all input_ports have predecessors, spawn forward to try and consume tuples
         task * decrement_port_count(bool handle_task) {
             if(ports_with_no_inputs.fetch_and_decrement() == 1) {
-                task *rtask = new ( task::allocate_additional_child_of( *(this->my_root_task) ) )
-                    forward_task_bypass<my_node_type>(*my_node);
-                if(!handle_task) return rtask;
-                FLOW_SPAWN(*rtask);
+                task* tp = this->my_graph_ptr->root_task();
+                if(tp) {
+                    task *rtask = new ( task::allocate_additional_child_of( *tp ) )
+                        forward_task_bypass<my_node_type>(*my_node);
+                    if(!handle_task) return rtask;
+                    FLOW_SPAWN(*rtask);
+                }
             }
             return NULL;
         }

         input_type &input_ports() { return my_inputs; }

     protected:

         void reset() {
             // called outside of parallel contexts

 skipping to change at line 738 (old) / 740 (new)
     };

     template<typename InputTuple, typename OutputTuple>
     class join_node_FE<queueing, InputTuple, OutputTuple> : public forwarding_base {
     public:
         static const int N = tbb::flow::tuple_size<OutputTuple>::value;
         typedef OutputTuple output_type;
         typedef InputTuple input_type;
         typedef join_node_base<queueing, InputTuple, OutputTuple> my_node_type; // for forwarding

-        join_node_FE(graph &g) : forwarding_base(g.root_task()), my_node(NULL) {
+        join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) {
             ports_with_no_items = N;
             join_helper<N>::set_join_node_pointer(my_inputs, this);
         }

-        join_node_FE(const join_node_FE& other) : forwarding_base(other.my_root_task), my_node(NULL) {
+        join_node_FE(const join_node_FE& other) : forwarding_base(*(other.forwarding_base::my_graph_ptr)), my_node(NULL) {
             ports_with_no_items = N;
             join_helper<N>::set_join_node_pointer(my_inputs, this);
         }

         // needed for forwarding
         void set_my_node(my_node_type *new_my_node) { my_node = new_my_node; }

         void reset_port_count() {
             ports_with_no_items = N;
         }

         // if all input_ports have items, spawn forward to try and consume tuples
         task * decrement_port_count(bool handle_task)
         {
             if(ports_with_no_items.fetch_and_decrement() == 1) {
-                task *rtask = new ( task::allocate_additional_child_of( *(this->my_root_task) ) )
-                    forward_task_bypass<my_node_type>(*my_node);
-                if(!handle_task) return rtask;
-                FLOW_SPAWN( *rtask);
+                task* tp = this->my_graph_ptr->root_task();
+                if(tp) {
+                    task *rtask = new ( task::allocate_additional_child_of( *tp ) )
+                        forward_task_bypass<my_node_type>(*my_node);
+                    if(!handle_task) return rtask;
+                    FLOW_SPAWN( *rtask);
+                }
             }
             return NULL;
         }

         void increment_port_count() { __TBB_ASSERT(false, NULL); }  // should never be called

         input_type &input_ports() { return my_inputs; }

     protected:

 skipping to change at line 853 (old) / 857 (new)
         friend class internal::aggregating_functor<my_class, tag_matching_FE_operation>;
         aggregator<my_handler, tag_matching_FE_operation> my_aggregator;

         // called from aggregator, so serialized
         // construct as many output objects as possible.
         // returns a task pointer if a task would have been enqueued but we asked that
         // it be returned.  Otherwise returns NULL.
         task * fill_output_buffer(bool should_enqueue, bool handle_task) {
             output_type l_out;
             task *rtask = NULL;
-            bool do_fwd = should_enqueue && this->buffer_empty();
+            task* tp = this->my_graph_ptr->root_task();
+            bool do_fwd = should_enqueue && this->buffer_empty() && tp;
             while(find_value_tag(this->current_tag,N)) {  // while there are completed items
                 this->tagged_delete(this->current_tag);   // remove the tag
                 if(join_helper<N>::get_items(my_inputs, l_out)) {  // <== call back
                     this->push_back(l_out);
                     if(do_fwd) {  // we enqueue if receiving an item from predecessor, not if successor asks for item
-                        rtask = new ( task::allocate_additional_child_of( *(this->my_root_task) ) )
+                        rtask = new ( task::allocate_additional_child_of( *tp ) )
                             forward_task_bypass<my_node_type>(*my_node);
                         if(handle_task) {
                             FLOW_SPAWN(*rtask);
                             rtask = NULL;
                         }
                         do_fwd = false;
                     }
                     // retire the input values
                     join_helper<N>::reset_ports(my_inputs);  // <== call back
                     this->current_tag = NO_TAG;

 skipping to change at line 931 (old) / 936 (new)

                         __TBB_store_with_release(current->status, SUCCEEDED);
                     }
                     break;
                 }
             }
         }
         // ------------ End Aggregator ---------------

     public:
         template<typename FunctionTuple>
-        join_node_FE(graph &g, FunctionTuple tag_funcs) : forwarding_base(g.root_task()), my_node(NULL) {
+        join_node_FE(graph &g, FunctionTuple tag_funcs) : forwarding_base(g), my_node(NULL) {
             join_helper<N>::set_join_node_pointer(my_inputs, this);
             join_helper<N>::set_tag_func(my_inputs, tag_funcs);
             my_aggregator.initialize_handler(my_handler(this));
         }

-        join_node_FE(const join_node_FE& other) : forwarding_base(other.my_root_task), my_tag_buffer(),
+        join_node_FE(const join_node_FE& other) : forwarding_base(*(other.forwarding_base::my_graph_ptr)), my_tag_buffer(),
             output_buffer_type() {
             my_node = NULL;
             join_helper<N>::set_join_node_pointer(my_inputs, this);
             join_helper<N>::copy_tag_functors(my_inputs, const_cast<input_type &>(other.my_inputs));
             my_aggregator.initialize_handler(my_handler(this));
         }

         // needed for forwarding
         void set_my_node(my_node_type *new_my_node) { my_node = new_my_node; }

 skipping to change at line 1055 (old) / 1060 (new)
         friend class internal::aggregating_functor<my_class, join_node_base_operation>;
         bool forwarder_busy;
         aggregator<my_handler, join_node_base_operation> my_aggregator;

         void handle_operations(join_node_base_operation* op_list) {
             join_node_base_operation *current;
             while(op_list) {
                 current = op_list;
                 op_list = op_list->next;
                 switch(current->type) {
-                case reg_succ:
-                    my_successors.register_successor(*(current->my_succ));
-                    if(tuple_build_may_succeed() && !forwarder_busy) {
-                        task *rtask = new ( task::allocate_additional_child_of(*(this->my_root_task)) )
-                            forward_task_bypass
-                            <join_node_base<JP,InputTuple,OutputTuple> >(*this);
-                        FLOW_SPAWN(*rtask);
-                        forwarder_busy = true;
-                    }
-                    __TBB_store_with_release(current->status, SUCCEEDED);
+                case reg_succ: {
+                        my_successors.register_successor(*(current->my_succ));
+                        task* tp = this->graph_node::my_graph.root_task();
+                        if(tuple_build_may_succeed() && !forwarder_busy && tp) {
+                            task *rtask = new ( task::allocate_additional_child_of(*tp) )
+                                forward_task_bypass
+                                <join_node_base<JP,InputTuple,OutputTuple> >(*this);
+                            FLOW_SPAWN(*rtask);
+                            forwarder_busy = true;
+                        }
+                        __TBB_store_with_release(current->status, SUCCEEDED);
+                    }
                     break;
                 case rem_succ:
                     my_successors.remove_successor(*(current->my_succ));
                     __TBB_store_with_release(current->status, SUCCEEDED);
                     break;
                 case try__get:
                     if(tuple_build_may_succeed()) {
                         if(try_to_make_tuple(*(current->my_arg))) {
                             tuple_accepted();
                             __TBB_store_with_release(current->status, SUCCEEDED);

 skipping to change at line 1117 (old) / 1124 (new)

         }
         // ---------- end aggregator -----------

     public:
         join_node_base(graph &g) : graph_node(g), input_ports_type(g), forwarder_busy(false) {
             my_successors.set_owner(this);
             input_ports_type::set_my_node(this);
             my_aggregator.initialize_handler(my_handler(this));
         }

         join_node_base(const join_node_base& other) :
-            graph_node(other.my_graph), input_ports_type(other),
+            graph_node(other.graph_node::my_graph), input_ports_type(other),
             sender<OutputTuple>(), forwarder_busy(false), my_successors() {
             my_successors.set_owner(this);
             input_ports_type::set_my_node(this);
             my_aggregator.initialize_handler(my_handler(this));
         }

         template<typename FunctionTuple>
         join_node_base(graph &g, FunctionTuple f) : graph_node(g), input_ports_type(g, f), forwarder_busy(false) {
             my_successors.set_owner(this);
             input_ports_type::set_my_node(this);

 End of changes. 15 change blocks.
 34 lines changed or deleted, 44 lines changed or added
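Every change block in this file serves one refactoring: the join front ends stop caching the graph's root task at construction and instead keep a graph pointer, fetching root_task() at the moment a forwarding task must be allocated and skipping the allocation when it comes back NULL (which, per the flow_graph.h changes below, happens exactly while graph::reset() is running). A small user-level sketch of the machinery being guarded, against the public join_node API (node names and values are illustrative; requires C++11 lambdas):

    #include "tbb/flow_graph.h"
    #include <iostream>

    int main() {
        using namespace tbb::flow;
        graph g;
        typedef tuple<int, float> pair_t;
        join_node< pair_t, queueing > j(g);   // front end shown in the diff above
        function_node< pair_t, continue_msg > f(g, unlimited,
            [](const pair_t &t) -> continue_msg {
                std::cout << get<0>(t) << ", " << get<1>(t) << std::endl;
                return continue_msg();
            });
        make_edge(j, f);
        // When the second put completes the tuple, decrement_port_count()
        // allocates a forwarding task as a child of g's root task.
        input_port<0>(j).try_put(1);
        input_port<1>(j).try_put(2.5f);
        g.wait_for_all();
        return 0;
    }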


 _flow_graph_node_impl.h

 skipping to change at line 74

         enum op_type {reg_pred, rem_pred, app_body, try_fwd, tryput_bypass, app_body_bypass };
         typedef function_input_base<Input, A, ImplType> my_class;

     public:

         //! The input type of this receiver
         typedef Input input_type;

         //! Constructor for function_input_base
         function_input_base( graph &g, size_t max_concurrency, function_input_queue<input_type,A> *q = NULL )
-            : my_root_task(g.root_task()), my_max_concurrency(max_concurrency), my_concurrency(0),
+            : my_graph(g), my_max_concurrency(max_concurrency), my_concurrency(0),
               my_queue(q), forwarder_busy(false) {
             my_predecessors.set_owner(this);
             my_aggregator.initialize_handler(my_handler(this));
         }

         //! Copy constructor
         function_input_base( const function_input_base& src, function_input_queue<input_type,A> *q = NULL ) :
             receiver<Input>(), tbb::internal::no_assign(),
-            my_root_task( src.my_root_task), my_max_concurrency(src.my_max_concurrency),
+            my_graph(src.my_graph), my_max_concurrency(src.my_max_concurrency),
             my_concurrency(0), my_queue(q), forwarder_busy(false)
         {
             my_predecessors.set_owner(this);
             my_aggregator.initialize_handler(my_handler(this));
         }

         //! Destructor
         virtual ~function_input_base() {
             if ( my_queue ) delete my_queue;
         }

 skipping to change at line 136
         void reset_function_input_base() {
             my_concurrency = 0;
             if(my_queue) {
                 my_queue->reset();
             }
             my_predecessors.reset();
             forwarder_busy = false;
         }

-        task *my_root_task;
+        graph& my_graph;
         const size_t my_max_concurrency;
         size_t my_concurrency;
         function_input_queue<input_type, A> *my_queue;
         predecessor_cache<input_type, null_mutex > my_predecessors;

         /*override*/void reset_receiver() {
             my_predecessors.reset();
         }

     private:

 skipping to change at line 291
             task * new_task = static_cast<ImplType *>(this)->apply_body_impl_bypass(i);
             if ( my_max_concurrency != 0 ) {
                 my_operation op_data(app_body_bypass);  // tries to pop an item or get_item, enqueues another apply_body
                 my_aggregator.execute(&op_data);
                 tbb::task *ttask = op_data.bypass_t;
                 new_task = combine_tasks(new_task, ttask);
             }
             return new_task;
         }

         //! allocates a task to call apply_body( input )
         inline task * create_body_task( const input_type &input ) {
-            return new(task::allocate_additional_child_of(*my_root_task))
-                apply_body_task_bypass < my_class, input_type >(*this, input);
+            task* tp = my_graph.root_task();
+            return (tp) ?
+                new(task::allocate_additional_child_of(*tp))
+                    apply_body_task_bypass < my_class, input_type >(*this, input) :
+                NULL;
         }

         //! Spawns a task that calls apply_body( input )
         inline void spawn_body_task( const input_type &input ) {
-            FLOW_SPAWN(*create_body_task(input));
+            task* tp = create_body_task(input);
+            // tp == NULL => g.reset(), which shouldn't occur in concurrent context
+            if(tp) {
+                FLOW_SPAWN(*tp);
+            }
         }

         //! This is executed by an enqueued task, the "forwarder"
         task *forward_task() {
             my_operation op_data(try_fwd);
             task *rval = NULL;
             do {
                 op_data.status = WAIT;
                 my_aggregator.execute(&op_data);
                 if(op_data.status == SUCCEEDED) {
                     tbb::task *ttask = op_data.bypass_t;
                     rval = combine_tasks(rval, ttask);
                 }
             } while (op_data.status == SUCCEEDED);
             return rval;
         }

         inline task *create_forward_task() {
-            task *rval = new(task::allocate_additional_child_of(*my_root_task)) forward_task_bypass< my_class >(*this);
-            return rval;
+            task* tp = my_graph.root_task();
+            return (tp) ?
+                new(task::allocate_additional_child_of(*tp)) forward_task_bypass< my_class >(*this) :
+                NULL;
         }

         //! Spawns a task that calls forward()
         inline void spawn_forward_task() {
-            FLOW_SPAWN(*create_forward_task());
+            task* tp = create_forward_task();
+            if(tp) {
+                FLOW_SPAWN(*tp);
+            }
         }
     }; // function_input_base
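The create/spawn split above exists so callers can either spawn the new task immediately or hand it back for scheduler bypass (the apply_body_task_bypass and forward_task_bypass tasks). For readers unfamiliar with the bypass idiom, here is a minimal self-contained sketch against the classic TBB task API; the print_task type is invented for illustration and is not part of the headers shown:

    #include "tbb/task.h"
    #include <cstdio>

    // Scheduler-bypass pattern: execute() may return a task pointer, which
    // the worker thread runs next instead of spawning it into the pool.
    struct print_task : public tbb::task {
        const char *msg;
        tbb::task *next;                  // task to bypass to, may be NULL
        print_task(const char *m, tbb::task *n) : msg(m), next(n) {}
        /*override*/ tbb::task *execute() {
            std::printf("%s\n", msg);
            return next;                  // bypass: no spawn/steal round trip
        }
    };

    int main() {
        tbb::task *root = new (tbb::task::allocate_root()) tbb::empty_task;
        root->set_ref_count(3);           // two children + one for the wait
        tbb::task *second = new (root->allocate_child()) print_task("second", NULL);
        tbb::task *first  = new (root->allocate_child()) print_task("first", second);
        root->spawn_and_wait_for_all(*first);   // "first" bypasses to "second"
        tbb::task::destroy(*root);
        return 0;
    }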
     //! Implements methods for a function node that takes a type Input as input and sends
     //  a type Output to its successors.
     template< typename Input, typename Output, typename A>
     class function_input : public function_input_base<Input, A, function_input<Input,Output,A> > {
     public:
         typedef Input input_type;
         typedef Output output_type;

 skipping to change at line 376 (old) / 389 (new)

     protected:

         void reset_function_input() {
             base_type::reset_function_input_base();
         }

         function_body<input_type, output_type> *my_body;
         virtual broadcast_cache<output_type > &successors() = 0;

-    };
+    }; // function_input
     //! Implements methods for a function node that takes a type Input as input
     //  and has a tuple of output ports specified.
     template< typename Input, typename OutputPortSet, typename A>
     class multifunction_input : public function_input_base<Input, A, multifunction_input<Input,OutputPortSet,A> > {
     public:
         typedef Input input_type;
         typedef OutputPortSet output_ports_type;
         typedef multifunction_input<Input,OutputPortSet,A> my_class;
         typedef function_input_base<Input, A, my_class> base_type;

 skipping to change at line 435 (old) / 448 (new)

     protected:

         void reset() {
             base_type::reset_function_input_base();
         }

         multifunction_body<input_type, output_ports_type> *my_body;
         output_ports_type my_output_ports;

-    };
+    }; // multifunction_input
     // template to refer to an output port of a multifunction_node
     template<size_t N, typename MOP>
     typename tbb::flow::tuple_element<N, typename MOP::output_ports_type>::type &output_port(MOP &op) {
         return tbb::flow::get<N>(op.output_ports());
     }

     // helper structs for split_node
     template<int N>
     struct emit_element {

 skipping to change at line 474 (old) / 487 (new)

     public:

         //! The input type of this receiver
         typedef continue_msg input_type;

         //! The output type of this receiver
         typedef Output output_type;

         template< typename Body >
         continue_input( graph &g, Body& body )
-            : my_root_task(g.root_task()),
+            : my_graph_ptr(&g),
               my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) { }

         template< typename Body >
         continue_input( graph &g, int number_of_predecessors, Body& body )
-            : continue_receiver( number_of_predecessors ), my_root_task(g.root_task()),
+            : continue_receiver( number_of_predecessors ), my_graph_ptr(&g),
               my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) { }

         continue_input( const continue_input& src ) : continue_receiver(src),
-            my_root_task(src.my_root_task), my_body( src.my_body->clone() ) {}
+            my_graph_ptr(src.my_graph_ptr), my_body( src.my_body->clone() ) {}

         ~continue_input() {
             delete my_body;
         }

         template< typename Body >
         Body copy_function_object() {
             internal::function_body<input_type, output_type> &body_ref = *my_body;
             return dynamic_cast< internal::function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
         }

     protected:

-        task *my_root_task;
+        graph* my_graph_ptr;
         function_body<input_type, output_type> *my_body;

         virtual broadcast_cache<output_type > &successors() = 0;

         friend class apply_body_task_bypass< continue_input< Output >, continue_msg >;

         //! Applies the body to the provided input
         /* override */ task *apply_body_bypass( input_type ) {
             return successors().try_put_task( (*my_body)( continue_msg() ) );
         }

         //! Spawns a task that applies the body
         /* override */ task *execute( ) {
-            task *res = new ( task::allocate_additional_child_of( *my_root_task ) )
-                apply_body_task_bypass< continue_input< Output >, continue_msg >( *this, continue_msg() );
-            return res;
+            task* tp = my_graph_ptr->root_task();
+            return (tp) ?
+                new ( task::allocate_additional_child_of( *tp ) )
+                    apply_body_task_bypass< continue_input< Output >, continue_msg >( *this, continue_msg() ) :
+                NULL;
         }

-    };
+    }; // continue_input
     //! Implements methods for both executable and function nodes that puts Output to its successors
     template< typename Output >
     class function_output : public sender<Output> {
     public:
         typedef Output output_type;

         function_output() { my_successors.set_owner(this); }
         function_output(const function_output & /*other*/) : sender<output_type>() {

 skipping to change at line 555 (old) / 570 (new)

         //
         //  get<I>(output_ports).try_put(output_value);
         //
         // return value will be bool returned from successors.try_put.
         task *try_put_task(const output_type &i) { return my_successors.try_put_task(i); }

     protected:
         broadcast_cache<output_type> my_successors;
         broadcast_cache<output_type > &successors() { return my_successors; }

-    };
+    }; // function_output

     template< typename Output >
     class multifunction_output : public function_output<Output> {
     public:
         typedef Output output_type;
         typedef function_output<output_type> base_type;
         using base_type::my_successors;

         multifunction_output() : base_type() {my_successors.set_owner(this);}
         multifunction_output( const multifunction_output &/*other*/) : base_type() { my_successors.set_owner(this); }

         bool try_put(const output_type &i) {
             task *res = my_successors.try_put_task(i);
             if(!res) return false;
             if(res != SUCCESSFULLY_ENQUEUED) FLOW_SPAWN(*res);
             return true;
         }
-    };
+    }; // multifunction_output

 } // internal

 #endif // __TBB__flow_graph_node_impl_H

 End of changes. 17 change blocks.
 28 lines changed or deleted, 43 lines changed or added


 flow_graph.h

 skipping to change at line 82

     class and its associated node classes can be used to express such
     applications.
 */

 namespace tbb {
 namespace flow {

 //! An enumeration that provides the two most common concurrency levels: unlimited and serial
 enum concurrency { unlimited = 0, serial = 1 };

-namespace interface6 {
+namespace interface7 {

 namespace internal {
     template<typename T, typename M> class successor_cache;
     template<typename T, typename M> class broadcast_cache;
     template<typename T, typename M> class round_robin_cache;
 }

 //! An empty class used for messages that mean "I'm done"
 class continue_msg {};

 skipping to change at line 515
     bool own_context;
     bool cancelled;
     bool caught_exception;

     graph_node *my_nodes, *my_nodes_last;

     spin_mutex nodelist_mutex;
     void register_node(graph_node *n);
     void remove_node(graph_node *n);

-};
+}; // class graph

 template <typename C, typename N>
 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
 {
     if (begin) current_node = my_graph->my_nodes;
     //else it is an end iterator by default
 }

 template <typename C, typename N>
 typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {

 skipping to change at line 585

         if (n->prev) n->prev->next = n->next;
         if (n->next) n->next->prev = n->prev;
         if (my_nodes_last == n) my_nodes_last = n->prev;
         if (my_nodes == n) my_nodes = n->next;
     }
     n->prev = n->next = NULL;
 }
 inline void graph::reset() {
     // reset context
+    task *saved_my_root_task = my_root_task;
+    my_root_task = NULL;
     if(my_context) my_context->reset();
     cancelled = false;
     caught_exception = false;
     // reset all the nodes comprising the graph
     for(iterator ii = begin(); ii != end(); ++ii) {
         graph_node *my_p = &(*ii);
         my_p->reset();
     }
+    my_root_task = saved_my_root_task;
 }
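These three added lines are the pivot of the whole change set: while the per-node reset() calls run, my_root_task is NULL, so every allocation site rewritten elsewhere in this diff (the root_task() checks in the join, function, source, buffer, and limiter nodes) quietly declines to create forwarding tasks against a graph that is being reset. From user code the visible effect is simply that a graph can be reset and reused, as in this small sketch against the public API (the node and its body are illustrative; requires C++11 lambdas):

    #include "tbb/flow_graph.h"
    #include <iostream>

    int main() {
        tbb::flow::graph g;
        tbb::flow::continue_node<tbb::flow::continue_msg> n(g,
            [](const tbb::flow::continue_msg &) {
                std::cout << "body ran" << std::endl;
                return tbb::flow::continue_msg();
            });
        n.try_put(tbb::flow::continue_msg());
        g.wait_for_all();
        // During reset(), each node's reset() runs while root_task() is NULL,
        // so no stray forwarding tasks can be spawned against the old root.
        g.reset();
        n.try_put(tbb::flow::continue_msg());   // the graph is reusable afterwards
        g.wait_for_all();
        return 0;
    }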
#include "internal/_flow_graph_node_impl.h" #include "internal/_flow_graph_node_impl.h"
//! An executable node that acts as a source, i.e. it has no predecessors //! An executable node that acts as a source, i.e. it has no predecessors
template < typename Output > template < typename Output >
class source_node : public graph_node, public sender< Output > { class source_node : public graph_node, public sender< Output > {
protected: protected:
using graph_node::my_graph; using graph_node::my_graph;
public: public:
//! The type of the output message, which is complete //! The type of the output message, which is complete
typedef Output output_type; typedef Output output_type;
//! The type of successors of this node //! The type of successors of this node
typedef receiver< Output > successor_type; typedef receiver< Output > successor_type;
//! Constructor for a node with a successor //! Constructor for a node with a successor
template< typename Body > template< typename Body >
source_node( graph &g, Body body, bool is_active = true ) source_node( graph &g, Body body, bool is_active = true )
: graph_node(g), my_root_task(g.root_task()), my_active(is_active), init_my_active(is_active), : graph_node(g), my_active(is_active), init_my_active(is_active),
my_body( new internal::source_body_leaf< output_type, Body>(body) ) , my_body( new internal::source_body_leaf< output_type, Body>(body) ) ,
my_reserved(false), my_has_cached_item(false) my_reserved(false), my_has_cached_item(false)
{ {
my_successors.set_owner(this); my_successors.set_owner(this);
} }
//! Copy constructor //! Copy constructor
source_node( const source_node& src ) : source_node( const source_node& src ) :
graph_node(src.my_graph), sender<Output>(), graph_node(src.my_graph), sender<Output>(),
my_root_task( src.my_root_task), my_active(src.init_my_active), my_active(src.init_my_active),
init_my_active(src.init_my_active), my_body( src.my_body->clone() ) , init_my_active(src.init_my_active), my_body( src.my_body->clone() ) ,
my_reserved(false), my_has_cached_item(false) my_reserved(false), my_has_cached_item(false)
{ {
my_successors.set_owner(this); my_successors.set_owner(this);
} }
//! The destructor //! The destructor
~source_node() { delete my_body; } ~source_node() { delete my_body; }
//! Add a new successor to this node //! Add a new successor to this node
skipping to change at line 730 skipping to change at line 733
     //! resets the node to its initial state
     void reset() {
         my_active = init_my_active;
         my_reserved = false;
         if(my_has_cached_item) {
             my_has_cached_item = false;
         }
     }

 private:
-    task *my_root_task;
     spin_mutex my_mutex;
     bool my_active;
     bool init_my_active;
     internal::source_body<output_type> *my_body;
     internal::broadcast_cache< output_type > my_successors;
     bool my_reserved;
     bool my_has_cached_item;
     output_type my_cached_item;

     // used by apply_body, can invoke body of node.

 skipping to change at line 759 (old) / 761 (new)
             v = my_cached_item;
             my_reserved = true;
             return true;
         } else {
             return false;
         }
     }

     //! Spawns a task that applies the body
     /* override */ void spawn_put( ) {
-        FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *my_root_task ) )
-            internal::source_task_bypass< source_node< output_type > >( *this ) ) );
+        task* tp = this->my_graph.root_task();
+        if(tp) {
+            FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp ) )
+                internal::source_task_bypass< source_node< output_type > >( *this ) ) );
+        }
     }

     friend class internal::source_task_bypass< source_node< output_type > >;
     //! Applies the body.  Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
     /* override */ task * apply_body_bypass( ) {
         output_type v;
         if ( !try_reserve_apply_body(v) )
             return NULL;
         task *last_task = my_successors.try_put_task(v);

 skipping to change at line 838 (old) / 843 (new)
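source_node now reaches its graph through the inherited graph_node::my_graph member instead of a cached root task, so spawn_put() becomes a no-op while the graph is being reset. A typical activation pattern at the user level, assuming the public TBB 4.x API (the body and the item count are illustrative):

    #include "tbb/flow_graph.h"
    #include <iostream>

    int main() {
        using namespace tbb::flow;
        graph g;
        int count = 0;
        // Start inactive so edges can be wired before any task is spawned.
        source_node<int> src(g, [&count](int &v) -> bool {
            if (count >= 3) return false;   // no more items
            v = count++;
            return true;
        }, /*is_active=*/false);
        function_node<int, continue_msg> sink(g, serial,
            [](int v) -> continue_msg {
                std::cout << v << std::endl;
                return continue_msg();
            });
        make_edge(src, sink);
        src.activate();   // now spawn_put() may run against the graph's root task
        g.wait_for_all();
        return 0;
    }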
     typedef internal::function_output<output_type> fOutput_type;

     //! Constructor
     template< typename Body >
     function_node( graph &g, size_t concurrency, Body body ) :
         graph_node(g), fInput_type( g, concurrency, body, new queue_type() )
     {}

     //! Copy constructor
     function_node( const function_node& src ) :
-        graph_node(src.my_graph), fInput_type( src, new queue_type() ), fOutput_type()
+        graph_node(src.graph_node::my_graph), fInput_type( src, new queue_type() ), fOutput_type()
     {}

 protected:
     template< typename R, typename B > friend class run_and_put_task;
     template<typename X, typename Y> friend class internal::broadcast_cache;
     template<typename X, typename Y> friend class internal::round_robin_cache;
     using fInput_type::try_put_task;

     /*override*/void reset() { fInput_type::reset_function_input(); }

 skipping to change at line 885 (old) / 890 (new)
     typedef typename internal::wrap_tuple_elements<N,internal::multifunction_output, Output>::type output_ports_type;
 private:
     typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
     typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
 public:
     template<typename Body>
     multifunction_node( graph &g, size_t concurrency, Body body ) :
         graph_node(g), base_type(g,concurrency, body)
     {}

     multifunction_node( const multifunction_node &other) :
-        graph_node(other.my_graph), base_type(other)
+        graph_node(other.graph_node::my_graph), base_type(other)
     {}

     // all the guts are in multifunction_input...
 protected:
     /*override*/void reset() { base_type::reset(); }
 }; // multifunction_node

 template < typename Input, typename Output, typename Allocator >
 class multifunction_node<Input,Output,queueing,Allocator> : public graph_node, public internal::multifunction_input<Input,
     typename internal::wrap_tuple_elements<tbb::flow::tuple_size<Output>::value, internal::multifunction_output, Output>::type, Allocator> {
 protected:

 skipping to change at line 910 (old) / 915 (new)
     typedef typename internal::wrap_tuple_elements<N, internal::multifunction_output, Output>::type output_ports_type;
 private:
     typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
     typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
 public:
     template<typename Body>
     multifunction_node( graph &g, size_t concurrency, Body body) :
         graph_node(g), base_type(g,concurrency, body, new queue_type())
     {}

     multifunction_node( const multifunction_node &other) :
-        graph_node(other.my_graph), base_type(other, new queue_type())
+        graph_node(other.graph_node::my_graph), base_type(other, new queue_type())
     {}

     // all the guts are in multifunction_input...
 protected:
     /*override*/void reset() { base_type::reset(); }
 }; // multifunction_node

 //! split_node: accepts a tuple as input, forwards each element of the tuple to its
 //  successors.  The node has unlimited concurrency, so though it is marked as
 //  "rejecting" it does not reject inputs.
 template<typename TupleType, typename Allocator=cache_aligned_allocator<TupleType> >

 skipping to change at line 966 (old) / 971 (new)
     {}

     //! Constructor for executable node with continue_msg -> Output
     template <typename Body >
     continue_node( graph &g, int number_of_predecessors, Body body ) :
         graph_node(g), internal::continue_input<output_type>( g, number_of_predecessors, body )
     {}

     //! Copy constructor
     continue_node( const continue_node& src ) :
-        graph_node(src.my_graph), internal::continue_input<output_type>(src),
+        graph_node(src.graph_node::my_graph), internal::continue_input<output_type>(src),
         internal::function_output<Output>()
     {}

 protected:
     template< typename R, typename B > friend class run_and_put_task;
     template<typename X, typename Y> friend class internal::broadcast_cache;
     template<typename X, typename Y> friend class internal::round_robin_cache;
     using fInput_type::try_put_task;

     /*override*/void reset() { internal::continue_input<Output>::reset_receiver(); }

     /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }

-};
+}; // continue_node

 template< typename T >
 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
 protected:
     using graph_node::my_graph;
 public:
     typedef T input_type;
     typedef T output_type;
     typedef sender< input_type > predecessor_type;
     typedef receiver< output_type > successor_type;

 skipping to change at line 1171 (old) / 1176 (new)
 public:
     typedef T input_type;
     typedef T output_type;
     typedef sender< input_type > predecessor_type;
     typedef receiver< output_type > successor_type;
     typedef buffer_node<T, A> my_class;
 protected:
     typedef size_t size_type;
     internal::round_robin_cache< T, null_rw_mutex > my_successors;
-    task *my_parent;

     friend class internal::forward_task_bypass< buffer_node< T, A > >;

     enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task };
     enum op_stat {WAIT=0, SUCCEEDED, FAILED};

     // implements the aggregator_operation concept
     class buffer_operation : public internal::aggregated_operation< buffer_operation > {
     public:
         char type;
         T *elem;

 skipping to change at line 1213 (old) / 1216 (new)
             case req_item: internal_pop(tmp); break;
             case res_item: internal_reserve(tmp); break;
             case rel_res:  internal_release(tmp); try_forwarding = true; break;
             case con_res:  internal_consume(tmp); try_forwarding = true; break;
             case put_item: internal_push(tmp); try_forwarding = true; break;
             case try_fwd_task: internal_forward_task(tmp); break;
             }
         }
         if (try_forwarding && !forwarder_busy) {
             forwarder_busy = true;
-            task *new_task = new(task::allocate_additional_child_of(*my_parent)) internal::
-                forward_task_bypass
-                < buffer_node<input_type, A> >(*this);
-            // tmp should point to the last item handled by the aggregator.  This is the operation
-            // the handling thread enqueued.  So modifying that record will be okay.
-            tbb::task *z = tmp->ltask;
-            tmp->ltask = combine_tasks(z, new_task);  // in case the op generated a task
+            task* tp = this->my_graph.root_task();
+            if(tp) {
+                task *new_task = new(task::allocate_additional_child_of(*tp)) internal::
+                    forward_task_bypass
+                    < buffer_node<input_type, A> >(*this);
+                // tmp should point to the last item handled by the aggregator.  This is the operation
+                // the handling thread enqueued.  So modifying that record will be okay.
+                tbb::task *z = tmp->ltask;
+                tmp->ltask = combine_tasks(z, new_task);  // in case the op generated a task
+            }
         }
     }

     inline task *grab_forwarding_task( buffer_operation &op_data) {
         return op_data.ltask;
     }

     inline bool enqueue_forwarding_task(buffer_operation &op_data) {
         task *ft = grab_forwarding_task(op_data);
         if(ft) {

 skipping to change at line 1329 (old) / 1335 (new)
     }

     virtual void internal_release(buffer_operation *op) {
         this->release_front();
         __TBB_store_with_release(op->status, SUCCEEDED);
     }

 public:
     //! Constructor
-    buffer_node( graph &g ) : graph_node(g), reservable_item_buffer<T>(),
-        my_parent( g.root_task() ), forwarder_busy(false) {
+    buffer_node( graph &g ) : graph_node(g), reservable_item_buffer<T>(),
+        forwarder_busy(false) {
         my_successors.set_owner(this);
         my_aggregator.initialize_handler(my_handler(this));
     }

     //! Copy constructor
-    buffer_node( const buffer_node& src ) : graph_node(src.my_graph),
-        reservable_item_buffer<T>(), receiver<T>(), sender<T>(),
-        my_parent( src.my_parent ) {
+    buffer_node( const buffer_node& src ) : graph_node(src.my_graph),
+        reservable_item_buffer<T>(), receiver<T>(), sender<T>() {
         forwarder_busy = false;
         my_successors.set_owner(this);
         my_aggregator.initialize_handler(my_handler(this));
     }

     virtual ~buffer_node() {}

     //
     // message sender implementation
     //

 skipping to change at line 1607 (old) / 1612 (new)
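buffer_node no longer snapshots the root task into my_parent at construction; the aggregator's forwarding branch asks the graph each time and does nothing when root_task() returns NULL. The buffering behavior visible to users is unchanged, as in this small sketch against the public API (the value is illustrative):

    #include "tbb/flow_graph.h"
    #include <iostream>

    int main() {
        using namespace tbb::flow;
        graph g;
        buffer_node<int> buf(g);    // forwarding tasks come from the aggregator shown above
        buf.try_put(7);
        g.wait_for_all();
        int v = 0;
        if (buf.try_get(v))         // retrieve the buffered item
            std::cout << "got " << v << std::endl;
        return 0;
    }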
             case buffer_node<T, A>::rel_res: internal_release(tmp); try_forwarding = true; break;
             case buffer_node<T, A>::con_res: internal_consume(tmp); try_forwarding = true; break;
             case buffer_node<T, A>::req_item: internal_pop(tmp); break;
             case buffer_node<T, A>::res_item: internal_reserve(tmp); break;
             }
         }
         // process pops!  for now, no special pop processing
         if (mark<this->my_tail) heapify();
         if (try_forwarding && !this->forwarder_busy) {
             this->forwarder_busy = true;
-            task *new_task = new(task::allocate_additional_child_of(*(this->my_parent))) internal::
-                forward_task_bypass
-                < buffer_node<input_type, A> >(*this);
-            // tmp should point to the last item handled by the aggregator.  This is the operation
-            // the handling thread enqueued.  So modifying that record will be okay.
-            tbb::task *tmp1 = tmp->ltask;
-            tmp->ltask = combine_tasks(tmp1, new_task);
+            task* tp = this->my_graph.root_task();
+            if(tp) {
+                task *new_task = new(task::allocate_additional_child_of(*tp)) internal::
+                    forward_task_bypass
+                    < buffer_node<input_type, A> >(*this);
+                // tmp should point to the last item handled by the aggregator.  This is the operation
+                // the handling thread enqueued.  So modifying that record will be okay.
+                tbb::task *tmp1 = tmp->ltask;
+                tmp->ltask = combine_tasks(tmp1, new_task);
+            }
         }
     }

     //! Tries to forward valid items to successors
     /* override */ void internal_forward_task(prio_operation *op) {
         T i_copy;
         task * last_task = NULL;  // flagged when a successor accepts
         size_type counter = this->my_successors.size();
         if (this->my_reserved || this->my_tail == 0) {

 skipping to change at line 1768 (old) / 1776 (new)
class limiter_node : public graph_node, public receiver< T >, public sender < T > { class limiter_node : public graph_node, public receiver< T >, public sender < T > {
protected: protected:
using graph_node::my_graph; using graph_node::my_graph;
public: public:
typedef T input_type; typedef T input_type;
typedef T output_type; typedef T output_type;
typedef sender< input_type > predecessor_type; typedef sender< input_type > predecessor_type;
typedef receiver< output_type > successor_type; typedef receiver< output_type > successor_type;
private: private:
task *my_root_task;
size_t my_threshold; size_t my_threshold;
size_t my_count; size_t my_count;
internal::predecessor_cache< T > my_predecessors; internal::predecessor_cache< T > my_predecessors;
spin_mutex my_mutex; spin_mutex my_mutex;
internal::broadcast_cache< T > my_successors; internal::broadcast_cache< T > my_successors;
int init_decrement_predecessors; int init_decrement_predecessors;
friend class internal::forward_task_bypass< limiter_node<T> >; friend class internal::forward_task_bypass< limiter_node<T> >;
// Let decrementer call decrement_counter() // Let decrementer call decrement_counter()
skipping to change at line 1790 skipping to change at line 1797
// only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEU ED // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEU ED
task * decrement_counter() { task * decrement_counter() {
input_type v; input_type v;
task *rval = NULL; task *rval = NULL;
// If we can't get / put an item immediately then drop the count // If we can't get / put an item immediately then drop the count
if ( my_predecessors.get_item( v ) == false if ( my_predecessors.get_item( v ) == false
|| (rval = my_successors.try_put_task(v)) == NULL ) { || (rval = my_successors.try_put_task(v)) == NULL ) {
spin_mutex::scoped_lock lock(my_mutex); spin_mutex::scoped_lock lock(my_mutex);
-            --my_count;
-            if ( !my_predecessors.empty() ) {
-                task *rtask = new ( task::allocate_additional_child_of( *my_root_task ) )
-                    internal::forward_task_bypass< limiter_node<T> >( *this );
+            if(my_count) --my_count;
+            task* tp = this->my_graph.root_task();
+            if ( !my_predecessors.empty() && tp ) {
+                task *rtask = new ( task::allocate_additional_child_of( *tp ) )
+                    internal::forward_task_bypass< limiter_node<T> >( *this );
__TBB_ASSERT(!rval, "Have two tasks to handle"); __TBB_ASSERT(!rval, "Have two tasks to handle");
return rtask; return rtask;
} }
} }
return rval; return rval;
} }
void forward() { void forward() {
{ {
skipping to change at line 1814 skipping to change at line 1822
if ( my_count < my_threshold ) if ( my_count < my_threshold )
++my_count; ++my_count;
else else
return; return;
} }
task * rtask = decrement_counter(); task * rtask = decrement_counter();
if(rtask) FLOW_SPAWN(*rtask); if(rtask) FLOW_SPAWN(*rtask);
} }
        task *forward_task() {
-           spin_mutex::scoped_lock lock(my_mutex);
-           if ( my_count >= my_threshold )
-               return NULL;
-           ++my_count;
+           {
+               spin_mutex::scoped_lock lock(my_mutex);
+               if ( my_count >= my_threshold )
+                   return NULL;
+               ++my_count;
+           }
task * rtask = decrement_counter(); task * rtask = decrement_counter();
return rtask; return rtask;
} }
public: public:
//! The internal receiver< continue_msg > that decrements the count //! The internal receiver< continue_msg > that decrements the count
internal::decrementer< limiter_node<T> > decrement; internal::decrementer< limiter_node<T> > decrement;
//! Constructor //! Constructor
limiter_node(graph &g, size_t threshold, int num_decrement_predecessors =0) : limiter_node(graph &g, size_t threshold, int num_decrement_predecessors =0) :
        graph_node(g), my_root_task(g.root_task()), my_threshold(threshold), my_count(0),            graph_node(g), my_threshold(threshold), my_count(0),
init_decrement_predecessors(num_decrement_predecessors), init_decrement_predecessors(num_decrement_predecessors),
decrement(num_decrement_predecessors) decrement(num_decrement_predecessors)
{ {
my_predecessors.set_owner(this); my_predecessors.set_owner(this);
my_successors.set_owner(this); my_successors.set_owner(this);
decrement.set_owner(this); decrement.set_owner(this);
} }
//! Copy constructor //! Copy constructor
limiter_node( const limiter_node& src ) : limiter_node( const limiter_node& src ) :
graph_node(src.my_graph), receiver<T>(), sender<T>(), graph_node(src.my_graph), receiver<T>(), sender<T>(),
        my_root_task(src.my_root_task), my_threshold(src.my_threshold), my_count(0),            my_threshold(src.my_threshold), my_count(0),
init_decrement_predecessors(src.init_decrement_predecessors), init_decrement_predecessors(src.init_decrement_predecessors),
decrement(src.init_decrement_predecessors) decrement(src.init_decrement_predecessors)
{ {
my_predecessors.set_owner(this); my_predecessors.set_owner(this);
my_successors.set_owner(this); my_successors.set_owner(this);
decrement.set_owner(this); decrement.set_owner(this);
} }
//! Replace the current successor with this new successor //! Replace the current successor with this new successor
/* override */ bool register_successor( receiver<output_type> &r ) { /* override */ bool register_successor( receiver<output_type> &r ) {
skipping to change at line 1867 skipping to change at line 1877
/* override */ bool remove_successor( receiver<output_type> &r ) { /* override */ bool remove_successor( receiver<output_type> &r ) {
r.remove_predecessor(*this); r.remove_predecessor(*this);
my_successors.remove_successor(r); my_successors.remove_successor(r);
return true; return true;
} }
//! Removes src from the list of cached predecessors. //! Removes src from the list of cached predecessors.
/* override */ bool register_predecessor( predecessor_type &src ) { /* override */ bool register_predecessor( predecessor_type &src ) {
spin_mutex::scoped_lock lock(my_mutex); spin_mutex::scoped_lock lock(my_mutex);
my_predecessors.add( src ); my_predecessors.add( src );
-            if ( my_count < my_threshold && !my_successors.empty() ) {
-                FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *my_root_task ) )
-                    internal::forward_task_bypass < limiter_node<T> >( *this ) ) );
+            task* tp = this->my_graph.root_task();
+            if ( my_count < my_threshold && !my_successors.empty() && tp ) {
+                FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp ) )
+                    internal::forward_task_bypass < limiter_node<T> >( *this ) ) );
} }
return true; return true;
} }
//! Removes src from the list of cached predecessors. //! Removes src from the list of cached predecessors.
/* override */ bool remove_predecessor( predecessor_type &src ) { /* override */ bool remove_predecessor( predecessor_type &src ) {
my_predecessors.remove( src ); my_predecessors.remove( src );
return true; return true;
} }
skipping to change at line 1900 skipping to change at line 1911
return NULL; return NULL;
else else
++my_count; ++my_count;
} }
task * rtask = my_successors.try_put_task(t); task * rtask = my_successors.try_put_task(t);
if ( !rtask ) { // try_put_task failed. if ( !rtask ) { // try_put_task failed.
spin_mutex::scoped_lock lock(my_mutex); spin_mutex::scoped_lock lock(my_mutex);
--my_count; --my_count;
-            if ( !my_predecessors.empty() ) {
-                rtask = new ( task::allocate_additional_child_of( *my_root_task ) )
-                    internal::forward_task_bypass< limiter_node<T> >( *this );
+            task* tp = this->my_graph.root_task();
+            if ( !my_predecessors.empty() && tp ) {
+                rtask = new ( task::allocate_additional_child_of( *tp ) )
+                    internal::forward_task_bypass< limiter_node<T> >( *this );
} }
} }
return rtask; return rtask;
} }
/*override*/void reset() { /*override*/void reset() {
my_count = 0; my_count = 0;
my_predecessors.reset(); my_predecessors.reset();
decrement.reset_receiver(); decrement.reset_receiver();
skipping to change at line 2039 skipping to change at line 2051
inline void remove_edge( sender<T> &p, receiver<T> &s ) { inline void remove_edge( sender<T> &p, receiver<T> &s ) {
p.remove_successor( s ); p.remove_successor( s );
} }
//! Returns a copy of the body from a function or continue node //! Returns a copy of the body from a function or continue node
template< typename Body, typename Node > template< typename Body, typename Node >
Body copy_body( Node &n ) { Body copy_body( Node &n ) {
return n.template copy_function_object<Body>(); return n.template copy_function_object<Body>();
} }
} // interface6 } // interface7
using interface6::graph; using interface7::graph;
using interface6::graph_node; using interface7::graph_node;
using interface6::continue_msg; using interface7::continue_msg;
using interface6::sender; using interface7::sender;
using interface6::receiver; using interface7::receiver;
using interface6::continue_receiver; using interface7::continue_receiver;
using interface6::source_node; using interface7::source_node;
using interface6::function_node; using interface7::function_node;
using interface6::multifunction_node; using interface7::multifunction_node;
using interface6::split_node; using interface7::split_node;
using interface6::internal::output_port; using interface7::internal::output_port;
#if TBB_PREVIEW_GRAPH_NODES #if TBB_PREVIEW_GRAPH_NODES
using interface6::or_node; using interface7::or_node;
#endif #endif
using interface6::continue_node; using interface7::continue_node;
using interface6::overwrite_node; using interface7::overwrite_node;
using interface6::write_once_node; using interface7::write_once_node;
using interface6::broadcast_node; using interface7::broadcast_node;
using interface6::buffer_node; using interface7::buffer_node;
using interface6::queue_node; using interface7::queue_node;
using interface6::sequencer_node; using interface7::sequencer_node;
using interface6::priority_queue_node; using interface7::priority_queue_node;
using interface6::limiter_node; using interface7::limiter_node;
using namespace interface6::internal::graph_policy_namespace; using namespace interface7::internal::graph_policy_namespace;
using interface6::join_node; using interface7::join_node;
using interface6::input_port; using interface7::input_port;
using interface6::copy_body; using interface7::copy_body;
using interface6::make_edge; using interface7::make_edge;
using interface6::remove_edge; using interface7::remove_edge;
using interface6::internal::NO_TAG; using interface7::internal::NO_TAG;
using interface6::internal::tag_value; using interface7::internal::tag_value;
} // flow } // flow
} // tbb } // tbb
#endif // __TBB_flow_graph_H #endif // __TBB_flow_graph_H
 End of changes. 29 change blocks. 
88 lines changed or deleted 99 lines changed or added
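A quick illustration of the node these changes matter for: the updated limiter_node no longer caches the graph's root task, so forwarding work is only spawned while the graph still has one. A minimal usage sketch (assuming the TBB 4.2 flow graph and a compiler with C++11 lambda support; this is not part of the headers above):

    #include "tbb/flow_graph.h"
    using namespace tbb::flow;

    int main() {
        graph g;
        limiter_node<int> limiter( g, /*threshold=*/3 );   // at most 3 items in flight
        function_node<int, continue_msg> worker( g, unlimited,
            []( int /*v*/ ) { /* ...process v... */ return continue_msg(); } );
        make_edge( limiter, worker );
        make_edge( worker, limiter.decrement ); // each completion frees one unit of capacity
        for ( int i = 0; i < 10; ++i )
            limiter.try_put( i );
        g.wait_for_all();
        return 0;
    }

The decrement port is the internal::decrementer member declared above; feeding it a continue_msg lets the node admit the next item.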


 memory_pool.h   memory_pool.h 
skipping to change at line 242 skipping to change at line 242
template <typename Alloc> template <typename Alloc>
void *memory_pool<Alloc>::allocate_request(intptr_t pool_id, size_t & bytes ) { void *memory_pool<Alloc>::allocate_request(intptr_t pool_id, size_t & bytes ) {
memory_pool<Alloc> &self = *reinterpret_cast<memory_pool<Alloc>*>(pool_ id); memory_pool<Alloc> &self = *reinterpret_cast<memory_pool<Alloc>*>(pool_ id);
const size_t unit_size = sizeof(typename Alloc::value_type); const size_t unit_size = sizeof(typename Alloc::value_type);
__TBBMALLOC_ASSERT( 0 == bytes%unit_size, NULL); __TBBMALLOC_ASSERT( 0 == bytes%unit_size, NULL);
void *ptr; void *ptr;
__TBB_TRY { ptr = self.my_alloc.allocate( bytes/unit_size ); } __TBB_TRY { ptr = self.my_alloc.allocate( bytes/unit_size ); }
__TBB_CATCH(...) { return 0; } __TBB_CATCH(...) { return 0; }
return ptr; return ptr;
} }
#if _MSC_VER==1700 && !defined(__INTEL_COMPILER) #if __TBB_MSVC_UNREACHABLE_CODE_IGNORED
// Workaround for erroneous "unreachable code" warning in the template below. // Workaround for erroneous "unreachable code" warning in the template below.
// Specific for VC++ 17 compiler // Specific for VC++ 17-18 compiler
#pragma warning (push) #pragma warning (push)
#pragma warning (disable: 4702) #pragma warning (disable: 4702)
#endif #endif
template <typename Alloc> template <typename Alloc>
int memory_pool<Alloc>::deallocate_request(intptr_t pool_id, void* raw_ptr, size_t raw_bytes) { int memory_pool<Alloc>::deallocate_request(intptr_t pool_id, void* raw_ptr, size_t raw_bytes) {
memory_pool<Alloc> &self = *reinterpret_cast<memory_pool<Alloc>*>(pool_ id); memory_pool<Alloc> &self = *reinterpret_cast<memory_pool<Alloc>*>(pool_ id);
const size_t unit_size = sizeof(typename Alloc::value_type); const size_t unit_size = sizeof(typename Alloc::value_type);
__TBBMALLOC_ASSERT( 0 == raw_bytes%unit_size, NULL); __TBBMALLOC_ASSERT( 0 == raw_bytes%unit_size, NULL);
self.my_alloc.deallocate( static_cast<typename Alloc::value_type*>(raw_ ptr), raw_bytes/unit_size ); self.my_alloc.deallocate( static_cast<typename Alloc::value_type*>(raw_ ptr), raw_bytes/unit_size );
return 0; return 0;
} }
#if _MSC_VER==1700 && !defined(__INTEL_COMPILER) #if __TBB_MSVC_UNREACHABLE_CODE_IGNORED
#pragma warning (pop) #pragma warning (pop)
#endif #endif
inline fixed_pool::fixed_pool(void *buf, size_t size) : my_buffer(buf), my_ size(size) { inline fixed_pool::fixed_pool(void *buf, size_t size) : my_buffer(buf), my_ size(size) {
rml::MemPoolPolicy args(allocate_request, 0, size, /*fixedPool=*/true); rml::MemPoolPolicy args(allocate_request, 0, size, /*fixedPool=*/true);
rml::MemPoolError res = rml::pool_create_v1(intptr_t(this), &args, &my_ pool); rml::MemPoolError res = rml::pool_create_v1(intptr_t(this), &args, &my_ pool);
if( res!=rml::POOL_OK ) __TBB_THROW(std::bad_alloc()); if( res!=rml::POOL_OK ) __TBB_THROW(std::bad_alloc());
} }
inline void *fixed_pool::allocate_request(intptr_t pool_id, size_t & bytes) { inline void *fixed_pool::allocate_request(intptr_t pool_id, size_t & bytes) {
fixed_pool &self = *reinterpret_cast<fixed_pool*>(pool_id); fixed_pool &self = *reinterpret_cast<fixed_pool*>(pool_id);
// TODO: we can implement "buffer for fixed pools used only once" polic y // TODO: we can implement "buffer for fixed pools used only once" polic y
 End of changes. 3 change blocks. 
3 lines changed or deleted 3 lines changed or added
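For context, the allocate_request/deallocate_request callbacks above are what memory_pool registers with the pool runtime; from user code the pools look like plain malloc/free. A minimal usage sketch (the pools are a preview feature, hence the macro; std::allocator<char> stands in for any STL-style backing allocator):

    #define TBB_PREVIEW_MEMORY_POOLS 1
    #include "tbb/memory_pool.h"
    #include <memory>   // std::allocator

    int main() {
        // Pool that obtains backing memory through std::allocator<char>.
        tbb::memory_pool< std::allocator<char> > pool;
        void *p = pool.malloc(64);
        pool.free(p);

        // Pool confined to a single caller-supplied buffer.
        static char buf[1024*1024];
        tbb::fixed_pool fpool(buf, sizeof(buf));
        void *q = fpool.malloc(128);
        fpool.free(q);
        return 0;
    }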


 scalable_allocator.h   scalable_allocator.h 
skipping to change at line 105 skipping to change at line 105
TBBMALLOC_UNSUPPORTED, TBBMALLOC_UNSUPPORTED,
TBBMALLOC_NO_MEMORY, TBBMALLOC_NO_MEMORY,
TBBMALLOC_NO_EFFECT TBBMALLOC_NO_EFFECT
} ScalableAllocationResult; } ScalableAllocationResult;
/* Setting TBB_MALLOC_USE_HUGE_PAGES environment variable to 1 enables huge pages. /* Setting TBB_MALLOC_USE_HUGE_PAGES environment variable to 1 enables huge pages.
scalable_allocation_mode call has priority over environment variable. */ scalable_allocation_mode call has priority over environment variable. */
typedef enum { typedef enum {
TBBMALLOC_USE_HUGE_PAGES, /* value turns using huge pages on and off * / TBBMALLOC_USE_HUGE_PAGES, /* value turns using huge pages on and off * /
/* deprecated, kept for backward compatibility only */ /* deprecated, kept for backward compatibility only */
USE_HUGE_PAGES = TBBMALLOC_USE_HUGE_PAGES USE_HUGE_PAGES = TBBMALLOC_USE_HUGE_PAGES,
    /* try to limit memory consumption to the given value in Bytes: clean internal
       buffers if the limit is exceeded, but do not prevent requesting memory from the OS */
    TBBMALLOC_SET_SOFT_HEAP_LIMIT
} AllocationModeParam; } AllocationModeParam;
/** Set TBB allocator-specific allocation modes. /** Set TBB allocator-specific allocation modes.
@ingroup memory_allocation */ @ingroup memory_allocation */
int __TBB_EXPORTED_FUNC scalable_allocation_mode(int param, intptr_t value) ; int __TBB_EXPORTED_FUNC scalable_allocation_mode(int param, intptr_t value) ;
typedef enum { typedef enum {
/* Clean internal allocator buffers for all threads. /* Clean internal allocator buffers for all threads.
Returns TBBMALLOC_NO_EFFECT if no buffers cleaned, Returns TBBMALLOC_NO_EFFECT if no buffers cleaned,
TBBMALLOC_OK if some memory released from buffers. */ TBBMALLOC_OK if some memory released from buffers. */
skipping to change at line 132 skipping to change at line 135
/** Call TBB allocator-specific commands. /** Call TBB allocator-specific commands.
@ingroup memory_allocation */ @ingroup memory_allocation */
int __TBB_EXPORTED_FUNC scalable_allocation_command(int cmd, void *param); int __TBB_EXPORTED_FUNC scalable_allocation_command(int cmd, void *param);
#ifdef __cplusplus #ifdef __cplusplus
} /* extern "C" */ } /* extern "C" */
#endif /* __cplusplus */ #endif /* __cplusplus */
#ifdef __cplusplus #ifdef __cplusplus
//! The namespace rml contains components of low-level memory pool interface.
namespace rml { namespace rml {
class MemoryPool; class MemoryPool;
typedef void *(*rawAllocType)(intptr_t pool_id, size_t &bytes); typedef void *(*rawAllocType)(intptr_t pool_id, size_t &bytes);
typedef int (*rawFreeType)(intptr_t pool_id, void* raw_ptr, size_t raw_by tes); typedef int (*rawFreeType)(intptr_t pool_id, void* raw_ptr, size_t raw_by tes);
/* /*
MemPoolPolicy extension must be compatible with such structure fields layou t MemPoolPolicy extension must be compatible with such structure fields layou t
struct MemPoolPolicy { struct MemPoolPolicy {
 End of changes. 2 change blocks. 
1 lines changed or deleted 6 lines changed or added
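The newly added TBBMALLOC_SET_SOFT_HEAP_LIMIT mode can be exercised as below. The buffer-cleaning command name (TBBMALLOC_CLEAN_ALL_BUFFERS) is inferred from the enum whose comment appears above and should be checked against the shipped header, so treat this as a sketch rather than a definitive reference:

    #include "tbb/scalable_allocator.h"

    int main() {
        /* Soft limit: clean internal buffers once roughly 100 MB are in use,
           but still request more memory from the OS when truly needed. */
        scalable_allocation_mode(TBBMALLOC_SET_SOFT_HEAP_LIMIT, 100*1024*1024);

        void *p = scalable_malloc(1024);
        scalable_free(p);

        /* Explicitly release cached per-thread buffers
           (command name assumed, see note above). */
        scalable_allocation_command(TBBMALLOC_CLEAN_ALL_BUFFERS, 0);
        return 0;
    }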


 task.h   task.h 
skipping to change at line 351 skipping to change at line 351
#endif /* !TBB_USE_CAPTURED_EXCEPTION */ #endif /* !TBB_USE_CAPTURED_EXCEPTION */
}; };
private: private:
enum state { enum state {
may_have_children = 1 may_have_children = 1
}; };
union { union {
//! Flavor of this context: bound or isolated. //! Flavor of this context: bound or isolated.
-        kind_type my_kind;
+        // TODO: describe asynchronous use, and whether any memory semantics are needed
+        __TBB_atomic kind_type my_kind;
uintptr_t _my_kind_aligner; uintptr_t _my_kind_aligner;
}; };
//! Pointer to the context of the parent cancellation group. NULL for i solated contexts. //! Pointer to the context of the parent cancellation group. NULL for i solated contexts.
task_group_context *my_parent; task_group_context *my_parent;
//! Used to form the thread specific list of contexts without additiona l memory allocation. //! Used to form the thread specific list of contexts without additiona l memory allocation.
/** A context is included into the list of the current thread when its binding to /** A context is included into the list of the current thread when its binding to
its parent happens. Any context can be present in the list of one t hread only. **/ its parent happens. Any context can be present in the list of one t hread only. **/
internal::context_list_node_t my_node; internal::context_list_node_t my_node;
skipping to change at line 871 skipping to change at line 872
}; // class task }; // class task
//! task that does nothing. Useful for synchronization. //! task that does nothing. Useful for synchronization.
/** @ingroup task_scheduling */ /** @ingroup task_scheduling */
class empty_task: public task { class empty_task: public task {
/*override*/ task* execute() { /*override*/ task* execute() {
return NULL; return NULL;
} }
}; };
//! @cond INTERNAL
namespace internal {
template<typename F>
class function_task : public task {
F my_func;
/*override*/ task* execute() {
my_func();
return NULL;
}
public:
function_task( const F& f ) : my_func(f) {}
};
} // namespace internal
//! @endcond
//! A list of children. //! A list of children.
/** Used for method task::spawn_children /** Used for method task::spawn_children
@ingroup task_scheduling */ @ingroup task_scheduling */
class task_list: internal::no_copy { class task_list: internal::no_copy {
private: private:
task* first; task* first;
task** next_ptr; task** next_ptr;
friend class task; friend class task;
friend class interface5::internal::task_base; friend class interface5::internal::task_base;
public: public:
 End of changes. 2 change blocks. 
1 lines changed or deleted 18 lines changed or added
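function_task is moved here (out of task_group.h) so that both task_group and task_arena can wrap plain functors as tasks. It sits in namespace internal and is normally reached through those higher-level APIs; the direct use below is only a sketch of what the wrapper does:

    #include "tbb/task.h"

    struct say_hello {
        void operator()() const { /* ...side effect... */ }
    };

    int main() {
        // function_task runs its stored functor inside task::execute().
        tbb::task &t = *new( tbb::task::allocate_root() )
            tbb::internal::function_task<say_hello>( say_hello() );
        tbb::task::spawn_root_and_wait(t);
        return 0;
    }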


 task_arena.h   task_arena.h 
skipping to change at line 34 skipping to change at line 34
the GNU General Public License. This exception does not however the GNU General Public License. This exception does not however
invalidate any other reasons why the executable file might be covered b y invalidate any other reasons why the executable file might be covered b y
the GNU General Public License. the GNU General Public License.
*/ */
#ifndef __TBB_task_arena_H #ifndef __TBB_task_arena_H
#define __TBB_task_arena_H #define __TBB_task_arena_H
#include "task.h" #include "task.h"
#include "tbb_exception.h" #include "tbb_exception.h"
#if TBB_USE_THREADING_TOOLS
#include "atomic.h" // for as_atomic
#endif
#if __TBB_TASK_ARENA #if __TBB_TASK_ARENA
namespace tbb { namespace tbb {
//! @cond INTERNAL //! @cond INTERNAL
namespace internal { namespace internal {
//! Internal to library. Should not be used by clients. //! Internal to library. Should not be used by clients.
/** @ingroup task_scheduling */ /** @ingroup task_scheduling */
class arena; class arena;
class task_scheduler_observer_v3; class task_scheduler_observer_v3;
} // namespace internal } // namespace internal
//! @endcond //! @endcond
namespace interface6 { namespace interface7 {
//! @cond INTERNAL //! @cond INTERNAL
namespace internal { namespace internal {
using namespace tbb::internal; using namespace tbb::internal; //e.g. function_task from task.h
template<typename F>
    class enqueued_function_task : public task { // TODO: reuse from task_group?
F my_func;
/*override*/ task* execute() {
my_func();
return NULL;
}
public:
enqueued_function_task ( const F& f ) : my_func(f) {}
};
class delegate_base : no_assign { class delegate_base : no_assign {
public: public:
virtual void operator()() const = 0; virtual void operator()() const = 0;
virtual ~delegate_base() {} virtual ~delegate_base() {}
}; };
template<typename F> template<typename F>
class delegated_function : public delegate_base { class delegated_function : public delegate_base {
F &my_func; F &my_func;
/*override*/ void operator()() const { /*override*/ void operator()() const {
my_func(); my_func();
} }
public: public:
delegated_function ( F& f ) : my_func(f) {} delegated_function ( F& f ) : my_func(f) {}
}; };
-} // namespace internal
-//! @endcond
-
-/** 1-to-1 proxy representation class of scheduler's arena
- * Constructors set up settings only, real construction is deferred till the first method invocation
- * TODO: A side effect of this is that it's impossible to create a const task_arena object. Rethink?
- * Destructor only removes one of the references to the inner arena representation.
- * Final destruction happens when all the references (and the work) are gone.
- */
-class task_arena {
-    friend class internal::task_scheduler_observer_v3;
-    //! Concurrency level for deferred initialization
-    int my_max_concurrency;
-    //! Reserved master slots
-    unsigned my_master_slots;
-    //! NULL if not currently initialized.
-    internal::arena* my_arena;
-    // Initialization flag enabling compiler to throw excessive lazy initialization checks
-    bool my_initialized;
-    // const methods help to optimize the !my_arena check TODO: check, IDEA: move to base-class?
+class task_arena_base {
+protected:
+    //! NULL if not currently initialized.
+    internal::arena* my_arena;
+#if __TBB_TASK_GROUP_CONTEXT
+    //! default context of the arena
+    task_group_context *my_context;
+#endif
+    //! Concurrency level for deferred initialization
+    int my_max_concurrency;
+    //! Reserved master slots
+    unsigned my_master_slots;
+    //! Reserved for future use
+    intptr_t my_reserved;
+    task_arena_base(int max_concurrency, unsigned reserved_for_masters)
+        : my_arena(0)
+#if __TBB_TASK_GROUP_CONTEXT
+        , my_context(0)
+#endif
+        , my_max_concurrency(max_concurrency)
+        , my_master_slots(reserved_for_masters)
+        , my_reserved(0)
+    {}
void __TBB_EXPORTED_METHOD internal_initialize( ); void __TBB_EXPORTED_METHOD internal_initialize( );
void __TBB_EXPORTED_METHOD internal_terminate( ); void __TBB_EXPORTED_METHOD internal_terminate( );
void __TBB_EXPORTED_METHOD internal_enqueue( task&, intptr_t ) const; void __TBB_EXPORTED_METHOD internal_enqueue( task&, intptr_t ) const;
void __TBB_EXPORTED_METHOD internal_execute( internal::delegate_base& ) const; void __TBB_EXPORTED_METHOD internal_execute( delegate_base& ) const;
void __TBB_EXPORTED_METHOD internal_wait() const; void __TBB_EXPORTED_METHOD internal_wait() const;
static int __TBB_EXPORTED_FUNC internal_current_slot();
public: public:
//! Typedef for number of threads that is automatic. //! Typedef for number of threads that is automatic.
static const int automatic = -1; // any value < 1 means 'automatic' static const int automatic = -1; // any value < 1 means 'automatic'
};
} // namespace internal
//! @endcond
/** 1-to-1 proxy representation class of scheduler's arena
 * Constructors set up settings only, real construction is deferred till the first method invocation
 * Destructor only removes one of the references to the inner arena representation.
 * Final destruction happens when all the references (and the work) are gone.
 */
class task_arena : public internal::task_arena_base {
friend class tbb::internal::task_scheduler_observer_v3;
bool my_initialized;
public:
//! Creates task_arena with certain concurrency limits //! Creates task_arena with certain concurrency limits
-    /** @arg max_concurrency specifies total number of slots in arena where threads work
+    /** Sets up settings only, real construction is deferred till the first method invocation
+     * @arg max_concurrency specifies total number of slots in arena where threads work
* @arg reserved_for_masters specifies number of slots to be used by m aster threads only. * @arg reserved_for_masters specifies number of slots to be used by m aster threads only.
* Value of 1 is default and reflects behavior of implicit arenas . * Value of 1 is default and reflects behavior of implicit arenas .
**/ **/
task_arena(int max_concurrency = automatic, unsigned reserved_for_maste rs = 1) task_arena(int max_concurrency = automatic, unsigned reserved_for_maste rs = 1)
: my_max_concurrency(max_concurrency) : task_arena_base(max_concurrency, reserved_for_masters)
, my_master_slots(reserved_for_masters)
, my_arena(0)
, my_initialized(false) , my_initialized(false)
{} {}
//! Copies settings from another task_arena //! Copies settings from another task_arena
-    task_arena(const task_arena &s)
-        : my_max_concurrency(s.my_max_concurrency) // copy settings
-        , my_master_slots(s.my_master_slots)
-        , my_arena(0) // but not the reference or instance
+    task_arena(const task_arena &s) // copy settings but not the reference or instance
+        : task_arena_base(s.my_max_concurrency, s.my_master_slots)
, my_initialized(false) , my_initialized(false)
{} {}
//! Forces allocation of the resources for the task_arena as specified in constructor arguments
inline void initialize() { inline void initialize() {
if( !my_initialized ) { if( !my_initialized ) {
internal_initialize(); internal_initialize();
#if TBB_USE_THREADING_TOOLS
            // Threading tools respect lock prefix but report false-positive data-race via plain store
            internal::as_atomic(my_initialized).fetch_and_store<release>(true);
#else
my_initialized = true; my_initialized = true;
#endif //TBB_USE_THREADING_TOOLS
} }
} }
//! Overrides concurrency level and forces initialization of internal r epresentation //! Overrides concurrency level and forces initialization of internal r epresentation
inline void initialize(int max_concurrency, unsigned reserved_for_maste rs = 1) { inline void initialize(int max_concurrency, unsigned reserved_for_maste rs = 1) {
        __TBB_ASSERT( !my_arena, "task_arena was initialized already");            __TBB_ASSERT( !my_arena, "Impossible to modify settings of an already initialized task_arena");
if( !my_initialized ) { if( !my_initialized ) {
my_max_concurrency = max_concurrency; my_max_concurrency = max_concurrency;
my_master_slots = reserved_for_masters; my_master_slots = reserved_for_masters;
initialize(); initialize();
} // TODO: else throw? }
} }
//! Removes the reference to the internal arena representation. //! Removes the reference to the internal arena representation.
//! Not thread safe wrt concurrent invocations of other methods. //! Not thread safe wrt concurrent invocations of other methods.
inline void terminate() { inline void terminate() {
if( my_initialized ) { if( my_initialized ) {
internal_terminate(); internal_terminate();
my_initialized = false; my_initialized = false;
} }
} }
skipping to change at line 174 skipping to change at line 190
//! Returns true if the arena is active (initialized); false otherwise. //! Returns true if the arena is active (initialized); false otherwise.
//! The name was chosen to match a task_scheduler_init method with the same semantics. //! The name was chosen to match a task_scheduler_init method with the same semantics.
bool is_active() const { return my_initialized; } bool is_active() const { return my_initialized; }
//! Enqueues a task into the arena to process a functor, and immediatel y returns. //! Enqueues a task into the arena to process a functor, and immediatel y returns.
//! Does not require the calling thread to join the arena //! Does not require the calling thread to join the arena
template<typename F> template<typename F>
void enqueue( const F& f ) { void enqueue( const F& f ) {
initialize(); initialize();
-        internal_enqueue( *new( task::allocate_root() ) internal::enqueued_function_task<F>(f), 0 );
+#if __TBB_TASK_GROUP_CONTEXT
+        internal_enqueue( *new( task::allocate_root(*my_context) ) internal::function_task<F>(f), 0 );
+#else
+        internal_enqueue( *new( task::allocate_root() ) internal::function_task<F>(f), 0 );
+#endif
} }
#if __TBB_TASK_PRIORITY #if __TBB_TASK_PRIORITY
//! Enqueues a task with priority p into the arena to process a functor f, and immediately returns. //! Enqueues a task with priority p into the arena to process a functor f, and immediately returns.
//! Does not require the calling thread to join the arena //! Does not require the calling thread to join the arena
template<typename F> template<typename F>
void enqueue( const F& f, priority_t p ) { void enqueue( const F& f, priority_t p ) {
__TBB_ASSERT( p == priority_low || p == priority_normal || p == pri ority_high, "Invalid priority level value" ); __TBB_ASSERT( p == priority_low || p == priority_normal || p == pri ority_high, "Invalid priority level value" );
initialize(); initialize();
-        internal_enqueue( *new( task::allocate_root() ) internal::enqueued_function_task<F>(f), (intptr_t)p );
+#if __TBB_TASK_GROUP_CONTEXT
+        internal_enqueue( *new( task::allocate_root(*my_context) ) internal::function_task<F>(f), (intptr_t)p );
+#else
+        internal_enqueue( *new( task::allocate_root() ) internal::function_task<F>(f), (intptr_t)p );
+#endif
} }
#endif// __TBB_TASK_PRIORITY #endif// __TBB_TASK_PRIORITY
//! Joins the arena and executes a functor, then returns //! Joins the arena and executes a functor, then returns
//! If not possible to join, wraps the functor into a task, enqueues it and waits for task completion //! If not possible to join, wraps the functor into a task, enqueues it and waits for task completion
//! Can decrement the arena demand for workers, causing a worker to lea ve and free a slot to the calling thread //! Can decrement the arena demand for workers, causing a worker to lea ve and free a slot to the calling thread
template<typename F> template<typename F>
void execute(F& f) { void execute(F& f) {
initialize(); initialize();
internal::delegated_function<F> d(f); internal::delegated_function<F> d(f);
skipping to change at line 208 skipping to change at line 232
//! Joins the arena and executes a functor, then returns //! Joins the arena and executes a functor, then returns
//! If not possible to join, wraps the functor into a task, enqueues it and waits for task completion //! If not possible to join, wraps the functor into a task, enqueues it and waits for task completion
//! Can decrement the arena demand for workers, causing a worker to lea ve and free a slot to the calling thread //! Can decrement the arena demand for workers, causing a worker to lea ve and free a slot to the calling thread
template<typename F> template<typename F>
void execute(const F& f) { void execute(const F& f) {
initialize(); initialize();
internal::delegated_function<const F> d(f); internal::delegated_function<const F> d(f);
internal_execute( d ); internal_execute( d );
} }
#if __TBB_EXTRA_DEBUG
//! Wait for all work in the arena to be completed //! Wait for all work in the arena to be completed
//! Even submitted by other application threads //! Even submitted by other application threads
//! Joins arena if/when possible (in the same way as execute()) //! Joins arena if/when possible (in the same way as execute())
void wait_until_empty() { void debug_wait_until_empty() {
initialize(); initialize();
internal_wait(); internal_wait();
} }
#endif //__TBB_EXTRA_DEBUG
//! Returns the index, aka slot number, of the calling thread in its cu rrent arena //! Returns the index, aka slot number, of the calling thread in its cu rrent arena
static int __TBB_EXPORTED_FUNC current_slot(); inline static int current_slot() {
return internal_current_slot();
}
}; };
} // namespace interfaceX } // namespace interfaceX
using interface6::task_arena; using interface7::task_arena;
} // namespace tbb } // namespace tbb
#endif /* __TBB_TASK_ARENA */ #endif /* __TBB_TASK_ARENA */
#endif /* __TBB_task_arena_H */ #endif /* __TBB_task_arena_H */
 End of changes. 26 change blocks. 
54 lines changed or deleted 85 lines changed or added
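From the user's side the split into task_arena_base changes nothing; a typical session with the preview class looks like this (sketch assuming TBB_PREVIEW_TASK_ARENA and C++11 lambdas; a real program would synchronize with enqueued work before exiting):

    #define TBB_PREVIEW_TASK_ARENA 1
    #include "tbb/task_arena.h"
    #include "tbb/parallel_for.h"

    int main() {
        tbb::task_arena arena( /*max_concurrency=*/4, /*reserved_for_masters=*/1 );
        arena.execute( []{
            // runs inside the arena; parallelism is capped at 4 threads
            tbb::parallel_for( 0, 1000, []( int ) { /* ... */ } );
        } );
        // fire-and-forget job; with __TBB_TASK_GROUP_CONTEXT enabled it is now
        // allocated in the arena's default context (my_context) rather than in
        // a context borrowed from the calling thread
        arena.enqueue( []{ /* ... */ } );
        return 0;
    }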


 task_group.h   task_group.h 
skipping to change at line 72 skipping to change at line 72
}; };
enum task_group_status { enum task_group_status {
not_complete, not_complete,
complete, complete,
canceled canceled
}; };
namespace internal { namespace internal {
// Suppress gratuitous warnings from icc 11.0 when lambda expressions are u
sed in instances of function_task.
//#pragma warning(disable: 588)
template<typename F>
class function_task : public task {
F my_func;
/*override*/ task* execute() {
my_func();
return NULL;
}
public:
function_task( const F& f ) : my_func(f) {}
};
template<typename F> template<typename F>
class task_handle_task : public task { class task_handle_task : public task {
task_handle<F>& my_handle; task_handle<F>& my_handle;
/*override*/ task* execute() { /*override*/ task* execute() {
my_handle(); my_handle();
return NULL; return NULL;
} }
public: public:
task_handle_task( task_handle<F>& h ) : my_handle(h) { h.mark_scheduled (); } task_handle_task( task_handle<F>& h ) : my_handle(h) { h.mark_scheduled (); }
}; };
 End of changes. 1 change blocks. 
15 lines changed or deleted 0 lines changed or added
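task_group.h now picks function_task up from task.h; its public interface is untouched, so the canonical usage still reads (C++11 lambdas assumed):

    #include "tbb/task_group.h"

    int fib( int n ) {
        if ( n < 2 ) return n;
        int x, y;
        tbb::task_group g;
        g.run( [&]{ x = fib(n-1); } );  // run() wraps each functor in an internal task
        g.run( [&]{ y = fib(n-2); } );
        g.wait();                       // blocks until both children complete
        return x + y;
    }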


 task_scheduler_observer.h   task_scheduler_observer.h 
skipping to change at line 142 skipping to change at line 142
//! Construct local observer for a given arena in inactive state (obser vation disabled). //! Construct local observer for a given arena in inactive state (obser vation disabled).
/** entry/exit notifications are invoked whenever a thread joins/leaves arena. /** entry/exit notifications are invoked whenever a thread joins/leaves arena.
If a thread is already in the arena when the observer is activated, the entry notification If a thread is already in the arena when the observer is activated, the entry notification
is called before it executes the first stolen task. **/ is called before it executes the first stolen task. **/
task_scheduler_observer( task_arena & a) { task_scheduler_observer( task_arena & a) {
my_busy_count.store<relaxed>(v6_trait); my_busy_count.store<relaxed>(v6_trait);
my_context_tag = (intptr_t)&a; my_context_tag = (intptr_t)&a;
} }
#endif //__TBB_TASK_ARENA #endif //__TBB_TASK_ARENA
//! Destructor additionally protects concurrent on_scheduler_leaving no
tification
// It is recommended to disable observation before destructor of a deri
ved class starts,
// otherwise it can lead to concurrent notification callback on partly
destroyed object
virtual ~task_scheduler_observer() { if(my_proxy) observe(false); }
//! The callback can be invoked in a worker thread before it leaves an arena. //! The callback can be invoked in a worker thread before it leaves an arena.
/** If it returns false, the thread remains in the arena. Will not be c alled for masters /** If it returns false, the thread remains in the arena. Will not be c alled for masters
or if the worker leaves arena due to rebalancing or priority change s, etc. or if the worker leaves arena due to rebalancing or priority change s, etc.
NOTE: The preview library must be linked for this method to take ef fect **/ NOTE: The preview library must be linked for this method to take ef fect **/
virtual bool on_scheduler_leaving() { return true; } virtual bool on_scheduler_leaving() { return true; }
//! Destructor additionally protects concurrent on_scheduler_leaving no
tification
// It is recommended to disable observation before destructor of a deri
ved class starts,
// otherwise it can lead to concurrent notification callback on partly
destroyed object
virtual ~task_scheduler_observer() { if(my_proxy) observe(false); }
}; };
} //namespace interface6 } //namespace interface6
using interface6::task_scheduler_observer; using interface6::task_scheduler_observer;
#else /*TBB_PREVIEW_LOCAL_OBSERVER*/ #else /*TBB_PREVIEW_LOCAL_OBSERVER*/
typedef tbb::internal::task_scheduler_observer_v3 task_scheduler_observer; typedef tbb::internal::task_scheduler_observer_v3 task_scheduler_observer;
#endif /*TBB_PREVIEW_LOCAL_OBSERVER*/ #endif /*TBB_PREVIEW_LOCAL_OBSERVER*/
} // namespace tbb } // namespace tbb
 End of changes. 2 change blocks. 
8 lines changed or deleted 8 lines changed or added
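The relocated destructor comment is the operative point: a derived observer should disable observation before its own members are destroyed, or a late on_scheduler_leaving callback may touch a half-destroyed object. A sketch of the recommended shape (preview macros assumed):

    #define TBB_PREVIEW_LOCAL_OBSERVER 1
    #define TBB_PREVIEW_TASK_ARENA 1
    #include "tbb/task_scheduler_observer.h"
    #include "tbb/task_arena.h"

    class arena_observer : public tbb::task_scheduler_observer {
    public:
        arena_observer( tbb::task_arena &a ) : tbb::task_scheduler_observer(a) {
            observe(true);                    // start entry/exit callbacks
        }
        ~arena_observer() { observe(false); } // stop before members go away
        /*override*/ void on_scheduler_entry( bool /*is_worker*/ ) { /* pin thread, etc. */ }
        /*override*/ void on_scheduler_exit( bool /*is_worker*/ ) {}
    };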


 tbb_config.h   tbb_config.h 
skipping to change at line 107 skipping to change at line 107
#elif __TBB_GCC_VERSION >= 40404 && __TBB_GCC_VERSION < 40600 #elif __TBB_GCC_VERSION >= 40404 && __TBB_GCC_VERSION < 40600
#define __TBB_EXCEPTION_PTR_PRESENT (__GXX_EXPERIMENTAL_C XX0X__ && __INTEL_COMPILER >= 1200) #define __TBB_EXCEPTION_PTR_PRESENT (__GXX_EXPERIMENTAL_C XX0X__ && __INTEL_COMPILER >= 1200)
#elif __TBB_GCC_VERSION >= 40600 #elif __TBB_GCC_VERSION >= 40600
#define __TBB_EXCEPTION_PTR_PRESENT (__GXX_EXPERIMENTAL_C XX0X__ && __INTEL_COMPILER >= 1300) #define __TBB_EXCEPTION_PTR_PRESENT (__GXX_EXPERIMENTAL_C XX0X__ && __INTEL_COMPILER >= 1300)
#else #else
#define __TBB_EXCEPTION_PTR_PRESENT 0 #define __TBB_EXCEPTION_PTR_PRESENT 0
#endif #endif
#define __TBB_MAKE_EXCEPTION_PTR_PRESENT (_MSC_VER >= 1700 || (__GXX_EXPERIMENTAL_CXX0X__ && __TBB_GCC_VERSION >= 40600)) #define __TBB_MAKE_EXCEPTION_PTR_PRESENT (_MSC_VER >= 1700 || (__GXX_EXPERIMENTAL_CXX0X__ && __TBB_GCC_VERSION >= 40600))
#define __TBB_STATIC_ASSERT_PRESENT (__INTEL_CXX11_MODE__ || _MSC_VER >= 1600) #define __TBB_STATIC_ASSERT_PRESENT (__INTEL_CXX11_MODE__ || _MSC_VER >= 1600)
#define __TBB_CPP11_TUPLE_PRESENT (_MSC_VER >= 1600 || (__GXX_EXPERIMENTAL_CXX0X__ && __TBB_GCC_VERSION >= 40300)) #define __TBB_CPP11_TUPLE_PRESENT (_MSC_VER >= 1600 || (__GXX_EXPERIMENTAL_CXX0X__ && __TBB_GCC_VERSION >= 40300))
-    /** TODO: re-check for compiler version greater than 12.1 if it supports initializer lists**/
-    #define __TBB_INITIALIZER_LISTS_PRESENT          0
-    #define __TBB_CONSTEXPR_PRESENT                  0
-    #define __TBB_DEFAULTED_AND_DELETED_FUNC_PRESENT __INTEL_CXX11_MODE__
+    /** Intel C++ compiler 14.0 crashes on using __has_include. When it is fixed, the condition will need to be updated. **/
+    #if (__clang__ && __INTEL_COMPILER > 1400)
+        #if (__has_feature(__cxx_generalized_initializers__) && __has_include(<initializer_list>))
+            #define __TBB_INITIALIZER_LISTS_PRESENT 1
+        #endif
+    #else
+        /** TODO: when MSVC2013 is supported by Intel C++ compiler, it will be enabled silently by compiler, so rule will need to be updated.**/
+        #define __TBB_INITIALIZER_LISTS_PRESENT      __INTEL_CXX11_MODE__ && __INTEL_COMPILER >= 1400 && (_MSC_VER >= 1800 || __TBB_GCC_VERSION >= 40400 || _LIBCPP_VERSION)
+    #endif
+    #define __TBB_CONSTEXPR_PRESENT                  __INTEL_CXX11_MODE__ && __INTEL_COMPILER >= 1400
+    #define __TBB_DEFAULTED_AND_DELETED_FUNC_PRESENT __INTEL_CXX11_MODE__ && __INTEL_COMPILER >= 1200
#elif __clang__ #elif __clang__
//TODO: these options need to be rechecked //TODO: these options need to be rechecked
/** on OS X* the only way to get C++11 is to use clang. For library feature s (e.g. exception_ptr) libc++ is also /** on OS X* the only way to get C++11 is to use clang. For library feature s (e.g. exception_ptr) libc++ is also
* required. So there is no need to check GCC version for clang**/ * required. So there is no need to check GCC version for clang**/
#define __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT __has_feature(__cxx_ variadic_templates__) #define __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT __has_feature(__cxx_ variadic_templates__)
#define __TBB_CPP11_RVALUE_REF_PRESENT __has_feature(__cxx_ rvalue_references__) #define __TBB_CPP11_RVALUE_REF_PRESENT __has_feature(__cxx_ rvalue_references__)
/** TODO: extend exception_ptr related conditions to cover libstdc++ **/ /** TODO: extend exception_ptr related conditions to cover libstdc++ **/
#define __TBB_EXCEPTION_PTR_PRESENT (__cplusplus >= 20110 3L && _LIBCPP_VERSION) #define __TBB_EXCEPTION_PTR_PRESENT (__cplusplus >= 20110 3L && _LIBCPP_VERSION)
#define __TBB_MAKE_EXCEPTION_PTR_PRESENT (__cplusplus >= 20110 3L && _LIBCPP_VERSION) #define __TBB_MAKE_EXCEPTION_PTR_PRESENT (__cplusplus >= 20110 3L && _LIBCPP_VERSION)
#define __TBB_STATIC_ASSERT_PRESENT __has_feature(__cxx_s tatic_assert__) #define __TBB_STATIC_ASSERT_PRESENT __has_feature(__cxx_s tatic_assert__)
skipping to change at line 324 skipping to change at line 332
#endif #endif
#ifndef __TBB_TASK_GROUP_CONTEXT #ifndef __TBB_TASK_GROUP_CONTEXT
#define __TBB_TASK_GROUP_CONTEXT 1 #define __TBB_TASK_GROUP_CONTEXT 1
#endif /* __TBB_TASK_GROUP_CONTEXT */ #endif /* __TBB_TASK_GROUP_CONTEXT */
#ifndef __TBB_SCHEDULER_OBSERVER #ifndef __TBB_SCHEDULER_OBSERVER
#define __TBB_SCHEDULER_OBSERVER 1 #define __TBB_SCHEDULER_OBSERVER 1
#endif /* __TBB_SCHEDULER_OBSERVER */ #endif /* __TBB_SCHEDULER_OBSERVER */
-#if !defined(TBB_PREVIEW_TASK_ARENA) && __TBB_BUILD
-    #define TBB_PREVIEW_TASK_ARENA __TBB_CPF_BUILD
-#endif /* TBB_PREVIEW_TASK_ARENA */
-#define __TBB_TASK_ARENA TBB_PREVIEW_TASK_ARENA
-#if TBB_PREVIEW_TASK_ARENA
-    #define TBB_PREVIEW_LOCAL_OBSERVER 1
-    #define __TBB_NO_IMPLICIT_LINKAGE 1
-    #define __TBB_RECYCLE_TO_ENQUEUE 1
-    #ifndef __TBB_TASK_PRIORITY
-        #define __TBB_TASK_PRIORITY 0 // TODO: it will be removed in next versions
-    #endif
+#ifndef __TBB_TASK_ARENA
+    #define __TBB_TASK_ARENA (__TBB_BUILD||TBB_PREVIEW_TASK_ARENA)
+#endif /* __TBB_TASK_ARENA */
+#if __TBB_TASK_ARENA
+    #define __TBB_RECYCLE_TO_ENQUEUE __TBB_BUILD // keep non-official
#if !__TBB_SCHEDULER_OBSERVER #if !__TBB_SCHEDULER_OBSERVER
#error TBB_PREVIEW_TASK_ARENA requires __TBB_SCHEDULER_OBSERVER to be enabled #error TBB_PREVIEW_TASK_ARENA requires __TBB_SCHEDULER_OBSERVER to be enabled
#endif #endif
#endif /* TBB_PREVIEW_TASK_ARENA */ #endif /* __TBB_TASK_ARENA */
#if !defined(TBB_PREVIEW_LOCAL_OBSERVER) && __TBB_BUILD && __TBB_SCHEDULER_ OBSERVER #if !defined(TBB_PREVIEW_LOCAL_OBSERVER) && __TBB_BUILD && __TBB_SCHEDULER_ OBSERVER
#define TBB_PREVIEW_LOCAL_OBSERVER 1 #define TBB_PREVIEW_LOCAL_OBSERVER 1
#endif /* TBB_PREVIEW_LOCAL_OBSERVER */ #endif /* TBB_PREVIEW_LOCAL_OBSERVER */
#if TBB_USE_EXCEPTIONS && !__TBB_TASK_GROUP_CONTEXT #if TBB_USE_EXCEPTIONS && !__TBB_TASK_GROUP_CONTEXT
#error TBB_USE_EXCEPTIONS requires __TBB_TASK_GROUP_CONTEXT to be enabl ed #error TBB_USE_EXCEPTIONS requires __TBB_TASK_GROUP_CONTEXT to be enabl ed
#endif #endif
#ifndef __TBB_TASK_PRIORITY #ifndef __TBB_TASK_PRIORITY
#define __TBB_TASK_PRIORITY __TBB_TASK_GROUP_CONTEXT            #define __TBB_TASK_PRIORITY (!__TBB_CPF_BUILD&&__TBB_TASK_GROUP_CONTEXT) // TODO: it will be enabled for CPF in the next versions
#endif /* __TBB_TASK_PRIORITY */ #endif /* __TBB_TASK_PRIORITY */
#if __TBB_TASK_PRIORITY && !__TBB_TASK_GROUP_CONTEXT #if __TBB_TASK_PRIORITY && !__TBB_TASK_GROUP_CONTEXT
#error __TBB_TASK_PRIORITY requires __TBB_TASK_GROUP_CONTEXT to be enab led #error __TBB_TASK_PRIORITY requires __TBB_TASK_GROUP_CONTEXT to be enab led
#endif #endif
#if TBB_PREVIEW_WAITING_FOR_WORKERS || __TBB_BUILD #if TBB_PREVIEW_WAITING_FOR_WORKERS || __TBB_BUILD
#define __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE 1 #define __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE 1
#endif #endif
skipping to change at line 528 skipping to change at line 530
__GNUC__==4 && (__GNUC_MINOR__==4 ||__GNUC_MINOR__==5 || (__INTEL_COMPI LER==1300 && (__GNUC_MINOR__==6 ||__GNUC_MINOR__==7))) __GNUC__==4 && (__GNUC_MINOR__==4 ||__GNUC_MINOR__==5 || (__INTEL_COMPI LER==1300 && (__GNUC_MINOR__==6 ||__GNUC_MINOR__==7)))
/* There is an issue for specific GCC toolchain when C++11 is enabled /* There is an issue for specific GCC toolchain when C++11 is enabled
and exceptions are disabled: and exceptions are disabled:
exceprion_ptr.h/nested_exception.h use throw unconditionally. exceprion_ptr.h/nested_exception.h use throw unconditionally.
*/ */
#define __TBB_LIBSTDCPP_EXCEPTION_HEADERS_BROKEN 1 #define __TBB_LIBSTDCPP_EXCEPTION_HEADERS_BROKEN 1
#else #else
#define __TBB_LIBSTDCPP_EXCEPTION_HEADERS_BROKEN 0 #define __TBB_LIBSTDCPP_EXCEPTION_HEADERS_BROKEN 0
#endif #endif
#if __TBB_x86_32 && (__linux__ || __APPLE__ || _WIN32 || __sun) && ((defin ed(__INTEL_COMPILER) && __INTEL_COMPILER <= 1300) || (__GNUC__==3 && __GNUC _MINOR__==3 ) || defined(__SUNPRO_CC)) #if __TBB_x86_32 && (__linux__ || __APPLE__ || _WIN32 || __sun) && ((defin ed(__INTEL_COMPILER) && __INTEL_COMPILER <= 1400) || (__GNUC__==3 && __GNUC _MINOR__==3 ) || defined(__SUNPRO_CC))
// Some compilers for IA-32 fail to provide 8-byte alignment of objects on the stack, // Some compilers for IA-32 fail to provide 8-byte alignment of objects on the stack,
// even if the object specifies 8-byte alignment. On such platforms, t he IA-32 implementation // even if the object specifies 8-byte alignment. On such platforms, t he IA-32 implementation
// of 64 bit atomics (e.g. atomic<long long>) use different tactics dep ending upon // of 64 bit atomics (e.g. atomic<long long>) use different tactics dep ending upon
// whether the object is properly aligned or not. // whether the object is properly aligned or not.
#define __TBB_FORCE_64BIT_ALIGNMENT_BROKEN 1 #define __TBB_FORCE_64BIT_ALIGNMENT_BROKEN 1
#else #else
#define __TBB_FORCE_64BIT_ALIGNMENT_BROKEN 0 #define __TBB_FORCE_64BIT_ALIGNMENT_BROKEN 0
#endif #endif
#if __TBB_DEFAULTED_AND_DELETED_FUNC_PRESENT && __TBB_GCC_VERSION < 40700 & & !defined(__INTEL_COMPILER) && !defined (__clang__) #if __TBB_DEFAULTED_AND_DELETED_FUNC_PRESENT && __TBB_GCC_VERSION < 40700 & & !defined(__INTEL_COMPILER) && !defined (__clang__)
#define __TBB_ZERO_INIT_WITH_DEFAULTED_CTOR_BROKEN 1 #define __TBB_ZERO_INIT_WITH_DEFAULTED_CTOR_BROKEN 1
#endif #endif
/** End of __TBB_XXX_BROKEN macro section **/ /** End of __TBB_XXX_BROKEN macro section **/
#if defined(_MSC_VER) && _MSC_VER>=1500 && !defined(__INTEL_COMPILER)
    // A macro to suppress erroneous or benign "unreachable code" MSVC warning (4702)
#define __TBB_MSVC_UNREACHABLE_CODE_IGNORED 1
#endif
#define __TBB_ATOMIC_CTORS (__TBB_CONSTEXPR_PRESENT && __TBB_DEFAULTED_ AND_DELETED_FUNC_PRESENT && (!__TBB_ZERO_INIT_WITH_DEFAULTED_CTOR_BROKEN)) #define __TBB_ATOMIC_CTORS (__TBB_CONSTEXPR_PRESENT && __TBB_DEFAULTED_ AND_DELETED_FUNC_PRESENT && (!__TBB_ZERO_INIT_WITH_DEFAULTED_CTOR_BROKEN))
#endif /* __TBB_tbb_config_H */ #endif /* __TBB_tbb_config_H */
 End of changes. 6 change blocks. 
20 lines changed or deleted 33 lines changed or added
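These feature macros are meant to be tested by dependent code rather than set by it; for example, code relying on the stricter initializer-list detection above would guard itself like this (sketch):

    #include "tbb/tbb_config.h"

    #if __TBB_INITIALIZER_LISTS_PRESENT
    #include <initializer_list>
    template<typename T>
    void fill_from( std::initializer_list<T> items ) { /* ... */ }
    #endif

    #if __TBB_ATOMIC_CTORS
    /* may rely on constexpr construction of tbb::atomic here */
    #endif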


 tbb_stddef.h   tbb_stddef.h 
skipping to change at line 37 skipping to change at line 37
*/ */
#ifndef __TBB_tbb_stddef_H #ifndef __TBB_tbb_stddef_H
#define __TBB_tbb_stddef_H #define __TBB_tbb_stddef_H
// Marketing-driven product version // Marketing-driven product version
#define TBB_VERSION_MAJOR 4 #define TBB_VERSION_MAJOR 4
#define TBB_VERSION_MINOR 2 #define TBB_VERSION_MINOR 2
// Engineering-focused interface version // Engineering-focused interface version
#define TBB_INTERFACE_VERSION 7000 #define TBB_INTERFACE_VERSION 7001
#define TBB_INTERFACE_VERSION_MAJOR TBB_INTERFACE_VERSION/1000 #define TBB_INTERFACE_VERSION_MAJOR TBB_INTERFACE_VERSION/1000
// The oldest major interface version still supported // The oldest major interface version still supported
// To be used in SONAME, manifests, etc. // To be used in SONAME, manifests, etc.
#define TBB_COMPATIBLE_INTERFACE_VERSION 2 #define TBB_COMPATIBLE_INTERFACE_VERSION 2
#define __TBB_STRING_AUX(x) #x #define __TBB_STRING_AUX(x) #x
#define __TBB_STRING(x) __TBB_STRING_AUX(x) #define __TBB_STRING(x) __TBB_STRING_AUX(x)
// We do not need defines below for resource processing on windows // We do not need defines below for resource processing on windows
skipping to change at line 296 skipping to change at line 296
#define __TBB_RETHROW() ((void)0) #define __TBB_RETHROW() ((void)0)
#endif /* !TBB_USE_EXCEPTIONS */ #endif /* !TBB_USE_EXCEPTIONS */
//! Report a runtime warning. //! Report a runtime warning.
void __TBB_EXPORTED_FUNC runtime_warning( const char* format, ... ); void __TBB_EXPORTED_FUNC runtime_warning( const char* format, ... );
#if TBB_USE_ASSERT #if TBB_USE_ASSERT
static void* const poisoned_ptr = reinterpret_cast<void*>(-1); static void* const poisoned_ptr = reinterpret_cast<void*>(-1);
//! Set p to invalid pointer value. //! Set p to invalid pointer value.
// Also works for regular (non-__TBB_atomic) pointers.
template<typename T> template<typename T>
inline void poison_pointer( T*& p ) { p = reinterpret_cast<T*>(poisoned_ptr); }            inline void poison_pointer( T* __TBB_atomic & p ) { p = reinterpret_cast<T*>(poisoned_ptr); }
/** Expected to be used in assertions only, thus no empty form is defined. **/ /** Expected to be used in assertions only, thus no empty form is defined. **/
template<typename T> template<typename T>
inline bool is_poisoned( T* p ) { return p == reinterpret_cast<T*>(poisoned _ptr); } inline bool is_poisoned( T* p ) { return p == reinterpret_cast<T*>(poisoned _ptr); }
#else #else
template<typename T> template<typename T>
inline void poison_pointer( T* ) {/*do nothing*/} inline void poison_pointer( T* __TBB_atomic & ) {/*do nothing*/}
#endif /* !TBB_USE_ASSERT */ #endif /* !TBB_USE_ASSERT */
//! Cast between unrelated pointer types. //! Cast between unrelated pointer types.
/** This method should be used sparingly as a last resort for dealing with /** This method should be used sparingly as a last resort for dealing with
situations that inherently break strict ISO C++ aliasing rules. */ situations that inherently break strict ISO C++ aliasing rules. */
// T is a pointer type because it will be explicitly provided by the progra mmer as a template argument; // T is a pointer type because it will be explicitly provided by the progra mmer as a template argument;
// U is a referent type to enable the compiler to check that "ptr" is a poi nter, deducing U in the process. // U is a referent type to enable the compiler to check that "ptr" is a poi nter, deducing U in the process.
template<typename T, typename U> template<typename T, typename U>
inline T punned_cast( U* ptr ) { inline T punned_cast( U* ptr ) {
uintptr_t x = reinterpret_cast<uintptr_t>(ptr); uintptr_t x = reinterpret_cast<uintptr_t>(ptr);
 End of changes. 4 change blocks. 
3 lines changed or deleted 4 lines changed or added

This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/