决策树

因为自己平时写计算的代码比较多,很少写树结构的(决策树算法的实现零零整整花了近两周),所以数据结构和代码效率上待优化的地方应该还有很多,仅提供给大家借鉴。

类的定义

自己网上搜到的代码很多都没有定义类,我自己对于这样的代码很是不喜欢,一点点看算法流程是看不下去的。因为是树,所以一定存在节点类和树类。

  • 节点类

    • entropy是未分支前的交叉熵

    • label表示的是当前节点的标签,那类数据多,标签就是啥

    • divide_info存储了分支的信息

    • except_error在后面剪枝的时候使用

    • 好吧,下面两个set函数好像没什么必要

class Node():
	def __init__(self, entropy=-1, label=None, divide_info=None):
        self.entropy = entropy
        self.label = label
        self.divide_info = divide_info
        self.left_child = None
        self.right_child = None
        self.except_error = None

    def set_left(self, node):
        self.left_child = node

    def set_right(self, node):
        self.right_child = node
  • 树类
    • 实现了树的广度优先遍历和查询节点所在层数两个功能,两个功能实现方法差不多。
class Tree():
    def __init__(self):
        self.root = Node()

    def breadth_traversal(self):
        queue = []
        result = []
        if self.root.entropy == -1:
            return
        else:
            queue.append(self.root)
            while queue:
                next_queue = []
                next_result = []
                for node in queue:
                    next_result.append(node.label)
                    if node.left_child:
                        next_queue.append(node.left_child)
                        next_queue.append(node.right_child)
                result.append(next_result)
                queue = next_queue
        for i in range(len(result)):
            print("第", i, "层信息", result[i])
        return
	
	
    def query_layer(self, node):
        if self.root.entropy != -1:
            queue = [self.root]
            layer = 0
            while queue:
                next_queue = []
                for i in queue:
                    if i == node:
                        return layer
                    if i.left_child:
                        next_queue.append(i.left_child)
                        next_queue.append(i.right_child)
                layer = layer +1
                queue = next_queue
            return
        else:
            return
  • 决策树类

    决策树类的代码有点长,一步一步贴代码吧。

    • 初始化函数

      初始化嘛,初始化一个树就好了。

    class Decision_tree():
    
        def __init__(self):
            self.tree = Tree()
    
    • fit函数——根据数据构造树

      因为用的方法是递归的方法,所以有必要加一个node参数,告诉它的位置在哪里。其他参数的话看看名字应该就知道什么意思,就不说了。

      里面调用了返回分支信息的find_variable函数和计算熵的entropy函数,这两个函数放在最后说。

    def fit(self, X, y, minimum_entropy=0, cur_node=None, maximum_layer = np.inf):
        """ build decision tree according to data param: X:features, array_like y:label, array_like minimun_entropy:minimum entropy difference allowed cur_node: current node maximum_layer: maximumn layer allowed, count from 0 """
        X = pd.DataFrame(X)
        y = pd.DataFrame(y)
        minimum_entropy = minimum_entropy
        maximum_layer = maximum_layer
        current_node = cur_node
        if cur_node:
            cur_layer = self.tree.query_layer(cur_node)
            if cur_layer >= maximum_layer:
                return
    
        label = y.iloc[:, 0].value_counts().index[0]
        current_entropy = entropy(y)
    
        divided_info, left_index, right_index, divide_column = find_variable(X, y, minimum_entropy)
    
        if self.tree.root.entropy == -1:
            if divided_info == None:
                print("无需决策分类")
            else:
                self.tree.root = Node(entropy=current_entropy, label=label, divide_info=divided_info)
                X_next = X[X.columns.difference([divide_column])]
                X_left = X_next[left_index]
                y_left = y[left_index]
                X_right = X_next[right_index]
                y_right = y[right_index]
                self.fit(X_left, y_left, minimum_entropy,self.tree.root, maximum_layer)
                self.fit(X_right, y_right, minimum_entropy, self.tree.root, maximum_layer)
        else:
            if divided_info == None:
                if cur_node.left_child == None:
                    cur_node.left_child = Node(entropy=None, label=label)
                    return
                else:
                    cur_node.right_child = Node(entropy=None, label=label)
                    return
            else:
                X_next = X[X.columns.difference([divide_column])]
                X_left = X_next[left_index]
                y_left = y[left_index]
                X_right = X_next[right_index]
                y_right = y[right_index]
                if cur_node.left_child == None:
                    cur_node.set_left(Node(current_entropy, label, divided_info))
                    self.fit(X_left, y_left,minimum_entropy, cur_node.left_child, maximum_layer)
                    self.fit(X_right, y_right, minimum_entropy, cur_node.left_child, maximum_layer)
                else:
                    cur_node.set_right(Node(current_entropy, label, divided_info))
                    self.fit(X_left, y_left, minimum_entropy, cur_node.right_child, maximum_layer)
                    self.fit(X_right, y_right, minimum_entropy, cur_node.right_child, maximum_layer)
    
    • post_pruning——后剪枝函数

      函数调用了add_error函数为每个节点添加悲观的期望错误数信息,添加后再从下往上进行剪枝。这个从下往上剪枝的过程借鉴了广度优先遍历的思想。同样利用到的error_rate函数放到了最后面。

    def post_pruning(self, X_test, y_test):
        self.add_error(X_test, y_test)
        queue = [self.tree.root]
        node_layer = [queue]
        while queue:
            next_queue = []
            for node in queue:
                if node.divide_info:
                    next_queue.append(node.left_child)
                    next_queue.append(node.right_child)
            node_layer.append(next_queue)
            queue = next_queue
        for i in range(len(node_layer)-2, -1, -1):
            current_layer = node_layer[i]
            for node in current_layer:
                if node.divide_info:
                    if node.except_error < node.left_child.except_error + node.right_child.except_error:
                        node.left_child = None
                        node.right_child = None
                        node.divide_info = None
    
    • predict——预测函数

      这个函数的实现也是用了递归,要注意的一点就是一定要把函数返回结果传给X[“predict”]

    def predict(self, X, y, node=None):
        X = pd.DataFrame(X)
        y = pd.DataFrame(y)
        if node==None:
            cur_node = self.tree.root
            X["predict"] = y.copy()
            if self.tree.root.divide_info:
                column, value = self.tree.root.divide_info["column"], self.tree.root.divide_info["value"]
                # 判定变量是分类还是连续
                if len(X[column].unique()) <= 4:
                    left_index = (X[column] == value).values
                else:
                    left_index = (X[column] <= value).values
                right_index = ~left_index
                X_left, y_left = X[left_index], y[left_index]
                X_right, y_right = X[right_index], y[right_index]
                X[left_index]["predict"] = cur_node.left_child.label
                X[right_index]["predict"] = cur_node.right_child.label
                X[left_index]["predict"] = self.predict(X_left, y_left, cur_node.left_child)
                X[right_index]["predict"] = self.predict(X_right, y_right, cur_node.right_child)
            else:
                    return
        else:
            cur_node = node
            if cur_node.divide_info:
                column, value = cur_node.divide_info["column"], cur_node.divide_info["value"]
                if len(X[column].unique()) <= 4:
                    left_index = (X[column] == value).values
                else:
                    left_index = (X[column] <= value).values
                right_index = ~left_index
                X_left, y_left = X[left_index], y[left_index]
                X_right, y_right = X[right_index], y[right_index]
                X[left_index]["predict"] = cur_node.left_child.label
                X[right_index]["predict"] = cur_node.right_child.label
                X[left_index]["predict"] = self.predict(X_left, y_left, cur_node.left_child)
                X[right_index]["predict"] = self.predict(X_right, y_right, cur_node.right_child)
            else:
                return
        return X["predict"]
    
    • add_rate——剪枝辅助函数

      这个算法也是递归实现的(好吧,我喜欢递归)。或许你注意到了判断变量是否连续的判断条件(如果你没有注意到,那你现在需要注意到了),它可能不是精确的,如果你有更好判断变量类型的方法欢迎你告诉我。

    def add_error(self, X_test, y_test, node=None):
        X = pd.DataFrame(X_test)
        y = pd.DataFrame(y_test)
        N = len(X)
        if node == None:
            cur_node = self.tree.root
            error = error_rate(y, cur_node.label)
            cur_node.except_error = N * (error + 1.15 * np.sqrt(error * (1 - error) / N))
            if self.tree.root.divide_info:
                column, value = self.tree.root.divide_info["column"], self.tree.root.divide_info["value"]
                # 判定变量是分类还是连续
                if len(X[column].unique()) <= 4:
                    left_index = (X[column] == value).values
                else:
                    left_index = (X[column] <= value).values
                right_index = ~left_index
                X_left, y_left = X[left_index], y[left_index]
                X_right, y_right = X[right_index], y[right_index]
                self.add_error(X_left, y_left, cur_node.left_child)
                self.add_error(X_right, y_right, cur_node.right_child)
            else:
                return
        else:
            cur_node = node
            error = error_rate(y, cur_node.label)
            cur_node.except_error = error + 1.15 * np.sqrt(error * (1 - error) / N)
            if cur_node.divide_info:
                column, value = cur_node.divide_info["column"], cur_node.divide_info["value"]
                if len(X[column].unique()) <= 4:
                    left_index = (X[column] == value).values
                else:
                    left_index = (X[column] <= value).values
                right_index = ~left_index
                X_left, y_left = X[left_index], y[left_index]
                X_right, y_right = X[right_index], y[right_index]
                self.add_error(X_left, y_left, cur_node.left_child)
                self.add_error(X_right, y_right, cur_node.right_child)
            else:
                return
    

