鍍金池/ 教程/ Python/ 關于 B+tree (附 python 模擬代碼)
通過 memcached 實現領號排隊功能及 python 隊列實例
利用 pypy 提高 python 腳本的執(zhí)行速度及測試性能
Python FAQ3-python 中 的原始(raw)字符串
Mongodb 千萬級數據在 python 下的綜合壓力測試及應用探討
Parallel Python 實現程序的并行多 cpu 多核利用【pp 模塊】
python simplejson 模塊淺談
服務端 socket 開發(fā)之多線程和 gevent 框架并發(fā)測試[python 語言]
python Howto 之 logging 模塊
python 之 MySQLdb 庫的使用
關于 python 調用 zabbix api 接口的自動化實例 [結合 saltstack]
python 之利用 PIL 庫實現頁面的圖片驗證碼及縮略圖
Python 通過 amqp 消息隊列協(xié)議中的 Qpid 實現數據通信
python 中用 string.maketrans 和 translate 巧妙替換字符串
python linecache 模塊讀取文件用法詳解
Python 批量更新 nginx 配置文件
python 計算文件的行數和讀取某一行內容的實現方法
python+Django 實現 Nagios 自動化添加監(jiān)控項目
多套方案來提高 python web 框架的并發(fā)處理能力
python 寫報警程序中的聲音實現 winsound
python 調用 zabbix 的 api 接口添加主機、查詢組、主機、模板
對 Python-memcache 分布式散列和調用的實現
使用 python 構建基于 hadoop 的 mapreduce 日志分析平臺
一個腳本講述 python 語言的基礎規(guī)范,適合初學者
Python 編寫的 socket 服務器和客戶端
如何將 Mac OS X10.9 下的 Python2.7 升級到最新的 Python3.3
python 監(jiān)控文件或目錄變化
報警監(jiān)控平臺擴展功能 url 回調的設計及應用 [python 語言]
Python 處理 cassandra 升級后的回滾腳本
python 實現 select 和 epoll 模型 socket 網絡編程
關于 B+tree (附 python 模擬代碼)
通過 python 和 websocket 構建實時通信系統(tǒng)[擴展 saltstack 監(jiān)控]

關于 B+tree (附 python 模擬代碼)

