kcdbext.h   kcdbext.h 
skipping to change at line 170 skipping to change at line 170
* Execution options. * Execution options.
*/ */
enum Option { enum Option {
XNOLOCK = 1 << 0, ///< avoid locking against update operations XNOLOCK = 1 << 0, ///< avoid locking against update operations
XNOCOMP = 1 << 1 ///< avoid compression of temporar y databases XNOCOMP = 1 << 1 ///< avoid compression of temporar y databases
}; };
/** /**
* Default constructor. * Default constructor.
*/ */
explicit MapReduce() : explicit MapReduce() :
tmpdbs_(NULL), dbnum_(MRDEFDBNUM), dbclock_(0), keyclock_(0), rcomp_(NULL), tmpdbs_(NULL), dbnum_(MRDEFDBNUM), dbclock_(0), keyclock_ (0),
cache_(NULL), csiz_(0), clim_(MRDEFCLIM), cbnum_(MRDEFCBNUM) { cache_(NULL), csiz_(0), clim_(MRDEFCLIM), cbnum_(MRDEFCBNUM) {
_assert_(true); _assert_(true);
} }
/** /**
* Destructor. * Destructor.
*/ */
virtual ~MapReduce() { virtual ~MapReduce() {
_assert_(true); _assert_(true);
} }
/** /**
skipping to change at line 199 skipping to change at line 199
* function. * function.
*/ */
virtual bool map(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz, virtual bool map(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz,
MapEmitter* emitter) = 0; MapEmitter* emitter) = 0;
/** /**
* Reduce a record data. * Reduce a record data.
* @param kbuf the pointer to the key region. * @param kbuf the pointer to the key region.
* @param ksiz the size of the key region. * @param ksiz the size of the key region.
* @param iter the iterator to get the values. * @param iter the iterator to get the values.
* @return true on success, or false on failure. * @return true on success, or false on failure.
* @note Database operations can be performed in this function. * @note To avoid deadlock, any explicit database operation must not be p
erformed in this
* function.
*/ */
virtual bool reduce(const char* kbuf, size_t ksiz, ValueIterator* iter) = 0; virtual bool reduce(const char* kbuf, size_t ksiz, ValueIterator* iter) = 0;
/** /**
* Preprocess the map operations.
* @return true on success, or false on failure.
*/
virtual bool preprocess() {
_assert_(true);
return true;
}
/**
* Mediate between the map and the reduce phases.
* @return true on success, or false on failure.
*/
virtual bool midprocess() {
_assert_(true);
return true;
}
/**
* Postprocess the reduce operations.
* @return true on success, or false on failure.
*/
virtual bool postprocess() {
_assert_(true);
return true;
}
/**
* Process a log message. * Process a log message.
* @param name the name of the event. * @param name the name of the event.
* @param message a supplement message. * @param message a supplement message.
* @return true on success, or false on failure. * @return true on success, or false on failure.
*/ */
virtual bool log(const char* name, const char* message) { virtual bool log(const char* name, const char* message) {
_assert_(name && message); _assert_(name && message);
return true; return true;
} }
/** /**
* Execute the MapReduce process about a database. * Execute the MapReduce process about a database.
* @param db the source database. * @param db the source database.
* @param tmppath the path of a directory for the temporary data storage. If it is an empty * @param tmppath the path of a directory for the temporary data storage. If it is an empty
* string, temporary data are handled on memory. * string, temporary data are handled on memory.
* @param opts the optional features by bitwise-or: MapReduce::XNOLOCK to avoid locking * @param opts the optional features by bitwise-or: MapReduce::XNOLOCK to avoid locking
* against update operations by other threads, MapReduce::XNOCOMP to avoi d compression of * against update operations by other threads, MapReduce::XNOCOMP to avoi d compression of
* temporary databases. * temporary databases.
* @return true on success, or false on failure. * @return true on success, or false on failure.
*/ */
bool execute(BasicDB* db, const std::string& tmppath = "", uint32_t opts = 0) { bool execute(BasicDB* db, const std::string& tmppath = "", uint32_t opts = 0) {
int64_t count = db->count();
if (count < 0) return false;
bool err = false; bool err = false;
double stime, etime; double stime, etime;
rcomp_ = LEXICALCOMP;
BasicDB* idb = db;
if (typeid(*db) == typeid(PolyDB)) {
PolyDB* pdb = (PolyDB*)idb;
idb = pdb->reveal_inner_db();
}
const std::type_info& info = typeid(*idb);
if (info == typeid(GrassDB)) {
GrassDB* gdb = (GrassDB*)idb;
rcomp_ = gdb->rcomp();
} else if (info == typeid(TreeDB)) {
TreeDB* tdb = (TreeDB*)idb;
rcomp_ = tdb->rcomp();
} else if (info == typeid(ForestDB)) {
ForestDB* fdb = (ForestDB*)idb;
rcomp_ = fdb->rcomp();
}
tmpdbs_ = new BasicDB*[dbnum_]; tmpdbs_ = new BasicDB*[dbnum_];
if (tmppath.empty()) { if (tmppath.empty()) {
if (!logf("prepare", "started to open temporary databases on memory") ) err = true; if (!logf("prepare", "started to open temporary databases on memory") ) err = true;
stime = time(); stime = time();
for (size_t i = 0; i < dbnum_; i++) { for (size_t i = 0; i < dbnum_; i++) {
GrassDB* gdb = new GrassDB; GrassDB* gdb = new GrassDB;
int32_t myopts = 0; int32_t myopts = 0;
if (!(opts & XNOCOMP)) myopts |= GrassDB::TCOMPRESS; if (!(opts & XNOCOMP)) myopts |= GrassDB::TCOMPRESS;
gdb->tune_options(myopts); gdb->tune_options(myopts);
gdb->tune_buckets(MRDBBNUM / 2); gdb->tune_buckets(MRDBBNUM / 2);
gdb->tune_page(MRDBPSIZ); gdb->tune_page(MRDBPSIZ);
gdb->tune_page_cache(MRDBPCCAP); gdb->tune_page_cache(MRDBPCCAP);
gdb->tune_comparator(rcomp_);
gdb->open("%", GrassDB::OWRITER | GrassDB::OCREATE | GrassDB::OTRUN CATE); gdb->open("%", GrassDB::OWRITER | GrassDB::OCREATE | GrassDB::OTRUN CATE);
tmpdbs_[i] = gdb; tmpdbs_[i] = gdb;
} }
etime = time(); etime = time();
if (!logf("prepare", "opening temporary databases finished: time=%.6f ", etime - stime)) if (!logf("prepare", "opening temporary databases finished: time=%.6f ", etime - stime))
err = true; err = true;
if (err) { if (err) {
delete[] tmpdbs_; delete[] tmpdbs_;
return false; return false;
} }
skipping to change at line 272 skipping to change at line 317
strprintf("%s%cmr-%04x-%04x-%08x-%03d%ckct", strprintf("%s%cmr-%04x-%04x-%08x-%03d%ckct",
tmppath.c_str(), File::PATHCHR, pid, tid, ts, (int)(i + 1), File::EXTCHR); tmppath.c_str(), File::PATHCHR, pid, tid, ts, (int)(i + 1), File::EXTCHR);
TreeDB* tdb = new TreeDB; TreeDB* tdb = new TreeDB;
int32_t myopts = TreeDB::TSMALL | TreeDB::TLINEAR; int32_t myopts = TreeDB::TSMALL | TreeDB::TLINEAR;
if (!(opts & XNOCOMP)) myopts |= TreeDB::TCOMPRESS; if (!(opts & XNOCOMP)) myopts |= TreeDB::TCOMPRESS;
tdb->tune_options(myopts); tdb->tune_options(myopts);
tdb->tune_buckets(MRDBBNUM); tdb->tune_buckets(MRDBBNUM);
tdb->tune_page(MRDBPSIZ); tdb->tune_page(MRDBPSIZ);
tdb->tune_map(MRDBMSIZ); tdb->tune_map(MRDBMSIZ);
tdb->tune_page_cache(MRDBPCCAP); tdb->tune_page_cache(MRDBPCCAP);
tdb->tune_comparator(rcomp_);
if (!tdb->open(childpath, TreeDB::OWRITER | TreeDB::OCREATE | TreeD B::OTRUNCATE)) { if (!tdb->open(childpath, TreeDB::OWRITER | TreeDB::OCREATE | TreeD B::OTRUNCATE)) {
const BasicDB::Error& e = tdb->error(); const BasicDB::Error& e = tdb->error();
db->set_error(_KCCODELINE_, e.code(), e.message()); db->set_error(_KCCODELINE_, e.code(), e.message());
err = true; err = true;
} }
tmpdbs_[i] = tdb; tmpdbs_[i] = tdb;
} }
etime = time(); etime = time();
if (!logf("prepare", "opening temporary databases finished: time=%.6f ", etime - stime)) if (!logf("prepare", "opening temporary databases finished: time=%.6f ", etime - stime))
err = true; err = true;
if (err) { if (err) {
for (size_t i = 0; i < dbnum_; i++) { for (size_t i = 0; i < dbnum_; i++) {
delete tmpdbs_[i]; delete tmpdbs_[i];
} }
delete[] tmpdbs_; delete[] tmpdbs_;
return false; return false;
} }
} }
dbclock_ = 0;
keyclock_ = 0;
cache_ = new TinyHashMap(cbnum_);
csiz_ = 0;
int64_t scale = db->count();
Thread::yield();
if (!logf("map", "started the map process: scale=%lld", (long long)scal
e)) err = true;
stime = time();
MapChecker mapchecker;
MapVisitor mapvisitor(this, &mapchecker);
if (opts & XNOLOCK) { if (opts & XNOLOCK) {
MapChecker mapchecker;
MapVisitor mapvisitor(this, &mapchecker, db->count());
mapvisitor.visit_before();
if (!err) { if (!err) {
BasicDB::Cursor* cur = db->cursor(); BasicDB::Cursor* cur = db->cursor();
if (!cur->jump()) err = true; if (!cur->jump()) err = true;
while (!err) { while (!err) {
if (!cur->accept(&mapvisitor, false, true)) { if (!cur->accept(&mapvisitor, false, true)) {
if (cur->error() != BasicDB::Error::NOREC) err = true; if (cur->error() != BasicDB::Error::NOREC) err = true;
break; break;
} }
} }
delete cur; delete cur;
} }
if (mapvisitor.error()) err = true;
mapvisitor.visit_after();
} else { } else {
MapChecker mapchecker;
MapVisitor mapvisitor(this, &mapchecker, db->count());
if (!err && !db->iterate(&mapvisitor, false, &mapchecker)) err = true ; if (!err && !db->iterate(&mapvisitor, false, &mapchecker)) err = true ;
if (mapvisitor.error()) err = true;
} }
if (cache_->count() > 0 && !flush_cache()) err = true;
delete cache_;
etime = time();
if (!logf("map", "the map process finished: time=%.6f", etime - stime))
err = true;
Thread::yield();
scale = 0;
for (size_t i = 0; i < dbnum_; i++) {
scale += tmpdbs_[i]->count();
}
if (!logf("reduce", "started the reduce process: scale=%lld", (long lon
g)scale)) err = true;
stime = time();
std::priority_queue<MergeLine> lines;
for (size_t i = 0; i < dbnum_; i++) {
MergeLine line;
line.cur = tmpdbs_[i]->cursor();
line.comp = LEXICALCOMP;
line.cur->jump();
line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true);
if (line.kbuf) {
lines.push(line);
} else {
delete line.cur;
}
}
char* lkbuf = NULL;
size_t lksiz = 0;
Values values;
while (!err && !lines.empty()) {
MergeLine line = lines.top();
lines.pop();
if (lkbuf && (lksiz != line.ksiz || std::memcmp(lkbuf, line.kbuf, lks
iz))) {
if (!call_reducer(lkbuf, lksiz, values)) err = true;
values.clear();
}
values.push_back(std::string(line.vbuf, line.vsiz));
delete[] lkbuf;
lkbuf = line.kbuf;
lksiz = line.ksiz;
line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true);
if (line.kbuf) {
lines.push(line);
} else {
delete line.cur;
}
}
if (lkbuf) {
if (!err && !call_reducer(lkbuf, lksiz, values)) err = true;
values.clear();
delete[] lkbuf;
}
while (!lines.empty()) {
MergeLine line = lines.top();
lines.pop();
delete[] line.kbuf;
delete line.cur;
}
etime = time();
if (!logf("reduce", "the reduce process finished: time=%.6f", etime - s
time)) err = true;
Thread::yield();
if (!logf("clean", "closing the temporary databases")) err = true; if (!logf("clean", "closing the temporary databases")) err = true;
stime = time(); stime = time();
for (size_t i = 0; i < dbnum_; i++) { for (size_t i = 0; i < dbnum_; i++) {
assert(tmpdbs_[i]); assert(tmpdbs_[i]);
std::string path = tmpdbs_[i]->path(); std::string path = tmpdbs_[i]->path();
if (!tmpdbs_[i]->clear()) { if (!tmpdbs_[i]->clear()) {
const BasicDB::Error& e = tmpdbs_[i]->error(); const BasicDB::Error& e = tmpdbs_[i]->error();
db->set_error(_KCCODELINE_, e.code(), e.message()); db->set_error(_KCCODELINE_, e.code(), e.message());
err = true; err = true;
} }
skipping to change at line 457 skipping to change at line 442
return !stop_; return !stop_;
} }
bool stop_; ///< flag for stop bool stop_; ///< flag for stop
}; };
/** /**
* Visitor for the map process. * Visitor for the map process.
*/ */
class MapVisitor : public BasicDB::Visitor { class MapVisitor : public BasicDB::Visitor {
public: public:
/** constructor */ /** constructor */
explicit MapVisitor(MapReduce* mr, MapChecker* checker) : explicit MapVisitor(MapReduce* mr, MapChecker* checker, int64_t scale)
mr_(mr), checker_(checker), emitter_(mr) {} :
mr_(mr), checker_(checker), emitter_(mr), scale_(scale),
stime_(0), err_(false) {}
/** get the error flag */
bool error() {
return err_;
}
/** preprocess the mappter */
void visit_before() {
if (!mr_->preprocess()) err_ = true;
stime_ = time();
mr_->dbclock_ = 0;
mr_->keyclock_ = 0;
mr_->cache_ = new TinyHashMap(mr_->cbnum_);
mr_->csiz_ = 0;
if (!mr_->logf("map", "started the map process: scale=%lld", (long lo
ng)scale_))
err_ = true;
}
/** postprocess the mappter and call the reducer */
void visit_after() {
if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true;
delete mr_->cache_;
if (!mr_->midprocess()) err_ = true;
double etime = time();
if (!mr_->logf("map", "the map process finished: time=%.6f", etime -
stime_))
err_ = true;
if (!err_ && !mr_->execute_reduce()) err_ = true;
if (!mr_->postprocess()) err_ = true;
}
private: private:
/** visit a record */ /** visit a record */
const char* visit_full(const char* kbuf, size_t ksiz, const char* visit_full(const char* kbuf, size_t ksiz,
const char* vbuf, size_t vsiz, size_t* sp) { const char* vbuf, size_t vsiz, size_t* sp) {
if (!mr_->map(kbuf, ksiz, vbuf, vsiz, &emitter_)) checker_->stop(); if (!mr_->map(kbuf, ksiz, vbuf, vsiz, &emitter_)) {
if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) checker_->stop() checker_->stop();
; err_ = true;
}
if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) {
checker_->stop();
err_ = true;
}
return NOP; return NOP;
} }
MapReduce* mr_; ///< driver MapReduce* mr_; ///< driver
MapChecker* checker_; ///< checker MapChecker* checker_; ///< checker
MapEmitter emitter_; ///< emitter MapEmitter emitter_; ///< emitter
int64_t scale_; ///< number of records
double stime_; ///< start time
bool err_; ///< error flag
}; };
/** /**
* Front line of a merging list. * Front line of a merging list.
*/ */
struct MergeLine { struct MergeLine {
BasicDB::Cursor* cur; ///< cursor BasicDB::Cursor* cur; ///< cursor
Comparator* comp; ///< lexical comparator Comparator* rcomp; ///< record comparator
char* kbuf; ///< pointer to the key char* kbuf; ///< pointer to the key
size_t ksiz; ///< size of the key size_t ksiz; ///< size of the key
const char* vbuf; ///< pointer to the value const char* vbuf; ///< pointer to the value
size_t vsiz; ///< size of the value size_t vsiz; ///< size of the value
/** comparing operator */ /** comparing operator */
bool operator <(const MergeLine& right) const { bool operator <(const MergeLine& right) const {
return comp->compare(kbuf, ksiz, right.kbuf, right.ksiz) > 0; return rcomp->compare(kbuf, ksiz, right.kbuf, right.ksiz) > 0;
} }
}; };
/** /**
* Process a log message. * Process a log message.
* @param name the name of the event. * @param name the name of the event.
* @param format the printf-like format string. * @param format the printf-like format string.
* @param ... used according to the format string. * @param ... used according to the format string.
* @return true on success, or false on failure. * @return true on success, or false on failure.
*/ */
bool logf(const char* name, const char* format, ...) { bool logf(const char* name, const char* format, ...) {
skipping to change at line 528 skipping to change at line 549
sorter.step(); sorter.step();
} }
cache_->clear(); cache_->clear();
csiz_ = 0; csiz_ = 0;
dbclock_ = (dbclock_ + 1) % dbnum_; dbclock_ = (dbclock_ + 1) % dbnum_;
double etime = time(); double etime = time();
if (!logf("map", "flushing the cache finished: time=%.6f", etime - stim e)) err = true; if (!logf("map", "flushing the cache finished: time=%.6f", etime - stim e)) err = true;
return !err; return !err;
} }
/** /**
* Execute the reduce part.
* @return true on success, or false on failure.
*/
bool execute_reduce() {
bool err = false;
int64_t scale = 0;
for (size_t i = 0; i < dbnum_; i++) {
scale += tmpdbs_[i]->count();
}
if (!logf("reduce", "started the reduce process: scale=%lld", (long lon
g)scale)) err = true;
double stime = time();
std::priority_queue<MergeLine> lines;
for (size_t i = 0; i < dbnum_; i++) {
MergeLine line;
line.cur = tmpdbs_[i]->cursor();
line.rcomp = rcomp_;
line.cur->jump();
line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true);
if (line.kbuf) {
lines.push(line);
} else {
delete line.cur;
}
}
char* lkbuf = NULL;
size_t lksiz = 0;
Values values;
while (!err && !lines.empty()) {
MergeLine line = lines.top();
lines.pop();
if (lkbuf && (lksiz != line.ksiz || std::memcmp(lkbuf, line.kbuf, lks
iz))) {
if (!call_reducer(lkbuf, lksiz, values)) err = true;
values.clear();
}
values.push_back(std::string(line.vbuf, line.vsiz));
delete[] lkbuf;
lkbuf = line.kbuf;
lksiz = line.ksiz;
line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true);
if (line.kbuf) {
lines.push(line);
} else {
delete line.cur;
}
}
if (lkbuf) {
if (!err && !call_reducer(lkbuf, lksiz, values)) err = true;
values.clear();
delete[] lkbuf;
}
while (!lines.empty()) {
MergeLine line = lines.top();
lines.pop();
delete[] line.kbuf;
delete line.cur;
}
double etime = time();
if (!logf("reduce", "the reduce process finished: time=%.6f", etime - s
time)) err = true;
return !err;
}
/**
* Call the reducer. * Call the reducer.
* @param kbuf the pointer to the key region. * @param kbuf the pointer to the key region.
* @param ksiz the size of the key region. * @param ksiz the size of the key region.
* @param values a vector of the values. * @param values a vector of the values.
* @return true on success, or false on failure. * @return true on success, or false on failure.
*/ */
bool call_reducer(const char* kbuf, size_t ksiz, const Values& values) { bool call_reducer(const char* kbuf, size_t ksiz, const Values& values) {
_assert_(kbuf && ksiz <= MEMMAXSIZ); _assert_(kbuf && ksiz <= MEMMAXSIZ);
bool err = false; bool err = false;
ValueIterator iter(values.begin(), values.end()); ValueIterator iter(values.begin(), values.end());
if (!reduce(kbuf, ksiz, &iter)) err = true; if (!reduce(kbuf, ksiz, &iter)) err = true;
return !err; return !err;
} }
/** Dummy constructor to forbid the use. */ /** Dummy constructor to forbid the use. */
MapReduce(const MapReduce&); MapReduce(const MapReduce&);
/** Dummy Operator to forbid the use. */ /** Dummy Operator to forbid the use. */
MapReduce& operator =(const MapReduce&); MapReduce& operator =(const MapReduce&);
/** The record comparator. */
Comparator* rcomp_;
/** The temporary databases. */ /** The temporary databases. */
BasicDB** tmpdbs_; BasicDB** tmpdbs_;
/** The number of temporary databases. */ /** The number of temporary databases. */
size_t dbnum_; size_t dbnum_;
/** The logical clock for temporary databases. */ /** The logical clock for temporary databases. */
int64_t dbclock_; int64_t dbclock_;
/** The logical clock for keys. */ /** The logical clock for keys. */
int64_t keyclock_; int64_t keyclock_;
/** The cache for emitter. */ /** The cache for emitter. */
TinyHashMap* cache_; TinyHashMap* cache_;
 End of changes. 20 change blocks. 
83 lines changed or deleted 168 lines changed or added

This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/