import numpy as np
import numpy.linalg as LA
import multiprocessing as mp
from pytranskit.optrans.continuous.radoncdt import RadonCDT
eps = 1e-6
x0_range = [0, 1]
x_range = [0, 1]
Rdown = 4 # downsample radon projections (w.r.t. angles)
theta = np.linspace(0, 176, 180 // Rdown)
[docs]class RCDT_NS:
def __init__(self, num_classes, thetas=theta, rm_edge=False):
"""
Parameters
----------
num_classes : integer, total number of classes
thetas : array-like, angles in degrees for taking radon projections
default = [0,180) with increment of 4 degrees.
rm_edge : boolean flag; IF TRUE the first and last points of RCDTs will be removed
default = False
"""
self.num_classes = num_classes
self.thetas = thetas
self.rm_edge = rm_edge
self.subspaces = []
self.len_subspace = 0
[docs] def fit(self, Xtrain, Ytrain, no_deform_model=False):
"""Fit linear model.
Parameters
----------
Xtrain : array-like, shape (n_samples, n_rows, n_columns)
Image data for training.
Ytrain : ndarray of shape (n_samples,)
Labels of the training images.
no_deform_model : boolean flag; IF TRUE, no deformation model will be added
default = False.
"""
# calculate the RCDT using parallel CPUs
print('\nCalculating RCDTs for training images ...')
Xrcdt = self.rcdt_parallel(Xtrain)
# generate the basis vectors for each class
print('Generating basis vectors for each class ...')
for class_idx in range(self.num_classes):
class_data = Xrcdt[Ytrain == class_idx]
if no_deform_model:
flat = class_data.reshape(class_data.shape[0], -1)
else:
class_data_trans = self.add_trans_samples(class_data)
flat = class_data_trans.reshape(class_data_trans.shape[0], -1)
u, s, vh = LA.svd(flat,full_matrices=False)
cum_s = np.cumsum(s)
cum_s = cum_s/np.max(cum_s)
max_basis = (np.where(cum_s>=0.99)[0])[0] + 1
if max_basis > self.len_subspace:
self.len_subspace = max_basis
basis = vh[:flat.shape[0]]
self.subspaces.append(basis)
[docs] def predict(self, Xtest, use_gpu=False):
"""Predict using the linear model
Let :math:`B^k` be the basis vectors of class :math:`k`, and :math:`x` be the RCDT sapce feature vector of an input,
the NS method performs classification by
.. math::
arg\min_k \| B^k (B^k)^T x - x\|^2
Parameters
----------
Xtest : array-like, shape (n_samples, n_rows, n_columns)
Image data for testing.
use_gpu: boolean flag; IF TRUE, use gpu for calculations
default = False.
Returns
-------
ndarray of shape (n_samples,)
Predicted target values per element in Xtest.
"""
# calculate the RCDT using parallel CPUs
print('\nCalculating RCDTs for testing images ...')
Xrcdt = self.rcdt_parallel(Xtest)
# vectorize RCDT matrix
X = Xrcdt.reshape([Xrcdt.shape[0], -1])
# import cupy for using GPU
if use_gpu:
import cupy as cp
X = cp.array(X)
# find nearest subspace for each test sample
print('Finding nearest subspace for each test sample ...')
D = []
for class_idx in range(self.num_classes):
basis = self.subspaces[class_idx]
basis = basis[:self.len_subspace,:]
if use_gpu:
D.append(cp.linalg.norm(cp.matmul(cp.matmul(X, cp.array(basis).T),
cp.array(basis)) -X, axis=1))
else:
proj = X @ basis.T # (n_samples, n_basis)
projR = proj @ basis # (n_samples, n_features)
D.append(LA.norm(projR - X, axis=1))
if use_gpu:
preds = cp.argmin(cp.stack(D, axis=0), axis=0)
return cp.asnumpy(preds)
else:
D = np.stack(D, axis=0)
preds = np.argmin(D, axis=0)
return preds
[docs] def fun_rcdt_single(self, I):
# I: (rows, columns)
radoncdt = RadonCDT(self.thetas)
template = np.ones(I.shape, dtype=I.dtype)
Ircdt = radoncdt.forward(x0_range, template / np.sum(template),
x_range, I / np.sum(I),
self.rm_edge)
return Ircdt
[docs] def fun_rcdt_batch(self, data):
# data: (n_samples, rows, columns)
dataRCDT = [self.fun_rcdt_single(data[j, :, :] + eps) for j in range(data.shape[0])]
return np.array(dataRCDT)
[docs] def rcdt_parallel(self, X):
# X: (n_samples, rows, columns)
# calc RCDT of images
n_cpu = np.min([mp.cpu_count(), X.shape[0]])
splits = np.array_split(X, n_cpu, axis=0)
pl = mp.Pool(n_cpu)
dataRCDT = pl.map(self.fun_rcdt_batch, splits)
rcdt_features = np.vstack(dataRCDT) # (n_samples, proj_len, num_angles)
pl.close()
pl.join()
return rcdt_features
[docs] def add_trans_samples(self, rcdt_features):
# rcdt_features: (n_samples, proj_len, num_angles)
# deformation vectors for translation
v1, v2 = np.cos(self.thetas*np.pi/180), np.sin(self.thetas*np.pi/180)
v1 = np.repeat(v1[np.newaxis], rcdt_features.shape[1], axis=0)
v2 = np.repeat(v2[np.newaxis], rcdt_features.shape[1], axis=0)
return np.concatenate([rcdt_features, v1[np.newaxis], v2[np.newaxis]])