Skip to content

EFS

Utilities for working with Amazon Elastic File System.

Overview

Module Description
Core Core EFS utilities
Mount Point Mount point management
Paths EFS path utilities

MountPointConfiguration dataclass

MountPointConfiguration(
    file_system, access_point, mount_point
)

Adapter for translating and mapping paths between AWS Elastic File System (EFS) mount points and local file system paths.

local file systems are most often used by: - EC2 instances - AWS Lambda functions

This class provides functionality to adapt file paths for applications that interact with AWS EFS. It allows for the conversion of paths from the EFS perspective to the local file system perspective and vice versa, considering the mount point and access point configurations.

Attributes:

Name Type Description
file_system FileSystemDescriptionTypeDef

The description of the EFS file system.

access_point Optional[AccessPointDescriptionTypeDef]

The description of the EFS access point, if used.

mount_point Path

The local file system path where the EFS is mounted.

access_point_path property

access_point_path

This is the access point on file system mounted to host

is_writeable property

is_writeable

Returns true, if the mount point is writeable

mount_point_path property

mount_point_path

This is the path on the host where the access point will be mounted

as_efs_path

as_efs_path(path)

Converts a path to a path on the EFS file system.

Behavior: - If the path is absolute, the path is made relative to either the mount point or access point first. examples: - "/efs/accesspoint/path/to/file" -> "path/to/file" - "/mnt/efs/path/to/file" -> "path/to/file" - If the path is absolute and does not start with the mount point or access point, a ValueError is raised. - If the path is relative, It is assumed to be the relative path from the access point. e.g. "path/to/file" -> "/efs/accesspoint/path/to/file"

Parameters:

Name Type Description Default
path StrPath

The path to convert to a path on the EFS file system.

required

Returns:

Type Description
Path

absolute path on the EFS file system.

Source code in src/aibs_informatics_aws_utils/efs/mount_point.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def as_efs_path(self, path: StrPath) -> Path:
    """
    Converts a path to a path on the EFS file system.

    Behavior:
    - If the path is absolute, the path is made relative to either the
        mount point or access point first. examples:
        - "/efs/accesspoint/path/to/file" -> "path/to/file"
        - "/mnt/efs/path/to/file" -> "path/to/file"
    - If the path is absolute and does not start with the mount point or access point,
        a ValueError is raised.
    - If the path is relative, It is assumed to be the relative path from the access point.
        e.g. "path/to/file" -> "/efs/accesspoint/path/to/file"

    Args:
        path (StrPath): The path to convert to a path on the EFS file system.

    Returns:
        absolute path on the EFS file system.
    """
    path = self.as_relative_path(path)
    return (self.access_point_path / path).resolve()

as_efs_uri

as_efs_uri(path)

Converts a path to a customized URI EFSPath describing the location on an EFS file system.

Parameters:

Name Type Description Default
path StrPath

The path to convert to a path on the EFS file system.

required

Returns:

Type Description
EFSPath

absolute URI path on the EFS file system.

Source code in src/aibs_informatics_aws_utils/efs/mount_point.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def as_efs_uri(self, path: StrPath) -> EFSPath:
    """
    Converts a path to a customized URI EFSPath describing the location on an EFS file system.

    Args:
        path (StrPath): The path to convert to a path on the EFS file system.

    Returns:
        absolute URI path on the EFS file system.
    """
    return EFSPath.build(
        resource_id=self.file_system["FileSystemId"],
        path=self.as_efs_path(path),
    )

as_env_vars

as_env_vars(name=None)

Converts the mount point configuration to environment variables.

Source code in src/aibs_informatics_aws_utils/efs/mount_point.py
237
238
239
240
241
242
243
244
def as_env_vars(self, name: Optional[str] = None) -> Dict[str, str]:
    """Converts the mount point configuration to environment variables."""
    if self.access_point and self.access_point.get("AccessPointId"):
        mount_point_id = self.access_point["AccessPointId"]
    else:
        mount_point_id = self.file_system["FileSystemId"]

    return self.to_env_vars(self.mount_point, mount_point_id, name)

as_mounted_path

as_mounted_path(path)

Converts a path to a path on the host where the EFS is mounted.

