_concurrent_unordered_impl.h   _concurrent_unordered_impl.h 
skipping to change at line 1289 skipping to change at line 1289
return num; return num;
} }
// Insert an element in the hash given its value // Insert an element in the hash given its value
template< typename ValueType> template< typename ValueType>
std::pair<iterator, bool> internal_insert( __TBB_FORWARDING_REF(ValueTy pe) value, nodeptr_t pnode = NULL) std::pair<iterator, bool> internal_insert( __TBB_FORWARDING_REF(ValueTy pe) value, nodeptr_t pnode = NULL)
{ {
sokey_t order_key = (sokey_t) my_hash_compare(get_key(value)); sokey_t order_key = (sokey_t) my_hash_compare(get_key(value));
size_type bucket = order_key % my_number_of_buckets; size_type bucket = order_key % my_number_of_buckets;
//TODO:refactor the get_bucket related staff into separate function something like aqcuire_bucket(key_type) //TODO:refactor the get_bucket related stuff into separate function something like acquire_bucket(key_type)
// If bucket is empty, initialize it first // If bucket is empty, initialize it first
if (!is_initialized(bucket)) if (!is_initialized(bucket))
init_bucket(bucket); init_bucket(bucket);
size_type new_count = 0; size_type new_count = 0;
order_key = split_order_key_regular(order_key); order_key = split_order_key_regular(order_key);
raw_iterator it = get_bucket(bucket); raw_iterator it = get_bucket(bucket);
raw_iterator last = my_solist.raw_end(); raw_iterator last = my_solist.raw_end();
raw_iterator where = it; raw_iterator where = it;
__TBB_ASSERT(where != last, "Invalid head node"); __TBB_ASSERT(where != last, "Invalid head node");
// First node is a dummy node // First node is a dummy node
++where; ++where;
for (;;) for (;;)
{ {
if (where == last || solist_t::get_order_key(where) > order_key if (where == last || solist_t::get_order_key(where) > order_key
) ||
// if multimapped, stop at the first item equal to us.
(allow_multimapping && solist_t::get_order_key(where) =
= order_key &&
!my_hash_compare(get_key(*where), get_key(value))))
{ {
if (!pnode) if (!pnode)
pnode = my_solist.create_node(order_key, tbb::internal ::forward<ValueType>(value)); pnode = my_solist.create_node(order_key, tbb::internal ::forward<ValueType>(value));
// Try to insert it in the right place // Try to insert 'pnode' between 'it' and 'where'
std::pair<iterator, bool> result = my_solist.try_insert(it, where, pnode, &new_count); std::pair<iterator, bool> result = my_solist.try_insert(it, where, pnode, &new_count);
if (result.second) if (result.second)
{ {
// Insertion succeeded, adjust the table size, if neede d // Insertion succeeded, adjust the table size, if neede d
adjust_table_size(new_count, my_number_of_buckets); adjust_table_size(new_count, my_number_of_buckets);
return result; return result;
} }
else else
{ {
// Insertion failed: either the same node was inserted by another thread, or // Insertion failed: either the same node was inserted by another thread, or
// another element was inserted at exactly the same pla ce as this node. // another element was inserted at exactly the same pla ce as this node.
// Proceed with the search from the previous location w here order key was // Proceed with the search from the previous location w here order key was
// known to be larger (note: this is legal only because there is no safe // known to be larger (note: this is legal only because there is no safe
// concurrent erase operation supported). // concurrent erase operation supported).
where = it; where = it;
++where; ++where;
continue; continue;
} }
} }
else if (!allow_multimapping && solist_t::get_order_key(where) else if (!allow_multimapping && solist_t::get_order_key(where)
== order_key && my_hash_compare(get_key(*where), get_key(value)) == 0) == order_key &&
{ my_hash_compare(get_key(*where), get_key(value)) == 0)
{ // Element already in the list, return it
if (pnode) if (pnode)
my_solist.destroy_node(pnode); my_solist.destroy_node(pnode);
// Element already in the list, return it
return std::pair<iterator, bool>(my_solist.get_iterator(whe re), false); return std::pair<iterator, bool>(my_solist.get_iterator(whe re), false);
} }
// Move the iterator forward // Move the iterator forward
it = where; it = where;
++where; ++where;
} }
} }
// Find the element in the split-ordered list // Find the element in the split-ordered list
iterator internal_find(const key_type& key) iterator internal_find(const key_type& key)
{ {
sokey_t order_key = (sokey_t) my_hash_compare(key); sokey_t order_key = (sokey_t) my_hash_compare(key);
 End of changes. 6 change blocks. 
9 lines changed or deleted 12 lines changed or added


 _flow_graph_impl.h   _flow_graph_impl.h 
skipping to change at line 202 skipping to change at line 202
#endif #endif
B get_body() { return body; } B get_body() { return body; }
/*override*/ multifunction_body_leaf* clone() { /*override*/ multifunction_body_leaf* clone() {
return new multifunction_body_leaf<Input, OutputSet,B>(init_bod y); return new multifunction_body_leaf<Input, OutputSet,B>(init_bod y);
} }
private: private:
B body; B body;
B init_body; B init_body;
}; };
#if __TBB_PREVIEW_ASYNC_NODE
//! A functor that takes Input and submit it to Asynchronous Activity
template< typename Input, typename AsyncGateway >
class async_body : tbb::internal::no_assign {
public:
virtual ~async_body() {}
virtual void operator()(const Input &output, AsyncGateway& gateway)
= 0;
virtual async_body* clone() = 0;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
virtual void reset_body() = 0;
#endif
};
//! The leaf for async_body
template< typename Input, typename Body, typename AsyncGateway >
class async_body_leaf : public async_body< Input, AsyncGateway > {
public:
async_body_leaf( const Body &_body ) : body(_body), init_body(_body
) { }
/*override*/ void operator()(const Input &input, AsyncGateway& gate
way) { body( input, gateway ); }
/*override*/ async_body_leaf* clone() {
return new async_body_leaf< Input, Body, AsyncGateway >(init_bo
dy);
}
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
/*override*/ void reset_body() {
body = init_body;
}
#endif
Body get_body() { return body; }
private:
Body body;
Body init_body;
};
#endif
// --------------------------- end of function_body containers ------------ ------------ // --------------------------- end of function_body containers ------------ ------------
// --------------------------- node task bodies --------------------------- ------------ // --------------------------- node task bodies --------------------------- ------------
//! A task that calls a node's forward_task function //! A task that calls a node's forward_task function
template< typename NodeType > template< typename NodeType >
class forward_task_bypass : public task { class forward_task_bypass : public task {
NodeType &my_node; NodeType &my_node;
skipping to change at line 294 skipping to change at line 329
void remove( T &n ) { void remove( T &n ) {
typename my_mutex_type::scoped_lock lock( my_mutex ); typename my_mutex_type::scoped_lock lock( my_mutex );
for ( size_t i = internal_size(); i != 0; --i ) { for ( size_t i = internal_size(); i != 0; --i ) {
T &s = internal_pop(); T &s = internal_pop();
if ( &s == &n ) return; // only remove one predecessor pe r request if ( &s == &n ) return; // only remove one predecessor pe r request
internal_push(s); internal_push(s);
} }
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
typedef edge_container<T> built_predecessors_type;
void clear() {
while( !my_q.empty()) (void)my_q.pop();
my_built_predecessors.clear();
}
built_predecessors_type &built_predecessors() { return my_built_pre
decessors; }
typedef typename edge_container<T>::edge_list_type predecessor_list _type; typedef typename edge_container<T>::edge_list_type predecessor_list _type;
void internal_add_built_predecessor( T &n ) { void internal_add_built_predecessor( T &n ) {
typename my_mutex_type::scoped_lock lock( my_mutex ); typename my_mutex_type::scoped_lock lock( my_mutex );
my_built_predecessors.add_edge(n); my_built_predecessors.add_edge(n);
} }
void internal_delete_built_predecessor( T &n ) { void internal_delete_built_predecessor( T &n ) {
typename my_mutex_type::scoped_lock lock( my_mutex ); typename my_mutex_type::scoped_lock lock( my_mutex );
my_built_predecessors.delete_edge(n); my_built_predecessors.delete_edge(n);
} }
skipping to change at line 322 skipping to change at line 365
return (size_t)(my_built_predecessors.edge_count()); return (size_t)(my_built_predecessors.edge_count());
} }
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
protected: protected:
typedef M my_mutex_type; typedef M my_mutex_type;
my_mutex_type my_mutex; my_mutex_type my_mutex;
std::queue< T * > my_q; std::queue< T * > my_q;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
edge_container<T> my_built_predecessors; built_predecessors_type my_built_predecessors;
#endif #endif
// Assumes lock is held // Assumes lock is held
inline bool internal_empty( ) { inline bool internal_empty( ) {
return my_q.empty(); return my_q.empty();
} }
// Assumes lock is held // Assumes lock is held
inline size_type internal_size( ) { inline size_type internal_size( ) {
return my_q.size(); return my_q.size();
skipping to change at line 391 skipping to change at line 434
if ( my_owner) if ( my_owner)
src->register_successor( *my_owner ); src->register_successor( *my_owner );
} else { } else {
// Retain ownership of the edge // Retain ownership of the edge
this->add(*src); this->add(*src);
} }
} while ( msg == false ); } while ( msg == false );
return msg; return msg;
} }
void reset( __TBB_PFG_RESET_ARG(reset_flags f)) { // If we are removing arcs (rf_clear_edges), call clear() rather th
an reset().
void reset() {
if(my_owner) { if(my_owner) {
for(;;) { for(;;) {
predecessor_type *src; predecessor_type *src;
{ {
if(this->internal_empty()) break; if(this->internal_empty()) break;
src = &this->internal_pop(); src = &this->internal_pop();
} }
src->register_successor( *my_owner); src->register_successor( *my_owner);
} }
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
if (f&rf_extract && my_owner)
my_built_predecessors.receiver_extract(*my_owner);
__TBB_ASSERT(!(f&rf_extract) || this->internal_empty(), "predec
essor cache not empty");
#endif
} }
protected: protected:
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
using node_cache< sender<T>, M >::my_built_predecessors; using node_cache< sender<T>, M >::my_built_predecessors;
#endif #endif
successor_type *my_owner; successor_type *my_owner;
}; };
skipping to change at line 472 skipping to change at line 511
return true; return true;
} }
bool bool
try_consume( ) { try_consume( ) {
reserved_src->try_consume( ); reserved_src->try_consume( );
reserved_src = NULL; reserved_src = NULL;
return true; return true;
} }
void reset( __TBB_PFG_RESET_ARG(reset_flags f)) { void reset( ) {
reserved_src = NULL; reserved_src = NULL;
predecessor_cache<T,M>::reset(__TBB_PFG_RESET_ARG(f)); predecessor_cache<T,M>::reset( );
}
void clear() {
reserved_src = NULL;
predecessor_cache<T,M>::clear();
} }
private: private:
predecessor_type *reserved_src; predecessor_type *reserved_src;
}; };
//! An abstract cache of successors //! An abstract cache of successors
template<typename T, typename M=spin_rw_mutex > template<typename T, typename M=spin_rw_mutex >
class successor_cache : tbb::internal::no_copy { class successor_cache : tbb::internal::no_copy {
protected: protected:
typedef M my_mutex_type; typedef M my_mutex_type;
my_mutex_type my_mutex; my_mutex_type my_mutex;
typedef receiver<T> successor_type;
typedef receiver<T> *pointer_type; typedef receiver<T> *pointer_type;
typedef std::list< pointer_type > my_successors_type; typedef std::list< pointer_type > my_successors_type;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
edge_container<receiver<T> > my_built_successors; edge_container<successor_type> my_built_successors;
#endif #endif
my_successors_type my_successors; my_successors_type my_successors;
sender<T> *my_owner; sender<T> *my_owner;
public: public:
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
typedef typename edge_container<receiver<T> >::edge_list_type succe typedef typename edge_container<successor_type>::edge_list_type suc
ssor_list_type; cessor_list_type;
void internal_add_built_successor( receiver<T> &r) {
edge_container<successor_type> &built_successors() { return my_buil
t_successors; }
void internal_add_built_successor( successor_type &r) {
typename my_mutex_type::scoped_lock l(my_mutex, true); typename my_mutex_type::scoped_lock l(my_mutex, true);
my_built_successors.add_edge( r ); my_built_successors.add_edge( r );
} }
void internal_delete_built_successor( receiver<T> &r) { void internal_delete_built_successor( successor_type &r) {
typename my_mutex_type::scoped_lock l(my_mutex, true); typename my_mutex_type::scoped_lock l(my_mutex, true);
my_built_successors.delete_edge(r); my_built_successors.delete_edge(r);
} }
void copy_successors( successor_list_type &v) { void copy_successors( successor_list_type &v) {
typename my_mutex_type::scoped_lock l(my_mutex, false); typename my_mutex_type::scoped_lock l(my_mutex, false);
my_built_successors.copy_edges(v); my_built_successors.copy_edges(v);
} }
size_t successor_count() { size_t successor_count() {
typename my_mutex_type::scoped_lock l(my_mutex,false); typename my_mutex_type::scoped_lock l(my_mutex,false);
return my_built_successors.edge_count(); return my_built_successors.edge_count();
} }
void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
if (f&rf_extract && my_owner)
my_built_successors.sender_extract(*my_owner);
}
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
successor_cache( ) : my_owner(NULL) {} successor_cache( ) : my_owner(NULL) {}
void set_owner( sender<T> *owner ) { my_owner = owner; } void set_owner( sender<T> *owner ) { my_owner = owner; }
virtual ~successor_cache() {} virtual ~successor_cache() {}
void register_successor( receiver<T> &r ) { void register_successor( successor_type &r ) {
typename my_mutex_type::scoped_lock l(my_mutex, true); typename my_mutex_type::scoped_lock l(my_mutex, true);
my_successors.push_back( &r ); my_successors.push_back( &r );
} }
void remove_successor( receiver<T> &r ) { void remove_successor( successor_type &r ) {
typename my_mutex_type::scoped_lock l(my_mutex, true); typename my_mutex_type::scoped_lock l(my_mutex, true);
for ( typename my_successors_type::iterator i = my_successors.b egin(); for ( typename my_successors_type::iterator i = my_successors.b egin();
i != my_successors.end(); ++i ) { i != my_successors.end(); ++i ) {
if ( *i == & r ) { if ( *i == & r ) {
my_successors.erase(i); my_successors.erase(i);
break; break;
} }
} }
} }
skipping to change at line 562 skipping to change at line 606
} }
void clear() { void clear() {
my_successors.clear(); my_successors.clear();
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
my_built_successors.clear(); my_built_successors.clear();
#endif #endif
} }
virtual task * try_put_task( const T &t ) = 0; virtual task * try_put_task( const T &t ) = 0;
}; }; // successor_cache<T>
//! An abstract cache of successors, specialized to continue_msg //! An abstract cache of successors, specialized to continue_msg
template<> template<>
class successor_cache< continue_msg > : tbb::internal::no_copy { class successor_cache< continue_msg > : tbb::internal::no_copy {
protected: protected:
typedef spin_rw_mutex my_mutex_type; typedef spin_rw_mutex my_mutex_type;
my_mutex_type my_mutex; my_mutex_type my_mutex;
typedef receiver<continue_msg> successor_type; typedef receiver<continue_msg> successor_type;
skipping to change at line 586 skipping to change at line 630
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
edge_container<successor_type> my_built_successors; edge_container<successor_type> my_built_successors;
typedef edge_container<successor_type>::edge_list_type successor_li st_type; typedef edge_container<successor_type>::edge_list_type successor_li st_type;
#endif #endif
sender<continue_msg> *my_owner; sender<continue_msg> *my_owner;
public: public:
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
edge_container<successor_type> &built_successors() { return my_buil
t_successors; }
void internal_add_built_successor( successor_type &r) { void internal_add_built_successor( successor_type &r) {
my_mutex_type::scoped_lock l(my_mutex, true); my_mutex_type::scoped_lock l(my_mutex, true);
my_built_successors.add_edge( r ); my_built_successors.add_edge( r );
} }
void internal_delete_built_successor( successor_type &r) { void internal_delete_built_successor( successor_type &r) {
my_mutex_type::scoped_lock l(my_mutex, true); my_mutex_type::scoped_lock l(my_mutex, true);
my_built_successors.delete_edge(r); my_built_successors.delete_edge(r);
} }
void copy_successors( successor_list_type &v) { void copy_successors( successor_list_type &v) {
my_mutex_type::scoped_lock l(my_mutex, false); my_mutex_type::scoped_lock l(my_mutex, false);
my_built_successors.copy_edges(v); my_built_successors.copy_edges(v);
} }
size_t successor_count() { size_t successor_count() {
my_mutex_type::scoped_lock l(my_mutex,false); my_mutex_type::scoped_lock l(my_mutex,false);
return my_built_successors.edge_count(); return my_built_successors.edge_count();
} }
void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
if (f&rf_extract && my_owner)
my_built_successors.sender_extract(*my_owner);
}
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
successor_cache( ) : my_owner(NULL) {} successor_cache( ) : my_owner(NULL) {}
void set_owner( sender<continue_msg> *owner ) { my_owner = owner; } void set_owner( sender<continue_msg> *owner ) { my_owner = owner; }
virtual ~successor_cache() {} virtual ~successor_cache() {}
void register_successor( successor_type &r ) { void register_successor( successor_type &r ) {
my_mutex_type::scoped_lock l(my_mutex, true); my_mutex_type::scoped_lock l(my_mutex, true);
skipping to change at line 655 skipping to change at line 698
void clear() { void clear() {
my_successors.clear(); my_successors.clear();
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
my_built_successors.clear(); my_built_successors.clear();
#endif #endif
} }
virtual task * try_put_task( const continue_msg &t ) = 0; virtual task * try_put_task( const continue_msg &t ) = 0;
}; }; // successor_cache< continue_msg >;
//! A cache of successors that are broadcast to //! A cache of successors that are broadcast to
template<typename T, typename M=spin_rw_mutex> template<typename T, typename M=spin_rw_mutex>
class broadcast_cache : public successor_cache<T, M> { class broadcast_cache : public successor_cache<T, M> {
typedef M my_mutex_type; typedef M my_mutex_type;
typedef std::list< receiver<T> * > my_successors_type; // typedef std::list< receiver<T> * > my_successors_type;
typedef typename successor_cache<T,M>::my_successors_type my_succes
sors_type;
public: public:
broadcast_cache( ) {} broadcast_cache( ) {}
// as above, but call try_put_task instead, and return the last tas k we received (if any) // as above, but call try_put_task instead, and return the last tas k we received (if any)
/*override*/ task * try_put_task( const T &t ) { /*override*/ task * try_put_task( const T &t ) {
task * last_task = NULL; task * last_task = NULL;
bool upgraded = true; bool upgraded = true;
typename my_mutex_type::scoped_lock l(this->my_mutex, upgraded) ; typename my_mutex_type::scoped_lock l(this->my_mutex, upgraded) ;
skipping to change at line 701 skipping to change at line 745
return last_task; return last_task;
} }
}; };
//! A cache of successors that are put in a round-robin fashion //! A cache of successors that are put in a round-robin fashion
template<typename T, typename M=spin_rw_mutex > template<typename T, typename M=spin_rw_mutex >
class round_robin_cache : public successor_cache<T, M> { class round_robin_cache : public successor_cache<T, M> {
typedef size_t size_type; typedef size_t size_type;
typedef M my_mutex_type; typedef M my_mutex_type;
typedef std::list< receiver<T> * > my_successors_type; // typedef std::list< receiver<T> * > my_successors_type;
typedef typename successor_cache<T,M>::my_successors_type my_succes
sors_type;
public: public:
round_robin_cache( ) {} round_robin_cache( ) {}
size_type size() { size_type size() {
typename my_mutex_type::scoped_lock l(this->my_mutex, false); typename my_mutex_type::scoped_lock l(this->my_mutex, false);
return this->my_successors.size(); return this->my_successors.size();
} }
 End of changes. 21 change blocks. 
30 lines changed or deleted 84 lines changed or added


