Source code for allensdk.ephys.feature_extractor
# Allen Institute Software License - This software license is the 2-clause BSD
# license plus a third clause that prohibits redistribution for commercial
# purposes without further permission.
#
# Copyright 2015-2016. Allen Institute. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Redistributions for commercial purposes are not permitted without the
# Allen Institute's written permission.
# For purposes of this license, commercial purposes is the incorporation of the
# Allen Institute's software into anything for which you will charge fees or
# other compensation. Contact terms@alleninstitute.org for commercial licensing
# opportunities.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import sys
import math
import numpy as np
import scipy.signal as signal
import logging
# Design notes:
# to generate an average feature file, all sweeps must have all features
# to generate a fitness score of a sweep against a feature file, the sweep
# must have all features in the file. If one is absent, a penalty
# of TODO ??? will be assessed
# set of features
class EphysFeatures( object ):
def __init__(self, name):
# feature mean and standard deviations
self.mean = {}
self.stdev = {}
# human-readable names for features
self.glossary = {}
# table indicating how to score feature
# 'hit' feature exists:
# 'ignore' do nothing
# 'stdev' score is # stdevs from target mean
# 'miss' feature absent:
# 'const' score = scoring['const']
# 'mean_mult' score = mean * scoring['mean_mult']
#
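        # e.g. with { "hit":"stdev", "miss":"const", "const":20 }, a sweep whose value
        # lies 1.5 standard deviations from the target mean scores 1.5 for that feature,
        # while a sweep missing the feature scores the constant 20; with "miss":"mean_mult"
        # the penalty is the target mean times scoring["mean_mult"] (see score_feature_set)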
self.scoring = {}
self.name = name
################################################################
# ignore scores
ignore_score = { "hit": "ignore" }
self.glossary["n_spikes"] = "Number of spikes"
self.scoring["n_spikes"] = ignore_score
################################################################
# ignore misses
ignore_miss = { "hit":"stdev", "miss":"const", "const":0 }
self.glossary["adapt"] = "Adaptation index"
self.scoring["adapt"] = ignore_miss
self.glossary["latency"] = "Time to first spike (ms)"
self.scoring["latency"] = ignore_miss
################################################################
# base miss off mean
mean_score = { "hit":"stdev", "miss":"mean_mult", "mean_mult":2 }
self.glossary["ISICV"] = "ISI-CV"
self.scoring["ISICV"] = mean_score
################################################################
# normal scoring
normal_score = { "hit":"stdev", "miss":"const", "const":20 }
self.glossary["isi_avg"] = "Average ISI (ms)"
self.scoring["isi_avg"] = ignore_score
self.glossary["doublet"] = "Doublet ISI (ms)"
self.scoring["doublet"] = normal_score
self.glossary["f_fast_ahp"] = "Fast AHP (mV)"
self.scoring["f_fast_ahp"] = normal_score
self.glossary["f_slow_ahp"] = "Slow AHP (mV)"
self.scoring["f_slow_ahp"] = normal_score
self.glossary["f_slow_ahp_time"] = "Slow AHP time"
self.scoring["f_slow_ahp_time"] = normal_score
self.glossary["base_v"] = "Baseline voltage (mV)"
self.scoring["base_v"] = normal_score
#self.glossary["base_v2"] = "Baseline voltage 2 (mV)"
#self.scoring["base_v2"] = normal_score
#self.glossary["base_v3"] = "Baseline voltage 3 (mV)"
#self.scoring["base_v3"] = normal_score
################################################################
# per spike scoring
perspike_score = { "hit":"perspike", "miss":"const", "const":20, "skip_last_n":0 }
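        # skip_last_n: number of trailing spikes excluded from per-spike scoring
        # (e.g. the trough and width of the final spike can be cut off by the end of the stimulus)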
self.glossary["f_peak"] = "Spike height (mV)"
self.scoring["f_peak"] = perspike_score.copy()
self.glossary["f_trough"] = "Spike depth (mV)"
self.scoring["f_trough"] = perspike_score.copy()
self.scoring["f_trough"]["skip_last_n"] = 1
# self.glossary["f_w"] = "Spike width at -30 mV (ms)"
# self.scoring["f_w"] = perspike_score.copy()
self.glossary["upstroke"] = "Peak upstroke (mV/ms)"
self.scoring["upstroke"] = perspike_score.copy()
self.glossary["upstroke_v"] = "Vm of peak upstroke (mV)"
self.scoring["upstroke_v"] = perspike_score.copy()
self.glossary["downstroke"] = "Peak downstroke (mV/ms)"
self.scoring["downstroke"] = perspike_score.copy()
self.glossary["downstroke_v"] = "Vm of peak downstroke (mV)"
self.scoring["downstroke_v"] = perspike_score.copy()
self.glossary["threshold"] = "Threshold voltage (mV)"
self.scoring["threshold"] = perspike_score.copy()
self.glossary["width"] = "Spike width at half-max (ms)"
self.scoring["width"] = perspike_score.copy()
self.scoring["width"]["skip_last_n"] = 1
self.glossary["thresh_ramp"] = "Change in dv/dt over first 5 mV past threshold (mV/ms)"
self.scoring["thresh_ramp"] = perspike_score.copy()
################################################################
# heavily penalize when there are no spikes
spike_score = { "hit":"stdev", "miss":"const", "const":250 }
self.glossary["rate"] = "Firing rate (Hz)"
self.scoring["rate"] = spike_score
    def print_out(self):
print("Features from " + self.name)
for k in self.mean.keys():
if k in self.glossary:
st = "%30s = " % self.glossary[k]
if self.mean[k] is not None:
st += "%g" % self.mean[k]
else:
st += "--------"
if k in self.stdev and self.stdev[k] is not None:
st += " +/- %g" % self.stdev[k]
print(st)
# initialize summary feature set from file
    def clone(self, param_dict):
for k in param_dict.keys():
self.mean[k] = param_dict[k]["mean"]
self.stdev[k] = param_dict[k]["stdev"]
class EphysFeatureExtractor( object ):
def __init__(self):
# list of feature set instances
self.feature_list = []
# names of each element in feature list
self.feature_source = []
# feature set object representing combination of all instances
self.summary = None
# adds new feature set instance to feature_list
    def process_instance(self, name, v, curr, t, onset, dur, stim_name):
feature = EphysFeatures(name)
################################################################
# set stop time -- run until end of stimulus or end of sweep
        # comment out one of the two approaches
# detect spikes only during stimulus
start = onset
stop = onset + dur
# detect spikes for all of sweep
#start = 0
#stop = t[-1]
################################################################
# pull out spike times
# calculate the derivative only within target window
# otherwise get spurious detection at ends of stimuli
        # filter with 10 kHz cutoff if constant 200 kHz sample rate (i.e., experimental trace)
start_idx = np.where(t >= start)[0][0]
stop_idx = np.where(t >= stop)[0][0]
v_target = v[start_idx:stop_idx]
if np.abs(t[1] - t[0] - 5e-6) < 1e-7 and np.var(np.diff(t)) < 1e-6:
b, a = signal.bessel(4, 0.1, "low")
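            # 4-pole low-pass Bessel; normalized cutoff of 0.1 x Nyquist = 10 kHz at 200 kHz sampling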
smooth_v = signal.filtfilt(b, a, v_target, axis=0)
dv = np.diff(smooth_v)
else:
dv = np.diff(v_target)
dvdt = dv / (np.diff(t[start_idx:stop_idx]) * 1e3) # in mV/ms
dv_cutoff = 20
thresh_pct = 0.05
spikes = []
        temp_spk_idxs = np.where(np.diff(np.greater_equal(dvdt, dv_cutoff).astype(int)) == 1)[0] # find positive-going crossings of dv_cutoff (20 mV/ms)
spk_idxs = []
for i, temp in enumerate(temp_spk_idxs):
if i == 0:
spk_idxs.append(temp)
elif np.any(dvdt[temp_spk_idxs[i - 1]:temp] < 0):
# check if the dvdt has gone back down below zero between presumed spike times
# sometimes the dvdt bobbles around detection threshold and produces spurious guesses at spike times
spk_idxs.append(temp)
        spk_idxs = np.array(spk_idxs) + start_idx # set back to the "index space" of the original trace
# recalculate full dv/dt for feature analysis (vs spike detection)
if np.abs(t[1] - t[0] - 5e-6) < 1e-7 and np.var(np.diff(t)) < 1e-6:
b, a = signal.bessel(4, 0.1, "low")
smooth_v = signal.filtfilt(b, a, v, axis=0)
dv = np.diff(smooth_v)
else:
dv = np.diff(v)
dvdt = dv / (np.diff(t) * 1e3) # in mV/ms
# First time through, accumulate upstrokes to calculate average threshold target
for spk_n, spk_idx in enumerate(spk_idxs):
# Etay defines spike as time of threshold crossing
spk = {}
if spk_n < len(spk_idxs) - 1:
next_idx = spk_idxs[spk_n + 1]
else:
next_idx = stop_idx
if spk_n > 0:
prev_idx = spk_idxs[spk_n - 1]
else:
prev_idx = start_idx
# Find the peak
peak_idx = np.argmax(v[spk_idx:next_idx]) + spk_idx
spk["peak_idx"] = peak_idx
spk["f_peak"] = v[peak_idx]
spk["f_peak_i"] = curr[peak_idx]
spk["f_peak_t"] = t[peak_idx]
# Check if end of stimulus interval cuts off spike - if so, don't process spike
if spk_n == len(spk_idxs) - 1 and peak_idx == next_idx-1:
continue
if spk_idx == peak_idx:
                continue # this was a bug fix, but why? ramp?
# Determine maximum upstroke of spike
upstroke_idx = np.argmax(dvdt[spk_idx:peak_idx]) + spk_idx
spk["upstroke"] = dvdt[upstroke_idx]
if np.isnan(spk["upstroke"]): # sometimes dvdt will be NaN because of multiple cvode points at same time step
close_idx = upstroke_idx + 1
while (np.isnan(dvdt[close_idx])):
close_idx += 1
spk["upstroke_idx"] = close_idx
spk["upstroke"] = dvdt[close_idx]
spk["upstroke_v"] = v[close_idx]
spk["upstroke_i"] = curr[close_idx]
spk["upstroke_t"] = t[close_idx]
else:
spk["upstroke_idx"] = upstroke_idx
spk["upstroke_v"] = v[upstroke_idx]
spk["upstroke_i"] = curr[upstroke_idx]
spk["upstroke_t"] = t[upstroke_idx]
# Preliminarily define threshold where dvdt = 5% * max upstroke
thresh_pct = 0.05
find_thresh_idxs = np.where(dvdt[prev_idx:upstroke_idx] <= thresh_pct * spk["upstroke"])[0]
if len(find_thresh_idxs) < 1: # Can't find a good threshold value - probably a bad simulation case
# Fall back to the upstroke value
threshold_idx = upstroke_idx
else:
threshold_idx = find_thresh_idxs[-1] + prev_idx
spk["threshold_idx"] = threshold_idx
spk["threshold"] = v[threshold_idx]
spk["threshold_v"] = v[threshold_idx]
spk["threshold_i"] = curr[threshold_idx]
spk["threshold_t"] = t[threshold_idx]
spk["rise_time"] = spk["f_peak_t"] - spk["threshold_t"]
PERIOD = t[1] - t[0]
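            # half-height width: scan up to ~1 ms past threshold, marking when v first rises
            # above the midpoint between threshold and peak and when it falls back below it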
width_volts = (v[peak_idx] + v[threshold_idx]) / 2
recording_width = False
for i in range(threshold_idx, min(len(v), threshold_idx + int(0.001 / PERIOD))):
if not recording_width and v[i] >= width_volts:
recording_width = True
idx0 = i
elif recording_width and v[i] < width_volts:
spk["half_height_width"] = t[i] - t[idx0]
break
# Check for things that are probably not spikes:
# if there is more than 2 ms between the detection event and the peak, don't count it
if t[peak_idx] - t[threshold_idx] > 0.002:
continue
# if the "spike" is less than 2 mV, don't count it
if v[peak_idx] - v[threshold_idx] < 2.0:
continue
            # if the peak doesn't reach at least -30 mV, don't count it
if v[peak_idx] < -30.0:
continue
spikes.append(spk)
# Refine threshold target based on average of all spikes
if len(spikes) > 0:
threshold_target = np.array([spk["upstroke"] for spk in spikes]).mean() * thresh_pct
for spk_n, spk in enumerate(spikes):
if spk_n < len(spikes) - 1:
next_idx = spikes[spk_n + 1]["threshold_idx"]
else:
next_idx = stop_idx
if spk_n > 0:
prev_idx = spikes[spk_n - 1]["peak_idx"]
else:
prev_idx = start_idx
# Restore variables from before
# peak_idx = spk['peak_idx']
peak_idx = np.argmax(v[spk['threshold_idx']:next_idx]) + spk['threshold_idx']
spk["peak_idx"] = peak_idx
spk["f_peak"] = v[peak_idx]
spk["f_peak_i"] = curr[peak_idx]
spk["f_peak_t"] = t[peak_idx]
# Determine maximum upstroke of spike
# upstroke_idx = spk['upstroke_idx']
upstroke_idx = np.argmax(dvdt[spk['threshold_idx']:peak_idx]) + spk['threshold_idx']
spk["upstroke"] = dvdt[upstroke_idx]
if np.isnan(spk["upstroke"]): # sometimes dvdt will be NaN because of multiple cvode points at same time step
close_idx = upstroke_idx + 1
while (np.isnan(dvdt[close_idx])):
close_idx += 1
spk["upstroke_idx"] = close_idx
spk["upstroke"] = dvdt[close_idx]
spk["upstroke_v"] = v[close_idx]
spk["upstroke_i"] = curr[close_idx]
spk["upstroke_t"] = t[close_idx]
else:
spk["upstroke_idx"] = upstroke_idx
spk["upstroke_v"] = v[upstroke_idx]
spk["upstroke_i"] = curr[upstroke_idx]
spk["upstroke_t"] = t[upstroke_idx]
# Find threshold based on average target
find_thresh_idxs = np.where(dvdt[prev_idx:upstroke_idx] <= threshold_target)[0]
if len(find_thresh_idxs) < 1: # Can't find a good threshold value - probably a bad simulation case
# Fall back to the upstroke value
threshold_idx = upstroke_idx
else:
threshold_idx = find_thresh_idxs[-1] + prev_idx
spk["threshold_idx"] = threshold_idx
spk["threshold"] = v[threshold_idx]
spk["threshold_v"] = v[threshold_idx]
spk["threshold_i"] = curr[threshold_idx]
spk["threshold_t"] = t[threshold_idx]
# Define the spike time as threshold time
spk["t_idx"] = threshold_idx
spk["t"] = t[threshold_idx]
# Save the -30 mV crossing time for backward compatibility with Etay code
overn30_idxs = np.where(v[threshold_idx:peak_idx] >= -30)[0]
if len(overn30_idxs) > 0:
spk["t_idx_n30"] = overn30_idxs[0] + threshold_idx
else: # fall back to threshold definition if spike doesn't cross -30 mV
spk["t_idx_n30"] = threshold_idx
spk["t_n30"] = t[spk["t_idx_n30"]]
# Figure out initial "slope" of phase plot post-threshold
plus_5_vec = np.where(v[threshold_idx:upstroke_idx] >= spk["threshold"] + 5)[0]
if len(plus_5_vec) > 0:
thresh_plus_5_idx = plus_5_vec[0] + threshold_idx
spk["thresh_ramp"] = dvdt[thresh_plus_5_idx] - dvdt[threshold_idx]
else:
spk["thresh_ramp"] = dvdt[upstroke_idx] - dvdt[threshold_idx]
# go forward to determine peak downstroke of spike
downstroke_idx = np.argmin(dvdt[peak_idx:next_idx]) + peak_idx
spk["downstroke_idx"] = downstroke_idx
spk["downstroke_v"] = v[downstroke_idx]
spk["downstroke_i"] = curr[downstroke_idx]
spk["downstroke_t"] = t[downstroke_idx]
spk["downstroke"] = dvdt[downstroke_idx]
if np.isnan(spk["downstroke"]): # sometimes dvdt will be NaN because of multiple cvode points at same time step
close_idx = downstroke_idx + 1
while (np.isnan(dvdt[close_idx])):
close_idx += 1
spk["downstroke"] = dvdt[close_idx]
features = {}
feature.mean["base_v"] = v[np.where((t > onset - 0.1) & (t < onset - 0.001))].mean() # baseline voltage, 100ms before stim
feature.mean["spikes"] = spikes
isi_cv = self.isicv(spikes)
if isi_cv is not None:
feature.mean["ISICV"] = isi_cv
n_spikes = len(spikes)
feature.mean["n_spikes"] = n_spikes
feature.mean["rate"] = 1.0 * n_spikes / (stop - start);
feature.mean["adapt"] = self.adaptation_index(spikes, stop)
if len(spikes) > 1:
feature.mean["doublet"] = 1000 * (spikes[1]["t"] - spikes[0]["t"])
if len(spikes) > 0:
for i, spk in enumerate(spikes):
idx_next = spikes[i + 1]["t_idx"] if i < len(spikes) - 1 else stop_idx
self.calculate_trough(spk, v, curr, t, idx_next)
half_max_v = (spk["f_peak"] - spk["f_trough"]) / 2.0 + spk["f_trough"]
over_half_max_v_idxs = np.where(v[spk["t_idx"]:spk["trough_idx"]] > half_max_v)[0]
if len(over_half_max_v_idxs) > 0:
spk["width"] = 1000. * (t[over_half_max_v_idxs[-1] + spk["t_idx"]] - t[over_half_max_v_idxs[0] + spk["t_idx"]])
feature.mean["latency"] = 1000. * (spikes[0]["t"] - onset)
feature.mean["latency_n30"] = 1000. * (spikes[0]["t_n30"] - onset)
# extract properties for each spike
isicnt = 0
isitot = 0
for i in range(0, len(spikes)-1):
spk = spikes[i]
idx_next = spikes[i+1]["t_idx"]
isitot += spikes[i+1]["t"] - spikes[i]["t"]
isicnt += 1
if isicnt > 0:
feature.mean["isi_avg"] = 1000 * isitot / isicnt
else:
feature.mean["isi_avg"] = None
# average feature data from individual spikes
# build superset dictionary of possible features
superset = {}
for i in range(len(spikes)):
for k in spikes[i].keys():
if k not in superset:
superset[k] = k
for k in superset.keys():
cnt = 0
mean = 0
for i in range(len(spikes)):
if k not in spikes[i]:
continue
mean += float(spikes[i][k])
cnt += 1.0
            # this shouldn't be possible, but it may be in a future version,
            # so might as well trap for it
if cnt == 0:
continue
mean /= cnt
stdev = 0
for i in range(len(spikes)):
if k not in spikes[i]:
continue
dif = mean - float(spikes[i][k])
stdev += dif * dif
stdev = math.sqrt(stdev / cnt)
feature.mean[k] = mean
feature.stdev[k] = stdev
#
self.feature_list.append(feature)
self.feature_source.append(name)
    def isicv(self, spikes):
if len(spikes) < 3:
return None
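        # ISI-CV: coefficient of variation of the inter-spike intervals
        # (population standard deviation divided by the mean ISI)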
isi_mean = 0
lst = []
for i in range(len(spikes) - 1):
isi = spikes[i+1]["t"] - spikes[i]["t"]
#print("\t%g" % isi)
isi_mean += isi
lst.append(isi)
isi_mean /= 1.0 * len(lst)
#print(isi_mean)
var = 0
for i in range(len(lst)):
dif = isi_mean - lst[i]
var += dif * dif
var /= len(lst)
#var /= len(lst) - 1
#print(math.sqrt(var))
if isi_mean > 0:
return math.sqrt(var) / isi_mean
return None
    def adaptation_index(self, spikes, stim_end):
if len(spikes) < 4:
return None
adi = 0
cnt = 0
isi = []
for i in range(len(spikes)-1):
isi.append(spikes[i+1]["t"] - spikes[i]["t"])
# act as though time between last spike and stim end is another ISI per Etay's code
# l = stim_end - spikes[-1]["t"]
# if l > 0 and l > isi[-1]:
# isi.append(l)
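        # adaptation index: mean over successive ISI pairs of
        # (isi[i+1] - isi[i]) / (isi[i+1] + isi[i]); positive values mean ISIs lengthen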
for i in range(len(isi)-1):
adi += 1.0 * (isi[i+1] - isi[i]) / (isi[i+1] + isi[i])
cnt += 1
adi /= cnt
return adi
##----------------------------------------------------------------------
# trough (AHP) is presently defined as the minimum voltage level
# observed between successive spikes in a burst
    # rather than returning results separately, the spike record is passed
    # in and updated in place
    def calculate_trough(self, spike, v, curr, t, next_idx):
# dt = t[1] - t[0]
peak_idx = spike["peak_idx"]
if peak_idx >= next_idx:
logging.warning("next index (%d) before peak index (%d) calculating trough" % ( next_idx, peak_idx ))
trough_idx = next_idx
else:
trough_idx = np.argmin(v[peak_idx:next_idx]) + peak_idx
spike["trough_idx"] = trough_idx
spike["f_trough"] = v[trough_idx]
spike["trough_v"] = v[trough_idx]
spike["trough_t"] = t[trough_idx]
spike["trough_i"] = curr[trough_idx]
# calculate etay's 'fast' and 'slow' ahp here
if t[peak_idx] + 0.005 >= t[-1]:
five_ms_idx = len(t) - 1
else:
five_ms_idx = np.where(t >= 0.005 + t[peak_idx])[0][0] # 5ms after peak
# fast AHP is minimum value occurring w/in 5ms
if five_ms_idx >= next_idx:
five_ms_idx = next_idx
if peak_idx == five_ms_idx:
fast_idx = next_idx
else:
fast_idx = np.argmin(v[peak_idx:five_ms_idx]) + peak_idx
spike["f_fast_ahp"] = v[fast_idx]
spike["f_fast_ahp_v"] = v[fast_idx]
spike["f_fast_ahp_i"] = curr[fast_idx]
spike["f_fast_ahp_t"] = t[fast_idx]
if five_ms_idx == next_idx:
slow_idx = fast_idx
else:
slow_idx = np.argmin(v[five_ms_idx:next_idx]) + five_ms_idx
spike["f_slow_ahp"] = v[slow_idx]
spike["f_slow_ahp_time"] = (t[slow_idx] - t[peak_idx]) / (t[next_idx] - t[peak_idx])
spike["f_slow_ahp_t"] = t[slow_idx]
# initialize summary feature set from file
    # calculate nearness scores for feature set X relative to summary
    # each feature contributes its distance from the target value,
    # in units of standard deviations
    # when a feature is absent, the penalty rule stored in the feature's
    # scoring entry is applied and that penalty is used instead
    def score_feature_set(self, set_num):
cand = self.feature_list[set_num]
scores = []
for k in sorted(self.summary.mean.keys()):
if k in self.summary.glossary:
response = self.summary.scoring[k]["hit"]
if response == "ignore":
continue
elif response == "stdev":
mean = self.summary.mean[k]
stdev = self.summary.stdev[k]
assert stdev > 0
if k in cand.mean and cand.mean[k] is not None:
val = cand.mean[k]
inc = abs(mean - val) / stdev
scores.append(inc)
# print("Hit %s, %g+/-%g (%g) = %g" % (k, mean, stdev, val, inc))
else:
resp = cand.scoring[k]["miss"]
if resp == "const":
miss = float(cand.scoring[k][resp])
elif resp == "mean_mult":
miss = mean * float(cand.scoring[k][resp])
else:
assert False
miss = float(miss)
scores.append(miss)
# print("Missed %s, penalty = %g" % (k, miss))
elif response == "perspike":
mean = self.summary.mean[k]
stdev = self.summary.stdev[k]
assert stdev > 0
if k in cand.mean and cand.mean[k] is not None:
val = 0
n_spikes = len(cand.mean["spikes"])
skip_last_n = self.summary.scoring[k]["skip_last_n"]
for spike in cand.mean["spikes"][:n_spikes-skip_last_n]:
val += abs(spike[k] - mean)
val /= n_spikes - skip_last_n
inc = val / stdev
scores.append(inc)
else:
resp = cand.scoring[k]["miss"]
if resp == "const":
miss = float(cand.scoring[k][resp])
elif resp == "mean_mult":
miss = mean * float(cand.scoring[k][resp])
else:
assert False
miss = float(miss)
scores.append(miss)
# print("Missed %s, penalty = %g" % (k, miss))
else:
assert False
if abs(sum(scores)) > 1e10:
                    print(k)
                    print(self.summary.scoring)
                    print(self.summary.mean)
                    print(self.summary.stdev)
                    print(cand.scoring)
                    print(cand.mean)
                    print(cand.stdev)
                    assert False
return scores
# create summary of feature instances
# 'summary' is an empty feature object. this must be the same
# class as the other feature objects that are being summarized
    def summarize(self, summary):
if len(self.feature_list) == 0:
print("Error -- no features were extracted. Summary impossible")
sys.exit()
# make dummy dict to verify that all feature instances have
# identical features
# only copy features that are in the glossary. some are for
# internal use (eg, t_idx -- time index) and aren't important
# here
superset = {}
for i in range(len(self.feature_list)):
fx = self.feature_list[i]
for k in fx.mean.keys():
if k in fx.glossary:
superset[k] = k
err = 0
for k in superset.keys():
for i in range(len(self.feature_list)):
fx = self.feature_list[i].mean
if k not in fx:
print("Error - feature '%s' not in all data sets" % k)
err += 1
if err > 0:
return None
# all features must be of the same type
# to ensure this, make programmer specify the type being summarized
self.summary = summary
# now set summary means to zero
for k in fx.keys():
self.summary.mean[k] = 0
# now calculate the average of all features
for i in range(len(self.feature_list)):
fx = self.feature_list[i]
for k in fx.mean.keys():
if k in fx.glossary and fx.mean[k] is not None:
self.summary.mean[k] += fx.mean[k]
self.summary.stdev[k] = 0.0
# divide out n to get actual mean
for k in fx.mean.keys():
self.summary.mean[k] /= 1.0 * len(self.feature_list)
# calculate standard deviation
for i in range(len(self.feature_list)):
fx = self.feature_list[i]
for k in fx.mean.keys():
if k in fx.glossary and fx.mean[k] is not None:
mean = self.summary.mean[k]
dif = mean - fx.mean[k]
self.summary.stdev[k] += dif * dif
# divide out n and take sqrt to get actual stdev
fx = self.feature_list[0]
for k in fx.mean.keys():
if k in fx.glossary and fx.mean[k] is not None:
val = self.summary.stdev[k]
val /= 1.0 * len(self.feature_list)
self.summary.stdev[k] = math.sqrt(val)
return self
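
# Illustrative usage sketch (an assumption, not part of the original module):
# drives the extractor with a synthetic, spike-free current-clamp sweep so the
# call sequence (process_instance -> print_out) is visible end to end. The
# sampling rate, stimulus window, and trace values below are arbitrary.
if __name__ == "__main__":
    t = np.arange(0.0, 1.0, 1e-4)          # 1 s of time at 10 kHz (skips the Bessel filtering branch)
    v = np.full_like(t, -70.0)             # flat -70 mV membrane trace, so no spikes are detected
    curr = np.zeros_like(t)                # injected current trace
    curr[(t >= 0.2) & (t < 0.7)] = 0.1     # current step over the stimulus window
    fx = EphysFeatureExtractor()
    fx.process_instance("example_sweep", v, curr, t, onset=0.2, dur=0.5, stim_name="step")
    fx.feature_list[0].print_out()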