kcdbext.h   kcdbext.h 
skipping to change at line 44 skipping to change at line 44
namespace kyotocabinet { // common namespace namespace kyotocabinet { // common namespace
/** /**
* MapReduce framework. * MapReduce framework.
* @note Although this framework is not distributed or concurrent, it is us eful for aggregate * @note Although this framework is not distributed or concurrent, it is us eful for aggregate
* calculation with less CPU loading and less memory usage. * calculation with less CPU loading and less memory usage.
*/ */
class MapReduce { class MapReduce {
public: public:
class MapEmitter;
class ValueIterator; class ValueIterator;
private: private:
class MapVisitor; class MapVisitor;
struct MergeLine; struct MergeLine;
/** An alias of vector of loaded values. */ /** An alias of vector of loaded values. */
typedef std::vector<std::string> Values; typedef std::vector<std::string> Values;
/** The default number of temporary databases. */ /** The default number of temporary databases. */
static const size_t DEFDBNUM = 8; static const size_t DEFDBNUM = 8;
/** The maxinum number of temporary databases. */ /** The maxinum number of temporary databases. */
static const size_t MAXDBNUM = 256; static const size_t MAXDBNUM = 256;
skipping to change at line 69 skipping to change at line 68
/** The bucket number of temprary databases. */ /** The bucket number of temprary databases. */
static const int64_t DBBNUM = 512LL << 10; static const int64_t DBBNUM = 512LL << 10;
/** The page size of temprary databases. */ /** The page size of temprary databases. */
static const int32_t DBPSIZ = 32768; static const int32_t DBPSIZ = 32768;
/** The mapped size of temprary databases. */ /** The mapped size of temprary databases. */
static const int64_t DBMSIZ = 516LL * 4096; static const int64_t DBMSIZ = 516LL * 4096;
/** The page cache capacity of temprary databases. */ /** The page cache capacity of temprary databases. */
static const int64_t DBPCCAP = 16LL << 20; static const int64_t DBPCCAP = 16LL << 20;
public: public:
/** /**
* Data emitter for the mapper.
*/
class MapEmitter {
friend class MapReduce;
friend class MapReduce::MapVisitor;
public:
/**
* Emit a record from the mapper.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @param vbuf the pointer to the value region.
* @param vsiz the size of the value region.
* @return true on success, or false on failure.
*/
bool emit(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz)
{
_assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ);
bool err = false;
size_t rsiz = sizevarnum(vsiz) + vsiz;
char stack[NUMBUFSIZ*4];
char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack;
char* wp = rbuf;
wp += writevarnum(rbuf, vsiz);
std::memcpy(wp, vbuf, vsiz);
mr_->cache_->append(kbuf, ksiz, rbuf, rsiz);
if (rbuf != stack) delete[] rbuf;
mr_->csiz_ += rsiz;
return !err;
}
private:
/**
* Default constructor.
*/
explicit MapEmitter(MapReduce* mr) : mr_(mr) {
_assert_(true);
}
/**
* Destructor.
*/
~MapEmitter() {
_assert_(true);
}
/** Dummy constructor to forbid the use. */
MapEmitter(const MapEmitter&);
/** Dummy Operator to forbid the use. */
MapEmitter& operator =(const MapEmitter&);
/** The owner object. */
MapReduce* mr_;
};
/**
* Value iterator for the reducer. * Value iterator for the reducer.
*/ */
class ValueIterator { class ValueIterator {
friend class MapReduce; friend class MapReduce;
public: public:
/** /**
* Get the next value. * Get the next value.
* @param sp the pointer to the variable into which the size of the reg ion of the return * @param sp the pointer to the variable into which the size of the reg ion of the return
* value is assigned. * value is assigned.
* @return the pointer to the next value region, or NULL if no value re mains. * @return the pointer to the next value region, or NULL if no value re mains.
skipping to change at line 202 skipping to change at line 152
*/ */
virtual ~MapReduce() { virtual ~MapReduce() {
_assert_(true); _assert_(true);
} }
/** /**
* Map a record data. * Map a record data.
* @param kbuf the pointer to the key region. * @param kbuf the pointer to the key region.
* @param ksiz the size of the key region. * @param ksiz the size of the key region.
* @param vbuf the pointer to the value region. * @param vbuf the pointer to the value region.
* @param vsiz the size of the value region. * @param vsiz the size of the value region.
* @param emitter the emitter object.
* @return true on success, or false on failure. * @return true on success, or false on failure.
* @note To avoid deadlock, any explicit database operation must not be p * @note This function can call the MapReduce::emit method to emit a reco
erformed in this rd. To avoid
* function. * deadlock, any explicit database operation must not be performed in thi
s function.
*/ */
virtual bool map(const char* kbuf, size_t ksiz, const char* vbuf, size_t virtual bool map(const char* kbuf, size_t ksiz, const char* vbuf, size_t
vsiz, vsiz) = 0;
MapEmitter* emitter) = 0;
/** /**
* Reduce a record data. * Reduce a record data.
* @param kbuf the pointer to the key region. * @param kbuf the pointer to the key region.
* @param ksiz the size of the key region. * @param ksiz the size of the key region.
* @param iter the iterator to get the values. * @param iter the iterator to get the values.
* @return true on success, or false on failure. * @return true on success, or false on failure.
* @note To avoid deadlock, any explicit database operation must not be p erformed in this * @note To avoid deadlock, any explicit database operation must not be p erformed in this
* function. * function.
*/ */
virtual bool reduce(const char* kbuf, size_t ksiz, ValueIterator* iter) = 0; virtual bool reduce(const char* kbuf, size_t ksiz, ValueIterator* iter) = 0;
/** /**
* Preprocess the map operations. * Preprocess the map operations.
* @return true on success, or false on failure. * @return true on success, or false on failure.
* @note This function can call the MapReduce::emit method to emit a reco
rd. To avoid
* deadlock, any explicit database operation must not be performed in thi
s function.
*/ */
virtual bool preprocess() { virtual bool preprocess() {
_assert_(true); _assert_(true);
return true; return true;
} }
/** /**
* Mediate between the map and the reduce phases. * Mediate between the map and the reduce phases.
* @return true on success, or false on failure. * @return true on success, or false on failure.
* @note This function can call the MapReduce::emit method to emit a reco
rd. To avoid
* deadlock, any explicit database operation must not be performed in thi
s function.
*/ */
virtual bool midprocess() { virtual bool midprocess() {
_assert_(true); _assert_(true);
return true; return true;
} }
/** /**
* Postprocess the reduce operations. * Postprocess the reduce operations.
* @return true on success, or false on failure. * @return true on success, or false on failure.
* @note To avoid deadlock, any explicit database operation must not be p
erformed in this
* function.
*/ */
virtual bool postprocess() { virtual bool postprocess() {
_assert_(true); _assert_(true);
return true; return true;
} }
/** /**
* Process a log message. * Process a log message.
* @param name the name of the event. * @param name the name of the event.
* @param message a supplement message. * @param message a supplement message.
* @return true on success, or false on failure. * @return true on success, or false on failure.
skipping to change at line 359 skipping to change at line 313
delete[] tmpdbs_; delete[] tmpdbs_;
return false; return false;
} }
} }
if (opts & XNOLOCK) { if (opts & XNOLOCK) {
MapChecker mapchecker; MapChecker mapchecker;
MapVisitor mapvisitor(this, &mapchecker, db->count()); MapVisitor mapvisitor(this, &mapchecker, db->count());
mapvisitor.visit_before(); mapvisitor.visit_before();
if (!err) { if (!err) {
BasicDB::Cursor* cur = db->cursor(); BasicDB::Cursor* cur = db->cursor();
if (!cur->jump()) err = true; if (!cur->jump() && cur->error() != BasicDB::Error::NOREC) err = tr ue;
while (!err) { while (!err) {
if (!cur->accept(&mapvisitor, false, true)) { if (!cur->accept(&mapvisitor, false, true)) {
if (cur->error() != BasicDB::Error::NOREC) err = true; if (cur->error() != BasicDB::Error::NOREC) err = true;
break; break;
} }
} }
delete cur; delete cur;
} }
if (mapvisitor.error()) err = true; if (mapvisitor.error()) err = true;
mapvisitor.visit_after(); mapvisitor.visit_after();
skipping to change at line 414 skipping to change at line 368
* @param cbnum the bucket number of the internal cache. * @param cbnum the bucket number of the internal cache.
*/ */
void tune_storage(int32_t dbnum, int64_t clim, int64_t cbnum) { void tune_storage(int32_t dbnum, int64_t clim, int64_t cbnum) {
_assert_(true); _assert_(true);
dbnum_ = dbnum > 0 ? dbnum : DEFDBNUM; dbnum_ = dbnum > 0 ? dbnum : DEFDBNUM;
if (dbnum_ > MAXDBNUM) dbnum_ = MAXDBNUM; if (dbnum_ > MAXDBNUM) dbnum_ = MAXDBNUM;
clim_ = clim > 0 ? clim : DEFCLIM; clim_ = clim > 0 ? clim : DEFCLIM;
cbnum_ = cbnum > 0 ? cbnum : DEFCBNUM; cbnum_ = cbnum > 0 ? cbnum : DEFCBNUM;
if (cbnum_ > INT16MAX) cbnum_ = nearbyprime(cbnum_); if (cbnum_ > INT16MAX) cbnum_ = nearbyprime(cbnum_);
} }
protected:
/**
* Emit a record from the mapper.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @param vbuf the pointer to the value region.
* @param vsiz the size of the value region.
* @return true on success, or false on failure.
*/
bool emit(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) {
_assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ);
bool err = false;
size_t rsiz = sizevarnum(vsiz) + vsiz;
char stack[NUMBUFSIZ*4];
char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack;
char* wp = rbuf;
wp += writevarnum(rbuf, vsiz);
std::memcpy(wp, vbuf, vsiz);
cache_->append(kbuf, ksiz, rbuf, rsiz);
if (rbuf != stack) delete[] rbuf;
csiz_ += rsiz;
return !err;
}
private: private:
/** /**
* Checker for the map process. * Checker for the map process.
*/ */
class MapChecker : public BasicDB::ProgressChecker { class MapChecker : public BasicDB::ProgressChecker {
public: public:
/** constructor */ /** constructor */
explicit MapChecker() : stop_(false) {} explicit MapChecker() : stop_(false) {}
/** stop the process */ /** stop the process */
void stop() { void stop() {
skipping to change at line 444 skipping to change at line 421
} }
bool stop_; ///< flag for stop bool stop_; ///< flag for stop
}; };
/** /**
* Visitor for the map process. * Visitor for the map process.
*/ */
class MapVisitor : public BasicDB::Visitor { class MapVisitor : public BasicDB::Visitor {
public: public:
/** constructor */ /** constructor */
explicit MapVisitor(MapReduce* mr, MapChecker* checker, int64_t scale) : explicit MapVisitor(MapReduce* mr, MapChecker* checker, int64_t scale) :
mr_(mr), checker_(checker), emitter_(mr), scale_(scale), mr_(mr), checker_(checker), scale_(scale), stime_(0), err_(false) {
stime_(0), err_(false) {} }
/** get the error flag */ /** get the error flag */
bool error() { bool error() {
return err_; return err_;
} }
/** preprocess the mappter */ /** preprocess the mappter */
void visit_before() { void visit_before() {
if (!mr_->preprocess()) err_ = true;
stime_ = time();
mr_->dbclock_ = 0; mr_->dbclock_ = 0;
mr_->cache_ = new TinyHashMap(mr_->cbnum_); mr_->cache_ = new TinyHashMap(mr_->cbnum_);
mr_->csiz_ = 0; mr_->csiz_ = 0;
if (!mr_->preprocess()) err_ = true;
if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true;
if (!mr_->logf("map", "started the map process: scale=%lld", (long lo ng)scale_)) if (!mr_->logf("map", "started the map process: scale=%lld", (long lo ng)scale_))
err_ = true; err_ = true;
stime_ = time();
} }
/** postprocess the mappter and call the reducer */ /** postprocess the mappter and call the reducer */
void visit_after() { void visit_after() {
if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true; if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true;
delete mr_->cache_;
double etime = time(); double etime = time();
if (!mr_->logf("map", "the map process finished: time=%.6f", etime - stime_)) if (!mr_->logf("map", "the map process finished: time=%.6f", etime - stime_))
err_ = true; err_ = true;
if (!mr_->midprocess()) err_ = true; if (!mr_->midprocess()) err_ = true;
if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true;
delete mr_->cache_;
if (!err_ && !mr_->execute_reduce()) err_ = true; if (!err_ && !mr_->execute_reduce()) err_ = true;
if (!mr_->postprocess()) err_ = true; if (!mr_->postprocess()) err_ = true;
} }
private: private:
/** visit a record */ /** visit a record */
const char* visit_full(const char* kbuf, size_t ksiz, const char* visit_full(const char* kbuf, size_t ksiz,
const char* vbuf, size_t vsiz, size_t* sp) { const char* vbuf, size_t vsiz, size_t* sp) {
if (!mr_->map(kbuf, ksiz, vbuf, vsiz, &emitter_)) { if (!mr_->map(kbuf, ksiz, vbuf, vsiz)) {
checker_->stop(); checker_->stop();
err_ = true; err_ = true;
} }
if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) {
checker_->stop(); checker_->stop();
err_ = true; err_ = true;
} }
return NOP; return NOP;
} }
MapReduce* mr_; ///< driver MapReduce* mr_; ///< driver
MapChecker* checker_; ///< checker MapChecker* checker_; ///< checker
MapEmitter emitter_; ///< emitter
int64_t scale_; ///< number of records int64_t scale_; ///< number of records
double stime_; ///< start time double stime_; ///< start time
bool err_; ///< error flag bool err_; ///< error flag
}; };
/** /**
* Front line of a merging list. * Front line of a merging list.
*/ */
struct MergeLine { struct MergeLine {
BasicDB::Cursor* cur; ///< cursor BasicDB::Cursor* cur; ///< cursor
Comparator* rcomp; ///< record comparator Comparator* rcomp; ///< record comparator
skipping to change at line 1109 skipping to change at line 1086
* code is appended at the end of the region of the return value, the ret urn value can be * code is appended at the end of the region of the return value, the ret urn value can be
* treated as a C-style string. Because the region of the return value i s allocated with the * treated as a C-style string. Because the region of the return value i s allocated with the
* the new[] operator, it should be released with the delete[] operator w hen it is no longer * the new[] operator, it should be released with the delete[] operator w hen it is no longer
* in use. * in use.
*/ */
char* get(const char* kbuf, size_t ksiz, size_t* sp) { char* get(const char* kbuf, size_t ksiz, size_t* sp) {
_assert_(kbuf && ksiz <= MEMMAXSIZ && sp); _assert_(kbuf && ksiz <= MEMMAXSIZ && sp);
ScopedRWLock lock(&mlock_, false); ScopedRWLock lock(&mlock_, false);
if (omode_ == 0) { if (omode_ == 0) {
set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened");
*sp = 0;
return false; return false;
} }
if (!cache_) return db_.get(kbuf, ksiz, sp); if (!cache_) return db_.get(kbuf, ksiz, sp);
size_t dvsiz = 0; size_t dvsiz = 0;
char* dvbuf = db_.get(kbuf, ksiz, &dvsiz); char* dvbuf = db_.get(kbuf, ksiz, &dvsiz);
size_t cvsiz = 0; size_t cvsiz = 0;
const char* cvbuf = cache_->get(kbuf, ksiz, &cvsiz); const char* cvbuf = cache_->get(kbuf, ksiz, &cvsiz);
struct Record { struct Record {
char* buf; char* buf;
size_t size; size_t size;
skipping to change at line 1137 skipping to change at line 1115
Record* rp = recs + i; Record* rp = recs + i;
rp->buf = tmpdb->get(kbuf, ksiz, &rp->size); rp->buf = tmpdb->get(kbuf, ksiz, &rp->size);
if (rp->buf) { if (rp->buf) {
rsiz += rp->size; rsiz += rp->size;
hit = true; hit = true;
} }
} }
} }
if (!hit) { if (!hit) {
delete[] recs; delete[] recs;
if (!dvbuf && !cvbuf) return NULL; if (!dvbuf && !cvbuf) {
*sp = 0;
return NULL;
}
if (!dvbuf) { if (!dvbuf) {
dvbuf = new char[cvsiz+1]; dvbuf = new char[cvsiz+1];
std::memcpy(dvbuf, cvbuf, cvsiz); std::memcpy(dvbuf, cvbuf, cvsiz);
*sp = cvsiz; *sp = cvsiz;
return dvbuf; return dvbuf;
} }
if (!cvbuf) { if (!cvbuf) {
*sp = dvsiz; *sp = dvsiz;
return dvbuf; return dvbuf;
} }
 End of changes. 20 change blocks. 
67 lines changed or deleted 54 lines changed or added


 kclangc.h   kclangc.h 
skipping to change at line 23 skipping to change at line 23
************************************************************************** ***********************/ ************************************************************************** ***********************/
#ifndef _KCLANGC_H /* duplication check */ #ifndef _KCLANGC_H /* duplication check */
#define _KCLANGC_H #define _KCLANGC_H
#if defined(__cplusplus) #if defined(__cplusplus)
extern "C" { extern "C" {
#endif #endif
#if !defined(__STDC_LIMIT_MACROS) #if !defined(__STDC_LIMIT_MACROS)
#define __STDC_LIMIT_MACROS 1 #define __STDC_LIMIT_MACROS 1 /**< enable limit macros for C++ * /
#endif #endif
#include <assert.h> #include <assert.h>
#include <ctype.h> #include <ctype.h>
#include <errno.h> #include <errno.h>
#include <float.h> #include <float.h>
#include <limits.h> #include <limits.h>
#include <locale.h> #include <locale.h>
#include <math.h> #include <math.h>
#include <setjmp.h> #include <setjmp.h>
 End of changes. 1 change blocks. 
1 lines changed or deleted 1 lines changed or added


 kcpolydb.h   kcpolydb.h 
skipping to change at line 1350 skipping to change at line 1350
* @param func the function name of the program source code. * @param func the function name of the program source code.
* @param kind the kind of the event. Logger::DEBUG for debugging, Logge r::INFO for normal * @param kind the kind of the event. Logger::DEBUG for debugging, Logge r::INFO for normal
* information, Logger::WARN for warning, and Logger::ERROR for fatal err or. * information, Logger::WARN for warning, and Logger::ERROR for fatal err or.
* @param message the supplement message. * @param message the supplement message.
*/ */
void log(const char* file, int32_t line, const char* func, Logger::Kind k ind, void log(const char* file, int32_t line, const char* func, Logger::Kind k ind,
const char* message) { const char* message) {
_assert_(file && line > 0 && func && message); _assert_(file && line > 0 && func && message);
if (logger_) { if (logger_) {
logger_->log(file, line, func, kind, message); logger_->log(file, line, func, kind, message);
} else if(type_ != TYPEVOID) { } else if (type_ != TYPEVOID) {
db_->log(file, line, func, kind, message); db_->log(file, line, func, kind, message);
} }
} }
/** /**
* Set the internal logger. * Set the internal logger.
* @param logger the logger object. * @param logger the logger object.
* @param kinds kinds of logged messages by bitwise-or: Logger::DEBUG for debugging, * @param kinds kinds of logged messages by bitwise-or: Logger::DEBUG for debugging,
* Logger::INFO for normal information, Logger::WARN for warning, and Log ger::ERROR for fatal * Logger::INFO for normal information, Logger::WARN for warning, and Log ger::ERROR for fatal
* error. * error.
* @return true on success, or false on failure. * @return true on success, or false on failure.
 End of changes. 1 change blocks. 
1 lines changed or deleted 1 lines changed or added

This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/