# GMM 模型及其 EM 算法

# 一、GMM 模型简介

GMMGMM,全称GaussMixtureModelGauss Mixture ModelGMMGMM 是混合密度模型的一个特例,由多个高斯(正态分布)函数的组合而成,其公式为如下:

p(x)=k=1MαkN(x;μk;σk)p(x)=\sum\limits_{k=1}^M \alpha_k N(x;\mu_k;\sigma_k)

其中μk\mu_k 是此多维正态分布的平均数向量,σk\sigma_k 是此多维正态分布的协方差矩阵,αk\alpha_k 是此变量是由第kk 个高斯分布产生的。

该模型当数量 M 无限大时,GMM 可以任意精读逼近任意分布密度函数

# 二、GMM 参数估计问题

当只有训练数据集DX=x1,...,xnD_X={x_1,...,x_n} 时,我们要用GMMGMM 模型来拟合训练数据集,因此需要对 GMM 的参数进行估计,需要估计的参数θ=(α1,...αM,μ1,...μM,σ1,...σM)\theta=(\alpha_1,...\alpha_M,\mu_1,...\mu_M,\sigma_1,...\sigma_M)

对数似然函数:l(θ)=i=1nln[k=1MαkN(xi;μk,σk)]l(\theta)=\sum\limits_{i=1}^n ln[\sum\limits_{k=1}^M \alpha_k N(x_i;\mu_k,\sigma_k)],通过观察计算可以发现,该极值点方程是复杂的超越方程组,很难直接求解

常用的GMMGMM 参数估计方法是EMEM 算法

在看EMEM 算法前,先思考参数估计有哪些问题:

  • 只有样本xix_i,但是不知道是由哪一个成分高斯产生的
  • yi=ky_i=k 表示xix_i 是由第 k 个成分高斯产生的,构造集合:DY=D_Y=
  • 完整的数据集D=DXDYD=D_X \cap D_Y,其中DYD_Y 为缺失的数据

在正常情况下,DYD_Yθ\theta 均未知时

  • 可以采用交替的方式,迭代优化
  • 固定参数θ\theta,优化DYD_Y
  • 固定DYD_Y,优化参数θ\theta

# 三、EM 算法

  • 迭代优化过程中,参数θ\thetaDYD_Y 都是不准确的中间推断结果
  • 依据不准确参数θ\theta 断定xix_i 由某个高斯产生,过于武断
  • 合理的方式是推断样本xix_i 由每一个高斯产生的概率

P(yi=kxi)=P(yi=k)p(xiyi=k)p(xi)=akN(xi;μk,σk)j=1MajN(xi;μj,σj)\dfrac{P(y_i=k|x_i)=P(y_i=k)p(x_i|y_i=k)}{p(x_i)}=\dfrac{a_k N(x_i;\mu_k,\sigma_k)}{\sum_{j=1}^Ma_jN(x_i;\mu_j,\sigma_j)} (1)

这是EMEM 算法中的EE 步:估计隐变量(缺失数据)的概率

  • 每个部分高斯分布参数也需要由所有样本参与估计,同时需要考虑样本由不同高斯产生的概率

a^k=frac1ni=1nP(yi=k)\hat{a}_k=frac{1}{n}\sum\limits_{i=1}^nP(y_i=k) (2)

μ^k=i=1nP(yi=k)xii=1nP(yi=k)\hat{\mu}_k=\dfrac{\sum_{i=1}^nP(y_i=k)x_i}{\sum_{i=1}^nP(y_i=k)} (3)

σ^k=i=1nP(yi=k)(xiμ^k)(xiμ^k)ti=1nP(yi=k)\hat{\sigma}_k=\dfrac{\sum_{i=1}^nP(y_i=k)(x_i-\hat{\mu}_k)(x_i-\hat{\mu}_k)^t}{\sum_{i=1}^nP(y_i=k)} (4)

这是EMEM 算法中的MM 步:最大化模型的参数

