| dbclientinterface.h | | dbclientinterface.h | |
| | | | |
| skipping to change at line 632 | | skipping to change at line 632 | |
| For fixed size (capped) collections, this size is
the total/max size of the | | For fixed size (capped) collections, this size is
the total/max size of the | |
| collection. | | collection. | |
| @param capped if true, this is a fixed size collection (where ol
d data rolls out). | | @param capped if true, this is a fixed size collection (where ol
d data rolls out). | |
| @param max maximum number of objects if capped (optional). | | @param max maximum number of objects if capped (optional). | |
| | | | |
| returns true if successful. | | returns true if successful. | |
| */ | | */ | |
| bool createCollection(const string &ns, long long size = 0, bool ca
pped = false, int max = 0, BSONObj *info = 0); | | bool createCollection(const string &ns, long long size = 0, bool ca
pped = false, int max = 0, BSONObj *info = 0); | |
| | | | |
| /** Get error result from the last write operation (insert/update/d
elete) on this connection. | | /** Get error result from the last write operation (insert/update/d
elete) on this connection. | |
|
| | | db doesn't change the command's behavior - it is just for auth
checks. | |
| @return error message text, or empty string if no error. | | @return error message text, or empty string if no error. | |
| */ | | */ | |
|
| | | string getLastError(const std::string& db, | |
| | | bool fsync = false, | |
| | | bool j = false, | |
| | | int w = 0, | |
| | | int wtimeout = 0); | |
| | | // Same as above but defaults to using admin DB | |
| string getLastError(bool fsync = false, bool j = false, int w = 0,
int wtimeout = 0); | | string getLastError(bool fsync = false, bool j = false, int w = 0,
int wtimeout = 0); | |
| | | | |
| /** Get error result from the last write operation (insert/update/d
elete) on this connection. | | /** Get error result from the last write operation (insert/update/d
elete) on this connection. | |
|
| | | db doesn't change the command's behavior - it is just for auth
checks. | |
| @return full error object. | | @return full error object. | |
| | | | |
| If "w" is -1, wait for propagation to majority of nodes. | | If "w" is -1, wait for propagation to majority of nodes. | |
| If "wtimeout" is 0, the operation will block indefinitely if ne
eded. | | If "wtimeout" is 0, the operation will block indefinitely if ne
eded. | |
| */ | | */ | |
|
| | | virtual BSONObj getLastErrorDetailed(const std::string& db, | |
| | | bool fsync = false, | |
| | | bool j = false, | |
| | | int w = 0, | |
| | | int wtimeout = 0); | |
| | | // Same as above but defaults to using admin DB | |
| virtual BSONObj getLastErrorDetailed(bool fsync = false, bool j = f
alse, int w = 0, int wtimeout = 0); | | virtual BSONObj getLastErrorDetailed(bool fsync = false, bool j = f
alse, int w = 0, int wtimeout = 0); | |
| | | | |
| /** Can be called with the returned value from getLastErrorDetailed
to extract an error string. | | /** Can be called with the returned value from getLastErrorDetailed
to extract an error string. | |
| If all you need is the string, just call getLastError() instead
. | | If all you need is the string, just call getLastError() instead
. | |
| */ | | */ | |
| static string getLastErrorString( const BSONObj& res ); | | static string getLastErrorString( const BSONObj& res ); | |
| | | | |
| /** Return the last error which has occurred, even if not the very
last operation. | | /** Return the last error which has occurred, even if not the very
last operation. | |
| | | | |
| @return { err : <error message>, nPrev : <how_many_ops_back_occu
rred>, ok : 1 } | | @return { err : <error message>, nPrev : <how_many_ops_back_occu
rred>, ok : 1 } | |
| | | | |
End of changes. 4 change blocks. |
| 0 lines changed or deleted | | 14 lines changed or added | |
|
| dur_recover.h | | dur_recover.h | |
| | | | |
| skipping to change at line 31 | | skipping to change at line 31 | |
| void go(vector<boost::filesystem::path>& files); | | void go(vector<boost::filesystem::path>& files); | |
| ~RecoveryJob(); | | ~RecoveryJob(); | |
| | | | |
| /** @param data data between header and footer. compressed if r
ecovering. */ | | /** @param data data between header and footer. compressed if r
ecovering. */ | |
| void processSection(const JSectHeader *h, const void *data, uns
igned len, const JSectFooter *f); | | void processSection(const JSectHeader *h, const void *data, uns
igned len, const JSectFooter *f); | |
| | | | |
| void close(); // locks and calls _close() | | void close(); // locks and calls _close() | |
| | | | |
| static RecoveryJob & get() { return _instance; } | | static RecoveryJob & get() { return _instance; } | |
| private: | | private: | |
|
| void write(const ParsedJournalEntry& entry); // actually writes | | void write(const ParsedJournalEntry& entry, MongoMMF* mmf); // | |
| to the file | | actually writes to the file | |
| void applyEntry(const ParsedJournalEntry& entry, bool apply, bo | | void applyEntry(const ParsedJournalEntry& entry, bool apply, bo | |
| ol dump); | | ol dump, MongoMMF* mmf); | |
| void applyEntries(const vector<ParsedJournalEntry> &entries); | | void applyEntries(const vector<ParsedJournalEntry> &entries); | |
| bool processFileBuffer(const void *, unsigned len); | | bool processFileBuffer(const void *, unsigned len); | |
| bool processFile(boost::filesystem::path journalfile); | | bool processFile(boost::filesystem::path journalfile); | |
| void _close(); // doesn't lock | | void _close(); // doesn't lock | |
|
| | | MongoMMF* getMongoMMF(const ParsedJournalEntry& entry); | |
| | | | |
| list<boost::shared_ptr<MongoMMF> > _mmfs; | | list<boost::shared_ptr<MongoMMF> > _mmfs; | |
| | | | |
| unsigned long long _lastDataSyncedFromLastRun; | | unsigned long long _lastDataSyncedFromLastRun; | |
| unsigned long long _lastSeqMentionedInConsoleLog; | | unsigned long long _lastSeqMentionedInConsoleLog; | |
| public: | | public: | |
| mongo::mutex _mx; // protects _mmfs | | mongo::mutex _mx; // protects _mmfs | |
| private: | | private: | |
| bool _recovering; // are we in recovery or WRITETODATAFILES | | bool _recovering; // are we in recovery or WRITETODATAFILES | |
| | | | |
| | | | |
End of changes. 2 change blocks. |
| 4 lines changed or deleted | | 5 lines changed or added | |
|
| index.h | | index.h | |
| | | | |
| skipping to change at line 252 | | skipping to change at line 252 | |
| | | | |
| struct IndexChanges { /*on an update*/ | | struct IndexChanges { /*on an update*/ | |
| BSONObjSet oldkeys; | | BSONObjSet oldkeys; | |
| BSONObjSet newkeys; | | BSONObjSet newkeys; | |
| vector<BSONObj*> removed; // these keys were removed as part of the
change | | vector<BSONObj*> removed; // these keys were removed as part of the
change | |
| vector<BSONObj*> added; // these keys were added as part of the c
hange | | vector<BSONObj*> added; // these keys were added as part of the c
hange | |
| | | | |
| /** @curObjLoc - the object we want to add's location. if it is al
ready in the | | /** @curObjLoc - the object we want to add's location. if it is al
ready in the | |
| index, that is allowed here (for bg indexing case)
. | | index, that is allowed here (for bg indexing case)
. | |
| */ | | */ | |
|
| void dupCheck(IndexDetails& idx, DiskLoc curObjLoc) { | | void dupCheck(IndexDetails& idx, DiskLoc curObjLoc); | |
| if( added.empty() || !idx.unique() ) | | | |
| return; | | | |
| const Ordering ordering = Ordering::make(idx.keyPattern()); | | | |
| idx.idxInterface().uassertIfDups(idx, added, idx.head, curObjLo | | | |
| c, ordering); // "E11001 duplicate key on update" | | | |
| } | | | |
| }; | | }; | |
| | | | |
| class NamespaceDetails; | | class NamespaceDetails; | |
| // changedId should be initialized to false | | // changedId should be initialized to false | |
| void getIndexChanges(vector<IndexChanges>& v, const char *ns, Namespace
Details& d, | | void getIndexChanges(vector<IndexChanges>& v, const char *ns, Namespace
Details& d, | |
| BSONObj newObj, BSONObj oldObj, bool &cangedId); | | BSONObj newObj, BSONObj oldObj, bool &cangedId); | |
| void dupCheck(vector<IndexChanges>& v, NamespaceDetails& d, DiskLoc cur
ObjLoc); | | void dupCheck(vector<IndexChanges>& v, NamespaceDetails& d, DiskLoc cur
ObjLoc); | |
| | | | |
| void assureSysIndexesEmptied(const char *ns, IndexDetails *exceptForIdI
ndex); | | void assureSysIndexesEmptied(const char *ns, IndexDetails *exceptForIdI
ndex); | |
| int removeFromSysIndexes(const char *ns, const char *idxName); | | int removeFromSysIndexes(const char *ns, const char *idxName); | |
| | | | |
End of changes. 1 change blocks. |
| 7 lines changed or deleted | | 1 lines changed or added | |
|
| rs.h | | rs.h | |
| | | | |
| skipping to change at line 22 | | skipping to change at line 22 | |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
| * GNU Affero General Public License for more details. | | * GNU Affero General Public License for more details. | |
| * | | * | |
| * You should have received a copy of the GNU Affero General Public Licen
se | | * You should have received a copy of the GNU Affero General Public Licen
se | |
| * along with this program. If not, see <http://www.gnu.org/licenses/>. | | * along with this program. If not, see <http://www.gnu.org/licenses/>. | |
| */ | | */ | |
| | | | |
| #pragma once | | #pragma once | |
| | | | |
| #include "mongo/db/commands.h" | | #include "mongo/db/commands.h" | |
|
| | | #include "mongo/db/index.h" | |
| #include "mongo/db/oplog.h" | | #include "mongo/db/oplog.h" | |
| #include "mongo/db/oplogreader.h" | | #include "mongo/db/oplogreader.h" | |
| #include "mongo/db/repl/rs_config.h" | | #include "mongo/db/repl/rs_config.h" | |
| #include "mongo/db/repl/rs_exception.h" | | #include "mongo/db/repl/rs_exception.h" | |
| #include "mongo/db/repl/rs_member.h" | | #include "mongo/db/repl/rs_member.h" | |
| #include "mongo/db/repl/rs_optime.h" | | #include "mongo/db/repl/rs_optime.h" | |
| #include "mongo/db/repl/rs_sync.h" | | #include "mongo/db/repl/rs_sync.h" | |
| #include "mongo/util/concurrency/list.h" | | #include "mongo/util/concurrency/list.h" | |
| #include "mongo/util/concurrency/msg.h" | | #include "mongo/util/concurrency/msg.h" | |
| #include "mongo/util/concurrency/thread_pool.h" | | #include "mongo/util/concurrency/thread_pool.h" | |
| | | | |
| skipping to change at line 500 | | skipping to change at line 501 | |
| void startThreads(); | | void startThreads(); | |
| friend class FeedbackThread; | | friend class FeedbackThread; | |
| friend class CmdReplSetElect; | | friend class CmdReplSetElect; | |
| friend class Member; | | friend class Member; | |
| friend class Manager; | | friend class Manager; | |
| friend class GhostSync; | | friend class GhostSync; | |
| friend class Consensus; | | friend class Consensus; | |
| | | | |
| private: | | private: | |
| bool _syncDoInitialSync_clone( const char *master, const list<strin
g>& dbs , bool dataPass ); | | bool _syncDoInitialSync_clone( const char *master, const list<strin
g>& dbs , bool dataPass ); | |
|
| bool _syncDoInitialSync_applyToHead( replset::InitialSync& init, Op
logReader* r , | | bool _syncDoInitialSync_applyToHead( replset::SyncTail& syncer, Opl
ogReader* r , | |
| const Member* source, const BS
ONObj& lastOp, | | const Member* source, const BS
ONObj& lastOp, | |
| BSONObj& minValidOut); | | BSONObj& minValidOut); | |
| void _syncDoInitialSync(); | | void _syncDoInitialSync(); | |
| void syncDoInitialSync(); | | void syncDoInitialSync(); | |
| void _syncThread(); | | void _syncThread(); | |
| void syncTail(); | | void syncTail(); | |
| unsigned _syncRollback(OplogReader& r); | | unsigned _syncRollback(OplogReader& r); | |
| void syncFixUp(HowToFixUp& h, OplogReader& r); | | void syncFixUp(HowToFixUp& h, OplogReader& r); | |
| | | | |
| // keep a list of hosts that we've tried recently that didn't work | | // keep a list of hosts that we've tried recently that didn't work | |
| | | | |
| skipping to change at line 540 | | skipping to change at line 541 | |
| static const int replWriterThreadCount; | | static const int replWriterThreadCount; | |
| static const int replPrefetcherThreadCount; | | static const int replPrefetcherThreadCount; | |
| threadpool::ThreadPool& getPrefetchPool() { return _prefetcherPool;
} | | threadpool::ThreadPool& getPrefetchPool() { return _prefetcherPool;
} | |
| threadpool::ThreadPool& getWriterPool() { return _writerPool; } | | threadpool::ThreadPool& getWriterPool() { return _writerPool; } | |
| | | | |
| const ReplSetConfig::MemberCfg& myConfig() const { return _config;
} | | const ReplSetConfig::MemberCfg& myConfig() const { return _config;
} | |
| bool tryToGoLiveAsASecondary(OpTime&); // readlocks | | bool tryToGoLiveAsASecondary(OpTime&); // readlocks | |
| void syncRollback(OplogReader& r); | | void syncRollback(OplogReader& r); | |
| void syncThread(); | | void syncThread(); | |
| const OpTime lastOtherOpTime() const; | | const OpTime lastOtherOpTime() const; | |
|
| | | static void setMinValid(BSONObj obj); | |
| | | | |
|
| | | int oplogVersion; | |
| private: | | private: | |
| IndexPrefetchConfig _indexPrefetchConfig; | | IndexPrefetchConfig _indexPrefetchConfig; | |
| }; | | }; | |
| | | | |
| class ReplSet : public ReplSetImpl { | | class ReplSet : public ReplSetImpl { | |
| public: | | public: | |
| ReplSet(); | | ReplSet(); | |
| ReplSet(ReplSetCmdline& replSetCmdline); | | ReplSet(ReplSetCmdline& replSetCmdline); | |
| virtual ~ReplSet() {} | | virtual ~ReplSet() {} | |
| | | | |
| | | | |
| skipping to change at line 672 | | skipping to change at line 675 | |
| | | | |
| /** inlines ----------------- */ | | /** inlines ----------------- */ | |
| | | | |
| inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig:
:MemberCfg *c, bool self) : | | inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig:
:MemberCfg *c, bool self) : | |
| _config(*c), _h(h), _hbinfo(ord) { | | _config(*c), _h(h), _hbinfo(ord) { | |
| verify(c); | | verify(c); | |
| if( self ) | | if( self ) | |
| _hbinfo.health = 1.0; | | _hbinfo.health = 1.0; | |
| } | | } | |
| | | | |
|
| | | inline bool ignoreUniqueIndex(IndexDetails& idx) { | |
| | | if (!idx.unique()) { | |
| | | return false; | |
| | | } | |
| | | if (!theReplSet) { | |
| | | return false; | |
| | | } | |
| | | // see SERVER-6671 | |
| | | MemberState ms = theReplSet->state(); | |
| | | if (! ((ms == MemberState::RS_STARTUP2) || | |
| | | (ms == MemberState::RS_RECOVERING) || | |
| | | (ms == MemberState::RS_ROLLBACK))) { | |
| | | return false; | |
| | | } | |
| | | // 2 is the oldest oplog version where operations | |
| | | // are fully idempotent. | |
| | | if (theReplSet->oplogVersion < 2) { | |
| | | return false; | |
| | | } | |
| | | // Never ignore _id index | |
| | | if (idx.isIdIndex()) { | |
| | | return false; | |
| | | } | |
| | | | |
| | | return true; | |
| | | } | |
| | | | |
| } | | } | |
| | | | |
End of changes. 5 change blocks. |
| 1 lines changed or deleted | | 31 lines changed or added | |
|
| rs_sync.h | | rs_sync.h | |
| | | | |
| skipping to change at line 23 | | skipping to change at line 23 | |
| * You should have received a copy of the GNU Affero General Public Lice
nse | | * You should have received a copy of the GNU Affero General Public Lice
nse | |
| * along with this program. If not, see <http://www.gnu.org/licenses/>. | | * along with this program. If not, see <http://www.gnu.org/licenses/>. | |
| */ | | */ | |
| | | | |
| #pragma once | | #pragma once | |
| | | | |
| #include <deque> | | #include <deque> | |
| #include <vector> | | #include <vector> | |
| | | | |
| #include "mongo/db/client.h" | | #include "mongo/db/client.h" | |
|
| | | #include "mongo/db/dur.h" | |
| #include "mongo/db/jsobj.h" | | #include "mongo/db/jsobj.h" | |
| #include "mongo/db/oplog.h" | | #include "mongo/db/oplog.h" | |
| #include "mongo/util/concurrency/thread_pool.h" | | #include "mongo/util/concurrency/thread_pool.h" | |
| | | | |
| namespace mongo { | | namespace mongo { | |
| namespace replset { | | namespace replset { | |
| | | | |
| class BackgroundSyncInterface; | | class BackgroundSyncInterface; | |
| | | | |
| /** | | /** | |
| * "Normal" replica set syncing | | * "Normal" replica set syncing | |
| */ | | */ | |
| class SyncTail : public Sync { | | class SyncTail : public Sync { | |
| typedef void (*MultiSyncApplyFunc)(const std::vector<BSONObj>& ops,
SyncTail* st); | | typedef void (*MultiSyncApplyFunc)(const std::vector<BSONObj>& ops,
SyncTail* st); | |
| public: | | public: | |
| SyncTail(BackgroundSyncInterface *q); | | SyncTail(BackgroundSyncInterface *q); | |
| virtual ~SyncTail(); | | virtual ~SyncTail(); | |
| virtual bool syncApply(const BSONObj &o, bool convertUpdateToUpsert
= false); | | virtual bool syncApply(const BSONObj &o, bool convertUpdateToUpsert
= false); | |
|
| | | | |
| | | /** | |
| | | * Apply ops from applyGTEObj's ts to at least minValidObj's ts. N | |
| | | ote that, due to | |
| | | * batching, this may end up applying ops beyond minValidObj's ts. | |
| | | * | |
| | | * @param applyGTEObj the op to start replicating at. This is actu | |
| | | ally not used except in | |
| | | * comparision to minValidObj: the background sy | |
| | | nc thread keeps its own | |
| | | * record of where we're synced to and starts pr | |
| | | oviding ops from that | |
| | | * point. | |
| | | * @param minValidObj the op to finish syncing at. This function c | |
| | | annot return (other than | |
| | | * fatally erroring out) without applying at lea | |
| | | st this op. | |
| | | * @param func whether this should use initial sync logic (r | |
| | | ecloning docs) or | |
| | | * "normal" logic. | |
| | | * @return BSONObj the op that was synced to. This may be great | |
| | | er than minValidObj, as a | |
| | | * single batch might blow right by minvalid. If | |
| | | applyGTEObj is the same | |
| | | * op as minValidObj, this will be applyGTEObj. | |
| | | */ | |
| | | BSONObj oplogApplySegment(const BSONObj& applyGTEObj, const BSONObj | |
| | | & minValidObj, | |
| | | MultiSyncApplyFunc func); | |
| | | | |
| | | /** | |
| | | * Runs oplogApplySegment without allowing recloning documents. | |
| | | */ | |
| | | virtual BSONObj oplogApplication(const BSONObj& applyGTEObj, const | |
| | | BSONObj& minValidObj); | |
| | | | |
| void oplogApplication(); | | void oplogApplication(); | |
| bool peek(BSONObj* obj); | | bool peek(BSONObj* obj); | |
| | | | |
| class OpQueue { | | class OpQueue { | |
| public: | | public: | |
| OpQueue() : _size(0) {} | | OpQueue() : _size(0) {} | |
| size_t getSize() { return _size; } | | size_t getSize() { return _size; } | |
| std::deque<BSONObj>& getDeque() { return _deque; } | | std::deque<BSONObj>& getDeque() { return _deque; } | |
| void push_back(BSONObj& op) { | | void push_back(BSONObj& op) { | |
| _deque.push_back(op); | | _deque.push_back(op); | |
| | | | |
| skipping to change at line 72 | | skipping to change at line 98 | |
| // stop waiting and apply the queue we have. Only returns false if
!ops.empty(). | | // stop waiting and apply the queue we have. Only returns false if
!ops.empty(). | |
| bool tryPopAndWaitForMore(OpQueue* ops); | | bool tryPopAndWaitForMore(OpQueue* ops); | |
| | | | |
| // After ops have been written to db, call this | | // After ops have been written to db, call this | |
| // to update local oplog.rs, as well as notify the primary | | // to update local oplog.rs, as well as notify the primary | |
| // that we have applied the ops. | | // that we have applied the ops. | |
| // Ops are removed from the deque. | | // Ops are removed from the deque. | |
| void applyOpsToOplog(std::deque<BSONObj>* ops); | | void applyOpsToOplog(std::deque<BSONObj>* ops); | |
| | | | |
| protected: | | protected: | |
|
| static const unsigned int replBatchSizeBytes = 1024 * 1024 * 256 ; | | // Cap the batches using the limit on journal commits. | |
| | | // This works out to be 100 MB (64 bit) or 50 MB (32 bit) | |
| | | static const unsigned int replBatchLimitBytes = dur::UncommittedByt | |
| | | esLimit; | |
| | | static const int replBatchLimitSeconds = 1; | |
| | | static const unsigned int replBatchLimitOperations = 5000; | |
| | | | |
| // Prefetch and write a deque of operations, using the supplied fun
ction. | | // Prefetch and write a deque of operations, using the supplied fun
ction. | |
| // Initial Sync and Sync Tail each use a different function. | | // Initial Sync and Sync Tail each use a different function. | |
| void multiApply(std::deque<BSONObj>& ops, MultiSyncApplyFunc applyF
unc); | | void multiApply(std::deque<BSONObj>& ops, MultiSyncApplyFunc applyF
unc); | |
| | | | |
|
| | | // The version of the last op to be read | |
| | | int oplogVersion; | |
| | | | |
| private: | | private: | |
| BackgroundSyncInterface* _networkQueue; | | BackgroundSyncInterface* _networkQueue; | |
| | | | |
| // Doles out all the work to the reader pool threads and waits for
them to complete | | // Doles out all the work to the reader pool threads and waits for
them to complete | |
| void prefetchOps(const std::deque<BSONObj>& ops); | | void prefetchOps(const std::deque<BSONObj>& ops); | |
| // Used by the thread pool readers to prefetch an op | | // Used by the thread pool readers to prefetch an op | |
| static void prefetchOp(const BSONObj& op); | | static void prefetchOp(const BSONObj& op); | |
| | | | |
| // Doles out all the work to the writer pool threads and waits for
them to complete | | // Doles out all the work to the writer pool threads and waits for
them to complete | |
| void applyOps(const std::vector< std::vector<BSONObj> >& writerVect
ors, | | void applyOps(const std::vector< std::vector<BSONObj> >& writerVect
ors, | |
| MultiSyncApplyFunc applyFunc); | | MultiSyncApplyFunc applyFunc); | |
| | | | |
| void fillWriterVectors(const std::deque<BSONObj>& ops, | | void fillWriterVectors(const std::deque<BSONObj>& ops, | |
| std::vector< std::vector<BSONObj> >* writerV
ectors); | | std::vector< std::vector<BSONObj> >* writerV
ectors); | |
| void handleSlaveDelay(const BSONObj& op); | | void handleSlaveDelay(const BSONObj& op); | |
|
| | | void setOplogVersion(const BSONObj& op); | |
| }; | | }; | |
| | | | |
| /** | | /** | |
| * Initial clone and sync | | * Initial clone and sync | |
| */ | | */ | |
| class InitialSync : public SyncTail { | | class InitialSync : public SyncTail { | |
| public: | | public: | |
| virtual ~InitialSync(); | | virtual ~InitialSync(); | |
| InitialSync(BackgroundSyncInterface *q); | | InitialSync(BackgroundSyncInterface *q); | |
|
| void oplogApplication(const BSONObj& applyGTEObj, const BSONObj& mi | | | |
| nValidObj); | | /** | |
| | | * Creates the initial oplog entry: applies applyGTEObj and writes | |
| | | it to the oplog. Then | |
| | | * this runs oplogApplySegment allowing recloning documents. | |
| | | */ | |
| | | BSONObj oplogApplication(const BSONObj& applyGTEObj, const BSONObj& | |
| | | minValidObj); | |
| }; | | }; | |
| | | | |
| // TODO: move hbmsg into an error-keeping class (SERVER-4444) | | // TODO: move hbmsg into an error-keeping class (SERVER-4444) | |
| void sethbmsg(const string& s, const int logLevel=0); | | void sethbmsg(const string& s, const int logLevel=0); | |
| | | | |
| // These free functions are used by the thread pool workers to write op
s to the db. | | // These free functions are used by the thread pool workers to write op
s to the db. | |
| void multiSyncApply(const std::vector<BSONObj>& ops, SyncTail* st); | | void multiSyncApply(const std::vector<BSONObj>& ops, SyncTail* st); | |
| void multiInitialSyncApply(const std::vector<BSONObj>& ops, SyncTail* s
t); | | void multiInitialSyncApply(const std::vector<BSONObj>& ops, SyncTail* s
t); | |
| | | | |
| } // namespace replset | | } // namespace replset | |
| | | | |
End of changes. 6 change blocks. |
| 3 lines changed or deleted | | 55 lines changed or added | |
|
| syncclusterconnection.h | | syncclusterconnection.h | |
| | | | |
| skipping to change at line 93 | | skipping to change at line 93 | |
| virtual bool call( Message &toSend, Message &response, bool assertO
k , string * actualServer ); | | virtual bool call( Message &toSend, Message &response, bool assertO
k , string * actualServer ); | |
| virtual void say( Message &toSend, bool isRetry = false , string *
actualServer = 0 ); | | virtual void say( Message &toSend, bool isRetry = false , string *
actualServer = 0 ); | |
| virtual void sayPiggyBack( Message &toSend ); | | virtual void sayPiggyBack( Message &toSend ); | |
| | | | |
| virtual void killCursor( long long cursorID ); | | virtual void killCursor( long long cursorID ); | |
| | | | |
| virtual string getServerAddress() const { return _address; } | | virtual string getServerAddress() const { return _address; } | |
| virtual bool isFailed() const { return false; } | | virtual bool isFailed() const { return false; } | |
| virtual string toString() { return _toString(); } | | virtual string toString() { return _toString(); } | |
| | | | |
|
| | | virtual BSONObj getLastErrorDetailed(const std::string& db, | |
| | | bool fsync=false, | |
| | | bool j=false, | |
| | | int w=0, | |
| | | int wtimeout=0); | |
| virtual BSONObj getLastErrorDetailed(bool fsync=false, bool j=false
, int w=0, int wtimeout=0); | | virtual BSONObj getLastErrorDetailed(bool fsync=false, bool j=false
, int w=0, int wtimeout=0); | |
| | | | |
| virtual bool callRead( Message& toSend , Message& response ); | | virtual bool callRead( Message& toSend , Message& response ); | |
| | | | |
| virtual ConnectionString::ConnectionType type() const { return Conn
ectionString::SYNC; } | | virtual ConnectionString::ConnectionType type() const { return Conn
ectionString::SYNC; } | |
| | | | |
| void setAllSoTimeouts( double socketTimeout ); | | void setAllSoTimeouts( double socketTimeout ); | |
| double getSoTimeout() const { return _socketTimeout; } | | double getSoTimeout() const { return _socketTimeout; } | |
| | | | |
| virtual bool auth(const string &dbname, const string &username, con
st string &password_text, string& errmsg, bool digestPassword, Auth::Level*
level=NULL); | | virtual bool auth(const string &dbname, const string &username, con
st string &password_text, string& errmsg, bool digestPassword, Auth::Level*
level=NULL); | |
| | | | |
End of changes. 1 change blocks. |
| 0 lines changed or deleted | | 5 lines changed or added | |
|
| update.h | | update.h | |
| | | | |
| skipping to change at line 61 | | skipping to change at line 61 | |
| UpdateResult updateObjects(const char* ns, | | UpdateResult updateObjects(const char* ns, | |
| const BSONObj& updateobj, | | const BSONObj& updateobj, | |
| const BSONObj& pattern, | | const BSONObj& pattern, | |
| bool upsert, | | bool upsert, | |
| bool multi, | | bool multi, | |
| bool logop, | | bool logop, | |
| OpDebug& debug, | | OpDebug& debug, | |
| bool fromMigrate = false, | | bool fromMigrate = false, | |
| const QueryPlanSelectionPolicy& planPolicy =
QueryPlanSelectionPolicy::any()); | | const QueryPlanSelectionPolicy& planPolicy =
QueryPlanSelectionPolicy::any()); | |
| | | | |
|
| | | /* | |
| | | * Similar to updateObjects but not strict about applying mods that can | |
| | | fail during initial | |
| | | * replication. | |
| | | * | |
| | | * Reference ticket: SERVER-4781 | |
| | | */ | |
| | | UpdateResult updateObjectsForReplication(const char* ns, | |
| | | const BSONObj& updateobj, | |
| | | const BSONObj& pattern, | |
| | | bool upsert, | |
| | | bool multi, | |
| | | bool logop, | |
| | | OpDebug& debug, | |
| | | bool fromMigrate = false, | |
| | | const QueryPlanSelectionPolicy | |
| | | & planPolicy = | |
| | | QueryPlanSelectionPolicy:: | |
| | | any()); | |
| | | | |
| UpdateResult _updateObjects(bool su, | | UpdateResult _updateObjects(bool su, | |
| const char* ns, | | const char* ns, | |
| const BSONObj& updateobj, | | const BSONObj& updateobj, | |
| const BSONObj& pattern, | | const BSONObj& pattern, | |
| bool upsert, | | bool upsert, | |
| bool multi, | | bool multi, | |
| bool logop, | | bool logop, | |
| OpDebug& debug, | | OpDebug& debug, | |
| RemoveSaver* rs = 0, | | RemoveSaver* rs = 0, | |
| bool fromMigrate = false, | | bool fromMigrate = false, | |
|
| const QueryPlanSelectionPolicy& planPolicy | | const QueryPlanSelectionPolicy& planPolicy | |
| = QueryPlanSelectionPolicy::any()); | | = QueryPlanSelectionPolicy::any(), | |
| | | bool forReplication = false); | |
| | | | |
| /** | | /** | |
| * takes the from document and returns a new document | | * takes the from document and returns a new document | |
| * after apply all the operators | | * after apply all the operators | |
| * e.g. | | * e.g. | |
| * applyUpdateOperators( BSON( "x" << 1 ) , BSON( "$inc" << BSON( "x"
<< 1 ) ) ); | | * applyUpdateOperators( BSON( "x" << 1 ) , BSON( "$inc" << BSON( "x"
<< 1 ) ) ); | |
| * returns: { x : 2 } | | * returns: { x : 2 } | |
| */ | | */ | |
| BSONObj applyUpdateOperators( const BSONObj& from, const BSONObj& opera
tors ); | | BSONObj applyUpdateOperators( const BSONObj& from, const BSONObj& opera
tors ); | |
| | | | |
| | | | |
End of changes. 2 change blocks. |
| 2 lines changed or deleted | | 23 lines changed or added | |
|
| update_internal.h | | update_internal.h | |
| | | | |
| skipping to change at line 48 | | skipping to change at line 48 | |
| // See opFromStr below | | // See opFromStr below | |
| // 0 1 2 3 4 5 6 7 8
9 10 11 12 13 | | // 0 1 2 3 4 5 6 7 8
9 10 11 12 13 | |
| enum Op { INC, SET, PUSH, PUSH_ALL, PULL, PULL_ALL , POP, UNSET, BI
TAND, BITOR , BIT , ADDTOSET, RENAME_FROM, RENAME_TO } op; | | enum Op { INC, SET, PUSH, PUSH_ALL, PULL, PULL_ALL , POP, UNSET, BI
TAND, BITOR , BIT , ADDTOSET, RENAME_FROM, RENAME_TO } op; | |
| | | | |
| static const char* modNames[]; | | static const char* modNames[]; | |
| static unsigned modNamesNum; | | static unsigned modNamesNum; | |
| | | | |
| const char* fieldName; | | const char* fieldName; | |
| const char* shortFieldName; | | const char* shortFieldName; | |
| | | | |
|
| | | // Determines if this mod must absoluetly be applied. In some repli | |
| | | cation scenarios, a | |
| | | // failed apply of a mod does not constitute an error. In those cas | |
| | | es, setting strict | |
| | | // to off would not throw errors. | |
| | | bool strictApply; | |
| | | | |
| BSONElement elt; // x:5 note: this is the actual element from the u
pdateobj | | BSONElement elt; // x:5 note: this is the actual element from the u
pdateobj | |
| boost::shared_ptr<Matcher> matcher; | | boost::shared_ptr<Matcher> matcher; | |
| bool matcherOnPrimitive; | | bool matcherOnPrimitive; | |
| | | | |
|
| void init( Op o , BSONElement& e ) { | | void init( Op o , BSONElement& e , bool forReplication ) { | |
| op = o; | | op = o; | |
| elt = e; | | elt = e; | |
|
| | | strictApply = !forReplication; | |
| if ( op == PULL && e.type() == Object ) { | | if ( op == PULL && e.type() == Object ) { | |
| BSONObj t = e.embeddedObject(); | | BSONObj t = e.embeddedObject(); | |
| if ( t.firstElement().getGtLtOp() == 0 ) { | | if ( t.firstElement().getGtLtOp() == 0 ) { | |
| matcher.reset( new Matcher( t ) ); | | matcher.reset( new Matcher( t ) ); | |
| matcherOnPrimitive = false; | | matcherOnPrimitive = false; | |
| } | | } | |
| else { | | else { | |
| matcher.reset( new Matcher( BSON( "" << t ) ) ); | | matcher.reset( new Matcher( BSON( "" << t ) ) ); | |
| matcherOnPrimitive = true; | | matcherOnPrimitive = true; | |
| } | | } | |
| | | | |
| skipping to change at line 334 | | skipping to change at line 340 | |
| if ( m.isIndexed( idxKeys ) || | | if ( m.isIndexed( idxKeys ) || | |
| (backgroundKeys && m.isIndexed(*backgroundKeys)) ) { | | (backgroundKeys && m.isIndexed(*backgroundKeys)) ) { | |
| _isIndexed++; | | _isIndexed++; | |
| } | | } | |
| } | | } | |
| | | | |
| public: | | public: | |
| | | | |
| ModSet( const BSONObj& from, | | ModSet( const BSONObj& from, | |
| const set<string>& idxKeys = set<string>(), | | const set<string>& idxKeys = set<string>(), | |
|
| const set<string>* backgroundKeys = 0 ); | | const set<string>* backgroundKeys = 0, | |
| | | bool forReplication = false ); | |
| | | | |
| /** | | /** | |
| * re-check if this mod is impacted by indexes | | * re-check if this mod is impacted by indexes | |
| */ | | */ | |
| void updateIsIndexed( const set<string>& idxKeys, const set<string>
* backgroundKeys ); | | void updateIsIndexed( const set<string>& idxKeys, const set<string>
* backgroundKeys ); | |
| | | | |
| // TODO: this is inefficient - should probably just handle when ite
rating | | // TODO: this is inefficient - should probably just handle when ite
rating | |
| ModSet * fixDynamicArray( const string& elemMatchKey ) const; | | ModSet * fixDynamicArray( const string& elemMatchKey ) const; | |
| | | | |
| bool hasDynamicArray() const { return _hasDynamicArray; } | | bool hasDynamicArray() const { return _hasDynamicArray; } | |
| | | | |
| skipping to change at line 400 | | skipping to change at line 407 | |
| */ | | */ | |
| class ModState : boost::noncopyable { | | class ModState : boost::noncopyable { | |
| public: | | public: | |
| const Mod* m; | | const Mod* m; | |
| BSONElement old; | | BSONElement old; | |
| BSONElement newVal; | | BSONElement newVal; | |
| BSONObj _objData; | | BSONObj _objData; | |
| | | | |
| const char* fixedOpName; | | const char* fixedOpName; | |
| BSONElement* fixed; | | BSONElement* fixed; | |
|
| int pushStartSize; | | BSONArray fixedArray; | |
| | | bool forceEmptyArray; | |
| | | bool forcePositional; | |
| | | int position; | |
| | | int DEPRECATED_pushStartSize; | |
| | | | |
| BSONType incType; | | BSONType incType; | |
| int incint; | | int incint; | |
| double incdouble; | | double incdouble; | |
| long long inclong; | | long long inclong; | |
| | | | |
| bool dontApply; | | bool dontApply; | |
| | | | |
| ModState() { | | ModState() { | |
| fixedOpName = 0; | | fixedOpName = 0; | |
| fixed = 0; | | fixed = 0; | |
|
| pushStartSize = -1; | | forceEmptyArray = false; | |
| | | forcePositional = false; | |
| | | position = 0; | |
| | | DEPRECATED_pushStartSize = -1; | |
| incType = EOO; | | incType = EOO; | |
| dontApply = false; | | dontApply = false; | |
| } | | } | |
| | | | |
| Mod::Op op() const { | | Mod::Op op() const { | |
| return m->op; | | return m->op; | |
| } | | } | |
| | | | |
| const char* fieldName() const { | | const char* fieldName() const { | |
| return m->fieldName; | | return m->fieldName; | |
| } | | } | |
| | | | |
|
| bool needOpLogRewrite() const { | | bool DEPRECATED_needOpLogRewrite() const { | |
| if ( dontApply ) | | if ( dontApply ) | |
| return false; | | return false; | |
| | | | |
| if ( fixed || fixedOpName || incType ) | | if ( fixed || fixedOpName || incType ) | |
| return true; | | return true; | |
| | | | |
| switch( op() ) { | | switch( op() ) { | |
| case Mod::RENAME_FROM: | | case Mod::RENAME_FROM: | |
| case Mod::RENAME_TO: | | case Mod::RENAME_TO: | |
| return true; | | return true; | |
| case Mod::BIT: | | case Mod::BIT: | |
| case Mod::BITAND: | | case Mod::BITAND: | |
| case Mod::BITOR: | | case Mod::BITOR: | |
|
| // TODO: should we convert this to $set? | | return true; | |
| return false; | | | |
| default: | | default: | |
| return false; | | return false; | |
| } | | } | |
| } | | } | |
| | | | |
| void appendForOpLog( BSONObjBuilder& b ) const; | | void appendForOpLog( BSONObjBuilder& b ) const; | |
| | | | |
| void apply( BSONBuilderBase& b , BSONElement in ) { | | void apply( BSONBuilderBase& b , BSONElement in ) { | |
| m->apply( b , in , *this ); | | m->apply( b , in , *this ); | |
| } | | } | |
| | | | |
| skipping to change at line 519 | | skipping to change at line 532 | |
| if ( ms.dontApply ) { | | if ( ms.dontApply ) { | |
| return; | | return; | |
| } | | } | |
| | | | |
| //const Mod& m = *(ms.m); // HACK | | //const Mod& m = *(ms.m); // HACK | |
| Mod& m = *((Mod*)(ms.m)); // HACK | | Mod& m = *((Mod*)(ms.m)); // HACK | |
| | | | |
| switch ( m.op ) { | | switch ( m.op ) { | |
| | | | |
| case Mod::PUSH: { | | case Mod::PUSH: { | |
|
| | | ms.fixedOpName = "$set"; | |
| if ( m.isEach() ) { | | if ( m.isEach() ) { | |
|
| b.appendArray( m.shortFieldName, m.getEach() ); | | BSONObj arr = m.getEach(); | |
| | | b.appendArray( m.shortFieldName, arr ); | |
| | | ms.forceEmptyArray = true; | |
| | | ms.fixedArray = BSONArray(arr.getOwned()); | |
| } else { | | } else { | |
| BSONObjBuilder arr( b.subarrayStart( m.shortFieldName )
); | | BSONObjBuilder arr( b.subarrayStart( m.shortFieldName )
); | |
| arr.appendAs( m.elt, "0" ); | | arr.appendAs( m.elt, "0" ); | |
|
| arr.done(); | | ms.forceEmptyArray = true; | |
| | | ms.fixedArray = BSONArray(arr.done().getOwned()); | |
| } | | } | |
| break; | | break; | |
| } | | } | |
|
| | | | |
| case Mod::ADDTOSET: { | | case Mod::ADDTOSET: { | |
|
| | | ms.fixedOpName = "$set"; | |
| if ( m.isEach() ) { | | if ( m.isEach() ) { | |
| // Remove any duplicates in given array | | // Remove any duplicates in given array | |
|
| BSONObjBuilder arr( b.subarrayStart( m.shortFieldName )
); | | BSONArrayBuilder arr( b.subarrayStart( m.shortFieldName
) ); | |
| BSONElementSet toadd; | | BSONElementSet toadd; | |
| m.parseEach( toadd ); | | m.parseEach( toadd ); | |
| BSONObjIterator i( m.getEach() ); | | BSONObjIterator i( m.getEach() ); | |
|
| int n = 0; | | // int n = 0; | |
| while ( i.more() ) { | | while ( i.more() ) { | |
| BSONElement e = i.next(); | | BSONElement e = i.next(); | |
| if ( toadd.count(e) ) { | | if ( toadd.count(e) ) { | |
|
| arr.appendAs( e , BSONObjBuilder::numStr( n++ )
); | | arr.append( e ); | |
| toadd.erase( e ); | | toadd.erase( e ); | |
| } | | } | |
| } | | } | |
|
| arr.done(); | | ms.forceEmptyArray = true; | |
| | | ms.fixedArray = BSONArray(arr.done().getOwned()); | |
| } | | } | |
| else { | | else { | |
|
| BSONObjBuilder arr( b.subarrayStart( m.shortFieldName ) | | BSONArrayBuilder arr( b.subarrayStart( m.shortFieldName | |
| ); | | ) ); | |
| arr.appendAs( m.elt, "0" ); | | arr.append( m.elt ); | |
| arr.done(); | | ms.forceEmptyArray = true; | |
| | | ms.fixedArray = BSONArray(arr.done().getOwned()); | |
| } | | } | |
| break; | | break; | |
| } | | } | |
| | | | |
| case Mod::PUSH_ALL: { | | case Mod::PUSH_ALL: { | |
| b.appendAs( m.elt, m.shortFieldName ); | | b.appendAs( m.elt, m.shortFieldName ); | |
|
| | | ms.fixedOpName = "$set"; | |
| | | ms.forceEmptyArray = true; | |
| | | ms.fixedArray = BSONArray(m.elt.Obj()); | |
| break; | | break; | |
| } | | } | |
| | | | |
|
| case Mod::UNSET: | | case Mod::POP: | |
| case Mod::PULL: | | case Mod::PULL: | |
| case Mod::PULL_ALL: | | case Mod::PULL_ALL: | |
|
| // no-op b/c unset/pull of nothing does nothing | | case Mod::UNSET: | |
| | | // No-op b/c unset/pull of nothing does nothing. Still, exp | |
| | | licilty log that | |
| | | // the target array was reset. | |
| | | ms.fixedOpName = "$unset"; | |
| break; | | break; | |
| | | | |
| case Mod::INC: | | case Mod::INC: | |
| ms.fixedOpName = "$set"; | | ms.fixedOpName = "$set"; | |
| case Mod::SET: { | | case Mod::SET: { | |
| m._checkForAppending( m.elt ); | | m._checkForAppending( m.elt ); | |
| b.appendAs( m.elt, m.shortFieldName ); | | b.appendAs( m.elt, m.shortFieldName ); | |
| break; | | break; | |
| } | | } | |
|
| | | | |
| // shouldn't see RENAME_FROM here | | // shouldn't see RENAME_FROM here | |
| case Mod::RENAME_TO: | | case Mod::RENAME_TO: | |
| ms.handleRename( b, m.shortFieldName ); | | ms.handleRename( b, m.shortFieldName ); | |
| break; | | break; | |
|
| | | | |
| default: | | default: | |
| stringstream ss; | | stringstream ss; | |
| ss << "unknown mod in appendNewFromMod: " << m.op; | | ss << "unknown mod in appendNewFromMod: " << m.op; | |
| throw UserException( 9015, ss.str() ); | | throw UserException( 9015, ss.str() ); | |
| } | | } | |
| | | | |
| } | | } | |
| | | | |
| /** @return true iff the elements aren't eoo(), are distinct, and s
hare a field name. */ | | /** @return true iff the elements aren't eoo(), are distinct, and s
hare a field name. */ | |
| static bool duplicateFieldName( const BSONElement& a, const BSONEle
ment& b ); | | static bool duplicateFieldName( const BSONElement& a, const BSONEle
ment& b ); | |
| | | | |
| skipping to change at line 602 | | skipping to change at line 632 | |
| /** | | /** | |
| * modified underlying _obj | | * modified underlying _obj | |
| * @param isOnDisk - true means this is an on disk object, and this
update needs to be made durable | | * @param isOnDisk - true means this is an on disk object, and this
update needs to be made durable | |
| */ | | */ | |
| void applyModsInPlace( bool isOnDisk ); | | void applyModsInPlace( bool isOnDisk ); | |
| | | | |
| BSONObj createNewFromMods(); | | BSONObj createNewFromMods(); | |
| | | | |
| // re-writing for oplog | | // re-writing for oplog | |
| | | | |
|
| bool needOpLogRewrite() const { | | bool DEPRECATED_needOpLogRewrite() const { | |
| for ( ModStateHolder::const_iterator i = _mods.begin(); i != _m
ods.end(); i++ ) | | for ( ModStateHolder::const_iterator i = _mods.begin(); i != _m
ods.end(); i++ ) | |
|
| if ( i->second->needOpLogRewrite() ) | | if ( i->second->DEPRECATED_needOpLogRewrite() ) | |
| return true; | | return true; | |
| return false; | | return false; | |
| } | | } | |
| | | | |
| BSONObj getOpLogRewrite() const { | | BSONObj getOpLogRewrite() const { | |
| BSONObjBuilder b; | | BSONObjBuilder b; | |
| for ( ModStateHolder::const_iterator i = _mods.begin(); i != _m
ods.end(); i++ ) | | for ( ModStateHolder::const_iterator i = _mods.begin(); i != _m
ods.end(); i++ ) | |
| i->second->appendForOpLog( b ); | | i->second->appendForOpLog( b ); | |
| return b.obj(); | | return b.obj(); | |
| } | | } | |
| | | | |
|
| bool haveArrayDepMod() const { | | bool DEPRECATED_haveArrayDepMod() const { | |
| for ( ModStateHolder::const_iterator i = _mods.begin(); i != _m
ods.end(); i++ ) | | for ( ModStateHolder::const_iterator i = _mods.begin(); i != _m
ods.end(); i++ ) | |
| if ( i->second->m->arrayDep() ) | | if ( i->second->m->arrayDep() ) | |
| return true; | | return true; | |
| return false; | | return false; | |
| } | | } | |
| | | | |
|
| void appendSizeSpecForArrayDepMods( BSONObjBuilder& b ) const { | | void DEPRECATED_appendSizeSpecForArrayDepMods( BSONObjBuilder& b )
const { | |
| for ( ModStateHolder::const_iterator i = _mods.begin(); i != _m
ods.end(); i++ ) { | | for ( ModStateHolder::const_iterator i = _mods.begin(); i != _m
ods.end(); i++ ) { | |
| const ModState& m = *i->second; | | const ModState& m = *i->second; | |
| if ( m.m->arrayDep() ) { | | if ( m.m->arrayDep() ) { | |
|
| if ( m.pushStartSize == -1 ) | | if ( m.DEPRECATED_pushStartSize == -1 ) | |
| b.appendNull( m.fieldName() ); | | b.appendNull( m.fieldName() ); | |
| else | | else | |
|
| b << m.fieldName() << BSON( "$size" << m.pushStartS
ize ); | | b << m.fieldName() << BSON( "$size" << m.DEPRECATED
_pushStartSize ); | |
| } | | } | |
| } | | } | |
| } | | } | |
| | | | |
| string toString() const; | | string toString() const; | |
| | | | |
| friend class ModSet; | | friend class ModSet; | |
| }; | | }; | |
| | | | |
| } // namespace mongo | | } // namespace mongo | |
| | | | |
End of changes. 29 change blocks. |
| 25 lines changed or deleted | | 58 lines changed or added | |
|