Source code for bmtk.simulator.filternet.modules.create_spikes

import os
import numpy as np
import random
import six

from .base import SimModule
from bmtk.utils.reports.spike_trains import SpikeTrains, pop_na, sort_order, sort_order_lu
from bmtk.simulator.filternet.lgnmodel import poissongeneration as pg
from bmtk.utils.io.ioutils import bmtk_world_comm


class SpikesGenerator(SimModule):
    def __init__(self, spikes_file_csv=None, spikes_file=None, spikes_file_nwb=None, tmp_dir='output',
                 sort_order='node_id', compression='gzip'):
        def _get_file_path(file_name):
            if file_name is None or os.path.isabs(file_name):
                return file_name
            else:
                rel_tmp = os.path.realpath(tmp_dir)
                rel_fname = os.path.realpath(file_name)
                if not rel_fname.startswith(rel_tmp):
                    return os.path.join(tmp_dir, file_name)
                else:
                    return file_name

        self._csv_fname = _get_file_path(spikes_file_csv)
        self._save_csv = spikes_file_csv is not None

        self._h5_fname = _get_file_path(spikes_file)
        self._save_h5 = spikes_file is not None
        self._compression = compression

        self._nwb_fname = _get_file_path(spikes_file_nwb)
        self._save_nwb = spikes_file_nwb is not None

        self._tmpdir = tmp_dir
        # self._spike_writer = SpikeTrainWriter(tmp_dir=tmp_dir)
        self._spike_writer = SpikeTrains(cache_dir=tmp_dir)

        self._sort_order = sort_order_lu[sort_order]
    def save(self, sim, cell, times, rates):
        try:
            spike_trains = np.array(f_rate_to_spike_train(times*1000.0, rates, np.random.randint(10000),
                                                          1000.*min(times), 1000.*max(times), 0.1))
        except:
            # convert to milliseconds and hence the multiplication by 1000
            spike_trains = 1000.0*np.array(pg.generate_inhomogenous_poisson(times, rates,
                                                                            seed=np.random.randint(10000)))

        self._spike_writer.add_spikes(node_ids=cell.gid, timestamps=spike_trains, population=cell.population)
    def finalize(self, sim):
        self._spike_writer.flush()

        if self._save_csv:
            self._spike_writer.to_csv(self._csv_fname, sort_order=self._sort_order)

        if self._save_h5:
            self._spike_writer.to_sonata(self._h5_fname, sort_order=self._sort_order, compression=self._compression)

        if self._save_nwb:
            self._spike_writer.to_nwb(self._nwb_fname, sort_order=self._sort_order)

        self._spike_writer.close()
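
# Illustrative sketch (not part of the bmtk source): a minimal way to drive SpikesGenerator
# directly, assuming a stand-in cell object exposing the `gid` and `population` attributes that
# save() reads. In a real FilterNet run the simulator constructs and passes the `sim` and `cell`
# objects; the names and values below are hypothetical.
#
#     MockCell = namedtuple('MockCell', ['gid', 'population'])   # from collections import namedtuple
#     times = np.linspace(0.0, 1.0, 101)                         # seconds; save() converts to ms
#     rates = np.full(101, 15.0)                                  # firing rate in Hz at each time stamp
#
#     gen = SpikesGenerator(spikes_file_csv='spikes.csv', tmp_dir='output')
#     gen.save(sim=None, cell=MockCell(gid=0, population='lgn'), times=times, rates=rates)
#     gen.finalize(sim=None)                                      # writes and closes spikes.csv
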
def f_rate_to_spike_train(t, f_rate, random_seed, t_window_start, t_window_end, p_spike_max):
    # t and f_rate are lists containing time stamps and corresponding firing rate values;
    # they are assumed to be of the same length and ordered with the time strictly increasing;
    # p_spike_max is the maximal probability of spiking that we allow within a time bin; it is used
    # to decide on the size of the time bin; should be less than 1!

    # if np.max(f_rate) * np.max(np.diff(t))/1000. > 0.1:  # Divide by 1000 to convert to seconds.
    #     print('Firing rate too high for time interval and spikes will not be estimated correctly. Spikes will '
    #           'be calculated with the slower inhomogeneous Poisson generating function.')
    #     raise Exception()

    spike_times = []

    # Use seed(...) to instantiate the random number generator. Otherwise, current system time is used.
    random.seed(random_seed)

    # Assume here for each pair (t[k], f_rate[k]) that the f_rate[k] value applies to the time interval
    # [t[k], t[k+1]).
    for k in six.moves.range(0, len(f_rate)-1):
        t_k = t[k]
        t_k_1 = t[k+1]
        if (t_k >= t_window_start) and (t_k_1 <= t_window_end):
            delta_t = t_k_1 - t_k
            # Average number of spikes expected in this interval (note that firing rate is in Hz and time is in ms).
            av_N_spikes = f_rate[k] / 1000.0 * delta_t

            if av_N_spikes > 0:
                if av_N_spikes <= p_spike_max:
                    N_bins = 1
                else:
                    N_bins = int(np.ceil(av_N_spikes / p_spike_max))

                t_base = t[k]
                t_bin = 1.0 * delta_t / N_bins
                p_spike_bin = 1.0 * av_N_spikes / N_bins
                for i_bin in six.moves.range(0, N_bins):
                    rand_tmp = random.random()
                    if rand_tmp < p_spike_bin:
                        spike_t = t_base + random.random() * t_bin
                        spike_times.append(spike_t)
                    t_base += t_bin

    return spike_times
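
# Illustrative sketch (an addition for this page, not present in the installed module): a small,
# self-contained check of f_rate_to_spike_train(). For a constant rate of 20 Hz over a 1000 ms
# window the expected number of spikes is rate * duration = 20, so the call below should return
# roughly that many spike times. The values here are arbitrary examples.
if __name__ == '__main__':
    t_ms = np.arange(0.0, 1001.0, 1.0)          # time stamps in ms, 1 ms resolution
    rates_hz = np.full(len(t_ms), 20.0)         # constant 20 Hz firing rate

    spikes = f_rate_to_spike_train(t_ms, rates_hz, random_seed=42,
                                   t_window_start=0.0, t_window_end=1000.0, p_spike_max=0.1)
    print('generated {} spikes (expected ~20)'.format(len(spikes)))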