bsonelement.h   bsonelement.h 
skipping to change at line 512 skipping to change at line 512
return _numberDouble(); return _numberDouble();
case NumberInt: case NumberInt:
return *reinterpret_cast< const int* >( value() ); return *reinterpret_cast< const int* >( value() );
case NumberLong: case NumberLong:
return (double) *reinterpret_cast< const long long* >( value() ); return (double) *reinterpret_cast< const long long* >( value() );
default: default:
return 0; return 0;
} }
} }
/** Retrieve int value for the element safely. Zero returned if not a number. */ /** Retrieve int value for the element safely. Zero returned if not a number. Converted to int if another numeric type. */
inline int BSONElement::numberInt() const { inline int BSONElement::numberInt() const {
switch( type() ) { switch( type() ) {
case NumberDouble: case NumberDouble:
return (int) _numberDouble(); return (int) _numberDouble();
case NumberInt: case NumberInt:
return _numberInt(); return _numberInt();
case NumberLong: case NumberLong:
return (int) _numberLong(); return (int) _numberLong();
default: default:
return 0; return 0;
 End of changes. 1 change blocks. 
1 lines changed or deleted 1 lines changed or added


 bsonobj.h   bsonobj.h 
skipping to change at line 43 skipping to change at line 43
See bsonspec.org. See bsonspec.org.
Note that BSONObj's have a smart pointer capability built in -- so y ou can Note that BSONObj's have a smart pointer capability built in -- so y ou can
pass them around by value. The reference counts used to implement t his pass them around by value. The reference counts used to implement t his
do not use locking, so copying and destroying BSONObj's are not thre ad-safe do not use locking, so copying and destroying BSONObj's are not thre ad-safe
operations. operations.
BSON object format: BSON object format:
\code code
<unsigned totalSize> {<byte BSONType><cstring FieldName><Data>}* EOO <unsigned totalSize> {<byte BSONType><cstring FieldName><Data>}* EOO
totalSize includes itself. totalSize includes itself.
Data: Data:
Bool: <byte> Bool: <byte>
EOO: nothing follows EOO: nothing follows
Undefined: nothing follows Undefined: nothing follows
OID: an OID object OID: an OID object
NumberDouble: <double> NumberDouble: <double>
 End of changes. 1 change blocks. 
1 lines changed or deleted 1 lines changed or added


 chunk.h   chunk.h 
skipping to change at line 129 skipping to change at line 129
bool moveAndCommit( const Shard& to , string& errmsg ); bool moveAndCommit( const Shard& to , string& errmsg );
const char * getNS(){ return "config.chunks"; } const char * getNS(){ return "config.chunks"; }
void serialize(BSONObjBuilder& to, ShardChunkVersion myLastMod=0); void serialize(BSONObjBuilder& to, ShardChunkVersion myLastMod=0);
void unserialize(const BSONObj& from); void unserialize(const BSONObj& from);
string modelServer() const; string modelServer() const;
void appendShortVersion( const char * name , BSONObjBuilder& b ); void appendShortVersion( const char * name , BSONObjBuilder& b );
void _markModified();
static int MaxChunkSize; static int MaxChunkSize;
string genID() const; string genID() const;
static string genID( const string& ns , const BSONObj& min ); static string genID( const string& ns , const BSONObj& min );
const ChunkManager* getManager() const { return _manager; } const ChunkManager* getManager() const { return _manager; }
bool modified(); bool getModified() { return _modified; }
void setModified( bool modified ) { _modified = modified; }
ShardChunkVersion getVersionOnConfigServer() const; ShardChunkVersion getVersionOnConfigServer() const;
private: private:
bool _splitIfShould( long dataWritten ); bool _splitIfShould( long dataWritten );
// main shard info // main shard info
ChunkManager * _manager; ChunkManager * _manager;
ShardKeyPattern skey() const; ShardKeyPattern skey() const;
 End of changes. 2 change blocks. 
3 lines changed or deleted 2 lines changed or added


 client.h   client.h 
skipping to change at line 32 skipping to change at line 32
todo: switch to asio...this will fit nicely with that. todo: switch to asio...this will fit nicely with that.
*/ */
#pragma once #pragma once
#include "../pch.h" #include "../pch.h"
#include "security.h" #include "security.h"
#include "namespace.h" #include "namespace.h"
#include "lasterror.h" #include "lasterror.h"
#include "stats/top.h" #include "stats/top.h"
//#include "repl/rs.h"
namespace mongo { namespace mongo {
extern class ReplSet *theReplSet; extern class ReplSet *theReplSet;
class AuthenticationInfo; class AuthenticationInfo;
class Database; class Database;
class CurOp; class CurOp;
class Command; class Command;
class Client; class Client;
class MessagingPort;
extern boost::thread_specific_ptr<Client> currentClient; extern boost::thread_specific_ptr<Client> currentClient;
class Client : boost::noncopyable { class Client : boost::noncopyable {
public: public:
static Client *syncThread;
void iAmSyncThread() {
wassert( syncThread == 0 );
syncThread = this;
}
bool isSyncThread() const { return this == syncThread; } // true if
this client is the replication secondary pull thread
static mongo::mutex clientsMutex; static mongo::mutex clientsMutex;
static set<Client*> clients; // always be in clientsMutex when mani pulating this static set<Client*> clients; // always be in clientsMutex when mani pulating this
static int recommendedYieldMicros( int * writers = 0 , int * reader s = 0 ); static int recommendedYieldMicros( int * writers = 0 , int * reader s = 0 );
/* set _god=true temporarily, safely */
class GodScope { class GodScope {
bool _prev; bool _prev;
public: public:
GodScope(); GodScope();
~GodScope(); ~GodScope();
}; };
/* Set database we want to use, then, restores when we finish (are out of scope) /* Set database we want to use, then, restores when we finish (are out of scope)
Note this is also helpful if an exception happens as the state i f fixed up. Note this is also helpful if an exception happens as the state i f fixed up.
*/ */
skipping to change at line 151 skipping to change at line 157
} }
/** /**
* call after going back into the lock, will re-establish non-t hread safe stuff * call after going back into the lock, will re-establish non-t hread safe stuff
*/ */
void relocked(){ void relocked(){
_finishInit(); _finishInit();
} }
friend class CurOp; friend class CurOp;
}; }; // class Client::Context
private: private:
void _dropns( const string& ns );
CurOp * _curOp; CurOp * _curOp;
Context * _context; Context * _context;
bool _shutdown; bool _shutdown;
set<string> _tempCollections; set<string> _tempCollections;
const char *_desc; const char *_desc;
bool _god; bool _god;
AuthenticationInfo _ai; AuthenticationInfo _ai;
ReplTime _lastOp; ReplTime _lastOp;
BSONObj _handshake; BSONObj _handshake;
BSONObj _remoteId; BSONObj _remoteId;
void _dropns( const string& ns );
public: public:
MessagingPort * const _mp;
string clientAddress() const; string clientAddress() const;
AuthenticationInfo * getAuthenticationInfo(){ return &_ai; } AuthenticationInfo * getAuthenticationInfo(){ return &_ai; }
bool isAdmin() { return _ai.isAuthorized( "admin" ); } bool isAdmin() { return _ai.isAuthorized( "admin" ); }
CurOp* curop() { return _curOp; } CurOp* curop() { return _curOp; }
Context* getContext(){ return _context; } Context* getContext(){ return _context; }
Database* database() { return _context ? _context->db() : 0; } Database* database() { return _context ? _context->db() : 0; }
const char *ns() const { return _context->ns(); } const char *ns() const { return _context->ns(); }
const char *desc() const { return _desc; } const char *desc() const { return _desc; }
Client(const char *desc); Client(const char *desc, MessagingPort *p = 0);
~Client(); ~Client();
void addTempCollection( const string& ns ); void addTempCollection( const string& ns );
void _invalidateDB(const string& db); void _invalidateDB(const string& db);
static void invalidateDB(const string& db); static void invalidateDB(const string& db);
static void invalidateNS( const string& ns ); static void invalidateNS( const string& ns );
void setLastOp( ReplTime op ) { void setLastOp( ReplTime op ) { _lastOp = op; }
_lastOp = op; ReplTime getLastOp() const { return _lastOp; }
}
ReplTime getLastOp() const {
return _lastOp;
}
/* report what the last operation was. used by getlasterror */
void appendLastOp( BSONObjBuilder& b ) { void appendLastOp( BSONObjBuilder& b ) {
if( theReplSet ) { if( theReplSet ) {
b.append("lastOp" , (long long) _lastOp); b.append("lastOp" , (long long) _lastOp);
} }
else { else {
OpTime lo(_lastOp); OpTime lo(_lastOp);
if ( ! lo.isNull() ) if ( ! lo.isNull() )
b.appendTimestamp( "lastOp" , lo.asDate() ); b.appendTimestamp( "lastOp" , lo.asDate() );
} }
} }
/* each thread which does db operations has a Client object in TLS. /* each thread which does db operations has a Client object in TLS.
call this when your thread starts. call this when your thread starts.
*/ */
static void initThread(const char *desc); static Client& initThread(const char *desc, MessagingPort *mp = 0);
/* /*
this has to be called as the client goes away, but before thread termination this has to be called as the client goes away, but before thread termination
@return true if anything was done @return true if anything was done
*/ */
bool shutdown(); bool shutdown();
/* this is for map/reduce writes */ /* this is for map/reduce writes */
bool isGod() const { return _god; } bool isGod() const { return _god; }
skipping to change at line 223 skipping to change at line 226
@return true if anything was done @return true if anything was done
*/ */
bool shutdown(); bool shutdown();
/* this is for map/reduce writes */ /* this is for map/reduce writes */
bool isGod() const { return _god; } bool isGod() const { return _god; }
friend class CurOp; friend class CurOp;
string toString() const; string toString() const;
void gotHandshake( const BSONObj& o ); void gotHandshake( const BSONObj& o );
BSONObj getRemoteID() const { return _remoteId; } BSONObj getRemoteID() const { return _remoteId; }
BSONObj getHandshake() const { return _handshake; } BSONObj getHandshake() const { return _handshake; }
}; };
/** get the Client object for this thread. */
inline Client& cc() { inline Client& cc() {
Client * c = currentClient.get(); Client * c = currentClient.get();
assert( c ); assert( c );
return *c; return *c;
} }
/* each thread which does db operations has a Client object in TLS. /* each thread which does db operations has a Client object in TLS.
call this when your thread starts. call this when your thread starts.
*/ */
inline void Client::initThread(const char *desc) { inline Client& Client::initThread(const char *desc, MessagingPort *mp) {
setThreadName(desc); setThreadName(desc);
assert( currentClient.get() == 0 ); assert( currentClient.get() == 0 );
currentClient.reset( new Client(desc) ); Client *c = new Client(desc, mp);
currentClient.reset(c);
mongo::lastError.initThread(); mongo::lastError.initThread();
return *c;
} }
inline Client::GodScope::GodScope(){ inline Client::GodScope::GodScope(){
_prev = cc()._god; _prev = cc()._god;
cc()._god = true; cc()._god = true;
} }
inline Client::GodScope::~GodScope(){ inline Client::GodScope::~GodScope(){
cc()._god = _prev; cc()._god = _prev;
} }
skipping to change at line 278 skipping to change at line 282
dbMutex.unlock_shared(); dbMutex.unlock_shared();
dbMutex.lock(); dbMutex.lock();
if ( cc().getContext() ) if ( cc().getContext() )
cc().getContext()->unlocked(); cc().getContext()->unlocked();
} }
} }
string sayClientState(); string sayClientState();
inline bool haveClient(){ inline bool haveClient() { return currentClient.get() > 0; }
return currentClient.get() > 0;
}
}; };
 End of changes. 22 change blocks. 