 _flow_graph_indexer_impl.h   _flow_graph_indexer_impl.h 
skipping to change at line 55 skipping to change at line 55
template<typename IndexerNodeBaseType, typename PortTuple> template<typename IndexerNodeBaseType, typename PortTuple>
static inline void set_indexer_node_pointer(PortTuple &my_input, In dexerNodeBaseType *p) { static inline void set_indexer_node_pointer(PortTuple &my_input, In dexerNodeBaseType *p) {
typedef typename tuple_element<N-1, TupleTypes>::type T; typedef typename tuple_element<N-1, TupleTypes>::type T;
task *(*indexer_node_put_task)(const T&, void *) = do_try_put<I ndexerNodeBaseType, T, N-1>; task *(*indexer_node_put_task)(const T&, void *) = do_try_put<I ndexerNodeBaseType, T, N-1>;
tbb::flow::get<N-1>(my_input).set_up(p, indexer_node_put_task); tbb::flow::get<N-1>(my_input).set_up(p, indexer_node_put_task);
indexer_helper<TupleTypes,N-1>::template set_indexer_node_point er<IndexerNodeBaseType,PortTuple>(my_input, p); indexer_helper<TupleTypes,N-1>::template set_indexer_node_point er<IndexerNodeBaseType,PortTuple>(my_input, p);
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
template<typename InputTuple> template<typename InputTuple>
static inline void reset_inputs(InputTuple &my_input, reset_flags f ) { static inline void reset_inputs(InputTuple &my_input, reset_flags f ) {
join_helper<N-1>::reset_inputs(my_input, f); indexer_helper<TupleTypes,N-1>::reset_inputs(my_input, f);
tbb::flow::get<N-1>(my_input).reset_receiver(f); tbb::flow::get<N-1>(my_input).reset_receiver(f);
} }
template<typename InputTuple>
static inline void extract(InputTuple &my_input) {
indexer_helper<TupleTypes,N-1>::extract(my_input);
tbb::flow::get<N-1>(my_input).extract_receiver();
}
#endif #endif
}; };
template<typename TupleTypes> template<typename TupleTypes>
struct indexer_helper<TupleTypes,1> { struct indexer_helper<TupleTypes,1> {
template<typename IndexerNodeBaseType, typename PortTuple> template<typename IndexerNodeBaseType, typename PortTuple>
static inline void set_indexer_node_pointer(PortTuple &my_input, In dexerNodeBaseType *p) { static inline void set_indexer_node_pointer(PortTuple &my_input, In dexerNodeBaseType *p) {
typedef typename tuple_element<0, TupleTypes>::type T; typedef typename tuple_element<0, TupleTypes>::type T;
task *(*indexer_node_put_task)(const T&, void *) = do_try_put<I ndexerNodeBaseType, T, 0>; task *(*indexer_node_put_task)(const T&, void *) = do_try_put<I ndexerNodeBaseType, T, 0>;
tbb::flow::get<0>(my_input).set_up(p, indexer_node_put_task); tbb::flow::get<0>(my_input).set_up(p, indexer_node_put_task);
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
template<typename InputTuple> template<typename InputTuple>
static inline void reset_inputs(InputTuple &my_input, reset_flags f ) { static inline void reset_inputs(InputTuple &my_input, reset_flags f ) {
tbb::flow::get<0>(my_input).reset_receiver(f); tbb::flow::get<0>(my_input).reset_receiver(f);
} }
template<typename InputTuple>
static inline void extract(InputTuple &my_input) {
tbb::flow::get<0>(my_input).extract_receiver();
}
#endif #endif
}; };
template<typename T> template<typename T>
class indexer_input_port : public receiver<T> { class indexer_input_port : public receiver<T> {
private: private:
void* my_indexer_ptr; void* my_indexer_ptr;
typedef task* (* forward_function_ptr)(T const &, void* ); typedef task* (* forward_function_ptr)(T const &, void* );
forward_function_ptr my_try_put_task; forward_function_ptr my_try_put_task;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
spin_mutex my_pred_mutex; spin_mutex my_pred_mutex;
edge_container<sender<T> > my_built_predecessors; typedef typename receiver<T>::built_predecessors_type built_predece
ssors_type;
built_predecessors_type my_built_predecessors;
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
public: public:
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
indexer_input_port() : my_pred_mutex() {} indexer_input_port() : my_pred_mutex() {}
indexer_input_port( const indexer_input_port & /*other*/ ) : receiv er<T>(), my_pred_mutex() { indexer_input_port( const indexer_input_port & /*other*/ ) : receiv er<T>(), my_pred_mutex() {
} }
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
void set_up(void *p, forward_function_ptr f) { void set_up(void *p, forward_function_ptr f) {
my_indexer_ptr = p; my_indexer_ptr = p;
my_try_put_task = f; my_try_put_task = f;
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
typedef typename receiver<T>::predecessor_list_type predecessor_lis t_type; typedef typename receiver<T>::predecessor_list_type predecessor_lis t_type;
/*override*/ built_predecessors_type &built_predecessors() { return
my_built_predecessors; }
/*override*/size_t predecessor_count() { /*override*/size_t predecessor_count() {
spin_mutex::scoped_lock l(my_pred_mutex); spin_mutex::scoped_lock l(my_pred_mutex);
return my_built_predecessors.edge_count(); return my_built_predecessors.edge_count();
} }
/*override*/void internal_add_built_predecessor(sender<T> &p) { /*override*/void internal_add_built_predecessor(sender<T> &p) {
spin_mutex::scoped_lock l(my_pred_mutex); spin_mutex::scoped_lock l(my_pred_mutex);
my_built_predecessors.add_edge(p); my_built_predecessors.add_edge(p);
} }
/*override*/void internal_delete_built_predecessor(sender<T> &p) { /*override*/void internal_delete_built_predecessor(sender<T> &p) {
spin_mutex::scoped_lock l(my_pred_mutex); spin_mutex::scoped_lock l(my_pred_mutex);
my_built_predecessors.delete_edge(p); my_built_predecessors.delete_edge(p);
} }
/*override*/void copy_predecessors( predecessor_list_type &v) { /*override*/void copy_predecessors( predecessor_list_type &v) {
spin_mutex::scoped_lock l(my_pred_mutex); spin_mutex::scoped_lock l(my_pred_mutex);
return my_built_predecessors.copy_edges(v); return my_built_predecessors.copy_edges(v);
} }
/*override*/void clear_predecessors() {
spin_mutex::scoped_lock l(my_pred_mutex);
my_built_predecessors.clear();
}
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
protected: protected:
template< typename R, typename B > friend class run_and_put_task; template< typename R, typename B > friend class run_and_put_task;
template<typename X, typename Y> friend class internal::broadcast_c ache; template<typename X, typename Y> friend class internal::broadcast_c ache;
template<typename X, typename Y> friend class internal::round_robin _cache; template<typename X, typename Y> friend class internal::round_robin _cache;
task *try_put_task(const T &v) { task *try_put_task(const T &v) {
return my_try_put_task(v, my_indexer_ptr); return my_try_put_task(v, my_indexer_ptr);
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
public: public:
/*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f)) { /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f)) {
if(f&rf_extract) my_built_predecessors.receiver_extract(*this); if(f&rf_clear_edges) my_built_predecessors.clear();
} }
void extract_receiver() { my_built_predecessors.receiver_extract(*t his); }
#else #else
/*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f */)) { } /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f */)) { }
#endif #endif
}; };
template<typename InputTuple, typename OutputType, typename StructTypes > template<typename InputTuple, typename OutputType, typename StructTypes >
class indexer_node_FE { class indexer_node_FE {
public: public:
static const int N = tbb::flow::tuple_size<InputTuple>::value; static const int N = tbb::flow::tuple_size<InputTuple>::value;
skipping to change at line 160 skipping to change at line 178
public sender<OutputType> { public sender<OutputType> {
protected: protected:
using graph_node::my_graph; using graph_node::my_graph;
public: public:
static const size_t N = tbb::flow::tuple_size<InputTuple>::value; static const size_t N = tbb::flow::tuple_size<InputTuple>::value;
typedef OutputType output_type; typedef OutputType output_type;
typedef StructTypes tuple_types; typedef StructTypes tuple_types;
typedef receiver<output_type> successor_type; typedef receiver<output_type> successor_type;
typedef indexer_node_FE<InputTuple, output_type,StructTypes> input_ ports_type; typedef indexer_node_FE<InputTuple, output_type,StructTypes> input_ ports_type;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
typedef typename sender<output_type>::built_successors_type built_s uccessors_type;
typedef typename sender<output_type>::successor_list_type successor _list_type; typedef typename sender<output_type>::successor_list_type successor _list_type;
#endif #endif
private: private:
// ----------- Aggregator ------------ // ----------- Aggregator ------------
enum op_type { reg_succ, rem_succ, try__put_task enum op_type { reg_succ, rem_succ, try__put_task
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
, add_blt_succ, del_blt_succ, , add_blt_succ, del_blt_succ,
blt_succ_cnt, blt_succ_cpy blt_succ_cnt, blt_succ_cpy
#endif #endif
skipping to change at line 272 skipping to change at line 291
return op_data.status == SUCCEEDED; return op_data.status == SUCCEEDED;
} }
task * try_put_task(output_type const *v) { task * try_put_task(output_type const *v) {
indexer_node_base_operation op_data(v, try__put_task); indexer_node_base_operation op_data(v, try__put_task);
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
return op_data.bypass_t; return op_data.bypass_t;
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
built_successors_type &built_successors() { return my_successors.bu
ilt_successors(); }
void internal_add_built_successor( successor_type &r) { void internal_add_built_successor( successor_type &r) {
indexer_node_base_operation op_data(r, add_blt_succ); indexer_node_base_operation op_data(r, add_blt_succ);
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
} }
void internal_delete_built_successor( successor_type &r) { void internal_delete_built_successor( successor_type &r) {
indexer_node_base_operation op_data(r, del_blt_succ); indexer_node_base_operation op_data(r, del_blt_succ);
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
} }
skipping to change at line 293 skipping to change at line 315
indexer_node_base_operation op_data(blt_succ_cnt); indexer_node_base_operation op_data(blt_succ_cnt);
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
return op_data.cnt_val; return op_data.cnt_val;
} }
void copy_successors( successor_list_type &v) { void copy_successors( successor_list_type &v) {
indexer_node_base_operation op_data(blt_succ_cpy); indexer_node_base_operation op_data(blt_succ_cpy);
op_data.succv = &v; op_data.succv = &v;
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
} }
void extract() {
my_successors.built_successors().sender_extract(*this);
indexer_helper<StructTypes,N>::extract(this->my_inputs);
}
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
protected: protected:
/*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { /*override*/void reset_node(__TBB_PFG_RESET_ARG(reset_flags f)) {
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
my_successors.reset(f); if(f & rf_clear_edges) {
indexer_helper<StructTypes,N>::reset_inputs(this->my_inputs, f) my_successors.clear();
; indexer_helper<StructTypes,N>::reset_inputs(this->my_inputs
,f);
}
#endif #endif
} }
private: private:
broadcast_cache<output_type, null_rw_mutex> my_successors; broadcast_cache<output_type, null_rw_mutex> my_successors;
}; //indexer_node_base }; //indexer_node_base
template<int N, typename InputTuple> struct input_types; template<int N, typename InputTuple> struct input_types;
template<typename InputTuple> template<typename InputTuple>
 End of changes. 13 change blocks. 
7 lines changed or deleted 38 lines changed or added