Behavior: - If the path is absolute, the path is made relative to either the mount point or access point first. examples: - "/efs/accesspoint/path/to/file" -> "path/to/file" - "/mnt/efs/path/to/file" -> "path/to/file" - If the path is absolute and does not start with the mount point or access point, a ValueError is raised. - If the path is relative, It is assumed to be the relative path from the mount point. e.g. "path/to/file" -> "/mnt/efs/path/to/file"

Parameters:

Name Type Description Default
path StrPath

The path to convert to a path on the host where the EFS is mounted.

required

Returns:

Type Description
Path

absolute path on the host where the EFS is mounted.

Source code in src/aibs_informatics_aws_utils/efs/mount_point.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
def as_mounted_path(self, path: StrPath) -> Path:
    """
    Converts a path to a path on the host where the EFS is mounted.

    Behavior:
    - If the path is absolute, the path is made relative to either the mount point
        or access point first. examples:
        - "/efs/accesspoint/path/to/file" -> "path/to/file"
        - "/mnt/efs/path/to/file" -> "path/to/file"
    - If the path is absolute and does not start with the mount point or access point,
        a ValueError is raised.
    - If the path is relative, It is assumed to be the relative path from the mount point.
        e.g. "path/to/file" -> "/mnt/efs/path/to/file"

    Args:
        path (StrPath): The path to convert to a path on the host where the EFS is mounted.

    Returns:
        absolute path on the host where the EFS is mounted.
    """
    path = self.as_relative_path(path)
    return (self.mount_point / path).resolve()

as_relative_path

as_relative_path(path)

Converts a path to a relative path from the mount point or access point.

Behavior: - If the path is absolute and is relative to the mount point or access point, it is returned as a relative path. - If the path is already relative, it is returned as is. - If the path is absolute and is not relative to the mount point or access point, a ValueError is raised.

Parameters:

Name Type Description Default
path StrPath

The path to convert to a relative path.

required

Returns:

Type Description
Path

The relative path.

Source code in src/aibs_informatics_aws_utils/efs/mount_point.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def as_relative_path(self, path: StrPath) -> Path:
    """
    Converts a path to a relative path from the mount point or access point.

    Behavior:
    - If the path is absolute and is relative to the mount point or access point,
        it is returned as a relative path.
    - If the path is already relative,
        it is returned as is.
    - If the path is absolute and is not relative to the mount point or access point,
        a ValueError is raised.


    Args:
        path (StrPath): The path to convert to a relative path.

    Returns:
        The relative path.

    """
    path = Path(path)
    if path.is_absolute():
        # Check longer path first. This is to avoid the case where the mount point
        # is a subdirectory of the access point.
        check_mount_path_first = len(self.mount_point_path.as_posix()) > len(
            self.access_point_path.as_posix()
        )
        if check_mount_path_first and self.is_mounted_path(path):
            path = path.relative_to(self.mount_point)
        elif self.is_efs_path(path):
            path = path.relative_to(self.access_point_path)
        elif self.is_mounted_path(path):
            path = path.relative_to(self.mount_point)
        else:
            raise ValueError(
                f"Path {path} is not relative to either the mount point {self.mount_point} "
                f"or the access point {self.access_point_path}"
            )
    return path

build classmethod

build(
    mount_point,
    access_point=None,
    file_system=None,
    access_point_tags=None,
    file_system_tags=None,
)

Creates a new config from the given mount point and access point or file system.

Important: Must provide either access point or file system.

Parameters:

Name Type Description Default
mount_point StrPath

Intended mount point of the EFS file system on the host.

required
access_point Optional[Union[str, AccessPointDescriptionTypeDef]]

Identifier of the access point or the access point description. If specified as a string, can be either the access point id or name. Defaults to None.

None
file_system Optional[Union[str, FileSystemDescriptionTypeDef]]

Identifier of the file system or the file system description. If specified as a string, can be either the file system id or name. Defaults to None.

None
access_point_tags Optional[Dict[str, str]]

Tags to filter the access point. Defaults to None.

None
file_system_tags Optional[Dict[str, str]]

Tags to filter the file system. Defaults to None.

None

Raises: ValueError: if neither access point nor file system is provided.

Returns:

Type Description
MountPointConfiguration

The config

Source code in src/aibs_informatics_aws_utils/efs/mount_point.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
@classmethod
def build(
    cls,
    mount_point: StrPath,
    access_point: Optional[Union[str, AccessPointDescriptionTypeDef]] = None,
    file_system: Optional[Union[str, FileSystemDescriptionTypeDef]] = None,
    access_point_tags: Optional[Dict[str, str]] = None,
    file_system_tags: Optional[Dict[str, str]] = None,
) -> MountPointConfiguration:
    """Creates a new config from the given mount point and access point or file system.

    Important: Must provide either access point or file system.

    Args:
        mount_point (StrPath): Intended mount point of the EFS file system on the host.
        access_point (Optional[Union[str, AccessPointDescriptionTypeDef]]):
            Identifier of the access point or the access point description.
            If specified as a string, can be either the access point id or name.
            Defaults to None.
        file_system (Optional[Union[str, FileSystemDescriptionTypeDef]]):
            Identifier of the file system or the file system description.
            If specified as a string, can be either the file system id or name.
            Defaults to None.
        access_point_tags (Optional[Dict[str, str]]): Tags to filter the access point.
            Defaults to None.
        file_system_tags (Optional[Dict[str, str]]): Tags to filter the file system.
            Defaults to None.
    Raises:
        ValueError: if neither access point nor file system is provided.

    Returns:
        The config
    """

    if access_point is None and file_system is None:
        raise ValueError("Either access point or file system must be provided")

    file_system_id = None
    if isinstance(file_system, str):
        logger.info(f"Resolving file system config from name {file_system}")
        fs_id, fs_name = (
            (FileSystemId(file_system).normalized, None)
            if FileSystemId.is_valid(file_system)
            else (None, file_system)
        )
        file_system = get_efs_file_system(
            name=fs_name, file_system_id=fs_id, tags=file_system_tags
        )
        file_system_id = file_system["FileSystemId"]
        logger.info(f"Resolved file system id {file_system_id}")

    access_point_id = None
    if isinstance(access_point, str):
        logger.info(f"Resolving access point config from name {access_point}")
        ap_id, ap_name = (
            (AccessPointId(access_point).normalized, None)
            if AccessPointId.is_valid(access_point)
            else (None, access_point)
        )
        access_point = get_efs_access_point(
            access_point_name=ap_name,
            access_point_id=ap_id,
            file_system_id=file_system_id,
            access_point_tags=access_point_tags,
        )
        access_point_id = access_point.get("AccessPointId")
        logger.info(f"Resolved access point id {access_point_id}")

    if file_system is None:
        assert access_point is not None
        file_system_id = access_point.get("FileSystemId")
        logger.info(f"Resolving file system config from access point id {access_point_id}")
        file_system = get_efs_file_system(file_system_id=access_point.get("FileSystemId"))
        logger.info(f"Resolved file system id {file_system_id}")

    if not isinstance(mount_point, Path):
        mount_point = Path(mount_point)

    logger.info(
        f"Generating EFS Mount Point Connection using "
        f"file system {file_system}, "
        f"access point {access_point}, "
        f"with target {mount_point} mount point"
    )
    return cls(
        file_system=file_system,
        access_point=access_point,
        mount_point=mount_point,
    )

is_efs_path

is_efs_path(path)

Checks if the path is relative to the access point

Source code in src/aibs_informatics_aws_utils/efs/mount_point.py
229
230
231
def is_efs_path(self, path: StrPath) -> bool:
    """Checks if the path is relative to the access point"""
    return Path(path).is_relative_to(self.access_point_path)

is_mounted_path

is_mounted_path(path)

Checks if the path is relative to the mount point

Source code in src/aibs_informatics_aws_utils/efs/mount_point.py
233
234
235
def is_mounted_path(self, path: StrPath) -> bool:
    """Checks if the path is relative to the mount point"""
    return Path(path).is_relative_to(self.mount_point)

to_env_vars classmethod

to_env_vars(mount_point, mount_point_id, name=None)

Converts the mount point configuration to environment variables.

Parameters:

Name Type Description Default
mount_point StrPath

The mount point.

required
mount_point_id Union[str, AccessPointId, FileSystemId]