23 lines changed or deleted 26 lines changed or added


 clientcursor.h   clientcursor.h 
skipping to change at line 141 skipping to change at line 141
operator bool() { return _c; } operator bool() { return _c; }
ClientCursor * operator-> () { return _c; } ClientCursor * operator-> () { return _c; }
private: private:
ClientCursor *_c; ClientCursor *_c;
CursorId _id; CursorId _id;
}; };
/*const*/ CursorId cursorid; /*const*/ CursorId cursorid;
const string ns; const string ns;
const shared_ptr<Cursor> c; const shared_ptr<Cursor> c;
int pos; // # objects into the cursor so far int pos; // # objects into the cursor so far
BSONObj query; const BSONObj query; // used for logging diags only; opt
ional in constructor
const int _queryOptions; // see enum QueryOptions dbclient.h const int _queryOptions; // see enum QueryOptions dbclient.h
OpTime _slaveReadTill; OpTime _slaveReadTill;
Database * const _db; Database * const _db;
ClientCursor(int queryOptions, shared_ptr<Cursor>& _c, const string & _ns) : ClientCursor(int queryOptions, shared_ptr<Cursor>& _c, const string & _ns, BSONObj _query = BSONObj()) :
_idleAgeMillis(0), _pinValue(0), _idleAgeMillis(0), _pinValue(0),
_doingDeletes(false), _yieldSometimesTracker(128,10), _doingDeletes(false), _yieldSometimesTracker(128,10),
ns(_ns), c(_c), ns(_ns), c(_c),
pos(0), _queryOptions(queryOptions), pos(0), query(_query),
_queryOptions(queryOptions),
_db( cc().database() ) _db( cc().database() )
{ {
assert( _db ); assert( _db );
assert( str::startsWith(_ns, _db->name) ); assert( str::startsWith(_ns, _db->name) );
if( queryOptions & QueryOption_NoCursorTimeout ) if( queryOptions & QueryOption_NoCursorTimeout )
noTimeout(); noTimeout();
recursive_scoped_lock lock(ccmutex); recursive_scoped_lock lock(ccmutex);
cursorid = allocCursorId_inlock(); cursorid = allocCursorId_inlock();
clientCursorsById.insert( make_pair(cursorid, this) ); clientCursorsById.insert( make_pair(cursorid, this) );
} }
 End of changes. 3 change blocks. 
4 lines changed or deleted 6 lines changed or added


 commands.h   commands.h 
skipping to change at line 88 skipping to change at line 88
*/ */
virtual bool slaveOverrideOk() { virtual bool slaveOverrideOk() {
return false; return false;
} }
/* Override and return true to if true,log the operation (logOp()) to the replication log. /* Override and return true to if true,log the operation (logOp()) to the replication log.
(not done if fromRepl of course) (not done if fromRepl of course)
Note if run() returns false, we do NOT log. Note if run() returns false, we do NOT log.
*/ */
virtual bool logTheOp() { virtual bool logTheOp() { return false; }
return false;
}
virtual void help( stringstream& help ) const; virtual void help( stringstream& help ) const;
/* Return true if authentication and security applies to the comman ds. Some commands /* Return true if authentication and security applies to the comman ds. Some commands
(e.g., getnonce, authenticate) can be done by anyone even unauth orized. (e.g., getnonce, authenticate) can be done by anyone even unauth orized.
*/ */
virtual bool requiresAuth() { return true; } virtual bool requiresAuth() { return true; }
/** @param webUI expose the command in the web ui as localhost:2801 7/<name> /** @param webUI expose the command in the web ui as localhost:2801 7/<name>
@param oldName an optional old, deprecated name for the command @param oldName an optional old, deprecated name for the command
 End of changes. 1 change blocks. 
3 lines changed or deleted 1 lines changed or added


 curop.h   curop.h 
skipping to change at line 223 skipping to change at line 223
_active = false; _active = false;
_reset(); _reset();
_op = 0; _op = 0;
// These addresses should never be written to again. The zeroe s are // These addresses should never be written to again. The zeroe s are
// placed here as a precaution because currentOp may be accesse d // placed here as a precaution because currentOp may be accesse d
// without the db mutex. // without the db mutex.
memset(_ns, 0, sizeof(_ns)); memset(_ns, 0, sizeof(_ns));
memset(_queryBuf, 0, sizeof(_queryBuf)); memset(_queryBuf, 0, sizeof(_queryBuf));
} }
~CurOp(){ ~CurOp();
if ( _wrapped )
_client->_curOp = _wrapped;
}
BSONObj info() { BSONObj info() {
if( ! cc().getAuthenticationInfo()->isAuthorized("admin") ) { if( ! cc().getAuthenticationInfo()->isAuthorized("admin") ) {
BSONObjBuilder b; BSONObjBuilder b;
b.append("err", "unauthorized"); b.append("err", "unauthorized");
return b.obj(); return b.obj();
} }
return infoNoauth(); return infoNoauth();
} }
 End of changes. 1 change blocks. 
4 lines changed or deleted 1 lines changed or added


 d_logic.h   d_logic.h 
skipping to change at line 156 skipping to change at line 156
unsigned long long extractVersion( BSONElement e , string& errmsg ); unsigned long long extractVersion( BSONElement e , string& errmsg );
/** /**
* @return true if we have any shard info for the ns * @return true if we have any shard info for the ns
*/ */
bool haveLocalShardingInfo( const string& ns ); bool haveLocalShardingInfo( const string& ns );
/** /**
* @return true if the current threads shard version is ok, or not in s harded version * @return true if the current threads shard version is ok, or not in s harded version
*/ */
bool shardVersionOk( const string& ns , string& errmsg ); bool shardVersionOk( const string& ns , bool write , string& errmsg );
/** /**
* @return true if we took care of the message and nothing else should be done * @return true if we took care of the message and nothing else should be done
*/ */
bool handlePossibleShardedMessage( Message &m, DbResponse * dbresponse ); bool handlePossibleShardedMessage( Message &m, DbResponse * dbresponse );
void logOpForSharding( const char * opstr , const char * ns , const BSO NObj& obj , BSONObj * patt ); void logOpForSharding( const char * opstr , const char * ns , const BSO NObj& obj , BSONObj * patt );
// ----------------- // -----------------
// --- writeback --- // --- writeback ---
 End of changes. 1 change blocks. 
