kccachedb.h   kccachedb.h 
skipping to change at line 141 skipping to change at line 141
rvsiz = zsiz; rvsiz = zsiz;
} }
} }
size_t vsiz; size_t vsiz;
const char* vbuf = visitor->visit_full(dbuf, rksiz, rvbuf, rvsiz, &vs iz); const char* vbuf = visitor->visit_full(dbuf, rksiz, rvbuf, rvsiz, &vs iz);
delete[] zbuf; delete[] zbuf;
if (vbuf == Visitor::REMOVE) { if (vbuf == Visitor::REMOVE) {
uint64_t hash = db_->hash_record(dbuf, rksiz) / SLOTNUM; uint64_t hash = db_->hash_record(dbuf, rksiz) / SLOTNUM;
Slot* slot = db_->slots_ + sidx_; Slot* slot = db_->slots_ + sidx_;
Repeater repeater(Visitor::REMOVE, 0); Repeater repeater(Visitor::REMOVE, 0);
db_->accept_impl(slot, hash, dbuf, rksiz, &repeater, db_->comp_, tr ue); db_->accept_impl(slot, hash, dbuf, rksiz, &repeater, db_->comp_, fa lse);
} else if (vbuf == Visitor::NOP) { } else if (vbuf == Visitor::NOP) {
if (step) step_impl(); if (step) step_impl();
} else { } else {
uint64_t hash = db_->hash_record(dbuf, rksiz) / SLOTNUM; uint64_t hash = db_->hash_record(dbuf, rksiz) / SLOTNUM;
Slot* slot = db_->slots_ + sidx_; Slot* slot = db_->slots_ + sidx_;
Repeater repeater(vbuf, vsiz); Repeater repeater(vbuf, vsiz);
db_->accept_impl(slot, hash, dbuf, rksiz, &repeater, db_->comp_, tr ue); db_->accept_impl(slot, hash, dbuf, rksiz, &repeater, db_->comp_, fa lse);
if (step) step_impl(); if (step) step_impl();
} }
return true; return true;
} }
/** /**
* Jump the cursor to the first record for forward scan. * Jump the cursor to the first record for forward scan.
* @return true on success, or false on failure. * @return true on success, or false on failure.
*/ */
bool jump() { bool jump() {
_assert_(true); _assert_(true);
skipping to change at line 377 skipping to change at line 377
FOPEN = 1 << 0, ///< dummy for compatibility FOPEN = 1 << 0, ///< dummy for compatibility
FFATAL = 1 << 1 ///< dummy for compatibility FFATAL = 1 << 1 ///< dummy for compatibility
}; };
/** /**
* Default constructor. * Default constructor.
*/ */
explicit CacheDB() : explicit CacheDB() :
mlock_(), flock_(), error_(), logger_(NULL), logkinds_(0), mtrigger_( NULL), mlock_(), flock_(), error_(), logger_(NULL), logkinds_(0), mtrigger_( NULL),
omode_(0), curs_(), path_(""), type_(TYPECACHE), omode_(0), curs_(), path_(""), type_(TYPECACHE),
opts_(0), bnum_(DEFBNUM), capcnt_(-1), capsiz_(-1), opts_(0), bnum_(DEFBNUM), capcnt_(-1), capsiz_(-1),
opaque_(), embcomp_(ZLIBRAWCOMP), comp_(NULL), slots_(), tran_(false) { opaque_(), embcomp_(ZLIBRAWCOMP), comp_(NULL), slots_(), rttmode_(tru e), tran_(false) {
_assert_(true); _assert_(true);
} }
/** /**
* Destructor. * Destructor.
* @note If the database is not closed, it is closed implicitly. * @note If the database is not closed, it is closed implicitly.
*/ */
virtual ~CacheDB() { virtual ~CacheDB() {
_assert_(true); _assert_(true);
if (omode_ != 0) close(); if (omode_ != 0) close();
if (!curs_.empty()) { if (!curs_.empty()) {
skipping to change at line 425 skipping to change at line 425
if (writable && !(omode_ & OWRITER)) { if (writable && !(omode_ & OWRITER)) {
set_error(_KCCODELINE_, Error::NOPERM, "permission denied"); set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
return false; return false;
} }
if (ksiz > KSIZMAX) ksiz = KSIZMAX; if (ksiz > KSIZMAX) ksiz = KSIZMAX;
uint64_t hash = hash_record(kbuf, ksiz); uint64_t hash = hash_record(kbuf, ksiz);
int32_t sidx = hash % SLOTNUM; int32_t sidx = hash % SLOTNUM;
hash /= SLOTNUM; hash /= SLOTNUM;
Slot* slot = slots_ + sidx; Slot* slot = slots_ + sidx;
slot->lock.lock(); slot->lock.lock();
accept_impl(slot, hash, kbuf, ksiz, visitor, comp_, false); accept_impl(slot, hash, kbuf, ksiz, visitor, comp_, rttmode_);
slot->lock.unlock(); slot->lock.unlock();
return true; return true;
} }
/** /**
* Accept a visitor to multiple records at once. * Accept a visitor to multiple records at once.
* @param keys specifies a string vector of the keys. * @param keys specifies a string vector of the keys.
* @param visitor a visitor object. * @param visitor a visitor object.
* @param writable true for writable operation, or false for read-only op eration. * @param writable true for writable operation, or false for read-only op eration.
* @return true on success, or false on failure. * @return true on success, or false on failure.
* @note The operations for specified records are performed atomically an d other threads * @note The operations for specified records are performed atomically an d other threads
skipping to change at line 483 skipping to change at line 483
std::set<int32_t>::iterator sit = sidxs.begin(); std::set<int32_t>::iterator sit = sidxs.begin();
std::set<int32_t>::iterator sitend = sidxs.end(); std::set<int32_t>::iterator sitend = sidxs.end();
while (sit != sitend) { while (sit != sitend) {
Slot* slot = slots_ + *sit; Slot* slot = slots_ + *sit;
slot->lock.lock(); slot->lock.lock();
++sit; ++sit;
} }
for (size_t i = 0; i < knum; i++) { for (size_t i = 0; i < knum; i++) {
RecordKey* rkey = rkeys + i; RecordKey* rkey = rkeys + i;
Slot* slot = slots_ + rkey->sidx; Slot* slot = slots_ + rkey->sidx;
accept_impl(slot, rkey->hash, rkey->kbuf, rkey->ksiz, visitor, comp_, false); accept_impl(slot, rkey->hash, rkey->kbuf, rkey->ksiz, visitor, comp_, rttmode_);
} }
sit = sidxs.begin(); sit = sidxs.begin();
sitend = sidxs.end(); sitend = sidxs.end();
while (sit != sitend) { while (sit != sitend) {
Slot* slot = slots_ + *sit; Slot* slot = slots_ + *sit;
slot->lock.unlock(); slot->lock.unlock();
++sit; ++sit;
} }
delete[] rkeys; delete[] rkeys;
return true; return true;
skipping to change at line 546 skipping to change at line 546
rvbuf = zbuf; rvbuf = zbuf;
rvsiz = zsiz; rvsiz = zsiz;
} }
} }
size_t vsiz; size_t vsiz;
const char* vbuf = visitor->visit_full(dbuf, rksiz, rvbuf, rvsiz, & vsiz); const char* vbuf = visitor->visit_full(dbuf, rksiz, rvbuf, rvsiz, & vsiz);
delete[] zbuf; delete[] zbuf;
if (vbuf == Visitor::REMOVE) { if (vbuf == Visitor::REMOVE) {
uint64_t hash = hash_record(dbuf, rksiz) / SLOTNUM; uint64_t hash = hash_record(dbuf, rksiz) / SLOTNUM;
Repeater repeater(Visitor::REMOVE, 0); Repeater repeater(Visitor::REMOVE, 0);
accept_impl(slot, hash, dbuf, rksiz, &repeater, comp_, true); accept_impl(slot, hash, dbuf, rksiz, &repeater, comp_, false);
} else if (vbuf != Visitor::NOP) { } else if (vbuf != Visitor::NOP) {
uint64_t hash = hash_record(dbuf, rksiz) / SLOTNUM; uint64_t hash = hash_record(dbuf, rksiz) / SLOTNUM;
Repeater repeater(vbuf, vsiz); Repeater repeater(vbuf, vsiz);
accept_impl(slot, hash, dbuf, rksiz, &repeater, comp_, true); accept_impl(slot, hash, dbuf, rksiz, &repeater, comp_, false);
} }
rec = next; rec = next;
curcnt++; curcnt++;
if (checker && !checker->check("iterate", "processing", curcnt, all cnt)) { if (checker && !checker->check("iterate", "processing", curcnt, all cnt)) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false; return false;
} }
} }
} }
if (checker && !checker->check("iterate", "ending", -1, allcnt)) { if (checker && !checker->check("iterate", "ending", -1, allcnt)) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false; return false;
} }
trigger_meta(MetaTrigger::ITERATE, "iterate"); trigger_meta(MetaTrigger::ITERATE, "iterate");
return true; return true;
} }
/** /**
* Scan each record in parallel.
* @param visitor a visitor object.
* @param thnum the number of worker threads.
* @param checker a progress checker object. If it is NULL, no checking
is performed.
* @return true on success, or false on failure.
* @note This function is for reading records and not for updating ones.
The return value of
* the visitor is just ignored. To avoid deadlock, any explicit database
operation must not
* be performed in this function.
*/
bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* check
er = NULL) {
_assert_(visitor && thnum <= MEMMAXSIZ);
ScopedRWLock lock(&mlock_, false);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
if (thnum < 1) thnum = 1;
thnum = std::pow(2, (int32_t)std::log2(thnum * std::sqrt(2)));
if (thnum > (size_t)SLOTNUM) thnum = SLOTNUM;
ScopedVisitor svis(visitor);
int64_t allcnt = count_impl();
if (checker && !checker->check("scan_parallel", "beginning", -1, allcnt
)) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false;
}
class ThreadImpl : public Thread {
public:
explicit ThreadImpl() :
db_(NULL), visitor_(NULL), checker_(NULL), allcnt_(0), slots_(),
error_() {}
void init(CacheDB* db, Visitor* visitor, ProgressChecker* checker, in
t64_t allcnt) {
db_ = db;
visitor_ = visitor;
checker_ = checker;
allcnt_ = allcnt;
}
void add_slot(Slot* slot) {
slots_.push_back(slot);
}
const Error& error() {
return error_;
}
private:
void run() {
CacheDB* db = db_;
Visitor* visitor = visitor_;
ProgressChecker* checker = checker_;
int64_t allcnt = allcnt_;
Compressor* comp = db->comp_;
std::vector<Slot*>::iterator sit = slots_.begin();
std::vector<Slot*>::iterator sitend = slots_.end();
while (sit != sitend) {
Slot* slot = *sit;
Record* rec = slot->first;
while (rec) {
Record* next = rec->next;
uint32_t rksiz = rec->ksiz & KSIZMAX;
char* dbuf = (char*)rec + sizeof(*rec);
const char* rvbuf = dbuf + rksiz;
size_t rvsiz = rec->vsiz;
char* zbuf = NULL;
size_t zsiz = 0;
if (comp) {
zbuf = comp->decompress(rvbuf, rvsiz, &zsiz);
if (zbuf) {
rvbuf = zbuf;
rvsiz = zsiz;
}
}
size_t vsiz;
visitor->visit_full(dbuf, rksiz, rvbuf, rvsiz, &vsiz);
delete[] zbuf;
rec = next;
if (checker && !checker->check("scan_parallel", "processing", -
1, allcnt)) {
db->set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
error_ = db->error();
break;
}
}
++sit;
}
}
CacheDB* db_;
Visitor* visitor_;
ProgressChecker* checker_;
int64_t allcnt_;
std::vector<Slot*> slots_;
Error error_;
};
bool err = false;
bool orttmode = rttmode_;
rttmode_ = false;
ThreadImpl* threads = new ThreadImpl[thnum];
for (int32_t i = 0; i < SLOTNUM; i++) {
ThreadImpl* thread = threads + (i % thnum);
thread->add_slot(slots_ + i);
}
for (size_t i = 0; i < thnum; i++) {
ThreadImpl* thread = threads + i;
thread->init(this, visitor, checker, allcnt);
thread->start();
}
for (size_t i = 0; i < thnum; i++) {
ThreadImpl* thread = threads + i;
thread->join();
if (thread->error() != Error::SUCCESS) {
*error_ = thread->error();
err = true;
}
}
delete[] threads;
rttmode_ = orttmode;
if (err) return false;
if (checker && !checker->check("scan_parallel", "ending", -1, allcnt))
{
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false;
}
trigger_meta(MetaTrigger::ITERATE, "scan_parallel");
return true;
}
/**
* Get the last happened error. * Get the last happened error.
* @return the last happened error. * @return the last happened error.
*/ */
Error error() const { Error error() const {
_assert_(true); _assert_(true);
return error_; return error_;
} }
/** /**
* Set the error information. * Set the error information.
* @param file the file name of the program source code. * @param file the file name of the program source code.
skipping to change at line 1043 skipping to change at line 1163
_assert_(true); _assert_(true);
ScopedRWLock lock(&mlock_, true); ScopedRWLock lock(&mlock_, true);
if (omode_ != 0) { if (omode_ != 0) {
set_error(_KCCODELINE_, Error::INVALID, "already opened"); set_error(_KCCODELINE_, Error::INVALID, "already opened");
return false; return false;
} }
capsiz_ = size; capsiz_ = size;
return true; return true;
} }
/** /**
* Switch the mode of LRU rotation.
* @param rttmode true to enable LRU rotation, false to disable LRU rotat
ion.
* @return true on success, or false on failure.
* @note This function can be called while the database is opened.
*/
bool switch_rotation(bool rttmode) {
_assert_(true);
ScopedRWLock lock(&mlock_, true);
rttmode_ = rttmode;
return true;
}
/**
* Get the opaque data. * Get the opaque data.
* @return the pointer to the opaque data region, whose size is 16 bytes. * @return the pointer to the opaque data region, whose size is 16 bytes.
*/ */
char* opaque() { char* opaque() {
_assert_(true); _assert_(true);
ScopedRWLock lock(&mlock_, false); ScopedRWLock lock(&mlock_, false);
if (omode_ == 0) { if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened"); set_error(_KCCODELINE_, Error::INVALID, "not opened");
return NULL; return NULL;
} }
skipping to change at line 1481 skipping to change at line 1613
Visitor* visitor_; ///< visitor Visitor* visitor_; ///< visitor
}; };
/** /**
* Accept a visitor to a record. * Accept a visitor to a record.
* @param slot the slot of the record. * @param slot the slot of the record.
* @param hash the hash value of the key. * @param hash the hash value of the key.
* @param kbuf the pointer to the key region. * @param kbuf the pointer to the key region.
* @param ksiz the size of the key region. * @param ksiz the size of the key region.
* @param visitor a visitor object. * @param visitor a visitor object.
* @param comp the data compressor. * @param comp the data compressor.
* @param isiter true for iterator use, or false for direct use. * @param rtt whether to move the record to the last.
*/ */
void accept_impl(Slot* slot, uint64_t hash, const char* kbuf, size_t ksiz , Visitor* visitor, void accept_impl(Slot* slot, uint64_t hash, const char* kbuf, size_t ksiz , Visitor* visitor,
Compressor* comp, bool isiter) { Compressor* comp, bool rtt) {
_assert_(slot && kbuf && ksiz <= MEMMAXSIZ && visitor); _assert_(slot && kbuf && ksiz <= MEMMAXSIZ && visitor);
size_t bidx = hash % slot->bnum; size_t bidx = hash % slot->bnum;
Record* rec = slot->buckets[bidx]; Record* rec = slot->buckets[bidx];
Record** entp = slot->buckets + bidx; Record** entp = slot->buckets + bidx;
uint32_t fhash = fold_hash(hash) & ~KSIZMAX; uint32_t fhash = fold_hash(hash) & ~KSIZMAX;
while (rec) { while (rec) {
uint32_t rhash = rec->ksiz & ~KSIZMAX; uint32_t rhash = rec->ksiz & ~KSIZMAX;
uint32_t rksiz = rec->ksiz & KSIZMAX; uint32_t rksiz = rec->ksiz & KSIZMAX;
if (fhash > rhash) { if (fhash > rhash) {
entp = &rec->left; entp = &rec->left;
skipping to change at line 1597 skipping to change at line 1729
*entp = rec; *entp = rec;
if (rec->prev) rec->prev->next = rec; if (rec->prev) rec->prev->next = rec;
if (rec->next) rec->next->prev = rec; if (rec->next) rec->next->prev = rec;
dbuf = (char*)rec + sizeof(*rec); dbuf = (char*)rec + sizeof(*rec);
} }
} }
std::memcpy(dbuf + ksiz, vbuf, vsiz); std::memcpy(dbuf + ksiz, vbuf, vsiz);
rec->vsiz = vsiz; rec->vsiz = vsiz;
delete[] zbuf; delete[] zbuf;
} }
if (!isiter && slot->last != rec) { if (rtt && slot->last != rec) {
if (!curs_.empty()) escape_cursors(rec); if (!curs_.empty()) escape_cursors(rec);
if (slot->first == rec) slot->first = rec->next; if (slot->first == rec) slot->first = rec->next;
if (rec->prev) rec->prev->next = rec->next; if (rec->prev) rec->prev->next = rec->next;
if (rec->next) rec->next->prev = rec->prev; if (rec->next) rec->next->prev = rec->prev;
rec->prev = slot->last; rec->prev = slot->last;
rec->next = NULL; rec->next = NULL;
slot->last->next = rec; slot->last->next = rec;
slot->last = rec; slot->last = rec;
} }
if (adj) adjust_slot_capacity(slot); if (adj) adjust_slot_capacity(slot);
skipping to change at line 1770 skipping to change at line 1902
TranLogList::const_iterator itbeg = logs.begin(); TranLogList::const_iterator itbeg = logs.begin();
while (it != itbeg) { while (it != itbeg) {
--it; --it;
const char* kbuf = it->key.c_str(); const char* kbuf = it->key.c_str();
size_t ksiz = it->key.size(); size_t ksiz = it->key.size();
const char* vbuf = it->value.c_str(); const char* vbuf = it->value.c_str();
size_t vsiz = it->value.size(); size_t vsiz = it->value.size();
uint64_t hash = hash_record(kbuf, ksiz) / SLOTNUM; uint64_t hash = hash_record(kbuf, ksiz) / SLOTNUM;
if (it->full) { if (it->full) {
Setter setter(vbuf, vsiz); Setter setter(vbuf, vsiz);
accept_impl(slot, hash, kbuf, ksiz, &setter, NULL, true); accept_impl(slot, hash, kbuf, ksiz, &setter, NULL, false);
} else { } else {
Remover remover; Remover remover;
accept_impl(slot, hash, kbuf, ksiz, &remover, NULL, true); accept_impl(slot, hash, kbuf, ksiz, &remover, NULL, false);
} }
} }
} }
/** /**
* Addjust a slot table to the capacity. * Addjust a slot table to the capacity.
* @param slot the slot table. * @param slot the slot table.
*/ */
void adjust_slot_capacity(Slot* slot) { void adjust_slot_capacity(Slot* slot) {
_assert_(slot); _assert_(slot);
if ((slot->count > slot->capcnt || slot->size > slot->capsiz) && slot-> first) { if ((slot->count > slot->capcnt || slot->size > slot->capsiz) && slot-> first) {
Record* rec = slot->first; Record* rec = slot->first;
uint32_t rksiz = rec->ksiz & KSIZMAX; uint32_t rksiz = rec->ksiz & KSIZMAX;
char* dbuf = (char*)rec + sizeof(*rec); char* dbuf = (char*)rec + sizeof(*rec);
char stack[RECBUFSIZ]; char stack[RECBUFSIZ];
char* kbuf = rksiz > sizeof(stack) ? new char[rksiz] : stack; char* kbuf = rksiz > sizeof(stack) ? new char[rksiz] : stack;
std::memcpy(kbuf, dbuf, rksiz); std::memcpy(kbuf, dbuf, rksiz);
uint64_t hash = hash_record(kbuf, rksiz) / SLOTNUM; uint64_t hash = hash_record(kbuf, rksiz) / SLOTNUM;
Remover remover; Remover remover;
accept_impl(slot, hash, dbuf, rksiz, &remover, NULL, true); accept_impl(slot, hash, dbuf, rksiz, &remover, NULL, false);
if (kbuf != stack) delete[] kbuf; if (kbuf != stack) delete[] kbuf;
} }
} }
/** /**
* Get the hash value of a record. * Get the hash value of a record.
* @param kbuf the pointer to the key region. * @param kbuf the pointer to the key region.
* @param ksiz the size of the key region. * @param ksiz the size of the key region.
* @return the hash value. * @return the hash value.
*/ */
uint64_t hash_record(const char* kbuf, size_t ksiz) { uint64_t hash_record(const char* kbuf, size_t ksiz) {
skipping to change at line 1918 skipping to change at line 2050
/** The capacity of memory usage. */ /** The capacity of memory usage. */
int64_t capsiz_; int64_t capsiz_;
/** The opaque data. */ /** The opaque data. */
char opaque_[OPAQUESIZ]; char opaque_[OPAQUESIZ];
/** The embedded data compressor. */ /** The embedded data compressor. */
Compressor* embcomp_; Compressor* embcomp_;
/** The data compressor. */ /** The data compressor. */
Compressor* comp_; Compressor* comp_;
/** The slot tables. */ /** The slot tables. */
Slot slots_[SLOTNUM]; Slot slots_[SLOTNUM];
/** The flag whether in LRU rotation. */
bool rttmode_;
/** The flag whether in transaction. */ /** The flag whether in transaction. */
bool tran_; bool tran_;
}; };
/** An alias of the cache tree database. */ /** An alias of the cache tree database. */
typedef PlantDB<CacheDB, BasicDB::TYPEGRASS> GrassDB; typedef PlantDB<CacheDB, BasicDB::TYPEGRASS> GrassDB;
} // common namespace } // common namespace
#endif // duplication check #endif // duplication check
 End of changes. 16 change blocks. 