The mount point identifier.

required
name Optional[str]

Optional name for the mount point. Defaults to None.

None

Returns:

Type Description
Dict[str, str]

Dict[str, str]: The environment variables.

Source code in src/aibs_informatics_aws_utils/efs/mount_point.py
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
@classmethod
def to_env_vars(
    cls,
    mount_point: StrPath,
    mount_point_id: Union[str, AccessPointId, FileSystemId],
    name: Optional[str] = None,
) -> Dict[str, str]:
    """Converts the mount point configuration to environment variables.

    Args:
        mount_point (StrPath): The mount point.
        mount_point_id (Union[str, AccessPointId, FileSystemId]): The mount point identifier.
        name (Optional[str]): Optional name for the mount point. Defaults to None.

    Returns:
        Dict[str, str]: The environment variables.
    """

    if not isinstance(mount_point, Path):
        mount_point = Path(mount_point)

    if not isinstance(mount_point_id, AccessPointId) and not isinstance(
        mount_point_id, FileSystemId
    ):
        if AccessPointId.is_valid(mount_point_id):
            mount_point_id = AccessPointId(mount_point_id).normalized
        elif FileSystemId.is_valid(mount_point_id):
            mount_point_id = FileSystemId(mount_point_id).normalized
        else:
            raise ValueError(
                f"Invalid mount point id {mount_point_id}. "
                "Must be either an access point id or file system id."
            )
    else:
        mount_point_id = mount_point_id.normalized

    if name and not re.match(r"[a-zA-Z][\w]{0,11}", name):
        raise ValueError(
            f"Invalid mount point name {name}. Must be a valid environment variable name."
        )
    else:
        name = name or sha256_hexdigest([mount_point.as_posix(), mount_point_id])[:6]

    return {
        f"{EFS_MOUNT_POINT_PATH_VAR}_{name}": mount_point.as_posix(),
        f"{EFS_MOUNT_POINT_ID_VAR}_{name}": mount_point_id,
    }

translate_mounted_path

translate_mounted_path(path, other)

Translates the location of a path described by another mount point to a location on this mount point

Parameters:

Name Type Description Default
path StrPath

The path to translate. should be relative path or absolute relative to the other mount point/access point.

required
other T

The config of other mount point/access point to translate from.

required

Raises:

Type Description
ValueError

If file system of other mount point/access point is not the same as this mount point/access point.

Returns:

Type Description
Path

The translated absolute path on this mount point.

Source code in src/aibs_informatics_aws_utils/efs/mount_point.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def translate_mounted_path(self, path: StrPath, other: MountPointConfiguration) -> Path:
    """
    Translates the location of a path described by another mount point to
    a location on this mount point

    Args:
        path (StrPath): The path to translate. should be relative path
            or absolute relative to the other mount point/access point.
        other (T): The config of other mount point/access point to translate from.

    Raises:
        ValueError: If file system of other mount point/access point is
            not the same as this mount point/access point.

    Returns:
        The translated absolute path on this mount point.
    """
    path = Path(path)
    if self.file_system["FileSystemId"] != other.file_system["FileSystemId"]:
        raise ValueError(
            f"Cannot resolve path {path} from file system "
            f"{other.file_system['FileSystemId']} to file system "
            f"{self.file_system['FileSystemId']}. They are not the same file system!"
        )

    # First, convert the path to a global path on EFS File System
    global_path = other.as_efs_path(path)
    # Next, convert the global path to the path on the container
    return self.as_mounted_path(global_path)

deduplicate_mount_points

deduplicate_mount_points(mount_points)

Deduplicates a list of MountPointConfiguration objects based on the file system id and access point id.

