Skip to content

Core

Core EFS utilities.


get_efs_access_point

get_efs_access_point(
    access_point_id=None,
    access_point_name=None,
    access_point_tags=None,
    file_system_id=None,
    file_system_name=None,
    file_system_tags=None,
)

Get EFS access point.

You can filter on id, name and tags for both access point and file system.

Parameters:

Name Type Description Default
access_point_id Optional[str]

Optionally filter on access point id.

None
access_point_name Optional[str]

Optionally filter on access point name.

None
access_point_tags Optional[Dict[str, str]]

Optionally filter on access point tags. They should be a dict of key-value pairs.

None
file_system_id Optional[str]

Optionally filter on file system id.

None
file_system_name Optional[str]

Optionally filter on file system name.

None
file_system_tags Optional[Dict[str, str]]

Optionally filter on file system tags. They should be a dict of key-value pairs.

None

Raises:

Type Description
ValueError

If no access point is found based on the filters.

ValueError

If more than one access point is found based on the filters.

Returns:

Type Description
AccessPointDescriptionTypeDef

The access point description.

Source code in src/aibs_informatics_aws_utils/efs/core.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def get_efs_access_point(
    access_point_id: Optional[str] = None,
    access_point_name: Optional[str] = None,
    access_point_tags: Optional[Dict[str, str]] = None,
    file_system_id: Optional[str] = None,
    file_system_name: Optional[str] = None,
    file_system_tags: Optional[Dict[str, str]] = None,
) -> AccessPointDescriptionTypeDef:
    """Return the single EFS access point that matches the given filters.

    All filters are forwarded unchanged to ``list_efs_access_points``; exactly
    one access point must come back.

    Args:
        access_point_id (Optional[str], optional): Optionally filter on access point id.
        access_point_name (Optional[str], optional): Optionally filter on access point name.
        access_point_tags (Optional[Dict[str, str]], optional): Optionally filter on access point
            tags. They should be a dict of key-value pairs.
        file_system_id (Optional[str], optional): Optionally filter on file system id.
        file_system_name (Optional[str], optional): Optionally filter on file system name.
        file_system_tags (Optional[Dict[str, str]], optional): Optionally filter on file system
            tags. They should be a dict of key-value pairs.

    Raises:
        ValueError: If no access point is found based on the filters.
        ValueError: If more than one access point is found based on the filters.

    Returns:
        The access point description.
    """
    matches = list_efs_access_points(
        access_point_id=access_point_id,
        access_point_name=access_point_name,
        access_point_tags=access_point_tags,
        file_system_id=file_system_id,
        file_system_name=file_system_name,
        file_system_tags=file_system_tags,
    )
    match_count = len(matches)
    if match_count == 1:
        return matches[0]

    # Both failure modes report the same filter summary; only the lead-in differs.
    filter_summary = (
        f"based on access point filters (id={access_point_id}, "
        f"name={access_point_name}, tags={access_point_tags}) "
        f"and on file system filters (id={file_system_id}, "
        f"name={file_system_name}, tags={file_system_tags}) "
    )
    if match_count == 0:
        raise ValueError(f"Found no access points {filter_summary}")
    raise ValueError(f"Found more than one access points ({match_count}) {filter_summary}")

get_efs_file_system

get_efs_file_system(
    file_system_id=None, name=None, tags=None
)

Get EFS file system.

You can filter on id, name and tags.

Parameters:

Name Type Description Default
file_system_id Optional[str]

Optionally filter on file system id.

None
name Optional[str]

Optionally filter on name.

None
tags Optional[Dict[str, str]]

Optionally filter on tags. They should be a dict of key-value pairs.

None

Raises:

Type Description
ValueError

If no file system is found based on the filters.

ValueError

If more than one file system is found based on the filters.

Returns:

Type Description
FileSystemDescriptionTypeDef

The file system description.

Source code in src/aibs_informatics_aws_utils/efs/core.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def get_efs_file_system(
    file_system_id: Optional[str] = None,
    name: Optional[str] = None,
    tags: Optional[Dict[str, str]] = None,
) -> FileSystemDescriptionTypeDef:
    """Return the single EFS file system that matches the given filters.

    All filters are forwarded unchanged to ``list_efs_file_systems``; exactly
    one file system must come back.

    Args:
        file_system_id (Optional[str], optional): Optionally filter on file system id.
        name (Optional[str], optional): Optionally filter on name.
        tags (Optional[Dict[str, str]], optional): Optionally filter on tags.
            They should be a dict of key-value pairs.

    Raises:
        ValueError: If no file system is found based on the filters.
        ValueError: If more than one file system is found based on the filters.

    Returns:
        The file system description.
    """
    matches = list_efs_file_systems(file_system_id=file_system_id, name=name, tags=tags)
    match_count = len(matches)
    if match_count == 1:
        return matches[0]

    # Both failure modes report the same filter summary; only the lead-in differs.
    filter_summary = f"based on id={file_system_id}, name={name}, tags={tags}"
    if match_count == 0:
        raise ValueError(f"Found no file systems {filter_summary}")
    raise ValueError(f"Found more than one file systems ({match_count}) {filter_summary}")

list_efs_access_points

list_efs_access_points(
    access_point_id=None,
    access_point_name=None,
    access_point_tags=None,
    file_system_id=None,
    file_system_name=None,
    file_system_tags=None,
)

List EFS access points.

You can filter on id, name and tags for both access point and file system.

Parameters:

Name Type Description Default
access_point_id Optional[str]

Optionally filter on access point id.

None
access_point_name Optional[str]

Optionally filter on access point name.

None
access_point_tags Optional[Dict[str, str]]

Optionally filter on access point tags. They should be a dict of key-value pairs.

None
file_system_id Optional[str]

