import numpy as np
import matplotlib.pyplot as plt
from pykalman import KalmanFilter
class LDS:
    """Linear dynamical system (state-space model) fit by EM, using a Kalman
    filter for the forward pass and an RTS smoother for the backward pass."""

    def __init__(self, n_states=1):
        self.n_states = n_states

    def make_initial_guess(self, all_train_data):
        """Randomly initialize the model parameters (all_train_data is
        currently unused; kept for interface compatibility)."""
        n_states = self.n_states
        transition_matrix_guess = np.random.rand(n_states, n_states)
        observation_matrix_guess = np.random.rand(1, n_states)
        initial_state_mean_guess = np.random.rand(n_states)
        initial_state_covariance_guess = np.eye(n_states) * 0.1
        observation_covariance_guess = np.eye(1) * 0.1
        transition_covariance_guess = np.eye(n_states) * 0.1
        self.ParamInit = {
            'A': transition_matrix_guess,            # state transition matrix
            'Gamma': transition_covariance_guess,    # transition (process) noise covariance
            'C': observation_matrix_guess,           # observation (emission) matrix
            'Sigma': observation_covariance_guess,   # observation noise covariance
            'mu0': initial_state_mean_guess,         # initial state mean
            'V0': initial_state_covariance_guess,    # initial state covariance
        }
        # Working copy of the parameters, updated at every EM iteration.
        self.Param = dict(self.ParamInit)
    def expectation(self, Data):
        """E-step: Kalman filter (forward pass) followed by RTS smoother
        (backward pass). Mu/V hold filtered moments, Mu_hat/V_hat smoothed ones."""
        # Forward pass: Kalman filter recursions, starting from (Mu[:,0], V[:,:,0]).
        for nn in range(1, len(Data)):
            # Predicted covariance P_{nn-1} = A V_{nn-1} A^T + Gamma.
            self.P[:, :, nn - 1] = self.Param['A'] @ self.V[:, :, nn - 1] @ self.Param['A'].T + self.Param['Gamma']
            # Kalman gain (scalar observation, so the normalizer is 1x1).
            self.K[:, nn] = ((self.P[:, :, nn - 1] @ self.Param['C'].T)
                             / (self.Param['C'] @ self.P[:, :, nn - 1] @ self.Param['C'].T + self.Param['Sigma'])).flatten()
            # Filtered mean and covariance.
            self.Mu[:, nn] = self.Param['A'] @ self.Mu[:, nn - 1] + self.K[:, nn] * (Data[nn] - self.Param['C'] @ self.Param['A'] @ self.Mu[:, nn - 1])
            self.V[:, :, nn] = (np.eye(self.n_states) - self.K[:, nn].reshape(-1, 1) @ self.Param['C']) @ self.P[:, :, nn - 1]
        # Backward pass: Rauch-Tung-Striebel smoother.
        self.Mu_hat[:, len(Data) - 1] = self.Mu[:, len(Data) - 1]
        self.V_hat[:, :, len(Data) - 1] = self.V[:, :, len(Data) - 1]
        for nn in range(len(Data) - 2, -1, -1):
            self.J[:, :, nn] = (self.V[:, :, nn] @ self.Param['A'].T) @ np.linalg.inv(self.P[:, :, nn])
            self.Mu_hat[:, nn] = self.Mu[:, nn] + self.J[:, :, nn] @ (self.Mu_hat[:, nn + 1] - self.Param['A'] @ self.Mu[:, nn])
            self.V_hat[:, :, nn] = self.V[:, :, nn] + self.J[:, :, nn] @ (self.V_hat[:, :, nn + 1] - self.P[:, :, nn]) @ self.J[:, :, nn].T
        if False:  # debug plots of filtered vs. smoothed means (assumes n_states >= 2)
            plt.figure()
            plt.plot(Data, label='Data', color='red', marker='o')
            plt.plot(self.Mu[0, :], label='mu1', color='blue', marker='o')
            plt.plot(self.Mu_hat[0, :], label='mu_hat1', color='green', marker='x')
            plt.legend()
            plt.show(block=False)
            plt.figure()
            plt.plot(Data, label='Data', color='red', marker='o')
            plt.plot(self.Mu[1, :], label='mu2', color='blue', marker='o')
            plt.plot(self.Mu_hat[1, :], label='mu_hat2', color='green', marker='x')
            plt.legend()
            plt.show()
    def Learning_LDS(self, Data):
        """M-step: closed-form parameter updates from the smoothed moments."""
        # Initial state distribution.
        self.Param['mu0_New'] = self.Mu_hat[:, 0]
        self.Param['V0_New'] = self.V_hat[:, :, 0]
        # Transition matrix A: (sum E[z_n z_{n-1}^T]) (sum E[z_{n-1} z_{n-1}^T])^{-1}.
        temp1 = np.zeros_like(self.V_hat[:, :, 0])
        temp2 = np.zeros_like(self.V_hat[:, :, 0])
        for ii in range(1, len(Data)):
            temp1 += self.V_hat[:, :, ii] @ self.J[:, :, ii - 1].T + self.Mu_hat[:, ii].reshape(-1, 1) @ self.Mu_hat[:, ii - 1].reshape(-1, 1).T
            temp2 += self.V_hat[:, :, ii - 1] + self.Mu_hat[:, ii - 1].reshape(-1, 1) @ self.Mu_hat[:, ii - 1].reshape(-1, 1).T
        self.Param['A_New'] = temp1 @ np.linalg.inv(temp2)
        # Transition covariance Gamma: averaged residual second moments.
        temp3 = np.zeros_like(self.V_hat[:, :, 0])
        for ii in range(1, len(Data)):
            term_1 = self.V_hat[:, :, ii] + self.Mu_hat[:, ii].reshape(-1, 1) @ self.Mu_hat[:, ii].reshape(-1, 1).T
            term_2 = self.Param['A_New'] @ (self.J[:, :, ii - 1] @ self.V_hat[:, :, ii].T + self.Mu_hat[:, ii - 1].reshape(-1, 1) @ self.Mu_hat[:, ii].reshape(-1, 1).T)
            term_3 = (self.V_hat[:, :, ii] @ self.J[:, :, ii - 1].T + self.Mu_hat[:, ii].reshape(-1, 1) @ self.Mu_hat[:, ii - 1].reshape(-1, 1).T) @ self.Param['A_New'].T
            term_4 = self.Param['A_New'] @ (self.V_hat[:, :, ii - 1] + self.Mu_hat[:, ii - 1].reshape(-1, 1) @ self.Mu_hat[:, ii - 1].reshape(-1, 1).T) @ self.Param['A_New'].T
            temp3 += term_1 - term_2 - term_3 + term_4
        self.Param['Gamma_New'] = temp3 / (len(Data) - 1)
        # Observation matrix C: (sum y_n E[z_n]^T) (sum E[z_n z_n^T])^{-1}.
        temp1 = np.zeros_like(self.Param['C'])
        temp2 = np.zeros_like(self.V_hat[:, :, 0])
        for ii in range(len(Data)):
            temp1 += Data[ii].reshape(-1, 1) @ self.Mu_hat[:, ii].reshape(1, -1)
            temp2 += self.V_hat[:, :, ii] + self.Mu_hat[:, ii].reshape(-1, 1) @ self.Mu_hat[:, ii].reshape(-1, 1).T
        self.Param['C_New'] = temp1 @ np.linalg.inv(temp2)
        # Observation covariance Sigma: averaged observation residual moments.
        temp3 = np.zeros_like(self.Param['Sigma'])
        for ii in range(len(Data)):
            term_1 = Data[ii].reshape(-1, 1) @ Data[ii].reshape(1, -1)
            term_2 = (self.Param['C_New'] @ self.Mu_hat[:, ii].reshape(-1, 1)) @ Data[ii].reshape(1, -1)
            term_3 = Data[ii].reshape(-1, 1) @ (self.Mu_hat[:, ii].reshape(1, -1) @ self.Param['C_New'].T)
            term_4 = (self.Param['C_New'] @ (self.V_hat[:, :, ii] + self.Mu_hat[:, ii].reshape(-1, 1) @ self.Mu_hat[:, ii].reshape(-1, 1).T)) @ self.Param['C_New'].T
            temp3 += term_1 - term_2 - term_3 + term_4
        self.Param['Sigma_New'] = temp3 / len(Data)
    def EM_init(self, Data):
        """Allocate the filter/smoother arrays and set the initial state."""
        num_points = Data.shape[0]
        self.Mu = np.zeros((self.n_states, num_points))                 # filtered means
        self.P = np.zeros((self.n_states, self.n_states, num_points))  # predicted covariances
        self.K = np.zeros((self.n_states, num_points))                 # Kalman gains
        self.V = np.zeros((self.n_states, self.n_states, num_points))  # filtered covariances
        self.J = np.zeros((self.n_states, self.n_states, num_points))  # smoother gains
        self.Mu_hat = np.zeros((self.n_states, num_points))            # smoothed means
        self.V_hat = np.zeros((self.n_states, self.n_states, num_points))  # smoothed covariances
        self.Mu[:, 0] = np.mean(Data)  # start every state at the data mean (instead of mu0)
        self.V[:, :, 0] = self.Param['V0']
    def EM(self, Data, total_iterations):
        """Run EM: alternate E-step and M-step for a fixed number of iterations."""
        self.iter_Total = total_iterations
        for self.iter in range(1, self.iter_Total + 1):
            self.expectation(Data)       # E-step: filter + smooth
            self.Learning_LDS(Data)      # M-step: re-estimate parameters
            self.Param['A'] = self.Param['A_New']
            self.Param['C'] = self.Param['C_New']
            self.Param['Gamma'] = self.Param['Gamma_New']
            self.Param['Sigma'] = self.Param['Sigma_New']
            # Keep mu0/V0 in sync with the M-step so loglikelihood() uses the
            # learned initial-state distribution rather than the initial guess.
            self.Param['mu0'] = self.Param['mu0_New']
            self.Param['V0'] = self.Param['V0_New']
            self.Mu[:, 0] = self.Param['mu0_New']
            self.V[:, :, 0] = self.Param['V0_New']
    def loglikelihood(self, data):
        """Exact log-likelihood of `data` under the current parameters, computed
        with a Kalman filter. Also returns the 2-sigma predictive band
        (2*sqrt of the innovation variance) and the one-step innovations.
        (pykalman's KalmanFilter.loglikelihood can be used as a cross-check.)"""
        Y = data.reshape(-1, 1)
        A = self.Param['A']
        C = self.Param['C']
        Q = self.Param['Gamma']
        R = self.Param['Sigma']
        mu_0 = self.Param['mu0']
        Sigma_0 = self.Param['V0']
        n_timepoints, n_dim_obs = Y.shape
        log_likelihood = 0.0
        # Initialize the filter at the prior.
        mu_t = mu_0
        Sigma_t = Sigma_0
        all_S_t = []         # innovation covariance at every time step
        all_innovation = []  # one-step prediction errors
        for t in range(n_timepoints):
            # Prediction step.
            mu_t_pred = A @ mu_t
            Sigma_t_pred = A @ Sigma_t @ A.T + Q
            # Update step.
            y_t = Y[t]
            innovation = y_t - C @ mu_t_pred
            all_innovation.append(innovation[0])
            S_t = C @ Sigma_t_pred @ C.T + R
            all_S_t.append(S_t[0, 0])
            K_t = Sigma_t_pred @ C.T @ np.linalg.inv(S_t)
            mu_t = mu_t_pred + K_t @ innovation
            Sigma_t = Sigma_t_pred - K_t @ C @ Sigma_t_pred
            # Log-likelihood contribution at time t: Gaussian with mean
            # C mu_t_pred and covariance S_t.
            log_likelihood -= 0.5 * (
                np.log(np.linalg.det(S_t))
                + innovation.T @ np.linalg.inv(S_t) @ innovation
                + n_dim_obs * np.log(2 * np.pi)
            )
        all_S_t = 2 * np.sqrt(all_S_t)  # 2-sigma band around the one-step predictions
        return log_likelihood, all_S_t, all_innovation
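
# --- Usage sketch (not part of the original class) ---------------------------
# A minimal, hedged example of the intended workflow: generate a noisy synthetic
# series, fit the LDS by EM, and evaluate the log-likelihood. The sine series,
# n_states=2, and 50 iterations are illustrative assumptions, not prescriptions.
if __name__ == '__main__':
    rng = np.random.default_rng(0)
    t = np.arange(200)
    data = np.sin(2 * np.pi * t / 50) + 0.3 * rng.standard_normal(t.size)

    model = LDS(n_states=2)
    model.make_initial_guess(data)   # random parameter initialization
    model.EM_init(data)              # allocate filter/smoother arrays
    model.EM(data, total_iterations=50)

    ll, band, innovations = model.loglikelihood(data)
    print('log-likelihood:', ll)

    # Optional cross-check with pykalman under the learned parameters; the two
    # values should roughly agree (first-time-step conventions differ slightly).
    kf = KalmanFilter(
        transition_matrices=model.Param['A'],
        observation_matrices=model.Param['C'],
        transition_covariance=model.Param['Gamma'],
        observation_covariance=model.Param['Sigma'],
        initial_state_mean=model.Param['mu0'],
        initial_state_covariance=model.Param['V0'],
    )
    print('pykalman log-likelihood:', kf.loglikelihood(data.reshape(-1, 1)))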