因为自己平时写计算的代码比较多,很少写树结构的(决策树算法的实现零零整整花了近两周),所以数据结构和代码效率上待优化的地方应该还有很多,仅提供给大家借鉴。
类的定义
自己网上搜到的代码很多都没有定义类,我自己对于这样的代码很是不喜欢,一点点看算法流程是看不下去的。因为是树,所以一定存在节点类和树类。
节点类
entropy是未分支前的信息熵
label表示的是当前节点的标签,哪类数据多,标签就是哪类
divide_info存储了分支的信息
except_error在后面剪枝的时候使用
好吧,下面两个set函数好像没什么必要
class Node():
    """A single node of the binary decision tree.

    Attributes:
        entropy: entropy of the data at this node before splitting
                 (-1 marks an uninitialized/empty root).
        label: majority class of the samples that reach this node.
        divide_info: split description ({"column": ..., "value": ...}),
                     or None for a leaf.
        left_child / right_child: child nodes, None until attached.
        except_error: pessimistic expected error, filled in during pruning.
    """

    def __init__(self, entropy=-1, label=None, divide_info=None):
        self.entropy, self.label, self.divide_info = entropy, label, divide_info
        self.left_child = self.right_child = None
        self.except_error = None

    def set_left(self, node):
        # attach *node* as the left child
        self.left_child = node

    def set_right(self, node):
        # attach *node* as the right child
        self.right_child = node
- 树类
- 实现了树的广度优先遍历和查询节点所在层数两个功能,两个功能实现方法差不多。
class Tree():
    """Binary tree wrapper with breadth-first utilities.

    A root whose entropy is -1 marks an empty tree.
    """

    def __init__(self):
        self.root = Node()

    def breadth_traversal(self):
        """Print node labels layer by layer (breadth-first). Returns None."""
        if self.root.entropy == -1:
            return
        queue = [self.root]
        result = []
        while queue:
            next_queue = []
            next_result = []
            for node in queue:
                next_result.append(node.label)
                # Guard each child separately: the original appended
                # right_child unconditionally whenever left_child existed,
                # so a one-sided node put None in the queue and crashed
                # on None.label in the next layer.
                if node.left_child:
                    next_queue.append(node.left_child)
                if node.right_child:
                    next_queue.append(node.right_child)
            result.append(next_result)
            queue = next_queue
        for i in range(len(result)):
            print("第", i, "层信息", result[i])
        return

    def query_layer(self, node):
        """Return the 0-based layer of *node*, or None if not found / empty tree."""
        if self.root.entropy == -1:
            return
        queue = [self.root]
        layer = 0
        while queue:
            next_queue = []
            for candidate in queue:
                if candidate == node:
                    return layer
                # same independent-child guard as breadth_traversal
                if candidate.left_child:
                    next_queue.append(candidate.left_child)
                if candidate.right_child:
                    next_queue.append(candidate.right_child)
            layer = layer + 1
            queue = next_queue
        return
决策树类
决策树类的代码有点长,一步一步贴代码吧。
初始化函数
初始化嘛,初始化一个树就好了。
class Decision_tree():
    """Binary decision tree classifier built on entropy-minimizing splits."""

    def __init__(self):
        # underlying binary tree; nodes are created/attached by fit()
        self.tree = Tree()
