clientcursor.h   clientcursor.h 
skipping to change at line 103 skipping to change at line 103
recursive_scoped_lock lock( ccmutex ); recursive_scoped_lock lock( ccmutex );
ClientCursor *cursor = ClientCursor::find_inlock( cursorid, true ); ClientCursor *cursor = ClientCursor::find_inlock( cursorid, true );
if ( cursor ) { if ( cursor ) {
uassert( 12051, "clientcursor already in use? driver pr oblem?", uassert( 12051, "clientcursor already in use? driver pr oblem?",
cursor->_pinValue < 100 ); cursor->_pinValue < 100 );
cursor->_pinValue += 100; cursor->_pinValue += 100;
_cursorid = cursorid; _cursorid = cursorid;
} }
} }
void release() { void release() {
if ( _cursorid == INVALID_CURSOR_ID ) {
return;
}
ClientCursor *cursor = c(); ClientCursor *cursor = c();
_cursorid = INVALID_CURSOR_ID; _cursorid = INVALID_CURSOR_ID;
if ( cursor ) { if ( cursor ) {
verify( cursor->_pinValue >= 100 ); verify( cursor->_pinValue >= 100 );
cursor->_pinValue -= 100; cursor->_pinValue -= 100;
} }
} }
~Pin() { DESTRUCTOR_GUARD( release(); ) } ~Pin() { DESTRUCTOR_GUARD( release(); ) }
ClientCursor *c() const { return ClientCursor::find( _cursorid ); } ClientCursor *c() const { return ClientCursor::find( _cursorid ); }
private: private:
 End of changes. 1 change blocks. 
0 lines changed or deleted 3 lines changed or added


 dbclient_rs.h   dbclient_rs.h 
skipping to change at line 450 skipping to change at line 450
virtual void insert( const string &ns, const vector< BSONObj >& v , int flags=0); virtual void insert( const string &ns, const vector< BSONObj >& v , int flags=0);
virtual void remove( const string &ns , Query obj , int flags ); virtual void remove( const string &ns , Query obj , int flags );
virtual void update( const string &ns , Query query , BSONObj obj , int flags ); virtual void update( const string &ns , Query query , BSONObj obj , int flags );
virtual void killCursor( long long cursorID ); virtual void killCursor( long long cursorID );
// ---- access raw connections ---- // ---- access raw connections ----
/**
* WARNING: this method is very dangerous - this object can decide
to free the
* returned master connection any time.
*
* @return the reference to the address that points to the master c
onnection.
*/
DBClientConnection& masterConn(); DBClientConnection& masterConn();
DBClientConnection& slaveConn(); DBClientConnection& slaveConn();
// ---- callback pieces ------- // ---- callback pieces -------
virtual void say( Message &toSend, bool isRetry = false , string* a ctualServer = 0); virtual void say( Message &toSend, bool isRetry = false , string* a ctualServer = 0);
virtual bool recv( Message &toRecv ); virtual bool recv( Message &toRecv );
virtual void checkResponse( const char* data, int nReturned, bool* retry = NULL, string* targetHost = NULL ); virtual void checkResponse( const char* data, int nReturned, bool* retry = NULL, string* targetHost = NULL );
/* this is the callback from our underlying connections to notify u s that we got a "not master" error. /* this is the callback from our underlying connections to notify u s that we got a "not master" error.
 End of changes. 1 change blocks. 
0 lines changed or deleted 8 lines changed or added


 dbclientinterface.h   dbclientinterface.h 
skipping to change at line 632 skipping to change at line 632
For fixed size (capped) collections, this size is the total/max size of the For fixed size (capped) collections, this size is the total/max size of the
collection. collection.
@param capped if true, this is a fixed size collection (where ol d data rolls out). @param capped if true, this is a fixed size collection (where ol d data rolls out).
@param max maximum number of objects if capped (optional). @param max maximum number of objects if capped (optional).
returns true if successful. returns true if successful.
*/ */
bool createCollection(const string &ns, long long size = 0, bool ca pped = false, int max = 0, BSONObj *info = 0); bool createCollection(const string &ns, long long size = 0, bool ca pped = false, int max = 0, BSONObj *info = 0);
/** Get error result from the last write operation (insert/update/d elete) on this connection. /** Get error result from the last write operation (insert/update/d elete) on this connection.
db doesn't change the command's behavior - it is just for auth checks.
@return error message text, or empty string if no error. @return error message text, or empty string if no error.
*/ */
string getLastError(const std::string& db,
bool fsync = false,
bool j = false,
int w = 0,
int wtimeout = 0);
// Same as above but defaults to using admin DB
string getLastError(bool fsync = false, bool j = false, int w = 0, int wtimeout = 0); string getLastError(bool fsync = false, bool j = false, int w = 0, int wtimeout = 0);
/** Get error result from the last write operation (insert/update/d elete) on this connection. /** Get error result from the last write operation (insert/update/d elete) on this connection.
db doesn't change the command's behavior - it is just for auth checks.
@return full error object. @return full error object.
If "w" is -1, wait for propagation to majority of nodes. If "w" is -1, wait for propagation to majority of nodes.
If "wtimeout" is 0, the operation will block indefinitely if ne eded. If "wtimeout" is 0, the operation will block indefinitely if ne eded.
*/ */
virtual BSONObj getLastErrorDetailed(const std::string& db,
bool fsync = false,
bool j = false,
int w = 0,
int wtimeout = 0);
// Same as above but defaults to using admin DB
virtual BSONObj getLastErrorDetailed(bool fsync = false, bool j = f alse, int w = 0, int wtimeout = 0); virtual BSONObj getLastErrorDetailed(bool fsync = false, bool j = f alse, int w = 0, int wtimeout = 0);
/** Can be called with the returned value from getLastErrorDetailed to extract an error string. /** Can be called with the returned value from getLastErrorDetailed to extract an error string.
If all you need is the string, just call getLastError() instead . If all you need is the string, just call getLastError() instead .
*/ */
static string getLastErrorString( const BSONObj& res ); static string getLastErrorString( const BSONObj& res );
/** Return the last error which has occurred, even if not the very last operation. /** Return the last error which has occurred, even if not the very last operation.
@return { err : <error message>, nPrev : <how_many_ops_back_occu rred>, ok : 1 } @return { err : <error message>, nPrev : <how_many_ops_back_occu rred>, ok : 1 }
 End of changes. 4 change blocks. 
0 lines changed or deleted 14 lines changed or added


 dur_recover.h   dur_recover.h 
skipping to change at line 31 skipping to change at line 31
void go(vector<boost::filesystem::path>& files); void go(vector<boost::filesystem::path>& files);
~RecoveryJob(); ~RecoveryJob();
/** @param data data between header and footer. compressed if r ecovering. */ /** @param data data between header and footer. compressed if r ecovering. */
void processSection(const JSectHeader *h, const void *data, uns igned len, const JSectFooter *f); void processSection(const JSectHeader *h, const void *data, uns igned len, const JSectFooter *f);
void close(); // locks and calls _close() void close(); // locks and calls _close()
static RecoveryJob & get() { return _instance; } static RecoveryJob & get() { return _instance; }
private: private:
void write(const ParsedJournalEntry& entry); // actually writes void write(const ParsedJournalEntry& entry, MongoMMF* mmf); //
to the file actually writes to the file
void applyEntry(const ParsedJournalEntry& entry, bool apply, bo void applyEntry(const ParsedJournalEntry& entry, bool apply, bo
ol dump); ol dump, MongoMMF* mmf);
void applyEntries(const vector<ParsedJournalEntry> &entries); void applyEntries(const vector<ParsedJournalEntry> &entries);
bool processFileBuffer(const void *, unsigned len); bool processFileBuffer(const void *, unsigned len);
bool processFile(boost::filesystem::path journalfile); bool processFile(boost::filesystem::path journalfile);
void _close(); // doesn't lock void _close(); // doesn't lock
MongoMMF* getMongoMMF(const ParsedJournalEntry& entry);
list<boost::shared_ptr<MongoMMF> > _mmfs; list<boost::shared_ptr<MongoMMF> > _mmfs;
unsigned long long _lastDataSyncedFromLastRun; unsigned long long _lastDataSyncedFromLastRun;
unsigned long long _lastSeqMentionedInConsoleLog; unsigned long long _lastSeqMentionedInConsoleLog;
public: public:
mongo::mutex _mx; // protects _mmfs mongo::mutex _mx; // protects _mmfs
private: private:
bool _recovering; // are we in recovery or WRITETODATAFILES bool _recovering; // are we in recovery or WRITETODATAFILES
 End of changes. 2 change blocks. 
