| kcdbext.h | | kcdbext.h | |
| | | | |
| skipping to change at line 44 | | skipping to change at line 44 | |
| | | | |
| namespace kyotocabinet { // common namespace | | namespace kyotocabinet { // common namespace | |
| | | | |
| /** | | /** | |
| * MapReduce framework. | | * MapReduce framework. | |
| * @note Although this framework is not distributed or concurrent, it is us
eful for aggregate | | * @note Although this framework is not distributed or concurrent, it is us
eful for aggregate | |
| * calculation with less CPU loading and less memory usage. | | * calculation with less CPU loading and less memory usage. | |
| */ | | */ | |
| class MapReduce { | | class MapReduce { | |
| public: | | public: | |
|
| class MapEmitter; | | | |
| class ValueIterator; | | class ValueIterator; | |
| private: | | private: | |
| class MapVisitor; | | class MapVisitor; | |
| struct MergeLine; | | struct MergeLine; | |
| /** An alias of vector of loaded values. */ | | /** An alias of vector of loaded values. */ | |
| typedef std::vector<std::string> Values; | | typedef std::vector<std::string> Values; | |
| /** The default number of temporary databases. */ | | /** The default number of temporary databases. */ | |
| static const size_t DEFDBNUM = 8; | | static const size_t DEFDBNUM = 8; | |
| /** The maximum number of temporary databases. */ | | /** The maximum number of temporary databases. */ | |
| static const size_t MAXDBNUM = 256; | | static const size_t MAXDBNUM = 256; | |
| | | | |
| skipping to change at line 69 | | skipping to change at line 68 | |
| /** The bucket number of temporary databases. */ | | /** The bucket number of temporary databases. */ | |
| static const int64_t DBBNUM = 512LL << 10; | | static const int64_t DBBNUM = 512LL << 10; | |
| /** The page size of temporary databases. */ | | /** The page size of temporary databases. */ | |
| static const int32_t DBPSIZ = 32768; | | static const int32_t DBPSIZ = 32768; | |
| /** The mapped size of temporary databases. */ | | /** The mapped size of temporary databases. */ | |
| static const int64_t DBMSIZ = 516LL * 4096; | | static const int64_t DBMSIZ = 516LL * 4096; | |
| /** The page cache capacity of temporary databases. */ | | /** The page cache capacity of temporary databases. */ | |
| static const int64_t DBPCCAP = 16LL << 20; | | static const int64_t DBPCCAP = 16LL << 20; | |
| public: | | public: | |
| /** | | /** | |
|
| * Data emitter for the mapper. | | | |
| */ | | | |
| class MapEmitter { | | | |
| friend class MapReduce; | | | |
| friend class MapReduce::MapVisitor; | | | |
| public: | | | |
| /** | | | |
| * Emit a record from the mapper. | | | |
| * @param kbuf the pointer to the key region. | | | |
| * @param ksiz the size of the key region. | | | |
| * @param vbuf the pointer to the value region. | | | |
| * @param vsiz the size of the value region. | | | |
| * @return true on success, or false on failure. | | | |
| */ | | | |
| bool emit(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) | | | |
| { | | | |
| _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); | | | |
| bool err = false; | | | |
| size_t rsiz = sizevarnum(vsiz) + vsiz; | | | |
| char stack[NUMBUFSIZ*4]; | | | |
| char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack; | | | |
| char* wp = rbuf; | | | |
| wp += writevarnum(rbuf, vsiz); | | | |
| std::memcpy(wp, vbuf, vsiz); | | | |
| mr_->cache_->append(kbuf, ksiz, rbuf, rsiz); | | | |
| if (rbuf != stack) delete[] rbuf; | | | |
| mr_->csiz_ += rsiz; | | | |
| return !err; | | | |
| } | | | |
| private: | | | |
| /** | | | |
| * Default constructor. | | | |
| */ | | | |
| explicit MapEmitter(MapReduce* mr) : mr_(mr) { | | | |
| _assert_(true); | | | |
| } | | | |
| /** | | | |
| * Destructor. | | | |
| */ | | | |
| ~MapEmitter() { | | | |
| _assert_(true); | | | |
| } | | | |
| /** Dummy constructor to forbid the use. */ | | | |
| MapEmitter(const MapEmitter&); | | | |
| /** Dummy Operator to forbid the use. */ | | | |
| MapEmitter& operator =(const MapEmitter&); | | | |
| /** The owner object. */ | | | |
| MapReduce* mr_; | | | |
| }; | | | |
| /** | | | |
| * Value iterator for the reducer. | | * Value iterator for the reducer. | |
| */ | | */ | |
| class ValueIterator { | | class ValueIterator { | |
| friend class MapReduce; | | friend class MapReduce; | |
| public: | | public: | |
| /** | | /** | |
| * Get the next value. | | * Get the next value. | |
| * @param sp the pointer to the variable into which the size of the reg
ion of the return | | * @param sp the pointer to the variable into which the size of the reg
ion of the return | |
| * value is assigned. | | * value is assigned. | |
| * @return the pointer to the next value region, or NULL if no value re
mains. | | * @return the pointer to the next value region, or NULL if no value re
mains. | |
| | | | |
| skipping to change at line 202 | | skipping to change at line 152 | |
| */ | | */ | |
| virtual ~MapReduce() { | | virtual ~MapReduce() { | |
| _assert_(true); | | _assert_(true); | |
| } | | } | |
| /** | | /** | |
| * Map a record data. | | * Map a record data. | |
| * @param kbuf the pointer to the key region. | | * @param kbuf the pointer to the key region. | |
| * @param ksiz the size of the key region. | | * @param ksiz the size of the key region. | |
| * @param vbuf the pointer to the value region. | | * @param vbuf the pointer to the value region. | |
| * @param vsiz the size of the value region. | | * @param vsiz the size of the value region. | |
|
| * @param emitter the emitter object. | | | |
| * @return true on success, or false on failure. | | * @return true on success, or false on failure. | |
|
| * @note To avoid deadlock, any explicit database operation must not be p | | * @note This function can call the MapReduce::emit method to emit a reco | |
| erformed in this | | rd. To avoid | |
| * function. | | * deadlock, any explicit database operation must not be performed in thi | |
| | | s function. | |
| */ | | */ | |
|
| virtual bool map(const char* kbuf, size_t ksiz, const char* vbuf, size_t | | virtual bool map(const char* kbuf, size_t ksiz, const char* vbuf, size_t | |
| vsiz, | | vsiz) = 0; | |
| MapEmitter* emitter) = 0; | | | |
| /** | | /** | |
| * Reduce a record data. | | * Reduce a record data. | |
| * @param kbuf the pointer to the key region. | | * @param kbuf the pointer to the key region. | |
| * @param ksiz the size of the key region. | | * @param ksiz the size of the key region. | |
| * @param iter the iterator to get the values. | | * @param iter the iterator to get the values. | |
| * @return true on success, or false on failure. | | * @return true on success, or false on failure. | |
| * @note To avoid deadlock, any explicit database operation must not be p
erformed in this | | * @note To avoid deadlock, any explicit database operation must not be p
erformed in this | |
| * function. | | * function. | |
| */ | | */ | |
| virtual bool reduce(const char* kbuf, size_t ksiz, ValueIterator* iter) =
0; | | virtual bool reduce(const char* kbuf, size_t ksiz, ValueIterator* iter) =
0; | |
| /** | | /** | |
| * Preprocess the map operations. | | * Preprocess the map operations. | |
| * @return true on success, or false on failure. | | * @return true on success, or false on failure. | |
|
| | | * @note This function can call the MapReduce::emit method to emit a reco | |
| | | rd. To avoid | |
| | | * deadlock, any explicit database operation must not be performed in thi | |
| | | s function. | |
| */ | | */ | |
| virtual bool preprocess() { | | virtual bool preprocess() { | |
| _assert_(true); | | _assert_(true); | |
| return true; | | return true; | |
| } | | } | |
| /** | | /** | |
| * Mediate between the map and the reduce phases. | | * Mediate between the map and the reduce phases. | |
| * @return true on success, or false on failure. | | * @return true on success, or false on failure. | |
|
| | | * @note This function can call the MapReduce::emit method to emit a reco | |
| | | rd. To avoid | |
| | | * deadlock, any explicit database operation must not be performed in thi | |
| | | s function. | |
| */ | | */ | |
| virtual bool midprocess() { | | virtual bool midprocess() { | |
| _assert_(true); | | _assert_(true); | |
| return true; | | return true; | |
| } | | } | |
| /** | | /** | |
| * Postprocess the reduce operations. | | * Postprocess the reduce operations. | |
| * @return true on success, or false on failure. | | * @return true on success, or false on failure. | |
|
| | | * @note To avoid deadlock, any explicit database operation must not be p | |
| | | erformed in this | |
| | | * function. | |
| */ | | */ | |
| virtual bool postprocess() { | | virtual bool postprocess() { | |
| _assert_(true); | | _assert_(true); | |
| return true; | | return true; | |
| } | | } | |
| /** | | /** | |
| * Process a log message. | | * Process a log message. | |
| * @param name the name of the event. | | * @param name the name of the event. | |
| * @param message a supplement message. | | * @param message a supplement message. | |
| * @return true on success, or false on failure. | | * @return true on success, or false on failure. | |
| | | | |
| skipping to change at line 359 | | skipping to change at line 313 | |
| delete[] tmpdbs_; | | delete[] tmpdbs_; | |
| return false; | | return false; | |
| } | | } | |
| } | | } | |
| if (opts & XNOLOCK) { | | if (opts & XNOLOCK) { | |
| MapChecker mapchecker; | | MapChecker mapchecker; | |
| MapVisitor mapvisitor(this, &mapchecker, db->count()); | | MapVisitor mapvisitor(this, &mapchecker, db->count()); | |
| mapvisitor.visit_before(); | | mapvisitor.visit_before(); | |
| if (!err) { | | if (!err) { | |
| BasicDB::Cursor* cur = db->cursor(); | | BasicDB::Cursor* cur = db->cursor(); | |
|
| if (!cur->jump()) err = true; | | if (!cur->jump() && cur->error() != BasicDB::Error::NOREC) err = tr
ue; | |
| while (!err) { | | while (!err) { | |
| if (!cur->accept(&mapvisitor, false, true)) { | | if (!cur->accept(&mapvisitor, false, true)) { | |
| if (cur->error() != BasicDB::Error::NOREC) err = true; | | if (cur->error() != BasicDB::Error::NOREC) err = true; | |
| break; | | break; | |
| } | | } | |
| } | | } | |
| delete cur; | | delete cur; | |
| } | | } | |
| if (mapvisitor.error()) err = true; | | if (mapvisitor.error()) err = true; | |
| mapvisitor.visit_after(); | | mapvisitor.visit_after(); | |
| | | | |
| skipping to change at line 414 | | skipping to change at line 368 | |
| * @param cbnum the bucket number of the internal cache. | | * @param cbnum the bucket number of the internal cache. | |
| */ | | */ | |
| void tune_storage(int32_t dbnum, int64_t clim, int64_t cbnum) { | | void tune_storage(int32_t dbnum, int64_t clim, int64_t cbnum) { | |
| _assert_(true); | | _assert_(true); | |
| dbnum_ = dbnum > 0 ? dbnum : DEFDBNUM; | | dbnum_ = dbnum > 0 ? dbnum : DEFDBNUM; | |
| if (dbnum_ > MAXDBNUM) dbnum_ = MAXDBNUM; | | if (dbnum_ > MAXDBNUM) dbnum_ = MAXDBNUM; | |
| clim_ = clim > 0 ? clim : DEFCLIM; | | clim_ = clim > 0 ? clim : DEFCLIM; | |
| cbnum_ = cbnum > 0 ? cbnum : DEFCBNUM; | | cbnum_ = cbnum > 0 ? cbnum : DEFCBNUM; | |
| if (cbnum_ > INT16MAX) cbnum_ = nearbyprime(cbnum_); | | if (cbnum_ > INT16MAX) cbnum_ = nearbyprime(cbnum_); | |
| } | | } | |
|
| | | protected: | |
| | | /** | |
| | | * Emit a record from the mapper. | |
| | | * @param kbuf the pointer to the key region. | |
| | | * @param ksiz the size of the key region. | |
| | | * @param vbuf the pointer to the value region. | |
| | | * @param vsiz the size of the value region. | |
| | | * @return true on success, or false on failure. | |
| | | */ | |
| | | bool emit(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { | |
| | | _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); | |
| | | bool err = false; | |
| | | size_t rsiz = sizevarnum(vsiz) + vsiz; | |
| | | char stack[NUMBUFSIZ*4]; | |
| | | char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack; | |
| | | char* wp = rbuf; | |
| | | wp += writevarnum(rbuf, vsiz); | |
| | | std::memcpy(wp, vbuf, vsiz); | |
| | | cache_->append(kbuf, ksiz, rbuf, rsiz); | |
| | | if (rbuf != stack) delete[] rbuf; | |
| | | csiz_ += rsiz; | |
| | | return !err; | |
| | | } | |
| private: | | private: | |
| /** | | /** | |
| * Checker for the map process. | | * Checker for the map process. | |
| */ | | */ | |
| class MapChecker : public BasicDB::ProgressChecker { | | class MapChecker : public BasicDB::ProgressChecker { | |
| public: | | public: | |
| /** constructor */ | | /** constructor */ | |
| explicit MapChecker() : stop_(false) {} | | explicit MapChecker() : stop_(false) {} | |
| /** stop the process */ | | /** stop the process */ | |
| void stop() { | | void stop() { | |
| | | | |
| skipping to change at line 444 | | skipping to change at line 421 | |
| } | | } | |
| bool stop_; ///< flag for stop | | bool stop_; ///< flag for stop | |
| }; | | }; | |
| /** | | /** | |
| * Visitor for the map process. | | * Visitor for the map process. | |
| */ | | */ | |
| class MapVisitor : public BasicDB::Visitor { | | class MapVisitor : public BasicDB::Visitor { | |
| public: | | public: | |
| /** constructor */ | | /** constructor */ | |
| explicit MapVisitor(MapReduce* mr, MapChecker* checker, int64_t scale)
: | | explicit MapVisitor(MapReduce* mr, MapChecker* checker, int64_t scale)
: | |
|
| mr_(mr), checker_(checker), emitter_(mr), scale_(scale), | | mr_(mr), checker_(checker), scale_(scale), stime_(0), err_(false) { | |
| stime_(0), err_(false) {} | | } | |
| /** get the error flag */ | | /** get the error flag */ | |
| bool error() { | | bool error() { | |
| return err_; | | return err_; | |
| } | | } | |
| /** preprocess the mapper */ | | /** preprocess the mapper */ | |
| void visit_before() { | | void visit_before() { | |
|
| if (!mr_->preprocess()) err_ = true; | | | |
| stime_ = time(); | | | |
| mr_->dbclock_ = 0; | | mr_->dbclock_ = 0; | |
| mr_->cache_ = new TinyHashMap(mr_->cbnum_); | | mr_->cache_ = new TinyHashMap(mr_->cbnum_); | |
| mr_->csiz_ = 0; | | mr_->csiz_ = 0; | |
|
| | | if (!mr_->preprocess()) err_ = true; | |
| | | if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true; | |
| if (!mr_->logf("map", "started the map process: scale=%lld", (long lo
ng)scale_)) | | if (!mr_->logf("map", "started the map process: scale=%lld", (long lo
ng)scale_)) | |
| err_ = true; | | err_ = true; | |
|
| | | stime_ = time(); | |
| } | | } | |
| /** postprocess the mapper and call the reducer */ | | /** postprocess the mapper and call the reducer */ | |
| void visit_after() { | | void visit_after() { | |
| if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true; | | if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true; | |
|
| delete mr_->cache_; | | | |
| double etime = time(); | | double etime = time(); | |
| if (!mr_->logf("map", "the map process finished: time=%.6f", etime -
stime_)) | | if (!mr_->logf("map", "the map process finished: time=%.6f", etime -
stime_)) | |
| err_ = true; | | err_ = true; | |
| if (!mr_->midprocess()) err_ = true; | | if (!mr_->midprocess()) err_ = true; | |
|
| | | if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true; | |
| | | delete mr_->cache_; | |
| if (!err_ && !mr_->execute_reduce()) err_ = true; | | if (!err_ && !mr_->execute_reduce()) err_ = true; | |
| if (!mr_->postprocess()) err_ = true; | | if (!mr_->postprocess()) err_ = true; | |
| } | | } | |
| private: | | private: | |
| /** visit a record */ | | /** visit a record */ | |
| const char* visit_full(const char* kbuf, size_t ksiz, | | const char* visit_full(const char* kbuf, size_t ksiz, | |
| const char* vbuf, size_t vsiz, size_t* sp) { | | const char* vbuf, size_t vsiz, size_t* sp) { | |
|
| if (!mr_->map(kbuf, ksiz, vbuf, vsiz, &emitter_)) { | | if (!mr_->map(kbuf, ksiz, vbuf, vsiz)) { | |
| checker_->stop(); | | checker_->stop(); | |
| err_ = true; | | err_ = true; | |
| } | | } | |
| if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { | | if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { | |
| checker_->stop(); | | checker_->stop(); | |
| err_ = true; | | err_ = true; | |
| } | | } | |
| return NOP; | | return NOP; | |
| } | | } | |
| MapReduce* mr_; ///< driver | | MapReduce* mr_; ///< driver | |
| MapChecker* checker_; ///< checker | | MapChecker* checker_; ///< checker | |
|
| MapEmitter emitter_; ///< emitter | | | |
| int64_t scale_; ///< number of records | | int64_t scale_; ///< number of records | |
| double stime_; ///< start time | | double stime_; ///< start time | |
| bool err_; ///< error flag | | bool err_; ///< error flag | |
| }; | | }; | |
| /** | | /** | |
| * Front line of a merging list. | | * Front line of a merging list. | |
| */ | | */ | |
| struct MergeLine { | | struct MergeLine { | |
| BasicDB::Cursor* cur; ///< cursor | | BasicDB::Cursor* cur; ///< cursor | |
| Comparator* rcomp; ///< record comparator | | Comparator* rcomp; ///< record comparator | |
| | | | |
| skipping to change at line 1109 | | skipping to change at line 1086 | |
| * code is appended at the end of the region of the return value, the ret
urn value can be | | * code is appended at the end of the region of the return value, the ret
urn value can be | |
| * treated as a C-style string. Because the region of the return value i
s allocated with the | | * treated as a C-style string. Because the region of the return value i
s allocated with the | |
| * new[] operator, it should be released with the delete[] operator w
hen it is no longer | | * new[] operator, it should be released with the delete[] operator w
hen it is no longer | |
| * in use. | | * in use. | |
| */ | | */ | |
| char* get(const char* kbuf, size_t ksiz, size_t* sp) { | | char* get(const char* kbuf, size_t ksiz, size_t* sp) { | |
| _assert_(kbuf && ksiz <= MEMMAXSIZ && sp); | | _assert_(kbuf && ksiz <= MEMMAXSIZ && sp); | |
| ScopedRWLock lock(&mlock_, false); | | ScopedRWLock lock(&mlock_, false); | |
| if (omode_ == 0) { | | if (omode_ == 0) { | |
| set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); | | set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); | |
|
| | | *sp = 0; | |
| return false; | | return false; | |
| } | | } | |
| if (!cache_) return db_.get(kbuf, ksiz, sp); | | if (!cache_) return db_.get(kbuf, ksiz, sp); | |
| size_t dvsiz = 0; | | size_t dvsiz = 0; | |
| char* dvbuf = db_.get(kbuf, ksiz, &dvsiz); | | char* dvbuf = db_.get(kbuf, ksiz, &dvsiz); | |
| size_t cvsiz = 0; | | size_t cvsiz = 0; | |
| const char* cvbuf = cache_->get(kbuf, ksiz, &cvsiz); | | const char* cvbuf = cache_->get(kbuf, ksiz, &cvsiz); | |
| struct Record { | | struct Record { | |
| char* buf; | | char* buf; | |
| size_t size; | | size_t size; | |
| | | | |
| skipping to change at line 1137 | | skipping to change at line 1115 | |
| Record* rp = recs + i; | | Record* rp = recs + i; | |
| rp->buf = tmpdb->get(kbuf, ksiz, &rp->size); | | rp->buf = tmpdb->get(kbuf, ksiz, &rp->size); | |
| if (rp->buf) { | | if (rp->buf) { | |
| rsiz += rp->size; | | rsiz += rp->size; | |
| hit = true; | | hit = true; | |
| } | | } | |
| } | | } | |
| } | | } | |
| if (!hit) { | | if (!hit) { | |
| delete[] recs; | | delete[] recs; | |
|
| if (!dvbuf && !cvbuf) return NULL; | | if (!dvbuf && !cvbuf) { | |
| | | *sp = 0; | |
| | | return NULL; | |
| | | } | |
| if (!dvbuf) { | | if (!dvbuf) { | |
| dvbuf = new char[cvsiz+1]; | | dvbuf = new char[cvsiz+1]; | |
| std::memcpy(dvbuf, cvbuf, cvsiz); | | std::memcpy(dvbuf, cvbuf, cvsiz); | |
| *sp = cvsiz; | | *sp = cvsiz; | |
| return dvbuf; | | return dvbuf; | |
| } | | } | |
| if (!cvbuf) { | | if (!cvbuf) { | |
| *sp = dvsiz; | | *sp = dvsiz; | |
| return dvbuf; | | return dvbuf; | |
| } | | } | |
| | | | |
End of changes. 20 change blocks. |
| 67 lines changed or deleted | | 54 lines changed or added | |
|