Source code in src/aibs_informatics_aws_utils/efs/mount_point.py
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
def deduplicate_mount_points(
    mount_points: List[MountPointConfiguration],
) -> List[MountPointConfiguration]:
    """
    Deduplicates a list of MountPointConfiguration objects based on the file
    system id and access point id.
    """

    unique_configs: Dict[str, MountPointConfiguration] = {}
    for mp_config in mount_points:
        key = mp_config.mount_point.as_posix()

        if key in unique_configs:
            other = unique_configs[key]
            if mp_config.access_point_path != other.access_point_path:
                raise ValueError(
                    f"Found conflicting mount points for {key}: "
                    f"{mp_config.access_point_path} and {unique_configs[key].access_point_path}"
                )
            elif mp_config.file_system["FileSystemId"] != other.file_system["FileSystemId"]:
                raise ValueError(
                    f"Found conflicting file systems for {key}: "
                    f"{mp_config.file_system['FileSystemId']} "
                    f"and {unique_configs[key].file_system['FileSystemId']}"
                )
            elif mp_config.access_point != other.access_point:
                raise ValueError(
                    f"Found conflicting access points for {key}: "
                    f"{mp_config.access_point} and {unique_configs[key].access_point}"
                )
            continue
        unique_configs[key] = mp_config
    return list(unique_configs.values())

get_efs_access_point

get_efs_access_point(
    access_point_id=None,
    access_point_name=None,
    access_point_tags=None,
    file_system_id=None,
    file_system_name=None,
    file_system_tags=None,
)

Get EFS access point.

You can filter on id, name and tags for both access point and file system.

Parameters:

Name Type Description Default
access_point_id Optional[str]

Optionally filter on access point id.

None
access_point_name Optional[str]

Optionally filter on name.

None
access_point_tags Optional[Dict[str, str]]

Optionally filter on access point tags. They should be a dict of key-value pairs.

None
file_system_id Optional[str]

Optionally filter on file system id.

None
file_system_name Optional[str]

Optionally filter on file system name.

None
file_system_tags Optional[Dict[str, str]]

Optionally filter on file system tags. They should be a dict of key-value pairs.

None

Raises:

Type Description
ValueError

If no access point is found based on the filters.

ValueError

If more than one access point is found based on the filters.

Returns:

Type Description
AccessPointDescriptionTypeDef

The access point description.

Source code in src/aibs_informatics_aws_utils/efs/core.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def get_efs_access_point(
    access_point_id: Optional[str] = None,
    access_point_name: Optional[str] = None,
    access_point_tags: Optional[Dict[str, str]] = None,
    file_system_id: Optional[str] = None,
    file_system_name: Optional[str] = None,
    file_system_tags: Optional[Dict[str, str]] = None,
) -> AccessPointDescriptionTypeDef:
    """Get EFS access point.

    You can filter on id, name and tags for both access point and file system.

    Args:
        access_point_id (Optional[str], optional): Optionally filter on access point id.
        access_point_name (Optional[str], optional): Optionally filter on name.
        access_point_tags (Optional[Dict[str, str]], optional): Optionally filter on access point
            tags. They should be a dict of key-value pairs.
        file_system_id (Optional[str], optional): Optionally filter on file system id.
        file_system_name (Optional[str], optional): Optionally filter on file system name.
        file_system_tags (Optional[Dict[str, str]], optional): Optionally filter on file system
            tags. They should be a dict of key-value pairs.

    Raises:
        ValueError: If no access point is found based on the filters.
        ValueError: If more than one access point is found based on the filters.

    Returns:
        The access point description.
    """
    access_points = list_efs_access_points(
        access_point_id=access_point_id,
        access_point_name=access_point_name,
        access_point_tags=access_point_tags,
        file_system_id=file_system_id,
        file_system_name=file_system_name,
        file_system_tags=file_system_tags,
    )
    if len(access_points) > 1:
        raise ValueError(
            f"Found more than one access points ({len(access_points)}) "
            f"based on access point filters (id={access_point_id}, "
            f"name={access_point_name}, tags={access_point_tags}) "
            f"and on file system filters (id={file_system_id}, "
            f"name={file_system_name}, tags={file_system_tags}) "
        )
    elif len(access_points) == 0:
        raise ValueError(
            f"Found no access points "
            f"based on access point filters (id={access_point_id}, "
            f"name={access_point_name}, tags={access_point_tags}) "
            f"and on file system filters (id={file_system_id}, "
            f"name={file_system_name}, tags={file_system_tags}) "
        )
    return access_points[0]

get_efs_file_system

get_efs_file_system(
    file_system_id=None, name=None, tags=None
)

Get EFS file system.

You can filter on id, name and tags.

Parameters:

