connections.h | connections.h | |||
---|---|---|---|---|
skipping to change at line 47 | skipping to change at line 47 | |||
} | } | |||
throws exception on connect error (but fine to try again later with a new | throws exception on connect error (but fine to try again later with a new | |||
scopedconn object for same host). | scopedconn object for same host). | |||
*/ | */ | |||
class ScopedConn { | class ScopedConn { | |||
public: | public: | |||
/** throws assertions if connect failure etc. */ | /** throws assertions if connect failure etc. */ | |||
ScopedConn(string hostport); | ScopedConn(string hostport); | |||
~ScopedConn(); | ~ScopedConn(); | |||
DBClientConnection* operator->(); | ||||
/* If we were to run a query and not exhaust the cursor, future use | ||||
of the connection would be problematic. | ||||
So here what we do is wrapper known safe methods and not allow c | ||||
ursor-style queries at all. This makes | ||||
ScopedConn limited in functionality but very safe. More non-cur | ||||
sor wrappers can be added here if needed. | ||||
*/ | ||||
bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj & | ||||
info, int options=0) { | ||||
return conn()->runCommand(dbname, cmd, info, options); | ||||
} | ||||
unsigned long long count(const string &ns) { | ||||
return conn()->count(ns); | ||||
} | ||||
BSONObj findOne(const string &ns, const Query& q, const BSONObj *fi | ||||
eldsToReturn = 0, int queryOptions = 0) { | ||||
return conn()->findOne(ns, q, fieldsToReturn, queryOptions); | ||||
} | ||||
private: | private: | |||
auto_ptr<scoped_lock> connLock; | auto_ptr<scoped_lock> connLock; | |||
static mutex mapMutex; | static mutex mapMutex; | |||
struct X { | struct X { | |||
mutex z; | mutex z; | |||
DBClientConnection cc; | DBClientConnection cc; | |||
X() : z("X"), cc(/*reconnect*/true, 0, /*timeout*/10) { | X() : z("X"), cc(/*reconnect*/ true, 0, | |||
/*timeout*/ theReplSet ? theReplSet->config(). | ||||
ho.heartbeatTimeoutMillis/1000.0 : 10.0) { | ||||
cc._logLevel = 2; | cc._logLevel = 2; | |||
} | } | |||
} *x; | } *x; | |||
typedef map<string,ScopedConn::X*> M; | typedef map<string,ScopedConn::X*> M; | |||
static M& _map; | static M& _map; | |||
DBClientConnection* conn() { return &x->cc; } | ||||
}; | }; | |||
inline ScopedConn::ScopedConn(string hostport) { | inline ScopedConn::ScopedConn(string hostport) { | |||
bool first = false; | bool first = false; | |||
{ | { | |||
scoped_lock lk(mapMutex); | scoped_lock lk(mapMutex); | |||
x = _map[hostport]; | x = _map[hostport]; | |||
if( x == 0 ) { | if( x == 0 ) { | |||
x = _map[hostport] = new X(); | x = _map[hostport] = new X(); | |||
first = true; | first = true; | |||
skipping to change at line 87 | skipping to change at line 104 | |||
// we already locked above... | // we already locked above... | |||
string err; | string err; | |||
x->cc.connect(hostport, err); | x->cc.connect(hostport, err); | |||
} | } | |||
inline ScopedConn::~ScopedConn() { | inline ScopedConn::~ScopedConn() { | |||
// conLock releases... | // conLock releases... | |||
} | } | |||
inline DBClientConnection* ScopedConn::operator->() { | /*inline DBClientConnection* ScopedConn::operator->() { | |||
return &x->cc; | return &x->cc; | |||
} | }*/ | |||
} | } | |||
End of changes. 5 change blocks. | ||||
4 lines changed or deleted | 27 lines changed or added | |||
dbclient.h | dbclient.h | |||
---|---|---|---|---|
skipping to change at line 798 | skipping to change at line 798 | |||
boost::scoped_ptr<SockAddr> server; | boost::scoped_ptr<SockAddr> server; | |||
bool failed; // true if some sort of fatal error has ever happened | bool failed; // true if some sort of fatal error has ever happened | |||
bool autoReconnect; | bool autoReconnect; | |||
time_t lastReconnectTry; | time_t lastReconnectTry; | |||
HostAndPort _server; // remember for reconnects | HostAndPort _server; // remember for reconnects | |||
string _serverString; | string _serverString; | |||
int _port; | int _port; | |||
void _checkConnection(); | void _checkConnection(); | |||
void checkConnection() { if( failed ) _checkConnection(); } | void checkConnection() { if( failed ) _checkConnection(); } | |||
map< string, pair<string,string> > authCache; | map< string, pair<string,string> > authCache; | |||
int _timeout; | double _timeout; | |||
bool _connect( string& errmsg ); | bool _connect( string& errmsg ); | |||
public: | public: | |||
/** | /** | |||
@param _autoReconnect if true, automatically reconnect on a conn ection failure | @param _autoReconnect if true, automatically reconnect on a conn ection failure | |||
@param cp used by DBClientReplicaSet. You do not need to specif y this parameter | @param cp used by DBClientReplicaSet. You do not need to specif y this parameter | |||
@param timeout tcp timeout in seconds - this is for read/write, not connect. | @param timeout tcp timeout in seconds - this is for read/write, not connect. | |||
Connect timeout is fixed, but short, at 5 seconds. | Connect timeout is fixed, but short, at 5 seconds. | |||
*/ | */ | |||
DBClientConnection(bool _autoReconnect=false, DBClientReplicaSet* c p=0, int timeout=0) : | DBClientConnection(bool _autoReconnect=false, DBClientReplicaSet* c p=0, double timeout=0) : | |||
clientSet(cp), failed(false), autoReconnect(_autoReconnect) , lastReconnectTry(0), _timeout(timeout) { } | clientSet(cp), failed(false), autoReconnect(_autoReconnect) , lastReconnectTry(0), _timeout(timeout) { } | |||
/** Connect to a Mongo database server. | /** Connect to a Mongo database server. | |||
If autoReconnect is true, you can try to use the DBClientConnect ion even when | If autoReconnect is true, you can try to use the DBClientConnect ion even when | |||
false was returned -- it will try to connect again. | false was returned -- it will try to connect again. | |||
@param serverHostname host to connect to. can include port numb er ( 127.0.0.1 , 127.0.0.1:5555 ) | @param serverHostname host to connect to. can include port numb er ( 127.0.0.1 , 127.0.0.1:5555 ) | |||
If you use IPv6 you must add a port number ( ::1:27017 ) | If you use IPv6 you must add a port number ( ::1:27017 ) | |||
@param errmsg any relevant error message will appended to the st ring | @param errmsg any relevant error message will appended to the st ring | |||
End of changes. 2 change blocks. | ||||
2 lines changed or deleted | 2 lines changed or added | |||
goodies.h | goodies.h | |||
---|---|---|---|---|
skipping to change at line 652 | skipping to change at line 652 | |||
break; | break; | |||
throw 0; | throw 0; | |||
} | } | |||
x = x * 10 + *p++ - '0'; | x = x * 10 + *p++ - '0'; | |||
} | } | |||
return x; | return x; | |||
} | } | |||
// for convenience, '{' is greater than anything and stops number parsi ng | // for convenience, '{' is greater than anything and stops number parsi ng | |||
inline int lexNumCmp( const char *s1, const char *s2 ) { | inline int lexNumCmp( const char *s1, const char *s2 ) { | |||
//cout << "START : " << s1 << "\t" << s2 << endl; | ||||
while( *s1 && *s2 ) { | while( *s1 && *s2 ) { | |||
bool p1 = ( *s1 == '{' ); | bool p1 = ( *s1 == (char)255 ); | |||
bool p2 = ( *s2 == '{' ); | bool p2 = ( *s2 == (char)255 ); | |||
//cout << "\t\t " << p1 << "\t" << p2 << endl; | ||||
if ( p1 && !p2 ) | if ( p1 && !p2 ) | |||
return 1; | return 1; | |||
if ( p2 && !p1 ) | if ( p2 && !p1 ) | |||
return -1; | return -1; | |||
bool n1 = isNumber( *s1 ); | bool n1 = isNumber( *s1 ); | |||
bool n2 = isNumber( *s2 ); | bool n2 = isNumber( *s2 ); | |||
if ( n1 && n2 ) { | if ( n1 && n2 ) { | |||
char * e1; | // get rid of leading 0s | |||
char * e2; | while ( *s1 == '0' ) s1++; | |||
long l1 = strtol( s1 , &e1 , 10 ); | while ( *s2 == '0' ) s2++; | |||
long l2 = strtol( s2 , &e2 , 10 ); | ||||
if ( l1 > l2 ) | char * e1 = (char*)s1; | |||
char * e2 = (char*)s2; | ||||
// find length | ||||
// if end of string, will break immediately ('\0') | ||||
while ( isNumber (*e1) ) e1++; | ||||
while ( isNumber (*e2) ) e2++; | ||||
int len1 = e1-s1; | ||||
int len2 = e2-s2; | ||||
int result; | ||||
// if one is longer than the other, return | ||||
if ( len1 > len2 ) { | ||||
return 1; | return 1; | |||
else if ( l1 < l2 ) | } | |||
else if ( len2 > len1 ) { | ||||
return -1; | return -1; | |||
} | ||||
// if the lengths are equal, just strcmp | ||||
else if ( (result = strncmp(s1, s2, len1)) != 0 ) { | ||||
return result; | ||||
} | ||||
// otherwise, the numbers are equal | ||||
s1 = e1; | s1 = e1; | |||
s2 = e2; | s2 = e2; | |||
continue; | continue; | |||
} | } | |||
if ( n1 ) | if ( n1 ) | |||
return 1; | return 1; | |||
if ( n2 ) | if ( n2 ) | |||
return -1; | return -1; | |||
End of changes. 7 change blocks. | ||||
8 lines changed or deleted | 29 lines changed or added | |||
message.h | message.h | |||
---|---|---|---|---|
skipping to change at line 101 | skipping to change at line 101 | |||
} | } | |||
}; | }; | |||
class MessagingPort : public AbstractMessagingPort { | class MessagingPort : public AbstractMessagingPort { | |||
public: | public: | |||
MessagingPort(int sock, const SockAddr& farEnd); | MessagingPort(int sock, const SockAddr& farEnd); | |||
// in some cases the timeout will actually be 2x this value - eg we do a partial send, | // in some cases the timeout will actually be 2x this value - eg we do a partial send, | |||
// then the timeout fires, then we try to send again, then the time out fires again with | // then the timeout fires, then we try to send again, then the time out fires again with | |||
// no data sent, then we detect that the other side is down | // no data sent, then we detect that the other side is down | |||
MessagingPort(int timeout = 0, int logLevel = 0 ); | MessagingPort(double timeout = 0, int logLevel = 0 ); | |||
virtual ~MessagingPort(); | virtual ~MessagingPort(); | |||
void shutdown(); | void shutdown(); | |||
bool connect(SockAddr& farEnd); | bool connect(SockAddr& farEnd); | |||
/* it's assumed if you reuse a message object, that it doesn't cros s MessagingPort's. | /* it's assumed if you reuse a message object, that it doesn't cros s MessagingPort's. | |||
also, the Message data will go out of scope on the subsequent re cv call. | also, the Message data will go out of scope on the subsequent re cv call. | |||
*/ | */ | |||
skipping to change at line 136 | skipping to change at line 136 | |||
// recv len or throw SocketException | // recv len or throw SocketException | |||
void recv( char * data , int len ); | void recv( char * data , int len ); | |||
int unsafe_recv( char *buf, int max ); | int unsafe_recv( char *buf, int max ); | |||
private: | private: | |||
int sock; | int sock; | |||
PiggyBackData * piggyBackData; | PiggyBackData * piggyBackData; | |||
public: | public: | |||
SockAddr farEnd; | SockAddr farEnd; | |||
int _timeout; | double _timeout; | |||
int _logLevel; // passed to log() when logging errors | int _logLevel; // passed to log() when logging errors | |||
static void closeAllSockets(unsigned tagMask = 0xffffffff); | static void closeAllSockets(unsigned tagMask = 0xffffffff); | |||
/* ports can be tagged with various classes. see closeAllSockets(t ag). defaults to 0. */ | /* ports can be tagged with various classes. see closeAllSockets(t ag). defaults to 0. */ | |||
unsigned tag; | unsigned tag; | |||
friend class PiggyBackData; | friend class PiggyBackData; | |||
}; | }; | |||
End of changes. 2 change blocks. | ||||
2 lines changed or deleted | 2 lines changed or added | |||
multicmd.h | multicmd.h | |||
---|---|---|---|---|
skipping to change at line 46 | skipping to change at line 46 | |||
class _MultiCommandJob : public BackgroundJob { | class _MultiCommandJob : public BackgroundJob { | |||
public: | public: | |||
BSONObj& cmd; | BSONObj& cmd; | |||
Target& d; | Target& d; | |||
_MultiCommandJob(BSONObj& _cmd, Target& _d) : cmd(_cmd), d(_d) { } | _MultiCommandJob(BSONObj& _cmd, Target& _d) : cmd(_cmd), d(_d) { } | |||
private: | private: | |||
string name() { return "MultiCommandJob"; } | string name() { return "MultiCommandJob"; } | |||
void run() { | void run() { | |||
try { | try { | |||
ScopedConn c(d.toHost); | ScopedConn c(d.toHost); | |||
d.ok = c->runCommand("admin", cmd, d.result); | d.ok = c.runCommand("admin", cmd, d.result); | |||
} | } | |||
catch(DBException&) { | catch(DBException&) { | |||
DEV log() << "dev caught dbexception on multiCommand " << d .toHost << rsLog; | DEV log() << "dev caught dbexception on multiCommand " << d .toHost << rsLog; | |||
} | } | |||
} | } | |||
}; | }; | |||
inline void multiCommand(BSONObj cmd, list<Target>& L) { | inline void multiCommand(BSONObj cmd, list<Target>& L) { | |||
typedef shared_ptr<_MultiCommandJob> P; | typedef shared_ptr<_MultiCommandJob> P; | |||
list<P> jobs; | list<P> jobs; | |||
End of changes. 1 change blocks. | ||||
1 lines changed or deleted | 1 lines changed or added | |||
rs_member.h | rs_member.h | |||
---|---|---|---|---|
skipping to change at line 69 | skipping to change at line 69 | |||
bool operator!=(const MemberState& r) const { return s != r.s; } | bool operator!=(const MemberState& r) const { return s != r.s; } | |||
}; | }; | |||
/* this is supposed to be just basic information on a member, | /* this is supposed to be just basic information on a member, | |||
and copy constructable. */ | and copy constructable. */ | |||
class HeartbeatInfo { | class HeartbeatInfo { | |||
unsigned _id; | unsigned _id; | |||
public: | public: | |||
HeartbeatInfo() : _id(0xffffffff),hbstate(MemberState::RS_UNKNOWN), health(-1.0),downSince(0),skew(INT_MIN) { } | HeartbeatInfo() : _id(0xffffffff),hbstate(MemberState::RS_UNKNOWN), health(-1.0),downSince(0),skew(INT_MIN) { } | |||
HeartbeatInfo(unsigned id); | HeartbeatInfo(unsigned id); | |||
bool up() const { return health > 0; } | ||||
unsigned id() const { return _id; } | unsigned id() const { return _id; } | |||
MemberState hbstate; | MemberState hbstate; | |||
double health; | double health; | |||
time_t upSince; | time_t upSince; | |||
long long downSince; | long long downSince; | |||
time_t lastHeartbeat; | time_t lastHeartbeat; | |||
string lastHeartbeatMsg; | string lastHeartbeatMsg; | |||
OpTime opTime; | OpTime opTime; | |||
int skew; | int skew; | |||
bool up() const { return health > 0; } | ||||
/** health is set to -1 on startup. that means we haven't even che | ||||
cked yet. 0 means we checked and it failed. */ | ||||
bool maybeUp() const { return health != 0; } | ||||
long long timeDown() const; // ms | long long timeDown() const; // ms | |||
/* true if changed in a way of interest to the repl set manager. */ | /* true if changed in a way of interest to the repl set manager. */ | |||
bool changed(const HeartbeatInfo& old) const; | bool changed(const HeartbeatInfo& old) const; | |||
}; | }; | |||
inline HeartbeatInfo::HeartbeatInfo(unsigned id) : _id(id) { | inline HeartbeatInfo::HeartbeatInfo(unsigned id) : _id(id) { | |||
hbstate = MemberState::RS_UNKNOWN; | hbstate = MemberState::RS_UNKNOWN; | |||
health = -1.0; | health = -1.0; | |||
downSince = 0; | downSince = 0; | |||
End of changes. 2 change blocks. | ||||
1 lines changed or deleted | 6 lines changed or added | |||
sock.h | sock.h | |||
---|---|---|---|---|
skipping to change at line 117 | skipping to change at line 117 | |||
if ( setsockopt( sock , SOL_SOCKET, SO_REUSEADDR, &x, sizeof(x)) < 0 ) | if ( setsockopt( sock , SOL_SOCKET, SO_REUSEADDR, &x, sizeof(x)) < 0 ) | |||
out() << "Failed to set socket opt, SO_REUSEADDR" << endl; | out() << "Failed to set socket opt, SO_REUSEADDR" << endl; | |||
} | } | |||
#endif | #endif | |||
inline string makeUnixSockPath(int port){ | inline string makeUnixSockPath(int port){ | |||
return "/tmp/mongodb-" + BSONObjBuilder::numStr(port) + ".sock"; | return "/tmp/mongodb-" + BSONObjBuilder::numStr(port) + ".sock"; | |||
} | } | |||
inline void setSockTimeouts(int sock, int secs) { | inline void setSockTimeouts(int sock, double secs) { | |||
struct timeval tv; | struct timeval tv; | |||
tv.tv_sec = secs; | tv.tv_sec = (int)secs; | |||
tv.tv_usec = 0; | tv.tv_usec = (int)((long long)(secs*1000*1000) % (1000*1000)); | |||
bool report = logLevel > 3; // solaris doesn't provide these | bool report = logLevel > 3; // solaris doesn't provide these | |||
DEV report = true; | DEV report = true; | |||
bool ok = setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char *) &tv, s izeof(tv) ) == 0; | bool ok = setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char *) &tv, s izeof(tv) ) == 0; | |||
if( report && !ok ) log() << "unabled to set SO_RCVTIMEO" << endl; | if( report && !ok ) log() << "unabled to set SO_RCVTIMEO" << endl; | |||
ok = setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *) &tv, sizeof (tv) ) == 0; | ok = setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *) &tv, sizeof (tv) ) == 0; | |||
DEV if( report && !ok ) log() << "unabled to set SO_RCVTIMEO" << en dl; | DEV if( report && !ok ) log() << "unabled to set SO_RCVTIMEO" << en dl; | |||
} | } | |||
// If an ip address is passed in, just return that. If a hostname is p assed | // If an ip address is passed in, just return that. If a hostname is p assed | |||
// in, look up its ip and return that. Returns "" on failure. | // in, look up its ip and return that. Returns "" on failure. | |||
End of changes. 2 change blocks. | ||||
3 lines changed or deleted | 3 lines changed or added | |||