Mongodb源码分析--插入记录及索引B树构建

>

  在之前的一篇文章 中,介绍了assembleResponse函数(位于instance.cpp第224行),它会根据op操作枚举类型来调用相应的crud操作,枚举类型定义如下:

     
enum
 Operations {
        opReply 

=
 
1
,     
/*
 reply. responseTo is set. 
*/

        dbMsg 

=
 
1000
,    
/*
 generic msg command followed by a string 
*/

        dbUpdate 

=
 
2001

/*
 update object 
*/

        dbInsert 

=
 
2002
,
        

//
dbGetByOID = 2003,


        dbQuery 
=
 
2004
,
        dbGetMore 

=
 
2005
,
        dbDelete 

=
 
2006
,
        dbKillCursors 

=
 
2007

    };

    可以看到dbInsert = 2002 为插入操作枚举值,下面我们看一下assembleResponse在确定是插入操作时调用的方法,如下:

assembleResponse( Message 
&
m, DbResponse 
&
dbresponse, 
const
 SockAddr 
&
client ) {
    …..
            

try
 {
                

if
 ( op 
==
 dbInsert ) {  
//
添加记录操作


                    receivedInsert(m, currentOp);
                }
                

else
 
if
 ( op 
==
 dbUpdate ) { 
//
更新记录


                    receivedUpdate(m, currentOp);
                }
                

else
 
if
 ( op 
==
 dbDelete ) { 
//
删除记录


                    receivedDelete(m, currentOp);
                }
                

else
 
if
 ( op 
==
 dbKillCursors ) { 
//
删除Cursors(游标)对象


                    currentOp.ensureStarted();
                    logThreshold 

=
 
10
;
                    ss 

<<
 

killcursors 

;
                    receivedKillCursors(m);
                }
                

else
 {
                    mongo::log() 

<<
 

    operation isn’t supported: 

 
<<
 op 
<<
 endl;
                    currentOp.done();
                    log 

=
 
true
;
                }
            }
          …..
        }
    }

        从上面代码可以看出,系统在确定dbInsert操作时,调用了receivedInsert()方法(位于instance.cpp文件第570行),下面是该方法的定义:

void
 receivedInsert(Message
&
 m, CurOp
&
 op) {
        DbMessage d(m);

//
初始化数据库格式的消息


        
const
 
char
 
*
ns 
=
 d.getns();
//
获取名空间,用于接下来insert数据


        assert(
*
ns);
        uassert( 

10058
 ,  

not master

, isMasterNs( ns ) );
        op.debug().str 

<<
 ns;

        writelock lk(ns);
//
声明写锁


        
        

if
 ( handlePossibleShardedMessage( m , 
0
 ) )
//
查看是不是sharding信息,如果是则处理


            
return
;

        Client::Context ctx(ns);
        
int
 n 
=
 
0
;
        

while
 ( d.moreJSObjs() ) { 
//
循环获取当前消息体中的BSONObj数据(数据库记录)


            BSONObj js 
=
 d.nextJsObj();
            uassert( 

10059
 , 

object to insert too large

, js.objsize() 
<=
 BSONObjMaxUserSize);
            {
                

//
 声明BSONObj迭代器,以查看里面元素是否有更新操作,如set inc push pull 等


                BSONObjIterator i( js );
                

while
 ( i.more() ) {
                    BSONElement e 

=
 i.next();
                    uassert( 

13511
 , 

object to insert can’t have $ modifiers

 , e.fieldName()[
0

!=
 

$

 );
                }
            }
            

//
插入记录操作,god = false用于标识当前BSONObj对象为有效数据


            theDataFileMgr.insertWithObjMod(ns, js, 
false
);
            logOp(


i

, ns, js);
//
日志操作,包括master状态下及sharding分片情况



            

if

++

%
 
4
 
==
 
0
 ) {
                

//
 在插入一些数据后,进行持久化操作,有关持久化部分参见我的这篇文章
                

//
 
http://www.cnblogs.com/daizhj/archive/2011/03/21/1990344.html


                getDur().commitIfNeeded();
            }
        }
        globalOpCounters.incInsertInWriteLock(n);

//
在写锁环境下添加已插入记录数(n),锁采用InterlockedIncrement实现数的原子性


    }

      上面的方法中,主要是在“写锁”环境下执行插入数据操作,并且在插入记录之前进行简单的数据对象检查,如长度和插入数据是否被修改,以确保数据的最终有效性。
      最终上面代码会调用 insertWithObjMod()方法(位于pdfile.cpp 文件第1432行),该方法定义如下:

   DiskLoc DataFileMgr::insertWithObjMod(
const
 
char
 
*
ns, BSONObj 
&
o, 
bool
 god) {
        DiskLoc loc 

=
 insert( ns, o.objdata(), o.objsize(), god );
        

if
 ( 
!
loc.isNull() )
//
判断返回记录地址是否为空(记录是否插入成功)


            o 
=
 BSONObj( loc.rec() );
//
如有效,则用记录地地址上的记录(record类型指针)绑定到o上


        
return
 loc;
   }

      该方法只是一个对插入操作及返回结果的封装,其中ns为数据对象的名空间,o就是要插入的数据对象(BSONObj),god用于标识当前BSONObj 对象是否为有效数据(false=有效),这里之所以要传入god这个参数,是因为在接下来的insert方法里同时支持添加名空间(及索引)和插入记录 操作(都会不断调用该方法),而在添加名空间时god=true。
    
     下面我们看一下insert方法(pdfile.cpp 第1467行),因为其内容较长,请详见注释:

DiskLoc DataFileMgr::insert(
const
 
char
 
*
ns, 
const
 
void
 
*
obuf, 
int
 len, 
bool
 god, 
const
 BSONElement 
&
writeId, 
bool
 mayAddIndex) {
        

bool
 wouldAddIndex 
=
 
false
;
        massert( 

10093
 , 

cannot insert into reserved $ collection

, god 
||
 isANormalNSName( ns ) );
        uassert( 

10094
 , str::stream() 
<<
 

invalid ns: 

 
<<
 ns , isValidNS( ns ) );
        

const
 
char
 
*
sys 
=
 strstr(ns, 

system.

);
        

if
 ( sys ) {
//
对插入记录的ns进行判断,是否要插入保留的数据库名(system),如是则停止执行其它代码


            uassert( 
10095
 , 

attempt to insert in reserved database name ‘system’

, sys 
!=
 ns);
            

if
 ( strstr(ns, 

.system.

) ) {
                

//
 later:check for dba-type permissions here if have that at some point separate


                
if
 ( strstr(ns, 

.system.indexes

 ) )
//
判断是否创建索引


                    wouldAddIndex 
=
 
true
;
                

else
 
if
 ( legalClientSystemNS( ns , 
true
 ) )
                    ;
                

else
 
if
 ( 
!
god ) {
//
表示obuf有数据,但这就意味着要向system下插入数据(把system当成数据表了)


                    
out
() 
<<
 

ERROR: attempt to insert in system namespace 

 
<<
 ns 
<<
 endl;
                    

return
 DiskLoc();
                }
            }
            

else

                sys 

=
 
0
;
        }

        
bool
 addIndex 
=
 wouldAddIndex 
&&
 mayAddIndex;
//
判断是否需要添加索引



        NamespaceDetails 

*

=
 nsdetails(ns);
//
获取ns的详细信息


        
if
 ( d 
==
 
0
 ) {
            addNewNamespaceToCatalog(ns);

//
向system catalog添加新的名空间,它会再次调用当前insert()方法


            
/*
 todo: shouldn’t be in the namespace catalog until after the allocations here work.
               also if this is an addIndex, those checks should happen before this!
            

*/

            

//
 创建第一个数据库文件.


            cc().database()
->
allocExtent(ns, Extent::initialSize(len), 
false
);
            d 

=
 nsdetails(ns);
            

if
 ( 
!
god )
                ensureIdIndexForNewNs(ns);
        }
        d

->
paddingFits();

        NamespaceDetails 
*
tableToIndex 
=
 
0
;

        
string
 tabletoidxns;
        BSONObj fixedIndexObject;
        

if
 ( addIndex ) {
            assert( obuf );
            BSONObj io((

const
 
char
 
*
) obuf);
            

//
做索引准备工作,这里并不真正创建索引,只是进行参数检查,以及索引是否已存在等


            
if

!
prepareToBuildIndex(io, god, tabletoidxns, tableToIndex, fixedIndexObject ) )
                

return
 DiskLoc();

            
if
 ( 
!
 fixedIndexObject.isEmpty() ) {
                obuf 

=
 fixedIndexObject.objdata();
                len 

=
 fixedIndexObject.objsize();
            }

        }

        
const
 BSONElement 
*
newId 
=
 
&
writeId;
        

int
 addID 
=
 
0
;
        

if

!
god ) {
            

//
检查对象 是否有_id字段,没有则添加
            

//
Note that btree buckets which we insert aren’t BSONObj’s, but in that case god==true.      


            BSONObj io((
const
 
char
 
*
) obuf);
            BSONElement idField 

=
 io.getField( 

_id

 );
            uassert( 

10099
 ,  

_id cannot be an array

, idField.type() 
!=
 Array );

            
if
( idField.eoo() 
/*
判断是否是结束元素
*/
&&
 
!
wouldAddIndex 
&&
 strstr(ns, 

.local.


==
 
0
 ) {
                addID 

=
 len;
                

if
 ( writeId.eoo() ) {
                    

//
 初始化一个_id 随机值(因为_id可能是12 byte类型或其它类型)


                    idToInsert_.oid.init();
                    newId 

=
 
&
idToInsert;
//
绑定初始化的_id值


                }
                len 

+=
 newId
->
size();
            }
            

//
如果io对象中有时间戳元素时,并用当前时间进行更新


            BSONElementManipulator::lookForTimestamps( io );
        }

        
//
兼容旧的数据文件


        DiskLoc extentLoc;
        

int
 lenWHdr 
=
 len 
+
 Record::HeaderSize;
        lenWHdr 

=
 (
int
) (lenWHdr 
*
 d
->
paddingFactor);
        

if
 ( lenWHdr 
==
 
0
 ) {
            assert( d

->
paddingFactor 
==
 
0
 );
            

*
getDur().writing(
&
d
->
paddingFactor) 
=
 
1.0
;
            lenWHdr 

=
 len 
+
 Record::HeaderSize;
        }

        
//
 在对新的对象分配空间前检查数据是否会造成索引冲突(唯一索引)
        

//
 capped标识是否是固定大小的集合类型,这种类型下系统会自动将过于陈旧的数据remove掉
        

//
 注:此cap与nosql中常说的cap无太大关联
        

//
     nosql cap即:一致性,有效性,分区容忍性
        

//
     参见这篇文章:
http://blog.nosqlfan.com/html/1112.html
,
        

//
                 
http://blog.nosqlfan.com/html/96.html
)


        
if
 ( d
->
nIndexes 
&&
 d
->
capped 
&&
 
!
god ) {
            checkNoIndexConflicts( d, BSONObj( reinterpret_cast

<
const
 
char
 
*>
( obuf ) ) );
        }

        DiskLoc loc 
=
 d