1 lines changed or deleted 1 lines changed or added


 goodies.h   goodies.h 
skipping to change at line 64 skipping to change at line 64
size_t i; size_t i;
size = backtrace(b, 20); size = backtrace(b, 20);
strings = backtrace_symbols(b, size); strings = backtrace_symbols(b, size);
for (i = 0; i < size; i++) for (i = 0; i < size; i++)
o << hex << b[i] << dec << ' '; o << hex << b[i] << dec << ' ';
o << '\n'; o << '\n';
for (i = 0; i < size; i++) for (i = 0; i < size; i++)
o << ' ' << strings[i] << '\n'; o << ' ' << strings[i] << '\n';
o.flush();
free (strings); free (strings);
} }
#else #else
inline void printStackTrace( ostream &o = cout ) { } inline void printStackTrace( ostream &o = cout ) { }
#endif #endif
/* set to TRUE if we are exiting */ /* set to TRUE if we are exiting */
extern bool goingAway; extern bool goingAway;
/* find the multimap member which matches a particular key and value. /* find the multimap member which matches a particular key and value.
skipping to change at line 545 skipping to change at line 545
int used = _outof - _num; int used = _outof - _num;
if ( used > newSize ){ if ( used > newSize ){
cout << "ERROR: can't resize since we're using (" << used < < ") more than newSize(" << newSize << ")" << endl; cout << "ERROR: can't resize since we're using (" << used < < ") more than newSize(" << newSize << ")" << endl;
return; return;
} }
_outof = newSize; _outof = newSize;
_num = _outof - used; _num = _outof - used;
} }
int available(){ int available() const {
return _num; return _num;
} }
int used(){ int used() const {
return _outof - _num; return _outof - _num;
} }
int outof() const { return _outof; }
private: private:
int _outof; int _outof;
int _num; int _num;
mongo::mutex _mutex; mongo::mutex _mutex;
}; };
class TicketHolderReleaser { class TicketHolderReleaser {
public: public:
TicketHolderReleaser( TicketHolder * holder ){ TicketHolderReleaser( TicketHolder * holder ){
_holder = holder; _holder = holder;
 End of changes. 4 change blocks. 
3 lines changed or deleted 5 lines changed or added


 health.h   health.h 
skipping to change at line 30 skipping to change at line 30
namespace mongo { namespace mongo {
/* throws */ /* throws */
bool requestHeartbeat(string setname, string fromHost, string memberFul lName, BSONObj& result, int myConfigVersion, int& theirConfigVersion, bool checkEmpty = false); bool requestHeartbeat(string setname, string fromHost, string memberFul lName, BSONObj& result, int myConfigVersion, int& theirConfigVersion, bool checkEmpty = false);
struct HealthOptions { struct HealthOptions {
HealthOptions() { HealthOptions() {
heartbeatSleepMillis = 2000; heartbeatSleepMillis = 2000;
heartbeatTimeoutMillis = 10000; heartbeatTimeoutMillis = 10000;
heartbeatConnRetries = 3; heartbeatConnRetries = 2;
} }
bool isDefault() const { return *this == HealthOptions(); } bool isDefault() const { return *this == HealthOptions(); }
// see http://www.mongodb.org/display/DOCS/Replica+Set+Internals // see http://www.mongodb.org/display/DOCS/Replica+Set+Internals
unsigned heartbeatSleepMillis; unsigned heartbeatSleepMillis;
unsigned heartbeatTimeoutMillis; unsigned heartbeatTimeoutMillis;
unsigned heartbeatConnRetries ; unsigned heartbeatConnRetries ;
void check() { void check() {
 End of changes. 1 change blocks. 
1 lines changed or deleted 1 lines changed or added


 log.h   log.h 
skipping to change at line 298 skipping to change at line 298
} }
inline Nullstream& log( LogLevel l ) { inline Nullstream& log( LogLevel l ) {
return Logstream::get().prolog().setLogLevel( l ); return Logstream::get().prolog().setLogLevel( l );
} }
inline Nullstream& log() { inline Nullstream& log() {
return Logstream::get().prolog(); return Logstream::get().prolog();
} }
/* TODOCONCURRENCY */ inline Nullstream& error() {
inline ostream& stdcout() { return log( LL_ERROR );
return cout; }
inline Nullstream& warning() {
return log( LL_WARNING );
} }
/* default impl returns "" -- mongod overrides */ /* default impl returns "" -- mongod overrides */
extern const char * (*getcurns)(); extern const char * (*getcurns)();
inline Nullstream& problem( int level = 0 ) { inline Nullstream& problem( int level = 0 ) {
if ( level > logLevel ) if ( level > logLevel )
return nullstream; return nullstream;
Logstream& l = Logstream::get().prolog(); Logstream& l = Logstream::get().prolog();
l << ' ' << getcurns() << ' '; l << ' ' << getcurns() << ' ';
 End of changes. 1 change blocks. 
3 lines changed or deleted 6 lines changed or added


 message.h   message.h 
skipping to change at line 33 skipping to change at line 33
namespace mongo { namespace mongo {
extern bool noUnixSocket; extern bool noUnixSocket;
class Message; class Message;
class MessagingPort; class MessagingPort;
class PiggyBackData; class PiggyBackData;
typedef AtomicUInt MSGID; typedef AtomicUInt MSGID;
class Listener { class Listener : boost::noncopyable {
public: public:
Listener(const string &ip, int p, bool logConnect=true ) : _port(p) , _ip(ip), _logConnect(logConnect), _elapsedTime(0){ } Listener(const string &ip, int p, bool logConnect=true ) : _port(p) , _ip(ip), _logConnect(logConnect), _elapsedTime(0){ }
virtual ~Listener() { virtual ~Listener() {
if ( _timeTracker == this ) if ( _timeTracker == this )
_timeTracker = 0; _timeTracker = 0;
} }
void initAndListen(); // never returns unless error (start a thread ) void initAndListen(); // never returns unless error (start a thread )
/* spawn a thread, etc., then return */ /* spawn a thread, etc., then return */
virtual void accepted(int sock, const SockAddr& from); virtual void accepted(int sock, const SockAddr& from);
skipping to change at line 77 skipping to change at line 77
} }
private: private:
string _ip; string _ip;
bool _logConnect; bool _logConnect;
long long _elapsedTime; long long _elapsedTime;
static const Listener* _timeTracker; static const Listener* _timeTracker;
}; };
class AbstractMessagingPort { class AbstractMessagingPort : boost::noncopyable {
public: public:
virtual ~AbstractMessagingPort() { } virtual ~AbstractMessagingPort() { }
virtual void reply(Message& received, Message& response, MSGID resp onseTo) = 0; // like the reply below, but doesn't rely on received.data sti ll being available virtual void reply(Message& received, Message& response, MSGID resp onseTo) = 0; // like the reply below, but doesn't rely on received.data sti ll being available
virtual void reply(Message& received, Message& response) = 0; virtual void reply(Message& received, Message& response) = 0;
virtual HostAndPort remote() const = 0; virtual HostAndPort remote() const = 0;
virtual unsigned remotePort() const = 0; virtual unsigned remotePort() const = 0;
virtual int getClientId(){ virtual int getClientId(){
int x = remotePort(); int x = remotePort();
x = x << 16; x = x << 16;
x |= ( ( 0xFF0 & (long long)this ) >> 8 ); // lowest byte in po inter often meaningless
return x; return x;
} }
}; };
class MessagingPort : public AbstractMessagingPort { class MessagingPort : public AbstractMessagingPort {
public: public:
MessagingPort(int sock, const SockAddr& farEnd); MessagingPort(int sock, const SockAddr& farEnd);
// in some cases the timeout will actually be 2x this value - eg we do a partial send, // in some cases the timeout will actually be 2x this value - eg we do a partial send,
// then the timeout fires, then we try to send again, then the time out fires again with // then the timeout fires, then we try to send again, then the time out fires again with
skipping to change at line 138 skipping to change at line 139
int unsafe_recv( char *buf, int max ); int unsafe_recv( char *buf, int max );
private: private:
int sock; int sock;
PiggyBackData * piggyBackData; PiggyBackData * piggyBackData;
public: public:
SockAddr farEnd; SockAddr farEnd;
int _timeout; int _timeout;
int _logLevel; // passed to log() when logging errors int _logLevel; // passed to log() when logging errors
static void closeAllSockets(unsigned tagMask = 0xffffffff);
/* ports can be tagged with various classes. see closeAllSockets(t
ag). defaults to 0. */
unsigned tag;
friend class PiggyBackData; friend class PiggyBackData;
}; };
enum Operations { enum Operations {
opReply = 1, /* reply. responseTo is set. */ opReply = 1, /* reply. responseTo is set. */
dbMsg = 1000, /* generic msg command followed by a string */ dbMsg = 1000, /* generic msg command followed by a string */
dbUpdate = 2001, /* update object */ dbUpdate = 2001, /* update object */
dbInsert = 2002, dbInsert = 2002,
//dbGetByOID = 2003, //dbGetByOID = 2003,
dbQuery = 2004, dbQuery = 2004,
skipping to change at line 173 skipping to change at line 179
case dbGetMore: return "getmore"; case dbGetMore: return "getmore";
case dbDelete: return "remove"; case dbDelete: return "remove";
case dbKillCursors: return "killcursors"; case dbKillCursors: return "killcursors";
default: default:
PRINT(op); PRINT(op);
assert(0); assert(0);
return ""; return "";
} }
} }
inline bool opIsWrite( int op ){
switch ( op ){
case 0:
case opReply:
case dbMsg:
case dbQuery:
case dbGetMore:
case dbKillCursors:
return false;
case dbUpdate:
case dbInsert:
case dbDelete:
return false;
default:
PRINT(op);
assert(0);
return "";
}
}
#pragma pack(1) #pragma pack(1)
/* see http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol /* see http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol
*/ */
struct MSGHEADER { struct MSGHEADER {
int messageLength; // total message size, including this int messageLength; // total message size, including this
int requestID; // identifier for this message int requestID; // identifier for this message
int responseTo; // requestID from the original request int responseTo; // requestID from the original request
// (used in reponses from db) // (used in reponses from db)
int opCode; int opCode;
}; };
 End of changes. 5 change blocks. 
2 lines changed or deleted 33 lines changed or added