4 lines changed or deleted 5 lines changed or added


 index.h   index.h 
skipping to change at line 252 skipping to change at line 252
struct IndexChanges { /*on an update*/ struct IndexChanges { /*on an update*/
BSONObjSet oldkeys; BSONObjSet oldkeys;
BSONObjSet newkeys; BSONObjSet newkeys;
vector<BSONObj*> removed; // these keys were removed as part of the change vector<BSONObj*> removed; // these keys were removed as part of the change
vector<BSONObj*> added; // these keys were added as part of the c hange vector<BSONObj*> added; // these keys were added as part of the c hange
/** @curObjLoc - the object we want to add's location. if it is al ready in the /** @curObjLoc - the object we want to add's location. if it is al ready in the
index, that is allowed here (for bg indexing case) . index, that is allowed here (for bg indexing case) .
*/ */
void dupCheck(IndexDetails& idx, DiskLoc curObjLoc) { void dupCheck(IndexDetails& idx, DiskLoc curObjLoc);
if( added.empty() || !idx.unique() )
return;
const Ordering ordering = Ordering::make(idx.keyPattern());
idx.idxInterface().uassertIfDups(idx, added, idx.head, curObjLo
c, ordering); // "E11001 duplicate key on update"
}
}; };
class NamespaceDetails; class NamespaceDetails;
// changedId should be initialized to false // changedId should be initialized to false
void getIndexChanges(vector<IndexChanges>& v, const char *ns, Namespace Details& d, void getIndexChanges(vector<IndexChanges>& v, const char *ns, Namespace Details& d,
BSONObj newObj, BSONObj oldObj, bool &cangedId); BSONObj newObj, BSONObj oldObj, bool &cangedId);
void dupCheck(vector<IndexChanges>& v, NamespaceDetails& d, DiskLoc cur ObjLoc); void dupCheck(vector<IndexChanges>& v, NamespaceDetails& d, DiskLoc cur ObjLoc);
void assureSysIndexesEmptied(const char *ns, IndexDetails *exceptForIdI ndex); void assureSysIndexesEmptied(const char *ns, IndexDetails *exceptForIdI ndex);
int removeFromSysIndexes(const char *ns, const char *idxName); int removeFromSysIndexes(const char *ns, const char *idxName);
 End of changes. 1 change blocks. 
7 lines changed or deleted 1 lines changed or added


 queue.h   queue.h 
