Maximum Entropy Model

  1. Entropy\
    Suppose the probability distribution of a discrete random variable X is P(X). Its entropy is

$$H(P) = -\sum_{x} P(x)\log P(x)$$
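
For example, the uniform distribution over two outcomes attains the largest entropy any binary variable can have:

$$H = -\tfrac{1}{2}\log\tfrac{1}{2} - \tfrac{1}{2}\log\tfrac{1}{2} = \log 2$$

while a deterministic distribution has entropy 0. The maximum entropy principle picks, among all distributions consistent with the known constraints, the one closest to uniform in this sense.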

  1. Definition of the maximum entropy model\
    Suppose the classification model is a conditional probability distribution P(Y|X). Given a training data set T = {(x_1, y_1), (x_2, y_2), ..., (x_N, y_N)}, the learning goal is to use the maximum entropy principle to select the best such model.

The expected value of the feature function f(x, y) with respect to the empirical distribution $\tilde P(X, Y)$ is:

$$E_{\tilde P}(f) = \sum_{x,y} \tilde P(x,y)\, f(x,y)$$

The expected value of the feature function f(x, y) with respect to the model P(Y|X) and the empirical distribution $\tilde P(X)$ is:

$$E_{P}(f) = \sum_{x,y} \tilde P(x)\, P(y|x)\, f(x,y)$$

We assume these two expectations are equal:

$$\sum_{x,y} \tilde P(x,y)\, f(x,y) = \sum_{x,y} \tilde P(x)\, P(y|x)\, f(x,y)$$

If there are n feature functions f_i(x, y), i = 1, ..., n, then there are n such constraints (a small counting sketch follows this definition).\
Definition (maximum entropy model):\
Among all conditional distributions P(Y|X) that satisfy the expectation constraints above, the maximum entropy model is the one whose conditional entropy is largest:

$$\max_{P}\ H(P) = -\sum_{x,y} \tilde P(x)\, P(y|x)\log P(y|x)$$

$$\text{s.t.}\quad \sum_{x,y} \tilde P(x,y)\, f_i(x,y) = \sum_{x,y} \tilde P(x)\, P(y|x)\, f_i(x,y), \quad i = 1, \dots, n$$

$$\text{s.t.}\quad \sum_{y} P(y|x) = 1$$
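
To make the constraints concrete, here is a minimal sketch of how the empirical expectations $E_{\tilde P}(f_i)$ can be estimated from data when each feature function is an indicator of an (attribute value, label) pair, which is the representation used by the implementation at the end of this post. The toy data and the helper name `empirical_expectations` are illustrative, not part of the derivation.

```python
from collections import defaultdict

def empirical_expectations(samples, labels):
    """E_P~[f_i] for indicator features f_i(x, y) = 1 iff attribute
    value x occurs in the sample and the sample's label is y."""
    counts = defaultdict(float)
    for sample, y in zip(samples, labels):
        for x in set(sample):          # each distinct (x, y) pair is one feature function
            counts[(x, y)] += 1.0
    n = len(samples)
    return {xy: c / n for xy, c in counts.items()}

# toy usage: two samples, two labels
samples = [["sunny", "hot"], ["overcast", "mild"]]
labels = ["no", "yes"]
print(empirical_expectations(samples, labels))
# every observed (x, y) pair gets expectation 1/2, e.g. ('sunny', 'no'): 0.5
```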

  1. Learning the maximum entropy model\
    Rewrite the maximization above as the equivalent constrained minimization problem:

$$\min_{P \in \mathbf{C}}\ -H(P) = \sum_{x,y} \tilde P(x)\, P(y|x)\log P(y|x)$$

$$\text{s.t.}\quad \sum_{x,y} \tilde P(x,y)\, f_i(x,y) = \sum_{x,y} \tilde P(x)\, P(y|x)\, f_i(x,y), \quad i = 1, \dots, n$$

$$\text{s.t.}\quad \sum_{y} P(y|x) = 1$$

Introduce Lagrange multipliers w_0, w_1, ..., w_n and form the Lagrangian:

$$L(P, w) = -H(P) + w_0\Bigl(1 - \sum_{y} P(y|x)\Bigr) + \sum_{i=1}^{n} w_i\Bigl(\sum_{x,y} \tilde P(x,y)\, f_i(x,y) - \sum_{x,y} \tilde P(x)\, P(y|x)\, f_i(x,y)\Bigr)$$

The primal problem is:

$$\min_{P \in \mathbf{C}} \max_{w} L(P, w)$$
Its dual problem is:

$$\max_{w} \min_{P \in \mathbf{C}} L(P, w)$$

$$\Psi(w) = \min_{P \in \mathbf{C}} L(P, w), \qquad P_w = \arg\min_{P \in \mathbf{C}} L(P, w) = P_w(y|x)$$

Setting the partial derivative of L(P, w) with respect to P(y|x) to zero, and using $\sum_y P(y|x) = 1$ to determine the normalizer, gives:

$$P_w(y|x) = \frac{1}{Z_w(x)}\exp\Bigl(\sum_{i=1}^{n} w_i f_i(x,y)\Bigr), \qquad Z_w(x) = \sum_{y}\exp\Bigl(\sum_{i=1}^{n} w_i f_i(x,y)\Bigr)$$

P_w is the maximum entropy model (a small sketch at the end of this section shows how to evaluate it).\
Next, solve the outer maximization of the dual problem:

$$\max_{w} \Psi(w), \qquad w^* = \arg\max_{w} \Psi(w)$$

$$\Psi(w) = \sum_{x,y} \tilde P(x)\, P_w(y|x)\log P_w(y|x) + \sum_{i=1}^{n} w_i\Bigl(\sum_{x,y} \tilde P(x,y)\, f_i(x,y) - \sum_{x,y} \tilde P(x)\, P_w(y|x)\, f_i(x,y)\Bigr)$$

$$\max_{w} \Psi(w) = \max_{w}\Bigl(\sum_{x,y} \tilde P(x,y)\sum_{i=1}^{n} w_i f_i(x,y) - \sum_{x} \tilde P(x)\log Z_w(x)\Bigr)$$
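
The exponential form of $P_w(y|x)$ can be evaluated directly once the weights are known. Below is a minimal sketch, assuming indicator features keyed by (attribute value, label) pairs; the function name `p_w` and the toy weights are illustrative.

```python
import math

def p_w(sample, weights, labels):
    """P_w(y|x) = exp(sum_i w_i f_i(x, y)) / Z_w(x) for every label y,
    with indicator features keyed by (attribute value, label)."""
    scores = {}
    for y in labels:
        s = sum(weights.get((x, y), 0.0) for x in sample)  # sum_i w_i f_i(x, y)
        scores[y] = math.exp(s)
    z = sum(scores.values())                               # Z_w(x), the normalizer
    return {y: v / z for y, v in scores.items()}

# toy usage with hand-picked weights
weights = {("sunny", "no"): 0.8, ("sunny", "yes"): 0.1}
print(p_w(["sunny", "hot"], weights, ["yes", "no"]))
```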

  1. Maximum likelihood estimation of the maximum entropy model == maximizing the dual function\
    The log-likelihood of the training data under the model is:

$$L_{\tilde P}(P_w) = \log\prod_{x,y} P(y|x)^{\tilde P(x,y)} = \sum_{x,y} \tilde P(x,y)\log P(y|x)$$

$$= \sum_{x,y} \tilde P(x,y)\sum_{i=1}^{n} w_i f_i(x,y) - \sum_{x} \tilde P(x)\log Z_w(x)$$

which is exactly the dual objective $\Psi(w)$ above.
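
The code below trains such a model with a simple GIS-style (Generalized Iterative Scaling) update. Taking C as the largest number of active features in any training sample, each iteration updates every weight by

$$w_i \leftarrow w_i + \frac{1}{C}\log\frac{E_{\tilde P}(f_i)}{E_{P}(f_i)}$$

which is the step performed in `train()`. The feature functions are indicators that fire when a particular attribute value and label co-occur, and the training file `./train` is expected to be tab-separated, with the label in the first column followed by the attribute values.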

from collections import defaultdict
import math
import codecs


class MaxEnt(object):
    '''Maximum entropy classifier trained with GIS (Generalized Iterative Scaling).'''

    def __init__(self):
        self.samples = []                 # training samples (lists of feature strings)
        self.labels = []                  # training labels
        self.N = 0                        # number of training samples
        self.M = 0                        # number of (feature, label) pairs, i.e. feature functions
        self.current_lambdas = []         # current weight w_i of each feature function
        self.C = 0                        # GIS constant: max number of active features per sample
        self._ep_ = []                    # empirical expectations E_P~[f_i]
        self._ep = []                     # model expectations E_P[f_i]
        self.numXY = defaultdict(int)     # counts of (x, y) co-occurrences in the training data
        self.featureId_map = {}           # maps an (x, y) pair to its feature index
        self.Y = []                       # set of labels

    def fit(self, trainX, trainY, iterNum=100):
        self.samples = trainX
        self.labels = trainY
        self.Y = set(trainY)
        self.N = len(trainY)
        # GIS constant C: the largest number of active features in any sample
        self.C = max(len(sample) for sample in trainX)
        # count (x, y) co-occurrences; each distinct pair defines one indicator feature
        for i, sample in enumerate(self.samples):
            y = self.labels[i]
            for x in set(sample):
                self.numXY[(x, y)] += 1.0
        self.M = len(self.numXY)
        self.train(iterNum)


    def _EP_(self):
        # empirical expectation of each feature function: E_P~[f_i] = count(x, y) / N
        for i, xy in enumerate(self.numXY.keys()):
            self._ep_.append(self.numXY[xy] / self.N)
            self.featureId_map[xy] = i
        print(len(self._ep_))

    def ZX(self, sample):
        # normalizer Z_w(x) = sum_y exp(sum_i w_i * f_i(x, y))
        sumY = 0.0
        for y in self.Y:
            s = 0.0
            for x in sample:
                if (x, y) in self.numXY:
                    s += self.current_lambdas[self.featureId_map[(x, y)]]
            sumY += math.exp(s)
        return sumY


    def pXY(self, sample):
        # model distribution P_w(y|x) = exp(sum_i w_i * f_i(x, y)) / Z_w(x) for every label y
        ZX_sum = self.ZX(sample)
        result = []
        for y in self.Y:
            pxy_sum = 0.0
            for x in sample:
                if (x, y) in self.numXY:
                    pxy_sum += self.current_lambdas[self.featureId_map[(x, y)]]
            result.append((math.exp(pxy_sum) / ZX_sum, y))
        return result

    def _Ep(self):
        # model expectation of each feature: E_P[f_i] = sum_x P~(x) * sum_y P_w(y|x) * f_i(x, y)
        self._ep = [0.0] * self.M
        for sample in self.samples:
            for p, y in self.pXY(sample):
                for x in sample:
                    if (x, y) in self.numXY:
                        self._ep[self.featureId_map[(x, y)]] += p / self.N


    def train(self, iterNum):
        self.current_lambdas = [0.0] * self.M
        print(len(self.current_lambdas))
        self._EP_()
        for _ in range(iterNum):
            self._Ep()
            # GIS update: w_i <- w_i + (1/C) * log(E_P~[f_i] / E_P[f_i])
            for i, w in enumerate(self.current_lambdas):
                self.current_lambdas[i] = w + 1.0 / self.C * math.log(self._ep_[i] / self._ep[i])
            print(self.current_lambdas)

    def predict(self, testX):
        # return (probability, label) pairs for a single sample
        p = self.pXY(testX)
        print(p)
        return p

def loadfile():
    # each line of ./train is tab-separated: label, then the attribute values
    trainX = []
    trainY = []
    for line in codecs.open("./train", 'r', 'utf-8').readlines():
        fields = line.strip().split("\t")
        trainY.append(fields[0])
        trainX.append(fields[1:])
    return trainX, trainY


if __name__ == "__main__":
    maxEnt = MaxEnt()
    trainX, trainY = loadfile()
    maxEnt.fit(trainX, trainY, 1000)
    maxEnt.predict(["sunny", "hot", "high", "FALSE"])
    maxEnt.predict(["sunny", "hot", "high", "True"])
    maxEnt.predict(["overcast", "hot", "high", "FALSE"])  # expected label: yes