| kcdbext.h | | kcdbext.h | |
| | | | |
| skipping to change at line 46 | | skipping to change at line 46 | |
| | | | |
| /** | | /** | |
| * MapReduce framework. | | * MapReduce framework. | |
| * @note Although this framework is not distributed or concurrent, it is us
eful for aggregate | | * @note Although this framework is not distributed or concurrent, it is us
eful for aggregate | |
| * calculation with less CPU loading and less memory usage. | | * calculation with less CPU loading and less memory usage. | |
| */ | | */ | |
| class MapReduce { | | class MapReduce { | |
| public: | | public: | |
| class ValueIterator; | | class ValueIterator; | |
| private: | | private: | |
|
| | | class FlushThread; | |
| | | class ReduceTaskQueue; | |
| class MapVisitor; | | class MapVisitor; | |
| struct MergeLine; | | struct MergeLine; | |
| /** An alias of vector of loaded values. */ | | /** An alias of vector of loaded values. */ | |
| typedef std::vector<std::string> Values; | | typedef std::vector<std::string> Values; | |
| /** The default number of temporary databases. */ | | /** The default number of temporary databases. */ | |
| static const size_t DEFDBNUM = 8; | | static const size_t DEFDBNUM = 8; | |
| /** The maxinum number of temporary databases. */ | | /** The maxinum number of temporary databases. */ | |
| static const size_t MAXDBNUM = 256; | | static const size_t MAXDBNUM = 256; | |
| /** The default cache limit. */ | | /** The default cache limit. */ | |
| static const int64_t DEFCLIM = 512LL << 20; | | static const int64_t DEFCLIM = 512LL << 20; | |
| | | | |
| skipping to change at line 68 | | skipping to change at line 70 | |
| /** The bucket number of temprary databases. */ | | /** The bucket number of temprary databases. */ | |
| static const int64_t DBBNUM = 512LL << 10; | | static const int64_t DBBNUM = 512LL << 10; | |
| /** The page size of temprary databases. */ | | /** The page size of temprary databases. */ | |
| static const int32_t DBPSIZ = 32768; | | static const int32_t DBPSIZ = 32768; | |
| /** The mapped size of temprary databases. */ | | /** The mapped size of temprary databases. */ | |
| static const int64_t DBMSIZ = 516LL * 4096; | | static const int64_t DBMSIZ = 516LL * 4096; | |
| /** The page cache capacity of temprary databases. */ | | /** The page cache capacity of temprary databases. */ | |
| static const int64_t DBPCCAP = 16LL << 20; | | static const int64_t DBPCCAP = 16LL << 20; | |
| /** The default number of threads in parallel mode. */ | | /** The default number of threads in parallel mode. */ | |
| static const size_t DEFTHNUM = 8; | | static const size_t DEFTHNUM = 8; | |
|
| | | /** The number of slots of the record lock. */ | |
| | | static const int32_t RLOCKSLOT = 256; | |
| public: | | public: | |
| /** | | /** | |
| * Value iterator for the reducer. | | * Value iterator for the reducer. | |
| */ | | */ | |
| class ValueIterator { | | class ValueIterator { | |
| friend class MapReduce; | | friend class MapReduce; | |
| public: | | public: | |
| /** | | /** | |
| * Get the next value. | | * Get the next value. | |
| * @param sp the pointer to the variable into which the size of the reg
ion of the return | | * @param sp the pointer to the variable into which the size of the reg
ion of the return | |
| | | | |
| skipping to change at line 134 | | skipping to change at line 138 | |
| /** The size of the current value. */ | | /** The size of the current value. */ | |
| size_t vsiz_; | | size_t vsiz_; | |
| }; | | }; | |
| /** | | /** | |
| * Execution options. | | * Execution options. | |
| */ | | */ | |
| enum Option { | | enum Option { | |
| XNOLOCK = 1 << 0, ///< avoid locking against update
operations | | XNOLOCK = 1 << 0, ///< avoid locking against update
operations | |
| XPARAMAP = 1 << 1, ///< run mappers in parallel | | XPARAMAP = 1 << 1, ///< run mappers in parallel | |
| XPARARED = 1 << 2, ///< run reducers in parallel | | XPARARED = 1 << 2, ///< run reducers in parallel | |
|
| | | XPARAFLS = 1 << 3, ///< run cache flushers in paralle
l | |
| XNOCOMP = 1 << 8 ///< avoid compression of temporar
y databases | | XNOCOMP = 1 << 8 ///< avoid compression of temporar
y databases | |
| }; | | }; | |
| /** | | /** | |
| * Default constructor. | | * Default constructor. | |
| */ | | */ | |
| explicit MapReduce() : | | explicit MapReduce() : | |
| db_(NULL), rcomp_(NULL), tmpdbs_(NULL), dbnum_(DEFDBNUM), dbclock_(0)
, | | db_(NULL), rcomp_(NULL), tmpdbs_(NULL), dbnum_(DEFDBNUM), dbclock_(0)
, | |
|
| mapthnum_(DEFTHNUM), redthnum_(DEFTHNUM), | | mapthnum_(DEFTHNUM), redthnum_(DEFTHNUM), flsthnum_(DEFTHNUM), | |
| cache_(NULL), csiz_(0), clim_(DEFCLIM), cbnum_(DEFCBNUM), | | cache_(NULL), csiz_(0), clim_(DEFCLIM), cbnum_(DEFCBNUM), flsths_(NUL | |
| redtasks_(NULL), redaborted_(false), lock_(NULL) { | | L), | |
| | | redtasks_(NULL), redaborted_(false), rlocks_(NULL) { | |
| _assert_(true); | | _assert_(true); | |
| } | | } | |
| /** | | /** | |
| * Destructor. | | * Destructor. | |
| */ | | */ | |
| virtual ~MapReduce() { | | virtual ~MapReduce() { | |
| _assert_(true); | | _assert_(true); | |
| } | | } | |
| /** | | /** | |
| * Map a record data. | | * Map a record data. | |
| | | | |
| skipping to change at line 315 | | skipping to change at line 320 | |
| err = true; | | err = true; | |
| if (err) { | | if (err) { | |
| for (size_t i = 0; i < dbnum_; i++) { | | for (size_t i = 0; i < dbnum_; i++) { | |
| delete tmpdbs_[i]; | | delete tmpdbs_[i]; | |
| } | | } | |
| delete[] tmpdbs_; | | delete[] tmpdbs_; | |
| return false; | | return false; | |
| } | | } | |
| } | | } | |
| if (opts & XPARARED) redtasks_ = new ReduceTaskQueue; | | if (opts & XPARARED) redtasks_ = new ReduceTaskQueue; | |
|
| | | if (opts & XPARAFLS) flsths_ = new std::deque<FlushThread*>; | |
| if (opts & XNOLOCK) { | | if (opts & XNOLOCK) { | |
| MapChecker mapchecker; | | MapChecker mapchecker; | |
| MapVisitor mapvisitor(this, &mapchecker, db->count()); | | MapVisitor mapvisitor(this, &mapchecker, db->count()); | |
| mapvisitor.visit_before(); | | mapvisitor.visit_before(); | |
| if (!err) { | | if (!err) { | |
| BasicDB::Cursor* cur = db->cursor(); | | BasicDB::Cursor* cur = db->cursor(); | |
| if (!cur->jump() && cur->error() != BasicDB::Error::NOREC) err = tr
ue; | | if (!cur->jump() && cur->error() != BasicDB::Error::NOREC) err = tr
ue; | |
| while (!err) { | | while (!err) { | |
| if (!cur->accept(&mapvisitor, false, true)) { | | if (!cur->accept(&mapvisitor, false, true)) { | |
| if (cur->error() != BasicDB::Error::NOREC) err = true; | | if (cur->error() != BasicDB::Error::NOREC) err = true; | |
| | | | |
| skipping to change at line 338 | | skipping to change at line 344 | |
| delete cur; | | delete cur; | |
| } | | } | |
| if (mapvisitor.error()) { | | if (mapvisitor.error()) { | |
| db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed"
); | | db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed"
); | |
| err = true; | | err = true; | |
| } | | } | |
| mapvisitor.visit_after(); | | mapvisitor.visit_after(); | |
| } else if (opts & XPARAMAP) { | | } else if (opts & XPARAMAP) { | |
| MapChecker mapchecker; | | MapChecker mapchecker; | |
| MapVisitor mapvisitor(this, &mapchecker, db->count()); | | MapVisitor mapvisitor(this, &mapchecker, db->count()); | |
|
| lock_ = new Mutex(); | | rlocks_ = new SlottedMutex(RLOCKSLOT); | |
| if (!err && !db->scan_parallel(&mapvisitor, mapthnum_, &mapchecker))
{ | | if (!err && !db->scan_parallel(&mapvisitor, mapthnum_, &mapchecker))
{ | |
| db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed"
); | | db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed"
); | |
| err = true; | | err = true; | |
| } | | } | |
|
| delete lock_; | | delete rlocks_; | |
| lock_ = NULL; | | rlocks_ = NULL; | |
| if (mapvisitor.error()) err = true; | | if (mapvisitor.error()) err = true; | |
| } else { | | } else { | |
| MapChecker mapchecker; | | MapChecker mapchecker; | |
| MapVisitor mapvisitor(this, &mapchecker, db->count()); | | MapVisitor mapvisitor(this, &mapchecker, db->count()); | |
| if (!err && !db->iterate(&mapvisitor, false, &mapchecker)) err = true
; | | if (!err && !db->iterate(&mapvisitor, false, &mapchecker)) err = true
; | |
| if (mapvisitor.error()) { | | if (mapvisitor.error()) { | |
| db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed"
); | | db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed"
); | |
| err = true; | | err = true; | |
| } | | } | |
| } | | } | |
|
| | | if (flsths_) { | |
| | | delete flsths_; | |
| | | flsths_ = NULL; | |
| | | } | |
| if (redtasks_) { | | if (redtasks_) { | |
| delete redtasks_; | | delete redtasks_; | |
| redtasks_ = NULL; | | redtasks_ = NULL; | |
| } | | } | |
| if (!logf("clean", "closing the temporary databases")) err = true; | | if (!logf("clean", "closing the temporary databases")) err = true; | |
| stime = time(); | | stime = time(); | |
| for (size_t i = 0; i < dbnum_; i++) { | | for (size_t i = 0; i < dbnum_; i++) { | |
|
| assert(tmpdbs_[i]); | | | |
| const std::string& path = tmpdbs_[i]->path(); | | const std::string& path = tmpdbs_[i]->path(); | |
| if (!tmpdbs_[i]->clear()) { | | if (!tmpdbs_[i]->clear()) { | |
| const BasicDB::Error& e = tmpdbs_[i]->error(); | | const BasicDB::Error& e = tmpdbs_[i]->error(); | |
| db->set_error(_KCCODELINE_, e.code(), e.message()); | | db->set_error(_KCCODELINE_, e.code(), e.message()); | |
| err = true; | | err = true; | |
| } | | } | |
| if (!tmpdbs_[i]->close()) { | | if (!tmpdbs_[i]->close()) { | |
| const BasicDB::Error& e = tmpdbs_[i]->error(); | | const BasicDB::Error& e = tmpdbs_[i]->error(); | |
| db->set_error(_KCCODELINE_, e.code(), e.message()); | | db->set_error(_KCCODELINE_, e.code(), e.message()); | |
| err = true; | | err = true; | |
| | | | |
| skipping to change at line 401 | | skipping to change at line 410 | |
| dbnum_ = dbnum > 0 ? dbnum : DEFDBNUM; | | dbnum_ = dbnum > 0 ? dbnum : DEFDBNUM; | |
| if (dbnum_ > MAXDBNUM) dbnum_ = MAXDBNUM; | | if (dbnum_ > MAXDBNUM) dbnum_ = MAXDBNUM; | |
| clim_ = clim > 0 ? clim : DEFCLIM; | | clim_ = clim > 0 ? clim : DEFCLIM; | |
| cbnum_ = cbnum > 0 ? cbnum : DEFCBNUM; | | cbnum_ = cbnum > 0 ? cbnum : DEFCBNUM; | |
| if (cbnum_ > INT16MAX) cbnum_ = nearbyprime(cbnum_); | | if (cbnum_ > INT16MAX) cbnum_ = nearbyprime(cbnum_); | |
| } | | } | |
| /** | | /** | |
| * Set the thread configurations. | | * Set the thread configurations. | |
| * @param mapthnum the number of threads for the mapper. | | * @param mapthnum the number of threads for the mapper. | |
| * @param redthnum the number of threads for the reducer. | | * @param redthnum the number of threads for the reducer. | |
|
| | | * @param flsthnum the number of threads for the internal flusher. | |
| */ | | */ | |
|
| void tune_thread(int32_t mapthnum, int32_t redthnum) { | | void tune_thread(int32_t mapthnum, int32_t redthnum, int32_t flsthnum) { | |
| _assert_(true); | | _assert_(true); | |
| mapthnum_ = mapthnum > 0 ? mapthnum : DEFTHNUM; | | mapthnum_ = mapthnum > 0 ? mapthnum : DEFTHNUM; | |
| redthnum_ = redthnum > 0 ? redthnum : DEFTHNUM; | | redthnum_ = redthnum > 0 ? redthnum : DEFTHNUM; | |
|
| | | flsthnum_ = flsthnum > 0 ? flsthnum : DEFTHNUM; | |
| } | | } | |
| protected: | | protected: | |
| /** | | /** | |
| * Emit a record from the mapper. | | * Emit a record from the mapper. | |
| * @param kbuf the pointer to the key region. | | * @param kbuf the pointer to the key region. | |
| * @param ksiz the size of the key region. | | * @param ksiz the size of the key region. | |
| * @param vbuf the pointer to the value region. | | * @param vbuf the pointer to the value region. | |
| * @param vsiz the size of the value region. | | * @param vsiz the size of the value region. | |
| * @return true on success, or false on failure. | | * @return true on success, or false on failure. | |
| */ | | */ | |
| bool emit(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { | | bool emit(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { | |
| _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); | | _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); | |
| bool err = false; | | bool err = false; | |
| size_t rsiz = sizevarnum(vsiz) + vsiz; | | size_t rsiz = sizevarnum(vsiz) + vsiz; | |
| char stack[NUMBUFSIZ*4]; | | char stack[NUMBUFSIZ*4]; | |
| char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack; | | char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack; | |
| char* wp = rbuf; | | char* wp = rbuf; | |
| wp += writevarnum(rbuf, vsiz); | | wp += writevarnum(rbuf, vsiz); | |
| std::memcpy(wp, vbuf, vsiz); | | std::memcpy(wp, vbuf, vsiz); | |
|
| if (lock_) { | | if (rlocks_) { | |
| lock_->lock(); | | size_t bidx = TinyHashMap::hash_record(kbuf, ksiz) % cbnum_; | |
| | | size_t lidx = bidx % RLOCKSLOT; | |
| | | rlocks_->lock(lidx); | |
| cache_->append(kbuf, ksiz, rbuf, rsiz); | | cache_->append(kbuf, ksiz, rbuf, rsiz); | |
|
| lock_->unlock(); | | rlocks_->unlock(lidx); | |
| } else { | | } else { | |
| cache_->append(kbuf, ksiz, rbuf, rsiz); | | cache_->append(kbuf, ksiz, rbuf, rsiz); | |
| } | | } | |
| if (rbuf != stack) delete[] rbuf; | | if (rbuf != stack) delete[] rbuf; | |
| csiz_ += rsiz; | | csiz_ += rsiz; | |
| return !err; | | return !err; | |
| } | | } | |
| private: | | private: | |
| /** | | /** | |
|
| | | * Cache flusher. | |
| | | */ | |
| | | class FlushThread : public Thread { | |
| | | public: | |
| | | /** constructor */ | |
| | | explicit FlushThread(MapReduce* mr, BasicDB* tmpdb, | |
| | | TinyHashMap* cache, size_t csiz, bool cown) : | |
| | | mr_(mr), tmpdb_(tmpdb), cache_(cache), csiz_(csiz), cown_(cown), er | |
| | | r_(false) {} | |
| | | /** perform the concrete process */ | |
| | | void run() { | |
| | | if (!mr_->logf("map", "started to flushing the cache: count=%lld size | |
| | | =%lld", | |
| | | (long long)cache_->count(), (long long)csiz_)) err_ = | |
| | | true; | |
| | | double stime = time(); | |
| | | BasicDB* tmpdb = tmpdb_; | |
| | | TinyHashMap* cache = cache_; | |
| | | bool cown = cown_; | |
| | | TinyHashMap::Sorter sorter(cache); | |
| | | const char* kbuf, *vbuf; | |
| | | size_t ksiz, vsiz; | |
| | | while ((kbuf = sorter.get(&ksiz, &vbuf, &vsiz)) != NULL) { | |
| | | if (!tmpdb->append(kbuf, ksiz, vbuf, vsiz)) { | |
| | | const BasicDB::Error& e = tmpdb->error(); | |
| | | mr_->db_->set_error(_KCCODELINE_, e.code(), e.message()); | |
| | | err_ = true; | |
| | | } | |
| | | sorter.step(); | |
| | | if (cown) cache->remove(kbuf, ksiz); | |
| | | } | |
| | | double etime = time(); | |
| | | if (!mr_->logf("map", "flushing the cache finished: time=%.6f", etime | |
| | | - stime)) | |
| | | err_ = true; | |
| | | if (cown) delete cache; | |
| | | } | |
| | | /** check the error flag. */ | |
| | | bool error() { | |
| | | return err_; | |
| | | } | |
| | | private: | |
| | | MapReduce* mr_; ///< driver | |
| | | BasicDB* tmpdb_; ///< temprary database | |
| | | TinyHashMap* cache_; ///< cache for emitter | |
| | | size_t csiz_; ///< current cache size | |
| | | bool cown_; ///< cache ownership flag | |
| | | bool err_; ///< error flag | |
| | | }; | |
| | | /** | |
| * Task queue for parallel reducer. | | * Task queue for parallel reducer. | |
| */ | | */ | |
| class ReduceTaskQueue : public TaskQueue { | | class ReduceTaskQueue : public TaskQueue { | |
| public: | | public: | |
| /** | | /** | |
| * Task for parallel reducer. | | * Task for parallel reducer. | |
| */ | | */ | |
| class ReduceTask : public Task { | | class ReduceTask : public Task { | |
| friend class ReduceTaskQueue; | | friend class ReduceTaskQueue; | |
| public: | | public: | |
| | | | |
| skipping to change at line 508 | | skipping to change at line 567 | |
| /** get the error flag */ | | /** get the error flag */ | |
| bool error() { | | bool error() { | |
| return err_; | | return err_; | |
| } | | } | |
| /** preprocess the mappter */ | | /** preprocess the mappter */ | |
| void visit_before() { | | void visit_before() { | |
| mr_->dbclock_ = 0; | | mr_->dbclock_ = 0; | |
| mr_->cache_ = new TinyHashMap(mr_->cbnum_); | | mr_->cache_ = new TinyHashMap(mr_->cbnum_); | |
| mr_->csiz_ = 0; | | mr_->csiz_ = 0; | |
| if (!mr_->preprocess()) err_ = true; | | if (!mr_->preprocess()) err_ = true; | |
|
| if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true; | | if (mr_->csiz_ > 0 && !mr_->flush_cache()) err_ = true; | |
| if (!mr_->logf("map", "started the map process: scale=%lld", (long lo
ng)scale_)) | | if (!mr_->logf("map", "started the map process: scale=%lld", (long lo
ng)scale_)) | |
| err_ = true; | | err_ = true; | |
| stime_ = time(); | | stime_ = time(); | |
| } | | } | |
| /** postprocess the mappter and call the reducer */ | | /** postprocess the mappter and call the reducer */ | |
| void visit_after() { | | void visit_after() { | |
|
| if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true; | | if (mr_->csiz_ > 0 && !mr_->flush_cache()) err_ = true; | |
| double etime = time(); | | double etime = time(); | |
| if (!mr_->logf("map", "the map process finished: time=%.6f", etime -
stime_)) | | if (!mr_->logf("map", "the map process finished: time=%.6f", etime -
stime_)) | |
| err_ = true; | | err_ = true; | |
| if (!mr_->midprocess()) err_ = true; | | if (!mr_->midprocess()) err_ = true; | |
|
| if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true; | | if (mr_->csiz_ > 0 && !mr_->flush_cache()) err_ = true; | |
| delete mr_->cache_; | | delete mr_->cache_; | |
|
| | | if (mr_->flsths_ && !mr_->flsths_->empty()) { | |
| | | std::deque<FlushThread*>::iterator flthit = mr_->flsths_->begin(); | |
| | | std::deque<FlushThread*>::iterator flthitend = mr_->flsths_->end(); | |
| | | while (flthit != flthitend) { | |
| | | FlushThread* flth = *flthit; | |
| | | flth->join(); | |
| | | if (flth->error()) err_ = true; | |
| | | delete flth; | |
| | | ++flthit; | |
| | | } | |
| | | } | |
| if (!err_ && !mr_->execute_reduce()) err_ = true; | | if (!err_ && !mr_->execute_reduce()) err_ = true; | |
| if (!mr_->postprocess()) err_ = true; | | if (!mr_->postprocess()) err_ = true; | |
| } | | } | |
| private: | | private: | |
| /** visit a record */ | | /** visit a record */ | |
| const char* visit_full(const char* kbuf, size_t ksiz, | | const char* visit_full(const char* kbuf, size_t ksiz, | |
| const char* vbuf, size_t vsiz, size_t* sp) { | | const char* vbuf, size_t vsiz, size_t* sp) { | |
| if (!mr_->map(kbuf, ksiz, vbuf, vsiz)) { | | if (!mr_->map(kbuf, ksiz, vbuf, vsiz)) { | |
| checker_->stop(); | | checker_->stop(); | |
| err_ = true; | | err_ = true; | |
| } | | } | |
|
| if (mr_->lock_) { | | if (mr_->rlocks_) { | |
| mr_->lock_->lock(); | | if (mr_->csiz_ >= mr_->clim_) { | |
| if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { | | mr_->rlocks_->lock_all(); | |
| checker_->stop(); | | if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { | |
| err_ = true; | | checker_->stop(); | |
| | | err_ = true; | |
| | | } | |
| | | mr_->rlocks_->unlock_all(); | |
| } | | } | |
|
| mr_->lock_->unlock(); | | | |
| } else { | | } else { | |
| if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { | | if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { | |
| checker_->stop(); | | checker_->stop(); | |
| err_ = true; | | err_ = true; | |
| } | | } | |
| } | | } | |
| return NOP; | | return NOP; | |
| } | | } | |
| MapReduce* mr_; ///< driver | | MapReduce* mr_; ///< driver | |
| MapChecker* checker_; ///< checker | | MapChecker* checker_; ///< checker | |
| | | | |
| skipping to change at line 592 | | skipping to change at line 664 | |
| va_end(ap); | | va_end(ap); | |
| return log(name, message.c_str()); | | return log(name, message.c_str()); | |
| } | | } | |
| /** | | /** | |
| * Flush all cache records. | | * Flush all cache records. | |
| * @return true on success, or false on failure. | | * @return true on success, or false on failure. | |
| */ | | */ | |
| bool flush_cache() { | | bool flush_cache() { | |
| _assert_(true); | | _assert_(true); | |
| bool err = false; | | bool err = false; | |
|
| if (!logf("map", "started to flushing the cache: count=%lld size=%lld", | | | |
| (long long)cache_->count(), (long long)csiz_)) err = true; | | | |
| double stime = time(); | | | |
| BasicDB* tmpdb = tmpdbs_[dbclock_]; | | BasicDB* tmpdb = tmpdbs_[dbclock_]; | |
|
| TinyHashMap::Sorter sorter(cache_); | | dbclock_ = (dbclock_ + 1) % dbnum_; | |
| const char* kbuf, *vbuf; | | if (flsths_) { | |
| size_t ksiz, vsiz; | | size_t num = flsths_->size(); | |
| while ((kbuf = sorter.get(&ksiz, &vbuf, &vsiz)) != NULL) { | | if (num >= flsthnum_ || num >= dbnum_) { | |
| if (!tmpdb->append(kbuf, ksiz, vbuf, vsiz)) { | | FlushThread* flth = flsths_->front(); | |
| const BasicDB::Error& e = tmpdb->error(); | | flsths_->pop_front(); | |
| db_->set_error(_KCCODELINE_, e.code(), e.message()); | | flth->join(); | |
| err = true; | | if (flth->error()) err = true; | |
| | | delete flth; | |
| } | | } | |
|
| sorter.step(); | | FlushThread* flth = new FlushThread(this, tmpdb, cache_, csiz_, true) | |
| | | ; | |
| | | cache_ = new TinyHashMap(cbnum_); | |
| | | csiz_ = 0; | |
| | | flth->start(); | |
| | | flsths_->push_back(flth); | |
| | | } else { | |
| | | FlushThread flth(this, tmpdb, cache_, csiz_, false); | |
| | | flth.run(); | |
| | | if (flth.error()) err = true; | |
| | | cache_->clear(); | |
| | | csiz_ = 0; | |
| } | | } | |
|
| cache_->clear(); | | | |
| csiz_ = 0; | | | |
| dbclock_ = (dbclock_ + 1) % dbnum_; | | | |
| double etime = time(); | | | |
| if (!logf("map", "flushing the cache finished: time=%.6f", etime - stim | | | |
| e)) err = true; | | | |
| return !err; | | return !err; | |
| } | | } | |
| /** | | /** | |
| * Execute the reduce part. | | * Execute the reduce part. | |
| * @return true on success, or false on failure. | | * @return true on success, or false on failure. | |
| */ | | */ | |
| bool execute_reduce() { | | bool execute_reduce() { | |
| bool err = false; | | bool err = false; | |
| int64_t scale = 0; | | int64_t scale = 0; | |
| for (size_t i = 0; i < dbnum_; i++) { | | for (size_t i = 0; i < dbnum_; i++) { | |
| | | | |
| skipping to change at line 721 | | skipping to change at line 796 | |
| /** The temporary databases. */ | | /** The temporary databases. */ | |
| BasicDB** tmpdbs_; | | BasicDB** tmpdbs_; | |
| /** The number of temporary databases. */ | | /** The number of temporary databases. */ | |
| size_t dbnum_; | | size_t dbnum_; | |
| /** The logical clock for temporary databases. */ | | /** The logical clock for temporary databases. */ | |
| int64_t dbclock_; | | int64_t dbclock_; | |
| /** The number of the mapper threads. */ | | /** The number of the mapper threads. */ | |
| size_t mapthnum_; | | size_t mapthnum_; | |
| /** The number of the reducer threads. */ | | /** The number of the reducer threads. */ | |
| size_t redthnum_; | | size_t redthnum_; | |
|
| | | /** The number of the flusher threads. */ | |
| | | size_t flsthnum_; | |
| /** The cache for emitter. */ | | /** The cache for emitter. */ | |
| TinyHashMap* cache_; | | TinyHashMap* cache_; | |
| /** The current size of the cache for emitter. */ | | /** The current size of the cache for emitter. */ | |
|
| int64_t csiz_; | | AtomicInt64 csiz_; | |
| /** The limit size of the cache for emitter. */ | | /** The limit size of the cache for emitter. */ | |
| int64_t clim_; | | int64_t clim_; | |
| /** The bucket number of the cache for emitter. */ | | /** The bucket number of the cache for emitter. */ | |
| int64_t cbnum_; | | int64_t cbnum_; | |
|
| | | /** The flush threads. */ | |
| | | std::deque<FlushThread*>* flsths_; | |
| /** The task queue for parallel reducer. */ | | /** The task queue for parallel reducer. */ | |
| TaskQueue* redtasks_; | | TaskQueue* redtasks_; | |
| /** The flag whether aborted. */ | | /** The flag whether aborted. */ | |
| bool redaborted_; | | bool redaborted_; | |
| /** The whole lock. */ | | /** The whole lock. */ | |
|
| Mutex* lock_; | | SlottedMutex* rlocks_; | |
| }; | | }; | |
| | | | |
| /** | | /** | |
| * Index database. | | * Index database. | |
| * @note This class is designed to implement an indexing storage with an ef
ficient appending | | * @note This class is designed to implement an indexing storage with an ef
ficient appending | |
| * operation for the existing record values. This class is a wrapper of th
e polymorphic | | * operation for the existing record values. This class is a wrapper of th
e polymorphic | |
| * database, featuring buffering mechanism to alleviate IO overhead in the
database layer. This | | * database, featuring buffering mechanism to alleviate IO overhead in the
database layer. This | |
| * class can be inherited but overwriting methods is forbidden. Before eve
ry database operation, | | * class can be inherited but overwriting methods is forbidden. Before eve
ry database operation, | |
| * it is necessary to call the IndexDB::open method in order to open a data
base file and connect | | * it is necessary to call the IndexDB::open method in order to open a data
base file and connect | |
| * the database object to it. To avoid data missing or corruption, it is i
mportant to close | | * the database object to it. To avoid data missing or corruption, it is i
mportant to close | |
| | | | |
End of changes. 29 change blocks. |
| 40 lines changed or deleted | | 124 lines changed or added | |
|