kcdbext.h   kcdbext.h 
skipping to change at line 46 skipping to change at line 46
/** /**
* MapReduce framework. * MapReduce framework.
* @note Although this framework is not distributed or concurrent, it is us eful for aggregate * @note Although this framework is not distributed or concurrent, it is us eful for aggregate
* calculation with less CPU loading and less memory usage. * calculation with less CPU loading and less memory usage.
*/ */
class MapReduce { class MapReduce {
public: public:
class ValueIterator; class ValueIterator;
private: private:
class FlushThread;
class ReduceTaskQueue;
class MapVisitor; class MapVisitor;
struct MergeLine; struct MergeLine;
/** An alias of vector of loaded values. */ /** An alias of vector of loaded values. */
typedef std::vector<std::string> Values; typedef std::vector<std::string> Values;
/** The default number of temporary databases. */ /** The default number of temporary databases. */
static const size_t DEFDBNUM = 8; static const size_t DEFDBNUM = 8;
/** The maxinum number of temporary databases. */ /** The maxinum number of temporary databases. */
static const size_t MAXDBNUM = 256; static const size_t MAXDBNUM = 256;
/** The default cache limit. */ /** The default cache limit. */
static const int64_t DEFCLIM = 512LL << 20; static const int64_t DEFCLIM = 512LL << 20;
skipping to change at line 68 skipping to change at line 70
/** The bucket number of temprary databases. */ /** The bucket number of temprary databases. */
static const int64_t DBBNUM = 512LL << 10; static const int64_t DBBNUM = 512LL << 10;
/** The page size of temprary databases. */ /** The page size of temprary databases. */
static const int32_t DBPSIZ = 32768; static const int32_t DBPSIZ = 32768;
/** The mapped size of temprary databases. */ /** The mapped size of temprary databases. */
static const int64_t DBMSIZ = 516LL * 4096; static const int64_t DBMSIZ = 516LL * 4096;
/** The page cache capacity of temprary databases. */ /** The page cache capacity of temprary databases. */
static const int64_t DBPCCAP = 16LL << 20; static const int64_t DBPCCAP = 16LL << 20;
/** The default number of threads in parallel mode. */ /** The default number of threads in parallel mode. */
static const size_t DEFTHNUM = 8; static const size_t DEFTHNUM = 8;
/** The number of slots of the record lock. */
static const int32_t RLOCKSLOT = 256;
public: public:
/** /**
* Value iterator for the reducer. * Value iterator for the reducer.
*/ */
class ValueIterator { class ValueIterator {
friend class MapReduce; friend class MapReduce;
public: public:
/** /**
* Get the next value. * Get the next value.
* @param sp the pointer to the variable into which the size of the reg ion of the return * @param sp the pointer to the variable into which the size of the reg ion of the return
skipping to change at line 134 skipping to change at line 138
/** The size of the current value. */ /** The size of the current value. */
size_t vsiz_; size_t vsiz_;
}; };
/** /**
* Execution options. * Execution options.
*/ */
enum Option { enum Option {
XNOLOCK = 1 << 0, ///< avoid locking against update operations XNOLOCK = 1 << 0, ///< avoid locking against update operations
XPARAMAP = 1 << 1, ///< run mappers in parallel XPARAMAP = 1 << 1, ///< run mappers in parallel
XPARARED = 1 << 2, ///< run reducers in parallel XPARARED = 1 << 2, ///< run reducers in parallel
XPARAFLS = 1 << 3, ///< run cache flushers in paralle l
XNOCOMP = 1 << 8 ///< avoid compression of temporar y databases XNOCOMP = 1 << 8 ///< avoid compression of temporar y databases
}; };
/** /**
* Default constructor. * Default constructor.
*/ */
explicit MapReduce() : explicit MapReduce() :
db_(NULL), rcomp_(NULL), tmpdbs_(NULL), dbnum_(DEFDBNUM), dbclock_(0) , db_(NULL), rcomp_(NULL), tmpdbs_(NULL), dbnum_(DEFDBNUM), dbclock_(0) ,
mapthnum_(DEFTHNUM), redthnum_(DEFTHNUM), mapthnum_(DEFTHNUM), redthnum_(DEFTHNUM), flsthnum_(DEFTHNUM),
cache_(NULL), csiz_(0), clim_(DEFCLIM), cbnum_(DEFCBNUM), cache_(NULL), csiz_(0), clim_(DEFCLIM), cbnum_(DEFCBNUM), flsths_(NUL
redtasks_(NULL), redaborted_(false), lock_(NULL) { L),
redtasks_(NULL), redaborted_(false), rlocks_(NULL) {
_assert_(true); _assert_(true);
} }
/** /**
* Destructor. * Destructor.
*/ */
virtual ~MapReduce() { virtual ~MapReduce() {
_assert_(true); _assert_(true);
} }
/** /**
* Map a record data. * Map a record data.
skipping to change at line 315 skipping to change at line 320
err = true; err = true;
if (err) { if (err) {
for (size_t i = 0; i < dbnum_; i++) { for (size_t i = 0; i < dbnum_; i++) {
delete tmpdbs_[i]; delete tmpdbs_[i];
} }
delete[] tmpdbs_; delete[] tmpdbs_;
return false; return false;
} }
} }
if (opts & XPARARED) redtasks_ = new ReduceTaskQueue; if (opts & XPARARED) redtasks_ = new ReduceTaskQueue;
if (opts & XPARAFLS) flsths_ = new std::deque<FlushThread*>;
if (opts & XNOLOCK) { if (opts & XNOLOCK) {
MapChecker mapchecker; MapChecker mapchecker;
MapVisitor mapvisitor(this, &mapchecker, db->count()); MapVisitor mapvisitor(this, &mapchecker, db->count());
mapvisitor.visit_before(); mapvisitor.visit_before();
if (!err) { if (!err) {
BasicDB::Cursor* cur = db->cursor(); BasicDB::Cursor* cur = db->cursor();
if (!cur->jump() && cur->error() != BasicDB::Error::NOREC) err = tr ue; if (!cur->jump() && cur->error() != BasicDB::Error::NOREC) err = tr ue;
while (!err) { while (!err) {
if (!cur->accept(&mapvisitor, false, true)) { if (!cur->accept(&mapvisitor, false, true)) {
if (cur->error() != BasicDB::Error::NOREC) err = true; if (cur->error() != BasicDB::Error::NOREC) err = true;
skipping to change at line 338 skipping to change at line 344
delete cur; delete cur;
} }
if (mapvisitor.error()) { if (mapvisitor.error()) {
db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed" ); db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed" );
err = true; err = true;
} }
mapvisitor.visit_after(); mapvisitor.visit_after();
} else if (opts & XPARAMAP) { } else if (opts & XPARAMAP) {
MapChecker mapchecker; MapChecker mapchecker;
MapVisitor mapvisitor(this, &mapchecker, db->count()); MapVisitor mapvisitor(this, &mapchecker, db->count());
lock_ = new Mutex(); rlocks_ = new SlottedMutex(RLOCKSLOT);
if (!err && !db->scan_parallel(&mapvisitor, mapthnum_, &mapchecker)) { if (!err && !db->scan_parallel(&mapvisitor, mapthnum_, &mapchecker)) {
db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed" ); db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed" );
err = true; err = true;
} }
delete lock_; delete rlocks_;
lock_ = NULL; rlocks_ = NULL;
if (mapvisitor.error()) err = true; if (mapvisitor.error()) err = true;
} else { } else {
MapChecker mapchecker; MapChecker mapchecker;
MapVisitor mapvisitor(this, &mapchecker, db->count()); MapVisitor mapvisitor(this, &mapchecker, db->count());
if (!err && !db->iterate(&mapvisitor, false, &mapchecker)) err = true ; if (!err && !db->iterate(&mapvisitor, false, &mapchecker)) err = true ;
if (mapvisitor.error()) { if (mapvisitor.error()) {
db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed" ); db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed" );
err = true; err = true;
} }
} }
if (flsths_) {
delete flsths_;
flsths_ = NULL;
}
if (redtasks_) { if (redtasks_) {
delete redtasks_; delete redtasks_;
redtasks_ = NULL; redtasks_ = NULL;
} }
if (!logf("clean", "closing the temporary databases")) err = true; if (!logf("clean", "closing the temporary databases")) err = true;
stime = time(); stime = time();
for (size_t i = 0; i < dbnum_; i++) { for (size_t i = 0; i < dbnum_; i++) {
assert(tmpdbs_[i]);
const std::string& path = tmpdbs_[i]->path(); const std::string& path = tmpdbs_[i]->path();
if (!tmpdbs_[i]->clear()) { if (!tmpdbs_[i]->clear()) {
const BasicDB::Error& e = tmpdbs_[i]->error(); const BasicDB::Error& e = tmpdbs_[i]->error();
db->set_error(_KCCODELINE_, e.code(), e.message()); db->set_error(_KCCODELINE_, e.code(), e.message());
err = true; err = true;
} }
if (!tmpdbs_[i]->close()) { if (!tmpdbs_[i]->close()) {
const BasicDB::Error& e = tmpdbs_[i]->error(); const BasicDB::Error& e = tmpdbs_[i]->error();
db->set_error(_KCCODELINE_, e.code(), e.message()); db->set_error(_KCCODELINE_, e.code(), e.message());
err = true; err = true;
skipping to change at line 401 skipping to change at line 410
dbnum_ = dbnum > 0 ? dbnum : DEFDBNUM; dbnum_ = dbnum > 0 ? dbnum : DEFDBNUM;
if (dbnum_ > MAXDBNUM) dbnum_ = MAXDBNUM; if (dbnum_ > MAXDBNUM) dbnum_ = MAXDBNUM;
clim_ = clim > 0 ? clim : DEFCLIM; clim_ = clim > 0 ? clim : DEFCLIM;
cbnum_ = cbnum > 0 ? cbnum : DEFCBNUM; cbnum_ = cbnum > 0 ? cbnum : DEFCBNUM;
if (cbnum_ > INT16MAX) cbnum_ = nearbyprime(cbnum_); if (cbnum_ > INT16MAX) cbnum_ = nearbyprime(cbnum_);
} }
/** /**
* Set the thread configurations. * Set the thread configurations.
* @param mapthnum the number of threads for the mapper. * @param mapthnum the number of threads for the mapper.
* @param redthnum the number of threads for the reducer. * @param redthnum the number of threads for the reducer.
* @param flsthnum the number of threads for the internal flusher.
*/ */
void tune_thread(int32_t mapthnum, int32_t redthnum) { void tune_thread(int32_t mapthnum, int32_t redthnum, int32_t flsthnum) {
_assert_(true); _assert_(true);
mapthnum_ = mapthnum > 0 ? mapthnum : DEFTHNUM; mapthnum_ = mapthnum > 0 ? mapthnum : DEFTHNUM;
redthnum_ = redthnum > 0 ? redthnum : DEFTHNUM; redthnum_ = redthnum > 0 ? redthnum : DEFTHNUM;
flsthnum_ = flsthnum > 0 ? flsthnum : DEFTHNUM;
} }
protected: protected:
/** /**
* Emit a record from the mapper. * Emit a record from the mapper.
* @param kbuf the pointer to the key region. * @param kbuf the pointer to the key region.
* @param ksiz the size of the key region. * @param ksiz the size of the key region.
* @param vbuf the pointer to the value region. * @param vbuf the pointer to the value region.
* @param vsiz the size of the value region. * @param vsiz the size of the value region.
* @return true on success, or false on failure. * @return true on success, or false on failure.
*/ */
bool emit(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { bool emit(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) {
_assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ);
bool err = false; bool err = false;
size_t rsiz = sizevarnum(vsiz) + vsiz; size_t rsiz = sizevarnum(vsiz) + vsiz;
char stack[NUMBUFSIZ*4]; char stack[NUMBUFSIZ*4];
char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack; char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack;
char* wp = rbuf; char* wp = rbuf;
wp += writevarnum(rbuf, vsiz); wp += writevarnum(rbuf, vsiz);
std::memcpy(wp, vbuf, vsiz); std::memcpy(wp, vbuf, vsiz);
if (lock_) { if (rlocks_) {
lock_->lock(); size_t bidx = TinyHashMap::hash_record(kbuf, ksiz) % cbnum_;
size_t lidx = bidx % RLOCKSLOT;
rlocks_->lock(lidx);
cache_->append(kbuf, ksiz, rbuf, rsiz); cache_->append(kbuf, ksiz, rbuf, rsiz);
lock_->unlock(); rlocks_->unlock(lidx);
} else { } else {
cache_->append(kbuf, ksiz, rbuf, rsiz); cache_->append(kbuf, ksiz, rbuf, rsiz);
} }
if (rbuf != stack) delete[] rbuf; if (rbuf != stack) delete[] rbuf;
csiz_ += rsiz; csiz_ += rsiz;
return !err; return !err;
} }
private: private:
/** /**
* Cache flusher.
*/
class FlushThread : public Thread {
public:
/** constructor */
explicit FlushThread(MapReduce* mr, BasicDB* tmpdb,
TinyHashMap* cache, size_t csiz, bool cown) :
mr_(mr), tmpdb_(tmpdb), cache_(cache), csiz_(csiz), cown_(cown), er
r_(false) {}
/** perform the concrete process */
void run() {
if (!mr_->logf("map", "started to flushing the cache: count=%lld size
=%lld",
(long long)cache_->count(), (long long)csiz_)) err_ =
true;
double stime = time();
BasicDB* tmpdb = tmpdb_;
TinyHashMap* cache = cache_;
bool cown = cown_;
TinyHashMap::Sorter sorter(cache);
const char* kbuf, *vbuf;
size_t ksiz, vsiz;
while ((kbuf = sorter.get(&ksiz, &vbuf, &vsiz)) != NULL) {
if (!tmpdb->append(kbuf, ksiz, vbuf, vsiz)) {
const BasicDB::Error& e = tmpdb->error();
mr_->db_->set_error(_KCCODELINE_, e.code(), e.message());
err_ = true;
}
sorter.step();
if (cown) cache->remove(kbuf, ksiz);
}
double etime = time();
if (!mr_->logf("map", "flushing the cache finished: time=%.6f", etime
- stime))
err_ = true;
if (cown) delete cache;
}
/** check the error flag. */
bool error() {
return err_;
}
private:
MapReduce* mr_; ///< driver
BasicDB* tmpdb_; ///< temprary database
TinyHashMap* cache_; ///< cache for emitter
size_t csiz_; ///< current cache size
bool cown_; ///< cache ownership flag
bool err_; ///< error flag
};
/**
* Task queue for parallel reducer. * Task queue for parallel reducer.
*/ */
class ReduceTaskQueue : public TaskQueue { class ReduceTaskQueue : public TaskQueue {
public: public:
/** /**
* Task for parallel reducer. * Task for parallel reducer.
*/ */
class ReduceTask : public Task { class ReduceTask : public Task {
friend class ReduceTaskQueue; friend class ReduceTaskQueue;
public: public:
skipping to change at line 508 skipping to change at line 567
/** get the error flag */ /** get the error flag */
bool error() { bool error() {
return err_; return err_;
} }
/** preprocess the mappter */ /** preprocess the mappter */
void visit_before() { void visit_before() {
mr_->dbclock_ = 0; mr_->dbclock_ = 0;
mr_->cache_ = new TinyHashMap(mr_->cbnum_); mr_->cache_ = new TinyHashMap(mr_->cbnum_);
mr_->csiz_ = 0; mr_->csiz_ = 0;
if (!mr_->preprocess()) err_ = true; if (!mr_->preprocess()) err_ = true;
if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true; if (mr_->csiz_ > 0 && !mr_->flush_cache()) err_ = true;
if (!mr_->logf("map", "started the map process: scale=%lld", (long lo ng)scale_)) if (!mr_->logf("map", "started the map process: scale=%lld", (long lo ng)scale_))
err_ = true; err_ = true;
stime_ = time(); stime_ = time();
} }
/** postprocess the mappter and call the reducer */ /** postprocess the mappter and call the reducer */
void visit_after() { void visit_after() {
if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true; if (mr_->csiz_ > 0 && !mr_->flush_cache()) err_ = true;
double etime = time(); double etime = time();
if (!mr_->logf("map", "the map process finished: time=%.6f", etime - stime_)) if (!mr_->logf("map", "the map process finished: time=%.6f", etime - stime_))
err_ = true; err_ = true;
if (!mr_->midprocess()) err_ = true; if (!mr_->midprocess()) err_ = true;
if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true; if (mr_->csiz_ > 0 && !mr_->flush_cache()) err_ = true;
delete mr_->cache_; delete mr_->cache_;
if (mr_->flsths_ && !mr_->flsths_->empty()) {
std::deque<FlushThread*>::iterator flthit = mr_->flsths_->begin();
std::deque<FlushThread*>::iterator flthitend = mr_->flsths_->end();
while (flthit != flthitend) {
FlushThread* flth = *flthit;
flth->join();
if (flth->error()) err_ = true;
delete flth;
++flthit;
}
}
if (!err_ && !mr_->execute_reduce()) err_ = true; if (!err_ && !mr_->execute_reduce()) err_ = true;
if (!mr_->postprocess()) err_ = true; if (!mr_->postprocess()) err_ = true;
} }
private: private:
/** visit a record */ /** visit a record */
const char* visit_full(const char* kbuf, size_t ksiz, const char* visit_full(const char* kbuf, size_t ksiz,
const char* vbuf, size_t vsiz, size_t* sp) { const char* vbuf, size_t vsiz, size_t* sp) {
if (!mr_->map(kbuf, ksiz, vbuf, vsiz)) { if (!mr_->map(kbuf, ksiz, vbuf, vsiz)) {
checker_->stop(); checker_->stop();
err_ = true; err_ = true;
} }
if (mr_->lock_) { if (mr_->rlocks_) {
mr_->lock_->lock(); if (mr_->csiz_ >= mr_->clim_) {
if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { mr_->rlocks_->lock_all();
checker_->stop(); if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) {
err_ = true; checker_->stop();
err_ = true;
}
mr_->rlocks_->unlock_all();
} }
mr_->lock_->unlock();
} else { } else {
if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) {
checker_->stop(); checker_->stop();
err_ = true; err_ = true;
} }
} }
return NOP; return NOP;
} }
MapReduce* mr_; ///< driver MapReduce* mr_; ///< driver
MapChecker* checker_; ///< checker MapChecker* checker_; ///< checker
skipping to change at line 592 skipping to change at line 664
va_end(ap); va_end(ap);
return log(name, message.c_str()); return log(name, message.c_str());
} }
/** /**
* Flush all cache records. * Flush all cache records.
* @return true on success, or false on failure. * @return true on success, or false on failure.
*/ */
bool flush_cache() { bool flush_cache() {
_assert_(true); _assert_(true);
bool err = false; bool err = false;
if (!logf("map", "started to flushing the cache: count=%lld size=%lld",
(long long)cache_->count(), (long long)csiz_)) err = true;
double stime = time();
BasicDB* tmpdb = tmpdbs_[dbclock_]; BasicDB* tmpdb = tmpdbs_[dbclock_];
TinyHashMap::Sorter sorter(cache_); dbclock_ = (dbclock_ + 1) % dbnum_;
const char* kbuf, *vbuf; if (flsths_) {
size_t ksiz, vsiz; size_t num = flsths_->size();
while ((kbuf = sorter.get(&ksiz, &vbuf, &vsiz)) != NULL) { if (num >= flsthnum_ || num >= dbnum_) {
if (!tmpdb->append(kbuf, ksiz, vbuf, vsiz)) { FlushThread* flth = flsths_->front();
const BasicDB::Error& e = tmpdb->error(); flsths_->pop_front();
db_->set_error(_KCCODELINE_, e.code(), e.message()); flth->join();
err = true; if (flth->error()) err = true;
delete flth;
} }
sorter.step(); FlushThread* flth = new FlushThread(this, tmpdb, cache_, csiz_, true)
;
cache_ = new TinyHashMap(cbnum_);
csiz_ = 0;
flth->start();
flsths_->push_back(flth);
} else {
FlushThread flth(this, tmpdb, cache_, csiz_, false);
flth.run();
if (flth.error()) err = true;
cache_->clear();
csiz_ = 0;
} }
cache_->clear();
csiz_ = 0;
dbclock_ = (dbclock_ + 1) % dbnum_;
double etime = time();
if (!logf("map", "flushing the cache finished: time=%.6f", etime - stim
e)) err = true;
return !err; return !err;
} }
/** /**
* Execute the reduce part. * Execute the reduce part.
* @return true on success, or false on failure. * @return true on success, or false on failure.
*/ */
bool execute_reduce() { bool execute_reduce() {
bool err = false; bool err = false;
int64_t scale = 0; int64_t scale = 0;
for (size_t i = 0; i < dbnum_; i++) { for (size_t i = 0; i < dbnum_; i++) {
skipping to change at line 721 skipping to change at line 796
/** The temporary databases. */ /** The temporary databases. */
BasicDB** tmpdbs_; BasicDB** tmpdbs_;
/** The number of temporary databases. */ /** The number of temporary databases. */
size_t dbnum_; size_t dbnum_;
/** The logical clock for temporary databases. */ /** The logical clock for temporary databases. */
int64_t dbclock_; int64_t dbclock_;
/** The number of the mapper threads. */ /** The number of the mapper threads. */
size_t mapthnum_; size_t mapthnum_;
/** The number of the reducer threads. */ /** The number of the reducer threads. */
size_t redthnum_; size_t redthnum_;
/** The number of the flusher threads. */
size_t flsthnum_;
/** The cache for emitter. */ /** The cache for emitter. */
TinyHashMap* cache_; TinyHashMap* cache_;
/** The current size of the cache for emitter. */ /** The current size of the cache for emitter. */
int64_t csiz_; AtomicInt64 csiz_;
/** The limit size of the cache for emitter. */ /** The limit size of the cache for emitter. */
int64_t clim_; int64_t clim_;
/** The bucket number of the cache for emitter. */ /** The bucket number of the cache for emitter. */
int64_t cbnum_; int64_t cbnum_;
/** The flush threads. */
std::deque<FlushThread*>* flsths_;
/** The task queue for parallel reducer. */ /** The task queue for parallel reducer. */
TaskQueue* redtasks_; TaskQueue* redtasks_;
/** The flag whether aborted. */ /** The flag whether aborted. */
bool redaborted_; bool redaborted_;
/** The whole lock. */ /** The whole lock. */
Mutex* lock_; SlottedMutex* rlocks_;
}; };
/** /**
* Index database. * Index database.
* @note This class is designed to implement an indexing storage with an ef ficient appending * @note This class is designed to implement an indexing storage with an ef ficient appending
* operation for the existing record values. This class is a wrapper of th e polymorphic * operation for the existing record values. This class is a wrapper of th e polymorphic
* database, featuring buffering mechanism to alleviate IO overhead in the database layer. This * database, featuring buffering mechanism to alleviate IO overhead in the database layer. This
* class can be inherited but overwriting methods is forbidden. Before eve ry database operation, * class can be inherited but overwriting methods is forbidden. Before eve ry database operation,
* it is necessary to call the IndexDB::open method in order to open a data base file and connect * it is necessary to call the IndexDB::open method in order to open a data base file and connect
* the database object to it. To avoid data missing or corruption, it is i mportant to close * the database object to it. To avoid data missing or corruption, it is i mportant to close
 End of changes. 29 change blocks. 
40 lines changed or deleted 124 lines changed or added


 kcdirdb.h   kcdirdb.h 
skipping to change at line 2107 skipping to change at line 2107
return !err; return !err;
} }
/** /**
* Scan each record in parallel. * Scan each record in parallel.
* @param visitor a visitor object. * @param visitor a visitor object.
* @param thnum the number of worker threads. * @param thnum the number of worker threads.
* @param checker a progress checker object. * @param checker a progress checker object.
* @return true on success, or false on failure. * @return true on success, or false on failure.
*/ */
bool scan_parallel_impl(Visitor *visitor, size_t thnum, ProgressChecker* checker) { bool scan_parallel_impl(Visitor *visitor, size_t thnum, ProgressChecker* checker) {
assert(visitor && thnum <= MEMMAXSIZ); _assert_(visitor && thnum <= MEMMAXSIZ);
int64_t allcnt = count_; int64_t allcnt = count_;
if (checker && !checker->check("scan_parallel", "beginning", -1, allcnt )) { if (checker && !checker->check("scan_parallel", "beginning", -1, allcnt )) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false; return false;
} }
DirStream dir; DirStream dir;
if (!dir.open(path_)) { if (!dir.open(path_)) {
set_error(_KCCODELINE_, Error::SYSTEM, "opening a directory failed"); set_error(_KCCODELINE_, Error::SYSTEM, "opening a directory failed");
return false; return false;
} }
 End of changes. 1 change blocks. 
1 lines changed or deleted 1 lines changed or added


 kchashdb.h   kchashdb.h 
skipping to change at line 2250 skipping to change at line 2250
return true; return true;
} }
/** /**
* Scan each record in parallel. * Scan each record in parallel.
* @param visitor a visitor object. * @param visitor a visitor object.
* @param thnum the number of worker threads. * @param thnum the number of worker threads.
* @param checker a progress checker object. * @param checker a progress checker object.
* @return true on success, or false on failure. * @return true on success, or false on failure.
*/ */
bool scan_parallel_impl(Visitor *visitor, size_t thnum, ProgressChecker* checker) { bool scan_parallel_impl(Visitor *visitor, size_t thnum, ProgressChecker* checker) {
assert(visitor && thnum <= MEMMAXSIZ); _assert_(visitor && thnum <= MEMMAXSIZ);
int64_t allcnt = count_; int64_t allcnt = count_;
if (checker && !checker->check("scan_parallel", "beginning", -1, allcnt )) { if (checker && !checker->check("scan_parallel", "beginning", -1, allcnt )) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false; return false;
} }
bool err = false; bool err = false;
std::vector<int64_t> offs; std::vector<int64_t> offs;
int64_t bnum = bnum_; int64_t bnum = bnum_;
size_t cap = (thnum + 1) * INT8MAX; size_t cap = (thnum + 1) * INT8MAX;
for (int64_t bidx = 0; bidx < bnum; bidx++) { for (int64_t bidx = 0; bidx < bnum; bidx++) {
 End of changes. 1 change blocks. 
1 lines changed or deleted 1 lines changed or added


 kcmap.h   kcmap.h 
skipping to change at line 751 skipping to change at line 751
*/ */
class Sorter { class Sorter {
public: public:
/** /**
* Constructor. * Constructor.
* @param map the container. * @param map the container.
* @note This object will be invalidated when the map object is updated once. * @note This object will be invalidated when the map object is updated once.
*/ */
explicit Sorter(TinyHashMap* map) : map_(map), ridx_(0), recs_() { explicit Sorter(TinyHashMap* map) : map_(map), ridx_(0), recs_() {
_assert_(map); _assert_(map);
recs_.reserve(map->count_);
char** buckets = map_->buckets_; char** buckets = map_->buckets_;
size_t bnum = map_->bnum_; size_t bnum = map_->bnum_;
for (size_t i = 0; i < bnum; i++) { for (size_t i = 0; i < bnum; i++) {
char* rbuf = buckets[i]; char* rbuf = buckets[i];
while (rbuf) { while (rbuf) {
Record rec(rbuf); Record rec(rbuf);
recs_.push_back(rbuf); recs_.push_back(rbuf);
rbuf = *(char**)rbuf; rbuf = *(char**)rbuf;
} }
} }
skipping to change at line 1061 skipping to change at line 1060
count_ = 0; count_ = 0;
} }
/** /**
* Get the number of records. * Get the number of records.
* @return the number of records. * @return the number of records.
*/ */
size_t count() { size_t count() {
_assert_(true); _assert_(true);
return count_; return count_;
} }
/**
* Get the hash value of a record.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @return the hash value.
*/
static size_t hash_record(const char* kbuf, size_t ksiz) {
_assert_(kbuf && ksiz <= MEMMAXSIZ);
return hashmurmur(kbuf, ksiz);
}
private: private:
/** /**
* Record data. * Record data.
*/ */
struct Record { struct Record {
/** constructor */ /** constructor */
Record(char* child, const char* kbuf, uint64_t ksiz, Record(char* child, const char* kbuf, uint64_t ksiz,
const char* vbuf, uint64_t vsiz, uint64_t psiz) : const char* vbuf, uint64_t vsiz, uint64_t psiz) :
child_(child), kbuf_(kbuf), ksiz_(ksiz), vbuf_(vbuf), vsiz_(vsiz), psiz_(psiz) { child_(child), kbuf_(kbuf), ksiz_(ksiz), vbuf_(vbuf), vsiz_(vsiz), psiz_(psiz) {
_assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ && ps iz <= MEMMAXSIZ); _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ && ps iz <= MEMMAXSIZ);
skipping to change at line 1209 skipping to change at line 1218
delete[] rbuf; delete[] rbuf;
rbuf = child; rbuf = child;
} }
} }
if (bnum_ >= MAPZMAPBNUM) { if (bnum_ >= MAPZMAPBNUM) {
mapfree(buckets_); mapfree(buckets_);
} else { } else {
delete[] buckets_; delete[] buckets_;
} }
} }
/**
* Get the hash value of a record.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @return the hash value.
*/
size_t hash_record(const char* kbuf, size_t ksiz) {
_assert_(kbuf && ksiz <= MEMMAXSIZ);
return hashmurmur(kbuf, ksiz);
}
/** Dummy constructor to forbid the use. */ /** Dummy constructor to forbid the use. */
TinyHashMap(const TinyHashMap&); TinyHashMap(const TinyHashMap&);
/** Dummy Operator to forbid the use. */ /** Dummy Operator to forbid the use. */
TinyHashMap& operator =(const TinyHashMap&); TinyHashMap& operator =(const TinyHashMap&);
/** The bucket array. */ /** The bucket array. */
char** buckets_; char** buckets_;
/** The number of buckets. */ /** The number of buckets. */
size_t bnum_; size_t bnum_;
/** The number of records. */ /** The number of records. */
size_t count_; size_t count_;
 End of changes. 3 change blocks. 
11 lines changed or deleted 10 lines changed or added

This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/