fit函数——根据数据构造树
因为用的方法是递归的方法,所以有必要加一个node参数,告诉它的位置在哪里。其他参数的话看看名字应该就知道什么意思,就不说了。
里面调用了返回分支信息的find_variable函数和计算熵的entropy函数,这两个函数放在最后说。
def fit(self, X, y, minimum_entropy=0, cur_node=None, maximum_layer=np.inf):
    """Recursively build the decision tree from training data.

    param:
        X: features, array_like
        y: labels, array_like (binary classification)
        minimum_entropy: minimum entropy decrease required to keep splitting
        cur_node: node whose child this call creates; None means build the root
        maximum_layer: maximum depth allowed, counted from 0
    """
    X = pd.DataFrame(X)
    y = pd.DataFrame(y)
    # depth cut-off: stop once the parent already sits at the depth limit
    if cur_node:
        if self.tree.query_layer(cur_node) >= maximum_layer:
            return
    label = y.iloc[:, 0].value_counts().index[0]  # majority class
    current_entropy = entropy(y)
    divided_info, left_index, right_index, divide_column = find_variable(
        X, y, minimum_entropy)

    def split_children():
        # drop the consumed column, then split rows for the two children
        X_next = X[X.columns.difference([divide_column])]
        return X_next[left_index], y[left_index], X_next[right_index], y[right_index]

    if self.tree.root.entropy == -1:
        # empty tree: this call creates the root
        if divided_info is None:
            print("无需决策分类")
        else:
            self.tree.root = Node(entropy=current_entropy, label=label,
                                  divide_info=divided_info)
            X_left, y_left, X_right, y_right = split_children()
            self.fit(X_left, y_left, minimum_entropy, self.tree.root, maximum_layer)
            self.fit(X_right, y_right, minimum_entropy, self.tree.root, maximum_layer)
    else:
        if divided_info is None:
            # no acceptable split: attach a leaf to the first free slot
            # (fit is called left-first, so left fills before right)
            if cur_node.left_child is None:
                cur_node.left_child = Node(entropy=None, label=label)
            else:
                cur_node.right_child = Node(entropy=None, label=label)
            return
        X_left, y_left, X_right, y_right = split_children()
        if cur_node.left_child is None:
            cur_node.set_left(Node(current_entropy, label, divided_info))
            child = cur_node.left_child
        else:
            cur_node.set_right(Node(current_entropy, label, divided_info))
            child = cur_node.right_child
        self.fit(X_left, y_left, minimum_entropy, child, maximum_layer)
        self.fit(X_right, y_right, minimum_entropy, child, maximum_layer)
post_pruning——后剪枝函数
函数调用了add_error函数为每个节点添加悲观的期望错误数信息,添加后再从下往上进行剪枝。这个从下往上剪枝的过程借鉴了广度优先遍历的思想。同样利用到的error_rate函数放到了最后面。
def post_pruning(self, X_test, y_test):
    """Pessimistic bottom-up post-pruning using a hold-out set.

    param:
        X_test / y_test: evaluation features and labels, array_like
    """
    # annotate every node with its pessimistic expected error
    self.add_error(X_test, y_test)
    # collect nodes layer by layer (breadth-first)
    queue = [self.tree.root]
    node_layer = [queue]
    while queue:
        next_queue = []
        for node in queue:
            if node.divide_info:
                next_queue.append(node.left_child)
                next_queue.append(node.right_child)
        node_layer.append(next_queue)
        queue = next_queue
    # walk the layers bottom-up (skipping the final empty layer); collapse
    # any split whose own pessimistic error beats the sum of its children's
    for i in range(len(node_layer)-2, -1, -1):
        current_layer = node_layer[i]
        for node in current_layer:
            if node.divide_info:
                if node.except_error < node.left_child.except_error + node.right_child.except_error:
                    # turn this internal node into a leaf
                    node.left_child = None
                    node.right_child = None
                    node.divide_info = None