skipping to change at line 62 skipping to change at line 62
_getSize(&_getSizeDefault) {} _getSize(&_getSizeDefault) {}
BlockingQueue(size_t size, getSizeFunc f) : BlockingQueue(size_t size, getSizeFunc f) :
_lock("BlockingQueue(custom size)"), _lock("BlockingQueue(custom size)"),
_maxSize(size), _maxSize(size),
_currentSize(0), _currentSize(0),
_getSize(f) {} _getSize(f) {}
void push(T const& t) { void push(T const& t) {
scoped_lock l( _lock ); scoped_lock l( _lock );
size_t tSize = _getSize(t); size_t tSize = _getSize(t);
while (_queue.size()+tSize >= _maxSize) { while (_currentSize + tSize >= _maxSize) {
_cvNoLongerFull.wait( l.boost() ); _cvNoLongerFull.wait( l.boost() );
} }
_queue.push( t ); _queue.push( t );
_currentSize += tSize; _currentSize += tSize;
_cvNoLongerEmpty.notify_one(); _cvNoLongerEmpty.notify_one();
} }
bool empty() const { bool empty() const {
scoped_lock l( _lock ); scoped_lock l( _lock );
return _queue.empty(); return _queue.empty();
 End of changes. 1 change blocks. 
1 lines changed or deleted 1 lines changed or added


 rs.h   rs.h 
skipping to change at line 22 skipping to change at line 22
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details. * GNU Affero General Public License for more details.
* *
* You should have received a copy of the GNU Affero General Public Licen se * You should have received a copy of the GNU Affero General Public Licen se
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#pragma once #pragma once
#include "mongo/db/commands.h" #include "mongo/db/commands.h"
#include "mongo/db/index.h"
#include "mongo/db/oplog.h" #include "mongo/db/oplog.h"
#include "mongo/db/oplogreader.h" #include "mongo/db/oplogreader.h"
#include "mongo/db/repl/rs_config.h" #include "mongo/db/repl/rs_config.h"
#include "mongo/db/repl/rs_exception.h" #include "mongo/db/repl/rs_exception.h"
#include "mongo/db/repl/rs_member.h" #include "mongo/db/repl/rs_member.h"
#include "mongo/db/repl/rs_optime.h" #include "mongo/db/repl/rs_optime.h"
#include "mongo/db/repl/rs_sync.h" #include "mongo/db/repl/rs_sync.h"
#include "mongo/util/concurrency/list.h" #include "mongo/util/concurrency/list.h"
#include "mongo/util/concurrency/msg.h" #include "mongo/util/concurrency/msg.h"
#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/concurrency/thread_pool.h"
skipping to change at line 500 skipping to change at line 501
void startThreads(); void startThreads();
friend class FeedbackThread; friend class FeedbackThread;
friend class CmdReplSetElect; friend class CmdReplSetElect;
friend class Member; friend class Member;
friend class Manager; friend class Manager;
friend class GhostSync; friend class GhostSync;
friend class Consensus; friend class Consensus;
private: private:
bool _syncDoInitialSync_clone( const char *master, const list<strin g>& dbs , bool dataPass ); bool _syncDoInitialSync_clone( const char *master, const list<strin g>& dbs , bool dataPass );
bool _syncDoInitialSync_applyToHead( replset::InitialSync& init, Op logReader* r , bool _syncDoInitialSync_applyToHead( replset::SyncTail& syncer, Opl ogReader* r ,
const Member* source, const BS ONObj& lastOp, const Member* source, const BS ONObj& lastOp,
BSONObj& minValidOut); BSONObj& minValidOut);
void _syncDoInitialSync(); void _syncDoInitialSync();
void syncDoInitialSync(); void syncDoInitialSync();
void _syncThread(); void _syncThread();
void syncTail(); void syncTail();
unsigned _syncRollback(OplogReader& r); unsigned _syncRollback(OplogReader& r);
void syncFixUp(HowToFixUp& h, OplogReader& r); void syncFixUp(HowToFixUp& h, OplogReader& r);
// keep a list of hosts that we've tried recently that didn't work // keep a list of hosts that we've tried recently that didn't work
skipping to change at line 540 skipping to change at line 541
static const int replWriterThreadCount; static const int replWriterThreadCount;
static const int replPrefetcherThreadCount; static const int replPrefetcherThreadCount;
threadpool::ThreadPool& getPrefetchPool() { return _prefetcherPool; } threadpool::ThreadPool& getPrefetchPool() { return _prefetcherPool; }
threadpool::ThreadPool& getWriterPool() { return _writerPool; } threadpool::ThreadPool& getWriterPool() { return _writerPool; }
const ReplSetConfig::MemberCfg& myConfig() const { return _config; } const ReplSetConfig::MemberCfg& myConfig() const { return _config; }
bool tryToGoLiveAsASecondary(OpTime&); // readlocks bool tryToGoLiveAsASecondary(OpTime&); // readlocks
void syncRollback(OplogReader& r); void syncRollback(OplogReader& r);
void syncThread(); void syncThread();
const OpTime lastOtherOpTime() const; const OpTime lastOtherOpTime() const;
static void setMinValid(BSONObj obj);
int oplogVersion;
private: private:
IndexPrefetchConfig _indexPrefetchConfig; IndexPrefetchConfig _indexPrefetchConfig;
}; };
class ReplSet : public ReplSetImpl { class ReplSet : public ReplSetImpl {
public: public:
ReplSet(); ReplSet();
ReplSet(ReplSetCmdline& replSetCmdline); ReplSet(ReplSetCmdline& replSetCmdline);
virtual ~ReplSet() {} virtual ~ReplSet() {}
skipping to change at line 672 skipping to change at line 675
/** inlines ----------------- */ /** inlines ----------------- */
inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig: :MemberCfg *c, bool self) : inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig: :MemberCfg *c, bool self) :
_config(*c), _h(h), _hbinfo(ord) { _config(*c), _h(h), _hbinfo(ord) {
verify(c); verify(c);
if( self ) if( self )
_hbinfo.health = 1.0; _hbinfo.health = 1.0;
} }
inline bool ignoreUniqueIndex(IndexDetails& idx) {
if (!idx.unique()) {
return false;
}
if (!theReplSet) {
return false;
}
// see SERVER-6671
MemberState ms = theReplSet->state();
if (! ((ms == MemberState::RS_STARTUP2) ||
(ms == MemberState::RS_RECOVERING) ||
(ms == MemberState::RS_ROLLBACK))) {
return false;
}
// 2 is the oldest oplog version where operations
// are fully idempotent.
if (theReplSet->oplogVersion < 2) {
return false;
}
// Never ignore _id index
if (idx.isIdIndex()) {
return false;
}
return true;
}
} }
 End of changes. 5 change blocks. 
