# Copyright 2022. Allen Institute. All rights reserved
#
# Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
# following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
# disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
# products derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
import re
from collections import namedtuple
import warnings
import numpy as np
import pandas as pd
from neuron import h
from bmtk.simulator.bionet.io_tools import io
pc = h.ParallelContext() # object to access MPI methods
class _LazySegmentProps(object):
"""A Struct for storing coordinate invariant properties of each segment of a cell."""
def __init__(self, hobj):
self._hobj = hobj
self.sec_id = None
self.type = None
self.area = None
self.x = None
self.x0 = None
self.x1 = None
self.dist = None
self.length = None
self.dist0 = None
self.dist1 = None
def get(self):
if self.sec_id is None:
seg_type = []
seg_area = []
seg_x = []
seg_x0 = []
seg_x1 = []
seg_dist = []
seg_length = []
seg_sec_id = []
h.distance(sec=self._hobj.soma[0]) # measure distance relative to the soma
for sec_id, sec in enumerate(self._hobj.all):
fullsecname = sec.name()
sec_type = fullsecname.split(".")[1][:4] # get sec name type without the cell name
sec_type_swc = Morphology.sec_type_swc[sec_type] # convert to swc code
x_range = 1.0 / (sec.nseg * 2) # used to calculate [x0, x1]
for seg in sec:
seg_area.append(h.area(seg.x))
seg_x.append(seg.x)
seg_x0.append(seg.x - x_range)
seg_x1.append(seg.x + x_range)
seg_length.append(sec.L / sec.nseg)
seg_type.append(sec_type_swc) # record section type in a list
# seg_dist.append(h.distance(seg.x)) # distance to the center of the segment
seg_dist.append(h.distance(seg))
seg_sec_id.append(sec_id)
length_arr = np.array(seg_length)
dist_arr = np.array(seg_dist)
dist0_arr = dist_arr - length_arr / 2.0
dist1_arr = dist_arr + length_arr / 2.0
self.type = np.array(seg_type)
self.area = np.array(seg_area)
self.x = np.array(seg_x)
self.x0 = np.array(seg_x0)
self.x1 = np.array(seg_x1)
self.dist = dist_arr
self.length = length_arr
self.dist0 = dist0_arr
self.dist1 = dist1_arr
self.sec_id = np.array(seg_sec_id)
return self
class _LazySegmentCoords(object):
"""A Struct for storing properties of each segment of a cell that vary by soma position and rotation."""
def __init__(self, hobj):
self._hobj = hobj
self.p0 = None
self.p1 = None
self.p05 = None
self.soma_pos = None
self.d = None
def get(self):
if self.p0 is None:
# Iterates through each segment (one section may contain one or more segments). We use NEURON's
# segment.x to get the mid-point and lenght of each segment we we can use to find and store the beginning
# (p0) and end (p1) of each segment. We also shift so the middle of the soma is at the origin.
nseg = np.sum([s.nseg for s in self._hobj.all])
n3dsoma = 0
soma_pos = np.zeros(3)
for sec in self._hobj.soma:
n3d = int(h.n3d(sec=sec)) # get number of n3d points in each section
n3dsoma += n3d
for i in range(n3d):
soma_pos[0] += h.x3d(i, sec=sec)
soma_pos[1] += h.y3d(i, sec=sec)
soma_pos[2] += h.z3d(i, sec=sec)
soma_pos /= n3dsoma
ix = 0 # segment index
self.p0 = np.zeros((3, nseg)) # hold the coordinates of segment starting points
self.p1 = np.zeros((3, nseg)) # hold the coordinates of segment end points
self.p05 = np.zeros((3, nseg))
for sec in self._hobj.all:
n3d = int(h.n3d(sec=sec)) # get number of n3d points in each section
p3d = np.zeros((3, n3d)) # to hold locations of 3D morphology for the current section
l3d = np.zeros(n3d) # to hold locations of 3D morphology for the current section
# diam3d = np.zeros(n3d) # to diameters, at the moment we don't need to keep track of diameter
for i in range(n3d):
p3d[0, i] = h.x3d(i, sec=sec) - soma_pos[0] # shift coordinates such to place soma at the origin.
p3d[1, i] = h.y3d(i, sec=sec) - soma_pos[1]
p3d[2, i] = h.z3d(i, sec=sec) - soma_pos[2]
# diam3d[i] = h.diam3d(i, sec=sec)
l3d[i] = h.arc3d(i, sec=sec)
l3d /= sec.L # normalize
nseg = sec.nseg
l0 = np.zeros(nseg) # keep range of segment starting point
l1 = np.zeros(nseg) # keep range of segment ending point
l05 = np.zeros(nseg)
for iseg, seg in enumerate(sec):
l0[iseg] = seg.x - 0.5 * 1 / nseg # x (normalized distance along the section) for the beginning of the segment
l1[iseg] = seg.x + 0.5 * 1 / nseg # x for the end of the segment
l05[iseg] = seg.x
self.p0[0, ix:ix + nseg] = np.interp(l0, l3d, p3d[0, :])
self.p0[1, ix:ix + nseg] = np.interp(l0, l3d, p3d[1, :])
self.p0[2, ix:ix + nseg] = np.interp(l0, l3d, p3d[2, :])
self.p1[0, ix:ix + nseg] = np.interp(l1, l3d, p3d[0, :])
self.p1[1, ix:ix + nseg] = np.interp(l1, l3d, p3d[1, :])
self.p1[2, ix:ix + nseg] = np.interp(l1, l3d, p3d[2, :])
self.p05[0, ix:ix + nseg] = np.interp(l05, l3d, p3d[0, :])
self.p05[1, ix:ix + nseg] = np.interp(l05, l3d, p3d[1, :])
self.p05[2, ix:ix + nseg] = np.interp(l05, l3d, p3d[2, :])
# x0[ix:ix + nseg] = l0
# x1[ix:ix + nseg] = l1
ix += nseg
# TODO: keep track of diameter only when minimum_distance='auto' in EcpMod
self.d = np.array([seg.diam for sec in self._hobj.all for seg in sec])
# Also calculate the middle of the shifted soma, NOTE: in theory this should be (0, 0, 0), but due to
# percision the actual soma center might be a little off.
n3dsoma = 0
r3dsoma = np.zeros(3)
for sec in self._hobj.soma:
n3d = int(h.n3d(sec=sec)) # get number of n3d points in each section
n3dsoma += n3d
for i in range(n3d):
r3dsoma[0] += h.x3d(i, sec=sec) - soma_pos[0]
r3dsoma[1] += h.y3d(i, sec=sec) - soma_pos[1]
r3dsoma[2] += h.z3d(i, sec=sec) - soma_pos[2]
r3dsoma /= n3dsoma
self.soma_pos = r3dsoma
return self
# For a lot of different cells created from the same morphology and model_processing function, the core segment
# properties will be the same (even if coordinates, synapses, hobjs, etc. are different). In such cases we want
# to calculate and store seg_props only once for all equivelent cells. We can do this by caching the seg_props
# helper function pointer, then all equivelent morphologies (as defined by __hash__ function) use the same
# SegmentProps object
morphology_cache = {}
[docs]def rotation_matrix(axis, theta):
"""Return the rotation matrix associated with counterclockwise rotation about the given axis by theta radians.
"""
axis = np.asarray(axis)
theta = np.asarray(theta)
axis = axis/np.sqrt(np.dot(axis, axis))
a = np.cos(theta/2.0)
b, c, d = -axis*np.sin(theta/2.0)
aa, bb, cc, dd = a*a, b*b, c*c, d*d
bc, ad, ac, ab, bd, cd = b*c, a*d, a*c, a*b, b*d, c*d
return np.array([
[aa+bb-cc-dd, 2*(bc+ad), 2*(bd-ac)],
[2*(bc-ad), aa+cc-bb-dd, 2*(cd+ab)],
[2*(bd+ac), 2*(cd-ab), aa+dd-bb-cc]
])
[docs]class Morphology(object):
sec_type_swc = {
'soma': 1, 'somatic': 1,
'axon': 2, 'axonal': 2,
'dend': 3, 'basal': 3,
'apic': 4, 'apical': 4
}
def __init__(self, hobj, rng_seed=None, swc_path=None):
self.hobj = hobj # h.Biophys1(swc_path)
self.rng_seed = rng_seed
self.swc_path = swc_path
self._prng = np.random.RandomState(self.rng_seed)
self._sections = None
self._segments = None
self._seg_props = _LazySegmentProps(self.hobj) # None
self._seg_coords = _LazySegmentCoords(self.hobj) # None
self._nseg = None
self._swc_map = None
# Used by find_sections() and other methods when building edges/synapses. Should make it faster to look-up
# cooresponding segments for a large number of syns that target the same area of a cell
self._trg_segs_cache = {}
def _copy(self):
new_morph = Morphology(hobj=self.hobj, swc_path=self.swc_path)
return new_morph
def _soma_position_orig(self):
n3dsoma = 0
r3dsoma = np.zeros(3)
for sec in self.hobj.soma:
n3d = int(h.n3d(sec=sec)) # get number of n3d points in each section
# r3d = np.zeros((3, n3d)) # to hold locations of 3D morphology for the current section
n3dsoma += n3d
for i in range(n3d):
r3dsoma[0] += h.x3d(i, sec=sec)
r3dsoma[1] += h.y3d(i, sec=sec)
r3dsoma[2] += h.z3d(i, sec=sec)
r3dsoma /= n3dsoma
return r3dsoma
@property
def segments(self):
if self._segments is None:
self._segments = []
for sec in self.hobj.all:
for seg in sec:
self._segments.append(seg)
return self._segments
@property
def sections(self):
if self._sections is None:
self._sections = []
for sec in self.hobj.all:
self._sections.append(sec)
return self._sections
@property
def n_sections(self):
return len(self.sections)
@property
def seg_props(self):
return self._seg_props.get()
@property
def seg_coords(self):
"""Get the coordinates of all segments of the cells. Each segment is defined by three values, the beginning
of the segment (seg_coords.p0), the middle of the segment(seg_coords.p05), and the end (seg_coords.p1)."""
return self._seg_coords.get()
@property
def nseg(self):
if self._nseg is None:
nseg = 0
for sec in self.hobj.all:
nseg += sec.nseg # get the total # of segments in the cell
self._nseg = nseg
return self._nseg
@property
def soma_position(self):
return self.seg_coords.soma_pos
@property
def swc_map(self):
if self._swc_map is None:
swc_df = pd.read_csv(self.swc_path, sep=' ', names=['id', 'type', 'x', 'y', 'z', 'r', 'pid'], comment='#')
swc_df = swc_df[['id', 'type']]
name_indices = []
for ln in range(len(swc_df)):
# in read_swc.hoc all non-bifurcating section of rows are assigned a global section-id. But for each
# type (soma, dend, etc), there is also a nameindex id which is unique within the type, used to assign
# sections to their name. So lines with sec_id=10 may have nameindex=3 (Biophys1[0].dend[3])
sec_id = int(self.hobj.nl.point2sec[ln])
# swc_id = int(self.hobj.nl.pt2id(ln))
# swc_type = int(self.hobj.nl.type[ln])
name_index = int(self.hobj.nl.sections.object(sec_id).nameindex)
name_indices.append(name_index)
swc_df['nameindex'] = name_indices
self._swc_map = swc_df
return self._swc_map
[docs] def get_section(self, section_idx):
return self.sections[section_idx]
[docs] def set_segment_dl(self, dl):
"""Define number of segments in a cell"""
self._nseg = 0
for sec in self.hobj.all:
sec.nseg = 1 + 2 * int(sec.L/(2*dl))
self._nseg += sec.nseg # get the total number of segments in the cell
[docs] def choose_sections(self, section_names, distance_range, n_sections=1, cache=True):
"""Similar to find_sections, but will only N=n_section number of sections_ids/x values randomly selected (may
return less if there aren't as many sections
:param section_names: 'soma', 'dend', 'apic', 'axon'
:param distance_range: [float, float]: distance range of sections from the soma, in um.
:param n_sections: int: maximum number of sections to select
:param cache: caches the segments + probs so that next time the same section_names/distance_range values are
called it will return the same values without recalculating. default True.
:return: [int], [float]: A list of all section_ids and a list of all segment_x values (as defined by NEURON)
that meet the given critera.
"""
secs, probs = self.find_sections(section_names, distance_range, cache=cache)
secs_ix = self._prng.choice(secs, n_sections, p=probs)
return secs_ix, self.seg_props.x[secs_ix]
[docs] def find_sections(self, section_names, distance_range, cache=True):
"""Retrieves a list of sections ids and section x's given a section name/type (eg axon, soma, apic, dend) and
the distance from the soma.
:param section_names: A list of sections to target, 'soma', 'dend', 'apic', 'axon'
:param distance_range: [float, float]: distance range of sections from the soma, in um.
:param cache: caches the segments + probs so that next time the same section_names/distance_range values are
called it will return the same values without recalculating. default True.
:return: [float], [float]: A list of all section_ids and a list of all segment_x values (as defined by NEURON)
that meet the given critera.
"""
cache_key = (tuple(section_names), tuple(distance_range))
if cache and cache_key in self._trg_segs_cache:
return self._trg_segs_cache[cache_key]
dmin, dmax = distance_range[0], distance_range[1]
seg_d0 = self.seg_props.dist0
seg_d1 = self.seg_props.dist1
seg_length = self.seg_props.length
seg_type = self.seg_props.type
# Find the fractional overlap between the segment and the distance range:
# this is done by finding the overlap between [d0,d1] and [dmin,dmax]
# np.minimum(seg_d1,dmax) find the smaller of the two end locations
# np.maximum(seg_d0,dmin) find the larger of the two start locations
# np.maximum(0,overlap) is used to return zero when segments do not overlap
# and then dividing by the segment length
frac_overlap = np.maximum(0, (np.minimum(seg_d1, dmax) - np.maximum(seg_d0, dmin))) / seg_length
ix_drange = np.where(frac_overlap > 0) # find indexes with non-zero overlap
ix_labels = np.array([], dtype=int)
for tar_sec_label in section_names: # find indexes within sec_labels
sec_type = self.sec_type_swc[tar_sec_label] # get swc code for the section label
ix_label = np.where(seg_type == sec_type)
ix_labels = np.append(ix_labels, ix_label) # target segment indexes
tar_seg_ix = np.intersect1d(ix_drange, ix_labels) # find intersection between indexes for range and labels
tar_seg_length = seg_length[tar_seg_ix] * frac_overlap[tar_seg_ix] # weighted length of targeted segments
tar_seg_prob = tar_seg_length / np.sum(tar_seg_length) # probability of targeting segments
if cache:
self._trg_segs_cache[cache_key] = (tar_seg_ix, tar_seg_prob)
return tar_seg_ix, tar_seg_prob
[docs] def move_and_rotate(self, soma_coords=None, rotation_angles=None, inplace=False):
old_seg_coords = self.seg_coords
new_p0 = old_seg_coords.p0.copy()
new_p1 = old_seg_coords.p1.copy()
new_p05 = old_seg_coords.p05.copy()
new_soma_pos = self.soma_position.copy()
if rotation_angles is not None:
assert(len(rotation_angles) == 3)
# try to calc the euler rotation matrix around an arbitary point was causing problems, instead move coords
# so the the soma center (p05[0]) is at the origin
soma_pos_mat = new_soma_pos.reshape((3, 1))
new_p0 = new_p0 - soma_pos_mat
new_p1 = new_p1 - soma_pos_mat
new_p05 = new_p05 - soma_pos_mat
rotx_mat = rotation_matrix([1, 0, 0], rotation_angles[0])
roty_mat = rotation_matrix([0, 1, 0], rotation_angles[1])
rotz_mat = rotation_matrix([0, 0, 1], rotation_angles[2])
rotxyz_mat = np.dot(rotx_mat, roty_mat.dot(rotz_mat))
new_p0 = np.dot(rotxyz_mat, new_p0) + soma_pos_mat
new_p1 = np.dot(rotxyz_mat, new_p1) + soma_pos_mat
new_p05 = np.dot(rotxyz_mat, new_p05) + soma_pos_mat
if soma_coords is not None:
assert(len(soma_coords) == 3)
soma_coords = soma_coords if isinstance(soma_coords, np.ndarray) else np.array(soma_coords)
displacement = soma_coords - self.soma_position
displacement = displacement.reshape((3, 1)) # Req to allow adding vector to matrix
new_p0 += displacement
new_p1 += displacement
new_p05 += displacement
new_soma_pos = soma_coords
# new_seg_coords = SegmentCoords(p0=new_p0, p1=new_p1, p05=new_p05, soma_pos=new_soma_pos)
new_seg_coords = _LazySegmentCoords(self.hobj)
new_seg_coords.p0 = new_p0
new_seg_coords.p05 = new_p05
new_seg_coords.p1 = new_p1
new_seg_coords.soma_pos = new_soma_pos
new_seg_coords.d = old_seg_coords.d # diameter won't change in new position
if inplace:
self._seg_coords = new_seg_coords
return self
else:
new_morph = self._copy()
new_morph._seg_props = self.seg_props
new_morph._seg_coords = new_seg_coords
new_morph._soma_pos = new_soma_pos
return new_morph
[docs] def get_coords(self, sec_id, sec_x):
segs_indices = np.argwhere(self.seg_props.sec_id == sec_id).flatten()
return self.seg_coords.p05[:, segs_indices][:, 0]
[docs] def get_swc_id(self, sec_id, sec_x):
# use sec type and nameindex to find all rows in the swc that correspond to sec_id
sec = self.sections[sec_id]
sec_nameindex = self._get_sec_nameindex(sec)
sec_type = self._get_sec_type(sec)
filtered_swc = self.swc_map[(self.swc_map['type'] == sec_type) & (self.swc_map['nameindex'] == sec_nameindex)]
swc_ids = filtered_swc['id'].values
# use sec_x, a value between [0, 1], to estimate which swc_id/line to choose.
# NOTE: At the moment it assumes each line in the swc is the same distance apart, making estimating the sec_x
# location easy by the number it appears in the squence
if len(swc_ids) == 0:
return -1, 0.0
elif len(swc_ids) == 1:
return swc_ids[0], sec_x
else:
swc_place = np.max((0.0, sec_x*len(swc_ids) - 1.0))
swc_indx = int(np.ceil(swc_place))
swc_id = swc_ids[swc_indx]
swc_dist = swc_indx - swc_place
return swc_id, swc_dist
def _get_sec_type(self, sec):
sec_name = sec.hname()
sec_type_str = sec_name.split('.')[-1].split('[')[0]
type_str = self.sec_type_swc[sec_type_str]
return int(type_str)
def _get_sec_nameindex(self, sec):
sec_name = sec.hname()
sec_type_name = sec_name.split('.')[-1]
nameindex_str = re.search(r'\[(\d+)\]', sec_type_name).group(1)
return int(nameindex_str)
def __eq__(self, other):
if self.swc_path is None or other.swc_path is None:
return False
else:
return hash(self) == hash(other)
def __hash__(self):
"""Used for comparing two Morphology objects and for storing as dict keys in seg_props_cache.
We assume two Morphologies are the same if they have the same morphology file path + same # of segments. This
is not a good way of determining if two Morphologies are equal and in actuality we would need to compare
seg_props on an segment-by-segment basis. However that would be comp. expensive for large networks and instead
do a quick spot test."""
comp_vals = (self.swc_path if self.swc_path else np.random.randint, self.nseg)
return hash(comp_vals)
[docs] @classmethod
def load(cls, hobj=None, morphology_file=None, rng_seed=None, cache_seg_props=True):
"""Factory method, perfered way to create a Morphology object
:param hobj:
:param morphology_file:
:param rng_seed:
:param cache_seg_props:
:return:
"""
if hobj is None and morphology_file is None:
io.log_exception('Unable to create Morphology, no valid HOC object or morphology file specified')
elif morphology_file is not None and hobj is None:
# If the HOC object hasn't already been created try creating it from the morphology
hobj = h.Biophys1(morphology_file)
morph = cls(hobj=hobj, swc_path=morphology_file, rng_seed=rng_seed)
if cache_seg_props and morphology_file is not None:
if morph in morphology_cache:
morph._seg_props = morphology_cache[morph][0]
morph._seg_coords = morphology_cache[morph][1]
else:
morphology_cache[morph] = (morph._seg_props, morph._seg_coords)
return morph