13 lines changed or deleted 157 lines changed or added


 kccommon.h   kccommon.h 
skipping to change at line 81 skipping to change at line 81
inline long double modfl(long double val, long double* iptr) { inline long double modfl(long double val, long double* iptr) {
double integ; double integ;
double fract = std::modf(val, &integ); double fract = std::modf(val, &integ);
*iptr = integ; *iptr = integ;
return fract; return fract;
} }
#endif #endif
namespace std { namespace std {
using ::modfl; using ::modfl;
using ::log2;
using ::snprintf; using ::snprintf;
} }
#if __cplusplus > 199711L || defined(__GXX_EXPERIMENTAL_CXX0X__) || defined (_MSC_VER) #if __cplusplus > 199711L || defined(__GXX_EXPERIMENTAL_CXX0X__) || defined (_MSC_VER)
#include <unordered_map> #include <unordered_map>
#include <unordered_set> #include <unordered_set>
#else #else
 End of changes. 1 change blocks. 
0 lines changed or deleted 1 lines changed or added


 kcdb.h   kcdb.h 
skipping to change at line 1133 skipping to change at line 1133
* @param visitor a visitor object. * @param visitor a visitor object.
* @param writable true for writable operation, or false for read-only op eration. * @param writable true for writable operation, or false for read-only op eration.
* @param checker a progress checker object. If it is NULL, no checking is performed. * @param checker a progress checker object. If it is NULL, no checking is performed.
* @return true on success, or false on failure. * @return true on success, or false on failure.
* @note The whole iteration is performed atomically and other threads ar e blocked. To avoid * @note The whole iteration is performed atomically and other threads ar e blocked. To avoid
* deadlock, any explicit database operation must not be performed in thi s function. * deadlock, any explicit database operation must not be performed in thi s function.
*/ */
virtual bool iterate(Visitor *visitor, bool writable = true, virtual bool iterate(Visitor *visitor, bool writable = true,
ProgressChecker* checker = NULL) = 0; ProgressChecker* checker = NULL) = 0;
/** /**
* Scan each record in parallel.
* @param visitor a visitor object.
* @param thnum the number of worker threads.
* @param checker a progress checker object. If it is NULL, no checking
is performed.
* @return true on success, or false on failure.
* @note This function is for reading records and not for updating ones.
The return value of
* the visitor is just ignored. To avoid deadlock, any explicit database
operation must not
* be performed in this function.
*/
virtual bool scan_parallel(Visitor *visitor, size_t thnum,
ProgressChecker* checker = NULL) = 0;
/**
* Synchronize updated contents with the file and the device. * Synchronize updated contents with the file and the device.
* @param hard true for physical synchronization with the device, or fals e for logical * @param hard true for physical synchronization with the device, or fals e for logical
* synchronization with the file system. * synchronization with the file system.
* @param proc a postprocessor object. If it is NULL, no postprocessing is performed. * @param proc a postprocessor object. If it is NULL, no postprocessing is performed.
* @param checker a progress checker object. If it is NULL, no checking is performed. * @param checker a progress checker object. If it is NULL, no checking is performed.
* @return true on success, or false on failure. * @return true on success, or false on failure.
* @note The operation of the postprocessor is performed atomically and o ther threads accessing * @note The operation of the postprocessor is performed atomically and o ther threads accessing
* the same record are blocked. To avoid deadlock, any explicit database operation must not * the same record are blocked. To avoid deadlock, any explicit database operation must not
* be performed in this function. * be performed in this function.
*/ */
 End of changes. 1 change blocks. 