1 lines changed or deleted 31 lines changed or added


 rs_sync.h   rs_sync.h 
skipping to change at line 23 skipping to change at line 23
* You should have received a copy of the GNU Affero General Public Lice nse * You should have received a copy of the GNU Affero General Public Lice nse
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#pragma once #pragma once
#include <deque> #include <deque>
#include <vector> #include <vector>
#include "mongo/db/client.h" #include "mongo/db/client.h"
#include "mongo/db/dur.h"
#include "mongo/db/jsobj.h" #include "mongo/db/jsobj.h"
#include "mongo/db/oplog.h" #include "mongo/db/oplog.h"
#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/concurrency/thread_pool.h"
namespace mongo { namespace mongo {
namespace replset { namespace replset {
class BackgroundSyncInterface; class BackgroundSyncInterface;
/** /**
* "Normal" replica set syncing * "Normal" replica set syncing
*/ */
class SyncTail : public Sync { class SyncTail : public Sync {
typedef void (*MultiSyncApplyFunc)(const std::vector<BSONObj>& ops, SyncTail* st); typedef void (*MultiSyncApplyFunc)(const std::vector<BSONObj>& ops, SyncTail* st);
public: public:
SyncTail(BackgroundSyncInterface *q); SyncTail(BackgroundSyncInterface *q);
virtual ~SyncTail(); virtual ~SyncTail();
virtual bool syncApply(const BSONObj &o, bool convertUpdateToUpsert = false); virtual bool syncApply(const BSONObj &o, bool convertUpdateToUpsert = false);
/**
* Apply ops from applyGTEObj's ts to at least minValidObj's ts. N
ote that, due to
* batching, this may end up applying ops beyond minValidObj's ts.
*
* @param applyGTEObj the op to start replicating at. This is actu
ally not used except in
* comparision to minValidObj: the background sy
nc thread keeps its own
* record of where we're synced to and starts pr
oviding ops from that
* point.
* @param minValidObj the op to finish syncing at. This function c
annot return (other than
* fatally erroring out) without applying at lea
st this op.
* @param func whether this should use initial sync logic (r
ecloning docs) or
* "normal" logic.
* @return BSONObj the op that was synced to. This may be great
er than minValidObj, as a
* single batch might blow right by minvalid. If
applyGTEObj is the same
* op as minValidObj, this will be applyGTEObj.
*/
BSONObj oplogApplySegment(const BSONObj& applyGTEObj, const BSONObj
& minValidObj,
MultiSyncApplyFunc func);
/**
* Runs oplogApplySegment without allowing recloning documents.
*/
virtual BSONObj oplogApplication(const BSONObj& applyGTEObj, const
BSONObj& minValidObj);
void oplogApplication(); void oplogApplication();
bool peek(BSONObj* obj); bool peek(BSONObj* obj);
class OpQueue { class OpQueue {
public: public:
OpQueue() : _size(0) {} OpQueue() : _size(0) {}
size_t getSize() { return _size; } size_t getSize() { return _size; }
std::deque<BSONObj>& getDeque() { return _deque; } std::deque<BSONObj>& getDeque() { return _deque; }
void push_back(BSONObj& op) { void push_back(BSONObj& op) {
_deque.push_back(op); _deque.push_back(op);
skipping to change at line 72 skipping to change at line 98
// stop waiting and apply the queue we have. Only returns false if !ops.empty(). // stop waiting and apply the queue we have. Only returns false if !ops.empty().
bool tryPopAndWaitForMore(OpQueue* ops); bool tryPopAndWaitForMore(OpQueue* ops);
// After ops have been written to db, call this // After ops have been written to db, call this
// to update local oplog.rs, as well as notify the primary // to update local oplog.rs, as well as notify the primary
// that we have applied the ops. // that we have applied the ops.
// Ops are removed from the deque. // Ops are removed from the deque.
void applyOpsToOplog(std::deque<BSONObj>* ops); void applyOpsToOplog(std::deque<BSONObj>* ops);
protected: protected:
static const unsigned int replBatchSizeBytes = 1024 * 1024 * 256 ; // Cap the batches using the limit on journal commits.
// This works out to be 100 MB (64 bit) or 50 MB (32 bit)
static const unsigned int replBatchLimitBytes = dur::UncommittedByt
esLimit;
static const int replBatchLimitSeconds = 1;
static const unsigned int replBatchLimitOperations = 5000;
// Prefetch and write a deque of operations, using the supplied fun ction. // Prefetch and write a deque of operations, using the supplied fun ction.
// Initial Sync and Sync Tail each use a different function. // Initial Sync and Sync Tail each use a different function.
void multiApply(std::deque<BSONObj>& ops, MultiSyncApplyFunc applyF unc); void multiApply(std::deque<BSONObj>& ops, MultiSyncApplyFunc applyF unc);
// The version of the last op to be read
int oplogVersion;
private: private:
BackgroundSyncInterface* _networkQueue; BackgroundSyncInterface* _networkQueue;
// Doles out all the work to the reader pool threads and waits for them to complete // Doles out all the work to the reader pool threads and waits for them to complete
void prefetchOps(const std::deque<BSONObj>& ops); void prefetchOps(const std::deque<BSONObj>& ops);
// Used by the thread pool readers to prefetch an op // Used by the thread pool readers to prefetch an op
static void prefetchOp(const BSONObj& op); static void prefetchOp(const BSONObj& op);
// Doles out all the work to the writer pool threads and waits for them to complete // Doles out all the work to the writer pool threads and waits for them to complete
void applyOps(const std::vector< std::vector<BSONObj> >& writerVect ors, void applyOps(const std::vector< std::vector<BSONObj> >& writerVect ors,
MultiSyncApplyFunc applyFunc); MultiSyncApplyFunc applyFunc);
void fillWriterVectors(const std::deque<BSONObj>& ops, void fillWriterVectors(const std::deque<BSONObj>& ops,
std::vector< std::vector<BSONObj> >* writerV ectors); std::vector< std::vector<BSONObj> >* writerV ectors);
void handleSlaveDelay(const BSONObj& op); void handleSlaveDelay(const BSONObj& op);
void setOplogVersion(const BSONObj& op);
}; };
/** /**
* Initial clone and sync * Initial clone and sync
*/ */
class InitialSync : public SyncTail { class InitialSync : public SyncTail {
public: public:
virtual ~InitialSync(); virtual ~InitialSync();
InitialSync(BackgroundSyncInterface *q); InitialSync(BackgroundSyncInterface *q);
void oplogApplication(const BSONObj& applyGTEObj, const BSONObj& mi
nValidObj); /**
* Creates the initial oplog entry: applies applyGTEObj and writes
it to the oplog. Then
* this runs oplogApplySegment allowing recloning documents.
*/
BSONObj oplogApplication(const BSONObj& applyGTEObj, const BSONObj&
minValidObj);
}; };
// TODO: move hbmsg into an error-keeping class (SERVER-4444) // TODO: move hbmsg into an error-keeping class (SERVER-4444)
void sethbmsg(const string& s, const int logLevel=0); void sethbmsg(const string& s, const int logLevel=0);
// These free functions are used by the thread pool workers to write op s to the db. // These free functions are used by the thread pool workers to write op s to the db.
void multiSyncApply(const std::vector<BSONObj>& ops, SyncTail* st); void multiSyncApply(const std::vector<BSONObj>& ops, SyncTail* st);
void multiInitialSyncApply(const std::vector<BSONObj>& ops, SyncTail* s t); void multiInitialSyncApply(const std::vector<BSONObj>& ops, SyncTail* s t);
} // namespace replset } // namespace replset
 End of changes. 6 change blocks. 
3 lines changed or deleted 55 lines changed or added


