# Source: bmtk.builder.network_adaptors.dm_network

# Copyright 2017. Allen Institute. All rights reserved
#
# Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
# following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
# disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
# products derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
import os
import numpy as np
import h5py
import logging

from .network import Network
from bmtk.builder.node import Node
from bmtk.builder.edge import Edge
from bmtk.utils import sonata

from .edges_collator import EdgesCollator
from .edge_props_table import EdgeTypesTable
from ..index_builders import create_index_in_memory, create_index_on_disk
from ..builder_utils import mpi_rank, mpi_size, barrier
from ..edges_sorter import sort_edges


logger = logging.getLogger(__name__)


class DenseNetwork(Network):
    def __init__(self, name, **network_props):
        super(DenseNetwork, self).__init__(name, **(network_props or {}))

        # self.__edges_types = {}
        # self.__src_mapping = {}
        # self.__networks = {}
        # self.__node_count = 0
        self._nodes = []
        self.__edges_tables = []
        self._target_networks = {}

    def _initialize(self):
        self.__id_map = []
        self.__lookup = []

    def _add_nodes(self, nodes):
        self._nodes.extend(nodes)
        self._nnodes = len(self._nodes)
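    # A minimal usage sketch (hedged): in practice DenseNetwork is normally reached through the public
    # NetworkBuilder alias rather than instantiated directly. The node/edge parameters below are
    # illustrative placeholders, not required names:
    #
    #     from bmtk.builder.networks import NetworkBuilder
    #
    #     net = NetworkBuilder('v1')
    #     net.add_nodes(N=10, model_type='biophysical')         # 10 nodes sharing a single node-type
    #     net.add_edges(source={'model_type': 'biophysical'},   # connect every source/target pair
    #                   target={'model_type': 'biophysical'},
    #                   connection_rule=5,                      # 5 synapses per connected pair
    #                   syn_weight=0.001)
    #     net.build()
    #     net.save(output_dir='network')  # writes SONATA nodes/edges files via the _save_* methods below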
    def edges_table(self):
        return self.__edges_tables
    def _save_nodes(self, nodes_file_name, compression='gzip'):
        if not self._nodes_built:
            self._build_nodes()

        if compression == 'none':
            compression = None  # legit option for h5py for no compression

        # save the node_types file
        group_indx = 0
        groups_lookup = {}
        group_indicies = {}
        group_props = {}
        for ns in self._node_sets:
            if ns.params_hash in groups_lookup:
                continue
            else:
                groups_lookup[ns.params_hash] = group_indx
                group_indicies[group_indx] = 0
                group_props[group_indx] = {k: [] for k in ns.params_keys if k != 'node_id'}
                group_indx += 1

        node_gid_table = np.zeros(self._nnodes)  # todo: set dtypes
        node_type_id_table = np.zeros(self._nnodes)
        node_group_table = np.zeros(self._nnodes)
        node_group_index_tables = np.zeros(self._nnodes)
        for i, node in enumerate(self.nodes()):
            node_gid_table[i] = node.node_id
            node_type_id_table[i] = node.node_type_id

            group_id = groups_lookup[node.params_hash]
            node_group_table[i] = group_id
            node_group_index_tables[i] = group_indicies[group_id]
            group_indicies[group_id] += 1

            group_dict = group_props[group_id]
            for key, prop_ds in group_dict.items():
                prop_ds.append(node.params[key])

        if mpi_rank == 0:
            with h5py.File(nodes_file_name, 'w') as hf:
                # Add magic and version attribute
                add_hdf5_attrs(hf)

                pop_grp = hf.create_group('/nodes/{}'.format(self.name))
                pop_grp.create_dataset('node_id', data=node_gid_table, dtype='uint64', compression=compression)
                pop_grp.create_dataset('node_type_id', data=node_type_id_table, dtype='uint64', compression=compression)
                pop_grp.create_dataset('node_group_id', data=node_group_table, dtype='uint32', compression=compression)
                pop_grp.create_dataset('node_group_index', data=node_group_index_tables, dtype='uint64',
                                       compression=compression)

                for grp_id, props in group_props.items():
                    model_grp = pop_grp.create_group('{}'.format(grp_id))

                    for key, dataset in props.items():
                        try:
                            model_grp.create_dataset(key, data=dataset, compression=compression)
                        except TypeError:
                            # h5py couldn't infer a dtype (eg. mixed/None values); fall back to storing as strings
                            str_list = [str(d) for d in dataset]
                            model_grp.create_dataset(key, data=str_list, compression=compression)
        barrier()
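    # For reference, _save_nodes() lays out a SONATA-style nodes file roughly as:
    #
    #   /nodes/<network_name>/node_id            (uint64)
    #   /nodes/<network_name>/node_type_id       (uint64)
    #   /nodes/<network_name>/node_group_id      (uint32)
    #   /nodes/<network_name>/node_group_index   (uint64)
    #   /nodes/<network_name>/<group_id>/<prop>  one dataset per model-group property
    #
    # which can be spot-checked with plain h5py ('v1_nodes.h5' and the population name are placeholders):
    #
    #     with h5py.File('v1_nodes.h5', 'r') as h5:
    #         print(h5['/nodes/v1/node_id'][()])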
    def nodes_iter(self, node_ids=None):
        if node_ids is not None:
            return [n for n in self._nodes if n.node_id in node_ids]
        else:
            return self._nodes
    def _process_nodepool(self, nodepool):
        return nodepool
    def import_nodes(self, nodes_file_name, node_types_file_name, population=None):
        sonata_file = sonata.File(data_files=nodes_file_name, data_type_files=node_types_file_name)
        if sonata_file.nodes is None:
            raise Exception('nodes file {} does not have any nodes.'.format(nodes_file_name))

        populations = sonata_file.nodes.populations
        if len(populations) == 1:
            node_pop = populations[0]
        elif population is None:
            raise Exception('The nodes file {} contains multiple populations. '.format(nodes_file_name) +
                            'Please specify the population parameter.')
        else:
            for pop in populations:
                if pop.name == population:
                    node_pop = pop
                    break
            else:
                raise Exception('Nodes file {} does not contain population {}.'.format(nodes_file_name, population))

        for node_type_props in node_pop.node_types_table:
            self._add_node_type(node_type_props)

        for node in node_pop:
            self._node_id_gen.remove_id(node.node_id)
            self._nodes.append(Node(node.node_id, node.group_props, node.node_type_properties))
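    # A sketch of importing pre-built SONATA nodes so that edges can be added onto an existing population
    # (the file names below are placeholders):
    #
    #     lgn = DenseNetwork('lgn')
    #     lgn.import_nodes(nodes_file_name='lgn_nodes.h5', node_types_file_name='lgn_node_types.csv')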
    def _add_edges(self, connection_map, i):
        """Converts a single ConnectionMap into an EdgeTypesTable of per-edge nsyns counts and properties.

        :param connection_map: ConnectionMap describing the source/target node populations, the connection rule, and
            any edge properties for one edge-type.
        :param i: index of the connection map within the builder (currently unused).
        """
        edge_type_id = connection_map.edge_type_properties['edge_type_id']
        logger.debug('Generating edges data for edge_types_id {}.'.format(edge_type_id))

        edges_table = EdgeTypesTable(connection_map, network_name=self.name)
        connections = connection_map.connection_itr()

        # Iterate through all possible SxT source/target pairs and use the user-defined function/list/value to update
        # the number of syns between each pair. TODO: See if this can be vectorized easily.
        for conn in connections:
            if conn[2]:
                edges_table.set_nsyns(source_id=conn[0], target_id=conn[1], nsyns=conn[2])

        target_net = connection_map.target_nodes
        self._target_networks[target_net.network_name] = target_net.network

        # For when the user specifies individual edge properties to be put in the hdf5 (syn_weight, syn_location,
        # etc.), get each prop value and add it to the edge-types table. We need to fetch and store SxTxN values
        # (where N is the avg number of nsyns between each source/target pair), so the nsyns table must be built
        # first.
        for param in connection_map.params:
            rule = param.rule
            rets_multiple_vals = isinstance(param.names, (list, tuple, np.ndarray))

            if not rets_multiple_vals:
                prop_name = param.names  # name of property
                prop_type = param.dtypes.get(prop_name, None)
                edges_table.create_property(prop_name=param.names, prop_type=prop_type)  # initialize property array

                for source_node, target_node, edge_index in edges_table.iter_edges():
                    # calls the connection-map rule and saves the value to the edge table
                    pval = rule(source_node, target_node)
                    edges_table.set_property_value(prop_name=prop_name, edge_index=edge_index, prop_value=pval)

            else:
                # Same as the loop above, but some connection-map 'rules' return multiple properties for each edge.
                pnames = param.names
                ptypes = [param.dtypes[pn] for pn in pnames]
                for prop_name, prop_type in zip(pnames, ptypes):
                    edges_table.create_property(prop_name=prop_name, prop_type=prop_type)  # initialize property arrays

                for source_node, target_node, edge_index in edges_table.iter_edges():
                    pvals = rule(source_node, target_node)
                    for pname, pval in zip(pnames, pvals):
                        edges_table.set_property_value(prop_name=pname, edge_index=edge_index, prop_value=pval)

        logger.debug('Edge-types {} data built with {} connections ({} synapses).'.format(
            edge_type_id, edges_table.n_edges, edges_table.n_syns)
        )

        edges_table.save()

        # Count synapses rather than stored edges: the number of synaptic/gap connections between all source/target
        # pairs can exceed the number of edges actually stored (for efficiency) and is the better user-facing number.
        self._nedges += edges_table.n_syns  # edges_table.n_edges
        self.__edges_tables.append(edges_table)

    def _get_edge_group_id(self, params_hash):
        return int(params_hash)

    def _save_gap_junctions(self, gj_file_name, compression='gzip'):
        source_ids = []
        target_ids = []
        src_gap_ids = []
        trg_gap_ids = []

        if compression == 'none':
            compression = None  # legit option for h5py for no compression

        for et in self.__edges_tables:
            try:
                is_gap = et['edge_types']['is_gap_junction']
            except Exception:
                # this edge-type has no 'is_gap_junction' property
                continue
            if is_gap:
                if et['source_network'] != et['target_network']:
                    raise Exception('All gap junctions must connect two cells in the same network builder.')

                table = et['syn_table']
                junc_table = table.nsyn_table
                locs = np.where(junc_table > 0)
                for i in range(len(locs[0])):
                    source_ids.append(table.source_ids[locs[0][i]])
                    target_ids.append(table.target_ids[locs[1][i]])
                    src_gap_ids.append(self._gj_id_gen.next())
                    trg_gap_ids.append(self._gj_id_gen.next())
            else:
                continue

        if len(source_ids) > 0:
            with h5py.File(gj_file_name, 'w') as f:
                add_hdf5_attrs(f)
                f.create_dataset('source_ids', data=np.array(source_ids), compression=compression)
                f.create_dataset('target_ids', data=np.array(target_ids), compression=compression)
                f.create_dataset('src_gap_ids', data=np.array(src_gap_ids), compression=compression)
                f.create_dataset('trg_gap_ids', data=np.array(trg_gap_ids), compression=compression)

    def _save_edges(self, edges_file_name, src_network, trg_network, pop_name=None, sort_by='target_node_id',
                    index_by=('target_node_id', 'source_node_id'), compression='gzip'):
        barrier()

        if compression == 'none':
            compression = None  # legit option for h5py for no compression

        if mpi_rank == 0:
            logger.debug('Saving {} --> {} edges to {}.'.format(src_network, trg_network, edges_file_name))

        filtered_edge_types = [
            # Some edges may not match the source/target population
            et for et in self.__edges_tables
            if et.source_network == src_network and et.target_network == trg_network
        ]
        merged_edges = EdgesCollator(filtered_edge_types, network_name=self.name)
        merged_edges.process()
        n_total_conns = merged_edges.n_total_edges
        barrier()

        if n_total_conns == 0:
            if mpi_rank == 0:
                logger.warning('Was not able to generate any edges using the "connection_rule". Not saving.')
            return
        # Try to sort before writing the file. If edges are split across ranks/files (due to MPI or memory limits),
        # we instead write to disk first and then sort the resulting hdf5 file.
        sort_on_disk = False
        edges_file_name_final = edges_file_name
        if sort_by:
            if merged_edges.can_sort:
                merged_edges.sort(sort_by=sort_by)
            else:
                sort_on_disk = True
                edges_file_name_final = edges_file_name

                edges_file_basename = os.path.basename(edges_file_name)
                edges_file_dirname = os.path.dirname(edges_file_name)
                edges_file_name = os.path.join(edges_file_dirname, '.unsorted.{}'.format(edges_file_basename))
                if mpi_rank == 0:
                    logger.debug('Unable to sort edges in memory, will temporarily save to {}'.format(edges_file_name) +
                                 ' before sorting the hdf5 file.')
        barrier()

        if mpi_rank == 0:
            logger.debug('Saving {} edges to disk'.format(n_total_conns))
            pop_name = '{}_to_{}'.format(src_network, trg_network) if pop_name is None else pop_name
            with h5py.File(edges_file_name, 'w') as hf:
                # Initialize the hdf5 groups and datasets
                add_hdf5_attrs(hf)
                pop_grp = hf.create_group('/edges/{}'.format(pop_name))

                pop_grp.create_dataset('source_node_id', (n_total_conns,), dtype='uint64', compression=compression)
                pop_grp['source_node_id'].attrs['node_population'] = src_network
                pop_grp.create_dataset('target_node_id', (n_total_conns,), dtype='uint64', compression=compression)
                pop_grp['target_node_id'].attrs['node_population'] = trg_network
                pop_grp.create_dataset('edge_group_id', (n_total_conns,), dtype='uint16', compression=compression)
                pop_grp.create_dataset('edge_group_index', (n_total_conns,), dtype='uint32', compression=compression)
                pop_grp.create_dataset('edge_type_id', (n_total_conns,), dtype='uint32', compression=compression)

                for group_id in merged_edges.group_ids:
                    # Different model-groups will have different datasets/properties depending on what edge
                    # information is being saved for each edge.
                    model_grp = pop_grp.create_group(str(group_id))
                    for prop_mdata in merged_edges.get_group_metadata(group_id):
                        model_grp.create_dataset(prop_mdata['name'], shape=prop_mdata['dim'],
                                                 dtype=prop_mdata['type'], compression=compression)

                # Use the collated edges (eg. edges combined across all edge-types) to actually write the data to
                # hdf5, potentially in multiple chunks. For small networks doing it this way isn't very efficient,
                # however it has two benefits:
                #  * For very large networks it won't always be possible to store all the data in memory.
                #  * When using MPI/multi-node, the chunks can represent data from different ranks.
                for chunk_id, idx_beg, idx_end in merged_edges.itr_chunks():
                    pop_grp['source_node_id'][idx_beg:idx_end] = merged_edges.get_source_node_ids(chunk_id)
                    pop_grp['target_node_id'][idx_beg:idx_end] = merged_edges.get_target_node_ids(chunk_id)
                    pop_grp['edge_type_id'][idx_beg:idx_end] = merged_edges.get_edge_type_ids(chunk_id)
                    pop_grp['edge_group_id'][idx_beg:idx_end] = merged_edges.get_edge_group_ids(chunk_id)
                    pop_grp['edge_group_index'][idx_beg:idx_end] = merged_edges.get_edge_group_indices(chunk_id)

                    for group_id, prop_name, grp_idx_beg, grp_idx_end in merged_edges.get_group_data(chunk_id):
                        prop_array = merged_edges.get_group_property(prop_name, group_id, chunk_id)
                        pop_grp[str(group_id)][prop_name][grp_idx_beg:grp_idx_end] = prop_array

            if sort_on_disk:
                logger.debug('Sorting {} by {} to {}'.format(edges_file_name, sort_by, edges_file_name_final))
                sort_edges(
                    input_edges_path=edges_file_name,
                    output_edges_path=edges_file_name_final,
                    edges_population='/edges/{}'.format(pop_name),
                    sort_by=sort_by,
                    compression=compression,
                    # sort_on_disk=True,
                )
                try:
                    logger.debug('Deleting intermediate edges file {}.'.format(edges_file_name))
                    os.remove(edges_file_name)
                except OSError:
                    logger.warning('Unable to remove intermediate edges file {}.'.format(edges_file_name))

            if index_by:
                index_by = index_by if isinstance(index_by, (list, tuple)) else [index_by]
                for index_type in index_by:
                    logger.debug('Creating index {}'.format(index_type))
                    create_index_in_memory(
                        edges_file=edges_file_name_final,
                        edges_population='/edges/{}'.format(pop_name),
                        index_type=index_type,
                        compression=compression
                    )

        barrier()
        del merged_edges

        if mpi_rank == 0:
            logger.debug('Saving completed.')

    def _clear(self):
        self._nedges = 0
        self._nnodes = 0
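    # For reference, _save_edges() lays out a SONATA-style edges file roughly as:
    #
    #   /edges/<pop_name>/source_node_id     (uint64; attrs['node_population'] = source network)
    #   /edges/<pop_name>/target_node_id     (uint64; attrs['node_population'] = target network)
    #   /edges/<pop_name>/edge_type_id       (uint32)
    #   /edges/<pop_name>/edge_group_id      (uint16)
    #   /edges/<pop_name>/edge_group_index   (uint32)
    #   /edges/<pop_name>/<group_id>/<prop>  per-edge properties, eg. syn_weight
    #
    # plus any lookup indices added by create_index_in_memory(). <pop_name> defaults to
    # '<src_network>_to_<trg_network>'.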
    def edges_iter(self, trg_gids, src_network=None, trg_network=None):
        matching_edge_tables = self.__edges_tables
        if trg_network is not None:
            matching_edge_tables = [et for et in self.__edges_tables if et.target_network == trg_network]

        if src_network is not None:
            matching_edge_tables = [et for et in matching_edge_tables if et.source_network == src_network]

        for edge_type_table in matching_edge_tables:
            et_df = edge_type_table.to_dataframe()
            et_df = et_df[et_df['target_node_id'].isin(trg_gids)]
            if len(et_df) == 0:
                continue

            edge_type_props = edge_type_table.edge_type_properties
            for row in et_df.to_dict(orient='records'):
                yield Edge(
                    src_gid=row['source_node_id'],
                    trg_gid=row['target_node_id'],
                    edge_type_props=edge_type_props,
                    syn_props=row
                )
    @property
    def nnodes(self):
        if not self.nodes_built:
            return 0
        return self._nnodes

    @property
    def nedges(self):
        return self._nedges
def add_hdf5_attrs(hdf5_handle):
    # TODO: move this as a utility function
    hdf5_handle['/'].attrs['magic'] = np.uint32(0x0A7A)
    hdf5_handle['/'].attrs['version'] = [np.uint32(0), np.uint32(1)]
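# A quick sanity check of the attributes written by add_hdf5_attrs() (the file name is a placeholder):
#
#     with h5py.File('v1_nodes.h5', 'r') as h5:
#         assert h5['/'].attrs['magic'] == 0x0A7A
#         print(h5['/'].attrs['version'])  # -> [0 1]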