 mmap.h   mmap.h 
skipping to change at line 172 skipping to change at line 172
virtual Flushable * prepareFlush(); virtual Flushable * prepareFlush();
/*void* viewOfs() { /*void* viewOfs() {
return view; return view;
}*/ }*/
long length() { long length() {
return len; return len;
} }
string filename() const { return _filename; }
private: private:
static void updateLength( const char *filename, long &length ); static void updateLength( const char *filename, long &length );
HANDLE fd; HANDLE fd;
HANDLE maphandle; HANDLE maphandle;
void *view; void *view;
long len; long len;
string _filename; string _filename;
protected: protected:
 End of changes. 1 change blocks. 
0 lines changed or deleted 2 lines changed or added


 mutex.h   mutex.h 
skipping to change at line 118 skipping to change at line 118
void leaving(mid m) { void leaving(mid m) {
if( magic != 0x12345678 ) return; if( magic != 0x12345678 ) return;
Preceeding& preceeding = *us.get(); Preceeding& preceeding = *us.get();
preceeding[m]--; preceeding[m]--;
if( preceeding[m] < 0 ) { if( preceeding[m] < 0 ) {
cout << "ERROR: lock count for " << m << " is " << preceedi ng[m] << endl; cout << "ERROR: lock count for " << m << " is " << preceedi ng[m] << endl;
assert( preceeding[m] >= 0 ); assert( preceeding[m] >= 0 );
} }
} }
}; };
extern MutexDebugger mutexDebugger; extern MutexDebugger &mutexDebugger;
// If you create a local static instance of this class, that instance w ill be destroyed // If you create a local static instance of this class, that instance w ill be destroyed
// before all global static objects are destroyed, so __destroyingStati cs will be set // before all global static objects are destroyed, so __destroyingStati cs will be set
// to true before the global static variables are destroyed. // to true before the global static variables are destroyed.
class StaticObserver : boost::noncopyable { class StaticObserver : boost::noncopyable {
public: public:
~StaticObserver() { __destroyingStatics = true; } ~StaticObserver() { __destroyingStatics = true; }
}; };
// On pthread systems, it is an error to destroy a mutex while held. S tatic global // On pthread systems, it is an error to destroy a mutex while held. S tatic global
 End of changes. 1 change blocks. 
1 lines changed or deleted 1 lines changed or added


 oplog.h   oplog.h 
skipping to change at line 198 skipping to change at line 198
void destroyClientCursor() { void destroyClientCursor() {
if ( _findingStartCursor ) { if ( _findingStartCursor ) {
ClientCursor::erase( _findingStartCursor->cursorid ); ClientCursor::erase( _findingStartCursor->cursorid );
_findingStartCursor = 0; _findingStartCursor = 0;
} }
} }
void init() { void init() {
// Use a ClientCursor here so we can release db mutex while sca nning // Use a ClientCursor here so we can release db mutex while sca nning
// oplog (can take quite a while with large oplogs). // oplog (can take quite a while with large oplogs).
shared_ptr<Cursor> c = _qp.newReverseCursor(); shared_ptr<Cursor> c = _qp.newReverseCursor();
_findingStartCursor = new ClientCursor(QueryOption_NoCursorTime out, c, _qp.ns()); _findingStartCursor = new ClientCursor(QueryOption_NoCursorTime out, c, _qp.ns(), BSONObj());
_findingStartTimer.reset(); _findingStartTimer.reset();
_findingStartMode = Initial; _findingStartMode = Initial;
BSONElement tsElt = _qp.originalQuery()[ "ts" ]; BSONElement tsElt = _qp.originalQuery()[ "ts" ];
massert( 13044, "no ts field in query", !tsElt.eoo() ); massert( 13044, "no ts field in query", !tsElt.eoo() );
BSONObjBuilder b; BSONObjBuilder b;
b.append( tsElt ); b.append( tsElt );
BSONObj tsQuery = b.obj(); BSONObj tsQuery = b.obj();
_matcher.reset(new CoveredIndexMatcher(tsQuery, _qp.indexKey()) ); _matcher.reset(new CoveredIndexMatcher(tsQuery, _qp.indexKey()) );
} }
}; };
 End of changes. 1 change blocks. 
1 lines changed or deleted 1 lines changed or added


 optime.h   optime.h 
skipping to change at line 107 skipping to change at line 107
long long asLL() const { long long asLL() const {
return reinterpret_cast<const long long*>(&i)[0]; return reinterpret_cast<const long long*>(&i)[0];
} }
bool isNull() const { return secs == 0; } bool isNull() const { return secs == 0; }
string toStringLong() const { string toStringLong() const {
char buf[64]; char buf[64];
time_t_to_String(secs, buf); time_t_to_String(secs, buf);
stringstream ss; stringstream ss;
ss << buf << ' '; ss << time_t_to_String_short(secs) << ' ';
ss << hex << secs << ':' << i; ss << hex << secs << ':' << i;
return ss.str(); return ss.str();
} }
string toStringPretty() const { string toStringPretty() const {
stringstream ss; stringstream ss;
ss << time_t_to_String_short(secs) << ':' << hex << i; ss << time_t_to_String_short(secs) << ':' << hex << i;
return ss.str(); return ss.str();
} }
 End of changes. 1 change blocks. 
1 lines changed or deleted 1 lines changed or added


 parallel.h   parallel.h 
