kcthread.h | kcthread.h | |||
---|---|---|---|---|
skipping to change at line 594 | skipping to change at line 594 | |||
* @param sec the interval of the suspension in seconds. | * @param sec the interval of the suspension in seconds. | |||
* @return true on catched signal, or false on timeout. | * @return true on catched signal, or false on timeout. | |||
*/ | */ | |||
bool wait(Mutex* mutex, double sec); | bool wait(Mutex* mutex, double sec); | |||
/** | /** | |||
* Send the wake-up signal to another waiting thread. | * Send the wake-up signal to another waiting thread. | |||
* @note The mutex used for the wait method should be locked by the calle r. | * @note The mutex used for the wait method should be locked by the calle r. | |||
*/ | */ | |||
void signal(); | void signal(); | |||
/** | /** | |||
* Send the wake-up signal to all waiting threads. | * Send the wake-up signals to all waiting threads. | |||
* @note The mutex used for the wait method should be locked by the calle r. | * @note The mutex used for the wait method should be locked by the calle r. | |||
*/ | */ | |||
void broadcast(); | void broadcast(); | |||
private: | private: | |||
/** Dummy constructor to forbid the use. */ | /** Dummy constructor to forbid the use. */ | |||
CondVar(const CondVar&); | CondVar(const CondVar&); | |||
/** Dummy Operator to forbid the use. */ | /** Dummy Operator to forbid the use. */ | |||
CondVar& operator =(const CondVar&); | CondVar& operator =(const CondVar&); | |||
/** Opaque pointer. */ | /** Opaque pointer. */ | |||
void* opq_; | void* opq_; | |||
}; | }; | |||
/** | /** | |||
* Assosiative condition variable. | ||||
*/ | ||||
class CondMap { | ||||
private: | ||||
struct Count; | ||||
struct Slot; | ||||
/** An alias of set of counters. */ | ||||
typedef std::map<std::string, Count> CountMap; | ||||
/** The number of slots. */ | ||||
static const size_t SLOTNUM = 64; | ||||
public: | ||||
/** | ||||
* Default constructor. | ||||
*/ | ||||
explicit CondMap() : slots_() { | ||||
_assert_(true); | ||||
} | ||||
/** | ||||
* Destructor. | ||||
*/ | ||||
~CondMap() { | ||||
_assert_(true); | ||||
} | ||||
/** | ||||
* Wait for a signal. | ||||
* @param kbuf the pointer to the key region. | ||||
* @param ksiz the size of the key region. | ||||
* @param sec the interval of the suspension in seconds. If it is negati | ||||
ve, no timeout is | ||||
* specified. | ||||
* @return true on catched signal, or false on timeout. | ||||
*/ | ||||
bool wait(const char* kbuf, size_t ksiz, double sec = -1) { | ||||
_assert_(kbuf && ksiz <= MEMMAXSIZ); | ||||
std::string key(kbuf, ksiz); | ||||
return wait(key, sec); | ||||
} | ||||
/** | ||||
* Wait for a signal by a key. | ||||
* @param key the key. | ||||
* @param sec the interval of the suspension in seconds. If it is negati | ||||
ve, no timeout is | ||||
* specified. | ||||
* @return true on catched signal, or false on timeout. | ||||
*/ | ||||
bool wait(const std::string& key, double sec = -1) { | ||||
_assert_(true); | ||||
double invtime = sec < 0 ? 1.0 : sec; | ||||
double curtime = time(); | ||||
double endtime = curtime + (sec < 0 ? UINT32MAX : sec); | ||||
Slot* slot = get_slot(key); | ||||
while (curtime < endtime) { | ||||
ScopedMutex lock(&slot->mutex); | ||||
CountMap::iterator cit = slot->counter.find(key); | ||||
if (cit == slot->counter.end()) { | ||||
Count cnt = { 1, false }; | ||||
slot->counter[key] = cnt; | ||||
} else { | ||||
cit->second.num++; | ||||
} | ||||
slot->cond.wait(&slot->mutex, invtime); | ||||
cit = slot->counter.find(key); | ||||
cit->second.num--; | ||||
if (cit->second.wake > 0) { | ||||
cit->second.wake--; | ||||
if (cit->second.num < 1) slot->counter.erase(cit); | ||||
return true; | ||||
} | ||||
if (cit->second.num < 1) slot->counter.erase(cit); | ||||
curtime = time(); | ||||
} | ||||
return false; | ||||
} | ||||
/** | ||||
* Send a wake-up signal to another thread waiting by a key. | ||||
* @param kbuf the pointer to the key region. | ||||
* @param ksiz the size of the key region. | ||||
* @return the number of threads waiting for the signal. | ||||
*/ | ||||
size_t signal(const char* kbuf, size_t ksiz) { | ||||
_assert_(kbuf && ksiz <= MEMMAXSIZ); | ||||
std::string key(kbuf, ksiz); | ||||
return signal(key); | ||||
} | ||||
/** | ||||
* Send a wake-up signal to another thread waiting by a key. | ||||
* @param key the key. | ||||
* @return the number of threads waiting for the signal. | ||||
*/ | ||||
size_t signal(const std::string& key) { | ||||
_assert_(true); | ||||
Slot* slot = get_slot(key); | ||||
ScopedMutex lock(&slot->mutex); | ||||
CountMap::iterator cit = slot->counter.find(key); | ||||
if (cit == slot->counter.end() || cit->second.num < 1) return 0; | ||||
if (cit->second.wake < cit->second.num) cit->second.wake++; | ||||
slot->cond.broadcast(); | ||||
return cit->second.num; | ||||
} | ||||
/** | ||||
* Send wake-up signals to all threads waiting by a key. | ||||
* @param kbuf the pointer to the key region. | ||||
* @param ksiz the size of the key region. | ||||
* @return the number of threads waiting for the signal. | ||||
*/ | ||||
size_t broadcast(const char* kbuf, size_t ksiz) { | ||||
_assert_(kbuf && ksiz <= MEMMAXSIZ); | ||||
std::string key(kbuf, ksiz); | ||||
return broadcast(key); | ||||
} | ||||
/** | ||||
* Send wake-up signals to all threads waiting by a key. | ||||
* @param key the key. | ||||
* @return the number of threads waiting for the signal. | ||||
*/ | ||||
size_t broadcast(const std::string& key) { | ||||
_assert_(true); | ||||
Slot* slot = get_slot(key); | ||||
ScopedMutex lock(&slot->mutex); | ||||
CountMap::iterator cit = slot->counter.find(key); | ||||
if (cit == slot->counter.end() || cit->second.num < 1) return 0; | ||||
cit->second.wake = cit->second.num; | ||||
slot->cond.broadcast(); | ||||
return cit->second.num; | ||||
} | ||||
/** | ||||
* Send wake-up signals to all threads waiting by each key. | ||||
* @return the number of threads waiting for the signal. | ||||
*/ | ||||
size_t broadcast_all() { | ||||
_assert_(true); | ||||
size_t sum = 0; | ||||
for (size_t i = 0; i < SLOTNUM; i++) { | ||||
Slot* slot = slots_ + i; | ||||
ScopedMutex lock(&slot->mutex); | ||||
CountMap::iterator cit = slot->counter.begin(); | ||||
CountMap::iterator citend = slot->counter.end(); | ||||
while (cit != citend) { | ||||
if (cit->second.num > 0) { | ||||
cit->second.wake = cit->second.num; | ||||
sum += cit->second.num; | ||||
} | ||||
slot->cond.broadcast(); | ||||
++cit; | ||||
} | ||||
} | ||||
return sum; | ||||
} | ||||
/** | ||||
* Get the total number of threads waiting for signals. | ||||
* @return the total number of threads waiting for signals. | ||||
*/ | ||||
size_t count() { | ||||
_assert_(true); | ||||
size_t sum = 0; | ||||
for (size_t i = 0; i < SLOTNUM; i++) { | ||||
Slot* slot = slots_ + i; | ||||
ScopedMutex lock(&slot->mutex); | ||||
CountMap::iterator cit = slot->counter.begin(); | ||||
CountMap::iterator citend = slot->counter.end(); | ||||
while (cit != citend) { | ||||
sum += cit->second.num; | ||||
++cit; | ||||
} | ||||
} | ||||
return sum; | ||||
} | ||||
private: | ||||
/** | ||||
* Counter for waiting threads. | ||||
*/ | ||||
struct Count { | ||||
size_t num; ///< waiting threads | ||||
size_t wake; ///< waking threads | ||||
}; | ||||
/** | ||||
* Slot of a key space. | ||||
*/ | ||||
struct Slot { | ||||
CondVar cond; ///< condition variable | ||||
Mutex mutex; ///< mutex | ||||
CountMap counter; ///< counter | ||||
}; | ||||
/** | ||||
* Get the slot corresponding a key. | ||||
* @param key the key. | ||||
* @return the slot corresponding the key. | ||||
*/ | ||||
Slot* get_slot(const std::string& key) { | ||||
return slots_ + hashmurmur(key.data(), key.size()) % SLOTNUM; | ||||
} | ||||
/** The slot array. */ | ||||
Slot slots_[SLOTNUM]; | ||||
}; | ||||
/** | ||||
* Key of thread specific data. | * Key of thread specific data. | |||
*/ | */ | |||
class TSDKey { | class TSDKey { | |||
public: | public: | |||
/** | /** | |||
* Default constructor. | * Default constructor. | |||
*/ | */ | |||
explicit TSDKey(); | explicit TSDKey(); | |||
/** | /** | |||
* Constructor. | * Constructor. | |||
End of changes. 2 change blocks. | ||||
1 lines changed or deleted | 197 lines changed or added | |||