Source code for allensdk.brain_observatory.ecephys.ecephys_project_cache

import pandas as pd
import functools
from pathlib import Path

from ..cache import Cache
from ..utils import write_from_stream
from ..rma_engine import RmaEngine


class EcephysSession:
    """Lightweight handle to a single extracellular-ephys session.

    Parameters
    ----------
    nwb_path :
        Filesystem location of the session's NWB file. The file is not
        opened here; the path is simply recorded for later use.
    """

    def __init__(self, nwb_path):
        self.nwb_path = nwb_path
class EcephysProjectWarehouseApi:
    """Fetches ecephys project data from the Allen Institute warehouse via RMA queries."""

    def __init__(self, rma_engine=None):
        # Fall back to the public brain-map.org RMA endpoint when no engine
        # is supplied.
        if rma_engine is None:
            rma_engine = RmaEngine(scheme="http", host="api.brain-map.org")
        self.rma_engine = rma_engine

    @classmethod
    def default(cls, asynchronous=False, **rma_kwargs):
        """Build an api instance with default warehouse connection settings.

        Caller-supplied ``rma_kwargs`` override the default scheme/host.
        """
        engine_kwargs = {"scheme": "http", "host": "api.brain-map.org"}
        engine_kwargs.update(rma_kwargs)
        if asynchronous:
            engine_cls = AsyncRmaEngine
        else:
            engine_cls = RmaEngine
        return cls(engine_cls(**engine_kwargs))

    def get_session_data(self, session_id, **kwargs):
        """Stream the NWB file attached to one session.

        Raises
        ------
        ValueError
            If the warehouse does not report exactly one NWB file for the
            session.
        """
        query = (
            "criteria=model::WellKnownFile"
            ",rma::criteria,well_known_file_type[name$eq'EcephysNwb']"
            "[attachable_type$eq'EcephysSession']"
            f"[attachable_id$eq{session_id}]"
        )
        well_known_files = self.rma_engine.get_rma_tabular(query)
        if well_known_files.shape[0] != 1:
            raise ValueError(
                f"expected exactly 1 nwb file for session {session_id}, found: {well_known_files}"  # noqa: E501
            )
        download_link = well_known_files.iloc[0]["download_link"]
        return self.rma_engine.stream(download_link)

    def get_sessions(
        self, session_ids=None, has_eye_tracking=None, stimulus_names=None
    ):
        """Return a session-metadata table indexed by session id.

        Donor age/sex/genotype are flattened out of the nested ``specimen``
        record, and ``has_nwb`` flags sessions with an attached NWB file.
        """
        response = build_and_execute(
            (
                "{% import 'rma_macros' as rm %}"
                "{% import 'macros' as m %}"
                "criteria=model::EcephysSession"
                r"{{rm.optional_contains('id',session_ids)}}"
                r"{%if has_eye_tracking is not none%}[fail_eye_tracking$eq{{m.str(not has_eye_tracking).lower()}}]{%endif%}"  # noqa: E501
                r"{{rm.optional_contains('stimulus_name',stimulus_names,True)}}"  # noqa: E501
                ",rma::include,specimen(donor(age))"
                ",well_known_files(well_known_file_type)"
            ),
            base=rma_macros(),
            engine=self.rma_engine.get_rma_tabular,
            session_ids=session_ids,
            has_eye_tracking=has_eye_tracking,
            stimulus_names=stimulus_names,
        )
        response.set_index("id", inplace=True)

        ages = []
        sexes = []
        genotypes = []
        nwb_flags = []
        for _, record in response.iterrows():
            donor = record["specimen"]["donor"]
            ages.append(donor["age"]["days"])
            sexes.append(donor["sex"])
            full_genotype = donor["full_genotype"]
            # Sessions with no recorded genotype are wild-type by convention.
            genotypes.append("wt" if full_genotype is None else full_genotype)
            nwb_flags.append(any(
                wkf["well_known_file_type"]["name"] == "EcephysNwb"
                for wkf in record["well_known_files"]
            ))

        response["age_in_days"] = ages
        response["sex"] = sexes
        response["genotype"] = genotypes
        response["has_nwb"] = nwb_flags

        # Drop the nested source columns now that they are flattened.
        response.drop(
            columns=["specimen", "fail_eye_tracking", "well_known_files"],
            inplace=True,
        )
        response.rename(
            columns={"stimulus_name": "session_type"}, inplace=True
        )
        return response

    def get_probes(self, probe_ids=None, session_ids=None):
        raise NotImplementedError()

    def get_channels(self, channel_ids=None, probe_ids=None):
        raise NotImplementedError()

    def get_rig_metadata(self):
        raise NotImplementedError()

    def get_units(self, unit_ids=None, channel_ids=None, probe_ids=None,
                  session_ids=None, *a, **k):
        raise NotImplementedError()

    def get_unit_analysis_metrics(self, unit_ids=None,
                                  ecephys_session_ids=None,
                                  session_types=None):
        raise NotImplementedError()

    def get_probe_lfp_data(self, probe_id):
        raise NotImplementedError()