前幾天我寫了點 btree 的東西(http://thuhak.blog.51cto.com/2891595/1261783),今天繼續(xù)這個思路,繼續(xù)寫 b+tree。

而且 b+tree 才是我的目的,更加深入理解文件和數據庫索引的基本原理。

在之前,我一直只把 b+tree 當成是 btree 的一種變形,或者說是在某種情況下的一種優(yōu)化,另外一些情況可能還是 btree 好些。但是做完之后才發(fā)現,b+tree 在各種情況都可以完全取代 btree,并能夠讓索引性能得到比 btree 更好的優(yōu)化。因為 b+tree 設計的核心要點,是為了彌補 btree 最大的缺陷。

btree 最大的缺陷是什么?

首先,我們知道對于 btree 和 b+tree 這種多路搜索樹來說,一個很重要的特點就是樹的度數非常大。因為只有這樣才能夠降低樹的深度,減少磁盤讀取的次數。而樹的度數越大,葉子節(jié)點在樹中的比例就越大。假設度數為 1000,那么葉子節(jié)點比他上一層內部節(jié)點的數量至少要多 1000 倍,在上一層就更加可以忽略不計了??梢哉f樹種 99.9% 的節(jié)點都是葉子節(jié)點。 但是對于 btree 來說,所有節(jié)點都是一樣的結構,都含有一定數量的數據和指向節(jié)點的指針。這兩項數據占據 btree 節(jié)點的幾乎全部的空間。一個節(jié)點內的數據的數量比硬盤指針的數量少一,可以說和指針的數量幾乎相等。對于 python 這種動態(tài)類型語言感覺不出來,但是對于 C 這種固定類型語言來說,即使這個 children list 數組為空,這個數組的空間也都是預留出去的。導致的結果就是占絕大多數的葉子節(jié)點的 children list 指針數組所占的磁盤空間完全浪費。

一個數據的大小和硬盤指針的大小取決于 key-value 中 key 和 value 大小的比值。假如說這個比值是 2:1。那么 btree 浪費了幾乎 1/3 的空間。

b+tree 針對這個問題的,把葉子節(jié)點和內節(jié)點的數據結構分開設計,讓葉子節(jié)點不存放指針。因此同樣大小的葉子節(jié)點,b+tree 所能包含數據數量要比 btree 大。按照上面的假設就是大 1/2。數的深度很可能比 btree 矮,大范圍搜索或遍歷所需要的載入磁盤的次數也少。

另外,b+tree 還有一個特點是所有數據都存放在葉子節(jié)點,這些葉子節(jié)點也可以組成一個鏈表,并把這個鏈表的表頭拿出來,方便直訪問數據。有些文章認為這對于范圍搜索來說是個巨大的優(yōu)化。但是就我的看法,這個特性最大的作用僅僅是讓代碼更容易一些,性能上,只會比樹的遍歷差,而不會比樹的遍歷好。因為不管是用指向葉子節(jié)點的指針搜,還是用樹的遍歷搜,所搜索的節(jié)點的數量都是幾乎相同的。在相同大小的范圍搜索的性能,只取決于訪問順序的連續(xù)性。從樹根向下遍歷,那么一次可以取得大量的子節(jié)點的范圍,并針對這些節(jié)點做訪問排序,得到更好的訪問連續(xù)性。如果是沿著指向兄弟節(jié)點的指針搜索,一是兄弟節(jié)點也許是后插入的,存放并不一定和自己是連續(xù)的,二是只有每次從硬盤中將該節(jié)點載入到內存,才知道兄弟節(jié)點放在硬盤哪個位置,這又變成了對硬盤的一個隨機的同步操作,性能的下降可想而知。

說 b+tree 因為有指向兄弟節(jié)點的指針方便數據庫掃庫這種結論,是不正確的。

還是上代碼吧,依舊只是在內存對數據結構插入刪除查找的模擬

be

#!/usr/bin/env python
from random import randint,choice
from bisect import bisect_right,bisect_left
from collections import deque
class InitError(Exception):
    pass
class ParaError(Exception):
    pass
class KeyValue(object):
    __slots__=('key','value')
    def __init__(self,key,value):
        self.key=key
        self.value=value
    def __str__(self):
        return str((self.key,self.value))
    def __cmp__(self,key):
        if self.key>key:
            return 1
        elif self.key==key:
            return 0
        else:
            return -1
class Bptree_InterNode(object):
    def __init__(self,M):
        if not isinstance(M,int):
            raise InitError,'M must be int'
        if M<=3:
            raise InitError,'M must be greater then 3'
        else:
            self.__M=M
            self.clist=[]
            self.ilist=[]
            self.par=None
    def isleaf(self):
        return False
    def isfull(self):
        return len(self.ilist)>=self.M-1
    def isempty(self):
        return len(self.ilist)<=(self.M+1)/2-1
    @property
    def M(self):
        return self.__M
class Bptree_Leaf(object):
    def __init__(self,L):
        if not isinstance(L,int):
            raise InitError,'L must be int'
        else:
            self.__L=L
            self.vlist=[]
            self.bro=None
            self.par=None
    def isleaf(self):
        return True
    def isfull(self):
        return len(self.vlist)>self.L
    def isempty(self):
        return len(self.vlist)<=(self.L+1)/2
    @property
    def L(self):
        return self.__L
class Bptree(object):
    def __init__(self,M,L):
        if L>M:
            raise InitError,'L must be less or equal then M'
        else:
            self.__M=M
            self.__L=L
            self.__root=Bptree_Leaf(L)
            self.__leaf=self.__root
    @property
    def M(self):
        return self.__M
    @property
    def L(self):
        return self.__L
    def insert(self,key_value):
        node=self.__root
        def split_node(n1):
            mid=self.M/2
            newnode=Bptree_InterNode(self.M)
            newnode.ilist=n1.ilist[mid:]
            newnode.clist=n1.clist[mid:]
            newnode.par=n1.par
            for c in newnode.clist:
                c.par=newnode
            if n1.par is None:
                newroot=Bptree_InterNode(self.M)
                newroot.ilist=[n1.ilist[mid-1]]
                newroot.clist=[n1,newnode]
                n1.par=newnode.par=newroot
                self.__root=newroot
            else:
                i=n1.par.clist.index(n1)
                n1.par.ilist.insert(i,n1.ilist[mid-1])
                n1.par.clist.insert(i+1,newnode)
            n1.ilist=n1.ilist[:mid-1]
            n1.clist=n1.clist[:mid]
            return n1.par
        def split_leaf(n2):
            mid=(self.L+1)/2
            newleaf=Bptree_Leaf(self.L)
            newleaf.vlist=n2.vlist[mid:]
            if n2.par==None:
                newroot=Bptree_InterNode(self.M)
                newroot.ilist=[n2.vlist[mid].key]
                newroot.clist=[n2,newleaf]
                n2.par=newleaf.par=newroot
                self.__root=newroot
            else:
                i=n2.par.clist.index(n2)
                n2.par.ilist.insert(i,n2.vlist[mid].key)
                n2.par.clist.insert(i+1,newleaf)
                newleaf.par=n2.par
            n2.vlist=n2.vlist[:mid]
            n2.bro=newleaf
        def insert_node(n):
            if not n.isleaf():
                if n.isfull():
                    insert_node(split_node(n))
                else:
                    p=bisect_right(n.ilist,key_value)
                    insert_node(n.clist[p])
            else:
                p=bisect_right(n.vlist,key_value)
                n.vlist.insert(p,key_value)
                if n.isfull():
                    split_leaf(n)
                else:
                    return
        insert_node(node)
    def search(self,mi=None,ma=None):
        result=[]
        node=self.__root
        leaf=self.__leaf
        if mi is None and ma is None:
            raise ParaError,'you need to setup searching range'
        elif mi is not None and ma is not None and mi>ma:
            raise ParaError,'upper bound must be greater or equal than lower bound'
        def search_key(n,k):
            if n.isleaf():
                p=bisect_left(n.vlist,k)
                return (p,n)
            else:
                p=bisect_right(n.ilist,k)
                return search_key(n.clist[p],k)
        if mi is None:
            while True:
                for kv in leaf.vlist:
                    if kv<=ma:
                        result.append(kv)
                    else:
                        return result
                if leaf.bro==None:
                    return result
                else:
                    leaf=leaf.bro
        elif ma is None:
            index,leaf=search_key(node,mi)
            result.extend(leaf.vlist[index:])
            while True:
                if leaf.bro==None:
                    return result
                else:
                    leaf=leaf.bro
                    result.extend(leaf.vlist)
        else:
            if mi==ma:
                i,l=search_key(node,mi)
                try:
                    if l.vlist[i]==mi:
                        result.append(l.vlist[i])
                        return result
                    else:
                        return result
                except IndexError:
                    return result
            else:
                i1,l1=search_key(node,mi)
                i2,l2=search_key(node,ma)
                if l1 is l2:
                    if i1==i2:
                        return result
                    else:
                        result.extend(l.vlist[i1:i2])
                        return result
                else:
                    result.extend(l1.vlist[i1:])
                    l=l1
                    while True:
                        if l.bro==l2:
                            result.extend(l2.vlist[:i2+1])
                            return result
                        else:
                            result.extend(l.bro.vlist)
                            l=l.bro
    def traversal(self):
        result=[]
        l=self.__leaf
        while True:
            result.extend(l.vlist)
            if l.bro==None:
                return result
            else:
                l=l.bro
    def show(self):
        print 'this b+tree is:\n'
        q=deque()
        h=0
        q.append([self.__root,h])
        while True:
            try:
                w,hei=q.popleft()
            except IndexError:
                return
            else:
                if not w.isleaf():
                    print w.ilist,'the height is',hei
                    if hei==h:
                        h+=1
                    q.extend([[i,h] for i in w.clist])
                else:
                    print [v.key for v in w.vlist],'the leaf is,',hei

    def delete(self,key_value):
        def merge(n,i):
            if n.clist[i].isleaf():
                n.clist[i].vlist=n.clist[i].vlist+n.clist[i+1].vlist
                n.clist[i].bro=n.clist[i+1].bro
            else:
                n.clist[i].ilist=n.clist[i].ilist+[n.ilist[i]]+n.clist[i+1].ilist
                n.clist[i].clist=n.clist[i].clist+n.clist[i+1].clist
            n.clist.remove(n.clist[i+1])
            n.ilist.remove(n.ilist[i])
            if n.ilist==[]:
                n.clist[0].par=None
                self.__root=n.clist[0]
                del n
                return self.__root
            else:
                return n
        def tran_l2r(n,i):
            if not n.clist[i].isleaf():
                n.clist[i+1].clist.insert(0,n.clist[i].clist[-1])
                n.clist[i].clist[-1].par=n.clist[i+1]
                n.clist[i+1].ilist.insert(0,n.ilist[i])
                n.ilist[i]=n.clist[i].ilist[-1]
                n.clist[i].clist.pop()
                n.clist[i].ilist.pop()
            else:
                n.clist[i+1].vlist.insert(0,n.clist[i].vlist[-1])
                n.clist[i].vlist.pop()
                n.ilist[i]=n.clist[i+1].vlist[0].key
        def tran_r2l(n,i):
            if not n.clist[i].isleaf():
                n.clist[i].clist.append(n.clist[i+1].clist[0])
                n.clist[i+1].clist[0].par=n.clist[i]
                n.clist[i].ilist.append(n.ilist[i])
                n.ilist[i]=n.clist[i+1].ilist[0]
                n.clist[i+1].clist.remove(n.clist[i+1].clist[0])
                n.clist[i+1].ilist.remove(n.clist[i+1].ilist[0])
            else:
                n.clist[i].vlist.append(n.clist[i+1].vlist[0])
                n.clist[i+1].vlist.remove(n.clist[i+1].vlist[0])
                n.ilist[i]=n.clist[i+1].vlist[0].key
        def del_node(n,kv):
            if not n.isleaf():
                p=bisect_right(n.ilist,kv)
                if p==len(n.ilist):
                    if not n.clist[p].isempty():
                        return del_node(n.clist[p],kv)
                    elif not n.clist[p-1].isempty():
                        tran_l2r(n,p-1)
                        return del_node(n.clist[p],kv)
                    else:
                        return del_node(merge(n,p),kv)
                else:
                    if not n.clist[p].isempty():
                        return del_node(n.clist[p],kv)
                    elif not n.clist[p+1].isempty():
                        tran_r2l(n,p)
                        return del_node(n.clist[p],kv)
                    else:
                        return del_node(merge(n,p),kv)
            else:
                p=bisect_left(n.vlist,kv)
                try:
                    pp=n.vlist[p]
                except IndexError:
                    return -1
                else:
                    if pp!=kv:
                        return -1
                    else:
                        n.vlist.remove(kv)
                        return 0
        del_node(self.__root,key_value)
def test():
    mini=2
    maxi=60
    testlist=[]
    for i in range(1,10):
        key=i
        value=i
        testlist.append(KeyValue(key,value))
    mybptree=Bptree(4,4)
    for kv in testlist:
        mybptree.insert(kv)
    mybptree.delete(testlist[0])
    mybptree.show()
    print '\nkey of this b+tree is \n'
    print [kv.key for kv in mybptree.traversal()]
    #print [kv.key for kv in mybptree.search(mini,maxi)]
if __name__=='__main__':
    test()

實現過程和 btree 很像,不過有幾點顯著不同。

  1. 內節(jié)點不存儲 key-value,只存放 key
  2. 沿著內節(jié)點搜索的時候,查到索引相等的數要向樹的右邊走。所以二分查找要選擇 bisect_right
  3. 在葉子節(jié)點滿的時候,并不是先分裂再插入而是先插入再分裂。因為 b+tree 無法保證分裂的兩個節(jié)點的大小都是相等的。在奇數大小的數據分裂的時候右邊的子節(jié)點會比左邊的大。如果先分裂再插入無法保證插入的節(jié)點一定會插在數量更少的子節(jié)點上,滿足節(jié)點數量平衡的條件。
  4. 在刪除數據的時候,b+tree 的左右子節(jié)點借數據的方式比 btree 更加簡單有效,只把子節(jié)點的子樹直接剪切過來,再把索引變一下就行了,而且葉子節(jié)點的兄弟指針也不用動。