| kccachedb.h | | kccachedb.h | |
| | | | |
| skipping to change at line 141 | | skipping to change at line 141 | |
| rvsiz = zsiz; | | rvsiz = zsiz; | |
| } | | } | |
| } | | } | |
| size_t vsiz; | | size_t vsiz; | |
| const char* vbuf = visitor->visit_full(dbuf, rksiz, rvbuf, rvsiz, &vs
iz); | | const char* vbuf = visitor->visit_full(dbuf, rksiz, rvbuf, rvsiz, &vs
iz); | |
| delete[] zbuf; | | delete[] zbuf; | |
| if (vbuf == Visitor::REMOVE) { | | if (vbuf == Visitor::REMOVE) { | |
| uint64_t hash = db_->hash_record(dbuf, rksiz) / SLOTNUM; | | uint64_t hash = db_->hash_record(dbuf, rksiz) / SLOTNUM; | |
| Slot* slot = db_->slots_ + sidx_; | | Slot* slot = db_->slots_ + sidx_; | |
| Repeater repeater(Visitor::REMOVE, 0); | | Repeater repeater(Visitor::REMOVE, 0); | |
|
| db_->accept_impl(slot, hash, dbuf, rksiz, &repeater, db_->comp_, tr
ue); | | db_->accept_impl(slot, hash, dbuf, rksiz, &repeater, db_->comp_, fa
lse); | |
| } else if (vbuf == Visitor::NOP) { | | } else if (vbuf == Visitor::NOP) { | |
| if (step) step_impl(); | | if (step) step_impl(); | |
| } else { | | } else { | |
| uint64_t hash = db_->hash_record(dbuf, rksiz) / SLOTNUM; | | uint64_t hash = db_->hash_record(dbuf, rksiz) / SLOTNUM; | |
| Slot* slot = db_->slots_ + sidx_; | | Slot* slot = db_->slots_ + sidx_; | |
| Repeater repeater(vbuf, vsiz); | | Repeater repeater(vbuf, vsiz); | |
|
| db_->accept_impl(slot, hash, dbuf, rksiz, &repeater, db_->comp_, tr
ue); | | db_->accept_impl(slot, hash, dbuf, rksiz, &repeater, db_->comp_, fa
lse); | |
| if (step) step_impl(); | | if (step) step_impl(); | |
| } | | } | |
| return true; | | return true; | |
| } | | } | |
| /** | | /** | |
| * Jump the cursor to the first record for forward scan. | | * Jump the cursor to the first record for forward scan. | |
| * @return true on success, or false on failure. | | * @return true on success, or false on failure. | |
| */ | | */ | |
| bool jump() { | | bool jump() { | |
| _assert_(true); | | _assert_(true); | |
| | | | |
| skipping to change at line 377 | | skipping to change at line 377 | |
| FOPEN = 1 << 0, ///< dummy for compatibility | | FOPEN = 1 << 0, ///< dummy for compatibility | |
| FFATAL = 1 << 1 ///< dummy for compatibility | | FFATAL = 1 << 1 ///< dummy for compatibility | |
| }; | | }; | |
| /** | | /** | |
| * Default constructor. | | * Default constructor. | |
| */ | | */ | |
| explicit CacheDB() : | | explicit CacheDB() : | |
| mlock_(), flock_(), error_(), logger_(NULL), logkinds_(0), mtrigger_(
NULL), | | mlock_(), flock_(), error_(), logger_(NULL), logkinds_(0), mtrigger_(
NULL), | |
| omode_(0), curs_(), path_(""), type_(TYPECACHE), | | omode_(0), curs_(), path_(""), type_(TYPECACHE), | |
| opts_(0), bnum_(DEFBNUM), capcnt_(-1), capsiz_(-1), | | opts_(0), bnum_(DEFBNUM), capcnt_(-1), capsiz_(-1), | |
|
| opaque_(), embcomp_(ZLIBRAWCOMP), comp_(NULL), slots_(), tran_(false)
{ | | opaque_(), embcomp_(ZLIBRAWCOMP), comp_(NULL), slots_(), rttmode_(tru
e), tran_(false) { | |
| _assert_(true); | | _assert_(true); | |
| } | | } | |
| /** | | /** | |
| * Destructor. | | * Destructor. | |
| * @note If the database is not closed, it is closed implicitly. | | * @note If the database is not closed, it is closed implicitly. | |
| */ | | */ | |
| virtual ~CacheDB() { | | virtual ~CacheDB() { | |
| _assert_(true); | | _assert_(true); | |
| if (omode_ != 0) close(); | | if (omode_ != 0) close(); | |
| if (!curs_.empty()) { | | if (!curs_.empty()) { | |
| | | | |
| skipping to change at line 425 | | skipping to change at line 425 | |
| if (writable && !(omode_ & OWRITER)) { | | if (writable && !(omode_ & OWRITER)) { | |
| set_error(_KCCODELINE_, Error::NOPERM, "permission denied"); | | set_error(_KCCODELINE_, Error::NOPERM, "permission denied"); | |
| return false; | | return false; | |
| } | | } | |
| if (ksiz > KSIZMAX) ksiz = KSIZMAX; | | if (ksiz > KSIZMAX) ksiz = KSIZMAX; | |
| uint64_t hash = hash_record(kbuf, ksiz); | | uint64_t hash = hash_record(kbuf, ksiz); | |
| int32_t sidx = hash % SLOTNUM; | | int32_t sidx = hash % SLOTNUM; | |
| hash /= SLOTNUM; | | hash /= SLOTNUM; | |
| Slot* slot = slots_ + sidx; | | Slot* slot = slots_ + sidx; | |
| slot->lock.lock(); | | slot->lock.lock(); | |
|
| accept_impl(slot, hash, kbuf, ksiz, visitor, comp_, false); | | accept_impl(slot, hash, kbuf, ksiz, visitor, comp_, rttmode_); | |
| slot->lock.unlock(); | | slot->lock.unlock(); | |
| return true; | | return true; | |
| } | | } | |
| /** | | /** | |
| * Accept a visitor to multiple records at once. | | * Accept a visitor to multiple records at once. | |
| * @param keys specifies a string vector of the keys. | | * @param keys specifies a string vector of the keys. | |
| * @param visitor a visitor object. | | * @param visitor a visitor object. | |
| * @param writable true for writable operation, or false for read-only op
eration. | | * @param writable true for writable operation, or false for read-only op
eration. | |
| * @return true on success, or false on failure. | | * @return true on success, or false on failure. | |
| * @note The operations for specified records are performed atomically an
d other threads | | * @note The operations for specified records are performed atomically an
d other threads | |
| | | | |
| skipping to change at line 483 | | skipping to change at line 483 | |
| std::set<int32_t>::iterator sit = sidxs.begin(); | | std::set<int32_t>::iterator sit = sidxs.begin(); | |
| std::set<int32_t>::iterator sitend = sidxs.end(); | | std::set<int32_t>::iterator sitend = sidxs.end(); | |
| while (sit != sitend) { | | while (sit != sitend) { | |
| Slot* slot = slots_ + *sit; | | Slot* slot = slots_ + *sit; | |
| slot->lock.lock(); | | slot->lock.lock(); | |
| ++sit; | | ++sit; | |
| } | | } | |
| for (size_t i = 0; i < knum; i++) { | | for (size_t i = 0; i < knum; i++) { | |
| RecordKey* rkey = rkeys + i; | | RecordKey* rkey = rkeys + i; | |
| Slot* slot = slots_ + rkey->sidx; | | Slot* slot = slots_ + rkey->sidx; | |
|
| accept_impl(slot, rkey->hash, rkey->kbuf, rkey->ksiz, visitor, comp_,
false); | | accept_impl(slot, rkey->hash, rkey->kbuf, rkey->ksiz, visitor, comp_,
rttmode_); | |
| } | | } | |
| sit = sidxs.begin(); | | sit = sidxs.begin(); | |
| sitend = sidxs.end(); | | sitend = sidxs.end(); | |
| while (sit != sitend) { | | while (sit != sitend) { | |
| Slot* slot = slots_ + *sit; | | Slot* slot = slots_ + *sit; | |
| slot->lock.unlock(); | | slot->lock.unlock(); | |
| ++sit; | | ++sit; | |
| } | | } | |
| delete[] rkeys; | | delete[] rkeys; | |
| return true; | | return true; | |
| | | | |
| skipping to change at line 546 | | skipping to change at line 546 | |
| rvbuf = zbuf; | | rvbuf = zbuf; | |
| rvsiz = zsiz; | | rvsiz = zsiz; | |
| } | | } | |
| } | | } | |
| size_t vsiz; | | size_t vsiz; | |
| const char* vbuf = visitor->visit_full(dbuf, rksiz, rvbuf, rvsiz, &
vsiz); | | const char* vbuf = visitor->visit_full(dbuf, rksiz, rvbuf, rvsiz, &
vsiz); | |
| delete[] zbuf; | | delete[] zbuf; | |
| if (vbuf == Visitor::REMOVE) { | | if (vbuf == Visitor::REMOVE) { | |
| uint64_t hash = hash_record(dbuf, rksiz) / SLOTNUM; | | uint64_t hash = hash_record(dbuf, rksiz) / SLOTNUM; | |
| Repeater repeater(Visitor::REMOVE, 0); | | Repeater repeater(Visitor::REMOVE, 0); | |
|
| accept_impl(slot, hash, dbuf, rksiz, &repeater, comp_, true); | | accept_impl(slot, hash, dbuf, rksiz, &repeater, comp_, false); | |
| } else if (vbuf != Visitor::NOP) { | | } else if (vbuf != Visitor::NOP) { | |
| uint64_t hash = hash_record(dbuf, rksiz) / SLOTNUM; | | uint64_t hash = hash_record(dbuf, rksiz) / SLOTNUM; | |
| Repeater repeater(vbuf, vsiz); | | Repeater repeater(vbuf, vsiz); | |
|
| accept_impl(slot, hash, dbuf, rksiz, &repeater, comp_, true); | | accept_impl(slot, hash, dbuf, rksiz, &repeater, comp_, false); | |
| } | | } | |
| rec = next; | | rec = next; | |
| curcnt++; | | curcnt++; | |
| if (checker && !checker->check("iterate", "processing", curcnt, all
cnt)) { | | if (checker && !checker->check("iterate", "processing", curcnt, all
cnt)) { | |
| set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |
| return false; | | return false; | |
| } | | } | |
| } | | } | |
| } | | } | |
| if (checker && !checker->check("iterate", "ending", -1, allcnt)) { | | if (checker && !checker->check("iterate", "ending", -1, allcnt)) { | |
| set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |
| return false; | | return false; | |
| } | | } | |
| trigger_meta(MetaTrigger::ITERATE, "iterate"); | | trigger_meta(MetaTrigger::ITERATE, "iterate"); | |
| return true; | | return true; | |
| } | | } | |
| /** | | /** | |
|
| | | * Scan each record in parallel. | |
| | | * @param visitor a visitor object. | |
| | | * @param thnum the number of worker threads. | |
| | | * @param checker a progress checker object. If it is NULL, no checking | |
| | | is performed. | |
| | | * @return true on success, or false on failure. | |
| | | * @note This function is for reading records and not for updating ones. | |
| | | The return value of | |
| | | * the visitor is just ignored. To avoid deadlock, any explicit database | |
| | | operation must not | |
| | | * be performed in this function. | |
| | | */ | |
| | | bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* check | |
| | | er = NULL) { | |
| | | _assert_(visitor && thnum <= MEMMAXSIZ); | |
| | | ScopedRWLock lock(&mlock_, false); | |
| | | if (omode_ == 0) { | |
| | | set_error(_KCCODELINE_, Error::INVALID, "not opened"); | |
| | | return false; | |
| | | } | |
| | | if (thnum < 1) thnum = 1; | |
| | | thnum = std::pow(2, (int32_t)std::log2(thnum * std::sqrt(2))); | |
| | | if (thnum > (size_t)SLOTNUM) thnum = SLOTNUM; | |
| | | ScopedVisitor svis(visitor); | |
| | | int64_t allcnt = count_impl(); | |
| | | if (checker && !checker->check("scan_parallel", "beginning", -1, allcnt | |
| | | )) { | |
| | | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |
| | | return false; | |
| | | } | |
| | | class ThreadImpl : public Thread { | |
| | | public: | |
| | | explicit ThreadImpl() : | |
| | | db_(NULL), visitor_(NULL), checker_(NULL), allcnt_(0), slots_(), | |
| | | error_() {} | |
| | | void init(CacheDB* db, Visitor* visitor, ProgressChecker* checker, in | |
| | | t64_t allcnt) { | |
| | | db_ = db; | |
| | | visitor_ = visitor; | |
| | | checker_ = checker; | |
| | | allcnt_ = allcnt; | |
| | | } | |
| | | void add_slot(Slot* slot) { | |
| | | slots_.push_back(slot); | |
| | | } | |
| | | const Error& error() { | |
| | | return error_; | |
| | | } | |
| | | private: | |
| | | void run() { | |
| | | CacheDB* db = db_; | |
| | | Visitor* visitor = visitor_; | |
| | | ProgressChecker* checker = checker_; | |
| | | int64_t allcnt = allcnt_; | |
| | | Compressor* comp = db->comp_; | |
| | | std::vector<Slot*>::iterator sit = slots_.begin(); | |
| | | std::vector<Slot*>::iterator sitend = slots_.end(); | |
| | | while (sit != sitend) { | |
| | | Slot* slot = *sit; | |
| | | Record* rec = slot->first; | |
| | | while (rec) { | |
| | | Record* next = rec->next; | |
| | | uint32_t rksiz = rec->ksiz & KSIZMAX; | |
| | | char* dbuf = (char*)rec + sizeof(*rec); | |
| | | const char* rvbuf = dbuf + rksiz; | |
| | | size_t rvsiz = rec->vsiz; | |
| | | char* zbuf = NULL; | |
| | | size_t zsiz = 0; | |
| | | if (comp) { | |
| | | zbuf = comp->decompress(rvbuf, rvsiz, &zsiz); | |
| | | if (zbuf) { | |
| | | rvbuf = zbuf; | |
| | | rvsiz = zsiz; | |
| | | } | |
| | | } | |
| | | size_t vsiz; | |
| | | visitor->visit_full(dbuf, rksiz, rvbuf, rvsiz, &vsiz); | |
| | | delete[] zbuf; | |
| | | rec = next; | |
| | | if (checker && !checker->check("scan_parallel", "processing", - | |
| | | 1, allcnt)) { | |
| | | db->set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |
| | | error_ = db->error(); | |
| | | break; | |
| | | } | |
| | | } | |
| | | ++sit; | |
| | | } | |
| | | } | |
| | | CacheDB* db_; | |
| | | Visitor* visitor_; | |
| | | ProgressChecker* checker_; | |
| | | int64_t allcnt_; | |
| | | std::vector<Slot*> slots_; | |
| | | Error error_; | |
| | | }; | |
| | | bool err = false; | |
| | | bool orttmode = rttmode_; | |
| | | rttmode_ = false; | |
| | | ThreadImpl* threads = new ThreadImpl[thnum]; | |
| | | for (int32_t i = 0; i < SLOTNUM; i++) { | |
| | | ThreadImpl* thread = threads + (i % thnum); | |
| | | thread->add_slot(slots_ + i); | |
| | | } | |
| | | for (size_t i = 0; i < thnum; i++) { | |
| | | ThreadImpl* thread = threads + i; | |
| | | thread->init(this, visitor, checker, allcnt); | |
| | | thread->start(); | |
| | | } | |
| | | for (size_t i = 0; i < thnum; i++) { | |
| | | ThreadImpl* thread = threads + i; | |
| | | thread->join(); | |
| | | if (thread->error() != Error::SUCCESS) { | |
| | | *error_ = thread->error(); | |
| | | err = true; | |
| | | } | |
| | | } | |
| | | delete[] threads; | |
| | | rttmode_ = orttmode; | |
| | | if (err) return false; | |
| | | if (checker && !checker->check("scan_parallel", "ending", -1, allcnt)) | |
| | | { | |
| | | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |
| | | return false; | |
| | | } | |
| | | trigger_meta(MetaTrigger::ITERATE, "scan_parallel"); | |
| | | return true; | |
| | | } | |
| | | /** | |
| * Get the last happened error. | | * Get the last happened error. | |
| * @return the last happened error. | | * @return the last happened error. | |
| */ | | */ | |
| Error error() const { | | Error error() const { | |
| _assert_(true); | | _assert_(true); | |
| return error_; | | return error_; | |
| } | | } | |
| /** | | /** | |
| * Set the error information. | | * Set the error information. | |
| * @param file the file name of the program source code. | | * @param file the file name of the program source code. | |
| | | | |
| skipping to change at line 1043 | | skipping to change at line 1163 | |
| _assert_(true); | | _assert_(true); | |
| ScopedRWLock lock(&mlock_, true); | | ScopedRWLock lock(&mlock_, true); | |
| if (omode_ != 0) { | | if (omode_ != 0) { | |
| set_error(_KCCODELINE_, Error::INVALID, "already opened"); | | set_error(_KCCODELINE_, Error::INVALID, "already opened"); | |
| return false; | | return false; | |
| } | | } | |
| capsiz_ = size; | | capsiz_ = size; | |
| return true; | | return true; | |
| } | | } | |
| /** | | /** | |
|
| | | * Switch the mode of LRU rotation. | |
| | | * @param rttmode true to enable LRU rotation, false to disable LRU rotat | |
| | | ion. | |
| | | * @return true on success, or false on failure. | |
| | | * @note This function can be called while the database is opened. | |
| | | */ | |
| | | bool switch_rotation(bool rttmode) { | |
| | | _assert_(true); | |
| | | ScopedRWLock lock(&mlock_, true); | |
| | | rttmode_ = rttmode; | |
| | | return true; | |
| | | } | |
| | | /** | |
| * Get the opaque data. | | * Get the opaque data. | |
| * @return the pointer to the opaque data region, whose size is 16 bytes. | | * @return the pointer to the opaque data region, whose size is 16 bytes. | |
| */ | | */ | |
| char* opaque() { | | char* opaque() { | |
| _assert_(true); | | _assert_(true); | |
| ScopedRWLock lock(&mlock_, false); | | ScopedRWLock lock(&mlock_, false); | |
| if (omode_ == 0) { | | if (omode_ == 0) { | |
| set_error(_KCCODELINE_, Error::INVALID, "not opened"); | | set_error(_KCCODELINE_, Error::INVALID, "not opened"); | |
| return NULL; | | return NULL; | |
| } | | } | |
| | | | |
| skipping to change at line 1481 | | skipping to change at line 1613 | |
| Visitor* visitor_; ///< visitor | | Visitor* visitor_; ///< visitor | |
| }; | | }; | |
| /** | | /** | |
| * Accept a visitor to a record. | | * Accept a visitor to a record. | |
| * @param slot the slot of the record. | | * @param slot the slot of the record. | |
| * @param hash the hash value of the key. | | * @param hash the hash value of the key. | |
| * @param kbuf the pointer to the key region. | | * @param kbuf the pointer to the key region. | |
| * @param ksiz the size of the key region. | | * @param ksiz the size of the key region. | |
| * @param visitor a visitor object. | | * @param visitor a visitor object. | |
| * @param comp the data compressor. | | * @param comp the data compressor. | |
|
| * @param isiter true for iterator use, or false for direct use. | | * @param rtt whether to move the record to the last. | |
| */ | | */ | |
| void accept_impl(Slot* slot, uint64_t hash, const char* kbuf, size_t ksiz
, Visitor* visitor, | | void accept_impl(Slot* slot, uint64_t hash, const char* kbuf, size_t ksiz
, Visitor* visitor, | |
|
| Compressor* comp, bool isiter) { | | Compressor* comp, bool rtt) { | |
| _assert_(slot && kbuf && ksiz <= MEMMAXSIZ && visitor); | | _assert_(slot && kbuf && ksiz <= MEMMAXSIZ && visitor); | |
| size_t bidx = hash % slot->bnum; | | size_t bidx = hash % slot->bnum; | |
| Record* rec = slot->buckets[bidx]; | | Record* rec = slot->buckets[bidx]; | |
| Record** entp = slot->buckets + bidx; | | Record** entp = slot->buckets + bidx; | |
| uint32_t fhash = fold_hash(hash) & ~KSIZMAX; | | uint32_t fhash = fold_hash(hash) & ~KSIZMAX; | |
| while (rec) { | | while (rec) { | |
| uint32_t rhash = rec->ksiz & ~KSIZMAX; | | uint32_t rhash = rec->ksiz & ~KSIZMAX; | |
| uint32_t rksiz = rec->ksiz & KSIZMAX; | | uint32_t rksiz = rec->ksiz & KSIZMAX; | |
| if (fhash > rhash) { | | if (fhash > rhash) { | |
| entp = &rec->left; | | entp = &rec->left; | |
| | | | |
| skipping to change at line 1597 | | skipping to change at line 1729 | |
| *entp = rec; | | *entp = rec; | |
| if (rec->prev) rec->prev->next = rec; | | if (rec->prev) rec->prev->next = rec; | |
| if (rec->next) rec->next->prev = rec; | | if (rec->next) rec->next->prev = rec; | |
| dbuf = (char*)rec + sizeof(*rec); | | dbuf = (char*)rec + sizeof(*rec); | |
| } | | } | |
| } | | } | |
| std::memcpy(dbuf + ksiz, vbuf, vsiz); | | std::memcpy(dbuf + ksiz, vbuf, vsiz); | |
| rec->vsiz = vsiz; | | rec->vsiz = vsiz; | |
| delete[] zbuf; | | delete[] zbuf; | |
| } | | } | |
|
| if (!isiter && slot->last != rec) { | | if (rtt && slot->last != rec) { | |
| if (!curs_.empty()) escape_cursors(rec); | | if (!curs_.empty()) escape_cursors(rec); | |
| if (slot->first == rec) slot->first = rec->next; | | if (slot->first == rec) slot->first = rec->next; | |
| if (rec->prev) rec->prev->next = rec->next; | | if (rec->prev) rec->prev->next = rec->next; | |
| if (rec->next) rec->next->prev = rec->prev; | | if (rec->next) rec->next->prev = rec->prev; | |
| rec->prev = slot->last; | | rec->prev = slot->last; | |
| rec->next = NULL; | | rec->next = NULL; | |
| slot->last->next = rec; | | slot->last->next = rec; | |
| slot->last = rec; | | slot->last = rec; | |
| } | | } | |
| if (adj) adjust_slot_capacity(slot); | | if (adj) adjust_slot_capacity(slot); | |
| | | | |
| skipping to change at line 1770 | | skipping to change at line 1902 | |
| TranLogList::const_iterator itbeg = logs.begin(); | | TranLogList::const_iterator itbeg = logs.begin(); | |
| while (it != itbeg) { | | while (it != itbeg) { | |
| --it; | | --it; | |
| const char* kbuf = it->key.c_str(); | | const char* kbuf = it->key.c_str(); | |
| size_t ksiz = it->key.size(); | | size_t ksiz = it->key.size(); | |
| const char* vbuf = it->value.c_str(); | | const char* vbuf = it->value.c_str(); | |
| size_t vsiz = it->value.size(); | | size_t vsiz = it->value.size(); | |
| uint64_t hash = hash_record(kbuf, ksiz) / SLOTNUM; | | uint64_t hash = hash_record(kbuf, ksiz) / SLOTNUM; | |
| if (it->full) { | | if (it->full) { | |
| Setter setter(vbuf, vsiz); | | Setter setter(vbuf, vsiz); | |
|
| accept_impl(slot, hash, kbuf, ksiz, &setter, NULL, true); | | accept_impl(slot, hash, kbuf, ksiz, &setter, NULL, false); | |
| } else { | | } else { | |
| Remover remover; | | Remover remover; | |
|
| accept_impl(slot, hash, kbuf, ksiz, &remover, NULL, true); | | accept_impl(slot, hash, kbuf, ksiz, &remover, NULL, false); | |
| } | | } | |
| } | | } | |
| } | | } | |
| /** | | /** | |
| * Addjust a slot table to the capacity. | | * Addjust a slot table to the capacity. | |
| * @param slot the slot table. | | * @param slot the slot table. | |
| */ | | */ | |
| void adjust_slot_capacity(Slot* slot) { | | void adjust_slot_capacity(Slot* slot) { | |
| _assert_(slot); | | _assert_(slot); | |
| if ((slot->count > slot->capcnt || slot->size > slot->capsiz) && slot->
first) { | | if ((slot->count > slot->capcnt || slot->size > slot->capsiz) && slot->
first) { | |
| Record* rec = slot->first; | | Record* rec = slot->first; | |
| uint32_t rksiz = rec->ksiz & KSIZMAX; | | uint32_t rksiz = rec->ksiz & KSIZMAX; | |
| char* dbuf = (char*)rec + sizeof(*rec); | | char* dbuf = (char*)rec + sizeof(*rec); | |
| char stack[RECBUFSIZ]; | | char stack[RECBUFSIZ]; | |
| char* kbuf = rksiz > sizeof(stack) ? new char[rksiz] : stack; | | char* kbuf = rksiz > sizeof(stack) ? new char[rksiz] : stack; | |
| std::memcpy(kbuf, dbuf, rksiz); | | std::memcpy(kbuf, dbuf, rksiz); | |
| uint64_t hash = hash_record(kbuf, rksiz) / SLOTNUM; | | uint64_t hash = hash_record(kbuf, rksiz) / SLOTNUM; | |
| Remover remover; | | Remover remover; | |
|
| accept_impl(slot, hash, dbuf, rksiz, &remover, NULL, true); | | accept_impl(slot, hash, dbuf, rksiz, &remover, NULL, false); | |
| if (kbuf != stack) delete[] kbuf; | | if (kbuf != stack) delete[] kbuf; | |
| } | | } | |
| } | | } | |
| /** | | /** | |
| * Get the hash value of a record. | | * Get the hash value of a record. | |
| * @param kbuf the pointer to the key region. | | * @param kbuf the pointer to the key region. | |
| * @param ksiz the size of the key region. | | * @param ksiz the size of the key region. | |
| * @return the hash value. | | * @return the hash value. | |
| */ | | */ | |
| uint64_t hash_record(const char* kbuf, size_t ksiz) { | | uint64_t hash_record(const char* kbuf, size_t ksiz) { | |
| | | | |
| skipping to change at line 1918 | | skipping to change at line 2050 | |
| /** The capacity of memory usage. */ | | /** The capacity of memory usage. */ | |
| int64_t capsiz_; | | int64_t capsiz_; | |
| /** The opaque data. */ | | /** The opaque data. */ | |
| char opaque_[OPAQUESIZ]; | | char opaque_[OPAQUESIZ]; | |
| /** The embedded data compressor. */ | | /** The embedded data compressor. */ | |
| Compressor* embcomp_; | | Compressor* embcomp_; | |
| /** The data compressor. */ | | /** The data compressor. */ | |
| Compressor* comp_; | | Compressor* comp_; | |
| /** The slot tables. */ | | /** The slot tables. */ | |
| Slot slots_[SLOTNUM]; | | Slot slots_[SLOTNUM]; | |
|
| | | /** The flag whether in LRU rotation. */ | |
| | | bool rttmode_; | |
| /** The flag whether in transaction. */ | | /** The flag whether in transaction. */ | |
| bool tran_; | | bool tran_; | |
| }; | | }; | |
| | | | |
| /** An alias of the cache tree database. */ | | /** An alias of the cache tree database. */ | |
| typedef PlantDB<CacheDB, BasicDB::TYPEGRASS> GrassDB; | | typedef PlantDB<CacheDB, BasicDB::TYPEGRASS> GrassDB; | |
| | | | |
| } // common namespace | | } // common namespace | |
| | | | |
| #endif // duplication check | | #endif // duplication check | |
| | | | |
End of changes. 16 change blocks. |
| 13 lines changed or deleted | | 157 lines changed or added | |
|
| kcdbext.h | | kcdbext.h | |
| | | | |
| skipping to change at line 66 | | skipping to change at line 66 | |
| /** The default cache bucket numer. */ | | /** The default cache bucket numer. */ | |
| static const int64_t DEFCBNUM = 1048583LL; | | static const int64_t DEFCBNUM = 1048583LL; | |
| /** The bucket number of temprary databases. */ | | /** The bucket number of temprary databases. */ | |
| static const int64_t DBBNUM = 512LL << 10; | | static const int64_t DBBNUM = 512LL << 10; | |
| /** The page size of temprary databases. */ | | /** The page size of temprary databases. */ | |
| static const int32_t DBPSIZ = 32768; | | static const int32_t DBPSIZ = 32768; | |
| /** The mapped size of temprary databases. */ | | /** The mapped size of temprary databases. */ | |
| static const int64_t DBMSIZ = 516LL * 4096; | | static const int64_t DBMSIZ = 516LL * 4096; | |
| /** The page cache capacity of temprary databases. */ | | /** The page cache capacity of temprary databases. */ | |
| static const int64_t DBPCCAP = 16LL << 20; | | static const int64_t DBPCCAP = 16LL << 20; | |
|
| | | /** The default number of threads in parallel mode. */ | |
| | | static const size_t DEFTHNUM = 8; | |
| public: | | public: | |
| /** | | /** | |
| * Value iterator for the reducer. | | * Value iterator for the reducer. | |
| */ | | */ | |
| class ValueIterator { | | class ValueIterator { | |
| friend class MapReduce; | | friend class MapReduce; | |
| public: | | public: | |
| /** | | /** | |
| * Get the next value. | | * Get the next value. | |
| * @param sp the pointer to the variable into which the size of the reg
ion of the return | | * @param sp the pointer to the variable into which the size of the reg
ion of the return | |
| | | | |
| skipping to change at line 130 | | skipping to change at line 132 | |
| /** The pointer of the current value. */ | | /** The pointer of the current value. */ | |
| const char* vptr_; | | const char* vptr_; | |
| /** The size of the current value. */ | | /** The size of the current value. */ | |
| size_t vsiz_; | | size_t vsiz_; | |
| }; | | }; | |
| /** | | /** | |
| * Execution options. | | * Execution options. | |
| */ | | */ | |
| enum Option { | | enum Option { | |
| XNOLOCK = 1 << 0, ///< avoid locking against update
operations | | XNOLOCK = 1 << 0, ///< avoid locking against update
operations | |
|
| XNOCOMP = 1 << 1 ///< avoid compression of temporar | | XPARAMAP = 1 << 1, ///< run mappers in parallel | |
| y databases | | XPARARED = 1 << 2, ///< run reducers in parallel | |
| | | XNOCOMP = 1 << 8 ///< avoid compression of temporar | |
| | | y databases | |
| }; | | }; | |
| /** | | /** | |
| * Default constructor. | | * Default constructor. | |
| */ | | */ | |
| explicit MapReduce() : | | explicit MapReduce() : | |
| db_(NULL), rcomp_(NULL), tmpdbs_(NULL), dbnum_(DEFDBNUM), dbclock_(0)
, | | db_(NULL), rcomp_(NULL), tmpdbs_(NULL), dbnum_(DEFDBNUM), dbclock_(0)
, | |
|
| cache_(NULL), csiz_(0), clim_(DEFCLIM), cbnum_(DEFCBNUM) { | | mapthnum_(DEFTHNUM), redthnum_(DEFTHNUM), | |
| | | cache_(NULL), csiz_(0), clim_(DEFCLIM), cbnum_(DEFCBNUM), | |
| | | redtasks_(NULL), redaborted_(false), lock_(NULL) { | |
| _assert_(true); | | _assert_(true); | |
| } | | } | |
| /** | | /** | |
| * Destructor. | | * Destructor. | |
| */ | | */ | |
| virtual ~MapReduce() { | | virtual ~MapReduce() { | |
| _assert_(true); | | _assert_(true); | |
| } | | } | |
| /** | | /** | |
| * Map a record data. | | * Map a record data. | |
| | | | |
| skipping to change at line 213 | | skipping to change at line 219 | |
| virtual bool log(const char* name, const char* message) { | | virtual bool log(const char* name, const char* message) { | |
| _assert_(name && message); | | _assert_(name && message); | |
| return true; | | return true; | |
| } | | } | |
| /** | | /** | |
| * Execute the MapReduce process about a database. | | * Execute the MapReduce process about a database. | |
| * @param db the source database. | | * @param db the source database. | |
| * @param tmppath the path of a directory for the temporary data storage.
If it is an empty | | * @param tmppath the path of a directory for the temporary data storage.
If it is an empty | |
| * string, temporary data are handled on memory. | | * string, temporary data are handled on memory. | |
| * @param opts the optional features by bitwise-or: MapReduce::XNOLOCK to
avoid locking | | * @param opts the optional features by bitwise-or: MapReduce::XNOLOCK to
avoid locking | |
|
| * against update operations by other threads, MapReduce::XNOCOMP to avoi | | * against update operations by other threads, MapReduce::XPARAMAP to run | |
| d compression of | | the mapper in | |
| * temporary databases. | | * parallel, MapReduce::XPARARED to run the reducer in parallel, MapReduc | |
| | | e::XNOCOMP to avoid | |
| | | * compression of temporary databases. | |
| * @return true on success, or false on failure. | | * @return true on success, or false on failure. | |
| */ | | */ | |
| bool execute(BasicDB* db, const std::string& tmppath = "", uint32_t opts
= 0) { | | bool execute(BasicDB* db, const std::string& tmppath = "", uint32_t opts
= 0) { | |
| int64_t count = db->count(); | | int64_t count = db->count(); | |
| if (count < 0) return false; | | if (count < 0) return false; | |
| bool err = false; | | bool err = false; | |
| double stime, etime; | | double stime, etime; | |
| db_ = db; | | db_ = db; | |
| rcomp_ = LEXICALCOMP; | | rcomp_ = LEXICALCOMP; | |
| BasicDB* idb = db; | | BasicDB* idb = db; | |
| | | | |
| skipping to change at line 307 | | skipping to change at line 314 | |
| if (!logf("prepare", "opening temporary databases finished: time=%.6f
", etime - stime)) | | if (!logf("prepare", "opening temporary databases finished: time=%.6f
", etime - stime)) | |
| err = true; | | err = true; | |
| if (err) { | | if (err) { | |
| for (size_t i = 0; i < dbnum_; i++) { | | for (size_t i = 0; i < dbnum_; i++) { | |
| delete tmpdbs_[i]; | | delete tmpdbs_[i]; | |
| } | | } | |
| delete[] tmpdbs_; | | delete[] tmpdbs_; | |
| return false; | | return false; | |
| } | | } | |
| } | | } | |
|
| | | if (opts & XPARARED) redtasks_ = new ReduceTaskQueue; | |
| if (opts & XNOLOCK) { | | if (opts & XNOLOCK) { | |
| MapChecker mapchecker; | | MapChecker mapchecker; | |
| MapVisitor mapvisitor(this, &mapchecker, db->count()); | | MapVisitor mapvisitor(this, &mapchecker, db->count()); | |
| mapvisitor.visit_before(); | | mapvisitor.visit_before(); | |
| if (!err) { | | if (!err) { | |
| BasicDB::Cursor* cur = db->cursor(); | | BasicDB::Cursor* cur = db->cursor(); | |
| if (!cur->jump() && cur->error() != BasicDB::Error::NOREC) err = tr
ue; | | if (!cur->jump() && cur->error() != BasicDB::Error::NOREC) err = tr
ue; | |
| while (!err) { | | while (!err) { | |
| if (!cur->accept(&mapvisitor, false, true)) { | | if (!cur->accept(&mapvisitor, false, true)) { | |
| if (cur->error() != BasicDB::Error::NOREC) err = true; | | if (cur->error() != BasicDB::Error::NOREC) err = true; | |
| break; | | break; | |
| } | | } | |
| } | | } | |
| delete cur; | | delete cur; | |
| } | | } | |
|
| if (mapvisitor.error()) err = true; | | if (mapvisitor.error()) { | |
| | | db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed" | |
| | | ); | |
| | | err = true; | |
| | | } | |
| mapvisitor.visit_after(); | | mapvisitor.visit_after(); | |
|
| | | } else if (opts & XPARAMAP) { | |
| | | MapChecker mapchecker; | |
| | | MapVisitor mapvisitor(this, &mapchecker, db->count()); | |
| | | lock_ = new Mutex(); | |
| | | if (!err && !db->scan_parallel(&mapvisitor, mapthnum_, &mapchecker)) | |
| | | { | |
| | | db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed" | |
| | | ); | |
| | | err = true; | |
| | | } | |
| | | delete lock_; | |
| | | lock_ = NULL; | |
| | | if (mapvisitor.error()) err = true; | |
| } else { | | } else { | |
| MapChecker mapchecker; | | MapChecker mapchecker; | |
| MapVisitor mapvisitor(this, &mapchecker, db->count()); | | MapVisitor mapvisitor(this, &mapchecker, db->count()); | |
| if (!err && !db->iterate(&mapvisitor, false, &mapchecker)) err = true
; | | if (!err && !db->iterate(&mapvisitor, false, &mapchecker)) err = true
; | |
|
| if (mapvisitor.error()) err = true; | | if (mapvisitor.error()) { | |
| | | db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed" | |
| | | ); | |
| | | err = true; | |
| | | } | |
| | | } | |
| | | if (redtasks_) { | |
| | | delete redtasks_; | |
| | | redtasks_ = NULL; | |
| } | | } | |
| if (!logf("clean", "closing the temporary databases")) err = true; | | if (!logf("clean", "closing the temporary databases")) err = true; | |
| stime = time(); | | stime = time(); | |
| for (size_t i = 0; i < dbnum_; i++) { | | for (size_t i = 0; i < dbnum_; i++) { | |
| assert(tmpdbs_[i]); | | assert(tmpdbs_[i]); | |
| const std::string& path = tmpdbs_[i]->path(); | | const std::string& path = tmpdbs_[i]->path(); | |
| if (!tmpdbs_[i]->clear()) { | | if (!tmpdbs_[i]->clear()) { | |
| const BasicDB::Error& e = tmpdbs_[i]->error(); | | const BasicDB::Error& e = tmpdbs_[i]->error(); | |
| db->set_error(_KCCODELINE_, e.code(), e.message()); | | db->set_error(_KCCODELINE_, e.code(), e.message()); | |
| err = true; | | err = true; | |
| | | | |
| skipping to change at line 368 | | skipping to change at line 397 | |
| * @param cbnum the bucket number of the internal cache. | | * @param cbnum the bucket number of the internal cache. | |
| */ | | */ | |
| void tune_storage(int32_t dbnum, int64_t clim, int64_t cbnum) { | | void tune_storage(int32_t dbnum, int64_t clim, int64_t cbnum) { | |
| _assert_(true); | | _assert_(true); | |
| dbnum_ = dbnum > 0 ? dbnum : DEFDBNUM; | | dbnum_ = dbnum > 0 ? dbnum : DEFDBNUM; | |
| if (dbnum_ > MAXDBNUM) dbnum_ = MAXDBNUM; | | if (dbnum_ > MAXDBNUM) dbnum_ = MAXDBNUM; | |
| clim_ = clim > 0 ? clim : DEFCLIM; | | clim_ = clim > 0 ? clim : DEFCLIM; | |
| cbnum_ = cbnum > 0 ? cbnum : DEFCBNUM; | | cbnum_ = cbnum > 0 ? cbnum : DEFCBNUM; | |
| if (cbnum_ > INT16MAX) cbnum_ = nearbyprime(cbnum_); | | if (cbnum_ > INT16MAX) cbnum_ = nearbyprime(cbnum_); | |
| } | | } | |
|
| | | /** | |
| | | * Set the thread configurations. | |
| | | * @param mapthnum the number of threads for the mapper. | |
| | | * @param redthnum the number of threads for the reducer. | |
| | | */ | |
| | | void tune_thread(int32_t mapthnum, int32_t redthnum) { | |
| | | _assert_(true); | |
| | | mapthnum_ = mapthnum > 0 ? mapthnum : DEFTHNUM; | |
| | | redthnum_ = redthnum > 0 ? redthnum : DEFTHNUM; | |
| | | } | |
| protected: | | protected: | |
| /** | | /** | |
| * Emit a record from the mapper. | | * Emit a record from the mapper. | |
| * @param kbuf the pointer to the key region. | | * @param kbuf the pointer to the key region. | |
| * @param ksiz the size of the key region. | | * @param ksiz the size of the key region. | |
| * @param vbuf the pointer to the value region. | | * @param vbuf the pointer to the value region. | |
| * @param vsiz the size of the value region. | | * @param vsiz the size of the value region. | |
| * @return true on success, or false on failure. | | * @return true on success, or false on failure. | |
| */ | | */ | |
| bool emit(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { | | bool emit(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { | |
| _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); | | _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); | |
| bool err = false; | | bool err = false; | |
| size_t rsiz = sizevarnum(vsiz) + vsiz; | | size_t rsiz = sizevarnum(vsiz) + vsiz; | |
| char stack[NUMBUFSIZ*4]; | | char stack[NUMBUFSIZ*4]; | |
| char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack; | | char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack; | |
| char* wp = rbuf; | | char* wp = rbuf; | |
| wp += writevarnum(rbuf, vsiz); | | wp += writevarnum(rbuf, vsiz); | |
| std::memcpy(wp, vbuf, vsiz); | | std::memcpy(wp, vbuf, vsiz); | |
|
| cache_->append(kbuf, ksiz, rbuf, rsiz); | | if (lock_) { | |
| | | lock_->lock(); | |
| | | cache_->append(kbuf, ksiz, rbuf, rsiz); | |
| | | lock_->unlock(); | |
| | | } else { | |
| | | cache_->append(kbuf, ksiz, rbuf, rsiz); | |
| | | } | |
| if (rbuf != stack) delete[] rbuf; | | if (rbuf != stack) delete[] rbuf; | |
| csiz_ += rsiz; | | csiz_ += rsiz; | |
| return !err; | | return !err; | |
| } | | } | |
| private: | | private: | |
| /** | | /** | |
|
| | | * Task queue for parallel reducer. | |
| | | */ | |
| | | class ReduceTaskQueue : public TaskQueue { | |
| | | public: | |
| | | /** | |
| | | * Task for parallel reducer. | |
| | | */ | |
| | | class ReduceTask : public Task { | |
| | | friend class ReduceTaskQueue; | |
| | | public: | |
| | | /** constructor */ | |
| | | explicit ReduceTask(MapReduce* mr, const char* kbuf, size_t ksiz, con | |
| | | st Values& values) : | |
| | | mr_(mr), key_(kbuf, ksiz), values_(values) {} | |
| | | private: | |
| | | MapReduce* mr_; ///< driver | |
| | | std::string key_; ///< key | |
| | | Values values_; ///< values | |
| | | }; | |
| | | /** constructor */ | |
| | | explicit ReduceTaskQueue() {} | |
| | | private: | |
| | | /** process a task */ | |
| | | void do_task(Task* task) { | |
| | | ReduceTask* rtask = (ReduceTask*)task; | |
| | | ValueIterator iter(rtask->values_.begin(), rtask->values_.end()); | |
| | | if (!rtask->mr_->reduce(rtask->key_.data(), rtask->key_.size(), &iter | |
| | | )) | |
| | | rtask->mr_->redaborted_ = true; | |
| | | delete rtask; | |
| | | } | |
| | | }; | |
| | | /** | |
| * Checker for the map process. | | * Checker for the map process. | |
| */ | | */ | |
| class MapChecker : public BasicDB::ProgressChecker { | | class MapChecker : public BasicDB::ProgressChecker { | |
| public: | | public: | |
| /** constructor */ | | /** constructor */ | |
| explicit MapChecker() : stop_(false) {} | | explicit MapChecker() : stop_(false) {} | |
| /** stop the process */ | | /** stop the process */ | |
| void stop() { | | void stop() { | |
| stop_ = true; | | stop_ = true; | |
| } | | } | |
| | | | |
| skipping to change at line 457 | | skipping to change at line 533 | |
| if (!mr_->postprocess()) err_ = true; | | if (!mr_->postprocess()) err_ = true; | |
| } | | } | |
| private: | | private: | |
| /** visit a record */ | | /** visit a record */ | |
| const char* visit_full(const char* kbuf, size_t ksiz, | | const char* visit_full(const char* kbuf, size_t ksiz, | |
| const char* vbuf, size_t vsiz, size_t* sp) { | | const char* vbuf, size_t vsiz, size_t* sp) { | |
| if (!mr_->map(kbuf, ksiz, vbuf, vsiz)) { | | if (!mr_->map(kbuf, ksiz, vbuf, vsiz)) { | |
| checker_->stop(); | | checker_->stop(); | |
| err_ = true; | | err_ = true; | |
| } | | } | |
|
| if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { | | if (mr_->lock_) { | |
| checker_->stop(); | | mr_->lock_->lock(); | |
| err_ = true; | | if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { | |
| | | checker_->stop(); | |
| | | err_ = true; | |
| | | } | |
| | | mr_->lock_->unlock(); | |
| | | } else { | |
| | | if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { | |
| | | checker_->stop(); | |
| | | err_ = true; | |
| | | } | |
| } | | } | |
| return NOP; | | return NOP; | |
| } | | } | |
| MapReduce* mr_; ///< driver | | MapReduce* mr_; ///< driver | |
| MapChecker* checker_; ///< checker | | MapChecker* checker_; ///< checker | |
| int64_t scale_; ///< number of records | | int64_t scale_; ///< number of records | |
| double stime_; ///< start time | | double stime_; ///< start time | |
| bool err_; ///< error flag | | bool err_; ///< error flag | |
| }; | | }; | |
| /** | | /** | |
| | | | |
| skipping to change at line 541 | | skipping to change at line 626 | |
| * @return true on success, or false on failure. | | * @return true on success, or false on failure. | |
| */ | | */ | |
| bool execute_reduce() { | | bool execute_reduce() { | |
| bool err = false; | | bool err = false; | |
| int64_t scale = 0; | | int64_t scale = 0; | |
| for (size_t i = 0; i < dbnum_; i++) { | | for (size_t i = 0; i < dbnum_; i++) { | |
| scale += tmpdbs_[i]->count(); | | scale += tmpdbs_[i]->count(); | |
| } | | } | |
| if (!logf("reduce", "started the reduce process: scale=%lld", (long lon
g)scale)) err = true; | | if (!logf("reduce", "started the reduce process: scale=%lld", (long lon
g)scale)) err = true; | |
| double stime = time(); | | double stime = time(); | |
|
| | | if (redtasks_) redtasks_->start(redthnum_); | |
| std::priority_queue<MergeLine> lines; | | std::priority_queue<MergeLine> lines; | |
| for (size_t i = 0; i < dbnum_; i++) { | | for (size_t i = 0; i < dbnum_; i++) { | |
| MergeLine line; | | MergeLine line; | |
| line.cur = tmpdbs_[i]->cursor(); | | line.cur = tmpdbs_[i]->cursor(); | |
| line.rcomp = rcomp_; | | line.rcomp = rcomp_; | |
| line.cur->jump(); | | line.cur->jump(); | |
| line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true); | | line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true); | |
| if (line.kbuf) { | | if (line.kbuf) { | |
| lines.push(line); | | lines.push(line); | |
| } else { | | } else { | |
| delete line.cur; | | delete line.cur; | |
| } | | } | |
| } | | } | |
| char* lkbuf = NULL; | | char* lkbuf = NULL; | |
| size_t lksiz = 0; | | size_t lksiz = 0; | |
| Values values; | | Values values; | |
| while (!err && !lines.empty()) { | | while (!err && !lines.empty()) { | |
| MergeLine line = lines.top(); | | MergeLine line = lines.top(); | |
| lines.pop(); | | lines.pop(); | |
| if (lkbuf && (lksiz != line.ksiz || std::memcmp(lkbuf, line.kbuf, lks
iz))) { | | if (lkbuf && (lksiz != line.ksiz || std::memcmp(lkbuf, line.kbuf, lks
iz))) { | |
|
| if (!call_reducer(lkbuf, lksiz, values)) err = true; | | if (!call_reducer(lkbuf, lksiz, values)) { | |
| | | db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "reducer fail | |
| | | ed"); | |
| | | err = true; | |
| | | } | |
| values.clear(); | | values.clear(); | |
| } | | } | |
|
| values.push_back(std::string(line.vbuf, line.vsiz)); | | | |
| delete[] lkbuf; | | delete[] lkbuf; | |
| lkbuf = line.kbuf; | | lkbuf = line.kbuf; | |
| lksiz = line.ksiz; | | lksiz = line.ksiz; | |
|
| | | values.push_back(std::string(line.vbuf, line.vsiz)); | |
| line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true); | | line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true); | |
| if (line.kbuf) { | | if (line.kbuf) { | |
| lines.push(line); | | lines.push(line); | |
| } else { | | } else { | |
| delete line.cur; | | delete line.cur; | |
| } | | } | |
| } | | } | |
| if (lkbuf) { | | if (lkbuf) { | |
|
| if (!err && !call_reducer(lkbuf, lksiz, values)) err = true; | | if (!err && !call_reducer(lkbuf, lksiz, values)) { | |
| values.clear(); | | db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "reducer failed | |
| | | "); | |
| | | err = true; | |
| | | } | |
| delete[] lkbuf; | | delete[] lkbuf; | |
| } | | } | |
| while (!lines.empty()) { | | while (!lines.empty()) { | |
| MergeLine line = lines.top(); | | MergeLine line = lines.top(); | |
| lines.pop(); | | lines.pop(); | |
| delete[] line.kbuf; | | delete[] line.kbuf; | |
| delete line.cur; | | delete line.cur; | |
| } | | } | |
|
| | | if (redtasks_) redtasks_->finish(); | |
| double etime = time(); | | double etime = time(); | |
| if (!logf("reduce", "the reduce process finished: time=%.6f", etime - s
time)) err = true; | | if (!logf("reduce", "the reduce process finished: time=%.6f", etime - s
time)) err = true; | |
| return !err; | | return !err; | |
| } | | } | |
| /** | | /** | |
| * Call the reducer. | | * Call the reducer. | |
| * @param kbuf the pointer to the key region. | | * @param kbuf the pointer to the key region. | |
| * @param ksiz the size of the key region. | | * @param ksiz the size of the key region. | |
| * @param values a vector of the values. | | * @param values a vector of the values. | |
| * @return true on success, or false on failure. | | * @return true on success, or false on failure. | |
| */ | | */ | |
| bool call_reducer(const char* kbuf, size_t ksiz, const Values& values) { | | bool call_reducer(const char* kbuf, size_t ksiz, const Values& values) { | |
| _assert_(kbuf && ksiz <= MEMMAXSIZ); | | _assert_(kbuf && ksiz <= MEMMAXSIZ); | |
|
| | | if (redtasks_) { | |
| | | if (redaborted_) return false; | |
| | | ReduceTaskQueue::ReduceTask* task = | |
| | | new ReduceTaskQueue::ReduceTask(this, kbuf, ksiz, values); | |
| | | redtasks_->add_task(task); | |
| | | return true; | |
| | | } | |
| bool err = false; | | bool err = false; | |
| ValueIterator iter(values.begin(), values.end()); | | ValueIterator iter(values.begin(), values.end()); | |
| if (!reduce(kbuf, ksiz, &iter)) err = true; | | if (!reduce(kbuf, ksiz, &iter)) err = true; | |
| return !err; | | return !err; | |
| } | | } | |
| /** Dummy constructor to forbid the use. */ | | /** Dummy constructor to forbid the use. */ | |
| MapReduce(const MapReduce&); | | MapReduce(const MapReduce&); | |
| /** Dummy Operator to forbid the use. */ | | /** Dummy Operator to forbid the use. */ | |
| MapReduce& operator =(const MapReduce&); | | MapReduce& operator =(const MapReduce&); | |
| /** The internal database. */ | | /** The internal database. */ | |
| BasicDB* db_; | | BasicDB* db_; | |
| /** The record comparator. */ | | /** The record comparator. */ | |
| Comparator* rcomp_; | | Comparator* rcomp_; | |
| /** The temporary databases. */ | | /** The temporary databases. */ | |
| BasicDB** tmpdbs_; | | BasicDB** tmpdbs_; | |
| /** The number of temporary databases. */ | | /** The number of temporary databases. */ | |
| size_t dbnum_; | | size_t dbnum_; | |
| /** The logical clock for temporary databases. */ | | /** The logical clock for temporary databases. */ | |
| int64_t dbclock_; | | int64_t dbclock_; | |
|
| | | /** The number of the mapper threads. */ | |
| | | size_t mapthnum_; | |
| | | /** The number of the reducer threads. */ | |
| | | size_t redthnum_; | |
| /** The cache for emitter. */ | | /** The cache for emitter. */ | |
| TinyHashMap* cache_; | | TinyHashMap* cache_; | |
| /** The current size of the cache for emitter. */ | | /** The current size of the cache for emitter. */ | |
| int64_t csiz_; | | int64_t csiz_; | |
| /** The limit size of the cache for emitter. */ | | /** The limit size of the cache for emitter. */ | |
| int64_t clim_; | | int64_t clim_; | |
| /** The bucket number of the cache for emitter. */ | | /** The bucket number of the cache for emitter. */ | |
| int64_t cbnum_; | | int64_t cbnum_; | |
|
| | | /** The task queue for parallel reducer. */ | |
| | | TaskQueue* redtasks_; | |
| | | /** The flag whether aborted. */ | |
| | | bool redaborted_; | |
| | | /** The whole lock. */ | |
| | | Mutex* lock_; | |
| }; | | }; | |
| | | | |
| /** | | /** | |
| * Index database. | | * Index database. | |
| * @note This class is designed to implement an indexing storage with an ef
ficient appending | | * @note This class is designed to implement an indexing storage with an ef
ficient appending | |
| * operation for the existing record values. This class is a wrapper of th
e polymorphic | | * operation for the existing record values. This class is a wrapper of th
e polymorphic | |
| * database, featuring buffering mechanism to alleviate IO overhead in the
database layer. This | | * database, featuring buffering mechanism to alleviate IO overhead in the
database layer. This | |
| * class can be inherited but overwriting methods is forbidden. Before eve
ry database operation, | | * class can be inherited but overwriting methods is forbidden. Before eve
ry database operation, | |
| * it is necessary to call the IndexDB::open method in order to open a data
base file and connect | | * it is necessary to call the IndexDB::open method in order to open a data
base file and connect | |
| * the database object to it. To avoid data missing or corruption, it is i
mportant to close | | * the database object to it. To avoid data missing or corruption, it is i
mportant to close | |
| | | | |
End of changes. 21 change blocks. |
| 16 lines changed or deleted | | 134 lines changed or added | |
|
| kcdirdb.h | | kcdirdb.h | |
| | | | |
| skipping to change at line 530 | | skipping to change at line 530 | |
| set_error(_KCCODELINE_, Error::NOPERM, "permission denied"); | | set_error(_KCCODELINE_, Error::NOPERM, "permission denied"); | |
| return false; | | return false; | |
| } | | } | |
| ScopedVisitor svis(visitor); | | ScopedVisitor svis(visitor); | |
| bool err = false; | | bool err = false; | |
| if (!iterate_impl(visitor, checker)) err = true; | | if (!iterate_impl(visitor, checker)) err = true; | |
| trigger_meta(MetaTrigger::ITERATE, "iterate"); | | trigger_meta(MetaTrigger::ITERATE, "iterate"); | |
| return !err; | | return !err; | |
| } | | } | |
| /** | | /** | |
|
| | | * Scan each record in parallel. | |
| | | * @param visitor a visitor object. | |
| | | * @param thnum the number of worker threads. | |
| | | * @param checker a progress checker object. If it is NULL, no checking | |
| | | is performed. | |
| | | * @return true on success, or false on failure. | |
| | | * @note This function is for reading records and not for updating ones. | |
| | | The return value of | |
| | | * the visitor is just ignored. To avoid deadlock, any explicit database | |
| | | operation must not | |
| | | * be performed in this function. | |
| | | */ | |
| | | bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* check | |
| | | er = NULL) { | |
| | | _assert_(visitor && thnum <= MEMMAXSIZ); | |
| | | ScopedRWLock lock(&mlock_, false); | |
| | | if (omode_ == 0) { | |
| | | set_error(_KCCODELINE_, Error::INVALID, "not opened"); | |
| | | return false; | |
| | | } | |
| | | if (thnum < 1) thnum = 0; | |
| | | if (thnum > (size_t)INT8MAX) thnum = INT8MAX; | |
| | | ScopedVisitor svis(visitor); | |
| | | rlock_.lock_reader_all(); | |
| | | bool err = false; | |
| | | if (!scan_parallel_impl(visitor, thnum, checker)) err = true; | |
| | | rlock_.unlock_all(); | |
| | | trigger_meta(MetaTrigger::ITERATE, "scan_parallel"); | |
| | | return !err; | |
| | | } | |
| | | /** | |
| * Get the last happened error. | | * Get the last happened error. | |
| * @return the last happened error. | | * @return the last happened error. | |
| */ | | */ | |
| Error error() const { | | Error error() const { | |
| _assert_(true); | | _assert_(true); | |
| return error_; | | return error_; | |
| } | | } | |
| /** | | /** | |
| * Set the error information. | | * Set the error information. | |
| * @param file the file name of the program source code. | | * @param file the file name of the program source code. | |
| | | | |
| skipping to change at line 2030 | | skipping to change at line 2057 | |
| return !err; | | return !err; | |
| } | | } | |
| /** | | /** | |
| * Iterate to accept a visitor for each record. | | * Iterate to accept a visitor for each record. | |
| * @param visitor a visitor object. | | * @param visitor a visitor object. | |
| * @param checker a progress checker object. | | * @param checker a progress checker object. | |
| * @return true on success, or false on failure. | | * @return true on success, or false on failure. | |
| */ | | */ | |
| bool iterate_impl(Visitor* visitor, ProgressChecker* checker) { | | bool iterate_impl(Visitor* visitor, ProgressChecker* checker) { | |
| _assert_(visitor); | | _assert_(visitor); | |
|
| DirStream dir; | | | |
| if (!dir.open(path_)) { | | | |
| set_error(_KCCODELINE_, Error::SYSTEM, "opening a directory failed"); | | | |
| return false; | | | |
| } | | | |
| int64_t allcnt = count_; | | int64_t allcnt = count_; | |
| if (checker && !checker->check("iterate", "beginning", 0, allcnt)) { | | if (checker && !checker->check("iterate", "beginning", 0, allcnt)) { | |
| set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |
| return false; | | return false; | |
| } | | } | |
|
| | | DirStream dir; | |
| | | if (!dir.open(path_)) { | |
| | | set_error(_KCCODELINE_, Error::SYSTEM, "opening a directory failed"); | |
| | | return false; | |
| | | } | |
| bool err = false; | | bool err = false; | |
| std::string name; | | std::string name; | |
| int64_t curcnt = 0; | | int64_t curcnt = 0; | |
| while (dir.read(&name)) { | | while (dir.read(&name)) { | |
| if (*name.c_str() == *KCDDBMAGICFILE) continue; | | if (*name.c_str() == *KCDDBMAGICFILE) continue; | |
| const std::string& rpath = path_ + File::PATHCHR + name; | | const std::string& rpath = path_ + File::PATHCHR + name; | |
| Record rec; | | Record rec; | |
| if (read_record(rpath, &rec)) { | | if (read_record(rpath, &rec)) { | |
| if (!accept_visit_full(rec.kbuf, rec.ksiz, rec.vbuf, rec.vsiz, rec.
rsiz, | | if (!accept_visit_full(rec.kbuf, rec.ksiz, rec.vbuf, rec.vsiz, rec.
rsiz, | |
| visitor, rpath, name.c_str())) err = true; | | visitor, rpath, name.c_str())) err = true; | |
| | | | |
| skipping to change at line 2062 | | skipping to change at line 2089 | |
| set_error(_KCCODELINE_, Error::BROKEN, "missing record"); | | set_error(_KCCODELINE_, Error::BROKEN, "missing record"); | |
| err = true; | | err = true; | |
| } | | } | |
| curcnt++; | | curcnt++; | |
| if (checker && !checker->check("iterate", "processing", curcnt, allcn
t)) { | | if (checker && !checker->check("iterate", "processing", curcnt, allcn
t)) { | |
| set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |
| err = true; | | err = true; | |
| break; | | break; | |
| } | | } | |
| } | | } | |
|
| | | if (!dir.close()) { | |
| | | set_error(_KCCODELINE_, Error::SYSTEM, "closing a directory failed"); | |
| | | err = true; | |
| | | } | |
| if (checker && !checker->check("iterate", "ending", -1, allcnt)) { | | if (checker && !checker->check("iterate", "ending", -1, allcnt)) { | |
| set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |
| err = true; | | err = true; | |
| } | | } | |
|
| | | return !err; | |
| | | } | |
| | | /** | |
| | | * Scan each record in parallel. | |
| | | * @param visitor a visitor object. | |
| | | * @param thnum the number of worker threads. | |
| | | * @param checker a progress checker object. | |
| | | * @return true on success, or false on failure. | |
| | | */ | |
| | | bool scan_parallel_impl(Visitor *visitor, size_t thnum, ProgressChecker* | |
| | | checker) { | |
| | | assert(visitor && thnum <= MEMMAXSIZ); | |
| | | int64_t allcnt = count_; | |
| | | if (checker && !checker->check("scan_parallel", "beginning", -1, allcnt | |
| | | )) { | |
| | | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |
| | | return false; | |
| | | } | |
| | | DirStream dir; | |
| | | if (!dir.open(path_)) { | |
| | | set_error(_KCCODELINE_, Error::SYSTEM, "opening a directory failed"); | |
| | | return false; | |
| | | } | |
| | | class ThreadImpl : public Thread { | |
| | | public: | |
| | | explicit ThreadImpl() : | |
| | | db_(NULL), visitor_(NULL), checker_(NULL), allcnt_(0), | |
| | | dir_(NULL), itmtx_(NULL), error_() {} | |
| | | void init(DirDB* db, Visitor* visitor, ProgressChecker* checker, int6 | |
| | | 4_t allcnt, | |
| | | DirStream* dir, Mutex* itmtx) { | |
| | | db_ = db; | |
| | | visitor_ = visitor; | |
| | | checker_ = checker; | |
| | | allcnt_ = allcnt; | |
| | | dir_ = dir; | |
| | | itmtx_ = itmtx; | |
| | | } | |
| | | const Error& error() { | |
| | | return error_; | |
| | | } | |
| | | private: | |
| | | void run() { | |
| | | DirDB* db = db_; | |
| | | Visitor* visitor = visitor_; | |
| | | ProgressChecker* checker = checker_; | |
| | | int64_t allcnt = allcnt_; | |
| | | DirStream* dir = dir_; | |
| | | Mutex* itmtx = itmtx_; | |
| | | const std::string& path = db->path_; | |
| | | while (true) { | |
| | | itmtx->lock(); | |
| | | std::string name; | |
| | | if (!dir->read(&name)) { | |
| | | itmtx->unlock(); | |
| | | break; | |
| | | } | |
| | | itmtx->unlock(); | |
| | | if (*name.c_str() == *KCDDBMAGICFILE) continue; | |
| | | const std::string& rpath = path + File::PATHCHR + name; | |
| | | Record rec; | |
| | | if (db->read_record(rpath, &rec)) { | |
| | | size_t vsiz; | |
| | | visitor->visit_full(rec.kbuf, rec.ksiz, rec.vbuf, rec.vsiz, &vs | |
| | | iz); | |
| | | delete[] rec.rbuf; | |
| | | } else { | |
| | | error_ = db->error(); | |
| | | break; | |
| | | } | |
| | | if (checker && !checker->check("scan_parallel", "processing", -1, | |
| | | allcnt)) { | |
| | | db->set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |
| | | error_ = db->error(); | |
| | | break; | |
| | | } | |
| | | } | |
| | | } | |
| | | DirDB* db_; | |
| | | Visitor* visitor_; | |
| | | ProgressChecker* checker_; | |
| | | int64_t allcnt_; | |
| | | DirStream* dir_; | |
| | | Mutex* itmtx_; | |
| | | Error error_; | |
| | | }; | |
| | | bool err = false; | |
| | | Mutex itmtx; | |
| | | ThreadImpl* threads = new ThreadImpl[thnum]; | |
| | | for (size_t i = 0; i < thnum; i++) { | |
| | | ThreadImpl* thread = threads + i; | |
| | | thread->init(this, visitor, checker, allcnt, &dir, &itmtx); | |
| | | } | |
| | | for (size_t i = 0; i < thnum; i++) { | |
| | | ThreadImpl* thread = threads + i; | |
| | | thread->start(); | |
| | | } | |
| | | for (size_t i = 0; i < thnum; i++) { | |
| | | ThreadImpl* thread = threads + i; | |
| | | thread->join(); | |
| | | if (thread->error() != Error::SUCCESS) { | |
| | | *error_ = thread->error(); | |
| | | err = true; | |
| | | } | |
| | | } | |
| | | delete[] threads; | |
| if (!dir.close()) { | | if (!dir.close()) { | |
| set_error(_KCCODELINE_, Error::SYSTEM, "closing a directory failed"); | | set_error(_KCCODELINE_, Error::SYSTEM, "closing a directory failed"); | |
| err = true; | | err = true; | |
| } | | } | |
|
| | | if (checker && !checker->check("scan_parallel", "ending", -1, allcnt)) | |
| | | { | |
| | | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |
| | | err = true; | |
| | | } | |
| return !err; | | return !err; | |
| } | | } | |
| /** | | /** | |
| * Synchronize updated contents with the file and the device. | | * Synchronize updated contents with the file and the device. | |
| * @param hard true for physical synchronization with the device, or fals
e for logical | | * @param hard true for physical synchronization with the device, or fals
e for logical | |
| * synchronization with the file system. | | * synchronization with the file system. | |
| * @param proc a postprocessor object. | | * @param proc a postprocessor object. | |
| * @param checker a progress checker object. | | * @param checker a progress checker object. | |
| * @return true on success, or false on failure. | | * @return true on success, or false on failure. | |
| */ | | */ | |
| | | | |
End of changes. 6 change blocks. |
| 5 lines changed or deleted | | 151 lines changed or added | |
|
| kchashdb.h | | kchashdb.h | |
| | | | |
| skipping to change at line 723 | | skipping to change at line 723 | |
| return false; | | return false; | |
| } | | } | |
| } | | } | |
| ScopedVisitor svis(visitor); | | ScopedVisitor svis(visitor); | |
| bool err = false; | | bool err = false; | |
| if (!iterate_impl(visitor, checker)) err = true; | | if (!iterate_impl(visitor, checker)) err = true; | |
| trigger_meta(MetaTrigger::ITERATE, "iterate"); | | trigger_meta(MetaTrigger::ITERATE, "iterate"); | |
| return !err; | | return !err; | |
| } | | } | |
| /** | | /** | |
|
| | | * Scan each record in parallel. | |
| | | * @param visitor a visitor object. | |
| | | * @param thnum the number of worker threads. | |
| | | * @param checker a progress checker object. If it is NULL, no checking | |
| | | is performed. | |
| | | * @return true on success, or false on failure. | |
| | | * @note This function is for reading records and not for updating ones. | |
| | | The return value of | |
| | | * the visitor is just ignored. To avoid deadlock, any explicit database | |
| | | operation must not | |
| | | * be performed in this function. | |
| | | */ | |
| | | bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* check | |
| | | er = NULL) { | |
| | | _assert_(visitor && thnum <= MEMMAXSIZ); | |
| | | ScopedRWLock lock(&mlock_, false); | |
| | | if (omode_ == 0) { | |
| | | set_error(_KCCODELINE_, Error::INVALID, "not opened"); | |
| | | return false; | |
| | | } | |
| | | if (thnum < 1) thnum = 1; | |
| | | if (thnum > (size_t)INT8MAX) thnum = INT8MAX; | |
| | | if ((int64_t)thnum > bnum_) thnum = bnum_; | |
| | | ScopedVisitor svis(visitor); | |
| | | rlock_.lock_reader_all(); | |
| | | bool err = false; | |
| | | if (!scan_parallel_impl(visitor, thnum, checker)) err = true; | |
| | | rlock_.unlock_all(); | |
| | | trigger_meta(MetaTrigger::ITERATE, "scan_parallel"); | |
| | | return !err; | |
| | | } | |
| | | /** | |
| * Get the last happened error. | | * Get the last happened error. | |
| * @return the last happened error. | | * @return the last happened error. | |
| */ | | */ | |
| Error error() const { | | Error error() const { | |
| _assert_(true); | | _assert_(true); | |
| return error_; | | return error_; | |
| } | | } | |
| /** | | /** | |
| * Set the error information. | | * Set the error information. | |
| * @param file the file name of the program source code. | | * @param file the file name of the program source code. | |
| | | | |
| skipping to change at line 2215 | | skipping to change at line 2243 | |
| } | | } | |
| } | | } | |
| } | | } | |
| if (checker && !checker->check("iterate", "ending", -1, allcnt)) { | | if (checker && !checker->check("iterate", "ending", -1, allcnt)) { | |
| set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |
| return false; | | return false; | |
| } | | } | |
| return true; | | return true; | |
| } | | } | |
| /** | | /** | |
|
| | | * Scan each record in parallel. | |
| | | * @param visitor a visitor object. | |
| | | * @param thnum the number of worker threads. | |
| | | * @param checker a progress checker object. | |
| | | * @return true on success, or false on failure. | |
| | | */ | |
| | | bool scan_parallel_impl(Visitor *visitor, size_t thnum, ProgressChecker* | |
| | | checker) { | |
| | | assert(visitor && thnum <= MEMMAXSIZ); | |
| | | int64_t allcnt = count_; | |
| | | if (checker && !checker->check("scan_parallel", "beginning", -1, allcnt | |
| | | )) { | |
| | | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |
| | | return false; | |
| | | } | |
| | | bool err = false; | |
| | | std::vector<int64_t> offs; | |
| | | int64_t bnum = bnum_; | |
| | | size_t cap = (thnum + 1) * INT8MAX; | |
| | | for (int64_t bidx = 0; bidx < bnum; bidx++) { | |
| | | int64_t off = get_bucket(bidx); | |
| | | if (off > 0) { | |
| | | offs.push_back(off); | |
| | | if (offs.size() >= cap) break; | |
| | | } | |
| | | } | |
| | | if (!offs.empty()) { | |
| | | std::sort(offs.begin(), offs.end()); | |
| | | if (thnum > offs.size()) thnum = offs.size(); | |
| | | class ThreadImpl : public Thread { | |
| | | public: | |
| | | explicit ThreadImpl() : | |
| | | db_(NULL), visitor_(NULL), checker_(NULL), allcnt_(0), | |
| | | begoff_(0), endoff_(0), error_() {} | |
| | | void init(HashDB* db, Visitor* visitor, ProgressChecker* checker, i | |
| | | nt64_t allcnt, | |
| | | int64_t begoff, int64_t endoff) { | |
| | | db_ = db; | |
| | | visitor_ = visitor; | |
| | | checker_ = checker; | |
| | | allcnt_ = allcnt; | |
| | | begoff_ = begoff; | |
| | | endoff_ = endoff; | |
| | | } | |
| | | const Error& error() { | |
| | | return error_; | |
| | | } | |
| | | private: | |
| | | void run() { | |
| | | HashDB* db = db_; | |
| | | Visitor* visitor = visitor_; | |
| | | ProgressChecker* checker = checker_; | |
| | | int64_t off = begoff_; | |
| | | int64_t end = endoff_; | |
| | | int64_t allcnt = allcnt_; | |
| | | Compressor* comp = db->comp_; | |
| | | Record rec; | |
| | | char rbuf[RECBUFSIZ]; | |
| | | while (off > 0 && off < end) { | |
| | | rec.off = off; | |
| | | if (!db->read_record(&rec, rbuf)) { | |
| | | error_ = db->error(); | |
| | | break; | |
| | | } | |
| | | if (rec.psiz == UINT16MAX) { | |
| | | off += rec.rsiz; | |
| | | } else { | |
| | | if (!rec.vbuf && !db->read_record_body(&rec)) { | |
| | | delete[] rec.bbuf; | |
| | | error_ = db->error(); | |
| | | break; | |
| | | } | |
| | | const char* vbuf = rec.vbuf; | |
| | | size_t vsiz = rec.vsiz; | |
| | | char* zbuf = NULL; | |
| | | size_t zsiz = 0; | |
| | | if (comp) { | |
| | | zbuf = comp->decompress(vbuf, vsiz, &zsiz); | |
| | | if (!zbuf) { | |
| | | db->set_error(_KCCODELINE_, Error::SYSTEM, "data decompre | |
| | | ssion failed"); | |
| | | delete[] rec.bbuf; | |
| | | error_ = db->error(); | |
| | | break; | |
| | | } | |
| | | vbuf = zbuf; | |
| | | vsiz = zsiz; | |
| | | } | |
| | | visitor->visit_full(rec.kbuf, rec.ksiz, vbuf, vsiz, &vsiz); | |
| | | delete[] zbuf; | |
| | | delete[] rec.bbuf; | |
| | | off += rec.rsiz; | |
| | | if (checker && !checker->check("scan_parallel", "processing", | |
| | | -1, allcnt)) { | |
| | | db->set_error(_KCCODELINE_, Error::LOGIC, "checker failed") | |
| | | ; | |
| | | error_ = db->error(); | |
| | | break; | |
| | | } | |
| | | } | |
| | | } | |
| | | } | |
| | | HashDB* db_; | |
| | | Visitor* visitor_; | |
| | | ProgressChecker* checker_; | |
| | | int64_t allcnt_; | |
| | | int64_t begoff_; | |
| | | int64_t endoff_; | |
| | | Error error_; | |
| | | }; | |
| | | ThreadImpl* threads = new ThreadImpl[thnum]; | |
| | | double range = (double)offs.size() / thnum; | |
| | | for (size_t i = 0; i < thnum; i++) { | |
| | | int64_t cidx = i * range; | |
| | | int64_t nidx = (i + 1) * range; | |
| | | int64_t begoff = i < 1 ? roff_ : offs[cidx]; | |
| | | int64_t endoff = i < thnum - 1 ? offs[nidx] : (int64_t)lsiz_; | |
| | | ThreadImpl* thread = threads + i; | |
| | | thread->init(this, visitor, checker, allcnt, begoff, endoff); | |
| | | thread->start(); | |
| | | } | |
| | | for (size_t i = 0; i < thnum; i++) { | |
| | | ThreadImpl* thread = threads + i; | |
| | | thread->join(); | |
| | | if (thread->error() != Error::SUCCESS) { | |
| | | *error_ = thread->error(); | |
| | | err = true; | |
| | | } | |
| | | } | |
| | | delete[] threads; | |
| | | } | |
| | | if (checker && !checker->check("scan_parallel", "ending", -1, allcnt)) | |
| | | { | |
| | | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |
| | | err = true; | |
| | | } | |
| | | return !err; | |
| | | } | |
| | | /** | |
| * Synchronize updated contents with the file and the device. | | * Synchronize updated contents with the file and the device. | |
| * @param hard true for physical synchronization with the device, or fals
e for logical | | * @param hard true for physical synchronization with the device, or fals
e for logical | |
| * synchronization with the file system. | | * synchronization with the file system. | |
| * @param proc a postprocessor object. | | * @param proc a postprocessor object. | |
| * @param checker a progress checker object. | | * @param checker a progress checker object. | |
| * @return true on success, or false on failure. | | * @return true on success, or false on failure. | |
| */ | | */ | |
| bool synchronize_impl(bool hard, FileProcessor* proc, ProgressChecker* ch
ecker) { | | bool synchronize_impl(bool hard, FileProcessor* proc, ProgressChecker* ch
ecker) { | |
| _assert_(true); | | _assert_(true); | |
| bool err = false; | | bool err = false; | |
| | | | |
End of changes. 2 change blocks. |
| 0 lines changed or deleted | | 171 lines changed or added | |
|
| kcplantdb.h | | kcplantdb.h | |
| | | | |
| skipping to change at line 1262 | | skipping to change at line 1262 | |
| if (checker && !checker->check("iterate", "ending", -1, allcnt)) { | | if (checker && !checker->check("iterate", "ending", -1, allcnt)) { | |
| set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |
| err = true; | | err = true; | |
| } | | } | |
| if (atran && !commit_transaction()) err = true; | | if (atran && !commit_transaction()) err = true; | |
| if (autosync_ && !autotran_ && writable && !fix_auto_synchronization())
err = true; | | if (autosync_ && !autotran_ && writable && !fix_auto_synchronization())
err = true; | |
| trigger_meta(MetaTrigger::ITERATE, "iterate"); | | trigger_meta(MetaTrigger::ITERATE, "iterate"); | |
| return !err; | | return !err; | |
| } | | } | |
| /** | | /** | |
|
| | | * Scan each record in parallel. | |
| | | * @param visitor a visitor object. | |
| | | * @param thnum the number of worker threads. | |
| | | * @param checker a progress checker object. If it is NULL, no checking | |
| | | is performed. | |
| | | * @return true on success, or false on failure. | |
| | | * @note This function is for reading records and not for updating ones. | |
| | | The return value of | |
| | | * the visitor is just ignored. To avoid deadlock, any explicit database | |
| | | operation must not | |
| | | * be performed in this function. | |
| | | */ | |
| | | bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* check | |
| | | er = NULL) { | |
| | | _assert_(visitor && thnum <= MEMMAXSIZ); | |
| | | ScopedRWLock lock(&mlock_, true); | |
| | | if (omode_ == 0) { | |
| | | set_error(_KCCODELINE_, Error::INVALID, "not opened"); | |
| | | return false; | |
| | | } | |
| | | if (thnum < 1) thnum = 0; | |
| | | if (thnum > (size_t)INT8MAX) thnum = INT8MAX; | |
| | | bool err = false; | |
| | | if (writer_) { | |
| | | if (checker && !checker->check("scan_parallel", "cleaning the leaf no | |
| | | de cache", -1, -1)) { | |
| | | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |
| | | return false; | |
| | | } | |
| | | if (!clean_leaf_cache()) err = true; | |
| | | } | |
| | | ScopedVisitor svis(visitor); | |
| | | int64_t allcnt = count_; | |
| | | if (checker && !checker->check("scan_parallel", "beginning", 0, allcnt) | |
| | | ) { | |
| | | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |
| | | return false; | |
| | | } | |
| | | class ProgressCheckerImpl : public ProgressChecker { | |
| | | public: | |
| | | explicit ProgressCheckerImpl() : ok_(1) {} | |
| | | void stop() { | |
| | | ok_.set(0); | |
| | | } | |
| | | private: | |
| | | bool check(const char* name, const char* message, int64_t curcnt, int | |
| | | 64_t allcnt) { | |
| | | return ok_ > 0; | |
| | | } | |
| | | AtomicInt64 ok_; | |
| | | }; | |
| | | ProgressCheckerImpl ichecker; | |
| | | class VisitorImpl : public Visitor { | |
| | | public: | |
| | | explicit VisitorImpl(PlantDB* db, Visitor* visitor, | |
| | | ProgressChecker* checker, int64_t allcnt, | |
| | | ProgressCheckerImpl* ichecker) : | |
| | | db_(db), visitor_(visitor), checker_(checker), allcnt_(allcnt), | |
| | | ichecker_(ichecker), error_() {} | |
| | | const Error& error() { | |
| | | return error_; | |
| | | } | |
| | | private: | |
| | | const char* visit_full(const char* kbuf, size_t ksiz, | |
| | | const char* vbuf, size_t vsiz, size_t* sp) { | |
| | | if (ksiz < 2 || ksiz >= NUMBUFSIZ || kbuf[0] != LNPREFIX) return NO | |
| | | P; | |
| | | uint64_t prev; | |
| | | size_t step = readvarnum(vbuf, vsiz, &prev); | |
| | | if (step < 1) return NOP; | |
| | | vbuf += step; | |
| | | vsiz -= step; | |
| | | uint64_t next; | |
| | | step = readvarnum(vbuf, vsiz, &next); | |
| | | if (step < 1) return NOP; | |
| | | vbuf += step; | |
| | | vsiz -= step; | |
| | | while (vsiz > 1) { | |
| | | uint64_t rksiz; | |
| | | step = readvarnum(vbuf, vsiz, &rksiz); | |
| | | if (step < 1) break; | |
| | | vbuf += step; | |
| | | vsiz -= step; | |
| | | uint64_t rvsiz; | |
| | | step = readvarnum(vbuf, vsiz, &rvsiz); | |
| | | if (step < 1) break; | |
| | | vbuf += step; | |
| | | vsiz -= step; | |
| | | if (vsiz < rksiz + rvsiz) break; | |
| | | size_t xvsiz; | |
| | | visitor_->visit_full(vbuf, rksiz, vbuf + rksiz, rvsiz, &xvsiz); | |
| | | vbuf += rksiz; | |
| | | vsiz -= rksiz; | |
| | | vbuf += rvsiz; | |
| | | vsiz -= rvsiz; | |
| | | if (checker_ && !checker_->check("scan_parallel", "processing", - | |
| | | 1, allcnt_)) { | |
| | | db_->set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |
| | | error_ = db_->error(); | |
| | | ichecker_->stop(); | |
| | | break; | |
| | | } | |
| | | } | |
| | | return NOP; | |
| | | } | |
| | | PlantDB* db_; | |
| | | Visitor* visitor_; | |
| | | ProgressChecker* checker_; | |
| | | int64_t allcnt_; | |
| | | ProgressCheckerImpl* ichecker_; | |
| | | Error error_; | |
| | | }; | |
| | | VisitorImpl ivisitor(this, visitor, checker, allcnt, &ichecker); | |
| | | if (!db_.scan_parallel(&ivisitor, thnum, &ichecker)) err = true; | |
| | | if (ivisitor.error() != Error::SUCCESS) { | |
| | | const Error& e = ivisitor.error(); | |
| | | db_.set_error(_KCCODELINE_, e.code(), e.message()); | |
| | | err = true; | |
| | | } | |
| | | if (checker && !checker->check("scan_parallel", "ending", -1, allcnt)) | |
| | | { | |
| | | set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); | |
| | | err = true; | |
| | | } | |
| | | trigger_meta(MetaTrigger::ITERATE, "scan_parallel"); | |
| | | return !err; | |
| | | } | |
| | | /** | |
| * Get the last happened error. | | * Get the last happened error. | |
| * @return the last happened error. | | * @return the last happened error. | |
| */ | | */ | |
| Error error() const { | | Error error() const { | |
| _assert_(true); | | _assert_(true); | |
| return db_.error(); | | return db_.error(); | |
| } | | } | |
| /** | | /** | |
| * Set the error information. | | * Set the error information. | |
| * @param file the file name of the program source code. | | * @param file the file name of the program source code. | |
| | | | |
End of changes. 1 change blocks. |
| 0 lines changed or deleted | | 128 lines changed or added | |
|