

CACHED_FILE_EXTENSIONS module-attribute

CACHED_FILE_EXTENSIONS: dict[str, str] = dict.fromkeys(typing.get_args(NWBComponentStr), '.parquet')

Mapping of NWB component name to file extension
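Every component currently maps to the same suffix, so lookups are trivial (a minimal sketch; assumes the attribute is importable from npc_lims):

>>> CACHED_FILE_EXTENSIONS["units"]                     # doctest: +SKIP
'.parquet'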

CapsuleComputationAPI module-attribute

CapsuleComputationAPI: TypeAlias = dict[Literal['created', 'end_status', 'has_results', 'id', 'name', 'run_time', 'state'], Any]

Result from CodeOceanAPI when querying for computations for a capsule

DataAssetAPI module-attribute

DataAssetAPI: TypeAlias = dict[Literal['created', 'custom_metadata', 'description', 'files', 'id', 'last_used', 'name', 'size', 'sourceBucket', 'state', 'tags', 'type'], Any]

Result from CodeOcean API when querying data assets.

ResultItemAPI module-attribute

ResultItemAPI: TypeAlias = dict[Literal['name', 'path', 'size', 'type'], Any]

Result from CodeOceanAPI when querying for results from a computation
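All three aliases are plain dicts keyed by the literal field names above, so values are accessed by key. A minimal sketch (summarize_result_item is a hypothetical helper, not part of the API):

def summarize_result_item(item: ResultItemAPI) -> str:
    # 'name' and 'size' are fields returned by the CodeOcean API
    return f"{item['name']} ({item['size']} bytes)"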

MissingCredentials

Bases: KeyError

Raised when a required credential is not found in environment variables.

NoSessionInfo

Bases: ValueError

Raised when a session is not found in the tracked-sessions database.
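Callers can catch this to treat an untracked session as missing data (a sketch; get_session_info, documented below, raises NoSessionInfo for unknown sessions):

>>> try:                                                # doctest: +SKIP
...     info = get_session_info("000000_2020-01-01")    # hypothetical untracked session
... except NoSessionInfo:
...     info = None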

SessionInfo dataclass

Minimal session metadata obtained quickly from a database.

Currently using: https://raw.githubusercontent.com/AllenInstitute/npc_lims/main/tracked_sessions.yaml and training spreadsheets.

date property

date: npc_session.DateRecord

YYYY-MM-DD

experiment_day class-attribute instance-attribute

experiment_day: int | None = None

Experiment day (ephys recording, or opto experiment), starting from 1 for each subject. None for training behavior-only sessions.

idx property

idx: int

Session index, starting from 0 for each subject on each day. Currently there is one session per day, so the index isn't specified and is implicitly equal to 0.

is_annotated cached property

is_annotated: bool

The subject associated with the session has CCF annotation data for probes available on S3.

Examples:

>>> next(session.is_annotated for session in get_session_info() if session.is_annotated)
True

is_sorted cached property

is_sorted: bool

The AIND sorting pipeline has yielded a Result asset for this session.

Examples:

>>> next(session.is_sorted for session in get_session_info() if session.is_sorted)
True

is_surface_channels cached property

is_surface_channels: bool

The session has ephys data collected separately to record surface channels.

Examples:

>>> get_session_info("DRpilot_660023_20230808").is_surface_channels
True

is_sync instance-attribute

is_sync: bool

The session has sync data, implying it was run on more than a behavior box.

is_templeton cached property

is_templeton: bool

Uses the project field in tracked_sessions.yaml if available, otherwise infers from whether the session is in Sam's DR training database.

Examples:

>>> get_session_info("2023-05-15_09-50-06_662983").is_templeton
True
>>> get_session_info("DRpilot_644867_20230221").is_templeton
False

is_uploaded cached property

is_uploaded: bool

All of the session's raw data has been uploaded to S3 and can be found in CodeOcean. Not the same as cloud_path being non-None: this property indicates a proper session upload via aind tools, with metadata etc.

Examples:

>>> next(session.is_uploaded for session in get_session_info() if session.is_uploaded)
True

rig property

rig: str

From DR training spreadsheet (e.g. 'NP2', 'B2', 'BEH.E').

  • does not necessarily match AIBS_RIG_ID on computer
  • unknown if not available (for Templeton sessions)

task_version property

task_version: str

From DR training spreadsheet (e.g. 'stage 5 ori AMN moving timeouts repeats').

  • unknown if not available (for Templeton sessions)

training_info cached property

training_info: dict[str, Any]

Session metadata from Sam's DR training database.

  • empty dict for Templeton sessions

Examples:

>>> next(get_session_info()).training_info                      # doctest: +SKIP
{'ID': 1, 'start_time': '2023-03-07 12:56:27', 'rig_name': 'B2', 'task_version': 'stage 0 moving', 'hits': '0', 'dprime_same_modality': '', 'dprime_other_modality_go_stim': '', 'pass': '1', 'ignore': '0'}
>>> assert next(session.training_info for session in get_session_info() if session.training_info)

excel_to_sqlite

excel_to_sqlite(spreadsheet: str | upath.UPath, save_path: str | upath.UPath) -> upath.UPath

Uses the openpyxl package to convert a complete Excel workbook (all sheets) to an SQLite database. Assumes that the first row of every sheet contains the column names. Every sheet is stored in a separate table, and the sheet name is assigned as the table name.

From https://stackoverflow.com/questions/17439885/export-data-from-excel-to-sqlite-database

Source code in npc_lims/metadata/spreadsheets.py
def excel_to_sqlite(
    spreadsheet: str | upath.UPath,
    save_path: str | upath.UPath,
) -> upath.UPath:
    """
    This code uses the openpyxl package for playing around with excel using Python code
    to convert complete excel workbook (all sheets) to an SQLite database
    The code assumes that the first row of every sheet is the column name
    Every sheet is stored in a separate table
    The sheet name is assigned as the table name for every sheet.

    From
    https://stackoverflow.com/questions/17439885/export-data-from-excel-to-sqlite-database
    """
    spreadsheet = upath.UPath(spreadsheet)
    save_path = upath.UPath(save_path)

    db_path = tempfile.mkstemp(suffix=".sqlite")[1]
    xls_path = tempfile.mkstemp(suffix=spreadsheet.suffix)[1]
    upath.UPath(xls_path).write_bytes(spreadsheet.read_bytes())

    # Replace with a database name
    con = sqlite3.connect(db_path)

    # replace with the complete path to your excel workbook
    wb = openpyxl.load_workbook(filename=xls_path)

    def slugify(text: str, lower=1) -> str:
        if lower == 1:
            text = text.strip().lower()
        text = text.replace("d'", "dprime")
        text = re.sub(r"[^\w _-]+", "", text)
        text = re.sub(r"[- ]+", "_", text)
        return text

    for sheet in wb.sheetnames:
        ws = wb[sheet]
        columns = []
        duplicate_column_idx = []
        query = (
            "CREATE TABLE "
            + repr(str(slugify(sheet)))
            + "(ID INTEGER PRIMARY KEY AUTOINCREMENT"
        )
        for row in ws.rows:
            for idx, col in enumerate(row):
                column_name = slugify(col.value)
                if column_name not in columns:
                    query += ", " + column_name + " TEXT"
                    columns.append(column_name)
                else:
                    duplicate_column_idx.append(idx)
            break  # only want column names from first row
        query += ");"
        if not columns:
            continue

        con.execute(query)

        tup = []
        for i, col in enumerate(ws):
            tuprow = []
            if i == 0:
                continue
            for idx, col in enumerate(col):
                if idx in duplicate_column_idx:
                    continue
            value = str(col.value).strip()
                # store empty cells (read back as the string "None") as empty strings
                tuprow.append(value if value != "None" else "")
            tup.append(tuple(tuprow))

        insQuery1 = "INSERT INTO " + repr(str(slugify(sheet))) + "("
        insQuery2 = ""
        for col in columns:
            insQuery1 += col + ", "
            insQuery2 += "?, "
        insQuery1 = insQuery1[:-2] + ") VALUES("
        insQuery2 = insQuery2[:-2] + ")"
        insQuery = insQuery1 + insQuery2

        con.executemany(insQuery, tup)
        con.commit()

    con.close()
    save_path.write_bytes(upath.UPath(db_path).read_bytes())
    return save_path
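Typical usage (a sketch; both paths are hypothetical):

>>> excel_to_sqlite(                                    # doctest: +SKIP
...     spreadsheet="DynamicRoutingTraining.xlsx",      # hypothetical workbook
...     save_path="DynamicRoutingTraining.sqlite",
... )
>>> # each sheet is now a table named after its slugified sheet name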

get_all_cache_paths

get_all_cache_paths(nwb_component: NWBComponentStr, version: str | None = None) -> tuple[upath.UPath, ...]

For a particular NWB component, return cached file paths for all sessions, for the latest version (default) or a specific version.

>>> get_all_cache_paths("units", version="0.0.0")
()

Source code in npc_lims/paths/cache.py
def get_all_cache_paths(
    nwb_component: NWBComponentStr,
    version: str | None = None,
) -> tuple[upath.UPath, ...]:
    """
    For a particular NWB component, return cached file paths for all sessions, for
    the latest version (default) or a specific version.

    >>> get_all_cache_paths("units", version="0.0.0")
    ()
    """
    dir_path = get_cache_path(
        nwb_component=nwb_component, version=version, consolidated=False
    )
    if not dir_path.exists():
        raise FileNotFoundError(
            f"Cache directory for {nwb_component} {version=} does not exist"
        )
    return tuple(
        path for path in dir_path.glob(f"*{get_cache_file_suffix(nwb_component)}")
    )
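One way to load a component for all sessions is to read each cached file, e.g. with pandas (a sketch; assumes pandas is installed and can read the S3 paths returned here):

>>> import pandas as pd                                              # doctest: +SKIP
>>> paths = get_all_cache_paths("units", version="1.0.0")            # doctest: +SKIP
>>> units = pd.concat(pd.read_parquet(p) for p in paths)             # doctest: +SKIP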

get_behavior_video_path_from_s3 cached

get_behavior_video_path_from_s3(session: str | npc_session.SessionRecord) -> upath.UPath

>>> get_behavior_video_path_from_s3('686740_2023-10-26')
S3Path('s3://aind-ephys-data/ecephys_686740_2023-10-26_12-29-08/behavior_videos/Behavior_20231026T122922.mp4')

Source code in npc_lims/paths/s3.py
@functools.cache
def get_behavior_video_path_from_s3(
    session: str | npc_session.SessionRecord,
) -> upath.UPath:
    """
    >>> get_behavior_video_path_from_s3('686740_2023-10-26')
    S3Path('s3://aind-ephys-data/ecephys_686740_2023-10-26_12-29-08/behavior_videos/Behavior_20231026T122922.mp4')
    """
    raw_data_paths = get_raw_data_paths_from_s3(session)
    behavior_video_path = tuple(
        path
        for path in raw_data_paths
        if "Behavior" in path.stem and path.suffix in VIDEO_SUFFIXES
    )

    if not behavior_video_path:
        raise FileNotFoundError(f"{session} has no behavior video on s3")

    return behavior_video_path[0]

get_cache_file_suffix

get_cache_file_suffix(nwb_component: NWBComponentStr) -> str

>>> get_cache_file_suffix("session")
'.parquet'

Source code in npc_lims/paths/cache.py
def get_cache_file_suffix(nwb_component: NWBComponentStr) -> str:
    """
    >>> get_cache_file_suffix("session")
    '.parquet'
    """
    if (ext := CACHED_FILE_EXTENSIONS.get(nwb_component, None)) is None:
        raise ValueError(
            f"Unknown NWB component {nwb_component!r} - must be one of {NWBComponentStr}"
        )
    return ext

get_cache_path

get_cache_path(nwb_component: NWBComponentStr, session_id: str | npc_session.SessionRecord | None = None, version: str | None = None, consolidated: bool = True) -> upath.UPath

If version is not specified, the latest version currently in the cache will be used, i.e. the returned path will point to the most recent version of the file.

>>> get_cache_path(nwb_component="units", version="1.0.0")
S3Path('s3://aind-scratch-data/ben.hardcastle/cache/nwb_components/v1.0.0/units')
>>> get_cache_path(nwb_component="units", session_id="366122_2023-12-31", version="1.0.0")
S3Path('s3://aind-scratch-data/ben.hardcastle/cache/nwb_components/v1.0.0/units/366122_2023-12-31.parquet')
>>> get_cache_path(nwb_component="trials", version="1.0.0")
S3Path('s3://aind-scratch-data/ben.hardcastle/cache/nwb_components/v1.0.0/consolidated/trials.parquet')

Source code in npc_lims/paths/cache.py
def get_cache_path(
    nwb_component: NWBComponentStr,
    session_id: str | npc_session.SessionRecord | None = None,
    version: str | None = None,
    consolidated: bool = True,
) -> upath.UPath:
    """
    If version is not specified, the latest version currently in the cache will be
    used, ie. will point to the most recent version of the file.

    >>> get_cache_path(nwb_component="units", version="1.0.0")
    S3Path('s3://aind-scratch-data/ben.hardcastle/cache/nwb_components/v1.0.0/units')
    >>> get_cache_path(nwb_component="units", session_id="366122_2023-12-31", version="1.0.0")
    S3Path('s3://aind-scratch-data/ben.hardcastle/cache/nwb_components/v1.0.0/units/366122_2023-12-31.parquet')
    >>> get_cache_path(nwb_component="trials", version="1.0.0")
    S3Path('s3://aind-scratch-data/ben.hardcastle/cache/nwb_components/v1.0.0/consolidated/trials.parquet')
    """
    path = _parse_cache_path(
        session_id=session_id, nwb_component=nwb_component, version=version
    )
    if consolidated and session_id is None and nwb_component != "units":
        path = path.parent / "consolidated" / f"{nwb_component}.parquet"
    return path

get_current_cache_version

get_current_cache_version() -> str

The current version of npc_sessions, formatted as a string starting with 'v'.

>>> (get_cache_path(nwb_component="units", session_id="366122_2023-12-31", version="v0.0.0").parent / 'test.txt').touch()
>>> v = get_current_cache_version()
>>> assert v >= 'v0.0.0'

Source code in npc_lims/paths/cache.py
def get_current_cache_version() -> str:
    """The current version of npc_sessions, formatted as a string starting with
    'v'.

    >>> (get_cache_path(nwb_component="units", session_id="366122_2023-12-31", version="v0.0.0").parent / 'test.txt').touch()
    >>> v = get_current_cache_version()
    >>> assert v >= 'v0.0.0'
    """
    if not (version_dirs := sorted(CACHE_ROOT.glob("v*"))):
        raise FileNotFoundError(f"No cache versions found in {CACHE_ROOT}")
    npc_sessions_info = requests.get("https://pypi.org/pypi/npc_sessions/json").json()
    return _parse_version(npc_sessions_info["info"]["version"])

get_data_asset

get_data_asset(asset: str | uuid.UUID | DataAssetAPI) -> DataAssetAPI

Converts an asset uuid to dict of info from CodeOcean API.

Source code in npc_lims/metadata/codeocean.py
def get_data_asset(asset: str | uuid.UUID | DataAssetAPI) -> DataAssetAPI:
    """Converts an asset uuid to dict of info from CodeOcean API."""
    if not isinstance(asset, Mapping):
        response = get_codeocean_client().get_data_asset(str(asset))
        response.raise_for_status()
        asset = response.json()
    assert isinstance(asset, Mapping), f"Unexpected {type(asset) = }, {asset = }"
    return asset
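Because a dict passed in is returned unchanged, the function can normalize mixed inputs (a sketch; the uuid is the raw-asset id from the get_session_raw_data_asset example below):

>>> asset = get_data_asset('83636983-f80d-42d6-a075-09b60c6abd5e')   # doctest: +SKIP
>>> assert get_data_asset(asset) is asset                            # doctest: +SKIP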

get_data_asset_s3_path

get_data_asset_s3_path(asset_id: str | codeocean.DataAssetAPI) -> upath.UPath

Path on s3 that contains actual data for CodeOcean data asset.

  • asset id is a UUID
  • accept anything with an "id" attribute or key, or a string

Assumes that the data asset has data on s3, which may not be true, and we can't tell from asset info.
Source code in npc_lims/paths/s3.py
def get_data_asset_s3_path(asset_id: str | codeocean.DataAssetAPI) -> upath.UPath:
    """Path on s3 that contains actual data for CodeOcean data asset.

    - asset `id` is a UUID
    - accept anything with an "id" attribute or key, or a string
    Assumes that the data asset has data on s3, which may not be true, and we can't tell from asset info.
    """
    bucket = CODE_OCEAN_DATA_BUCKET
    with contextlib.suppress(AttributeError, KeyError):
        bucket = upath.UPath(upath.UPath(f's3://{asset_id["sourceBucket"]}'))  # type: ignore[index]
    with contextlib.suppress(AttributeError, KeyError):
        return bucket / asset_id.get("id")  # type: ignore[union-attr, operator]
    with contextlib.suppress(AttributeError):
        return bucket / asset_id.id  # type: ignore[union-attr]
    return bucket / str(asset_id)
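With a bare id and no "sourceBucket" info, the default CodeOcean bucket is assumed and the asset id becomes the key prefix (a sketch; the bucket name is illustrative, copied from the spike-sorting example below):

>>> get_data_asset_s3_path('83636983-f80d-42d6-a075-09b60c6abd5e')   # doctest: +SKIP
S3Path('s3://codeocean-s3datasetsbucket-1u41qdg42ur9/83636983-f80d-42d6-a075-09b60c6abd5e')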

get_eye_video_path_from_s3 cached

get_eye_video_path_from_s3(session: str | npc_session.SessionRecord) -> upath.UPath

>>> get_eye_video_path_from_s3('686740_2023-10-26')
S3Path('s3://aind-ephys-data/ecephys_686740_2023-10-26_12-29-08/behavior_videos/Eye_20231026T122922.mp4')

Source code in npc_lims/paths/s3.py
@functools.cache
def get_eye_video_path_from_s3(session: str | npc_session.SessionRecord) -> upath.UPath:
    """
    >>> get_eye_video_path_from_s3('686740_2023-10-26')
    S3Path('s3://aind-ephys-data/ecephys_686740_2023-10-26_12-29-08/behavior_videos/Eye_20231026T122922.mp4')
    """
    raw_data_paths = get_raw_data_paths_from_s3(session)
    eye_video_path = tuple(
        path
        for path in raw_data_paths
        if "Eye" in path.stem and path.suffix in VIDEO_SUFFIXES
    )

    if not eye_video_path:
        raise FileNotFoundError(f"{session} has no eye video on s3")

    return eye_video_path[0]

get_face_video_path_from_s3 cached

get_face_video_path_from_s3(session: str | npc_session.SessionRecord) -> upath.UPath

>>> get_face_video_path_from_s3('686740_2023-10-26')
S3Path('s3://aind-ephys-data/ecephys_686740_2023-10-26_12-29-08/behavior_videos/Face_20231026T122923.mp4')

Source code in npc_lims/paths/s3.py
@functools.cache
def get_face_video_path_from_s3(
    session: str | npc_session.SessionRecord,
) -> upath.UPath:
    """
    >>> get_face_video_path_from_s3('686740_2023-10-26')
    S3Path('s3://aind-ephys-data/ecephys_686740_2023-10-26_12-29-08/behavior_videos/Face_20231026T122923.mp4')
    """
    raw_data_paths = get_raw_data_paths_from_s3(session)
    face_video_path = tuple(
        path
        for path in raw_data_paths
        if "Face" in path.stem and path.suffix in VIDEO_SUFFIXES
    )

    if not face_video_path:
        raise FileNotFoundError(f"{session} has no face video on s3")

    return face_video_path[0]

get_h5_sync_from_s3 cached

get_h5_sync_from_s3(session: str | npc_session.SessionRecord) -> upath.UPath

Examples:

>>> get_h5_sync_from_s3('662892_20230821')
S3Path('s3://aind-ephys-data/ecephys_662892_2023-08-21_12-43-45/behavior/20230821T124345.h5')
Source code in npc_lims/paths/s3.py
@functools.cache
def get_h5_sync_from_s3(session: str | npc_session.SessionRecord) -> upath.UPath:
    """
    Examples:
        >>> get_h5_sync_from_s3('662892_20230821')
        S3Path('s3://aind-ephys-data/ecephys_662892_2023-08-21_12-43-45/behavior/20230821T124345.h5')
    """
    raw_data_paths_s3 = get_raw_data_paths_from_s3(session)
    sync_path = tuple(path for path in raw_data_paths_s3 if ".h5" in path.suffix)

    if not sync_path:
        raise FileNotFoundError(f"No sync file found in {raw_data_paths_s3!r}")

    return sync_path[0]

get_hdf5_stim_files_from_s3 cached

get_hdf5_stim_files_from_s3(session: str | npc_session.SessionRecord) -> tuple[StimFile, ...]

All the stim files for a session, from the synced DynamicRoutingTask/Data folder on s3.

  • filters out files that are obviously wrong

Examples:

>>> files = get_hdf5_stim_files_from_s3('668759_20230711')
>>> assert len(files) > 0
>>> files[0].name, files[0].time
('DynamicRouting1', '13:25:00')
Source code in npc_lims/paths/s3.py
@functools.cache
def get_hdf5_stim_files_from_s3(
    session: str | npc_session.SessionRecord,
) -> tuple[StimFile, ...]:
    """All the stim files for a session, from the synced
    `DynamicRoutingTask/Data` folder on s3.

    - filters out files that are obviously wrong

    Examples:
        >>> files = get_hdf5_stim_files_from_s3('668759_20230711')
        >>> assert len(files) > 0
        >>> files[0].name, files[0].time
        ('DynamicRouting1', '13:25:00')
    """
    session = npc_session.SessionRecord(session)
    root = DR_DATA_REPO / str(session.subject)
    if not root.exists():
        if not DR_DATA_REPO.exists():
            raise FileNotFoundError(f"{DR_DATA_REPO = } does not exist")
        raise FileNotFoundError(
            f"Subject {session.subject} hdf5s not on s3: may have been run by NSB, in which case they are on lims2"
        )
    file_glob = f"*_{session.subject}_{session.date.replace('-', '')}_??????.hdf5"
    files = [StimFile(path, session) for path in root.glob(file_glob)]

    test_glob = file_glob.replace(str(session.subject), "test")
    files += [
        StimFile(path, session)
        for path in root.glob(test_glob)
        if str(session.subject) in path.as_posix()
    ]

    # no empty files:
    files = [f for f in files if f.size > 0]

    # single behavior task:
    behavior_tasks = tuple(f for f in files if "DynamicRouting" in f.name)
    if len(behavior_tasks) > 1:
        largest = max(behavior_tasks, key=lambda f: f.size)
        for f in behavior_tasks:
            if f.path != largest.path:
                files.remove(f)

    return tuple(files)

get_mean_waveform_codeocean_kilosort_path_from_s3 cached

get_mean_waveform_codeocean_kilosort_path_from_s3(session: str | npc_session.SessionRecord) -> upath.UPath

Examples:

>>> path = get_mean_waveform_codeocean_kilosort_path_from_s3('668759_20230711')
>>> assert path
Source code in npc_lims/paths/s3.py
@functools.cache
def get_mean_waveform_codeocean_kilosort_path_from_s3(
    session: str | npc_session.SessionRecord,
) -> upath.UPath:
    """
    Examples:
        >>> path = get_mean_waveform_codeocean_kilosort_path_from_s3('668759_20230711')
        >>> assert path
    """
    files = get_units_spikes_codeocean_kilosort_top_level_files(session)
    mean_waveforms_path = next(path for path in files if "mean" in str(path))

    return mean_waveforms_path

get_nwb_file_from_s3 cached

get_nwb_file_from_s3(session: str | npc_session.SessionRecord) -> upath.UPath

Examples:

>>> get_nwb_file_from_s3('636766_20230125')
S3Path('s3://aind-scratch-data/ben.hardcastle/nwb/nwb/DRpilot_636766_20230125.nwb')
Source code in npc_lims/paths/s3.py
@functools.cache
def get_nwb_file_from_s3(
    session: str | npc_session.SessionRecord,
) -> upath.UPath:
    """
    Examples:
        >>> get_nwb_file_from_s3('636766_20230125')
        S3Path('s3://aind-scratch-data/ben.hardcastle/nwb/nwb/DRpilot_636766_20230125.nwb')
    """
    session = npc_session.SessionRecord(session)
    root = NWB_REPO
    glob = "*.nwb*"
    result = next(
        (
            path
            for path in root.glob(glob)
            if session == npc_session.SessionRecord(path.name)
        ),
        None,
    )
    if not result:
        raise FileNotFoundError(f"No NWB file found at {root}/{glob}")
    return result

get_path_from_data_asset

get_path_from_data_asset(asset: DataAssetAPI) -> upath.UPath

Reconstruct path to raw data in bucket (e.g. on s3) using data asset uuid or dict of info from Code Ocean API.

Source code in npc_lims/metadata/codeocean.py
def get_path_from_data_asset(asset: DataAssetAPI) -> upath.UPath:
    """Reconstruct path to raw data in bucket (e.g. on s3) using data asset
    uuid or dict of info from Code Ocean API."""
    if "sourceBucket" not in asset:
        raise ValueError(
            f"Asset {asset['id']} has no `sourceBucket` info - not sure how to create UPath:\n{asset!r}"
        )
    bucket_info = asset["sourceBucket"]
    roots = {"aws": "s3", "gcs": "gs"}
    if bucket_info["origin"] not in roots:
        raise RuntimeError(
            f"Unknown bucket origin - not sure how to create UPath: {bucket_info = }"
        )
    return upath.UPath(
        f"{roots[bucket_info['origin']]}://{bucket_info['bucket']}/{bucket_info['prefix']}"
    )
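A sketch with an illustrative asset dict (field values copied from the get_raw_data_root example below):

>>> asset = {
...     "id": "83636983-f80d-42d6-a075-09b60c6abd5e",
...     "sourceBucket": {"origin": "aws", "bucket": "aind-ephys-data", "prefix": "ecephys_668759_2023-07-11_13-07-32"},
... }
>>> get_path_from_data_asset(asset)                                  # doctest: +SKIP
S3Path('s3://aind-ephys-data/ecephys_668759_2023-07-11_13-07-32')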

get_quality_metrics_paths_from_s3 cached

get_quality_metrics_paths_from_s3(session: str | npc_session.SessionRecord) -> tuple[upath.UPath, ...]

Examples:

>>> quality_metrics_paths = get_quality_metrics_paths_from_s3('662892_2023-08-21')
>>> assert len(quality_metrics_paths) > 0
Source code in npc_lims/paths/s3.py
@functools.cache
def get_quality_metrics_paths_from_s3(
    session: str | npc_session.SessionRecord,
) -> tuple[upath.UPath, ...]:
    """
    Examples:
        >>> quality_metrics_paths = get_quality_metrics_paths_from_s3('662892_2023-08-21')
        >>> assert len(quality_metrics_paths) > 0
    """
    sorted_paths = get_sorted_data_paths_from_s3(session)
    postprocessed_files = next(
        path for path in sorted_paths if "postprocessed" in str(path)
    ).iterdir()
    quality_metrics_paths = tuple(
        next(path.glob("quality_metrics/metrics.csv")) for path in postprocessed_files
    )

    return quality_metrics_paths

get_raw_data_paths_from_s3 cached

get_raw_data_paths_from_s3(session: str | npc_session.SessionRecord) -> tuple[upath.UPath, ...]

All top-level files and folders from the ephys & behavior subdirectories in a session's raw data folder on s3.

Examples:

>>> files = get_raw_data_paths_from_s3('668759_20230711')
>>> assert len(files) > 0
Source code in npc_lims/paths/s3.py
@functools.cache
def get_raw_data_paths_from_s3(
    session: str | npc_session.SessionRecord,
) -> tuple[upath.UPath, ...]:
    """All top-level files and folders from the `ephys` & `behavior`
    subdirectories in a session's raw data folder on s3.

    Examples:
        >>> files = get_raw_data_paths_from_s3('668759_20230711')
        >>> assert len(files) > 0
    """
    raw_data_root = codeocean.get_raw_data_root(session)
    directories: Iterator[upath.UPath] = (
        directory for directory in raw_data_root.iterdir() if directory.is_dir()
    )
    first_level_files_directories: Iterator = (
        tuple(directory.iterdir()) for directory in directories
    )

    paths = functools.reduce(operator.add, first_level_files_directories)

    if not paths:
        raise FileNotFoundError(
            f"Raw data paths empty for {session} on s3. Looks like an upload was started, but no files have been transferred."
        )
    return paths

get_raw_data_root cached

get_raw_data_root(session: str | npc_session.SessionRecord) -> upath.UPath

Reconstruct path to raw data in bucket (e.g. on s3) using data-asset info from Code Ocean.

>>> get_raw_data_root('668759_20230711')
S3Path('s3://aind-ephys-data/ecephys_668759_2023-07-11_13-07-32')
Source code in npc_lims/metadata/codeocean.py
@functools.cache
def get_raw_data_root(session: str | npc_session.SessionRecord) -> upath.UPath:
    """Reconstruct path to raw data in bucket (e.g. on s3) using data-asset
    info from Code Ocean.

        >>> get_raw_data_root('668759_20230711')
        S3Path('s3://aind-ephys-data/ecephys_668759_2023-07-11_13-07-32')
    """
    session = npc_session.SessionRecord(session)
    raw_assets = tuple(
        asset for asset in get_session_data_assets(session) if is_raw_data_asset(asset)
    )
    raw_asset = get_single_data_asset(session, raw_assets, "raw")

    return get_path_from_data_asset(raw_asset)

get_recording_dirs_experiment_path_from_s3 cached

get_recording_dirs_experiment_path_from_s3(session: str | npc_session.SessionRecord) -> tuple[upath.UPath, ...]

Examples:

>>> recording_dirs = get_recording_dirs_experiment_path_from_s3('662892_20230821')
>>> assert len(recording_dirs) > 0
Source code in npc_lims/paths/s3.py
@functools.cache
def get_recording_dirs_experiment_path_from_s3(
    session: str | npc_session.SessionRecord,
) -> tuple[upath.UPath, ...]:
    """
    Examples:
        >>> recording_dirs = get_recording_dirs_experiment_path_from_s3('662892_20230821')
        >>> assert len(recording_dirs) > 0
    """
    raw_data_paths = get_raw_data_paths_from_s3(session)
    recording_dirs = (
        path
        for path in raw_data_paths
        if "Record Node" in str(path) and "zarr" not in str(path)
    )
    recording_dirs_experiment = tuple(
        next(path.glob("*/recording*")) for path in recording_dirs
    )

    return recording_dirs_experiment

get_session_computation_id_and_data_asset_name

get_session_computation_id_and_data_asset_name(session: npc_session.SessionRecord, model_name: str, capsule_computations: list[CapsuleComputationAPI]) -> tuple[str, str]

Returns the computation id and data asset name for the session that will be used to create the data asset.

The doctest below is disabled, since Arjun ran the capsule but GitHub has a different token:

>>> session = npc_session.SessionRecord('626791_20220816')
>>> capsule_computations = get_codeocean_client().get_capsule_computations(MODEL_CAPSULE_MAPPING['dlc_eye'])
>>> capsule_computations.raise_for_status()
>>> get_session_computation_id_and_data_asset_name(session, 'eyetracking', capsule_computations.json())
('3010ff06-aae5-4b35-b070-57df9ef85582', 'ecephys_626791_2022-08-16_00-00-00_eyetracking')

Source code in npc_lims/metadata/codeocean.py
def get_session_computation_id_and_data_asset_name(
    session: npc_session.SessionRecord,
    model_name: str,
    capsule_computations: list[CapsuleComputationAPI],
) -> tuple[str, str]:
    """
    Returns the computation id and data asset name for the session that will be used to create the data asset
    Test below fails, since arjun ran capsule but github has different token
    #>>> session = npc_session.SessionRecord('626791_20220816')
    #>>> capsule_computations = get_codeocean_client().get_capsule_computations(MODEL_CAPSULE_MAPPING['dlc_eye'])
    #>>> capsule_computations.raise_for_status()
    #>>> get_session_computation_id_and_data_asset_name(session, 'eyetracking', capsule_computations.json())
    ('3010ff06-aae5-4b35-b070-57df9ef85582', 'ecephys_626791_2022-08-16_00-00-00_eyetracking')
    """
    for computation in capsule_computations:
        if not computation["has_results"]:
            continue

        response_result_items = get_codeocean_client().get_list_result_items(
            computation["id"]
        )
        response_result_items.raise_for_status()
        result_items = response_result_items.json()

        session_result_item = tuple(
            item
            for item in result_items["items"]
            if re.match(  # TODO add folder
                f"ecephys_{session.subject}_{session.date}_{npc_session.PARSE_TIME}.json",
                item["name"],
            )
        )

        if not session_result_item:
            continue

        session_item = session_result_item[0]
        session_comp_id_data_asset_name = (
            computation["id"],
            session_item["name"].replace(".json", f"_{model_name}"),
        )
        break
    else:
        # loop completed without a break: no computation matched this session
        raise ValueError(f"No result data asset found for {session} {model_name=}")

    return session_comp_id_data_asset_name

get_session_id_from_db_row

get_session_id_from_db_row(subject: int | str, row: dict[str, Any]) -> npc_session.SessionRecord

Examples:

>>> get_session_id_from_db_row(366122, {'start_time': '2023-01-30 12:56:27'})
'366122_2023-01-30'
Source code in npc_lims/status/behavior_sessions.py
def get_session_id_from_db_row(
    subject: int | str, row: dict[str, Any]
) -> npc_session.SessionRecord:
    """
    Examples:
        >>> get_session_id_from_db_row(366122, {'start_time': '2023-01-30 12:56:27'})
        '366122_2023-01-30'
    """
    return npc_session.SessionRecord(
        f"{subject} {row[next(k for k in row.keys() if 'start' in k and any(t in k for t in ('date', 'time')))]}"
    )

get_session_info

get_session_info(session: str | npc_session.SessionRecord | SessionInfo | None = None, **bool_filter_kwargs: bool) -> tuple[SessionInfo, ...] | SessionInfo

Quickly get a sequence of all tracked sessions.

Examples:

Each object in the sequence has info about one session:
>>> sessions = get_session_info()
>>> sessions[0].__class__.__name__
'SessionInfo'
>>> sessions[0].is_ephys                    # doctest: +SKIP
True
>>> any(s for s in sessions if s.date.year < 2021)
False

Pass a session str or SessionRecord to get the info for that session:
>>> info = get_session_info("DRpilot_667252_20230927")
>>> assert isinstance(info, SessionInfo)
>>> info.is_templeton
False
Source code in npc_lims/status/tracked_sessions.py
def get_session_info(
    session: str | npc_session.SessionRecord | SessionInfo | None = None,
    **bool_filter_kwargs: bool,
) -> tuple[SessionInfo, ...] | SessionInfo:
    """Quickly get a sequence of all tracked sessions.

    Examples:

        Each object in the sequence has info about one session:
        >>> sessions = get_session_info()
        >>> sessions[0].__class__.__name__
        'SessionInfo'
        >>> sessions[0].is_ephys                    # doctest: +SKIP
        True
        >>> any(s for s in sessions if s.date.year < 2021)
        False

        Pass a session str or SessionRecord to get the info for that session:
        >>> info = get_session_info("DRpilot_667252_20230927")
        >>> assert isinstance(info, SessionInfo)
        >>> info.is_templeton
        False
    """
    if isinstance(session, SessionInfo):
        session = session.id
    tracked_sessions = set(
        _get_session_info_from_file(),
    )
    tracked_sessions.update(_get_session_info_from_data_repo())
    if session is None:
        filtered_sessions = (
            s
            for s in tracked_sessions
            if all(getattr(s, k) == v for k, v in bool_filter_kwargs.items())
        )
        return tuple(sorted(filtered_sessions, key=lambda s: s.id.date, reverse=True))
    with contextlib.suppress(StopIteration):
        return next(
            s
            for s in tracked_sessions
            if s.id == (record := npc_session.SessionRecord(session))
        )
    raise exceptions.NoSessionInfo(f"{record} not found in tracked sessions")
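Boolean keyword arguments filter the returned sequence on the matching SessionInfo attributes (a sketch; any of the bool properties documented above can be combined):

>>> sorted_and_annotated = get_session_info(is_sorted=True, is_annotated=True)   # doctest: +SKIP
>>> assert all(s.is_sorted and s.is_annotated for s in sorted_and_annotated)     # doctest: +SKIP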

get_session_issues

get_session_issues(session: str | npc_session.SessionRecord | None = None) -> list[str] | list | dict[npc_session.SessionRecord, list[str]]

Get a dictionary of all sessions with issues mapped to their issue url.

Examples:

>>> issues = get_session_issues()
>>> issues                                                              # doctest: +SKIP
{
    '644867_2023-02-21': ['https://github.com/AllenInstitute/npc_sessions/issues/28'],
    '660023_2023-08-08': ['https://github.com/AllenInstitute/npc_sessions/issues/26'],
}

>>> single_session_issues = get_session_issues("DRPilot_644867_20230221")
>>> assert isinstance(single_session_issues, typing.Sequence)
>>> single_session_issues                                               # doctest: +SKIP
['https://github.com/AllenInstitute/npc_sessions/issues/28']
Source code in npc_lims/status/tracked_sessions.py
def get_session_issues(
    session: str | npc_session.SessionRecord | None = None,
) -> list[str] | list | dict[npc_session.SessionRecord, list[str]]:
    """Get a dictionary of all sessions with issues mapped to their issue url.

    Examples:

        >>> issues = get_session_issues()
        >>> issues                                                              # doctest: +SKIP
        {
            '644867_2023-02-21': ['https://github.com/AllenInstitute/npc_sessions/issues/28'],
            '660023_2023-08-08': ['https://github.com/AllenInstitute/npc_sessions/issues/26'],
        }

        >>> single_session_issues = get_session_issues("DRPilot_644867_20230221")
        >>> assert isinstance(single_session_issues, typing.Sequence)
        >>> single_session_issues                                               # doctest: +SKIP
        ['https://github.com/AllenInstitute/npc_sessions/issues/28']
    """
    if session:
        try:
            return get_session_info(session).issues
        except exceptions.NoSessionInfo:
            return []
    return {
        session.id: session.issues for session in get_session_info() if session.issues
    }

get_session_kwargs

get_session_kwargs(session: str | npc_session.SessionRecord | None = None) -> dict[str, str] | dict | dict[npc_session.SessionRecord, dict[str, str]]

Get a dictionary of all sessions mapped to their config kwargs. kwargs will be an empty dict if no kwargs have been specified.

Examples:

>>> kwargs = get_session_kwargs()
>>> kwargs                                                          # doctest: +SKIP
{   '670248_2023-08-02': {
        'is_task': False,
    },
    '667252_2023-09-25': {
        'invalid_times': [
            {'start_time': 4996, 'stop_time': -1, 'reason': 'auditory stimulus not presented (amplifier power issue)'}
        ]
    },
}
>>> single_session_kwargs = get_session_kwargs("DRpilot_670248_20230802")
>>> assert isinstance(single_session_kwargs, dict)
>>> single_session_kwargs                                           # doctest: +SKIP
{'is_task': False}
Source code in npc_lims/status/tracked_sessions.py
def get_session_kwargs(
    session: str | npc_session.SessionRecord | None = None,
) -> dict[str, str] | dict | dict[npc_session.SessionRecord, dict[str, str]]:
    """Get a dictionary of all sessions mapped to their config kwargs. kwargs will
    be an empty dict if no kwargs have been specified.

    Examples:

        >>> kwargs = get_session_kwargs()
        >>> kwargs                                                          # doctest: +SKIP
        {   '670248_2023-08-02': {
                'is_task': False,
            },
            '667252_2023-09-25': {
                'invalid_times': [
                    {'start_time': 4996, 'stop_time': -1, 'reason': 'auditory stimulus not presented (amplifier power issue)'}
                ]
            },
        }
        >>> single_session_kwargs = get_session_kwargs("DRpilot_670248_20230802")
        >>> assert isinstance(single_session_kwargs, dict)
        >>> single_session_kwargs                                           # doctest: +SKIP
        {'is_task': False}
    """
    if session:
        try:
            return get_session_info(session).session_kwargs
        except exceptions.NoSessionInfo:
            return {}
    return {session.id: session.session_kwargs for session in get_session_info()}

get_session_raw_data_asset

get_session_raw_data_asset(session: str | npc_session.SessionRecord) -> DataAssetAPI

Examples:

>>> get_session_raw_data_asset('668759_20230711')["id"]
'83636983-f80d-42d6-a075-09b60c6abd5e'
Source code in npc_lims/metadata/codeocean.py
def get_session_raw_data_asset(
    session: str | npc_session.SessionRecord,
) -> DataAssetAPI:
    """
    Examples:
        >>> get_session_raw_data_asset('668759_20230711')["id"]
        '83636983-f80d-42d6-a075-09b60c6abd5e'
    """
    session = npc_session.SessionRecord(session)
    raw_assets = tuple(
        asset for asset in get_session_data_assets(session) if is_raw_data_asset(asset)
    )

    if not raw_assets:
        raise ValueError(f"Session {session} has no raw data assets")

    return get_single_data_asset(session, raw_assets, "raw")

get_session_result_data_assets

get_session_result_data_assets(session: str | npc_session.SessionRecord) -> tuple[DataAssetAPI, ...]

Examples:

>>> result_data_assets = get_session_result_data_assets('668759_20230711')
>>> assert len(result_data_assets) > 0
Source code in npc_lims/metadata/codeocean.py
def get_session_result_data_assets(
    session: str | npc_session.SessionRecord,
) -> tuple[DataAssetAPI, ...]:
    """
    Examples:
        >>> result_data_assets = get_session_result_data_assets('668759_20230711')
        >>> assert len(result_data_assets) > 0
    """
    session_data_assets = get_session_data_assets(session)
    result_data_assets = tuple(
        data_asset
        for data_asset in session_data_assets
        if data_asset["type"] == "result"
    )

    return result_data_assets

get_session_sorted_data_asset

get_session_sorted_data_asset(session: str | npc_session.SessionRecord) -> DataAssetAPI

Examples:

>>> sorted_data_asset = get_session_sorted_data_asset('668759_20230711')
>>> assert isinstance(sorted_data_asset, dict)
Source code in npc_lims/metadata/codeocean.py
def get_session_sorted_data_asset(
    session: str | npc_session.SessionRecord,
) -> DataAssetAPI:
    """
    Examples:
        >>> sorted_data_asset = get_session_sorted_data_asset('668759_20230711')
        >>> assert isinstance(sorted_data_asset, dict)
    """
    session_result_data_assets = get_session_data_assets(session)
    sorted_data_assets = tuple(
        data_asset
        for data_asset in session_result_data_assets
        if is_sorted_data_asset(data_asset) and data_asset["files"] > 2
    )

    if not sorted_data_assets:
        raise ValueError(f"Session {session} has no sorted data assets")

    return get_single_data_asset(session, sorted_data_assets, "sorted")

get_session_units_data_asset cached

get_session_units_data_asset(session_id: str | npc_session.SessionRecord) -> DataAssetAPI

Examples:

>>> units_data_asset = get_session_units_data_asset('668759_20230711')
>>> assert units_data_asset is not None
Source code in npc_lims/metadata/codeocean.py
@functools.cache
def get_session_units_data_asset(
    session_id: str | npc_session.SessionRecord,
) -> DataAssetAPI:
    """
    Examples:
        >>> units_data_asset = get_session_units_data_asset('668759_20230711')
        >>> assert units_data_asset is not None
    """
    session = npc_session.SessionRecord(session_id)
    session_data_assets = get_session_data_assets(session)
    session_units_data_assets = tuple(
        data_asset
        for data_asset in session_data_assets
        if "units" in data_asset["name"] and "peak" not in data_asset["name"]
    )
    session_units_data_asset = get_single_data_asset(
        session, session_units_data_assets, "units"
    )

    return session_units_data_asset

get_session_units_spikes_with_peak_channels_data_asset cached

get_session_units_spikes_with_peak_channels_data_asset(session_id: str | npc_session.SessionRecord) -> DataAssetAPI

Examples:

>>> units_peak_channel_data_asset = get_session_units_spikes_with_peak_channels_data_asset('668759_20230711')
>>> assert units_peak_channel_data_asset is not None
Source code in npc_lims/metadata/codeocean.py
@functools.cache
def get_session_units_spikes_with_peak_channels_data_asset(
    session_id: str | npc_session.SessionRecord,
) -> DataAssetAPI:
    """
    Examples:
        >>> units_peak_channel_data_asset = get_session_units_spikes_with_peak_channels_data_asset('668759_20230711')
        >>> assert units_peak_channel_data_asset is not None
    """
    session = npc_session.SessionRecord(session_id)
    session_data_assets = get_session_data_assets(session)
    session_units_spikes_peak_channel_data_assets = tuple(
        data_asset
        for data_asset in session_data_assets
        if "units_with_peak_channels" in data_asset["name"]
    )

    session_units_spikes_peak_channel_data_asset = get_single_data_asset(
        session, session_units_spikes_peak_channel_data_assets, "units"
    )

    return session_units_spikes_peak_channel_data_asset

get_sessions_from_data_repo cached

get_sessions_from_data_repo(subject: int | str | None = None) -> tuple[npc_session.SessionRecord, ...] | dict[npc_session.SubjectRecord, tuple[npc_session.SessionRecord, ...]]

Globs synced behavior data repo for sessions.

Examples:

get a dict of all subjects mapped to their sessions:

>>> all_subjects_sessions = get_sessions_from_data_repo()
>>> len(all_subjects_sessions)
93
>>> len(tuple(all_subjects_sessions.values())[0])
45

get a specific subject's sessions as a sequence:

>>> get_sessions_from_data_repo(366122)[0]
'366122_2023-01-30'
Source code in npc_lims/status/behavior_sessions.py
@functools.cache
def get_sessions_from_data_repo(
    subject: int | str | None = None,
) -> (
    tuple[npc_session.SessionRecord, ...]
    | dict[npc_session.SubjectRecord, tuple[npc_session.SessionRecord, ...]]
):
    """
    Globs synced behavior data repo for sessions.

    Examples:
        get a dict of all subjects mapped to their sessions:
        >>> all_subjects_sessions = get_sessions_from_data_repo()
        >>> len(all_subjects_sessions)                      # doctest: +SKIP
        93

        >>> len(tuple(all_subjects_sessions.values())[0])   # doctest: +SKIP
        45

        get a specific subject's sessions as a sequence:
        >>> get_sessions_from_data_repo(366122)[0]
        '366122_2023-01-30'

    """

    def _get_sessions_from_subfolders(
        folder: upath.UPath,
    ) -> tuple[npc_session.SessionRecord, ...]:
        sessions = set()
        for path in folder.iterdir():
            try:
                session = npc_session.SessionRecord(path.as_posix())
            except ValueError:
                continue
            sessions.add(session)
        return tuple(sorted(sessions))

    if subject is not None:
        return _get_sessions_from_subfolders(
            get_subject_folders_from_data_repo(subject)
        )

    subject_to_sessions: dict[
        npc_session.SubjectRecord, tuple[npc_session.SessionRecord, ...]
    ] = {}
    for _subject, folder in get_subject_folders_from_data_repo().items():
        subject_to_sessions.setdefault(_subject, _get_sessions_from_subfolders(folder))
    return subject_to_sessions

get_sessions_from_training_db cached

get_sessions_from_training_db() -> dict[int, tuple[dict[str, Any], ...]]

Includes NSB sessions.

{subject: ({spreadsheet row}, ... )}

Examples:

>>> sessions = get_sessions_from_training_db()
>>> assert len(sessions) > 0
>>> sessions[659250][0]
{'ID': 1, 'start_time': '2023-03-07 12:56:27', 'rig_name': 'B2', 'task_version': 'stage 0 moving', 'hits': '0', 'dprime_same_modality': '', 'dprime_other_modality_go_stim': '', 'pass': '1', 'ignore': '0'}
Source code in npc_lims/status/behavior_sessions.py
@functools.cache
def get_sessions_from_training_db() -> dict[int, tuple[dict[str, Any], ...]]:
    """
    Includes NSB sessions.

    {subject: ({spreadsheet row}, ... )}

    Examples:
        >>> sessions = get_sessions_from_training_db()
        >>> assert len(sessions) > 0
        >>> sessions[659250][0]                         # doctest: +SKIP
        {'ID': 1, 'start_time': '2023-03-07 12:56:27', 'rig_name': 'B2', 'task_version': 'stage 0 moving', 'hits': '0', 'dprime_same_modality': '', 'dprime_other_modality_go_stim': '', 'pass': '1', 'ignore': '0'}
    """
    sessions: dict[int, tuple[dict[str, Any], ...]] = {}
    for nsb in (False, True):
        db = npc_lims.metadata.get_training_db(nsb)
        # use the per-subject tables, skipping `sqlite_sequence` and `all_mice`
        subjects = tuple(
            npc_session.SubjectRecord(table["name"])
            for table in db.execute(
                "SELECT name FROM sqlite_master WHERE type='table'"
            ).fetchall()
            if table["name"] not in ("sqlite_sequence", "all_mice")
        )
        for subject in subjects:
            sessions[subject] = tuple(
                row | {"nsb": nsb}
                for row in db.execute(
                    f"SELECT * FROM '{subject}' WHERE ignore != 1"
                ).fetchall()
            )
    return sessions

get_sessions_with_data_assets cached

get_sessions_with_data_assets(subject: str | int) -> tuple[npc_session.SessionRecord, ...]

Examples:

>>> sessions = get_sessions_with_data_assets(668759)
>>> assert len(sessions) > 0
Source code in npc_lims/metadata/codeocean.py
@functools.cache
def get_sessions_with_data_assets(
    subject: str | int,
) -> tuple[npc_session.SessionRecord, ...]:
    """
    Examples:
        >>> sessions = get_sessions_with_data_assets(668759)
        >>> assert len(sessions) > 0
    """
    assets = get_subject_data_assets(subject)
    sessions = set()
    for asset in assets:
        try:
            session = npc_session.SessionRecord(asset["name"])
        except ValueError:
            continue
        sessions.add(session)
    return tuple(sessions)

get_settings_xml_path_from_s3 cached

get_settings_xml_path_from_s3(session: str | npc_session.SessionRecord) -> upath.UPath

Examples:

>>> settings_xml_path = get_settings_xml_path_from_s3('670180-2023-07-26')
>>> assert settings_xml_path.exists()
Source code in npc_lims/paths/s3.py
@functools.cache
def get_settings_xml_path_from_s3(
    session: str | npc_session.SessionRecord,
) -> upath.UPath:
    """
    Examples:
        >>> settings_xml_path = get_settings_xml_path_from_s3('670180-2023-07-26')
        >>> assert settings_xml_path.exists()
    """
    raw_data_paths_s3 = get_raw_data_paths_from_s3(session)

    directories = (
        raw_path
        for raw_path in raw_data_paths_s3
        if raw_path.is_dir() and ".zarr" not in raw_path.suffix
    )
    return tuple(raw_path / "settings.xml" for raw_path in directories)[0]

get_sorted_data_paths_from_s3 cached

get_sorted_data_paths_from_s3(session: str | npc_session.SessionRecord | None = None, sorted_data_asset_id: str | None = None) -> tuple[upath.UPath, ...]

Gets the top level files/folders for the sorted data

Examples:

>>> sorted_data_s3_paths = get_sorted_data_paths_from_s3('668759_20230711')
>>> assert len(sorted_data_s3_paths) > 0
Source code in npc_lims/paths/s3.py
@functools.cache
def get_sorted_data_paths_from_s3(
    session: str | npc_session.SessionRecord | None = None,
    sorted_data_asset_id: str | None = None,
) -> tuple[upath.UPath, ...]:
    """
    Gets the top level files/folders for the sorted data

    Examples:
        >>> sorted_data_s3_paths = get_sorted_data_paths_from_s3('668759_20230711')
        >>> assert len(sorted_data_s3_paths) > 0
    """
    if sorted_data_asset_id is not None:
        sorted_data_asset = codeocean.get_data_asset(sorted_data_asset_id)
    elif session is not None:
        sorted_data_asset = codeocean.get_session_sorted_data_asset(session)
    else:
        raise ValueError("Must provide either session or sorted_data_asset_id")
    return tuple(get_data_asset_s3_path(sorted_data_asset).iterdir())

get_sorted_precurated_paths_from_s3 cached

get_sorted_precurated_paths_from_s3(session: str | npc_session.SessionRecord) -> tuple[upath.UPath, ...]

Examples:

>>> sorted_precurated_paths = get_sorted_precurated_paths_from_s3('662892_2023-08-21')
>>> assert len(sorted_precurated_paths) > 0
Source code in npc_lims/paths/s3.py
@functools.cache
def get_sorted_precurated_paths_from_s3(
    session: str | npc_session.SessionRecord,
) -> tuple[upath.UPath, ...]:
    """
    Examples:
        >>> sorted_precurated_paths = get_sorted_precurated_paths_from_s3('662892_2023-08-21')
        >>> assert len(sorted_precurated_paths) > 0
    """
    sorted_paths = get_sorted_data_paths_from_s3(session)
    sorted_precurated_dirs = tuple(
        next(
            path for path in sorted_paths if "sorting_precurated" in str(path)
        ).iterdir()
    )

    return sorted_precurated_dirs

get_spike_sorted_paths_from_s3 cached

get_spike_sorted_paths_from_s3(session: str | npc_session.SessionRecord) -> tuple[upath.UPath, ...]

>>> spike_sorted_paths = get_spike_sorted_paths_from_s3('662892_20230821')
>>> assert spike_sorted_paths[0].exists()

Source code in npc_lims/paths/s3.py
@functools.cache
def get_spike_sorted_paths_from_s3(
    session: str | npc_session.SessionRecord,
) -> tuple[upath.UPath, ...]:
    """
    >>> spike_sorted_paths = get_spike_sorted_paths_from_s3('662892_20230821')
    >>> assert spike_sorted_paths[0].exists()
    """
    sorted_data_paths = get_sorted_data_paths_from_s3(session)
    return tuple(
        next(path for path in sorted_data_paths if "spike" in str(path)).iterdir()
    )

get_spike_sorting_device_path_from_s3 cached

get_spike_sorting_device_path_from_s3(session: str | npc_session.SessionRecord, device_name: str) -> upath.UPath

Examples:

>>> get_spike_sorting_device_path_from_s3('662892_20230821', 'ProbeA')
S3Path('s3://codeocean-s3datasetsbucket-1u41qdg42ur9/d527db85-39b7-4c4f-a465-9ca499b0ca47/spikesorted/experiment1_Record Node 102#Neuropix-PXI-100.ProbeA-AP_recording1/sorting_cached.npz')
Source code in npc_lims/paths/s3.py
@functools.cache
def get_spike_sorting_device_path_from_s3(
    session: str | npc_session.SessionRecord, device_name: str
) -> upath.UPath:
    """
    Examples:
        >>> get_spike_sorting_device_path_from_s3('662892_20230821', 'ProbeA')
        S3Path('s3://codeocean-s3datasetsbucket-1u41qdg42ur9/d527db85-39b7-4c4f-a465-9ca499b0ca47/spikesorted/experiment1_Record Node 102#Neuropix-PXI-100.ProbeA-AP_recording1/sorting_cached.npz')
    """
    spike_sorted_paths = get_spike_sorted_paths_from_s3(session)
    spike_probe_paths = next(
        path for path in spike_sorted_paths if device_name in str(path)
    ).iterdir()
    sorting_cached_path = next(
        path for path in spike_probe_paths if "sorting_cached" in str(path)
    )

    return sorting_cached_path

get_spike_times_codeocean_kilosort_path_from_s3 cached

get_spike_times_codeocean_kilosort_path_from_s3(session: str | npc_session.SessionRecord) -> upath.UPath

Examples:

>>> path = get_spike_times_codeocean_kilosort_path_from_s3('668759_20230711')
>>> assert path
Source code in npc_lims/paths/s3.py
@functools.cache
def get_spike_times_codeocean_kilosort_path_from_s3(
    session: str | npc_session.SessionRecord,
) -> upath.UPath:
    """
    Examples:
        >>> path = get_spike_times_codeocean_kilosort_path_from_s3('668759_20230711')
        >>> assert path
    """
    files = get_units_spikes_codeocean_kilosort_top_level_files(session)
    spike_times_path = next(path for path in files if "spike" in str(path))

    return spike_times_path

get_spikesorted_cache_paths_from_s3 cached

get_spikesorted_cache_paths_from_s3(session: str | npc_session.SessionRecord) -> tuple[upath.UPath, ...]

Examples:

>>> spike_sorted_cache_paths = get_spikesorted_cache_paths_from_s3('662892_20230821')
>>> assert len(spike_sorted_cache_paths) > 0
Source code in npc_lims/paths/s3.py
@functools.cache
def get_spikesorted_cache_paths_from_s3(
    session: str | npc_session.SessionRecord,
) -> tuple[upath.UPath, ...]:
    """
    Examples:
        >>> spike_sorted_cache_paths = get_spikesorted_cache_paths_from_s3('662892_20230821')
        >>> assert len(spike_sorted_cache_paths) > 0
    """
    spike_sorted_paths = get_spike_sorted_paths_from_s3(session)
    spike_sorted_cache_files = tuple(
        next(path.glob("sorting_cached.npz")) for path in spike_sorted_paths
    )

    return spike_sorted_cache_files

get_subject_data_assets cached

get_subject_data_assets(subject: str | int) -> tuple[DataAssetAPI, ...]

All assets associated with a subject ID.

Examples:

>>> assets = get_subject_data_assets(668759)
>>> assert len(assets) > 0
Source code in npc_lims/metadata/codeocean.py
@functools.cache
def get_subject_data_assets(subject: str | int) -> tuple[DataAssetAPI, ...]:
    """
    All assets associated with a subject ID.

    Examples:
        >>> assets = get_subject_data_assets(668759)
        >>> assert len(assets) > 0
    """
    response = get_codeocean_client().search_all_data_assets(
        query=f"subject id: {npc_session.SubjectRecord(subject)}"
    )
    response.raise_for_status()
    return response.json()["results"]
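
A usage sketch combining this with is_raw_data_asset (documented below) to find a subject's most recent raw upload; 'created' and 'name' are keys on DataAssetAPI:

assets = get_subject_data_assets(668759)
raw_assets = [asset for asset in assets if is_raw_data_asset(asset)]
# 'created' is a timestamp, so max() gives the most recent upload
latest = max(raw_assets, key=lambda asset: asset['created'])
print(latest['name'])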

get_subject_folders_from_data_repo cached

get_subject_folders_from_data_repo(subject: int | str | None = None) -> dict[npc_session.SubjectRecord, upath.UPath] | upath.UPath

Examples:

>>> all_subjects = get_subject_folders_from_data_repo()
>>> len(all_subjects)
93
>>> get_subject_folders_from_data_repo(366122).name
'366122'
Source code in npc_lims/status/behavior_sessions.py, lines 119-152
@functools.cache
def get_subject_folders_from_data_repo(
    subject: int | str | None = None,
) -> dict[npc_session.SubjectRecord, upath.UPath] | upath.UPath:
    """
    Examples:
        >>> all_subjects = get_subject_folders_from_data_repo()
        >>> len(all_subjects)                               # doctest: +SKIP
        93

        >>> get_subject_folders_from_data_repo(366122).name
        '366122'
    """
    if subject is not None:
        if not (
            path := npc_lims.paths.DR_DATA_REPO
            / str(npc_session.SubjectRecord(subject))
        ).exists():
            raise FileNotFoundError(f"{path=} does not exist")
        return path
    subject_to_folder: dict[npc_session.SubjectRecord, upath.UPath] = {}
    for path in npc_lims.paths.DR_DATA_REPO.iterdir():
        if path.is_file():
            continue
        if any(invalid_key in path.name for invalid_key in INVALID_SUBJECT_KEYS):
            continue
        try:
            _subject = npc_session.SubjectRecord(path.name)
        except ValueError:
            continue
        if _subject in subject_to_folder:
            raise ValueError(f"Duplicate path for {_subject=}: {path}")
        subject_to_folder[_subject] = path
    return subject_to_folder
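
Called without an argument, the function returns the full mapping, which can be iterated directly; a purely illustrative sketch counting entries per subject folder:

for subject, folder in get_subject_folders_from_data_repo().items():
    n_entries = sum(1 for _ in folder.iterdir())
    print(subject, n_entries)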

get_subjects_from_training_db cached

get_subjects_from_training_db(nsb: bool = False) -> dict[npc_session.SubjectRecord, dict[str, Any]]

Dynamic Routing training spreadsheet info: a mapping of subject ID to that subject's row in the all_mice table.

{subject: {spreadsheet row}}

Examples:

>>> subjects = get_subjects_from_training_db(nsb=True)
>>> assert len(subjects) > 0
>>> subjects[659250]
{'ID': 50, 'mouse_id': '659250', 'alive': 'False', 'genotype': 'PV Cre x Ai32', 'sex': 'male', 'birthdate': '2022-11-21 00:00:00', 'surgery_week': '2023-01-30 00:00:00', 'craniotomy': 'True', 'trainer': 'Sam', 'regimen': '7', 'wheel_fixed': 'False', 'timeouts': 'True', 'next_task_version': 'dead'}
Source code in npc_lims/status/behavior_sessions.py, lines 27-57
@functools.cache
def get_subjects_from_training_db(
    nsb: bool = False,
) -> dict[npc_session.SubjectRecord, dict[str, Any]]:
    """
    Dynamic Routing training spreadsheet info: a mapping of subject ID to
    that subject's row in the `all_mice` table.

    {subject: {spreadsheet row}}

    Examples:
        >>> subjects = get_subjects_from_training_db(nsb=True)
        >>> assert len(subjects) > 0
        >>> subjects[659250]                       # doctest: +SKIP
        {'ID': 50, 'mouse_id': '659250', 'alive': 'False', 'genotype': 'PV Cre x Ai32', 'sex': 'male', 'birthdate': '2022-11-21 00:00:00', 'surgery_week': '2023-01-30 00:00:00', 'craniotomy': 'True', 'trainer': 'Sam', 'regimen': '7', 'wheel_fixed': 'False', 'timeouts': 'True', 'next_task_version': 'dead'}
    """
    db = npc_lims.metadata.get_training_db(nsb)

    # use entries in `all_mice` table
    subjects = tuple(
        {
            npc_session.SubjectRecord(result["mouse_id"])
            for result in db.execute("SELECT * FROM all_mice").fetchall()
        }
    )

    return {
        subject: db.execute(
            "SELECT * FROM 'all_mice' WHERE mouse_id=?", (subject,)
        ).fetchone()
        for subject in subjects
    }
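
Since each value is the subject's single row from the all_mice table, downstream lookups are plain dict access; a sketch using the column names shown in the example output above:

subjects = get_subjects_from_training_db(nsb=True)
# 'alive' is stored as the string 'True'/'False' in the spreadsheet
alive = {s: row for s, row in subjects.items() if row.get('alive') == 'True'}
print(len(alive))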

get_surface_channel_root

get_surface_channel_root(session: str | npc_session.SessionRecord) -> upath.UPath

Reconstruct path to surface channel data in bucket (e.g. on s3) using data-asset info from Code Ocean.

Examples:

>>> get_surface_channel_root('660023_20230808')
S3Path('s3://aind-ephys-data/ecephys_660023_2023-08-08_15-11-14')
>>> assert get_surface_channel_root('660023_20230808') != get_raw_data_root('660023_20230808')
>>> get_surface_channel_root('649943_20230216')
Traceback (most recent call last):
...
FileNotFoundError: 649943_20230216 has no surface channel data assets
Source code in npc_lims/metadata/codeocean.py, lines 285-308
def get_surface_channel_root(session: str | npc_session.SessionRecord) -> upath.UPath:
    """Reconstruct path to surface channel data in bucket (e.g. on s3) using data-asset
    info from Code Ocean.

    Examples:
        >>> get_surface_channel_root('660023_20230808')
        S3Path('s3://aind-ephys-data/ecephys_660023_2023-08-08_15-11-14')
        >>> assert get_surface_channel_root('660023_20230808') != get_raw_data_root('660023_20230808')
        >>> get_surface_channel_root('649943_20230216')
        Traceback (most recent call last):
        ...
        FileNotFoundError: 649943_20230216 has no surface channel data assets
    """
    session = npc_session.SessionRecord(session)
    raw_assets = tuple(
        asset for asset in get_session_data_assets(session) if is_raw_data_asset(asset)
    )
    try:
        raw_asset = get_single_data_asset(session.with_idx(1), raw_assets, "raw")
    except SessionIndexError:
        raise FileNotFoundError(
            f"{session} has no surface channel data assets"
        ) from None
    return get_path_from_data_asset(raw_asset)
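
Because surface-channel recordings exist for only some sessions, callers typically probe for them defensively; a minimal sketch:

try:
    surface_root = get_surface_channel_root('649943_20230216')
except FileNotFoundError:
    surface_root = None  # session has no separate surface-channel recording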

get_template_metrics_paths_from_s3 cached

get_template_metrics_paths_from_s3(session: str | npc_session.SessionRecord) -> tuple[upath.UPath, ...]

Examples:

>>> template_metrics_paths = get_template_metrics_paths_from_s3('662892_2023-08-21')
>>> assert len(template_metrics_paths) > 0
Source code in npc_lims/paths/s3.py, lines 269-286
@functools.cache
def get_template_metrics_paths_from_s3(
    session: str | npc_session.SessionRecord,
) -> tuple[upath.UPath, ...]:
    """
    Examples:
        >>> template_metrics_paths = get_template_metrics_paths_from_s3('662892_2023-08-21')
        >>> assert len(template_metrics_paths) > 0
    """
    sorted_paths = get_sorted_data_paths_from_s3(session)
    postprocessed_files = next(
        path for path in sorted_paths if "postprocessed" in str(path)
    ).iterdir()
    template_metrics_paths = tuple(
        next(path.glob("template_metrics/metrics.csv")) for path in postprocessed_files
    )

    return template_metrics_paths
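
A sketch of loading the per-probe metrics into one pandas frame; the added 'probe' column is an illustrative assumption, derived from the parent directory name of each metrics.csv:

import pandas as pd

paths = get_template_metrics_paths_from_s3('662892_2023-08-21')
# each path is .../postprocessed/<probe dir>/template_metrics/metrics.csv
metrics = pd.concat(
    pd.read_csv(p.open()).assign(probe=p.parent.parent.name) for p in paths
)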

get_tissuecyte_annotation_files_from_s3 cached

get_tissuecyte_annotation_files_from_s3(session: str | npc_session.SessionRecord) -> tuple[upath.UPath, ...]

For each probe inserted, get a csv file containing CCF coordinates for each electrode (channel) on the probe.

Examples:

>>> electrode_files = get_tissuecyte_annotation_files_from_s3('626791_2022-08-16')
>>> assert len(electrode_files) > 0
>>> electrode_files[0].name
'Probe_A2_channels_626791_warped_processed.csv'
Source code in npc_lims/paths/s3.py, lines 346-378
@functools.cache
def get_tissuecyte_annotation_files_from_s3(
    session: str | npc_session.SessionRecord,
) -> tuple[upath.UPath, ...]:
    """For each probe inserted, get a csv file containing CCF coordinates for each
    electrode (channel) on the probe.

    Examples:
        >>> electrode_files = get_tissuecyte_annotation_files_from_s3('626791_2022-08-16')
        >>> assert len(electrode_files) > 0
        >>> electrode_files[0].name
        'Probe_A2_channels_626791_warped_processed.csv'
    """
    session = npc_session.SessionRecord(session)
    day = tracked_sessions.get_session_info(session).experiment_day
    subject_electrode_network_path = TISSUECYTE_REPO / str(session.subject.id)

    if not subject_electrode_network_path.exists():
        raise FileNotFoundError(
            f"CCF annotations for {session} have not been uploaded to s3"
        )

    electrode_files = tuple(
        subject_electrode_network_path.glob(
            f"Probe_*{day}_channels_{str(session.subject.id)}_warped_processed.csv"
        )
    )
    if not electrode_files:
        raise FileNotFoundError(
            f"{subject_electrode_network_path} exists, but no CCF annotation files found matching {day} and {session.subject.id} - check session day"
        )

    return electrode_files
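
Each returned csv can be read directly with pandas; a sketch (column names are not guaranteed, so inspect them on real data):

import pandas as pd

electrode_files = get_tissuecyte_annotation_files_from_s3('626791_2022-08-16')
df = pd.read_csv(electrode_files[0].open())
print(df.columns.tolist())  # CCF coordinate columns, one row per channel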

get_training_db cached

get_training_db(nsb: bool = False) -> sqlite3.Connection

Download db to tempdir, open connection, return connection.

Examples:

>>> assert get_training_db()
Source code in npc_lims/metadata/spreadsheets.py, lines 24-46
@functools.cache
def get_training_db(nsb: bool = False) -> sqlite3.Connection:
    """
    Download db to tempdir, open connection, return connection.

    Examples:
        >>> assert get_training_db()
    """
    db_path = upath.UPath(tempfile.mkstemp(suffix=".db")[1])
    s3_path = next(
        path for path in get_training_sqlite_paths() if ("NSB" in path.name) == nsb
    )
    db_path.write_bytes(s3_path.read_bytes())
    con = sqlite3.connect(db_path)

    def dict_factory(cursor, row):
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        return d

    con.row_factory = dict_factory
    return con
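
Because of the dict_factory row factory set above, queries on the returned connection yield dicts keyed by column name rather than plain tuples:

con = get_training_db()
row = con.execute("SELECT * FROM all_mice LIMIT 1").fetchone()
print(row['mouse_id'])  # access by column name, courtesy of dict_factory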

get_training_spreadsheet_paths

get_training_spreadsheet_paths() -> tuple[upath.UPath, ...]

Examples:

>>> assert len(get_training_spreadsheet_paths()) > 0
Source code in npc_lims/metadata/spreadsheets.py, lines 49-54
def get_training_spreadsheet_paths() -> tuple[upath.UPath, ...]:
    """
    Examples:
        >>> assert len(get_training_spreadsheet_paths()) > 0
    """
    return tuple(DR_DATA_REPO.parent.glob("DynamicRoutingTraining*.xlsx"))

get_training_sqlite_paths

get_training_sqlite_paths() -> tuple[upath.UPath, ...]

Examples:

>>> assert len(get_training_sqlite_paths()) == len(get_training_spreadsheet_paths())
Source code in npc_lims/metadata/spreadsheets.py, lines 14-21
def get_training_sqlite_paths() -> tuple[upath.UPath, ...]:
    """
    Examples:
        >>> assert len(get_training_sqlite_paths()) == len(get_training_spreadsheet_paths())
    """
    return tuple(
        path.with_suffix(".sqlite") for path in get_training_spreadsheet_paths()
    )

get_unit_locations_paths_from_s3 cached

get_unit_locations_paths_from_s3(session: str | npc_session.SessionRecord) -> tuple[upath.UPath, ...]

Examples:

>>> unit_locations_paths = get_unit_locations_paths_from_s3('662892_2023-08-21')
>>> assert len(unit_locations_paths) > 0
Source code in npc_lims/paths/s3.py, lines 306-324
@functools.cache
def get_unit_locations_paths_from_s3(
    session: str | npc_session.SessionRecord,
) -> tuple[upath.UPath, ...]:
    """
    Examples:
        >>> unit_locations_paths = get_unit_locations_paths_from_s3('662892_2023-08-21')
        >>> assert len(unit_locations_paths) > 0
    """
    sorted_paths = get_sorted_data_paths_from_s3(session)
    postprocessed_files = next(
        path for path in sorted_paths if "postprocessed" in str(path)
    ).iterdir()
    unit_locations_paths = tuple(
        next(path.glob("unit_locations/unit_locations.npy"))
        for path in postprocessed_files
    )

    return unit_locations_paths
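
A sketch of loading one probe's unit locations with numpy; the array shape is an assumption (commonly one row per unit) and should be verified on real data:

import io

import numpy as np

paths = get_unit_locations_paths_from_s3('662892_2023-08-21')
# buffer the bytes locally so np.load gets a seekable file object
unit_locations = np.load(io.BytesIO(paths[0].read_bytes()))
print(unit_locations.shape)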

get_units_codeoean_kilosort_path_from_s3 cached

get_units_codeoean_kilosort_path_from_s3(session: str | npc_session.SessionRecord) -> upath.UPath

Examples:

>>> path = get_units_codeoean_kilosort_path_from_s3('668759_20230711')
>>> assert path
Source code in npc_lims/paths/s3.py, lines 485-497
@functools.cache
def get_units_codeoean_kilosort_path_from_s3(
    session: str | npc_session.SessionRecord,
) -> upath.UPath:
    """
    Examples:
        >>> path = get_units_codeoean_kilosort_path_from_s3('668759_20230711')
        >>> assert path
    """
    files = get_units_spikes_codeocean_kilosort_top_level_files(session)
    units_path = next(path for path in files if "csv" in str(path))

    return units_path

get_units_spikes_codeocean_kilosort_top_level_files cached

get_units_spikes_codeocean_kilosort_top_level_files(session: str | npc_session.SessionRecord) -> tuple[upath.UPath, ...]

Examples:

>>> paths = get_units_spikes_codeocean_kilosort_top_level_files('668759_20230711')
>>> assert paths
Source code in npc_lims/paths/s3.py, lines 463-482
@functools.cache
def get_units_spikes_codeocean_kilosort_top_level_files(
    session: str | npc_session.SessionRecord,
) -> tuple[upath.UPath, ...]:
    """
    Examples:
        >>> paths = get_units_spikes_codeocean_kilosort_top_level_files('668759_20230711')
        >>> assert paths
    """
    units_spikes_data_asset = (
        codeocean.get_session_units_spikes_with_peak_channels_data_asset(session)
    )

    units_directory = next(
        unit_path
        for unit_path in get_data_asset_s3_path(units_spikes_data_asset).iterdir()
        if unit_path.is_dir()
    )

    return tuple(units_directory.iterdir())

is_raw_data_asset

is_raw_data_asset(asset: str | DataAssetAPI) -> bool

Examples:

>>> is_raw_data_asset('83636983-f80d-42d6-a075-09b60c6abd5e')
True
>>> is_raw_data_asset('173e2fdc-0ca3-4a4e-9886-b74207a91a9a')
False
Source code in npc_lims/metadata/codeocean.py, lines 236-249
def is_raw_data_asset(asset: str | DataAssetAPI) -> bool:
    """
    Examples:
        >>> is_raw_data_asset('83636983-f80d-42d6-a075-09b60c6abd5e')
        True
        >>> is_raw_data_asset('173e2fdc-0ca3-4a4e-9886-b74207a91a9a')
        False
    """
    asset = get_data_asset(asset)
    if is_sorted_data_asset(asset):
        return False
    return asset.get("custom_metadata", {}).get(
        "data level"
    ) == "raw data" or "raw" in asset.get("tags", [])

is_sorted_data_asset

is_sorted_data_asset(asset: str | DataAssetAPI) -> bool

Examples:

>>> is_sorted_data_asset('173e2fdc-0ca3-4a4e-9886-b74207a91a9a')
True
>>> is_sorted_data_asset('83636983-f80d-42d6-a075-09b60c6abd5e')
False
Source code in npc_lims/metadata/codeocean.py, lines 252-263
def is_sorted_data_asset(asset: str | DataAssetAPI) -> bool:
    """
    Examples:
        >>> is_sorted_data_asset('173e2fdc-0ca3-4a4e-9886-b74207a91a9a')
        True
        >>> is_sorted_data_asset('83636983-f80d-42d6-a075-09b60c6abd5e')
        False
    """
    asset = get_data_asset(asset)
    if "ecephys" not in asset["name"]:
        return False
    return "sorted" in asset["name"]

update_training_dbs

update_training_dbs() -> None

Read spreadsheets from the data repo and write them to corresponding databases, currently sqlite files in the same directory.

Examples:

>>> update_training_dbs()
Source code in npc_lims/metadata/spreadsheets.py, lines 57-68
def update_training_dbs() -> None:
    """
    Read spreadsheets from the data repo and write them to corresponding
    databases, currently sqlite files in the same directory.

    Examples:
        >>> update_training_dbs()
    """
    for spreadsheet, sqlite in zip(
        get_training_spreadsheet_paths(), get_training_sqlite_paths()
    ):
        excel_to_sqlite(spreadsheet, sqlite)
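
excel_to_sqlite itself is not shown here; a minimal sketch of what such a helper might do, assuming pandas, local file paths, and one sqlite table per worksheet (the real implementation may differ):

import sqlite3

import pandas as pd

def excel_to_sqlite_sketch(spreadsheet, sqlite_path) -> None:
    # one table per worksheet; replace existing tables on re-run
    with sqlite3.connect(str(sqlite_path)) as con:
        for sheet_name, df in pd.read_excel(spreadsheet, sheet_name=None).items():
            df.to_sql(sheet_name, con, if_exists='replace', index=False)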