 syncclusterconnection.h   syncclusterconnection.h 
skipping to change at line 93 skipping to change at line 93
virtual bool call( Message &toSend, Message &response, bool assertO k , string * actualServer ); virtual bool call( Message &toSend, Message &response, bool assertO k , string * actualServer );
virtual void say( Message &toSend, bool isRetry = false , string * actualServer = 0 ); virtual void say( Message &toSend, bool isRetry = false , string * actualServer = 0 );
virtual void sayPiggyBack( Message &toSend ); virtual void sayPiggyBack( Message &toSend );
virtual void killCursor( long long cursorID ); virtual void killCursor( long long cursorID );
virtual string getServerAddress() const { return _address; } virtual string getServerAddress() const { return _address; }
virtual bool isFailed() const { return false; } virtual bool isFailed() const { return false; }
virtual string toString() { return _toString(); } virtual string toString() { return _toString(); }
virtual BSONObj getLastErrorDetailed(const std::string& db,
bool fsync=false,
bool j=false,
int w=0,
int wtimeout=0);
virtual BSONObj getLastErrorDetailed(bool fsync=false, bool j=false , int w=0, int wtimeout=0); virtual BSONObj getLastErrorDetailed(bool fsync=false, bool j=false , int w=0, int wtimeout=0);
virtual bool callRead( Message& toSend , Message& response ); virtual bool callRead( Message& toSend , Message& response );
virtual ConnectionString::ConnectionType type() const { return Conn ectionString::SYNC; } virtual ConnectionString::ConnectionType type() const { return Conn ectionString::SYNC; }
void setAllSoTimeouts( double socketTimeout ); void setAllSoTimeouts( double socketTimeout );
double getSoTimeout() const { return _socketTimeout; } double getSoTimeout() const { return _socketTimeout; }
virtual bool auth(const string &dbname, const string &username, con st string &password_text, string& errmsg, bool digestPassword, Auth::Level* level=NULL); virtual bool auth(const string &dbname, const string &username, con st string &password_text, string& errmsg, bool digestPassword, Auth::Level* level=NULL);
 End of changes. 1 change blocks. 
0 lines changed or deleted 5 lines changed or added