 _flow_graph_join_impl.h   _flow_graph_join_impl.h 
skipping to change at line 127 skipping to change at line 127
tbb::flow::get<N-1>(my_inputs).set_my_original_tag_func(tbb ::flow::get<N-1>(other_inputs).my_original_func()->clone()); tbb::flow::get<N-1>(my_inputs).set_my_original_tag_func(tbb ::flow::get<N-1>(other_inputs).my_original_func()->clone());
} }
join_helper<N-1>::copy_tag_functors(my_inputs, other_inputs); join_helper<N-1>::copy_tag_functors(my_inputs, other_inputs);
} }
template<typename InputTuple> template<typename InputTuple>
static inline void reset_inputs(InputTuple &my_input __TBB_PFG_RESE T_ARG(__TBB_COMMA reset_flags f)) { static inline void reset_inputs(InputTuple &my_input __TBB_PFG_RESE T_ARG(__TBB_COMMA reset_flags f)) {
join_helper<N-1>::reset_inputs(my_input __TBB_PFG_RESET_ARG(__T BB_COMMA f)); join_helper<N-1>::reset_inputs(my_input __TBB_PFG_RESET_ARG(__T BB_COMMA f));
tbb::flow::get<N-1>(my_input).reset_receiver(__TBB_PFG_RESET_AR G(f)); tbb::flow::get<N-1>(my_input).reset_receiver(__TBB_PFG_RESET_AR G(f));
} }
};
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
template<typename InputTuple>
static inline void extract_inputs(InputTuple &my_input) {
join_helper<N-1>::extract_inputs(my_input);
tbb::flow::get<N-1>(my_input).extract_receiver();
}
#endif
}; // join_helper<N>
template< > template< >
struct join_helper<1> { struct join_helper<1> {
template< typename TupleType, typename PortType > template< typename TupleType, typename PortType >
static inline void set_join_node_pointer(TupleType &my_input, PortT ype *port) { static inline void set_join_node_pointer(TupleType &my_input, PortT ype *port) {
tbb::flow::get<0>( my_input ).set_join_node_pointer(port); tbb::flow::get<0>( my_input ).set_join_node_pointer(port);
} }
template< typename TupleType > template< typename TupleType >
skipping to change at line 195 skipping to change at line 203
static inline void copy_tag_functors(TagFuncTuple1 &my_inputs, TagF uncTuple2 &other_inputs) { static inline void copy_tag_functors(TagFuncTuple1 &my_inputs, TagF uncTuple2 &other_inputs) {
if(tbb::flow::get<0>(other_inputs).my_original_func()) { if(tbb::flow::get<0>(other_inputs).my_original_func()) {
tbb::flow::get<0>(my_inputs).set_my_tag_func(tbb::flow::get <0>(other_inputs).my_original_func()->clone()); tbb::flow::get<0>(my_inputs).set_my_tag_func(tbb::flow::get <0>(other_inputs).my_original_func()->clone());
tbb::flow::get<0>(my_inputs).set_my_original_tag_func(tbb:: flow::get<0>(other_inputs).my_original_func()->clone()); tbb::flow::get<0>(my_inputs).set_my_original_tag_func(tbb:: flow::get<0>(other_inputs).my_original_func()->clone());
} }
} }
template<typename InputTuple> template<typename InputTuple>
static inline void reset_inputs(InputTuple &my_input __TBB_PFG_RESE T_ARG(__TBB_COMMA reset_flags f)) { static inline void reset_inputs(InputTuple &my_input __TBB_PFG_RESE T_ARG(__TBB_COMMA reset_flags f)) {
tbb::flow::get<0>(my_input).reset_receiver(__TBB_PFG_RESET_ARG( f)); tbb::flow::get<0>(my_input).reset_receiver(__TBB_PFG_RESET_ARG( f));
} }
};
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
template<typename InputTuple>
static inline void extract_inputs(InputTuple &my_input) {
tbb::flow::get<0>(my_input).extract_receiver();
}
#endif
}; // join_helper<1>
//! The two-phase join port //! The two-phase join port
template< typename T > template< typename T >
class reserving_port : public receiver<T> { class reserving_port : public receiver<T> {
public: public:
typedef T input_type; typedef T input_type;
typedef sender<T> predecessor_type; typedef sender<T> predecessor_type;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
typedef typename receiver<input_type>::predecessor_list_type predec essor_list_type; typedef typename receiver<input_type>::predecessor_list_type predec essor_list_type;
typedef typename receiver<input_type>::built_predecessors_type buil t_predecessors_type;
#endif #endif
private: private:
// ----------- Aggregator ------------ // ----------- Aggregator ------------
enum op_type { reg_pred, rem_pred, res_item, rel_res, con_res enum op_type { reg_pred, rem_pred, res_item, rel_res, con_res
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
, add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
#endif #endif
}; };
enum op_stat {WAIT=0, SUCCEEDED, FAILED}; enum op_stat {WAIT=0, SUCCEEDED, FAILED};
typedef reserving_port<T> my_class; typedef reserving_port<T> my_class;
skipping to change at line 367 skipping to change at line 383
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
} }
//! Complete use of the port //! Complete use of the port
void consume( ) { void consume( ) {
reserving_port_operation op_data(con_res); reserving_port_operation op_data(con_res);
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
/*override*/ built_predecessors_type &built_predecessors() { return my_predecessors.built_predecessors(); }
/*override*/void internal_add_built_predecessor(predecessor_type &s rc) { /*override*/void internal_add_built_predecessor(predecessor_type &s rc) {
reserving_port_operation op_data(src, add_blt_pred); reserving_port_operation op_data(src, add_blt_pred);
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
} }
/*override*/void internal_delete_built_predecessor(predecessor_type &src) { /*override*/void internal_delete_built_predecessor(predecessor_type &src) {
reserving_port_operation op_data(src, del_blt_pred); reserving_port_operation op_data(src, del_blt_pred);
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
} }
skipping to change at line 388 skipping to change at line 405
reserving_port_operation op_data(blt_pred_cnt); reserving_port_operation op_data(blt_pred_cnt);
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
return op_data.cnt_val; return op_data.cnt_val;
} }
/*override*/void copy_predecessors(predecessor_list_type &l) { /*override*/void copy_predecessors(predecessor_list_type &l) {
reserving_port_operation op_data(blt_pred_cpy); reserving_port_operation op_data(blt_pred_cpy);
op_data.plist = &l; op_data.plist = &l;
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
} }
void extract_receiver() {
my_predecessors.built_predecessors().receiver_extract(*this);
}
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
/*override*/void reset_receiver( __TBB_PFG_RESET_ARG(reset_flags f) ) { /*override*/void reset_receiver( __TBB_PFG_RESET_ARG(reset_flags f) ) {
my_predecessors.reset(__TBB_PFG_RESET_ARG(f)); #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
if(f & rf_clear_edges) my_predecessors.clear();
else
#endif
my_predecessors.reset();
reserved = false; reserved = false;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
__TBB_ASSERT(!(f&rf_extract) || my_predecessors.empty(), "port edges not removed"); __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(), "p ort edges not removed");
#endif #endif
} }
private: private:
forwarding_base *my_join; forwarding_base *my_join;
reservable_predecessor_cache< T, null_mutex > my_predecessors; reservable_predecessor_cache< T, null_mutex > my_predecessors;
bool reserved; bool reserved;
}; }; // reserving_port
//! queueing join_port //! queueing join_port
template<typename T> template<typename T>
class queueing_port : public receiver<T>, public item_buffer<T> { class queueing_port : public receiver<T>, public item_buffer<T> {
public: public:
typedef T input_type; typedef T input_type;
typedef sender<T> predecessor_type; typedef sender<T> predecessor_type;
typedef queueing_port<T> my_node_type; typedef queueing_port<T> my_node_type;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
typedef typename receiver<input_type>::built_predecessors_type buil t_predecessors_type;
typedef typename receiver<input_type>::predecessor_list_type predec essor_list_type; typedef typename receiver<input_type>::predecessor_list_type predec essor_list_type;
#endif #endif
// ----------- Aggregator ------------ // ----------- Aggregator ------------
private: private:
enum op_type { get__item, res_port, try__put_task enum op_type { get__item, res_port, try__put_task
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
, add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
#endif #endif
}; };
skipping to change at line 560 skipping to change at line 587
// reset_port is called when item is accepted by successor, but // reset_port is called when item is accepted by successor, but
// is initiated by join_node. // is initiated by join_node.
void reset_port() { void reset_port() {
queueing_port_operation op_data(res_port); queueing_port_operation op_data(res_port);
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
return; return;
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
/*override*/ built_predecessors_type &built_predecessors() { return
my_built_predecessors; }
/*override*/void internal_add_built_predecessor(sender<T> &p) { /*override*/void internal_add_built_predecessor(sender<T> &p) {
queueing_port_operation op_data(add_blt_pred); queueing_port_operation op_data(add_blt_pred);
op_data.pred = &p; op_data.pred = &p;
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
} }
/*override*/void internal_delete_built_predecessor(sender<T> &p) { /*override*/void internal_delete_built_predecessor(sender<T> &p) {
queueing_port_operation op_data(del_blt_pred); queueing_port_operation op_data(del_blt_pred);
op_data.pred = &p; op_data.pred = &p;
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
skipping to change at line 584 skipping to change at line 613
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
return op_data.cnt_val; return op_data.cnt_val;
} }
/*override*/void copy_predecessors(predecessor_list_type &l) { /*override*/void copy_predecessors(predecessor_list_type &l) {
queueing_port_operation op_data(blt_pred_cpy); queueing_port_operation op_data(blt_pred_cpy);
op_data.plist = &l; op_data.plist = &l;
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
} }
void extract_receiver() {
item_buffer<T>::reset();
my_built_predecessors.receiver_extract(*this);
}
/*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f)) { /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f)) {
item_buffer<T>::reset(); item_buffer<T>::reset();
if (f & rf_extract) if (f & rf_clear_edges)
my_built_predecessors.receiver_extract(*this); my_built_predecessors.clear();
} }
#else #else
/*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f */)) { item_buffer<T>::reset(); } /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f */)) { item_buffer<T>::reset(); }
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
private: private:
forwarding_base *my_join; forwarding_base *my_join;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
edge_container<sender<T> > my_built_predecessors; edge_container<sender<T> > my_built_predecessors;
#endif #endif
}; }; // queueing_port
#include "_flow_graph_tagged_buffer_impl.h" #include "_flow_graph_tagged_buffer_impl.h"
template< typename T > template< typename T >
class tag_matching_port : public receiver<T>, public tagged_buffer< tag _value, T, NO_TAG > { class tag_matching_port : public receiver<T>, public tagged_buffer< tag _value, T, NO_TAG > {
public: public:
typedef T input_type; typedef T input_type;
typedef sender<T> predecessor_type; typedef sender<T> predecessor_type;
typedef tag_matching_port<T> my_node_type; // for forwarding, if n eeded typedef tag_matching_port<T> my_node_type; // for forwarding, if n eeded
typedef function_body<input_type, tag_value> my_tag_func_type; typedef function_body<input_type, tag_value> my_tag_func_type;
typedef tagged_buffer<tag_value,T,NO_TAG> my_buffer_type; typedef tagged_buffer<tag_value,T,NO_TAG> my_buffer_type;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
typedef typename receiver<input_type>::built_predecessors_type buil t_predecessors_type;
typedef typename receiver<input_type>::predecessor_list_type predec essor_list_type; typedef typename receiver<input_type>::predecessor_list_type predec essor_list_type;
#endif #endif
private: private:
// ----------- Aggregator ------------ // ----------- Aggregator ------------
private: private:
enum op_type { try__put, get__item, res_port, enum op_type { try__put, get__item, res_port,
add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
}; };
enum op_stat {WAIT=0, SUCCEEDED, FAILED}; enum op_stat {WAIT=0, SUCCEEDED, FAILED};
typedef tag_matching_port<T> my_class; typedef tag_matching_port<T> my_class;
skipping to change at line 752 skipping to change at line 787
my_tag_func = f; my_tag_func = f;
} }
bool get_item( T &v ) { bool get_item( T &v ) {
tag_matching_port_operation op_data(&v, get__item); tag_matching_port_operation op_data(&v, get__item);
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
return op_data.status == SUCCEEDED; return op_data.status == SUCCEEDED;
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
/*override*/built_predecessors_type &built_predecessors() { return
my_built_predecessors; }
/*override*/void internal_add_built_predecessor(sender<T> &p) { /*override*/void internal_add_built_predecessor(sender<T> &p) {
tag_matching_port_operation op_data(add_blt_pred); tag_matching_port_operation op_data(add_blt_pred);
op_data.pred = &p; op_data.pred = &p;
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
} }
/*override*/void internal_delete_built_predecessor(sender<T> &p) { /*override*/void internal_delete_built_predecessor(sender<T> &p) {
tag_matching_port_operation op_data(del_blt_pred); tag_matching_port_operation op_data(del_blt_pred);
op_data.pred = &p; op_data.pred = &p;
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
skipping to change at line 789 skipping to change at line 826
void reset_port() { void reset_port() {
tag_matching_port_operation op_data(res_port); tag_matching_port_operation op_data(res_port);
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
return; return;
} }
my_tag_func_type *my_func() { return my_tag_func; } my_tag_func_type *my_func() { return my_tag_func; }
my_tag_func_type *my_original_func() { return my_original_tag_func; } my_tag_func_type *my_original_func() { return my_original_tag_func; }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
void extract_receiver() {
my_buffer_type::reset();
my_built_predecessors.receiver_extract(*this);
}
/*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f)) { /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f)) {
my_buffer_type::reset(); my_buffer_type::reset();
if (f & rf_extract) if (f & rf_clear_edges)
my_built_predecessors.receiver_extract(*this); my_built_predecessors.clear();
} }
#else #else
/*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f */)) { my_buffer_type::reset(); } /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f */)) { my_buffer_type::reset(); }
#endif #endif
private: private:
// need map of tags to values // need map of tags to values
forwarding_base *my_join; forwarding_base *my_join;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
edge_container<predecessor_type> my_built_predecessors; edge_container<predecessor_type> my_built_predecessors;
skipping to change at line 865 skipping to change at line 907
input_type &input_ports() { return my_inputs; } input_type &input_ports() { return my_inputs; }
protected: protected:
void reset( __TBB_PFG_RESET_ARG( reset_flags f)) { void reset( __TBB_PFG_RESET_ARG( reset_flags f)) {
// called outside of parallel contexts // called outside of parallel contexts
ports_with_no_inputs = N; ports_with_no_inputs = N;
join_helper<N>::reset_inputs(my_inputs __TBB_PFG_RESET_ARG( __T BB_COMMA f)); join_helper<N>::reset_inputs(my_inputs __TBB_PFG_RESET_ARG( __T BB_COMMA f));
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
void extract( ) {
// called outside of parallel contexts
ports_with_no_inputs = N;
join_helper<N>::extract_inputs(my_inputs);
}
#endif
// all methods on input ports should be called under mutual exclusi on from join_node_base. // all methods on input ports should be called under mutual exclusi on from join_node_base.
bool tuple_build_may_succeed() { bool tuple_build_may_succeed() {
return !ports_with_no_inputs; return !ports_with_no_inputs;
} }
bool try_to_make_tuple(output_type &out) { bool try_to_make_tuple(output_type &out) {
if(ports_with_no_inputs) return false; if(ports_with_no_inputs) return false;
return join_helper<N>::reserve(my_inputs, out); return join_helper<N>::reserve(my_inputs, out);
} }
skipping to change at line 886 skipping to change at line 936
void tuple_accepted() { void tuple_accepted() {
join_helper<N>::consume_reservations(my_inputs); join_helper<N>::consume_reservations(my_inputs);
} }
void tuple_rejected() { void tuple_rejected() {
join_helper<N>::release_reservations(my_inputs); join_helper<N>::release_reservations(my_inputs);
} }
input_type my_inputs; input_type my_inputs;
my_node_type *my_node; my_node_type *my_node;
atomic<size_t> ports_with_no_inputs; atomic<size_t> ports_with_no_inputs;
}; }; // join_node_FE<reserving, ... >
template<typename InputTuple, typename OutputTuple> template<typename InputTuple, typename OutputTuple>
class join_node_FE<queueing, InputTuple, OutputTuple> : public forwardi ng_base { class join_node_FE<queueing, InputTuple, OutputTuple> : public forwardi ng_base {
public: public:
static const int N = tbb::flow::tuple_size<OutputTuple>::value; static const int N = tbb::flow::tuple_size<OutputTuple>::value;
typedef OutputTuple output_type; typedef OutputTuple output_type;
typedef InputTuple input_type; typedef InputTuple input_type;
typedef join_node_base<queueing, InputTuple, OutputTuple> my_node_t ype; // for forwarding typedef join_node_base<queueing, InputTuple, OutputTuple> my_node_t ype; // for forwarding
join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) { join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) {
skipping to change at line 939 skipping to change at line 989
input_type &input_ports() { return my_inputs; } input_type &input_ports() { return my_inputs; }
protected: protected:
void reset( __TBB_PFG_RESET_ARG( reset_flags f)) { void reset( __TBB_PFG_RESET_ARG( reset_flags f)) {
reset_port_count(); reset_port_count();
join_helper<N>::reset_inputs(my_inputs __TBB_PFG_RESET_ARG( __T BB_COMMA f) ); join_helper<N>::reset_inputs(my_inputs __TBB_PFG_RESET_ARG( __T BB_COMMA f) );
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
void extract() {
reset_port_count();
join_helper<N>::extract_inputs(my_inputs);
}
#endif
// all methods on input ports should be called under mutual exclusi on from join_node_base. // all methods on input ports should be called under mutual exclusi on from join_node_base.
bool tuple_build_may_succeed() { bool tuple_build_may_succeed() {
return !ports_with_no_items; return !ports_with_no_items;
} }
bool try_to_make_tuple(output_type &out) { bool try_to_make_tuple(output_type &out) {
if(ports_with_no_items) return false; if(ports_with_no_items) return false;
return join_helper<N>::get_items(my_inputs, out); return join_helper<N>::get_items(my_inputs, out);
} }
skipping to change at line 961 skipping to change at line 1017
reset_port_count(); reset_port_count();
join_helper<N>::reset_ports(my_inputs); join_helper<N>::reset_ports(my_inputs);
} }
void tuple_rejected() { void tuple_rejected() {
// nothing to do. // nothing to do.
} }
input_type my_inputs; input_type my_inputs;
my_node_type *my_node; my_node_type *my_node;
atomic<size_t> ports_with_no_items; atomic<size_t> ports_with_no_items;
}; }; // join_node_FE<queueing, ...>
// tag_matching join input port. // tag_matching join input port.
template<typename InputTuple, typename OutputTuple> template<typename InputTuple, typename OutputTuple>
class join_node_FE<tag_matching, InputTuple, OutputTuple> : public forw arding_base, class join_node_FE<tag_matching, InputTuple, OutputTuple> : public forw arding_base,
// buffer of tag value counts buffer of output items // buffer of tag value counts buffer of output items
public tagged_buffer<tag_value, size_t, NO_TAG>, public item_b uffer<OutputTuple> { public tagged_buffer<tag_value, size_t, NO_TAG>, public item_b uffer<OutputTuple> {
public: public:
static const int N = tbb::flow::tuple_size<OutputTuple>::value; static const int N = tbb::flow::tuple_size<OutputTuple>::value;
typedef OutputTuple output_type; typedef OutputTuple output_type;
typedef InputTuple input_type; typedef InputTuple input_type;
skipping to change at line 1135 skipping to change at line 1191
void reset( __TBB_PFG_RESET_ARG( reset_flags f )) { void reset( __TBB_PFG_RESET_ARG( reset_flags f )) {
// called outside of parallel contexts // called outside of parallel contexts
join_helper<N>::reset_inputs(my_inputs __TBB_PFG_RESET_ARG( __T BB_COMMA f)); join_helper<N>::reset_inputs(my_inputs __TBB_PFG_RESET_ARG( __T BB_COMMA f));
my_tag_buffer::reset(); // have to reset the tag counts my_tag_buffer::reset(); // have to reset the tag counts
output_buffer_type::reset(); // also the queue of outputs output_buffer_type::reset(); // also the queue of outputs
my_node->current_tag = NO_TAG; my_node->current_tag = NO_TAG;
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
void extract() {
// called outside of parallel contexts
join_helper<N>::extract_inputs(my_inputs);
my_tag_buffer::reset(); // have to reset the tag counts
output_buffer_type::reset(); // also the queue of outputs
my_node->current_tag = NO_TAG;
}
#endif
// all methods on input ports should be called under mutual exclusi on from join_node_base. // all methods on input ports should be called under mutual exclusi on from join_node_base.
bool tuple_build_may_succeed() { // called from back-end bool tuple_build_may_succeed() { // called from back-end
tag_matching_FE_operation op_data(may_succeed); tag_matching_FE_operation op_data(may_succeed);
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
return op_data.status == SUCCEEDED; return op_data.status == SUCCEEDED;
} }
// cannot lock while calling back to input_ports. current_tag will only be set // cannot lock while calling back to input_ports. current_tag will only be set
// and reset under the aggregator, so it will remain consistent. // and reset under the aggregator, so it will remain consistent.
skipping to change at line 1179 skipping to change at line 1244
public: public:
typedef OutputTuple output_type; typedef OutputTuple output_type;
typedef receiver<output_type> successor_type; typedef receiver<output_type> successor_type;
typedef join_node_FE<JP, InputTuple, OutputTuple> input_ports_type; typedef join_node_FE<JP, InputTuple, OutputTuple> input_ports_type;
using input_ports_type::tuple_build_may_succeed; using input_ports_type::tuple_build_may_succeed;
using input_ports_type::try_to_make_tuple; using input_ports_type::try_to_make_tuple;
using input_ports_type::tuple_accepted; using input_ports_type::tuple_accepted;
using input_ports_type::tuple_rejected; using input_ports_type::tuple_rejected;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
typedef typename sender<output_type>::built_successors_type built_s uccessors_type;
typedef typename sender<output_type>::successor_list_type successor _list_type; typedef typename sender<output_type>::successor_list_type successor _list_type;
#endif #endif
private: private:
// ----------- Aggregator ------------ // ----------- Aggregator ------------
enum op_type { reg_succ, rem_succ, try__get, do_fwrd, do_fwrd_bypas s enum op_type { reg_succ, rem_succ, try__get, do_fwrd, do_fwrd_bypas s
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
, add_blt_succ, del_blt_succ, blt_succ_cnt, blt_succ_cpy , add_blt_succ, del_blt_succ, blt_succ_cnt, blt_succ_cpy
#endif #endif
}; };
skipping to change at line 1337 skipping to change at line 1403
return op_data.status == SUCCEEDED; return op_data.status == SUCCEEDED;
} }
bool try_get( output_type &v) { bool try_get( output_type &v) {
join_node_base_operation op_data(v, try__get); join_node_base_operation op_data(v, try__get);
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
return op_data.status == SUCCEEDED; return op_data.status == SUCCEEDED;
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
/*override*/built_successors_type &built_successors() { return my_s
uccessors.built_successors(); }
/*override*/void internal_add_built_successor( successor_type &r) { /*override*/void internal_add_built_successor( successor_type &r) {
join_node_base_operation op_data(r, add_blt_succ); join_node_base_operation op_data(r, add_blt_succ);
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
} }
/*override*/void internal_delete_built_successor( successor_type &r ) { /*override*/void internal_delete_built_successor( successor_type &r ) {
join_node_base_operation op_data(r, del_blt_succ); join_node_base_operation op_data(r, del_blt_succ);
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
} }
skipping to change at line 1360 skipping to change at line 1428
return op_data.cnt_val; return op_data.cnt_val;
} }
/*override*/ void copy_successors(successor_list_type &l) { /*override*/ void copy_successors(successor_list_type &l) {
join_node_base_operation op_data(blt_succ_cpy); join_node_base_operation op_data(blt_succ_cpy);
op_data.slist = &l; op_data.slist = &l;
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
} }
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
/*override*/void extract() {
input_ports_type::extract();
my_successors.built_successors().sender_extract(*this);
}
#endif
protected: protected:
/*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { /*override*/void reset_node(__TBB_PFG_RESET_ARG(reset_flags f)) {
input_ports_type::reset(__TBB_PFG_RESET_ARG(f)); input_ports_type::reset(__TBB_PFG_RESET_ARG(f));
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
my_successors.reset(f); if(f & rf_clear_edges) my_successors.clear();
#endif #endif
} }
private: private:
broadcast_cache<output_type, null_rw_mutex> my_successors; broadcast_cache<output_type, null_rw_mutex> my_successors;
friend class forward_task_bypass< join_node_base<JP, InputTuple, Ou tputTuple> >; friend class forward_task_bypass< join_node_base<JP, InputTuple, Ou tputTuple> >;
task *forward_task() { task *forward_task() {
join_node_base_operation op_data(do_fwrd_bypass); join_node_base_operation op_data(do_fwrd_bypass);
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
 End of changes. 27 change blocks. 
14 lines changed or deleted 92 lines changed or added


 _flow_graph_node_impl.h   _flow_graph_node_impl.h 
skipping to change at line 68 skipping to change at line 68
blt_pred_cnt, blt_pred_cpy // create vector copies of preds a nd succs blt_pred_cnt, blt_pred_cpy // create vector copies of preds a nd succs
#endif #endif
}; };
typedef function_input_base<Input, A, ImplType> my_class; typedef function_input_base<Input, A, ImplType> my_class;
public: public:
//! The input type of this receiver //! The input type of this receiver
typedef Input input_type; typedef Input input_type;
typedef sender<Input> predecessor_type; typedef sender<Input> predecessor_type;
typedef predecessor_cache<input_type, null_mutex > predecessor_cach
e_type;
typedef function_input_queue<input_type, A> input_queue_type;
typedef typename A::template rebind< input_queue_type >::other queu
e_allocator_type;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
typedef typename predecessor_cache_type::built_predecessors_type bu ilt_predecessors_type;
typedef typename receiver<input_type>::predecessor_list_type predec essor_list_type; typedef typename receiver<input_type>::predecessor_list_type predec essor_list_type;
#endif #endif
//! Constructor for function_input_base //! Constructor for function_input_base
function_input_base( graph &g, size_t max_concurrency, function_inp ut_queue<input_type,A> *q = NULL ) function_input_base( graph &g, size_t max_concurrency, input_queue_ type *q = NULL)
: my_graph(g), my_max_concurrency(max_concurrency), my_concurre ncy(0), : my_graph(g), my_max_concurrency(max_concurrency), my_concurre ncy(0),
my_queue(q), forwarder_busy(false) { my_queue(q), forwarder_busy(false) {
my_predecessors.set_owner(this); my_predecessors.set_owner(this);
my_aggregator.initialize_handler(my_handler(this)); my_aggregator.initialize_handler(my_handler(this));
} }
//! Copy constructor //! Copy constructor
function_input_base( const function_input_base& src, function_input _queue<input_type,A> *q = NULL ) : function_input_base( const function_input_base& src, input_queue_ty pe *q = NULL) :
receiver<Input>(), tbb::internal::no_assign(), receiver<Input>(), tbb::internal::no_assign(),
my_graph(src.my_graph), my_max_concurrency(src.my_max_concurren cy), my_graph(src.my_graph), my_max_concurrency(src.my_max_concurren cy),
my_concurrency(0), my_queue(q), forwarder_busy(false) my_concurrency(0), my_queue(q), forwarder_busy(false)
{ {
my_predecessors.set_owner(this); my_predecessors.set_owner(this);
my_aggregator.initialize_handler(my_handler(this)); my_aggregator.initialize_handler(my_handler(this));
} }
//! Destructor //! Destructor
// The queue is allocated by the constructor for {multi}function_no
de.
// TODO: pass the graph_buffer_policy to the base so it can allocat
e the queue instead.
// This would be an interface-breaking change.
virtual ~function_input_base() { virtual ~function_input_base() {
if ( my_queue ) delete my_queue; if ( my_queue ) delete my_queue;
} }
//! Put to the node, returning a task if available //! Put to the node, returning a task if available
virtual task * try_put_task( const input_type &t ) { virtual task * try_put_task( const input_type &t ) {
if ( my_max_concurrency == 0 ) { if ( my_max_concurrency == 0 ) {
return create_body_task( t ); return create_body_task( t );
} else { } else {
my_operation op_data(t, tryput_bypass); my_operation op_data(t, tryput_bypass);
skipping to change at line 152 skipping to change at line 159
my_operation op_data(blt_pred_cnt); my_operation op_data(blt_pred_cnt);
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
return op_data.cnt_val; return op_data.cnt_val;
} }
/*override*/ void copy_predecessors(predecessor_list_type &v) { /*override*/ void copy_predecessors(predecessor_list_type &v) {
my_operation op_data(blt_pred_cpy); my_operation op_data(blt_pred_cpy);
op_data.predv = &v; op_data.predv = &v;
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
} }
/*override*/built_predecessors_type &built_predecessors() {
return my_predecessors.built_predecessors();
}
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
protected: protected:
void reset_function_input_base( __TBB_PFG_RESET_ARG(reset_flags f)) { void reset_function_input_base( __TBB_PFG_RESET_ARG(reset_flags f)) {
my_concurrency = 0; my_concurrency = 0;
if(my_queue) { if(my_queue) {
my_queue->reset(); my_queue->reset();
} }
reset_receiver(__TBB_PFG_RESET_ARG(f)); reset_receiver(__TBB_PFG_RESET_ARG(f));
forwarder_busy = false; forwarder_busy = false;
} }
graph& my_graph; graph& my_graph;
const size_t my_max_concurrency; const size_t my_max_concurrency;
size_t my_concurrency; size_t my_concurrency;
function_input_queue<input_type, A> *my_queue; input_queue_type *my_queue;
predecessor_cache<input_type, null_mutex > my_predecessors; predecessor_cache<input_type, null_mutex > my_predecessors;
/*override*/void reset_receiver( __TBB_PFG_RESET_ARG(reset_flags f) ) { /*override*/void reset_receiver( __TBB_PFG_RESET_ARG(reset_flags f) ) {
my_predecessors.reset(__TBB_PFG_RESET_ARG(f));
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
__TBB_ASSERT(!(f & rf_extract) || my_predecessors.empty(), "fun if( f & rf_clear_edges) my_predecessors.clear();
ction_input_base reset failed"); else
#endif
my_predecessors.reset();
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
__TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(),
"function_input_base reset failed");
#endif #endif
} }
private: private:
friend class apply_body_task_bypass< my_class, input_type >; friend class apply_body_task_bypass< my_class, input_type >;
friend class forward_task_bypass< my_class >; friend class forward_task_bypass< my_class >;
class my_operation : public aggregated_operation< my_operation > { class my_operation : public aggregated_operation< my_operation > {
public: public:
skipping to change at line 409 skipping to change at line 424
class function_input : public function_input_base<Input, A, function_in put<Input,Output,A> > { class function_input : public function_input_base<Input, A, function_in put<Input,Output,A> > {
public: public:
typedef Input input_type; typedef Input input_type;
typedef Output output_type; typedef Output output_type;
typedef function_input<Input,Output,A> my_class; typedef function_input<Input,Output,A> my_class;
typedef function_input_base<Input, A, my_class> base_type; typedef function_input_base<Input, A, my_class> base_type;
typedef function_input_queue<input_type, A> input_queue_type; typedef function_input_queue<input_type, A> input_queue_type;
// constructor // constructor
template<typename Body> template<typename Body>
function_input( graph &g, size_t max_concurrency, Body& body, funct ion_input_queue<input_type,A> *q = NULL ) : function_input( graph &g, size_t max_concurrency, Body& body, input _queue_type *q = NULL ) :
base_type(g, max_concurrency, q), base_type(g, max_concurrency, q),
my_body( new internal::function_body_leaf< input_type, output_t ype, Body>(body) ) { my_body( new internal::function_body_leaf< input_type, output_t ype, Body>(body) ) {
} }
//! Copy constructor //! Copy constructor
function_input( const function_input& src, input_queue_type *q = NU LL ) : function_input( const function_input& src, input_queue_type *q = NU LL ) :
base_type(src, q), base_type(src, q),
my_body( src.my_body->clone() ) { my_body( src.my_body->clone() ) {
} }
skipping to change at line 459 skipping to change at line 474
if(f & rf_reset_bodies) my_body->reset_body(); if(f & rf_reset_bodies) my_body->reset_body();
#endif #endif
} }
function_body<input_type, output_type> *my_body; function_body<input_type, output_type> *my_body;
virtual broadcast_cache<output_type > &successors() = 0; virtual broadcast_cache<output_type > &successors() = 0;
}; // function_input }; // function_input
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
// helper templates to reset the successor edges of the output ports of // helper templates to clear the successor edges of the output ports of
an multifunction_node an multifunction_node
template<int N> template<int N> struct clear_element {
struct reset_element { template<typename P> static void clear_this(P &p) {
template<typename P> (void)tbb::flow::get<N-1>(p).successors().clear();
static void reset_this(P &p, reset_flags f) { clear_element<N-1>::clear_this(p);
(void)tbb::flow::get<N-1>(p).successors().reset(f);
reset_element<N-1>::reset_this(p, f);
} }
template<typename P> template<typename P> static bool this_empty(P &p) {
static bool this_empty(P &p) {
if(tbb::flow::get<N-1>(p).successors().empty()) if(tbb::flow::get<N-1>(p).successors().empty())
return reset_element<N-1>::this_empty(p); return clear_element<N-1>::this_empty(p);
return false; return false;
} }
}; };
template<> template<> struct clear_element<1> {
struct reset_element<1> { template<typename P> static void clear_this(P &p) {
template<typename P> (void)tbb::flow::get<0>(p).successors().clear();
static void reset_this(P &p, reset_flags f) {
(void)tbb::flow::get<0>(p).successors().reset(f);
} }
template<typename P> template<typename P> static bool this_empty(P &p) {
static bool this_empty(P &p) {
return tbb::flow::get<0>(p).successors().empty(); return tbb::flow::get<0>(p).successors().empty();
} }
}; };
// helper templates to extract the output ports of an multifunction_nod
e from graph
template<int N> struct extract_element {
template<typename P> static void extract_this(P &p) {
(void)tbb::flow::get<N-1>(p).successors().built_successors().se
nder_extract(tbb::flow::get<N-1>(p));
extract_element<N-1>::extract_this(p);
}
};
template<> struct extract_element<1> {
template<typename P> static void extract_this(P &p) {
(void)tbb::flow::get<0>(p).successors().built_successors().send
er_extract(tbb::flow::get<0>(p));
}
};
#endif #endif
//! Implements methods for a function node that takes a type Input as i nput //! Implements methods for a function node that takes a type Input as i nput
// and has a tuple of output ports specified. // and has a tuple of output ports specified.
template< typename Input, typename OutputPortSet, typename A> template< typename Input, typename OutputPortSet, typename A>
class multifunction_input : public function_input_base<Input, A, multif unction_input<Input,OutputPortSet,A> > { class multifunction_input : public function_input_base<Input, A, multif unction_input<Input,OutputPortSet,A> > {
public: public:
static const int N = tbb::flow::tuple_size<OutputPortSet>::value; static const int N = tbb::flow::tuple_size<OutputPortSet>::value;
typedef Input input_type; typedef Input input_type;
typedef OutputPortSet output_ports_type; typedef OutputPortSet output_ports_type;
typedef multifunction_input<Input,OutputPortSet,A> my_class; typedef multifunction_input<Input,OutputPortSet,A> my_class;
typedef function_input_base<Input, A, my_class> base_type; typedef function_input_base<Input, A, my_class> base_type;
typedef function_input_queue<input_type, A> input_queue_type; typedef function_input_queue<input_type, A> input_queue_type;
// constructor // constructor
template<typename Body> template<typename Body>
multifunction_input( multifunction_input(
graph &g, graph &g,
size_t max_concurrency, size_t max_concurrency,
Body& body, Body& body,
function_input_queue<input_type,A> *q = NULL ) : input_queue_type *q = NULL ) :
base_type(g, max_concurrency, q), base_type(g, max_concurrency, q),
my_body( new internal::multifunction_body_leaf<input_type, outp ut_ports_type, Body>(body) ) { my_body( new internal::multifunction_body_leaf<input_type, outp ut_ports_type, Body>(body) ) {
} }
//! Copy constructor //! Copy constructor
multifunction_input( const multifunction_input& src, input_queue_ty pe *q = NULL ) : multifunction_input( const multifunction_input& src, input_queue_ty pe *q = NULL ) :
base_type(src, q), base_type(src, q),
my_body( src.my_body->clone() ) { my_body( src.my_body->clone() ) {
} }
skipping to change at line 540 skipping to change at line 563
tbb::internal::fgt_begin_body( my_body ); tbb::internal::fgt_begin_body( my_body );
(*my_body)(i, my_output_ports); (*my_body)(i, my_output_ports);
tbb::internal::fgt_end_body( my_body ); tbb::internal::fgt_end_body( my_body );
task * new_task = SUCCESSFULLY_ENQUEUED; task * new_task = SUCCESSFULLY_ENQUEUED;
return new_task; return new_task;
} }
output_ports_type &output_ports(){ return my_output_ports; } output_ports_type &output_ports(){ return my_output_ports; }
protected: protected:
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
/*override*/void extract() {
extract_element<N>::extract_this(my_output_ports);
}
#endif
/*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) {
base_type::reset_function_input_base(__TBB_PFG_RESET_ARG(f)); base_type::reset_function_input_base(__TBB_PFG_RESET_ARG(f));
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
reset_element<N>::reset_this(my_output_ports, f); if(f & rf_clear_edges)clear_element<N>::clear_this(my_output_po rts);
if(f & rf_reset_bodies) my_body->reset_body(); if(f & rf_reset_bodies) my_body->reset_body();
__TBB_ASSERT(!(f & rf_extract) || reset_element<N>::this_empty( my_output_ports), "multifunction_node reset failed"); __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_em pty(my_output_ports), "multifunction_node reset failed");
#endif #endif
} }
multifunction_body<input_type, output_ports_type> *my_body; multifunction_body<input_type, output_ports_type> *my_body;
output_ports_type my_output_ports; output_ports_type my_output_ports;
}; // multifunction_input }; // multifunction_input
// template to refer to an output port of a multifunction_node // template to refer to an output port of a multifunction_node
template<size_t N, typename MOP> template<size_t N, typename MOP>
skipping to change at line 654 skipping to change at line 682
/* override */ task *execute( ) { /* override */ task *execute( ) {
task* tp = my_graph_ptr->root_task(); task* tp = my_graph_ptr->root_task();
return (tp) ? return (tp) ?
new ( task::allocate_additional_child_of( *tp ) ) new ( task::allocate_additional_child_of( *tp ) )
apply_body_task_bypass< continue_input< Output >, conti nue_msg >( *this, continue_msg() ) : apply_body_task_bypass< continue_input< Output >, conti nue_msg >( *this, continue_msg() ) :
NULL; NULL;
} }
}; // continue_input }; // continue_input
#if __TBB_PREVIEW_ASYNC_NODE
//! Implements methods for a async node that takes a type Input as inpu
t and
// submit it to Asynchronous activity
template < typename Input, typename A, typename AsyncGatewayType >
class async_input : public function_input_base<Input, A, async_input<In
put, A, AsyncGatewayType> > {
public:
typedef Input input_type;
typedef AsyncGatewayType async_gateway_type;
typedef async_input< Input, A, async_gateway_type > my_class;
typedef function_input_base<Input, A, my_class> base_type;
// constructor
template<typename Body>
async_input( graph &g, Body& body ) :
base_type( g, unlimited ),
my_body( new internal::async_body_leaf< input_type, Body, async
_gateway_type >(body) ){
}
//! Copy constructor
async_input( const async_input& src ) :
base_type( src ),
my_body( src.my_body->clone() ) {
}
~async_input() {
delete my_body;
}
template< typename Body >
Body copy_function_object() {
internal::async_body<input_type, async_gateway_type> &body_ref
= *this->my_body;
return dynamic_cast< internal::async_body_leaf<input_type, Body
, async_gateway_type> & >(body_ref).get_body();
}
task * apply_body_impl_bypass( const input_type &i) {
// TODO: This FGT instrumentation only captures the submission
of the work
// but not the async thread activity.
// We will have to think about the best way to capture that.
tbb::internal::fgt_begin_body( my_body );
(*my_body)( i, async_gateway() );
tbb::internal::fgt_end_body( my_body );
return NULL;
}
virtual async_gateway_type& async_gateway() = 0;
protected:
void reset_async_input(__TBB_PFG_RESET_ARG(reset_flags f)) {
base_type::reset_function_input_base(__TBB_PFG_RESET_ARG(f));
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
if(f & rf_reset_bodies) my_body->reset_body();
#endif
}
async_body< input_type, async_gateway_type > *my_body;
};
#endif // __TBB_PREVIEW_ASYNC_NODE
//! Implements methods for both executable and function nodes that puts Output to its successors //! Implements methods for both executable and function nodes that puts Output to its successors
template< typename Output > template< typename Output >
class function_output : public sender<Output> { class function_output : public sender<Output> {
public: public:
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
template<int N> friend struct reset_element; template<int N> friend struct clear_element;
#endif #endif
typedef Output output_type; typedef Output output_type;
typedef receiver<output_type> successor_type; typedef receiver<output_type> successor_type;
typedef broadcast_cache<output_type> broadcast_cache_type; typedef broadcast_cache<output_type> broadcast_cache_type;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
typedef typename sender<output_type>::built_successors_type built_s uccessors_type;
typedef typename sender<output_type>::successor_list_type successor _list_type; typedef typename sender<output_type>::successor_list_type successor _list_type;
#endif #endif
function_output() { my_successors.set_owner(this); } function_output() { my_successors.set_owner(this); }
function_output(const function_output & /*other*/) : sender<output_ type>() { function_output(const function_output & /*other*/) : sender<output_ type>() {
my_successors.set_owner(this); my_successors.set_owner(this);
} }
//! Adds a new successor to this node //! Adds a new successor to this node
/* override */ bool register_successor( receiver<output_type> &r ) { /* override */ bool register_successor( receiver<output_type> &r ) {
skipping to change at line 687 skipping to change at line 775
return true; return true;
} }
//! Removes a successor from this node //! Removes a successor from this node
/* override */ bool remove_successor( receiver<output_type> &r ) { /* override */ bool remove_successor( receiver<output_type> &r ) {
successors().remove_successor( r ); successors().remove_successor( r );
return true; return true;
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
built_successors_type &built_successors() { return successors().bui
lt_successors(); }
/*override*/ void internal_add_built_successor( receiver<output_typ e> &r) { /*override*/ void internal_add_built_successor( receiver<output_typ e> &r) {
successors().internal_add_built_successor( r ); successors().internal_add_built_successor( r );
} }
/*override*/ void internal_delete_built_successor( receiver<output_ type> &r) { /*override*/ void internal_delete_built_successor( receiver<output_ type> &r) {
successors().internal_delete_built_successor( r ); successors().internal_delete_built_successor( r );
} }
/*override*/ size_t successor_count() { /*override*/ size_t successor_count() {
return successors().successor_count(); return successors().successor_count();
skipping to change at line 710 skipping to change at line 800
successors().copy_successors(v); successors().copy_successors(v);
} }
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
// for multifunction_node. The function_body that implements // for multifunction_node. The function_body that implements
// the node will have an input and an output tuple of ports. To pu t // the node will have an input and an output tuple of ports. To pu t
// an item to a successor, the body should // an item to a successor, the body should
// //
// get<I>(output_ports).try_put(output_value); // get<I>(output_ports).try_put(output_value);
// //
// if task pointer is returned will always spawn and return true, e lse
// return value will be bool returned from successors.try_put. // return value will be bool returned from successors.try_put.
task *try_put_task(const output_type &i) { return my_successors.try _put_task(i); } task *try_put_task(const output_type &i) { return my_successors.try _put_task(i); }
broadcast_cache_type &successors() { return my_successors; }
protected: protected:
broadcast_cache_type my_successors; broadcast_cache_type my_successors;
broadcast_cache_type &successors() { return my_successors; }
}; // function_output }; // function_output
template< typename Output > template< typename Output >
class multifunction_output : public function_output<Output> { class multifunction_output : public function_output<Output> {
public: public:
typedef Output output_type; typedef Output output_type;
typedef function_output<output_type> base_type; typedef function_output<output_type> base_type;
using base_type::my_successors; using base_type::my_successors;
 End of changes. 27 change blocks. 
30 lines changed or deleted 135 lines changed or added


 _tbb_strings.h   _tbb_strings.h 
skipping to change at line 67 skipping to change at line 67
TBB_STRING_RESOURCE(FLOW_OUTPUT_PORT_4, "output_port_4") TBB_STRING_RESOURCE(FLOW_OUTPUT_PORT_4, "output_port_4")
TBB_STRING_RESOURCE(FLOW_OUTPUT_PORT_5, "output_port_5") TBB_STRING_RESOURCE(FLOW_OUTPUT_PORT_5, "output_port_5")
TBB_STRING_RESOURCE(FLOW_OUTPUT_PORT_6, "output_port_6") TBB_STRING_RESOURCE(FLOW_OUTPUT_PORT_6, "output_port_6")
TBB_STRING_RESOURCE(FLOW_OUTPUT_PORT_7, "output_port_7") TBB_STRING_RESOURCE(FLOW_OUTPUT_PORT_7, "output_port_7")
TBB_STRING_RESOURCE(FLOW_OUTPUT_PORT_8, "output_port_8") TBB_STRING_RESOURCE(FLOW_OUTPUT_PORT_8, "output_port_8")
TBB_STRING_RESOURCE(FLOW_OUTPUT_PORT_9, "output_port_9") TBB_STRING_RESOURCE(FLOW_OUTPUT_PORT_9, "output_port_9")
TBB_STRING_RESOURCE(FLOW_OBJECT_NAME, "object_name") TBB_STRING_RESOURCE(FLOW_OBJECT_NAME, "object_name")
TBB_STRING_RESOURCE(FLOW_NULL, "null") TBB_STRING_RESOURCE(FLOW_NULL, "null")
TBB_STRING_RESOURCE(FLOW_INDEXER_NODE, "indexer_node") TBB_STRING_RESOURCE(FLOW_INDEXER_NODE, "indexer_node")
TBB_STRING_RESOURCE(FLOW_COMPOSITE_NODE, "composite_node") TBB_STRING_RESOURCE(FLOW_COMPOSITE_NODE, "composite_node")
TBB_STRING_RESOURCE(FLOW_ASYNC_NODE, "async_node")
 End of changes. 1 change blocks. 
0 lines changed or deleted 0 lines changed or added


 concurrent_hash_map.h   concurrent_hash_map.h 
skipping to change at line 582 skipping to change at line 582
struct node; struct node;
typedef typename Allocator::template rebind<node>::other node_allocator _type; typedef typename Allocator::template rebind<node>::other node_allocator _type;
node_allocator_type my_allocator; node_allocator_type my_allocator;
HashCompare my_hash_compare; HashCompare my_hash_compare;
struct node : public node_base { struct node : public node_base {
value_type item; value_type item;
node( const Key &key ) : item(key, T()) {} node( const Key &key ) : item(key, T()) {}
node( const Key &key, const T &t ) : item(key, t) {} node( const Key &key, const T &t ) : item(key, t) {}
#if __TBB_CPP11_RVALUE_REF_PRESENT #if __TBB_CPP11_RVALUE_REF_PRESENT
node( const Key &key, T &&t ) : item(key, std::move(t)) {}
node( value_type&& i ) : item(std::move(i)){} node( value_type&& i ) : item(std::move(i)){}
#if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
template<typename... Args>
node( Args&&... args ) : item(std::forward<Args>(args)...) {}
#if __TBB_COPY_FROM_NON_CONST_REF_BROKEN
node( value_type& i ) : item(const_cast<const value_type&>(i)) {}
#endif //__TBB_COPY_FROM_NON_CONST_REF_BROKEN
#endif //__TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
#endif //__TBB_CPP11_RVALUE_REF_PRESENT #endif //__TBB_CPP11_RVALUE_REF_PRESENT
node( const value_type& i ) : item(i) {} node( const value_type& i ) : item(i) {}
// exception-safe allocation, see C++ Standard 2003, clause 5.3.4p1 7 // exception-safe allocation, see C++ Standard 2003, clause 5.3.4p1 7
void *operator new( size_t /*size*/, node_allocator_type &a ) { void *operator new( size_t /*size*/, node_allocator_type &a ) {
void *ptr = a.allocate(1); void *ptr = a.allocate(1);
if(!ptr) if(!ptr)
tbb::internal::throw_exception(tbb::internal::eid_bad_alloc ); tbb::internal::throw_exception(tbb::internal::eid_bad_alloc );
return ptr; return ptr;
} }
skipping to change at line 606 skipping to change at line 614
void delete_node( node_base *n ) { void delete_node( node_base *n ) {
my_allocator.destroy( static_cast<node*>(n) ); my_allocator.destroy( static_cast<node*>(n) );
my_allocator.deallocate( static_cast<node*>(n), 1); my_allocator.deallocate( static_cast<node*>(n), 1);
} }
static node* allocate_node_copy_construct(node_allocator_type& allocato r, const Key &key, const T * t){ static node* allocate_node_copy_construct(node_allocator_type& allocato r, const Key &key, const T * t){
return new( allocator ) node(key, *t); return new( allocator ) node(key, *t);
} }
#if __TBB_CPP11_RVALUE_REF_PRESENT
static node* allocate_node_move_construct(node_allocator_type& allocato
r, const Key &key, const T * t){
return new( allocator ) node(key, std::move(*const_cast<T*>(t)));
}
#if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
template<typename... Args>
static node* allocate_node_emplace_construct(node_allocator_type& alloc
ator, Args&&... args){
return new( allocator ) node(std::forward<Args>(args)...);
}
#endif //#if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
#endif
static node* allocate_node_default_construct(node_allocator_type& alloc ator, const Key &key, const T * ){ static node* allocate_node_default_construct(node_allocator_type& alloc ator, const Key &key, const T * ){
return new( allocator ) node(key); return new( allocator ) node(key);
} }
static node* do_not_allocate_node(node_allocator_type& , const Key &, c onst T * ){ static node* do_not_allocate_node(node_allocator_type& , const Key &, c onst T * ){
__TBB_ASSERT(false,"this dummy function should not be called"); __TBB_ASSERT(false,"this dummy function should not be called");
return NULL; return NULL;
} }
node *search_bucket( const key_type &key, bucket *b ) const { node *search_bucket( const key_type &key, bucket *b ) const {
skipping to change at line 958 skipping to change at line 978
result.release(); result.release();
return lookup(/*insert*/true, value.first, &value.second, &result, /*write=*/true, &allocate_node_copy_construct ); return lookup(/*insert*/true, value.first, &value.second, &result, /*write=*/true, &allocate_node_copy_construct );
} }
//! Insert item by copying if there is no such key present already //! Insert item by copying if there is no such key present already
/** Returns true if item is inserted. */ /** Returns true if item is inserted. */
bool insert( const value_type &value ) { bool insert( const value_type &value ) {
return lookup(/*insert*/true, value.first, &value.second, NULL, /*w rite=*/false, &allocate_node_copy_construct ); return lookup(/*insert*/true, value.first, &value.second, NULL, /*w rite=*/false, &allocate_node_copy_construct );
} }
#if __TBB_CPP11_RVALUE_REF_PRESENT
//! Insert item by copying if there is no such key present already and
acquire a read lock on the item.
/** Returns true if item is new. */
bool insert( const_accessor &result, value_type && value ) {
return generic_move_insert(result, std::move(value));
}
//! Insert item by copying if there is no such key present already and
acquire a write lock on the item.
/** Returns true if item is new. */
bool insert( accessor &result, value_type && value ) {
return generic_move_insert(result, std::move(value));
}
//! Insert item by copying if there is no such key present already
/** Returns true if item is inserted. */
bool insert( value_type && value ) {
return generic_move_insert(accessor_not_used(), std::move(value));
}
#if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
//! Insert item by copying if there is no such key present already and
acquire a read lock on the item.
/** Returns true if item is new. */
template<typename... Args>
bool emplace( const_accessor &result, Args&&... args ) {
return generic_emplace(result, std::forward<Args>(args)...);
}
//! Insert item by copying if there is no such key present already and
acquire a write lock on the item.
/** Returns true if item is new. */
template<typename... Args>
bool emplace( accessor &result, Args&&... args ) {
return generic_emplace(result, std::forward<Args>(args)...);
}
//! Insert item by copying if there is no such key present already
/** Returns true if item is inserted. */
template<typename... Args>
bool emplace( Args&&... args ) {
return generic_emplace(accessor_not_used(), std::forward<Args>(args
)...);
}
#endif //__TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
#endif //__TBB_CPP11_RVALUE_REF_PRESENT
//! Insert range [first, last) //! Insert range [first, last)
template<typename I> template<typename I>
void insert( I first, I last ) { void insert( I first, I last ) {
for ( ; first != last; ++first ) for ( ; first != last; ++first )
insert( *first ); insert( *first );
} }
#if __TBB_INITIALIZER_LISTS_PRESENT #if __TBB_INITIALIZER_LISTS_PRESENT
//! Insert initializer list //! Insert initializer list
void insert( std::initializer_list<value_type> il ) { void insert( std::initializer_list<value_type> il ) {
skipping to change at line 990 skipping to change at line 1053
} }
//! Erase item by accessor. //! Erase item by accessor.
/** Return true if item was erased by particularly this call. */ /** Return true if item was erased by particularly this call. */
bool erase( accessor& item_accessor ) { bool erase( accessor& item_accessor ) {
return exclude( item_accessor ); return exclude( item_accessor );
} }
protected: protected:
//! Insert or find item and optionally acquire a lock on the item. //! Insert or find item and optionally acquire a lock on the item.
bool lookup(bool op_insert, const Key &key, const T *t, const_accessor bool lookup(bool op_insert, const Key &key, const T *t, const_accessor
*result, bool write, node* (*allocate_node)(node_allocator_type& , const *result, bool write, node* (*allocate_node)(node_allocator_type& , const
Key &, const T * ) ) ; Key &, const T * ), node *tmp_n = 0 ) ;
struct accessor_not_used { void release(){}};
friend const_accessor* accessor_location( accessor_not_used const& ){ r
eturn NULL;}
friend const_accessor* accessor_location( const_accessor & a ) { r
eturn &a;}
friend bool is_write_access_needed( accessor const& ) { retur
n true;}
friend bool is_write_access_needed( const_accessor const& ) { retur
n false;}
friend bool is_write_access_needed( accessor_not_used const& ) { retur
n false;}
#if __TBB_CPP11_RVALUE_REF_PRESENT
template<typename Accessor>
bool generic_move_insert( Accessor && result, value_type && value ) {
result.release();
return lookup(/*insert*/true, value.first, &value.second, accessor_
location(result), is_write_access_needed(result), &allocate_node_move_const
ruct );
}
#if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
template<typename Accessor, typename... Args>
bool generic_emplace( Accessor && result, Args &&... args ) {
result.release();
node * node_ptr = allocate_node_emplace_construct(my_allocator, std
::forward<Args>(args)...);
return lookup(/*insert*/true, node_ptr->item.first, NULL, accessor_
location(result), is_write_access_needed(result), &do_not_allocate_node, no
de_ptr );
}
#endif //__TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
#endif //__TBB_CPP11_RVALUE_REF_PRESENT
//! delete item by accessor //! delete item by accessor
bool exclude( const_accessor &item_accessor ); bool exclude( const_accessor &item_accessor );
//! Returns an iterator for an item defined by the key, or for the next item after it (if upper==true) //! Returns an iterator for an item defined by the key, or for the next item after it (if upper==true)
template<typename I> template<typename I>
std::pair<I, I> internal_equal_range( const Key& key, I end ) const; std::pair<I, I> internal_equal_range( const Key& key, I end ) const;
//! Copy "source" to *this, where *this must start out empty. //! Copy "source" to *this, where *this must start out empty.
void internal_copy( const concurrent_hash_map& source ); void internal_copy( const concurrent_hash_map& source );
skipping to change at line 1036 skipping to change at line 1124
n = search_bucket( key, b ); n = search_bucket( key, b );
if( n ) if( n )
return &n->item; return &n->item;
else if( check_mask_race( h, m ) ) else if( check_mask_race( h, m ) )
goto restart; goto restart;
return 0; return 0;
} }
}; };
template<typename Key, typename T, typename HashCompare, typename A> template<typename Key, typename T, typename HashCompare, typename A>
bool concurrent_hash_map<Key,T,HashCompare,A>::lookup( bool op_insert, cons t Key &key, const T *t, const_accessor *result, bool write, node* (*allocat e_node)(node_allocator_type& , const Key&, const T*) ) { bool concurrent_hash_map<Key,T,HashCompare,A>::lookup( bool op_insert, cons t Key &key, const T *t, const_accessor *result, bool write, node* (*allocat e_node)(node_allocator_type& , const Key&, const T*), node *tmp_n ) {
__TBB_ASSERT( !result || !result->my_node, NULL ); __TBB_ASSERT( !result || !result->my_node, NULL );
bool return_value; bool return_value;
hashcode_t const h = my_hash_compare.hash( key ); hashcode_t const h = my_hash_compare.hash( key );
hashcode_t m = (hashcode_t) itt_load_word_with_acquire( my_mask ); hashcode_t m = (hashcode_t) itt_load_word_with_acquire( my_mask );
segment_index_t grow_segment = 0; segment_index_t grow_segment = 0;
node *n, *tmp_n = 0; node *n;
restart: restart:
{//lock scope {//lock scope
__TBB_ASSERT((m&(m+1))==0, "data structure is invalid"); __TBB_ASSERT((m&(m+1))==0, "data structure is invalid");
return_value = false; return_value = false;
// get bucket // get bucket
bucket_accessor b( this, h & m ); bucket_accessor b( this, h & m );
// find a node // find a node
n = search_bucket( key, b() ); n = search_bucket( key, b() );
if( op_insert ) { if( op_insert ) {
 End of changes. 7 change blocks. 
5 lines changed or deleted 110 lines changed or added


 flow_graph.h   flow_graph.h 
skipping to change at line 131 skipping to change at line 131
virtual bool try_reserve( T & ) { return false; } virtual bool try_reserve( T & ) { return false; }
//! Releases the reserved item //! Releases the reserved item
virtual bool try_release( ) { return false; } virtual bool try_release( ) { return false; }
//! Consumes the reserved item //! Consumes the reserved item
virtual bool try_consume( ) { return false; } virtual bool try_consume( ) { return false; }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
//! interface to record edges for traversal & deletion //! interface to record edges for traversal & deletion
typedef typename internal::edge_container<successor_type>::edge_list_t typedef typename internal::edge_container<successor_type> built_succes
ype successor_list_type; sors_type;
typedef typename built_successors_type::edge_list_type successor_list_
type;
virtual built_successors_type &built_successors() = 0
;
virtual void internal_add_built_successor( successor_type & ) = 0 ; virtual void internal_add_built_successor( successor_type & ) = 0 ;
virtual void internal_delete_built_successor( successor_type & ) = 0 ; virtual void internal_delete_built_successor( successor_type & ) = 0 ;
virtual void copy_successors( successor_list_type &) = 0 ; virtual void copy_successors( successor_list_type &) = 0 ;
virtual size_t successor_count() = 0 ; virtual size_t successor_count() = 0 ;
#endif #endif
}; }; // class sender<T>
template< typename T > class limiter_node; // needed for resetting decreme nter template< typename T > class limiter_node; // needed for resetting decreme nter
template< typename R, typename B > class run_and_put_task; template< typename R, typename B > class run_and_put_task;
static tbb::task * const SUCCESSFULLY_ENQUEUED = (task *)-1; static tbb::task * const SUCCESSFULLY_ENQUEUED = (task *)-1;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
// flags to modify the behavior of the graph reset(). Can be combined. // flags to modify the behavior of the graph reset(). Can be combined.
enum reset_flags { enum reset_flags {
rf_reset_protocol = 0, rf_reset_protocol = 0,
rf_reset_bodies = 1<<0, // delete the current node body, reset to a copy of the initial node body. rf_reset_bodies = 1<<0, // delete the current node body, reset to a copy of the initial node body.
rf_extract = 1<<1 // delete edges (extract() for single node , reset() for graph.) rf_clear_edges = 1<<1 // delete edges
}; };
#define __TBB_PFG_RESET_ARG(exp) exp #define __TBB_PFG_RESET_ARG(exp) exp
#define __TBB_COMMA , #define __TBB_COMMA ,
#else #else
#define __TBB_PFG_RESET_ARG(exp) /* nothing */ #define __TBB_PFG_RESET_ARG(exp) /* nothing */
#define __TBB_COMMA /* nothing */ #define __TBB_COMMA /* nothing */
#endif #endif // TBB_PREVIEW_FLOW_GRAPH_FEATURES
// enqueue left task if necessary. Returns the non-enqueued task if there is one. // enqueue left task if necessary. Returns the non-enqueued task if there is one.
static inline tbb::task *combine_tasks( tbb::task * left, tbb::task * right ) { static inline tbb::task *combine_tasks( tbb::task * left, tbb::task * right ) {
// if no RHS task, don't change left. // if no RHS task, don't change left.
if(right == NULL) return left; if(right == NULL) return left;
// right != NULL // right != NULL
if(left == NULL) return right; if(left == NULL) return right;
if(left == SUCCESSFULLY_ENQUEUED) return right; if(left == SUCCESSFULLY_ENQUEUED) return right;
// left contains a task // left contains a task
if(right != SUCCESSFULLY_ENQUEUED) { if(right != SUCCESSFULLY_ENQUEUED) {
skipping to change at line 211 skipping to change at line 213
virtual task *try_put_task(const T& t) = 0; virtual task *try_put_task(const T& t) = 0;
public: public:
//! Add a predecessor to the node //! Add a predecessor to the node
virtual bool register_predecessor( predecessor_type & ) { return false; } virtual bool register_predecessor( predecessor_type & ) { return false; }
//! Remove a predecessor from the node //! Remove a predecessor from the node
virtual bool remove_predecessor( predecessor_type & ) { return false; } virtual bool remove_predecessor( predecessor_type & ) { return false; }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
typedef typename internal::edge_container<predecessor_type>::edge_list_ typedef typename internal::edge_container<predecessor_type> built_prede
type predecessor_list_type; cessors_type;
typedef typename built_predecessors_type::edge_list_type predecessor_li
st_type;
virtual built_predecessors_type &built_predecessors()
= 0;
virtual void internal_add_built_predecessor( predecessor_type & ) = 0; virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
virtual void internal_delete_built_predecessor( predecessor_type & ) = 0; virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
virtual void copy_predecessors( predecessor_list_type & ) = 0; virtual void copy_predecessors( predecessor_list_type & ) = 0;
virtual size_t predecessor_count() = 0; virtual size_t predecessor_count() = 0;
#endif #endif
protected: protected:
//! put receiver back in initial state //! put receiver back in initial state
template<typename U> friend class limiter_node; template<typename U> friend class limiter_node;
virtual void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f = rf_rese t_protocol ) ) = 0; virtual void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f = rf_rese t_protocol ) ) = 0;
template<typename TT, typename M> template<typename TT, typename M>
friend class internal::successor_cache; friend class internal::successor_cache;
virtual bool is_continue_receiver() { return false; } virtual bool is_continue_receiver() { return false; }
}; }; // class receiver<T>
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
//* holder of edges both for caches and for those nodes which do not have p redecessor caches. //* holder of edges both for caches and for those nodes which do not have p redecessor caches.
// C == receiver< ... > or sender< ... >, depending. // C == receiver< ... > or sender< ... >, depending.
namespace internal { namespace internal {
template<typename C> template<typename C>
class edge_container { class edge_container {
public: public:
typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type; typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type;
skipping to change at line 263 skipping to change at line 267
} }
size_t edge_count() { size_t edge_count() {
return (size_t)(built_edges.size()); return (size_t)(built_edges.size());
} }
void clear() { void clear() {
built_edges.clear(); built_edges.clear();
} }
// methods remove the statement from all predecessors/successors liste
in the edge
// container.
template< typename S > void sender_extract( S &s ); template< typename S > void sender_extract( S &s );
template< typename R > void receiver_extract( R &r ); template< typename R > void receiver_extract( R &r );
private: private:
edge_list_type built_edges; edge_list_type built_edges;
}; }; // class edge_container
} // namespace internal } // namespace internal
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
//! Base class for receivers of completion messages //! Base class for receivers of completion messages
/** These receivers automatically reset, but cannot be explicitly waited on */ /** These receivers automatically reset, but cannot be explicitly waited on */
class continue_receiver : public receiver< continue_msg > { class continue_receiver : public receiver< continue_msg > {
public: public:
//! The input type //! The input type
typedef continue_msg input_type; typedef continue_msg input_type;
skipping to change at line 316 skipping to change at line 322
/** Does not check to see if the removal of the predecessor now makes t he current count /** Does not check to see if the removal of the predecessor now makes t he current count
exceed the new threshold. So removing a predecessor while the grap h is active can cause exceed the new threshold. So removing a predecessor while the grap h is active can cause
unexpected results. */ unexpected results. */
/* override */ bool remove_predecessor( predecessor_type & ) { /* override */ bool remove_predecessor( predecessor_type & ) {
spin_mutex::scoped_lock l(my_mutex); spin_mutex::scoped_lock l(my_mutex);
--my_predecessor_count; --my_predecessor_count;
return true; return true;
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
typedef receiver<input_type>::predecessor_list_type predecessor_list_ty typedef internal::edge_container<predecessor_type> built_predecessors_t
pe; ype;
typedef built_predecessors_type::edge_list_type predecessor_list_type;
/*override*/ built_predecessors_type &built_predecessors() { return my_
built_predecessors; }
/*override*/ void internal_add_built_predecessor( predecessor_type &s) { /*override*/ void internal_add_built_predecessor( predecessor_type &s) {
spin_mutex::scoped_lock l(my_mutex); spin_mutex::scoped_lock l(my_mutex);
my_built_predecessors.add_edge( s ); my_built_predecessors.add_edge( s );
} }
/*override*/ void internal_delete_built_predecessor( predecessor_type & s) { /*override*/ void internal_delete_built_predecessor( predecessor_type & s) {
spin_mutex::scoped_lock l(my_mutex); spin_mutex::scoped_lock l(my_mutex);
my_built_predecessors.delete_edge(s); my_built_predecessors.delete_edge(s);
} }
/*override*/ void copy_predecessors( predecessor_list_type &v) { /*override*/ void copy_predecessors( predecessor_list_type &v) {
spin_mutex::scoped_lock l(my_mutex); spin_mutex::scoped_lock l(my_mutex);
my_built_predecessors.copy_edges(v); my_built_predecessors.copy_edges(v);
} }
/*override*/ size_t predecessor_count() { /*override*/ size_t predecessor_count() {
spin_mutex::scoped_lock l(my_mutex); spin_mutex::scoped_lock l(my_mutex);
return my_built_predecessors.edge_count(); return my_built_predecessors.edge_count();
} }
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
protected: protected:
template< typename R, typename B > friend class run_and_put_task; template< typename R, typename B > friend class run_and_put_task;
template<typename X, typename Y> friend class internal::broadcast_cache ; template<typename X, typename Y> friend class internal::broadcast_cache ;
template<typename X, typename Y> friend class internal::round_robin_cac he; template<typename X, typename Y> friend class internal::round_robin_cac he;
// execute body is supposed to be too small to create a task for. // execute body is supposed to be too small to create a task for.
/* override */ task *try_put_task( const input_type & ) { /* override */ task *try_put_task( const input_type & ) {
{ {
spin_mutex::scoped_lock l(my_mutex); spin_mutex::scoped_lock l(my_mutex);
skipping to change at line 358 skipping to change at line 367
return SUCCESSFULLY_ENQUEUED; return SUCCESSFULLY_ENQUEUED;
else else
my_current_count = 0; my_current_count = 0;
} }
task * res = execute(); task * res = execute();
if(!res) return SUCCESSFULLY_ENQUEUED; if(!res) return SUCCESSFULLY_ENQUEUED;
return res; return res;
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
internal::edge_container<predecessor_type> my_built_predecessors; // continue_receiver must contain its own built_predecessors because it
does
// not have a node_cache.
built_predecessors_type my_built_predecessors;
#endif #endif
spin_mutex my_mutex; spin_mutex my_mutex;
int my_predecessor_count; int my_predecessor_count;
int my_current_count; int my_current_count;
int my_initial_predecessor_count; int my_initial_predecessor_count;
// the friend declaration in the base class did not eliminate the "prot ected class" // the friend declaration in the base class did not eliminate the "prot ected class"
// error in gcc 4.1.2 // error in gcc 4.1.2
template<typename U> friend class limiter_node; template<typename U> friend class limiter_node;
/*override*/void reset_receiver( __TBB_PFG_RESET_ARG(reset_flags f) ) /*override*/void reset_receiver( __TBB_PFG_RESET_ARG(reset_flags f) )
{ {
my_current_count = 0; my_current_count = 0;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
if(f & rf_extract) { if(f & rf_clear_edges) {
my_built_predecessors.receiver_extract(*this); my_built_predecessors.clear();
my_predecessor_count = my_initial_predecessor_count; my_predecessor_count = my_initial_predecessor_count;
} }
#endif #endif
} }
//! Does whatever should happen when the threshold is reached //! Does whatever should happen when the threshold is reached
/** This should be very fast or else spawn a task. This is /** This should be very fast or else spawn a task. This is
called while the sender is blocked in the try_put(). */ called while the sender is blocked in the try_put(). */
virtual task * execute() = 0; virtual task * execute() = 0;
template<typename TT, typename M> template<typename TT, typename M>
friend class internal::successor_cache; friend class internal::successor_cache;
/*override*/ bool is_continue_receiver() { return true; } /*override*/ bool is_continue_receiver() { return true; }
};
}; // class continue_receiver
} // interface7 } // interface7
} // flow } // flow
} // tbb } // tbb
#include "internal/_flow_graph_trace_impl.h" #include "internal/_flow_graph_trace_impl.h"
namespace tbb { namespace tbb {
namespace flow { namespace flow {
namespace interface7 { namespace interface7 {
skipping to change at line 468 skipping to change at line 481
private: private:
// the graph over which we are iterating // the graph over which we are iterating
GraphContainerType *my_graph; GraphContainerType *my_graph;
// pointer into my_graph's my_nodes list // pointer into my_graph's my_nodes list
pointer current_node; pointer current_node;
//! Private initializing constructor for begin() and end() iterators //! Private initializing constructor for begin() and end() iterators
graph_iterator(GraphContainerType *g, bool begin); graph_iterator(GraphContainerType *g, bool begin);
void internal_forward(); void internal_forward();
}; }; // class graph_iterator
//! The graph class //! The graph class
/** This class serves as a handle to the graph */ /** This class serves as a handle to the graph */
class graph : tbb::internal::no_copy { class graph : tbb::internal::no_copy {
friend class graph_node; friend class graph_node;
template< typename Body > template< typename Body >
class run_task : public task { class run_task : public task {
public: public:
run_task( Body& body ) : my_body(body) {} run_task( Body& body ) : my_body(body) {}
skipping to change at line 715 skipping to change at line 728
} }
virtual ~graph_node() { virtual ~graph_node() {
my_graph.remove_node(this); my_graph.remove_node(this);
} }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE #if TBB_PREVIEW_FLOW_GRAPH_TRACE
virtual void set_name( const char *name ) = 0; virtual void set_name( const char *name ) = 0;
#endif #endif
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
virtual void extract( reset_flags f=rf_extract ) { virtual void extract( ) = 0;
bool a = my_graph.is_active();
my_graph.set_active(false);
reset((reset_flags)(f|rf_extract));
my_graph.set_active(a);
}
#endif #endif
protected: protected:
virtual void reset(__TBB_PFG_RESET_ARG(reset_flags f=rf_reset_protocol) // performs the reset on an individual node.
) = 0; virtual void reset_node(__TBB_PFG_RESET_ARG(reset_flags f=rf_reset_prot
}; ocol)) = 0;
}; // class graph_node
inline void graph::register_node(graph_node *n) { inline void graph::register_node(graph_node *n) {
n->next = NULL; n->next = NULL;
{ {
spin_mutex::scoped_lock lock(nodelist_mutex); spin_mutex::scoped_lock lock(nodelist_mutex);
n->prev = my_nodes_last; n->prev = my_nodes_last;
if (my_nodes_last) my_nodes_last->next = n; if (my_nodes_last) my_nodes_last->next = n;
my_nodes_last = n; my_nodes_last = n;
if (!my_nodes) my_nodes = n; if (!my_nodes) my_nodes = n;
} }
skipping to change at line 760 skipping to change at line 769
inline void graph::reset( __TBB_PFG_RESET_ARG( reset_flags f )) { inline void graph::reset( __TBB_PFG_RESET_ARG( reset_flags f )) {
// reset context // reset context
task *saved_my_root_task = my_root_task; task *saved_my_root_task = my_root_task;
my_root_task = NULL; my_root_task = NULL;
if(my_context) my_context->reset(); if(my_context) my_context->reset();
cancelled = false; cancelled = false;
caught_exception = false; caught_exception = false;
// reset all the nodes comprising the graph // reset all the nodes comprising the graph
for(iterator ii = begin(); ii != end(); ++ii) { for(iterator ii = begin(); ii != end(); ++ii) {
graph_node *my_p = &(*ii); graph_node *my_p = &(*ii);
my_p->reset(__TBB_PFG_RESET_ARG(f)); my_p->reset_node(__TBB_PFG_RESET_ARG(f));
} }
my_root_task = saved_my_root_task; my_root_task = saved_my_root_task;
} }
#include "internal/_flow_graph_node_impl.h" #include "internal/_flow_graph_node_impl.h"
//! An executable node that acts as a source, i.e. it has no predecessors //! An executable node that acts as a source, i.e. it has no predecessors
template < typename Output > template < typename Output >
class source_node : public graph_node, public sender< Output > { class source_node : public graph_node, public sender< Output > {
protected: protected:
skipping to change at line 783 skipping to change at line 792
//! The type of the output message, which is complete //! The type of the output message, which is complete
typedef Output output_type; typedef Output output_type;
//! The type of successors of this node //! The type of successors of this node
typedef receiver< Output > successor_type; typedef receiver< Output > successor_type;
//Source node has no input type //Source node has no input type
typedef null_type input_type; typedef null_type input_type;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
typedef typename sender<output_type>::built_successors_type built_succe ssors_type;
typedef typename sender<output_type>::successor_list_type successor_lis t_type; typedef typename sender<output_type>::successor_list_type successor_lis t_type;
#endif #endif
//! Constructor for a node with a successor //! Constructor for a node with a successor
template< typename Body > template< typename Body >
source_node( graph &g, Body body, bool is_active = true ) source_node( graph &g, Body body, bool is_active = true )
: graph_node(g), my_active(is_active), init_my_active(is_active), : graph_node(g), my_active(is_active), init_my_active(is_active),
my_body( new internal::source_body_leaf< output_type, Body>(body) ) , my_body( new internal::source_body_leaf< output_type, Body>(body) ) ,
my_reserved(false), my_has_cached_item(false) my_reserved(false), my_has_cached_item(false)
{ {
skipping to change at line 836 skipping to change at line 846
} }
//! Removes a successor from this node //! Removes a successor from this node
/* override */ bool remove_successor( successor_type &r ) { /* override */ bool remove_successor( successor_type &r ) {
spin_mutex::scoped_lock lock(my_mutex); spin_mutex::scoped_lock lock(my_mutex);
my_successors.remove_successor(r); my_successors.remove_successor(r);
return true; return true;
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
/*override*/ built_successors_type &built_successors() { return my_succ
essors.built_successors(); }
/*override*/void internal_add_built_successor( successor_type &r) { /*override*/void internal_add_built_successor( successor_type &r) {
spin_mutex::scoped_lock lock(my_mutex); spin_mutex::scoped_lock lock(my_mutex);
my_successors.internal_add_built_successor(r); my_successors.internal_add_built_successor(r);
} }
/*override*/void internal_delete_built_successor( successor_type &r) { /*override*/void internal_delete_built_successor( successor_type &r) {
spin_mutex::scoped_lock lock(my_mutex); spin_mutex::scoped_lock lock(my_mutex);
my_successors.internal_delete_built_successor(r); my_successors.internal_delete_built_successor(r);
} }
skipping to change at line 927 skipping to change at line 940
if ( !my_successors.empty() ) if ( !my_successors.empty() )
spawn_put(); spawn_put();
} }
template<typename Body> template<typename Body>
Body copy_function_object() { Body copy_function_object() {
internal::source_body<output_type> &body_ref = *this->my_body; internal::source_body<output_type> &body_ref = *this->my_body;
return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body(); return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
/*override*/void extract( ) {
my_successors.built_successors().sender_extract(*this); // remove
s "my_owner" == this from each successor
my_active = init_my_active;
my_reserved = false;
if(my_has_cached_item) my_has_cached_item = false;
}
#endif
protected: protected:
//! resets the source_node to its initial state //! resets the source_node to its initial state
void reset( __TBB_PFG_RESET_ARG(reset_flags f)) { /*override*/void reset_node( __TBB_PFG_RESET_ARG(reset_flags f)) {
my_active = init_my_active; my_active = init_my_active;
my_reserved =false; my_reserved =false;
if(my_has_cached_item) { if(my_has_cached_item) {
my_has_cached_item = false; my_has_cached_item = false;
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
my_successors.reset(f); if(f & rf_clear_edges) my_successors.clear();
if(f & rf_reset_bodies) my_body->reset_body(); if(f & rf_reset_bodies) my_body->reset_body();
#endif #endif
} }
private: private:
spin_mutex my_mutex; spin_mutex my_mutex;
bool my_active; bool my_active;
bool init_my_active; bool init_my_active;
internal::source_body<output_type> *my_body; internal::source_body<output_type> *my_body;
internal::broadcast_cache< output_type > my_successors; internal::broadcast_cache< output_type > my_successors;
skipping to change at line 998 skipping to change at line 1020
if ( !try_reserve_apply_body(v) ) if ( !try_reserve_apply_body(v) )
return NULL; return NULL;
task *last_task = my_successors.try_put_task(v); task *last_task = my_successors.try_put_task(v);
if ( last_task ) if ( last_task )
try_consume(); try_consume();
else else
try_release(); try_release();
return last_task; return last_task;
} }
}; // source_node }; // class source_node
//! Implements a function node that supports Input -> Output //! Implements a function node that supports Input -> Output
template < typename Input, typename Output = continue_msg, graph_buffer_pol icy = queueing, typename Allocator=cache_aligned_allocator<Input> > template < typename Input, typename Output = continue_msg, graph_buffer_pol icy G = queueing, typename Allocator=cache_aligned_allocator<Input> >
class function_node : public graph_node, public internal::function_input<In put,Output,Allocator>, public internal::function_output<Output> { class function_node : public graph_node, public internal::function_input<In put,Output,Allocator>, public internal::function_output<Output> {
protected:
using graph_node::my_graph;
public: public:
typedef Input input_type; typedef Input input_type;
typedef Output output_type; typedef Output output_type;
typedef sender< input_type > predecessor_type; typedef sender< input_type > predecessor_type;
typedef receiver< output_type > successor_type; typedef receiver< output_type > successor_type;
typedef internal::function_input<input_type,output_type,Allocator> fInp ut_type; typedef internal::function_input<input_type,output_type,Allocator> fInp ut_type;
typedef internal::function_input_queue<input_type, Allocator> input_que ue_type;
typedef internal::function_output<output_type> fOutput_type; typedef internal::function_output<output_type> fOutput_type;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
using typename internal::function_input<Input,Output,Allocator>::predec using typename fInput_type::predecessor_list_type;
essor_list_type; using typename fOutput_type::successor_list_type;
using typename internal::function_output<Output>::successor_list_type; using fInput_type::my_predecessors;
#endif #endif
//! Constructor //! Constructor
// input_queue_type is allocated here, but destroyed in the function_in
put_base.
// TODO: pass the graph_buffer_policy to the function_input_base so it
can all
// be done in one place. This would be an interface-breaking change.
template< typename Body > template< typename Body >
function_node( graph &g, size_t concurrency, Body body ) : function_node( graph &g, size_t concurrency, Body body ) :
graph_node(g), internal::function_input<input_type,output_type,Allo graph_node(g), fInput_type(g, concurrency, body, G == queueing ?
cator>(g, concurrency, body) { new input_queue_type( ) : NULL ) {
tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NOD tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NOD
E, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this), E, &this->graph_node::my_graph,
static_cast<sender<output_type> static_cast<receiver<input_type> *>(this), static_cast<send
*>(this), this->my_body ); er<output_type> *>(this), this->my_body );
} }
//! Copy constructor //! Copy constructor
function_node( const function_node& src ) : function_node( const function_node& src ) :
graph_node(src.my_graph), internal::function_input<input_type,outpu graph_node(src.graph_node::my_graph),
t_type,Allocator>( src ), fInput_type(src, G == queueing ? new input_queue_type : NULL),
fOutput_type() { fOutput_type() {
tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NOD tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NOD
E, &this->my_graph, static_cast<receiver<input_type> *>(this), E, &this->graph_node::my_graph,
static_cast<sender<output_type> static_cast<receiver<input_type> *>(this), static_cast<send
*>(this), this->my_body ); er<output_type> *>(this), this->my_body );
} }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE #if TBB_PREVIEW_FLOW_GRAPH_TRACE
/* override */ void set_name( const char *name ) { /* override */ void set_name( const char *name ) {
tbb::internal::fgt_node_desc( this, name ); tbb::internal::fgt_node_desc( this, name );
} }
#endif #endif
protected:
template< typename R, typename B > friend class run_and_put_task;
template<typename X, typename Y> friend class internal::broadcast_cache
;
template<typename X, typename Y> friend class internal::round_robin_cac
he;
using fInput_type::try_put_task;
// override of graph_node's reset.
/*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) {
fInput_type::reset_function_input(__TBB_PFG_RESET_ARG(f));
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
successors().reset(f); /*override*/void extract( ) {
__TBB_ASSERT(!(f & rf_extract) || successors().empty(), "function_n my_predecessors.built_predecessors().receiver_extract(*this);
ode successors not empty"); successors().built_successors().sender_extract(*this);
__TBB_ASSERT(this->my_predecessors.empty(), "function_node predeces
sors not empty");
#endif
}
/* override */ internal::broadcast_cache<output_type> &successors () {
return fOutput_type::my_successors; }
};
//! Implements a function node that supports Input -> Output
template < typename Input, typename Output, typename Allocator >
class function_node<Input,Output,queueing,Allocator> : public graph_node, p
ublic internal::function_input<Input,Output,Allocator>, public internal::fu
nction_output<Output> {
protected:
using graph_node::my_graph;
public:
typedef Input input_type;
typedef Output output_type;
typedef sender< input_type > predecessor_type;
typedef receiver< output_type > successor_type;
typedef internal::function_input<input_type,output_type,Allocator> fInp
ut_type;
typedef internal::function_input_queue<input_type, Allocator> queue_typ
e;
typedef internal::function_output<output_type> fOutput_type;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
using typename internal::function_input<Input,Output,Allocator>::predec
essor_list_type;
using typename internal::function_output<Output>::successor_list_type;
#endif
//! Constructor
template< typename Body >
function_node( graph &g, size_t concurrency, Body body ) :
graph_node(g), fInput_type( g, concurrency, body, new queue_type()
) {
tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NOD
E, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
static_cast<sender<output_type>
*>(this), this->my_body );
}
//! Copy constructor
function_node( const function_node& src ) :
graph_node(src.graph_node::my_graph), fInput_type( src, new queue_t
ype() ), fOutput_type() {
tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NOD
E, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
static_cast<sender<output_type>
*>(this), this->my_body );
}
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
/* override */ void set_name( const char *name ) {
tbb::internal::fgt_node_desc( this, name );
} }
#endif #endif
protected: protected:
template< typename R, typename B > friend class run_and_put_task; template< typename R, typename B > friend class run_and_put_task;
template<typename X, typename Y> friend class internal::broadcast_cache ; template<typename X, typename Y> friend class internal::broadcast_cache ;
template<typename X, typename Y> friend class internal::round_robin_cac he; template<typename X, typename Y> friend class internal::round_robin_cac he;
using fInput_type::try_put_task; using fInput_type::try_put_task;
/*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) { /* override */ internal::broadcast_cache<output_type> &successors () {
return fOutput_type::my_successors; }
// override of graph_node's reset.
/*override*/void reset_node(__TBB_PFG_RESET_ARG(reset_flags f)) {
fInput_type::reset_function_input(__TBB_PFG_RESET_ARG(f)); fInput_type::reset_function_input(__TBB_PFG_RESET_ARG(f));
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
successors().reset(f); // TODO: use clear() instead.
__TBB_ASSERT(!(f & rf_extract) || successors().empty(), "function_n if(f & rf_clear_edges) {
ode successors not empty"); successors().clear();
__TBB_ASSERT(!(f & rf_extract) || this->my_predecessors.empty(), "f my_predecessors.clear();
unction_node predecessors not empty"); }
__TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "functi
on_node successors not empty");
__TBB_ASSERT(this->my_predecessors.empty(), "function_node predeces
sors not empty");
#endif #endif
} }
/* override */ internal::broadcast_cache<output_type> &successors () { }; // class function_node
return fOutput_type::my_successors; }
};
//! implements a function node that supports Input -> (set of outputs) //! implements a function node that supports Input -> (set of outputs)
// Output is a tuple of output types. // Output is a tuple of output types.
template < typename Input, typename Output, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> > template < typename Input, typename Output, graph_buffer_policy G = queuein g, typename Allocator=cache_aligned_allocator<Input> >
class multifunction_node : class multifunction_node :
public graph_node, public graph_node,
public internal::multifunction_input public internal::multifunction_input
< <
Input, Input,
typename internal::wrap_tuple_elements< typename internal::wrap_tuple_elements<
tbb::flow::tuple_size<Output>::value, // #elements in tuple tbb::flow::tuple_size<Output>::value, // #elements in tuple
internal::multifunction_output, // wrap this around each eleme nt internal::multifunction_output, // wrap this around each eleme nt
Output // the tuple providing the types Output // the tuple providing the types
>::type, >::type,
Allocator Allocator
> { > {
protected: protected:
using graph_node::my_graph; using graph_node::my_graph;
private:
static const int N = tbb::flow::tuple_size<Output>::value; static const int N = tbb::flow::tuple_size<Output>::value;
public: public:
typedef Input input_type; typedef Input input_type;
typedef null_type output_type; typedef null_type output_type;
typedef typename internal::wrap_tuple_elements<N,internal::multifunctio n_output, Output>::type output_ports_type; typedef typename internal::wrap_tuple_elements<N,internal::multifunctio n_output, Output>::type output_ports_type;
typedef internal::multifunction_input<input_type, output_ports_type, Al
locator> fInput_type;
typedef internal::function_input_queue<input_type, Allocator> input_que
ue_type;
private: private:
typedef typename internal::multifunction_input<input_type, output_ports _type, Allocator> base_type; typedef typename internal::multifunction_input<input_type, output_ports _type, Allocator> base_type;
typedef typename internal::function_input_queue<input_type,Allocator> q ueue_type; using fInput_type::my_predecessors;
public: public:
template<typename Body> template<typename Body>
multifunction_node( graph &g, size_t concurrency, Body body ) : multifunction_node( graph &g, size_t concurrency, Body body ) :
graph_node(g), base_type(g,concurrency, body) { graph_node(g), base_type(g,concurrency, body, G == queueing ? new input_queue_type : NULL) {
tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::inter nal::FLOW_MULTIFUNCTION_NODE, tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::inter nal::FLOW_MULTIFUNCTION_NODE,
&this->gra &this->graph_node::my_graph, static_cast<receiver<input_typ
ph_node::my_graph, static_cast<receiver<input_type> *>(this), e> *>(this),
this->outp this->output_ports(), this->my_body );
ut_ports(), this->my_body );
} }
multifunction_node( const multifunction_node &other) : multifunction_node( const multifunction_node &other) :
graph_node(other.graph_node::my_graph), base_type(other) { graph_node(other.graph_node::my_graph), base_type(other, G == queu eing ? new input_queue_type : NULL) {
tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::inter nal::FLOW_MULTIFUNCTION_NODE, tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::inter nal::FLOW_MULTIFUNCTION_NODE,
&this->gra &this->graph_node::my_graph, static_cast<receiver<input_typ
ph_node::my_graph, static_cast<receiver<input_type> *>(this), e> *>(this),
this->outp this->output_ports(), this->my_body );
ut_ports(), this->my_body );
} }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE #if TBB_PREVIEW_FLOW_GRAPH_TRACE
/* override */ void set_name( const char *name ) { /* override */ void set_name( const char *name ) {
tbb::internal::fgt_multioutput_node_desc( this, name ); tbb::internal::fgt_multioutput_node_desc( this, name );
} }
#endif #endif
// all the guts are in multifunction_input... #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
protected: void extract( ) {
/*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { base_type: my_predecessors.built_predecessors().receiver_extract(*this);
:reset(__TBB_PFG_RESET_ARG(f)); } base_type::extract();
}; // multifunction_node
template < typename Input, typename Output, typename Allocator >
class multifunction_node<Input,Output,queueing,Allocator> : public graph_no
de, public internal::multifunction_input<Input,
typename internal::wrap_tuple_elements<tbb::flow::tuple_size<Output>::v
alue, internal::multifunction_output, Output>::type, Allocator> {
protected:
using graph_node::my_graph;
static const int N = tbb::flow::tuple_size<Output>::value;
public:
typedef Input input_type;
typedef null_type output_type;
typedef typename internal::wrap_tuple_elements<N, internal::multifuncti
on_output, Output>::type output_ports_type;
private:
typedef typename internal::multifunction_input<input_type, output_ports
_type, Allocator> base_type;
typedef typename internal::function_input_queue<input_type,Allocator> q
ueue_type;
public:
template<typename Body>
multifunction_node( graph &g, size_t concurrency, Body body) :
graph_node(g), base_type(g,concurrency, body, new queue_type()) {
tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::inter
nal::FLOW_MULTIFUNCTION_NODE,
&this->gra
ph_node::my_graph, static_cast<receiver<input_type> *>(this),
this->outp
ut_ports(), this->my_body );
}
multifunction_node( const multifunction_node &other) :
graph_node(other.graph_node::my_graph), base_type(other, new queue_
type()) {
tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::inter
nal::FLOW_MULTIFUNCTION_NODE,
&this->gra
ph_node::my_graph, static_cast<receiver<input_type> *>(this),
this->outp
ut_ports(), this->my_body );
}
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
/* override */ void set_name( const char *name ) {
tbb::internal::fgt_multioutput_node_desc( this, name );
} }
#endif #endif
// all the guts are in multifunction_input... // all the guts are in multifunction_input...
protected: protected:
/*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { base_type: :reset(__TBB_PFG_RESET_ARG(f)); } /*override*/void reset_node(__TBB_PFG_RESET_ARG(reset_flags f)) { base_ type::reset(__TBB_PFG_RESET_ARG(f)); }
}; // multifunction_node }; // multifunction_node
//! split_node: accepts a tuple as input, forwards each element of the tupl e to its //! split_node: accepts a tuple as input, forwards each element of the tupl e to its
// successors. The node has unlimited concurrency, so though it is marked as // successors. The node has unlimited concurrency, so though it is marked as
// "rejecting" it does not reject inputs. // "rejecting" it does not reject inputs.
template<typename TupleType, typename Allocator=cache_aligned_allocator<Tup leType> > template<typename TupleType, typename Allocator=cache_aligned_allocator<Tup leType> >
class split_node : public multifunction_node<TupleType, TupleType, rejectin g, Allocator> { class split_node : public multifunction_node<TupleType, TupleType, rejectin g, Allocator> {
static const int N = tbb::flow::tuple_size<TupleType>::value; static const int N = tbb::flow::tuple_size<TupleType>::value;
typedef multifunction_node<TupleType,TupleType,rejecting,Allocator> bas e_type; typedef multifunction_node<TupleType,TupleType,rejecting,Allocator> bas e_type;
public: public:
skipping to change at line 1292 skipping to change at line 1240
static_cast<receiver<input_type> *>(this), static_cast<receiver<input_type> *>(this),
static_cast<sender<output_type> *>(this), this->my_body ); static_cast<sender<output_type> *>(this), this->my_body );
} }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE #if TBB_PREVIEW_FLOW_GRAPH_TRACE
/* override */ void set_name( const char *name ) { /* override */ void set_name( const char *name ) {
tbb::internal::fgt_node_desc( this, name ); tbb::internal::fgt_node_desc( this, name );
} }
#endif #endif
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
/*override graph_node*/ void extract() {
fInput_type::my_built_predecessors.receiver_extract(*this);
successors().built_successors().sender_extract(*this);
}
#endif
protected: protected:
template< typename R, typename B > friend class run_and_put_task; template< typename R, typename B > friend class run_and_put_task;
template<typename X, typename Y> friend class internal::broadcast_cache ; template<typename X, typename Y> friend class internal::broadcast_cache ;
template<typename X, typename Y> friend class internal::round_robin_cac he; template<typename X, typename Y> friend class internal::round_robin_cac he;
using fInput_type::try_put_task; using fInput_type::try_put_task;
/* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
/*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { /*override*/void reset_node(__TBB_PFG_RESET_ARG(reset_flags f)) {
fInput_type::reset_receiver(__TBB_PFG_RESET_ARG(f)); fInput_type::reset_receiver(__TBB_PFG_RESET_ARG(f));
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
successors().reset(f); if(f & rf_clear_edges)successors().clear();
__TBB_ASSERT(!(f & rf_extract) || successors().empty(), "continue_n __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "contin
ode not reset"); ue_node not reset");
#endif #endif
} }
/* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
}; // continue_node }; // continue_node
template< typename T > template< typename T >
class overwrite_node : public graph_node, public receiver<T>, public sender <T> { class overwrite_node : public graph_node, public receiver<T>, public sender <T> {
protected: protected:
using graph_node::my_graph; using graph_node::my_graph;
public: public:
typedef T input_type; typedef T input_type;
typedef T output_type; typedef T output_type;
typedef sender< input_type > predecessor_type; typedef sender< input_type > predecessor_type;
typedef receiver< output_type > successor_type; typedef receiver< output_type > successor_type;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
typedef typename receiver<input_type>::built_predecessors_type built_pr
edecessors_type;
typedef typename sender<output_type>::built_successors_type built_succe
ssors_type;
typedef typename receiver<input_type>::predecessor_list_type predecesso r_list_type; typedef typename receiver<input_type>::predecessor_list_type predecesso r_list_type;
typedef typename sender<output_type>::successor_list_type successor_lis t_type; typedef typename sender<output_type>::successor_list_type successor_lis t_type;
#endif #endif
overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) { overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
my_successors.set_owner( this ); my_successors.set_owner( this );
tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this- >my_graph, tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this- >my_graph,
static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) ); static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
} }
skipping to change at line 1372 skipping to change at line 1329
return true; return true;
} }
/* override */ bool remove_successor( successor_type &s ) { /* override */ bool remove_successor( successor_type &s ) {
spin_mutex::scoped_lock l( my_mutex ); spin_mutex::scoped_lock l( my_mutex );
my_successors.remove_successor(s); my_successors.remove_successor(s);
return true; return true;
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
/*override*/built_predecessors_type &built_predecessors() { return my_b
uilt_predecessors; }
/*override*/built_successors_type &built_successors() { return my_s
uccessors.built_successors(); }
/*override*/void internal_add_built_successor( successor_type &s) { /*override*/void internal_add_built_successor( successor_type &s) {
spin_mutex::scoped_lock l( my_mutex ); spin_mutex::scoped_lock l( my_mutex );
my_successors.internal_add_built_successor(s); my_successors.internal_add_built_successor(s);
} }
/*override*/void internal_delete_built_successor( successor_type &s) { /*override*/void internal_delete_built_successor( successor_type &s) {
spin_mutex::scoped_lock l( my_mutex ); spin_mutex::scoped_lock l( my_mutex );
my_successors.internal_delete_built_successor(s); my_successors.internal_delete_built_successor(s);
} }
skipping to change at line 1411 skipping to change at line 1371
/*override*/size_t predecessor_count() { /*override*/size_t predecessor_count() {
spin_mutex::scoped_lock l( my_mutex ); spin_mutex::scoped_lock l( my_mutex );
return my_built_predecessors.edge_count(); return my_built_predecessors.edge_count();
} }
/*override*/void copy_predecessors(predecessor_list_type &v) { /*override*/void copy_predecessors(predecessor_list_type &v) {
spin_mutex::scoped_lock l( my_mutex ); spin_mutex::scoped_lock l( my_mutex );
my_built_predecessors.copy_edges(v); my_built_predecessors.copy_edges(v);
} }
/*override*/ void extract() {
my_buffer_is_valid = false;
built_successors().sender_extract(*this);
built_predecessors().receiver_extract(*this);
}
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
/* override */ bool try_get( input_type &v ) { /* override */ bool try_get( input_type &v ) {
spin_mutex::scoped_lock l( my_mutex ); spin_mutex::scoped_lock l( my_mutex );
if ( my_buffer_is_valid ) { if ( my_buffer_is_valid ) {
v = my_buffer; v = my_buffer;
return true; return true;
} }
return false; return false;
} }
skipping to change at line 1445 skipping to change at line 1412
template<typename X, typename Y> friend class internal::round_robin_cac he; template<typename X, typename Y> friend class internal::round_robin_cac he;
/* override */ task * try_put_task( const input_type &v ) { /* override */ task * try_put_task( const input_type &v ) {
spin_mutex::scoped_lock l( my_mutex ); spin_mutex::scoped_lock l( my_mutex );
my_buffer = v; my_buffer = v;
my_buffer_is_valid = true; my_buffer_is_valid = true;
task * rtask = my_successors.try_put_task(v); task * rtask = my_successors.try_put_task(v);
if(!rtask) rtask = SUCCESSFULLY_ENQUEUED; if(!rtask) rtask = SUCCESSFULLY_ENQUEUED;
return rtask; return rtask;
} }
/*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
my_buffer_is_valid = false;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
my_successors.reset(f);
if (f&rf_extract) {
my_built_predecessors.receiver_extract(*this);
}
#endif
}
spin_mutex my_mutex; spin_mutex my_mutex;
internal::broadcast_cache< input_type, null_rw_mutex > my_successors; internal::broadcast_cache< input_type, null_rw_mutex > my_successors;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
internal::edge_container<predecessor_type> my_built_predecessors; internal::edge_container<predecessor_type> my_built_predecessors;
#endif #endif
input_type my_buffer; input_type my_buffer;
bool my_buffer_is_valid; bool my_buffer_is_valid;
/*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/)) {} /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/)) {}
/*override*/void reset_node( __TBB_PFG_RESET_ARG(reset_flags f)) {
my_buffer_is_valid = false;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
if (f&rf_clear_edges) {
my_successors.clear();
my_built_predecessors.receiver_extract(*this);
}
#endif
}
}; // overwrite_node }; // overwrite_node
template< typename T > template< typename T >
class write_once_node : public overwrite_node<T> { class write_once_node : public overwrite_node<T> {
public: public:
typedef T input_type; typedef T input_type;
typedef T output_type; typedef T output_type;
typedef sender< input_type > predecessor_type; typedef sender< input_type > predecessor_type;
typedef receiver< output_type > successor_type; typedef receiver< output_type > successor_type;
skipping to change at line 1529 skipping to change at line 1496
typedef sender< input_type > predecessor_type; typedef sender< input_type > predecessor_type;
typedef receiver< output_type > successor_type; typedef receiver< output_type > successor_type;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
typedef typename receiver<input_type>::predecessor_list_type predecesso r_list_type; typedef typename receiver<input_type>::predecessor_list_type predecesso r_list_type;
typedef typename sender<output_type>::successor_list_type successor_lis t_type; typedef typename sender<output_type>::successor_list_type successor_lis t_type;
#endif #endif
private: private:
internal::broadcast_cache<input_type> my_successors; internal::broadcast_cache<input_type> my_successors;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
internal::edge_container<predecessor_type> my_built_predecessors; internal::edge_container<predecessor_type> my_built_predecessors;
spin_mutex pred_mutex; spin_mutex pred_mutex; // serialize accesses on edge_container
#endif #endif
public: public:
broadcast_node(graph& g) : graph_node(g) { broadcast_node(graph& g) : graph_node(g) {
my_successors.set_owner( this ); my_successors.set_owner( this );
tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this- >my_graph, tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this- >my_graph,
static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) ); static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
} }
// Copy constructor // Copy constructor
skipping to change at line 1567 skipping to change at line 1534
return true; return true;
} }
//! Removes s as a successor //! Removes s as a successor
virtual bool remove_successor( receiver<T> &r ) { virtual bool remove_successor( receiver<T> &r ) {
my_successors.remove_successor( r ); my_successors.remove_successor( r );
return true; return true;
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
/*override*/ void internal_add_built_successor(successor_type &r) { typedef typename sender<T>::built_successors_type built_successors_type
;
/*override sender*/ built_successors_type &built_successors() { return
my_successors.built_successors(); }
/*override sender*/ void internal_add_built_successor(successor_type &r
) {
my_successors.internal_add_built_successor(r); my_successors.internal_add_built_successor(r);
} }
/*override*/ void internal_delete_built_successor(successor_type &r) { /*override sender*/ void internal_delete_built_successor(successor_type &r) {
my_successors.internal_delete_built_successor(r); my_successors.internal_delete_built_successor(r);
} }
/*override*/ size_t successor_count() { /*override sender*/ size_t successor_count() {
return my_successors.successor_count(); return my_successors.successor_count();
} }
/*override*/ void copy_successors(successor_list_type &v) { /*override*/ void copy_successors(successor_list_type &v) {
my_successors.copy_successors(v); my_successors.copy_successors(v);
} }
typedef typename receiver<T>::built_predecessors_type built_predecessor
s_type;
/*override receiver*/ built_predecessors_type &built_predecessors() { r
eturn my_built_predecessors; }
/*override*/ void internal_add_built_predecessor( predecessor_type &p) { /*override*/ void internal_add_built_predecessor( predecessor_type &p) {
spin_mutex::scoped_lock l(pred_mutex);
my_built_predecessors.add_edge(p); my_built_predecessors.add_edge(p);
} }
/*override*/ void internal_delete_built_predecessor( predecessor_type & p) { /*override*/ void internal_delete_built_predecessor( predecessor_type & p) {
spin_mutex::scoped_lock l(pred_mutex);
my_built_predecessors.delete_edge(p); my_built_predecessors.delete_edge(p);
} }
/*override*/ size_t predecessor_count() { /*override*/ size_t predecessor_count() {
spin_mutex::scoped_lock l(pred_mutex);
return my_built_predecessors.edge_count(); return my_built_predecessors.edge_count();
} }
/*override*/ void copy_predecessors(predecessor_list_type &v) { /*override*/ void copy_predecessors(predecessor_list_type &v) {
spin_mutex::scoped_lock l(pred_mutex);
my_built_predecessors.copy_edges(v); my_built_predecessors.copy_edges(v);
} }
/*override graph_node*/ void extract() {
my_built_predecessors.receiver_extract(*this);
my_successors.built_successors().sender_extract(*this);
}
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
protected: protected:
template< typename R, typename B > friend class run_and_put_task; template< typename R, typename B > friend class run_and_put_task;
template<typename X, typename Y> friend class internal::broadcast_cache ; template<typename X, typename Y> friend class internal::broadcast_cache ;
template<typename X, typename Y> friend class internal::round_robin_cac he; template<typename X, typename Y> friend class internal::round_robin_cac he;
//! build a task to run the successor if possible. Default is old beha vior. //! build a task to run the successor if possible. Default is old beha vior.
/*override*/ task *try_put_task(const T& t) { /*override*/ task *try_put_task(const T& t) {
task *new_task = my_successors.try_put_task(t); task *new_task = my_successors.try_put_task(t);
if(!new_task) new_task = SUCCESSFULLY_ENQUEUED; if(!new_task) new_task = SUCCESSFULLY_ENQUEUED;
return new_task; return new_task;
} }
/*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/))
{}
/*override*/void reset_node(__TBB_PFG_RESET_ARG(reset_flags f)) {
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
my_successors.reset(f); if (f&rf_clear_edges) {
if (f&rf_extract) { my_successors.clear();
my_built_predecessors.receiver_extract(*this); my_built_predecessors.clear();
} }
__TBB_ASSERT(!(f & rf_extract) || my_successors.empty(), "Error res etting broadcast_node"); __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
#endif #endif
} }
/*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/)) {}
}; // broadcast_node }; // broadcast_node
//! Forwards messages in arbitrary order //! Forwards messages in arbitrary order
template <typename T, typename A=cache_aligned_allocator<T> > template <typename T, typename A=cache_aligned_allocator<T> >
class buffer_node : public graph_node, public internal::reservable_item_buf fer<T, A>, public receiver<T>, public sender<T> { class buffer_node : public graph_node, public internal::reservable_item_buf fer<T, A>, public receiver<T>, public sender<T> {
protected: protected:
using graph_node::my_graph; using graph_node::my_graph;
public: public:
typedef T input_type; typedef T input_type;
typedef T output_type; typedef T output_type;
skipping to change at line 1734 skipping to change at line 1719
forwarder_busy = true; forwarder_busy = true;
task *new_task = new(task::allocate_additional_child_of(*tp )) internal:: task *new_task = new(task::allocate_additional_child_of(*tp )) internal::
forward_task_bypass forward_task_bypass
< buffer_node<input_type, A> >(*this); < buffer_node<input_type, A> >(*this);
// tmp should point to the last item handled by the aggrega tor. This is the operation // tmp should point to the last item handled by the aggrega tor. This is the operation
// the handling thread enqueued. So modifying that record will be okay. // the handling thread enqueued. So modifying that record will be okay.
tbb::task *z = tmp->ltask; tbb::task *z = tmp->ltask;
tmp->ltask = combine_tasks(z, new_task); // in case the op generated a task tmp->ltask = combine_tasks(z, new_task); // in case the op generated a task
} }
} }
} } // handle_operations
inline task *grab_forwarding_task( buffer_operation &op_data) { inline task *grab_forwarding_task( buffer_operation &op_data) {
return op_data.ltask; return op_data.ltask;
} }
inline bool enqueue_forwarding_task(buffer_operation &op_data) { inline bool enqueue_forwarding_task(buffer_operation &op_data) {
task *ft = grab_forwarding_task(op_data); task *ft = grab_forwarding_task(op_data);
if(ft) { if(ft) {
FLOW_SPAWN(*ft); FLOW_SPAWN(*ft);
return true; return true;
skipping to change at line 1776 skipping to change at line 1761
__TBB_store_with_release(op->status, SUCCEEDED); __TBB_store_with_release(op->status, SUCCEEDED);
} }
//! Remove successor //! Remove successor
virtual void internal_rem_succ(buffer_operation *op) { virtual void internal_rem_succ(buffer_operation *op) {
my_successors.remove_successor(*(op->r)); my_successors.remove_successor(*(op->r));
__TBB_store_with_release(op->status, SUCCEEDED); __TBB_store_with_release(op->status, SUCCEEDED);
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
typedef typename sender<T>::built_successors_type built_successors_type
;
/*override sender*/ built_successors_type &built_successors() { return
my_successors.built_successors(); }
virtual void internal_add_built_succ(buffer_operation *op) { virtual void internal_add_built_succ(buffer_operation *op) {
my_successors.internal_add_built_successor(*(op->r)); my_successors.internal_add_built_successor(*(op->r));
__TBB_store_with_release(op->status, SUCCEEDED); __TBB_store_with_release(op->status, SUCCEEDED);
} }
virtual void internal_del_built_succ(buffer_operation *op) { virtual void internal_del_built_succ(buffer_operation *op) {
my_successors.internal_delete_built_successor(*(op->r)); my_successors.internal_delete_built_successor(*(op->r));
__TBB_store_with_release(op->status, SUCCEEDED); __TBB_store_with_release(op->status, SUCCEEDED);
} }
typedef typename receiver<T>::built_predecessors_type built_predecessor
s_type;
/*override receiver*/ built_predecessors_type &built_predecessors() { r
eturn my_built_predecessors; }
virtual void internal_add_built_pred(buffer_operation *op) { virtual void internal_add_built_pred(buffer_operation *op) {
my_built_predecessors.add_edge(*(op->p)); my_built_predecessors.add_edge(*(op->p));
__TBB_store_with_release(op->status, SUCCEEDED); __TBB_store_with_release(op->status, SUCCEEDED);
} }
virtual void internal_del_built_pred(buffer_operation *op) { virtual void internal_del_built_pred(buffer_operation *op) {
my_built_predecessors.delete_edge(*(op->p)); my_built_predecessors.delete_edge(*(op->p));
__TBB_store_with_release(op->status, SUCCEEDED); __TBB_store_with_release(op->status, SUCCEEDED);
} }
skipping to change at line 1815 skipping to change at line 1808
virtual void internal_copy_succs(buffer_operation *op) { virtual void internal_copy_succs(buffer_operation *op) {
my_successors.copy_successors(*(op->svec)); my_successors.copy_successors(*(op->svec));
__TBB_store_with_release(op->status, SUCCEEDED); __TBB_store_with_release(op->status, SUCCEEDED);
} }
virtual void internal_copy_preds(buffer_operation *op) { virtual void internal_copy_preds(buffer_operation *op) {
my_built_predecessors.copy_edges(*(op->pvec)); my_built_predecessors.copy_edges(*(op->pvec));
__TBB_store_with_release(op->status, SUCCEEDED); __TBB_store_with_release(op->status, SUCCEEDED);
} }
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
//! Tries to forward valid items to successors //! Tries to forward valid items to successors
virtual void internal_forward_task(buffer_operation *op) { virtual void internal_forward_task(buffer_operation *op) {
if (this->my_reserved || !this->my_item_valid(this->my_tail-1)) { if (this->my_reserved || !this->my_item_valid(this->my_tail-1)) {
__TBB_store_with_release(op->status, FAILED); __TBB_store_with_release(op->status, FAILED);
this->forwarder_busy = false; this->forwarder_busy = false;
return; return;
} }
T i_copy; T i_copy;
skipping to change at line 1970 skipping to change at line 1964
buffer_operation op_data(blt_pred_cpy); buffer_operation op_data(blt_pred_cpy);
op_data.pvec = &v; op_data.pvec = &v;
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
} }
/*override*/ void copy_successors( successor_list_type &v ) { /*override*/ void copy_successors( successor_list_type &v ) {
buffer_operation op_data(blt_succ_cpy); buffer_operation op_data(blt_succ_cpy);
op_data.svec = &v; op_data.svec = &v;
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
} }
#endif #endif
//! Removes a successor. //! Removes a successor.
/** Removes successor r from the list of successors. /** Removes successor r from the list of successors.
It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */ It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
/* override */ bool remove_successor( successor_type &r ) { /* override */ bool remove_successor( successor_type &r ) {
r.remove_predecessor(*this); r.remove_predecessor(*this);
buffer_operation op_data(rem_succ); buffer_operation op_data(rem_succ);
op_data.r = &r; op_data.r = &r;
my_aggregator.execute(&op_data); my_aggregator.execute(&op_data);
skipping to change at line 2052 skipping to change at line 2047
// call returned a task (if another request resulted in a succe ssful // call returned a task (if another request resulted in a succe ssful
// forward this could happen.) Queue the task and reset the po inter. // forward this could happen.) Queue the task and reset the po inter.
FLOW_SPAWN(*ft); ft = NULL; FLOW_SPAWN(*ft); ft = NULL;
} }
else if(!ft && op_data.status == SUCCEEDED) { else if(!ft && op_data.status == SUCCEEDED) {
ft = SUCCESSFULLY_ENQUEUED; ft = SUCCESSFULLY_ENQUEUED;
} }
return ft; return ft;
} }
/*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) { /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/))
{ }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
public:
/* override*/ void extract() {
my_built_predecessors.receiver_extract(*this);
my_successors.built_successors().sender_extract(*this);
}
#endif
protected:
/*override*/void reset_node( __TBB_PFG_RESET_ARG(reset_flags f)) {
internal::reservable_item_buffer<T, A>::reset(); internal::reservable_item_buffer<T, A>::reset();
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
my_successors.reset(f); // TODO: just clear structures
if (f&rf_extract) { if (f&rf_clear_edges) {
my_built_predecessors.receiver_extract(*this); my_successors.clear();
my_built_predecessors.clear();
} }
#endif #endif
forwarder_busy = false; forwarder_busy = false;
} }
/*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/))
{ }
}; // buffer_node }; // buffer_node
//! Forwards messages in FIFO order //! Forwards messages in FIFO order
template <typename T, typename A=cache_aligned_allocator<T> > template <typename T, typename A=cache_aligned_allocator<T> >
class queue_node : public buffer_node<T, A> { class queue_node : public buffer_node<T, A> {
protected: protected:
typedef buffer_node<T, A> base_type; typedef buffer_node<T, A> base_type;
typedef typename base_type::size_type size_type; typedef typename base_type::size_type size_type;
typedef typename base_type::buffer_operation queue_operation; typedef typename base_type::buffer_operation queue_operation;
skipping to change at line 2154 skipping to change at line 2159
static_cast<receiver<input_type> *>(this), static_cast<receiver<input_type> *>(this),
static_cast<sender<output_type> *>(this) ) ; static_cast<sender<output_type> *>(this) ) ;
} }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE #if TBB_PREVIEW_FLOW_GRAPH_TRACE
/* override */ void set_name( const char *name ) { /* override */ void set_name( const char *name ) {
tbb::internal::fgt_node_desc( this, name ); tbb::internal::fgt_node_desc( this, name );
} }
#endif #endif
/*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) { protected:
base_type::reset(__TBB_PFG_RESET_ARG(f)); /*override*/void reset_node( __TBB_PFG_RESET_ARG(reset_flags f)) {
base_type::reset_node(__TBB_PFG_RESET_ARG(f));
} }
}; // queue_node }; // queue_node
//! Forwards messages in sequence order //! Forwards messages in sequence order
template< typename T, typename A=cache_aligned_allocator<T> > template< typename T, typename A=cache_aligned_allocator<T> >
class sequencer_node : public queue_node<T, A> { class sequencer_node : public queue_node<T, A> {
internal::function_body< T, size_t > *my_sequencer; internal::function_body< T, size_t > *my_sequencer;
// my_sequencer should be a benign function and must be callable // my_sequencer should be a benign function and must be callable
// from a parallel context. Does this mean it needn't be reset? // from a parallel context. Does this mean it needn't be reset?
public: public:
skipping to change at line 2262 skipping to change at line 2268
} }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE #if TBB_PREVIEW_FLOW_GRAPH_TRACE
/* override */ void set_name( const char *name ) { /* override */ void set_name( const char *name ) {
tbb::internal::fgt_node_desc( this, name ); tbb::internal::fgt_node_desc( this, name );
} }
#endif #endif
protected: protected:
/*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) { /*override*/void reset_node( __TBB_PFG_RESET_ARG(reset_flags f)) {
mark = 0; mark = 0;
base_type::reset(__TBB_PFG_RESET_ARG(f)); base_type::reset_node(__TBB_PFG_RESET_ARG(f));
} }
typedef typename buffer_node<T, A>::size_type size_type; typedef typename buffer_node<T, A>::size_type size_type;
typedef typename buffer_node<T, A>::item_type item_type; typedef typename buffer_node<T, A>::item_type item_type;
typedef typename buffer_node<T, A>::buffer_operation prio_operation; typedef typename buffer_node<T, A>::buffer_operation prio_operation;
enum op_stat {WAIT=0, SUCCEEDED, FAILED}; enum op_stat {WAIT=0, SUCCEEDED, FAILED};
/* override */ void handle_operations(prio_operation *op_list) { /* override */ void handle_operations(prio_operation *op_list) {
prio_operation *tmp = op_list /*, *pop_list*/ ; prio_operation *tmp = op_list /*, *pop_list*/ ;
skipping to change at line 2505 skipping to change at line 2511
template< typename T > template< typename T >
class limiter_node : public graph_node, public receiver< T >, public sender < T > { class limiter_node : public graph_node, public receiver< T >, public sender < T > {
protected: protected:
using graph_node::my_graph; using graph_node::my_graph;
public: public:
typedef T input_type; typedef T input_type;
typedef T output_type; typedef T output_type;
typedef sender< input_type > predecessor_type; typedef sender< input_type > predecessor_type;
typedef receiver< output_type > successor_type; typedef receiver< output_type > successor_type;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
typedef typename receiver<input_type>::built_predecessors_type built_pr
edecessors_type;
typedef typename sender<output_type>::built_successors_type built_succe
ssors_type;
typedef typename receiver<input_type>::predecessor_list_type predecesso r_list_type; typedef typename receiver<input_type>::predecessor_list_type predecesso r_list_type;
typedef typename sender<output_type>::successor_list_type successor_lis t_type; typedef typename sender<output_type>::successor_list_type successor_lis t_type;
#endif #endif
private: private:
size_t my_threshold; size_t my_threshold;
size_t my_count; //number of successful puts size_t my_count; //number of successful puts
size_t my_tries; //number of active put attempts size_t my_tries; //number of active put attempts
internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors ; internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors ;
spin_mutex my_mutex; spin_mutex my_mutex;
skipping to change at line 2660 skipping to change at line 2668
//! Removes a successor from this node //! Removes a successor from this node
/** r.remove_predecessor(*this) is also called. */ /** r.remove_predecessor(*this) is also called. */
/* override */ bool remove_successor( receiver<output_type> &r ) { /* override */ bool remove_successor( receiver<output_type> &r ) {
r.remove_predecessor(*this); r.remove_predecessor(*this);
my_successors.remove_successor(r); my_successors.remove_successor(r);
return true; return true;
} }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
/*override*/ built_successors_type &built_successors() { return my_succ
essors.built_successors(); }
/*override*/ built_predecessors_type &built_predecessors() { return my_
predecessors.built_predecessors(); }
/*override*/void internal_add_built_successor(receiver<output_type> &sr c) { /*override*/void internal_add_built_successor(receiver<output_type> &sr c) {
my_successors.internal_add_built_successor(src); my_successors.internal_add_built_successor(src);
} }
/*override*/void internal_delete_built_successor(receiver<output_type> &src) { /*override*/void internal_delete_built_successor(receiver<output_type> &src) {
my_successors.internal_delete_built_successor(src); my_successors.internal_delete_built_successor(src);
} }
/*override*/size_t successor_count() { return my_successors.successor_c ount(); } /*override*/size_t successor_count() { return my_successors.successor_c ount(); }
skipping to change at line 2687 skipping to change at line 2698
/*override*/void internal_delete_built_predecessor(sender<output_type> &src) { /*override*/void internal_delete_built_predecessor(sender<output_type> &src) {
my_predecessors.internal_delete_built_predecessor(src); my_predecessors.internal_delete_built_predecessor(src);
} }
/*override*/size_t predecessor_count() { return my_predecessors.predece ssor_count(); } /*override*/size_t predecessor_count() { return my_predecessors.predece ssor_count(); }
/*override*/ void copy_predecessors(predecessor_list_type &v) { /*override*/ void copy_predecessors(predecessor_list_type &v) {
my_predecessors.copy_predecessors(v); my_predecessors.copy_predecessors(v);
} }
/*override*/void extract() {
my_count = 0;
my_successors.built_successors().sender_extract(*this);
my_predecessors.built_predecessors().receiver_extract(*this);
decrement.built_predecessors().receiver_extract(decrement);
}
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */ #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
//! Adds src to the list of cached predecessors. //! Adds src to the list of cached predecessors.
/* override */ bool register_predecessor( predecessor_type &src ) { /* override */ bool register_predecessor( predecessor_type &src ) {
spin_mutex::scoped_lock lock(my_mutex); spin_mutex::scoped_lock lock(my_mutex);
my_predecessors.add( src ); my_predecessors.add( src );
task* tp = this->my_graph.root_task(); task* tp = this->my_graph.root_task();
if ( my_count + my_tries < my_threshold && !my_successors.empty() & & tp ) { if ( my_count + my_tries < my_threshold && !my_successors.empty() & & tp ) {
FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp ) ) FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp ) )
internal::forward_task_bypass < limiter_node<T> >( *this ) ) ); internal::forward_task_bypass < limiter_node<T> >( *this ) ) );
skipping to change at line 2741 skipping to change at line 2759
} }
} }
else { else {
spin_mutex::scoped_lock lock(my_mutex); spin_mutex::scoped_lock lock(my_mutex);
++my_count; ++my_count;
--my_tries; --my_tries;
} }
return rtask; return rtask;
} }
/*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) { /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/))
{
__TBB_ASSERT(false,NULL); // should never be called
}
/*override*/void reset_node( __TBB_PFG_RESET_ARG(reset_flags f)) {
my_count = 0; my_count = 0;
my_predecessors.reset(__TBB_PFG_RESET_ARG(f));
decrement.reset_receiver(__TBB_PFG_RESET_ARG(f));
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
my_successors.reset(f); if(f & rf_clear_edges) {
my_predecessors.clear();
my_successors.clear();
}
else
#endif #endif
{
my_predecessors.reset( );
}
decrement.reset_receiver(__TBB_PFG_RESET_ARG(f));
} }
/*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f)) { m
y_predecessors.reset(__TBB_PFG_RESET_ARG(f)); }
}; // limiter_node }; // limiter_node
#include "internal/_flow_graph_join_impl.h" #include "internal/_flow_graph_join_impl.h"
using internal::reserving_port; using internal::reserving_port;
using internal::queueing_port; using internal::queueing_port;
using internal::tag_matching_port; using internal::tag_matching_port;
using internal::input_port; using internal::input_port;
using internal::tag_value; using internal::tag_value;
using internal::NO_TAG; using internal::NO_TAG;
skipping to change at line 3266 skipping to change at line 3292
} }
template<typename C > template<typename C >
template< typename R > template< typename R >
void internal::edge_container<C>::receiver_extract( R &r ) { void internal::edge_container<C>::receiver_extract( R &r ) {
edge_list_type e = built_edges; edge_list_type e = built_edges;
for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++ i ) { for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++ i ) {
remove_edge(**i, r); remove_edge(**i, r);
} }
} }
#endif #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
//! Returns a copy of the body from a function or continue node //! Returns a copy of the body from a function or continue node
template< typename Body, typename Node > template< typename Body, typename Node >
Body copy_body( Node &n ) { Body copy_body( Node &n ) {
return n.template copy_function_object<Body>(); return n.template copy_function_object<Body>();
} }
#if __TBB_PREVIEW_COMPOSITE_NODE #if __TBB_PREVIEW_COMPOSITE_NODE
//composite_node //composite_node
skipping to change at line 3329 skipping to change at line 3355
else else
tbb::internal::itt_relation_add( tbb::internal::ITT_DOMAIN_ FLOW, addr, tbb::internal::FLOW_NODE, tbb::internal::__itt_relation_is_chil d_of, this, tbb::internal::FLOW_NODE ); tbb::internal::itt_relation_add( tbb::internal::ITT_DOMAIN_ FLOW, addr, tbb::internal::FLOW_NODE, tbb::internal::__itt_relation_is_chil d_of, this, tbb::internal::FLOW_NODE );
return add_nodes_impl(visible, n...); return add_nodes_impl(visible, n...);
} else { } else {
return false; return false;
} }
} }
#endif #endif
protected: protected:
/*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags)) {} /*override*/void reset_node(__TBB_PFG_RESET_ARG(reset_flags)) {}
public: public:
composite_node( graph &g, const char *my_type_name = " ") : graph_node( g), type_name(my_type_name) { composite_node( graph &g, const char *my_type_name = " ") : graph_node( g), type_name(my_type_name) {
my_input_ports = NULL; my_input_ports = NULL;
my_output_ports = NULL; my_output_ports = NULL;
#if TBB_PREVIEW_FLOW_GRAPH_TRACE #if TBB_PREVIEW_FLOW_GRAPH_TRACE
tbb::internal::itt_make_task_group( tbb::internal::ITT_DOMAIN_FLOW, this, tbb::internal::FLOW_NODE, &g, tbb::internal::FLOW_GRAPH, tbb::intern al::FLOW_COMPOSITE_NODE ); tbb::internal::itt_make_task_group( tbb::internal::ITT_DOMAIN_FLOW, this, tbb::internal::FLOW_NODE, &g, tbb::internal::FLOW_GRAPH, tbb::intern al::FLOW_COMPOSITE_NODE );
tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_nam e ); tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_nam e );
#endif #endif
} }
skipping to change at line 3402 skipping to change at line 3428
output_ports_type output_ports() { output_ports_type output_ports() {
__TBB_ASSERT(my_output_ports, "output ports not set, call set_exte rnal_ports to set output ports"); __TBB_ASSERT(my_output_ports, "output ports not set, call set_exte rnal_ports to set output ports");
return *my_output_ports; return *my_output_ports;
} }
virtual ~composite_node() { virtual ~composite_node() {
if(my_input_ports) delete my_input_ports; if(my_input_ports) delete my_input_ports;
if(my_output_ports) delete my_output_ports; if(my_output_ports) delete my_output_ports;
} }
};
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
/*override*/void extract() {
__TBB_ASSERT(false, "Current composite_node implementation does not
support extract");
}
#endif
}; // class composite_node
//composite_node with only input ports //composite_node with only input ports
//TODO: trim specializations //TODO: trim specializations
template< typename... InputTypes> template< typename... InputTypes>
class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<> > : public graph_node { class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<> > : public graph_node {
public: public:
typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type; typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
private: private:
skipping to change at line 3451 skipping to change at line 3484
else else
tbb::internal::itt_relation_add( tbb::internal::ITT_DOMAIN_ FLOW, addr, tbb::internal::FLOW_NODE, tbb::internal::__itt_relation_is_chil d_of, this, tbb::internal::FLOW_NODE ); tbb::internal::itt_relation_add( tbb::internal::ITT_DOMAIN_ FLOW, addr, tbb::internal::FLOW_NODE, tbb::internal::__itt_relation_is_chil d_of, this, tbb::internal::FLOW_NODE );
return add_nodes_impl(visible, n...); return add_nodes_impl(visible, n...);
} else { } else {
return false; return false;
} }
} }
#endif #endif
protected: protected:
/*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags)) {} /*override*/void reset_node(__TBB_PFG_RESET_ARG(reset_flags)) {}
public: public:
composite_node( graph &g, const char *my_type_name = " ") : graph_node( g), type_name(my_type_name) { composite_node( graph &g, const char *my_type_name = " ") : graph_node( g), type_name(my_type_name) {
my_input_ports = NULL; my_input_ports = NULL;
#if TBB_PREVIEW_FLOW_GRAPH_TRACE #if TBB_PREVIEW_FLOW_GRAPH_TRACE
tbb::internal::itt_make_task_group( tbb::internal::ITT_DOMAIN_FLOW, this, tbb::internal::FLOW_NODE, &g, tbb::internal::FLOW_GRAPH, tbb::intern al::FLOW_COMPOSITE_NODE ); tbb::internal::itt_make_task_group( tbb::internal::ITT_DOMAIN_FLOW, this, tbb::internal::FLOW_NODE, &g, tbb::internal::FLOW_GRAPH, tbb::intern al::FLOW_COMPOSITE_NODE );
tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_nam e ); tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_nam e );
#endif #endif
} }
skipping to change at line 3511 skipping to change at line 3544
#endif #endif
input_ports_type input_ports() { input_ports_type input_ports() {
__TBB_ASSERT(my_input_ports, "input ports not set, call set_extern al_ports to set input ports"); __TBB_ASSERT(my_input_ports, "input ports not set, call set_extern al_ports to set input ports");
return *my_input_ports; return *my_input_ports;
} }
virtual ~composite_node() { virtual ~composite_node() {
if(my_input_ports) delete my_input_ports; if(my_input_ports) delete my_input_ports;
} }
};
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
/*override*/void extract() {
__TBB_ASSERT(false, "Current composite_node implementation does not
support extract");
}
#endif
}; // class composite_node
//composite_nodes with only output_ports //composite_nodes with only output_ports
template<typename... OutputTypes> template<typename... OutputTypes>
class composite_node <tbb::flow::tuple<>, tbb::flow::tuple<OutputTypes...> > : public graph_node { class composite_node <tbb::flow::tuple<>, tbb::flow::tuple<OutputTypes...> > : public graph_node {
public: public:
typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type; typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
private: private:
output_ports_type *my_output_ports; output_ports_type *my_output_ports;
static const size_t NUM_OUTPUTS = sizeof...(OutputTypes); static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
skipping to change at line 3558 skipping to change at line 3598
else else
tbb::internal::itt_relation_add( tbb::internal::ITT_DOMAIN_ FLOW, addr, tbb::internal::FLOW_NODE, tbb::internal::__itt_relation_is_chil d_of, this, tbb::internal::FLOW_NODE ); tbb::internal::itt_relation_add( tbb::internal::ITT_DOMAIN_ FLOW, addr, tbb::internal::FLOW_NODE, tbb::internal::__itt_relation_is_chil d_of, this, tbb::internal::FLOW_NODE );
return add_nodes_impl(visible, n...); return add_nodes_impl(visible, n...);
} else { } else {
return false; return false;
} }
} }
#endif #endif
protected: protected:
/*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags)) {} /*override*/void reset_node(__TBB_PFG_RESET_ARG(reset_flags)) {}
public: public:
composite_node( graph &g, const char *my_type_name = " ") : graph_node( g), type_name(my_type_name) { composite_node( graph &g, const char *my_type_name = " ") : graph_node( g), type_name(my_type_name) {
my_output_ports = NULL; my_output_ports = NULL;
#if TBB_PREVIEW_FLOW_GRAPH_TRACE #if TBB_PREVIEW_FLOW_GRAPH_TRACE
tbb::internal::itt_make_task_group( tbb::internal::ITT_DOMAIN_FLOW, this, tbb::internal::FLOW_NODE, &g, tbb::internal::FLOW_GRAPH, tbb::intern al::FLOW_COMPOSITE_NODE ); tbb::internal::itt_make_task_group( tbb::internal::ITT_DOMAIN_FLOW, this, tbb::internal::FLOW_NODE, &g, tbb::internal::FLOW_GRAPH, tbb::intern al::FLOW_COMPOSITE_NODE );
tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_nam e ); tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_nam e );
#endif #endif
} }
skipping to change at line 3618 skipping to change at line 3658
#endif #endif
output_ports_type output_ports() { output_ports_type output_ports() {
__TBB_ASSERT(my_output_ports, "output ports not set, call set_exte rnal_ports to set output ports"); __TBB_ASSERT(my_output_ports, "output ports not set, call set_exte rnal_ports to set output ports");
return *my_output_ports; return *my_output_ports;
} }
virtual ~composite_node() { virtual ~composite_node() {
if(my_output_ports) delete my_output_ports; if(my_output_ports) delete my_output_ports;
} }
};
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
/*override*/void extract() {
__TBB_ASSERT(false, "Current composite_node implementation does not
support extract");
}
#endif
}; // class composite_node
#endif // __TBB_PREVIEW_COMPOSITE_NODE #endif // __TBB_PREVIEW_COMPOSITE_NODE
#if __TBB_PREVIEW_ASYNC_NODE
namespace internal {
//! Pure virtual template class that defines interface for async communicat
ion
template < typename Output >
class async_gateway {
public:
typedef Output output_type;
//! Submit signal from Async Activity to FG
virtual bool async_try_put(const output_type &i ) = 0;
virtual void async_reserve() = 0;
virtual void async_commit() = 0;
virtual ~async_gateway() {}
};
}
//! Implements a async node
template < typename Input, typename Output, typename Allocator=cache_aligne
d_allocator<Input> >
class async_node : public graph_node, public internal::async_input<Input, A
llocator, internal::async_gateway<Output> >, public internal::function_outp
ut<Output>, public internal::async_gateway<Output> {
protected:
using graph_node::my_graph;
public:
typedef Input input_type;
typedef Output output_type;
typedef async_node< input_type, output_type, Allocator > my_class;
typedef sender< input_type > predecessor_type;
typedef receiver< output_type > successor_type;
typedef internal::async_gateway< output_type > async_gateway_type;
typedef internal::async_input<input_type, Allocator, async_gateway_type
> async_input_type;
typedef internal::function_output<output_type> async_output_type;
//! Constructor
template< typename Body >
async_node( graph &g, Body body ) :
graph_node( g ), async_input_type( g, body ) {
tbb::internal::fgt_node_with_body( tbb::internal::FLOW_ASYNC_NODE,
&this->graph_node::my_graph,
static_cast<receiver<input_type>
*>(this),
static_cast<sender<output_type>
*>(this), this->my_body );
}
//! Copy constructor
async_node( const async_node& src ) :
graph_node(src.graph_node::my_graph), async_input_type( src ), asyn
c_output_type(){
tbb::internal::fgt_node_with_body( tbb::internal::FLOW_ASYNC_NODE,
&this->graph_node::my_graph,
static_cast<receiver<input_type>
*>(this),
static_cast<sender<output_type>
*>(this), this->my_body );
}
/* override */ async_gateway_type& async_gateway() {
return static_cast< async_gateway_type& >(*this);
}
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
/* override */ void set_name( const char *name ) {
tbb::internal::fgt_node_desc( this, name );
}
#endif
protected:
template< typename R, typename B > friend class run_and_put_task;
template<typename X, typename Y> friend class internal::broadcast_cache
;
template<typename X, typename Y> friend class internal::round_robin_cac
he;
using async_input_type::try_put_task;
/*override*/void reset_node( __TBB_PFG_RESET_ARG(reset_flags f)) {
async_input_type::reset_async_input(__TBB_PFG_RESET_ARG(f));
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
if(f & rf_clear_edges) successors().clear();
__TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "functi
on_node successors not empty");
__TBB_ASSERT(!(f & rf_clear_edges) || this->my_predecessors.empty()
, "function_node predecessors not empty");
#endif
}
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
/*override*/void extract() {
this->my_predecessors.built_predecessors().receiver_extract(*this);
successors().built_successors().sender_extract(*this);
}
#endif
internal::broadcast_cache<output_type> &successors () { return async_ou
tput_type::my_successors; }
//! Submit signal from Async Activity to FG
/*override*/ bool async_try_put(const output_type &i ) {
// TODO: enqueue a task to a FG arena
task *res = successors().try_put_task(i);
if(!res) return false;
if (res != SUCCESSFULLY_ENQUEUED) FLOW_SPAWN(*res);
return true;
}
/*override*/ void async_reserve() {
my_graph.increment_wait_count();
}
/*override*/ void async_commit() {
my_graph.decrement_wait_count();
}
};
#endif // __TBB_PREVIEW_ASYNC_NODE
} // interface7 } // interface7
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
using interface7::reset_flags; using interface7::reset_flags;
using interface7::rf_reset_protocol; using interface7::rf_reset_protocol;
using interface7::rf_reset_bodies; using interface7::rf_reset_bodies;
using interface7::rf_extract; using interface7::rf_clear_edges;
#endif #endif
using interface7::graph; using interface7::graph;
using interface7::graph_node; using interface7::graph_node;
using interface7::continue_msg; using interface7::continue_msg;
using interface7::sender; using interface7::sender;
using interface7::receiver; using interface7::receiver;
using interface7::continue_receiver; using interface7::continue_receiver;
using interface7::source_node; using interface7::source_node;
skipping to change at line 3667 skipping to change at line 3818
using interface7::join_node; using interface7::join_node;
using interface7::input_port; using interface7::input_port;
using interface7::copy_body; using interface7::copy_body;
using interface7::make_edge; using interface7::make_edge;
using interface7::remove_edge; using interface7::remove_edge;
using interface7::internal::NO_TAG; using interface7::internal::NO_TAG;
using interface7::internal::tag_value; using interface7::internal::tag_value;
#if __TBB_PREVIEW_COMPOSITE_NODE #if __TBB_PREVIEW_COMPOSITE_NODE
using interface7::composite_node; using interface7::composite_node;
#endif #endif
#if __TBB_PREVIEW_ASYNC_NODE
using interface7::async_node;
#endif
} // flow } // flow
} // tbb } // tbb
#undef __TBB_PFG_RESET_ARG #undef __TBB_PFG_RESET_ARG
#undef __TBB_COMMA #undef __TBB_COMMA
#endif // __TBB_flow_graph_H #endif // __TBB_flow_graph_H
 End of changes. 102 change blocks. 