EMEM 算法需要不断经过EE 步和MM 步的迭代最终达到一个临界条件,一般采用似然函数在两轮迭代之间的变化量作为收敛条件,如下:

logp(Xθ)logp(Xθ(t))logp(X|\theta)\ge logp(X|\theta_(t))

# 四、例题实战

1、题目一:有两类两维各 1000 个训练样本保存在EmuTrainSamples.csvEmu-Train-Samples.csvEmuTrainLabels.csvEmu-Train-Labels.csv 文件中,各 1000 个测试样本保存在EmuTestSamples.csvEmu-Test-Samples.csvEmuTestLabels.csvEmu-Test-Labels.csv 文件中。每个类别的样本分别采样自包含 2 个分量的高斯混合模型中,分别使用两个类别的训练样本学习模型化每个类别条件概率密度函数的GMMGMM 模型参数,并与模型的真实值进行比较;

类别 0 的GMMGMM: \alpha_1=\frac{2}{3},\mu_1=(0,0)^t,\sigma_1=\begin{pmatrix}3&1\\1&1\end

α2=13μ2=(10,10)tσ2=(2255)\alpha_2=\frac{1}{3},\mu_2=(10,10)^t,\sigma_2=\begin{pmatrix}2&2\\5&5\end{pmatrix}

类别 1 的GMMGMM: \alpha_1=\frac{2}{3},\mu_1=(2,10)^t,\sigma_1=\begin{pmatrix}1&1\\1&3\end

α2=13μ2=(15,20)tσ2=(5221)\alpha_2=\frac{1}{3},\mu_2=(15,20)^t,\sigma_2=\begin{pmatrix}5&2\\2&1\end{pmatrix}

假设两个类别的先验概率相等,使用学习到的模型参数分类测试样本数据,统计分类的正确率

开始上代码,最重要的一步是写出GMMGMM 模型的类

class GMM:
def __init__(self,data,n_cluster): # data 是训练数据,n_cluster 是一个混合高斯模型中有多少个高斯函数
    self.data=data
    self.n_cluster=n_cluster
    self.max_iteration=100 # max_iteration 是为了防止迭代终止条件太难到达
    self.tolerance=1e-6 # tolerance 就是迭代终止掉件
def Gaussian(self,x,mean,cov): # 这个函数用来求高斯函数的概率密度函数
    ans = []
    for one in x:
        X=np.array(one)
        mean=np.array(mean)
        cov=np.array(cov)
        ans.append(np.exp(-0.5*(np.dot(np.dot((X-mean).T,np.linalg.inv(cov)),X-mean)))
		/math.sqrt(np.linalg.det(2.0*math.pi*cov)))
    return np.array(ans)
def init_params(self): # 随机初始化参数
    self.means=self.data[np.random.choice(self.data.shape
	[0],self.n_cluster,replace=False)] # 从 data 里面随机挑 n_cluster 个数当均值
    self.covs=[np.cov(self.data.T) for i in range(self.n_cluster)] 
	# 所有的协方差都用当前数据的协方差矩阵
    self.weights = np.ones(self.n_cluster)/self.n_cluster 
	# 属于哪个高斯函数初始化都一样为 1/n_cluster
def E_step(self):
    possibility = np.zeros((self.data.shape[0],self.n_cluster)) 
	# 初始化一个 (训练集样本数,n_cluster) 大小的 0 矩阵
    for i in range(self.n_cluster):
        possibility[:,i]=self.weights[i]*self.Gaussian(self.data,self.means[i],self.covs[i]) 
		# E 步分子的计算
    possibility /= possibility.sum(axis=1,keepdims=True) # E 步除以分母 
    return possibility
def M_step(self,possibility):
    Nk = possibility.sum(axis=0) # 计算概率总和
    self.weights=Nk/self.data.shape[0] # 除以总样本数得到新的 alpha 参数
    self.means = np.dot(possibility.T,self.data)/Nk[:,np.newaxis] 
	# 给列新增维度,为了保持维度的一致
    covs = []
    for i in range(self.n_cluster):
        diff = self.data - self.means[i]
        cov = np.dot(possibility[:,i]*diff.T,diff)/Nk[i]  #直接计算每一个单体值,分子除以分母
        covs.append(cov)
    self.covs=covs # 更新协方差矩阵