0 lines changed or deleted 15 lines changed or added


 kcdbext.h   kcdbext.h 
skipping to change at line 66 skipping to change at line 66
/** The default cache bucket numer. */ /** The default cache bucket numer. */
static const int64_t DEFCBNUM = 1048583LL; static const int64_t DEFCBNUM = 1048583LL;
/** The bucket number of temprary databases. */ /** The bucket number of temprary databases. */
static const int64_t DBBNUM = 512LL << 10; static const int64_t DBBNUM = 512LL << 10;
/** The page size of temprary databases. */ /** The page size of temprary databases. */
static const int32_t DBPSIZ = 32768; static const int32_t DBPSIZ = 32768;
/** The mapped size of temprary databases. */ /** The mapped size of temprary databases. */
static const int64_t DBMSIZ = 516LL * 4096; static const int64_t DBMSIZ = 516LL * 4096;
/** The page cache capacity of temprary databases. */ /** The page cache capacity of temprary databases. */
static const int64_t DBPCCAP = 16LL << 20; static const int64_t DBPCCAP = 16LL << 20;
/** The default number of threads in parallel mode. */
static const size_t DEFTHNUM = 8;
public: public:
/** /**
* Value iterator for the reducer. * Value iterator for the reducer.
*/ */
class ValueIterator { class ValueIterator {
friend class MapReduce; friend class MapReduce;
public: public:
/** /**
* Get the next value. * Get the next value.
* @param sp the pointer to the variable into which the size of the reg ion of the return * @param sp the pointer to the variable into which the size of the reg ion of the return
skipping to change at line 130 skipping to change at line 132
/** The pointer of the current value. */ /** The pointer of the current value. */
const char* vptr_; const char* vptr_;
/** The size of the current value. */ /** The size of the current value. */
size_t vsiz_; size_t vsiz_;
}; };
/** /**
* Execution options. * Execution options.
*/ */
enum Option { enum Option {
XNOLOCK = 1 << 0, ///< avoid locking against update operations XNOLOCK = 1 << 0, ///< avoid locking against update operations
XNOCOMP = 1 << 1 ///< avoid compression of temporar XPARAMAP = 1 << 1, ///< run mappers in parallel
y databases XPARARED = 1 << 2, ///< run reducers in parallel
XNOCOMP = 1 << 8 ///< avoid compression of temporar
y databases
}; };
/** /**
* Default constructor. * Default constructor.
*/ */
explicit MapReduce() : explicit MapReduce() :
db_(NULL), rcomp_(NULL), tmpdbs_(NULL), dbnum_(DEFDBNUM), dbclock_(0) , db_(NULL), rcomp_(NULL), tmpdbs_(NULL), dbnum_(DEFDBNUM), dbclock_(0) ,
cache_(NULL), csiz_(0), clim_(DEFCLIM), cbnum_(DEFCBNUM) { mapthnum_(DEFTHNUM), redthnum_(DEFTHNUM),
cache_(NULL), csiz_(0), clim_(DEFCLIM), cbnum_(DEFCBNUM),
redtasks_(NULL), redaborted_(false), lock_(NULL) {
_assert_(true); _assert_(true);
} }
/** /**
* Destructor. * Destructor.
*/ */
virtual ~MapReduce() { virtual ~MapReduce() {
_assert_(true); _assert_(true);
} }
/** /**
* Map a record data. * Map a record data.
skipping to change at line 213 skipping to change at line 219
virtual bool log(const char* name, const char* message) { virtual bool log(const char* name, const char* message) {
_assert_(name && message); _assert_(name && message);
return true; return true;
} }
/** /**
* Execute the MapReduce process about a database. * Execute the MapReduce process about a database.
* @param db the source database. * @param db the source database.
* @param tmppath the path of a directory for the temporary data storage. If it is an empty * @param tmppath the path of a directory for the temporary data storage. If it is an empty
* string, temporary data are handled on memory. * string, temporary data are handled on memory.
* @param opts the optional features by bitwise-or: MapReduce::XNOLOCK to avoid locking * @param opts the optional features by bitwise-or: MapReduce::XNOLOCK to avoid locking
* against update operations by other threads, MapReduce::XNOCOMP to avoi * against update operations by other threads, MapReduce::XPARAMAP to run
d compression of the mapper in
* temporary databases. * parallel, MapReduce::XPARARED to run the reducer in parallel, MapReduc
e::XNOCOMP to avoid
* compression of temporary databases.
* @return true on success, or false on failure. * @return true on success, or false on failure.
*/ */
bool execute(BasicDB* db, const std::string& tmppath = "", uint32_t opts = 0) { bool execute(BasicDB* db, const std::string& tmppath = "", uint32_t opts = 0) {
int64_t count = db->count(); int64_t count = db->count();
if (count < 0) return false; if (count < 0) return false;
bool err = false; bool err = false;
double stime, etime; double stime, etime;
db_ = db; db_ = db;
rcomp_ = LEXICALCOMP; rcomp_ = LEXICALCOMP;
BasicDB* idb = db; BasicDB* idb = db;
skipping to change at line 307 skipping to change at line 314
if (!logf("prepare", "opening temporary databases finished: time=%.6f ", etime - stime)) if (!logf("prepare", "opening temporary databases finished: time=%.6f ", etime - stime))
err = true; err = true;
if (err) { if (err) {
for (size_t i = 0; i < dbnum_; i++) { for (size_t i = 0; i < dbnum_; i++) {
delete tmpdbs_[i]; delete tmpdbs_[i];
} }
delete[] tmpdbs_; delete[] tmpdbs_;
return false; return false;
} }
} }
if (opts & XPARARED) redtasks_ = new ReduceTaskQueue;
if (opts & XNOLOCK) { if (opts & XNOLOCK) {
MapChecker mapchecker; MapChecker mapchecker;
MapVisitor mapvisitor(this, &mapchecker, db->count()); MapVisitor mapvisitor(this, &mapchecker, db->count());
mapvisitor.visit_before(); mapvisitor.visit_before();
if (!err) { if (!err) {
BasicDB::Cursor* cur = db->cursor(); BasicDB::Cursor* cur = db->cursor();
if (!cur->jump() && cur->error() != BasicDB::Error::NOREC) err = tr ue; if (!cur->jump() && cur->error() != BasicDB::Error::NOREC) err = tr ue;
while (!err) { while (!err) {
if (!cur->accept(&mapvisitor, false, true)) { if (!cur->accept(&mapvisitor, false, true)) {
if (cur->error() != BasicDB::Error::NOREC) err = true; if (cur->error() != BasicDB::Error::NOREC) err = true;
break; break;
} }
} }
delete cur; delete cur;
} }
if (mapvisitor.error()) err = true; if (mapvisitor.error()) {
db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed"
);
err = true;
}
mapvisitor.visit_after(); mapvisitor.visit_after();
} else if (opts & XPARAMAP) {
MapChecker mapchecker;
MapVisitor mapvisitor(this, &mapchecker, db->count());
lock_ = new Mutex();
if (!err && !db->scan_parallel(&mapvisitor, mapthnum_, &mapchecker))
{
db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed"
);
err = true;
}
delete lock_;
lock_ = NULL;
if (mapvisitor.error()) err = true;
} else { } else {
MapChecker mapchecker; MapChecker mapchecker;
MapVisitor mapvisitor(this, &mapchecker, db->count()); MapVisitor mapvisitor(this, &mapchecker, db->count());
if (!err && !db->iterate(&mapvisitor, false, &mapchecker)) err = true ; if (!err && !db->iterate(&mapvisitor, false, &mapchecker)) err = true ;
if (mapvisitor.error()) err = true; if (mapvisitor.error()) {
db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed"
);
err = true;
}
}
if (redtasks_) {
delete redtasks_;
redtasks_ = NULL;
} }
if (!logf("clean", "closing the temporary databases")) err = true; if (!logf("clean", "closing the temporary databases")) err = true;
stime = time(); stime = time();
for (size_t i = 0; i < dbnum_; i++) { for (size_t i = 0; i < dbnum_; i++) {
assert(tmpdbs_[i]); assert(tmpdbs_[i]);
const std::string& path = tmpdbs_[i]->path(); const std::string& path = tmpdbs_[i]->path();
if (!tmpdbs_[i]->clear()) { if (!tmpdbs_[i]->clear()) {
const BasicDB::Error& e = tmpdbs_[i]->error(); const BasicDB::Error& e = tmpdbs_[i]->error();
db->set_error(_KCCODELINE_, e.code(), e.message()); db->set_error(_KCCODELINE_, e.code(), e.message());
err = true; err = true;
skipping to change at line 368 skipping to change at line 397
* @param cbnum the bucket number of the internal cache. * @param cbnum the bucket number of the internal cache.
*/ */
void tune_storage(int32_t dbnum, int64_t clim, int64_t cbnum) { void tune_storage(int32_t dbnum, int64_t clim, int64_t cbnum) {
_assert_(true); _assert_(true);
dbnum_ = dbnum > 0 ? dbnum : DEFDBNUM; dbnum_ = dbnum > 0 ? dbnum : DEFDBNUM;
if (dbnum_ > MAXDBNUM) dbnum_ = MAXDBNUM; if (dbnum_ > MAXDBNUM) dbnum_ = MAXDBNUM;
clim_ = clim > 0 ? clim : DEFCLIM; clim_ = clim > 0 ? clim : DEFCLIM;
cbnum_ = cbnum > 0 ? cbnum : DEFCBNUM; cbnum_ = cbnum > 0 ? cbnum : DEFCBNUM;
if (cbnum_ > INT16MAX) cbnum_ = nearbyprime(cbnum_); if (cbnum_ > INT16MAX) cbnum_ = nearbyprime(cbnum_);
} }
/**
* Set the thread configurations.
* @param mapthnum the number of threads for the mapper.
* @param redthnum the number of threads for the reducer.
*/
void tune_thread(int32_t mapthnum, int32_t redthnum) {
_assert_(true);
mapthnum_ = mapthnum > 0 ? mapthnum : DEFTHNUM;
redthnum_ = redthnum > 0 ? redthnum : DEFTHNUM;
}
protected: protected:
/** /**
* Emit a record from the mapper. * Emit a record from the mapper.
* @param kbuf the pointer to the key region. * @param kbuf the pointer to the key region.
* @param ksiz the size of the key region. * @param ksiz the size of the key region.
* @param vbuf the pointer to the value region. * @param vbuf the pointer to the value region.
* @param vsiz the size of the value region. * @param vsiz the size of the value region.
* @return true on success, or false on failure. * @return true on success, or false on failure.
*/ */
bool emit(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { bool emit(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) {
_assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ);
bool err = false; bool err = false;
size_t rsiz = sizevarnum(vsiz) + vsiz; size_t rsiz = sizevarnum(vsiz) + vsiz;
char stack[NUMBUFSIZ*4]; char stack[NUMBUFSIZ*4];
char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack; char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack;
char* wp = rbuf; char* wp = rbuf;
wp += writevarnum(rbuf, vsiz); wp += writevarnum(rbuf, vsiz);
std::memcpy(wp, vbuf, vsiz); std::memcpy(wp, vbuf, vsiz);
cache_->append(kbuf, ksiz, rbuf, rsiz); if (lock_) {
lock_->lock();
cache_->append(kbuf, ksiz, rbuf, rsiz);
lock_->unlock();
} else {
cache_->append(kbuf, ksiz, rbuf, rsiz);
}
if (rbuf != stack) delete[] rbuf; if (rbuf != stack) delete[] rbuf;
csiz_ += rsiz; csiz_ += rsiz;
return !err; return !err;
} }
private: private:
/** /**
* Task queue for parallel reducer.
*/
class ReduceTaskQueue : public TaskQueue {
public:
/**
* Task for parallel reducer.
*/
class ReduceTask : public Task {
friend class ReduceTaskQueue;
public:
/** constructor */
explicit ReduceTask(MapReduce* mr, const char* kbuf, size_t ksiz, con
st Values& values) :
mr_(mr), key_(kbuf, ksiz), values_(values) {}
private:
MapReduce* mr_; ///< driver
std::string key_; ///< key
Values values_; ///< values
};
/** constructor */
explicit ReduceTaskQueue() {}
private:
/** process a task */
void do_task(Task* task) {
ReduceTask* rtask = (ReduceTask*)task;
ValueIterator iter(rtask->values_.begin(), rtask->values_.end());
if (!rtask->mr_->reduce(rtask->key_.data(), rtask->key_.size(), &iter
))
rtask->mr_->redaborted_ = true;
delete rtask;
}
};
/**
* Checker for the map process. * Checker for the map process.
*/ */
class MapChecker : public BasicDB::ProgressChecker { class MapChecker : public BasicDB::ProgressChecker {
public: public:
/** constructor */ /** constructor */
explicit MapChecker() : stop_(false) {} explicit MapChecker() : stop_(false) {}
/** stop the process */ /** stop the process */
void stop() { void stop() {
stop_ = true; stop_ = true;
} }
skipping to change at line 457 skipping to change at line 533
if (!mr_->postprocess()) err_ = true; if (!mr_->postprocess()) err_ = true;
} }
private: private:
/** visit a record */ /** visit a record */
const char* visit_full(const char* kbuf, size_t ksiz, const char* visit_full(const char* kbuf, size_t ksiz,
const char* vbuf, size_t vsiz, size_t* sp) { const char* vbuf, size_t vsiz, size_t* sp) {
if (!mr_->map(kbuf, ksiz, vbuf, vsiz)) { if (!mr_->map(kbuf, ksiz, vbuf, vsiz)) {
checker_->stop(); checker_->stop();
err_ = true; err_ = true;
} }
if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { if (mr_->lock_) {
checker_->stop(); mr_->lock_->lock();
err_ = true; if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) {
checker_->stop();
err_ = true;
}
mr_->lock_->unlock();
} else {
if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) {
checker_->stop();
err_ = true;
}
} }
return NOP; return NOP;
} }
MapReduce* mr_; ///< driver MapReduce* mr_; ///< driver
MapChecker* checker_; ///< checker MapChecker* checker_; ///< checker
int64_t scale_; ///< number of records int64_t scale_; ///< number of records
double stime_; ///< start time double stime_; ///< start time
bool err_; ///< error flag bool err_; ///< error flag
}; };
/** /**
skipping to change at line 541 skipping to change at line 626
* @return true on success, or false on failure. * @return true on success, or false on failure.
*/ */
bool execute_reduce() { bool execute_reduce() {
bool err = false; bool err = false;
int64_t scale = 0; int64_t scale = 0;
for (size_t i = 0; i < dbnum_; i++) { for (size_t i = 0; i < dbnum_; i++) {
scale += tmpdbs_[i]->count(); scale += tmpdbs_[i]->count();
} }
if (!logf("reduce", "started the reduce process: scale=%lld", (long lon g)scale)) err = true; if (!logf("reduce", "started the reduce process: scale=%lld", (long lon g)scale)) err = true;
double stime = time(); double stime = time();
if (redtasks_) redtasks_->start(redthnum_);
std::priority_queue<MergeLine> lines; std::priority_queue<MergeLine> lines;
for (size_t i = 0; i < dbnum_; i++) { for (size_t i = 0; i < dbnum_; i++) {
MergeLine line; MergeLine line;
line.cur = tmpdbs_[i]->cursor(); line.cur = tmpdbs_[i]->cursor();
line.rcomp = rcomp_; line.rcomp = rcomp_;
line.cur->jump(); line.cur->jump();
line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true); line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true);
if (line.kbuf) { if (line.kbuf) {
lines.push(line); lines.push(line);
} else { } else {
delete line.cur; delete line.cur;
} }
} }
char* lkbuf = NULL; char* lkbuf = NULL;
size_t lksiz = 0; size_t lksiz = 0;
Values values; Values values;
while (!err && !lines.empty()) { while (!err && !lines.empty()) {
MergeLine line = lines.top(); MergeLine line = lines.top();
lines.pop(); lines.pop();
if (lkbuf && (lksiz != line.ksiz || std::memcmp(lkbuf, line.kbuf, lks iz))) { if (lkbuf && (lksiz != line.ksiz || std::memcmp(lkbuf, line.kbuf, lks iz))) {
if (!call_reducer(lkbuf, lksiz, values)) err = true; if (!call_reducer(lkbuf, lksiz, values)) {
db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "reducer fail
ed");
err = true;
}
values.clear(); values.clear();
} }
values.push_back(std::string(line.vbuf, line.vsiz));
delete[] lkbuf; delete[] lkbuf;
lkbuf = line.kbuf; lkbuf = line.kbuf;
lksiz = line.ksiz; lksiz = line.ksiz;
values.push_back(std::string(line.vbuf, line.vsiz));
line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true); line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true);
if (line.kbuf) { if (line.kbuf) {
lines.push(line); lines.push(line);
} else { } else {
delete line.cur; delete line.cur;
} }
} }
if (lkbuf) { if (lkbuf) {
if (!err && !call_reducer(lkbuf, lksiz, values)) err = true; if (!err && !call_reducer(lkbuf, lksiz, values)) {
values.clear(); db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "reducer failed
");
err = true;
}
delete[] lkbuf; delete[] lkbuf;
} }
while (!lines.empty()) { while (!lines.empty()) {
MergeLine line = lines.top(); MergeLine line = lines.top();
lines.pop(); lines.pop();
delete[] line.kbuf; delete[] line.kbuf;
delete line.cur; delete line.cur;
} }
if (redtasks_) redtasks_->finish();
double etime = time(); double etime = time();
if (!logf("reduce", "the reduce process finished: time=%.6f", etime - s time)) err = true; if (!logf("reduce", "the reduce process finished: time=%.6f", etime - s time)) err = true;
return !err; return !err;
} }
/** /**
* Call the reducer. * Call the reducer.
* @param kbuf the pointer to the key region. * @param kbuf the pointer to the key region.
* @param ksiz the size of the key region. * @param ksiz the size of the key region.
* @param values a vector of the values. * @param values a vector of the values.
* @return true on success, or false on failure. * @return true on success, or false on failure.
*/ */
bool call_reducer(const char* kbuf, size_t ksiz, const Values& values) { bool call_reducer(const char* kbuf, size_t ksiz, const Values& values) {
_assert_(kbuf && ksiz <= MEMMAXSIZ); _assert_(kbuf && ksiz <= MEMMAXSIZ);
if (redtasks_) {
if (redaborted_) return false;
ReduceTaskQueue::ReduceTask* task =
new ReduceTaskQueue::ReduceTask(this, kbuf, ksiz, values);
redtasks_->add_task(task);
return true;
}
bool err = false; bool err = false;
ValueIterator iter(values.begin(), values.end()); ValueIterator iter(values.begin(), values.end());
if (!reduce(kbuf, ksiz, &iter)) err = true; if (!reduce(kbuf, ksiz, &iter)) err = true;
return !err; return !err;
} }
/** Dummy constructor to forbid the use. */ /** Dummy constructor to forbid the use. */
MapReduce(const MapReduce&); MapReduce(const MapReduce&);
/** Dummy Operator to forbid the use. */ /** Dummy Operator to forbid the use. */
MapReduce& operator =(const MapReduce&); MapReduce& operator =(const MapReduce&);
/** The internal database. */ /** The internal database. */
BasicDB* db_; BasicDB* db_;
/** The record comparator. */ /** The record comparator. */
Comparator* rcomp_; Comparator* rcomp_;
/** The temporary databases. */ /** The temporary databases. */
BasicDB** tmpdbs_; BasicDB** tmpdbs_;
/** The number of temporary databases. */ /** The number of temporary databases. */
size_t dbnum_; size_t dbnum_;
/** The logical clock for temporary databases. */ /** The logical clock for temporary databases. */
int64_t dbclock_; int64_t dbclock_;
/** The number of the mapper threads. */
size_t mapthnum_;
/** The number of the reducer threads. */
size_t redthnum_;
/** The cache for emitter. */ /** The cache for emitter. */
TinyHashMap* cache_; TinyHashMap* cache_;
/** The current size of the cache for emitter. */ /** The current size of the cache for emitter. */
int64_t csiz_; int64_t csiz_;
/** The limit size of the cache for emitter. */ /** The limit size of the cache for emitter. */
int64_t clim_; int64_t clim_;
/** The bucket number of the cache for emitter. */ /** The bucket number of the cache for emitter. */
int64_t cbnum_; int64_t cbnum_;
/** The task queue for parallel reducer. */
TaskQueue* redtasks_;
/** The flag whether aborted. */
bool redaborted_;
/** The whole lock. */
Mutex* lock_;
}; };
/** /**
* Index database. * Index database.
* @note This class is designed to implement an indexing storage with an ef ficient appending * @note This class is designed to implement an indexing storage with an ef ficient appending
* operation for the existing record values. This class is a wrapper of th e polymorphic * operation for the existing record values. This class is a wrapper of th e polymorphic
* database, featuring buffering mechanism to alleviate IO overhead in the database layer. This * database, featuring buffering mechanism to alleviate IO overhead in the database layer. This
* class can be inherited but overwriting methods is forbidden. Before eve ry database operation, * class can be inherited but overwriting methods is forbidden. Before eve ry database operation,
* it is necessary to call the IndexDB::open method in order to open a data base file and connect * it is necessary to call the IndexDB::open method in order to open a data base file and connect
* the database object to it. To avoid data missing or corruption, it is i mportant to close * the database object to it. To avoid data missing or corruption, it is i mportant to close
 End of changes. 21 change blocks. 
16 lines changed or deleted 134 lines changed or added


 kcdirdb.h   kcdirdb.h 
skipping to change at line 530 skipping to change at line 530
set_error(_KCCODELINE_, Error::NOPERM, "permission denied"); set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
return false; return false;
} }
ScopedVisitor svis(visitor); ScopedVisitor svis(visitor);
bool err = false; bool err = false;
if (!iterate_impl(visitor, checker)) err = true; if (!iterate_impl(visitor, checker)) err = true;
trigger_meta(MetaTrigger::ITERATE, "iterate"); trigger_meta(MetaTrigger::ITERATE, "iterate");
return !err; return !err;
} }
/** /**
* Scan each record in parallel.
* @param visitor a visitor object.
* @param thnum the number of worker threads.
* @param checker a progress checker object. If it is NULL, no checking
is performed.
* @return true on success, or false on failure.
* @note This function is for reading records and not for updating ones.
The return value of
* the visitor is just ignored. To avoid deadlock, any explicit database
operation must not
* be performed in this function.
*/
bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* check
er = NULL) {
_assert_(visitor && thnum <= MEMMAXSIZ);
ScopedRWLock lock(&mlock_, false);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
if (thnum < 1) thnum = 0;
if (thnum > (size_t)INT8MAX) thnum = INT8MAX;
ScopedVisitor svis(visitor);
rlock_.lock_reader_all();
bool err = false;
if (!scan_parallel_impl(visitor, thnum, checker)) err = true;
rlock_.unlock_all();
trigger_meta(MetaTrigger::ITERATE, "scan_parallel");
return !err;
}
/**
* Get the last happened error. * Get the last happened error.
* @return the last happened error. * @return the last happened error.
*/ */
Error error() const { Error error() const {
_assert_(true); _assert_(true);
return error_; return error_;
} }
/** /**
* Set the error information. * Set the error information.
* @param file the file name of the program source code. * @param file the file name of the program source code.
skipping to change at line 2030 skipping to change at line 2057
return !err; return !err;
} }
/** /**
* Iterate to accept a visitor for each record. * Iterate to accept a visitor for each record.
* @param visitor a visitor object. * @param visitor a visitor object.
* @param checker a progress checker object. * @param checker a progress checker object.
* @return true on success, or false on failure. * @return true on success, or false on failure.
*/ */
bool iterate_impl(Visitor* visitor, ProgressChecker* checker) { bool iterate_impl(Visitor* visitor, ProgressChecker* checker) {
_assert_(visitor); _assert_(visitor);
DirStream dir;
if (!dir.open(path_)) {
set_error(_KCCODELINE_, Error::SYSTEM, "opening a directory failed");
return false;
}
int64_t allcnt = count_; int64_t allcnt = count_;
if (checker && !checker->check("iterate", "beginning", 0, allcnt)) { if (checker && !checker->check("iterate", "beginning", 0, allcnt)) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false; return false;
} }
DirStream dir;
if (!dir.open(path_)) {
set_error(_KCCODELINE_, Error::SYSTEM, "opening a directory failed");
return false;
}
bool err = false; bool err = false;
std::string name; std::string name;
int64_t curcnt = 0; int64_t curcnt = 0;
while (dir.read(&name)) { while (dir.read(&name)) {
if (*name.c_str() == *KCDDBMAGICFILE) continue; if (*name.c_str() == *KCDDBMAGICFILE) continue;
const std::string& rpath = path_ + File::PATHCHR + name; const std::string& rpath = path_ + File::PATHCHR + name;
Record rec; Record rec;
if (read_record(rpath, &rec)) { if (read_record(rpath, &rec)) {
if (!accept_visit_full(rec.kbuf, rec.ksiz, rec.vbuf, rec.vsiz, rec. rsiz, if (!accept_visit_full(rec.kbuf, rec.ksiz, rec.vbuf, rec.vsiz, rec. rsiz,
visitor, rpath, name.c_str())) err = true; visitor, rpath, name.c_str())) err = true;
skipping to change at line 2062 skipping to change at line 2089
set_error(_KCCODELINE_, Error::BROKEN, "missing record"); set_error(_KCCODELINE_, Error::BROKEN, "missing record");
err = true; err = true;
} }
curcnt++; curcnt++;
if (checker && !checker->check("iterate", "processing", curcnt, allcn t)) { if (checker && !checker->check("iterate", "processing", curcnt, allcn t)) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
err = true; err = true;
break; break;
} }
} }
if (!dir.close()) {
set_error(_KCCODELINE_, Error::SYSTEM, "closing a directory failed");
err = true;
}
if (checker && !checker->check("iterate", "ending", -1, allcnt)) { if (checker && !checker->check("iterate", "ending", -1, allcnt)) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
err = true; err = true;
} }
return !err;
}
/**
* Scan each record in parallel.
* @param visitor a visitor object.
* @param thnum the number of worker threads.
* @param checker a progress checker object.
* @return true on success, or false on failure.
*/
bool scan_parallel_impl(Visitor *visitor, size_t thnum, ProgressChecker*
checker) {
assert(visitor && thnum <= MEMMAXSIZ);
int64_t allcnt = count_;
if (checker && !checker->check("scan_parallel", "beginning", -1, allcnt
)) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false;
}
DirStream dir;
if (!dir.open(path_)) {
set_error(_KCCODELINE_, Error::SYSTEM, "opening a directory failed");
return false;
}
class ThreadImpl : public Thread {
public:
explicit ThreadImpl() :
db_(NULL), visitor_(NULL), checker_(NULL), allcnt_(0),
dir_(NULL), itmtx_(NULL), error_() {}
void init(DirDB* db, Visitor* visitor, ProgressChecker* checker, int6
4_t allcnt,
DirStream* dir, Mutex* itmtx) {
db_ = db;
visitor_ = visitor;
checker_ = checker;
allcnt_ = allcnt;
dir_ = dir;
itmtx_ = itmtx;
}
const Error& error() {
return error_;
}
private:
void run() {
DirDB* db = db_;
Visitor* visitor = visitor_;
ProgressChecker* checker = checker_;
int64_t allcnt = allcnt_;
DirStream* dir = dir_;
Mutex* itmtx = itmtx_;
const std::string& path = db->path_;
while (true) {
itmtx->lock();
std::string name;
if (!dir->read(&name)) {
itmtx->unlock();
break;
}
itmtx->unlock();
if (*name.c_str() == *KCDDBMAGICFILE) continue;
const std::string& rpath = path + File::PATHCHR + name;
Record rec;
if (db->read_record(rpath, &rec)) {
size_t vsiz;
visitor->visit_full(rec.kbuf, rec.ksiz, rec.vbuf, rec.vsiz, &vs
iz);
delete[] rec.rbuf;
} else {
error_ = db->error();
break;
}
if (checker && !checker->check("scan_parallel", "processing", -1,
allcnt)) {
db->set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
error_ = db->error();
break;
}
}
}
DirDB* db_;
Visitor* visitor_;
ProgressChecker* checker_;
int64_t allcnt_;
DirStream* dir_;
Mutex* itmtx_;
Error error_;
};
bool err = false;
Mutex itmtx;
ThreadImpl* threads = new ThreadImpl[thnum];
for (size_t i = 0; i < thnum; i++) {
ThreadImpl* thread = threads + i;
thread->init(this, visitor, checker, allcnt, &dir, &itmtx);
}
for (size_t i = 0; i < thnum; i++) {
ThreadImpl* thread = threads + i;
thread->start();
}
for (size_t i = 0; i < thnum; i++) {
ThreadImpl* thread = threads + i;
thread->join();
if (thread->error() != Error::SUCCESS) {
*error_ = thread->error();
err = true;
}
}
delete[] threads;
if (!dir.close()) { if (!dir.close()) {
set_error(_KCCODELINE_, Error::SYSTEM, "closing a directory failed"); set_error(_KCCODELINE_, Error::SYSTEM, "closing a directory failed");
err = true; err = true;
} }
if (checker && !checker->check("scan_parallel", "ending", -1, allcnt))
{
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
err = true;
}
return !err; return !err;
} }
/** /**
* Synchronize updated contents with the file and the device. * Synchronize updated contents with the file and the device.
* @param hard true for physical synchronization with the device, or fals e for logical * @param hard true for physical synchronization with the device, or fals e for logical
* synchronization with the file system. * synchronization with the file system.
* @param proc a postprocessor object. * @param proc a postprocessor object.
* @param checker a progress checker object. * @param checker a progress checker object.
* @return true on success, or false on failure. * @return true on success, or false on failure.
*/ */
 End of changes. 6 change blocks. 