->
alloc(ns, lenWHdr, extentLoc);
//
为当前记录分配空间namespace.cpp __stdAlloc方法


        
if
 ( loc.isNull() ) {
//
如果分配失效


            
if
 ( d
->
capped 
==
 
0
 ) { 
//
 cap大小未增加,即


                log(
1

<<
 

allocating new extent for 

 
<<
 ns 
<<
 

 padding:

 
<<
 d
->
paddingFactor 
<<
 

 lenWHdr: 

 
<<
 lenWHdr 
<<
 endl;
                

//
尝试从空闲空间列表中分配空间


                cc().database()
->
allocExtent(ns, Extent::followupSize(lenWHdr, d
->
lastExtentSize), 
false
);
                

//
尝试再次为当前记录分配空间


                loc 
=
 d
->
alloc(ns, lenWHdr, extentLoc);
                

if
 ( loc.isNull() ) {
                    log() 

<<
 

WARNING: alloc() failed after allocating new extent. lenWHdr: 

 
<<
 lenWHdr 
<<
 

 last extent size:

 
<<
 d
->
lastExtentSize 
<<
 

; trying again/n

;
                    

for
 ( 
int
 zzz
=
0
; zzz
<
10
 
&&
 lenWHdr 
>
 d
->
lastExtentSize; zzz
++
 ) {
//
最多尝试循环10次分配空间


                        log() 
<<
 

try #

 
<<
 zzz 
<<
 endl;
                        cc().database()

->
allocExtent(ns, Extent::followupSize(len, d
->
lastExtentSize), 
false
);
                        loc 

=
 d
->
alloc(ns, lenWHdr, extentLoc);
                        

if
 ( 
!
 loc.isNull() )
                            

break
;
                    }
                }
            }
            

if
 ( loc.isNull() ) {
//
最终未分配空间给对象


                log() 
<<
 

insert: couldn’t alloc space for object ns:

 
<<
 ns 
<<
 

 capped:

 
<<
 d
->
capped 
<<
 endl;
                assert(d

->
capped);
                

return
 DiskLoc();
            }
        }

        Record 
*

=
 loc.rec();
        {
            assert( r

->
lengthWithHeaders 
>=
 lenWHdr );
            r 

=
 (Record
*
) getDur().writingPtr(r, lenWHdr);
//
持久化插入记录信息


            
if
( addID ) {
                

/*
 a little effort was made here to avoid a double copy when we add an ID 
*/

                ((

int
&
)
*
r
->
data) 
=
 
*
((
int
*
) obuf) 
+
 newId
->
size();
                memcpy(r

->
data
+
4
, newId
->
rawdata(), newId
->
size());
//
拷贝_id字段到指定记录内存空间


                memcpy(r
->
data
+
4
+
newId
->
size(), ((
char
 
*
)obuf)
+
4
, addID

4
);
//
拷贝数据到指定内存空间


            }
            

else
 {
                

if
( obuf )
                    memcpy(r

->
data, obuf, len);
//
直接拷贝数据到记录字段r


            }
        }

        {
            Extent 
*

=
 r
->
myExtent(loc);
            

if
 ( e
->
lastRecord.isNull() ) {
//
如果未尾记录为空,本人理解:即之前未插入过记录


                Extent::FL 
*
fl 
=
 getDur().writing(e
->
fl());
                fl

->
firstRecord 
=
 fl
->
lastRecord 
=
 loc;
                r

->
prevOfs 
=
 r
->
nextOfs 
=
 DiskLoc::NullOfs;
            }
            

else
 {
                Record 

*
oldlast 
=
 e
->
lastRecord.rec();
//
否则将新记录添加到最后一条记录的后面


                r
->
prevOfs 
=
 e
->
lastRecord.getOfs();
                r

->
nextOfs 
=
 DiskLoc::NullOfs;
                getDur().writingInt(oldlast

->
nextOfs) 
=
 loc.getOfs();
                getDur().writingDiskLoc(e

->
lastRecord) 
=
 loc;
            }
        }

        
/*
 持久化操作并更新相应统计信息 
*/

        {
            NamespaceDetails::Stats 

*

=
 getDur().writing(
&
d
->
stats);
            s

->
datasize 
+=
 r
->
netLength();
            s

->
nrecords
++
;
        }

        
//
 在god时会清空stats信息,同时会添加一个 btree bucket(占据存储空间)


        
if
 ( 
!
god )
            NamespaceDetailsTransient::get_w( ns ).notifyOfWriteOp();

//
在写操作时清空缓存,优化查询优化



        

if
 ( tableToIndex ) {
            uassert( 

13143
 , 

can’t create index on system.indexes

 , tabletoidxns.find( 

.system.indexes

 ) 
==
 
string
::npos );

            BSONObj info 
=
 loc.obj();
            

bool
 background 
=
 info[

background

].trueValue();
            

if
( background 
&&
 cc().isSyncThread() ) {
                

/*
 don’t do background indexing on slaves.  there are nuances.  this could be added later but requires more code.
*/

                log() 

<<
 

info: indexing in foreground on this replica; was a background index build on the primary

 
<<
 endl;
                background 

=
 
false
;
            }

            
int
 idxNo 
=
 tableToIndex
->
nIndexes;
            IndexDetails

&
 idx 
=
 tableToIndex
->
addIndex(tabletoidxns.c_str(), 
!
background); 
//
 清空临时缓存信息; 同时递增索引数量


            getDur().writingDiskLoc(idx.info) 
=
 loc;
            

try
 {
                buildAnIndex(tabletoidxns, tableToIndex, idx, idxNo, background);

//
创建索引


            }
            

catch
( DBException
&
 e ) {
                

//
 保存异常信息,并执行dropIndexes


                LastError 
*
le 
=
 lastError.
get
();
                

int
 savecode 
=
 
0
;
                

string
 saveerrmsg;
                

if
 ( le ) {
                    savecode 

=
 le
->
code;
                    saveerrmsg 

=
 le
->
msg;
                }
                

else
 {
                    savecode 

=
 e.getCode();
                    saveerrmsg 

=
 e.what();
                }

                
//
回滚索引操作(drop索引)


                
string
 name 
=
 idx.indexName();
                BSONObjBuilder b;
                

string
 errmsg;
                

bool
 ok 
=
 dropIndexes(tableToIndex, tabletoidxns.c_str(), name.c_str(), errmsg, b, 
true
);
                

if

!
ok ) {
                    log() 

<<
 

failed to drop index after a unique key error building it: 

 
<<
 errmsg 
<<
 

 

 
<<
 tabletoidxns 
<<
 

 

 
<<
 name 
<<
 endl;
                }

                assert( le 
&&
 
!
saveerrmsg.empty() );
                raiseError(savecode,saveerrmsg.c_str());
                

throw
;
            }
        }

        
/*
 将记录数据添加到索引信息(btree)中 
*/

        

if
 ( d
->
nIndexes ) {
            

try
 {
                BSONObj obj(r

->
data);
                indexRecord(d, obj, loc);
            }
            

catch
( AssertionException
&
 e ) {
                

//
 _id index 键值重复


                
if
( tableToIndex 
||
 d
->
capped ) {
                    massert( 

12583


unexpected index insertion failure on capped collection


!
d
->
capped );
                    

string
 s 
=
 e.toString();
                    s 

+=
 

 : on addIndex/capped – collection and its index will not match

;
                    uassert_nothrow(s.c_str());
                    error() 

<<
 s 
<<
 endl;
                }
                

else
 {
                    

//
 回滚上述操作


                    _deleteRecord(d, ns, r, loc);
                    

throw
;
                }
            }
        }

        
//
  out() << ”   inserted at loc:” << hex << loc.getOfs() << ” lenwhdr:” << hex << lenWHdr << dec << ‘ ‘ << ns << endl;


        
return
 loc;
    }

     正如之前所说,该方法会完成添加名空间,添加索引,添加数据记录(memcpy调用)。其中名空间的添加方法addNewNamespaceToCatalog 比较简单,下面主要介绍一下索引的创建过程,这里分为了两步:

     1.创建索引树(b树)

     2.将数据(主要是地址)添加到索引(树)中

     先看一下创建索引过程:

static
 