def loglikelyhood(self): # 计算对数似然
    likelyhood = np.zeros((self.data.shape[0],self.n_cluster))
    for i in range(self.n_cluster):
        likelyhood[:,i]=self.weights[i]*self.Gaussian(self.data,self.means[i],self.covs[i])
    return np.log(likelyhood.sum(axis=1)).sum()
def EM(self):
    self.init_params()
    log_likelyhood_prev=0
    for i in range(self.max_iteration):
        possibility=self.E_step()
        self.M_step(possibility)
        log_likelyhood_cur = self.loglikelyhood()
        if np.abs(log_likelyhood_cur-log_likelyhood_prev)<self.tolerance:
            break
        log_likelyhood_prev = log_likelyhood_cur
    return self.means,self.covs,self.weights
def predict(self,x): # 计算某一个新样本点的由此高斯模型产生的概率
    possibility = 0
    for i in range(self.n_cluster):
        possibility+=self.weights[i]*self.Gaussian(x,self.means[i],self.covs[i])
    return possibility

有了 GMM 模型的类剩下的就很好解决了,先对数据进行读取处理

Xtrain_file = "F:/Mechine Learning/实验三/Emu-Train-Samples.csv"
Ytrain_file = "F:/Mechine Learning/实验三/Emu-Train-Labels.csv"
Xtest_file = "F:/Mechine Learning/实验三/Emu-Test-Samples.csv"
Ytest_file = "F:/Mechine Learning/实验三/Emu-Test-Labels.csv"
X_train = []
X_test = []
Y_train = []
Y_test = []
with open(Xtrain_file,'r',encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile)
    for row in reader:
        temp = []
        for i in row:
            temp.append(float(i))
        X_train.append(temp)
with open(Ytrain_file,'r',encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile)
    for row in reader:
        Y_train.append(int(row[0]))
with open(Xtest_file,'r',encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile)
    for row in reader:
        temp = []
        for i in row:
            temp.append(float(i))
        X_test.append(temp)
with open(Ytest_file,'r',encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile)
    for row in reader:
        Y_test.append(int(row[0]))
# 由于需要对分别对两类进行估计,因此需要把两类样本分别分出来
X_train_zero = []
X_train_one = []
for i in range(len(X_train)):
    if Y_train[i] == 0:
        X_train_zero.append(X_train[i])
    else:
        X_train_one.append(X_train[i])
X_train_one = np.array(X_train_one)
X_train_zero = np.array(X_train_zero)

剩下的就是朴实无华的训练

gmm_zero = GMM(X_train_zero,2)
mean,cov,weight = gmm_zero.EM()
print(mean)
print(cov)
print(weight)
gmm_one = GMM(X_train_one,2)
mean,cov,weight = gmm_one.EM()
print(mean)
print(cov)
print(weight)

训练的结果如下表:

GMM 估计模型参数α\alphaμ\muσ\sigma
类别 0,高斯 10.65890602[-0.04877201,-0.03493048][[2.85162614,0.97072729],[0.97072729,0.9689499]]
类别 0,高斯 20.34109398[9.97026618,9.95347268][[2.0163164,2.35543704],[2.35543704,5.31830092]]
类别 1,高斯 10.66799982[2.02206041,10.16700955][[0.96728744,0.91060643],[0.91060643,2.74892244]]
类别 1,高斯 20.33200018[14.97097406,2.02206041][[5.28809399,2.18044657],[2.18044657,1.12285598]]

最后再测试集上测试准确率

predict = []
for per_test in X_test:
    prob_zero = gmm_zero.predict([per_test])
    prob_one = gmm_one.predict([per_test])
    if prob_zero>prob_one:
        predict.append(0)
    elif prob_zero<prob_one:
        predict.append(1)
    else:
        predict.append('null')
