# Allen Institute Software License - This software license is the 2-clause BSD
# license plus a third clause that prohibits redistribution for commercial
# purposes without further permission.
#
# Copyright 2017. Allen Institute. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Redistributions for commercial purposes are not permitted without the
# Allen Institute's written permission.
# For purposes of this license, commercial purposes is the incorporation of the
# Allen Institute's software into anything for which you will charge fees or
# other compensation. Contact terms@alleninstitute.org for commercial licensing
# opportunities.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import numpy as np
import scipy.sparse as sparse
from scipy.linalg import solve_banded
import logging
[docs]def get_diagonals_from_sparse(mat):
''' Returns a dictionary of diagonals keyed by offsets
Parameters
----------
mat: scipy.sparse matrix
Returns
-------
dictionary: diagonals keyed by offsets
'''
mat_dia = mat.todia() # make sure the matrix is in diagonal format
offsets = mat_dia.offsets
diagonals = mat_dia.data
mat_dict = {}
for i, o in enumerate(offsets):
mat_dict[o] = diagonals[i]
return mat_dict
[docs]def ab_from_diagonals(mat_dict):
''' Constructs value for scipy.linalg.solve_banded
Parameters
----------
mat_dict: dictionary of diagonals keyed by offsets
Returns
-------
ab: value for scipy.linalg.solve_banded
'''
offsets = list(mat_dict.keys())
l = -np.min(offsets)
u = np.max(offsets)
T = mat_dict[offsets[0]].shape[0]
ab = np.zeros([l + u + 1, T])
for o in offsets:
index = u - o
ab[index] = mat_dict[o]
return ab
[docs]def error_calc(F_M, F_N, F_C, r):
er = np.sqrt(np.mean(np.square(F_C - (F_M - r * F_N)))) / np.mean(F_M)
return er
[docs]def error_calc_outlier(F_M, F_N, F_C, r):
std_F_M = np.std(F_M)
mean_F_M = np.mean(F_M)
ind_outlier = np.where(F_M > mean_F_M + 2. * std_F_M)
er = np.sqrt(np.mean(np.square(
F_C[ind_outlier] - (F_M[ind_outlier] - r * F_N[ind_outlier])))) / np.mean(F_M[ind_outlier])
return er
[docs]def ab_from_T(T, lam, dt):
# using csr because multiplication is fast
Ls = -sparse.eye(T - 1, T, format='csr') + \
sparse.eye(T - 1, T, 1, format='csr')
Ls /= dt
Ls2 = Ls.T.dot(Ls)
M = sparse.eye(T) + lam * Ls2
mat_dict = get_diagonals_from_sparse(M)
ab = ab_from_diagonals(mat_dict)
return ab
[docs]def normalize_F(F_M, F_N):
F_N_min, F_N_max = float(np.amin(F_N)), float(np.amax(F_N))
# rescale so F_N is [0,1]
F_M_s = (F_M - F_N_min) / (F_N_max - F_N_min)
F_N_s = (F_N - F_N_min) / (F_N_max - F_N_min)
return F_M_s, F_N_s
[docs]def alpha_filter(A=1.0, alpha=0.05, beta=0.25, T=100):
return A * np.exp(-alpha * np.arange(T)) - np.exp(-beta * np.arange(T))
[docs]def validate_with_synthetic_F(T, N):
""" Compute N synthetic traces of length T with known values of r, then estimate r.
TODO: docs
"""
af1 = alpha_filter()
af2 = alpha_filter(alpha=0.1, beta=0.5)
r_truth_vals = []
r_est_vals = []
for n in range(N):
F_M_truth, F_N_truth, F_C_truth, r_truth = synthesize_F(T, af1, af2)
results = estimate_contamination_ratios(F_M_truth, F_N_truth)
r_est = results['r']
r_truth_vals.append(r_truth)
r_est_vals.append(r_est)
return r_truth_vals, r_est_vals
[docs]def synthesize_F(T, af1, af2, p1=0.05, p2=0.1):
""" Build a synthetic F_C, F_M, F_N, and r of length T
TODO: docs
"""
x1 = np.random.random(T) < p1
F_C = np.convolve(af1, x1, mode='full')[:T]
x2 = np.random.random(T) < p2
F_N = np.convolve(af2, x2, mode='full')[:T]
r = 2.0 * np.random.random()
F_M = F_C + r * F_N
return F_M, F_N, F_C, r
[docs]class NeuropilSubtract(object):
""" TODO: docs
"""
def __init__(self, lam=0.05, dt=1.0, folds=4):
self.lam = lam
self.dt = dt
self.folds = folds
self.T = None
self.T_f = None
self.ab = None
self.F_M = None
self.F_N = None
self.r_vals = None
self.error_vals = None
self.r = None
self.error = None
[docs] def set_F(self, F_M, F_N):
""" Break the F_M and F_N traces into the number of folds specified
in the class constructor and normalize each fold of F_M and R_N relative to F_N.
"""
F_M_len = len(F_M)
F_N_len = len(F_N)
if F_M_len != F_N_len:
raise Exception(
"F_M and F_N must have the same length (%d vs %d)" % (F_M_len, F_N_len))
if self.T != F_M_len:
logging.debug("updating ab matrix for new T=%d", F_M_len)
self.T = F_M_len
self.T_f = int(self.T / self.folds)
self.ab = ab_from_T(self.T_f, self.lam, self.dt)
self.F_M = []
self.F_N = []
for fi in range(self.folds):
# F_M_i_s, F_N_i_s = normalize_F(F_M[fi*self.T_f:(fi+1)*self.T_f],
# F_N[fi*self.T_f:(fi+1)*self.T_f])
self.F_M.append(F_M[fi * self.T_f:(fi + 1) * self.T_f])
self.F_N.append(F_N[fi * self.T_f:(fi + 1) * self.T_f])
[docs] def fit_block_coordinate_desc(self, r_init=5.0, min_delta_r=0.00000001):
F_M = np.concatenate(self.F_M)
F_N = np.concatenate(self.F_N)
r_vals = []
error_vals = []
r = r_init
delta_r = None
it = 0
ab = ab_from_T(self.T, self.lam, self.dt)
while delta_r is None or delta_r > min_delta_r:
F_C = solve_banded((1, 1), ab, F_M - r * F_N)
new_r = - np.sum((F_C - F_M) * F_N) / np.sum(np.square(F_N))
error = self.estimate_error(new_r)
error_vals.append(error)
r_vals.append(new_r)
if r is not None:
delta_r = np.abs(r - new_r) / r
r = new_r
it += 1
self.r_vals = r_vals
self.error_vals = error_vals
self.r = r_vals[-1]
self.error = error_vals.min()
[docs] def fit(self, r_range=[0.0, 2.0], iterations=3, dr=0.1, dr_factor=0.1):
""" Estimate error values for a range of r values. Identify a new r range
around the minimum error values and repeat multiple times.
TODO: docs
"""
global_min_error = None
global_min_r = None
r_vals = []
error_vals = []
it_range = r_range
it = 0
it_dr = dr
while it < iterations:
it_errors = []
# build a set of r values evenly distributed in a current range
rs = np.arange(it_range[0], it_range[1], it_dr)
# estimate error for each r
for r in rs:
error = self.estimate_error(r)
it_errors.append(error)
r_vals.append(r)
error_vals.append(error)
# find the minimum in this range and update the global minimum
min_i = np.argmin(it_errors)
min_error = it_errors[min_i]
if global_min_error is None or min_error < global_min_error:
global_min_error = min_error
global_min_r = rs[min_i]
logging.debug("iteration %d, r=%0.4f, e=%.6e",
it, global_min_r, global_min_error)
# if the minimum error is on the upper boundary,
# extend the boundary and redo this iteration
if min_i == len(it_errors) - 1:
logging.debug(
"minimum error found on upper r bound, extending range")
it_range = [rs[-1], rs[-1] + (rs[-1] - rs[0])]
else:
# error is somewhere on either side of the minimum error index
it_range = [rs[max(min_i - 1, 0)],
rs[min(min_i + 1, len(rs) - 1)]]
it_dr *= dr_factor
it += 1
self.r_vals = r_vals
self.error_vals = error_vals
self.r = global_min_r
self.error = global_min_error
[docs] def estimate_error(self, r):
""" Estimate error values for a given r for each fold and return the mean. """
errors = np.zeros(self.folds)
for fi in range(self.folds):
F_M = self.F_M[fi]
F_N = self.F_N[fi]
F_C = solve_banded((1, 1), self.ab, F_M - r * F_N)
errors[fi] = abs(error_calc(F_M, F_N, F_C, r))
return np.mean(errors)
[docs]def estimate_contamination_ratios(F_M, F_N,
lam=0.05, folds=4, iterations=3,
r_range=[0.0, 2.0], dr=0.1, dr_factor=0.1):
''' Calculates neuropil contamination of ROI
Parameters
----------
F_M: ROI trace
F_N: Neuropil trace
Returns
-------
dictionary: key-value pairs
* 'r': the contamination ratio -- corrected trace = M - r*N
* 'err': RMS error
* 'min_error': minimum error
* 'bounds_error': boolean. True if error or R are outside tolerance
'''
ns = NeuropilSubtract(lam=lam, folds=folds)
ns.set_F(F_M, F_N)
ns.fit(r_range=r_range,
iterations=iterations,
dr=dr,
dr_factor=dr_factor)
# ns.fit_block_coordinate_desc()
if ns.r < 0:
logging.warning("r is negative (%f). return 0.0.", ns.r)
ns.r = 0
return {
"r": ns.r,
"r_vals": ns.r_vals,
"err": ns.error,
"err_vals": ns.error_vals,
"min_error": ns.error,
"it": len(ns.r_vals)
}