 update.h   update.h 
skipping to change at line 61 skipping to change at line 61
UpdateResult updateObjects(const char* ns, UpdateResult updateObjects(const char* ns,
const BSONObj& updateobj, const BSONObj& updateobj,
const BSONObj& pattern, const BSONObj& pattern,
bool upsert, bool upsert,
bool multi, bool multi,
bool logop, bool logop,
OpDebug& debug, OpDebug& debug,
bool fromMigrate = false, bool fromMigrate = false,
const QueryPlanSelectionPolicy& planPolicy = QueryPlanSelectionPolicy::any()); const QueryPlanSelectionPolicy& planPolicy = QueryPlanSelectionPolicy::any());
/*
* Similar to updateObjects but not strict about applying mods that can
fail during initial
* replication.
*
* Reference ticket: SERVER-4781
*/
UpdateResult updateObjectsForReplication(const char* ns,
const BSONObj& updateobj,
const BSONObj& pattern,
bool upsert,
bool multi,
bool logop,
OpDebug& debug,
bool fromMigrate = false,
const QueryPlanSelectionPolicy
& planPolicy =
QueryPlanSelectionPolicy::
any());
UpdateResult _updateObjects(bool su, UpdateResult _updateObjects(bool su,
const char* ns, const char* ns,
const BSONObj& updateobj, const BSONObj& updateobj,
const BSONObj& pattern, const BSONObj& pattern,
bool upsert, bool upsert,
bool multi, bool multi,
bool logop, bool logop,
OpDebug& debug, OpDebug& debug,
RemoveSaver* rs = 0, RemoveSaver* rs = 0,
bool fromMigrate = false, bool fromMigrate = false,
const QueryPlanSelectionPolicy& planPolicy const QueryPlanSelectionPolicy& planPolicy
= QueryPlanSelectionPolicy::any()); = QueryPlanSelectionPolicy::any(),
bool forReplication = false);
/** /**
* takes the from document and returns a new document * takes the from document and returns a new document
* after apply all the operators * after apply all the operators
* e.g. * e.g.
* applyUpdateOperators( BSON( "x" << 1 ) , BSON( "$inc" << BSON( "x" << 1 ) ) ); * applyUpdateOperators( BSON( "x" << 1 ) , BSON( "$inc" << BSON( "x" << 1 ) ) );
* returns: { x : 2 } * returns: { x : 2 }
*/ */
BSONObj applyUpdateOperators( const BSONObj& from, const BSONObj& opera tors ); BSONObj applyUpdateOperators( const BSONObj& from, const BSONObj& opera tors );
 End of changes. 2 change blocks. 
2 lines changed or deleted 23 lines changed or added