Name Type Description Default
file_system_id Optional[str]

Optionally filter on file system id.

None
name Optional[str]

Optionally filter on name.

None
tags Optional[Dict[str, str]]

Optionally filter on tags. They should be a dict of key-value pairs.

None

Raises:

Type Description
ValueError

If no file system is found based on the filters.

ValueError

If more than one file system is found based on the filters.

Returns:

Type Description
FileSystemDescriptionTypeDef

The file system description.

Source code in src/aibs_informatics_aws_utils/efs/core.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def get_efs_file_system(
    file_system_id: Optional[str] = None,
    name: Optional[str] = None,
    tags: Optional[Dict[str, str]] = None,
) -> FileSystemDescriptionTypeDef:
    """Get EFS file system.

    You can filter on id, name and tags.

    Args:
        file_system_id (Optional[str], optional): Optionally filter on file system id.
        name (Optional[str], optional): Optionally filter on name.
        tags (Optional[Dict[str, str]], optional): Optionally filter on tags.
            They should be a dict of key-value pairs.

    Raises:
        ValueError: If no file system is found based on the filters.
        ValueError: If more than one file system is found based on the filters.

    Returns:
        The file system description.
    """
    file_systems = list_efs_file_systems(file_system_id=file_system_id, name=name, tags=tags)
    if len(file_systems) > 1:
        raise ValueError(
            f"Found more than one file systems ({len(file_systems)}) "
            f"based on id={file_system_id}, name={name}, tags={tags}"
        )
    elif len(file_systems) == 0:
        raise ValueError(
            f"Found no file systems based on id={file_system_id}, name={name}, tags={tags}"
        )
    return file_systems[0]

get_efs_path

get_efs_path(
    local_path: Path,
    raise_if_unresolved: Literal[False],
    mount_points: Optional[
        List[MountPointConfiguration]
    ] = None,
) -> Optional[EFSPath]
get_efs_path(
    local_path: Path,
    raise_if_unresolved: Literal[True] = True,
    mount_points: Optional[
        List[MountPointConfiguration]
    ] = None,
) -> EFSPath
get_efs_path(
    local_path, raise_if_unresolved=True, mount_points=None
)

Converts a local path assumed to be on a mount point to the EFS path

Parameters:

Name Type Description Default
local_path Path

Local path

required
raise_if_unresolved bool

If True, raises an error if the local path is not under an identifiable mount point. Defaults to True.

True
mount_points List[MountPointConfiguration] | None

Optionally can override list of mount_points. If None, mount points are detected. Defaults to None.

None

Returns:

Type Description
Optional[EFSPath]

Corresponding EFS URI or None if the path cannot be resolved and raise_if_unresolved is False

Source code in src/aibs_informatics_aws_utils/efs/paths.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def get_efs_path(
    local_path: Path,
    raise_if_unresolved: bool = True,
    mount_points: Optional[List[MountPointConfiguration]] = None,
) -> Optional[EFSPath]:
    """Converts a local path assumed to be on a mount point to the EFS path

    Args:
        local_path (Path): Local path
        raise_if_unresolved (bool): If True, raises an error if the local path is not
            under an identifiable mount point. Defaults to True.
        mount_points (List[MountPointConfiguration] | None): Optionally can override
            list of mount_points. If None, mount points are detected. Defaults to None.

    Returns:
        Corresponding EFS URI or None if the path cannot be resolved and
            raise_if_unresolved is False
    """
    mount_points = mount_points if mount_points is not None else detect_mount_points()

    for mp in mount_points:
        if mp.is_mounted_path(local_path):
            logger.debug(f"Found mount point {mp} that matches path {local_path}")
            return mp.as_efs_uri(local_path)
    else:
        message = (
            f"Local path {local_path} is not relative to any of the "
            f"{len(mount_points)} mount point mount_points. Adapters: {mount_points}"
        )
        if raise_if_unresolved:
            logger.error(message)
            raise ValueError(message)
        logger.warning(message)
        return None

get_local_path