skipping to change at line 272 skipping to change at line 272
bool join(); bool join();
private: private:
CommandResult( const string& server , const string& db , const BSONObj& cmd ); CommandResult( const string& server , const string& db , const BSONObj& cmd );
string _server; string _server;
string _db; string _db;
BSONObj _cmd; BSONObj _cmd;
boost::thread _thr; scoped_ptr<boost::thread> _thr;
BSONObj _res; BSONObj _res;
bool _done; bool _done;
bool _ok; bool _ok;
friend class Future; friend class Future;
}; };
static void commandThread(); static void commandThread( shared_ptr<CommandResult> res );
static shared_ptr<CommandResult> spawnCommand( const string& server , const string& db , const BSONObj& cmd ); static shared_ptr<CommandResult> spawnCommand( const string& server , const string& db , const BSONObj& cmd );
private:
static shared_ptr<CommandResult> * _grab;
}; };
} }
#include "undef_macros.h" #include "undef_macros.h"
 End of changes. 3 change blocks. 
5 lines changed or deleted 2 lines changed or added


 pdfile.h   pdfile.h 
skipping to change at line 82 skipping to change at line 82
DataFileHeader *getHeader() { DataFileHeader *getHeader() {
return header; return header;
} }
/* return max size an extent may be */ /* return max size an extent may be */
static int maxSize(); static int maxSize();
void flush( bool sync ); void flush( bool sync );
private: private:
void badOfs(int) const;
int defaultSize( const char *filename ) const; int defaultSize( const char *filename ) const;
Extent* getExtent(DiskLoc loc); Extent* getExtent(DiskLoc loc);
Extent* _getExtent(DiskLoc loc); Extent* _getExtent(DiskLoc loc);
Record* recordAt(DiskLoc dl); Record* recordAt(DiskLoc dl);
Record* makeRecord(DiskLoc dl, int size); Record* makeRecord(DiskLoc dl, int size);
void grow(DiskLoc dl, int size); void grow(DiskLoc dl, int size);
MMF mmf; MMF mmf;
MMF::Pointer _p; MMF::Pointer _p;
skipping to change at line 258 skipping to change at line 260
assert( x > 0 ); assert( x > 0 );
return (Record *) (((char *) this) + x); return (Record *) (((char *) this) + x);
} }
Extent* getNextExtent() { Extent* getNextExtent() {
return xnext.isNull() ? 0 : DataFileMgr::getExtent(xnext); return xnext.isNull() ? 0 : DataFileMgr::getExtent(xnext);
} }
Extent* getPrevExtent() { Extent* getPrevExtent() {
return xprev.isNull() ? 0 : DataFileMgr::getExtent(xprev); return xprev.isNull() ? 0 : DataFileMgr::getExtent(xprev);
} }
static int maxSize();
}; };
/* /*
---------------------- ----------------------
Header Header
---------------------- ----------------------
Extent (for a particular namespace) Extent (for a particular namespace)
Record Record
... ...
Record (some chained for unused space) Record (some chained for unused space)
skipping to change at line 342 skipping to change at line 346
} }
} // namespace mongo } // namespace mongo
#include "cursor.h" #include "cursor.h"
namespace mongo { namespace mongo {
inline Record* MongoDataFile::recordAt(DiskLoc dl) { inline Record* MongoDataFile::recordAt(DiskLoc dl) {
int ofs = dl.getOfs(); int ofs = dl.getOfs();
assert( ofs >= DataFileHeader::HeaderSize ); if( ofs < DataFileHeader::HeaderSize ) badOfs(ofs); // will uassert - external call to keep out of the normal code path
return (Record*) _p.at(ofs, -1); return (Record*) _p.at(ofs, -1);
} }
inline void MongoDataFile::grow(DiskLoc dl, int size) { inline void MongoDataFile::grow(DiskLoc dl, int size) {
int ofs = dl.getOfs(); int ofs = dl.getOfs();
_p.grow(ofs, size); _p.grow(ofs, size);
} }
inline Record* MongoDataFile::makeRecord(DiskLoc dl, int size) { inline Record* MongoDataFile::makeRecord(DiskLoc dl, int size) {
int ofs = dl.getOfs(); int ofs = dl.getOfs();
 End of changes. 3 change blocks. 
1 lines changed or deleted 5 lines changed or added


 rs.h   rs.h 
skipping to change at line 47 skipping to change at line 47
class OplogReader; class OplogReader;
extern bool replSet; // true if using repl sets extern bool replSet; // true if using repl sets
extern class ReplSet *theReplSet; // null until initialized extern class ReplSet *theReplSet; // null until initialized
extern Tee *rsLog; extern Tee *rsLog;
/* member of a replica set */ /* member of a replica set */
class Member : public List1<Member>::Base { class Member : public List1<Member>::Base {
public: public:
Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self); Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self);
string fullName() const { return h().toString(); } string fullName() const { return h().toString(); }
const ReplSetConfig::MemberCfg& config() const { return *_config; } const ReplSetConfig::MemberCfg& config() const { return _config; }
const HeartbeatInfo& hbinfo() const { return _hbinfo; } const HeartbeatInfo& hbinfo() const { return _hbinfo; }
string lhb() { return _hbinfo.lastHeartbeatMsg; } HeartbeatInfo& get_hbinfo() { return _hbinfo; }
string lhb() const { return _hbinfo.lastHeartbeatMsg; }
MemberState state() const { return _hbinfo.hbstate; } MemberState state() const { return _hbinfo.hbstate; }
const HostAndPort& h() const { return _h; } const HostAndPort& h() const { return _h; }
unsigned id() const { return _hbinfo.id(); } unsigned id() const { return _hbinfo.id(); }
bool potentiallyHot() const { return _config->potentiallyHot(); } / bool potentiallyHot() const { return _config.potentiallyHot(); } //
/ not arbiter, not priority 0 not arbiter, not priority 0
void summarizeMember(stringstream& s) const; void summarizeMember(stringstream& s) const;
friend class ReplSetImpl; friend class ReplSetImpl;
private: private:
const ReplSetConfig::MemberCfg *_config; /* todo: when this changes const ReplSetConfig::MemberCfg _config;
??? */ const HostAndPort _h;
HostAndPort _h;
HeartbeatInfo _hbinfo; HeartbeatInfo _hbinfo;
}; };
class Manager : public task::Server { class Manager : public task::Server {
ReplSetImpl *rs; ReplSetImpl *rs;
bool busyWithElectSelf; bool busyWithElectSelf;
int _primary; int _primary;
const Member* findOtherPrimary();
/** @param two - if true two primaries were seen. this can happen
transiently, in addition to our
polling being only occasional. in this case null
is returned, but the caller should
not assume primary itself in that situation.
*/
const Member* findOtherPrimary(bool& two);
void noteARemoteIsPrimary(const Member *); void noteARemoteIsPrimary(const Member *);
virtual void starting(); virtual void starting();
public: public:
Manager(ReplSetImpl *rs); Manager(ReplSetImpl *rs);
~Manager(); ~Manager();
void msgReceivedNewConfig(BSONObj); void msgReceivedNewConfig(BSONObj);
void msgCheckNewState(); void msgCheckNewState();
}; };
struct Target; struct Target;
skipping to change at line 105 skipping to change at line 111
steppedDown = 0; steppedDown = 0;
} }
/* if we've stepped down, this is when we are allowed to try to ele ct ourself again. /* if we've stepped down, this is when we are allowed to try to ele ct ourself again.
todo: handle possible weirdnesses at clock skews etc. todo: handle possible weirdnesses at clock skews etc.
*/ */
time_t steppedDown; time_t steppedDown;
int totalVotes() const; int totalVotes() const;
bool aMajoritySeemsToBeUp() const; bool aMajoritySeemsToBeUp() const;
bool shouldRelinquish() const;
void electSelf(); void electSelf();
void electCmdReceived(BSONObj, BSONObjBuilder*); void electCmdReceived(BSONObj, BSONObjBuilder*);
void multiCommand(BSONObj cmd, list<Target>& L); void multiCommand(BSONObj cmd, list<Target>& L);
}; };
/** most operations on a ReplSet object should be done while locked. th at logic implemented here. */ /** most operations on a ReplSet object should be done while locked. th at logic implemented here. */
class RSBase : boost::noncopyable { class RSBase : boost::noncopyable {
public: public:
const unsigned magic; const unsigned magic;
void assertValid() { assert( magic == 0x12345677 ); } void assertValid() { assert( magic == 0x12345677 ); }
private: private:
mutex m; mutex m;
int _locked; int _locked;
ThreadLocalValue<bool> _lockedByMe; ThreadLocalValue<bool> _lockedByMe;
protected: protected:
RSBase() : magic(0x12345677), m("RSBase"), _locked(0) { } RSBase() : magic(0x12345677), m("RSBase"), _locked(0) { }
~RSBase() { ~RSBase() {
log() << "~RSBase should never be called?" << rsLog; /* this can happen if we throw in the constructor; otherwise ne
assert(false); ver happens. thus we log it as it is quite unusual. */
log() << "replSet ~RSBase called" << rsLog;
} }
class lock { class lock {
RSBase& rsbase; RSBase& rsbase;
auto_ptr<scoped_lock> sl; auto_ptr<scoped_lock> sl;
public: public:
lock(RSBase* b) : rsbase(*b) { lock(RSBase* b) : rsbase(*b) {
if( rsbase._lockedByMe.get() ) if( rsbase._lockedByMe.get() )
return; // recursive is ok... return; // recursive is ok...
skipping to change at line 178 skipping to change at line 185
const Member *primary; const Member *primary;
}; };
const SP get() { const SP get() {
scoped_lock lk(m); scoped_lock lk(m);
return sp; return sp;
} }
MemberState getState() const { return sp.state; } MemberState getState() const { return sp.state; }
const Member* getPrimary() const { return sp.primary; } const Member* getPrimary() const { return sp.primary; }
void change(MemberState s, const Member *self) { void change(MemberState s, const Member *self) {
scoped_lock lk(m); scoped_lock lk(m);
if( sp.state != s ) {
log() << "replSet " << s.toString() << rsLog;
}
sp.state = s; sp.state = s;
if( s.primary() ) { if( s.primary() ) {
sp.primary = self; sp.primary = self;
} }
else { else {
if( self == sp.primary ) if( self == sp.primary )
sp.primary = 0; sp.primary = 0;
} }
} }
void set(MemberState s, const Member *p) { void set(MemberState s, const Member *p) {
scoped_lock lk(m); scoped_lock lk(m);
sp.state = s; sp.primary = p; sp.state = s; sp.primary = p;
} }
void setSelfPrimary(const Member *self) { change(MemberState::RS_PR IMARY, self); } void setSelfPrimary(const Member *self) { change(MemberState::RS_PR IMARY, self); }
void setOtherPrimary(const Member *mem) { void setOtherPrimary(const Member *mem) {
scoped_lock lk(m); scoped_lock lk(m);
assert( !sp.state.primary() ); assert( !sp.state.primary() );
sp.primary = mem; sp.primary = mem;
} }
void noteRemoteIsPrimary(const Member *remote) {
scoped_lock lk(m);
if( !sp.state.secondary() && !sp.state.fatal() )
sp.state = MemberState::RS_RECOVERING;
sp.primary = remote;
}
StateBox() : m("StateBox") { } StateBox() : m("StateBox") { }
private: private:
mutex m; mutex m;
SP sp; SP sp;
}; };
void parseReplsetCmdLine(string cfgString, string& setname, vector<Host AndPort>& seeds, set<HostAndPort>& seedSet ); void parseReplsetCmdLine(string cfgString, string& setname, vector<Host AndPort>& seeds, set<HostAndPort>& seedSet );
/** Parameter given to the --replSet command line option (parsed). /** Parameter given to the --replSet command line option (parsed).
Syntax is "<setname>/<seedhost1>,<seedhost2>" Syntax is "<setname>/<seedhost1>,<seedhost2>"
skipping to change at line 229 skipping to change at line 245
*/ */
class ReplSetImpl : protected RSBase { class ReplSetImpl : protected RSBase {
public: public:
/** info on our state if the replset isn't yet "up". for example, if we are pre-initiation. */ /** info on our state if the replset isn't yet "up". for example, if we are pre-initiation. */
enum StartupStatus { enum StartupStatus {
PRESTART=0, LOADINGCONFIG=1, BADCONFIG=2, EMPTYCONFIG=3, PRESTART=0, LOADINGCONFIG=1, BADCONFIG=2, EMPTYCONFIG=3,
EMPTYUNREACHABLE=4, STARTED=5, SOON=6 EMPTYUNREACHABLE=4, STARTED=5, SOON=6
}; };
static StartupStatus startupStatus; static StartupStatus startupStatus;
static string startupStatusMsg; static string startupStatusMsg;
static string stateAsStr(MemberState state);
static string stateAsHtml(MemberState state); static string stateAsHtml(MemberState state);
/* todo thread */ /* todo thread */
void msgUpdateHBInfo(HeartbeatInfo); void msgUpdateHBInfo(HeartbeatInfo);
StateBox box; StateBox box;
OpTime lastOpTimeWritten; OpTime lastOpTimeWritten;
long long lastH; // hash we use to make sure we are reading the rig ht flow of ops and aren't on an out-of-date "fork" long long lastH; // hash we use to make sure we are reading the rig ht flow of ops and aren't on an out-of-date "fork"
private: private:
set<ReplSetHealthPollTask*> healthTasks; set<ReplSetHealthPollTask*> healthTasks;
void endOldHealthTasks(); void endOldHealthTasks();
void startHealthTaskFor(Member *m); void startHealthTaskFor(Member *m);
private:
Consensus elect; Consensus elect;
bool ok() const { return !box.getState().fatal(); }
void relinquish(); void relinquish();
void forgetPrimary(); void forgetPrimary();
protected: protected:
bool _stepDown(); bool _stepDown();
private: private:
void assumePrimary(); void assumePrimary();
void loadLastOpTimeWritten(); void loadLastOpTimeWritten();
void changeState(MemberState s); void changeState(MemberState s);
protected: protected:
// "heartbeat message" // "heartbeat message"
// sent in requestHeartbeat respond in field "hbm" // sent in requestHeartbeat respond in field "hbm"
char _hbmsg[256]; // we change this unlocked, thus not an stl::stri ng char _hbmsg[256]; // we change this unlocked, thus not an stl::stri ng
time_t _hbmsgTime; // when it was logged
public: public:
void sethbmsg(string s, int logLevel = 0); void sethbmsg(string s, int logLevel = 0);
protected: protected:
bool initFromConfig(ReplSetConfig& c); // true if ok; throws if con fig really bad; false if config doesn't include self bool initFromConfig(ReplSetConfig& c, bool reconf=false); // true i f ok; throws if config really bad; false if config doesn't include self
void _fillIsMaster(BSONObjBuilder&); void _fillIsMaster(BSONObjBuilder&);
void _fillIsMasterHost(const Member*, vector<string>&, vector<strin g>&, vector<string>&); void _fillIsMasterHost(const Member*, vector<string>&, vector<strin g>&, vector<string>&);
const ReplSetConfig& config() { return *_cfg; } const ReplSetConfig& config() { return *_cfg; }
string name() const { return _name; } /* @return replica set's logi cal name */ string name() const { return _name; } /* @return replica set's logi cal name */
MemberState state() const { return box.getState(); } MemberState state() const { return box.getState(); }
void _fatal(); void _fatal();
void _getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) con st; void _getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) con st;
void _summarizeAsHtml(stringstream&) const; void _summarizeAsHtml(stringstream&) const;
void _summarizeStatus(BSONObjBuilder&) const; // for replSetGetStat us command void _summarizeStatus(BSONObjBuilder&) const; // for replSetGetStat us command
skipping to change at line 326 skipping to change at line 337
friend class Member; friend class Member;
friend class Manager; friend class Manager;
friend class Consensus; friend class Consensus;
private: private:
/* pulling data from primary related - see rs_sync.cpp */ /* pulling data from primary related - see rs_sync.cpp */
bool initialSyncOplogApplication(string hn, const Member *primary, OpTime applyGTE, OpTime minValid); bool initialSyncOplogApplication(string hn, const Member *primary, OpTime applyGTE, OpTime minValid);
void _syncDoInitialSync(); void _syncDoInitialSync();
void syncDoInitialSync(); void syncDoInitialSync();
void _syncThread(); void _syncThread();
bool tryToGoLiveAsASecondary(OpTime&); // readlocks
void syncTail(); void syncTail();
void syncApply(const BSONObj &o); void syncApply(const BSONObj &o);
unsigned _syncRollback(OplogReader& r);
void syncRollback(OplogReader& r); void syncRollback(OplogReader& r);
void syncFixUp(HowToFixUp& h, OplogReader& r); void syncFixUp(HowToFixUp& h, OplogReader& r);
public: public:
void syncThread(); void syncThread();
}; };
class ReplSet : public ReplSetImpl { class ReplSet : public ReplSetImpl {
public: public:
ReplSet(ReplSetCmdline& replSetCmdline) : ReplSetImpl(replSetCmdlin e) { } ReplSet(ReplSetCmdline& replSetCmdline) : ReplSetImpl(replSetCmdlin e) { }
skipping to change at line 347 skipping to change at line 360
bool stepDown() { return _stepDown(); } bool stepDown() { return _stepDown(); }
string selfFullName() { string selfFullName() {
lock lk(this); lock lk(this);
return _self->fullName(); return _self->fullName();
} }
/* call after constructing to start - returns fairly quickly after la[unching its threads */ /* call after constructing to start - returns fairly quickly after la[unching its threads */
void go() { _go(); } void go() { _go(); }
void fatal() { _fatal(); } void fatal() { _fatal(); }
bool isPrimary(); bool isPrimary() { return box.getState().primary(); }
bool isSecondary(); bool isSecondary() { return box.getState().secondary(); }
MemberState state() const { return ReplSetImpl::state(); } MemberState state() const { return ReplSetImpl::state(); }
string name() const { return ReplSetImpl::name(); } string name() const { return ReplSetImpl::name(); }
const ReplSetConfig& config() { return ReplSetImpl::config(); } const ReplSetConfig& config() { return ReplSetImpl::config(); }
void getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) cons t { _getOplogDiagsAsHtml(server_id,ss); } void getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) cons t { _getOplogDiagsAsHtml(server_id,ss); }
void summarizeAsHtml(stringstream& ss) const { _summarizeAsHtml(ss) ; } void summarizeAsHtml(stringstream& ss) const { _summarizeAsHtml(ss) ; }
void summarizeStatus(BSONObjBuilder& b) const { _summarizeStatus(b ); } void summarizeStatus(BSONObjBuilder& b) const { _summarizeStatus(b ); }
void fillIsMaster(BSONObjBuilder& b) { _fillIsMaster(b); } void fillIsMaster(BSONObjBuilder& b) { _fillIsMaster(b); }
/* we have a new config (reconfig) - apply it. /* we have a new config (reconfig) - apply it.
@param comment write a no-op comment to the oplog about it. onl y makes sense if one is primary and initiating the reconf. @param comment write a no-op comment to the oplog about it. onl y makes sense if one is primary and initiating the reconf.
*/ */
void haveNewConfig(ReplSetConfig& c, bool comment); void haveNewConfig(ReplSetConfig& c, bool comment);
/* if we delete old configs, this needs to assure locking. currentl y we don't so it is ok. */ /* if we delete old configs, this needs to assure locking. currentl y we don't so it is ok. */
const ReplSetConfig& getConfig() { return config(); } const ReplSetConfig& getConfig() { return config(); }
bool lockedByMe() { return RSBase::lockedByMe(); } bool lockedByMe() { return RSBase::lockedByMe(); }
// heartbeat msg to send to others; descriptive diagnostic info // heartbeat msg to send to others; descriptive diagnostic info
string hbmsg() const { return _hbmsg; } string hbmsg() const {
if( time(0)-_hbmsgTime > 120 ) return "";
return _hbmsg;
}
}; };
/** base class for repl set commands. checks basic things such as in r s mode before the command /** base class for repl set commands. checks basic things such as in r s mode before the command
does its real work does its real work
*/ */
class ReplSetCommand : public Command { class ReplSetCommand : public Command {
protected: protected:
ReplSetCommand(const char * s, bool show=false) : Command(s, show) { } ReplSetCommand(const char * s, bool show=false) : Command(s, show) { }
virtual bool slaveOk() const { return true; } virtual bool slaveOk() const { return true; }
virtual bool adminOnly() const { return true; } virtual bool adminOnly() const { return true; }
skipping to change at line 391 skipping to change at line 408
virtual LockType locktype() const { return NONE; } virtual LockType locktype() const { return NONE; }
virtual void help( stringstream &help ) const { help << "internal"; } virtual void help( stringstream &help ) const { help << "internal"; }
bool check(string& errmsg, BSONObjBuilder& result) { bool check(string& errmsg, BSONObjBuilder& result) {
if( !replSet ) { if( !replSet ) {
errmsg = "not running with --replSet"; errmsg = "not running with --replSet";
return false; return false;
} }
if( theReplSet == 0 ) { if( theReplSet == 0 ) {
result.append("startupStatus", ReplSet::startupStatus); result.append("startupStatus", ReplSet::startupStatus);
errmsg = ReplSet::startupStatusMsg.empty() ? "replset unkno wn error 2" : ReplSet::startupStatusMsg; errmsg = ReplSet::startupStatusMsg.empty() ? "replset unkno wn error 2" : ReplSet::startupStatusMsg;
if( ReplSet::startupStatus == 3 )
result.append("info", "run rs.initiate(...) if not yet
done for the set");
return false; return false;
} }
return true; return true;
} }
}; };
/** inlines ----------------- */ /** inlines ----------------- */
inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig: :MemberCfg *c, bool self) : inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig: :MemberCfg *c, bool self) :
_config(c), _h(h), _hbinfo(ord) { _config(*c), _h(h), _hbinfo(ord)
if( self ) { {
_hbinfo.health = 1.0; if( self )
} _hbinfo.health = 1.0;
}
inline bool ReplSet::isPrimary() {
/* todo replset */
return box.getState().primary();
}
inline bool ReplSet::isSecondary() {
return box.getState().secondary();
} }
} }
 End of changes. 23 change blocks. 
