Source code for pytranskit.TBM.TBM_RCDT

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Aug  4 22:47:30 2020

@author: Imaging and Data Science Lab
"""

import numpy as np
import multiprocessing as mp
import matplotlib.pyplot as plt
import numpy.linalg as LA

from pytranskit.optrans.continuous.radoncdt import RadonCDT
from sklearn.decomposition import PCA
from pytranskit.optrans.decomposition import PLDA, CCA

from pytranskit.classification.utils import take_train_samples
from sklearn.metrics import accuracy_score, confusion_matrix
from pytranskit.TBM.utils import plot_confusion_matrix

eps = 1e-6
x0_range = [0, 1]
x_range = [0, 1]
theta = np.linspace(0, 180, 180)

[docs]class batch_RCDT: def __init__(self, thetas=theta, rm_edge=False): self.thetas = thetas self.rm_edge = rm_edge
[docs] def forward_seq(self, X, template): self.template = template radoncdt = RadonCDT(self.thetas) x_hat = [] for i in range(X.shape[0]): x_hat.append(radoncdt.forward(x0_range, self.template / np.sum(self.template), x_range, X[i,:] / np.sum(X[i,:]), self.rm_edge)) return np.asarray(x_hat)
[docs] def forward(self, X, template): # X: (n_samples, width, height) self.template = template if len(X.shape)<3: Xhat = self.fun_rcdt_single(X + eps) elif X.shape[0] == 1: Xhat = self.fun_rcdt_single(X[0,:,:] + eps) else: Xhat = self.rcdt_parallel(X) return Xhat
[docs] def inverse(self, Xhat, template): # X: (n_samples, width, height) self.template = template if len(Xhat.shape)<3: X_recon = self.fun_ircdt_single(Xhat) elif Xhat.shape[0] == 1: X_recon = self.fun_ircdt_single(Xhat[0,:,:]) else: X_recon = self.ircdt_parallel(Xhat) return X_recon
[docs] def fun_rcdt_single(self, I): # I: (width, height) radoncdt = RadonCDT(self.thetas) Ircdt = radoncdt.forward(x0_range, self.template / np.sum(self.template), x_range, I / np.sum(I), self.rm_edge) return Ircdt
[docs] def fun_rcdt_batch(self, data): # data: (n_samples, width, height) dataRCDT = [self.fun_rcdt_single(data[j, :, :] + eps) for j in range(data.shape[0])] return np.array(dataRCDT)
[docs] def rcdt_parallel(self, X): # X: (n_samples, width, height) # template for RCDT n_cpu = np.min([mp.cpu_count(), X.shape[0]]) splits = np.array_split(X, n_cpu, axis=0) pl = mp.Pool(mp.cpu_count()) dataRCDT = pl.map(self.fun_rcdt_batch, splits) rcdt_features = np.vstack(dataRCDT) # (n_samples, proj_len, num_angles) pl.close() pl.join() return rcdt_features
[docs] def fun_ircdt_single(self, Ihat): radoncdt = RadonCDT(self.thetas) Iircdt = radoncdt.apply_inverse_map(Ihat, self.template, x_range) return Iircdt
[docs] def fun_ircdt_batch(self, data): # data: (n_samples, width, height) dataiRCDT = [self.fun_ircdt_single(data[j, :, :]) for j in range(data.shape[0])] return np.array(dataiRCDT)
[docs] def ircdt_parallel(self, Xhat): # X: (n_samples, width, height) # template for RCDT n_cpu = np.min([mp.cpu_count(), Xhat.shape[0]]) splits = np.array_split(Xhat, n_cpu, axis=0) pl = mp.Pool(mp.cpu_count()) dataiRCDT = pl.map(self.fun_ircdt_batch, splits) Xrecon = np.vstack(dataiRCDT) pl.close() pl.join() return Xrecon
[docs]class RCDT_PCA: def __init__(self, n_components=2): self.n_components = n_components
[docs] def rcdt_pca(self, x_train_hat, y_train, x_test_hat, y_test, template): self.y_train = y_train self.y_test = y_test self.template = template [self.R, self.C] = template.shape [self.Ntr, self.Rtr, self.Ctr] = x_train_hat.shape [self.Nte, self.Rte, self.Cte] = x_test_hat.shape self.mean_tr=np.mean(x_train_hat, axis=0) self.mean_te=np.mean(x_test_hat, axis=0) x_train_hat_vec=(x_train_hat - self.mean_tr).reshape(self.Ntr,-1) x_test_hat_vec=(x_test_hat - self.mean_tr).reshape(self.Nte,-1) pca=PCA(n_components=self.n_components) self.pca_proj_tr = pca.fit_transform(x_train_hat_vec) self.pca_proj_te = pca.transform(x_test_hat_vec) b_hat = pca.inverse_transform(np.identity(self.n_components)) self.basis_hat = np.reshape(b_hat,(self.n_components,self.Rtr,self.Ctr)) return self.basis_hat, self.pca_proj_tr, self.pca_proj_te
[docs] def visualize(self, directions=5, points=5, thetas=theta, SD_spread=1): dir_num=directions gI_num=points b_hat = self.basis_hat s_tilde_tr = self.pca_proj_tr s_tilde_te = self.pca_proj_te pca_dirs=b_hat[:dir_num,:] pca_proj=s_tilde_tr[:,:dir_num] radoncdt = RadonCDT(thetas) ## figure 1 of 3 viz_pca=np.zeros((dir_num,self.R,self.C*gI_num)) for a in range(dir_num): lamb=np.linspace(-SD_spread*np.std(pca_proj[:,a]),SD_spread*np.std(pca_proj[:,a]), num=gI_num) mode_var = np.zeros([gI_num,self.Rtr,self.Ctr]) mode_var_recon = np.zeros([gI_num,self.R,self.C]) for b in range(gI_num): mode_var[b,:,:]=self.mean_tr+lamb[b]*pca_dirs[a,:]; mode_var_recon[b,:,:] = radoncdt.apply_inverse_map(mode_var[b,:,:], self.template, x_range) t=mode_var_recon[b] t=t-np.min(t); t=t/np.max(t) mode_var_recon[b]=t viz_pca[a,:] = mode_var_recon.transpose(2,0,1).reshape(self.C,-1) for a in range(dir_num): if a==0: F1=viz_pca[a,:] else: F1=np.concatenate((F1,viz_pca[a,:]),axis=0) r,c=np.shape(F1) plt.figure(figsize=(7,7)) plt.imshow(np.transpose(F1),cmap='gray') plt.xticks(np.linspace(r/(2*dir_num),r-r/(2*dir_num),dir_num),np.array(range(1,dir_num+1))) plt.xlabel('Modes of variation',fontsize=12) plt.yticks(np.linspace(1,c,5),np.array([-SD_spread,-SD_spread/2,0,SD_spread/2,SD_spread])) plt.ylabel('($\sigma$)',fontsize=12) plt.title('Variation along the prominant PCA modes') ## figure 2 of 3 y_train = self.y_train y_test = self.y_test viz_dirs=viz_pca[:2,:]; proj_tr=self.pca_proj_tr[:,:2]; proj_te=self.pca_proj_te[:,:2] plt.figure(figsize=(18, 7)) leg_str=['class 1','class 2'] bas1=np.array([0,1]) bas1a=bas1*np.min(proj_tr[:,0]); bas1b=bas1*np.max(proj_tr[:,0]) basy=[bas1a[0],bas1b[0]]; basx=[bas1a[1],bas1b[1]] ax0=plt.subplot2grid((4, 10), (0, 1), colspan=3,rowspan=3) ax0.grid(linestyle='--') y_unique=np.unique(y_train) for a in range(len(y_unique)): t=np.where(a==y_train) X=proj_tr[t] ax0.scatter(X[:,0],X[:,1],color='C'+str(a+1)) ax0.legend(leg_str) ax0.plot(basx,basy,color='C4') ax0.set_title('Projection of training data on the first 2 PCA directions'); ax1=plt.subplot2grid((4, 10), (3, 1), colspan=3,rowspan=1) xax=viz_dirs[0,:] ax1.imshow(xax,cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]); ax2=plt.subplot2grid((4, 10), (0, 0), colspan=1,rowspan=3) yax=np.transpose(viz_dirs[1,:]) ax2.imshow(yax,cmap='gray') ax2.set_xticks([]) ax2.set_yticks([]) bas1a=bas1*np.min(proj_te[:,0]); bas1b=bas1*np.max(proj_te[:,0]) basy=[bas1a[0],bas1b[0]]; basx=[bas1a[1],bas1b[1]] ax0=plt.subplot2grid((4, 10), (0, 6), colspan=3,rowspan=3) ax0.grid(linestyle='--') y_unique=np.unique(y_test) for a in range(len(y_unique)): t=np.where(a==y_test) X=proj_te[t] ax0.scatter(X[:,0],X[:,1],color='C'+str(a+1)) ax0.legend(leg_str) ax0.plot(basx,basy,color='C4') ax0.set_title('Projection of test data on the first 2 PCA directions') ax1=plt.subplot2grid((4, 10), (3, 6), colspan=3,rowspan=1) xax=viz_dirs[0,:] ax1.imshow(xax,cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]) ax2=plt.subplot2grid((4, 10), (0, 5), colspan=1,rowspan=3) yax=np.transpose(viz_dirs[1,:]) ax2.imshow(yax,cmap='gray') ax2.set_xticks([]) ax2.set_yticks([]) ## figure 3 of 3 which_direction=1 viz_dirs=viz_pca[which_direction-1:which_direction,:]; proj_tr=s_tilde_tr[:,which_direction-1]; proj_te=s_tilde_te[:,which_direction-1] plt.figure(figsize=(16, 7)) leg_str=['class 1','class 2'] ax0=plt.subplot2grid((4, 8), (0, 0), colspan=3,rowspan=2); ax0.grid(linestyle='--') y_unique=np.unique(y_train) for a in range(len(y_unique)): t=np.where(a==y_train) y=y_train[t] X=proj_tr[t]; X=np.reshape(X,(len(y))) if a==0: XX=[X] else: XX.append(X) ax0.hist(XX,color=['C1','C2']); ax0.legend(leg_str) ax0.set_title('Projection of training data on the first PCA direction') ax1=plt.subplot2grid((4, 8), (2, 0), colspan=3,rowspan=1) ax1.imshow(viz_dirs[0,:],cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]) ax0=plt.subplot2grid((4, 8), (0, 5), colspan=3,rowspan=2); ax0.grid(linestyle='--') y_unique=np.unique(y_test) for a in range(len(y_unique)): t=np.where(a==y_test) y=y_test[t] X=proj_te[t]; X=np.reshape(X,(len(y))) if a==0: XX=[X] else: XX.append(X) ax0.hist(XX,color=['C1','C2']); ax0.legend(leg_str) ax0.set_title('Projection of test data on the first PCA direction') ax1=plt.subplot2grid((4, 8), (2, 5), colspan=3,rowspan=1) ax1.imshow(viz_dirs[0,:],cmap='gray') ax1.set_xticks([]) ax1.set_yticks([])
[docs]class RCDT_PLDA: def __init__(self, n_components=2): self.n_components = n_components
[docs] def rcdt_plda(self, x_train_hat, y_train, x_test_hat, y_test, template): self.y_train = y_train self.y_test = y_test self.template = template [self.R, self.C] = template.shape [self.Ntr, self.Rtr, self.Ctr] = x_train_hat.shape [self.Nte, self.Rte, self.Cte] = x_test_hat.shape self.mean_tr=np.mean(x_train_hat, axis=0) self.mean_te=np.mean(x_test_hat, axis=0) x_train_hat_vec=(x_train_hat - self.mean_tr).reshape(self.Ntr,-1) x_test_hat_vec=(x_test_hat - self.mean_tr).reshape(self.Nte,-1) pca=PCA() x_train_hat_vec_pca = pca.fit_transform(x_train_hat_vec) x_test_hat_vec_pca = pca.transform(x_test_hat_vec) plda=PLDA(alpha=.001,n_components=self.n_components) self.plda_proj_tr = plda.fit_transform(x_train_hat_vec_pca,y_train); self.plda_proj_te = plda.transform(x_test_hat_vec_pca); b_hat = pca.inverse_transform(plda.inverse_transform(np.identity(self.n_components))) self.basis_hat = np.reshape(b_hat,(self.n_components,self.Rtr,self.Ctr)) return self.basis_hat, self.plda_proj_tr, self.plda_proj_te
[docs] def visualize(self, directions=5, points=5, thetas=theta, SD_spread=1): dir_num=directions gI_num=points b_hat = self.basis_hat s_tilde_tr = self.plda_proj_tr s_tilde_te = self.plda_proj_te plda_dirs=b_hat[:dir_num,:] plda_proj=s_tilde_tr[:,:dir_num] radoncdt = RadonCDT(thetas) ## figure 1 of 3 viz_plda=np.zeros((dir_num,self.C,self.R*gI_num)) for a in range(dir_num): lamb=np.linspace(-SD_spread*np.std(plda_proj[:,a]),SD_spread*np.std(plda_proj[:,a]), num=gI_num) mode_var = np.zeros([gI_num,self.Rtr,self.Ctr]) mode_var_recon = np.zeros([gI_num,self.R,self.C]) for b in range(gI_num): mode_var[b,:,:]=self.mean_tr+lamb[b]*plda_dirs[a,:]; mode_var_recon[b,:,:] = radoncdt.apply_inverse_map(mode_var[b,:,:], self.template, x_range) t=mode_var_recon[b] t=t-np.min(t); t=t/np.max(t) mode_var_recon[b]=t viz_plda[a,:] = mode_var_recon.transpose(2,0,1).reshape(self.C,-1) for a in range(dir_num): if a==0: F1=viz_plda[a,:] else: F1=np.concatenate((F1,viz_plda[a,:]),axis=0) r,c=np.shape(F1) plt.figure(figsize=(7,7)) plt.imshow(np.transpose(F1),cmap='gray') plt.xticks(np.linspace(r/(2*dir_num),r-r/(2*dir_num),dir_num),np.array(range(1,dir_num+1))) plt.xlabel('Modes of variation',fontsize=12) plt.yticks(np.linspace(1,c,5),np.array([-SD_spread,-SD_spread/2,0,SD_spread/2,SD_spread])) plt.ylabel('($\sigma$)',fontsize=12) plt.title('Variation along the prominant PLDA modes') ## figure 2 of 3 y_train = self.y_train y_test = self.y_test viz_dirs=viz_plda[:2,:]; proj_tr=self.plda_proj_tr[:,:2]; proj_te=self.plda_proj_te[:,:2] plt.figure(figsize=(18, 7)) leg_str=['class 1','class 2'] bas1=np.array([0,1]) bas1a=bas1*np.min(proj_tr[:,0]); bas1b=bas1*np.max(proj_tr[:,0]) basy=[bas1a[0],bas1b[0]]; basx=[bas1a[1],bas1b[1]] ax0=plt.subplot2grid((4, 10), (0, 1), colspan=3,rowspan=3) ax0.grid(linestyle='--') y_unique=np.unique(y_train) for a in range(len(y_unique)): t=np.where(a==y_train) X=proj_tr[t] ax0.scatter(X[:,0],X[:,1],color='C'+str(a+1)) ax0.legend(leg_str) ax0.plot(basx,basy,color='C4') ax0.set_title('Projection of training data on the first 2 PLDA directions') ax1=plt.subplot2grid((4, 10), (3, 1), colspan=3,rowspan=1) xax=viz_dirs[0,:] ax1.imshow(xax,cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]) ax2=plt.subplot2grid((4, 10), (0, 0), colspan=1,rowspan=3) yax=np.transpose(viz_dirs[1,:]) ax2.imshow(yax,cmap='gray') ax2.set_xticks([]) ax2.set_yticks([]) bas1a=bas1*np.min(proj_te[:,0]); bas1b=bas1*np.max(proj_te[:,0]) basy=[bas1a[0],bas1b[0]]; basx=[bas1a[1],bas1b[1]] ax0=plt.subplot2grid((4, 10), (0, 6), colspan=3,rowspan=3) ax0.grid(linestyle='--') y_unique=np.unique(y_test) for a in range(len(y_unique)): t=np.where(a==y_test) X=proj_te[t] ax0.scatter(X[:,0],X[:,1],color='C'+str(a+1)) ax0.legend(leg_str) ax0.plot(basx,basy,color='C4') ax0.set_title('Projection of test data on the first 2 PLDA directions'); ax1=plt.subplot2grid((4, 10), (3, 6), colspan=3,rowspan=1) xax=viz_dirs[0,:] ax1.imshow(xax,cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]); ax2=plt.subplot2grid((4, 10), (0, 5), colspan=1,rowspan=3) yax=np.transpose(viz_dirs[1,:]) ax2.imshow(yax,cmap='gray') ax2.set_xticks([]) ax2.set_yticks([]) ## figure 3 of 3 which_direction=1 viz_dirs=viz_plda[which_direction-1:which_direction,:]; proj_tr=s_tilde_tr[:,which_direction-1]; proj_te=s_tilde_te[:,which_direction-1] plt.figure(figsize=(16, 7)) leg_str=['class 1','class 2'] ax0=plt.subplot2grid((4, 8), (0, 0), colspan=3,rowspan=2); ax0.grid(linestyle='--') y_unique=np.unique(y_train) for a in range(len(y_unique)): t=np.where(a==y_train) y=y_train[t] X=proj_tr[t]; X=np.reshape(X,(len(y))) if a==0: XX=[X] else: XX.append(X) ax0.hist(XX,color=['C1','C2']); ax0.legend(leg_str) ax0.set_title('Projection of training data on the first PLDA direction') ax1=plt.subplot2grid((4, 8), (2, 0), colspan=3,rowspan=1) ax1.imshow(viz_dirs[0,:],cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]) ax0=plt.subplot2grid((4, 8), (0, 5), colspan=3,rowspan=2); ax0.grid(linestyle='--') y_unique=np.unique(y_test) for a in range(len(y_unique)): t=np.where(a==y_test) y=y_test[t] X=proj_te[t]; X=np.reshape(X,(len(y))) if a==0: XX=[X] else: XX.append(X) ax0.hist(XX,color=['C1','C2']); ax0.legend(leg_str) ax0.set_title('Projection of test data on the first PLDA direction') ax1=plt.subplot2grid((4, 8), (2, 5), colspan=3,rowspan=1) ax1.imshow(viz_dirs[0,:],cmap='gray') ax1.set_xticks([]) ax1.set_yticks([])
[docs]class RCDT_CCA: def __init__(self, n_components=2): self.n_components = n_components
[docs] def rcdt_cca(self, x_train_hat, y_train, x_test_hat, y_test, template): self.y_train = y_train self.y_test = y_test self.template = template [self.R, self.C] = template.shape [self.Ntr, self.Rtr, self.Ctr] = x_train_hat.shape [self.Nte, self.Rte, self.Cte] = x_test_hat.shape self.mean_tr=np.mean(x_train_hat, axis=0) self.mean_te=np.mean(x_test_hat, axis=0) x_train_hat_vec=(x_train_hat - self.mean_tr).reshape(self.Ntr,-1) x_test_hat_vec=(x_test_hat - self.mean_tr).reshape(self.Nte,-1) pca=PCA() x_train_hat_vec_pca = pca.fit_transform(x_train_hat_vec) x_test_hat_vec_pca = pca.transform(x_test_hat_vec) t0=np.where(0==y_train); t1=np.where(1==y_train); X_tr=x_train_hat_vec_pca[t0];Y_tr=x_train_hat_vec_pca[t1] t0=np.where(0==y_test); t1=np.where(1==y_test); X_te=x_test_hat_vec_pca[t0];Y_te=x_test_hat_vec_pca[t1] n_components=self.n_components cca=CCA(n_components=n_components) self.cca_proj_tr1,self.cca_proj_tr2 = cca.fit_transform(X_tr,Y_tr); self.cca_proj_te1,self.cca_proj_te2 = cca.transform(X_te,Y_te); b_hat1,b_hat2 = pca.inverse_transform(cca.inverse_transform(np.identity(n_components),np.identity(n_components))); self.basis_hat1=np.reshape(b_hat1,(n_components,self.Rtr,self.Ctr)) self.basis_hat2=np.reshape(b_hat2,(n_components,self.Rtr,self.Ctr)) return self.basis_hat1, self.basis_hat2, self.cca_proj_tr1,self.cca_proj_tr2, self.cca_proj_te1,self.cca_proj_te2
[docs] def visualize(self, directions=5, points=5, thetas=theta, SD_spread=1): dir_num=directions gI_num=points b_hat1 = self.basis_hat1 b_hat2 = self.basis_hat2 s_tilde_tr1 = self.cca_proj_tr1 s_tilde_tr2 = self.cca_proj_tr2 s_tilde_te1 = self.cca_proj_te1 s_tilde_te2 = self.cca_proj_te2 cca_dirs1=b_hat1[:dir_num,:] cca_dirs2=b_hat2[:dir_num,:] cca_proj1=s_tilde_tr1[:,:dir_num] cca_proj2=s_tilde_tr2[:,:dir_num] radoncdt = RadonCDT(thetas) ## figure 1 of 3 viz_cca1=np.zeros((dir_num,self.R,self.C*gI_num)) viz_cca2=np.zeros((dir_num,self.R,self.C*gI_num)) for a in range(dir_num): lamb1=np.linspace(-SD_spread*np.std(cca_proj1[:,a]),SD_spread*np.std(cca_proj1[:,a]), num=gI_num) lamb2=np.linspace(-SD_spread*np.std(cca_proj2[:,a]),SD_spread*np.std(cca_proj2[:,a]), num=gI_num) mode_var1 = np.zeros([gI_num,self.Rtr,self.Ctr]); mode_var2 = np.zeros([gI_num,self.Rtr,self.Ctr]) mode_var_recon1 = np.zeros([gI_num,self.R,self.C]) mode_var_recon2 = np.zeros([gI_num,self.R,self.C]) for b in range(gI_num): mode_var1[b,:,:]=self.mean_tr+lamb1[b]*cca_dirs1[a,:] mode_var2[b,:,:]=self.mean_tr+lamb2[b]*cca_dirs2[a,:] mode_var_recon1[b,:,:] = radoncdt.apply_inverse_map(mode_var1[b,:,:], self.template, x_range) mode_var_recon2[b,:,:] = radoncdt.apply_inverse_map(mode_var2[b,:,:], self.template, x_range) t1=mode_var_recon1[b]; t2=mode_var_recon2[b] t1=t1-np.min(t1); t1=t1/np.max(t1) t2=t2-np.min(t2); t2=t2/np.max(t2) mode_var_recon1[b]=t1; mode_var_recon2[b]=t2 viz_cca1[a,:] = mode_var_recon1.transpose(2,0,1).reshape(self.C,-1) viz_cca2[a,:] = mode_var_recon2.transpose(2,0,1).reshape(self.C,-1) for a in range(dir_num): if a==0: F1=viz_cca1[a,:]; F2=viz_cca2[a,:] else: F1=np.concatenate((F1,viz_cca1[a,:]),axis=0) F2=np.concatenate((F2,viz_cca2[a,:]),axis=0) r1,c1=np.shape(F1); r2,c2=np.shape(F2) fig, (ax0, ax1) = plt.subplots(ncols=2, figsize=(16, 8), sharex=True, sharey=True) ax0.imshow(np.transpose(F1),cmap='gray') ax0.set_xlabel('Modes of variation',fontsize=12) ax0.set_ylabel('($\sigma$)',fontsize=12) ax0.set_title('Variation along the prominant CCA modes (Class 0)') plt.xticks(np.linspace(r1/(2*dir_num),r1-r1/(2*dir_num),dir_num),np.array(range(1,dir_num+1))) plt.yticks(np.linspace(1,c1,5),np.array([-SD_spread,-SD_spread/2,0,SD_spread/2,SD_spread])) ax1.imshow(np.transpose(F2),cmap='gray') ax1.set_xlabel('Modes of variation',fontsize=12) ax1.set_ylabel('($\sigma$)',fontsize=12) ax1.set_title('Variation along the prominant CCA modes (Class 1)') plt.xticks(np.linspace(r1/(2*dir_num),r1-r1/(2*dir_num),dir_num),np.array(range(1,dir_num+1))) plt.yticks(np.linspace(1,c1,5),np.array([-SD_spread,-SD_spread/2,0,SD_spread/2,SD_spread])) plt.show() ## figure 2 of 3 viz_dirs1=viz_cca1[:2,:]; viz_dirs2=viz_cca2[:2,:] proj_tr1=s_tilde_tr1[:,:2]; proj_tr2=s_tilde_tr2[:,:2] proj_te1=s_tilde_te1[:,:2]; proj_te2=s_tilde_te2[:,:2] plt.figure(figsize=(18, 7)) leg_str=['Variable X','Variable Y'] bas1=np.array([0,1]) bas1a=bas1*np.min(proj_tr1[:,0]); bas1b=bas1*np.max(proj_tr1[:,0]) basy=[bas1a[0],bas1b[0]]; basx=[bas1a[1],bas1b[1]] ax0=plt.subplot2grid((4, 10), (0, 1), colspan=3,rowspan=3) ax0.grid(linestyle='--') X=proj_tr1; Y=proj_tr2; ax0.scatter(X[:,0],X[:,1],color='C'+str(1)) ax0.scatter(Y[:,0],Y[:,1],color='C'+str(2)) ax0.legend(leg_str) ax0.plot(basx,basy,color='C4') ax0.set_title('Projection of training data on the first 2 CCA directions'); ax1=plt.subplot2grid((4, 10), (3, 1), colspan=3,rowspan=1) xax=viz_dirs1[0,:] ax1.imshow(xax,cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]); ax2=plt.subplot2grid((4, 10), (0, 0), colspan=1,rowspan=3) yax=np.transpose(viz_dirs1[1,:]) ax2.imshow(yax,cmap='gray') ax2.set_xticks([]); ax2.set_yticks([]); bas1a=bas1*np.min(proj_te1[:,0]); bas1b=bas1*np.max(proj_te1[:,0]) basy=[bas1a[0],bas1b[0]]; basx=[bas1a[1],bas1b[1]] ax0=plt.subplot2grid((4, 10), (0, 6), colspan=3,rowspan=3) ax0.grid(linestyle='--') X=proj_te1; Y=proj_te2; ax0.scatter(X[:,0],X[:,1],color='C'+str(1)) ax0.scatter(Y[:,0],Y[:,1],color='C'+str(2)) ax0.legend(leg_str) ax0.plot(basx,basy,color='C4') ax0.set_title('Projection of test data on the first 2 CCA directions'); ax1=plt.subplot2grid((4, 10), (3, 6), colspan=3,rowspan=1) xax=viz_dirs1[0,:] ax1.imshow(xax,cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]); ax2=plt.subplot2grid((4, 10), (0, 5), colspan=1,rowspan=3) yax=np.transpose(viz_dirs1[1,:]) ax2.imshow(yax,cmap='gray') ax2.set_xticks([]); ax2.set_yticks([]) ## figure 3 of 3 which_direction=1 viz_dirs1=viz_cca1[which_direction-1:which_direction,:] viz_dirs2=viz_cca2[which_direction-1:which_direction,:] proj_tr1=s_tilde_tr1[:,which_direction-1]; proj_te1=s_tilde_te1[:,which_direction-1] proj_tr2=s_tilde_tr2[:,which_direction-1]; proj_te2=s_tilde_te2[:,which_direction-1] plt.figure(figsize=(16, 7)) leg_str=['Variable X'] ax0=plt.subplot2grid((4, 8), (0, 0), colspan=3,rowspan=2); ax0.grid(linestyle='--') XX=proj_tr1 ax0.hist(XX,color=['C1']); ax0.legend(leg_str) ax0.set_title('Projection of training data on the first CCA direction'); ax1=plt.subplot2grid((4, 8), (2, 0), colspan=3,rowspan=1) ax1.imshow(viz_dirs1[0,:],cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]); ax0=plt.subplot2grid((4, 8), (0, 5), colspan=3,rowspan=2); ax0.grid(linestyle='--') XX=proj_te1 ax0.hist(XX,color=['C1']); ax0.legend(leg_str) ax0.set_title('Projection of test data on the first CCA direction'); ax1=plt.subplot2grid((4, 8), (2, 5), colspan=3,rowspan=1) ax1.imshow(viz_dirs1[0,:],cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]); plt.figure(figsize=(16, 7)) leg_str=['Variable Y'] ax0=plt.subplot2grid((4, 8), (0, 0), colspan=3,rowspan=2); ax0.grid(linestyle='--') XX=proj_tr2 ax0.hist(XX,color=['C2']); ax0.legend(leg_str) ax0.set_title('Projection of training data on the first CCA direction'); ax1=plt.subplot2grid((4, 8), (2, 0), colspan=3,rowspan=1) ax1.imshow(viz_dirs2[0,:],cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]); ax0=plt.subplot2grid((4, 8), (0, 5), colspan=3,rowspan=2); ax0.grid(linestyle='--') XX=proj_te2 ax0.hist(XX,color=['C2']); ax0.legend(leg_str) ax0.set_title('Projection of test data on the first CCA direction'); ax1=plt.subplot2grid((4, 8), (2, 5), colspan=3,rowspan=1) ax1.imshow(viz_dirs2[0,:],cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]);
[docs]class RCDT_NS_Classifier: def __init__(self, train_sample=None, use_gpu=False): self.train_sample = train_sample self.subspaces = [] self.label = [] self.len_subspace = 0 self.use_gpu = use_gpu
[docs] def classify_RCDT_NS(self,x_train, y_train, x_test, y_test): train_sample = self.train_sample numclass = len(np.unique(y_train)) self.num_classes = numclass if train_sample is not None: # Calculate number of samples of the class with smallest number of train samples unique, count = np.unique(y_train, return_counts=True) mincount = np.min(count) train_sample = np.min([train_sample, mincount]) x_train_sub, y_train_sub = take_train_samples(x_train, y_train, train_sample, numclass, repeat=0) # function from utils.py else: x_train_sub, y_train_sub = x_train, y_train self.fit(x_train_sub, y_train_sub) y_predicted = self.predict(x_test) accuracy = 100*self.score(y_test) print("Accuracy: {:0.2f}%".format(accuracy)) conf_mat = confusion_matrix(y_test, y_predicted) print('Confusion Matrix:') target_names = [] for c in range(numclass): class_label = 'Class '+str(c) target_names.append(class_label) plot_confusion_matrix(conf_mat, target_names) return y_predicted
[docs] def fit(self, X, y): """Fit linear model. Parameters ---------- X : array-like, shape (n_samples, n_proj, n_angles)) Training data. y : ndarray of shape (n_samples,) Target values. Returns ------- self : Returns an instance of self. """ for class_idx in range(self.num_classes): # generate the bases vectors class_data = X[y == class_idx] flat = class_data.reshape(class_data.shape[0], -1) #flat = np.transpose(class_data,(0,2,1)).reshape(class_data.shape[0],-1) u, s, vh = LA.svd(flat,full_matrices=False) cum_s = np.cumsum(s) cum_s = cum_s/np.max(cum_s) max_basis = (np.where(cum_s>=0.99)[0])[0] + 1 if max_basis > self.len_subspace: self.len_subspace = max_basis basis = vh[:flat.shape[0]] self.subspaces.append(basis) self.label.append(class_idx)
[docs] def predict(self, X): """Predict using the linear model Parameters ---------- X : array-like, sparse matrix, shape (n_samples, n_proj, n_angles)) Returns ------- ndarray of shape (n_samples,) Predicted target values per element in X. """ if self.use_gpu: import cupy as cp X = X.reshape([X.shape[0], -1]) #X = np.transpose(X,(0,2,1)).reshape(X.shape[0],-1) print('Len basis: {}'.format(self.len_subspace)) D = [] for class_idx in range(self.num_classes): basis = self.subspaces[class_idx] basis = basis[:self.len_subspace,:] if self.use_gpu: D.append(cp.linalg.norm(cp.matmul(cp.matmul(X, cp.array(basis).T), cp.array(basis)) -X, axis=1)) else: proj = X @ basis.T # (n_samples, n_basis) projR = proj @ basis # (n_samples, n_features) D.append(LA.norm(projR - X, axis=1)) if self.use_gpu: preds = cp.argmin(cp.stack(D, axis=0), axis=0) self.preds_label = [self.label[i] for i in cp.asnumpy(preds)] return self.preds_label else: D = np.stack(D, axis=0) preds = np.argmin(D, axis=0) self.preds_label = [self.label[i] for i in preds] return self.preds_label
[docs] def score(self, y_test): return accuracy_score(y_test, self.preds_label)