5 lines changed or deleted 151 lines changed or added


 kchashdb.h   kchashdb.h 
skipping to change at line 723 skipping to change at line 723
return false; return false;
} }
} }
ScopedVisitor svis(visitor); ScopedVisitor svis(visitor);
bool err = false; bool err = false;
if (!iterate_impl(visitor, checker)) err = true; if (!iterate_impl(visitor, checker)) err = true;
trigger_meta(MetaTrigger::ITERATE, "iterate"); trigger_meta(MetaTrigger::ITERATE, "iterate");
return !err; return !err;
} }
/** /**
* Scan each record in parallel.
* @param visitor a visitor object.
* @param thnum the number of worker threads.
* @param checker a progress checker object. If it is NULL, no checking
is performed.
* @return true on success, or false on failure.
* @note This function is for reading records and not for updating ones.
The return value of
* the visitor is just ignored. To avoid deadlock, any explicit database
operation must not
* be performed in this function.
*/
bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* check
er = NULL) {
_assert_(visitor && thnum <= MEMMAXSIZ);
ScopedRWLock lock(&mlock_, false);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
if (thnum < 1) thnum = 1;
if (thnum > (size_t)INT8MAX) thnum = INT8MAX;
if ((int64_t)thnum > bnum_) thnum = bnum_;
ScopedVisitor svis(visitor);
rlock_.lock_reader_all();
bool err = false;
if (!scan_parallel_impl(visitor, thnum, checker)) err = true;
rlock_.unlock_all();
trigger_meta(MetaTrigger::ITERATE, "scan_parallel");
return !err;
}
/**
* Get the last happened error. * Get the last happened error.
* @return the last happened error. * @return the last happened error.
*/ */
Error error() const { Error error() const {
_assert_(true); _assert_(true);
return error_; return error_;
} }
/** /**
* Set the error information. * Set the error information.
* @param file the file name of the program source code. * @param file the file name of the program source code.
skipping to change at line 2215 skipping to change at line 2243
} }
} }
} }
if (checker && !checker->check("iterate", "ending", -1, allcnt)) { if (checker && !checker->check("iterate", "ending", -1, allcnt)) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false; return false;
} }
return true; return true;
} }
/** /**
* Scan each record in parallel.
* @param visitor a visitor object.
* @param thnum the number of worker threads.
* @param checker a progress checker object.
* @return true on success, or false on failure.
*/
bool scan_parallel_impl(Visitor *visitor, size_t thnum, ProgressChecker*
checker) {
assert(visitor && thnum <= MEMMAXSIZ);
int64_t allcnt = count_;
if (checker && !checker->check("scan_parallel", "beginning", -1, allcnt
)) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false;
}
bool err = false;
std::vector<int64_t> offs;
int64_t bnum = bnum_;
size_t cap = (thnum + 1) * INT8MAX;
for (int64_t bidx = 0; bidx < bnum; bidx++) {
int64_t off = get_bucket(bidx);
if (off > 0) {
offs.push_back(off);
if (offs.size() >= cap) break;
}
}
if (!offs.empty()) {
std::sort(offs.begin(), offs.end());
if (thnum > offs.size()) thnum = offs.size();
class ThreadImpl : public Thread {
public:
explicit ThreadImpl() :
db_(NULL), visitor_(NULL), checker_(NULL), allcnt_(0),
begoff_(0), endoff_(0), error_() {}
void init(HashDB* db, Visitor* visitor, ProgressChecker* checker, i
nt64_t allcnt,
int64_t begoff, int64_t endoff) {
db_ = db;
visitor_ = visitor;
checker_ = checker;
allcnt_ = allcnt;
begoff_ = begoff;
endoff_ = endoff;
}
const Error& error() {
return error_;
}
private:
void run() {
HashDB* db = db_;
Visitor* visitor = visitor_;
ProgressChecker* checker = checker_;
int64_t off = begoff_;
int64_t end = endoff_;
int64_t allcnt = allcnt_;
Compressor* comp = db->comp_;
Record rec;
char rbuf[RECBUFSIZ];
while (off > 0 && off < end) {
rec.off = off;
if (!db->read_record(&rec, rbuf)) {
error_ = db->error();
break;
}
if (rec.psiz == UINT16MAX) {
off += rec.rsiz;
} else {
if (!rec.vbuf && !db->read_record_body(&rec)) {
delete[] rec.bbuf;
error_ = db->error();
break;
}
const char* vbuf = rec.vbuf;
size_t vsiz = rec.vsiz;
char* zbuf = NULL;
size_t zsiz = 0;
if (comp) {
zbuf = comp->decompress(vbuf, vsiz, &zsiz);
if (!zbuf) {
db->set_error(_KCCODELINE_, Error::SYSTEM, "data decompre
ssion failed");
delete[] rec.bbuf;
error_ = db->error();
break;
}
vbuf = zbuf;
vsiz = zsiz;
}
visitor->visit_full(rec.kbuf, rec.ksiz, vbuf, vsiz, &vsiz);
delete[] zbuf;
delete[] rec.bbuf;
off += rec.rsiz;
if (checker && !checker->check("scan_parallel", "processing",
-1, allcnt)) {
db->set_error(_KCCODELINE_, Error::LOGIC, "checker failed")
;
error_ = db->error();
break;
}
}
}
}
HashDB* db_;
Visitor* visitor_;
ProgressChecker* checker_;
int64_t allcnt_;
int64_t begoff_;
int64_t endoff_;
Error error_;
};
ThreadImpl* threads = new ThreadImpl[thnum];
double range = (double)offs.size() / thnum;
for (size_t i = 0; i < thnum; i++) {
int64_t cidx = i * range;
int64_t nidx = (i + 1) * range;
int64_t begoff = i < 1 ? roff_ : offs[cidx];
int64_t endoff = i < thnum - 1 ? offs[nidx] : (int64_t)lsiz_;
ThreadImpl* thread = threads + i;
thread->init(this, visitor, checker, allcnt, begoff, endoff);
thread->start();
}
for (size_t i = 0; i < thnum; i++) {
ThreadImpl* thread = threads + i;
thread->join();
if (thread->error() != Error::SUCCESS) {
*error_ = thread->error();
err = true;
}
}
delete[] threads;
}
if (checker && !checker->check("scan_parallel", "ending", -1, allcnt))
{
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
err = true;
}
return !err;
}
/**
* Synchronize updated contents with the file and the device. * Synchronize updated contents with the file and the device.
* @param hard true for physical synchronization with the device, or fals e for logical * @param hard true for physical synchronization with the device, or fals e for logical
* synchronization with the file system. * synchronization with the file system.
* @param proc a postprocessor object. * @param proc a postprocessor object.
* @param checker a progress checker object. * @param checker a progress checker object.
* @return true on success, or false on failure. * @return true on success, or false on failure.
*/ */
bool synchronize_impl(bool hard, FileProcessor* proc, ProgressChecker* ch ecker) { bool synchronize_impl(bool hard, FileProcessor* proc, ProgressChecker* ch ecker) {
_assert_(true); _assert_(true);
bool err = false; bool err = false;
 End of changes. 2 change blocks. 
0 lines changed or deleted 171 lines changed or added


 kclangc.h   kclangc.h 
skipping to change at line 383 skipping to change at line 383
KCVISITFULL fullproc, KCVISITEMPTY emptyproc, KCVISITFULL fullproc, KCVISITEMPTY emptyproc,
void* opq, int32_t writable); void* opq, int32_t writable);
/** /**
* Iterate to accept a visitor for each record. * Iterate to accept a visitor for each record.
* @param db a database object. * @param db a database object.
* @param fullproc a call back function to visit a record. * @param fullproc a call back function to visit a record.
* @param opq an opaque pointer to be given to the call back function. * @param opq an opaque pointer to be given to the call back function.
* @param writable true for writable operation, or false for read-only oper ation. * @param writable true for writable operation, or false for read-only oper ation.
* @return true on success, or false on failure. * @return true on success, or false on failure.
* @note The whole iteration is performed atomically and other threads are * @note The whole iteration is performed atomically and other threads are
blocked. blocked. To avoid
* deadlock, any explicit database operation must not be performed in this
function.
*/ */
int32_t kcdbiterate(KCDB* db, KCVISITFULL fullproc, void* opq, int32_t writ able); int32_t kcdbiterate(KCDB* db, KCVISITFULL fullproc, void* opq, int32_t writ able);
/** /**
* Scan each record in parallel.
* @param db a database object.
* @param fullproc a call back function to visit a record.
* @param opq an opaque pointer to be given to the call back function.
* @param thnum the number of worker threads.
* @return true on success, or false on failure.
* @note This function is for reading records and not for updating ones. T
he return value of
* the visitor is just ignored. To avoid deadlock, any explicit database o
peration must not
* be performed in this function.
*/
int32_t kcdbscanpara(KCDB* db, KCVISITFULL fullproc, void* opq, size_t thnu
m);
/**
* Set the value of a record. * Set the value of a record.
* @param db a database object. * @param db a database object.
* @param kbuf the pointer to the key region. * @param kbuf the pointer to the key region.
* @param ksiz the size of the key region. * @param ksiz the size of the key region.
* @param vbuf the pointer to the value region. * @param vbuf the pointer to the value region.
* @param vsiz the size of the value region. * @param vsiz the size of the value region.
* @return true on success, or false on failure. * @return true on success, or false on failure.
* @note If no record corresponds to the key, a new record is created. If the corresponding * @note If no record corresponds to the key, a new record is created. If the corresponding
* record exists, the value is overwritten. * record exists, the value is overwritten.
*/ */
 End of changes. 2 change blocks. 