void
 buildAnIndex(
string
 ns, NamespaceDetails 
*
d, IndexDetails
&
 idx, 
int
 idxNo, 
bool
 background) {
        tlog() 

<<
 

building new index on 

 
<<
 idx.keyPattern() 
<<
 

 for 

 
<<
 ns 
<<
 ( background 
?
 

 background

 : 
“”
 ) 
<<
 endl;
        Timer t;
        unsigned 

long
 
long
 n;

        
if
( background ) {
            log(

2

<<
 

buildAnIndex: background=true/n

;
        }

        assert( 
!
BackgroundOperation::inProgForNs(ns.c_str()) ); 
//
 should have been checked earlier, better not be…


        assert( d
->
indexBuildInProgress 
==
 
0
 );
        assertInWriteLock();
        RecoverableIndexState recoverable( d );
        

if
( inDBRepair 
||
 
!
background ) {
//
当数据库在repair时或非后台工作方式下


            n 
=
 fastBuildIndex(ns.c_str(), d, idx, idxNo);
//
创建索引


            assert( 
!
idx.head.isNull() );
        }
        

else
 {
            BackgroundIndexBuildJob j(ns.c_str());

//
以后台方式创建索引


            n 
=
 j.go(ns, d, idx, idxNo);
        }
        tlog() 

<<
 

done for 

 
<<
 n 
<<
 

 records 

 
<<
 t.millis() 
/
 
1000.0
 
<<
 

secs

 
<<
 endl;
    }

    创建索引方法会要据创建方式(是否是后台线程等),使用不同的方法,这里主要讲解非后台方式,也就是上面的fastBuildIndex方法(pdfile.cpp第1101行),其定义如下(内容详见注释):
    

unsigned 
long
 
long
 fastBuildIndex(
const
 
char
 
*
ns, NamespaceDetails 
*
d, IndexDetails
&
 idx, 
int
 idxNo) {
        CurOp 

*
 op 
=
 cc().curop();
//
设置当前操作指针,用于设置操作信息



        Timer t;

        tlog(
1

<<
 

fastBuildIndex 

 
<<
 ns 
<<
 

 idxNo:

 
<<
 idxNo 
<<
 

 

 
<<
 idx.info.obj().toString() 
<<
 endl;

        
bool
 dupsAllowed 
=
 
!
idx.unique();
        

bool
 dropDups 
=
 idx.dropDups() 
||
 inDBRepair;
        BSONObj order 

=
 idx.keyPattern();

        getDur().writingDiskLoc(idx.head).Null();

        
if
 ( logLevel 
>
 
1
 ) printMemInfo( 

before index start

 );

        
/*
 获取并排序所有键值 —– 
*/

        unsigned 

long
 
long
 n 
=
 
0
;
        shared_ptr

<
Cursor
>
 c 
=
 theDataFileMgr.findAll(ns);
        BSONObjExternalSorter sorter(order);
        sorter.hintNumObjects( d

->
stats.nrecords );
        unsigned 

long
 
long
 nkeys 
=
 
0
;
        ProgressMeterHolder pm( op

->
setMessage( 

index: (1/3) external sort

 , d
->
stats.nrecords , 
10
 ) );
        

while
 ( c
->
ok() ) {
            BSONObj o 

=
 c
->
current();
            DiskLoc loc 

=
 c
->
currLoc();

            BSONObjSetDefaultOrder keys;
            idx.getKeysFromObject(o, keys);
//
从对象中获取键值信息


            
int
 k 
=
 
0
;
            

for
 ( BSONObjSetDefaultOrder::iterator i
=
keys.begin(); i 
!=
 keys.end(); i
++
 ) {
                

if

++

==
 
2
 ) {
//
是否是多键索引


                    d
->
setIndexIsMultikey(idxNo);
                }
                sorter.add(

*
i, loc);
//
向排序器添加键值和记录位置信息


                nkeys
++
;
            }

            c
->
advance();
            n

++
;
            pm.hit();
            

if
 ( logLevel 
>
 
1
 
&&
 n 
%
 
10000
 
==
 
0
 ) {
                printMemInfo( 


/t iterating objects

 );
            }

        };
        pm.finished();

        
if
 ( logLevel 
>
 
1
 ) printMemInfo( 

before final sort

 );
        sorter.sort();
        

if
 ( logLevel 
>
 
1
 ) printMemInfo( 

after final sort

 );

        log(t.seconds() 
>
 
5
 
?
 
0
 : 
1

<<
 

/t external sort used : 

 
<<
 sorter.numFiles() 
<<
 

 files 

 
<<
 

 in 

 
<<
 t.seconds() 
<<
 

 secs

 
<<
 endl;

        list
<
DiskLoc
>
 dupsToDrop;

        
/*
 创建索引 
*/

        {
            BtreeBuilder btBuilder(dupsAllowed, idx);

//
实例化b树索引对象
            

//
BSONObj keyLast;


            auto_ptr
<
BSONObjExternalSorter::Iterator
>
 i 
=
 sorter.iterator();
//
初始化迭代器用于下面遍历


            assert( pm 
==
 op
->
setMessage( 

index: (2/3) btree bottom up

 , nkeys , 
10
 ) );
            

while
( i
->
more() ) {
                RARELY killCurrentOp.checkForInterrupt();

//
检查冲突如shutdown或kill指令


                BSONObjExternalSorter::Data d 
=
 i
->
next();

                
try
 {
                    btBuilder.addKey(d.first, d.second);

//
向b树索引对象中添加索引键值和记录位置信息


                }
                

catch
( AssertionException
&
 e ) {
                    

if
 ( dupsAllowed ) {
                        

//
 unknow exception??


                        
throw
;
                    }

                    
if
( e.interrupted() )
                        

throw
;

                    
if
 ( 
!
 dropDups )
                        

throw
;

                    
/*
 we could queue these on disk, but normally there are very few dups, so instead we
                       keep in ram and have a limit.
                    

*/

                    dupsToDrop.push_back(d.second);
                    uassert( 

10092
 , 

too may dups on index build with dropDups=true

, dupsToDrop.size() 
<
 
1000000
 );
                }
                pm.hit();
            }
            pm.finished();
            op

->
setMessage( 

index: (3/3) btree-middle

 );
            log(t.seconds() 

>
 
10
 
?
 
0
 : 
1
 ) 
<<
 

/t done building bottom layer, going to commit

 
<<
 endl;
            btBuilder.commit();

//
提交创建索引操作,该方法会完成最终构造Btree索引操作


            wassert( btBuilder.getn() 
==
 nkeys 
||
 dropDups );
        }

        log(
1

<<
 

/t fastBuildIndex dupsToDrop:

 
<<
 dupsToDrop.size() 
<<
 endl;
        

//
删除索引中已出现的重复记录


        
for
( list
<
DiskLoc
>
::iterator i 
=
 dupsToDrop.begin(); i 
!=
 dupsToDrop.end(); i
++
 )
            theDataFileMgr.deleteRecord( ns, i

->
rec(), 
*
i, 
false

true
 );

        
return
 n;
    }

 

      上面方法主要对要创建的索引信息进行提取,并封装到一个BtreeBuilder中,顾名思义,该对象用于进行b树的创建(因为索引也是一个b树),当信息收集排序完成后,就开始创建索引,如下:

    btree.cpp 1842行
    

void
 BtreeBuilder::commit() {
        buildNextLevel(first);
        committed 

=
 
true
;
    }

     
void
 BtreeBuilder::buildNextLevel(DiskLoc loc) {
        

int
 levels 
=
 
1
;
        

while

1
 ) {
            

if
( loc.btree()
->
tempNext().isNull() ) {
                

//
 在当前层级上只有一个 bucket


                getDur().writingDiskLoc(idx.head) 
=
 loc;
                

break
;
            }
            levels

++
;

            DiskLoc upLoc 
=
 BtreeBucket::addBucket(idx);
//
添加bucket并实例化上一层DiskLoc


            DiskLoc upStart 
=
 upLoc;
            BtreeBucket 

*
up 
=
 upLoc.btreemod();
//
获取上一层的bucket指针



            DiskLoc xloc 

=
 loc;
            

while

!
xloc.isNull() ) {
                RARELY {
                    getDur().commitIfNeeded();
                    b 

=
 cur.btreemod();
                    up 

=
 upLoc.btreemod();
                }

                BtreeBucket 
*

=
 xloc.btreemod();
                BSONObj k;
                DiskLoc r;
                x

->
popBack(r,k);
//
弹出当前bucket中最右边的键


                
bool
 keepX 
=
 ( x
->

!=
 
0
 );
//
当前bucket中元素个数是否为0


                DiskLoc keepLoc 
=
 keepX 
?
 xloc : x
->
nextChild;

                
//
压入上面弹出的最右边的键值,该键值为当前up(bucket)中最大值


                
if
 ( 
!
 up
->
_pushBack(r, k, ordering, keepLoc) )
                {
                    

//
 当前 bucket 已满,则新创建一个addBucket


                    DiskLoc n 
=
 BtreeBucket::addBucket(idx);
                    up

->
tempNext() 
=
 n;
                    upLoc 

=
 n;
                    up 

=
 upLoc.btreemod();
                    up

->
pushBack(r, k, ordering, keepLoc);
                }

                DiskLoc nextLoc 
=
 x
->
tempNext(); 
//
get next in chain at current level


                
if
 ( keepX ) {
//
表示当前结点非顶层结点,则设置它的父结点


                    x
->
parent 
=
 upLoc;
                }
                

else
 {
                    

if
 ( 
!
x
->
nextChild.isNull() )
                        x

->
nextChild.btreemod()
->
parent 
=
 upLoc;
                    x

->
deallocBucket( xloc, idx );
//
删除xloc bucket


                }
                xloc 

=
 nextLoc;
//
指向当前层的下个元素


            }

            loc 
=
 upStart;
//
升级当前结点


            mayCommitProgressDurably();
        }

        
if
( levels 
>
 
1
 )
            log(

2

<<
 

btree levels: 

 
<<
 levels 
<<
 endl;
    }

    上面的buildNextLevel方法自下而上根据之前抽取的键值逐层构造一个b树。这里有一个问题需要注意一下,因为mongodb使用 bucket来作为b树中的一个层次结点或叶子结点容器(如下图),bucket最大尺寸为8192字节,c。有关b树索引的文章可以参见这篇文章 :,
    mongodb目前关于B树索引的文档 :http://blog.nosqlfan.com/html/758.html
《Mongodb源码分析--插入记录及索引B树构建》

      当初始化了b树索引及空间信息之后,下面就会将数据绑定到相应信息结点上了,也就是DataFileMgr::insert方法(pdfile.cpp文件)的如下代码:

/*
 将记录数据添加到索引信息(btree)中 
*/

        
if
 ( d
->
nIndexes ) {
            

try
 {
                BSONObj obj(r

->
data);
                indexRecord(d, obj, loc);
            }
            ……
        } 

     上面的indexRecord方法会将键值和数据(包括存储位置)添加到索引中(其中参数d包括之前创建的B树索引信息), 该方法定义如下(pdfile.cpp 第1355行):

/*
 将键值和数据(包括存储位置)添加到索引中
*/

    

static
 
void
 indexRecord(NamespaceDetails 
*
d, BSONObj obj, DiskLoc loc) {
        

int
 n 
=
 d
->
nIndexesBeingBuilt();
//
获取已(及正在)构建的索引数


        
for
 ( 
int
 i 
=
 
0
; i 
<
 n; i
++
 ) {
            

try
 {
                

bool
 unique 
=
 d
->
idx(i).unique();
                

//
内联函数(inline):将索引和记录相关信息初始化到btree中


                _indexRecord(d, i
/*
索引顺序位
*/
, obj, loc, 
/*
dupsAllowed
*/
!
unique);
            }
            

catch
( DBException
&
 ) {
                

/*
 如果发生异常,则进行回滚操作
                   note <= i (not < i) is important here as the index we were just attempted
                   may be multikey and require some cleanup.
                

*/

                

for

int
 j 
=
 
0
; j 
<=
 i; j
++
 ) {
                    

try
 {
                        _unindexRecord(d

->
idx(j), obj, loc, 
false
);
                    }
                    

catch
(…) {
                        log(

3

<<
 

unindex fails on rollback after unique failure/n

;
                    }
                }
                

throw
;
            }
        }
    }

    上面的_indexRecord为内联函数(pdfile.cpp)(inline关键字参见C++说明),该参数声明如下:

 
static
 inline 
void
  _indexRecord(NamespaceDetails 
*
d, 
int
 idxNo, BSONObj
&
 obj, DiskLoc recordLoc, 
bool
 dupsAllowed) {
        IndexDetails

&
 idx 
=
 d
->
idx(idxNo);
//

        BSONObjSetDefaultOrder keys;
        idx.getKeysFromObject(obj, keys);

//
从对象信息中获取键属性信息


        BSONObj order 
=
 idx.keyPattern();
        Ordering ordering 

=
 Ordering::make(order);
//
初始化排序方式用于下面传参


        
int
 n 
=
 
0
;
        

for
 ( BSONObjSetDefaultOrder::iterator i
=
keys.begin(); i 
!=
 keys.end(); i
++
 ) {
            

if

++

==
 
2
 ) {
                d

->
setIndexIsMultikey(idxNo);
//
设置多键值索引


            }
            assert( 

!
recordLoc.isNull() );
            

try
 {
                idx.head

/*
DiskLoc
*/
.btree()
/*
BtreeBucket
*/
->
bt_insert(idx.head, recordLoc, 
//
执行向btree中添加记录和绑定索引信息的操作


                                            
*
i, ordering, dupsAllowed, idx);
            }
            

catch
 (AssertionException
&
 e) {
                

if
( e.getCode() 
==
 
10287
 
&&
 idxNo 
==
 d
->
nIndexes ) {
                    DEV log() 

<<
 

info: caught key already in index on bg indexing (ok)

 
<<
 endl;
                    

continue
;
                }
                

if

!
dupsAllowed ) {
                    

//
 重复键值异常


                    
throw
;
                }
                problem() 

<<
 

 caught assertion _indexRecord 

 
<<
 idx.indexNamespace() 
<<
 endl;
            }
        }
    }

    上面方法最终会执行b树插入方法bt_insert(btree.cpp文件1622行),如下(详情见注释):

   