Optionally filter on file system id.

None
file_system_name Optional[str]

Optionally filter on file system name.

None
file_system_tags Optional[Dict[str, str]]

Optionally filter on file system tags. They should be a dict of key-value pairs.

None

Returns:

Type Description
List[AccessPointDescriptionTypeDef]

List of matching access points

Source code in src/aibs_informatics_aws_utils/efs/core.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
@retry(ClientError, [throttling_exception_callback])
def list_efs_access_points(
    access_point_id: Optional[str] = None,
    access_point_name: Optional[str] = None,
    access_point_tags: Optional[Dict[str, str]] = None,
    file_system_id: Optional[str] = None,
    file_system_name: Optional[str] = None,
    file_system_tags: Optional[Dict[str, str]] = None,
) -> List[AccessPointDescriptionTypeDef]:
    """List EFS access points.

    You can filter on id, name and tags for both access point and file system.
    If a file system name/tag filter is supplied and no file system matches it,
    an empty list is returned (no access point can belong to a file system
    that does not exist).

    Args:
        access_point_id (Optional[str], optional): Optionally filter on access point id.
        access_point_name (Optional[str], optional): Optionally filter on access point name.
        access_point_tags (Optional[Dict[str, str]], optional): Optionally filter on access point
            tags. They should be a dict of key-value pairs.
        file_system_id (Optional[str], optional): Optionally filter on file system id.
        file_system_name (Optional[str], optional): Optionally filter on file system name.
        file_system_tags (Optional[Dict[str, str]], optional): Optionally filter on file system
            tags. They should be a dict of key-value pairs.

    Returns:
        List of matching access points
    """
    efs = get_efs_client()

    # Resolve the file system filters into a concrete list of file system ids.
    file_system_ids: List[str] = []
    if file_system_id:
        file_system_ids.append(file_system_id)
    elif file_system_name or file_system_tags:
        file_systems = list_efs_file_systems(
            file_system_id=file_system_id, name=file_system_name, tags=file_system_tags
        )
        file_system_ids.extend(fs["FileSystemId"] for fs in file_systems)
        if not file_system_ids:
            # A file system filter was requested but matched nothing. Falling through
            # would treat the empty id list as "no filter" and return every access point.
            return []

    # Build the describe requests to issue: a single request keyed on the access point
    # id (or unfiltered when there is nothing to key on), otherwise one per file system.
    if access_point_id or not file_system_ids:
        requests = [remove_null_values(dict(AccessPointId=access_point_id))]
    else:
        requests = [dict(FileSystemId=fs_id) for fs_id in file_system_ids]

    access_points: List[AccessPointDescriptionTypeDef] = []
    for request in requests:
        response = efs.describe_access_points(**request)  # type: ignore
        access_points.extend(response["AccessPoints"])
        # Follow NextToken so that results beyond the first page are not dropped.
        while next_token := response.get("NextToken"):
            response = efs.describe_access_points(  # type: ignore
                **request, NextToken=next_token
            )
            access_points.extend(response["AccessPoints"])

    filtered_access_points: List[AccessPointDescriptionTypeDef] = []
    for ap in access_points:
        # Only relevant when the lookup was keyed on the access point id; requests
        # keyed on a file system id already return matching access points only.
        if file_system_ids and ap.get("FileSystemId") not in file_system_ids:
            continue
        if access_point_name and ap.get("Name") != access_point_name:
            continue
        if access_point_tags:
            ap_tags = {tag["Key"]: tag["Value"] for tag in ap.get("Tags", [])}
            # Every requested tag must be present with exactly the requested value.
            if not all(access_point_tags[k] == ap_tags.get(k) for k in access_point_tags):
                continue
        filtered_access_points.append(ap)
    return filtered_access_points

list_efs_file_systems

list_efs_file_systems(
    file_system_id=None, name=None, tags=None
)

List EFS file systems.

You can filter on id, name and tags.

Parameters:

Name Type Description Default
file_system_id Optional[str]

Optionally filter on file system id.

None
name Optional[str]

Optionally filter on name.

None
tags Optional[Dict[str, str]]

Optionally filter on tags. They should be a dict of key-value pairs.

None

Returns:

Type Description
List[FileSystemDescriptionTypeDef]

List of matching file systems

Source code in src/aibs_informatics_aws_utils/efs/core.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@retry(ClientError, [throttling_exception_callback])
def list_efs_file_systems(
    file_system_id: Optional[str] = None,
    name: Optional[str] = None,
    tags: Optional[Dict[str, str]] = None,
) -> List[FileSystemDescriptionTypeDef]:
    """List EFS file systems, optionally narrowed by id, name and tags.

    The id (when given) is passed to the ``describe_file_systems`` paginator;
    name and tags are matched locally against each returned description.

    Args:
        file_system_id (Optional[str], optional): Optionally filter on file system id.
        name (Optional[str], optional): Optionally filter on name.
        tags (Optional[Dict[str, str]], optional): Optionally filter on tags.
            They should be a dict of key-value pairs.

    Returns:
        List of matching file systems
    """
    efs = get_efs_client()
    paginator = efs.get_paginator("describe_file_systems")
    request_kwargs = remove_null_values(dict(FileSystemId=file_system_id))

    matches: List[FileSystemDescriptionTypeDef] = []
    for page in paginator.paginate(**request_kwargs):  # type: ignore
        for candidate in page["FileSystems"]:
            if name and candidate.get("Name") != name:
                continue
            if tags:
                candidate_tags = {tag["Key"]: tag["Value"] for tag in candidate["Tags"]}
                # Reject as soon as one requested tag is missing or has another value.
                if any(tags[key] != candidate_tags.get(key) for key in tags):
                    continue
            matches.append(candidate)
    return matches