Source code for bmtk.utils.reports.spike_trains.spikes_file_writers

# Copyright 2020. Allen Institute. All rights reserved
#
# Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
# following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
# disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
# products derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
import os
import csv
import h5py
import numpy as np
from datetime import datetime

import bmtk
from .core import SortOrder, csv_headers, col_population, find_conversion
from .core import MPI_rank, comm_barrier
from bmtk.utils.sonata.utils import add_hdf5_magic, add_hdf5_version


def write_sonata(path, spiketrain_reader, mode='w', sort_order=SortOrder.none, units='ms',
                 population_renames=None, compression='gzip', **kwargs):
    # make sure to take care of 'none' or 'None' compression
    if isinstance(compression, str):
        if compression.lower() == 'none':
            compression = None

    path_dir = os.path.dirname(path)
    if MPI_rank == 0 and path_dir and not os.path.exists(path_dir):
        os.makedirs(path_dir)

    spiketrain_reader.flush()
    comm_barrier()

    populations = spiketrain_reader.populations

    spikes_root = None
    if MPI_rank == 0:
        h5 = h5py.File(path, mode=mode)
        add_hdf5_magic(h5)
        add_hdf5_version(h5)
        spikes_root = h5.create_group('/spikes') if '/spikes' not in h5 else h5['/spikes']

    for pop_name in populations:
        if MPI_rank == 0 and pop_name in spikes_root:
            # Problem if file already contains /spikes/<pop_name>
            # TODO: append new data to old spikes?!?
            raise ValueError('sonata file {} already contains a spikes group {}, '.format(path, pop_name) +
                             'skipping (use option mode="w" to overwrite)')

        pop_df = spiketrain_reader.to_dataframe(populations=pop_name, with_population_col=False,
                                                sort_order=sort_order, on_rank='root')
        if MPI_rank == 0:
            spikes_pop_grp = spikes_root.create_group(pop_name)
            if sort_order != SortOrder.unknown:
                spikes_pop_grp.attrs['sorting'] = sort_order.value

            spikes_pop_grp.create_dataset('timestamps', data=pop_df['timestamps'], compression=compression)
            spikes_pop_grp['timestamps'].attrs['units'] = spiketrain_reader.units()
            spikes_pop_grp.create_dataset('node_ids', data=pop_df['node_ids'], compression=compression)

    comm_barrier()
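
# A minimal usage sketch (illustrative, not part of this module): build an in-memory
# SpikeTrains buffer and save it as a SONATA spikes file. The population name, node
# ids, and timestamps below are invented, and the exact SpikeTrains constructor
# arguments may differ between bmtk versions.
#
#     from bmtk.utils.reports.spike_trains import SpikeTrains
#     from bmtk.utils.reports.spike_trains.core import SortOrder
#
#     spikes = SpikeTrains()
#     spikes.add_spikes(node_ids=0, timestamps=[10.0, 15.5, 20.1], population='v1')
#     write_sonata('output/spikes.h5', spikes, sort_order=SortOrder.by_time)
#
# The resulting hdf5 file should contain /spikes/v1/timestamps and /spikes/v1/node_ids
# datasets, with the group's 'sorting' attribute set to 'by_time'.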
def write_sonata_itr(path, spiketrain_reader, mode='w', sort_order=SortOrder.none, units='ms',
                     population_renames=None, compression='gzip', **kwargs):
    path_dir = os.path.dirname(path)
    if MPI_rank == 0 and path_dir and not os.path.exists(path_dir):
        os.makedirs(path_dir)

    spiketrain_reader.flush()
    comm_barrier()

    conv_factor = find_conversion(spiketrain_reader.units, units)
    if MPI_rank == 0:
        h5 = h5py.File(path, mode=mode)
        add_hdf5_magic(h5)
        add_hdf5_version(h5)
        spikes_root = h5.create_group('/spikes') if '/spikes' not in h5 else h5['/spikes']

    population_renames = population_renames or {}
    for pop_name in spiketrain_reader.populations:
        n_spikes = spiketrain_reader.n_spikes(pop_name)
        if n_spikes <= 0:
            continue

        if MPI_rank == 0:
            spikes_grp = spikes_root.create_group('{}'.format(population_renames.get(pop_name, pop_name)))
            if sort_order != SortOrder.unknown:
                spikes_grp.attrs['sorting'] = sort_order.value

            timestamps_ds = spikes_grp.create_dataset('timestamps', shape=(n_spikes,), dtype=np.float64,
                                                      compression=compression)
            timestamps_ds.attrs['units'] = units
            node_ids_ds = spikes_grp.create_dataset('node_ids', shape=(n_spikes,), dtype=np.uint64,
                                                    compression=compression)

        for i, spk in enumerate(spiketrain_reader.spikes(populations=pop_name, sort_order=sort_order)):
            if MPI_rank == 0:
                timestamps_ds[i] = spk[0]*conv_factor
                node_ids_ds[i] = spk[2]

    comm_barrier()
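
# write_sonata_itr produces the same /spikes/<population>/{timestamps, node_ids}
# layout as write_sonata, but pre-allocates fixed-size datasets and fills them one
# spike at a time, so the full spike table never has to be materialized in memory
# as a dataframe. A hypothetical call, reusing the buffer from the sketch above:
#
#     write_sonata_itr('output/spikes_itr.h5', spikes, sort_order=SortOrder.by_time, units='ms')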
def write_csv(path, spiketrain_reader, mode='w', sort_order=SortOrder.none, include_header=True,
              include_population=True, units='ms', **kwargs):
    path_dir = os.path.dirname(path)
    if MPI_rank == 0 and path_dir and not os.path.exists(path_dir):
        os.makedirs(path_dir)

    df = spiketrain_reader.to_dataframe(sort_order=sort_order, on_rank='root')
    if MPI_rank == 0:
        df[['timestamps', 'population', 'node_ids']].to_csv(path, header=include_header, index=False, sep=' ')

    comm_barrier()
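
# A hypothetical sketch of the CSV output: a space-separated table with columns
# timestamps, population, node_ids (header row included by default). The file
# contents below are invented:
#
#     write_csv('output/spikes.csv', spikes, sort_order=SortOrder.by_time)
#
#     # output/spikes.csv:
#     # timestamps population node_ids
#     # 10.0 v1 0
#     # 15.5 v1 0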
def write_csv_itr(path, spiketrain_reader, mode='w', sort_order=SortOrder.none, include_header=True,
                  include_population=True, units='ms', **kwargs):
    path_dir = os.path.dirname(path)
    if MPI_rank == 0 and path_dir and not os.path.exists(path_dir):
        os.makedirs(path_dir)

    conv_factor = find_conversion(spiketrain_reader.units, units)
    cols_to_print = csv_headers if include_population else [c for c in csv_headers if c != col_population]
    if MPI_rank == 0:
        f = open(path, mode=mode)
        csv_writer = csv.writer(f, delimiter=' ')
        if include_header:
            csv_writer.writerow(cols_to_print)

    for spk in spiketrain_reader.spikes(sort_order=sort_order):
        if MPI_rank == 0:
            ts = spk[0]*conv_factor
            c_data = [ts, spk[1], spk[2]] if include_population else [ts, spk[2]]
            csv_writer.writerow(c_data)

    comm_barrier()
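
# Like write_csv, but streams one spike row at a time through spiketrain_reader.spikes()
# instead of building a full dataframe first, which keeps memory usage flat for large
# simulations. A hypothetical call that drops the population column:
#
#     write_csv_itr('output/spikes_itr.csv', spikes, include_population=False)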
def write_nwb(path, spiketrain_reader, mode='w', include_population=True, units='ms', **kwargs):
    import pynwb

    path_dir = os.path.dirname(path)
    if MPI_rank == 0 and path_dir and not os.path.exists(path_dir):
        os.makedirs(path_dir)

    if MPI_rank == 0:
        # Last checked, pynwb doesn't support writing on multiple cores; the first core must do all the
        # writing to NWB.
        nwbfile = pynwb.NWBFile(
            session_description='BMTK {} generated NWB spikes file'.format(bmtk.__version__),
            identifier='Generated in-silico, no session id',  # TODO: No idea what to put here?
            session_start_time=datetime.now().astimezone(),
            # experiment_description=str(session.experiment_metadata['experiment_id'])
        )

        if include_population:
            nwbfile.add_unit_column(name="population", description="node population identifier")
            add_unit = lambda nid, pop, st: nwbfile.add_unit(id=nid, spike_times=st, population=pop, node_id=nid)
        else:
            add_unit = lambda nid, pop, st: nwbfile.add_unit(id=nid, spike_times=st, node_id=nid)
        nwbfile.add_unit_column(name="node_id", description="id of each node within a population")

        for population in spiketrain_reader.populations:
            for node_id in spiketrain_reader.node_ids(population=population):
                spikes_times = spiketrain_reader.get_times(node_id=node_id, population=population)
                if spikes_times is None or len(spikes_times) == 0:
                    # No spikes for given node, don't try to write to nwb
                    continue

                # Sometimes bmtk/sonata may default to 32-bit or unsigned data-types, which will cause
                # nwb to throw a fit. Need to explicitly convert data-types just in case.
                spikes_times = spikes_times.astype('float64')
                node_id = int(node_id)
                add_unit(node_id, population, spikes_times)

        with pynwb.NWBHDF5IO(path, mode) as io:
            io.write(nwbfile)

    comm_barrier()
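
# A hypothetical sketch: write the same buffer to NWB (requires pynwb to be installed)
# and read the spike times back as a units table. The file name is made up:
#
#     write_nwb('output/spikes.nwb', spikes)
#
#     import pynwb
#     with pynwb.NWBHDF5IO('output/spikes.nwb', 'r') as io:
#         units_df = io.read().units.to_dataframe()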