int
 BtreeBucket::bt_insert(
const
 DiskLoc thisLoc, 
const
 DiskLoc recordLoc,
                               

const
 BSONObj
&
 key, 
const
 Ordering 
&
order, 
bool
 dupsAllowed,
                               IndexDetails

&
 idx, 
bool
 toplevel) 
const
 {
        

if
 ( toplevel ) {
//
如果是顶级节点(如果是通过构造索引方式调用 ,则toplevel=true)
            

//
判断键值是否过界(因为其会存储在system.indexs中),其中:KeyMax = 8192 / 10 .mongodb开发团队可能会在更高版本中扩大该值


            
if
 ( key.objsize() 
>
 KeyMax ) {
                problem() 

<<
 

Btree::insert: key too large to index, skipping 

 
<<
 idx.indexNamespace() 
<<
 

 

 
<<
 key.objsize() 
<<
 

 

 
<<
 key.toString() 
<<
 endl;
                

return
 
3
;
            }
        }
        

//
执行添加操作


        
int
 x 
=
 _insert(thisLoc, recordLoc, key, order, dupsAllowed, DiskLoc(), DiskLoc(), idx);
        assertValid( order );

//
assert排序方式是否有效



        

return
 x;
    }

    上面代码紧接着会调用btree.cpp文件的内部方法_insert(btree.cpp文件 1554行):

   
