Source code for pytranskit.TBM.TBM_PLOT

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Aug 10 11:14:01 2020

@author: Imaging and Data Science Lab
"""

import numpy as np
import multiprocessing as mp
import matplotlib.pyplot as plt
import numpy.linalg as LA

import ot
from sklearn.decomposition import PCA
from pytranskit.optrans.decomposition import PLDA, CCA

from pytranskit.classification.utils import take_train_samples
from sklearn.metrics import accuracy_score, confusion_matrix
from pytranskit.TBM.utils import plot_confusion_matrix

import cv2
from skimage.filters import threshold_otsu
from skimage import morphology
from skimage.measure import regionprops, label
from numpy import matlib as mb
import scipy as sc

from skimage.io import imread
from skimage.color import rgb2gray
from sklearn.cluster import KMeans

[docs]def gaussian2D(x,mean,sigma): sigma2=sigma**2 if len(x.shape)==1: return (1./(2*np.pi*sigma2))*np.exp(-((x-mean)**2).sum()/(2*sigma2)) else: return (1./(2*np.pi*sigma2))*np.exp(-((x-mean)**2).sum(1)/(2*sigma2))
[docs]def particle2image(x,a,sigma,imgshape): ''' This function gets a set of coordinates, x, their amplitude, a, and generates a PDF image using a gaussian kernel ''' X,Y=np.meshgrid(np.arange(imgshape[0]),np.arange(imgshape[1])) coords=np.stack([X.reshape(-1),Y.reshape(-1)],1) I=np.zeros(imgshape) for i,mean in enumerate(x): I+=a[i]*gaussian2D(coords,mean,sigma).reshape(imgshape) return I.T
[docs]def get_particles(img,N): thresh=.01 # If you want to discard low intensity values, otherwise set it to zero xfull=np.argwhere(img>thresh) features= np.concatenate([xfull,img[xfull[:,0],xfull[:,1]][:,np.newaxis]],1) kmeans=KMeans(n_clusters=N) kmeans.fit(features) x=kmeans.cluster_centers_ return x
[docs]def particleApproximation(imgs,Nmasses): PPl=list(); for i in range(imgs.shape[0]): img=imgs[i] x=get_particles(img,Nmasses) PPl.append(x); return PPl
[docs]def pLOT_single(x_temp,x_targ,a_temp,a_targ): C=ot.dist(x_targ,x_temp) w2,log=ot.emd2(a_targ,a_temp,C,return_matrix=True) M=x_targ.shape[0] gamma=log['G'] gamma2=np.array([g/(g.sum()) for g in gamma.T]).T #V=np.matmul(gamma2.T,x_targ)-x_temp V=np.matmul(gamma2.T,x_targ) return V
[docs]class batch_PLOT: def __init__(self, Nmasses=50): self.Nmasses = Nmasses
[docs] def forward_seq(self, x_train, x_test, x_template): N = self.Nmasses PPl_train=particleApproximation(x_train, N) PPl_test=particleApproximation(x_test,N) #x0=np.mean(x_train,axis=0) PPl_tem=get_particles(x_template,N) x_temp=PPl_tem[:,:2] a_temp=PPl_tem[:,2]/PPl_tem[:,2].sum() V=list(); M=x_train.shape[0] for ind in range(M): xa_tr=PPl_train[ind] x_tr=xa_tr[:,:2] a_tr=xa_tr[:,2]/xa_tr[:,2].sum() V_single=pLOT_single(x_temp,x_tr,a_temp,a_tr) V.append(V_single) V=np.asarray(V) x_train_hat=np.zeros((len(V),V[0].shape[0]*V[0].shape[1])) for a in range(len(V)): x_train_hat[a,:]=np.reshape(V[a],(V[0].shape[0]*V[0].shape[1],),order='F') V=list(); M=x_test.shape[0] for ind in range(M): xa_te=PPl_test[ind] x_te=xa_te[:,:2] a_te=xa_te[:,2]/xa_te[:,2].sum() V_single=pLOT_single(x_temp,x_te,a_temp,a_te) V.append(V_single) V=np.asarray(V) x_test_hat=np.zeros((len(V),V[0].shape[0]*V[0].shape[1])) for a in range(len(V)): x_test_hat[a,:]=np.reshape(V[a],(V[0].shape[0]*V[0].shape[1],),order='F') return x_train_hat, x_test_hat, x_temp, a_temp
[docs]class PLOT_PCA: def __init__(self, n_components=2): self.n_components = n_components
[docs] def plot_pca(self, x_train_hat, y_train, x_test_hat, y_test, template): self.y_train = y_train self.y_test = y_test self.template = template [self.R, self.C] = template.shape [self.Ntr, self.Ptr] = x_train_hat.shape [self.Nte, self.Pte] = x_test_hat.shape self.mean_tr=np.mean(x_train_hat, axis=0) self.mean_te=np.mean(x_test_hat, axis=0) x_train_hat_vec=(x_train_hat - self.mean_tr).reshape(self.Ntr,-1,order='F') x_test_hat_vec=(x_test_hat - self.mean_tr).reshape(self.Nte,-1,order='F') pca=PCA(n_components=self.n_components) self.pca_proj_tr = pca.fit_transform(x_train_hat_vec) self.pca_proj_te = pca.transform(x_test_hat_vec) b_hat = pca.inverse_transform(np.identity(self.n_components)) self.basis_hat = np.reshape(b_hat,(self.n_components,self.Ptr)) return self.basis_hat, self.pca_proj_tr, self.pca_proj_te
[docs] def visualize(self, mean_x_train_hat, Intensity, directions=5, points=5, SD_spread=2): dir_num=directions gI_num=points b_hat = self.basis_hat s_tilde_tr = self.pca_proj_tr s_tilde_te = self.pca_proj_te pca_dirs=b_hat[:dir_num,:] pca_proj=s_tilde_tr[:,:dir_num] ## figure 1 of 3 viz_pca=np.zeros((dir_num,self.R,self.C*gI_num)) for a in range(dir_num): lamb=np.linspace(-SD_spread*np.std(pca_proj[:,a]),SD_spread*np.std(pca_proj[:,a]), num=gI_num) mode_var_recon = np.zeros([gI_num,self.R,self.C]) for b in range(gI_num): mode_var=mean_x_train_hat+lamb[b]*pca_dirs[a,:]; mode_var=mode_var.reshape(int(len(mode_var)/2),-1,order='F') mode_var_recon[b,:]=particle2image(mode_var,Intensity,3,(self.R,self.C)) t=mode_var_recon[b] t=t-np.min(t); t=t/np.max(t) mode_var_recon[b]=t viz_pca[a,:] = mode_var_recon.transpose(2,0,1).reshape(self.C,-1) for a in range(dir_num): if a==0: F1=viz_pca[a,:] else: F1=np.concatenate((F1,viz_pca[a,:]),axis=0) r,c=np.shape(F1) plt.figure(figsize=(7,7)) plt.imshow(np.transpose(F1),cmap='gray') plt.xticks(np.linspace(r/(2*dir_num),r-r/(2*dir_num),dir_num),np.array(range(1,dir_num+1))) plt.xlabel('Modes of variation',fontsize=12) plt.yticks(np.linspace(1,c,5),np.array([-SD_spread,-SD_spread/2,0,SD_spread/2,SD_spread])) plt.ylabel('($\sigma$)',fontsize=12) plt.title('Variation along the prominant PCA modes') ## figure 2 of 3 y_train = self.y_train y_test = self.y_test viz_dirs=viz_pca[:2,:]; proj_tr=self.pca_proj_tr[:,:2]; proj_te=self.pca_proj_te[:,:2] plt.figure(figsize=(18, 7)) leg_str=['class 1','class 2'] bas1=np.array([0,1]) bas1a=bas1*np.min(proj_tr[:,0]); bas1b=bas1*np.max(proj_tr[:,0]) basy=[bas1a[0],bas1b[0]]; basx=[bas1a[1],bas1b[1]] ax0=plt.subplot2grid((4, 10), (0, 1), colspan=3,rowspan=3) ax0.grid(linestyle='--') y_unique=np.unique(y_train) for a in range(len(y_unique)): t=np.where(a==y_train) X=proj_tr[t] ax0.scatter(X[:,0],X[:,1],color='C'+str(a+1)) ax0.legend(leg_str) ax0.plot(basx,basy,color='C4') ax0.set_title('Projection of training data on the first 2 PCA directions'); ax1=plt.subplot2grid((4, 10), (3, 1), colspan=3,rowspan=1) xax=viz_dirs[0,:] ax1.imshow(xax,cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]); ax2=plt.subplot2grid((4, 10), (0, 0), colspan=1,rowspan=3) yax=np.transpose(viz_dirs[1,:]) ax2.imshow(yax,cmap='gray') ax2.set_xticks([]) ax2.set_yticks([]) bas1a=bas1*np.min(proj_te[:,0]); bas1b=bas1*np.max(proj_te[:,0]) basy=[bas1a[0],bas1b[0]]; basx=[bas1a[1],bas1b[1]] ax0=plt.subplot2grid((4, 10), (0, 6), colspan=3,rowspan=3) ax0.grid(linestyle='--') y_unique=np.unique(y_test) for a in range(len(y_unique)): t=np.where(a==y_test) X=proj_te[t] ax0.scatter(X[:,0],X[:,1],color='C'+str(a+1)) ax0.legend(leg_str) ax0.plot(basx,basy,color='C4') ax0.set_title('Projection of test data on the first 2 PCA directions') ax1=plt.subplot2grid((4, 10), (3, 6), colspan=3,rowspan=1) xax=viz_dirs[0,:] ax1.imshow(xax,cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]) ax2=plt.subplot2grid((4, 10), (0, 5), colspan=1,rowspan=3) yax=np.transpose(viz_dirs[1,:]) ax2.imshow(yax,cmap='gray') ax2.set_xticks([]) ax2.set_yticks([]) ## figure 3 of 3 which_direction=1 viz_dirs=viz_pca[which_direction-1:which_direction,:]; proj_tr=s_tilde_tr[:,which_direction-1]; proj_te=s_tilde_te[:,which_direction-1] plt.figure(figsize=(16, 7)) leg_str=['class 1','class 2'] ax0=plt.subplot2grid((4, 8), (0, 0), colspan=3,rowspan=2); ax0.grid(linestyle='--') y_unique=np.unique(y_train) for a in range(len(y_unique)): t=np.where(a==y_train) y=y_train[t] X=proj_tr[t]; X=np.reshape(X,(len(y))) if a==0: XX=[X] else: XX.append(X) ax0.hist(XX,color=['C1','C2']); ax0.legend(leg_str) ax0.set_title('Projection of training data on the first PCA direction') ax1=plt.subplot2grid((4, 8), (2, 0), colspan=3,rowspan=1) ax1.imshow(viz_dirs[0,:],cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]) ax0=plt.subplot2grid((4, 8), (0, 5), colspan=3,rowspan=2); ax0.grid(linestyle='--') y_unique=np.unique(y_test) for a in range(len(y_unique)): t=np.where(a==y_test) y=y_test[t] X=proj_te[t]; X=np.reshape(X,(len(y))) if a==0: XX=[X] else: XX.append(X) ax0.hist(XX,color=['C1','C2']); ax0.legend(leg_str) ax0.set_title('Projection of test data on the first PCA direction') ax1=plt.subplot2grid((4, 8), (2, 5), colspan=3,rowspan=1) ax1.imshow(viz_dirs[0,:],cmap='gray') ax1.set_xticks([]) ax1.set_yticks([])
[docs]class PLOT_PLDA: def __init__(self, n_components=2): self.n_components = n_components
[docs] def plot_plda(self, x_train_hat, y_train, x_test_hat, y_test, template): self.y_train = y_train self.y_test = y_test self.template = template [self.R, self.C] = template.shape [self.Ntr, self.Ptr] = x_train_hat.shape [self.Nte, self.Pte] = x_test_hat.shape self.mean_tr=np.mean(x_train_hat, axis=0) self.mean_te=np.mean(x_test_hat, axis=0) x_train_hat_vec=(x_train_hat - self.mean_tr).reshape(self.Ntr,-1,order='F') x_test_hat_vec=(x_test_hat - self.mean_tr).reshape(self.Nte,-1,order='F') pca=PCA() x_train_hat_vec_pca = pca.fit_transform(x_train_hat_vec) x_test_hat_vec_pca = pca.transform(x_test_hat_vec) plda=PLDA(alpha=1.618,n_components=self.n_components) self.plda_proj_tr = plda.fit_transform(x_train_hat_vec_pca,y_train); self.plda_proj_te = plda.transform(x_test_hat_vec_pca); b_hat = pca.inverse_transform(plda.inverse_transform(np.identity(self.n_components))) self.basis_hat = np.reshape(b_hat,(self.n_components,self.Ptr)) return self.basis_hat, self.plda_proj_tr, self.plda_proj_te
[docs] def visualize(self, mean_x_train_hat, Intensity, directions=5, points=5, SD_spread=2): dir_num=directions gI_num=points b_hat = self.basis_hat s_tilde_tr = self.plda_proj_tr s_tilde_te = self.plda_proj_te plda_dirs=b_hat[:dir_num,:] plda_proj=s_tilde_tr[:,:dir_num] ## figure 1 of 3 viz_plda=np.zeros((dir_num,self.R,self.C*gI_num)) for a in range(dir_num): lamb=np.linspace(-SD_spread*np.std(plda_proj[:,a]),SD_spread*np.std(plda_proj[:,a]), num=gI_num) mode_var_recon = np.zeros([gI_num,self.R,self.C]) for b in range(gI_num): mode_var=mean_x_train_hat+lamb[b]*plda_dirs[a,:]; mode_var=mode_var.reshape(int(len(mode_var)/2),-1,order='F') mode_var_recon[b,:]=particle2image(mode_var,Intensity,3,(self.R,self.C)) t=mode_var_recon[b] t=t-np.min(t); t=t/np.max(t) mode_var_recon[b]=t viz_plda[a,:] = mode_var_recon.transpose(2,0,1).reshape(self.C,-1) for a in range(dir_num): if a==0: F1=viz_plda[a,:] else: F1=np.concatenate((F1,viz_plda[a,:]),axis=0) r,c=np.shape(F1) plt.figure(figsize=(7,7)) plt.imshow(np.transpose(F1),cmap='gray') plt.xticks(np.linspace(r/(2*dir_num),r-r/(2*dir_num),dir_num),np.array(range(1,dir_num+1))) plt.xlabel('Modes of variation',fontsize=12) plt.yticks(np.linspace(1,c,5),np.array([-SD_spread,-SD_spread/2,0,SD_spread/2,SD_spread])) plt.ylabel('($\sigma$)',fontsize=12) plt.title('Variation along the prominant PLDA modes') ## figure 2 of 3 y_train = self.y_train y_test = self.y_test viz_dirs=viz_plda[:2,:]; proj_tr=self.plda_proj_tr[:,:2]; proj_te=self.plda_proj_te[:,:2] plt.figure(figsize=(18, 7)) leg_str=['class 1','class 2'] bas1=np.array([0,1]) bas1a=bas1*np.min(proj_tr[:,0]); bas1b=bas1*np.max(proj_tr[:,0]) basy=[bas1a[0],bas1b[0]]; basx=[bas1a[1],bas1b[1]] ax0=plt.subplot2grid((4, 10), (0, 1), colspan=3,rowspan=3) ax0.grid(linestyle='--') y_unique=np.unique(y_train) for a in range(len(y_unique)): t=np.where(a==y_train) X=proj_tr[t] ax0.scatter(X[:,0],X[:,1],color='C'+str(a+1)) ax0.legend(leg_str) ax0.plot(basx,basy,color='C4') ax0.set_title('Projection of training data on the first 2 PLDA directions') ax1=plt.subplot2grid((4, 10), (3, 1), colspan=3,rowspan=1) xax=viz_dirs[0,:] ax1.imshow(xax,cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]) ax2=plt.subplot2grid((4, 10), (0, 0), colspan=1,rowspan=3) yax=np.transpose(viz_dirs[1,:]) ax2.imshow(yax,cmap='gray') ax2.set_xticks([]) ax2.set_yticks([]) bas1a=bas1*np.min(proj_te[:,0]); bas1b=bas1*np.max(proj_te[:,0]) basy=[bas1a[0],bas1b[0]]; basx=[bas1a[1],bas1b[1]] ax0=plt.subplot2grid((4, 10), (0, 6), colspan=3,rowspan=3) ax0.grid(linestyle='--') y_unique=np.unique(y_test) for a in range(len(y_unique)): t=np.where(a==y_test) X=proj_te[t] ax0.scatter(X[:,0],X[:,1],color='C'+str(a+1)) ax0.legend(leg_str) ax0.plot(basx,basy,color='C4') ax0.set_title('Projection of test data on the first 2 PLDA directions'); ax1=plt.subplot2grid((4, 10), (3, 6), colspan=3,rowspan=1) xax=viz_dirs[0,:] ax1.imshow(xax,cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]); ax2=plt.subplot2grid((4, 10), (0, 5), colspan=1,rowspan=3) yax=np.transpose(viz_dirs[1,:]) ax2.imshow(yax,cmap='gray') ax2.set_xticks([]) ax2.set_yticks([]) ## figure 3 of 3 which_direction=1 viz_dirs=viz_plda[which_direction-1:which_direction,:]; proj_tr=s_tilde_tr[:,which_direction-1]; proj_te=s_tilde_te[:,which_direction-1] plt.figure(figsize=(16, 7)) leg_str=['class 1','class 2'] ax0=plt.subplot2grid((4, 8), (0, 0), colspan=3,rowspan=2); ax0.grid(linestyle='--') y_unique=np.unique(y_train) for a in range(len(y_unique)): t=np.where(a==y_train) y=y_train[t] X=proj_tr[t]; X=np.reshape(X,(len(y))) if a==0: XX=[X] else: XX.append(X) ax0.hist(XX,color=['C1','C2']); ax0.legend(leg_str) ax0.set_title('Projection of training data on the first PLDA direction') ax1=plt.subplot2grid((4, 8), (2, 0), colspan=3,rowspan=1) ax1.imshow(viz_dirs[0,:],cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]) ax0=plt.subplot2grid((4, 8), (0, 5), colspan=3,rowspan=2); ax0.grid(linestyle='--') y_unique=np.unique(y_test) for a in range(len(y_unique)): t=np.where(a==y_test) y=y_test[t] X=proj_te[t]; X=np.reshape(X,(len(y))) if a==0: XX=[X] else: XX.append(X) ax0.hist(XX,color=['C1','C2']); ax0.legend(leg_str) ax0.set_title('Projection of test data on the first PLDA direction') ax1=plt.subplot2grid((4, 8), (2, 5), colspan=3,rowspan=1) ax1.imshow(viz_dirs[0,:],cmap='gray') ax1.set_xticks([]) ax1.set_yticks([])
[docs]class PLOT_CCA: def __init__(self, n_components=2): self.n_components = n_components
[docs] def plot_cca(self, x_train_hat, y_train, x_test_hat, y_test, template): self.y_train = y_train self.y_test = y_test self.template = template [self.R, self.C] = template.shape [self.Ntr, self.Ptr] = x_train_hat.shape [self.Nte, self.Pte] = x_test_hat.shape self.mean_tr=np.mean(x_train_hat, axis=0) self.mean_te=np.mean(x_test_hat, axis=0) x_train_hat_vec=(x_train_hat - self.mean_tr).reshape(self.Ntr,-1,order='F') x_test_hat_vec=(x_test_hat - self.mean_tr).reshape(self.Nte,-1,order='F') pca=PCA() x_train_hat_vec_pca = pca.fit_transform(x_train_hat_vec) x_test_hat_vec_pca = pca.transform(x_test_hat_vec) t0=np.where(0==y_train); t1=np.where(1==y_train); X_tr=x_train_hat_vec_pca[t0];Y_tr=x_train_hat_vec_pca[t1] t0=np.where(0==y_test); t1=np.where(1==y_test); X_te=x_test_hat_vec_pca[t0];Y_te=x_test_hat_vec_pca[t1] n_components=self.n_components cca=CCA(n_components=n_components) self.cca_proj_tr1,self.cca_proj_tr2 = cca.fit_transform(X_tr,Y_tr); self.cca_proj_te1,self.cca_proj_te2 = cca.transform(X_te,Y_te); b_hat1,b_hat2 = pca.inverse_transform(cca.inverse_transform(np.identity(n_components),np.identity(n_components))); self.basis_hat1=np.reshape(b_hat1,(n_components,self.Ptr)) self.basis_hat2=np.reshape(b_hat2,(n_components,self.Ptr)) return self.basis_hat1, self.basis_hat2, self.cca_proj_tr1,self.cca_proj_tr2, self.cca_proj_te1,self.cca_proj_te2
[docs] def visualize(self, mean_x_train_hat, Intensity, directions=5, points=5, SD_spread=1): dir_num=directions gI_num=points b_hat1 = self.basis_hat1 b_hat2 = self.basis_hat2 s_tilde_tr1 = self.cca_proj_tr1 s_tilde_tr2 = self.cca_proj_tr2 s_tilde_te1 = self.cca_proj_te1 s_tilde_te2 = self.cca_proj_te2 cca_dirs1=b_hat1[:dir_num,:] cca_dirs2=b_hat2[:dir_num,:] cca_proj1=s_tilde_tr1[:,:dir_num] cca_proj2=s_tilde_tr2[:,:dir_num] ## figure 1 of 3 viz_cca1=np.zeros((dir_num,self.R,self.C*gI_num)) viz_cca2=np.zeros((dir_num,self.R,self.C*gI_num)) for a in range(dir_num): lamb1=np.linspace(-SD_spread*np.std(cca_proj1[:,a]),SD_spread*np.std(cca_proj1[:,a]), num=gI_num) lamb2=np.linspace(-SD_spread*np.std(cca_proj2[:,a]),SD_spread*np.std(cca_proj2[:,a]), num=gI_num) mode_var_recon1 = np.zeros([gI_num,self.R,self.C]) mode_var_recon2 = np.zeros([gI_num,self.R,self.C]) for b in range(gI_num): #mode_var1=Pl_tem_vec+self.mean_tr+lamb1[b]*cca_dirs1[a,:] #mode_var2=Pl_tem_vec+self.mean_tr+lamb2[b]*cca_dirs2[a,:] #mode_var_recon1[b,:]=Visualize_LOT(mode_var1,Intensity,self.R,self.C,scale=1) #mode_var_recon2[b,:]=Visualize_LOT(mode_var2,Intensity,self.R,self.C,scale=1) mode_var1=mean_x_train_hat+lamb1[b]*cca_dirs1[a,:] mode_var1=mode_var1.reshape(int(len(mode_var1)/2),-1,order='F') mode_var_recon1[b,:]=particle2image(mode_var1,Intensity,3,(self.R,self.C)) mode_var2=mean_x_train_hat+lamb2[b]*cca_dirs2[a,:] mode_var2=mode_var2.reshape(int(len(mode_var2)/2),-1,order='F') mode_var_recon2[b,:]=particle2image(mode_var2,Intensity,3,(self.R,self.C)) t1=mode_var_recon1[b]; t2=mode_var_recon2[b] t1=t1-np.min(t1); t1=t1/np.max(t1) t2=t2-np.min(t2); t2=t2/np.max(t2) mode_var_recon1[b]=t1; mode_var_recon2[b]=t2 viz_cca1[a,:] = mode_var_recon1.transpose(2,0,1).reshape(self.C,-1) viz_cca2[a,:] = mode_var_recon2.transpose(2,0,1).reshape(self.C,-1) for a in range(dir_num): if a==0: F1=viz_cca1[a,:]; F2=viz_cca2[a,:] else: F1=np.concatenate((F1,viz_cca1[a,:]),axis=0) F2=np.concatenate((F2,viz_cca2[a,:]),axis=0) r1,c1=np.shape(F1); r2,c2=np.shape(F2) fig, (ax0, ax1) = plt.subplots(ncols=2, figsize=(16, 8), sharex=True, sharey=True) ax0.imshow(np.transpose(F1),cmap='gray') ax0.set_xlabel('Modes of variation',fontsize=12) ax0.set_ylabel('($\sigma$)',fontsize=12) ax0.set_title('Variation along the prominant CCA modes (Class 0)') plt.xticks(np.linspace(r1/(2*dir_num),r1-r1/(2*dir_num),dir_num),np.array(range(1,dir_num+1))) plt.yticks(np.linspace(1,c1,5),np.array([-SD_spread,-SD_spread/2,0,SD_spread/2,SD_spread])) ax1.imshow(np.transpose(F2),cmap='gray') ax1.set_xlabel('Modes of variation',fontsize=12) ax1.set_ylabel('($\sigma$)',fontsize=12) ax1.set_title('Variation along the prominant CCA modes (Class 1)') plt.xticks(np.linspace(r1/(2*dir_num),r1-r1/(2*dir_num),dir_num),np.array(range(1,dir_num+1))) plt.yticks(np.linspace(1,c1,5),np.array([-SD_spread,-SD_spread/2,0,SD_spread/2,SD_spread])) plt.show() ## figure 2 of 3 viz_dirs1=viz_cca1[:2,:]; viz_dirs2=viz_cca2[:2,:] proj_tr1=s_tilde_tr1[:,:2]; proj_tr2=s_tilde_tr2[:,:2] proj_te1=s_tilde_te1[:,:2]; proj_te2=s_tilde_te2[:,:2] plt.figure(figsize=(18, 7)) leg_str=['Variable X','Variable Y'] bas1=np.array([0,1]) bas1a=bas1*np.min(proj_tr1[:,0]); bas1b=bas1*np.max(proj_tr1[:,0]) basy=[bas1a[0],bas1b[0]]; basx=[bas1a[1],bas1b[1]] ax0=plt.subplot2grid((4, 10), (0, 1), colspan=3,rowspan=3) ax0.grid(linestyle='--') X=proj_tr1; Y=proj_tr2; ax0.scatter(X[:,0],X[:,1],color='C'+str(1)) ax0.scatter(Y[:,0],Y[:,1],color='C'+str(2)) ax0.legend(leg_str) ax0.plot(basx,basy,color='C4') ax0.set_title('Projection of training data on the first 2 CCA directions'); ax1=plt.subplot2grid((4, 10), (3, 1), colspan=3,rowspan=1) xax=viz_dirs1[0,:] ax1.imshow(xax,cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]); ax2=plt.subplot2grid((4, 10), (0, 0), colspan=1,rowspan=3) yax=np.transpose(viz_dirs1[1,:]) ax2.imshow(yax,cmap='gray') ax2.set_xticks([]); ax2.set_yticks([]); bas1a=bas1*np.min(proj_te1[:,0]); bas1b=bas1*np.max(proj_te1[:,0]) basy=[bas1a[0],bas1b[0]]; basx=[bas1a[1],bas1b[1]] ax0=plt.subplot2grid((4, 10), (0, 6), colspan=3,rowspan=3) ax0.grid(linestyle='--') X=proj_te1; Y=proj_te2; ax0.scatter(X[:,0],X[:,1],color='C'+str(1)) ax0.scatter(Y[:,0],Y[:,1],color='C'+str(2)) ax0.legend(leg_str) ax0.plot(basx,basy,color='C4') ax0.set_title('Projection of test data on the first 2 CCA directions'); ax1=plt.subplot2grid((4, 10), (3, 6), colspan=3,rowspan=1) xax=viz_dirs1[0,:] ax1.imshow(xax,cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]); ax2=plt.subplot2grid((4, 10), (0, 5), colspan=1,rowspan=3) yax=np.transpose(viz_dirs1[1,:]) ax2.imshow(yax,cmap='gray') ax2.set_xticks([]); ax2.set_yticks([]) ## figure 3 of 3 which_direction=1 viz_dirs1=viz_cca1[which_direction-1:which_direction,:] viz_dirs2=viz_cca2[which_direction-1:which_direction,:] proj_tr1=s_tilde_tr1[:,which_direction-1]; proj_te1=s_tilde_te1[:,which_direction-1] proj_tr2=s_tilde_tr2[:,which_direction-1]; proj_te2=s_tilde_te2[:,which_direction-1] plt.figure(figsize=(16, 7)) leg_str=['Variable X'] ax0=plt.subplot2grid((4, 8), (0, 0), colspan=3,rowspan=2); ax0.grid(linestyle='--') XX=proj_tr1 ax0.hist(XX,color=['C1']); ax0.legend(leg_str) ax0.set_title('Projection of training data on the first CCA direction'); ax1=plt.subplot2grid((4, 8), (2, 0), colspan=3,rowspan=1) ax1.imshow(viz_dirs1[0,:],cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]); ax0=plt.subplot2grid((4, 8), (0, 5), colspan=3,rowspan=2); ax0.grid(linestyle='--') XX=proj_te1 ax0.hist(XX,color=['C1']); ax0.legend(leg_str) ax0.set_title('Projection of test data on the first CCA direction'); ax1=plt.subplot2grid((4, 8), (2, 5), colspan=3,rowspan=1) ax1.imshow(viz_dirs1[0,:],cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]); plt.figure(figsize=(16, 7)) leg_str=['Variable Y'] ax0=plt.subplot2grid((4, 8), (0, 0), colspan=3,rowspan=2); ax0.grid(linestyle='--') XX=proj_tr2 ax0.hist(XX,color=['C2']); ax0.legend(leg_str) ax0.set_title('Projection of training data on the first CCA direction'); ax1=plt.subplot2grid((4, 8), (2, 0), colspan=3,rowspan=1) ax1.imshow(viz_dirs2[0,:],cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]); ax0=plt.subplot2grid((4, 8), (0, 5), colspan=3,rowspan=2); ax0.grid(linestyle='--') XX=proj_te2 ax0.hist(XX,color=['C2']); ax0.legend(leg_str) ax0.set_title('Projection of test data on the first CCA direction'); ax1=plt.subplot2grid((4, 8), (2, 5), colspan=3,rowspan=1) ax1.imshow(viz_dirs2[0,:],cmap='gray') ax1.set_xticks([]); ax1.set_yticks([]);
[docs]class PLOT_NS_Classifier: def __init__(self, train_sample=None, use_gpu=False): self.train_sample = train_sample self.subspaces = [] self.label = [] self.len_subspace = 0 self.use_gpu = use_gpu
[docs] def classify_PLOT_NS(self,x_train, y_train, x_test, y_test): train_sample = self.train_sample numclass = len(np.unique(y_train)) self.num_classes = numclass if train_sample is not None: # Calculate number of samples of the class with smallest number of train samples unique, count = np.unique(y_train, return_counts=True) mincount = np.min(count) train_sample = np.min([train_sample, mincount]) x_train_sub, y_train_sub = take_train_samples(x_train, y_train, train_sample, numclass, repeat=0) # function from utils.py else: x_train_sub, y_train_sub = x_train, y_train self.fit(x_train_sub, y_train_sub) y_predicted = self.predict(x_test) accuracy = 100*self.score(y_test) print("Accuracy: {:0.2f}%".format(accuracy)) conf_mat = confusion_matrix(y_test, y_predicted) print('Confusion Matrix:') target_names = [] for c in range(numclass): class_label = 'Class '+str(c) target_names.append(class_label) plot_confusion_matrix(conf_mat, target_names) return y_predicted
[docs] def fit(self, X, y): """Fit linear model. Parameters ---------- X : array-like, shape (n_samples, n_proj, n_angles)) Training data. y : ndarray of shape (n_samples,) Target values. Returns ------- self : Returns an instance of self. """ for class_idx in range(self.num_classes): # generate the bases vectors class_data = X[y == class_idx] flat = class_data.reshape(class_data.shape[0], -1) #flat = np.transpose(class_data,(0,2,1)).reshape(class_data.shape[0],-1) u, s, vh = LA.svd(flat,full_matrices=False) cum_s = np.cumsum(s) cum_s = cum_s/np.max(cum_s) #TODO: fix the bug to use original max_basis #max_basis = (np.where(cum_s>=0.99)[0])[0] + 1 max_basis = 30 if max_basis > self.len_subspace: self.len_subspace = max_basis basis = vh[:flat.shape[0]] self.subspaces.append(basis) self.label.append(class_idx)
[docs] def predict(self, X): """Predict using the linear model Parameters ---------- X : array-like, sparse matrix, shape (n_samples, n_proj, n_angles)) Returns ------- ndarray of shape (n_samples,) Predicted target values per element in X. """ if self.use_gpu: import cupy as cp X = X.reshape([X.shape[0], -1]) #X = np.transpose(X,(0,2,1)).reshape(X.shape[0],-1) print('Len basis: {}'.format(self.len_subspace)) D = [] for class_idx in range(self.num_classes): basis = self.subspaces[class_idx] basis = basis[:self.len_subspace,:] if self.use_gpu: D.append(cp.linalg.norm(cp.matmul(cp.matmul(X, cp.array(basis).T), cp.array(basis)) -X, axis=1)) else: proj = X @ basis.T # (n_samples, n_basis) projR = proj @ basis # (n_samples, n_features) D.append(LA.norm(projR - X, axis=1)) if self.use_gpu: preds = cp.argmin(cp.stack(D, axis=0), axis=0) self.preds_label = [self.label[i] for i in cp.asnumpy(preds)] return self.preds_label else: D = np.stack(D, axis=0) preds = np.argmin(D, axis=0) self.preds_label = [self.label[i] for i in preds] return self.preds_label
[docs] def score(self, y_test): return accuracy_score(y_test, self.preds_label)
## ------------ PAST CODES FROM HERE ------------------------
[docs]def fromInd2Coord(ind,Ny): coor=np.zeros((2,len(ind))) coor[0,:] = np.fix(ind/Ny)+1 coor[1,:] = np.remainder(ind,Ny) return coor
[docs]def L2_distance(a,b): b=np.reshape(b,(len(b),1)) c=a-b nrm=np.zeros(c.shape[1]) for i in range(c.shape[1]): tmp=c[:,i] nrm[i]=np.linalg.norm(tmp) return nrm
[docs]def img2pts_Lloyd(img,Nmasses): stopLloyd = 0.5 [ny,nx]=np.shape(img) img_t = img/img.max() level = threshold_otsu(img_t)*0.22 BW = cv2.threshold(img_t,level,1,cv2.THRESH_BINARY)[1]; BW=BW.astype(bool) BW = morphology.remove_small_objects(BW, min_size=7); BW=BW.astype(int); BW = label(BW); STATS=regionprops(BW) bb=STATS[0].bbox sy=bb[0]; sx=bb[1]; Ny=bb[2]-sy; Nx=bb[3]-sx img_t_vec=np.squeeze(np.reshape(img_t,(ny*nx,1),order='F')) img_vec=np.squeeze(np.reshape(img,(ny*nx,1),order='F')) w = np.squeeze(np.argwhere(img_t_vec < level)) img_vec[w]=0; img=np.reshape(img_vec,(ny,nx),order='F') ind = np.squeeze(np.argwhere(img_t_vec>=level)) output_Index = np.random.choice(ind,np.min((Nmasses,len(ind))),replace=False) output_Index=np.sort(output_Index) res_P=fromInd2Coord(output_Index+1,ny) res_c=res_P[0,:]*0; res_c[:]=1 BW=img_t*0 img_x=img/np.sum(img) BW[sy-1:sy+Ny,sx-1:sx+Nx] = img_x[sy-1:sy+Ny,sx-1:sx+Nx] BW_vec=np.reshape(BW,(nx*ny,),order='F'); ii=np.squeeze(np.argwhere(BW_vec)) V=BW_vec[ii] rc=fromInd2Coord(ii+1,ny) col=rc[0,:]; row=rc[1,:] tmp_row=np.reshape(row,(1,len(row))); tmp_col=np.reshape(col,(1,len(col))) Pl=np.concatenate((tmp_col,tmp_row),axis=0) if len(ind)<Nmasses: res_P2 = fromInd2Coord(ind,ny) nlz = np.sum(img_vec[ind]) res_c2 = img_vec[ind]/nlz var_out = np.zeros((1,len(ind))) llerr = 0 else: cur = 0; differ = 1 while differ>stopLloyd: neighbors_map = Pl[0,:]*0 for k in range(Pl.shape[1]): Pk = Pl[:,k]; Pk=np.reshape(Pk,(2,1)) BP=mb.repmat(Pk,1,res_P.shape[1]) err=np.sum((BP-res_P)*(BP-res_P),axis=0); err[np.isnan(err)]=1e6 w=np.argwhere(err==err.min()) neighbors_map[k]=w[0] errUB=np.zeros(res_P.shape[1]) for k in range(res_P.shape[1]): w=np.argwhere(np.absolute(neighbors_map-k)<0.01) cx = np.sum(V[w]*Pl[0,w])/np.sum(V[w]+1e-10) cy = np.sum(V[w]*Pl[1,w])/np.sum(V[w]+1e-10) tmp_center=np.array([cx,cy]); tmp_center=np.reshape(tmp_center,(2,1)) ld=L2_distance(Pl[:,np.squeeze(w)],tmp_center) dist_cent = ld*ld t1=np.reshape(V[w],(len(V[w]),)); t2=np.reshape(dist_cent,(len(dist_cent),)); if t1.shape[0]==t2.shape[0]: errUB[k]=t1.dot(t2) else: errUB[k]=np.sum(t1*t2) res_P[:,k]=np.squeeze(tmp_center) res_c[k]=np.sum(V[w]) if cur==0: llerr=np.sum(errUB) else: llerr=np.append(llerr,np.sum(errUB)) if cur>=3: differ = (llerr[cur]-llerr[cur-1])/(llerr[cur-1]-llerr[cur-2]) else: differ=1 cur=cur+1 vari=np.zeros(res_P.shape[1]) for k in range(res_P.shape[1]): w=np.argwhere(np.absolute(neighbors_map-k)<0.01) w=np.reshape(w,(len(w),)) temp = Pl[:,w]-mb.repmat(np.reshape(res_P[:,k],(2,1)),1,len(w)) vari[k]=np.std(np.diag(temp.T.dot(temp))) eps=1e-10 w=np.argwhere(res_c<eps) # res_c=np.delete(res_c,w) # ? # ? var_out=vari # ? res_c=res_c/np.sum(res_c) res_P2=res_P.T res_c2=res_c return (res_P2,res_c2)
[docs]def particleApproximation_v0(imgs,Nmasses): Pl=list(); P=list() for i in range(imgs.shape[0]): (Pl_t,P_t)=img2pts_Lloyd(imgs[i],Nmasses) Pl.append(Pl_t); P.append(P_t); return (Pl,P)
[docs]def sub2ind(array_shape, rows, cols): return (cols-1)*array_shape[1] + rows-1
[docs]def Visualize_LOT(Data,Intensity,Nx,Ny,scale): NG=35 I1=np.zeros((scale*Nx,scale*Ny)) loc=scale*np.round(np.reshape(Data,(int(len(Data)/2),2),order='F')) linearind=sub2ind(np.shape(I1),loc[:,0],loc[:,1]) linearind=linearind.astype(int) i1=np.squeeze(np.reshape(I1,(scale*Nx*scale*Ny,1),order='F')) i1[linearind]=Intensity I1=np.reshape(i1,(scale*Nx,scale*Ny),order='F') h1 = sc.signal.gaussian(NG*scale,std=7); h1=np.reshape(h1,(len(h1),1),order='F') h=h1.dot(h1.T); h=h/np.sum(h) I1=sc.ndimage.convolve(I1,h,mode='constant') I1=I1-I1.min(); I1=I1/I1.max(); #I1=np.flipud(np.fliplr(I1)) return I1
[docs]class batch_PLOT_v0: def __init__(self, Nmasses=50): self.Nmasses = Nmasses
[docs] def forward_seq(self, x_train, x_test): N = self.Nmasses (Pl_train,P_train)=particleApproximation_v0(x_train, N) (Pl_test,P_test)=particleApproximation_v0(x_test,N) Pl_tem=0 for a in range(2):#x_train.shape[0]): t=Pl_train[a] Pl_tem=Pl_tem+t Pl_tem=Pl_tem/2#x_train.shape[0] P_tem = np.ones((N,))/float(N) #Pl_tem_vec=np.reshape(Pl_tem,(Pl_tem.shape[0]*Pl_tem.shape[1],),order='F') V=list(); M=x_train.shape[0] for ind in range(M): Ni=Pl_train[ind].shape[0] C=ot.dist(Pl_train[ind],Pl_tem) b=P_tem # b=np.ones((N,))/float(N) a=P_train[ind] # a=np.ones((Ni,))/float(Ni) p=ot.emd(a,b,C) # exact linear program #V.append(np.matmul((N*p).T,Pl_train[ind])-Pl_tem) V.append(np.matmul((N*p).T,Pl_train[ind])+Pl_tem) # already giving transport displacement? V=np.asarray(V) x_train_hat=np.zeros((len(V),V[0].shape[0]*V[0].shape[1])) for a in range(len(V)): x_train_hat[a,:]=np.reshape(V[a],(V[0].shape[0]*V[0].shape[1],),order='F') V=list(); M=x_test.shape[0] for ind in range(M): Ni=Pl_test[ind].shape[0] C=ot.dist(Pl_test[ind],Pl_tem) b=P_tem # b=np.ones((N,))/float(N) a=P_test[ind] # a=np.ones((Ni,))/float(Ni) p=ot.emd(a,b,C) # exact linear program #V.append(np.matmul((N*p).T,Pl_test[ind])-Pl_tem) V.append(np.matmul((N*p).T,Pl_test[ind])+Pl_tem) V=np.asarray(V) x_test_hat=np.zeros((len(V),V[0].shape[0]*V[0].shape[1])) for a in range(len(V)): x_test_hat[a,:]=np.reshape(V[a],(V[0].shape[0]*V[0].shape[1],),order='F') return x_train_hat, x_test_hat, Pl_tem, P_tem