forked from eriklindernoren/ML-From-Scratch
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gaussian_mixture_model.py
143 lines (122 loc) · 5.04 KB
/
gaussian_mixture_model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from __future__ import division, print_function
import sys
import os
import math
import random
from sklearn import datasets
import numpy as np
# Import helper functions
from mlfromscratch.utils.data_manipulation import normalize
from mlfromscratch.utils.data_operation import euclidean_distance, calculate_covariance_matrix
from mlfromscratch.unsupervised_learning import PCA
from mlfromscratch.utils import Plot
class GaussianMixtureModel():
"""A probabilistic clustering method for determining groupings among data samples.
Parameters:
-----------
k: int
The number of clusters the algorithm will form.
max_iterations: int
The number of iterations the algorithm will run for if it does
not converge before that.
tolerance: float
If the difference of the results from one iteration to the next is
smaller than this value we will say that the algorithm has converged.
"""
def __init__(self, k=2, max_iterations=2000, tolerance=1e-8):
self.k = k
self.parameters = []
self.max_iterations = max_iterations
self.tolerance = tolerance
self.responsibilities = []
self.sample_assignments = None
self.responsibility = None
# Initialize gaussian randomly
def _init_random_gaussians(self, X):
n_samples = np.shape(X)[0]
self.priors = (1 / self.k) * np.ones(self.k)
for i in range(self.k):
params = {}
params["mean"] = X[np.random.choice(range(n_samples))]
params["cov"] = calculate_covariance_matrix(X)
self.parameters.append(params)
# Likelihood
def multivariate_gaussian(self, X, params):
n_features = np.shape(X)[1]
mean = params["mean"]
covar = params["cov"]
determinant = np.linalg.det(covar)
likelihoods = np.zeros(np.shape(X)[0])
for i, sample in enumerate(X):
d = n_features # dimension
coeff = (1.0 / (math.pow((2.0 * math.pi), d / 2)
* math.sqrt(determinant)))
exponent = math.exp(-0.5 * (sample - mean).T.dot(np.linalg.pinv(covar)).dot((sample - mean)))
likelihoods[i] = coeff * exponent
return likelihoods
# Calculate the likelihood over all samples
def _get_likelihoods(self, X):
n_samples = np.shape(X)[0]
likelihoods = np.zeros((n_samples, self.k))
for i in range(self.k):
likelihoods[
:, i] = self.multivariate_gaussian(
X, self.parameters[i])
return likelihoods
# Calculate the responsibility
def _expectation(self, X):
# Calculate probabilities of X belonging to the different clusters
weighted_likelihoods = self._get_likelihoods(X) * self.priors
sum_likelihoods = np.expand_dims(
np.sum(weighted_likelihoods, axis=1), axis=1)
# Determine responsibility as P(X|y)*P(y)/P(X)
self.responsibility = weighted_likelihoods / sum_likelihoods
# Assign samples to cluster that has largest probability
self.sample_assignments = self.responsibility.argmax(axis=1)
# Save value for convergence check
self.responsibilities.append(np.max(self.responsibility, axis=1))
# Update the parameters and priors
def _maximization(self, X):
# Iterate through clusters and recalculate mean and covariance
for i in range(self.k):
resp = np.expand_dims(self.responsibility[:, i], axis=1)
mean = (resp * X).sum(axis=0) / resp.sum()
covariance = (X - mean).T.dot((X - mean) * resp) / resp.sum()
self.parameters[i]["mean"], self.parameters[
i]["cov"] = mean, covariance
# Update weights
n_samples = np.shape(X)[0]
self.priors = self.responsibility.sum(axis=0) / n_samples
# Covergence if || likehood - last_likelihood || < tolerance
def _converged(self, X):
if len(self.responsibilities) < 2:
return False
diff = np.linalg.norm(
self.responsibilities[-1] - self.responsibilities[-2])
# print ("Likelihood update: %s (tol: %s)" % (diff, self.tolerance))
return diff <= self.tolerance
# Run GMM and return the cluster indices
def predict(self, X):
# Initialize the gaussians randomly
self._init_random_gaussians(X)
# Run EM until convergence or for max iterations
for _ in range(self.max_iterations):
self._expectation(X) # E-step
self._maximization(X) # M-step
# Check convergence
if self._converged(X):
break
# Make new assignments and return them
self._expectation(X)
return self.sample_assignments
def main():
# Load the dataset
X, y = datasets.make_blobs()
# Cluster the data
clf = GaussianMixtureModel(k=3)
y_pred = clf.predict(X)
p = MatplotlibWrapper()
p.plot_in_2d(X, y_pred, title="GMM Clustering")
p.plot_in_2d(X, y, title="Actual Clustering")
if __name__ == "__main__":
main()