import numpy as np
import scipy.signal as spsig
try:
from mpi4py import MPI
mpi_size = MPI.COMM_WORLD.Get_size()
numba_parallel = mpi_size == 1 # if there is only 1 thread, turn on numba parallel
except ImportError:
numba_parallel = True # If there is no MPI, turn on numba parallel
try:
from numba import njit, prange
# a faster version of the commented out part of the above class method. Results agree up to a round off error.
@njit(parallel=numba_parallel)
def kernel_dot_product(movie_data, ti_offset, kernel_t_inds, kernel_row_inds, kernel_col_inds, kernel_kernel):
t_inds = kernel_t_inds + ti_offset + 1
result = 0.0
for i in prange(len(t_inds)):
if t_inds[i] >= 0 and t_inds[i] < movie_data.shape[0]:
result = result + movie_data[t_inds[i], kernel_row_inds[i], kernel_col_inds[i]] * kernel_kernel[i]
return result
except ImportError as ie:
# Normal way to calculate dot-product not using numba library. Also had to create because for some reason numba
# is interfering with NEST (at-least ver 3.3) binary causing a segmentation fault.
[docs]
def kernel_dot_product(movie_data, ti_offset, kernel_t_inds, kernel_row_inds, kernel_col_inds, kernel_kernel):
t_inds = kernel_t_inds + ti_offset + 1 # Offset by one nhc 14 Apr '17
min_ind, max_ind = 0, movie_data.shape[0]
allowed_inds = np.where(np.logical_and(min_ind <= t_inds, t_inds < max_ind))
t_inds = t_inds[allowed_inds]
row_inds = kernel_row_inds[allowed_inds]
col_inds = kernel_col_inds[allowed_inds]
kernel_vector = kernel_kernel[allowed_inds]
return np.dot(movie_data[t_inds, row_inds, col_inds], kernel_vector)
from .utilities import convert_tmin_tmax_framerate_to_trange
from .linearfilter import SpatioTemporalFilter, SpectroTemporalFilter
[docs]
class KernelCursor(object):
"""A class that takes care of the convolution of the (non-separable) spatial-temporal linear filter with the move.
"""
def __init__(self, kernel, movie):
self.movie = movie
self.kernel = kernel
self.cache = {}
# This ensures that the kernel frame rate matches the movie frame rate:
np.testing.assert_almost_equal(np.diff(self.kernel.t_range),
np.ones_like(self.kernel.t_range[1:])*(1./movie.frame_rate))
@property
def row_range(self):
return self.movie.row_range
@property
def col_range(self):
return self.movie.col_range
@property
def t_range(self):
return self.movie.t_range
@property
def frame_rate(self):
return self.movie.frame_rate
[docs]
def evaluate(self, t_min=None, t_max=None, downsample=1):
if t_max is None:
t_max = self.t_range[-1]
if t_min is None:
t_min = self.t_range[0]
t_range = convert_tmin_tmax_framerate_to_trange(t_min, t_max, self.movie.frame_rate)[::int(downsample)]
y_vals = np.array([self(t) for t in t_range])
return t_range, y_vals
def __call__(self, t):
# TODO: Using call is not a good idea here, change to evaluate
if t < self.t_range[0] or t > self.t_range[-1]:
curr_rate = 0
else:
ti = t*self.frame_rate
til, tir = int(np.floor(ti)), int(np.ceil(ti))
tl, tr = float(til)/self.frame_rate, float(tir)/self.frame_rate
if np.abs(tl - t) < 1e-12:
curr_rate = self.apply_dot_product(til)
elif np.abs(tr - t) < 1e-12:
curr_rate = self.apply_dot_product(tir)
else:
wa, wb = (1-(t-tl)/(tr-tl)), (1-(tr-t)/(tr-tl))
cl = self.apply_dot_product(til)
cr = self.apply_dot_product(tir)
curr_rate = cl*wa+cr*wb
if np.isnan(curr_rate):
assert RuntimeError
return curr_rate
[docs]
def apply_dot_product(self, ti_offset):
if ti_offset in self.cache:
result = self.cache[ti_offset]
else:
result = kernel_dot_product(
movie_data=self.movie.data,
ti_offset=ti_offset,
kernel_t_inds=self.kernel.t_inds,
kernel_row_inds=self.kernel.row_inds,
kernel_col_inds=self.kernel.col_inds,
kernel_kernel=self.kernel.kernel,
)
self.cache[ti_offset] = result
return result
[docs]
class FilterCursor(KernelCursor):
def __init__(self, spatiotemporal_filter, movie, threshold=0):
# TODO: not sure why this needs to have it's own class and shouldn't be merged into parent?
self.spatiotemporal_filter = spatiotemporal_filter
kernel = self.spatiotemporal_filter.get_spatiotemporal_kernel(movie.row_range, movie.col_range,
t_range=movie.t_range, threshold=threshold,
reverse=True)
super(FilterCursor, self).__init__(kernel, movie)
[docs]
class LNUnitCursor(KernelCursor):
"""A class that takes care of applying a linear-nonlinear filter to a movie. Parent class is used to apply the
spatial-termporal filter convolution to the movie, when then the lnunit non-linear transfer function is applied.
"""
def __init__(self, lnunit, movie, threshold=0):
self.lnunit = lnunit
if hasattr(movie, "t_range_orig"): # Reset padded to original
movie.t_range = movie.t_range_orig
movie.data = movie.data_orig
movie.row_range = movie.row_range_orig
if isinstance(self.lnunit.linear_filter, SpatioTemporalFilter):
kernel = lnunit.get_spatiotemporal_kernel(movie.row_range, movie.col_range, movie.t_range, reverse=True,
threshold=threshold)
elif isinstance(self.lnunit.linear_filter, SpectroTemporalFilter):
kernel = lnunit.get_spectrotemporal_kernel(movie.row_range, movie.t_range, reverse=True, threshold=0.05)
if movie.padding:
if movie.padding == 'edge':
pre_pad = np.full((len(np.unique(kernel.t_inds))-1, movie.data.shape[1]),
movie.data[0, :, 0])
pre_pad = pre_pad[:, :, np.newaxis]
movie.data_orig = movie.data
movie.data = np.concatenate((pre_pad, movie.data))
lower_pad = np.full((movie.data.shape[0], len(np.unique(kernel.row_inds)) - 1, 1),
np.reshape(movie.data[:, 0, 0], (-1, 1, 1)))
upper_pad = np.full((movie.data.shape[0], len(np.unique(kernel.row_inds)) - 1, 1),
np.reshape(movie.data[:, -1, 0], (-1, 1, 1)))
movie.data = np.hstack((lower_pad, movie.data, upper_pad))
kernel.t_range = np.linspace(kernel.t_range[0] - pre_pad.shape[0] * 1/movie.frame_rate,
0, movie.data.shape[0])
movie.t_range_orig = movie.t_range
movie.t_range = kernel.t_range - kernel.t_range[0]
# Treating it like an image, although technically strange to pad to negative frequencies
kernel.row_range = np.linspace(kernel.row_range[0] -
lower_pad.shape[1] * (kernel.row_range[1]-kernel.row_range[0]),
kernel.row_range[-1] +
upper_pad.shape[1] * (kernel.row_range[-1]-kernel.row_range[-2]),
movie.data.shape[1])
movie.row_range_orig = movie.row_range
movie.row_range = kernel.row_range
kernel.row_inds = kernel.row_inds + lower_pad.shape[1]
else:
pass
kernel.apply_threshold(threshold)
super(LNUnitCursor, self).__init__(kernel, movie)
def __call__(self, t):
# TODO: Don't use call operator, change to evaluate
return self.lnunit.transfer_function(super(LNUnitCursor, self).__call__(t))
[docs]
class MultiLNUnitCursor(object):
def __init__(self, multi_lnunit, movie, threshold=0):
self.multi_lnunit = multi_lnunit
self.lnunit_cursor_list = [LNUnitCursor(lnunit, movie, threshold=threshold)
for lnunit in multi_lnunit.lnunit_list]
self.movie = movie
[docs]
def evaluate(self, **kwargs):
multi_e = [unit_cursor.evaluate(**kwargs) for unit_cursor in self.lnunit_cursor_list]
t_list, y_list = zip(*multi_e)
return t_list[0], self.multi_lnunit.transfer_function(*y_list)
[docs]
class MultiLNUnitMultiMovieCursor(MultiLNUnitCursor):
def __init__(self, multi_lnunit, movie_list, threshold=0.):
assert len(multi_lnunit.lnunit_list) == len(movie_list)
self.multi_lnunit = multi_lnunit
self.lnunit_movie_list = movie_list
self.lnunit_cursor_list = [lnunit.get_cursor(movie, threshold=threshold) for
lnunit, movie in zip(multi_lnunit.lnunit_list, movie_list)]
[docs]
class SeparableKernelCursor(object):
"""A class for applying a spatial-temporal convolution to a movie. Unlike the KernelCursor the spatial-temporal
filter is broken up into its components, spatial filter is applied then followed by a temporal filter convolution.
"""
def __init__(self, spatial_kernel, temporal_kernel, movie):
"""Assumes temporal kernel is not reversed"""
self.movie = movie
self.spatial_kernel = spatial_kernel
self.temporal_kernel = temporal_kernel
[docs]
def evaluate(self, threshold=0):
full_spatial_kernel = np.array([self.spatial_kernel.full()])
full_temporal_kernel = self.temporal_kernel.full()
# Convolve every frame in the movie with the spatial filter. First find the range of rows (range min and max)
# and columns in the filter that are above threshold, that way only portion of movie/filter are multiplied
# together
# Using > instead of >= makes the code faster if there are many zeros in the
# spatial kernel. This will not affect the results.
nonzero_inds = np.where(np.abs(full_spatial_kernel[0, :, :]) > threshold)
rm, rM = nonzero_inds[0].min(), nonzero_inds[0].max()
cm, cM = nonzero_inds[1].min(), nonzero_inds[1].max()
convolution_answer_sep_spatial = (self.movie.data[:, rm:rM+1, cm:cM+1] *
full_spatial_kernel[:, rm:rM+1, cm:cM+1]).sum(axis=1).sum(axis=1)
# Convolve results of spatial convolution with the temporal filter
sig_tmp = np.zeros(len(full_temporal_kernel) + len(convolution_answer_sep_spatial) - 1)
sig_tmp[len(full_temporal_kernel)-1:] = convolution_answer_sep_spatial
convolution_answer_sep = spsig.convolve(sig_tmp, full_temporal_kernel[::-1], mode='valid')
t = np.arange(len(convolution_answer_sep))/self.movie.frame_rate
return t, convolution_answer_sep
[docs]
class SeparableSpatioTemporalFilterCursor(SeparableKernelCursor):
def __init__(self, spatiotemporal_filter, movie):
self.spatial_filter = spatiotemporal_filter.spatial_filter
self.temporal_filter = spatiotemporal_filter.temporal_filter
spatial_kernel = self.spatial_filter.get_kernel(movie.row_range, movie.col_range, threshold=-1)
temporal_kernel = self.temporal_filter.get_kernel(t_range=movie.t_range, threshold=0, reverse=True)
spatial_kernel.kernel *= spatiotemporal_filter.amplitude
super(SeparableSpatioTemporalFilterCursor, self).__init__(spatial_kernel, temporal_kernel, movie)
[docs]
class SeparableLNUnitCursor(SeparableSpatioTemporalFilterCursor):
def __init__(self, lnunit, movie):
self.lnunit = lnunit
super(SeparableLNUnitCursor, self).__init__(self.lnunit.linear_filter, movie)
[docs]
def evaluate(self, downsample=1):
assert(downsample == 1)
t, y = super(SeparableLNUnitCursor, self).evaluate()
return t, [self.lnunit.transfer_function(yi) for yi in y]
[docs]
class SeparableMultiLNUnitCursor(object):
def __init__(self, multilnunit, movie):
self.multilnunit = multilnunit
self.lnunit_cursor_list = []
for lnunit in self.multilnunit.lnunit_list:
self.lnunit_cursor_list.append(SeparableLNUnitCursor(lnunit, movie))
[docs]
def evaluate(self, *args, **kwargs):
assert(kwargs.get('downsample', 1) == 1)
y_list = []
for cursor in self.lnunit_cursor_list:
t, y = cursor.evaluate(*args, **kwargs)
y_list.append(y)
return t, self.multilnunit.transfer_function(*y_list)