class EcephysProjectCache(Cache):
    """Manifest-driven local file cache over an ecephys project data source.

    Data is downloaded through ``fetch_api`` on first request and written to
    disk via ``stream_writer``; subsequent requests are served from the
    cached files.
    """

    # Manifest keys for the cached tables and per-session artifacts.
    SESSIONS_KEY = 'sessions'
    PROBES_KEY = 'probes'
    CHANNELS_KEY = 'channels'
    UNITS_KEY = 'units'
    SESSION_DIR_KEY = 'session_data'
    SESSION_NWB_KEY = 'session_nwb'
    PROBE_LFP_NWB_KEY = "probe_lfp_nwb"
    NATURAL_MOVIE_DIR_KEY = "movie_dir"
    NATURAL_MOVIE_KEY = "natural_movie"
    NATURAL_SCENE_DIR_KEY = "natural_scene_dir"
    NATURAL_SCENE_KEY = "natural_scene"
    SESSION_ANALYSIS_METRICS_KEY = "session_analysis_metrics"
    TYPEWISE_ANALYSIS_METRICS_KEY = "typewise_analysis_metrics"
    MANIFEST_VERSION = '0.3.0'

    # Columns suppressed from the probes table before it is shown to users.
    SUPPRESS_FROM_PROBES = (
        "air_channel_index",
        "surface_channel_index",
        "date_of_acquisition",
        "published_at",
        "specimen_id",
        "session_type",
        "isi_experiment_id",
        "age_in_days",
        "sex",
        "genotype",
        "has_nwb",
        "lfp_temporal_subsampling_factor",
    )

    def __init__(self, fetch_api=None, fetch_tries=2, stream_writer=None,
                 manifest=None, version=None, cache=True):
        """
        Parameters
        ----------
        fetch_api :
            Source of remote data; defaults to the warehouse api. Must expose
            either an ``rma_engine`` or an ``app_engine`` attribute if no
            ``stream_writer`` is given.
        fetch_tries : int
            Number of download attempts before failure.
        stream_writer : callable, optional
            ``(path, data) -> None`` used to persist streamed downloads.
        manifest : str, optional
            Manifest file path; defaults to "ecephys_project_manifest.json".
        version : str, optional
            Manifest version; defaults to MANIFEST_VERSION.
        cache : bool
            Whether to cache downloaded files.

        Raises
        ------
        ValueError
            If no stream writer is given and none can be derived from
            ``fetch_api``.
        """
        manifest_ = manifest or "ecephys_project_manifest.json"
        version_ = version or self.MANIFEST_VERSION
        super(EcephysProjectCache, self).__init__(
            manifest=manifest_, version=version_, cache=cache)
        self.fetch_api = (EcephysProjectWarehouseApi.default()
                          if fetch_api is None else fetch_api)
        self.fetch_tries = fetch_tries
        # BUG FIX: stream_writer was previously pre-assigned unconditionally
        # as `stream_writer or self.fetch_api.rma_engine.write_bytes`, which
        # raised AttributeError for fetch apis that expose only `app_engine`
        # and made the branching below dead code. Resolve it in one place.
        if stream_writer is not None:
            self.stream_writer = stream_writer
        elif hasattr(self.fetch_api, "rma_engine"):  # EcephysProjectWarehouseApi # noqa
            self.stream_writer = self.fetch_api.rma_engine.write_bytes
        # TODO: Make these names consistent in the different fetch apis
        elif hasattr(self.fetch_api, "app_engine"):  # EcephysProjectLimsApi # noqa
            self.stream_writer = self.fetch_api.app_engine.write_bytes
        else:
            raise ValueError(
                "Must either set value for `stream_writer`, or use a "
                "`fetch_api` with an rma_engine or app_engine attribute "
                "that implements `write_bytes`. See `HttpEngine` and "
                "`AsyncHttpEngine` from "
                "allensdk.brain_observatory.ecephys.ecephys_project_api."
                "http_engine for examples.")

    def add_manifest_paths(self, manifest_builder):
        """Register every cache file/directory layout entry on the builder.

        Extends the superclass layout with the ecephys-specific tables,
        per-session directories, NWB files, analysis metrics, and stimulus
        templates.
        """
        manifest_builder = super(
            EcephysProjectCache, self).add_manifest_paths(manifest_builder)
        manifest_builder.add_path(
            self.SESSIONS_KEY, 'sessions.csv',
            parent_key='BASEDIR', typename='file'
        )
        manifest_builder.add_path(
            self.PROBES_KEY, 'probes.csv',
            parent_key='BASEDIR', typename='file'
        )
        manifest_builder.add_path(
            self.CHANNELS_KEY, 'channels.csv',
            parent_key='BASEDIR', typename='file'
        )
        manifest_builder.add_path(
            self.UNITS_KEY, 'units.csv',
            parent_key='BASEDIR', typename='file'
        )
        manifest_builder.add_path(
            self.SESSION_DIR_KEY, 'session_%d',
            parent_key='BASEDIR', typename='dir'
        )
        manifest_builder.add_path(
            self.SESSION_NWB_KEY, 'session_%d.nwb',
            parent_key=self.SESSION_DIR_KEY, typename='file'
        )
        manifest_builder.add_path(
            self.SESSION_ANALYSIS_METRICS_KEY,
            'session_%d_analysis_metrics.csv',
            parent_key=self.SESSION_DIR_KEY, typename='file'
        )
        manifest_builder.add_path(
            self.PROBE_LFP_NWB_KEY, 'probe_%d_lfp.nwb',
            parent_key=self.SESSION_DIR_KEY, typename='file'
        )
        manifest_builder.add_path(
            self.NATURAL_MOVIE_DIR_KEY, "natural_movie_templates",
            parent_key="BASEDIR", typename="dir"
        )
        manifest_builder.add_path(
            self.TYPEWISE_ANALYSIS_METRICS_KEY, "%s_analysis_metrics.csv",
            parent_key='BASEDIR', typename="file"
        )
        manifest_builder.add_path(
            self.NATURAL_MOVIE_KEY, "natural_movie_%d.h5",
            parent_key=self.NATURAL_MOVIE_DIR_KEY, typename="file"
        )
        manifest_builder.add_path(
            self.NATURAL_SCENE_DIR_KEY, "natural_scene_templates",
            parent_key="BASEDIR", typename="dir"
        )
        manifest_builder.add_path(
            self.NATURAL_SCENE_KEY, "natural_scene_%d.tiff",
            parent_key=self.NATURAL_SCENE_DIR_KEY, typename="file"
        )
        return manifest_builder

    @classmethod
    def from_warehouse(cls, scheme=None, host=None, asynchronous=False,
                       manifest=None, version=None, cache=True,
                       fetch_tries=2, timeout=1200):
        """Construct a cache backed by the warehouse (RMA) api.

        ``scheme`` and ``host`` are only applied when both are provided;
        otherwise the api's defaults are used.
        """
        if scheme and host:
            app_kwargs = {"scheme": scheme, "host": host,
                          "asynchronous": asynchronous}
        else:
            app_kwargs = {"asynchronous": asynchronous}
        app_kwargs['timeout'] = timeout
        return cls._from_http_source_default(
            EcephysProjectWarehouseApi, app_kwargs,
            manifest=manifest, version=version, cache=cache,
            fetch_tries=fetch_tries
        )

    @classmethod
    def _from_http_source_default(cls, fetch_api_cls, fetch_api_kwargs,
                                  **kwargs):
        """Shared constructor helper: pick a stream writer matching the
        (a)synchronous mode of the http-backed fetch api."""
        fetch_api_kwargs = (
            {"asynchronous": True}
            if fetch_api_kwargs is None else fetch_api_kwargs
        )
        if kwargs.get("stream_writer") is None:
            if fetch_api_kwargs.get("asynchronous", True):
                # NOTE(review): `write_bytes_from_coroutine` is not imported
                # in the visible header of this module — confirm the import.
                kwargs["stream_writer"] = write_bytes_from_coroutine
            else:
                kwargs["stream_writer"] = write_from_stream
        return cls(
            fetch_api=fetch_api_cls.default(**fetch_api_kwargs),
            **kwargs
        )

    def get_session_data(self, session_id, force_overwrite=False):
        """ Obtain an EcephysSession object containing detailed data for a
        single session, downloading its NWB file into the cache if needed.
        """
        path = self.get_cache_path(
            None, self.SESSION_NWB_KEY, session_id, session_id)
        fetch = functools.partial(self.fetch_api.get_session_data, session_id)
        write = self.stream_writer
        self._fetch_cached_session(path, fetch, write, force_overwrite)
        return EcephysSession(path)

    def _fetch_cached_session(self, path, fetch, write,
                              force_overwrite=False):
        """Download-and-write ``path`` only when missing or overwriting."""
        path = path if isinstance(path, Path) else Path(path)
        if not force_overwrite and path.exists():
            return
        path.parent.mkdir(parents=True, exist_ok=True)
        data = fetch()
        write(path, data)

    def get_channels(self, suppress=None):
        raise NotImplementedError()

    def get_probes(self, suppress=None):
        raise NotImplementedError()

    def get_unit_analysis_metrics_for_session(
            self, session_id, annotate: bool = True,
            filter_by_validity: bool = True, **unit_filter_kwargs):
        # BUG FIX: this method was defined twice with identical signatures;
        # the shadowed duplicate definition has been removed.
        raise NotImplementedError()