# Copyright 2017. Allen Institute. All rights reserved
#
# Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
# following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
# disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
# products derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
import os
import json
import ast
import numpy as np
# from bmtk.simulator.core.config import ConfigDict
from bmtk.simulator.core.simulation_config import SimulationConfig as ConfigDict
### from bmtk.simulator.utils.property_maps import NodePropertyMap, EdgePropertyMap
from bmtk.utils import sonata
from bmtk.simulator.core.io_tools import io
from bmtk.simulator.core.node_sets import NodeSet, NodeSetAll
"""
Creates a graph of nodes and edges from multiple network files for all simulators.
Consists of edges and nodes. All classes are abstract and should be reimplemented by a specific simulator. Also
contains base factor methods for building a network from a config file (or other).
"""
[docs]class SimGraph(object):
model_type_col = 'model_type'
def __init__(self):
self._components = {} # components table, i.e. paths to model files.
self._io = io
self._node_populations = {}
self._internal_populations_map = {}
self._virtual_populations_map = {}
self._virtual_cells_nid = {}
self._recurrent_edges = {}
self._external_edges = {}
self._node_sets = {}
self._using_gids = False
@property
def io(self):
return self._io
@property
def node_populations(self):
return list(self._node_populations.keys())
[docs] def get_node_set(self, node_set):
if node_set in self._node_sets.keys():
return self._node_sets[node_set]
elif isinstance(node_set, (dict, list)):
return NodeSet(node_set, self)
else:
self.io.log_exception('Unable to load or find node_set "{}"'.format(node_set))
[docs] def get_node_populations(self):
return self._node_populations.values()
[docs] def get_node_population(self, population_name):
return self._node_populations[population_name]
[docs] def get_component(self, key):
"""Get the value of item in the components dictionary.
:param key: name of component
:return: value assigned to component
"""
return self._components[key]
[docs] def add_component(self, key, value):
"""Add a component key-value pair
:param key: name of component
:param value: value
"""
self._components[key] = value
def _validate_components(self):
"""Make sure various components (i.e. paths) exists before attempting to build the graph."""
return True
def __avail_model_types(self, population):
model_types = set()
for grp in population.groups:
if self.model_type_col not in grp.all_columns:
self.io.log_exception('model_type is missing from nodes.')
model_types.update(set(np.unique(grp.get_values(self.model_type_col))))
return model_types
def _preprocess_node_types(self, node_population):
# TODO: The following figures out the actually used node-type-ids. For mem and speed may be better to just
# process them all
node_type_ids = node_population.type_ids
# TODO: Verify all the node_type_ids are in the table
node_types_table = node_population.types_table
# TODO: Convert model_type to a enum
morph_dir = self.get_component('morphologies_dir')
if morph_dir is not None and 'morphology' in node_types_table.columns:
for nt_id in node_type_ids:
node_type = node_types_table[nt_id]
if node_type['morphology'] is None:
continue
# TODO: Check the file exits
# TODO: See if absolute path is stored in csv
node_type['morphology'] = os.path.join(morph_dir, node_type['morphology'])
if 'dynamics_params' in node_types_table.columns and 'model_type' in node_types_table.columns:
for nt_id in node_type_ids:
node_type = node_types_table[nt_id]
dynamics_params = node_type['dynamics_params']
if isinstance(dynamics_params, dict):
continue
model_type = node_type['model_type']
if model_type == 'biophysical':
params_dir = self.get_component('biophysical_neuron_models_dir')
elif model_type == 'point_process':
params_dir = self.get_component('point_neuron_models_dir')
elif model_type == 'point_soma':
params_dir = self.get_component('point_neuron_models_dir')
else:
# Not sure what to do in this case, throw Exception?
params_dir = self.get_component('custom_neuron_models')
params_path = os.path.join(params_dir, dynamics_params)
# see if we can load the dynamics_params as a dictionary. Otherwise just save the file path and let the
# cell_model loader function handle the extension.
try:
params_val = json.load(open(params_path, 'r'))
node_type['dynamics_params'] = params_val
except Exception:
# TODO: Check dynamics_params before
self.io.log_exception('Could not find node dynamics_params file {}.'.format(params_path))
def _preprocess_edge_types(self, edge_pop):
edge_types_table = edge_pop.types_table
edge_type_ids = np.unique(edge_pop.type_ids)
for et_id in edge_type_ids:
edge_type = edge_types_table[et_id]
if 'dynamics_params' in edge_types_table.columns:
dynamics_params = edge_type['dynamics_params']
params_dir = self.get_component('synaptic_models_dir')
params_path = os.path.join(params_dir, dynamics_params)
# see if we can load the dynamics_params as a dictionary. Otherwise just save the file path and let the
# cell_model loader function handle the extension.
try:
params_val = json.load(open(params_path, 'r'))
edge_type['dynamics_params'] = params_val
except Exception:
# TODO: Check dynamics_params before
self.io.log_exception('Could not find edge dynamics_params file {}.'.format(params_path))
# Split target_sections
if 'target_sections' in edge_type:
trg_sec = edge_type['target_sections']
if trg_sec is not None:
try:
edge_type['target_sections'] = ast.literal_eval(trg_sec)
except Exception as exc:
self.io.log_warning('Unable to split target_sections list {}'.format(trg_sec))
edge_type['target_sections'] = None
# Split target distances
if 'distance_range' in edge_type:
dist_range = edge_type['distance_range']
if dist_range is not None:
try:
# TODO: Make the distance range has at most two values
edge_type['distance_range'] = json.loads(dist_range)
except Exception as e:
try:
edge_type['distance_range'] = [0.0, float(dist_range)]
except Exception as e:
self.io.log_warning('Unable to parse distance_range {}'.format(dist_range))
edge_type['distance_range'] = None
[docs] def external_edge_populations(self, src_pop, trg_pop):
return self._external_edges.get((src_pop, trg_pop), [])
[docs] def add_nodes(self, sonata_file, populations=None):
"""Add nodes from a network to the graph.
:param sonata_file: A NodesFormat type object containing list of nodes.
:param populations: name/identifier of network. If none will attempt to retrieve from nodes object
"""
nodes = sonata_file.nodes
selected_populations = nodes.population_names if populations is None else populations
for pop_name in selected_populations:
if pop_name not in nodes:
# when user wants to simulation only a few populations in the file
continue
if pop_name in self.node_populations:
# Make sure their aren't any collisions
self.io.log_exception('There are multiple node populations with name {}.'.format(pop_name))
node_pop = nodes[pop_name]
self._preprocess_node_types(node_pop)
self._node_populations[pop_name] = node_pop
# Segregate into virtual populations and non-virtual populations
model_types = self.__avail_model_types(node_pop)
if 'virtual' in model_types:
self._virtual_populations_map[pop_name] = node_pop
self._virtual_cells_nid[pop_name] = {}
model_types -= set(['virtual'])
if model_types:
# We'll allow a population to have virtual and non-virtual nodes but it is not ideal
self.io.log_warning('Node population {} contains both virtual and non-virtual nodes which can '.format(pop_name) +
'cause memory and build-time inefficiency. Consider separating virtual nodes ' +
'into their own population')
if model_types:
self._internal_populations_map[pop_name] = node_pop
self._node_sets[pop_name] = NodeSet({'population': pop_name}, self)
[docs] def build_nodes(self):
raise NotImplementedError
[docs] def build_recurrent_edges(self):
raise NotImplementedError
[docs] def add_edges(self, sonata_file, populations=None, source_pop=None, target_pop=None):
"""
:param sonata_file:
:param populations:
:param source_pop:
:param target_pop:
:return:
"""
edges = sonata_file.edges
selected_populations = edges.population_names if populations is None else populations
for pop_name in selected_populations:
if pop_name not in edges:
continue
edge_pop = edges[pop_name]
self._preprocess_edge_types(edge_pop)
# Check the source nodes exists
src_pop = source_pop if source_pop is not None else edge_pop.source_population
is_internal_src = src_pop in self._internal_populations_map.keys()
is_external_src = src_pop in self._virtual_populations_map.keys()
trg_pop = target_pop if target_pop is not None else edge_pop.target_population
is_internal_trg = trg_pop in self._internal_populations_map.keys()
if not is_internal_trg:
self.io.log_exception(('Node population {} does not exists (or consists of only virtual nodes). ' +
'{} edges cannot create connections.').format(trg_pop, pop_name))
if not (is_internal_src or is_external_src):
self.io.log_exception('Source node population {} not found. Please update {} edges'.format(src_pop,
pop_name))
if is_internal_src:
if trg_pop not in self._recurrent_edges:
self._recurrent_edges[trg_pop] = []
self._recurrent_edges[trg_pop].append(edge_pop)
if is_external_src:
if trg_pop not in self._external_edges:
self._external_edges[(src_pop, trg_pop)] = []
self._external_edges[(src_pop, trg_pop)].append(edge_pop)
[docs] @classmethod
def from_config(cls, conf, **properties):
"""Generates a graph structure from a json config file or dictionary.
:param conf: name of json config file, or a dictionary with config parameters
:param properties: optional properties.
:return: A graph object of type cls
"""
graph = cls(**properties)
# The simulation run script should create a config-dict since it's likely to vary based on the simulator engine,
# however in the case the user doesn't we will try a generic conversion from dict/json to ConfigDict
if isinstance(conf, ConfigDict):
config = conf
else:
try:
config = ConfigDict.load(conf)
except Exception as e:
graph.io.log_exception('Could not convert {} (type "{}") to json.'.format(conf, type(conf)))
if not config.with_networks:
graph.io.log_exception('Could not find any network files. Unable to build network.')
# TODO: These are simulator specific
graph.spike_threshold = config.spike_threshold
graph.dL = config.dL
# load components
for name, value in config.components.items():
graph.add_component(name, value)
graph._validate_components()
# load nodes
gid_map = config.gid_mappings
for node_dict in config.nodes:
nodes_net = sonata.File(data_files=node_dict['nodes_file'], data_type_files=node_dict['node_types_file'],
gid_table=gid_map)
graph.add_nodes(nodes_net)
# load edges
for edge_dict in config.edges:
target_network = edge_dict['target'] if 'target' in edge_dict else None
source_network = edge_dict['source'] if 'source' in edge_dict else None
edge_net = sonata.File(data_files=edge_dict['edges_file'], data_type_files=edge_dict['edge_types_file'])
graph.add_edges(edge_net, source_pop=target_network, target_pop=source_network)
graph._node_sets['all'] = NodeSetAll(graph)
for ns_name, ns_filter in conf.node_sets.items():
graph._node_sets[ns_name] = NodeSet(ns_filter, graph)
return graph