Source code for pytranskit.classification.scdt_nls


import numpy as np
import numpy.linalg as LA

from pytranskit.optrans.continuous.scdt import SCDT
from sklearn.model_selection import train_test_split

[docs]class SCDT_NLS: def __init__(self, num_classes, rm_edge = False): """ Parameters ---------- num_classes : integer, total number of classes rm_edge : boolean flag; IF TRUE the first and last points of CDTs will be removed default = False """ self.num_classes = num_classes self.rm_edge = rm_edge self.Nset = [] self.subspaces = [] self.len_subspace = 0 self.k = 1 self.label = [] self.pca_basis = [] self.N = 1
[docs] def fit(self, X, Y, Ttrain=None, no_local_enrichment=True): """Fit SCDT-NLS. Parameters ---------- X : array-like, shape (n_samples, n_columns) 1D data for training. Y : ndarray of shape (n_samples,) Labels of the training samples. Ttrain : [optional] array-like, shape (n_samples, n_columns) domain for corresponding training signals. no_local_enrichment: [optional] boolean, default TRUE IF FALSE, apply deformation while searching k samples """ # calculate the SCDTs print('+++++++++++ Training Phase +++++++++++') print('\nCalculating SCDTs for training data ...\n') #Xcdt = self.cdt_parallel(Xtrain) N = X.shape[1] t0 = np.linspace(0,1,N) # Domain of the reference s0 = np.ones(N) s0 = s0/s0.sum() self.t0 = t0 self.s0 = s0 s_scdt = [] for i in range(X.shape[0]): if Ttrain is None: s_scdt.append(self.calc_scdt(X[i],t0,s0,t0)) else: s_scdt.append(self.calc_scdt(X[i],Ttrain[i],s0,t0)) Xscdt = np.stack(s_scdt) Xtrain, Xval, Ytrain, Yval = train_test_split(Xscdt, Y, test_size=0.3, random_state=0) self.bas = [] for class_idx in range(self.num_classes): # generate the bases vectors class_data = Xtrain[Ytrain == class_idx] self.Nset.append(class_data) self.label.append(class_idx) bas = [] for j in range(class_data.shape[0]): flat = np.copy(class_data[j].reshape(1,-1)) u, s, vh = LA.svd(flat,full_matrices=False) bas.append(vh[:flat.shape[0]]) self.bas.append(bas) if Xtrain.shape[0]//self.num_classes == 1: self.k = 1 else: smp_class = [] for i in range(len(np.unique(Ytrain))): smp_class.append(np.count_nonzero(Ytrain == i)) # k_range = range(1,min(smp_class)) # min(min(smp_class),100) k_range = range(1,min(min(smp_class),100)) n_range = range(-1,6) print('Tune parameters using validation set ...\n') self.k, self.N = self.find_kN(Xval, Yval, k_range, n_range) self.Nset = [] self.label = [] self.bas = [] for class_idx in range(self.num_classes): # generate the bases vectors class_data = Xscdt[Y == class_idx] self.Nset.append(class_data) self.label.append(class_idx) bas = [] for j in range(class_data.shape[0]): if no_local_enrichment: flat = np.copy(class_data[j].reshape(1,-1)) else: flat = self.enrichment(class_data[j].reshape(1,-1), k=self.N) # k=0 => translation only u, s, vh = LA.svd(flat,full_matrices=False) bas.append(vh[:flat.shape[0]]) self.bas.append(bas)
[docs] def predict(self, Xtest, Ttest=None, k=None, N=None): """Predict using SCDT-NLS Parameters ---------- Xtest : array-like, shape (n_samples, n_columns) 1D data for testing. Ttest : [optional] array-like, shape (n_samples, n_columns) domain for corresponding test signals. k : [pre-tuned parameter] number of closest points to test sample N : [pre-tuned parameter] number of sinusoidal bases used for subspace enrrichment Returns ------- ndarray of shape (n_samples,) Predicted target values per sample in Xtest. """ if k is not None: k_opt = k else: k_opt = self.k if N is not None: n_opt = N else: n_opt = self.N print('+++++++++++ Testing Phase +++++++++++') print('\nCalculating SCDTs for testing data ...\n') s_scdt = [] for i in range(Xtest.shape[0]): if Ttest is None: s_scdt.append(self.calc_scdt(Xtest[i],self.t0,self.s0,self.t0)) else: s_scdt.append(self.calc_scdt(Xtest[i],Ttest[i],self.s0,self.t0)) X = np.stack(s_scdt) print('Apply NLS algorithm in SCDT domain\n') D = [] for class_idx in range(self.num_classes): Xi = self.Nset[class_idx] Xi_bas = self.bas[class_idx] d = np.zeros([X.shape[0],1]) B = [] L_basis = [] for i in range(X.shape[0]): x = X[i,:] dist_i = [] for j in range(Xi.shape[0]): basj = Xi_bas[j]#[:self.len_subspace,:] projR = x @ basj.T @ basj # (n_samples, n_features) # projR = x @ flat.T@np.linalg.inv(flat@flat.T + lmd*np.identity(flat.shape[0]))@flat dist_i.append(LA.norm(projR - x)) dist_i = np.stack(dist_i) indx = dist_i.argsort()[:k_opt] #Ni = np.concatenate([Xi[indx[0:k_opt],:], V], axis=0) Ni = self.enrichment(Xi[indx,:], k=n_opt) # k=0 => translation only u, s, vh = LA.svd(Ni,full_matrices=False) cum_s = np.cumsum(s) cum_s = cum_s/np.max(cum_s) basis = vh[:Ni.shape[0]] B.append(basis) L_basis.append((np.where(cum_s>=0.99)[0])[0]+1) max_basis = min(L_basis) for i in range(X.shape[0]): x = X[i,:] basis = B[i][:max_basis,:] proj = x @ basis.T # (n_samples, n_basis) projR = proj @ basis # (n_samples, n_features) d[i]=LA.norm(projR - x) D.append(np.squeeze(d)) D = np.stack(D, axis=0) preds = np.argmin(D, axis=0) pred_label = [self.label[i] for i in preds] return pred_label
[docs] def score(self, X, y): #print('Optimum k: {}'.format(self.k)) #print('Optimum N: {}'.format(self.N)) n = X.shape[0] y_pred = self.predict(X) n_correct = np.sum(y_pred == y) return n_correct/n, y_pred
[docs] def calc_scdt(self, sig1, t1, s0, t0): # sig1: (0, columns) # t1: domain of sig1 scdt = SCDT(reference=s0,x0=t0) Ipos, Ineg, Imasspos, Imassneg = scdt.stransform(sig1, t1) if self.rm_edge: shat = np.concatenate((Ipos[1:-2],Ineg[1:-2],Imasspos.reshape(1),Imassneg.reshape(1)),axis=0) else: shat = np.concatenate((Ipos[:-1],Ineg[:-1],Imasspos.reshape(1),Imassneg.reshape(1)),axis=0) return shat
[docs] def find_kN(self, X, y, k_range, n_range): n = X.shape[0] max_acc = 0. score_prev = 0. k_opt = 1 count = 0 acc_count = 0 ### calculate distances for samples in validation set indx = [] for i in range(X.shape[0]): x = np.copy(X[i,:]) indXi = [] for class_idx in range(self.num_classes): Xi = self.Nset[class_idx] Xi_bas = self.bas[class_idx] dist_i = [] for j in range(Xi.shape[0]): basj = Xi_bas[j]#[:self.len_subspace,:] projR = x @ basj.T @ basj # (n_samples, n_features) dist_i.append(LA.norm(projR - x)) dist_i = np.stack(dist_i) indXi.append(dist_i.argsort()[:max(k_range)+1]) indx.append(indXi) ### tune k using validation set for k in k_range: D = [] for class_idx in range(self.num_classes): Xi = self.Nset[class_idx] d = np.zeros([X.shape[0],1]) B = [] L_basis = [] for i in range(X.shape[0]): x = np.copy(X[i,:]) ind = indx[i][class_idx] Ni = np.copy(Xi[ind[:k],:]) u, s, vh = LA.svd(Ni,full_matrices=False) cum_s = np.cumsum(s) cum_s = cum_s/np.max(cum_s) basis = vh[:Ni.shape[0]] B.append(basis) L_basis.append((np.where(cum_s>=0.99)[0])[0]+1) max_basis = min(L_basis) for i in range(X.shape[0]): x = np.copy(X[i,:]) basis = B[i][:max_basis,:] projR = x @ basis.T @ basis # (n_samples, n_features) d[i]=LA.norm(projR - x) D.append(np.squeeze(d)) D = np.stack(D, axis=0) preds = np.argmin(D, axis=0) pred_label = [self.label[i] for i in preds] score = (np.sum(pred_label == y))/n #print('Validation accuracy: {} with k = {}'.format(score, k)) if score >= max_acc: max_acc = score k_opt = k acc_count = 0 else: acc_count = acc_count + 1 if score > score_prev: count = 0 else: count = count + 1 if count == 10 or acc_count == 20: break score_prev = score n_iter = [] max_acc = 0. score_prev = 0. n_opt = 1 count = 0 acc_count = 0 for n_enr in n_range: #print('\nN = {}'.format(n_enr)) n_iter.append(n_enr) D = [] for class_idx in range(self.num_classes): Xi = self.Nset[class_idx] d = np.zeros([X.shape[0],1]) B = [] L_basis = [] for i in range(X.shape[0]): x = np.copy(X[i,:]) ind = indx[i][class_idx] Ni = self.enrichment(Xi[ind[:k_opt],:], k=n_enr) # k=0 => translation only u, s, vh = LA.svd(Ni,full_matrices=False) cum_s = np.cumsum(s) cum_s = cum_s/np.max(cum_s) basis = vh[:Ni.shape[0]] B.append(basis) L_basis.append((np.where(cum_s>=0.99)[0])[0]+1) max_basis = min(L_basis) for i in range(X.shape[0]): x = X[i,:] basis = B[i][:max_basis,:] projR = x @ basis.T @ basis # (n_samples, n_features) d[i]=LA.norm(projR - x) D.append(np.squeeze(d)) D = np.stack(D, axis=0) preds = np.argmin(D, axis=0) pred_label = [self.label[i] for i in preds] score = (np.sum(pred_label == y))/n #print('Validation accuracy: {} with k = {}'.format(score, k_opt)) if score > max_acc or score==1.: max_acc = score acc_count = 0 n_opt = n_enr else: acc_count = acc_count + 1 if score > score_prev: count = 0 else: count = count + 1 if count == 10 or acc_count == 20: break score_prev = score return k_opt, n_opt
[docs] def enrichment(self, scdt_features, k): # scdt_features: (n_samples, scdt) if k<0: return scdt_features v= np.ones([1, scdt_features.shape[1]]) # add translation indx = 0 for i in range(-k,k+1): if i != 0: vi = scdt_features-np.sin(i*np.pi*scdt_features)/(np.abs(i)*np.pi) v = np.concatenate((v,vi)) indx = indx+1 return np.concatenate((scdt_features,v))