predict——预测函数
这个函数的实现也是用了递归,要注意的一点就是一定要把函数返回结果传给X[“predict”]
def predict(self, X, y, node=None): X = pd.DataFrame(X) y = pd.DataFrame(y) if node==None: cur_node = self.tree.root X["predict"] = y.copy() if self.tree.root.divide_info: column, value = self.tree.root.divide_info["column"], self.tree.root.divide_info["value"] # 判定变量是分类还是连续 if len(X[column].unique()) <= 4: left_index = (X[column] == value).values else: left_index = (X[column] <= value).values right_index = ~left_index X_left, y_left = X[left_index], y[left_index] X_right, y_right = X[right_index], y[right_index] X[left_index]["predict"] = cur_node.left_child.label X[right_index]["predict"] = cur_node.right_child.label X[left_index]["predict"] = self.predict(X_left, y_left, cur_node.left_child) X[right_index]["predict"] = self.predict(X_right, y_right, cur_node.right_child) else: return else: cur_node = node if cur_node.divide_info: column, value = cur_node.divide_info["column"], cur_node.divide_info["value"] if len(X[column].unique()) <= 4: left_index = (X[column] == value).values else: left_index = (X[column] <= value).values right_index = ~left_index X_left, y_left = X[left_index], y[left_index] X_right, y_right = X[right_index], y[right_index] X[left_index]["predict"] = cur_node.left_child.label X[right_index]["predict"] = cur_node.right_child.label X[left_index]["predict"] = self.predict(X_left, y_left, cur_node.left_child) X[right_index]["predict"] = self.predict(X_right, y_right, cur_node.right_child) else: return return X["predict"]
add_error——剪枝辅助函数
这个算法也是递归实现的(好吧,我喜欢递归)。或许你注意到了判断变量是否连续的判断条件(如果你没有注意到,那你现在需要注意到了),它可能不是精确的,如果你有更好判断变量类型的方法欢迎你告诉我。
def add_error(self, X_test, y_test, node=None):
    """Annotate every reachable node with its pessimistic expected error count.

    param:
        X_test / y_test: hold-out rows routed to this subtree, array_like
        node: subtree root for recursion; None means start at the tree root
    """
    X = pd.DataFrame(X_test)
    y = pd.DataFrame(y_test)
    N = len(X)
    cur_node = self.tree.root if node is None else node
    if N == 0:
        # no hold-out rows reach this node: nothing to measure, and the
        # pessimistic formula below would divide by zero
        cur_node.except_error = 0
        return
    error = error_rate(y, cur_node.label)
    # Pessimistic expected NUMBER of errors (a count, not a rate). The
    # original multiplied by N only at the root, so the parent/child values
    # compared in post_pruning were in incompatible units.
    cur_node.except_error = N * (error + 1.15 * np.sqrt(error * (1 - error) / N))
    if not cur_node.divide_info:
        return
    column = cur_node.divide_info["column"]
    value = cur_node.divide_info["value"]
    # same heuristic as fit: <=4 distinct values => categorical split
    if len(X[column].unique()) <= 4:
        left_index = (X[column] == value).values
    else:
        left_index = (X[column] <= value).values
    right_index = ~left_index
    self.add_error(X[left_index], y[left_index], cur_node.left_child)
    self.add_error(X[right_index], y[right_index], cur_node.right_child)
辅助函数
一共也就三个辅助函数:计算熵、查找分支信息和计算错误率函数。
- entropy——计算熵
def entropy(y):
    """Shannon entropy (base 2) of the first column of *y*.

    param:
        y: array_like / DataFrame of labels
    return:
        float entropy in bits (0.0 for a pure label set)
    """
    # The original used pd.value_counts (deprecated since pandas 2.1) over
    # every column and then positional `[0]` on a labeled Series (also
    # deprecated); compute the single needed column directly instead.
    y = pd.DataFrame(y)
    probs = y.iloc[:, 0].value_counts() / len(y)
    # every class present has probability > 0, so log2 is well defined
    return float(-(probs * np.log2(probs)).sum())
