Source code for bmtk.builder.index_builders

import os
import numpy as np
import pandas as pd
import h5py
import logging


logger = logging.getLogger(__name__)

# MAX_EDGE_READS = 500000000
# MAX_EDGE_READS = 200000000


def _get_names(index_type):
    if index_type.lower() in ['target', 'target_id', 'target_node_id', 'target_node_ids']:
        col_to_index = 'target_node_id'
        index_grp_name = 'indices/target_to_source'
    elif index_type in ['source', 'source_id', 'source_node_id', 'source_node_ids']:
        col_to_index = 'source_node_id'
        index_grp_name = 'indices/source_to_target'
    elif index_type in ['edge_type', 'edge_type_id', 'edge_type_ids']:
        col_to_index = 'edge_type_id'
        index_grp_name = 'indices/edge_type_to_index'
    else:
        raise ValueError('Unknown edges parameter {}'.format(index_type))

    return col_to_index, index_grp_name


def remove_index(edges_file, edges_population):
    """Delete the whole ``indices`` group of an edge population.

    :param edges_file: path to the edges hdf5 file; it is opened in ``r+`` mode and modified in place.
    :param edges_population: name of the top-level group in ``edges_file`` whose ``indices`` group is deleted.
    """
    with h5py.File(edges_file, mode='r+') as h5_handle:
        population_grp = h5_handle[edges_population]
        del population_grp['indices']
def create_index_in_memory(edges_file, edges_population, index_type, force_rebuild=True, compression='gzip', **kwargs):
    """Build an edges index by loading the whole indexed column into memory.

    Two datasets are written under the index group selected by ``index_type``:

    * ``range_to_edge_id``: rows of ``[range_beg, range_end)`` edge positions, one row per run of contiguous
      identical ids, ordered by id and then by ``range_beg``.
    * ``node_id_to_range``: row ``i`` holds the ``[beg, end)`` rows of ``range_to_edge_id`` that belong to id ``i``.
      Ids that never occur get ``[0, 0)``.

    :param edges_file: path to the edges hdf5 file; opened in ``r+`` mode and modified in place.
    :param edges_population: name of the top-level group in ``edges_file`` that holds the edges.
    :param index_type: str, which column to index; see ``_get_names`` for the accepted aliases.
    :param force_rebuild: if True an existing index is deleted and rebuilt; if False an existing index is left
        untouched and the function returns without doing any work.
    :param compression: forwarded to ``create_dataset`` for both index datasets.
    :param kwargs: accepted and ignored.
    :raises ValueError: if ``index_type`` is not recognized (raised by ``_get_names``).
    """
    col_to_index, index_grp_name = _get_names(index_type)

    with h5py.File(edges_file, mode='r+') as edges_h5:
        edges_pop_grp = edges_h5[edges_population]

        if index_grp_name in edges_pop_grp:
            if not force_rebuild:
                logger.debug('create_index_in_memory> Edges index {} already exists, skipping.'.format(index_grp_name))
                # Actually skip; falling through would try to re-create the existing group and fail.
                return

            logger.debug('create_index_in_memory> Removing existing index {}.'.format(index_grp_name))
            del edges_pop_grp[index_grp_name]

        index_grp = edges_pop_grp.create_group(index_grp_name)
        ids_array = np.array(edges_pop_grp[col_to_index])  # ids to be indexed
        total_edges = len(ids_array)
        if total_edges == 0:
            logger.warning('edges file {} does not contain any edges.'.format(edges_file))
            return

        # Group together and save ids so contiguous duplicates are represented using ranges, creating a
        # range -> edge index table.
        #  eg. [10 10 10 10 10 32 32 32 10 10 ...] ==> ... 10: [(0, 5), (8, 10)], 32: [(5, 8)], ...
        logger.debug('create_index_in_memory> Creating range_to_edge_id table')
        run_boundaries = np.diff(ids_array).nonzero()[0] + 1  # positions where the id value changes
        ranges_beg = np.concatenate(([0], run_boundaries))
        ranges_end = np.concatenate((run_boundaries, [total_edges]))
        r2e_table_df = pd.DataFrame({
            'lu_ids': ids_array[ranges_beg],
            'range_beg': ranges_beg,
            'range_end': ranges_end
        }).sort_values(['lu_ids', 'range_beg'])  # secondary key keeps the ranges of one id in a deterministic order

        index_grp.create_dataset(
            'range_to_edge_id',
            data=r2e_table_df[['range_beg', 'range_end']].values,
            dtype='uint64',
            compression=compression
        )

        # Create a map to the range_to_edge_id dataset from id --> block ranges. The id value is implicitly equal
        # to the row index.
        # TODO: See if del r2e_table_df will significantly improve memory footprint?
        logger.debug('create_index_in_memory> Creating node_id_to_range table')
        ordered_ids = np.array(r2e_table_df['lu_ids'])
        id_boundaries = np.diff(ordered_ids).nonzero()[0] + 1
        ordered_ids_beg = np.concatenate(([0], id_boundaries))
        ordered_ids_end = np.concatenate((id_boundaries, [len(r2e_table_df)]))
        i2r_table_df = pd.DataFrame({
            'lu_ids': ordered_ids[ordered_ids_beg],
            'range_beg': ordered_ids_beg,
            'range_end': ordered_ids_end
        }).set_index('lu_ids')

        # There may be ids that never occur in the column; give them an empty [0, 0) range.
        i2r_table_df = i2r_table_df.reindex(pd.RangeIndex(i2r_table_df.index.max() + 1), fill_value=0)
        index_grp.create_dataset(
            'node_id_to_range',
            data=i2r_table_df[['range_beg', 'range_end']].values,
            dtype=np.uint64,
            compression=compression
        )
