- Entropy\
Suppose the probability distribution of a discrete random variable X is P(X). Its entropy is
$$H(P) = -\sum_{x} p(x) \log p(x)$$
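As a quick illustration, here is a minimal sketch computing $H(P)$ for a discrete distribution; the `entropy` helper and the example distributions are my own, assuming the input probabilities sum to 1:

```python
import math

def entropy(probs):
    """H(P) = -sum_x p(x) log p(x); the 0*log(0) term is taken as 0."""
    return -sum(p * math.log(p) for p in probs if p > 0)

print(entropy([0.5, 0.5]))  # log 2 ~ 0.693: the uniform distribution maximizes H
print(entropy([0.9, 0.1]))  # ~ 0.325: a more peaked distribution has lower entropy
```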
- Definition of the maximum entropy model\
Suppose the classification model is a conditional probability distribution P(Y|X). Given a training set $T = \{(x_1, y_1), (x_2, y_2), \dots, (x_n, y_n)\}$, the learning goal is to use the maximum entropy principle to select the best model.\
The expected value of the feature function f(x,y) with respect to the empirical distribution $P^*(X,Y)$ is:
$$E_{P^*}(f) = \sum_{x,y} P^*(x,y) f(x,y)$$
The expected value of the feature function f(x,y) with respect to the model P(Y|X) and the empirical distribution $P^*(X)$ is:
$$E_P(f) = \sum_{x,y} P^*(x) P(y|x) f(x,y)$$
We require these two expected values to be equal:
$$\sum_{x,y} P^*(x,y) f(x,y) = \sum_{x,y} P^*(x) P(y|x) f(x,y)$$
If there are n feature functions, there are n such constraints.\
Definition: maximum entropy model\
Among all conditional distributions P(Y|X) satisfying the expectation constraints above, the maximum entropy model is the one whose conditional entropy is largest:
$$\max_{P \in C} \; H(P) = -\sum_{x,y} P^*(x) P(y|x) \log P(y|x)$$
$$\text{s.t.} \quad \sum_{x,y} P^*(x,y) f_i(x,y) = \sum_{x,y} P^*(x) P(y|x) f_i(x,y), \quad i = 1, \dots, n$$
$$\text{s.t.} \quad \sum_{y} P(y|x) = 1$$
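To make the constraints concrete, here is a minimal sketch of the empirical expectation $E_{P^*}(f)$ for binary indicator features $f_{(x,y)}$ that fire when feature value x co-occurs with label y; the toy `data` below is a hypothetical example, not from the text:

```python
from collections import defaultdict

# Hypothetical toy data: (feature values, label) pairs.
data = [(["sunny", "hot"], "no"), (["overcast", "hot"], "yes")]
N = len(data)

# E_{P*}(f_{(x,y)}) = count(x, y) / N for binary indicator features.
ep_empirical = defaultdict(float)
for features, label in data:
    for x in set(features):
        ep_empirical[(x, label)] += 1.0 / N

print(ep_empirical[("hot", "yes")])  # 0.5: "hot" co-occurs with "yes" in 1 of 2 samples
```

The model-side expectation $E_P(f)$ replaces the joint counts with $P^*(x) P(y|x)$, i.e. it averages $P(y|x) f(x,y)$ over the training samples.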
- Learning the maximum entropy model
Equivalently, minimize the negative conditional entropy:
$$\min_{P \in C} \; -H(P) = \sum_{x,y} P^*(x) P(y|x) \log P(y|x)$$
$$\text{s.t.} \quad \sum_{x,y} P^*(x,y) f_i(x,y) = \sum_{x,y} P^*(x) P(y|x) f_i(x,y), \quad i = 1, \dots, n$$
$$\text{s.t.} \quad \sum_{y} P(y|x) = 1$$
Introduce Lagrange multipliers $w_0, w_1, \dots, w_n$:
$$L(P, w) = -H(P) + w_0 \Big(1 - \sum_{y} P(y|x)\Big) + \sum_{i=1}^{n} w_i \Big(\sum_{x,y} P^*(x,y) f_i(x,y) - \sum_{x,y} P^*(x) P(y|x) f_i(x,y)\Big)$$
The primal problem is
$$\min_{P \in C} \max_{w} L(P, w)$$
The dual problem:
$$\max_{w} \min_{P \in C} L(P, w)$$
Define the dual function and its minimizer:
$$\Psi(w) = \min_{P \in C} L(P, w) = L(P_w, w), \qquad P_w = \arg\min_{P \in C} L(P, w) = P_w(y|x)$$
Setting the partial derivative of $L(P, w)$ with respect to $P(y|x)$ to zero gives:
$$P_w(y|x) = \frac{1}{Z_w(x)} \exp\Big(\sum_{i=1}^{n} w_i f_i(x,y)\Big), \qquad Z_w(x) = \sum_{y} \exp\Big(\sum_{i=1}^{n} w_i f_i(x,y)\Big)$$
$P_w$ is the maximum entropy model.\
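This parametric form is just a softmax over per-label feature-weight sums. A minimal sketch, assuming binary indicator features keyed by (x, y) and a hypothetical weight dict `w`:

```python
import math

def p_w(features, labels, w):
    """P_w(y|x) for one sample: softmax over per-label weight sums."""
    scores = {y: sum(w.get((x, y), 0.0) for x in features) for y in labels}
    z = sum(math.exp(s) for s in scores.values())  # Z_w(x)
    return {y: math.exp(s) / z for y, s in scores.items()}

print(p_w(["sunny", "hot"], ["yes", "no"], {("sunny", "no"): 1.2}))
# {'yes': 0.23..., 'no': 0.76...}
```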
Then solve the outer maximization of the dual problem:
$$\max_{w} \Psi(w), \qquad w^* = \arg\max_{w} \Psi(w)$$
Substituting $P_w$ back in:
$$\Psi(w) = \sum_{x,y} P^*(x) P_w(y|x) \log P_w(y|x) + \sum_{i=1}^{n} w_i \Big(\sum_{x,y} P^*(x,y) f_i(x,y) - \sum_{x,y} P^*(x) P_w(y|x) f_i(x,y)\Big)$$
which simplifies to
$$\max_{w} \Psi(w) = \max_{w} \Big(\sum_{x,y} P^*(x,y) \sum_{i=1}^{n} w_i f_i(x,y) - \sum_{x} P^*(x) \log Z_w(x)\Big)$$
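A minimal sketch evaluating this simplified $\Psi(w)$ under the empirical distribution, reusing the hypothetical indicator-feature setup from above (the toy `data` and weight dict `w` are illustrative assumptions):

```python
import math

data = [(["sunny", "hot"], "no"), (["overcast", "hot"], "yes")]
labels = {y for _, y in data}
w = {("sunny", "no"): 1.2, ("overcast", "yes"): 0.7}

def psi(w):
    # Psi(w) = sum_{x,y} P*(x,y) sum_i w_i f_i(x,y) - sum_x P*(x) log Z_w(x),
    # with P* the empirical distribution: each sample has weight 1/len(data).
    total = 0.0
    for features, label in data:
        score = sum(w.get((x, label), 0.0) for x in features)
        log_z = math.log(sum(
            math.exp(sum(w.get((x, y), 0.0) for x in features)) for y in labels))
        total += (score - log_z) / len(data)
    return total

print(psi(w))
```

Note that this quantity is exactly the average conditional log-likelihood of the training labels, which is the point of the next bullet.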
- Maximum likelihood estimation of the maximum entropy model == maximization of the dual function\
Maximum likelihood estimation:
$$L_{P^*}(P_w) = \log \prod_{x,y} P(y|x)^{P^*(x,y)} = \sum_{x,y} P^*(x,y) \log P(y|x)$$
$$= \sum_{x,y} P^*(x,y) \sum_{i=1}^{n} w_i f_i(x,y) - \sum_{x} P^*(x) \log Z_w(x)$$
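The implementation below trains this model with Generalized Iterative Scaling (GIS): all weights start at 0, and each iteration applies $w_i \leftarrow w_i + \frac{1}{C} \log \frac{E_{P^*}(f_i)}{E_P(f_i)}$, where $C$ is the maximum number of active features in any training sample. The feature functions are binary indicators, one per (feature value, label) pair seen in training.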
```python
from collections import defaultdict
import codecs
import math


class MaxEnt(object):
    """Maximum entropy classifier trained with Generalized Iterative Scaling (GIS)."""

    def __init__(self):
        self.samples = []              # training samples, each a list of feature strings
        self.labels = []               # labels aligned with self.samples
        self.N = 0                     # number of training samples
        self.M = 0                     # number of feature functions, one per (x, y) pair
        self.current_lambdas = []      # current weights w_i
        self.C = 0                     # GIS constant: max number of features in a sample
        self._ep_ = []                 # empirical expectations E_{P*}(f_i)
        self._ep = []                  # model expectations E_P(f_i)
        self.numXY = defaultdict(int)  # counts of (feature, label) pairs
        self.featureId_map = {}        # (feature, label) -> feature index i
        self.Y = set()                 # label set

    def fit(self, trainX, trainY, iterNum=100):
        self.samples = trainX
        self.labels = trainY
        self.Y = set(trainY)
        self.N = len(trainY)
        # GIS constant C: maximum number of active features in any sample.
        self.C = max(len(sample) for sample in trainX)
        for i, sample in enumerate(self.samples):
            y = self.labels[i]
            for x in set(sample):
                self.numXY[(x, y)] += 1.0
        self.M = len(self.numXY)
        self.train(iterNum)

    def _EP_(self):
        """Empirical expectations E_{P*}(f_i) = count(x, y) / N."""
        for i, xy in enumerate(self.numXY):
            self._ep_.append(self.numXY[xy] / self.N)
            self.featureId_map[xy] = i
        print(len(self._ep_))

    def ZX(self, sample):
        """Normalization constant Z_w(x) = sum_y exp(sum_i w_i f_i(x, y))."""
        sumY = 0.0
        for y in self.Y:
            s = 0.0
            for x in sample:
                if (x, y) in self.numXY:
                    s += self.current_lambdas[self.featureId_map[(x, y)]]
            sumY += math.exp(s)
        return sumY

    def pXY(self, sample):
        """Return a list of (P_w(y|x), y) pairs covering every label y."""
        ZX_sum = self.ZX(sample)
        result = []
        for y in self.Y:
            s = 0.0
            for x in sample:
                if (x, y) in self.numXY:
                    s += self.current_lambdas[self.featureId_map[(x, y)]]
            result.append((math.exp(s) / ZX_sum, y))
        return result

    def _Ep(self):
        """Model expectations E_P(f_i) = sum_x P*(x) sum_y P_w(y|x) f_i(x, y)."""
        self._ep = [0.0] * self.M
        for sample in self.samples:
            for p, y in self.pXY(sample):
                for x in sample:
                    if (x, y) in self.numXY:
                        self._ep[self.featureId_map[(x, y)]] += p / self.N

    def train(self, iterNum):
        self.current_lambdas = [0.0] * self.M
        print(len(self.current_lambdas))
        self._EP_()
        for _ in range(iterNum):
            self._Ep()
            # GIS update: w_i <- w_i + (1/C) * log(E_{P*}(f_i) / E_P(f_i))
            for i, w in enumerate(self.current_lambdas):
                self.current_lambdas[i] = w + 1.0 / self.C * math.log(self._ep_[i] / self._ep[i])
        print(self.current_lambdas)

    def predict(self, testX):
        p = self.pXY(testX)
        print(p)
        return p


def loadfile():
    trainX = []
    trainY = []
    # Each line of ./train is tab-separated: label first, then the feature values.
    for line in codecs.open("./train", 'r', 'utf-8').readlines():
        fields = line.strip().split("\t")
        trainY.append(fields[0])
        trainX.append(fields[1:])
    return trainX, trainY


if __name__ == "__main__":
    maxEnt = MaxEnt()
    trainX, trainY = loadfile()
    maxEnt.fit(trainX, trainY, 1000)
    maxEnt.predict(["sunny", "hot", "high", "FALSE"])
    maxEnt.predict(["sunny", "hot", "high", "True"])
    maxEnt.predict(["overcast", "hot", "high", "FALSE"])  # yes
```