244 lines changed or deleted 415 lines changed or added


 tbb_config.h   tbb_config.h 
skipping to change at line 211 skipping to change at line 211
#define __TBB_CPP11_LAMBDAS_PRESENT (__GXX_EXPERIMENTAL_C XX0X__ && __TBB_GCC_VERSION >= 40500) #define __TBB_CPP11_LAMBDAS_PRESENT (__GXX_EXPERIMENTAL_C XX0X__ && __TBB_GCC_VERSION >= 40500)
#elif _MSC_VER #elif _MSC_VER
#define __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT (_MSC_VER >= 1800) #define __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT (_MSC_VER >= 1800)
#define __TBB_CPP11_RVALUE_REF_PRESENT (_MSC_VER >= 1600) #define __TBB_CPP11_RVALUE_REF_PRESENT (_MSC_VER >= 1600)
#define __TBB_EXCEPTION_PTR_PRESENT (_MSC_VER >= 1600) #define __TBB_EXCEPTION_PTR_PRESENT (_MSC_VER >= 1600)
#define __TBB_STATIC_ASSERT_PRESENT (_MSC_VER >= 1600) #define __TBB_STATIC_ASSERT_PRESENT (_MSC_VER >= 1600)
#define __TBB_CPP11_TUPLE_PRESENT (_MSC_VER >= 1600) #define __TBB_CPP11_TUPLE_PRESENT (_MSC_VER >= 1600)
#define __TBB_INITIALIZER_LISTS_PRESENT (_MSC_VER >= 1800) #define __TBB_INITIALIZER_LISTS_PRESENT (_MSC_VER >= 1800)
#define __TBB_CONSTEXPR_PRESENT 0 #define __TBB_CONSTEXPR_PRESENT 0
#define __TBB_DEFAULTED_AND_DELETED_FUNC_PRESENT (_MSC_VER >= 1800) #define __TBB_DEFAULTED_AND_DELETED_FUNC_PRESENT (_MSC_VER >= 1800)
#define __TBB_NOEXCEPT_PRESENT 0 /*for _MSC_VER == 1 800*/ #define __TBB_NOEXCEPT_PRESENT (_MSC_VER >= 1900)
#define __TBB_CPP11_STD_BEGIN_END_PRESENT (_MSC_VER >= 1700) #define __TBB_CPP11_STD_BEGIN_END_PRESENT (_MSC_VER >= 1700)
#define __TBB_CPP11_AUTO_PRESENT (_MSC_VER >= 1600) #define __TBB_CPP11_AUTO_PRESENT (_MSC_VER >= 1600)
#define __TBB_CPP11_DECLTYPE_PRESENT (_MSC_VER >= 1600) #define __TBB_CPP11_DECLTYPE_PRESENT (_MSC_VER >= 1600)
#define __TBB_CPP11_LAMBDAS_PRESENT (_MSC_VER >= 1600) #define __TBB_CPP11_LAMBDAS_PRESENT (_MSC_VER >= 1600)
#else #else
#define __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT 0 #define __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT 0
#define __TBB_CPP11_RVALUE_REF_PRESENT 0 #define __TBB_CPP11_RVALUE_REF_PRESENT 0
#define __TBB_EXCEPTION_PTR_PRESENT 0 #define __TBB_EXCEPTION_PTR_PRESENT 0
#define __TBB_STATIC_ASSERT_PRESENT 0 #define __TBB_STATIC_ASSERT_PRESENT 0
#define __TBB_CPP11_TUPLE_PRESENT 0 #define __TBB_CPP11_TUPLE_PRESENT 0
skipping to change at line 620 skipping to change at line 620
#if !__TBB_GCC_WARNING_SUPPRESSION_PRESENT #if !__TBB_GCC_WARNING_SUPPRESSION_PRESENT
#error Warning suppression is not supported, while should. #error Warning suppression is not supported, while should.
#endif #endif
#endif #endif
/*In a PIC mode some versions of GCC 4.1.2 generate incorrect inlined code for 8 byte __sync_val_compare_and_swap intrinsic */ /*In a PIC mode some versions of GCC 4.1.2 generate incorrect inlined code for 8 byte __sync_val_compare_and_swap intrinsic */
#if __TBB_GCC_VERSION == 40102 && __PIC__ && !defined(__INTEL_COMPILER) && !defined(__clang__) #if __TBB_GCC_VERSION == 40102 && __PIC__ && !defined(__INTEL_COMPILER) && !defined(__clang__)
#define __TBB_GCC_CAS8_BUILTIN_INLINING_BROKEN 1 #define __TBB_GCC_CAS8_BUILTIN_INLINING_BROKEN 1
#endif #endif
#if __TBB_x86_32 && (__linux__ || __APPLE__ || _WIN32 || __sun || __ANDROID __) && (__INTEL_COMPILER || (__GNUC__==3 && __GNUC_MINOR__==3 ) || __SUNPR O_CC) #if __TBB_x86_32 && (__linux__ || __APPLE__ || _WIN32 || __sun || __ANDROID __) && (__INTEL_COMPILER || (__GNUC__==3 && __GNUC_MINOR__==3 )||(__MINGW3 2__ ) && (__GNUC__==4 && __GNUC_MINOR__==5 ) || __SUNPRO_CC)
// Some compilers for IA-32 fail to provide 8-byte alignment of objects on the stack, // Some compilers for IA-32 fail to provide 8-byte alignment of objects on the stack,
// even if the object specifies 8-byte alignment. On such platforms, t he IA-32 implementation // even if the object specifies 8-byte alignment. On such platforms, t he IA-32 implementation
// of 64 bit atomics (e.g. atomic<long long>) use different tactics dep ending upon // of 64 bit atomics (e.g. atomic<long long>) use different tactics dep ending upon
// whether the object is properly aligned or not. // whether the object is properly aligned or not.
#define __TBB_FORCE_64BIT_ALIGNMENT_BROKEN 1 #define __TBB_FORCE_64BIT_ALIGNMENT_BROKEN 1
#else #else
#define __TBB_FORCE_64BIT_ALIGNMENT_BROKEN 0 #define __TBB_FORCE_64BIT_ALIGNMENT_BROKEN 0
#endif #endif
#if __TBB_DEFAULTED_AND_DELETED_FUNC_PRESENT && __TBB_GCC_VERSION < 40700 & & !defined(__INTEL_COMPILER) && !defined (__clang__) #if __TBB_DEFAULTED_AND_DELETED_FUNC_PRESENT && __TBB_GCC_VERSION < 40700 & & !defined(__INTEL_COMPILER) && !defined (__clang__)
skipping to change at line 649 skipping to change at line 649
// A compiler bug: a disabled copy constructor prevents use of the moving c onstructor // A compiler bug: a disabled copy constructor prevents use of the moving c onstructor
#define __TBB_IF_NO_COPY_CTOR_MOVE_SEMANTICS_BROKEN (_MSC_VER && (__INTEL_C OMPILER >= 1300 && __INTEL_COMPILER <= 1310) && !__INTEL_CXX11_MODE__) #define __TBB_IF_NO_COPY_CTOR_MOVE_SEMANTICS_BROKEN (_MSC_VER && (__INTEL_C OMPILER >= 1300 && __INTEL_COMPILER <= 1310) && !__INTEL_CXX11_MODE__)
// MSVC 2013 and ICC 15 seems do not generate implicit move constructor for empty derived class while should // MSVC 2013 and ICC 15 seems do not generate implicit move constructor for empty derived class while should
#define __TBB_CPP11_IMPLICIT_MOVE_MEMBERS_GENERATION_FOR_DERIVED_BROKEN (_ _TBB_CPP11_RVALUE_REF_PRESENT && \ #define __TBB_CPP11_IMPLICIT_MOVE_MEMBERS_GENERATION_FOR_DERIVED_BROKEN (_ _TBB_CPP11_RVALUE_REF_PRESENT && \
( !__INTEL_COMPILER && _MSC_VER && _MSC_VER <= 1800 || __INTEL_COMPIL ER && __INTEL_COMPILER <= 1500 )) ( !__INTEL_COMPILER && _MSC_VER && _MSC_VER <= 1800 || __INTEL_COMPIL ER && __INTEL_COMPILER <= 1500 ))
#define __TBB_CPP11_DECLVAL_BROKEN (_MSC_VER == 1600 || (__GNUC__ && __TBB_ GCC_VERSION < 40500) ) #define __TBB_CPP11_DECLVAL_BROKEN (_MSC_VER == 1600 || (__GNUC__ && __TBB_ GCC_VERSION < 40500) )
// Intel C++ compiler has difficulties with copying std::pair with VC11 std
::reference_wrapper being a const member
#define __TBB_COPY_FROM_NON_CONST_REF_BROKEN (_MSC_VER == 1700 && __INTEL_C
OMPILER && __INTEL_COMPILER < 1600)
//The implicit upcasting of the tuple of a reference of a derived class to a base class fails on icc 13.X //The implicit upcasting of the tuple of a reference of a derived class to a base class fails on icc 13.X
//if the system's gcc environment is 4.8 //if the system's gcc environment is 4.8
#if (__INTEL_COMPILER >=1300 && __INTEL_COMPILER <=1310) && __TBB_GCC_VERSI ON>=40700 && __GXX_EXPERIMENTAL_CXX0X__ #if (__INTEL_COMPILER >=1300 && __INTEL_COMPILER <=1310) && __TBB_GCC_VERSI ON>=40700 && __GXX_EXPERIMENTAL_CXX0X__
#define __TBB_UPCAST_OF_TUPLE_OF_REF_BROKEN 1 #define __TBB_UPCAST_OF_TUPLE_OF_REF_BROKEN 1
#endif #endif
/** End of __TBB_XXX_BROKEN macro section **/ /** End of __TBB_XXX_BROKEN macro section **/
#if defined(_MSC_VER) && _MSC_VER>=1500 && !defined(__INTEL_COMPILER) #if defined(_MSC_VER) && _MSC_VER>=1500 && !defined(__INTEL_COMPILER)
// A macro to suppress erroneous or benign "unreachable code" MSVC warn ing (4702) // A macro to suppress erroneous or benign "unreachable code" MSVC warn ing (4702)
skipping to change at line 670 skipping to change at line 672
#endif #endif
#define __TBB_ATOMIC_CTORS (__TBB_CONSTEXPR_PRESENT && __TBB_DEFAULTED_ AND_DELETED_FUNC_PRESENT && (!__TBB_ZERO_INIT_WITH_DEFAULTED_CTOR_BROKEN)) #define __TBB_ATOMIC_CTORS (__TBB_CONSTEXPR_PRESENT && __TBB_DEFAULTED_ AND_DELETED_FUNC_PRESENT && (!__TBB_ZERO_INIT_WITH_DEFAULTED_CTOR_BROKEN))
#define __TBB_ALLOCATOR_CONSTRUCT_VARIADIC (__TBB_CPP11_VARIADIC_TEMPL ATES_PRESENT && __TBB_CPP11_RVALUE_REF_PRESENT) #define __TBB_ALLOCATOR_CONSTRUCT_VARIADIC (__TBB_CPP11_VARIADIC_TEMPL ATES_PRESENT && __TBB_CPP11_RVALUE_REF_PRESENT)
#define __TBB_VARIADIC_PARALLEL_INVOKE (TBB_PREVIEW_VARIADIC_PARAL LEL_INVOKE && __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT && __TBB_CPP11_RVALUE_ REF_PRESENT) #define __TBB_VARIADIC_PARALLEL_INVOKE (TBB_PREVIEW_VARIADIC_PARAL LEL_INVOKE && __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT && __TBB_CPP11_RVALUE_ REF_PRESENT)
#define __TBB_PREVIEW_COMPOSITE_NODE (TBB_PREVIEW_FLOW_GRAPH_NOD ES && __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT \ #define __TBB_PREVIEW_COMPOSITE_NODE (TBB_PREVIEW_FLOW_GRAPH_NOD ES && __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT \
&& __TBB_CPP11_RVALUE_REF_ PRESENT && __TBB_CPP11_AUTO_PRESENT) \ && __TBB_CPP11_RVALUE_REF_ PRESENT && __TBB_CPP11_AUTO_PRESENT) \
&& __TBB_CPP11_VARIADIC_TU PLE_PRESENT && !__TBB_UPCAST_OF_TUPLE_OF_REF_BROKEN && __TBB_CPP11_VARIADIC_TU PLE_PRESENT && !__TBB_UPCAST_OF_TUPLE_OF_REF_BROKEN
#define __TBB_PREVIEW_ASYNC_NODE TBB_PREVIEW_FLOW_GRAPH_NODES
#endif /* __TBB_tbb_config_H */ #endif /* __TBB_tbb_config_H */
 End of changes. 4 change blocks. 
