/usr/include/mongo/s/chunk_diff.hpp is in mongodb-dev 1:2.4.9-1ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
// @file chunk_diff_impl.hpp
/**
* Copyright (C) 2008 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "mongo/s/chunk_diff.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/type_chunk.h"
namespace mongo {
template < class ValType, class ShardType >
bool ConfigDiffTracker<ValType,ShardType>::
isOverlapping( const BSONObj& min, const BSONObj& max )
{
    // [min, max) overlaps something we track iff the iterator range of
    // candidate chunks returned by overlappingRange() is non-empty.
    const RangeOverlap bounds = overlappingRange( min, max );
    return bounds.first != bounds.second;
}
template < class ValType, class ShardType >
void ConfigDiffTracker<ValType,ShardType>::
removeOverlapping( const BSONObj& min, const BSONObj& max )
{
    verifyAttached();
    // Erase every currently-tracked range that intersects [min, max).
    const RangeOverlap doomed = overlappingRange( min, max );
    _currMap->erase( doomed.first, doomed.second );
}
// Returns the [low, high) iterator pair over _currMap bounding the tracked
// ranges that may overlap [min, max).  The bound functions used depend on
// whether the map is keyed by each range's min key or its max key.
template < class ValType, class ShardType >
typename ConfigDiffTracker<ValType,ShardType>::RangeOverlap ConfigDiffTracker<ValType,ShardType>::
overlappingRange( const BSONObj& min, const BSONObj& max )
{
verifyAttached();
typename RangeMap::iterator low;
typename RangeMap::iterator high;
if( isMinKeyIndexed() ){
// Returns the first chunk with a min key that is >= min - implies the
// previous chunk cannot overlap min
low = _currMap->lower_bound( min );
// Returns the first chunk with a min key that is >= max - implies the
// chunk does not overlap max
high = _currMap->lower_bound( max );
}
else{
// Returns the first chunk with a max key that is > min - implies that
// the chunk overlaps min
low = _currMap->upper_bound( min );
// Returns the first chunk with a max key that is > max - implies that
// the next chunk cannot overlap max
high = _currMap->upper_bound( max );
}
return RangeOverlap( low, high );
}
// Connects to the config server named by 'config', runs the diff query built
// by configDiffQuery(), and applies the resulting chunk documents via the
// cursor overload of calculateConfigDiff().  Returns that overload's result
// (number of valid diffs applied, or -1 on inconsistency); rethrows any
// DBException with added context.
template < class ValType, class ShardType >
int ConfigDiffTracker<ValType,ShardType>::
calculateConfigDiff( string config,
const set<ChunkVersion>& extraMinorVersions )
{
verifyAttached();
// Get the diff query required
Query diffQuery = configDiffQuery( extraMinorVersions );
scoped_ptr<ScopedDbConnection> conn(
ScopedDbConnection::getInternalScopedDbConnection( config ) );
try {
// Open a cursor for the diff chunks
// Batch size 2 in debug builds exercises the multi-batch (getMore) path.
auto_ptr<DBClientCursor> cursor = conn->get()->query(
ChunkType::ConfigNS, diffQuery, 0, 0, 0, 0, ( DEBUG_BUILD ? 2 : 1000000 ) );
verify( cursor.get() );
int diff = calculateConfigDiff( *cursor.get() );
conn->done();
return diff;
}
catch( DBException& e ){
// Should only happen on connection errors
// NOTE(review): conn->done() is deliberately not called here — presumably
// the ScopedDbConnection destructor discards the (possibly bad) connection.
e.addContext( str::stream() << "could not calculate config difference for ns " << _ns << " on " << config );
throw;
}
}
// Applies chunk documents from 'diffCursor' to the attached range map and
// version trackers.  Returns the number of valid diffs seen, or -1 if an
// inconsistency (bad epoch/version, or overlapping new chunks) is detected,
// signalling the caller to do a full reload.
template < class ValType, class ShardType >
int ConfigDiffTracker<ValType,ShardType>::
calculateConfigDiff( DBClientCursorInterface& diffCursor )
{
verifyAttached();
// Apply the chunk changes to the ranges and versions
//
// Overall idea here is to work in two steps :
// 1. For all the new chunks we find, increment the maximum version per-shard and
// per-collection, and remove any conflicting chunks from the ranges
// 2. For all the new chunks we're interested in (all of them for mongos, just chunks on the
// shard for mongod) add them to the ranges
//
vector<BSONObj> newTracked;
// Store epoch now so it doesn't change when we change max
OID currEpoch = _maxVersion->epoch();
_validDiffs = 0;
while( diffCursor.more() ){
BSONObj diffChunkDoc = diffCursor.next();
ChunkVersion chunkVersion = ChunkVersion::fromBSON(diffChunkDoc, ChunkType::DEPRECATED_lastmod());
// Malformed documents are skipped; only version problems abort the load.
if( diffChunkDoc[ChunkType::min()].type() != Object ||
diffChunkDoc[ChunkType::max()].type() != Object ||
diffChunkDoc[ChunkType::shard()].type() != String )
{
warning() << "got invalid chunk document " << diffChunkDoc
<< " when trying to load differing chunks" << endl;
continue;
}
// An unset version or a different epoch means the collection was dropped
// or resharded underneath us — incremental diffing is no longer valid.
if( ! chunkVersion.isSet() || ! chunkVersion.hasCompatibleEpoch( currEpoch ) ){
warning() << "got invalid chunk version " << chunkVersion << " in document " << diffChunkDoc
<< " when trying to load differing chunks at version "
<< ChunkVersion( _maxVersion->toLong(), currEpoch ) << endl;
// Don't keep loading, since we know we'll be broken here
return -1;
}
_validDiffs++;
// Get max changed version and chunk version
if( chunkVersion > *_maxVersion ) *_maxVersion = chunkVersion;
// Chunk version changes
ShardType shard = shardFor( diffChunkDoc[ChunkType::shard()].String() );
typename map<ShardType, ChunkVersion>::iterator shardVersionIt = _maxShardVersions->find( shard );
if( shardVersionIt == _maxShardVersions->end() || shardVersionIt->second < chunkVersion ){
(*_maxShardVersions)[ shard ] = chunkVersion;
}
// See if we need to remove any chunks we are currently tracking b/c of this chunk's changes
removeOverlapping(diffChunkDoc[ChunkType::min()].Obj(),
diffChunkDoc[ChunkType::max()].Obj());
// Figure out which of the new chunks we need to track
// Important - we need to actually own this doc, in case the cursor decides to getMore or unbuffer
if( isTracked( diffChunkDoc ) ) newTracked.push_back( diffChunkDoc.getOwned() );
}
LOG(3) << "found " << _validDiffs
<< " new chunks for collection " << _ns
<< " (tracking " << newTracked.size()
<< "), new version is " << *_maxVersion
<< endl;
// Step 2: insert the chunks we care about, checking for overlap as we go.
for( vector<BSONObj>::iterator it = newTracked.begin(); it != newTracked.end(); it++ ){
BSONObj chunkDoc = *it;
// Important - we need to make sure we actually own the min and max here
BSONObj min = chunkDoc[ChunkType::min()].Obj().getOwned();
BSONObj max = chunkDoc[ChunkType::max()].Obj().getOwned();
// Invariant enforced by sharding
// It's possible to read inconsistent state b/c of getMore() and yielding, so we want
// to detect as early as possible.
// TODO: This checks for overlap, we also should check for holes here iff we're tracking
// all chunks
if( isOverlapping( min, max ) ) return -1;
_currMap->insert( rangeFor( chunkDoc, min, max ) );
}
return _validDiffs;
}
// Builds the config.chunks query that returns every chunk changed since the
// versions we currently track: chunks at or above the collection-wide max
// version, plus per-shard and explicitly-requested minor-version refreshes.
// Falls back to an unfiltered (full) query when too many $or clauses would
// be needed.  Results are sorted ascending by lastmod (see note below).
template < class ValType, class ShardType >
Query ConfigDiffTracker<ValType,ShardType>::
configDiffQuery( const set<ChunkVersion>& extraMinorVersions ) const
{
verifyAttached();
//
// Basic idea behind the query is to find all the chunks $gt the current max version, and
// then also update chunks that we need minor versions - splits and (2.0) max chunks on
// shards
//
static const int maxMinorVersionClauses = 50;
BSONObjBuilder queryB;
int numStaleMinorClauses = extraMinorVersions.size() + _maxShardVersions->size();
#ifdef _DEBUG
// In debug builds, randomly trigger full reloads to exercise both codepaths
// NOTE(review): guarded by _DEBUG while the sibling method uses DEBUG_BUILD —
// looks inconsistent across compilers; confirm which macro is intended.
if( rand() % 2 ) numStaleMinorClauses = maxMinorVersionClauses;
#endif
queryB.append(ChunkType::ns(), _ns);
//
// If we have only a few minor versions to refresh, we can be more selective in our query
//
if( numStaleMinorClauses < maxMinorVersionClauses ){
//
// Get any version changes higher than we know currently
//
BSONArrayBuilder queryOrB( queryB.subarrayStart( "$or" ) );
{
BSONObjBuilder queryNewB( queryOrB.subobjStart() );
{
BSONObjBuilder ts(queryNewB.subobjStart(ChunkType::DEPRECATED_lastmod()));
// We should *always* pull at least a single chunk back, this lets us quickly
// detect if our collection was unsharded (and most of the time if it was
// resharded) in the meantime
ts.appendTimestamp( "$gte", _maxVersion->toLong() );
ts.done();
}
queryNewB.done();
}
// Get any shard version changes higher than we know currently
// Needed since there could have been a split of the max version chunk of any shard
// TODO: Ideally, we shouldn't care about these
for( typename map<ShardType, ChunkVersion>::const_iterator it = _maxShardVersions->begin(); it != _maxShardVersions->end(); it++ ){
BSONObjBuilder queryShardB( queryOrB.subobjStart() );
queryShardB.append(ChunkType::shard(), nameFrom( it->first ) );
{
BSONObjBuilder ts(queryShardB.subobjStart(ChunkType::DEPRECATED_lastmod()));
ts.appendTimestamp( "$gt", it->second.toLong() );
ts.done();
}
queryShardB.done();
}
// Get any minor version changes we've marked as interesting
// TODO: Ideally we shouldn't care about these
for( set<ChunkVersion>::const_iterator it = extraMinorVersions.begin(); it != extraMinorVersions.end(); it++ ){
BSONObjBuilder queryShardB( queryOrB.subobjStart() );
{
BSONObjBuilder ts(queryShardB.subobjStart(ChunkType::DEPRECATED_lastmod()));
// Bounded above by the next major version: only minor (split) changes
// within the same major version are of interest here.
ts.appendTimestamp( "$gt", it->toLong() );
ts.appendTimestamp( "$lt",
ChunkVersion( it->majorVersion() + 1, 0, OID() ).toLong() );
ts.done();
}
queryShardB.done();
}
queryOrB.done();
}
BSONObj query = queryB.obj();
//
// NOTE: IT IS IMPORTANT FOR CONSISTENCY THAT WE SORT BY ASC VERSION, TO HANDLE
// CURSOR YIELDING BETWEEN CHUNKS BEING MIGRATED.
//
// This ensures that changes to chunk version (which will always be higher) will always
// come *after* our current position in the chunk cursor.
//
// NOTE(review): sort key is the literal "lastmod" rather than
// ChunkType::DEPRECATED_lastmod() used above — presumably identical; verify.
Query queryObj(query);
queryObj.sort(BSON( "lastmod" << 1 ));
LOG(2) << "major version query from " << *_maxVersion << " and over "
<< _maxShardVersions->size() << " shards is " << queryObj << endl;
return queryObj;
}
} // namespace mongo