get_local_path(
    efs_path: EFSPath,
    raise_if_unmounted: Literal[False],
    mount_points: Optional[
        List[MountPointConfiguration]
    ] = None,
) -> Optional[Path]
get_local_path(
    efs_path: EFSPath,
    raise_if_unmounted: Literal[True] = True,
    mount_points: Optional[
        List[MountPointConfiguration]
    ] = None,
) -> Path
get_local_path(
    efs_path, raise_if_unmounted=True, mount_points=None
)

Gets a valid locally mounted path for the given EFS path.

Parameters:

Name Type Description Default
efs_path EFSPath

The EFS path. e.g., "efs://fs-12345678:/path/to/file.txt"

required
raise_if_unmounted bool

If True, raises an error if the EFS path is not mounted locally. Defaults to True.

True
mount_points List[MountPointConfiguration] | None

Optionally can override list of mount points. If None, mount points are detected. Defaults to None.

None

Returns:

Type Description
Optional[Path]

The local path. e.g., "/mnt/efs/path/to/file.txt" or None if the path cannot be resolved and raise_if_unmounted is False

Source code in src/aibs_informatics_aws_utils/efs/paths.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def get_local_path(
    efs_path: EFSPath,
    raise_if_unmounted: bool = True,
    mount_points: Optional[List[MountPointConfiguration]] = None,
) -> Optional[Path]:
    """Gets a valid locally mounted path for the given EFS path.

    Args:
        efs_path (EFSPath): The EFS path. e.g., "efs://fs-12345678:/path/to/file.txt"
        raise_if_unmounted (bool): If True, raises an error if the EFS path is
            not mounted locally. Defaults to True.
        mount_points (List[MountPointConfiguration] | None): Optionally can override
            list of mount points. If None, mount points are detected. Defaults to None.

    Returns:
        The local path. e.g., "/mnt/efs/path/to/file.txt" or None if the path
            cannot be resolved and raise_if_unmounted is False
    """
    mount_points = mount_points if mount_points is not None else detect_mount_points()
    for mount_point in mount_points:
        if mount_point.file_system["FileSystemId"] == efs_path.file_system_id:
            logger.debug(
                f"Found {mount_point} with matching file system id for efs path {efs_path}"
            )

            if not efs_path.path.is_relative_to(mount_point.access_point_path):
                logger.debug(
                    f"EFS Path {efs_path.path} is not relative "
                    f"to mount point access point {mount_point.access_point_path}. Skipping"
                )
                continue
            logger.info(f"Found matching mount point {mount_point} for efs path {efs_path}")
            return mount_point.as_mounted_path(efs_path.path)
    else:
        message = (
            f"Could not resolve local path for EFS path {efs_path} from "
            f"{len(mount_points)} mount points detected on host."
        )
        if raise_if_unmounted:
            logger.error(message)
            raise ValueError(message)
        logger.warning(message)
        return None

list_efs_access_points

list_efs_access_points(
    access_point_id=None,
    access_point_name=None,
    access_point_tags=None,
    file_system_id=None,
    file_system_name=None,
    file_system_tags=None,
)

List EFS access points.

You can filter on id, name and tags for both access point and file system.

Parameters:

Name Type Description Default
access_point_id Optional[str]

Optionally filter on access point id.

None
access_point_name Optional[str]

Optionally filter on name.

None
access_point_tags Optional[Dict[str, str]]

Optionally filter on access point tags. They should be a dict of key-value pairs.

None
file_system_id Optional[str]

Optionally filter on file system id.

None
file_system_name Optional[str]

Optionally filter on file system name.

None
file_system_tags Optional[Dict[str, str]]

Optionally filter on file system tags. They should be a dict of key-value pairs.

None

Returns:

Type Description
List[AccessPointDescriptionTypeDef]

List of matching access points

