kcdbext.h | kcdbext.h | |||
---|---|---|---|---|
skipping to change at line 170 | skipping to change at line 170 | |||
* Execution options. | * Execution options. | |||
*/ | */ | |||
enum Option { | enum Option { | |||
XNOLOCK = 1 << 0, ///< avoid locking against update operations | XNOLOCK = 1 << 0, ///< avoid locking against update operations | |||
XNOCOMP = 1 << 1 ///< avoid compression of temporar y databases | XNOCOMP = 1 << 1 ///< avoid compression of temporar y databases | |||
}; | }; | |||
/** | /** | |||
* Default constructor. | * Default constructor. | |||
*/ | */ | |||
explicit MapReduce() : | explicit MapReduce() : | |||
tmpdbs_(NULL), dbnum_(MRDEFDBNUM), dbclock_(0), keyclock_(0), | rcomp_(NULL), tmpdbs_(NULL), dbnum_(MRDEFDBNUM), dbclock_(0), keyclock_ (0), | |||
cache_(NULL), csiz_(0), clim_(MRDEFCLIM), cbnum_(MRDEFCBNUM) { | cache_(NULL), csiz_(0), clim_(MRDEFCLIM), cbnum_(MRDEFCBNUM) { | |||
_assert_(true); | _assert_(true); | |||
} | } | |||
/** | /** | |||
* Destructor. | * Destructor. | |||
*/ | */ | |||
virtual ~MapReduce() { | virtual ~MapReduce() { | |||
_assert_(true); | _assert_(true); | |||
} | } | |||
/** | /** | |||
skipping to change at line 199 | skipping to change at line 199 | |||
* function. | * function. | |||
*/ | */ | |||
virtual bool map(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz, | virtual bool map(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz, | |||
MapEmitter* emitter) = 0; | MapEmitter* emitter) = 0; | |||
/** | /** | |||
* Reduce a record data. | * Reduce a record data. | |||
* @param kbuf the pointer to the key region. | * @param kbuf the pointer to the key region. | |||
* @param ksiz the size of the key region. | * @param ksiz the size of the key region. | |||
* @param iter the iterator to get the values. | * @param iter the iterator to get the values. | |||
* @return true on success, or false on failure. | * @return true on success, or false on failure. | |||
* @note Database operations can be performed in this function. | * @note To avoid deadlock, any explicit database operation must not be p | |||
erformed in this | ||||
* function. | ||||
*/ | */ | |||
virtual bool reduce(const char* kbuf, size_t ksiz, ValueIterator* iter) = 0; | virtual bool reduce(const char* kbuf, size_t ksiz, ValueIterator* iter) = 0; | |||
/** | /** | |||
* Preprocess the map operations. | ||||
* @return true on success, or false on failure. | ||||
*/ | ||||
virtual bool preprocess() { | ||||
_assert_(true); | ||||
return true; | ||||
} | ||||
/** | ||||
* Mediate between the map and the reduce phases. | ||||
* @return true on success, or false on failure. | ||||
*/ | ||||
virtual bool midprocess() { | ||||
_assert_(true); | ||||
return true; | ||||
} | ||||
/** | ||||
* Postprocess the reduce operations. | ||||
* @return true on success, or false on failure. | ||||
*/ | ||||
virtual bool postprocess() { | ||||
_assert_(true); | ||||
return true; | ||||
} | ||||
/** | ||||
* Process a log message. | * Process a log message. | |||
* @param name the name of the event. | * @param name the name of the event. | |||
* @param message a supplement message. | * @param message a supplement message. | |||
* @return true on success, or false on failure. | * @return true on success, or false on failure. | |||
*/ | */ | |||
virtual bool log(const char* name, const char* message) { | virtual bool log(const char* name, const char* message) { | |||
_assert_(name && message); | _assert_(name && message); | |||
return true; | return true; | |||
} | } | |||
/** | /** | |||
* Execute the MapReduce process about a database. | * Execute the MapReduce process about a database. | |||
* @param db the source database. | * @param db the source database. | |||
* @param tmppath the path of a directory for the temporary data storage. If it is an empty | * @param tmppath the path of a directory for the temporary data storage. If it is an empty | |||
* string, temporary data are handled on memory. | * string, temporary data are handled on memory. | |||
* @param opts the optional features by bitwise-or: MapReduce::XNOLOCK to avoid locking | * @param opts the optional features by bitwise-or: MapReduce::XNOLOCK to avoid locking | |||
* against update operations by other threads, MapReduce::XNOCOMP to avoi d compression of | * against update operations by other threads, MapReduce::XNOCOMP to avoi d compression of | |||
* temporary databases. | * temporary databases. | |||
* @return true on success, or false on failure. | * @return true on success, or false on failure. | |||
*/ | */ | |||
bool execute(BasicDB* db, const std::string& tmppath = "", uint32_t opts = 0) { | bool execute(BasicDB* db, const std::string& tmppath = "", uint32_t opts = 0) { | |||
int64_t count = db->count(); | ||||
if (count < 0) return false; | ||||
bool err = false; | bool err = false; | |||
double stime, etime; | double stime, etime; | |||
rcomp_ = LEXICALCOMP; | ||||
BasicDB* idb = db; | ||||
if (typeid(*db) == typeid(PolyDB)) { | ||||
PolyDB* pdb = (PolyDB*)idb; | ||||
idb = pdb->reveal_inner_db(); | ||||
} | ||||
const std::type_info& info = typeid(*idb); | ||||
if (info == typeid(GrassDB)) { | ||||
GrassDB* gdb = (GrassDB*)idb; | ||||
rcomp_ = gdb->rcomp(); | ||||
} else if (info == typeid(TreeDB)) { | ||||
TreeDB* tdb = (TreeDB*)idb; | ||||
rcomp_ = tdb->rcomp(); | ||||
} else if (info == typeid(ForestDB)) { | ||||
ForestDB* fdb = (ForestDB*)idb; | ||||
rcomp_ = fdb->rcomp(); | ||||
} | ||||
tmpdbs_ = new BasicDB*[dbnum_]; | tmpdbs_ = new BasicDB*[dbnum_]; | |||
if (tmppath.empty()) { | if (tmppath.empty()) { | |||
if (!logf("prepare", "started to open temporary databases on memory") ) err = true; | if (!logf("prepare", "started to open temporary databases on memory") ) err = true; | |||
stime = time(); | stime = time(); | |||
for (size_t i = 0; i < dbnum_; i++) { | for (size_t i = 0; i < dbnum_; i++) { | |||
GrassDB* gdb = new GrassDB; | GrassDB* gdb = new GrassDB; | |||
int32_t myopts = 0; | int32_t myopts = 0; | |||
if (!(opts & XNOCOMP)) myopts |= GrassDB::TCOMPRESS; | if (!(opts & XNOCOMP)) myopts |= GrassDB::TCOMPRESS; | |||
gdb->tune_options(myopts); | gdb->tune_options(myopts); | |||
gdb->tune_buckets(MRDBBNUM / 2); | gdb->tune_buckets(MRDBBNUM / 2); | |||
gdb->tune_page(MRDBPSIZ); | gdb->tune_page(MRDBPSIZ); | |||
gdb->tune_page_cache(MRDBPCCAP); | gdb->tune_page_cache(MRDBPCCAP); | |||
gdb->tune_comparator(rcomp_); | ||||
gdb->open("%", GrassDB::OWRITER | GrassDB::OCREATE | GrassDB::OTRUN CATE); | gdb->open("%", GrassDB::OWRITER | GrassDB::OCREATE | GrassDB::OTRUN CATE); | |||
tmpdbs_[i] = gdb; | tmpdbs_[i] = gdb; | |||
} | } | |||
etime = time(); | etime = time(); | |||
if (!logf("prepare", "opening temporary databases finished: time=%.6f ", etime - stime)) | if (!logf("prepare", "opening temporary databases finished: time=%.6f ", etime - stime)) | |||
err = true; | err = true; | |||
if (err) { | if (err) { | |||
delete[] tmpdbs_; | delete[] tmpdbs_; | |||
return false; | return false; | |||
} | } | |||
skipping to change at line 272 | skipping to change at line 317 | |||
strprintf("%s%cmr-%04x-%04x-%08x-%03d%ckct", | strprintf("%s%cmr-%04x-%04x-%08x-%03d%ckct", | |||
tmppath.c_str(), File::PATHCHR, pid, tid, ts, (int)(i + 1), File::EXTCHR); | tmppath.c_str(), File::PATHCHR, pid, tid, ts, (int)(i + 1), File::EXTCHR); | |||
TreeDB* tdb = new TreeDB; | TreeDB* tdb = new TreeDB; | |||
int32_t myopts = TreeDB::TSMALL | TreeDB::TLINEAR; | int32_t myopts = TreeDB::TSMALL | TreeDB::TLINEAR; | |||
if (!(opts & XNOCOMP)) myopts |= TreeDB::TCOMPRESS; | if (!(opts & XNOCOMP)) myopts |= TreeDB::TCOMPRESS; | |||
tdb->tune_options(myopts); | tdb->tune_options(myopts); | |||
tdb->tune_buckets(MRDBBNUM); | tdb->tune_buckets(MRDBBNUM); | |||
tdb->tune_page(MRDBPSIZ); | tdb->tune_page(MRDBPSIZ); | |||
tdb->tune_map(MRDBMSIZ); | tdb->tune_map(MRDBMSIZ); | |||
tdb->tune_page_cache(MRDBPCCAP); | tdb->tune_page_cache(MRDBPCCAP); | |||
tdb->tune_comparator(rcomp_); | ||||
if (!tdb->open(childpath, TreeDB::OWRITER | TreeDB::OCREATE | TreeD B::OTRUNCATE)) { | if (!tdb->open(childpath, TreeDB::OWRITER | TreeDB::OCREATE | TreeD B::OTRUNCATE)) { | |||
const BasicDB::Error& e = tdb->error(); | const BasicDB::Error& e = tdb->error(); | |||
db->set_error(_KCCODELINE_, e.code(), e.message()); | db->set_error(_KCCODELINE_, e.code(), e.message()); | |||
err = true; | err = true; | |||
} | } | |||
tmpdbs_[i] = tdb; | tmpdbs_[i] = tdb; | |||
} | } | |||
etime = time(); | etime = time(); | |||
if (!logf("prepare", "opening temporary databases finished: time=%.6f ", etime - stime)) | if (!logf("prepare", "opening temporary databases finished: time=%.6f ", etime - stime)) | |||
err = true; | err = true; | |||
if (err) { | if (err) { | |||
for (size_t i = 0; i < dbnum_; i++) { | for (size_t i = 0; i < dbnum_; i++) { | |||
delete tmpdbs_[i]; | delete tmpdbs_[i]; | |||
} | } | |||
delete[] tmpdbs_; | delete[] tmpdbs_; | |||
return false; | return false; | |||
} | } | |||
} | } | |||
dbclock_ = 0; | ||||
keyclock_ = 0; | ||||
cache_ = new TinyHashMap(cbnum_); | ||||
csiz_ = 0; | ||||
int64_t scale = db->count(); | ||||
Thread::yield(); | ||||
if (!logf("map", "started the map process: scale=%lld", (long long)scal | ||||
e)) err = true; | ||||
stime = time(); | ||||
MapChecker mapchecker; | ||||
MapVisitor mapvisitor(this, &mapchecker); | ||||
if (opts & XNOLOCK) { | if (opts & XNOLOCK) { | |||
MapChecker mapchecker; | ||||
MapVisitor mapvisitor(this, &mapchecker, db->count()); | ||||
mapvisitor.visit_before(); | ||||
if (!err) { | if (!err) { | |||
BasicDB::Cursor* cur = db->cursor(); | BasicDB::Cursor* cur = db->cursor(); | |||
if (!cur->jump()) err = true; | if (!cur->jump()) err = true; | |||
while (!err) { | while (!err) { | |||
if (!cur->accept(&mapvisitor, false, true)) { | if (!cur->accept(&mapvisitor, false, true)) { | |||
if (cur->error() != BasicDB::Error::NOREC) err = true; | if (cur->error() != BasicDB::Error::NOREC) err = true; | |||
break; | break; | |||
} | } | |||
} | } | |||
delete cur; | delete cur; | |||
} | } | |||
if (mapvisitor.error()) err = true; | ||||
mapvisitor.visit_after(); | ||||
} else { | } else { | |||
MapChecker mapchecker; | ||||
MapVisitor mapvisitor(this, &mapchecker, db->count()); | ||||
if (!err && !db->iterate(&mapvisitor, false, &mapchecker)) err = true ; | if (!err && !db->iterate(&mapvisitor, false, &mapchecker)) err = true ; | |||
if (mapvisitor.error()) err = true; | ||||
} | } | |||
if (cache_->count() > 0 && !flush_cache()) err = true; | ||||
delete cache_; | ||||
etime = time(); | ||||
if (!logf("map", "the map process finished: time=%.6f", etime - stime)) | ||||
err = true; | ||||
Thread::yield(); | ||||
scale = 0; | ||||
for (size_t i = 0; i < dbnum_; i++) { | ||||
scale += tmpdbs_[i]->count(); | ||||
} | ||||
if (!logf("reduce", "started the reduce process: scale=%lld", (long lon | ||||
g)scale)) err = true; | ||||
stime = time(); | ||||
std::priority_queue<MergeLine> lines; | ||||
for (size_t i = 0; i < dbnum_; i++) { | ||||
MergeLine line; | ||||
line.cur = tmpdbs_[i]->cursor(); | ||||
line.comp = LEXICALCOMP; | ||||
line.cur->jump(); | ||||
line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true); | ||||
if (line.kbuf) { | ||||
lines.push(line); | ||||
} else { | ||||
delete line.cur; | ||||
} | ||||
} | ||||
char* lkbuf = NULL; | ||||
size_t lksiz = 0; | ||||
Values values; | ||||
while (!err && !lines.empty()) { | ||||
MergeLine line = lines.top(); | ||||
lines.pop(); | ||||
if (lkbuf && (lksiz != line.ksiz || std::memcmp(lkbuf, line.kbuf, lks | ||||
iz))) { | ||||
if (!call_reducer(lkbuf, lksiz, values)) err = true; | ||||
values.clear(); | ||||
} | ||||
values.push_back(std::string(line.vbuf, line.vsiz)); | ||||
delete[] lkbuf; | ||||
lkbuf = line.kbuf; | ||||
lksiz = line.ksiz; | ||||
line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true); | ||||
if (line.kbuf) { | ||||
lines.push(line); | ||||
} else { | ||||
delete line.cur; | ||||
} | ||||
} | ||||
if (lkbuf) { | ||||
if (!err && !call_reducer(lkbuf, lksiz, values)) err = true; | ||||
values.clear(); | ||||
delete[] lkbuf; | ||||
} | ||||
while (!lines.empty()) { | ||||
MergeLine line = lines.top(); | ||||
lines.pop(); | ||||
delete[] line.kbuf; | ||||
delete line.cur; | ||||
} | ||||
etime = time(); | ||||
if (!logf("reduce", "the reduce process finished: time=%.6f", etime - s | ||||
time)) err = true; | ||||
Thread::yield(); | ||||
if (!logf("clean", "closing the temporary databases")) err = true; | if (!logf("clean", "closing the temporary databases")) err = true; | |||
stime = time(); | stime = time(); | |||
for (size_t i = 0; i < dbnum_; i++) { | for (size_t i = 0; i < dbnum_; i++) { | |||
assert(tmpdbs_[i]); | assert(tmpdbs_[i]); | |||
std::string path = tmpdbs_[i]->path(); | std::string path = tmpdbs_[i]->path(); | |||
if (!tmpdbs_[i]->clear()) { | if (!tmpdbs_[i]->clear()) { | |||
const BasicDB::Error& e = tmpdbs_[i]->error(); | const BasicDB::Error& e = tmpdbs_[i]->error(); | |||
db->set_error(_KCCODELINE_, e.code(), e.message()); | db->set_error(_KCCODELINE_, e.code(), e.message()); | |||
err = true; | err = true; | |||
} | } | |||
skipping to change at line 457 | skipping to change at line 442 | |||
return !stop_; | return !stop_; | |||
} | } | |||
bool stop_; ///< flag for stop | bool stop_; ///< flag for stop | |||
}; | }; | |||
/** | /** | |||
* Visitor for the map process. | * Visitor for the map process. | |||
*/ | */ | |||
class MapVisitor : public BasicDB::Visitor { | class MapVisitor : public BasicDB::Visitor { | |||
public: | public: | |||
/** constructor */ | /** constructor */ | |||
explicit MapVisitor(MapReduce* mr, MapChecker* checker) : | explicit MapVisitor(MapReduce* mr, MapChecker* checker, int64_t scale) | |||
mr_(mr), checker_(checker), emitter_(mr) {} | : | |||
mr_(mr), checker_(checker), emitter_(mr), scale_(scale), | ||||
stime_(0), err_(false) {} | ||||
/** get the error flag */ | ||||
bool error() { | ||||
return err_; | ||||
} | ||||
/** preprocess the mappter */ | ||||
void visit_before() { | ||||
if (!mr_->preprocess()) err_ = true; | ||||
stime_ = time(); | ||||
mr_->dbclock_ = 0; | ||||
mr_->keyclock_ = 0; | ||||
mr_->cache_ = new TinyHashMap(mr_->cbnum_); | ||||
mr_->csiz_ = 0; | ||||
if (!mr_->logf("map", "started the map process: scale=%lld", (long lo | ||||
ng)scale_)) | ||||
err_ = true; | ||||
} | ||||
/** postprocess the mappter and call the reducer */ | ||||
void visit_after() { | ||||
if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true; | ||||
delete mr_->cache_; | ||||
if (!mr_->midprocess()) err_ = true; | ||||
double etime = time(); | ||||
if (!mr_->logf("map", "the map process finished: time=%.6f", etime - | ||||
stime_)) | ||||
err_ = true; | ||||
if (!err_ && !mr_->execute_reduce()) err_ = true; | ||||
if (!mr_->postprocess()) err_ = true; | ||||
} | ||||
private: | private: | |||
/** visit a record */ | /** visit a record */ | |||
const char* visit_full(const char* kbuf, size_t ksiz, | const char* visit_full(const char* kbuf, size_t ksiz, | |||
const char* vbuf, size_t vsiz, size_t* sp) { | const char* vbuf, size_t vsiz, size_t* sp) { | |||
if (!mr_->map(kbuf, ksiz, vbuf, vsiz, &emitter_)) checker_->stop(); | if (!mr_->map(kbuf, ksiz, vbuf, vsiz, &emitter_)) { | |||
if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) checker_->stop() | checker_->stop(); | |||
; | err_ = true; | |||
} | ||||
if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { | ||||
checker_->stop(); | ||||
err_ = true; | ||||
} | ||||
return NOP; | return NOP; | |||
} | } | |||
MapReduce* mr_; ///< driver | MapReduce* mr_; ///< driver | |||
MapChecker* checker_; ///< checker | MapChecker* checker_; ///< checker | |||
MapEmitter emitter_; ///< emitter | MapEmitter emitter_; ///< emitter | |||
int64_t scale_; ///< number of records | ||||
double stime_; ///< start time | ||||
bool err_; ///< error flag | ||||
}; | }; | |||
/** | /** | |||
* Front line of a merging list. | * Front line of a merging list. | |||
*/ | */ | |||
struct MergeLine { | struct MergeLine { | |||
BasicDB::Cursor* cur; ///< cursor | BasicDB::Cursor* cur; ///< cursor | |||
Comparator* comp; ///< lexical comparator | Comparator* rcomp; ///< record comparator | |||
char* kbuf; ///< pointer to the key | char* kbuf; ///< pointer to the key | |||
size_t ksiz; ///< size of the key | size_t ksiz; ///< size of the key | |||
const char* vbuf; ///< pointer to the value | const char* vbuf; ///< pointer to the value | |||
size_t vsiz; ///< size of the value | size_t vsiz; ///< size of the value | |||
/** comparing operator */ | /** comparing operator */ | |||
bool operator <(const MergeLine& right) const { | bool operator <(const MergeLine& right) const { | |||
return comp->compare(kbuf, ksiz, right.kbuf, right.ksiz) > 0; | return rcomp->compare(kbuf, ksiz, right.kbuf, right.ksiz) > 0; | |||
} | } | |||
}; | }; | |||
/** | /** | |||
* Process a log message. | * Process a log message. | |||
* @param name the name of the event. | * @param name the name of the event. | |||
* @param format the printf-like format string. | * @param format the printf-like format string. | |||
* @param ... used according to the format string. | * @param ... used according to the format string. | |||
* @return true on success, or false on failure. | * @return true on success, or false on failure. | |||
*/ | */ | |||
bool logf(const char* name, const char* format, ...) { | bool logf(const char* name, const char* format, ...) { | |||
skipping to change at line 528 | skipping to change at line 549 | |||
sorter.step(); | sorter.step(); | |||
} | } | |||
cache_->clear(); | cache_->clear(); | |||
csiz_ = 0; | csiz_ = 0; | |||
dbclock_ = (dbclock_ + 1) % dbnum_; | dbclock_ = (dbclock_ + 1) % dbnum_; | |||
double etime = time(); | double etime = time(); | |||
if (!logf("map", "flushing the cache finished: time=%.6f", etime - stim e)) err = true; | if (!logf("map", "flushing the cache finished: time=%.6f", etime - stim e)) err = true; | |||
return !err; | return !err; | |||
} | } | |||
/** | /** | |||
* Execute the reduce part. | ||||
* @return true on success, or false on failure. | ||||
*/ | ||||
bool execute_reduce() { | ||||
bool err = false; | ||||
int64_t scale = 0; | ||||
for (size_t i = 0; i < dbnum_; i++) { | ||||
scale += tmpdbs_[i]->count(); | ||||
} | ||||
if (!logf("reduce", "started the reduce process: scale=%lld", (long lon | ||||
g)scale)) err = true; | ||||
double stime = time(); | ||||
std::priority_queue<MergeLine> lines; | ||||
for (size_t i = 0; i < dbnum_; i++) { | ||||
MergeLine line; | ||||
line.cur = tmpdbs_[i]->cursor(); | ||||
line.rcomp = rcomp_; | ||||
line.cur->jump(); | ||||
line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true); | ||||
if (line.kbuf) { | ||||
lines.push(line); | ||||
} else { | ||||
delete line.cur; | ||||
} | ||||
} | ||||
char* lkbuf = NULL; | ||||
size_t lksiz = 0; | ||||
Values values; | ||||
while (!err && !lines.empty()) { | ||||
MergeLine line = lines.top(); | ||||
lines.pop(); | ||||
if (lkbuf && (lksiz != line.ksiz || std::memcmp(lkbuf, line.kbuf, lks | ||||
iz))) { | ||||
if (!call_reducer(lkbuf, lksiz, values)) err = true; | ||||
values.clear(); | ||||
} | ||||
values.push_back(std::string(line.vbuf, line.vsiz)); | ||||
delete[] lkbuf; | ||||
lkbuf = line.kbuf; | ||||
lksiz = line.ksiz; | ||||
line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true); | ||||
if (line.kbuf) { | ||||
lines.push(line); | ||||
} else { | ||||
delete line.cur; | ||||
} | ||||
} | ||||
if (lkbuf) { | ||||
if (!err && !call_reducer(lkbuf, lksiz, values)) err = true; | ||||
values.clear(); | ||||
delete[] lkbuf; | ||||
} | ||||
while (!lines.empty()) { | ||||
MergeLine line = lines.top(); | ||||
lines.pop(); | ||||
delete[] line.kbuf; | ||||
delete line.cur; | ||||
} | ||||
double etime = time(); | ||||
if (!logf("reduce", "the reduce process finished: time=%.6f", etime - s | ||||
time)) err = true; | ||||
return !err; | ||||
} | ||||
/** | ||||
* Call the reducer. | * Call the reducer. | |||
* @param kbuf the pointer to the key region. | * @param kbuf the pointer to the key region. | |||
* @param ksiz the size of the key region. | * @param ksiz the size of the key region. | |||
* @param values a vector of the values. | * @param values a vector of the values. | |||
* @return true on success, or false on failure. | * @return true on success, or false on failure. | |||
*/ | */ | |||
bool call_reducer(const char* kbuf, size_t ksiz, const Values& values) { | bool call_reducer(const char* kbuf, size_t ksiz, const Values& values) { | |||
_assert_(kbuf && ksiz <= MEMMAXSIZ); | _assert_(kbuf && ksiz <= MEMMAXSIZ); | |||
bool err = false; | bool err = false; | |||
ValueIterator iter(values.begin(), values.end()); | ValueIterator iter(values.begin(), values.end()); | |||
if (!reduce(kbuf, ksiz, &iter)) err = true; | if (!reduce(kbuf, ksiz, &iter)) err = true; | |||
return !err; | return !err; | |||
} | } | |||
/** Dummy constructor to forbid the use. */ | /** Dummy constructor to forbid the use. */ | |||
MapReduce(const MapReduce&); | MapReduce(const MapReduce&); | |||
/** Dummy Operator to forbid the use. */ | /** Dummy Operator to forbid the use. */ | |||
MapReduce& operator =(const MapReduce&); | MapReduce& operator =(const MapReduce&); | |||
/** The record comparator. */ | ||||
Comparator* rcomp_; | ||||
/** The temporary databases. */ | /** The temporary databases. */ | |||
BasicDB** tmpdbs_; | BasicDB** tmpdbs_; | |||
/** The number of temporary databases. */ | /** The number of temporary databases. */ | |||
size_t dbnum_; | size_t dbnum_; | |||
/** The logical clock for temporary databases. */ | /** The logical clock for temporary databases. */ | |||
int64_t dbclock_; | int64_t dbclock_; | |||
/** The logical clock for keys. */ | /** The logical clock for keys. */ | |||
int64_t keyclock_; | int64_t keyclock_; | |||
/** The cache for emitter. */ | /** The cache for emitter. */ | |||
TinyHashMap* cache_; | TinyHashMap* cache_; | |||
End of changes. 20 change blocks. | ||||
83 lines changed or deleted | 168 lines changed or added | |||