int
 BtreeBucket::_insert(
const
 DiskLoc thisLoc, 
const
 DiskLoc recordLoc,
                             

const
 BSONObj
&
 key, 
const
 Ordering 
&
order, 
bool
 dupsAllowed,
                             

const
 DiskLoc lChild, 
const
 DiskLoc rChild, IndexDetails
&
 idx) 
const
 {
        

if
 ( key.objsize() 
>
 KeyMax ) {
            problem() 

<<
 

ERROR: key too large len:

 
<<
 key.objsize() 
<<
 

 max:

 
<<
 KeyMax 
<<
 

 

 
<<
 key.objsize() 
<<
 

 

 
<<
 idx.indexNamespace() 
<<
 endl;
            

return
 
2
;
        }
        assert( key.objsize() 

>
 
0
 );

        
int
 pos;
        

//
在btree bucket中使用二分查询,查看键值是否已在所索引信息中


        
bool
 found 
=
 find(idx, key, recordLoc, order, pos 
/*
返回该索引信息所在或应该在的位置
*/

!
dupsAllowed);
        

if
 ( insert_debug ) {
            

out
() 
<<
 

  

 
<<
 thisLoc.toString() 
<<
 

.

 
<<
 

_insert 

 
<<

                  key.toString() 

<<
 

/

 
<<
 recordLoc.toString() 
<<

                  


 l:

 
<<
 lChild.toString() 
<<
 

 r:

 
<<
 rChild.toString() 
<<
 endl;
            

out
() 
<<
 

    found:

 
<<
 found 
<<
 

 pos:

 
<<
 pos 
<<
 

 n:

 
<<
 n 
<<
 endl;
        }

        
if
 ( found ) {
            

const
 _KeyNode
&
 kn 
=
 k(pos);
//
获取指定磁盘位置的节点信息,_KeyNode


            
if
 ( kn.isUnused() ) {
//
查看已存在的键结点是否已使用


                log(
4

<<
 

btree _insert: reusing unused key

 
<<
 endl;
                massert( 

10285
 , 

_insert: reuse key but lchild is not null

, lChild.isNull());
                massert( 

10286
 , 

_insert: reuse key but rchild is not null

, rChild.isNull());
                kn.writing().setUsed();
                

return
 
0
;
            }

            DEV {
                log() 
<<
 

_insert(): key already exists in index (ok for background:true)/n

;
                log() 

<<
 

  

 
<<
 idx.indexNamespace() 
<<
 

 thisLoc:

 
<<
 thisLoc.toString() 
<<
 

/n

;
                log() 

<<
 

  

 
<<
 key.toString() 
<<
 

/n

;
                log() 

<<
 

  

 
<<
 

recordLoc:

 
<<
 recordLoc.toString() 
<<
 

 pos:

 
<<
 pos 
<<
 endl;
                log() 

<<
 

  old l r: 

 
<<
 childForPos(pos).toString() 
<<
 

 

 
<<
 childForPos(pos
+
1
).toString() 
<<
 endl;
                log() 

<<
 

  new l r: 

 
<<
 lChild.toString() 
<<
 

 

 
<<
 rChild.toString() 
<<
 endl;
            }
            alreadyInIndex();

//
提示键值结点已在索引中,不必再创建,并抛出异常


        }

        DEBUGGING 
out
() 
<<
 

TEMP: key: 

 
<<
 key.toString() 
<<
 endl;
        DiskLoc child 

=
 childForPos(pos);
//
查询当前pos的子结点信息,以寻找插入位置


        
if
 ( insert_debug )
            

out
() 
<<
 

    getChild(

 
<<
 pos 
<<
 

): 

 
<<
 child.toString() 
<<
 endl;
        

if
 ( child.isNull() 
||
 
!
rChild.isNull() 
/*
 在当前buckets中插入,即 ‘internal’ 插入 
*/
 ) {
            insertHere(thisLoc, pos, recordLoc, key, order, lChild, rChild, idx);

//
在当前buckets中插入


            
return
 
0
;
        }
        

//
如果有子结点,则在子结点上执行插入操作


        
return
 child.btree()
->
bt_insert(child, recordLoc, key, order, dupsAllowed, idx, 
/*
toplevel
*/
false
);
    }

    上面_insert方法首先会使用二分法查找要插入的记录是否已存在于索引中,同时会返回一个插入点(pos),如不存在则会进一步在插入点位置查看找 元素以决定是在当前bucket中插入,还是在当前pos位置的(右)子结点(bucket)上插入(这会再次递归调用上面的bt_insert方法), 这里我们假定在当前bucket插入,则会执行insertHere方法(btree.cpp文件1183行),它的定义如下:
    

 
/*
*
     * insert a key in this bucket, splitting if necessary.
     * @keypos – where to insert the key in range 0..n.  0=make leftmost, n=make rightmost.
     * NOTE this function may free some data, and as a result the value passed for keypos may
     * be invalid after calling insertHere()
     

*/

    

void
 BtreeBucket::insertHere( 
const
 DiskLoc thisLoc, 
int
 keypos,
                                  

const
 DiskLoc recordLoc, 
const
 BSONObj
&
 key, 
const
 Ordering
&
 order,
                                  

const
 DiskLoc lchild, 
const
 DiskLoc rchild, IndexDetails
&
 idx) 
const
 {
        

if
 ( insert_debug )
            

out
() 
<<
 

   

 
<<
 thisLoc.toString() 
<<
 

.insertHere 

 
<<
 key.toString() 
<<
 

/

 
<<
 recordLoc.toString() 
<<
 

 


                  

<<
 lchild.toString() 
<<
 

 

 
<<
 rchild.toString() 
<<
 

 keypos:

 
<<
 keypos 
<<
 endl;

        DiskLoc oldLoc 
=
 thisLoc;
        

//
根据keypos插入相应位置并将数据memcpy到内存指定位置


        
if
 ( 
!
basicInsert(thisLoc, keypos, recordLoc, key, order) ) {
            

//
如果插入无效,表示当前bucket已满,则分割记录并放到新创建的bucket中


            thisLoc.btreemod()
->
split(thisLoc, keypos, recordLoc, key, order, lchild, rchild, idx);
            

return
;
        }

        {
//
持久化当前thisLoc的结点信息并根据插入位置(是否最后一个key),来更新当前thisLoc(及后面key结点)的子结点信息


            
const
 _KeyNode 
*
_kn 
=
 
&
k(keypos);
            _KeyNode 

*
kn 
=
 (_KeyNode 
*
) getDur().alreadyDeclared((_KeyNode
*
) _kn); 
//
 already declared intent in basicInsert()


            
if
 ( keypos
+
1
 
==
 n ) { 
//
 n为pack(打包后)存储的记录数,这里”判断等于n”表示为最后(last)一个key


                
if
 ( nextChild 
!=
 lchild ) {
//
如果是最后元素,那么”当前最高键值的右子结点应该与要插入的左子结点相同


                    
out
() 
<<
 

ERROR nextChild != lchild

 
<<
 endl;
                    

out
() 
<<
 

  thisLoc: 

 
<<
 thisLoc.toString() 
<<
 

 

 
<<
 idx.indexNamespace() 
<<
 endl;
                    

out
() 
<<
 

  keyPos: 

 
<<
 keypos 
<<
 

 n:

 
<<
 n 
<<
 endl;
                    

out
() 
<<
 

  nextChild: 

 
<<
 nextChild.toString() 
<<
 

 lchild: 

 
<<
 lchild.toString() 
<<
 endl;
                    

out
() 
<<
 

  recordLoc: 

 
<<
 recordLoc.toString() 
<<
 

 rchild: 

 
<<
 rchild.toString() 
<<
 endl;
                    

out
() 
<<
 

  key: 

 
<<
 key.toString() 
<<
 endl;
                    dump();
                    assert(

false
);
                }
                kn

->
prevChildBucket 
=
 nextChild;
//
“当前最高键值的右子结点”绑定到持久化结点的左子结点


                assert( kn
->
prevChildBucket 
==
 lchild );
                nextChild.writing() 

=
 rchild;
//
持久化”当前最高键值的右子结点”,并将“要插入结点”的右子结点绑定到


                
if
 ( 
!
rchild.isNull() )
//
如果有右子结点,则更新右子结点的父结点信息为当前thisLoc


                    rchild.btree()
->
parent.writing() 
=
 thisLoc;
            }
            

else
 {
                

//
如果keypos位置不是最后一个


                kn
->
prevChildBucket 
=
 lchild;
//
将左子结点绑定到keypos位置结点的左子结点上


                
if
 ( k(keypos
+
1
).prevChildBucket 
!=
 lchild ) {
//
这时左子结点应该与下一个元素的左子结点相同


                    
out
() 
<<
 

ERROR k(keypos+1).prevChildBucket != lchild

 
<<
 endl;
                    

out
() 
<<
 

  thisLoc: 

 
<<
 thisLoc.toString() 
<<
 

 

 
<<
 idx.indexNamespace() 
<<
 endl;
                    

out
() 
<<
 

  keyPos: 

 
<<
 keypos 
<<
 

 n:

 
<<
 n 
<<
 endl;
                    

out
() 
<<
 

  k(keypos+1).pcb: 

 
<<
 k(keypos
+
1
).prevChildBucket.toString() 
<<
 

 lchild: 

 
<<
 lchild.toString() 
<<
 endl;
                    

out
() 
<<
 

  recordLoc: 

 
<<
 recordLoc.toString() 
<<
 

 rchild: 

 
<<
 rchild.toString() 
<<
 endl;
                    

out
() 
<<
 

  key: 

 
<<
 key.toString() 
<<
 endl;
                    dump();
                    assert(

false
);
                }
                

const
 DiskLoc 
*
pc 
=
 
&
k(keypos
+
1
).prevChildBucket;
//
获取keypos后面元素的左子结点信息


                
*
getDur().alreadyDeclared((DiskLoc
*
) pc) 
=
 rchild; 
//
 将右子结点绑定到下一个元素(keypos+1)的左子结点上declared in basicInsert()


                
if
 ( 
!
rchild.isNull() )
//
如果有右子结点,则更新右子结点的父结点信息为当前thisLoc


                    rchild.btree()
->
parent.writing() 
=
 thisLoc;
            }
            

return
;
        }
    }

       该方法中会调用一个叫basicInsert的方法,它主要会在当前bucket中指定位置(keypos)添加记录信息,同时持久化该结点信息,如下:

//
tree.cpp 1183


     
bool
 BucketBasics::basicInsert(
const
 DiskLoc thisLoc, 
int
 
&
keypos, 
const
 DiskLoc recordLoc, 
const
 BSONObj
&
 key, 
const
 Ordering 
&
order) 
const
 {
        assert( keypos 

>=
 
0
 
&&
 keypos 
<=
 n );
        

//
判断bucket剩余的空间是否满足当前数据需要的存储空间


        
int
 bytesNeeded 
=
 key.objsize() 
+
 
sizeof
(_KeyNode);
        

if
 ( bytesNeeded 
>
 emptySize ) {
            _pack(thisLoc, order, keypos);

//
如不够用,进行一次整理打包操作,以为bucket中整理更多空间


            
if
 ( bytesNeeded 
>
 emptySize )
//
如还不够用,则返回


                
return
 
false
;
        }

        BucketBasics 
*
b;
//
声明Bucket管理对象指针,该对象提供了Bucket存储管理的基本操作和属性,如insert,_pack等


        {
            

const
 
char
 
*

=
 (
const
 
char
 
*

&
k(keypos);
            

const
 
char
 
*

=
 (
const
 
char
 
*

&
k(n
+
1
);
            

//
 declare that we will write to [k(keypos),k(n)]
            

//
 todo: this writes a medium amount to the journal.  we may want to add a verb “shift” to the redo log so
            

//
       we can log a very small amount.


            b 
=
 (BucketBasics
*
) getDur().writingAtOffset((
void
 
*

this
, p

(
char
*
)
this
, q

p);
            

//
如已有3个结点,目前要插到第三个结点之间,则对每三个元素进行迁移,
            

//
 e.g. n==3, keypos==2
            

//
 1 4 9
            

//
 ->
            

//
 1 4 _ 9


            
for
 ( 
int
 j 
=
 n; j 
>
 keypos; j

 ) 
//
 make room


                b
->
k(j) 
=
 b
->
k(j

1
);
        }
        getDur().declareWriteIntent(

&
b
->
emptySize, 
12
); 
//
 [b->emptySize..b->n] is 12 bytes and we are going to write those


        b
->
emptySize 
-=
 
sizeof
(_KeyNode);
//
将当前bucket中的剩余空闲空间减少


        b
->
n
++
;
//
已有结点数加1



        _KeyNode

&
 kn 
=
 b
->
k(keypos);
        kn.prevChildBucket.Null();

//
设置当前结点的左子结点为空


        kn.recordLoc 
=
 recordLoc;
//
绑定结点记录信息


        kn.setKeyDataOfs((
short
) b
->
_alloc(key.objsize()) );
//
设置结点数据偏移信息


        
char
 
*

=
 b
->
dataAt(kn.keyDataOfs());
//
实例化指向磁盘数据(journal文件)位置(含偏移量)的指针


        getDur().declareWriteIntent(p, key.objsize());
//
持久化结点数据信息


        memcpy(p, key.objdata(), key.objsize());
//
将当前结点信息复制到p指向的地址空间


        
return
 
true
;
    }

    如果上面方法调用失效,则意味着当前 bucket中已有可用空间插入新记录,这时系统会调用 split(btree.cpp文件 1240行)方法来进行bucket分割,以创建新的bucket并将信息塞入其中,如下:

    
void
 BtreeBucket::split(
const
 DiskLoc thisLoc, 
int
 keypos, 
const
 DiskLoc recordLoc, 
const
 BSONObj
&
 key, 
const
 Ordering
&
 order, 
const
 DiskLoc lchild, 
const
 DiskLoc rchild, IndexDetails
&
 idx) {
        assertWritable();

        
if
 ( split_debug )
            

out
() 
<<
 

    

 
<<
 thisLoc.toString() 
<<
 

.split

 
<<
 endl;

        
int
 split 
=
 splitPos( keypos );
//
找到要迁移的数据位置


        DiskLoc rLoc 
=
 addBucket(idx);
//
添加一个新的BtreeBucket


        BtreeBucket 
*

=
 rLoc.btreemod();
        

if
 ( split_debug )
            

out
() 
<<
 

     split:

 
<<
 split 
<<
 

 

 
<<
 keyNode(split).key.toString() 
<<
 

 n:

 
<<
 n 
<<
 endl;
        

for
 ( 
int
 i 
=
 split
+
1
; i 
<
 n; i
++
 ) {
            KeyNode kn 

=
 keyNode(i);
            r

->
pushBack(kn.recordLoc, kn.key, order, kn.prevChildBucket);
//
向新bucket中迁移过剩数据


        }
        r

->
nextChild 
=
 nextChild;
//
绑定新bucket的右子结点


        r
->
assertValid( order );

        
if
 ( split_debug )
            

out
() 
<<
 

     new rLoc:

 
<<
 rLoc.toString() 
<<
 endl;
        r 

=
 
0
;
        rLoc.btree()

->
fixParentPtrs(rLoc);
//
设置当前bucket树的父指针信息



        {
            KeyNode splitkey 

=
 keyNode(split);
//
获取内存中分割点位置所存储的数据


            nextChild 
=
 splitkey.prevChildBucket; 
//
 提升splitkey 键,它的子结点将会是 thisLoc (l) 和 rLoc (r)


            
if
 ( split_debug ) {
                

out
() 
<<
 

    splitkey key:

 
<<
 splitkey.key.toString() 
<<
 endl;
            }

            
//
 将 splitkey 提升为父结点


            
if
 ( parent.isNull() ) {
                

//
 如果无父结点时,则创建一个,并将


                DiskLoc L 
=
 addBucket(idx);
                BtreeBucket 

*

=
 L.btreemod();
                p

->
pushBack(splitkey.recordLoc, splitkey.key, order, thisLoc);
                p

->
nextChild 
=
 rLoc;
//
将分割的bucket为了当前


                p
->
assertValid( order );
                parent 

=
 idx.head.writing() 
=
 L;
//
将splitkey 提升为父结点


                
if
 ( split_debug )
                    

out
() 
<<
 

    we were root, making new root:

 
<<
 hex 
<<
 parent.getOfs() 
<<
 dec 
<<
 endl;
                rLoc.btree()

->
parent.writing() 
=
 parent;
            }
            

else
 {
                

//
 set this before calling _insert – if it splits it will do fixParent() logic and change the value.


                rLoc.btree()
->
parent.writing() 
=
 parent;
                

if
 ( split_debug )
                    

out
() 
<<
 

    promoting splitkey key 

 
<<
 splitkey.key.toString() 
<<
 endl;
                

//
提升splitkey键,它的左子结点 thisLoc, 右子点rLoc


                parent.btree()
->
_insert(parent, splitkey.recordLoc, splitkey.key, order, 
/*
dupsallowed
*/
true
, thisLoc, rLoc, idx);
            }
        }

        
int
 newpos 
=
 keypos;
        

//
 打包压缩数据(pack,移除无用数据),以提供更多空间


        truncateTo(split, order, newpos);  
//
 note this may trash splitkey.key.  thus we had to promote it before finishing up here.

        
//
 add our new key, there is room now


        {
            

if
 ( keypos 
<=
 split ) {
//
如果还有空间存储新键


                
if
 ( split_debug )
                    

out
() 
<<
 

  keypos<split, insertHere() the new key

 
<<
 endl;
                insertHere(thisLoc, newpos, recordLoc, key, order, lchild, rchild, idx);

//
再次向当前bucket中添加记录


            }
            

else
 {
//
如压缩之后依旧无可用空间,则向新创建的bucket中添加节点


                
int
 kp 
=
 keypos

split

1
;
                assert(kp

>=
0
);
                rLoc.btree()

->
insertHere(rLoc, kp, recordLoc, key, order, lchild, rchild, idx);
            }
        }

        
if
 ( split_debug )
            

out
() 
<<
 

     split end 

 
<<
 hex 
<<
 thisLoc.getOfs() 
<<
 dec 
<<
 endl;
    }

    好了,今天的内容到这里就告一段落了,在接下来的文章中,将会介绍客户端发起Delete操作时,Mongodb的执行流程和相应实现部分。

    原文链接:http://www.cnblogs.com/daizhj/archive/2011/03/30/1999699.html
    作者: daizhj, 代震军   
    微博: http://t.sina.com.cn/daizhj
    Tags: mongodb,c++,btree

    原文作者:B树
    原文地址: https://blog.csdn.net/daizhj/article/details/6288452
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