# GMM 模型及其 EM 算法
# 一、GMM 模型简介
,全称, 是混合密度模型的一个特例,由多个高斯(正态分布)函数的组合而成,其公式为如下:
其中 是此多维正态分布的平均数向量, 是此多维正态分布的协方差矩阵, 是此变量是由第 个高斯分布产生的。
该模型当数量 M 无限大时,GMM 可以任意精读逼近任意分布密度函数
# 二、GMM 参数估计问题
当只有训练数据集 时,我们要用 模型来拟合训练数据集,因此需要对 GMM 的参数进行估计,需要估计的参数
对数似然函数:,通过观察计算可以发现,该极值点方程是复杂的超越方程组,很难直接求解
常用的 参数估计方法是 算法
在看 算法前,先思考参数估计有哪些问题:
- 只有样本,但是不知道是由哪一个成分高斯产生的
- 令 表示 是由第 k 个成分高斯产生的,构造集合:
- 完整的数据集,其中 为缺失的数据
在正常情况下, 和 均未知时
- 可以采用交替的方式,迭代优化
- 固定参数,优化
- 固定,优化参数
# 三、EM 算法
- 迭代优化过程中,参数 和 都是不准确的中间推断结果
- 依据不准确参数 断定 由某个高斯产生,过于武断
- 合理的方式是推断样本 由每一个高斯产生的概率
(1)
这是 算法中的 步:估计隐变量(缺失数据)的概率
- 每个部分高斯分布参数也需要由所有样本参与估计,同时需要考虑样本由不同高斯产生的概率
(2)
(3)
(4)
这是 算法中的 步:最大化模型的参数
算法需要不断经过 步和 步的迭代最终达到一个临界条件,一般采用似然函数在两轮迭代之间的变化量作为收敛条件,如下:
# 四、例题实战
1、题目一:有两类两维各 1000 个训练样本保存在 和 文件中,各 1000 个测试样本保存在 和 文件中。每个类别的样本分别采样自包含 2 个分量的高斯混合模型中,分别使用两个类别的训练样本学习模型化每个类别条件概率密度函数的 模型参数,并与模型的真实值进行比较;
类别 0 的: \alpha_1=\frac{2}{3},\mu_1=(0,0)^t,\sigma_1=\begin{pmatrix}3&1\\1&1\end
类别 1 的: \alpha_1=\frac{2}{3},\mu_1=(2,10)^t,\sigma_1=\begin{pmatrix}1&1\\1&3\end
假设两个类别的先验概率相等,使用学习到的模型参数分类测试样本数据,统计分类的正确率
开始上代码,最重要的一步是写出 模型的类
class GMM: | |
def __init__(self,data,n_cluster): # data 是训练数据,n_cluster 是一个混合高斯模型中有多少个高斯函数 | |
self.data=data | |
self.n_cluster=n_cluster | |
self.max_iteration=100 # max_iteration 是为了防止迭代终止条件太难到达 | |
self.tolerance=1e-6 # tolerance 就是迭代终止掉件 | |
def Gaussian(self,x,mean,cov): # 这个函数用来求高斯函数的概率密度函数 | |
ans = [] | |
for one in x: | |
X=np.array(one) | |
mean=np.array(mean) | |
cov=np.array(cov) | |
ans.append(np.exp(-0.5*(np.dot(np.dot((X-mean).T,np.linalg.inv(cov)),X-mean))) | |
/math.sqrt(np.linalg.det(2.0*math.pi*cov))) | |
return np.array(ans) | |
def init_params(self): # 随机初始化参数 | |
self.means=self.data[np.random.choice(self.data.shape | |
[0],self.n_cluster,replace=False)] # 从 data 里面随机挑 n_cluster 个数当均值 | |
self.covs=[np.cov(self.data.T) for i in range(self.n_cluster)] | |
# 所有的协方差都用当前数据的协方差矩阵 | |
self.weights = np.ones(self.n_cluster)/self.n_cluster | |
# 属于哪个高斯函数初始化都一样为 1/n_cluster | |
def E_step(self): | |
possibility = np.zeros((self.data.shape[0],self.n_cluster)) | |
# 初始化一个 (训练集样本数,n_cluster) 大小的 0 矩阵 | |
for i in range(self.n_cluster): | |
possibility[:,i]=self.weights[i]*self.Gaussian(self.data,self.means[i],self.covs[i]) | |
# E 步分子的计算 | |
possibility /= possibility.sum(axis=1,keepdims=True) # E 步除以分母 | |
return possibility | |
def M_step(self,possibility): | |
Nk = possibility.sum(axis=0) # 计算概率总和 | |
self.weights=Nk/self.data.shape[0] # 除以总样本数得到新的 alpha 参数 | |
self.means = np.dot(possibility.T,self.data)/Nk[:,np.newaxis] | |
# 给列新增维度,为了保持维度的一致 | |
covs = [] | |
for i in range(self.n_cluster): | |
diff = self.data - self.means[i] | |
cov = np.dot(possibility[:,i]*diff.T,diff)/Nk[i] #直接计算每一个单体值,分子除以分母 | |
covs.append(cov) | |
self.covs=covs # 更新协方差矩阵 | |
def loglikelyhood(self): # 计算对数似然 | |
likelyhood = np.zeros((self.data.shape[0],self.n_cluster)) | |
for i in range(self.n_cluster): | |
likelyhood[:,i]=self.weights[i]*self.Gaussian(self.data,self.means[i],self.covs[i]) | |
return np.log(likelyhood.sum(axis=1)).sum() | |
def EM(self): | |
self.init_params() | |
log_likelyhood_prev=0 | |
for i in range(self.max_iteration): | |
possibility=self.E_step() | |
self.M_step(possibility) | |
log_likelyhood_cur = self.loglikelyhood() | |
if np.abs(log_likelyhood_cur-log_likelyhood_prev)<self.tolerance: | |
break | |
log_likelyhood_prev = log_likelyhood_cur | |
return self.means,self.covs,self.weights | |
def predict(self,x): # 计算某一个新样本点的由此高斯模型产生的概率 | |
possibility = 0 | |
for i in range(self.n_cluster): | |
possibility+=self.weights[i]*self.Gaussian(x,self.means[i],self.covs[i]) | |
return possibility |
有了 GMM 模型的类剩下的就很好解决了,先对数据进行读取处理
Xtrain_file = "F:/Mechine Learning/实验三/Emu-Train-Samples.csv" | |
Ytrain_file = "F:/Mechine Learning/实验三/Emu-Train-Labels.csv" | |
Xtest_file = "F:/Mechine Learning/实验三/Emu-Test-Samples.csv" | |
Ytest_file = "F:/Mechine Learning/实验三/Emu-Test-Labels.csv" | |
X_train = [] | |
X_test = [] | |
Y_train = [] | |
Y_test = [] | |
with open(Xtrain_file,'r',encoding='utf-8') as csvfile: | |
reader = csv.reader(csvfile) | |
for row in reader: | |
temp = [] | |
for i in row: | |
temp.append(float(i)) | |
X_train.append(temp) | |
with open(Ytrain_file,'r',encoding='utf-8') as csvfile: | |
reader = csv.reader(csvfile) | |
for row in reader: | |
Y_train.append(int(row[0])) | |
with open(Xtest_file,'r',encoding='utf-8') as csvfile: | |
reader = csv.reader(csvfile) | |
for row in reader: | |
temp = [] | |
for i in row: | |
temp.append(float(i)) | |
X_test.append(temp) | |
with open(Ytest_file,'r',encoding='utf-8') as csvfile: | |
reader = csv.reader(csvfile) | |
for row in reader: | |
Y_test.append(int(row[0])) | |
# 由于需要对分别对两类进行估计,因此需要把两类样本分别分出来 | |
X_train_zero = [] | |
X_train_one = [] | |
for i in range(len(X_train)): | |
if Y_train[i] == 0: | |
X_train_zero.append(X_train[i]) | |
else: | |
X_train_one.append(X_train[i]) | |
X_train_one = np.array(X_train_one) | |
X_train_zero = np.array(X_train_zero) |
剩下的就是朴实无华的训练
gmm_zero = GMM(X_train_zero,2) | |
mean,cov,weight = gmm_zero.EM() | |
print(mean) | |
print(cov) | |
print(weight) | |
gmm_one = GMM(X_train_one,2) | |
mean,cov,weight = gmm_one.EM() | |
print(mean) | |
print(cov) | |
print(weight) |
训练的结果如下表:
GMM 估计模型参数 | |||
---|---|---|---|
类别 0,高斯 1 | 0.65890602 | [-0.04877201,-0.03493048] | [[2.85162614,0.97072729],[0.97072729,0.9689499]] |
类别 0,高斯 2 | 0.34109398 | [9.97026618,9.95347268] | [[2.0163164,2.35543704],[2.35543704,5.31830092]] |
类别 1,高斯 1 | 0.66799982 | [2.02206041,10.16700955] | [[0.96728744,0.91060643],[0.91060643,2.74892244]] |
类别 1,高斯 2 | 0.33200018 | [14.97097406,2.02206041] | [[5.28809399,2.18044657],[2.18044657,1.12285598]] |
最后再测试集上测试准确率
predict = [] | |
for per_test in X_test: | |
prob_zero = gmm_zero.predict([per_test]) | |
prob_one = gmm_one.predict([per_test]) | |
if prob_zero>prob_one: | |
predict.append(0) | |
elif prob_zero<prob_one: | |
predict.append(1) | |
else: | |
predict.append('null') | |
right = 0 | |
for i in range(len(X_test)): | |
if predict[i]==Y_test[i]: | |
right+=1 | |
print("正确识别率为:",right) | |
print("正确率为:",(right*1.0)/(1.0*len(X_test))) |
经过预测验证后,发现正确率来到惊人的 1.0,侧面也说明了数据集真的很好
# 2、题目二:MNIST 数据集测试
中包含 30000 个 17 维特征手写数字样本训练, 中包含训练样本的标签
分别使用每个数字的样本集学习该类别的 GMM 模型参数
使用 10 个类别的 模型构成的分类器,分类 MNIST-Test-Samples.csv 的 10000 个样本,与 的类别标记比对,计算识别的正确率
尝试设置不同的 模型超参数 —— 高斯数,测试不同高斯数的 分类器的识别正确率
和第一个题目类似, 类的编写和上一道题目的代码一模一样,所以不在赘述
先对数据进行预处理
Xtrain_file = "F:/Mechine Learning/实验三/MNIST-Train-Samples.csv" | |
Ytrain_file = "F:/Mechine Learning/实验三/MNIST-Train-Labels.csv" | |
Xtest_file = "F:/Mechine Learning/实验三/MNIST-Test-Samples.csv" | |
Ytest_file = "F:/Mechine Learning/实验三/MNIST-Test-Labels.csv" | |
X_train = [] | |
X_test = [] | |
Y_train = [] | |
Y_test = [] | |
TX_train = [] | |
with open(Xtrain_file,'r',encoding='utf-8') as csvfile: | |
reader = csv.reader(csvfile) | |
for row in reader: | |
temp = [] | |
for i in row: | |
temp.append(float(i)) | |
X_train.append(temp) | |
with open(Ytrain_file,'r',encoding='utf-8') as csvfile: | |
reader = csv.reader(csvfile) | |
for row in reader: | |
Y_train.append(int(row[0])) | |
with open(Xtest_file,'r',encoding='utf-8') as csvfile: | |
reader = csv.reader(csvfile) | |
for row in reader: | |
temp = [] | |
for i in row: | |
temp.append(float(i)) | |
X_test.append(temp) | |
with open(Ytest_file,'r',encoding='utf-8') as csvfile: | |
reader = csv.reader(csvfile) | |
for row in reader: | |
Y_test.append(int(row[0])) | |
for i in range(10): | |
temp = [] | |
for j in range(len(X_train)): | |
if Y_train[j] == i: | |
temp.append(X_train[j]) | |
TX_train.append(temp) |
然后预测新的数据集
def Gaussian(x,mean,cov): | |
ans = [] | |
for one in x: | |
X=np.array(one) | |
mean=np.array(mean) | |
cov=np.array(cov) | |
ans.append(np.exp(-0.5*(np.dot(np.dot((X-mean).T,np.linalg.inv(cov)),X-mean))) | |
/math.sqrt(np.linalg.det(2.0*math.pi*cov))) | |
return np.array(ans) | |
for num in range(1,6): | |
print("高斯数为",num) | |
gmm = [] | |
for i in range(10): | |
gmm_t = GMM(np.array(TX_train[i]),num) | |
mean,cov,weight = gmm_t.EM() | |
gmm.append([mean,cov,weight]) | |
predict = [] | |
for per_test in X_test: | |
predict_label = 0 | |
max_prob = 0 | |
for i in range(10): | |
possibility = 0 | |
for j in range(num): | |
possibility+=gmm[i][2][j]*Gaussian([per_test],gmm[i][0][j],gmm[i][1][j]) | |
if max_prob < possibility: | |
max_prob = possibility | |
predict_label = i | |
predict.append(predict_label) | |
right=0 | |
for i in range(len(X_test)): | |
if predict[i]==Y_test[i]: | |
right += 1 | |
print("正确识别数为:",right) | |
print("正确率为:",(1.0*right)/(1.0*len(X_test))) |
经过 8 个小时的训练,终于抛出了最终的结果,如下:(若想加快训练,可以降低最大迭代次数和放宽迭代退出条件)
高斯数 | 正确率 |
---|---|
1 | 0.9332 |
2 | 0.9441 |
3 | 0.948 |
4 | 0.9486 |
5 | 0.953 |
完结撒花!!!