Source code for allensdk.brain_observatory.stimulus_analysis

# Allen Institute Software License - This software license is the 2-clause BSD
# license plus a third clause that prohibits redistribution for commercial
# purposes without further permission.
#
# Copyright 2016-2017. Allen Institute. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Redistributions for commercial purposes are not permitted without the
# Allen Institute's written permission.
# For purposes of this license, commercial purposes is the incorporation of the
# Allen Institute's software into anything for which you will charge fees or
# other compensation. Contact terms@alleninstitute.org for commercial licensing
# opportunities.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import scipy.stats as st
import numpy as np
import pandas as pd
import logging
from .findlevel import findlevel
from .brain_observatory_exceptions import BrainObservatoryAnalysisException
from . import observatory_plots as oplots
import matplotlib.pyplot as plt

class StimulusAnalysis(object):
    """ Base class for all response analysis code. Subclasses are responsible
    for computing metrics and traces relevant to a particular stimulus.
    The base class contains methods for organizing sweep responses for each
    row of a stimulus table (get_sweep_response). Subclasses implement the
    get_response method, which computes the mean sweep response to all sweeps
    for each stimulus condition.

    Parameters
    ----------
    data_set: BrainObservatoryNwbDataSet instance

    speed_tuning: boolean, deprecated
        Whether or not to compute speed tuning histograms
    """
    _log = logging.getLogger('allensdk.brain_observatory.stimulus_analysis')
    _PRELOAD = "PRELOAD"

    def __init__(self, data_set):
        self.data_set = data_set

        self._timestamps = StimulusAnalysis._PRELOAD
        self._celltraces = StimulusAnalysis._PRELOAD
        self._acquisition_rate = StimulusAnalysis._PRELOAD
        self._numbercells = StimulusAnalysis._PRELOAD
        self._roi_id = StimulusAnalysis._PRELOAD
        self._cell_id = StimulusAnalysis._PRELOAD
        self._dfftraces = StimulusAnalysis._PRELOAD
        self._dxcm = StimulusAnalysis._PRELOAD
        self._dxtime = StimulusAnalysis._PRELOAD
        self._binned_dx_sp = StimulusAnalysis._PRELOAD
        self._binned_cells_sp = StimulusAnalysis._PRELOAD
        self._binned_dx_vis = StimulusAnalysis._PRELOAD
        self._binned_cells_vis = StimulusAnalysis._PRELOAD
        self._peak_run = StimulusAnalysis._PRELOAD
        self._binsize = 800

        self._stim_table = StimulusAnalysis._PRELOAD
        self._response = StimulusAnalysis._PRELOAD
        self._sweep_response = StimulusAnalysis._PRELOAD
        self._mean_sweep_response = StimulusAnalysis._PRELOAD
        self._pval = StimulusAnalysis._PRELOAD
        self._peak = StimulusAnalysis._PRELOAD

    @property
    def stim_table(self):
        if self._stim_table is StimulusAnalysis._PRELOAD:
            self.populate_stimulus_table()

        return self._stim_table

    @property
    def sweep_response(self):
        if self._sweep_response is StimulusAnalysis._PRELOAD:
            self._sweep_response, self._mean_sweep_response, self._pval = \
                self.get_sweep_response()

        return self._sweep_response

    @property
    def mean_sweep_response(self):
        if self._mean_sweep_response is StimulusAnalysis._PRELOAD:
            self._sweep_response, self._mean_sweep_response, self._pval = \
                self.get_sweep_response()

        return self._mean_sweep_response

    @property
    def pval(self):
        if self._pval is StimulusAnalysis._PRELOAD:
            self._sweep_response, self._mean_sweep_response, self._pval = \
                self.get_sweep_response()

        return self._pval

    @property
    def response(self):
        if self._response is StimulusAnalysis._PRELOAD:
            self._response = self.get_response()

        return self._response

    @property
    def peak(self):
        if self._peak is StimulusAnalysis._PRELOAD:
            self._peak = self.get_peak()

        return self._peak
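
    # Usage sketch: construction is cheap because everything above is computed
    # lazily on first property access. The manifest path and experiment id
    # below are placeholders, not values from this module.
    #
    #     from allensdk.core.brain_observatory_cache import BrainObservatoryCache
    #
    #     boc = BrainObservatoryCache(manifest_file='boc/manifest.json')
    #     data_set = boc.get_ophys_experiment_data(501940850)  # placeholder id
    #     analysis = StimulusAnalysis(data_set)
    #     analysis.numbercells   # first access triggers get_fluorescence()
    #     analysis.dxcm          # first access triggers get_running_speed()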

    def get_fluorescence(self):
        # get fluorescence
        self._timestamps, self._celltraces = \
            self.data_set.get_corrected_fluorescence_traces()
        self._acquisition_rate = 1 / (self.timestamps[1] - self.timestamps[0])

        # number of cells in the data set
        self._numbercells = len(self.celltraces)

    @property
    def timestamps(self):
        if self._timestamps is StimulusAnalysis._PRELOAD:
            self.get_fluorescence()

        return self._timestamps

    @property
    def celltraces(self):
        if self._celltraces is StimulusAnalysis._PRELOAD:
            self.get_fluorescence()

        return self._celltraces

    @property
    def acquisition_rate(self):
        if self._acquisition_rate is StimulusAnalysis._PRELOAD:
            self.get_fluorescence()

        return self._acquisition_rate

    @property
    def numbercells(self):
        if self._numbercells is StimulusAnalysis._PRELOAD:
            self.get_fluorescence()

        return self._numbercells

    @property
    def roi_id(self):
        if self._roi_id is StimulusAnalysis._PRELOAD:
            self._roi_id = self.data_set.get_roi_ids()

        return self._roi_id

    @property
    def cell_id(self):
        if self._cell_id is StimulusAnalysis._PRELOAD:
            self._cell_id = self.data_set.get_cell_specimen_ids()

        return self._cell_id

    @property
    def dfftraces(self):
        if self._dfftraces is StimulusAnalysis._PRELOAD:
            _, self._dfftraces = self.data_set.get_dff_traces()

        return self._dfftraces

    @property
    def dxcm(self):
        if self._dxcm is StimulusAnalysis._PRELOAD:
            self._dxcm, self._dxtime = self.data_set.get_running_speed()

        return self._dxcm

    @property
    def dxtime(self):
        if self._dxtime is StimulusAnalysis._PRELOAD:
            self._dxcm, self._dxtime = self.data_set.get_running_speed()

        return self._dxtime

    @property
    def binned_dx_sp(self):
        if self._binned_dx_sp is StimulusAnalysis._PRELOAD:
            (self._binned_dx_sp, self._binned_cells_sp,
             self._binned_dx_vis, self._binned_cells_vis,
             self._peak_run) = self.get_speed_tuning(binsize=self._binsize)

        return self._binned_dx_sp

    @property
    def binned_cells_sp(self):
        if self._binned_cells_sp is StimulusAnalysis._PRELOAD:
            (self._binned_dx_sp, self._binned_cells_sp,
             self._binned_dx_vis, self._binned_cells_vis,
             self._peak_run) = self.get_speed_tuning(binsize=self._binsize)

        return self._binned_cells_sp

    @property
    def binned_dx_vis(self):
        if self._binned_dx_vis is StimulusAnalysis._PRELOAD:
            (self._binned_dx_sp, self._binned_cells_sp,
             self._binned_dx_vis, self._binned_cells_vis,
             self._peak_run) = self.get_speed_tuning(binsize=self._binsize)

        return self._binned_dx_vis

    @property
    def binned_cells_vis(self):
        if self._binned_cells_vis is StimulusAnalysis._PRELOAD:
            (self._binned_dx_sp, self._binned_cells_sp,
             self._binned_dx_vis, self._binned_cells_vis,
             self._peak_run) = self.get_speed_tuning(binsize=self._binsize)

        return self._binned_cells_vis

    @property
    def peak_run(self):
        if self._peak_run is StimulusAnalysis._PRELOAD:
            (self._binned_dx_sp, self._binned_cells_sp,
             self._binned_dx_vis, self._binned_cells_vis,
             self._peak_run) = self.get_speed_tuning(binsize=self._binsize)

        return self._peak_run
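
    # Property-access sketch (illustrative; `analysis` stands for any concrete
    # StimulusAnalysis subclass instance):
    #
    #     analysis.acquisition_rate   # 1 / frame interval, via get_fluorescence()
    #     analysis.celltraces.shape   # (numbercells, timepoints)
    #     analysis.binned_dx_sp       # first access runs get_speed_tuning(binsize=800)
    #     analysis.peak_run           # per-cell running-modulation DataFrame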

    def populate_stimulus_table(self):
        """ Implemented by subclasses. """
        raise BrainObservatoryAnalysisException(
            "populate_stimulus_table not implemented")

    def get_response(self):
        """ Implemented by subclasses. """
        raise BrainObservatoryAnalysisException("get_response not implemented")

    def get_peak(self):
        """ Implemented by subclasses. """
        raise BrainObservatoryAnalysisException("get_peak not implemented")
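
    # Sketch of what a concrete subclass provides (hypothetical subclass and
    # stimulus name; real implementations ship with the SDK, e.g. for gratings
    # and natural scenes). Subclasses are also expected to define the sweep
    # window attributes (interlength, sweeplength, extralength) used by
    # get_sweep_response below.
    #
    #     class MyStimulusAnalysis(StimulusAnalysis):
    #         def populate_stimulus_table(self):
    #             self._stim_table = self.data_set.get_stimulus_table('my_stimulus')
    #
    #         def get_response(self):
    #             # average mean_sweep_response over sweeps sharing a condition
    #             ...
    #
    #         def get_peak(self):
    #             # summarize each cell's preferred condition and significance
    #             ...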

    def get_speed_tuning(self, binsize):
        """ Calculates speed tuning, spontaneous versus visually driven.
        The return is a 5-tuple of speed and dF/F histograms.

            binned_dx_sp: (bins,2) np.ndarray of running speeds binned
            during the spontaneous activity stimulus.
            The first bin contains all speeds below 1 cm/s.
            Dimension 0 is mean running speed in the bin.
            Dimension 1 is the standard error of the mean.

            binned_cells_sp: (numbercells,bins,2) np.ndarray of fluorescence
            during the spontaneous activity stimulus.  The first bin contains
            all data for speeds below 1 cm/s.
            Dimension 0 is mean fluorescence in the bin.
            Dimension 1 is the standard error of the mean.

            binned_dx_vis: (bins,2) np.ndarray of running speeds outside of
            the spontaneous activity stimulus.
            The first bin contains all speeds below 1 cm/s.
            Dimension 0 is mean running speed in the bin.
            Dimension 1 is the standard error of the mean.

            binned_cells_vis: (numbercells,bins,2) np.ndarray of fluorescence
            outside of the spontaneous activity stimulus.  The first bin
            contains all data for speeds below 1 cm/s.
            Dimension 0 is mean fluorescence in the bin.
            Dimension 1 is the standard error of the mean.

            peak_run: pd.DataFrame of speed-related properties of a cell.

        Returns
        -------
        tuple: binned_dx_sp, binned_cells_sp, binned_dx_vis, binned_cells_vis, peak_run
        """
        StimulusAnalysis._log.info(
            'Calculating speed tuning, spontaneous vs visually driven')

        celltraces_trimmed = np.delete(self.dfftraces, range(
            len(self.dxcm), np.size(self.dfftraces, 1)), axis=1)

        # pull out spontaneous epoch(s)
        spontaneous = self.data_set.get_stimulus_table('spontaneous')

        peak_run = pd.DataFrame(index=range(self.numbercells), columns=(
            'speed_max_sp', 'speed_min_sp', 'ptest_sp', 'mod_sp',
            'speed_max_vis', 'speed_min_vis', 'ptest_vis', 'mod_vis'))

        dx_sp = self.dxcm[spontaneous.start.iloc[-1]:spontaneous.end.iloc[-1]]
        celltraces_sp = celltraces_trimmed[
            :, spontaneous.start.iloc[-1]:spontaneous.end.iloc[-1]]
        dx_vis = np.delete(self.dxcm, np.arange(
            spontaneous.start.iloc[-1], spontaneous.end.iloc[-1]))
        celltraces_vis = np.delete(celltraces_trimmed, np.arange(
            spontaneous.start.iloc[-1], spontaneous.end.iloc[-1]), axis=1)
        if len(spontaneous) > 1:
            dx_sp = np.append(
                dx_sp,
                self.dxcm[spontaneous.start.iloc[-2]:spontaneous.end.iloc[-2]],
                axis=0)
            celltraces_sp = np.append(celltraces_sp, celltraces_trimmed[
                :, spontaneous.start.iloc[-2]:spontaneous.end.iloc[-2]], axis=1)
            dx_vis = np.delete(dx_vis, np.arange(
                spontaneous.start.iloc[-2], spontaneous.end.iloc[-2]))
            celltraces_vis = np.delete(celltraces_vis, np.arange(
                spontaneous.start.iloc[-2], spontaneous.end.iloc[-2]), axis=1)
        celltraces_vis = celltraces_vis[:, ~np.isnan(dx_vis)]
        dx_vis = dx_vis[~np.isnan(dx_vis)]

        nbins = 1 + len(np.where(dx_sp >= 1)[0]) // binsize
        dx_sorted = dx_sp[np.argsort(dx_sp)]
        celltraces_sorted_sp = celltraces_sp[:, np.argsort(dx_sp)]
        binned_cells_sp = np.zeros((self.numbercells, nbins, 2))
        binned_dx_sp = np.zeros((nbins, 2))
        for i in range(nbins):
            if np.all(np.isnan(dx_sorted)):
                raise BrainObservatoryAnalysisException(
                    "dx is filled with NaNs")

            offset = findlevel(dx_sorted, 1, 'up')

            if offset is None:
                StimulusAnalysis._log.info(
                    "dx never crosses 1, all speed data going into single bin")
                offset = len(dx_sorted)

            if i == 0:
                binned_dx_sp[i, 0] = np.mean(dx_sorted[:offset])
                binned_dx_sp[i, 1] = np.std(
                    dx_sorted[:offset]) / np.sqrt(offset)
                binned_cells_sp[:, i, 0] = np.mean(
                    celltraces_sorted_sp[:, :offset], axis=1)
                binned_cells_sp[:, i, 1] = np.std(
                    celltraces_sorted_sp[:, :offset], axis=1) / np.sqrt(offset)
            else:
                start = offset + (i - 1) * binsize
                binned_dx_sp[i, 0] = np.mean(dx_sorted[start:start + binsize])
                binned_dx_sp[i, 1] = np.std(
                    dx_sorted[start:start + binsize]) / np.sqrt(binsize)
                binned_cells_sp[:, i, 0] = np.mean(
                    celltraces_sorted_sp[:, start:start + binsize], axis=1)
                binned_cells_sp[:, i, 1] = np.std(
                    celltraces_sorted_sp[:, start:start + binsize],
                    axis=1) / np.sqrt(binsize)

        binned_cells_shuffled_sp = np.empty((self.numbercells, nbins, 2, 200))
        for shuf in range(200):
            celltraces_shuffled = celltraces_sp[
                :, np.random.permutation(np.size(celltraces_sp, 1))]
            celltraces_shuffled_sorted = celltraces_shuffled[
                :, np.argsort(dx_sp)]
            for i in range(nbins):
                offset = findlevel(dx_sorted, 1, 'up')

                if offset is None:
                    StimulusAnalysis._log.info(
                        "dx never crosses 1, all speed data going into single bin")
                    offset = celltraces_shuffled_sorted.shape[1]

                if i == 0:
                    binned_cells_shuffled_sp[:, i, 0, shuf] = np.mean(
                        celltraces_shuffled_sorted[:, :offset], axis=1)
                    binned_cells_shuffled_sp[:, i, 1, shuf] = np.std(
                        celltraces_shuffled_sorted[:, :offset], axis=1)
                else:
                    start = offset + (i - 1) * binsize
                    binned_cells_shuffled_sp[:, i, 0, shuf] = np.mean(
                        celltraces_shuffled_sorted[:, start:start + binsize],
                        axis=1)
                    binned_cells_shuffled_sp[:, i, 1, shuf] = np.std(
                        celltraces_shuffled_sorted[:, start:start + binsize],
                        axis=1)

        nbins = 1 + len(np.where(dx_vis >= 1)[0]) // binsize
        dx_sorted = dx_vis[np.argsort(dx_vis)]
        celltraces_sorted_vis = celltraces_vis[:, np.argsort(dx_vis)]
        binned_cells_vis = np.zeros((self.numbercells, nbins, 2))
        binned_dx_vis = np.zeros((nbins, 2))
        for i in range(nbins):
            offset = findlevel(dx_sorted, 1, 'up')

            if offset is None:
                StimulusAnalysis._log.info(
                    "dx never crosses 1, all speed data going into single bin")
                offset = len(dx_sorted)

            if i == 0:
                binned_dx_vis[i, 0] = np.mean(dx_sorted[:offset])
                binned_dx_vis[i, 1] = np.std(
                    dx_sorted[:offset]) / np.sqrt(offset)
                binned_cells_vis[:, i, 0] = np.mean(
                    celltraces_sorted_vis[:, :offset], axis=1)
                binned_cells_vis[:, i, 1] = np.std(
                    celltraces_sorted_vis[:, :offset], axis=1) / np.sqrt(offset)
            else:
                start = offset + (i - 1) * binsize
                binned_dx_vis[i, 0] = np.mean(dx_sorted[start:start + binsize])
                binned_dx_vis[i, 1] = np.std(
                    dx_sorted[start:start + binsize]) / np.sqrt(binsize)
                binned_cells_vis[:, i, 0] = np.mean(
                    celltraces_sorted_vis[:, start:start + binsize], axis=1)
                binned_cells_vis[:, i, 1] = np.std(
                    celltraces_sorted_vis[:, start:start + binsize],
                    axis=1) / np.sqrt(binsize)

        binned_cells_shuffled_vis = np.empty((self.numbercells, nbins, 2, 200))
        for shuf in range(200):
            celltraces_shuffled = celltraces_vis[
                :, np.random.permutation(np.size(celltraces_vis, 1))]
            celltraces_shuffled_sorted = celltraces_shuffled[
                :, np.argsort(dx_vis)]
            for i in range(nbins):
                offset = findlevel(dx_sorted, 1, 'up')

                if offset is None:
                    StimulusAnalysis._log.info(
                        "dx never crosses 1, all speed data going into single bin")
                    offset = len(dx_sorted)

                if i == 0:
                    binned_cells_shuffled_vis[:, i, 0, shuf] = np.mean(
                        celltraces_shuffled_sorted[:, :offset], axis=1)
                    binned_cells_shuffled_vis[:, i, 1, shuf] = np.std(
                        celltraces_shuffled_sorted[:, :offset], axis=1)
                else:
                    start = offset + (i - 1) * binsize
                    binned_cells_shuffled_vis[:, i, 0, shuf] = np.mean(
                        celltraces_shuffled_sorted[:, start:start + binsize],
                        axis=1)
                    binned_cells_shuffled_vis[:, i, 1, shuf] = np.std(
                        celltraces_shuffled_sorted[:, start:start + binsize],
                        axis=1)

        shuffled_variance_sp = binned_cells_shuffled_sp[
            :, :, 0, :].std(axis=1)**2
        variance_threshold_sp = np.percentile(
            shuffled_variance_sp, 99.9, axis=1)
        response_variance_sp = binned_cells_sp[:, :, 0].std(axis=1)**2

        shuffled_variance_vis = binned_cells_shuffled_vis[
            :, :, 0, :].std(axis=1)**2
        variance_threshold_vis = np.percentile(
            shuffled_variance_vis, 99.9, axis=1)
        response_variance_vis = binned_cells_vis[:, :, 0].std(axis=1)**2

        for nc in range(self.numbercells):
            if response_variance_vis[nc] > variance_threshold_vis[nc]:
                peak_run.mod_vis[nc] = True
            if response_variance_vis[nc] <= variance_threshold_vis[nc]:
                peak_run.mod_vis[nc] = False
            if response_variance_sp[nc] > variance_threshold_sp[nc]:
                peak_run.mod_sp[nc] = True
            if response_variance_sp[nc] <= variance_threshold_sp[nc]:
                peak_run.mod_sp[nc] = False

            temp = binned_cells_sp[nc, :, 0]
            start_max = temp.argmax()
            peak_run.speed_max_sp[nc] = binned_dx_sp[start_max, 0]
            start_min = temp.argmin()
            peak_run.speed_min_sp[nc] = binned_dx_sp[start_min, 0]
            if peak_run.speed_max_sp[nc] > peak_run.speed_min_sp[nc]:
                test_values = celltraces_sorted_sp[
                    nc, start_max * binsize:(start_max + 1) * binsize]
                other_values = np.delete(celltraces_sorted_sp[nc, :], range(
                    start_max * binsize, (start_max + 1) * binsize))
                (_, peak_run.ptest_sp[nc]) = st.ks_2samp(
                    test_values, other_values)
            else:
                test_values = celltraces_sorted_sp[
                    nc, start_min * binsize:(start_min + 1) * binsize]
                other_values = np.delete(celltraces_sorted_sp[nc, :], range(
                    start_min * binsize, (start_min + 1) * binsize))
                (_, peak_run.ptest_sp[nc]) = st.ks_2samp(
                    test_values, other_values)

            temp = binned_cells_vis[nc, :, 0]
            start_max = temp.argmax()
            peak_run.speed_max_vis[nc] = binned_dx_vis[start_max, 0]
            start_min = temp.argmin()
            peak_run.speed_min_vis[nc] = binned_dx_vis[start_min, 0]
            if peak_run.speed_max_vis[nc] > peak_run.speed_min_vis[nc]:
                test_values = celltraces_sorted_vis[
                    nc, start_max * binsize:(start_max + 1) * binsize]
                other_values = np.delete(celltraces_sorted_vis[nc, :], range(
                    start_max * binsize, (start_max + 1) * binsize))
            else:
                test_values = celltraces_sorted_vis[
                    nc, start_min * binsize:(start_min + 1) * binsize]
                other_values = np.delete(celltraces_sorted_vis[nc, :], range(
                    start_min * binsize, (start_min + 1) * binsize))
            (_, peak_run.ptest_vis[nc]) = st.ks_2samp(
                test_values, other_values)

        return binned_dx_sp, binned_cells_sp, binned_dx_vis, binned_cells_vis, peak_run
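
    # Reading the speed-tuning outputs (illustrative):
    #
    #     bdx_sp, bc_sp, bdx_vis, bc_vis, peak_run = \
    #         analysis.get_speed_tuning(binsize=800)
    #     bdx_sp.shape   # (nbins, 2): mean running speed and SEM per bin
    #     bc_vis.shape   # (numbercells, nbins, 2): mean dF/F and SEM per cell/bin
    #     peak_run.loc[0, ['speed_max_vis', 'ptest_vis', 'mod_vis']]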

    def get_sweep_response(self):
        """ Calculates the response to each sweep in the stimulus table for each
        cell and the mean response. The return is a 3-tuple of:

            * sweep_response: pd.DataFrame of response dF/F traces organized by
              cell (column) and sweep (row)

            * mean_sweep_response: mean values of the traces returned in
              sweep_response

            * pval: p value from 1-way ANOVA comparing response during sweep to
              response prior to sweep

        Returns
        -------
        3-tuple: sweep_response, mean_sweep_response, pval
        """
        def do_mean(x):
            return np.mean(
                x[self.interlength:self.interlength +
                  self.sweeplength + self.extralength])

        def do_p_value(x):
            (_, p) = st.f_oneway(
                x[:self.interlength],
                x[self.interlength:self.interlength +
                  self.sweeplength + self.extralength])
            return p

        StimulusAnalysis._log.info('Calculating responses for each sweep')

        sweep_response = pd.DataFrame(
            index=self.stim_table.index.values,
            columns=list(map(str, range(self.numbercells + 1))))
        sweep_response.rename(
            columns={str(self.numbercells): 'dx'}, inplace=True)

        for index, row in self.stim_table.iterrows():
            start = int(row['start'] - self.interlength)
            end = int(row['start'] + self.sweeplength + self.interlength)

            for nc in range(self.numbercells):
                temp = self.celltraces[int(nc), start:end]
                sweep_response[str(nc)][index] = \
                    100 * ((temp / np.mean(temp[:self.interlength])) - 1)
            sweep_response['dx'][index] = self.dxcm[start:end]

        mean_sweep_response = sweep_response.applymap(do_mean)
        pval = sweep_response.applymap(do_p_value)

        return sweep_response, mean_sweep_response, pval
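
    # Reading the sweep-response tables (illustrative):
    #
    #     sweep_response, mean_sweep_response, pval = analysis.get_sweep_response()
    #     sweep_response['0'].iloc[0]   # dF/F trace for cell 0 on the first sweep
    #     mean_sweep_response['0']      # mean response of cell 0 for every sweep
    #     (pval['0'] < 0.05).sum()      # sweeps where cell 0 responded significantly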

    def plot_representational_similarity(self, repsim, stimulus=False):
        # the stimulus flag is currently unused
        if stimulus:
            pass

        ax = plt.gca()
        ax.imshow(repsim, interpolation='nearest', cmap='plasma')

    def plot_running_speed_histogram(self, xlim=None, nbins=None):
        if xlim is None:
            xlim = [-10, 100]
        if nbins is None:
            nbins = 40

        ax = plt.gca()
        ax.hist(self.dxcm, bins=nbins, range=xlim, color=oplots.STIM_COLOR)
        ax.set_xlim(xlim)
        plt.xlabel("running speed (cm/s)")
        plt.ylabel("time points")
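
    # Plotting sketch (illustrative; standard matplotlib figure handling):
    #
    #     plt.figure()
    #     analysis.plot_running_speed_histogram(xlim=[-10, 100], nbins=40)
    #     plt.savefig('running_speed_histogram.png')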

    def plot_speed_tuning(self, cell_specimen_id=None, cell_index=None,
                          evoked_color=oplots.EVOKED_COLOR,
                          spontaneous_color=oplots.SPONTANEOUS_COLOR):
        cell_index = self.row_from_cell_id(cell_specimen_id, cell_index)

        oplots.plot_combined_speed(
            self.binned_cells_vis[cell_index, :, :] * 100,
            self.binned_dx_vis[:, :],
            self.binned_cells_sp[cell_index, :, :] * 100,
            self.binned_dx_sp[:, :],
            evoked_color, spontaneous_color)

        ax = plt.gca()
        plt.xlabel("running speed (cm/s)")
        plt.ylabel("percent dF/F")

    def row_from_cell_id(self, csid=None, idx=None):
        if csid is not None and not np.isnan(csid):
            return self.data_set.get_cell_specimen_ids().tolist().index(csid)
        elif idx is not None:
            return idx
        else:
            raise Exception("Could not find row for csid(%s) idx(%s)" %
                            (str(csid), str(idx)))
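

# Illustrative end-to-end sketch (assumptions: the manifest path and ophys
# experiment id are placeholders, and the chosen session contains the drifting
# gratings stimulus). It exercises a concrete subclass rather than this
# abstract base class; running it downloads data through BrainObservatoryCache.
if __name__ == "__main__":
    from allensdk.core.brain_observatory_cache import BrainObservatoryCache
    from allensdk.brain_observatory.drifting_gratings import DriftingGratings

    boc = BrainObservatoryCache(manifest_file='boc/manifest.json')
    data_set = boc.get_ophys_experiment_data(502115959)  # placeholder experiment id

    dg = DriftingGratings(data_set)        # concrete StimulusAnalysis subclass
    print(dg.peak.head())                  # per-cell peak response metrics

    plt.figure()
    dg.plot_speed_tuning(cell_index=0)     # speed tuning for the first cell
    plt.savefig('speed_tuning_cell0.png')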