# Allen Institute Software License - This software license is the 2-clause BSD
# license plus a third clause that prohibits redistribution for commercial
# purposes without further permission.
#
# Copyright 2016-2017. Allen Institute. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Redistributions for commercial purposes are not permitted without the
# Allen Institute's written permission.
# For purposes of this license, commercial purposes is the incorporation of the
# Allen Institute's software into anything for which you will charge fees or
# other compensation. Contact terms@alleninstitute.org for commercial licensing
# opportunities.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import scipy.stats as st
import numpy as np
import pandas as pd
import logging
from .findlevel import findlevel
from .brain_observatory_exceptions import BrainObservatoryAnalysisException
from . import observatory_plots as oplots
import matplotlib.pyplot as plt
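
# Typical usage (illustrative sketch only; the NWB file path and the
# DriftingGratings subclass are examples and are not defined in this module):
#
#     from allensdk.core.brain_observatory_nwb_data_set import BrainObservatoryNwbDataSet
#     from allensdk.brain_observatory.drifting_gratings import DriftingGratings
#
#     data_set = BrainObservatoryNwbDataSet("/path/to/experiment.nwb")
#     dg = DriftingGratings(data_set)
#     sweeps = dg.sweep_response    # per-sweep dF/F traces, computed lazily
#     metrics = dg.peak             # per-cell response metrics
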
class StimulusAnalysis(object):
""" Base class for all response analysis code. Subclasses are responsible
for computing metrics and traces relevant to a particular stimulus.
    The base class contains methods for organizing the sweep responses for each
    row of a stimulus table (get_sweep_response). Subclasses implement the
    get_response method, which computes the mean sweep response to all sweeps
    for each stimulus condition.

    Parameters
----------
data_set: BrainObservatoryNwbDataSet instance
speed_tuning: boolean, deprecated
Whether or not to compute speed tuning histograms
"""
_log = logging.getLogger('allensdk.brain_observatory.stimulus_analysis')
_PRELOAD = "PRELOAD"
def __init__(self, data_set):
self.data_set = data_set
self._timestamps = StimulusAnalysis._PRELOAD
self._celltraces = StimulusAnalysis._PRELOAD
self._acquisition_rate = StimulusAnalysis._PRELOAD
self._numbercells = StimulusAnalysis._PRELOAD
self._roi_id = StimulusAnalysis._PRELOAD
self._cell_id = StimulusAnalysis._PRELOAD
self._dfftraces = StimulusAnalysis._PRELOAD
self._dxcm = StimulusAnalysis._PRELOAD
self._dxtime = StimulusAnalysis._PRELOAD
self._binned_dx_sp = StimulusAnalysis._PRELOAD
self._binned_cells_sp = StimulusAnalysis._PRELOAD
self._binned_dx_vis = StimulusAnalysis._PRELOAD
self._binned_cells_vis = StimulusAnalysis._PRELOAD
self._peak_run = StimulusAnalysis._PRELOAD
self._binsize = 800
self._stim_table = StimulusAnalysis._PRELOAD
self._response = StimulusAnalysis._PRELOAD
self._sweep_response = StimulusAnalysis._PRELOAD
self._mean_sweep_response = StimulusAnalysis._PRELOAD
self._pval = StimulusAnalysis._PRELOAD
self._peak = StimulusAnalysis._PRELOAD
@property
def stim_table(self):
if self._stim_table is StimulusAnalysis._PRELOAD:
self.populate_stimulus_table()
return self._stim_table
@property
def sweep_response(self):
if self._sweep_response is StimulusAnalysis._PRELOAD:
self._sweep_response, self._mean_sweep_response, self._pval = \
self.get_sweep_response()
return self._sweep_response
@property
def mean_sweep_response(self):
if self._mean_sweep_response is StimulusAnalysis._PRELOAD:
self._sweep_response, self._mean_sweep_response, self._pval = \
self.get_sweep_response()
return self._mean_sweep_response
@property
def pval(self):
if self._pval is StimulusAnalysis._PRELOAD:
self._sweep_response, self._mean_sweep_response, self._pval = \
self.get_sweep_response()
return self._pval
@property
def response(self):
if self._response is StimulusAnalysis._PRELOAD:
self._response = self.get_response()
return self._response
@property
def peak(self):
if self._peak is StimulusAnalysis._PRELOAD:
self._peak = self.get_peak()
return self._peak
    def get_fluorescence(self):
# get fluorescence
self._timestamps, self._celltraces = \
self.data_set.get_corrected_fluorescence_traces()
self._acquisition_rate = 1 / (self.timestamps[1] - self.timestamps[0])
self._numbercells = len(self.celltraces) # number of cells in dataset
@property
def timestamps(self):
if self._timestamps is StimulusAnalysis._PRELOAD:
self.get_fluorescence()
return self._timestamps
@property
def celltraces(self):
if self._celltraces is StimulusAnalysis._PRELOAD:
self.get_fluorescence()
return self._celltraces
@property
def acquisition_rate(self):
if self._acquisition_rate is StimulusAnalysis._PRELOAD:
self.get_fluorescence()
return self._acquisition_rate
@property
def numbercells(self):
if self._numbercells is StimulusAnalysis._PRELOAD:
self.get_fluorescence()
return self._numbercells
@property
def roi_id(self):
if self._roi_id is StimulusAnalysis._PRELOAD:
self._roi_id = self.data_set.get_roi_ids()
return self._roi_id
@property
def cell_id(self):
if self._cell_id is StimulusAnalysis._PRELOAD:
self._cell_id = self.data_set.get_cell_specimen_ids()
return self._cell_id
@property
def dfftraces(self):
if self._dfftraces is StimulusAnalysis._PRELOAD:
_, self._dfftraces = self.data_set.get_dff_traces()
return self._dfftraces
@property
def dxcm(self):
if self._dxcm is StimulusAnalysis._PRELOAD:
self._dxcm, self._dxtime = self.data_set.get_running_speed()
return self._dxcm
@property
def dxtime(self):
if self._dxtime is StimulusAnalysis._PRELOAD:
self._dxcm, self._dxtime = self.data_set.get_running_speed()
return self._dxtime
@property
def binned_dx_sp(self):
if self._binned_dx_sp is StimulusAnalysis._PRELOAD:
(self._binned_dx_sp, self._binned_cells_sp, self._binned_dx_vis,
self._binned_cells_vis, self._peak_run) = \
self.get_speed_tuning(binsize=self._binsize)
return self._binned_dx_sp
@property
def binned_cells_sp(self):
if self._binned_cells_sp is StimulusAnalysis._PRELOAD:
(self._binned_dx_sp, self._binned_cells_sp, self._binned_dx_vis,
self._binned_cells_vis, self._peak_run) = \
self.get_speed_tuning(binsize=self._binsize)
return self._binned_cells_sp
@property
def binned_dx_vis(self):
if self._binned_dx_vis is StimulusAnalysis._PRELOAD:
(self._binned_dx_sp, self._binned_cells_sp, self._binned_dx_vis,
self._binned_cells_vis, self._peak_run) = \
self.get_speed_tuning(binsize=self._binsize)
return self._binned_dx_vis
@property
def binned_cells_vis(self):
if self._binned_cells_vis is StimulusAnalysis._PRELOAD:
(self._binned_dx_sp, self._binned_cells_sp, self._binned_dx_vis,
self._binned_cells_vis, self._peak_run) = \
self.get_speed_tuning(binsize=self._binsize)
return self._binned_cells_vis
@property
def peak_run(self):
if self._peak_run is StimulusAnalysis._PRELOAD:
(self._binned_dx_sp, self._binned_cells_sp, self._binned_dx_vis,
self._binned_cells_vis, self._peak_run) = \
self.get_speed_tuning(binsize=self._binsize)
return self._peak_run
    def populate_stimulus_table(self):
""" Implemented by subclasses. """
raise BrainObservatoryAnalysisException("populate_stimulus_table not implemented")
    def get_response(self):
""" Implemented by subclasses. """
raise BrainObservatoryAnalysisException("get_response not implemented")
    def get_peak(self):
""" Implemented by subclasses. """
raise BrainObservatoryAnalysisException("get_peak not implemented")
    def get_speed_tuning(self, binsize):
""" Calculates speed tuning, spontaneous versus visually driven. The return is a 5-tuple
of speed and dF/F histograms.
        binned_dx_sp: (bins, 2) np.ndarray of running speeds binned during the spontaneous activity stimulus.
            The first bin contains all speeds below 1 cm/s. Along the last axis, index 0 is the mean running
            speed in the bin and index 1 is the standard error of the mean.
        binned_cells_sp: (cells, bins, 2) np.ndarray of fluorescence binned during the spontaneous activity stimulus.
            The first bin contains all data for speeds below 1 cm/s. Along the last axis, index 0 is the mean
            fluorescence in the bin and index 1 is the standard error of the mean.
        binned_dx_vis: (bins, 2) np.ndarray of running speeds binned outside of the spontaneous activity stimulus.
            The first bin contains all speeds below 1 cm/s. Along the last axis, index 0 is the mean running
            speed in the bin and index 1 is the standard error of the mean.
        binned_cells_vis: (cells, bins, 2) np.ndarray of fluorescence binned outside of the spontaneous activity stimulus.
            The first bin contains all data for speeds below 1 cm/s. Along the last axis, index 0 is the mean
            fluorescence in the bin and index 1 is the standard error of the mean.
        peak_run: pd.DataFrame of speed-related properties for each cell.
Returns
-------
tuple: binned_dx_sp, binned_cells_sp, binned_dx_vis, binned_cells_vis, peak_run
"""
StimulusAnalysis._log.info(
'Calculating speed tuning, spontaneous vs visually driven')
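        # Trim the dF/F traces so they cover the same number of acquisition
        # frames as the running-speed trace.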
celltraces_trimmed = np.delete(self.dfftraces, range(
len(self.dxcm), np.size(self.dfftraces, 1)), axis=1)
# pull out spontaneous epoch(s)
spontaneous = self.data_set.get_stimulus_table('spontaneous')
peak_run = pd.DataFrame(index=range(self.numbercells), columns=(
'speed_max_sp', 'speed_min_sp', 'ptest_sp', 'mod_sp', 'speed_max_vis', 'speed_min_vis', 'ptest_vis', 'mod_vis'))
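        # Split running speed and dF/F into samples collected during the
        # spontaneous-activity epoch(s) ("sp") and samples collected while
        # visual stimuli were presented ("vis"), using the last one or two
        # rows of the spontaneous stimulus table as epoch boundaries.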
dx_sp = self.dxcm[spontaneous.start.iloc[-1]:spontaneous.end.iloc[-1]]
celltraces_sp = celltraces_trimmed[
:, spontaneous.start.iloc[-1]:spontaneous.end.iloc[-1]]
dx_vis = np.delete(self.dxcm, np.arange(
spontaneous.start.iloc[-1], spontaneous.end.iloc[-1]))
celltraces_vis = np.delete(celltraces_trimmed, np.arange(
spontaneous.start.iloc[-1], spontaneous.end.iloc[-1]), axis=1)
if len(spontaneous) > 1:
dx_sp = np.append(
dx_sp, self.dxcm[spontaneous.start.iloc[-2]:spontaneous.end.iloc[-2]], axis=0)
celltraces_sp = np.append(celltraces_sp, celltraces_trimmed[
:, spontaneous.start.iloc[-2]:spontaneous.end.iloc[-2]], axis=1)
dx_vis = np.delete(dx_vis, np.arange(
spontaneous.start.iloc[-2], spontaneous.end.iloc[-2]))
celltraces_vis = np.delete(celltraces_vis, np.arange(
spontaneous.start.iloc[-2], spontaneous.end.iloc[-2]), axis=1)
celltraces_vis = celltraces_vis[:, ~np.isnan(dx_vis)]
dx_vis = dx_vis[~np.isnan(dx_vis)]
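        # Bin the spontaneous samples by running speed: the first bin holds
        # every sample slower than 1 cm/s (stationary), and each remaining bin
        # holds `binsize` consecutive samples of the sorted speeds >= 1 cm/s.
        # For each bin, store the mean and the standard error of the mean.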
nbins = 1 + len(np.where(dx_sp >= 1)[0]) // binsize
dx_sorted = dx_sp[np.argsort(dx_sp)]
celltraces_sorted_sp = celltraces_sp[:, np.argsort(dx_sp)]
binned_cells_sp = np.zeros((self.numbercells, nbins, 2))
binned_dx_sp = np.zeros((nbins, 2))
for i in range(nbins):
if np.all(np.isnan(dx_sorted)):
raise BrainObservatoryAnalysisException(
"dx is filled with NaNs")
offset = findlevel(dx_sorted, 1, 'up')
if offset is None:
StimulusAnalysis._log.info(
"dx never crosses 1, all speed data going into single bin")
offset = len(dx_sorted)
if i == 0:
binned_dx_sp[i, 0] = np.mean(dx_sorted[:offset])
binned_dx_sp[i, 1] = np.std(
dx_sorted[:offset]) / np.sqrt(offset)
binned_cells_sp[:, i, 0] = np.mean(
celltraces_sorted_sp[:, :offset], axis=1)
binned_cells_sp[:, i, 1] = np.std(
celltraces_sorted_sp[:, :offset], axis=1) / np.sqrt(offset)
else:
start = offset + (i - 1) * binsize
binned_dx_sp[i, 0] = np.mean(dx_sorted[start:start + binsize])
binned_dx_sp[i, 1] = np.std(
dx_sorted[start:start + binsize]) / np.sqrt(binsize)
binned_cells_sp[:, i, 0] = np.mean(
celltraces_sorted_sp[:, start:start + binsize], axis=1)
binned_cells_sp[:, i, 1] = np.std(
celltraces_sorted_sp[:, start:start + binsize], axis=1) / np.sqrt(binsize)
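        # Build a null distribution by shuffling the dF/F samples relative to
        # running speed 200 times and re-binning them with the same scheme.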
binned_cells_shuffled_sp = np.empty((self.numbercells, nbins, 2, 200))
for shuf in range(200):
celltraces_shuffled = celltraces_sp[
:, np.random.permutation(np.size(celltraces_sp, 1))]
celltraces_shuffled_sorted = celltraces_shuffled[
:, np.argsort(dx_sp)]
for i in range(nbins):
offset = findlevel(dx_sorted, 1, 'up')
if offset is None:
StimulusAnalysis._log.info(
"dx never crosses 1, all speed data going into single bin")
offset = celltraces_shuffled_sorted.shape[1]
if i == 0:
binned_cells_shuffled_sp[:, i, 0, shuf] = np.mean(
celltraces_shuffled_sorted[:, :offset], axis=1)
binned_cells_shuffled_sp[:, i, 1, shuf] = np.std(
celltraces_shuffled_sorted[:, :offset], axis=1)
else:
start = offset + (i - 1) * binsize
binned_cells_shuffled_sp[:, i, 0, shuf] = np.mean(
celltraces_shuffled_sorted[:, start:start + binsize], axis=1)
binned_cells_shuffled_sp[:, i, 1, shuf] = np.std(
celltraces_shuffled_sorted[:, start:start + binsize], axis=1)
nbins = 1 + len(np.where(dx_vis >= 1)[0]) // binsize
dx_sorted = dx_vis[np.argsort(dx_vis)]
celltraces_sorted_vis = celltraces_vis[:, np.argsort(dx_vis)]
binned_cells_vis = np.zeros((self.numbercells, nbins, 2))
binned_dx_vis = np.zeros((nbins, 2))
for i in range(nbins):
offset = findlevel(dx_sorted, 1, 'up')
if offset is None:
StimulusAnalysis._log.info(
"dx never crosses 1, all speed data going into single bin")
offset = len(dx_sorted)
if i == 0:
binned_dx_vis[i, 0] = np.mean(dx_sorted[:offset])
binned_dx_vis[i, 1] = np.std(
dx_sorted[:offset]) / np.sqrt(offset)
binned_cells_vis[:, i, 0] = np.mean(
celltraces_sorted_vis[:, :offset], axis=1)
binned_cells_vis[:, i, 1] = np.std(
celltraces_sorted_vis[:, :offset], axis=1) / np.sqrt(offset)
else:
start = offset + (i - 1) * binsize
binned_dx_vis[i, 0] = np.mean(dx_sorted[start:start + binsize])
binned_dx_vis[i, 1] = np.std(
dx_sorted[start:start + binsize]) / np.sqrt(binsize)
binned_cells_vis[:, i, 0] = np.mean(
celltraces_sorted_vis[:, start:start + binsize], axis=1)
binned_cells_vis[:, i, 1] = np.std(
celltraces_sorted_vis[:, start:start + binsize], axis=1) / np.sqrt(binsize)
binned_cells_shuffled_vis = np.empty((self.numbercells, nbins, 2, 200))
for shuf in range(200):
celltraces_shuffled = celltraces_vis[
:, np.random.permutation(np.size(celltraces_vis, 1))]
celltraces_shuffled_sorted = celltraces_shuffled[
:, np.argsort(dx_vis)]
for i in range(nbins):
offset = findlevel(dx_sorted, 1, 'up')
if offset is None:
StimulusAnalysis._log.info(
"dx never crosses 1, all speed data going into single bin")
offset = len(dx_sorted)
if i == 0:
binned_cells_shuffled_vis[:, i, 0, shuf] = np.mean(
celltraces_shuffled_sorted[:, :offset], axis=1)
binned_cells_shuffled_vis[:, i, 1, shuf] = np.std(
celltraces_shuffled_sorted[:, :offset], axis=1)
else:
start = offset + (i - 1) * binsize
binned_cells_shuffled_vis[:, i, 0, shuf] = np.mean(
celltraces_shuffled_sorted[:, start:start + binsize], axis=1)
binned_cells_shuffled_vis[:, i, 1, shuf] = np.std(
celltraces_shuffled_sorted[:, start:start + binsize], axis=1)
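        # A cell is flagged as run-modulated when the variance of its mean
        # binned response exceeds the 99.9th percentile of the variances
        # obtained from the shuffled data.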
shuffled_variance_sp = binned_cells_shuffled_sp[
:, :, 0, :].std(axis=1)**2
variance_threshold_sp = np.percentile(
shuffled_variance_sp, 99.9, axis=1)
response_variance_sp = binned_cells_sp[:, :, 0].std(axis=1)**2
shuffled_variance_vis = binned_cells_shuffled_vis[
:, :, 0, :].std(axis=1)**2
variance_threshold_vis = np.percentile(
shuffled_variance_vis, 99.9, axis=1)
response_variance_vis = binned_cells_vis[:, :, 0].std(axis=1)**2
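        # For each cell, record the running speeds at which the binned
        # response is maximal and minimal, and compare the samples in that
        # extreme bin against all remaining samples with a two-sample KS test.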
for nc in range(self.numbercells):
if response_variance_vis[nc] > variance_threshold_vis[nc]:
peak_run.mod_vis[nc] = True
if response_variance_vis[nc] <= variance_threshold_vis[nc]:
peak_run.mod_vis[nc] = False
if response_variance_sp[nc] > variance_threshold_sp[nc]:
peak_run.mod_sp[nc] = True
if response_variance_sp[nc] <= variance_threshold_sp[nc]:
peak_run.mod_sp[nc] = False
temp = binned_cells_sp[nc, :, 0]
start_max = temp.argmax()
peak_run.speed_max_sp[nc] = binned_dx_sp[start_max, 0]
start_min = temp.argmin()
peak_run.speed_min_sp[nc] = binned_dx_sp[start_min, 0]
if peak_run.speed_max_sp[nc] > peak_run.speed_min_sp[nc]:
test_values = celltraces_sorted_sp[
nc, start_max * binsize:(start_max + 1) * binsize]
other_values = np.delete(celltraces_sorted_sp[nc, :], range(
start_max * binsize, (start_max + 1) * binsize))
(_, peak_run.ptest_sp[nc]) = st.ks_2samp(
test_values, other_values)
else:
test_values = celltraces_sorted_sp[
nc, start_min * binsize:(start_min + 1) * binsize]
other_values = np.delete(celltraces_sorted_sp[nc, :], range(
start_min * binsize, (start_min + 1) * binsize))
(_, peak_run.ptest_sp[nc]) = st.ks_2samp(
test_values, other_values)
temp = binned_cells_vis[nc, :, 0]
start_max = temp.argmax()
peak_run.speed_max_vis[nc] = binned_dx_vis[start_max, 0]
start_min = temp.argmin()
peak_run.speed_min_vis[nc] = binned_dx_vis[start_min, 0]
if peak_run.speed_max_vis[nc] > peak_run.speed_min_vis[nc]:
test_values = celltraces_sorted_vis[
nc, start_max * binsize:(start_max + 1) * binsize]
other_values = np.delete(celltraces_sorted_vis[nc, :], range(
start_max * binsize, (start_max + 1) * binsize))
else:
test_values = celltraces_sorted_vis[
nc, start_min * binsize:(start_min + 1) * binsize]
other_values = np.delete(celltraces_sorted_vis[nc, :], range(
start_min * binsize, (start_min + 1) * binsize))
(_, peak_run.ptest_vis[nc]) = st.ks_2samp(
test_values, other_values)
return binned_dx_sp, binned_cells_sp, binned_dx_vis, binned_cells_vis, peak_run
    def get_sweep_response(self):
""" Calculates the response to each sweep in the stimulus table for each cell and the mean response.
The return is a 3-tuple of:
* sweep_response: pd.DataFrame of response dF/F traces organized by cell (column) and sweep (row)
* mean_sweep_response: mean values of the traces returned in sweep_response
* pval: p value from 1-way ANOVA comparing response during sweep to response prior to sweep
Returns
-------
3-tuple: sweep_response, mean_sweep_response, pval
"""
def do_mean(x):
return np.mean(x[self.interlength:self.interlength + self.sweeplength + self.extralength])
def do_p_value(x):
(_, p) = st.f_oneway(x[:self.interlength], x[
self.interlength:self.interlength + self.sweeplength + self.extralength])
return p
StimulusAnalysis._log.info('Calculating responses for each sweep')
sweep_response = pd.DataFrame(index=self.stim_table.index.values,
columns=list(map(str, range(self.numbercells + 1))))
sweep_response.rename(
columns={str(self.numbercells): 'dx'}, inplace=True)
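        # For every sweep, convert each cell's trace to percent dF/F relative
        # to the mean of its interlength-frame pre-sweep baseline, and store
        # the running speed over the same window in the 'dx' column.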
for index, row in self.stim_table.iterrows():
start = int(row['start'] - self.interlength)
end = int(row['start'] + self.sweeplength + self.interlength)
for nc in range(self.numbercells):
temp = self.celltraces[int(nc), start:end]
sweep_response[str(nc)][index] = 100 * \
((temp / np.mean(temp[:self.interlength])) - 1)
sweep_response['dx'][index] = self.dxcm[start:end]
mean_sweep_response = sweep_response.applymap(do_mean)
pval = sweep_response.applymap(do_p_value)
return sweep_response, mean_sweep_response, pval
    def plot_representational_similarity(self, repsim, stimulus=False):
if stimulus:
pass
ax = plt.gca()
ax.imshow(repsim, interpolation='nearest', cmap='plasma')
    def plot_running_speed_histogram(self, xlim=None, nbins=None):
if xlim is None:
xlim = [-10,100]
if nbins is None:
nbins = 40
ax = plt.gca()
ax.hist(self.dxcm, bins=nbins, range=xlim, color=oplots.STIM_COLOR)
ax.set_xlim(xlim)
plt.xlabel("running speed (cm/s)")
plt.ylabel("time points")
    def plot_speed_tuning(self, cell_specimen_id=None,
cell_index=None,
evoked_color=oplots.EVOKED_COLOR,
spontaneous_color=oplots.SPONTANEOUS_COLOR):
cell_index = self.row_from_cell_id(cell_specimen_id, cell_index)
oplots.plot_combined_speed(self.binned_cells_vis[cell_index,:,:]*100, self.binned_dx_vis[:,:],
self.binned_cells_sp[cell_index,:,:]*100, self.binned_dx_sp[:,:],
evoked_color, spontaneous_color)
ax = plt.gca()
plt.xlabel("running speed (cm/s)")
plt.ylabel("percent dF/F")
    def row_from_cell_id(self, csid=None, idx=None):
if csid is not None and not np.isnan(csid):
return self.data_set.get_cell_specimen_ids().tolist().index(csid)
elif idx is not None:
return idx
else:
raise Exception("Could not find row for csid(%s) idx(%s)" % (str(csid), str(idx)))