Source code in src/aibs_informatics_aws_utils/efs/core.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
@retry(ClientError, [throttling_exception_callback])
def list_efs_access_points(
    access_point_id: Optional[str] = None,
    access_point_name: Optional[str] = None,
    access_point_tags: Optional[Dict[str, str]] = None,
    file_system_id: Optional[str] = None,
    file_system_name: Optional[str] = None,
    file_system_tags: Optional[Dict[str, str]] = None,
) -> List[AccessPointDescriptionTypeDef]:
    """List EFS access points.

    You can filter on id, name and tags for both access point and file system.

    Args:
        access_point_id (Optional[str], optional): Optionally filter on access point id.
        access_point_name (Optional[str], optional): Optionally filter on name.
        access_point_tags (Optional[Dict[str, str]], optional): Optionally filter on access point
            tags. They should be a dict of key-value pairs.
        file_system_id (Optional[str], optional): Optionally filter on file system id.
        file_system_name (Optional[str], optional): Optionally filter on file system name.
        file_system_tags (Optional[Dict[str, str]], optional): Optionally filter on file system
            tags. They should be a dict of key-value pairs.

    Returns:
        List of matching access points
    """
    efs = get_efs_client()

    file_system_ids: List[str] = []
    if file_system_id:
        file_system_ids.append(file_system_id)
    elif file_system_name or file_system_tags:
        file_systems = list_efs_file_systems(
            file_system_id=file_system_id, name=file_system_name, tags=file_system_tags
        )
        file_system_ids.extend(map(lambda _: _["FileSystemId"], file_systems))

    access_points: List[AccessPointDescriptionTypeDef] = []

    if access_point_id or not file_system_ids:
        response = efs.describe_access_points(
            **remove_null_values(dict(AccessPointId=access_point_id))  # type: ignore
        )
        # If file_system_ids is empty, we want to include all access points. Otherwise,
        # we only want to include access points that belong to the file systems
        # in file_system_ids.
        for access_point in response["AccessPoints"]:
            if not file_system_ids or access_point.get("FileSystemId") in file_system_ids:
                access_points.append(access_point)
    else:
        for fs_id in file_system_ids:
            response = efs.describe_access_points(FileSystemId=fs_id)
            access_points.extend(response["AccessPoints"])
            while next_token := response.get("NextToken"):
                response = efs.describe_access_points(FileSystemId=fs_id, NextToken=next_token)
                access_points.extend(response["AccessPoints"])

    filtered_access_points: List[AccessPointDescriptionTypeDef] = []

    for ap in access_points:
        if access_point_name and ap.get("Name") != access_point_name:
            continue
        if access_point_tags:
            ap_tags = {tag["Key"]: tag["Value"] for tag in ap.get("Tags", {})}
            tags_match = [access_point_tags[k] == ap_tags.get(k) for k in access_point_tags]
            if not all(tags_match):
                continue
        filtered_access_points.append(ap)
    return filtered_access_points

list_efs_file_systems

list_efs_file_systems(
    file_system_id=None, name=None, tags=None
)

List EFS file systems.

You can filter on id, name and tags.

Parameters:

Name Type Description Default
file_system_id Optional[str]

Optionally filter on file system id.

None
name Optional[str]

Optionally filter on name.

None
tags Optional[Dict[str, str]]

Optionally filter on tags. They should be a dict of key-value pairs.

None

Returns:

Type Description
List[FileSystemDescriptionTypeDef]

List of matching file systems

Source code in src/aibs_informatics_aws_utils/efs/core.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@retry(ClientError, [throttling_exception_callback])
def list_efs_file_systems(
    file_system_id: Optional[str] = None,
    name: Optional[str] = None,
    tags: Optional[Dict[str, str]] = None,
) -> List[FileSystemDescriptionTypeDef]:
    """List EFS file systems.

    You can filter on id, name and tags.

    Args:
        file_system_id (Optional[str], optional): Optionally filter on file system id.
        name (Optional[str], optional): Optionally filter on name.
        tags (Optional[Dict[str, str]], optional): Optionally filter on tags.
            They should be a dict of key-value pairs.

    Returns:
        List of matching file systems
    """
    efs = get_efs_client()
    paginator = efs.get_paginator("describe_file_systems")

    file_systems: List[FileSystemDescriptionTypeDef] = []
    paginator_kwargs = remove_null_values(dict(FileSystemId=file_system_id))
    for results in paginator.paginate(**paginator_kwargs):  # type: ignore
        for fs in results["FileSystems"]:
            if name and fs.get("Name") != name:
                continue
            if tags:
                fs_tags = {tag["Key"]: tag["Value"] for tag in fs["Tags"]}
                if not all([tags[k] == fs_tags.get(k) for k in tags]):
                    continue
            file_systems.append(fs)
    return file_systems