Source code for bmtk.utils.sim_setup

# Copyright 2017. Allen Institute. All rights reserved
#
# Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
# following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
# disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
# products derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
import os
import sys
import shutil
import json
import six
from subprocess import call
from optparse import OptionParser
from collections import OrderedDict
import logging
from distutils.dir_util import copy_tree


logger = logging.getLogger(__name__)

# Order of the different sections of the config.json. Any non-listed items will be placed at the end of the config
config_order = [
    'manifest',
    'target_simulator',
    'run',
    'conditions',
    'inputs',
    'components',
    'output',
    'reports',
    'networks'
]

network_dir_synonyms = ['network', 'networks', 'circuit', 'circuits', 'network_dir',  'circuit_dir']


[docs]class EnvBuilder(object): def __init__(self, base_dir='.', network_dir=None, components_dir=None, output_dir=None, node_sets_file=None): self._base_dir = self._get_base_dir(base_dir) self._network_dir = self._get_network_dir(network_dir) self._components_dir = self._get_components_dir(components_dir) self._output_dir = self._get_output_dir(output_dir) self._node_sets_file = self._get_node_sets_fname(node_sets_file) self._circuit_config = {} self._simulation_config = {} @property def target_simulator(self): raise NotImplementedError() @property def bmtk_simulator(self): raise NotImplementedError() @property def base_dir(self): return self._base_dir @property def scripts_root(self): local_path = os.path.dirname(os.path.realpath(__file__)) return os.path.join(local_path, 'scripts') @property def examples_dir(self): raise NotImplementedError() def _get_base_dir(self, base_dir): """Create base-dir, or if it exists make sure""" if not os.path.exists(base_dir): logger.info('Creating directory "{}"'.format(os.path.abspath(base_dir))) os.mkdir(base_dir) elif os.path.isfile(base_dir): logging.error('simulation directory path {} points to an existing file, cannot create.'.format(base_dir)) exit(1) else: logging.info('Using existing directory "{}"'.format(os.path.abspath(base_dir))) return os.path.abspath(base_dir) @property def network_dir(self): return self._network_dir def _get_network_dir(self, network_dir=None): """Attempts to get the appropiate path the the directory containing the sonata network files, either as specified by the user (-n <network_dir> argument) or else attempt to find the location of the network files. :param base_dir: :param network_dir: :return: The absolute path of the directory that does/should contain the network files """ network_dir_abs = None if network_dir is None: # Check to see if there are any folders in base_dir that might contain SONATA network files sub_dirs = [os.path.join(self.base_dir, dn) for dn in os.listdir(self.base_dir) if os.path.isdir(os.path.join(self.base_dir, dn)) and dn.lower() in network_dir_synonyms] if sub_dirs: # See if there are any subfolders that might contain network files network_dir_abs = os.path.abspath(sub_dirs[0]) logging.info( 'No network folder specified, attempting to use existing folder: {}'.format(network_dir_abs)) else: network_dir_abs = os.path.abspath(os.path.join(self.base_dir, 'network')) logging.info('No network folder specified, creating empty folder: {}'.format(network_dir_abs)) os.makedirs(network_dir_abs) elif os.path.isabs(network_dir): # In the case the user specifies the network directory as an absolute path network_dir_abs = network_dir logging.info('Using network directory: {}'.format(network_dir_abs)) elif os.path.isdir(network_dir): # In case during calling the script the user was pointing to an already existing directory network_dir_abs = os.path.abspath(network_dir) logging.info('Using network directory: {}'.format(network_dir_abs)) else: # Default case is the to use/add the <network_dir> found under base_dir network_dir_abs = os.path.join(self.base_dir, network_dir) if not os.path.exists(network_dir_abs): logging.info('Creating new network directory: {}'.format(network_dir_abs)) os.makedirs(network_dir_abs) else: logging.info('Using network directory: {}'.format(network_dir_abs)) return network_dir_abs def _parse_network_dir(self, network_dir): logger.info('Parsing {} for SONATA network files'.format(network_dir)) net_nodes = {} net_edges = {} net_gaps = {} for root, dirs, files in os.walk(network_dir): for f in files: if not os.path.isfile(os.path.join(network_dir, f)) or f.startswith('.'): continue if '_nodes' in f: net_name = f[:f.find('_nodes')] nodes_dict = net_nodes.get(net_name, {}) nodes_dict['nodes_file'] = os.path.abspath(os.path.join(root, f)) net_nodes[net_name] = nodes_dict logger.info(' Adding nodes file: {}'.format(nodes_dict['nodes_file'])) elif '_node_types' in f: net_name = f[:f.find('_node_types')] nodes_dict = net_nodes.get(net_name, {}) nodes_dict['node_types_file'] = os.path.abspath(os.path.join(root, f)) net_nodes[net_name] = nodes_dict logger.info(' Adding node types file: {}'.format(nodes_dict['node_types_file'])) elif '_edges' in f: net_name = f[:f.find('_edges')] edges_dict = net_edges.get(net_name, {}) edges_dict['edges_file'] = os.path.abspath(os.path.join(root, f)) net_edges[net_name] = edges_dict logger.info(' Adding edges file: {}'.format(edges_dict['edges_file'])) elif '_edge_types' in f: net_name = f[:f.find('_edge_types')] edges_dict = net_edges.get(net_name, {}) edges_dict['edge_types_file'] = os.path.abspath(os.path.join(root, f)) net_edges[net_name] = edges_dict logger.info(' Adding edge types file: {}'.format(edges_dict['edge_types_file'])) elif '_gap_juncs' in f: net_name = f[:f.find('_gap_juncs')] gaps_dict = net_gaps.get(net_name, {}) gaps_dict['gap_juncs_file'] = os.path.abspath(os.path.join(root, f)) net_gaps[net_name] = gaps_dict logger.info(' Adding gap junctions file: {}'.format(gaps_dict['gap_juncs_file'])) else: logger.info( ' Skipping file (could not categorize): {}'.format(os.path.abspath(os.path.join(root, f)))) if not (net_nodes or net_edges): logger.info(' Could not find any sonata nodes or edges file(s).') network_config = {'nodes': [], 'edges': [], 'gap_juncs': []} for _, sect in net_nodes.items(): network_config['nodes'].append(sect) for _, sect in net_edges.items(): network_config['edges'].append(sect) for _, sect in net_gaps.items(): network_config['gap_juncs'].append(sect) self._circuit_config['networks'] = network_config @property def components_dir(self): return self._components_dir def _get_components_dir(self, components_dir=None): if components_dir is None: return os.path.abspath(os.path.join(self.base_dir, 'components')) elif os.path.isabs(components_dir): return components_dir elif os.path.exists(components_dir): return os.path.abspath(components_dir) else: return os.path.abspath(os.path.join(self.base_dir, components_dir)) def _create_components_dir(self, components_dir, with_examples=True): if not os.path.exists(components_dir): logger.info('Creating components directory: {}'.format(components_dir)) os.makedirs(components_dir) components_config = {} comps_dirs = [sd for sd in os.listdir(self.examples_dir) if os.path.isdir(os.path.join(self.examples_dir, sd))] for sub_dir in comps_dirs: comp_name = sub_dir + '_dir' src_dir = os.path.join(self.examples_dir, sub_dir) trg_dir = os.path.join(components_dir, sub_dir) if not os.path.exists(trg_dir): logger.info('Creating new components directory: {}'.format(trg_dir)) os.makedirs(trg_dir) else: logger.info('Using components directory: {}'.format(trg_dir)) components_config[comp_name] = trg_dir if with_examples: logger.info(' Copying files from {}.'.format(src_dir)) copy_tree(src_dir, trg_dir) # return components_config self._circuit_config['components'] = components_config @property def output_dir(self): return self._output_dir def _get_output_dir(self, output_dir): if output_dir is None: return os.path.abspath(os.path.join(self.base_dir, 'output')) elif os.path.isabs(output_dir): return output_dir elif os.path.exists(output_dir): return os.path.abspath(output_dir) else: return os.path.abspath(os.path.join(self.base_dir, output_dir)) def _add_manifest(self, config_dict, network_dir=None, components_dir=None, output_dir=None): config_dict['manifest'] = {'$BASE_DIR': '${configdir}'} base_dir = os.path.abspath(self.base_dir) replace_str = lambda fd, bd, var_name: fd.replace(bd, var_name) if fd.startswith(bd) else fd if network_dir is not None: config_dict['manifest']['$NETWORK_DIR'] = replace_str(network_dir, base_dir, '$BASE_DIR') if len(config_dict['networks'].get('nodes', [])) > 0: config_dict['networks']['nodes'] = [{k: replace_str(v, network_dir, '$NETWORK_DIR') for k, v in l.items()} for l in config_dict['networks']['nodes']] if len(config_dict['networks'].get('edges', [])) > 0: config_dict['networks']['edges'] = [{k: replace_str(v, network_dir, '$NETWORK_DIR') for k, v in l.items()} for l in config_dict['networks']['edges']] if 'network' in config_dict and isinstance(config_dict['network'], six.string_types): config_dict['network'] = replace_str(config_dict['network'], base_dir, '$BASE_DIR') if components_dir is not None: config_dict['manifest']['$COMPONENTS_DIR'] = replace_str(components_dir, base_dir, '$BASE_DIR') for k, v in config_dict['components'].items(): config_dict['components'][k] = replace_str(v, components_dir, '$COMPONENTS_DIR') if output_dir is not None: config_dict['manifest']['$OUTPUT_DIR'] = replace_str(output_dir, base_dir, '$BASE_DIR') for k, v in config_dict.get('output', {}).items(): if isinstance(v, six.string_types): config_dict['output'][k] = replace_str(v, output_dir, '$OUTPUT_DIR') for input_dict in config_dict.get('inputs', {}).values(): if 'input_file' in input_dict: input_dict['input_file'] = replace_str(input_dict['input_file'], base_dir, '$BASE_DIR') if 'file_name' in input_dict: input_dict['file_name'] = replace_str(input_dict['file_name'], base_dir, '$BASE_DIR') if 'node_sets_file' in config_dict: config_dict['node_sets_file'] = replace_str(config_dict['node_sets_file'], base_dir, '$BASE_DIR') def _save_config(self, json_dict, config_file_name): logger.info('Creating config file: {}'.format(config_file_name)) with open(os.path.join(self.base_dir, config_file_name), 'w') as outfile: ordered_dict = OrderedDict(sorted(json_dict.items(), key=lambda s: config_order.index(s[0]) if s[0] in config_order else 100)) json.dump(ordered_dict, outfile, indent=2) @property def node_sets_file(self): return self._node_sets_file def _get_node_sets_fname(self, node_sets_file): if node_sets_file is None or not os.path.isabs(node_sets_file): abs_path = os.path.abspath(os.path.join(self.base_dir, node_sets_file or 'node_sets.json')) else: abs_path = node_sets_file return abs_path def _create_node_sets_file(self, recorded_nodes=None, default_ns='all'): if os.path.exists(self.node_sets_file): logger.info('Found existing node sets file: {}'.format(self.node_sets_file)) else: logger.info('Creating new node sets file: {}'.format(self.node_sets_file)) node_sets = { 'biophysical_nodes': {'model_type': 'biophysical'}, 'point_nodes': {'model_type': 'point_process'} } if recorded_nodes is not None: node_sets['recorded_nodes'] = {'node_ids': recorded_nodes} if isinstance(recorded_nodes, list) \ else {'population': recorded_nodes} json.dump(node_sets, open(self.node_sets_file, 'w'), indent=2) if recorded_nodes is not None: default_ns = 'recorded_nodes' self._simulation_config['node_sets_file'] = self.node_sets_file return default_ns def _add_reports(self, cell_vars, node_set, section='soma'): if isinstance(cell_vars, six.string_types): cell_vars = [cell_vars] for v in cell_vars: logger.info('Adding membrane report for variable {}'.format(v)) report_config = { '{}_report'.format(v): { 'variable_name': v, 'cells': node_set, 'module': 'membrane_report', 'sections': section } for v in cell_vars} if 'reports' not in self._simulation_config: self._simulation_config['reports'] = {} self._simulation_config['reports'].update(report_config) def _add_clamp_reports(self, clamp_reports): if isinstance(clamp_reports, six.string_types): clamp_reports = [clamp_reports] for c in clamp_reports: logger.info("Adding clamp report for {} clamp.".format(c)) report_config = { '{}_clamp_report'.format(c): { 'variable_name': c, 'module': 'clamp_report' } for c in clamp_reports} if 'reports' not in self._simulation_config: self._simulation_config['reports'] = {} self._simulation_config['reports'].update(report_config) def _add_output_section(self): output_section = { 'log_file': 'log.txt', 'output_dir': self.output_dir, 'spikes_file': 'spikes.h5' } self._simulation_config['output'] = output_section def _add_current_clamp(self, current_param): if current_param is None: return logger.info('Adding current clamp') iclamp_config = { "input_type": "current_clamp", "module": "IClamp", "node_set": "all", "gids": current_param['gids'], "amp": current_param['amp'], "delay": current_param['delay'], "duration": current_param['duration'] } if 'inputs' not in self._simulation_config: self._simulation_config['inputs'] = {} self._simulation_config['inputs']['current_clamp'] = iclamp_config def _add_file_current_clamp(self, current_param): if current_param is None: return amp_file = os.path.abspath(current_param["input_file"]) f_iclamp_config = { "input_type": "current_clamp", "module": "FileIClamp", "node_set": "all", "input_file": amp_file } if 'inputs' not in self._simulation_config: self._simulation_config['inputs'] = {} self._simulation_config['inputs']['file_current_clamp'] = f_iclamp_config def _add_se_voltage_clamp(self, clamp_param): seclamp_config = { "input_type": "voltage_clamp", "module": "SEClamp", "node_set": "all", "gids": clamp_param['gids'], "amps": clamp_param['amps'], "durations": clamp_param["durations"] } if "rs" in clamp_param.keys(): seclamp_config["rs"] = clamp_param["rs"] if 'inputs' not in self._simulation_config: self._simulation_config['inputs'] = {} name = 'se_voltage_clamp' if 'name' in clamp_param.keys(): name = clamp_param['name'] self._simulation_config['inputs'][name] = seclamp_config def _add_spikes_inputs(self, spikes_inputs): inputs_dict = {} for s in spikes_inputs: pop_name = s[0] or 'all' input_name = '{}_spikes'.format(s[0] or 'input') spikes_file = os.path.abspath(s[1]) spikes_ext = os.path.splitext(spikes_file)[1][1:] spikes_ext = 'sonata' if spikes_ext in ['h5', 'hdf5'] else spikes_ext inputs_dict[input_name] = { "input_type": "spikes", "module": spikes_ext, "input_file": spikes_file, "node_set": pop_name } if 'inputs' not in self._simulation_config: self._simulation_config['inputs'] = {} self._simulation_config['inputs'].update(inputs_dict) def _add_run_params(self, tstart=0.0, tstop=1000.0, dt=0.001, **kwargs): self._simulation_config['run'] = { 'tstart': tstart, 'tstop': tstop, 'dt': dt } def _copy_run_script(self): run_script = 'run_{}.py'.format(self.bmtk_simulator) shutil.copy(os.path.join(self.examples_dir, run_script), os.path.join(self.base_dir, run_script))
[docs] def build(self, include_examples=False, use_relative_paths=True, report_vars=[], report_nodes=None, clamp_reports=[], current_clamp=None, file_current_clamp=None, se_voltage_clamp=None, spikes_inputs=None, config_file='config.json', overwrite_config=False, **run_args): config_path = config_file if os.path.isabs(config_file) else os.path.join(self._base_dir, config_file) config_filename = os.path.splitext(os.path.basename(config_file))[0] base_config = { 'network': os.path.join(self._base_dir, 'circuit_' + config_filename + '.json'), 'simulation': os.path.join(self._base_dir, 'simulation_' + config_filename + '.json') } if os.path.exists(config_path) and not overwrite_config: logging.warning(f'Configuration file {config_path} already exists, skipping. Please delete existing file, ' f'use a different name, or use overwrite_config=True.') else: self._save_config(base_config, config_path) self._parse_network_dir(self.network_dir) self._create_components_dir(self.components_dir, with_examples=include_examples) if use_relative_paths: self._add_manifest(self._circuit_config, network_dir=self.network_dir, components_dir=self.components_dir) if os.path.exists(base_config['network']) and not overwrite_config: logging.warning(f'Configuration file {base_config["network"]} already exists, skipping. ' f'Please delete existing file, use a different name, or use overwrite_config=True.') else: self._save_config(self._circuit_config, os.path.join(self._base_dir, 'circuit_'+config_filename+'.json')) selected_ns = self._create_node_sets_file(report_nodes) self._add_reports(report_vars, selected_ns) self._add_clamp_reports(clamp_reports) self._add_output_section() self._simulation_config['target_simulator'] = self.target_simulator self._add_run_params(**run_args) if current_clamp is not None: try: current_clamp['gids'] except: current_clamp['gids']='all' self._add_current_clamp(current_clamp) if file_current_clamp is not None: self._add_file_current_clamp(file_current_clamp) if se_voltage_clamp is not None: try: se_voltage_clamp['gids'] except: se_voltage_clamp['gids'] = 'all' self._add_se_voltage_clamp(se_voltage_clamp) if spikes_inputs!=None: self._add_spikes_inputs(spikes_inputs) if use_relative_paths: self._add_manifest(self._simulation_config, output_dir=self.output_dir) if os.path.exists(base_config['simulation']) and not overwrite_config: logging.warning(f'Configuration file {base_config["simulation"]} already exists, skipping. ' f'Please delete existing file, use a different name, or use overwrite_config=True.') else: self._save_config(self._simulation_config, os.path.join(self._base_dir, 'simulation_'+config_filename+'.json')) self._copy_run_script()
[docs]class BioNetEnvBuilder(EnvBuilder): @property def examples_dir(self): return os.path.join(self.scripts_root, 'bionet') @property def target_simulator(self): return 'NEURON' @property def bmtk_simulator(self): return 'bionet' def _add_run_params(self, tstart=0.0, tstop=1000.0, dt=0.001, dL=20.0, spikes_threshold=-15.0, nsteps_block=5000, v_init=-80.0, celsius=34.0, **kwargs): self._simulation_config['run'] = { 'tstart': tstart, 'tstop': tstop, 'dt': dt, 'dL': dL, 'spike_threshold': spikes_threshold, 'nsteps_block': nsteps_block } self._simulation_config['conditions'] = { 'celsius': celsius, 'v_init': v_init }
[docs] def compile_mechanisms(self): mechanisms_dir = os.path.join(self.components_dir, 'mechanisms') logger.info('Attempting to compile NEURON mechanims under "{}"'.format(mechanisms_dir)) cwd = os.getcwd() try: os.chdir(os.path.join(mechanisms_dir)) call(['nrnivmodl', 'modfiles']) logger.info(' Success.') except Exception as e: logger.error(' Was unable to compile mechanism in {}'.format(mechanisms_dir)) os.chdir(cwd)
[docs]class PointNetEnvBuilder(EnvBuilder): @property def examples_dir(self): return os.path.join(self.scripts_root, 'pointnet') @property def target_simulator(self): return 'NEST' @property def bmtk_simulator(self): return 'pointnet' def _add_output_section(self): super(PointNetEnvBuilder, self)._add_output_section() self._simulation_config['output']['quiet_simulator'] = True
[docs]class FilterNetEnvBuilder(EnvBuilder): @property def examples_dir(self): return os.path.join(self.scripts_root, 'filternet') @property def target_simulator(self): return 'LGNModel' @property def bmtk_simulator(self): return 'filternet' def _add_output_section(self): super(FilterNetEnvBuilder, self)._add_output_section() self._simulation_config['output']['rates_file_csv'] = 'rates.csv' self._simulation_config['output']['spikes_file_csv'] = 'spikes.csv' self._simulation_config['output']['spikes_file_h5'] = 'spikes.h5'
[docs]class PopNetEnvBuilder(EnvBuilder): @property def examples_dir(self): return os.path.join(self.scripts_root, 'popnet') @property def target_simulator(self): return 'DiPDE' @property def bmtk_simulator(self): return 'popnet' def _add_output_section(self): super(PopNetEnvBuilder, self)._add_output_section() self._simulation_config['output']['rates_file'] = "firing_rates.csv"
[docs]def build_env_bionet(base_dir='.', network_dir=None, components_dir=None, node_sets_file=None, include_examples=False, overwrite_config=False, tstart=0.0, tstop=1000.0, dt=0.001, dL=20.0, spikes_threshold=-15.0, nsteps_block=5000, v_init=-80.0, celsius=34.0, report_vars=[], report_nodes=None, clamp_reports=[], current_clamp=None, file_current_clamp=None, se_voltage_clamp=None, spikes_inputs=None, compile_mechanisms=False, use_relative_paths=True, config_file=None): env_builder = BioNetEnvBuilder(base_dir=base_dir, network_dir=network_dir, components_dir=components_dir, node_sets_file=node_sets_file) env_builder.build(include_examples=include_examples, use_relative_paths=use_relative_paths, report_vars=report_vars, report_nodes=report_nodes, clamp_reports=clamp_reports, current_clamp=current_clamp, file_current_clamp=file_current_clamp, se_voltage_clamp=se_voltage_clamp, spikes_inputs=spikes_inputs, tstart=tstart, tstop=tstop, dt=dt, dL=dL, spikes_threshold=spikes_threshold, nsteps_block=nsteps_block, v_init=v_init, celsius=celsius, config_file=config_file, overwrite_config=overwrite_config) if compile_mechanisms: env_builder.compile_mechanisms()
[docs]def build_env_pointnet(base_dir='.', network_dir=None, components_dir=None, node_sets_file=None, include_examples=False, tstart=0.0, tstop=1000.0, dt=0.001, dL=20.0, spikes_threshold=-15.0, nsteps_block=5000, v_init=-80.0, celsius=34.0, report_vars=[], report_nodes=None, current_clamp=None, spikes_inputs=None, use_relative_paths=True, config_file=None): env_builder = PointNetEnvBuilder(base_dir=base_dir, network_dir=network_dir, components_dir=components_dir, node_sets_file=node_sets_file) env_builder.build(include_examples=include_examples, use_relative_paths=use_relative_paths, report_vars=report_vars, report_nodes=report_nodes, current_clamp=current_clamp, spikes_inputs=spikes_inputs, tstart=tstart, tstop=tstop, dt=dt, dL=dL, spikes_threshold=spikes_threshold, nsteps_block=nsteps_block, v_init=v_init, celsius=celsius)
[docs]def build_env_filternet(base_dir='.', network_dir=None, components_dir=None, node_sets_file=None, include_examples=False, tstart=0.0, tstop=1000.0, config_file='config.json'): env_builder = FilterNetEnvBuilder(base_dir=base_dir, network_dir=network_dir, components_dir=components_dir, node_sets_file=node_sets_file) env_builder.build(include_examples=include_examples, base_dir=base_dir, network_dir=network_dir, components_dir=components_dir, tstart=tstart, tstop=tstop, config_file=config_file)
[docs]def build_env_popnet(base_dir='.', network_dir=None, components_dir=None, node_sets_file=None, reports=None, include_examples=True, config_file=None, tstart=0.0, tstop=1000.0, dt=0.001, **args): # raise NotImplementedError() env_builder = PopNetEnvBuilder(base_dir=base_dir, network_dir=network_dir, components_dir=components_dir, node_sets_file=node_sets_file) env_builder.build(include_examples=include_examples, base_dir=base_dir, network_dir=network_dir, components_dir=components_dir, tstart=tstart, tstop=tstop, config_file=config_file)
# simulator='popnet' # target_simulator='DiPDE' # components_dir='pop_components' # # # Copy run script # copy_run_script(base_dir=base_dir, simulator=simulator, run_script='run_{}.py'.format(simulator)) # # # Build circuit_config and componenets directory # circuit_config = build_circuit_env(base_dir=base_dir, network_dir=network_dir, components_dir=components_dir, # simulator=simulator, with_examples=with_examples) # circuit_config['components']['population_models_dir'] = '$COMPONENTS_DIR/population_models' # # population_models_dir = os.path.join(base_dir, components_dir, 'population_models') # if with_examples: # models_dir = os.path.join(base_dir, components_dir, 'population_models') # if os.path.exists(models_dir): # shutil.rmtree(models_dir) # shutil.copytree(os.path.join(scripts_path, simulator, 'population_models'), models_dir) # # copy_config(base_dir, circuit_config, 'circuit_config.json') # # # Build simulation config # simulation_config = build_simulation_env(base_dir=base_dir, target_simulator=target_simulator, tstop=tstop, dt=dt, # reports=reports) # # PopNet doesn't produce spike files so instead need to replace them with rates files # for output_key in simulation_config['output'].keys(): # if output_key.startswith('spikes'): # del simulation_config['output'][output_key] # # simulation_config['output']['rates_file_csv'] = 'firing_rates.csv' # simulation_config['output']['rates_file'] = 'firing_rates.csv' # # copy_config(base_dir, simulation_config, 'simulation_config.json') if __name__ == '__main__': logging.basicConfig(level=logging.INFO, format='%(module)s [%(levelname)s] %(message)s') parser = OptionParser(usage="Usage: python %prog [options] [bionet|pointnet|popnet|filternet] sim_dir") parser.add_option('-n', '--network_dir', dest='network_dir', default=None, help="Use an exsting directory with network files.") parser.add_option('--tstop', type='float', dest='tstop', default=1000.0) parser.add_option('--dt', type=float, dest='dt', help='simulation time step dt', default=0.001) # For membrane report def membrane_report_parser(option, opt, value, parser): parser.values.has_membrane_report = True if ',' in value: try: setattr(parser.values, option.dest, [int(v) for v in value.split(',')]) except ValueError as ve: setattr(parser.values, option.dest, value.split(',')) else: setattr(parser.values, option.dest, value) parser.add_option('--report-vars', dest='mem_rep_vars', type='string', action='callback', callback=membrane_report_parser, default=[], help='A list of membrane variables to record from; v, cai, etc.') parser.add_option('--report-nodes', dest='mem_rep_cells', type='string', action='callback', callback=membrane_report_parser, default=None) parser.add_option('--iclamp', dest='current_clamp', type='string', action='callback', callback=membrane_report_parser, default=None, help='Adds a soma current clamp using three variables: <amp>,<delay>,<duration> (nA, ms, ms)') parser.add_option('--spikes-inputs', dest='spikes_input', type='string', action='callback', callback=membrane_report_parser, default=None) parser.add_option('--include-examples', dest='include_examples', action='store_true', default=False, help='Copies component files used by examples and tutorials.') parser.add_option('--compile-mechanisms', dest='compile_mechanisms', action='store_true', default=False, help='Will try to compile the NEURON mechanisms (BioNet only).') parser.add_option('--config-file', dest='config_file', type='string', default='config.json', help='Name of configuration json file.') options, args = parser.parse_args() # Check the passed in argments are correct. [sim] </path/to/dir/> if len(args) < 2: parser.error('Invalid number of arguments, Please specify a target simulation (bionet, pointnet, filternet,' 'popnet) and the path to save the simulation environment.') elif len(args) > 2: parser.error('Unrecognized arguments {}'.format(args[2:])) else: target_sim = args[0].lower() if target_sim not in ['bionet', 'popnet', 'pointnet', 'filternet']: parser.error('Must specify one target simulator. options: "bionet", pointnet", "popnet", "filternet"') base_dir = args[1] if options.current_clamp is not None: cc_args = options.current_clamp if len(cc_args) != 3: parser.error('Invalid arguments for current clamp, requires three floating point numbers ' '<ampage>,<delay>,<duration> (nA, ms, ms)') iclamp_args = {'amp': float(cc_args[0]), 'delay': float(cc_args[1]), 'duration': float(cc_args[2])} else: iclamp_args = None spikes_inputs = [] if options.spikes_input is not None: spikes = [options.spikes_input] if isinstance(options.spikes_input, str) else list(options.spikes_input) for spike_str in spikes: vals = spike_str.split(':') if len(vals) == 1: spikes_inputs.append((None, vals[0])) elif len(vals) == 2: spikes_inputs.append((vals[0], vals[1])) else: parser.error('Cannot parse spike-input string <pop1>:<spikes-file1>,<pop2>:<spikes-file2>,...') if target_sim == 'bionet': build_env_bionet(base_dir=base_dir, network_dir=options.network_dir, tstop=options.tstop, dt=options.dt, report_vars=options.mem_rep_vars, report_nodes=options.mem_rep_cells, current_clamp=iclamp_args, include_examples=options.include_examples, spikes_inputs=spikes_inputs, compile_mechanisms=options.compile_mechanisms, config_file=options.config_file ) if target_sim == 'pointnet': build_env_pointnet(base_dir=base_dir, network_dir=options.network_dir, tstop=options.tstop, dt=options.dt, report_vars=options.mem_rep_vars, report_nodes=options.mem_rep_cells, current_clamp=iclamp_args, include_examples=options.include_examples, spikes_inputs=spikes_inputs) elif target_sim == 'popnet': build_env_popnet(base_dir=base_dir, network_dir=options.network_dir, tstop=options.tstop, dt=options.dt, config_file=options.config_file) elif target_sim == 'filternet': build_env_filternet(base_dir=base_dir, network_dir=options.network_dir, tstop=options.tstop, include_examples=options.include_examples, config_file=options.config_file)