| client.h | | client.h | |
| | | | |
| skipping to change at line 32 | | skipping to change at line 32 | |
| todo: switch to asio...this will fit nicely with that. | | todo: switch to asio...this will fit nicely with that. | |
| */ | | */ | |
| | | | |
| #pragma once | | #pragma once | |
| | | | |
| #include "../pch.h" | | #include "../pch.h" | |
| #include "security.h" | | #include "security.h" | |
| #include "namespace.h" | | #include "namespace.h" | |
| #include "lasterror.h" | | #include "lasterror.h" | |
| #include "stats/top.h" | | #include "stats/top.h" | |
|
| //#include "repl/rs.h" | | | |
| | | | |
| namespace mongo { | | namespace mongo { | |
| | | | |
| extern class ReplSet *theReplSet; | | extern class ReplSet *theReplSet; | |
|
| | | | |
| class AuthenticationInfo; | | class AuthenticationInfo; | |
| class Database; | | class Database; | |
| class CurOp; | | class CurOp; | |
| class Command; | | class Command; | |
| class Client; | | class Client; | |
|
| | | class MessagingPort; | |
| | | | |
| extern boost::thread_specific_ptr<Client> currentClient; | | extern boost::thread_specific_ptr<Client> currentClient; | |
| | | | |
| class Client : boost::noncopyable { | | class Client : boost::noncopyable { | |
| public: | | public: | |
|
| | | static Client *syncThread; | |
| | | void iAmSyncThread() { | |
| | | wassert( syncThread == 0 ); | |
| | | syncThread = this; | |
| | | } | |
| | | bool isSyncThread() const { return this == syncThread; } // true if | |
| | | this client is the replication secondary pull thread | |
| | | | |
| static mongo::mutex clientsMutex; | | static mongo::mutex clientsMutex; | |
| static set<Client*> clients; // always be in clientsMutex when mani
pulating this | | static set<Client*> clients; // always be in clientsMutex when mani
pulating this | |
|
| | | | |
| static int recommendedYieldMicros( int * writers = 0 , int * reader
s = 0 ); | | static int recommendedYieldMicros( int * writers = 0 , int * reader
s = 0 ); | |
| | | | |
|
| | | /* set _god=true temporarily, safely */ | |
| class GodScope { | | class GodScope { | |
| bool _prev; | | bool _prev; | |
| public: | | public: | |
| GodScope(); | | GodScope(); | |
| ~GodScope(); | | ~GodScope(); | |
| }; | | }; | |
| | | | |
| /* Set database we want to use, then, restores when we finish (are
out of scope) | | /* Set database we want to use, then, restores when we finish (are
out of scope) | |
| Note this is also helpful if an exception happens as the state i
f fixed up. | | Note this is also helpful if an exception happens as the state i
f fixed up. | |
| */ | | */ | |
| | | | |
| skipping to change at line 151 | | skipping to change at line 157 | |
| } | | } | |
| | | | |
| /** | | /** | |
| * call after going back into the lock, will re-establish non-t
hread safe stuff | | * call after going back into the lock, will re-establish non-t
hread safe stuff | |
| */ | | */ | |
| void relocked(){ | | void relocked(){ | |
| _finishInit(); | | _finishInit(); | |
| } | | } | |
| | | | |
| friend class CurOp; | | friend class CurOp; | |
|
| }; | | }; // class Client::Context | |
| | | | |
| private: | | private: | |
|
| | | void _dropns( const string& ns ); | |
| | | | |
| CurOp * _curOp; | | CurOp * _curOp; | |
| Context * _context; | | Context * _context; | |
| bool _shutdown; | | bool _shutdown; | |
| set<string> _tempCollections; | | set<string> _tempCollections; | |
| const char *_desc; | | const char *_desc; | |
| bool _god; | | bool _god; | |
| AuthenticationInfo _ai; | | AuthenticationInfo _ai; | |
| ReplTime _lastOp; | | ReplTime _lastOp; | |
| BSONObj _handshake; | | BSONObj _handshake; | |
| BSONObj _remoteId; | | BSONObj _remoteId; | |
| | | | |
|
| void _dropns( const string& ns ); | | | |
| | | | |
| public: | | public: | |
|
| | | MessagingPort * const _mp; | |
| | | | |
| string clientAddress() const; | | string clientAddress() const; | |
| AuthenticationInfo * getAuthenticationInfo(){ return &_ai; } | | AuthenticationInfo * getAuthenticationInfo(){ return &_ai; } | |
| bool isAdmin() { return _ai.isAuthorized( "admin" ); } | | bool isAdmin() { return _ai.isAuthorized( "admin" ); } | |
| CurOp* curop() { return _curOp; } | | CurOp* curop() { return _curOp; } | |
| Context* getContext(){ return _context; } | | Context* getContext(){ return _context; } | |
| Database* database() { return _context ? _context->db() : 0; } | | Database* database() { return _context ? _context->db() : 0; } | |
| const char *ns() const { return _context->ns(); } | | const char *ns() const { return _context->ns(); } | |
| const char *desc() const { return _desc; } | | const char *desc() const { return _desc; } | |
| | | | |
|
| Client(const char *desc); | | Client(const char *desc, MessagingPort *p = 0); | |
| ~Client(); | | ~Client(); | |
| | | | |
| void addTempCollection( const string& ns ); | | void addTempCollection( const string& ns ); | |
| | | | |
| void _invalidateDB(const string& db); | | void _invalidateDB(const string& db); | |
| static void invalidateDB(const string& db); | | static void invalidateDB(const string& db); | |
|
| | | | |
| static void invalidateNS( const string& ns ); | | static void invalidateNS( const string& ns ); | |
| | | | |
|
| void setLastOp( ReplTime op ) { | | void setLastOp( ReplTime op ) { _lastOp = op; } | |
| _lastOp = op; | | ReplTime getLastOp() const { return _lastOp; } | |
| } | | | |
| | | | |
| ReplTime getLastOp() const { | | | |
| return _lastOp; | | | |
| } | | | |
| | | | |
|
| | | /* report what the last operation was. used by getlasterror */ | |
| void appendLastOp( BSONObjBuilder& b ) { | | void appendLastOp( BSONObjBuilder& b ) { | |
| if( theReplSet ) { | | if( theReplSet ) { | |
| b.append("lastOp" , (long long) _lastOp); | | b.append("lastOp" , (long long) _lastOp); | |
| } | | } | |
| else { | | else { | |
| OpTime lo(_lastOp); | | OpTime lo(_lastOp); | |
| if ( ! lo.isNull() ) | | if ( ! lo.isNull() ) | |
| b.appendTimestamp( "lastOp" , lo.asDate() ); | | b.appendTimestamp( "lastOp" , lo.asDate() ); | |
| } | | } | |
| } | | } | |
| | | | |
| /* each thread which does db operations has a Client object in TLS. | | /* each thread which does db operations has a Client object in TLS. | |
| call this when your thread starts. | | call this when your thread starts. | |
| */ | | */ | |
|
| static void initThread(const char *desc); | | static Client& initThread(const char *desc, MessagingPort *mp = 0); | |
| | | | |
| /* | | /* | |
| this has to be called as the client goes away, but before thread
termination | | this has to be called as the client goes away, but before thread
termination | |
| @return true if anything was done | | @return true if anything was done | |
| */ | | */ | |
| bool shutdown(); | | bool shutdown(); | |
| | | | |
| /* this is for map/reduce writes */ | | /* this is for map/reduce writes */ | |
| bool isGod() const { return _god; } | | bool isGod() const { return _god; } | |
| | | | |
| | | | |
| skipping to change at line 223 | | skipping to change at line 226 | |
| @return true if anything was done | | @return true if anything was done | |
| */ | | */ | |
| bool shutdown(); | | bool shutdown(); | |
| | | | |
| /* this is for map/reduce writes */ | | /* this is for map/reduce writes */ | |
| bool isGod() const { return _god; } | | bool isGod() const { return _god; } | |
| | | | |
| friend class CurOp; | | friend class CurOp; | |
| | | | |
| string toString() const; | | string toString() const; | |
|
| | | | |
| void gotHandshake( const BSONObj& o ); | | void gotHandshake( const BSONObj& o ); | |
|
| | | | |
| BSONObj getRemoteID() const { return _remoteId; } | | BSONObj getRemoteID() const { return _remoteId; } | |
| BSONObj getHandshake() const { return _handshake; } | | BSONObj getHandshake() const { return _handshake; } | |
| }; | | }; | |
| | | | |
|
| | | /** get the Client object for this thread. */ | |
| inline Client& cc() { | | inline Client& cc() { | |
| Client * c = currentClient.get(); | | Client * c = currentClient.get(); | |
| assert( c ); | | assert( c ); | |
| return *c; | | return *c; | |
| } | | } | |
| | | | |
| /* each thread which does db operations has a Client object in TLS. | | /* each thread which does db operations has a Client object in TLS. | |
| call this when your thread starts. | | call this when your thread starts. | |
| */ | | */ | |
|
| inline void Client::initThread(const char *desc) { | | inline Client& Client::initThread(const char *desc, MessagingPort *mp)
{ | |
| setThreadName(desc); | | setThreadName(desc); | |
| assert( currentClient.get() == 0 ); | | assert( currentClient.get() == 0 ); | |
|
| currentClient.reset( new Client(desc) ); | | Client *c = new Client(desc, mp); | |
| | | currentClient.reset(c); | |
| mongo::lastError.initThread(); | | mongo::lastError.initThread(); | |
|
| | | return *c; | |
| } | | } | |
| | | | |
| inline Client::GodScope::GodScope(){ | | inline Client::GodScope::GodScope(){ | |
| _prev = cc()._god; | | _prev = cc()._god; | |
| cc()._god = true; | | cc()._god = true; | |
| } | | } | |
| | | | |
| inline Client::GodScope::~GodScope(){ | | inline Client::GodScope::~GodScope(){ | |
| cc()._god = _prev; | | cc()._god = _prev; | |
| } | | } | |
| | | | |
| skipping to change at line 278 | | skipping to change at line 282 | |
| dbMutex.unlock_shared(); | | dbMutex.unlock_shared(); | |
| dbMutex.lock(); | | dbMutex.lock(); | |
| | | | |
| if ( cc().getContext() ) | | if ( cc().getContext() ) | |
| cc().getContext()->unlocked(); | | cc().getContext()->unlocked(); | |
| } | | } | |
| } | | } | |
| | | | |
| string sayClientState(); | | string sayClientState(); | |
| | | | |
|
| inline bool haveClient(){ | | inline bool haveClient() { return currentClient.get() > 0; } | |
| return currentClient.get() > 0; | | | |
| } | | | |
| }; | | }; | |
| | | | |
End of changes. 22 change blocks. |
| 23 lines changed or deleted | | 26 lines changed or added | |
|
| message.h | | message.h | |
| | | | |
| skipping to change at line 33 | | skipping to change at line 33 | |
| | | | |
| namespace mongo { | | namespace mongo { | |
| | | | |
| extern bool noUnixSocket; | | extern bool noUnixSocket; | |
| | | | |
| class Message; | | class Message; | |
| class MessagingPort; | | class MessagingPort; | |
| class PiggyBackData; | | class PiggyBackData; | |
| typedef AtomicUInt MSGID; | | typedef AtomicUInt MSGID; | |
| | | | |
|
| class Listener { | | class Listener : boost::noncopyable { | |
| public: | | public: | |
| Listener(const string &ip, int p, bool logConnect=true ) : _port(p)
, _ip(ip), _logConnect(logConnect), _elapsedTime(0){ } | | Listener(const string &ip, int p, bool logConnect=true ) : _port(p)
, _ip(ip), _logConnect(logConnect), _elapsedTime(0){ } | |
| virtual ~Listener() { | | virtual ~Listener() { | |
| if ( _timeTracker == this ) | | if ( _timeTracker == this ) | |
| _timeTracker = 0; | | _timeTracker = 0; | |
| } | | } | |
| void initAndListen(); // never returns unless error (start a thread
) | | void initAndListen(); // never returns unless error (start a thread
) | |
| | | | |
| /* spawn a thread, etc., then return */ | | /* spawn a thread, etc., then return */ | |
| virtual void accepted(int sock, const SockAddr& from); | | virtual void accepted(int sock, const SockAddr& from); | |
| | | | |
| skipping to change at line 77 | | skipping to change at line 77 | |
| } | | } | |
| | | | |
| private: | | private: | |
| string _ip; | | string _ip; | |
| bool _logConnect; | | bool _logConnect; | |
| long long _elapsedTime; | | long long _elapsedTime; | |
| | | | |
| static const Listener* _timeTracker; | | static const Listener* _timeTracker; | |
| }; | | }; | |
| | | | |
|
| class AbstractMessagingPort { | | class AbstractMessagingPort : boost::noncopyable { | |
| public: | | public: | |
| virtual ~AbstractMessagingPort() { } | | virtual ~AbstractMessagingPort() { } | |
| virtual void reply(Message& received, Message& response, MSGID resp
onseTo) = 0; // like the reply below, but doesn't rely on received.data sti
ll being available | | virtual void reply(Message& received, Message& response, MSGID resp
onseTo) = 0; // like the reply below, but doesn't rely on received.data sti
ll being available | |
| virtual void reply(Message& received, Message& response) = 0; | | virtual void reply(Message& received, Message& response) = 0; | |
| | | | |
| virtual HostAndPort remote() const = 0; | | virtual HostAndPort remote() const = 0; | |
| virtual unsigned remotePort() const = 0; | | virtual unsigned remotePort() const = 0; | |
| | | | |
| virtual int getClientId(){ | | virtual int getClientId(){ | |
| int x = remotePort(); | | int x = remotePort(); | |
| x = x << 16; | | x = x << 16; | |
|
| | | x |= ( ( 0xFF0 & (long long)this ) >> 8 ); // lowest byte in po
inter often meaningless | |
| return x; | | return x; | |
| } | | } | |
| }; | | }; | |
| | | | |
| class MessagingPort : public AbstractMessagingPort { | | class MessagingPort : public AbstractMessagingPort { | |
| public: | | public: | |
| MessagingPort(int sock, const SockAddr& farEnd); | | MessagingPort(int sock, const SockAddr& farEnd); | |
| | | | |
| // in some cases the timeout will actually be 2x this value - eg we
do a partial send, | | // in some cases the timeout will actually be 2x this value - eg we
do a partial send, | |
| // then the timeout fires, then we try to send again, then the time
out fires again with | | // then the timeout fires, then we try to send again, then the time
out fires again with | |
| | | | |
| skipping to change at line 138 | | skipping to change at line 139 | |
| | | | |
| int unsafe_recv( char *buf, int max ); | | int unsafe_recv( char *buf, int max ); | |
| private: | | private: | |
| int sock; | | int sock; | |
| PiggyBackData * piggyBackData; | | PiggyBackData * piggyBackData; | |
| public: | | public: | |
| SockAddr farEnd; | | SockAddr farEnd; | |
| int _timeout; | | int _timeout; | |
| int _logLevel; // passed to log() when logging errors | | int _logLevel; // passed to log() when logging errors | |
| | | | |
|
| | | static void closeAllSockets(unsigned tagMask = 0xffffffff); | |
| | | | |
| | | /* ports can be tagged with various classes. see closeAllSockets(t | |
| | | ag). defaults to 0. */ | |
| | | unsigned tag; | |
| | | | |
| friend class PiggyBackData; | | friend class PiggyBackData; | |
| }; | | }; | |
| | | | |
| enum Operations { | | enum Operations { | |
| opReply = 1, /* reply. responseTo is set. */ | | opReply = 1, /* reply. responseTo is set. */ | |
| dbMsg = 1000, /* generic msg command followed by a string */ | | dbMsg = 1000, /* generic msg command followed by a string */ | |
| dbUpdate = 2001, /* update object */ | | dbUpdate = 2001, /* update object */ | |
| dbInsert = 2002, | | dbInsert = 2002, | |
| //dbGetByOID = 2003, | | //dbGetByOID = 2003, | |
| dbQuery = 2004, | | dbQuery = 2004, | |
| | | | |
| skipping to change at line 173 | | skipping to change at line 179 | |
| case dbGetMore: return "getmore"; | | case dbGetMore: return "getmore"; | |
| case dbDelete: return "remove"; | | case dbDelete: return "remove"; | |
| case dbKillCursors: return "killcursors"; | | case dbKillCursors: return "killcursors"; | |
| default: | | default: | |
| PRINT(op); | | PRINT(op); | |
| assert(0); | | assert(0); | |
| return ""; | | return ""; | |
| } | | } | |
| } | | } | |
| | | | |
|
| | | inline bool opIsWrite( int op ){ | |
| | | switch ( op ){ | |
| | | | |
| | | case 0: | |
| | | case opReply: | |
| | | case dbMsg: | |
| | | case dbQuery: | |
| | | case dbGetMore: | |
| | | case dbKillCursors: | |
| | | return false; | |
| | | | |
| | | case dbUpdate: | |
| | | case dbInsert: | |
| | | case dbDelete: | |
| | | return false; | |
| | | | |
| | | default: | |
| | | PRINT(op); | |
| | | assert(0); | |
| | | return ""; | |
| | | } | |
| | | | |
| | | } | |
| | | | |
| #pragma pack(1) | | #pragma pack(1) | |
| /* see http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol | | /* see http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol | |
| */ | | */ | |
| struct MSGHEADER { | | struct MSGHEADER { | |
| int messageLength; // total message size, including this | | int messageLength; // total message size, including this | |
| int requestID; // identifier for this message | | int requestID; // identifier for this message | |
| int responseTo; // requestID from the original request | | int responseTo; // requestID from the original request | |
| // (used in reponses from db) | | // (used in reponses from db) | |
| int opCode; | | int opCode; | |
| }; | | }; | |
| | | | |
End of changes. 5 change blocks. |
| 2 lines changed or deleted | | 33 lines changed or added | |
|
| rs.h | | rs.h | |
| | | | |
| skipping to change at line 47 | | skipping to change at line 47 | |
| class OplogReader; | | class OplogReader; | |
| extern bool replSet; // true if using repl sets | | extern bool replSet; // true if using repl sets | |
| extern class ReplSet *theReplSet; // null until initialized | | extern class ReplSet *theReplSet; // null until initialized | |
| extern Tee *rsLog; | | extern Tee *rsLog; | |
| | | | |
| /* member of a replica set */ | | /* member of a replica set */ | |
| class Member : public List1<Member>::Base { | | class Member : public List1<Member>::Base { | |
| public: | | public: | |
| Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg
*c, bool self); | | Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg
*c, bool self); | |
| string fullName() const { return h().toString(); } | | string fullName() const { return h().toString(); } | |
|
| const ReplSetConfig::MemberCfg& config() const { return *_config; } | | const ReplSetConfig::MemberCfg& config() const { return _config; } | |
| const HeartbeatInfo& hbinfo() const { return _hbinfo; } | | const HeartbeatInfo& hbinfo() const { return _hbinfo; } | |
|
| string lhb() { return _hbinfo.lastHeartbeatMsg; } | | HeartbeatInfo& get_hbinfo() { return _hbinfo; } | |
| | | string lhb() const { return _hbinfo.lastHeartbeatMsg; } | |
| MemberState state() const { return _hbinfo.hbstate; } | | MemberState state() const { return _hbinfo.hbstate; } | |
| const HostAndPort& h() const { return _h; } | | const HostAndPort& h() const { return _h; } | |
| unsigned id() const { return _hbinfo.id(); } | | unsigned id() const { return _hbinfo.id(); } | |
|
| bool potentiallyHot() const { return _config->potentiallyHot(); } / | | bool potentiallyHot() const { return _config.potentiallyHot(); } // | |
| / not arbiter, not priority 0 | | not arbiter, not priority 0 | |
| | | | |
| void summarizeMember(stringstream& s) const; | | void summarizeMember(stringstream& s) const; | |
| friend class ReplSetImpl; | | friend class ReplSetImpl; | |
| private: | | private: | |
|
| const ReplSetConfig::MemberCfg *_config; /* todo: when this changes | | const ReplSetConfig::MemberCfg _config; | |
| ??? */ | | const HostAndPort _h; | |
| HostAndPort _h; | | | |
| HeartbeatInfo _hbinfo; | | HeartbeatInfo _hbinfo; | |
| }; | | }; | |
| | | | |
| class Manager : public task::Server { | | class Manager : public task::Server { | |
| ReplSetImpl *rs; | | ReplSetImpl *rs; | |
| bool busyWithElectSelf; | | bool busyWithElectSelf; | |
| int _primary; | | int _primary; | |
|
| const Member* findOtherPrimary(); | | | |
| | | /** @param two - if true two primaries were seen. this can happen | |
| | | transiently, in addition to our | |
| | | polling being only occasional. in this case null | |
| | | is returned, but the caller should | |
| | | not assume primary itself in that situation. | |
| | | */ | |
| | | const Member* findOtherPrimary(bool& two); | |
| | | | |
| void noteARemoteIsPrimary(const Member *); | | void noteARemoteIsPrimary(const Member *); | |
| virtual void starting(); | | virtual void starting(); | |
| public: | | public: | |
| Manager(ReplSetImpl *rs); | | Manager(ReplSetImpl *rs); | |
| ~Manager(); | | ~Manager(); | |
| void msgReceivedNewConfig(BSONObj); | | void msgReceivedNewConfig(BSONObj); | |
| void msgCheckNewState(); | | void msgCheckNewState(); | |
| }; | | }; | |
| | | | |
| struct Target; | | struct Target; | |
| | | | |
| skipping to change at line 105 | | skipping to change at line 111 | |
| steppedDown = 0; | | steppedDown = 0; | |
| } | | } | |
| | | | |
| /* if we've stepped down, this is when we are allowed to try to ele
ct ourself again. | | /* if we've stepped down, this is when we are allowed to try to ele
ct ourself again. | |
| todo: handle possible weirdnesses at clock skews etc. | | todo: handle possible weirdnesses at clock skews etc. | |
| */ | | */ | |
| time_t steppedDown; | | time_t steppedDown; | |
| | | | |
| int totalVotes() const; | | int totalVotes() const; | |
| bool aMajoritySeemsToBeUp() const; | | bool aMajoritySeemsToBeUp() const; | |
|
| | | bool shouldRelinquish() const; | |
| void electSelf(); | | void electSelf(); | |
| void electCmdReceived(BSONObj, BSONObjBuilder*); | | void electCmdReceived(BSONObj, BSONObjBuilder*); | |
| void multiCommand(BSONObj cmd, list<Target>& L); | | void multiCommand(BSONObj cmd, list<Target>& L); | |
| }; | | }; | |
| | | | |
| /** most operations on a ReplSet object should be done while locked. th
at logic implemented here. */ | | /** most operations on a ReplSet object should be done while locked. th
at logic implemented here. */ | |
| class RSBase : boost::noncopyable { | | class RSBase : boost::noncopyable { | |
| public: | | public: | |
| const unsigned magic; | | const unsigned magic; | |
| void assertValid() { assert( magic == 0x12345677 ); } | | void assertValid() { assert( magic == 0x12345677 ); } | |
| private: | | private: | |
| mutex m; | | mutex m; | |
| int _locked; | | int _locked; | |
| ThreadLocalValue<bool> _lockedByMe; | | ThreadLocalValue<bool> _lockedByMe; | |
| protected: | | protected: | |
| RSBase() : magic(0x12345677), m("RSBase"), _locked(0) { } | | RSBase() : magic(0x12345677), m("RSBase"), _locked(0) { } | |
| ~RSBase() { | | ~RSBase() { | |
|
| log() << "~RSBase should never be called?" << rsLog; | | /* this can happen if we throw in the constructor; otherwise ne | |
| assert(false); | | ver happens. thus we log it as it is quite unusual. */ | |
| | | log() << "replSet ~RSBase called" << rsLog; | |
| } | | } | |
| | | | |
| class lock { | | class lock { | |
| RSBase& rsbase; | | RSBase& rsbase; | |
| auto_ptr<scoped_lock> sl; | | auto_ptr<scoped_lock> sl; | |
| public: | | public: | |
| lock(RSBase* b) : rsbase(*b) { | | lock(RSBase* b) : rsbase(*b) { | |
| if( rsbase._lockedByMe.get() ) | | if( rsbase._lockedByMe.get() ) | |
| return; // recursive is ok... | | return; // recursive is ok... | |
| | | | |
| | | | |
| skipping to change at line 178 | | skipping to change at line 185 | |
| const Member *primary; | | const Member *primary; | |
| }; | | }; | |
| const SP get() { | | const SP get() { | |
| scoped_lock lk(m); | | scoped_lock lk(m); | |
| return sp; | | return sp; | |
| } | | } | |
| MemberState getState() const { return sp.state; } | | MemberState getState() const { return sp.state; } | |
| const Member* getPrimary() const { return sp.primary; } | | const Member* getPrimary() const { return sp.primary; } | |
| void change(MemberState s, const Member *self) { | | void change(MemberState s, const Member *self) { | |
| scoped_lock lk(m); | | scoped_lock lk(m); | |
|
| | | if( sp.state != s ) { | |
| | | log() << "replSet " << s.toString() << rsLog; | |
| | | } | |
| sp.state = s; | | sp.state = s; | |
| if( s.primary() ) { | | if( s.primary() ) { | |
| sp.primary = self; | | sp.primary = self; | |
| } | | } | |
| else { | | else { | |
| if( self == sp.primary ) | | if( self == sp.primary ) | |
| sp.primary = 0; | | sp.primary = 0; | |
| } | | } | |
| } | | } | |
| void set(MemberState s, const Member *p) { | | void set(MemberState s, const Member *p) { | |
| scoped_lock lk(m); | | scoped_lock lk(m); | |
| sp.state = s; sp.primary = p; | | sp.state = s; sp.primary = p; | |
| } | | } | |
| void setSelfPrimary(const Member *self) { change(MemberState::RS_PR
IMARY, self); } | | void setSelfPrimary(const Member *self) { change(MemberState::RS_PR
IMARY, self); } | |
| void setOtherPrimary(const Member *mem) { | | void setOtherPrimary(const Member *mem) { | |
| scoped_lock lk(m); | | scoped_lock lk(m); | |
| assert( !sp.state.primary() ); | | assert( !sp.state.primary() ); | |
| sp.primary = mem; | | sp.primary = mem; | |
| } | | } | |
|
| | | void noteRemoteIsPrimary(const Member *remote) { | |
| | | scoped_lock lk(m); | |
| | | if( !sp.state.secondary() && !sp.state.fatal() ) | |
| | | sp.state = MemberState::RS_RECOVERING; | |
| | | sp.primary = remote; | |
| | | } | |
| StateBox() : m("StateBox") { } | | StateBox() : m("StateBox") { } | |
| private: | | private: | |
| mutex m; | | mutex m; | |
| SP sp; | | SP sp; | |
| }; | | }; | |
| | | | |
| void parseReplsetCmdLine(string cfgString, string& setname, vector<Host
AndPort>& seeds, set<HostAndPort>& seedSet ); | | void parseReplsetCmdLine(string cfgString, string& setname, vector<Host
AndPort>& seeds, set<HostAndPort>& seedSet ); | |
| | | | |
| /** Parameter given to the --replSet command line option (parsed). | | /** Parameter given to the --replSet command line option (parsed). | |
| Syntax is "<setname>/<seedhost1>,<seedhost2>" | | Syntax is "<setname>/<seedhost1>,<seedhost2>" | |
| | | | |
| skipping to change at line 229 | | skipping to change at line 245 | |
| */ | | */ | |
| class ReplSetImpl : protected RSBase { | | class ReplSetImpl : protected RSBase { | |
| public: | | public: | |
| /** info on our state if the replset isn't yet "up". for example,
if we are pre-initiation. */ | | /** info on our state if the replset isn't yet "up". for example,
if we are pre-initiation. */ | |
| enum StartupStatus { | | enum StartupStatus { | |
| PRESTART=0, LOADINGCONFIG=1, BADCONFIG=2, EMPTYCONFIG=3, | | PRESTART=0, LOADINGCONFIG=1, BADCONFIG=2, EMPTYCONFIG=3, | |
| EMPTYUNREACHABLE=4, STARTED=5, SOON=6 | | EMPTYUNREACHABLE=4, STARTED=5, SOON=6 | |
| }; | | }; | |
| static StartupStatus startupStatus; | | static StartupStatus startupStatus; | |
| static string startupStatusMsg; | | static string startupStatusMsg; | |
|
| static string stateAsStr(MemberState state); | | | |
| static string stateAsHtml(MemberState state); | | static string stateAsHtml(MemberState state); | |
| | | | |
| /* todo thread */ | | /* todo thread */ | |
| void msgUpdateHBInfo(HeartbeatInfo); | | void msgUpdateHBInfo(HeartbeatInfo); | |
| | | | |
| StateBox box; | | StateBox box; | |
| | | | |
| OpTime lastOpTimeWritten; | | OpTime lastOpTimeWritten; | |
| long long lastH; // hash we use to make sure we are reading the rig
ht flow of ops and aren't on an out-of-date "fork" | | long long lastH; // hash we use to make sure we are reading the rig
ht flow of ops and aren't on an out-of-date "fork" | |
| private: | | private: | |
| set<ReplSetHealthPollTask*> healthTasks; | | set<ReplSetHealthPollTask*> healthTasks; | |
| void endOldHealthTasks(); | | void endOldHealthTasks(); | |
| void startHealthTaskFor(Member *m); | | void startHealthTaskFor(Member *m); | |
| | | | |
|
| private: | | | |
| Consensus elect; | | Consensus elect; | |
|
| bool ok() const { return !box.getState().fatal(); } | | | |
| | | | |
| void relinquish(); | | void relinquish(); | |
| void forgetPrimary(); | | void forgetPrimary(); | |
|
| | | | |
| protected: | | protected: | |
| bool _stepDown(); | | bool _stepDown(); | |
| private: | | private: | |
| void assumePrimary(); | | void assumePrimary(); | |
| void loadLastOpTimeWritten(); | | void loadLastOpTimeWritten(); | |
| void changeState(MemberState s); | | void changeState(MemberState s); | |
|
| | | | |
| protected: | | protected: | |
| // "heartbeat message" | | // "heartbeat message" | |
| // sent in requestHeartbeat respond in field "hbm" | | // sent in requestHeartbeat respond in field "hbm" | |
| char _hbmsg[256]; // we change this unlocked, thus not an stl::stri
ng | | char _hbmsg[256]; // we change this unlocked, thus not an stl::stri
ng | |
|
| | | time_t _hbmsgTime; // when it was logged | |
| public: | | public: | |
| void sethbmsg(string s, int logLevel = 0); | | void sethbmsg(string s, int logLevel = 0); | |
| protected: | | protected: | |
|
| bool initFromConfig(ReplSetConfig& c); // true if ok; throws if con
fig really bad; false if config doesn't include self | | bool initFromConfig(ReplSetConfig& c, bool reconf=false); // true i
f ok; throws if config really bad; false if config doesn't include self | |
| void _fillIsMaster(BSONObjBuilder&); | | void _fillIsMaster(BSONObjBuilder&); | |
| void _fillIsMasterHost(const Member*, vector<string>&, vector<strin
g>&, vector<string>&); | | void _fillIsMasterHost(const Member*, vector<string>&, vector<strin
g>&, vector<string>&); | |
| const ReplSetConfig& config() { return *_cfg; } | | const ReplSetConfig& config() { return *_cfg; } | |
| string name() const { return _name; } /* @return replica set's logi
cal name */ | | string name() const { return _name; } /* @return replica set's logi
cal name */ | |
| MemberState state() const { return box.getState(); } | | MemberState state() const { return box.getState(); } | |
| void _fatal(); | | void _fatal(); | |
| void _getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) con
st; | | void _getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) con
st; | |
| void _summarizeAsHtml(stringstream&) const; | | void _summarizeAsHtml(stringstream&) const; | |
| void _summarizeStatus(BSONObjBuilder&) const; // for replSetGetStat
us command | | void _summarizeStatus(BSONObjBuilder&) const; // for replSetGetStat
us command | |
| | | | |
| | | | |
| skipping to change at line 326 | | skipping to change at line 337 | |
| friend class Member; | | friend class Member; | |
| friend class Manager; | | friend class Manager; | |
| friend class Consensus; | | friend class Consensus; | |
| | | | |
| private: | | private: | |
| /* pulling data from primary related - see rs_sync.cpp */ | | /* pulling data from primary related - see rs_sync.cpp */ | |
| bool initialSyncOplogApplication(string hn, const Member *primary,
OpTime applyGTE, OpTime minValid); | | bool initialSyncOplogApplication(string hn, const Member *primary,
OpTime applyGTE, OpTime minValid); | |
| void _syncDoInitialSync(); | | void _syncDoInitialSync(); | |
| void syncDoInitialSync(); | | void syncDoInitialSync(); | |
| void _syncThread(); | | void _syncThread(); | |
|
| | | bool tryToGoLiveAsASecondary(OpTime&); // readlocks | |
| void syncTail(); | | void syncTail(); | |
| void syncApply(const BSONObj &o); | | void syncApply(const BSONObj &o); | |
|
| | | unsigned _syncRollback(OplogReader& r); | |
| void syncRollback(OplogReader& r); | | void syncRollback(OplogReader& r); | |
| void syncFixUp(HowToFixUp& h, OplogReader& r); | | void syncFixUp(HowToFixUp& h, OplogReader& r); | |
| public: | | public: | |
| void syncThread(); | | void syncThread(); | |
| }; | | }; | |
| | | | |
| class ReplSet : public ReplSetImpl { | | class ReplSet : public ReplSetImpl { | |
| public: | | public: | |
| ReplSet(ReplSetCmdline& replSetCmdline) : ReplSetImpl(replSetCmdlin
e) { } | | ReplSet(ReplSetCmdline& replSetCmdline) : ReplSetImpl(replSetCmdlin
e) { } | |
| | | | |
| | | | |
| skipping to change at line 347 | | skipping to change at line 360 | |
| | | | |
| bool stepDown() { return _stepDown(); } | | bool stepDown() { return _stepDown(); } | |
| | | | |
| string selfFullName() { | | string selfFullName() { | |
| lock lk(this); | | lock lk(this); | |
| return _self->fullName(); | | return _self->fullName(); | |
| } | | } | |
| | | | |
| /* call after constructing to start - returns fairly quickly after
la[unching its threads */ | | /* call after constructing to start - returns fairly quickly after
la[unching its threads */ | |
| void go() { _go(); } | | void go() { _go(); } | |
|
| | | | |
| void fatal() { _fatal(); } | | void fatal() { _fatal(); } | |
|
| bool isPrimary(); | | bool isPrimary() { return box.getState().primary(); } | |
| bool isSecondary(); | | bool isSecondary() { return box.getState().secondary(); } | |
| MemberState state() const { return ReplSetImpl::state(); } | | MemberState state() const { return ReplSetImpl::state(); } | |
| string name() const { return ReplSetImpl::name(); } | | string name() const { return ReplSetImpl::name(); } | |
| const ReplSetConfig& config() { return ReplSetImpl::config(); } | | const ReplSetConfig& config() { return ReplSetImpl::config(); } | |
| void getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) cons
t { _getOplogDiagsAsHtml(server_id,ss); } | | void getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) cons
t { _getOplogDiagsAsHtml(server_id,ss); } | |
| void summarizeAsHtml(stringstream& ss) const { _summarizeAsHtml(ss)
; } | | void summarizeAsHtml(stringstream& ss) const { _summarizeAsHtml(ss)
; } | |
| void summarizeStatus(BSONObjBuilder& b) const { _summarizeStatus(b
); } | | void summarizeStatus(BSONObjBuilder& b) const { _summarizeStatus(b
); } | |
| void fillIsMaster(BSONObjBuilder& b) { _fillIsMaster(b); } | | void fillIsMaster(BSONObjBuilder& b) { _fillIsMaster(b); } | |
| | | | |
| /* we have a new config (reconfig) - apply it. | | /* we have a new config (reconfig) - apply it. | |
| @param comment write a no-op comment to the oplog about it. onl
y makes sense if one is primary and initiating the reconf. | | @param comment write a no-op comment to the oplog about it. onl
y makes sense if one is primary and initiating the reconf. | |
| */ | | */ | |
| void haveNewConfig(ReplSetConfig& c, bool comment); | | void haveNewConfig(ReplSetConfig& c, bool comment); | |
| | | | |
| /* if we delete old configs, this needs to assure locking. currentl
y we don't so it is ok. */ | | /* if we delete old configs, this needs to assure locking. currentl
y we don't so it is ok. */ | |
| const ReplSetConfig& getConfig() { return config(); } | | const ReplSetConfig& getConfig() { return config(); } | |
| | | | |
| bool lockedByMe() { return RSBase::lockedByMe(); } | | bool lockedByMe() { return RSBase::lockedByMe(); } | |
| | | | |
| // heartbeat msg to send to others; descriptive diagnostic info | | // heartbeat msg to send to others; descriptive diagnostic info | |
|
| string hbmsg() const { return _hbmsg; } | | string hbmsg() const { | |
| | | if( time(0)-_hbmsgTime > 120 ) return ""; | |
| | | return _hbmsg; | |
| | | } | |
| }; | | }; | |
| | | | |
| /** base class for repl set commands. checks basic things such as in r
s mode before the command | | /** base class for repl set commands. checks basic things such as in r
s mode before the command | |
| does its real work | | does its real work | |
| */ | | */ | |
| class ReplSetCommand : public Command { | | class ReplSetCommand : public Command { | |
| protected: | | protected: | |
| ReplSetCommand(const char * s, bool show=false) : Command(s, show)
{ } | | ReplSetCommand(const char * s, bool show=false) : Command(s, show)
{ } | |
| virtual bool slaveOk() const { return true; } | | virtual bool slaveOk() const { return true; } | |
| virtual bool adminOnly() const { return true; } | | virtual bool adminOnly() const { return true; } | |
| | | | |
| skipping to change at line 391 | | skipping to change at line 408 | |
| virtual LockType locktype() const { return NONE; } | | virtual LockType locktype() const { return NONE; } | |
| virtual void help( stringstream &help ) const { help << "internal";
} | | virtual void help( stringstream &help ) const { help << "internal";
} | |
| bool check(string& errmsg, BSONObjBuilder& result) { | | bool check(string& errmsg, BSONObjBuilder& result) { | |
| if( !replSet ) { | | if( !replSet ) { | |
| errmsg = "not running with --replSet"; | | errmsg = "not running with --replSet"; | |
| return false; | | return false; | |
| } | | } | |
| if( theReplSet == 0 ) { | | if( theReplSet == 0 ) { | |
| result.append("startupStatus", ReplSet::startupStatus); | | result.append("startupStatus", ReplSet::startupStatus); | |
| errmsg = ReplSet::startupStatusMsg.empty() ? "replset unkno
wn error 2" : ReplSet::startupStatusMsg; | | errmsg = ReplSet::startupStatusMsg.empty() ? "replset unkno
wn error 2" : ReplSet::startupStatusMsg; | |
|
| | | if( ReplSet::startupStatus == 3 ) | |
| | | result.append("info", "run rs.initiate(...) if not yet | |
| | | done for the set"); | |
| return false; | | return false; | |
| } | | } | |
| return true; | | return true; | |
| } | | } | |
| }; | | }; | |
| | | | |
| /** inlines ----------------- */ | | /** inlines ----------------- */ | |
| | | | |
| inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig:
:MemberCfg *c, bool self) : | | inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig:
:MemberCfg *c, bool self) : | |
|
| _config(c), _h(h), _hbinfo(ord) { | | _config(*c), _h(h), _hbinfo(ord) | |
| if( self ) { | | { | |
| _hbinfo.health = 1.0; | | if( self ) | |
| } | | _hbinfo.health = 1.0; | |
| } | | | |
| | | | |
| inline bool ReplSet::isPrimary() { | | | |
| /* todo replset */ | | | |
| return box.getState().primary(); | | | |
| } | | | |
| | | | |
| inline bool ReplSet::isSecondary() { | | | |
| return box.getState().secondary(); | | | |
| } | | } | |
| | | | |
| } | | } | |
| | | | |
End of changes. 23 change blocks. |
| 34 lines changed or deleted | | 47 lines changed or added | |
|
| rs_config.h | | rs_config.h | |
| | | | |
| skipping to change at line 44 | | skipping to change at line 44 | |
| /* if something is misconfigured, throws an exception. | | /* if something is misconfigured, throws an exception. | |
| if couldn't be queried or is just blank, ok() will be false. | | if couldn't be queried or is just blank, ok() will be false. | |
| */ | | */ | |
| ReplSetConfig(const HostAndPort& h); | | ReplSetConfig(const HostAndPort& h); | |
| | | | |
| ReplSetConfig(BSONObj cfg); | | ReplSetConfig(BSONObj cfg); | |
| | | | |
| bool ok() const { return _ok; } | | bool ok() const { return _ok; } | |
| | | | |
| struct MemberCfg { | | struct MemberCfg { | |
|
| MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(fal
se) { } | | MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(fal
se), slaveDelay(0), hidden(false) { } | |
| int _id; /* ordinal */ | | int _id; /* ordinal */ | |
| unsigned votes; /* how many votes this node gets. default
1. */ | | unsigned votes; /* how many votes this node gets. default
1. */ | |
| HostAndPort h; | | HostAndPort h; | |
| double priority; /* 0 means can never be primary */ | | double priority; /* 0 means can never be primary */ | |
| bool arbiterOnly; | | bool arbiterOnly; | |
|
| | | int slaveDelay; /* seconds. int rather than unsigned for | |
| | | convenient to/front bson conversion. */ | |
| | | bool hidden; /* if set, don't advertise to drives in i | |
| | | sMaster. for non-primaries (priority 0) */ | |
| | | | |
| void check() const; /* check validity, assert if not. */ | | void check() const; /* check validity, assert if not. */ | |
| BSONObj asBson() const; | | BSONObj asBson() const; | |
| bool potentiallyHot() const { | | bool potentiallyHot() const { | |
| return !arbiterOnly && priority > 0; | | return !arbiterOnly && priority > 0; | |
| } | | } | |
|
| | | bool operator==(const MemberCfg& r) const { | |
| | | return _id==r._id && votes == r.votes && h == r.h && priori | |
| | | ty == r.priority && | |
| | | arbiterOnly == r.arbiterOnly && slaveDelay == r.slaveDe | |
| | | lay && hidden == r.hidden; | |
| | | } | |
| | | bool operator!=(const MemberCfg& r) const { return !(*this == r | |
| | | ); } | |
| }; | | }; | |
|
| | | | |
| vector<MemberCfg> members; | | vector<MemberCfg> members; | |
| string _id; | | string _id; | |
| int version; | | int version; | |
| HealthOptions ho; | | HealthOptions ho; | |
| string md5; | | string md5; | |
| BSONObj getLastErrorDefaults; | | BSONObj getLastErrorDefaults; | |
| | | | |
| list<HostAndPort> otherMemberHostnames() const; // except self | | list<HostAndPort> otherMemberHostnames() const; // except self | |
| | | | |
| /** @return true if could connect, and there is no cfg object there
at all */ | | /** @return true if could connect, and there is no cfg object there
at all */ | |
| | | | |
| skipping to change at line 71 | | skipping to change at line 80 | |
| BSONObj getLastErrorDefaults; | | BSONObj getLastErrorDefaults; | |
| | | | |
| list<HostAndPort> otherMemberHostnames() const; // except self | | list<HostAndPort> otherMemberHostnames() const; // except self | |
| | | | |
| /** @return true if could connect, and there is no cfg object there
at all */ | | /** @return true if could connect, and there is no cfg object there
at all */ | |
| bool empty() const { return version == EMPTYCONFIG; } | | bool empty() const { return version == EMPTYCONFIG; } | |
| | | | |
| string toString() const { return asBson().toString(); } | | string toString() const { return asBson().toString(); } | |
| | | | |
| /** validate the settings. does not call check() on each member, yo
u have to do that separately. */ | | /** validate the settings. does not call check() on each member, yo
u have to do that separately. */ | |
|
| void check() const; | | void checkRsConfig() const; | |
| | | | |
| /** check if modification makes sense */ | | /** check if modification makes sense */ | |
| static bool legalChange(const ReplSetConfig& old, const ReplSetConf
ig& n, string& errmsg); | | static bool legalChange(const ReplSetConfig& old, const ReplSetConf
ig& n, string& errmsg); | |
| | | | |
| //static void receivedNewConfig(BSONObj); | | //static void receivedNewConfig(BSONObj); | |
| void saveConfigLocally(BSONObj comment); // to local db | | void saveConfigLocally(BSONObj comment); // to local db | |
| string saveConfigEverywhere(); // returns textual info on what happ
ened | | string saveConfigEverywhere(); // returns textual info on what happ
ened | |
| | | | |
| BSONObj asBson() const; | | BSONObj asBson() const; | |
| | | | |
| | | | |
End of changes. 5 change blocks. |
| 2 lines changed or deleted | | 16 lines changed or added | |
|