Source code for bmtk.utils.sonata.edge_stats

import pandas as pd
import h5py
import numpy as np
import matplotlib.pyplot as plt

from bmtk.utils.sonata.config import SonataConfig
from bmtk.utils.sonata.utils import get_attribute_h5
from bmtk.utils.sonata.config import SonataConfig


[docs]def to_edges_dataframe(edges_pop_h5, edge_types_path=None, with_properties=True): edges_df = pd.DataFrame({ 'source_node_id': edges_pop_h5['source_node_id'][()], 'target_node_id': edges_pop_h5['target_node_id'][()], 'edge_type_id': edges_pop_h5['edge_type_id'][()], 'edge_group_id': edges_pop_h5['edge_group_id'][()], 'edge_group_index': edges_pop_h5['edge_group_index'][()], }) if with_properties: if isinstance(with_properties, (list, tuple)): include_prop = lambda s: s in with_properties elif isinstance(with_properties, str): include_prop = lambda s: s == with_properties else: include_prop = lambda s: True if edge_types_path: edge_types_df = pd.read_csv(edge_types_path, sep=' ') edge_types_df = edge_types_df[[c for c in edge_types_df.columns if include_prop(c) or c == 'edge_type_id']] edges_df = pd.merge(edges_df, edge_types_df, how='left', on='edge_type_id') grp_ids = np.unique(edges_pop_h5['edge_group_id'][()]) edge_props_cols = set() for grp_id in grp_ids: edge_group = edges_pop_h5[str(grp_id)] for n, g in edge_group.items(): if isinstance(g, h5py.Dataset) and include_prop(n): edge_props_cols.add(n) for col in edge_props_cols: edges_df[col] = None ind_beg = 0 for egid, egid_df in edges_df.groupby('edge_group_id'): ind_end = ind_beg + len(egid_df) for n, d in edges_pop_h5[str(egid)].items(): if include_prop(n): prop_data = d[()] edges_df.iloc[ind_beg:ind_end, edges_df.columns.get_loc(n)] = prop_data ind_beg = ind_end edges_df = edges_df.drop(columns=['edge_group_id', 'edge_group_index']) return edges_df
[docs]def to_nodes_dataframe(nodes_pop_h5, node_types_path=None, with_properties=True): nodes_df = pd.DataFrame({ 'node_id': nodes_pop_h5['node_id'][()], 'node_type_id': nodes_pop_h5['node_type_id'][()], 'node_group_id': nodes_pop_h5['node_group_id'][()], 'node_group_index': nodes_pop_h5['node_group_index'][()], }) if with_properties: if isinstance(with_properties, (list, tuple)): include_prop = lambda s: s in with_properties elif isinstance(with_properties, str): include_prop = lambda s: s == with_properties else: include_prop = lambda s: True if node_types_path: node_types_df = pd.read_csv(node_types_path, sep=' ') node_types_df = node_types_df[[c for c in node_types_df.columns if include_prop(c) or c == 'node_type_id']] nodes_df = pd.merge(nodes_df, node_types_df, how='left', on='node_type_id') grp_ids = np.unique(nodes_pop_h5['node_group_id'][()]) node_props_cols = set() for grp_id in grp_ids: edge_group = nodes_pop_h5[str(grp_id)] for n, g in edge_group.items(): if isinstance(g, h5py.Dataset) and include_prop(n): node_props_cols.add(n) for col in node_props_cols: nodes_df[col] = None ind_beg = 0 for egid, egid_df in nodes_df.groupby('node_group_id'): ind_end = ind_beg + len(egid_df) for n, d in nodes_pop_h5[str(egid)].items(): if include_prop(n): prop_data = d[()] nodes_df.iloc[ind_beg:ind_end, nodes_df.columns.get_loc(n)] = prop_data ind_beg = ind_end nodes_df = nodes_df.drop(columns=['node_group_id', 'node_group_index']) return nodes_df
class _SonataFP(object): def __init__(self, name, h5_grp, csv): self.name = name self.h5_grp = h5_grp self.csv = csv def __read_sonata(h5_file, csv_file=None): """Open an individual sonata file and returns path to each nodes/edges groups inside hdf5""" edge_populations = {} node_populations = {} h5 = h5py.File(h5_file, 'r') if 'edges' in h5: for ename, epop in h5['/edges'].items(): if isinstance(epop, h5py.Group): edge_populations[ename] = _SonataFP(ename, epop, csv_file) if 'nodes' in h5: for ename, epop in h5['/nodes'].items(): if isinstance(epop, h5py.Group): node_populations[ename] = _SonataFP(ename, epop, csv_file) return node_populations, edge_populations def __read_sonata_files(edge_objs): edge_objs = __to_list(edge_objs) # The list of edge objects can include sonata configs or raw edges files, in which case we want # to convert them to a dict {'edges_files': ..., 'edge_types_file': ...} where it can be easily # processed below. edge_objs_update = [] for eo in edge_objs: if isinstance(eo, str): try: cfg = SonataConfig.from_json(eo) for net_dict in cfg.nodes + cfg.edges: edge_objs_update.append(net_dict) except Exception: edge_objs_update.append({'edges_file': eo}) else: edge_objs_update.append(eo) edges = {} nodes = {} for eo in edge_objs_update: if isinstance(eo, dict): edges_file = eo.get('edges_file', None) edge_types_file = eo.get('edge_types_file', None) if edges_file: sonata_nodes, sonata_edges = __read_sonata(edges_file, edge_types_file) nodes.update(sonata_nodes) edges.update(sonata_edges) nodes_file = eo.get('nodes_file', None) node_types_file = eo.get('node_types_file', None) if nodes_file: sonata_nodes, sonata_edges = __read_sonata(nodes_file, node_types_file) nodes.update(sonata_nodes) edges.update(sonata_edges) return nodes, edges def __to_list(vals): """Helper function for turning parameters into lists""" if vals is None: return [] elif isinstance(vals, (list, tuple, np.ndarray, pd.Series)): return vals else: return [vals] def __load_dist_as_df(edges_obj, **kwopts): # If edges_obj is a csv file, or path to a csv, open it up and return # it as a pandas data frame if isinstance(edges_obj, pd.DataFrame): return edges_obj if isinstance(edges_obj, str): # Check to see if file can be open as a csv-file and return it. try: return pd.read_csv(edges_obj, sep=' ') except Exception: pass # Otherwise assume edges_obj is a config file or sonata file in which case we # need to read the raw edges and return it as a pandas DataFrame. if kwopts.get('edge_props', '') in ['nconns', 'nconnections', 'n_connections']: # The normal edge_props_distribtion doesn't accurately handle returning just # the number of cell-to-cell connections, need to call different function return nconnections_distributions(edges_obj, **kwopts) else: return edge_props_distribution(edges_obj, **kwopts) def __combine_data(edges_obj1, edges_obj2, edge_prop, fill_val=None, **kwargs): """A helper function for doing a join of two edges DataFrames, for comparing the "edge_prop" values between the two edges.""" # convert two edges objects (config files, csv, or paths to sonata files, etc.) to dataframe df1 = __load_dist_as_df(edges_obj1, edge_prop=edge_prop, **kwargs) df2 = __load_dist_as_df(edges_obj2, edge_prop=edge_prop, **kwargs) # find columns to join on, don't include property being study and any other population columns orig_cols = set([c for c in df1.columns if c not in [edge_prop] + ['population', 'source_population', 'target_population']]) new_cols = set([c for c in df2.columns if c not in [edge_prop] + ['population', 'source_population', 'target_population']]) join_cols = list(orig_cols and new_cols) # join the two DataFrames and return comb_df = df1.merge(df2, how='inner', on=join_cols, suffixes=('_orig', '_new')) if fill_val: comb_df = comb_df.fillna(fill_val) return comb_df
[docs]def edge_props_distribution(edge_files, edge_prop, populations=None, edge_props_grouping=None, source_props_grouping=None, target_props_grouping=None, fill_val=1, operation='sum', population_columns=False): """Reads in one or more SONATA edges files and return a DataFrame consisting of the distribution of a given edge property across an arbitary grouping of cells. For example return the total number of synapses between each source/target node-type, or the mean syn_weights for edge edge-type, or the variance of connecting in-degrees across morphologies. By default will return a DataFrame of each "source_node_id", "target_node_id", and "<edge_prop>" row found in the edge file(s). But you can group the rows by any property/column found in the edges, source or target populations; and apply an arbitary <operation> on the results. The <edge_prop> can be any property/column found in the edges, target-nodes, or source-nodes populations. But if the property is not a numeric the <operation> applied to it may fail. :param edge_files: str, dict, list of str or dict. Is the edges (and optional nodes) files paths. It can be a SONATA h5 file, a dictionary of h5/csv files, or the location(s) of SONATA config files. :param edge_prop: string, property name (column if h5 or csv) file that is being investigated. :param populations: string or list of strings. If SONATA file(s) contains multiple edge populations you can specify . :param edge_props_grouping: str or list[str]. List of columns in edges file(s) to group results by. :param source_props_grouping: str or list[str]. List of columns in source-node file(s) to group results by. :param target_props_grouping: str or list[str]. List of columns in target-node file(s) to group results by. :param fill_val: If <edge_prop> has missing/None/NaN values will fill in with given value. set to None to Turn off. :param operation: Str or function: pandas or numpy function to apply to when doing the grouping, eg. 'sum', 'mean', np.std. :param population_columns: If set to true will return extra column describing the edges/nodes populations for each row. """ edge_props_grouping = __to_list(edge_props_grouping) source_props_grouping = __to_list(source_props_grouping) target_props_grouping = __to_list(target_props_grouping) populations = __to_list(populations) nodes, edges = __read_sonata_files(edge_files) ret_edges_df = None for edge_pop_name, edges_fp in edges.items(): if populations and edge_pop_name not in populations: continue # return edges population as a dataframe without only relevant columns edges_df = to_edges_dataframe(edges_fp.h5_grp, edges_fp.csv, with_properties=edge_props_grouping + [edge_prop]) # When grouping/filtering by properties in the source-nodes population if source_props_grouping: # Find the population-name of the source-nodes and get the nodes as a dataframe source_node_pop = get_attribute_h5(edges_fp.h5_grp['source_node_id'], 'node_population') nodes_fp = nodes[source_node_pop] src_nodes_df = to_nodes_dataframe(nodes_fp.h5_grp, nodes_fp.csv, with_properties=['node_id'] + source_props_grouping) # prepend "source_" to all source-node columns so it can be distinguished from the target nodes column sets src_nodes_df.columns = ['source_{}'.format(c) for c in src_nodes_df.columns] source_props_grouping = ['source_{}'.format(c) for c in source_props_grouping] # merge source-nodes dataframe onto the edges edges_df = pd.merge(edges_df, src_nodes_df, how='left', on='source_node_id') if target_props_grouping: # Same as with source-nodes but this time for the target/post-synaptic population of cells. target_node_pop = get_attribute_h5(edges_fp.h5_grp['target_node_id'], 'node_population') nodes_fp = nodes[target_node_pop] trg_nodes_df = to_nodes_dataframe(nodes_fp.h5_grp, nodes_fp.csv, with_properties=['node_id'] + target_props_grouping) trg_nodes_df.columns = ['target_{}'.format(c) for c in trg_nodes_df.columns] target_props_grouping = ['target_{}'.format(c) for c in target_props_grouping] edges_df = pd.merge(edges_df, trg_nodes_df, how='left', on='target_node_id') if fill_val is not False: edges_df[edge_prop] = fill_val if edge_prop not in edges_df.columns else edges_df[edge_prop].fillna(fill_val) grouping_cols = edge_props_grouping + source_props_grouping + target_props_grouping if not grouping_cols: dist_df = edges_df else: dist_df = edges_df[grouping_cols + [edge_prop]].groupby(grouping_cols)[edge_prop].agg(operation).reset_index() if population_columns and 'population' not in dist_df: dist_df['population'] = edge_pop_name # TODO: Instead of concatenating together multiple DataFrames, we should return them as a list, tuple, or Dict? ret_edges_df = dist_df if ret_edges_df is None else pd.concat([ret_edges_df, dist_df]) return ret_edges_df
[docs]def nsyns_distribution(edge_files, populations=None, edge_props_grouping=None, source_props_grouping=None, target_props_grouping=None): """Reads in one or more SONATA edges files and return a DataFrame consisting of the total number of synapses given any arbitary grouping of network properties. The property will be called "nsyns" in the returned table. Similar to edge_props_distribution(). Note: Each cell-to-cell connection may have mutiliple synapses. Use nconnections_distributions() to get distribution of the raw connectivity map. :param edge_files: str, dict, list of str or dict. Is the edges (and optional nodes) files paths. It can be a SONATA h5 file, a dictionary of h5/csv files, or the location(s) of SONATA config files. :param populations: string or list of strings. If SONATA file(s) contains multiple edge populations you can specify . :param source_props_grouping: str or list[str]. List of columns in source-node file(s) to group results by. :param target_props_grouping: str or list[str]. List of columns in target-node file(s) to group results by. """ return edge_props_distribution( edge_files=edge_files, edge_prop='nsyns', populations=populations, edge_props_grouping=edge_props_grouping, source_props_grouping=source_props_grouping, target_props_grouping=target_props_grouping, fill_val=1, operation='sum' )
[docs]def nconnections_distributions(edge_files, populations=None, edge_props_grouping=None, source_props_grouping=None, target_props_grouping=None, **kwopts): """Reads in one or more SONATA edges files and return a DataFrame consisting of the total number of connection given any arbitary grouping of network properties. The property will be called "nconns" in the returned table. Similar to edge_props_distribution(). Note: A connection here just refers to whether-or-not two cells are connected (including autapses) and ignores the number of synapses between each cell. Use nsyns_distribution() to get distribution of the number of synapses. :param edge_files: str, dict, list of str or dict. Is the edges (and optional nodes) files paths. It can be a SONATA h5 file, a dictionary of h5/csv files, or the location(s) of SONATA config files. :param populations: string or list of strings. If SONATA file(s) contains multiple edge populations you can specify . :param source_props_grouping: str or list[str]. List of columns in source-node file(s) to group results by. :param target_props_grouping: str or list[str]. List of columns in target-node file(s) to group results by. """ tmp_edge_props = __to_list(edge_props_grouping) + ['source_node_id', 'target_node_id'] edges_df = edge_props_distribution( edge_files=edge_files, edge_prop='_conns_', populations=populations, edge_props_grouping=tmp_edge_props, source_props_grouping=source_props_grouping, target_props_grouping=target_props_grouping, fill_val=1, operation='count' ) edges_df = edges_df.drop(columns=['_conns_']) edges_df = edges_df.drop_duplicates() if len(edges_df.columns) > 2: # Drop the source_node_id and target_node_id, but only if some type of edge/node grouping is specified, otherwise just return # list of source_node_id and target_node_id edges_df = edges_df.drop(columns=['source_node_id', 'target_node_id']) return edges_df.value_counts().reset_index(name='nconnections')
[docs]def plot_distribution(edges_data, edge_prop, names=None, log_scale=False, ax=None, show=True, **kwopts): """Plots the distribution of an edge property given an arbitary grouping of rows based on edge, target or source node columns. :param edge_files: str, dict, list of str or dict. Is the edges (and optional nodes) files paths. It can be a SONATA h5 file, a dictionary of h5/csv files, or the location(s) of SONATA config files. Can also be a csv file containg results from edge_props_distribution() function :param edge_prop: string, property name (column if h5 or csv) file that is being investigated. :param names: A list of names/titles to use for the labeling of the distribution(s) curves. If None then will infer from data. :param log_scale: If set to True then use log-scale on the x-axis. Default: False. :param ax: If true will save distribution plot to pre-generated matplotlib axis, for merging into another figure. Default: None. :param show: If true will plot distribution. Default: True. :param kwargs: optional args that will be passed into edge_props_distribution() function. """ edges_data = __to_list(edges_data) names = [str(ed) for ed in edges_data] if names is None else names if ax is None: fig, ax = plt.subplots(1, 1) for csv_path, name in zip(edges_data, names): dist_df = __load_dist_as_df(csv_path, edge_prop=edge_prop, **kwopts) prop_counts = dist_df[edge_prop].values min_val, max_val = np.min(prop_counts), np.max(prop_counts) bins = np.logspace(np.log10(min_val), np.log10(max_val)) if log_scale else np.linspace(min_val, max_val) hist, bin_edges = np.histogram(prop_counts, bins=bins) ax.plot(bin_edges[:-1], hist, label=name) if log_scale: ax.set_xscale('log') ax.set_xlabel(edge_prop) ax.set_title(', '.join([c for c in dist_df.columns if c not in [edge_prop] + ['population', 'source_population', 'target_population']])) ax.legend(fontsize='xx-small') if show: # plt.legend() plt.show() return ax
[docs]def plot_correlation(edges_orig, edges_new, edge_prop, log_scale=False, ax=None, show=True): combined_df = __combine_data(edges_orig, edges_new, edge_prop) data_org = combined_df[edge_prop + '_orig'].values data_new = combined_df[edge_prop + '_new'].values if log_scale: data_org = np.log(data_org) data_new = np.log(data_new) if ax is None: fig, ax = plt.subplots(1, 1) ax.plot(data_org, data_new, '.') if log_scale: ax.set_xscale('log') ax.set_yscale('log') if show: plt.show()
[docs]def edge_stats_table(edges_data): edges_data = __to_list(edges_data) _, edges = __read_sonata_files(edges_data) pop_stats = {} for edge_pop_name, edges_fp in edges.items(): edges_df = to_edges_dataframe(edges_fp.h5_grp, edges_fp.csv, with_properties=['nsyns']) n_src_nodes = edges_df['source_node_id'].nunique() n_trg_nodes = edges_df['target_node_id'].nunique() n_edge_types = edges_df.pop('edge_type_id').nunique() edges_df['nsyns'] = 1 if 'nsyns' not in edges_df.columns else edges_df['nsyns'].fillna(1) conns_se = edges_df.groupby(['source_node_id', 'target_node_id'])['nsyns'].agg('sum') n_conns = len(conns_se) n_syns = np.sum(conns_se.values) pop_stats[edge_pop_name] = [n_src_nodes, n_trg_nodes, n_edge_types, n_conns, n_syns] stats_df = pd.DataFrame.from_dict(pop_stats) stats_df.index = ['n_sources', 'n_targets', 'n_edge_types', 'n_connections', 'n_synapses'] return stats_df
[docs]def pearson_r(edges_orig, edges_new, edge_prop, **kwargs): combined_df = __combine_data(edges_orig, edges_new, edge_prop, **kwargs) data_org = combined_df[edge_prop + '_orig'].values data_new = combined_df[edge_prop + '_new'].values return np.corrcoef(data_org, data_new)[0, 1]
[docs]def chisquare(edges_orig, edges_new, edge_prop, raw_counts=False, **kwargs): from scipy.stats import chisquare combined_df = __combine_data(edges_orig, edges_new, edge_prop, **kwargs) data_org = combined_df[edge_prop + '_orig'].values data_new = combined_df[edge_prop + '_new'].values if not raw_counts: data_org = data_org / np.sum(data_org) data_new = data_new / np.sum(data_new) results = chisquare(data_org, data_new) return results.statistic, results.pvalue
[docs]def kolmogorov_smirnov(edges_orig, edges_new, edge_prop, **kwargs): from scipy.stats import ks_2samp combined_df = __combine_data(edges_orig, edges_new, edge_prop, **kwargs) data_org = combined_df[edge_prop + '_orig'].values data_new = combined_df[edge_prop + '_new'].values results = ks_2samp(data_org, data_new) return results.statistic, results.pvalue
if __name__ == '__main__': # stat, pval = kolmogorov_smirnov('v1_edges.nsyns.orig.csv', 'v1_edges.nsyns.rebuilt.csv', 'nsyns') # stat, pval = chisquare('v1_edges.nsyns.orig.csv', 'v1_edges.nsyns.rebuilt.csv', 'nsyns', raw_counts=True) # r1 = pearson_r('v1_edges.nsyns.orig.csv', 'v1_edges.nsyns.rebuilt.csv', 'nsyns') # print(pval) # plot_correlation('v1_edges.nsyns.orig.csv', 'v1_edges.nsyns.rebuilt.csv', 'nsyns', log_scale=True) # plot_correlation('v1_edges.nconns.orig.csv', 'v1_edges.nconns.rebuilt.csv', 'nconnections') # plot_distribution(['v1_edges.nsyns.orig.csv', 'v1_edges.nsyns.rebuilt.csv'], edge_prop='nsyns', log_scale=True, show=False) # plot_distribution(['v1_edges.nconns.orig.csv', 'v1_edges.nconns.rebuilt.csv'], edge_prop='nconnections', log_scale=True, show=False) # plot_distribution('config.orig.json', 'nsyns') # nconn_edges_df = nconnections_distributions( # 'config.orig.json', # source_props_grouping='pop_name', # target_props_grouping='pop_name' # ) # plot_distribution( # ['config.orig.json', 'config.rebuilt.json'], # edge_prop='nsyns', # populations='v1_to_v1', # source_props_grouping='pop_name', # target_props_grouping='pop_name', # log_scale=True # ) # plot_distribution( # ['config.orig.json', 'config.rebuilt.json'], # edge_prop='nconns', # # operation='nconns', # populations='v1_to_v1', # source_props_grouping='pop_name', # target_props_grouping='pop_name', # log_scale=True # ) # stat, pval = kolmogorov_smirnov( # 'config.orig.json', # 'config.rebuilt.json', # edge_prop='nsyns', # source_props_grouping='pop_name', # target_props_grouping='pop_name', # ) # print(stat, pval) # r2 = pearson_r( # 'config.orig.json', # 'config.rebuilt.json', # edge_prop='nsyns', # source_props_grouping='pop_name', # target_props_grouping='pop_name', # ) # print(r2) # stat, pval = chisquare( # 'config.orig.json', # 'config.rebuilt.json', # edge_prop='nsyns', # source_props_grouping='pop_name', # target_props_grouping='pop_name', # ) # print(stat, pval) # print(nconn_edges_df) # plt.show() edge_stats_table('config.orig.json') # edge_stats_table({"edges_file": "bio_450cells/internal_internal_edges.h5", "edge_types_file": "bio_450cells/internal_internal_edge_types.csv"})