 update_internal.h   update_internal.h 
skipping to change at line 48 skipping to change at line 48
// See opFromStr below // See opFromStr below
// 0 1 2 3 4 5 6 7 8 9 10 11 12 13 // 0 1 2 3 4 5 6 7 8 9 10 11 12 13
enum Op { INC, SET, PUSH, PUSH_ALL, PULL, PULL_ALL , POP, UNSET, BI TAND, BITOR , BIT , ADDTOSET, RENAME_FROM, RENAME_TO } op; enum Op { INC, SET, PUSH, PUSH_ALL, PULL, PULL_ALL , POP, UNSET, BI TAND, BITOR , BIT , ADDTOSET, RENAME_FROM, RENAME_TO } op;
static const char* modNames[]; static const char* modNames[];
static unsigned modNamesNum; static unsigned modNamesNum;
const char* fieldName; const char* fieldName;
const char* shortFieldName; const char* shortFieldName;
// Determines if this mod must absoluetly be applied. In some repli
cation scenarios, a
// failed apply of a mod does not constitute an error. In those cas
es, setting strict
// to off would not throw errors.
bool strictApply;
BSONElement elt; // x:5 note: this is the actual element from the u pdateobj BSONElement elt; // x:5 note: this is the actual element from the u pdateobj
boost::shared_ptr<Matcher> matcher; boost::shared_ptr<Matcher> matcher;
bool matcherOnPrimitive; bool matcherOnPrimitive;
void init( Op o , BSONElement& e ) { void init( Op o , BSONElement& e , bool forReplication ) {
op = o; op = o;
elt = e; elt = e;
strictApply = !forReplication;
if ( op == PULL && e.type() == Object ) { if ( op == PULL && e.type() == Object ) {
BSONObj t = e.embeddedObject(); BSONObj t = e.embeddedObject();
if ( t.firstElement().getGtLtOp() == 0 ) { if ( t.firstElement().getGtLtOp() == 0 ) {
matcher.reset( new Matcher( t ) ); matcher.reset( new Matcher( t ) );
matcherOnPrimitive = false; matcherOnPrimitive = false;
} }
else { else {
matcher.reset( new Matcher( BSON( "" << t ) ) ); matcher.reset( new Matcher( BSON( "" << t ) ) );
matcherOnPrimitive = true; matcherOnPrimitive = true;
} }
skipping to change at line 334 skipping to change at line 340
if ( m.isIndexed( idxKeys ) || if ( m.isIndexed( idxKeys ) ||
(backgroundKeys && m.isIndexed(*backgroundKeys)) ) { (backgroundKeys && m.isIndexed(*backgroundKeys)) ) {
_isIndexed++; _isIndexed++;
} }
} }
public: public:
ModSet( const BSONObj& from, ModSet( const BSONObj& from,
const set<string>& idxKeys = set<string>(), const set<string>& idxKeys = set<string>(),
const set<string>* backgroundKeys = 0 ); const set<string>* backgroundKeys = 0,
bool forReplication = false );
/** /**
* re-check if this mod is impacted by indexes * re-check if this mod is impacted by indexes
*/ */
void updateIsIndexed( const set<string>& idxKeys, const set<string> * backgroundKeys ); void updateIsIndexed( const set<string>& idxKeys, const set<string> * backgroundKeys );
// TODO: this is inefficient - should probably just handle when ite rating // TODO: this is inefficient - should probably just handle when ite rating
ModSet * fixDynamicArray( const string& elemMatchKey ) const; ModSet * fixDynamicArray( const string& elemMatchKey ) const;
bool hasDynamicArray() const { return _hasDynamicArray; } bool hasDynamicArray() const { return _hasDynamicArray; }
skipping to change at line 400 skipping to change at line 407
*/ */
class ModState : boost::noncopyable { class ModState : boost::noncopyable {
public: public:
const Mod* m; const Mod* m;
BSONElement old; BSONElement old;
BSONElement newVal; BSONElement newVal;
BSONObj _objData; BSONObj _objData;
const char* fixedOpName; const char* fixedOpName;
BSONElement* fixed; BSONElement* fixed;
int pushStartSize; BSONArray fixedArray;
bool forceEmptyArray;
bool forcePositional;
int position;
int DEPRECATED_pushStartSize;
BSONType incType; BSONType incType;
int incint; int incint;
double incdouble; double incdouble;
long long inclong; long long inclong;
bool dontApply; bool dontApply;
ModState() { ModState() {
fixedOpName = 0; fixedOpName = 0;
fixed = 0; fixed = 0;
pushStartSize = -1; forceEmptyArray = false;
forcePositional = false;
position = 0;
DEPRECATED_pushStartSize = -1;
incType = EOO; incType = EOO;
dontApply = false; dontApply = false;
} }
Mod::Op op() const { Mod::Op op() const {
return m->op; return m->op;
} }
const char* fieldName() const { const char* fieldName() const {
return m->fieldName; return m->fieldName;
} }
bool needOpLogRewrite() const { bool DEPRECATED_needOpLogRewrite() const {
if ( dontApply ) if ( dontApply )
return false; return false;
if ( fixed || fixedOpName || incType ) if ( fixed || fixedOpName || incType )
return true; return true;
switch( op() ) { switch( op() ) {
case Mod::RENAME_FROM: case Mod::RENAME_FROM:
case Mod::RENAME_TO: case Mod::RENAME_TO:
return true; return true;
case Mod::BIT: case Mod::BIT:
case Mod::BITAND: case Mod::BITAND:
case Mod::BITOR: case Mod::BITOR:
// TODO: should we convert this to $set? return true;
return false;
default: default:
return false; return false;
} }
} }
void appendForOpLog( BSONObjBuilder& b ) const; void appendForOpLog( BSONObjBuilder& b ) const;
void apply( BSONBuilderBase& b , BSONElement in ) { void apply( BSONBuilderBase& b , BSONElement in ) {
m->apply( b , in , *this ); m->apply( b , in , *this );
} }
skipping to change at line 519 skipping to change at line 532
if ( ms.dontApply ) { if ( ms.dontApply ) {
return; return;
} }
//const Mod& m = *(ms.m); // HACK //const Mod& m = *(ms.m); // HACK
Mod& m = *((Mod*)(ms.m)); // HACK Mod& m = *((Mod*)(ms.m)); // HACK
switch ( m.op ) { switch ( m.op ) {
case Mod::PUSH: { case Mod::PUSH: {
ms.fixedOpName = "$set";
if ( m.isEach() ) { if ( m.isEach() ) {
b.appendArray( m.shortFieldName, m.getEach() ); BSONObj arr = m.getEach();
b.appendArray( m.shortFieldName, arr );
ms.forceEmptyArray = true;
ms.fixedArray = BSONArray(arr.getOwned());
} else { } else {
BSONObjBuilder arr( b.subarrayStart( m.shortFieldName ) ); BSONObjBuilder arr( b.subarrayStart( m.shortFieldName ) );
arr.appendAs( m.elt, "0" ); arr.appendAs( m.elt, "0" );
arr.done(); ms.forceEmptyArray = true;
ms.fixedArray = BSONArray(arr.done().getOwned());
} }
break; break;
} }
case Mod::ADDTOSET: { case Mod::ADDTOSET: {
ms.fixedOpName = "$set";
if ( m.isEach() ) { if ( m.isEach() ) {
// Remove any duplicates in given array // Remove any duplicates in given array
BSONObjBuilder arr( b.subarrayStart( m.shortFieldName ) ); BSONArrayBuilder arr( b.subarrayStart( m.shortFieldName ) );
BSONElementSet toadd; BSONElementSet toadd;
m.parseEach( toadd ); m.parseEach( toadd );
BSONObjIterator i( m.getEach() ); BSONObjIterator i( m.getEach() );
int n = 0; // int n = 0;
while ( i.more() ) { while ( i.more() ) {
BSONElement e = i.next(); BSONElement e = i.next();
if ( toadd.count(e) ) { if ( toadd.count(e) ) {
arr.appendAs( e , BSONObjBuilder::numStr( n++ ) ); arr.append( e );
toadd.erase( e ); toadd.erase( e );
} }
} }
arr.done(); ms.forceEmptyArray = true;
ms.fixedArray = BSONArray(arr.done().getOwned());
} }
else { else {
BSONObjBuilder arr( b.subarrayStart( m.shortFieldName ) BSONArrayBuilder arr( b.subarrayStart( m.shortFieldName
); ) );
arr.appendAs( m.elt, "0" ); arr.append( m.elt );
arr.done(); ms.forceEmptyArray = true;
ms.fixedArray = BSONArray(arr.done().getOwned());
} }
break; break;
} }
case Mod::PUSH_ALL: { case Mod::PUSH_ALL: {
b.appendAs( m.elt, m.shortFieldName ); b.appendAs( m.elt, m.shortFieldName );
ms.fixedOpName = "$set";
ms.forceEmptyArray = true;
ms.fixedArray = BSONArray(m.elt.Obj());
break; break;
} }
case Mod::UNSET: case Mod::POP:
case Mod::PULL: case Mod::PULL:
case Mod::PULL_ALL: case Mod::PULL_ALL:
// no-op b/c unset/pull of nothing does nothing case Mod::UNSET:
// No-op b/c unset/pull of nothing does nothing. Still, exp
licilty log that
// the target array was reset.
ms.fixedOpName = "$unset";
break; break;
case Mod::INC: case Mod::INC:
ms.fixedOpName = "$set"; ms.fixedOpName = "$set";
case Mod::SET: { case Mod::SET: {
m._checkForAppending( m.elt ); m._checkForAppending( m.elt );
b.appendAs( m.elt, m.shortFieldName ); b.appendAs( m.elt, m.shortFieldName );
break; break;
} }
// shouldn't see RENAME_FROM here // shouldn't see RENAME_FROM here
case Mod::RENAME_TO: case Mod::RENAME_TO:
ms.handleRename( b, m.shortFieldName ); ms.handleRename( b, m.shortFieldName );
break; break;
default: default:
stringstream ss; stringstream ss;
ss << "unknown mod in appendNewFromMod: " << m.op; ss << "unknown mod in appendNewFromMod: " << m.op;
throw UserException( 9015, ss.str() ); throw UserException( 9015, ss.str() );
} }
} }
/** @return true iff the elements aren't eoo(), are distinct, and s hare a field name. */ /** @return true iff the elements aren't eoo(), are distinct, and s hare a field name. */
static bool duplicateFieldName( const BSONElement& a, const BSONEle ment& b ); static bool duplicateFieldName( const BSONElement& a, const BSONEle ment& b );
skipping to change at line 602 skipping to change at line 632
/** /**
* modified underlying _obj * modified underlying _obj
* @param isOnDisk - true means this is an on disk object, and this update needs to be made durable * @param isOnDisk - true means this is an on disk object, and this update needs to be made durable
*/ */
void applyModsInPlace( bool isOnDisk ); void applyModsInPlace( bool isOnDisk );
BSONObj createNewFromMods(); BSONObj createNewFromMods();
// re-writing for oplog // re-writing for oplog
bool needOpLogRewrite() const { bool DEPRECATED_needOpLogRewrite() const {
for ( ModStateHolder::const_iterator i = _mods.begin(); i != _m ods.end(); i++ ) for ( ModStateHolder::const_iterator i = _mods.begin(); i != _m ods.end(); i++ )
if ( i->second->needOpLogRewrite() ) if ( i->second->DEPRECATED_needOpLogRewrite() )
return true; return true;
return false; return false;
} }
BSONObj getOpLogRewrite() const { BSONObj getOpLogRewrite() const {
BSONObjBuilder b; BSONObjBuilder b;
for ( ModStateHolder::const_iterator i = _mods.begin(); i != _m ods.end(); i++ ) for ( ModStateHolder::const_iterator i = _mods.begin(); i != _m ods.end(); i++ )
i->second->appendForOpLog( b ); i->second->appendForOpLog( b );
return b.obj(); return b.obj();
} }
bool haveArrayDepMod() const { bool DEPRECATED_haveArrayDepMod() const {
for ( ModStateHolder::const_iterator i = _mods.begin(); i != _m ods.end(); i++ ) for ( ModStateHolder::const_iterator i = _mods.begin(); i != _m ods.end(); i++ )
if ( i->second->m->arrayDep() ) if ( i->second->m->arrayDep() )
return true; return true;
return false; return false;
} }
void appendSizeSpecForArrayDepMods( BSONObjBuilder& b ) const { void DEPRECATED_appendSizeSpecForArrayDepMods( BSONObjBuilder& b ) const {
for ( ModStateHolder::const_iterator i = _mods.begin(); i != _m ods.end(); i++ ) { for ( ModStateHolder::const_iterator i = _mods.begin(); i != _m ods.end(); i++ ) {
const ModState& m = *i->second; const ModState& m = *i->second;
if ( m.m->arrayDep() ) { if ( m.m->arrayDep() ) {
if ( m.pushStartSize == -1 ) if ( m.DEPRECATED_pushStartSize == -1 )
b.appendNull( m.fieldName() ); b.appendNull( m.fieldName() );
else else
b << m.fieldName() << BSON( "$size" << m.pushStartS ize ); b << m.fieldName() << BSON( "$size" << m.DEPRECATED _pushStartSize );
} }
} }
} }
string toString() const; string toString() const;
friend class ModSet; friend class ModSet;
}; };
} // namespace mongo } // namespace mongo
 End of changes. 29 change blocks. 
25 lines changed or deleted 58 lines changed or added

This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/