2 lines changed or deleted 7 lines changed or added


 tbb_stddef.h   tbb_stddef.h 
skipping to change at line 29 skipping to change at line 29
*/ */
#ifndef __TBB_tbb_stddef_H #ifndef __TBB_tbb_stddef_H
#define __TBB_tbb_stddef_H #define __TBB_tbb_stddef_H
// Marketing-driven product version // Marketing-driven product version
#define TBB_VERSION_MAJOR 4 #define TBB_VERSION_MAJOR 4
#define TBB_VERSION_MINOR 3 #define TBB_VERSION_MINOR 3
// Engineering-focused interface version // Engineering-focused interface version
#define TBB_INTERFACE_VERSION 8005 #define TBB_INTERFACE_VERSION 8006
#define TBB_INTERFACE_VERSION_MAJOR TBB_INTERFACE_VERSION/1000 #define TBB_INTERFACE_VERSION_MAJOR TBB_INTERFACE_VERSION/1000
// The oldest major interface version still supported // The oldest major interface version still supported
// To be used in SONAME, manifests, etc. // To be used in SONAME, manifests, etc.
#define TBB_COMPATIBLE_INTERFACE_VERSION 2 #define TBB_COMPATIBLE_INTERFACE_VERSION 2
#define __TBB_STRING_AUX(x) #x #define __TBB_STRING_AUX(x) #x
#define __TBB_STRING(x) __TBB_STRING_AUX(x) #define __TBB_STRING(x) __TBB_STRING_AUX(x)
// We do not need defines below for resource processing on windows // We do not need defines below for resource processing on windows
 End of changes. 1 change blocks. 
1 lines changed or deleted 1 lines changed or added

This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/