辅助函数

一共也就三个辅助函数:计算熵、查找分支信息和计算错误率函数。

  • entropy——计算熵
def entropy(y):
    category = y.apply(pd.value_counts) / len(y)
    return (-category * np.log2(category)).sum()[0]
  • find_variable——查找分支信息

    跟前面add_rate函数一样,你需要注意变量类型判断条件。

def find_variable(X, y, minimum_entropy=0):
    """ find the variable that split the data set getting minmum entropy param: X:array_like y:array_like minimum_entropy:minimum difference allowed return: variable_name: str, split information X_left_index: array_like, left dateset index X_right_index:array_like, right dataset index column:str or int depending on X's columns, split column """
    X = pd.DataFrame(X)
    y = pd.DataFrame(y)
    minimum_entropy = minimum_entropy

    assert len(y.iloc[:, 0].unique()) <= 2
    label = y.iloc[:, 0].value_counts().index[0]
    current_entropy = entropy(y)

    if (current_entropy == 0) | (X.shape[1] == 0):
        return (None, None, None, None)

    min_divide_entropy = np.inf  # 标记最小交叉熵
    divide_variable = None  # 标记分组信息
    divide_column = None  # 标记分组column名称
    X_left_index = None  # 左子树序列
    X_right_index = None  # 右子树序列

    for column in X.columns:
        category_index = X.loc[:, column].unique()  # 当前属性的种类索引
        category_number = len(category_index)  # 种类数量
        # 如果变量是0-1变量
        if category_number == 2:
            temp_left_index = (X[column] == category_index[0]).values
            temp_right_index = ~temp_left_index
            temp_y1 = y[temp_left_index]
            temp_y2 = y[temp_right_index]
            temp_entropy = len(temp_y1) / len(X) * entropy(temp_y1) + len(temp_y2) / len(X) * entropy(
                temp_y2)  # 计算当前变脸的熵
            if temp_entropy < min_divide_entropy:
                X_left_index = temp_left_index
                X_right_index = temp_right_index
                min_divide_entropy = temp_entropy
                divide_variable = {"column":column, "value":category_index[0]}
                divide_column = column
        # 当是无序的分类变量时
        elif (category_number >= 3) & (category_number <= 4):
            best_divide_level = category_index[0]
            best_divide_value_entropy = np.inf
            for level in category_index:
                set1_index = (X[column] == level).values
                set2_index = ~set1_index
                temp_y1 = y[set1_index]
                temp_y2 = y[set2_index]
                temp_entropy = len(temp_y1) / len(X) * entropy(temp_y1) + len(temp_y2) / len(X) * entropy(temp_y2)
                if temp_entropy < best_divide_value_entropy:
                    best_divide_level = level
                    best_divide_value_entropy = temp_entropy
            if best_divide_value_entropy < min_divide_entropy:
                X_left_index = (X[column] == best_divide_level).values
                X_right_index = ~X_left_index
                min_divide_entropy = best_divide_value_entropy
                divide_variable = {"column":column, "value":best_divide_level}
                divide_column = column
        # 连续变量
        else:
            all_possible_value = np.sort(X[column].unique())
            best_divide_value = np.inf
            best_divide_value_entropy = np.inf
            for i in all_possible_value:
                set1_index = (X[column] <= i).values
                set2_index = ~set1_index
                temp_y1 = y[set1_index]
                temp_y2 = y[set2_index]
                temp_entropy = len(temp_y1) / len(X) * entropy(temp_y1) + len(temp_y2) / len(X) * entropy(temp_y2)
                if temp_entropy < best_divide_value_entropy:
                    best_divide_value_entropy = temp_entropy
                    best_divide_value = i
            if best_divide_value_entropy < min_divide_entropy:
                X_left_index = (X[column] <= best_divide_value).values
                X_right_index = ~X_right_index
                min_divide_entropy = best_divide_value_entropy
                divide_variable = {"column":column, "value":best_divide_value}
                divide_column = column
    if (current_entropy - min_divide_entropy) < minimum_entropy:
        return (None, None, None, None)
    return (divide_variable, X_left_index, X_right_index, divide_column)
  • error_rate——计算错误率
def error_rate(y_true, label):
    y = pd.DataFrame(y_true)
    return len(y[y != label])/len(y)

小结

到这里,算法基本编完了,可以小测一下:

if __name__ == "__main__":
    X = pd.DataFrame([["Rainy", "Saturday"],
                      ["Sunny", "Saturday"],
                      ["Windy", "Tuesday"],
                      ["Sunny", "Saturday"],
                      ["Sunny", "Monday"],
                      ["Windy", "Saturday"]], columns=["Weather", "Dow"])
    y = pd.DataFrame([["No"],
                      ["Yes"],
                      ["No"],
                      ["Yes"],
                      ["No"],
                      ["No"]], columns=["Play"])
    decision_tree = Decision_tree()
    decision_tree.fit(X, y, 0.4, maximum_layer=2)
    decision_tree.tree.breadth_traversal()
    decision_tree.post_pruning(X, y)
    decision_tree.tree.breadth_traversal()
    result = decision_tree.predict(X, y)
    print(result)

没想到吧,猛男的测试就是这么的温柔。因为实在是太懒了,所以请你们加大测试力度,如果有错误记得提醒我。

对于日后想从事数据分析甚至算法岗位的我来说,数据结构有必要补一补了,还有放在文件夹里的C++。

点赞