34 lines changed or deleted 47 lines changed or added


 rs_config.h   rs_config.h 
skipping to change at line 44 skipping to change at line 44
/* if something is misconfigured, throws an exception. /* if something is misconfigured, throws an exception.
if couldn't be queried or is just blank, ok() will be false. if couldn't be queried or is just blank, ok() will be false.
*/ */
ReplSetConfig(const HostAndPort& h); ReplSetConfig(const HostAndPort& h);
ReplSetConfig(BSONObj cfg); ReplSetConfig(BSONObj cfg);
bool ok() const { return _ok; } bool ok() const { return _ok; }
struct MemberCfg { struct MemberCfg {
MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(fal se) { } MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(fal se), slaveDelay(0), hidden(false) { }
int _id; /* ordinal */ int _id; /* ordinal */
unsigned votes; /* how many votes this node gets. default 1. */ unsigned votes; /* how many votes this node gets. default 1. */
HostAndPort h; HostAndPort h;
double priority; /* 0 means can never be primary */ double priority; /* 0 means can never be primary */
bool arbiterOnly; bool arbiterOnly;
int slaveDelay; /* seconds. int rather than unsigned for
convenient to/front bson conversion. */
bool hidden; /* if set, don't advertise to drives in i
sMaster. for non-primaries (priority 0) */
void check() const; /* check validity, assert if not. */ void check() const; /* check validity, assert if not. */
BSONObj asBson() const; BSONObj asBson() const;
bool potentiallyHot() const { bool potentiallyHot() const {
return !arbiterOnly && priority > 0; return !arbiterOnly && priority > 0;
} }
bool operator==(const MemberCfg& r) const {
return _id==r._id && votes == r.votes && h == r.h && priori
ty == r.priority &&
arbiterOnly == r.arbiterOnly && slaveDelay == r.slaveDe
lay && hidden == r.hidden;
}
bool operator!=(const MemberCfg& r) const { return !(*this == r
); }
}; };
vector<MemberCfg> members; vector<MemberCfg> members;
string _id; string _id;
int version; int version;
HealthOptions ho; HealthOptions ho;
string md5; string md5;
BSONObj getLastErrorDefaults; BSONObj getLastErrorDefaults;
list<HostAndPort> otherMemberHostnames() const; // except self list<HostAndPort> otherMemberHostnames() const; // except self
/** @return true if could connect, and there is no cfg object there at all */ /** @return true if could connect, and there is no cfg object there at all */
skipping to change at line 71 skipping to change at line 80
BSONObj getLastErrorDefaults; BSONObj getLastErrorDefaults;
list<HostAndPort> otherMemberHostnames() const; // except self list<HostAndPort> otherMemberHostnames() const; // except self
/** @return true if could connect, and there is no cfg object there at all */ /** @return true if could connect, and there is no cfg object there at all */
bool empty() const { return version == EMPTYCONFIG; } bool empty() const { return version == EMPTYCONFIG; }
string toString() const { return asBson().toString(); } string toString() const { return asBson().toString(); }
/** validate the settings. does not call check() on each member, yo u have to do that separately. */ /** validate the settings. does not call check() on each member, yo u have to do that separately. */
void check() const; void checkRsConfig() const;
/** check if modification makes sense */ /** check if modification makes sense */
static bool legalChange(const ReplSetConfig& old, const ReplSetConf ig& n, string& errmsg); static bool legalChange(const ReplSetConfig& old, const ReplSetConf ig& n, string& errmsg);
//static void receivedNewConfig(BSONObj); //static void receivedNewConfig(BSONObj);
void saveConfigLocally(BSONObj comment); // to local db void saveConfigLocally(BSONObj comment); // to local db
string saveConfigEverywhere(); // returns textual info on what happ ened string saveConfigEverywhere(); // returns textual info on what happ ened
BSONObj asBson() const; BSONObj asBson() const;
 End of changes. 5 change blocks. 