2 lines changed or deleted 20 lines changed or added


 kcplantdb.h   kcplantdb.h 
skipping to change at line 1262 skipping to change at line 1262
if (checker && !checker->check("iterate", "ending", -1, allcnt)) { if (checker && !checker->check("iterate", "ending", -1, allcnt)) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
err = true; err = true;
} }
if (atran && !commit_transaction()) err = true; if (atran && !commit_transaction()) err = true;
if (autosync_ && !autotran_ && writable && !fix_auto_synchronization()) err = true; if (autosync_ && !autotran_ && writable && !fix_auto_synchronization()) err = true;
trigger_meta(MetaTrigger::ITERATE, "iterate"); trigger_meta(MetaTrigger::ITERATE, "iterate");
return !err; return !err;
} }
/** /**
* Scan each record in parallel.
* @param visitor a visitor object.
* @param thnum the number of worker threads.
* @param checker a progress checker object. If it is NULL, no checking
is performed.
* @return true on success, or false on failure.
* @note This function is for reading records and not for updating ones.
The return value of
* the visitor is just ignored. To avoid deadlock, any explicit database
operation must not
* be performed in this function.
*/
bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* check
er = NULL) {
_assert_(visitor && thnum <= MEMMAXSIZ);
ScopedRWLock lock(&mlock_, true);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
if (thnum < 1) thnum = 0;
if (thnum > (size_t)INT8MAX) thnum = INT8MAX;
bool err = false;
if (writer_) {
if (checker && !checker->check("scan_parallel", "cleaning the leaf no
de cache", -1, -1)) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false;
}
if (!clean_leaf_cache()) err = true;
}
ScopedVisitor svis(visitor);
int64_t allcnt = count_;
if (checker && !checker->check("scan_parallel", "beginning", 0, allcnt)
) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false;
}
class ProgressCheckerImpl : public ProgressChecker {
public:
explicit ProgressCheckerImpl() : ok_(1) {}
void stop() {
ok_.set(0);
}
private:
bool check(const char* name, const char* message, int64_t curcnt, int
64_t allcnt) {
return ok_ > 0;
}
AtomicInt64 ok_;
};
ProgressCheckerImpl ichecker;
class VisitorImpl : public Visitor {
public:
explicit VisitorImpl(PlantDB* db, Visitor* visitor,
ProgressChecker* checker, int64_t allcnt,
ProgressCheckerImpl* ichecker) :
db_(db), visitor_(visitor), checker_(checker), allcnt_(allcnt),
ichecker_(ichecker), error_() {}
const Error& error() {
return error_;
}
private:
const char* visit_full(const char* kbuf, size_t ksiz,
const char* vbuf, size_t vsiz, size_t* sp) {
if (ksiz < 2 || ksiz >= NUMBUFSIZ || kbuf[0] != LNPREFIX) return NO
P;
uint64_t prev;
size_t step = readvarnum(vbuf, vsiz, &prev);
if (step < 1) return NOP;
vbuf += step;
vsiz -= step;
uint64_t next;
step = readvarnum(vbuf, vsiz, &next);
if (step < 1) return NOP;
vbuf += step;
vsiz -= step;
while (vsiz > 1) {
uint64_t rksiz;
step = readvarnum(vbuf, vsiz, &rksiz);
if (step < 1) break;
vbuf += step;
vsiz -= step;
uint64_t rvsiz;
step = readvarnum(vbuf, vsiz, &rvsiz);
if (step < 1) break;
vbuf += step;
vsiz -= step;
if (vsiz < rksiz + rvsiz) break;
size_t xvsiz;
visitor_->visit_full(vbuf, rksiz, vbuf + rksiz, rvsiz, &xvsiz);
vbuf += rksiz;
vsiz -= rksiz;
vbuf += rvsiz;
vsiz -= rvsiz;
if (checker_ && !checker_->check("scan_parallel", "processing", -
1, allcnt_)) {
db_->set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
error_ = db_->error();
ichecker_->stop();
break;
}
}
return NOP;
}
PlantDB* db_;
Visitor* visitor_;
ProgressChecker* checker_;
int64_t allcnt_;
ProgressCheckerImpl* ichecker_;
Error error_;
};
VisitorImpl ivisitor(this, visitor, checker, allcnt, &ichecker);
if (!db_.scan_parallel(&ivisitor, thnum, &ichecker)) err = true;
if (ivisitor.error() != Error::SUCCESS) {
const Error& e = ivisitor.error();
db_.set_error(_KCCODELINE_, e.code(), e.message());
err = true;
}
if (checker && !checker->check("scan_parallel", "ending", -1, allcnt))
{
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
err = true;
}
trigger_meta(MetaTrigger::ITERATE, "scan_parallel");
return !err;
}
/**
* Get the last happened error. * Get the last happened error.
* @return the last happened error. * @return the last happened error.
*/ */
Error error() const { Error error() const {
_assert_(true); _assert_(true);
return db_.error(); return db_.error();
} }
/** /**
* Set the error information. * Set the error information.
* @param file the file name of the program source code. * @param file the file name of the program source code.
 End of changes. 1 change blocks. 
0 lines changed or deleted 128 lines changed or added


 kcpolydb.h   kcpolydb.h 
skipping to change at line 322 skipping to change at line 322
*/ */
bool iterate(Visitor *visitor, bool writable = true, ProgressChecker* che cker = NULL) { bool iterate(Visitor *visitor, bool writable = true, ProgressChecker* che cker = NULL) {
_assert_(visitor); _assert_(visitor);
if (type_ == TYPEVOID) { if (type_ == TYPEVOID) {
set_error(_KCCODELINE_, Error::INVALID, "not opened"); set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false; return false;
} }
return db_->iterate(visitor, writable, checker); return db_->iterate(visitor, writable, checker);
} }
/** /**
* Scan each record in parallel.
* @param visitor a visitor object.
* @param thnum the number of worker threads.
* @param checker a progress checker object. If it is NULL, no checking
is performed.
* @return true on success, or false on failure.
* @note This function is for reading records and not for updating ones.
The return value of
* the visitor is just ignored. To avoid deadlock, any explicit database
operation must not
* be performed in this function.
*/
bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* check
er = NULL) {
_assert_(visitor && thnum <= MEMMAXSIZ);
if (type_ == TYPEVOID) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
return db_->scan_parallel(visitor, thnum, checker);
}
/**
* Get the last happened error. * Get the last happened error.
* @return the last happened error. * @return the last happened error.
*/ */
Error error() const { Error error() const {
_assert_(true); _assert_(true);
if (type_ == TYPEVOID) return error_; if (type_ == TYPEVOID) return error_;
return db_->error(); return db_->error();
} }
/** /**
* Set the error information. * Set the error information.
 End of changes. 1 change blocks. 
0 lines changed or deleted 22 lines changed or added


 kcprotodb.h   kcprotodb.h 
skipping to change at line 592 skipping to change at line 592
} }
} }
if (checker && !checker->check("iterate", "ending", -1, allcnt)) { if (checker && !checker->check("iterate", "ending", -1, allcnt)) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false; return false;
} }
trigger_meta(MetaTrigger::ITERATE, "iterate"); trigger_meta(MetaTrigger::ITERATE, "iterate");
return true; return true;
} }
/** /**
* Scan each record in parallel.
* @param visitor a visitor object.
* @param thnum the number of worker threads.
* @param checker a progress checker object. If it is NULL, no checking
is performed.
* @return true on success, or false on failure.
* @note This function is for reading records and not for updating ones.
The return value of
* the visitor is just ignored. To avoid deadlock, any explicit database
operation must not
* be performed in this function.
*/
bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* check
er = NULL) {
_assert_(visitor && thnum <= MEMMAXSIZ);
ScopedRWLock lock(&mlock_, false);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
if (thnum < 1) thnum = 1;
if (thnum > (size_t)INT8MAX) thnum = INT8MAX;
ScopedVisitor svis(visitor);
int64_t allcnt = recs_.size();
if (checker && !checker->check("scan_parallel", "beginning", -1, allcnt
)) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false;
}
class ThreadImpl : public Thread {
public:
explicit ThreadImpl() :
db_(NULL), visitor_(NULL), checker_(NULL), allcnt_(0),
itp_(NULL), itend_(), itmtx_(NULL), error_() {}
void init(ProtoDB* db, Visitor* visitor, ProgressChecker* checker, in
t64_t allcnt,
typename STRMAP::const_iterator* itp, typename STRMAP::cons
t_iterator itend,
Mutex* itmtx) {
db_ = db;
visitor_ = visitor;
checker_ = checker;
allcnt_ = allcnt;
itp_ = itp;
itend_ = itend;
itmtx_ = itmtx;
}
const Error& error() {
return error_;
}
private:
void run() {
ProtoDB* db = db_;
Visitor* visitor = visitor_;
ProgressChecker* checker = checker_;
int64_t allcnt = allcnt_;
typename STRMAP::const_iterator* itp = itp_;
typename STRMAP::const_iterator itend = itend_;
Mutex* itmtx = itmtx_;
while (true) {
itmtx->lock();
if (*itp == itend) {
itmtx->unlock();
break;
}
const std::string& key = (*itp)->first;
const std::string& value = (*itp)->second;
++(*itp);
itmtx->unlock();
size_t vsiz;
visitor->visit_full(key.data(), key.size(), value.data(), value.s
ize(), &vsiz);
if (checker && !checker->check("scan_parallel", "processing", -1,
allcnt)) {
db->set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
error_ = db->error();
break;
}
}
}
ProtoDB* db_;
Visitor* visitor_;
ProgressChecker* checker_;
int64_t allcnt_;
typename STRMAP::const_iterator* itp_;
typename STRMAP::const_iterator itend_;
Mutex* itmtx_;
Error error_;
};
bool err = false;
typename STRMAP::const_iterator it = recs_.begin();
typename STRMAP::const_iterator itend = recs_.end();
Mutex itmtx;
ThreadImpl* threads = new ThreadImpl[thnum];
for (size_t i = 0; i < thnum; i++) {
ThreadImpl* thread = threads + i;
thread->init(this, visitor, checker, allcnt, &it, itend, &itmtx);
}
for (size_t i = 0; i < thnum; i++) {
ThreadImpl* thread = threads + i;
thread->start();
}
for (size_t i = 0; i < thnum; i++) {
ThreadImpl* thread = threads + i;
thread->join();
if (thread->error() != Error::SUCCESS) {
*error_ = thread->error();
err = true;
}
}
delete[] threads;
if (err) return false;
if (checker && !checker->check("scan_parallel", "ending", -1, allcnt))
{
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false;
}
trigger_meta(MetaTrigger::ITERATE, "scan_parallel");
return true;
}
/**
* Get the last happened error. * Get the last happened error.
* @return the last happened error. * @return the last happened error.
*/ */
Error error() const { Error error() const {
_assert_(true); _assert_(true);
return error_; return error_;
} }
/** /**
* Set the error information. * Set the error information.
* @param file the file name of the program source code. * @param file the file name of the program source code.
 End of changes. 1 change blocks. 
0 lines changed or deleted 121 lines changed or added


 kcstashdb.h   kcstashdb.h 
skipping to change at line 488 skipping to change at line 488
} }
} }
if (checker && !checker->check("iterate", "ending", -1, allcnt)) { if (checker && !checker->check("iterate", "ending", -1, allcnt)) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false; return false;
} }
trigger_meta(MetaTrigger::ITERATE, "iterate"); trigger_meta(MetaTrigger::ITERATE, "iterate");
return true; return true;
} }
/** /**
* Scan each record in parallel.
* @param visitor a visitor object.
* @param thnum the number of worker threads.
* @param checker a progress checker object. If it is NULL, no checking
is performed.
* @return true on success, or false on failure.
* @note This function is for reading records and not for updating ones.
The return value of
* the visitor is just ignored. To avoid deadlock, any explicit database
operation must not
* be performed in this function.
*/
bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* check
er = NULL) {
_assert_(visitor && thnum <= MEMMAXSIZ);
ScopedRWLock lock(&mlock_, false);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
if (thnum < 1) thnum = 1;
if (thnum > (size_t)INT8MAX) thnum = INT8MAX;
if (thnum > bnum_) thnum = bnum_;
ScopedVisitor svis(visitor);
int64_t allcnt = count_;
if (checker && !checker->check("scan_parallel", "beginning", 0, allcnt)
) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false;
}
class ThreadImpl : public Thread {
public:
explicit ThreadImpl() :
db_(NULL), visitor_(NULL), checker_(NULL), allcnt_(0),
begidx_(0), endidx_(0), error_() {}
void init(StashDB* db, Visitor* visitor, ProgressChecker* checker, in
t64_t allcnt,
size_t begidx, size_t endidx) {
db_ = db;
visitor_ = visitor;
checker_ = checker;
allcnt_ = allcnt;
begidx_ = begidx;
endidx_ = endidx;
}
const Error& error() {
return error_;
}
private:
void run() {
StashDB* db = db_;
Visitor* visitor = visitor_;
ProgressChecker* checker = checker_;
int64_t allcnt = allcnt_;
size_t endidx = endidx_;
char** buckets = db->buckets_;
for (size_t i = begidx_; i < endidx; i++) {
char* rbuf = buckets[i];
while (rbuf) {
Record rec(rbuf);
rbuf = rec.child_;
size_t vsiz;
visitor->visit_full(rec.kbuf_, rec.ksiz_, rec.vbuf_, rec.vsiz_,
&vsiz);
if (checker && !checker->check("scan_parallel", "processing", -
1, allcnt)) {
db->set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
error_ = db->error();
break;
}
}
}
}
StashDB* db_;
Visitor* visitor_;
ProgressChecker* checker_;
int64_t allcnt_;
size_t begidx_;
size_t endidx_;
Error error_;
};
bool err = false;
rlock_.lock_reader_all();
ThreadImpl* threads = new ThreadImpl[thnum];
double range = (double)bnum_ / thnum;
for (size_t i = 0; i < thnum; i++) {
size_t cidx = i * range;
size_t nidx = (i + 1) * range;
if (i < 1) cidx = 0;
if (i >= thnum - 1) nidx = bnum_;
ThreadImpl* thread = threads + i;
thread->init(this, visitor, checker, allcnt, cidx, nidx);
thread->start();
}
for (size_t i = 0; i < thnum; i++) {
ThreadImpl* thread = threads + i;
thread->join();
if (thread->error() != Error::SUCCESS) {
*error_ = thread->error();
err = true;
}
}
delete[] threads;
rlock_.unlock_all();
if (err) return false;
if (checker && !checker->check("scan_parallel", "ending", -1, allcnt))
{
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false;
}
trigger_meta(MetaTrigger::ITERATE, "scan_parallel");
return true;
}
/**
* Get the last happened error. * Get the last happened error.
* @return the last happened error. * @return the last happened error.
*/ */
Error error() const { Error error() const {
_assert_(true); _assert_(true);
return error_; return error_;
} }
/** /**
* Set the error information. * Set the error information.
* @param file the file name of the program source code. * @param file the file name of the program source code.
 End of changes. 1 change blocks. 
0 lines changed or deleted 114 lines changed or added

This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/