def create_index_on_disk(edges_file, edges_population, index_type, force_rebuild=False, cache_file=None,
                         max_edge_reads=200000000, **kwargs):
    """Build an edges index without ever holding the full indexed column in memory.

    The work is done in two steps, both of which store their state in ``cache_file`` so an interrupted build can
    be resumed:

    1. The indexed column is read in partitions of at most ``max_edge_reads`` rows. For every partition a sorted
       table of ``[id, range_beg, range_end)`` runs and a per-id lookup table into it are written under
       ``<index group>/cache/edges_partition_<n>``.
    2. The partition tables are merged, a block of ids at a time, into the final ``node_id_to_range`` and
       ``range_to_edge_id`` datasets of the index group. The ``cache`` group is deleted once merging completes.

    :param edges_file: path to the edges hdf5 file that contains the column to index.
    :param edges_population: name of the top-level group that holds the edges.
    :param index_type: str, which column to index; see ``_get_names`` for the accepted aliases.
    :param force_rebuild: if True any existing index group (including a partial cache) is deleted first.
    :param cache_file: hdf5 file the cache and the resulting index are written to; defaults to ``edges_file``.
        It is created if it does not exist.
    :param max_edge_reads: maximum number of rows of the indexed column read at one time; also used to size the
        id blocks merged in step 2.
    :param kwargs: accepted and ignored.
    :raises ValueError: if ``index_type`` is not recognized, or the cache group is missing when step 2 starts.
    :raises Exception: if a cache already exists that was created with a different ``max_edge_reads`` or
        partition size.
    """
    col_to_index, index_grp_name = _get_names(index_type)
    cache_file = cache_file if cache_file is not None else edges_file
    mode = 'r+' if os.path.exists(cache_file) else 'w'

    with h5py.File(cache_file, mode=mode) as cache_h5:
        # Step 1: Separate the column being indexed into N partitions each of less than max_edge_reads length.
        # Build the index for each partition and save to disk. Make sure we don't have to read the entire
        # col_to_index in memory at one time.
        separate_edges_file = edges_file != cache_file
        edges_h5 = h5py.File(edges_file, 'r') if separate_edges_file else cache_h5
        try:
            edges_root_grp = edges_h5[edges_population]
            if edges_population not in cache_h5:
                caches_root_grp = cache_h5.create_group(edges_population)
            else:
                caches_root_grp = cache_h5[edges_population]

            total_edges = edges_root_grp[col_to_index].shape[0]
            partition_size = np.min((max_edge_reads, total_edges))
            n_partitions = np.ceil(total_edges / max_edge_reads).astype(np.uint)
            block_begin_idx = 0  # initial index of current partition being created
            partition_index = 0
            max_id = 0
            total_blocks = 0

            # Only delete when there is something to delete; a first build with force_rebuild=True must not fail.
            if force_rebuild and index_grp_name in caches_root_grp:
                del caches_root_grp[index_grp_name]

            if index_grp_name not in caches_root_grp or 'cache' not in caches_root_grp[index_grp_name]:
                if index_grp_name not in caches_root_grp:
                    index_grp = caches_root_grp.create_group(index_grp_name)
                else:
                    index_grp = caches_root_grp[index_grp_name]
                cache_grp = index_grp.create_group('cache')
                cache_grp.attrs['max_edge_reads'] = max_edge_reads
                cache_grp.attrs['cache_partition_size'] = partition_size
            else:
                # Resuming from an existing cache; its partitioning must match the one requested now.
                cache_grp = caches_root_grp[index_grp_name]['cache']
                max_id = cache_grp.attrs.get('max_id', max_id)
                if cache_grp.attrs['max_edge_reads'] != max_edge_reads:
                    raise Exception(
                        'Cache already exists but has a conflicting max_edge_reads'
                        '({} vs {}). Use force-rebuild'.format(cache_grp.attrs['max_edge_reads'], max_edge_reads)
                    )
                if cache_grp.attrs['cache_partition_size'] != partition_size:
                    raise Exception(
                        'Cache already exists but has a conflicting partition_size'
                        '({} vs {}). Use force-rebuild'.format(cache_grp.attrs['cache_partition_size'], partition_size)
                    )

            logger.debug('Number of edges, {}, exceeds maximum ({}).'.format(total_edges, max_edge_reads))
            logger.debug('Separating into {} partitions'.format(n_partitions))

            while block_begin_idx < total_edges:
                partition_grp_name = 'edges_partition_{}'.format(partition_index)
                if partition_grp_name in cache_grp:
                    # Partition was written by a previous (interrupted) run, only account for its size.
                    logger.debug('Cache {}/cache/{} already exists, skipping'.format(index_grp_name, partition_grp_name))
                    block_begin_idx += partition_size
                    partition_index += 1
                    total_blocks += cache_grp[partition_grp_name]['partitioned_table'].shape[0]
                    continue

                logger.debug('Creating cache {} of {} to {}/{}'.format(partition_index+1, n_partitions, index_grp_name,
                                                                       partition_grp_name))
                block_end_idx = block_begin_idx + partition_size
                edge_ids = edges_root_grp[col_to_index][block_begin_idx:block_end_idx]

                # Group together contiguous duplicates to create a list of [id [edge_index_beg, edge_index_end)],
                # sort by id and save table to disk.
                run_starts = np.concatenate(([0], np.diff(edge_ids).nonzero()[0] + 1))
                ranges_beg = run_starts + block_begin_idx
                ranges_end = np.concatenate((run_starts[1:], [partition_size])) + block_begin_idx
                # The last partition may be shorter than partition_size; ends are exclusive so clamp to total_edges.
                ranges_end[-1] = np.min((total_edges, ranges_end[-1]))
                r2e_table_df = pd.DataFrame({
                    'lu_ids': edge_ids[run_starts],
                    'range_beg': ranges_beg,
                    'range_end': ranges_end
                }).sort_values('lu_ids')

                # Creates a lookup table id --> rows of the (sorted) partitioned table.
                ordered_ids = r2e_table_df['lu_ids'].values
                ordered_ids_beg = np.concatenate(([0], np.diff(ordered_ids).nonzero()[0] + 1))
                ordered_ids_end = np.concatenate((ordered_ids_beg[1:], [len(r2e_table_df)]))
                i2r_table_df = pd.DataFrame({
                    'lu_ids': ordered_ids[ordered_ids_beg],
                    'range_beg': ordered_ids_beg,
                    'range_end': ordered_ids_end
                }).set_index('lu_ids')

                # Fill in missing ids: an absent id gets an empty range positioned at the end of the previous id
                # (or at 0 when there is no previous id), so consecutive rows stay contiguous.
                i2r_table_df = i2r_table_df.reindex(pd.RangeIndex(i2r_table_df.index.max() + 1))
                i2r_table_df['range_end'] = i2r_table_df['range_end'].ffill().fillna(0)
                nans_mask = i2r_table_df['range_beg'].isna()
                i2r_table_df.loc[nans_mask, 'range_beg'] = i2r_table_df.loc[nans_mask, 'range_end']

                # Save partition to disk
                partition_grp = cache_grp.create_group(partition_grp_name)
                partition_grp.create_dataset(
                    'partitioned_table',
                    data=r2e_table_df[['lu_ids', 'range_beg', 'range_end']].values,
                    dtype='uint64'
                )
                partition_grp.create_dataset(
                    'lookup_tables',
                    data=i2r_table_df[['range_beg', 'range_end']].values,
                    dtype='uint64'
                )
                partition_grp.attrs['columns'] = 'id,range_beg,range_end'
                cache_h5.flush()

                max_id = np.max((max_id, int(np.max(ordered_ids))))
                total_blocks += len(r2e_table_df)
                block_begin_idx += partition_size
                partition_index += 1
                del r2e_table_df
                del i2r_table_df

            cache_grp.attrs['max_id'] = max_id
            cache_grp.attrs['n_partitions'] = n_partitions
            cache_grp.attrs['total_blocks'] = total_blocks
        finally:
            # The edges file was opened separately from the cache file, so it must be closed explicitly.
            if separate_edges_file:
                edges_h5.close()

    with h5py.File(cache_file, mode='r+') as cache_h5:
        # Step 2: Use the partitions cached to disk in the previous step to create the actual index
        edges_pop_grp = cache_h5[edges_population]
        index_grp = edges_pop_grp[index_grp_name]
        if 'cache' not in index_grp:
            raise ValueError('Error, could not find hdf5 group {}/cache. Unable to proceed.'.format(index_grp_name))
        cache_grp = edges_pop_grp[index_grp_name]['cache']
        logger.debug('Building edges index {}'.format(index_grp_name))

        total_blocks = cache_grp.attrs['total_blocks']  # total 'partitioned_table' rows across all partitions
        max_id = cache_grp.attrs['max_id']  # maximum value in 'col_to_index' across all partitions
        n_partitions = len(cache_grp.keys())

        # Go through each partition and build a master id lookup table, eg for each id the number of ranges it
        # has and where their offsets would go when ordered.
        block_sizes = np.zeros(max_id + 1, dtype=np.uint32)
        for grp_name, grp in edges_pop_grp[index_grp_name]['cache'].items():
            partiton_max_id = grp['lookup_tables'].shape[0]  # some partitions may not contain all the ids
            block_sizes[:partiton_max_id] += np.diff(grp['lookup_tables']).flatten()

        global_offsets = np.concatenate(([0], np.cumsum(block_sizes))).astype(np.uint)
        ranges = np.vstack((global_offsets, np.concatenate((global_offsets[1:], [total_blocks])))).T

        if not cache_grp.attrs.get('building_index', False):
            # Fresh merge: (re)create the output datasets. When resuming they already hold partial results.
            if 'node_id_to_range' in index_grp:
                del index_grp['node_id_to_range']
            index_grp.create_dataset('node_id_to_range', data=ranges[:-1], dtype=np.uint64)

            if 'range_to_edge_id' in index_grp:
                del index_grp['range_to_edge_id']
            index_grp.create_dataset('range_to_edge_id', (total_blocks, 2), dtype=np.uint64)

        # Using the ids offsets we can calculate the number N such that the associated "partitioned_table" for
        # those N ids have less than MAX_EDGE_READS rows (across all partitions). Procedure is to find N_0, iterate
        # through all the partitions to find ranges for ids [0, N_0], merge and sort and save to dataset. Then do
        # the same thing for ids [N_0, N_1]
        caches = {grp_name: grp for grp_name, grp in edges_pop_grp[index_grp_name]['cache'].items()}
        read_block_size = max_edge_reads / (n_partitions*2+1)
        if cache_grp.attrs.get('building_index', False):
            print('Index has already been partially built, resumming from last point')
            block_num = cache_grp.attrs['block_num']
            id_beg = cache_grp.attrs['id_beg']
            id_end = cache_grp.attrs['id_end']
        else:
            # Separating global offsets into N blocks. Need to track which block we're on and the first and
            # last id of the block.
            cache_grp.attrs['building_index'] = True
            cache_grp.attrs['block_num'] = block_num = 1
            cache_grp.attrs['id_beg'] = id_beg = 0
            id_end = np.max((np.searchsorted(global_offsets, read_block_size * block_num, side='right'), 1))
            id_end = np.min((max_id, id_end))
            cache_grp.attrs['id_end'] = id_end

        while id_beg <= max_id:
            logger.debug('Building "range_to_edge_id" for nodes ({}, {})'.format(id_beg, id_end))
            indx_beg = global_offsets[id_beg]
            indx_end = global_offsets[id_end+1]
            block_num += 1
            to_remove = []
            block_frames = []
            for grp_name, grp in caches.items():
                # Use "lookup_tables" to find which rows in "partitioned_table" correspond to ids
                # [id_beg, id_end]. The partitioned tables are already sorted.
                lookup_table = grp['lookup_tables']
                n_partition_ids = lookup_table.shape[0]
                if id_beg >= n_partition_ids:
                    # Partition has no ids at or above id_beg (row id_beg does not exist); it is exhausted.
                    to_remove.append(grp_name)
                    continue

                if id_end >= n_partition_ids:
                    # The block extends past this partition's last id; take everything that is left.
                    part_beg, part_end = lookup_table[id_beg][0], lookup_table[-1][1]
                    to_remove.append(grp_name)
                else:
                    part_beg, part_end = lookup_table[id_beg][0], lookup_table[id_end][1]

                if part_end <= part_beg:
                    continue  # partition does not contain any edges for the subset of ids we are looking for

                partition_rows = grp['partitioned_table'][part_beg:part_end]
                block_frames.append(pd.DataFrame({
                    'id': partition_rows[:, 0],
                    'range_beg': partition_rows[:, 1],
                    'range_end': partition_rows[:, 2]
                }))

            if block_frames:
                master_df = pd.concat(block_frames).sort_values(['id', 'range_beg'])
                index_grp['range_to_edge_id'][indx_beg:indx_end, :] = master_df[['range_beg', 'range_end']].values
                del master_df

            for name in to_remove:
                del caches[name]

            id_beg = id_end + 1
            id_end = np.searchsorted(global_offsets, read_block_size*block_num, side='right')
            # Always advance by at least one id, even if a single id has more blocks than read_block_size.
            id_end = np.max((id_beg, id_end))
            id_end = np.min((max_id, id_end))

            # Save state of loop in case of failure and we need to resume later. The values stored are ids (the
            # same quantities that are read back above), not offsets into "range_to_edge_id".
            cache_grp.attrs['block_num'] = block_num
            cache_grp.attrs['id_beg'] = id_beg
            cache_grp.attrs['id_end'] = id_end

        # Remove cache
        if 'cache' in index_grp:
            del index_grp['cache']