right = 0
for i in range(len(X_test)):
    if predict[i]==Y_test[i]:
        right+=1
print("正确识别率为:",right)
print("正确率为:",(right*1.0)/(1.0*len(X_test)))

经过预测验证后,发现正确率来到惊人的 1.0,侧面也说明了数据集真的很好

# 2、题目二:MNIST 数据集测试

MNISTTrainSamples.csvMNIST-Train-Samples.csv 中包含 30000 个 17 维特征手写数字样本训练,MNISTTrainLabels.csvMNIST-Train-Labels.csv 中包含训练样本的标签

分别使用每个数字的样本集学习该类别的 GMM 模型参数

使用 10 个类别的GMMGMM 模型构成的分类器,分类 MNIST-Test-Samples.csv 的 10000 个样本,与MNISTTestLabels.csvMNIST-Test-Labels.csv 的类别标记比对,计算识别的正确率

尝试设置不同的GMMGMM 模型超参数 —— 高斯数,测试不同高斯数的GMMGMM 分类器的识别正确率

和第一个题目类似,GMMGMM 类的编写和上一道题目的代码一模一样,所以不在赘述

先对数据进行预处理

Xtrain_file = "F:/Mechine Learning/实验三/MNIST-Train-Samples.csv"
Ytrain_file = "F:/Mechine Learning/实验三/MNIST-Train-Labels.csv"
Xtest_file = "F:/Mechine Learning/实验三/MNIST-Test-Samples.csv"
Ytest_file = "F:/Mechine Learning/实验三/MNIST-Test-Labels.csv"
X_train = []
X_test = []
Y_train = []
Y_test = []
TX_train = []
with open(Xtrain_file,'r',encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile)
    for row in reader:
        temp = []
        for i in row:
            temp.append(float(i))
        X_train.append(temp)
with open(Ytrain_file,'r',encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile)
    for row in reader:
        Y_train.append(int(row[0]))
with open(Xtest_file,'r',encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile)
    for row in reader:
        temp = []
        for i in row:
            temp.append(float(i))
        X_test.append(temp)
with open(Ytest_file,'r',encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile)
    for row in reader:
        Y_test.append(int(row[0]))
for i in range(10):
    temp = []
    for j in range(len(X_train)):
        if Y_train[j] == i:
            temp.append(X_train[j])
    TX_train.append(temp)

然后预测新的数据集

def Gaussian(x,mean,cov):
    ans = []
    for one in x:
        X=np.array(one)
        mean=np.array(mean)
        cov=np.array(cov)
        ans.append(np.exp(-0.5*(np.dot(np.dot((X-mean).T,np.linalg.inv(cov)),X-mean)))
		/math.sqrt(np.linalg.det(2.0*math.pi*cov)))
    return np.array(ans)
for num in range(1,6):
    print("高斯数为",num)
    gmm = []
    for i in range(10):
        gmm_t = GMM(np.array(TX_train[i]),num)
        mean,cov,weight = gmm_t.EM()
        gmm.append([mean,cov,weight])
    predict = []
    for per_test in X_test:
        predict_label = 0
        max_prob = 0
        for i in range(10):
            possibility = 0
            for j in range(num):
                possibility+=gmm[i][2][j]*Gaussian([per_test],gmm[i][0][j],gmm[i][1][j])
            if max_prob < possibility:
                max_prob = possibility
                predict_label = i
        predict.append(predict_label)
    right=0
    for i in range(len(X_test)):
        if predict[i]==Y_test[i]:
             right += 1
    print("正确识别数为:",right)
    print("正确率为:",(1.0*right)/(1.0*len(X_test)))

经过 8 个小时的训练,终于抛出了最终的结果,如下:(若想加快训练,可以降低最大迭代次数和放宽迭代退出条件)

高斯数正确率
10.9332
20.9441
30.948
40.9486
50.953

完结撒花!!!