Source code for allensdk.brain_observatory.receptive_field_analysis.utilities

# Allen Institute Software License - This software license is the 2-clause BSD
# license plus a third clause that prohibits redistribution for commercial
# purposes without further permission.
#
# Copyright 2017. Allen Institute. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Redistributions for commercial purposes are not permitted without the
# Allen Institute's written permission.
# For purposes of this license, commercial purposes is the incorporation of the
# Allen Institute's software into anything for which you will charge fees or
# other compensation. Contact terms@alleninstitute.org for commercial licensing
# opportunities.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
from scipy.ndimage.filters import gaussian_filter
import numpy as np
import scipy.interpolate as spinterp
from .tools import dict_generator
from allensdk.api.cache import memoize
import os
import warnings
from skimage.measure import block_reduce

[docs]def upsample_image_to_degrees(img): upsample = 74.4/img.shape[0] x = np.arange(img.shape[0]) y = np.arange(img.shape[1]) g = spinterp.interp2d(y, x, img, kind='linear') ZZ_on = g(np.arange(0, img.shape[1], 1. / upsample), np.arange(0, img.shape[0], 1. / upsample)) return ZZ_on
[docs]def convolve(img, sigma=4): ''' 2D Gaussian convolution ''' if img.sum() == 0: return img img_pad = np.zeros((3 * img.shape[0], 3 * img.shape[1])) img_pad[img.shape[0]:2 * img.shape[0], img.shape[1]:2 * img.shape[1]] = img x = np.arange(3 * img.shape[0]) y = np.arange(3 * img.shape[1]) g = spinterp.interp2d(y, x, img_pad, kind='linear') if img.shape[0] == 16: upsample = 4 offset = -(1 - .625) elif img.shape[0] == 8: upsample = 8 offset = -(1 - .5625) else: raise NotImplementedError ZZ_on = g(offset + np.arange(0, img.shape[1] * 3, 1. / upsample), offset + np.arange(0, img.shape[0] * 3, 1. / upsample)) ZZ_on_f = gaussian_filter(ZZ_on, float(sigma), mode='constant') z_on_new = block_reduce(ZZ_on_f, (upsample, upsample)) z_on_new = z_on_new / z_on_new.sum() * img.sum() z_on_new = z_on_new[img.shape[0]:2 * img.shape[0], img.shape[1]:2 * img.shape[1]] return z_on_new
[docs]@memoize def get_A(data, stimulus): stimulus_table = data.get_stimulus_table(stimulus) stimulus_template = data.get_stimulus_template(stimulus)[stimulus_table['frame'].values, :,:] number_of_pixels = stimulus_template.shape[1]*stimulus_template.shape[2] A = np.zeros((2*number_of_pixels, stimulus_template.shape[0])) for fi in range(stimulus_template.shape[0]): A[:number_of_pixels, fi] = (stimulus_template[fi,:,:].flatten() > 127).astype(float) A[number_of_pixels:, fi] = (stimulus_template[fi, :, :].flatten() < 127).astype(float) return A
[docs]@memoize def get_A_blur(data, stimulus): stimulus_table = data.get_stimulus_table(stimulus) stimulus_template = data.get_stimulus_template(stimulus)[stimulus_table['frame'].values, :, :] A = get_A(data, stimulus).copy() number_of_pixels = A.shape[0] // 2 for fi in range(A.shape[1]): A[:number_of_pixels,fi] = convolve(A[:number_of_pixels, fi].reshape(stimulus_template.shape[1], stimulus_template.shape[2])).flatten() A[number_of_pixels:,fi] = convolve(A[number_of_pixels:, fi].reshape(stimulus_template.shape[1], stimulus_template.shape[2])).flatten() return A
[docs]def get_shuffle_matrix(data, event_vector, A, number_of_shuffles=5000, response_detection_error_std_dev=.1): number_of_events = event_vector.sum() number_of_pixels = A.shape[0] // 2 shuffle_data = np.zeros((2*number_of_pixels, number_of_shuffles)) evr = range(len(event_vector)) for ii in range(number_of_shuffles): size = number_of_events + int(np.round(response_detection_error_std_dev*number_of_events*np.random.randn())) shuffled_event_inds = np.random.choice(evr, size=size, replace=False) b_tmp = np.zeros(len(event_vector), dtype=np.bool) b_tmp[shuffled_event_inds] = True shuffle_data[:, ii] = A[:,b_tmp].sum(axis=1)/float(size) return shuffle_data
[docs]def get_sparse_noise_epoch_mask_list(st, number_of_acquisition_frames, threshold=7): delta = (st.start.values[1:] - st.end.values[:-1]) cut_inds = np.where(delta > threshold)[0] + 1 epoch_mask_list = [] if len(cut_inds) > 2: warnings.warn('more than 2 epochs cut') print(' %d %d' % (len(delta), cut_inds)) for ii in range(len(cut_inds)+1): if ii == 0: first_ind = st.iloc[0].start else: first_ind = st.iloc[cut_inds[ii-1]].start if ii == len(cut_inds): last_ind_inclusive = st.iloc[-1].end else: last_ind_inclusive = st.iloc[cut_inds[ii]-1].end epoch_mask_list.append((first_ind,last_ind_inclusive)) return epoch_mask_list
[docs]def smooth(x,window_len=11,window='hanning', mode='valid'): """smooth the data using a window with requested size. This method is based on the convolution of a scaled window with the signal. The signal is prepared by introducing reflected copies of the signal (with the window size) in both ends so that transient parts are minimized in the begining and end part of the output signal. input: x: the input signal window_len: the dimension of the smoothing window; should be an odd integer window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman' flat window will produce a moving average smoothing. output: the smoothed signal example: t=linspace(-2,2,0.1) x=sin(t)+randn(len(t))*0.1 y=smooth(x) see also: numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve scipy.signal.lfilter TODO: the window parameter could be the window itself if an array instead of a string NOTE: length(output) != length(input), to correct this: return y[(window_len/2-1):-(window_len/2)] instead of just y. """ if x.ndim != 1: raise ValueError("smooth only accepts 1 dimension arrays.") if x.size < window_len: raise ValueError("Input vector needs to be bigger than window size.") if window_len<3: return x if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']: raise ValueError("Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'") s=np.r_[x[window_len-1:0:-1],x,x[-1:-window_len:-1]] #print(len(s)) if window == 'flat': #moving average w=np.ones(window_len,'d') else: w=eval('np.'+window+'(window_len)') y=np.convolve(w/w.sum(),s,mode=mode) return y
[docs]def get_components(receptive_field_data): s1, s2 = receptive_field_data.shape candidate_pixel_list = np.where(receptive_field_data.flatten()==True)[0] pixel_coord_dict = dict((px, (px/s2, (px - s2 * (px/s2)), px% (s1 * s2) == px)) for px in candidate_pixel_list) component_list = [] for curr_pixel in candidate_pixel_list: curr_x, curr_y, curr_frame = pixel_coord_dict[curr_pixel] component_list.append([curr_pixel]) dist_to_component_dict = {} for ii, curr_component in enumerate(component_list): dist_to_component_dict[ii] = np.inf for other_pixel in curr_component: other_x, other_y, other_frame = pixel_coord_dict[other_pixel] if other_frame == curr_frame: x_dist = np.abs(curr_x - other_x) y_dist = np.abs(curr_y - other_y) curr_dist = max(x_dist, y_dist) if curr_dist < dist_to_component_dict[ii]: dist_to_component_dict[ii] = curr_dist # Merge all components with a distance leq 1 to current pixel new_component_list = [] tmp = [] for ii, curr_component in enumerate(component_list): if dist_to_component_dict[ii] <= 1: tmp += curr_component else: new_component_list.append(curr_component) new_component_list.append(tmp) component_list = new_component_list if len(component_list) == 0: return np.zeros((1, receptive_field_data.shape[0], receptive_field_data.shape[1])), len(component_list) elif len(component_list) == 1: return_array = np.zeros((1,receptive_field_data.shape[0], receptive_field_data.shape[1])) else: return_array = np.zeros((len(component_list), receptive_field_data.shape[0], receptive_field_data.shape[1])) for ii, component in enumerate(component_list): curr_component_mask = np.zeros_like(receptive_field_data, dtype=np.bool).flatten() curr_component_mask[component] = True return_array[ii,:,:] = curr_component_mask.reshape(receptive_field_data.shape) return return_array, len(component_list)
[docs]def get_attribute_dict(rf): attribute_dict = {} for x in dict_generator(rf): if x[-3] == 'attrs': if len(x[:-3]) == 0: key = x[-2] else: key = '/'.join(['/'.join(x[:-3]), x[-2]]) attribute_dict[key] = x[-1] return attribute_dict