2 lines changed or deleted 16 lines changed or added


 rs_member.h   rs_member.h 
skipping to change at line 42 skipping to change at line 42
struct MemberState { struct MemberState {
enum MS { enum MS {
RS_STARTUP, RS_STARTUP,
RS_PRIMARY, RS_PRIMARY,
RS_SECONDARY, RS_SECONDARY,
RS_RECOVERING, RS_RECOVERING,
RS_FATAL, RS_FATAL,
RS_STARTUP2, RS_STARTUP2,
RS_UNKNOWN, /* remote node not yet reached */ RS_UNKNOWN, /* remote node not yet reached */
RS_ARBITER, RS_ARBITER,
RS_DOWN /* node not reachable for a report */ RS_DOWN, /* node not reachable for a report */
RS_ROLLBACK
} s; } s;
MemberState(MS ms = RS_UNKNOWN) : s(ms) { } MemberState(MS ms = RS_UNKNOWN) : s(ms) { }
explicit MemberState(int ms) : s((MS) ms) { } explicit MemberState(int ms) : s((MS) ms) { }
bool primary() const { return s == RS_PRIMARY; } bool primary() const { return s == RS_PRIMARY; }
bool secondary() const { return s == RS_SECONDARY; } bool secondary() const { return s == RS_SECONDARY; }
bool recovering() const { return s == RS_RECOVERING; } bool recovering() const { return s == RS_RECOVERING; }
bool startup2() const { return s == RS_STARTUP2; } bool startup2() const { return s == RS_STARTUP2; }
bool fatal() const { return s == RS_FATAL; } bool fatal() const { return s == RS_FATAL; }
bool rollback() const { return s == RS_ROLLBACK; }
string toString() const;
bool operator==(const MemberState& r) const { return s == r.s; } bool operator==(const MemberState& r) const { return s == r.s; }
bool operator!=(const MemberState& r) const { return s != r.s; } bool operator!=(const MemberState& r) const { return s != r.s; }
}; };
/* this is supposed to be just basic information on a member, /* this is supposed to be just basic information on a member,
and copy constructable. */ and copy constructable. */
class HeartbeatInfo { class HeartbeatInfo {
unsigned _id; unsigned _id;
public: public:
HeartbeatInfo() : _id(0xffffffff),skew(INT_MIN) { } HeartbeatInfo() : _id(0xffffffff),hbstate(MemberState::RS_UNKNOWN), health(-1.0),downSince(0),skew(INT_MIN) { }
HeartbeatInfo(unsigned id); HeartbeatInfo(unsigned id);
bool up() const { return health > 0; } bool up() const { return health > 0; }
unsigned id() const { return _id; } unsigned id() const { return _id; }
MemberState hbstate; MemberState hbstate;
double health; double health;
time_t upSince; time_t upSince;
long long downSince;
time_t lastHeartbeat; time_t lastHeartbeat;
string lastHeartbeatMsg; string lastHeartbeatMsg;
OpTime opTime; OpTime opTime;
int skew; int skew;
long long timeDown() const; // ms
/* true if changed in a way of interest to the repl set manager. */ /* true if changed in a way of interest to the repl set manager. */
bool changed(const HeartbeatInfo& old) const; bool changed(const HeartbeatInfo& old) const;
}; };
inline HeartbeatInfo::HeartbeatInfo(unsigned id) : _id(id) { inline HeartbeatInfo::HeartbeatInfo(unsigned id) : _id(id) {
health = -1.0; hbstate = MemberState::RS_UNKNOWN;
lastHeartbeat = upSince = 0; health = -1.0;
skew = INT_MIN; downSince = 0;
lastHeartbeat = upSince = 0;
skew = INT_MIN;
} }
inline bool HeartbeatInfo::changed(const HeartbeatInfo& old) const { inline bool HeartbeatInfo::changed(const HeartbeatInfo& old) const {
return health != old.health || return health != old.health ||
hbstate != old.hbstate; hbstate != old.hbstate;
} }
} }
 End of changes. 6 change blocks. 
