kccachedb.h | kccachedb.h | |||
---|---|---|---|---|
skipping to change at line 141 | skipping to change at line 141 | |||
rvsiz = zsiz; | rvsiz = zsiz; | |||
} | } | |||
} | } | |||
size_t vsiz; | size_t vsiz; | |||
const char* vbuf = visitor->visit_full(dbuf, rksiz, rvbuf, rvsiz, &vs iz); | const char* vbuf = visitor->visit_full(dbuf, rksiz, rvbuf, rvsiz, &vs iz); | |||
delete[] zbuf; | delete[] zbuf; | |||
if (vbuf == Visitor::REMOVE) { | if (vbuf == Visitor::REMOVE) { | |||
uint64_t hash = db_->hash_record(dbuf, rksiz) / SLOTNUM; | uint64_t hash = db_->hash_record(dbuf, rksiz) / SLOTNUM; | |||
Slot* slot = db_->slots_ + sidx_; | Slot* slot = db_->slots_ + sidx_; | |||
Repeater repeater(Visitor::REMOVE, 0); | Repeater repeater(Visitor::REMOVE, 0); | |||
db_->accept_impl(slot, hash, dbuf, rksiz, &repeater, db_->comp_, tr ue); | db_->accept_impl(slot, hash, dbuf, rksiz, &repeater, db_->comp_, fa lse); | |||
} else if (vbuf == Visitor::NOP) { | } else if (vbuf == Visitor::NOP) { | |||
if (step) step_impl(); | if (step) step_impl(); | |||
} else { | } else { | |||
uint64_t hash = db_->hash_record(dbuf, rksiz) / SLOTNUM; | uint64_t hash = db_->hash_record(dbuf, rksiz) / SLOTNUM; | |||
Slot* slot = db_->slots_ + sidx_; | Slot* slot = db_->slots_ + sidx_; | |||
Repeater repeater(vbuf, vsiz); | Repeater repeater(vbuf, vsiz); | |||
db_->accept_impl(slot, hash, dbuf, rksiz, &repeater, db_->comp_, tr ue); | db_->accept_impl(slot, hash, dbuf, rksiz, &repeater, db_->comp_, fa lse); | |||
if (step) step_impl(); | if (step) step_impl(); | |||
} | } | |||
return true; | return true; | |||
} | } | |||
/** | /** | |||
* Jump the cursor to the first record for forward scan. | * Jump the cursor to the first record for forward scan. | |||
* @return true on success, or false on failure. | * @return true on success, or false on failure. | |||
*/ | */ | |||
bool jump() { | bool jump() { | |||
_assert_(true); | _assert_(true); | |||
skipping to change at line 377 | skipping to change at line 377 | |||
FOPEN = 1 << 0, ///< dummy for compatibility | FOPEN = 1 << 0, ///< dummy for compatibility | |||
FFATAL = 1 << 1 ///< dummy for compatibility | FFATAL = 1 << 1 ///< dummy for compatibility | |||
}; | }; | |||
/** | /** | |||
* Default constructor. | * Default constructor. | |||
*/ | */ | |||
explicit CacheDB() : | explicit CacheDB() : | |||
mlock_(), flock_(), error_(), logger_(NULL), logkinds_(0), mtrigger_( NULL), | mlock_(), flock_(), error_(), logger_(NULL), logkinds_(0), mtrigger_( NULL), | |||
omode_(0), curs_(), path_(""), type_(TYPECACHE), | omode_(0), curs_(), path_(""), type_(TYPECACHE), | |||
opts_(0), bnum_(DEFBNUM), capcnt_(-1), capsiz_(-1), | opts_(0), bnum_(DEFBNUM), capcnt_(-1), capsiz_(-1), | |||
opaque_(), embcomp_(ZLIBRAWCOMP), comp_(NULL), slots_(), tran_(false) { | opaque_(), embcomp_(ZLIBRAWCOMP), comp_(NULL), slots_(), rttmode_(tru e), tran_(false) { | |||
_assert_(true); | _assert_(true); | |||
} | } | |||
/** | /** | |||
* Destructor. | * Destructor. | |||
* @note If the database is not closed, it is closed implicitly. | * @note If the database is not closed, it is closed implicitly. | |||
*/ | */ | |||
virtual ~CacheDB() { | virtual ~CacheDB() { | |||
_assert_(true); | _assert_(true); | |||
if (omode_ != 0) close(); | if (omode_ != 0) close(); | |||
if (!curs_.empty()) { | if (!curs_.empty()) { | |||
skipping to change at line 425 | skipping to change at line 425 | |||
if (writable && !(omode_ & OWRITER)) { | if (writable && !(omode_ & OWRITER)) { | |||
set_error(_KCCODELINE_, Error::NOPERM, "permission denied"); | set_error(_KCCODELINE_, Error::NOPERM, "permission denied"); | |||
return false; | return false; | |||
} | } | |||
if (ksiz > KSIZMAX) ksiz = KSIZMAX; | if (ksiz > KSIZMAX) ksiz = KSIZMAX; | |||
uint64_t hash = hash_record(kbuf, ksiz); | uint64_t hash = hash_record(kbuf, ksiz); | |||
int32_t sidx = hash % SLOTNUM; | int32_t sidx = hash % SLOTNUM; | |||
hash /= SLOTNUM; | hash /= SLOTNUM; | |||
Slot* slot = slots_ + sidx; | Slot* slot = slots_ + sidx; | |||
slot->lock.lock(); | slot->lock.lock(); | |||
accept_impl(slot, hash, kbuf, ksiz, visitor, comp_, false); | accept_impl(slot, hash, kbuf, ksiz, visitor, comp_, rttmode_); | |||
slot->lock.unlock(); | slot->lock.unlock(); | |||
return true; | return true; | |||
} | } | |||
/** | /** | |||
* Accept a visitor to multiple records at once. | * Accept a visitor to multiple records at once. | |||
* @param keys specifies a string vector of the keys. | * @param keys specifies a string vector of the keys. | |||
* @param visitor a visitor object. | * @param visitor a visitor object. | |||
* @param writable true for writable operation, or false for read-only op eration. | * @param writable true for writable operation, or false for read-only op eration. | |||
* @return true on success, or false on failure. | * @return true on success, or false on failure. | |||
* @note The operations for specified records are performed atomically an d other threads | * @note The operations for specified records are performed atomically an d other threads | |||
skipping to change at line 483 | skipping to change at line 483 | |||
std::set<int32_t>::iterator sit = sidxs.begin(); | std::set<int32_t>::iterator sit = sidxs.begin(); | |||
std::set<int32_t>::iterator sitend = sidxs.end(); | std::set<int32_t>::iterator sitend = sidxs.end(); | |||
while (sit != sitend) { | while (sit != sitend) { | |||
Slot* slot = slots_ + *sit; | Slot* slot = slots_ + *sit; | |||
slot->lock.lock(); | slot->lock.lock(); | |||
++sit; | ++sit; | |||
} | } | |||
for (size_t i = 0; i < knum; i++) { | for (size_t i = 0; i < knum; i++) { | |||
RecordKey* rkey = rkeys + i; | RecordKey* rkey = rkeys + i; | |||
Slot* slot = slots_ + rkey->sidx; | Slot* slot = slots_ + rkey->sidx; | |||
accept_impl(slot, rkey->hash, rkey->kbuf, rkey->ksiz, visitor, comp_, false); | accept_impl(slot, rkey->hash, rkey->kbuf, rkey->ksiz, visitor, comp_, rttmode_); | |||
} | } | |||
sit = sidxs.begin(); | sit = sidxs.begin(); | |||
sitend = sidxs.end(); | sitend = sidxs.end(); | |||
while (sit != sitend) { | while (sit != sitend) { | |||
Slot* slot = slots_ + *sit; | Slot* slot = slots_ + *sit; | |||
slot->lock.unlock(); | slot->lock.unlock(); | |||
++sit; | ++sit; | |||
} | } | |||
delete[] rkeys; | delete[] rkeys; | |||
return true; | return true; | |||
skipping to change at line 546 | skipping to change at line 546 | |||
rvbuf = zbuf; | rvbuf = zbuf; | |||
rvsiz = zsiz; | rvsiz = zsiz; | |||
} | } | |||
} | } | |||
size_t vsiz; | size_t vsiz; | |||
const char* vbuf = visitor->visit_full(dbuf, rksiz, rvbuf, rvsiz, & vsiz); | const char* vbuf = visitor->visit_full(dbuf, rksiz, rvbuf, rvsiz, & vsiz); | |||
delete[] zbuf; | delete[] zbuf; | |||
if (vbuf == Visitor::REMOVE) { | if (vbuf == Visitor::REMOVE) { | |||
uint64_t hash = hash_record(dbuf, rksiz) / SLOTNUM; | uint64_t hash = hash_record(dbuf, rksiz) / SLOTNUM; | |||
Repeater repeater(Visitor::REMOVE, 0); | Repeater repeater(Visitor::REMOVE, 0); | |||
accept_impl(slot, hash, dbuf, rksiz, &repeater, comp_, true); | accept_impl(slot, hash, dbuf, rksiz, &repeater, comp_, false); | |||
} else if (vbuf != Visitor::NOP) { | } else if (vbuf != Visitor::NOP) { | |||
uint64_t hash = hash_record(dbuf, rksiz) / SLOTNUM; | uint64_t hash = hash_record(dbuf, rksiz) / SLOTNUM; | |||
Repeater repeater(vbuf, vsiz); | Repeater repeater(vbuf, vsiz); | |||
accept_impl(slot, hash, dbuf, rksiz, &repeater, comp_, true); | accept_impl(slot, hash, dbuf, rksiz, &repeater, comp_, false); | |||
} | } | |||
rec = next; | rec = next; | |||
curcnt++; | curcnt++; | |||
if (checker && !checker->check("iterate", "processing", curcnt, all cnt)) { | if (checker && !checker->check("iterate", "processing", curcnt, all cnt)) { | |||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |||
return false; | return false; | |||
} | } | |||
} | } | |||
} | } | |||
if (checker && !checker->check("iterate", "ending", -1, allcnt)) { | if (checker && !checker->check("iterate", "ending", -1, allcnt)) { | |||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |||
return false; | return false; | |||
} | } | |||
trigger_meta(MetaTrigger::ITERATE, "iterate"); | trigger_meta(MetaTrigger::ITERATE, "iterate"); | |||
return true; | return true; | |||
} | } | |||
/** | /** | |||
* Scan each record in parallel. | ||||
* @param visitor a visitor object. | ||||
* @param thnum the number of worker threads. | ||||
* @param checker a progress checker object. If it is NULL, no checking | ||||
is performed. | ||||
* @return true on success, or false on failure. | ||||
* @note This function is for reading records and not for updating ones. | ||||
The return value of | ||||
* the visitor is just ignored. To avoid deadlock, any explicit database | ||||
operation must not | ||||
* be performed in this function. | ||||
*/ | ||||
bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* check | ||||
er = NULL) { | ||||
_assert_(visitor && thnum <= MEMMAXSIZ); | ||||
ScopedRWLock lock(&mlock_, false); | ||||
if (omode_ == 0) { | ||||
set_error(_KCCODELINE_, Error::INVALID, "not opened"); | ||||
return false; | ||||
} | ||||
if (thnum < 1) thnum = 1; | ||||
thnum = std::pow(2, (int32_t)std::log2(thnum * std::sqrt(2))); | ||||
if (thnum > (size_t)SLOTNUM) thnum = SLOTNUM; | ||||
ScopedVisitor svis(visitor); | ||||
int64_t allcnt = count_impl(); | ||||
if (checker && !checker->check("scan_parallel", "beginning", -1, allcnt | ||||
)) { | ||||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | ||||
return false; | ||||
} | ||||
class ThreadImpl : public Thread { | ||||
public: | ||||
explicit ThreadImpl() : | ||||
db_(NULL), visitor_(NULL), checker_(NULL), allcnt_(0), slots_(), | ||||
error_() {} | ||||
void init(CacheDB* db, Visitor* visitor, ProgressChecker* checker, in | ||||
t64_t allcnt) { | ||||
db_ = db; | ||||
visitor_ = visitor; | ||||
checker_ = checker; | ||||
allcnt_ = allcnt; | ||||
} | ||||
void add_slot(Slot* slot) { | ||||
slots_.push_back(slot); | ||||
} | ||||
const Error& error() { | ||||
return error_; | ||||
} | ||||
private: | ||||
void run() { | ||||
CacheDB* db = db_; | ||||
Visitor* visitor = visitor_; | ||||
ProgressChecker* checker = checker_; | ||||
int64_t allcnt = allcnt_; | ||||
Compressor* comp = db->comp_; | ||||
std::vector<Slot*>::iterator sit = slots_.begin(); | ||||
std::vector<Slot*>::iterator sitend = slots_.end(); | ||||
while (sit != sitend) { | ||||
Slot* slot = *sit; | ||||
Record* rec = slot->first; | ||||
while (rec) { | ||||
Record* next = rec->next; | ||||
uint32_t rksiz = rec->ksiz & KSIZMAX; | ||||
char* dbuf = (char*)rec + sizeof(*rec); | ||||
const char* rvbuf = dbuf + rksiz; | ||||
size_t rvsiz = rec->vsiz; | ||||
char* zbuf = NULL; | ||||
size_t zsiz = 0; | ||||
if (comp) { | ||||
zbuf = comp->decompress(rvbuf, rvsiz, &zsiz); | ||||
if (zbuf) { | ||||
rvbuf = zbuf; | ||||
rvsiz = zsiz; | ||||
} | ||||
} | ||||
size_t vsiz; | ||||
visitor->visit_full(dbuf, rksiz, rvbuf, rvsiz, &vsiz); | ||||
delete[] zbuf; | ||||
rec = next; | ||||
if (checker && !checker->check("scan_parallel", "processing", - | ||||
1, allcnt)) { | ||||
db->set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | ||||
error_ = db->error(); | ||||
break; | ||||
} | ||||
} | ||||
++sit; | ||||
} | ||||
} | ||||
CacheDB* db_; | ||||
Visitor* visitor_; | ||||
ProgressChecker* checker_; | ||||
int64_t allcnt_; | ||||
std::vector<Slot*> slots_; | ||||
Error error_; | ||||
}; | ||||
bool err = false; | ||||
bool orttmode = rttmode_; | ||||
rttmode_ = false; | ||||
ThreadImpl* threads = new ThreadImpl[thnum]; | ||||
for (int32_t i = 0; i < SLOTNUM; i++) { | ||||
ThreadImpl* thread = threads + (i % thnum); | ||||
thread->add_slot(slots_ + i); | ||||
} | ||||
for (size_t i = 0; i < thnum; i++) { | ||||
ThreadImpl* thread = threads + i; | ||||
thread->init(this, visitor, checker, allcnt); | ||||
thread->start(); | ||||
} | ||||
for (size_t i = 0; i < thnum; i++) { | ||||
ThreadImpl* thread = threads + i; | ||||
thread->join(); | ||||
if (thread->error() != Error::SUCCESS) { | ||||
*error_ = thread->error(); | ||||
err = true; | ||||
} | ||||
} | ||||
delete[] threads; | ||||
rttmode_ = orttmode; | ||||
if (err) return false; | ||||
if (checker && !checker->check("scan_parallel", "ending", -1, allcnt)) | ||||
{ | ||||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | ||||
return false; | ||||
} | ||||
trigger_meta(MetaTrigger::ITERATE, "scan_parallel"); | ||||
return true; | ||||
} | ||||
/** | ||||
* Get the last happened error. | * Get the last happened error. | |||
* @return the last happened error. | * @return the last happened error. | |||
*/ | */ | |||
Error error() const { | Error error() const { | |||
_assert_(true); | _assert_(true); | |||
return error_; | return error_; | |||
} | } | |||
/** | /** | |||
* Set the error information. | * Set the error information. | |||
* @param file the file name of the program source code. | * @param file the file name of the program source code. | |||
skipping to change at line 1043 | skipping to change at line 1163 | |||
_assert_(true); | _assert_(true); | |||
ScopedRWLock lock(&mlock_, true); | ScopedRWLock lock(&mlock_, true); | |||
if (omode_ != 0) { | if (omode_ != 0) { | |||
set_error(_KCCODELINE_, Error::INVALID, "already opened"); | set_error(_KCCODELINE_, Error::INVALID, "already opened"); | |||
return false; | return false; | |||
} | } | |||
capsiz_ = size; | capsiz_ = size; | |||
return true; | return true; | |||
} | } | |||
/** | /** | |||
* Switch the mode of LRU rotation. | ||||
* @param rttmode true to enable LRU rotation, false to disable LRU rotat | ||||
ion. | ||||
* @return true on success, or false on failure. | ||||
* @note This function can be called while the database is opened. | ||||
*/ | ||||
bool switch_rotation(bool rttmode) { | ||||
_assert_(true); | ||||
ScopedRWLock lock(&mlock_, true); | ||||
rttmode_ = rttmode; | ||||
return true; | ||||
} | ||||
/** | ||||
* Get the opaque data. | * Get the opaque data. | |||
* @return the pointer to the opaque data region, whose size is 16 bytes. | * @return the pointer to the opaque data region, whose size is 16 bytes. | |||
*/ | */ | |||
char* opaque() { | char* opaque() { | |||
_assert_(true); | _assert_(true); | |||
ScopedRWLock lock(&mlock_, false); | ScopedRWLock lock(&mlock_, false); | |||
if (omode_ == 0) { | if (omode_ == 0) { | |||
set_error(_KCCODELINE_, Error::INVALID, "not opened"); | set_error(_KCCODELINE_, Error::INVALID, "not opened"); | |||
return NULL; | return NULL; | |||
} | } | |||
skipping to change at line 1481 | skipping to change at line 1613 | |||
Visitor* visitor_; ///< visitor | Visitor* visitor_; ///< visitor | |||
}; | }; | |||
/** | /** | |||
* Accept a visitor to a record. | * Accept a visitor to a record. | |||
* @param slot the slot of the record. | * @param slot the slot of the record. | |||
* @param hash the hash value of the key. | * @param hash the hash value of the key. | |||
* @param kbuf the pointer to the key region. | * @param kbuf the pointer to the key region. | |||
* @param ksiz the size of the key region. | * @param ksiz the size of the key region. | |||
* @param visitor a visitor object. | * @param visitor a visitor object. | |||
* @param comp the data compressor. | * @param comp the data compressor. | |||
* @param isiter true for iterator use, or false for direct use. | * @param rtt whether to move the record to the last. | |||
*/ | */ | |||
void accept_impl(Slot* slot, uint64_t hash, const char* kbuf, size_t ksiz , Visitor* visitor, | void accept_impl(Slot* slot, uint64_t hash, const char* kbuf, size_t ksiz , Visitor* visitor, | |||
Compressor* comp, bool isiter) { | Compressor* comp, bool rtt) { | |||
_assert_(slot && kbuf && ksiz <= MEMMAXSIZ && visitor); | _assert_(slot && kbuf && ksiz <= MEMMAXSIZ && visitor); | |||
size_t bidx = hash % slot->bnum; | size_t bidx = hash % slot->bnum; | |||
Record* rec = slot->buckets[bidx]; | Record* rec = slot->buckets[bidx]; | |||
Record** entp = slot->buckets + bidx; | Record** entp = slot->buckets + bidx; | |||
uint32_t fhash = fold_hash(hash) & ~KSIZMAX; | uint32_t fhash = fold_hash(hash) & ~KSIZMAX; | |||
while (rec) { | while (rec) { | |||
uint32_t rhash = rec->ksiz & ~KSIZMAX; | uint32_t rhash = rec->ksiz & ~KSIZMAX; | |||
uint32_t rksiz = rec->ksiz & KSIZMAX; | uint32_t rksiz = rec->ksiz & KSIZMAX; | |||
if (fhash > rhash) { | if (fhash > rhash) { | |||
entp = &rec->left; | entp = &rec->left; | |||
skipping to change at line 1597 | skipping to change at line 1729 | |||
*entp = rec; | *entp = rec; | |||
if (rec->prev) rec->prev->next = rec; | if (rec->prev) rec->prev->next = rec; | |||
if (rec->next) rec->next->prev = rec; | if (rec->next) rec->next->prev = rec; | |||
dbuf = (char*)rec + sizeof(*rec); | dbuf = (char*)rec + sizeof(*rec); | |||
} | } | |||
} | } | |||
std::memcpy(dbuf + ksiz, vbuf, vsiz); | std::memcpy(dbuf + ksiz, vbuf, vsiz); | |||
rec->vsiz = vsiz; | rec->vsiz = vsiz; | |||
delete[] zbuf; | delete[] zbuf; | |||
} | } | |||
if (!isiter && slot->last != rec) { | if (rtt && slot->last != rec) { | |||
if (!curs_.empty()) escape_cursors(rec); | if (!curs_.empty()) escape_cursors(rec); | |||
if (slot->first == rec) slot->first = rec->next; | if (slot->first == rec) slot->first = rec->next; | |||
if (rec->prev) rec->prev->next = rec->next; | if (rec->prev) rec->prev->next = rec->next; | |||
if (rec->next) rec->next->prev = rec->prev; | if (rec->next) rec->next->prev = rec->prev; | |||
rec->prev = slot->last; | rec->prev = slot->last; | |||
rec->next = NULL; | rec->next = NULL; | |||
slot->last->next = rec; | slot->last->next = rec; | |||
slot->last = rec; | slot->last = rec; | |||
} | } | |||
if (adj) adjust_slot_capacity(slot); | if (adj) adjust_slot_capacity(slot); | |||
skipping to change at line 1770 | skipping to change at line 1902 | |||
TranLogList::const_iterator itbeg = logs.begin(); | TranLogList::const_iterator itbeg = logs.begin(); | |||
while (it != itbeg) { | while (it != itbeg) { | |||
--it; | --it; | |||
const char* kbuf = it->key.c_str(); | const char* kbuf = it->key.c_str(); | |||
size_t ksiz = it->key.size(); | size_t ksiz = it->key.size(); | |||
const char* vbuf = it->value.c_str(); | const char* vbuf = it->value.c_str(); | |||
size_t vsiz = it->value.size(); | size_t vsiz = it->value.size(); | |||
uint64_t hash = hash_record(kbuf, ksiz) / SLOTNUM; | uint64_t hash = hash_record(kbuf, ksiz) / SLOTNUM; | |||
if (it->full) { | if (it->full) { | |||
Setter setter(vbuf, vsiz); | Setter setter(vbuf, vsiz); | |||
accept_impl(slot, hash, kbuf, ksiz, &setter, NULL, true); | accept_impl(slot, hash, kbuf, ksiz, &setter, NULL, false); | |||
} else { | } else { | |||
Remover remover; | Remover remover; | |||
accept_impl(slot, hash, kbuf, ksiz, &remover, NULL, true); | accept_impl(slot, hash, kbuf, ksiz, &remover, NULL, false); | |||
} | } | |||
} | } | |||
} | } | |||
/** | /** | |||
* Addjust a slot table to the capacity. | * Addjust a slot table to the capacity. | |||
* @param slot the slot table. | * @param slot the slot table. | |||
*/ | */ | |||
void adjust_slot_capacity(Slot* slot) { | void adjust_slot_capacity(Slot* slot) { | |||
_assert_(slot); | _assert_(slot); | |||
if ((slot->count > slot->capcnt || slot->size > slot->capsiz) && slot-> first) { | if ((slot->count > slot->capcnt || slot->size > slot->capsiz) && slot-> first) { | |||
Record* rec = slot->first; | Record* rec = slot->first; | |||
uint32_t rksiz = rec->ksiz & KSIZMAX; | uint32_t rksiz = rec->ksiz & KSIZMAX; | |||
char* dbuf = (char*)rec + sizeof(*rec); | char* dbuf = (char*)rec + sizeof(*rec); | |||
char stack[RECBUFSIZ]; | char stack[RECBUFSIZ]; | |||
char* kbuf = rksiz > sizeof(stack) ? new char[rksiz] : stack; | char* kbuf = rksiz > sizeof(stack) ? new char[rksiz] : stack; | |||
std::memcpy(kbuf, dbuf, rksiz); | std::memcpy(kbuf, dbuf, rksiz); | |||
uint64_t hash = hash_record(kbuf, rksiz) / SLOTNUM; | uint64_t hash = hash_record(kbuf, rksiz) / SLOTNUM; | |||
Remover remover; | Remover remover; | |||
accept_impl(slot, hash, dbuf, rksiz, &remover, NULL, true); | accept_impl(slot, hash, dbuf, rksiz, &remover, NULL, false); | |||
if (kbuf != stack) delete[] kbuf; | if (kbuf != stack) delete[] kbuf; | |||
} | } | |||
} | } | |||
/** | /** | |||
* Get the hash value of a record. | * Get the hash value of a record. | |||
* @param kbuf the pointer to the key region. | * @param kbuf the pointer to the key region. | |||
* @param ksiz the size of the key region. | * @param ksiz the size of the key region. | |||
* @return the hash value. | * @return the hash value. | |||
*/ | */ | |||
uint64_t hash_record(const char* kbuf, size_t ksiz) { | uint64_t hash_record(const char* kbuf, size_t ksiz) { | |||
skipping to change at line 1918 | skipping to change at line 2050 | |||
/** The capacity of memory usage. */ | /** The capacity of memory usage. */ | |||
int64_t capsiz_; | int64_t capsiz_; | |||
/** The opaque data. */ | /** The opaque data. */ | |||
char opaque_[OPAQUESIZ]; | char opaque_[OPAQUESIZ]; | |||
/** The embedded data compressor. */ | /** The embedded data compressor. */ | |||
Compressor* embcomp_; | Compressor* embcomp_; | |||
/** The data compressor. */ | /** The data compressor. */ | |||
Compressor* comp_; | Compressor* comp_; | |||
/** The slot tables. */ | /** The slot tables. */ | |||
Slot slots_[SLOTNUM]; | Slot slots_[SLOTNUM]; | |||
/** The flag whether in LRU rotation. */ | ||||
bool rttmode_; | ||||
/** The flag whether in transaction. */ | /** The flag whether in transaction. */ | |||
bool tran_; | bool tran_; | |||
}; | }; | |||
/** An alias of the cache tree database. */ | /** An alias of the cache tree database. */ | |||
typedef PlantDB<CacheDB, BasicDB::TYPEGRASS> GrassDB; | typedef PlantDB<CacheDB, BasicDB::TYPEGRASS> GrassDB; | |||
} // common namespace | } // common namespace | |||
#endif // duplication check | #endif // duplication check | |||
End of changes. 16 change blocks. | ||||
13 lines changed or deleted | 157 lines changed or added | |||
kccommon.h | kccommon.h | |||
---|---|---|---|---|
skipping to change at line 81 | skipping to change at line 81 | |||
inline long double modfl(long double val, long double* iptr) { | inline long double modfl(long double val, long double* iptr) { | |||
double integ; | double integ; | |||
double fract = std::modf(val, &integ); | double fract = std::modf(val, &integ); | |||
*iptr = integ; | *iptr = integ; | |||
return fract; | return fract; | |||
} | } | |||
#endif | #endif | |||
namespace std { | namespace std { | |||
using ::modfl; | using ::modfl; | |||
using ::log2; | ||||
using ::snprintf; | using ::snprintf; | |||
} | } | |||
#if __cplusplus > 199711L || defined(__GXX_EXPERIMENTAL_CXX0X__) || defined (_MSC_VER) | #if __cplusplus > 199711L || defined(__GXX_EXPERIMENTAL_CXX0X__) || defined (_MSC_VER) | |||
#include <unordered_map> | #include <unordered_map> | |||
#include <unordered_set> | #include <unordered_set> | |||
#else | #else | |||
End of changes. 1 change blocks. | ||||
0 lines changed or deleted | 1 lines changed or added | |||
kcdb.h | kcdb.h | |||
---|---|---|---|---|
skipping to change at line 1133 | skipping to change at line 1133 | |||
* @param visitor a visitor object. | * @param visitor a visitor object. | |||
* @param writable true for writable operation, or false for read-only op eration. | * @param writable true for writable operation, or false for read-only op eration. | |||
* @param checker a progress checker object. If it is NULL, no checking is performed. | * @param checker a progress checker object. If it is NULL, no checking is performed. | |||
* @return true on success, or false on failure. | * @return true on success, or false on failure. | |||
* @note The whole iteration is performed atomically and other threads ar e blocked. To avoid | * @note The whole iteration is performed atomically and other threads ar e blocked. To avoid | |||
* deadlock, any explicit database operation must not be performed in thi s function. | * deadlock, any explicit database operation must not be performed in thi s function. | |||
*/ | */ | |||
virtual bool iterate(Visitor *visitor, bool writable = true, | virtual bool iterate(Visitor *visitor, bool writable = true, | |||
ProgressChecker* checker = NULL) = 0; | ProgressChecker* checker = NULL) = 0; | |||
/** | /** | |||
* Scan each record in parallel. | ||||
* @param visitor a visitor object. | ||||
* @param thnum the number of worker threads. | ||||
* @param checker a progress checker object. If it is NULL, no checking | ||||
is performed. | ||||
* @return true on success, or false on failure. | ||||
* @note This function is for reading records and not for updating ones. | ||||
The return value of | ||||
* the visitor is just ignored. To avoid deadlock, any explicit database | ||||
operation must not | ||||
* be performed in this function. | ||||
*/ | ||||
virtual bool scan_parallel(Visitor *visitor, size_t thnum, | ||||
ProgressChecker* checker = NULL) = 0; | ||||
/** | ||||
* Synchronize updated contents with the file and the device. | * Synchronize updated contents with the file and the device. | |||
* @param hard true for physical synchronization with the device, or fals e for logical | * @param hard true for physical synchronization with the device, or fals e for logical | |||
* synchronization with the file system. | * synchronization with the file system. | |||
* @param proc a postprocessor object. If it is NULL, no postprocessing is performed. | * @param proc a postprocessor object. If it is NULL, no postprocessing is performed. | |||
* @param checker a progress checker object. If it is NULL, no checking is performed. | * @param checker a progress checker object. If it is NULL, no checking is performed. | |||
* @return true on success, or false on failure. | * @return true on success, or false on failure. | |||
* @note The operation of the postprocessor is performed atomically and o ther threads accessing | * @note The operation of the postprocessor is performed atomically and o ther threads accessing | |||
* the same record are blocked. To avoid deadlock, any explicit database operation must not | * the same record are blocked. To avoid deadlock, any explicit database operation must not | |||
* be performed in this function. | * be performed in this function. | |||
*/ | */ | |||
End of changes. 1 change blocks. | ||||
0 lines changed or deleted | 15 lines changed or added | |||
kcdbext.h | kcdbext.h | |||
---|---|---|---|---|
skipping to change at line 66 | skipping to change at line 66 | |||
/** The default cache bucket numer. */ | /** The default cache bucket numer. */ | |||
static const int64_t DEFCBNUM = 1048583LL; | static const int64_t DEFCBNUM = 1048583LL; | |||
/** The bucket number of temprary databases. */ | /** The bucket number of temprary databases. */ | |||
static const int64_t DBBNUM = 512LL << 10; | static const int64_t DBBNUM = 512LL << 10; | |||
/** The page size of temprary databases. */ | /** The page size of temprary databases. */ | |||
static const int32_t DBPSIZ = 32768; | static const int32_t DBPSIZ = 32768; | |||
/** The mapped size of temprary databases. */ | /** The mapped size of temprary databases. */ | |||
static const int64_t DBMSIZ = 516LL * 4096; | static const int64_t DBMSIZ = 516LL * 4096; | |||
/** The page cache capacity of temprary databases. */ | /** The page cache capacity of temprary databases. */ | |||
static const int64_t DBPCCAP = 16LL << 20; | static const int64_t DBPCCAP = 16LL << 20; | |||
/** The default number of threads in parallel mode. */ | ||||
static const size_t DEFTHNUM = 8; | ||||
public: | public: | |||
/** | /** | |||
* Value iterator for the reducer. | * Value iterator for the reducer. | |||
*/ | */ | |||
class ValueIterator { | class ValueIterator { | |||
friend class MapReduce; | friend class MapReduce; | |||
public: | public: | |||
/** | /** | |||
* Get the next value. | * Get the next value. | |||
* @param sp the pointer to the variable into which the size of the reg ion of the return | * @param sp the pointer to the variable into which the size of the reg ion of the return | |||
skipping to change at line 130 | skipping to change at line 132 | |||
/** The pointer of the current value. */ | /** The pointer of the current value. */ | |||
const char* vptr_; | const char* vptr_; | |||
/** The size of the current value. */ | /** The size of the current value. */ | |||
size_t vsiz_; | size_t vsiz_; | |||
}; | }; | |||
/** | /** | |||
* Execution options. | * Execution options. | |||
*/ | */ | |||
enum Option { | enum Option { | |||
XNOLOCK = 1 << 0, ///< avoid locking against update operations | XNOLOCK = 1 << 0, ///< avoid locking against update operations | |||
XNOCOMP = 1 << 1 ///< avoid compression of temporar | XPARAMAP = 1 << 1, ///< run mappers in parallel | |||
y databases | XPARARED = 1 << 2, ///< run reducers in parallel | |||
XNOCOMP = 1 << 8 ///< avoid compression of temporar | ||||
y databases | ||||
}; | }; | |||
/** | /** | |||
* Default constructor. | * Default constructor. | |||
*/ | */ | |||
explicit MapReduce() : | explicit MapReduce() : | |||
db_(NULL), rcomp_(NULL), tmpdbs_(NULL), dbnum_(DEFDBNUM), dbclock_(0) , | db_(NULL), rcomp_(NULL), tmpdbs_(NULL), dbnum_(DEFDBNUM), dbclock_(0) , | |||
cache_(NULL), csiz_(0), clim_(DEFCLIM), cbnum_(DEFCBNUM) { | mapthnum_(DEFTHNUM), redthnum_(DEFTHNUM), | |||
cache_(NULL), csiz_(0), clim_(DEFCLIM), cbnum_(DEFCBNUM), | ||||
redtasks_(NULL), redaborted_(false), lock_(NULL) { | ||||
_assert_(true); | _assert_(true); | |||
} | } | |||
/** | /** | |||
* Destructor. | * Destructor. | |||
*/ | */ | |||
virtual ~MapReduce() { | virtual ~MapReduce() { | |||
_assert_(true); | _assert_(true); | |||
} | } | |||
/** | /** | |||
* Map a record data. | * Map a record data. | |||
skipping to change at line 213 | skipping to change at line 219 | |||
virtual bool log(const char* name, const char* message) { | virtual bool log(const char* name, const char* message) { | |||
_assert_(name && message); | _assert_(name && message); | |||
return true; | return true; | |||
} | } | |||
/** | /** | |||
* Execute the MapReduce process about a database. | * Execute the MapReduce process about a database. | |||
* @param db the source database. | * @param db the source database. | |||
* @param tmppath the path of a directory for the temporary data storage. If it is an empty | * @param tmppath the path of a directory for the temporary data storage. If it is an empty | |||
* string, temporary data are handled on memory. | * string, temporary data are handled on memory. | |||
* @param opts the optional features by bitwise-or: MapReduce::XNOLOCK to avoid locking | * @param opts the optional features by bitwise-or: MapReduce::XNOLOCK to avoid locking | |||
* against update operations by other threads, MapReduce::XNOCOMP to avoi | * against update operations by other threads, MapReduce::XPARAMAP to run | |||
d compression of | the mapper in | |||
* temporary databases. | * parallel, MapReduce::XPARARED to run the reducer in parallel, MapReduc | |||
e::XNOCOMP to avoid | ||||
* compression of temporary databases. | ||||
* @return true on success, or false on failure. | * @return true on success, or false on failure. | |||
*/ | */ | |||
bool execute(BasicDB* db, const std::string& tmppath = "", uint32_t opts = 0) { | bool execute(BasicDB* db, const std::string& tmppath = "", uint32_t opts = 0) { | |||
int64_t count = db->count(); | int64_t count = db->count(); | |||
if (count < 0) return false; | if (count < 0) return false; | |||
bool err = false; | bool err = false; | |||
double stime, etime; | double stime, etime; | |||
db_ = db; | db_ = db; | |||
rcomp_ = LEXICALCOMP; | rcomp_ = LEXICALCOMP; | |||
BasicDB* idb = db; | BasicDB* idb = db; | |||
skipping to change at line 307 | skipping to change at line 314 | |||
if (!logf("prepare", "opening temporary databases finished: time=%.6f ", etime - stime)) | if (!logf("prepare", "opening temporary databases finished: time=%.6f ", etime - stime)) | |||
err = true; | err = true; | |||
if (err) { | if (err) { | |||
for (size_t i = 0; i < dbnum_; i++) { | for (size_t i = 0; i < dbnum_; i++) { | |||
delete tmpdbs_[i]; | delete tmpdbs_[i]; | |||
} | } | |||
delete[] tmpdbs_; | delete[] tmpdbs_; | |||
return false; | return false; | |||
} | } | |||
} | } | |||
if (opts & XPARARED) redtasks_ = new ReduceTaskQueue; | ||||
if (opts & XNOLOCK) { | if (opts & XNOLOCK) { | |||
MapChecker mapchecker; | MapChecker mapchecker; | |||
MapVisitor mapvisitor(this, &mapchecker, db->count()); | MapVisitor mapvisitor(this, &mapchecker, db->count()); | |||
mapvisitor.visit_before(); | mapvisitor.visit_before(); | |||
if (!err) { | if (!err) { | |||
BasicDB::Cursor* cur = db->cursor(); | BasicDB::Cursor* cur = db->cursor(); | |||
if (!cur->jump() && cur->error() != BasicDB::Error::NOREC) err = tr ue; | if (!cur->jump() && cur->error() != BasicDB::Error::NOREC) err = tr ue; | |||
while (!err) { | while (!err) { | |||
if (!cur->accept(&mapvisitor, false, true)) { | if (!cur->accept(&mapvisitor, false, true)) { | |||
if (cur->error() != BasicDB::Error::NOREC) err = true; | if (cur->error() != BasicDB::Error::NOREC) err = true; | |||
break; | break; | |||
} | } | |||
} | } | |||
delete cur; | delete cur; | |||
} | } | |||
if (mapvisitor.error()) err = true; | if (mapvisitor.error()) { | |||
db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed" | ||||
); | ||||
err = true; | ||||
} | ||||
mapvisitor.visit_after(); | mapvisitor.visit_after(); | |||
} else if (opts & XPARAMAP) { | ||||
MapChecker mapchecker; | ||||
MapVisitor mapvisitor(this, &mapchecker, db->count()); | ||||
lock_ = new Mutex(); | ||||
if (!err && !db->scan_parallel(&mapvisitor, mapthnum_, &mapchecker)) | ||||
{ | ||||
db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed" | ||||
); | ||||
err = true; | ||||
} | ||||
delete lock_; | ||||
lock_ = NULL; | ||||
if (mapvisitor.error()) err = true; | ||||
} else { | } else { | |||
MapChecker mapchecker; | MapChecker mapchecker; | |||
MapVisitor mapvisitor(this, &mapchecker, db->count()); | MapVisitor mapvisitor(this, &mapchecker, db->count()); | |||
if (!err && !db->iterate(&mapvisitor, false, &mapchecker)) err = true ; | if (!err && !db->iterate(&mapvisitor, false, &mapchecker)) err = true ; | |||
if (mapvisitor.error()) err = true; | if (mapvisitor.error()) { | |||
db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed" | ||||
); | ||||
err = true; | ||||
} | ||||
} | ||||
if (redtasks_) { | ||||
delete redtasks_; | ||||
redtasks_ = NULL; | ||||
} | } | |||
if (!logf("clean", "closing the temporary databases")) err = true; | if (!logf("clean", "closing the temporary databases")) err = true; | |||
stime = time(); | stime = time(); | |||
for (size_t i = 0; i < dbnum_; i++) { | for (size_t i = 0; i < dbnum_; i++) { | |||
assert(tmpdbs_[i]); | assert(tmpdbs_[i]); | |||
const std::string& path = tmpdbs_[i]->path(); | const std::string& path = tmpdbs_[i]->path(); | |||
if (!tmpdbs_[i]->clear()) { | if (!tmpdbs_[i]->clear()) { | |||
const BasicDB::Error& e = tmpdbs_[i]->error(); | const BasicDB::Error& e = tmpdbs_[i]->error(); | |||
db->set_error(_KCCODELINE_, e.code(), e.message()); | db->set_error(_KCCODELINE_, e.code(), e.message()); | |||
err = true; | err = true; | |||
skipping to change at line 368 | skipping to change at line 397 | |||
* @param cbnum the bucket number of the internal cache. | * @param cbnum the bucket number of the internal cache. | |||
*/ | */ | |||
void tune_storage(int32_t dbnum, int64_t clim, int64_t cbnum) { | void tune_storage(int32_t dbnum, int64_t clim, int64_t cbnum) { | |||
_assert_(true); | _assert_(true); | |||
dbnum_ = dbnum > 0 ? dbnum : DEFDBNUM; | dbnum_ = dbnum > 0 ? dbnum : DEFDBNUM; | |||
if (dbnum_ > MAXDBNUM) dbnum_ = MAXDBNUM; | if (dbnum_ > MAXDBNUM) dbnum_ = MAXDBNUM; | |||
clim_ = clim > 0 ? clim : DEFCLIM; | clim_ = clim > 0 ? clim : DEFCLIM; | |||
cbnum_ = cbnum > 0 ? cbnum : DEFCBNUM; | cbnum_ = cbnum > 0 ? cbnum : DEFCBNUM; | |||
if (cbnum_ > INT16MAX) cbnum_ = nearbyprime(cbnum_); | if (cbnum_ > INT16MAX) cbnum_ = nearbyprime(cbnum_); | |||
} | } | |||
/** | ||||
* Set the thread configurations. | ||||
* @param mapthnum the number of threads for the mapper. | ||||
* @param redthnum the number of threads for the reducer. | ||||
*/ | ||||
void tune_thread(int32_t mapthnum, int32_t redthnum) { | ||||
_assert_(true); | ||||
mapthnum_ = mapthnum > 0 ? mapthnum : DEFTHNUM; | ||||
redthnum_ = redthnum > 0 ? redthnum : DEFTHNUM; | ||||
} | ||||
protected: | protected: | |||
/** | /** | |||
* Emit a record from the mapper. | * Emit a record from the mapper. | |||
* @param kbuf the pointer to the key region. | * @param kbuf the pointer to the key region. | |||
* @param ksiz the size of the key region. | * @param ksiz the size of the key region. | |||
* @param vbuf the pointer to the value region. | * @param vbuf the pointer to the value region. | |||
* @param vsiz the size of the value region. | * @param vsiz the size of the value region. | |||
* @return true on success, or false on failure. | * @return true on success, or false on failure. | |||
*/ | */ | |||
bool emit(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { | bool emit(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { | |||
_assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); | _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); | |||
bool err = false; | bool err = false; | |||
size_t rsiz = sizevarnum(vsiz) + vsiz; | size_t rsiz = sizevarnum(vsiz) + vsiz; | |||
char stack[NUMBUFSIZ*4]; | char stack[NUMBUFSIZ*4]; | |||
char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack; | char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack; | |||
char* wp = rbuf; | char* wp = rbuf; | |||
wp += writevarnum(rbuf, vsiz); | wp += writevarnum(rbuf, vsiz); | |||
std::memcpy(wp, vbuf, vsiz); | std::memcpy(wp, vbuf, vsiz); | |||
cache_->append(kbuf, ksiz, rbuf, rsiz); | if (lock_) { | |||
lock_->lock(); | ||||
cache_->append(kbuf, ksiz, rbuf, rsiz); | ||||
lock_->unlock(); | ||||
} else { | ||||
cache_->append(kbuf, ksiz, rbuf, rsiz); | ||||
} | ||||
if (rbuf != stack) delete[] rbuf; | if (rbuf != stack) delete[] rbuf; | |||
csiz_ += rsiz; | csiz_ += rsiz; | |||
return !err; | return !err; | |||
} | } | |||
private: | private: | |||
/** | /** | |||
* Task queue for parallel reducer. | ||||
*/ | ||||
class ReduceTaskQueue : public TaskQueue { | ||||
public: | ||||
/** | ||||
* Task for parallel reducer. | ||||
*/ | ||||
class ReduceTask : public Task { | ||||
friend class ReduceTaskQueue; | ||||
public: | ||||
/** constructor */ | ||||
explicit ReduceTask(MapReduce* mr, const char* kbuf, size_t ksiz, con | ||||
st Values& values) : | ||||
mr_(mr), key_(kbuf, ksiz), values_(values) {} | ||||
private: | ||||
MapReduce* mr_; ///< driver | ||||
std::string key_; ///< key | ||||
Values values_; ///< values | ||||
}; | ||||
/** constructor */ | ||||
explicit ReduceTaskQueue() {} | ||||
private: | ||||
/** process a task */ | ||||
void do_task(Task* task) { | ||||
ReduceTask* rtask = (ReduceTask*)task; | ||||
ValueIterator iter(rtask->values_.begin(), rtask->values_.end()); | ||||
if (!rtask->mr_->reduce(rtask->key_.data(), rtask->key_.size(), &iter | ||||
)) | ||||
rtask->mr_->redaborted_ = true; | ||||
delete rtask; | ||||
} | ||||
}; | ||||
/** | ||||
* Checker for the map process. | * Checker for the map process. | |||
*/ | */ | |||
class MapChecker : public BasicDB::ProgressChecker { | class MapChecker : public BasicDB::ProgressChecker { | |||
public: | public: | |||
/** constructor */ | /** constructor */ | |||
explicit MapChecker() : stop_(false) {} | explicit MapChecker() : stop_(false) {} | |||
/** stop the process */ | /** stop the process */ | |||
void stop() { | void stop() { | |||
stop_ = true; | stop_ = true; | |||
} | } | |||
skipping to change at line 457 | skipping to change at line 533 | |||
if (!mr_->postprocess()) err_ = true; | if (!mr_->postprocess()) err_ = true; | |||
} | } | |||
private: | private: | |||
/** visit a record */ | /** visit a record */ | |||
const char* visit_full(const char* kbuf, size_t ksiz, | const char* visit_full(const char* kbuf, size_t ksiz, | |||
const char* vbuf, size_t vsiz, size_t* sp) { | const char* vbuf, size_t vsiz, size_t* sp) { | |||
if (!mr_->map(kbuf, ksiz, vbuf, vsiz)) { | if (!mr_->map(kbuf, ksiz, vbuf, vsiz)) { | |||
checker_->stop(); | checker_->stop(); | |||
err_ = true; | err_ = true; | |||
} | } | |||
if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { | if (mr_->lock_) { | |||
checker_->stop(); | mr_->lock_->lock(); | |||
err_ = true; | if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { | |||
checker_->stop(); | ||||
err_ = true; | ||||
} | ||||
mr_->lock_->unlock(); | ||||
} else { | ||||
if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { | ||||
checker_->stop(); | ||||
err_ = true; | ||||
} | ||||
} | } | |||
return NOP; | return NOP; | |||
} | } | |||
MapReduce* mr_; ///< driver | MapReduce* mr_; ///< driver | |||
MapChecker* checker_; ///< checker | MapChecker* checker_; ///< checker | |||
int64_t scale_; ///< number of records | int64_t scale_; ///< number of records | |||
double stime_; ///< start time | double stime_; ///< start time | |||
bool err_; ///< error flag | bool err_; ///< error flag | |||
}; | }; | |||
/** | /** | |||
skipping to change at line 541 | skipping to change at line 626 | |||
* @return true on success, or false on failure. | * @return true on success, or false on failure. | |||
*/ | */ | |||
bool execute_reduce() { | bool execute_reduce() { | |||
bool err = false; | bool err = false; | |||
int64_t scale = 0; | int64_t scale = 0; | |||
for (size_t i = 0; i < dbnum_; i++) { | for (size_t i = 0; i < dbnum_; i++) { | |||
scale += tmpdbs_[i]->count(); | scale += tmpdbs_[i]->count(); | |||
} | } | |||
if (!logf("reduce", "started the reduce process: scale=%lld", (long lon g)scale)) err = true; | if (!logf("reduce", "started the reduce process: scale=%lld", (long lon g)scale)) err = true; | |||
double stime = time(); | double stime = time(); | |||
if (redtasks_) redtasks_->start(redthnum_); | ||||
std::priority_queue<MergeLine> lines; | std::priority_queue<MergeLine> lines; | |||
for (size_t i = 0; i < dbnum_; i++) { | for (size_t i = 0; i < dbnum_; i++) { | |||
MergeLine line; | MergeLine line; | |||
line.cur = tmpdbs_[i]->cursor(); | line.cur = tmpdbs_[i]->cursor(); | |||
line.rcomp = rcomp_; | line.rcomp = rcomp_; | |||
line.cur->jump(); | line.cur->jump(); | |||
line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true); | line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true); | |||
if (line.kbuf) { | if (line.kbuf) { | |||
lines.push(line); | lines.push(line); | |||
} else { | } else { | |||
delete line.cur; | delete line.cur; | |||
} | } | |||
} | } | |||
char* lkbuf = NULL; | char* lkbuf = NULL; | |||
size_t lksiz = 0; | size_t lksiz = 0; | |||
Values values; | Values values; | |||
while (!err && !lines.empty()) { | while (!err && !lines.empty()) { | |||
MergeLine line = lines.top(); | MergeLine line = lines.top(); | |||
lines.pop(); | lines.pop(); | |||
if (lkbuf && (lksiz != line.ksiz || std::memcmp(lkbuf, line.kbuf, lks iz))) { | if (lkbuf && (lksiz != line.ksiz || std::memcmp(lkbuf, line.kbuf, lks iz))) { | |||
if (!call_reducer(lkbuf, lksiz, values)) err = true; | if (!call_reducer(lkbuf, lksiz, values)) { | |||
db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "reducer fail | ||||
ed"); | ||||
err = true; | ||||
} | ||||
values.clear(); | values.clear(); | |||
} | } | |||
values.push_back(std::string(line.vbuf, line.vsiz)); | ||||
delete[] lkbuf; | delete[] lkbuf; | |||
lkbuf = line.kbuf; | lkbuf = line.kbuf; | |||
lksiz = line.ksiz; | lksiz = line.ksiz; | |||
values.push_back(std::string(line.vbuf, line.vsiz)); | ||||
line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true); | line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true); | |||
if (line.kbuf) { | if (line.kbuf) { | |||
lines.push(line); | lines.push(line); | |||
} else { | } else { | |||
delete line.cur; | delete line.cur; | |||
} | } | |||
} | } | |||
if (lkbuf) { | if (lkbuf) { | |||
if (!err && !call_reducer(lkbuf, lksiz, values)) err = true; | if (!err && !call_reducer(lkbuf, lksiz, values)) { | |||
values.clear(); | db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "reducer failed | |||
"); | ||||
err = true; | ||||
} | ||||
delete[] lkbuf; | delete[] lkbuf; | |||
} | } | |||
while (!lines.empty()) { | while (!lines.empty()) { | |||
MergeLine line = lines.top(); | MergeLine line = lines.top(); | |||
lines.pop(); | lines.pop(); | |||
delete[] line.kbuf; | delete[] line.kbuf; | |||
delete line.cur; | delete line.cur; | |||
} | } | |||
if (redtasks_) redtasks_->finish(); | ||||
double etime = time(); | double etime = time(); | |||
if (!logf("reduce", "the reduce process finished: time=%.6f", etime - s time)) err = true; | if (!logf("reduce", "the reduce process finished: time=%.6f", etime - s time)) err = true; | |||
return !err; | return !err; | |||
} | } | |||
/** | /** | |||
* Call the reducer. | * Call the reducer. | |||
* @param kbuf the pointer to the key region. | * @param kbuf the pointer to the key region. | |||
* @param ksiz the size of the key region. | * @param ksiz the size of the key region. | |||
* @param values a vector of the values. | * @param values a vector of the values. | |||
* @return true on success, or false on failure. | * @return true on success, or false on failure. | |||
*/ | */ | |||
bool call_reducer(const char* kbuf, size_t ksiz, const Values& values) { | bool call_reducer(const char* kbuf, size_t ksiz, const Values& values) { | |||
_assert_(kbuf && ksiz <= MEMMAXSIZ); | _assert_(kbuf && ksiz <= MEMMAXSIZ); | |||
if (redtasks_) { | ||||
if (redaborted_) return false; | ||||
ReduceTaskQueue::ReduceTask* task = | ||||
new ReduceTaskQueue::ReduceTask(this, kbuf, ksiz, values); | ||||
redtasks_->add_task(task); | ||||
return true; | ||||
} | ||||
bool err = false; | bool err = false; | |||
ValueIterator iter(values.begin(), values.end()); | ValueIterator iter(values.begin(), values.end()); | |||
if (!reduce(kbuf, ksiz, &iter)) err = true; | if (!reduce(kbuf, ksiz, &iter)) err = true; | |||
return !err; | return !err; | |||
} | } | |||
/** Dummy constructor to forbid the use. */ | /** Dummy constructor to forbid the use. */ | |||
MapReduce(const MapReduce&); | MapReduce(const MapReduce&); | |||
/** Dummy Operator to forbid the use. */ | /** Dummy Operator to forbid the use. */ | |||
MapReduce& operator =(const MapReduce&); | MapReduce& operator =(const MapReduce&); | |||
/** The internal database. */ | /** The internal database. */ | |||
BasicDB* db_; | BasicDB* db_; | |||
/** The record comparator. */ | /** The record comparator. */ | |||
Comparator* rcomp_; | Comparator* rcomp_; | |||
/** The temporary databases. */ | /** The temporary databases. */ | |||
BasicDB** tmpdbs_; | BasicDB** tmpdbs_; | |||
/** The number of temporary databases. */ | /** The number of temporary databases. */ | |||
size_t dbnum_; | size_t dbnum_; | |||
/** The logical clock for temporary databases. */ | /** The logical clock for temporary databases. */ | |||
int64_t dbclock_; | int64_t dbclock_; | |||
/** The number of the mapper threads. */ | ||||
size_t mapthnum_; | ||||
/** The number of the reducer threads. */ | ||||
size_t redthnum_; | ||||
/** The cache for emitter. */ | /** The cache for emitter. */ | |||
TinyHashMap* cache_; | TinyHashMap* cache_; | |||
/** The current size of the cache for emitter. */ | /** The current size of the cache for emitter. */ | |||
int64_t csiz_; | int64_t csiz_; | |||
/** The limit size of the cache for emitter. */ | /** The limit size of the cache for emitter. */ | |||
int64_t clim_; | int64_t clim_; | |||
/** The bucket number of the cache for emitter. */ | /** The bucket number of the cache for emitter. */ | |||
int64_t cbnum_; | int64_t cbnum_; | |||
/** The task queue for parallel reducer. */ | ||||
TaskQueue* redtasks_; | ||||
/** The flag whether aborted. */ | ||||
bool redaborted_; | ||||
/** The whole lock. */ | ||||
Mutex* lock_; | ||||
}; | }; | |||
/** | /** | |||
* Index database. | * Index database. | |||
* @note This class is designed to implement an indexing storage with an ef ficient appending | * @note This class is designed to implement an indexing storage with an ef ficient appending | |||
* operation for the existing record values. This class is a wrapper of th e polymorphic | * operation for the existing record values. This class is a wrapper of th e polymorphic | |||
* database, featuring buffering mechanism to alleviate IO overhead in the database layer. This | * database, featuring buffering mechanism to alleviate IO overhead in the database layer. This | |||
* class can be inherited but overwriting methods is forbidden. Before eve ry database operation, | * class can be inherited but overwriting methods is forbidden. Before eve ry database operation, | |||
* it is necessary to call the IndexDB::open method in order to open a data base file and connect | * it is necessary to call the IndexDB::open method in order to open a data base file and connect | |||
* the database object to it. To avoid data missing or corruption, it is i mportant to close | * the database object to it. To avoid data missing or corruption, it is i mportant to close | |||
End of changes. 21 change blocks. | ||||
16 lines changed or deleted | 134 lines changed or added | |||
kcdirdb.h | kcdirdb.h | |||
---|---|---|---|---|
skipping to change at line 530 | skipping to change at line 530 | |||
set_error(_KCCODELINE_, Error::NOPERM, "permission denied"); | set_error(_KCCODELINE_, Error::NOPERM, "permission denied"); | |||
return false; | return false; | |||
} | } | |||
ScopedVisitor svis(visitor); | ScopedVisitor svis(visitor); | |||
bool err = false; | bool err = false; | |||
if (!iterate_impl(visitor, checker)) err = true; | if (!iterate_impl(visitor, checker)) err = true; | |||
trigger_meta(MetaTrigger::ITERATE, "iterate"); | trigger_meta(MetaTrigger::ITERATE, "iterate"); | |||
return !err; | return !err; | |||
} | } | |||
/** | /** | |||
* Scan each record in parallel. | ||||
* @param visitor a visitor object. | ||||
* @param thnum the number of worker threads. | ||||
* @param checker a progress checker object. If it is NULL, no checking | ||||
is performed. | ||||
* @return true on success, or false on failure. | ||||
* @note This function is for reading records and not for updating ones. | ||||
The return value of | ||||
* the visitor is just ignored. To avoid deadlock, any explicit database | ||||
operation must not | ||||
* be performed in this function. | ||||
*/ | ||||
bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* check | ||||
er = NULL) { | ||||
_assert_(visitor && thnum <= MEMMAXSIZ); | ||||
ScopedRWLock lock(&mlock_, false); | ||||
if (omode_ == 0) { | ||||
set_error(_KCCODELINE_, Error::INVALID, "not opened"); | ||||
return false; | ||||
} | ||||
if (thnum < 1) thnum = 0; | ||||
if (thnum > (size_t)INT8MAX) thnum = INT8MAX; | ||||
ScopedVisitor svis(visitor); | ||||
rlock_.lock_reader_all(); | ||||
bool err = false; | ||||
if (!scan_parallel_impl(visitor, thnum, checker)) err = true; | ||||
rlock_.unlock_all(); | ||||
trigger_meta(MetaTrigger::ITERATE, "scan_parallel"); | ||||
return !err; | ||||
} | ||||
/** | ||||
* Get the last happened error. | * Get the last happened error. | |||
* @return the last happened error. | * @return the last happened error. | |||
*/ | */ | |||
Error error() const { | Error error() const { | |||
_assert_(true); | _assert_(true); | |||
return error_; | return error_; | |||
} | } | |||
/** | /** | |||
* Set the error information. | * Set the error information. | |||
* @param file the file name of the program source code. | * @param file the file name of the program source code. | |||
skipping to change at line 2030 | skipping to change at line 2057 | |||
return !err; | return !err; | |||
} | } | |||
/** | /** | |||
* Iterate to accept a visitor for each record. | * Iterate to accept a visitor for each record. | |||
* @param visitor a visitor object. | * @param visitor a visitor object. | |||
* @param checker a progress checker object. | * @param checker a progress checker object. | |||
* @return true on success, or false on failure. | * @return true on success, or false on failure. | |||
*/ | */ | |||
bool iterate_impl(Visitor* visitor, ProgressChecker* checker) { | bool iterate_impl(Visitor* visitor, ProgressChecker* checker) { | |||
_assert_(visitor); | _assert_(visitor); | |||
DirStream dir; | ||||
if (!dir.open(path_)) { | ||||
set_error(_KCCODELINE_, Error::SYSTEM, "opening a directory failed"); | ||||
return false; | ||||
} | ||||
int64_t allcnt = count_; | int64_t allcnt = count_; | |||
if (checker && !checker->check("iterate", "beginning", 0, allcnt)) { | if (checker && !checker->check("iterate", "beginning", 0, allcnt)) { | |||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |||
return false; | return false; | |||
} | } | |||
DirStream dir; | ||||
if (!dir.open(path_)) { | ||||
set_error(_KCCODELINE_, Error::SYSTEM, "opening a directory failed"); | ||||
return false; | ||||
} | ||||
bool err = false; | bool err = false; | |||
std::string name; | std::string name; | |||
int64_t curcnt = 0; | int64_t curcnt = 0; | |||
while (dir.read(&name)) { | while (dir.read(&name)) { | |||
if (*name.c_str() == *KCDDBMAGICFILE) continue; | if (*name.c_str() == *KCDDBMAGICFILE) continue; | |||
const std::string& rpath = path_ + File::PATHCHR + name; | const std::string& rpath = path_ + File::PATHCHR + name; | |||
Record rec; | Record rec; | |||
if (read_record(rpath, &rec)) { | if (read_record(rpath, &rec)) { | |||
if (!accept_visit_full(rec.kbuf, rec.ksiz, rec.vbuf, rec.vsiz, rec. rsiz, | if (!accept_visit_full(rec.kbuf, rec.ksiz, rec.vbuf, rec.vsiz, rec. rsiz, | |||
visitor, rpath, name.c_str())) err = true; | visitor, rpath, name.c_str())) err = true; | |||
skipping to change at line 2062 | skipping to change at line 2089 | |||
set_error(_KCCODELINE_, Error::BROKEN, "missing record"); | set_error(_KCCODELINE_, Error::BROKEN, "missing record"); | |||
err = true; | err = true; | |||
} | } | |||
curcnt++; | curcnt++; | |||
if (checker && !checker->check("iterate", "processing", curcnt, allcn t)) { | if (checker && !checker->check("iterate", "processing", curcnt, allcn t)) { | |||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |||
err = true; | err = true; | |||
break; | break; | |||
} | } | |||
} | } | |||
if (!dir.close()) { | ||||
set_error(_KCCODELINE_, Error::SYSTEM, "closing a directory failed"); | ||||
err = true; | ||||
} | ||||
if (checker && !checker->check("iterate", "ending", -1, allcnt)) { | if (checker && !checker->check("iterate", "ending", -1, allcnt)) { | |||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |||
err = true; | err = true; | |||
} | } | |||
return !err; | ||||
} | ||||
/** | ||||
* Scan each record in parallel. | ||||
* @param visitor a visitor object. | ||||
* @param thnum the number of worker threads. | ||||
* @param checker a progress checker object. | ||||
* @return true on success, or false on failure. | ||||
*/ | ||||
bool scan_parallel_impl(Visitor *visitor, size_t thnum, ProgressChecker* | ||||
checker) { | ||||
assert(visitor && thnum <= MEMMAXSIZ); | ||||
int64_t allcnt = count_; | ||||
if (checker && !checker->check("scan_parallel", "beginning", -1, allcnt | ||||
)) { | ||||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | ||||
return false; | ||||
} | ||||
DirStream dir; | ||||
if (!dir.open(path_)) { | ||||
set_error(_KCCODELINE_, Error::SYSTEM, "opening a directory failed"); | ||||
return false; | ||||
} | ||||
class ThreadImpl : public Thread { | ||||
public: | ||||
explicit ThreadImpl() : | ||||
db_(NULL), visitor_(NULL), checker_(NULL), allcnt_(0), | ||||
dir_(NULL), itmtx_(NULL), error_() {} | ||||
void init(DirDB* db, Visitor* visitor, ProgressChecker* checker, int6 | ||||
4_t allcnt, | ||||
DirStream* dir, Mutex* itmtx) { | ||||
db_ = db; | ||||
visitor_ = visitor; | ||||
checker_ = checker; | ||||
allcnt_ = allcnt; | ||||
dir_ = dir; | ||||
itmtx_ = itmtx; | ||||
} | ||||
const Error& error() { | ||||
return error_; | ||||
} | ||||
private: | ||||
void run() { | ||||
DirDB* db = db_; | ||||
Visitor* visitor = visitor_; | ||||
ProgressChecker* checker = checker_; | ||||
int64_t allcnt = allcnt_; | ||||
DirStream* dir = dir_; | ||||
Mutex* itmtx = itmtx_; | ||||
const std::string& path = db->path_; | ||||
while (true) { | ||||
itmtx->lock(); | ||||
std::string name; | ||||
if (!dir->read(&name)) { | ||||
itmtx->unlock(); | ||||
break; | ||||
} | ||||
itmtx->unlock(); | ||||
if (*name.c_str() == *KCDDBMAGICFILE) continue; | ||||
const std::string& rpath = path + File::PATHCHR + name; | ||||
Record rec; | ||||
if (db->read_record(rpath, &rec)) { | ||||
size_t vsiz; | ||||
visitor->visit_full(rec.kbuf, rec.ksiz, rec.vbuf, rec.vsiz, &vs | ||||
iz); | ||||
delete[] rec.rbuf; | ||||
} else { | ||||
error_ = db->error(); | ||||
break; | ||||
} | ||||
if (checker && !checker->check("scan_parallel", "processing", -1, | ||||
allcnt)) { | ||||
db->set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | ||||
error_ = db->error(); | ||||
break; | ||||
} | ||||
} | ||||
} | ||||
DirDB* db_; | ||||
Visitor* visitor_; | ||||
ProgressChecker* checker_; | ||||
int64_t allcnt_; | ||||
DirStream* dir_; | ||||
Mutex* itmtx_; | ||||
Error error_; | ||||
}; | ||||
bool err = false; | ||||
Mutex itmtx; | ||||
ThreadImpl* threads = new ThreadImpl[thnum]; | ||||
for (size_t i = 0; i < thnum; i++) { | ||||
ThreadImpl* thread = threads + i; | ||||
thread->init(this, visitor, checker, allcnt, &dir, &itmtx); | ||||
} | ||||
for (size_t i = 0; i < thnum; i++) { | ||||
ThreadImpl* thread = threads + i; | ||||
thread->start(); | ||||
} | ||||
for (size_t i = 0; i < thnum; i++) { | ||||
ThreadImpl* thread = threads + i; | ||||
thread->join(); | ||||
if (thread->error() != Error::SUCCESS) { | ||||
*error_ = thread->error(); | ||||
err = true; | ||||
} | ||||
} | ||||
delete[] threads; | ||||
if (!dir.close()) { | if (!dir.close()) { | |||
set_error(_KCCODELINE_, Error::SYSTEM, "closing a directory failed"); | set_error(_KCCODELINE_, Error::SYSTEM, "closing a directory failed"); | |||
err = true; | err = true; | |||
} | } | |||
if (checker && !checker->check("scan_parallel", "ending", -1, allcnt)) | ||||
{ | ||||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | ||||
err = true; | ||||
} | ||||
return !err; | return !err; | |||
} | } | |||
/** | /** | |||
* Synchronize updated contents with the file and the device. | * Synchronize updated contents with the file and the device. | |||
* @param hard true for physical synchronization with the device, or fals e for logical | * @param hard true for physical synchronization with the device, or fals e for logical | |||
* synchronization with the file system. | * synchronization with the file system. | |||
* @param proc a postprocessor object. | * @param proc a postprocessor object. | |||
* @param checker a progress checker object. | * @param checker a progress checker object. | |||
* @return true on success, or false on failure. | * @return true on success, or false on failure. | |||
*/ | */ | |||
End of changes. 6 change blocks. | ||||
5 lines changed or deleted | 151 lines changed or added | |||
kchashdb.h | kchashdb.h | |||
---|---|---|---|---|
skipping to change at line 723 | skipping to change at line 723 | |||
return false; | return false; | |||
} | } | |||
} | } | |||
ScopedVisitor svis(visitor); | ScopedVisitor svis(visitor); | |||
bool err = false; | bool err = false; | |||
if (!iterate_impl(visitor, checker)) err = true; | if (!iterate_impl(visitor, checker)) err = true; | |||
trigger_meta(MetaTrigger::ITERATE, "iterate"); | trigger_meta(MetaTrigger::ITERATE, "iterate"); | |||
return !err; | return !err; | |||
} | } | |||
/** | /** | |||
* Scan each record in parallel. | ||||
* @param visitor a visitor object. | ||||
* @param thnum the number of worker threads. | ||||
* @param checker a progress checker object. If it is NULL, no checking | ||||
is performed. | ||||
* @return true on success, or false on failure. | ||||
* @note This function is for reading records and not for updating ones. | ||||
The return value of | ||||
* the visitor is just ignored. To avoid deadlock, any explicit database | ||||
operation must not | ||||
* be performed in this function. | ||||
*/ | ||||
bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* check | ||||
er = NULL) { | ||||
_assert_(visitor && thnum <= MEMMAXSIZ); | ||||
ScopedRWLock lock(&mlock_, false); | ||||
if (omode_ == 0) { | ||||
set_error(_KCCODELINE_, Error::INVALID, "not opened"); | ||||
return false; | ||||
} | ||||
if (thnum < 1) thnum = 1; | ||||
if (thnum > (size_t)INT8MAX) thnum = INT8MAX; | ||||
if ((int64_t)thnum > bnum_) thnum = bnum_; | ||||
ScopedVisitor svis(visitor); | ||||
rlock_.lock_reader_all(); | ||||
bool err = false; | ||||
if (!scan_parallel_impl(visitor, thnum, checker)) err = true; | ||||
rlock_.unlock_all(); | ||||
trigger_meta(MetaTrigger::ITERATE, "scan_parallel"); | ||||
return !err; | ||||
} | ||||
/** | ||||
* Get the last happened error. | * Get the last happened error. | |||
* @return the last happened error. | * @return the last happened error. | |||
*/ | */ | |||
Error error() const { | Error error() const { | |||
_assert_(true); | _assert_(true); | |||
return error_; | return error_; | |||
} | } | |||
/** | /** | |||
* Set the error information. | * Set the error information. | |||
* @param file the file name of the program source code. | * @param file the file name of the program source code. | |||
skipping to change at line 2215 | skipping to change at line 2243 | |||
} | } | |||
} | } | |||
} | } | |||
if (checker && !checker->check("iterate", "ending", -1, allcnt)) { | if (checker && !checker->check("iterate", "ending", -1, allcnt)) { | |||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |||
return false; | return false; | |||
} | } | |||
return true; | return true; | |||
} | } | |||
/** | /** | |||
* Scan each record in parallel. | ||||
* @param visitor a visitor object. | ||||
* @param thnum the number of worker threads. | ||||
* @param checker a progress checker object. | ||||
* @return true on success, or false on failure. | ||||
*/ | ||||
bool scan_parallel_impl(Visitor *visitor, size_t thnum, ProgressChecker* | ||||
checker) { | ||||
assert(visitor && thnum <= MEMMAXSIZ); | ||||
int64_t allcnt = count_; | ||||
if (checker && !checker->check("scan_parallel", "beginning", -1, allcnt | ||||
)) { | ||||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | ||||
return false; | ||||
} | ||||
bool err = false; | ||||
std::vector<int64_t> offs; | ||||
int64_t bnum = bnum_; | ||||
size_t cap = (thnum + 1) * INT8MAX; | ||||
for (int64_t bidx = 0; bidx < bnum; bidx++) { | ||||
int64_t off = get_bucket(bidx); | ||||
if (off > 0) { | ||||
offs.push_back(off); | ||||
if (offs.size() >= cap) break; | ||||
} | ||||
} | ||||
if (!offs.empty()) { | ||||
std::sort(offs.begin(), offs.end()); | ||||
if (thnum > offs.size()) thnum = offs.size(); | ||||
class ThreadImpl : public Thread { | ||||
public: | ||||
explicit ThreadImpl() : | ||||
db_(NULL), visitor_(NULL), checker_(NULL), allcnt_(0), | ||||
begoff_(0), endoff_(0), error_() {} | ||||
void init(HashDB* db, Visitor* visitor, ProgressChecker* checker, i | ||||
nt64_t allcnt, | ||||
int64_t begoff, int64_t endoff) { | ||||
db_ = db; | ||||
visitor_ = visitor; | ||||
checker_ = checker; | ||||
allcnt_ = allcnt; | ||||
begoff_ = begoff; | ||||
endoff_ = endoff; | ||||
} | ||||
const Error& error() { | ||||
return error_; | ||||
} | ||||
private: | ||||
void run() { | ||||
HashDB* db = db_; | ||||
Visitor* visitor = visitor_; | ||||
ProgressChecker* checker = checker_; | ||||
int64_t off = begoff_; | ||||
int64_t end = endoff_; | ||||
int64_t allcnt = allcnt_; | ||||
Compressor* comp = db->comp_; | ||||
Record rec; | ||||
char rbuf[RECBUFSIZ]; | ||||
while (off > 0 && off < end) { | ||||
rec.off = off; | ||||
if (!db->read_record(&rec, rbuf)) { | ||||
error_ = db->error(); | ||||
break; | ||||
} | ||||
if (rec.psiz == UINT16MAX) { | ||||
off += rec.rsiz; | ||||
} else { | ||||
if (!rec.vbuf && !db->read_record_body(&rec)) { | ||||
delete[] rec.bbuf; | ||||
error_ = db->error(); | ||||
break; | ||||
} | ||||
const char* vbuf = rec.vbuf; | ||||
size_t vsiz = rec.vsiz; | ||||
char* zbuf = NULL; | ||||
size_t zsiz = 0; | ||||
if (comp) { | ||||
zbuf = comp->decompress(vbuf, vsiz, &zsiz); | ||||
if (!zbuf) { | ||||
db->set_error(_KCCODELINE_, Error::SYSTEM, "data decompre | ||||
ssion failed"); | ||||
delete[] rec.bbuf; | ||||
error_ = db->error(); | ||||
break; | ||||
} | ||||
vbuf = zbuf; | ||||
vsiz = zsiz; | ||||
} | ||||
visitor->visit_full(rec.kbuf, rec.ksiz, vbuf, vsiz, &vsiz); | ||||
delete[] zbuf; | ||||
delete[] rec.bbuf; | ||||
off += rec.rsiz; | ||||
if (checker && !checker->check("scan_parallel", "processing", | ||||
-1, allcnt)) { | ||||
db->set_error(_KCCODELINE_, Error::LOGIC, "checker failed") | ||||
; | ||||
error_ = db->error(); | ||||
break; | ||||
} | ||||
} | ||||
} | ||||
} | ||||
HashDB* db_; | ||||
Visitor* visitor_; | ||||
ProgressChecker* checker_; | ||||
int64_t allcnt_; | ||||
int64_t begoff_; | ||||
int64_t endoff_; | ||||
Error error_; | ||||
}; | ||||
ThreadImpl* threads = new ThreadImpl[thnum]; | ||||
double range = (double)offs.size() / thnum; | ||||
for (size_t i = 0; i < thnum; i++) { | ||||
int64_t cidx = i * range; | ||||
int64_t nidx = (i + 1) * range; | ||||
int64_t begoff = i < 1 ? roff_ : offs[cidx]; | ||||
int64_t endoff = i < thnum - 1 ? offs[nidx] : (int64_t)lsiz_; | ||||
ThreadImpl* thread = threads + i; | ||||
thread->init(this, visitor, checker, allcnt, begoff, endoff); | ||||
thread->start(); | ||||
} | ||||
for (size_t i = 0; i < thnum; i++) { | ||||
ThreadImpl* thread = threads + i; | ||||
thread->join(); | ||||
if (thread->error() != Error::SUCCESS) { | ||||
*error_ = thread->error(); | ||||
err = true; | ||||
} | ||||
} | ||||
delete[] threads; | ||||
} | ||||
if (checker && !checker->check("scan_parallel", "ending", -1, allcnt)) | ||||
{ | ||||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | ||||
err = true; | ||||
} | ||||
return !err; | ||||
} | ||||
/** | ||||
* Synchronize updated contents with the file and the device. | * Synchronize updated contents with the file and the device. | |||
* @param hard true for physical synchronization with the device, or fals e for logical | * @param hard true for physical synchronization with the device, or fals e for logical | |||
* synchronization with the file system. | * synchronization with the file system. | |||
* @param proc a postprocessor object. | * @param proc a postprocessor object. | |||
* @param checker a progress checker object. | * @param checker a progress checker object. | |||
* @return true on success, or false on failure. | * @return true on success, or false on failure. | |||
*/ | */ | |||
bool synchronize_impl(bool hard, FileProcessor* proc, ProgressChecker* ch ecker) { | bool synchronize_impl(bool hard, FileProcessor* proc, ProgressChecker* ch ecker) { | |||
_assert_(true); | _assert_(true); | |||
bool err = false; | bool err = false; | |||
End of changes. 2 change blocks. | ||||
0 lines changed or deleted | 171 lines changed or added | |||
kclangc.h | kclangc.h | |||
---|---|---|---|---|
skipping to change at line 383 | skipping to change at line 383 | |||
KCVISITFULL fullproc, KCVISITEMPTY emptyproc, | KCVISITFULL fullproc, KCVISITEMPTY emptyproc, | |||
void* opq, int32_t writable); | void* opq, int32_t writable); | |||
/** | /** | |||
* Iterate to accept a visitor for each record. | * Iterate to accept a visitor for each record. | |||
* @param db a database object. | * @param db a database object. | |||
* @param fullproc a call back function to visit a record. | * @param fullproc a call back function to visit a record. | |||
* @param opq an opaque pointer to be given to the call back function. | * @param opq an opaque pointer to be given to the call back function. | |||
* @param writable true for writable operation, or false for read-only oper ation. | * @param writable true for writable operation, or false for read-only oper ation. | |||
* @return true on success, or false on failure. | * @return true on success, or false on failure. | |||
* @note The whole iteration is performed atomically and other threads are | * @note The whole iteration is performed atomically and other threads are | |||
blocked. | blocked. To avoid | |||
* deadlock, any explicit database operation must not be performed in this | ||||
function. | ||||
*/ | */ | |||
int32_t kcdbiterate(KCDB* db, KCVISITFULL fullproc, void* opq, int32_t writ able); | int32_t kcdbiterate(KCDB* db, KCVISITFULL fullproc, void* opq, int32_t writ able); | |||
/** | /** | |||
* Scan each record in parallel. | ||||
* @param db a database object. | ||||
* @param fullproc a call back function to visit a record. | ||||
* @param opq an opaque pointer to be given to the call back function. | ||||
* @param thnum the number of worker threads. | ||||
* @return true on success, or false on failure. | ||||
* @note This function is for reading records and not for updating ones. T | ||||
he return value of | ||||
* the visitor is just ignored. To avoid deadlock, any explicit database o | ||||
peration must not | ||||
* be performed in this function. | ||||
*/ | ||||
int32_t kcdbscanpara(KCDB* db, KCVISITFULL fullproc, void* opq, size_t thnu | ||||
m); | ||||
/** | ||||
* Set the value of a record. | * Set the value of a record. | |||
* @param db a database object. | * @param db a database object. | |||
* @param kbuf the pointer to the key region. | * @param kbuf the pointer to the key region. | |||
* @param ksiz the size of the key region. | * @param ksiz the size of the key region. | |||
* @param vbuf the pointer to the value region. | * @param vbuf the pointer to the value region. | |||
* @param vsiz the size of the value region. | * @param vsiz the size of the value region. | |||
* @return true on success, or false on failure. | * @return true on success, or false on failure. | |||
* @note If no record corresponds to the key, a new record is created. If the corresponding | * @note If no record corresponds to the key, a new record is created. If the corresponding | |||
* record exists, the value is overwritten. | * record exists, the value is overwritten. | |||
*/ | */ | |||
End of changes. 2 change blocks. | ||||
2 lines changed or deleted | 20 lines changed or added | |||
kcplantdb.h | kcplantdb.h | |||
---|---|---|---|---|
skipping to change at line 1262 | skipping to change at line 1262 | |||
if (checker && !checker->check("iterate", "ending", -1, allcnt)) { | if (checker && !checker->check("iterate", "ending", -1, allcnt)) { | |||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |||
err = true; | err = true; | |||
} | } | |||
if (atran && !commit_transaction()) err = true; | if (atran && !commit_transaction()) err = true; | |||
if (autosync_ && !autotran_ && writable && !fix_auto_synchronization()) err = true; | if (autosync_ && !autotran_ && writable && !fix_auto_synchronization()) err = true; | |||
trigger_meta(MetaTrigger::ITERATE, "iterate"); | trigger_meta(MetaTrigger::ITERATE, "iterate"); | |||
return !err; | return !err; | |||
} | } | |||
/** | /** | |||
* Scan each record in parallel. | ||||
* @param visitor a visitor object. | ||||
* @param thnum the number of worker threads. | ||||
* @param checker a progress checker object. If it is NULL, no checking | ||||
is performed. | ||||
* @return true on success, or false on failure. | ||||
* @note This function is for reading records and not for updating ones. | ||||
The return value of | ||||
* the visitor is just ignored. To avoid deadlock, any explicit database | ||||
operation must not | ||||
* be performed in this function. | ||||
*/ | ||||
bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* check | ||||
er = NULL) { | ||||
_assert_(visitor && thnum <= MEMMAXSIZ); | ||||
ScopedRWLock lock(&mlock_, true); | ||||
if (omode_ == 0) { | ||||
set_error(_KCCODELINE_, Error::INVALID, "not opened"); | ||||
return false; | ||||
} | ||||
if (thnum < 1) thnum = 0; | ||||
if (thnum > (size_t)INT8MAX) thnum = INT8MAX; | ||||
bool err = false; | ||||
if (writer_) { | ||||
if (checker && !checker->check("scan_parallel", "cleaning the leaf no | ||||
de cache", -1, -1)) { | ||||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | ||||
return false; | ||||
} | ||||
if (!clean_leaf_cache()) err = true; | ||||
} | ||||
ScopedVisitor svis(visitor); | ||||
int64_t allcnt = count_; | ||||
if (checker && !checker->check("scan_parallel", "beginning", 0, allcnt) | ||||
) { | ||||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | ||||
return false; | ||||
} | ||||
class ProgressCheckerImpl : public ProgressChecker { | ||||
public: | ||||
explicit ProgressCheckerImpl() : ok_(1) {} | ||||
void stop() { | ||||
ok_.set(0); | ||||
} | ||||
private: | ||||
bool check(const char* name, const char* message, int64_t curcnt, int | ||||
64_t allcnt) { | ||||
return ok_ > 0; | ||||
} | ||||
AtomicInt64 ok_; | ||||
}; | ||||
ProgressCheckerImpl ichecker; | ||||
class VisitorImpl : public Visitor { | ||||
public: | ||||
explicit VisitorImpl(PlantDB* db, Visitor* visitor, | ||||
ProgressChecker* checker, int64_t allcnt, | ||||
ProgressCheckerImpl* ichecker) : | ||||
db_(db), visitor_(visitor), checker_(checker), allcnt_(allcnt), | ||||
ichecker_(ichecker), error_() {} | ||||
const Error& error() { | ||||
return error_; | ||||
} | ||||
private: | ||||
const char* visit_full(const char* kbuf, size_t ksiz, | ||||
const char* vbuf, size_t vsiz, size_t* sp) { | ||||
if (ksiz < 2 || ksiz >= NUMBUFSIZ || kbuf[0] != LNPREFIX) return NO | ||||
P; | ||||
uint64_t prev; | ||||
size_t step = readvarnum(vbuf, vsiz, &prev); | ||||
if (step < 1) return NOP; | ||||
vbuf += step; | ||||
vsiz -= step; | ||||
uint64_t next; | ||||
step = readvarnum(vbuf, vsiz, &next); | ||||
if (step < 1) return NOP; | ||||
vbuf += step; | ||||
vsiz -= step; | ||||
while (vsiz > 1) { | ||||
uint64_t rksiz; | ||||
step = readvarnum(vbuf, vsiz, &rksiz); | ||||
if (step < 1) break; | ||||
vbuf += step; | ||||
vsiz -= step; | ||||
uint64_t rvsiz; | ||||
step = readvarnum(vbuf, vsiz, &rvsiz); | ||||
if (step < 1) break; | ||||
vbuf += step; | ||||
vsiz -= step; | ||||
if (vsiz < rksiz + rvsiz) break; | ||||
size_t xvsiz; | ||||
visitor_->visit_full(vbuf, rksiz, vbuf + rksiz, rvsiz, &xvsiz); | ||||
vbuf += rksiz; | ||||
vsiz -= rksiz; | ||||
vbuf += rvsiz; | ||||
vsiz -= rvsiz; | ||||
if (checker_ && !checker_->check("scan_parallel", "processing", - | ||||
1, allcnt_)) { | ||||
db_->set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | ||||
error_ = db_->error(); | ||||
ichecker_->stop(); | ||||
break; | ||||
} | ||||
} | ||||
return NOP; | ||||
} | ||||
PlantDB* db_; | ||||
Visitor* visitor_; | ||||
ProgressChecker* checker_; | ||||
int64_t allcnt_; | ||||
ProgressCheckerImpl* ichecker_; | ||||
Error error_; | ||||
}; | ||||
VisitorImpl ivisitor(this, visitor, checker, allcnt, &ichecker); | ||||
if (!db_.scan_parallel(&ivisitor, thnum, &ichecker)) err = true; | ||||
if (ivisitor.error() != Error::SUCCESS) { | ||||
const Error& e = ivisitor.error(); | ||||
db_.set_error(_KCCODELINE_, e.code(), e.message()); | ||||
err = true; | ||||
} | ||||
if (checker && !checker->check("scan_parallel", "ending", -1, allcnt)) | ||||
{ | ||||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | ||||
err = true; | ||||
} | ||||
trigger_meta(MetaTrigger::ITERATE, "scan_parallel"); | ||||
return !err; | ||||
} | ||||
/** | ||||
* Get the last happened error. | * Get the last happened error. | |||
* @return the last happened error. | * @return the last happened error. | |||
*/ | */ | |||
Error error() const { | Error error() const { | |||
_assert_(true); | _assert_(true); | |||
return db_.error(); | return db_.error(); | |||
} | } | |||
/** | /** | |||
* Set the error information. | * Set the error information. | |||
* @param file the file name of the program source code. | * @param file the file name of the program source code. | |||
End of changes. 1 change blocks. | ||||
0 lines changed or deleted | 128 lines changed or added | |||
kcpolydb.h | kcpolydb.h | |||
---|---|---|---|---|
skipping to change at line 322 | skipping to change at line 322 | |||
*/ | */ | |||
bool iterate(Visitor *visitor, bool writable = true, ProgressChecker* che cker = NULL) { | bool iterate(Visitor *visitor, bool writable = true, ProgressChecker* che cker = NULL) { | |||
_assert_(visitor); | _assert_(visitor); | |||
if (type_ == TYPEVOID) { | if (type_ == TYPEVOID) { | |||
set_error(_KCCODELINE_, Error::INVALID, "not opened"); | set_error(_KCCODELINE_, Error::INVALID, "not opened"); | |||
return false; | return false; | |||
} | } | |||
return db_->iterate(visitor, writable, checker); | return db_->iterate(visitor, writable, checker); | |||
} | } | |||
/** | /** | |||
* Scan each record in parallel. | ||||
* @param visitor a visitor object. | ||||
* @param thnum the number of worker threads. | ||||
* @param checker a progress checker object. If it is NULL, no checking | ||||
is performed. | ||||
* @return true on success, or false on failure. | ||||
* @note This function is for reading records and not for updating ones. | ||||
The return value of | ||||
* the visitor is just ignored. To avoid deadlock, any explicit database | ||||
operation must not | ||||
* be performed in this function. | ||||
*/ | ||||
bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* check | ||||
er = NULL) { | ||||
_assert_(visitor && thnum <= MEMMAXSIZ); | ||||
if (type_ == TYPEVOID) { | ||||
set_error(_KCCODELINE_, Error::INVALID, "not opened"); | ||||
return false; | ||||
} | ||||
return db_->scan_parallel(visitor, thnum, checker); | ||||
} | ||||
/** | ||||
* Get the last happened error. | * Get the last happened error. | |||
* @return the last happened error. | * @return the last happened error. | |||
*/ | */ | |||
Error error() const { | Error error() const { | |||
_assert_(true); | _assert_(true); | |||
if (type_ == TYPEVOID) return error_; | if (type_ == TYPEVOID) return error_; | |||
return db_->error(); | return db_->error(); | |||
} | } | |||
/** | /** | |||
* Set the error information. | * Set the error information. | |||
End of changes. 1 change blocks. | ||||
0 lines changed or deleted | 22 lines changed or added | |||
kcprotodb.h | kcprotodb.h | |||
---|---|---|---|---|
skipping to change at line 592 | skipping to change at line 592 | |||
} | } | |||
} | } | |||
if (checker && !checker->check("iterate", "ending", -1, allcnt)) { | if (checker && !checker->check("iterate", "ending", -1, allcnt)) { | |||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |||
return false; | return false; | |||
} | } | |||
trigger_meta(MetaTrigger::ITERATE, "iterate"); | trigger_meta(MetaTrigger::ITERATE, "iterate"); | |||
return true; | return true; | |||
} | } | |||
/** | /** | |||
* Scan each record in parallel. | ||||
* @param visitor a visitor object. | ||||
* @param thnum the number of worker threads. | ||||
* @param checker a progress checker object. If it is NULL, no checking | ||||
is performed. | ||||
* @return true on success, or false on failure. | ||||
* @note This function is for reading records and not for updating ones. | ||||
The return value of | ||||
* the visitor is just ignored. To avoid deadlock, any explicit database | ||||
operation must not | ||||
* be performed in this function. | ||||
*/ | ||||
bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* check | ||||
er = NULL) { | ||||
_assert_(visitor && thnum <= MEMMAXSIZ); | ||||
ScopedRWLock lock(&mlock_, false); | ||||
if (omode_ == 0) { | ||||
set_error(_KCCODELINE_, Error::INVALID, "not opened"); | ||||
return false; | ||||
} | ||||
if (thnum < 1) thnum = 1; | ||||
if (thnum > (size_t)INT8MAX) thnum = INT8MAX; | ||||
ScopedVisitor svis(visitor); | ||||
int64_t allcnt = recs_.size(); | ||||
if (checker && !checker->check("scan_parallel", "beginning", -1, allcnt | ||||
)) { | ||||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | ||||
return false; | ||||
} | ||||
class ThreadImpl : public Thread { | ||||
public: | ||||
explicit ThreadImpl() : | ||||
db_(NULL), visitor_(NULL), checker_(NULL), allcnt_(0), | ||||
itp_(NULL), itend_(), itmtx_(NULL), error_() {} | ||||
void init(ProtoDB* db, Visitor* visitor, ProgressChecker* checker, in | ||||
t64_t allcnt, | ||||
typename STRMAP::const_iterator* itp, typename STRMAP::cons | ||||
t_iterator itend, | ||||
Mutex* itmtx) { | ||||
db_ = db; | ||||
visitor_ = visitor; | ||||
checker_ = checker; | ||||
allcnt_ = allcnt; | ||||
itp_ = itp; | ||||
itend_ = itend; | ||||
itmtx_ = itmtx; | ||||
} | ||||
const Error& error() { | ||||
return error_; | ||||
} | ||||
private: | ||||
void run() { | ||||
ProtoDB* db = db_; | ||||
Visitor* visitor = visitor_; | ||||
ProgressChecker* checker = checker_; | ||||
int64_t allcnt = allcnt_; | ||||
typename STRMAP::const_iterator* itp = itp_; | ||||
typename STRMAP::const_iterator itend = itend_; | ||||
Mutex* itmtx = itmtx_; | ||||
while (true) { | ||||
itmtx->lock(); | ||||
if (*itp == itend) { | ||||
itmtx->unlock(); | ||||
break; | ||||
} | ||||
const std::string& key = (*itp)->first; | ||||
const std::string& value = (*itp)->second; | ||||
++(*itp); | ||||
itmtx->unlock(); | ||||
size_t vsiz; | ||||
visitor->visit_full(key.data(), key.size(), value.data(), value.s | ||||
ize(), &vsiz); | ||||
if (checker && !checker->check("scan_parallel", "processing", -1, | ||||
allcnt)) { | ||||
db->set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | ||||
error_ = db->error(); | ||||
break; | ||||
} | ||||
} | ||||
} | ||||
ProtoDB* db_; | ||||
Visitor* visitor_; | ||||
ProgressChecker* checker_; | ||||
int64_t allcnt_; | ||||
typename STRMAP::const_iterator* itp_; | ||||
typename STRMAP::const_iterator itend_; | ||||
Mutex* itmtx_; | ||||
Error error_; | ||||
}; | ||||
bool err = false; | ||||
typename STRMAP::const_iterator it = recs_.begin(); | ||||
typename STRMAP::const_iterator itend = recs_.end(); | ||||
Mutex itmtx; | ||||
ThreadImpl* threads = new ThreadImpl[thnum]; | ||||
for (size_t i = 0; i < thnum; i++) { | ||||
ThreadImpl* thread = threads + i; | ||||
thread->init(this, visitor, checker, allcnt, &it, itend, &itmtx); | ||||
} | ||||
for (size_t i = 0; i < thnum; i++) { | ||||
ThreadImpl* thread = threads + i; | ||||
thread->start(); | ||||
} | ||||
for (size_t i = 0; i < thnum; i++) { | ||||
ThreadImpl* thread = threads + i; | ||||
thread->join(); | ||||
if (thread->error() != Error::SUCCESS) { | ||||
*error_ = thread->error(); | ||||
err = true; | ||||
} | ||||
} | ||||
delete[] threads; | ||||
if (err) return false; | ||||
if (checker && !checker->check("scan_parallel", "ending", -1, allcnt)) | ||||
{ | ||||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | ||||
return false; | ||||
} | ||||
trigger_meta(MetaTrigger::ITERATE, "scan_parallel"); | ||||
return true; | ||||
} | ||||
/** | ||||
* Get the last happened error. | * Get the last happened error. | |||
* @return the last happened error. | * @return the last happened error. | |||
*/ | */ | |||
Error error() const { | Error error() const { | |||
_assert_(true); | _assert_(true); | |||
return error_; | return error_; | |||
} | } | |||
/** | /** | |||
* Set the error information. | * Set the error information. | |||
* @param file the file name of the program source code. | * @param file the file name of the program source code. | |||
End of changes. 1 change blocks. | ||||
0 lines changed or deleted | 121 lines changed or added | |||
kcstashdb.h | kcstashdb.h | |||
---|---|---|---|---|
skipping to change at line 488 | skipping to change at line 488 | |||
} | } | |||
} | } | |||
if (checker && !checker->check("iterate", "ending", -1, allcnt)) { | if (checker && !checker->check("iterate", "ending", -1, allcnt)) { | |||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |||
return false; | return false; | |||
} | } | |||
trigger_meta(MetaTrigger::ITERATE, "iterate"); | trigger_meta(MetaTrigger::ITERATE, "iterate"); | |||
return true; | return true; | |||
} | } | |||
/** | /** | |||
* Scan each record in parallel. | ||||
* @param visitor a visitor object. | ||||
* @param thnum the number of worker threads. | ||||
* @param checker a progress checker object. If it is NULL, no checking | ||||
is performed. | ||||
* @return true on success, or false on failure. | ||||
* @note This function is for reading records and not for updating ones. | ||||
The return value of | ||||
* the visitor is just ignored. To avoid deadlock, any explicit database | ||||
operation must not | ||||
* be performed in this function. | ||||
*/ | ||||
bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* check | ||||
er = NULL) { | ||||
_assert_(visitor && thnum <= MEMMAXSIZ); | ||||
ScopedRWLock lock(&mlock_, false); | ||||
if (omode_ == 0) { | ||||
set_error(_KCCODELINE_, Error::INVALID, "not opened"); | ||||
return false; | ||||
} | ||||
if (thnum < 1) thnum = 1; | ||||
if (thnum > (size_t)INT8MAX) thnum = INT8MAX; | ||||
if (thnum > bnum_) thnum = bnum_; | ||||
ScopedVisitor svis(visitor); | ||||
int64_t allcnt = count_; | ||||
if (checker && !checker->check("scan_parallel", "beginning", 0, allcnt) | ||||
) { | ||||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | ||||
return false; | ||||
} | ||||
class ThreadImpl : public Thread { | ||||
public: | ||||
explicit ThreadImpl() : | ||||
db_(NULL), visitor_(NULL), checker_(NULL), allcnt_(0), | ||||
begidx_(0), endidx_(0), error_() {} | ||||
void init(StashDB* db, Visitor* visitor, ProgressChecker* checker, in | ||||
t64_t allcnt, | ||||
size_t begidx, size_t endidx) { | ||||
db_ = db; | ||||
visitor_ = visitor; | ||||
checker_ = checker; | ||||
allcnt_ = allcnt; | ||||
begidx_ = begidx; | ||||
endidx_ = endidx; | ||||
} | ||||
const Error& error() { | ||||
return error_; | ||||
} | ||||
private: | ||||
void run() { | ||||
StashDB* db = db_; | ||||
Visitor* visitor = visitor_; | ||||
ProgressChecker* checker = checker_; | ||||
int64_t allcnt = allcnt_; | ||||
size_t endidx = endidx_; | ||||
char** buckets = db->buckets_; | ||||
for (size_t i = begidx_; i < endidx; i++) { | ||||
char* rbuf = buckets[i]; | ||||
while (rbuf) { | ||||
Record rec(rbuf); | ||||
rbuf = rec.child_; | ||||
size_t vsiz; | ||||
visitor->visit_full(rec.kbuf_, rec.ksiz_, rec.vbuf_, rec.vsiz_, | ||||
&vsiz); | ||||
if (checker && !checker->check("scan_parallel", "processing", - | ||||
1, allcnt)) { | ||||
db->set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | ||||
error_ = db->error(); | ||||
break; | ||||
} | ||||
} | ||||
} | ||||
} | ||||
StashDB* db_; | ||||
Visitor* visitor_; | ||||
ProgressChecker* checker_; | ||||
int64_t allcnt_; | ||||
size_t begidx_; | ||||
size_t endidx_; | ||||
Error error_; | ||||
}; | ||||
bool err = false; | ||||
rlock_.lock_reader_all(); | ||||
ThreadImpl* threads = new ThreadImpl[thnum]; | ||||
double range = (double)bnum_ / thnum; | ||||
for (size_t i = 0; i < thnum; i++) { | ||||
size_t cidx = i * range; | ||||
size_t nidx = (i + 1) * range; | ||||
if (i < 1) cidx = 0; | ||||
if (i >= thnum - 1) nidx = bnum_; | ||||
ThreadImpl* thread = threads + i; | ||||
thread->init(this, visitor, checker, allcnt, cidx, nidx); | ||||
thread->start(); | ||||
} | ||||
for (size_t i = 0; i < thnum; i++) { | ||||
ThreadImpl* thread = threads + i; | ||||
thread->join(); | ||||
if (thread->error() != Error::SUCCESS) { | ||||
*error_ = thread->error(); | ||||
err = true; | ||||
} | ||||
} | ||||
delete[] threads; | ||||
rlock_.unlock_all(); | ||||
if (err) return false; | ||||
if (checker && !checker->check("scan_parallel", "ending", -1, allcnt)) | ||||
{ | ||||
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | ||||
return false; | ||||
} | ||||
trigger_meta(MetaTrigger::ITERATE, "scan_parallel"); | ||||
return true; | ||||
} | ||||
/** | ||||
* Get the last happened error. | * Get the last happened error. | |||
* @return the last happened error. | * @return the last happened error. | |||
*/ | */ | |||
Error error() const { | Error error() const { | |||
_assert_(true); | _assert_(true); | |||
return error_; | return error_; | |||
} | } | |||
/** | /** | |||
* Set the error information. | * Set the error information. | |||
* @param file the file name of the program source code. | * @param file the file name of the program source code. | |||
End of changes. 1 change blocks. | ||||
0 lines changed or deleted | 114 lines changed or added | |||