find_variable——查找分支信息
跟前面add_error函数一样,你需要注意变量类型判断条件。
def find_variable(X, y, minimum_entropy=0):
    """Find the column and binary split that minimize the weighted entropy.

    param:
        X: array_like, features
        y: array_like, labels (binary classification only)
        minimum_entropy: minimum entropy decrease required to accept a split
    return:
        (divide_variable, X_left_index, X_right_index, divide_column)
        divide_variable: {"column": ..., "value": ...} or None
        X_left_index / X_right_index: boolean row masks for the two halves
        divide_column: the column used for the split
        All four are None when no acceptable split exists.
    """
    X = pd.DataFrame(X)
    y = pd.DataFrame(y)
    assert len(y.iloc[:, 0].unique()) <= 2  # only binary labels supported
    current_entropy = entropy(y)
    # already pure, or no feature columns left to split on
    if (current_entropy == 0) or (X.shape[1] == 0):
        return (None, None, None, None)

    def weighted_entropy(left_mask):
        # weighted entropy of the two halves induced by the boolean mask
        y1, y2 = y[left_mask], y[~left_mask]
        return (len(y1) / len(X) * entropy(y1)
                + len(y2) / len(X) * entropy(y2))

    min_divide_entropy = np.inf  # best (smallest) weighted entropy so far
    divide_variable = None       # best split description
    divide_column = None         # column of the best split
    X_left_index = None          # left-half row mask of the best split
    X_right_index = None         # right-half row mask of the best split
    for column in X.columns:
        category_index = X.loc[:, column].unique()
        category_number = len(category_index)
        if category_number == 2:
            # binary variable: only one possible split
            best_value = category_index[0]
            best_mask = (X[column] == best_value).values
            best_entropy = weighted_entropy(best_mask)
        elif 3 <= category_number <= 4:
            # unordered categorical (heuristic): try "one level vs the rest"
            best_value, best_mask, best_entropy = None, None, np.inf
            for level in category_index:
                mask = (X[column] == level).values
                candidate = weighted_entropy(mask)
                if candidate < best_entropy:
                    best_value, best_mask, best_entropy = level, mask, candidate
        else:
            # continuous (heuristic: >4 distinct values): try "<= threshold"
            best_value, best_mask, best_entropy = None, None, np.inf
            for threshold in np.sort(X[column].unique()):
                mask = (X[column] <= threshold).values
                candidate = weighted_entropy(mask)
                if candidate < best_entropy:
                    best_value, best_mask, best_entropy = threshold, mask, candidate
        if best_entropy < min_divide_entropy:
            min_divide_entropy = best_entropy
            X_left_index = best_mask
            # the right half is always the complement of the left mask
            # (the original continuous branch negated X_right_index itself,
            # which was None or stale from a previous column)
            X_right_index = ~best_mask
            divide_variable = {"column": column, "value": best_value}
            divide_column = column
    # reject the split if the entropy decrease is too small
    if (current_entropy - min_divide_entropy) < minimum_entropy:
        return (None, None, None, None)
    return (divide_variable, X_left_index, X_right_index, divide_column)
- error_rate——计算错误率
def error_rate(y_true, label):
    """Fraction of rows whose (first-column) label differs from *label*.

    param:
        y_true: array_like / DataFrame of true labels
        label: the predicted label to compare against
    return:
        float in [0, 1]
    """
    y = pd.DataFrame(y_true)
    # The original did len(y[y != label]) / len(y): masking a DataFrame with
    # a boolean DataFrame keeps the shape (NaN-fills), so len() never changed
    # and the function always returned 1.0. Compare the label column instead.
    return (y.iloc[:, 0] != label).sum() / len(y)
小结
到这里,算法基本编完了,可以小测一下:
if __name__ == "__main__":
    # Smoke test: 6 samples with two categorical features
    # ("Weather": 3 levels, "Dow": 3 levels) and a binary target.
    X = pd.DataFrame([["Rainy", "Saturday"],
                      ["Sunny", "Saturday"],
                      ["Windy", "Tuesday"],
                      ["Sunny", "Saturday"],
                      ["Sunny", "Monday"],
                      ["Windy", "Saturday"]], columns=["Weather", "Dow"])
    y = pd.DataFrame([["No"],
                      ["Yes"],
                      ["No"],
                      ["Yes"],
                      ["No"],
                      ["No"]], columns=["Play"])
    decision_tree = Decision_tree()
    # require at least 0.4 bits of entropy gain per split, depth limit 2
    decision_tree.fit(X, y, 0.4, maximum_layer=2)
    # print the tree before pruning
    decision_tree.tree.breadth_traversal()
    # prune on the training data itself (no separate validation set here)
    decision_tree.post_pruning(X, y)
    # print the tree after pruning
    decision_tree.tree.breadth_traversal()
    result = decision_tree.predict(X, y)
    print(result)
没想到吧,猛男的测试就是这么的温柔。因为实在是太懒了,所以请你们加大测试力度,如果有错误记得提醒我。
对于日后想从事数据分析甚至算法岗位的我来说,数据结构有必要补一补了,还有放在文件夹里的C++。