5 lines changed or deleted 14 lines changed or added


 sock.h   sock.h 
skipping to change at line 255 skipping to change at line 255
log() << "can't get this server's hostname " << errnoWithDescri ption() << endl; log() << "can't get this server's hostname " << errnoWithDescri ption() << endl;
return ""; return "";
} }
return buf; return buf;
} }
string getHostNameCached(); string getHostNameCached();
class ListeningSockets { class ListeningSockets {
public: public:
ListeningSockets() : _mutex("ListeningSockets"), _sockets( new set< ListeningSockets() : _mutex("ListeningSockets"), _sockets( new set<
int>() ){ int>() ) { }
}
void add( int sock ){ void add( int sock ){
scoped_lock lk( _mutex ); scoped_lock lk( _mutex );
_sockets->insert( sock ); _sockets->insert( sock );
} }
void remove( int sock ){ void remove( int sock ){
scoped_lock lk( _mutex ); scoped_lock lk( _mutex );
_sockets->erase( sock ); _sockets->erase( sock );
} }
void closeAll(){ void closeAll(){
set<int>* s; set<int>* s;
{ {
scoped_lock lk( _mutex ); scoped_lock lk( _mutex );
s = _sockets; s = _sockets;
_sockets = new set<int>(); _sockets = new set<int>();
} }
for ( set<int>::iterator i=s->begin(); i!=s->end(); i++ ) {
for ( set<int>::iterator i=s->begin(); i!=s->end(); i++ ){
int sock = *i; int sock = *i;
log() << "\t going to close listening socket: " << sock << endl; log() << "closing listening socket: " << sock << endl;
closesocket( sock ); closesocket( sock );
} }
} }
static ListeningSockets* get(); static ListeningSockets* get();
private: private:
mongo::mutex _mutex; mongo::mutex _mutex;
set<int>* _sockets; set<int>* _sockets;
static ListeningSockets* _instance; static ListeningSockets* _instance;
}; };
} // namespace mongo } // namespace mongo
 End of changes. 7 change blocks. 
11 lines changed or deleted 4 lines changed or added


 util.h   util.h 
skipping to change at line 91 skipping to change at line 91
bool isSet() const { bool isSet() const {
return _combined > 0; return _combined > 0;
} }
string toString() const { string toString() const {
stringstream ss; stringstream ss;
ss << _major << "|" << _minor; ss << _major << "|" << _minor;
return ss.str(); return ss.str();
} }
int majorVersion() const { return _major; }
int minorVersion() const { return _minor; }
operator unsigned long long() const { return _combined; } operator unsigned long long() const { return _combined; }
ShardChunkVersion& operator=( const BSONElement& elem ){ ShardChunkVersion& operator=( const BSONElement& elem ){
switch ( elem.type() ){ switch ( elem.type() ){
case Timestamp: case Timestamp:
case NumberLong: case NumberLong:
case Date: case Date:
_combined = elem._numberLong(); _combined = elem._numberLong();
break; break;
case EOO: case EOO:
 End of changes. 1 change blocks. 
0 lines changed or deleted 3 lines changed or added


 version.h   version.h 
skipping to change at line 20 skipping to change at line 20
// mongo version // mongo version
extern const char versionString[]; extern const char versionString[];
string mongodVersion(); string mongodVersion();
const char * gitVersion(); const char * gitVersion();
void printGitVersion(); void printGitVersion();
string sysInfo(); string sysInfo();
void printSysInfo(); void printSysInfo();
void show_32_warning(); void show_warnings();
} // namespace mongo } // namespace mongo
#endif // UTIL_VERSION_HEADER #endif // UTIL_VERSION_HEADER
 End of changes. 1 change blocks. 
1 lines changed or deleted 1 lines changed or added

This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/