Source code for allensdk.brain_observatory.drifting_gratings

# Allen Institute Software License - This software license is the 2-clause BSD
# license plus a third clause that prohibits redistribution for commercial
# purposes without further permission.
#
# Copyright 2016-2017. Allen Institute. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Redistributions for commercial purposes are not permitted without the
# Allen Institute's written permission.
# For purposes of this license, commercial purposes is the incorporation of the
# Allen Institute's software into anything for which you will charge fees or
# other compensation. Contact terms@alleninstitute.org for commercial licensing
# opportunities.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
from .stimulus_analysis import StimulusAnalysis
import scipy.stats as st
import pandas as pd
import numpy as np
import h5py
from math import sqrt
import logging
from . import observatory_plots as oplots
from . import circle_plots as cplots
from .brain_observatory_exceptions import MissingStimulusException
import matplotlib.pyplot as plt

[docs]class DriftingGratings(StimulusAnalysis): """ Perform tuning analysis specific to drifting gratings stimulus. Parameters ---------- data_set: BrainObservatoryNwbDataSet object """ _log = logging.getLogger('allensdk.brain_observatory.drifting_gratings') def __init__(self, data_set, **kwargs): super(DriftingGratings, self).__init__(data_set, **kwargs) self.sweeplength = 60 self.interlength = 30 self.extralength = 0 self._orivals = DriftingGratings._PRELOAD self._tfvals = DriftingGratings._PRELOAD self._number_ori = DriftingGratings._PRELOAD self._number_tf = DriftingGratings._PRELOAD @property def orivals(self): if self._orivals is DriftingGratings._PRELOAD: self.populate_stimulus_table() return self._orivals @property def tfvals(self): if self._tfvals is DriftingGratings._PRELOAD: self.populate_stimulus_table() return self._tfvals @property def number_ori(self): if self._number_ori is DriftingGratings._PRELOAD: self.populate_stimulus_table() return self._number_ori @property def number_tf(self): if self._number_tf is DriftingGratings._PRELOAD: self.populate_stimulus_table() return self._number_tf
[docs] def populate_stimulus_table(self): stimulus_table = self.data_set.get_stimulus_table('drifting_gratings') self._stim_table = stimulus_table.fillna(value=0.) self._orivals = np.unique(self.stim_table.orientation).astype(int) self._tfvals = np.unique(self.stim_table.temporal_frequency).astype(int) self._number_ori = len(self.orivals) self._number_tf = len(self.tfvals)
[docs] def get_response(self): ''' Computes the mean response for each cell to each stimulus condition. Return is a (# orientations, # temporal frequencies, # cells, 3) np.ndarray. The final dimension contains the mean response to the condition (index 0), standard error of the mean of the response to the condition (index 1), and the number of trials with a significant response (p < 0.05) to that condition (index 2). Returns ------- Numpy array storing mean responses. ''' DriftingGratings._log.info("Calculating mean responses") response = np.empty( (self.number_ori, self.number_tf, self.numbercells + 1, 3)) def ptest(x): return len(np.where(x < (0.05 / (8 * 5)))[0]) for ori in self.orivals: ori_pt = np.where(self.orivals == ori)[0][0] for tf in self.tfvals: tf_pt = np.where(self.tfvals == tf)[0][0] subset_response = self.mean_sweep_response[ (self.stim_table.temporal_frequency == tf) & (self.stim_table.orientation == ori)] subset_pval = self.pval[(self.stim_table.temporal_frequency == tf) & ( self.stim_table.orientation == ori)] response[ori_pt, tf_pt, :, 0] = subset_response.mean(axis=0) response[ori_pt, tf_pt, :, 1] = subset_response.std( axis=0) / sqrt(len(subset_response)) response[ori_pt, tf_pt, :, 2] = subset_pval.apply( ptest, axis=0) return response
[docs] def get_peak(self): ''' Computes metrics related to each cell's peak response condition. Returns ------- Pandas data frame containing the following columns (_dg suffix is for drifting grating): * ori_dg (orientation) * tf_dg (temporal frequency) * reliability_dg * osi_dg (orientation selectivity index) * dsi_dg (direction selectivity index) * peak_dff_dg (peak dF/F) * ptest_dg * p_run_dg * run_modulation_dg * cv_dg (circular variance) ''' DriftingGratings._log.info('Calculating peak response properties') peak = pd.DataFrame(index=range(self.numbercells), columns=('ori_dg', 'tf_dg', 'reliability_dg', 'osi_dg', 'dsi_dg', 'peak_dff_dg', 'ptest_dg', 'p_run_dg', 'run_modulation_dg', 'cv_os_dg', 'cv_ds_dg', 'tf_index_dg', 'cell_specimen_id')) cids = self.data_set.get_cell_specimen_ids() orivals_rad = np.deg2rad(self.orivals) for nc in range(self.numbercells): cell_peak = np.where(self.response[:, 1:, nc, 0] == np.nanmax( self.response[:, 1:, nc, 0])) prefori = cell_peak[0][0] preftf = cell_peak[1][0] + 1 peak.cell_specimen_id.iloc[nc] = cids[nc] peak.ori_dg.iloc[nc] = prefori peak.tf_dg.iloc[nc] = preftf pref = self.response[prefori, preftf, nc, 0] orth1 = self.response[np.mod(prefori + 2, 8), preftf, nc, 0] orth2 = self.response[np.mod(prefori - 2, 8), preftf, nc, 0] orth = (orth1 + orth2) / 2 null = self.response[np.mod(prefori + 4, 8), preftf, nc, 0] tuning = self.response[:, preftf, nc, 0] tuning = np.where(tuning>0, tuning, 0) #new circular variance below CV_top_os = np.empty((8), dtype=np.complex128) CV_top_ds = np.empty((8), dtype=np.complex128) for i in range(8): CV_top_os[i] = (tuning[i]*np.exp(1j*2*orivals_rad[i])) CV_top_ds[i] = (tuning[i]*np.exp(1j*orivals_rad[i])) peak.cv_os_dg.iloc[nc] = np.abs(CV_top_os.sum())/tuning.sum() peak.cv_ds_dg.iloc[nc] = np.abs(CV_top_ds.sum())/tuning.sum() peak.osi_dg.iloc[nc] = (pref - orth) / (pref + orth) peak.dsi_dg.iloc[nc] = (pref - null) / (pref + null) peak.peak_dff_dg.iloc[nc] = pref groups = [] for ori in self.orivals: for tf in self.tfvals[1:]: groups.append(self.mean_sweep_response[(self.stim_table.temporal_frequency == tf) & ( self.stim_table.orientation == ori)][str(nc)]) groups.append(self.mean_sweep_response[ self.stim_table.temporal_frequency == 0][str(nc)]) _, p = st.f_oneway(*groups) peak.ptest_dg.iloc[nc] = p subset = self.mean_sweep_response[(self.stim_table.temporal_frequency == self.tfvals[ preftf]) & (self.stim_table.orientation == self.orivals[prefori])] #running modulation subset_stat = subset[subset.dx < 1] subset_run = subset[subset.dx >= 1] if (len(subset_run) > 2) & (len(subset_stat) > 2): (_,peak.p_run_dg.iloc[nc]) = st.ttest_ind(subset_run[str(nc)], subset_stat[str(nc)], equal_var=False) if subset_run[str(nc)].mean()>subset_stat[str(nc)].mean(): peak.run_modulation_dg.iloc[nc] = (subset_run[str(nc)].mean() - subset_stat[str(nc)].mean())/np.abs(subset_run[str(nc)].mean()) elif subset_run[str(nc)].mean()<subset_stat[str(nc)].mean(): peak.run_modulation_dg.iloc[nc] = -1*((subset_stat[str(nc)].mean() - subset_run[str(nc)].mean())/np.abs(subset_stat[str(nc)].mean())) else: peak.p_run_dg.iloc[nc] = np.NaN peak.run_modulation_dg.iloc[nc] = np.NaN #reliability subset = self.sweep_response[(self.stim_table.temporal_frequency == self.tfvals[ preftf]) & (self.stim_table.orientation == self.orivals[prefori])] corr_matrix = np.empty((len(subset),len(subset))) for i in range(len(subset)): for j in range(len(subset)): r,p = st.pearsonr(subset[str(nc)].iloc[i][30:90], subset[str(nc)].iloc[j][30:90]) corr_matrix[i,j] = r mask = np.ones((len(subset), len(subset))) for i in range(len(subset)): for j in range(len(subset)): if i>=j: mask[i,j] = np.NaN corr_matrix *= mask peak.reliability_dg.iloc[nc] = np.nanmean(corr_matrix) #TF index tf_tuning = self.response[prefori,1:,nc,0] trials = self.mean_sweep_response[(self.stim_table.temporal_frequency!=0)&(self.stim_table.orientation==self.orivals[prefori])][str(nc)].values SSE_part = np.sqrt(np.sum((trials-trials.mean())**2)/(len(trials)-5)) peak.tf_index_dg.iloc[nc] = (np.ptp(tf_tuning))/(np.ptp(tf_tuning) + 2*SSE_part) return peak
[docs] def open_star_plot(self, cell_specimen_id=None, include_labels=False, cell_index=None): cell_index = self.row_from_cell_id(cell_specimen_id, cell_index) df = self.mean_sweep_response[str(cell_index)] st = self.data_set.get_stimulus_table('drifting_gratings') mask = st.dropna(subset=['orientation']).index data = df.values cmin = self.response[0,0,cell_index,0] cmax = data.mean() + data.std()*3 fp = cplots.FanPlotter.for_drifting_gratings() fp.plot(r_data=st.temporal_frequency.ix[mask].values, angle_data=st.orientation.ix[mask].values, data=df.ix[mask].values, clim=[cmin, cmax]) fp.show_axes(closed=True) if include_labels: fp.show_r_labels() fp.show_angle_labels()
[docs] def plot_orientation_selectivity(self, si_range=oplots.SI_RANGE, n_hist_bins=oplots.N_HIST_BINS, color=oplots.STIM_COLOR, p_value_max=oplots.P_VALUE_MAX, peak_dff_min=oplots.PEAK_DFF_MIN): # responsive cells vis_cells = (self.peak.ptest_dg < p_value_max) & (self.peak.peak_dff_dg > peak_dff_min) # orientation selective cells osi_cells = vis_cells & (self.peak.osi_dg > si_range[0]) & (self.peak.osi_dg < si_range[1]) peak_osi = self.peak.ix[osi_cells] osis = peak_osi.osi_dg.values oplots.plot_selectivity_cumulative_histogram(osis, "orientation selectivity index", si_range=si_range, n_hist_bins=n_hist_bins, color=color)
[docs] def plot_direction_selectivity(self, si_range=oplots.SI_RANGE, n_hist_bins=oplots.N_HIST_BINS, color=oplots.STIM_COLOR, p_value_max=oplots.P_VALUE_MAX, peak_dff_min=oplots.PEAK_DFF_MIN): # responsive cells vis_cells = (self.peak.ptest_dg < p_value_max) & (self.peak.peak_dff_dg > peak_dff_min) # direction selective cells dsi_cells = vis_cells & (self.peak.dsi_dg > si_range[0]) & (self.peak.dsi_dg < si_range[1]) peak_dsi = self.peak.ix[dsi_cells] dsis = peak_dsi.dsi_dg.values oplots.plot_selectivity_cumulative_histogram(dsis, "direction selectivity index", si_range=si_range, n_hist_bins=n_hist_bins, color=color)
[docs] def plot_preferred_direction(self, include_labels=False, si_range=oplots.SI_RANGE, color=oplots.STIM_COLOR, p_value_max=oplots.P_VALUE_MAX, peak_dff_min=oplots.PEAK_DFF_MIN): vis_cells = (self.peak.ptest_dg < p_value_max) & (self.peak.peak_dff_dg > peak_dff_min) pref_dirs = self.peak.ix[vis_cells].ori_dg.values pref_dirs = [ self.orivals[pref_dir] for pref_dir in pref_dirs ] angles, counts = np.unique(pref_dirs, return_counts=True) oplots.plot_radial_histogram(angles, counts, include_labels=include_labels, all_angles=self.orivals, direction=-1, offset=0.0, closed=True, color=color)
[docs] def plot_preferred_temporal_frequency(self, si_range=oplots.SI_RANGE, color=oplots.STIM_COLOR, p_value_max=oplots.P_VALUE_MAX, peak_dff_min=oplots.PEAK_DFF_MIN): vis_cells = (self.peak.ptest_dg < p_value_max) & (self.peak.peak_dff_dg > peak_dff_min) pref_tfs = self.peak.ix[vis_cells].tf_dg.values oplots.plot_condition_histogram(pref_tfs, self.tfvals[1:], color=color) plt.xlabel("temporal frequency (Hz)") plt.ylabel("number of cells")
[docs] def reshape_response_array(self): ''' :return: response array in cells x stim x repetition for noise correlations ''' mean_sweep_response = self.mean_sweep_response.values[:, :self.numbercells] reps = [] stim_table = self.stim_table tfvals = self.tfvals tfvals = tfvals[tfvals != 0] # blank sweep response_new = np.zeros((self.numbercells, self.number_ori, self.number_tf-1), dtype='object') for i, ori in enumerate(self.orivals): for j, tf in enumerate(tfvals): ind = (stim_table.orientation.values == ori) * (stim_table.temporal_frequency.values == tf) for c in range(self.numbercells): response_new[c, i, j] = mean_sweep_response[ind, c] ind = (stim_table.temporal_frequency.values == 0) response_blank = mean_sweep_response[ind, :].T return response_new, response_blank
[docs] def get_signal_correlation(self, corr='spearman'): logging.debug("Calculating signal correlation") response = self.response[:, 1:, :self.numbercells, 0] # orientation x freq x cell, no blank response = response.reshape(self.number_ori * (self.number_tf-1), self.numbercells).T N, Nstim = response.shape signal_corr = np.zeros((N, N)) signal_p = np.empty((N, N)) if corr == 'pearson': for i in range(N): for j in range(i, N): # matrix is symmetric signal_corr[i, j], signal_p[i, j] = st.pearsonr(response[i], response[j]) elif corr == 'spearman': for i in range(N): for j in range(i, N): # matrix is symmetric signal_corr[i, j], signal_p[i, j] = st.spearmanr(response[i], response[j]) else: raise Exception('correlation should be pearson or spearman') signal_corr = np.triu(signal_corr) + np.triu(signal_corr, 1).T # fill in lower triangle signal_p = np.triu(signal_p) + np.triu(signal_p, 1).T # fill in lower triangle return signal_corr, signal_p
[docs] def get_representational_similarity(self, corr='spearman'): logging.debug("Calculating representational similarity") response = self.response[:, 1:, :self.numbercells, 0] # orientation x freq x phase x cell, no blank response = response.reshape(self.number_ori * (self.number_tf-1), self.numbercells) Nstim, N = response.shape rep_sim = np.zeros((Nstim, Nstim)) rep_sim_p = np.empty((Nstim, Nstim)) if corr == 'pearson': for i in range(Nstim): for j in range(i, Nstim): # matrix is symmetric rep_sim[i, j], rep_sim_p[i, j] = st.pearsonr(response[i], response[j]) elif corr == 'spearman': for i in range(Nstim): for j in range(i, Nstim): # matrix is symmetric rep_sim[i, j], rep_sim_p[i, j] = st.spearmanr(response[i], response[j]) else: raise Exception('correlation should be pearson or spearman') rep_sim = np.triu(rep_sim) + np.triu(rep_sim, 1).T # fill in lower triangle rep_sim_p = np.triu(rep_sim_p) + np.triu(rep_sim_p, 1).T # fill in lower triangle return rep_sim, rep_sim_p
[docs] def get_noise_correlation(self, corr='spearman'): logging.debug("Calculating noise correlations") response, response_blank = self.reshape_response_array() noise_corr = np.zeros((self.numbercells, self.numbercells, self.number_ori, self.number_tf-1)) noise_corr_p = np.zeros((self.numbercells, self.numbercells, self.number_ori, self.number_tf-1)) noise_corr_blank = np.zeros((self.numbercells, self.numbercells)) noise_corr_blank_p = np.zeros((self.numbercells, self.numbercells)) if corr == 'pearson': for k in range(self.number_ori): for l in range(self.number_tf-1): for i in range(self.numbercells): for j in range(i, self.numbercells): noise_corr[i, j, k, l], noise_corr_p[i, j, k, l] = st.pearsonr(response[i, k, l], response[j, k, l]) noise_corr[:, :, k, l] = np.triu(noise_corr[:, :, k, l]) + np.triu(noise_corr[:, :, k, l], 1).T for i in range(self.numbercells): for j in range(i, self.numbercells): noise_corr_blank[i, j], noise_corr_blank_p[i, j] = st.pearsonr(response_blank[i], response_blank[j]) elif corr == 'spearman': for k in range(self.number_ori): for l in range(self.number_tf-1): for i in range(self.numbercells): for j in range(i, self.numbercells): noise_corr[i, j, k, l], noise_corr_p[i, j, k, l] = st.spearmanr(response[i, k, l], response[j, k, l]) noise_corr[:, :, k, l] = np.triu(noise_corr[:, :, k, l]) + np.triu(noise_corr[:, :, k, l], 1).T for i in range(self.numbercells): for j in range(i, self.numbercells): noise_corr_blank[i, j], noise_corr_blank_p[i, j] = st.spearmanr(response_blank[i], response_blank[j]) else: raise Exception('correlation should be pearson or spearman') noise_corr_blank[:, :] = np.triu(noise_corr_blank[:, :]) + np.triu(noise_corr_blank[:, :], 1).T return noise_corr, noise_corr_p, noise_corr_blank, noise_corr_blank_p
@staticmethod
[docs] def from_analysis_file(data_set, analysis_file): dg = DriftingGratings(data_set) try: dg.populate_stimulus_table() dg._sweep_response = pd.read_hdf(analysis_file, "analysis/sweep_response_dg") dg._mean_sweep_response = pd.read_hdf(analysis_file, "analysis/mean_sweep_response_dg") dg._peak = pd.read_hdf(analysis_file, "analysis/peak") with h5py.File(analysis_file, "r") as f: dg._response = f["analysis/response_dg"].value dg._binned_dx_sp = f["analysis/binned_dx_sp"].value dg._binned_cells_sp = f["analysis/binned_cells_sp"].value dg._binned_dx_vis = f["analysis/binned_dx_vis"].value dg._binned_cells_vis = f["analysis/binned_cells_vis"].value if "analysis/noise_corr_dg" in f: dg.noise_correlation = f["analysis/noise_corr_dg"].value if "analysis/signal_corr_dg" in f: dg.signal_correlation = f["analysis/signal_corr_dg"].value if "analysis/rep_similarity_dg" in f: dg.representational_similarity = f["analysis/rep_similarity_dg"].value except Exception as e: raise MissingStimulusException(e.args) return dg