
S3

Utilities for working with Amazon S3.


check_paths_in_sync

check_paths_in_sync(
    source_path,
    destination_path,
    size_only=False,
    ignore_folder_placeholder_objects=True,
    allow_subset=False,
    max_workers=None,
    **kwargs
)

Check whether source and destination paths are in sync.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `source_path` | `Union[Path, S3URI]` | Source path. | required |
| `destination_path` | `Union[Path, S3URI]` | Destination path. | required |
| `size_only` | `bool` | Limits content comparison to just size and date (no checksum/ETag). | `False` |
| `ignore_folder_placeholder_objects` | `bool` | Whether to ignore S3 folder placeholder objects (zero-byte objects with keys ending in `/`). | `True` |
| `allow_subset` | `bool` | Whether to allow the source path to be a subset of the destination path. | `False` |
| `max_workers` | `Optional[int]` | Number of worker threads to use for parallel comparison. `None` uses the `ThreadPoolExecutor` default. | `None` |
| `**kwargs` | | Additional arguments passed to the S3 client. | `{}` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the source path does not exist. |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if paths are in sync, False otherwise. |
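
Example

A minimal pre-flight check before kicking off a transfer might look like the sketch below. The bucket and paths are hypothetical, and the `S3URI` import location is an assumption; adjust it to wherever `S3URI` is defined in your codebase.

```python
from pathlib import Path

from aibs_informatics_aws_utils.s3 import check_paths_in_sync
from aibs_informatics_aws_utils.s3 import S3URI  # assumption: S3URI may be defined elsewhere

# Compare a local directory against an S3 prefix (names are hypothetical).
in_sync = check_paths_in_sync(
    source_path=Path("/data/results"),
    destination_path=S3URI("s3://example-bucket/results/"),
    size_only=True,       # skip ETag comparison; faster for large trees
    allow_subset=False,   # destination must not contain extra objects
    max_workers=8,
)
print("in sync" if in_sync else "needs re-sync")
```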

Source code in src/aibs_informatics_aws_utils/s3.py, lines 1254-1382
def check_paths_in_sync(
    source_path: Union[Path, S3URI],
    destination_path: Union[Path, S3URI],
    size_only: bool = False,
    ignore_folder_placeholder_objects: bool = True,
    allow_subset: bool = False,
    max_workers: Optional[int] = None,
    **kwargs,
) -> bool:
    """Check whether source and destination paths are in sync.

    Args:
        source_path (Union[Path, S3URI]): Source path.
        destination_path (Union[Path, S3URI]): Destination path.
        size_only (bool): Limits content comparison to just size and date
            (no checksum/ETag). Defaults to False.
        ignore_folder_placeholder_objects (bool): Whether to ignore S3 folder
            placeholder objects (zero-byte objects with keys ending in `/`). Defaults to True.
        allow_subset (bool): Whether to allow source path to be a subset of
            destination path. Defaults to False.
        max_workers (Optional[int]): Number of worker threads to use for
            parallel comparison. Defaults to None (ThreadPoolExecutor default).
        **kwargs: Additional arguments passed to the S3 client.

    Raises:
        ValueError: If the source path does not exist.

    Returns:
        True if paths are in sync, False otherwise.
    """

    def _resolve_paths(path: Union[Path, S3URI]) -> List[Union[Path, S3URI]]:
        if isinstance(path, Path):
            if path.is_dir():
                return list(map(Path, sorted(find_paths(path, include_dirs=False))))
            else:
                return [path]
        else:
            if is_object(path, **kwargs) and not is_folder_placeholder_object(path, **kwargs):
                return [path]
            else:
                return [
                    _
                    for _ in sorted(list_s3_paths(path, **kwargs))
                    if (
                        not ignore_folder_placeholder_objects
                        or not is_folder_placeholder_object(_, **kwargs)
                    )
                ]

    def _find_relative_path(full_path: Union[Path, S3URI], root_path: Union[Path, S3URI]) -> str:
        if isinstance(full_path, Path) and isinstance(root_path, Path):
            if full_path == root_path:
                return ""
            # Adding the leading "/" to ensure we return the leading "/" in the relative path for
            # files under a folder. This is to be consistent with S3URI behavior.
            relative_path = "/" + strip_path_root(full_path, root_path)
            return relative_path
        elif isinstance(full_path, S3URI) and isinstance(root_path, S3URI):
            if full_path == root_path:
                return ""
            # Stripping the "/" to ensure we return the leading "/" in the relative path for
            # objects under a folder. This means that if we have:
            # root_path = `s3://bucket/folder/`
            # or root_path = `s3://bucket/folder`,
            # full_path = `s3://bucket/folder/subfolder/object.txt`
            # The relative path returned will be: `/subfolder/object.txt`

            relative_path = full_path.removeprefix(root_path.rstrip("/"))
            return relative_path
        else:
            raise ValueError("Mismatched path types between full_path and root_path")

    source_paths = _resolve_paths(source_path)
    destination_paths = _resolve_paths(destination_path)

    if len(source_paths) == 0:
        raise ValueError(f"Source path {source_path} does not exist")

    stripped_source_path_to_path_map: Dict[str, Union[Path, S3URI]] = {
        _find_relative_path(sp, source_path): sp for sp in source_paths
    }

    stripped_destination_path_to_path_map: Dict[str, Union[Path, S3URI]] = {
        _find_relative_path(dp, destination_path): dp for dp in destination_paths
    }

    missing_destination_paths = set(stripped_source_path_to_path_map.keys()).difference(
        stripped_destination_path_to_path_map.keys()
    )
    if missing_destination_paths:
        logger.info(
            "The following source paths are missing in the destination path: "
            f"{missing_destination_paths}"
        )
        return False
    if not allow_subset:
        extra_destination_paths = set(stripped_destination_path_to_path_map.keys()).difference(
            stripped_source_path_to_path_map.keys()
        )
        if extra_destination_paths:
            logger.info(
                "The following destination paths are extra compared to the source path: "
                f"{extra_destination_paths}"
            )
            return False

    # Run comparisons in parallel
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_pair = {
            executor.submit(
                should_sync,
                (sp := stripped_source_path_to_path_map[relative_path]),
                (dp := stripped_destination_path_to_path_map[relative_path]),
                size_only,
                **kwargs,
            ): (sp, dp)
            for relative_path in stripped_source_path_to_path_map
        }

        for future in as_completed(future_to_pair):
            not_ok = future.result()
            if not_ok:
                # Cancel any still-pending futures; we already know it's not in sync.
                for f in future_to_pair:
                    f.cancel()
                return False

    return True

delete_s3_objects

delete_s3_objects(s3_paths, **kwargs)

Delete a list of S3 objects.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `s3_paths` | `List[S3URI]` | List of S3 paths to delete. | required |
| `**kwargs` | | Additional arguments passed to the S3 client. | `{}` |

Source code in src/aibs_informatics_aws_utils/s3.py, lines 967-993
def delete_s3_objects(s3_paths: List[S3URI], **kwargs):
    """Delete a list of S3 objects.

    Args:
        s3_paths (List[S3URI]): List of S3 paths to delete.
        **kwargs: Additional arguments passed to the S3 client.
    """
    logger.info(f"Found {len(s3_paths)} objects to delete.")
    s3 = get_s3_client(**kwargs)

    bucket_objects: Dict[str, Set[str]] = defaultdict(set)
    for s3_path in s3_paths:
        bucket_objects[s3_path.bucket].add(s3_path.key)

    # Can only specify a max of 1000 objects per request.
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.delete_objects
    MAX_KEYS_PER_REQUEST = 1000

    for bucket, keys in bucket_objects.items():
        key_list = list(keys)
        for i in range(0, len(keys), MAX_KEYS_PER_REQUEST):
            i_max = min(len(keys), i + MAX_KEYS_PER_REQUEST)
            logger.info(f"Deleting objects {i + 1}..{i_max + 1} in {bucket} bucket")
            s3.delete_objects(
                Bucket=bucket,
                Delete={"Objects": [{"Key": key_list[j]} for j in range(i, i_max)]},
            )

delete_s3_path

delete_s3_path(
    s3_path, include=None, exclude=None, **kwargs
)

Delete an S3 path (object or prefix).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `s3_path` | `S3URI` | Path or key prefix to delete. | required |
| `include` | `Optional[List[Pattern]]` | Patterns to include. | `None` |
| `exclude` | `Optional[List[Pattern]]` | Patterns to exclude. | `None` |
| `**kwargs` | | Additional arguments passed to the S3 client. | `{}` |
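
Example

A short sketch of a filtered prefix delete, using a hypothetical bucket. The `include`/`exclude` lists take compiled regex patterns matched against the key relative to the prefix (see `list_s3_paths` below); the `S3URI` import location is an assumption.

```python
import re

from aibs_informatics_aws_utils.s3 import delete_s3_path
from aibs_informatics_aws_utils.s3 import S3URI  # assumption: S3URI may be defined elsewhere

# Delete only the *.tmp objects under a (hypothetical) prefix, keeping everything else.
delete_s3_path(
    s3_path=S3URI("s3://example-bucket/scratch/"),
    include=[re.compile(r".*\.tmp$")],
)
```
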
Source code in src/aibs_informatics_aws_utils/s3.py, lines 948-964
def delete_s3_path(
    s3_path: S3URI,
    include: Optional[List[Pattern]] = None,
    exclude: Optional[List[Pattern]] = None,
    **kwargs,
):
    """Delete an S3 path (object or prefix).

    Args:
        s3_path (S3URI): Path or key prefix to delete.
        include (Optional[List[Pattern]]): Patterns to include. Defaults to None.
        exclude (Optional[List[Pattern]]): Patterns to exclude. Defaults to None.
        **kwargs: Additional arguments passed to the S3 client.
    """
    logger.info(f"Deleting S3 path {s3_path}")
    s3_paths = list_s3_paths(s3_path, include=include, exclude=exclude, **kwargs)
    delete_s3_objects(s3_paths, **kwargs)

determine_chunk_size

determine_chunk_size(
    path,
    default_chunk_size_bytes=AWS_S3_DEFAULT_CHUNK_SIZE_BYTES,
)

Determine the chunk size that aws s3 cp would use for a very large file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `Path` | Path to the local file. | required |
| `default_chunk_size_bytes` | `int` | Default chunk size in bytes. | `AWS_S3_DEFAULT_CHUNK_SIZE_BYTES` |

Returns:

| Type | Description |
| --- | --- |
| `int` | The appropriate chunk size in bytes. |
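
Example

The doubling loop in the source simply grows the chunk size until the file fits within the multipart part-count limit. A small illustrative sketch of that arithmetic is below; the 8 MiB default and 10,000-part limit are the usual aws-cli/S3 values and stand in for the module's `AWS_S3_DEFAULT_CHUNK_SIZE_BYTES` and `AWS_S3_MULTIPART_LIMIT` constants, which may differ.

```python
import math

# Assumed values for illustration; the module's own constants may differ.
DEFAULT_CHUNK = 8 * 1024 * 1024   # 8 MiB
PART_LIMIT = 10_000               # S3 allows at most 10,000 parts per upload

def illustrate_chunk_size(file_size: int, chunk: int = DEFAULT_CHUNK) -> int:
    # Double the chunk size until the file fits within PART_LIMIT parts.
    while math.ceil(file_size / chunk) > PART_LIMIT:
        chunk *= 2
    return chunk

# A 200 GiB file needs ~25,600 parts at 8 MiB, so the chunk doubles twice to 32 MiB.
print(illustrate_chunk_size(200 * 1024**3) // (1024 * 1024), "MiB")
```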

Source code in src/aibs_informatics_aws_utils/s3.py, lines 1573-1590
def determine_chunk_size(
    path: Path, default_chunk_size_bytes: int = AWS_S3_DEFAULT_CHUNK_SIZE_BYTES
) -> int:
    """Determine the chunk size that `aws s3 cp` would use for a very large file.

    Args:
        path (Path): Path to the local file.
        default_chunk_size_bytes (int): Default chunk size in bytes.
            Defaults to AWS_S3_DEFAULT_CHUNK_SIZE_BYTES.

    Returns:
        The appropriate chunk size in bytes.
    """
    file_size = path.stat().st_size
    correct_chunk_size_bytes = default_chunk_size_bytes
    while math.ceil(file_size / correct_chunk_size_bytes) > AWS_S3_MULTIPART_LIMIT:
        correct_chunk_size_bytes *= 2
    return correct_chunk_size_bytes

determine_multipart_attributes

determine_multipart_attributes(s3_path, **kwargs)

Determine multipart upload chunk size and approximate threshold, if applicable.

Multipart attributes are the following:

  • chunk size: The size of each part in bytes
  • threshold: The size in bytes at which a multipart upload is created
Note

  • The threshold is only an approximation, as it is not possible to determine the exact threshold used.
  • This assumes the chunk size is constant for all parts. This is not always true, but exceptions are rare.
  • The chunk size for a multipart upload is constrained (see the S3 documentation: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html). Most importantly, it must be between 5MB and 5GB.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `s3_path` | `S3URI` | S3 object to check. | required |
| `**kwargs` | | Additional arguments passed to the S3 client. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Tuple[Optional[int], Optional[int]]` | A tuple of (chunk_size, threshold). |
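
Example

A sketch of feeding the detected attributes into `get_local_etag` so the local ETag is computed the same way S3 computed the remote one. The bucket, key, and local file path are hypothetical; the `S3URI` import location is an assumption.

```python
from pathlib import Path

from aibs_informatics_aws_utils.s3 import determine_multipart_attributes, get_local_etag
from aibs_informatics_aws_utils.s3 import S3URI  # assumption: S3URI may be defined elsewhere

s3_path = S3URI("s3://example-bucket/data/large.bin")  # hypothetical object
chunk_size, threshold = determine_multipart_attributes(s3_path)

# Reuse the detected chunk size/threshold when computing the local ETag.
local_etag = get_local_etag(Path("/data/large.bin"), chunk_size, threshold)
print(chunk_size, threshold, local_etag)
```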

Source code in src/aibs_informatics_aws_utils/s3.py, lines 1524-1570
def determine_multipart_attributes(
    s3_path: S3URI, **kwargs
) -> Tuple[Optional[int], Optional[int]]:
    """Determine multipart upload chunk size and approximate threshold, if applicable.

    Multipart attributes are the following:

    - **chunk size**: The size of each part in bytes
    - **threshold**: The size in bytes at which a multipart upload is created

    Note:
        - The threshold is only an approximation, as it is not possible to
          determine the exact threshold used.
        - This assumes chunk size is constant for all parts.
          This is not necessarily always true, although rare.
        - The chunksize for a multipart upload has the following constraints:
          See [S3 documentation](https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html).
          Most importantly, must be between 5MB and 5GB.

    Args:
        s3_path (S3URI): S3 object to check.
        **kwargs: Additional arguments passed to the S3 client.

    Returns:
        Tuple[Optional[int], Optional[int]]: A tuple of (chunk_size, threshold).
    """
    s3_client = get_s3_client(**kwargs)
    head_object_part = s3_client.head_object(Bucket=s3_path.bucket, Key=s3_path.key, PartNumber=1)

    object_parts = head_object_part.get("PartsCount", 1)
    object_part_content_length = head_object_part.get("ContentLength", 0)

    threshold, chunk_size = None, None
    if object_parts > 1:
        threshold, chunk_size = object_part_content_length, object_part_content_length
    else:
        head_object = s3_client.head_object(Bucket=s3_path.bucket, Key=s3_path.key)
        is_multipart_etag = head_object["ETag"].endswith('-1"')
        if is_multipart_etag:
            # This should ensure that multipart etag is created as expected
            threshold = object_part_content_length
        if object_part_content_length > AWS_S3_DEFAULT_CHUNK_SIZE_BYTES:
            chunk_size = object_part_content_length
            if not is_multipart_etag:
                chunk_size += 1
                threshold = chunk_size
    return chunk_size, threshold

download_s3_object

download_s3_object(
    s3_path,
    local_path,
    exist_ok=False,
    transfer_config=None,
    force=True,
    size_only=False,
    **kwargs
)

Download contents of an S3 object to file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `s3_path` | `S3URI` | S3 URI to object. | required |
| `local_path` | `Path` | Destination path. | required |
| `exist_ok` | `bool` | If True, local path can already exist. | `False` |
| `transfer_config` | `Optional[TransferConfig]` | Transfer configuration. | `None` |
| `force` | `bool` | If True, force the download. | `True` |
| `size_only` | `bool` | If True, only compare sizes. | `False` |
| `**kwargs` | | Additional arguments passed to the S3 client. | `{}` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | Raised if local path exists and exist_ok is False. |
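
Example

A minimal single-object download sketch; with `force=False`, the call is a no-op when the local copy already matches according to `should_sync`. The bucket, key, and local path are hypothetical, and the `S3URI` import location is an assumption.

```python
from pathlib import Path

from aibs_informatics_aws_utils.s3 import download_s3_object
from aibs_informatics_aws_utils.s3 import S3URI  # assumption: S3URI may be defined elsewhere

download_s3_object(
    s3_path=S3URI("s3://example-bucket/configs/app.yaml"),
    local_path=Path("/tmp/app.yaml"),
    exist_ok=True,    # allow the local file to exist already
    force=False,      # skip the download if the local copy is up to date
    size_only=True,   # compare by size/date only (no ETag computation)
)
```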

Source code in src/aibs_informatics_aws_utils/s3.py, lines 288-333
@retry(
    (ConnectionClosedError, EndpointConnectionError, ResponseStreamingError, ClientError),
    tries=10,
    backoff=2.0,
)
def download_s3_object(
    s3_path: S3URI,
    local_path: Path,
    exist_ok: bool = False,
    transfer_config: Optional[TransferConfig] = None,
    force: bool = True,
    size_only: bool = False,
    **kwargs,
):
    """Download contents of an S3 object to file.

    Args:
        s3_path (S3URI): S3 URI to object.
        local_path (Path): Destination path.
        exist_ok (bool): If True, local path can already exist. Defaults to False.
        transfer_config (Optional[TransferConfig]): Transfer configuration. Defaults to None.
        force (bool): If True, force the download. Defaults to True.
        size_only (bool): If True, only compare sizes. Defaults to False.
        **kwargs: Additional arguments passed to the S3 client.

    Raises:
        ValueError: Raised if local path exists and exist_ok is False.
    """
    s3_object = get_object(s3_path, **kwargs)
    if force or should_sync(
        source_path=s3_path, destination_path=local_path, size_only=size_only, **kwargs
    ):
        if local_path.exists() and not exist_ok and not force:
            raise ValueError(f"Unable to download S3 object to {local_path}. Path exists.")
        elif local_path.exists() and local_path.is_dir() and exist_ok:
            logger.warning(
                f"Overwriting directory {local_path} with S3 object {s3_path}"
                "This may cause unexpected behavior."
            )
            try:
                local_path.rmdir()
            except Exception as e:
                logger.error("Error removing directory: {e}")
                raise e
        local_path.parent.mkdir(parents=True, exist_ok=True)
        s3_object.download_file(Filename=str(local_path.resolve()), Config=transfer_config)

download_s3_object_prefix

download_s3_object_prefix(
    s3_path,
    local_path,
    exist_ok=False,
    transfer_config=None,
    force=True,
    size_only=False,
    **kwargs
)

Download an S3 object prefix to a local path.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `s3_path` | `S3URI` | URI of the object prefix (folder). | required |
| `local_path` | `Path` | Local destination path. | required |
| `exist_ok` | `bool` | If True, the local path may already exist. | `False` |
| `transfer_config` | `Optional[TransferConfig]` | Transfer configuration. | `None` |
| `force` | `bool` | If True, force the download. | `True` |
| `size_only` | `bool` | If True, only compare sizes. | `False` |
| `**kwargs` | | Additional arguments passed to the S3 client. | `{}` |

Raises:

| Type | Description |
| --- | --- |
| `AWSError` | If the local path already exists and is not empty. |

Source code in src/aibs_informatics_aws_utils/s3.py, lines 234-285
def download_s3_object_prefix(
    s3_path: S3URI,
    local_path: Path,
    exist_ok: bool = False,
    transfer_config: Optional[TransferConfig] = None,
    force: bool = True,
    size_only: bool = False,
    **kwargs,
):
    """Download an S3 object prefix to a local path.

    Args:
        s3_path (S3URI): URI of the object prefix (folder).
        local_path (Path): Local destination path.
        exist_ok (bool): If True, local path may exist previously. Defaults to False.
        transfer_config (Optional[TransferConfig]): Transfer configuration. Defaults to None.
        force (bool): If True, force the download. Defaults to True.
        size_only (bool): If True, only compare sizes. Defaults to False.
        **kwargs: Additional arguments passed to the S3 client.

    Raises:
        AWSError: If the local path already exists and is not empty.
    """

    s3_object_paths = list_s3_paths(s3_path=s3_path, **kwargs)
    logger.info(f"Downloading {len(s3_object_paths)} objects under {local_path}")

    if not exist_ok and local_path.exists() and local_path.is_dir() and any(local_path.iterdir()):
        raise AWSError(
            f"{local_path} already exists and is not empty. Cannot download to the directory"
        )
    for s3_object_path in s3_object_paths:
        relative_key = s3_object_path.key[len(s3_path.key) :].lstrip("/")
        if s3_object_path.has_folder_suffix():
            if get_object(s3_object_path, **kwargs).content_length != 0:
                err_msg = (
                    f"Cannot download S3 object {s3_path} to {local_path}. Downloads of "
                    "objects with '/' suffixed keys and of non-zero size are NOT supported."
                )
                logger.error(err_msg)
                raise AWSError(err_msg)
            continue
        local_filepath = (local_path / relative_key).resolve()
        download_s3_object(
            s3_path=s3_object_path,
            local_path=local_filepath,
            exist_ok=exist_ok,
            transfer_config=transfer_config,
            force=force,
            size_only=size_only,
            **kwargs,
        )

download_s3_path

download_s3_path(
    s3_path,
    local_path,
    exist_ok=False,
    transfer_config=None,
    force=True,
    size_only=False,
    **kwargs
)

Download an S3 Object or Folder to a local path.

This mimics the behavior of aws s3 cp, both with and without the --recursive flag.

The logic for using recursive behavior is as follows:

For non-trailing-slash S3 paths:

  • If S3 path is an object, it will be downloaded as a file.
  • If S3 path is an object prefix, it will be downloaded as a folder.

For trailing-slash S3 paths:

  • If S3 path is a non-empty object and object prefix, an error will be raised.
  • If S3 path is an object prefix, it will be downloaded as a folder.
  • If S3 path is an object, it will be downloaded as a file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `s3_path` | `S3URI` | URI of the object or folder. | required |
| `local_path` | `Path` | Local destination path. | required |
| `exist_ok` | `bool` | If True, the local path may already exist. | `False` |
| `transfer_config` | `Optional[TransferConfig]` | Transfer configuration. | `None` |
| `force` | `bool` | If True, force the download. | `True` |
| `size_only` | `bool` | If True, only compare sizes. | `False` |
| `**kwargs` | | Additional arguments passed to the S3 client. | `{}` |

Raises:

| Type | Description |
| --- | --- |
| `AWSError` | If S3 URI does not exist or is in an unsupported state. |
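
Example

A sketch of the `aws s3 cp`-like behavior: the same call handles a single object or a whole prefix, choosing between the two cases as described above. The names are hypothetical, and the `S3URI` import location is an assumption.

```python
from pathlib import Path

from aibs_informatics_aws_utils.s3 import download_s3_path
from aibs_informatics_aws_utils.s3 import S3URI  # assumption: S3URI may be defined elsewhere

# Downloads a single file if the key is an object, or recreates the tree if it is a prefix.
download_s3_path(
    s3_path=S3URI("s3://example-bucket/datasets/run-42"),
    local_path=Path("/data/run-42"),
    exist_ok=True,
)
```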

Source code in src/aibs_informatics_aws_utils/s3.py, lines 135-231
def download_s3_path(
    s3_path: S3URI,
    local_path: Path,
    exist_ok: bool = False,
    transfer_config: Optional[TransferConfig] = None,
    force: bool = True,
    size_only: bool = False,
    **kwargs,
):
    """Download an S3 Object or Folder to a local path.

    This mimics the behavior of `aws s3 cp`, both with and without the `--recursive` flag.

    The logic for using recursive behavior is as follows:

    **For non-trailing-slash S3 paths:**

    - If S3 path is an object, it will be downloaded as a file.
    - If S3 path is an object prefix, it will be downloaded as a folder.

    **For trailing-slash S3 paths:**

    - If S3 path is a non-empty object and object prefix, an error will be raised.
    - If S3 path is an object prefix, it will be downloaded as a folder.
    - If S3 path is an object, it will be downloaded as a file.

    Args:
        s3_path (S3URI): URI of the object or folder.
        local_path (Path): Local destination path.
        exist_ok (bool): If True, local path may exist previously. Defaults to False.
        transfer_config (Optional[TransferConfig]): Transfer configuration. Defaults to None.
        force (bool): If True, force the download. Defaults to True.
        size_only (bool): If True, only compare sizes. Defaults to False.
        **kwargs: Additional arguments passed to the S3 client.

    Raises:
        AWSError: If S3 URI does not exist or is in an unsupported state.
    """
    path_is_object = is_object(s3_path, **kwargs)
    path_is_prefix = is_object_prefix(s3_path, **kwargs)
    path_has_folder_slash = s3_path.key.endswith("/")
    path_is_non_empty_object_fn = lru_cache(None)(
        lambda: path_is_object and get_object(s3_path, **kwargs).content_length != 0
    )

    # Scenarios for object download:
    #   1. path IS OBJECT and NOT HAS FOLDER SLASH
    #   2. path IS OBJECT and HAS FOLDER SLASH but NOT OBJECT PREFIX
    # Scenarios for object prefix download:
    #   1. path IS OBJECT PREFIX and NOT OBJECT
    #   2. path IS OBJECT PREFIX and HAS FOLDER SLASH and (NOT OBJECT or IS NON-EMPTY OBJECT)
    # Scenarios for raising error:
    #   1. path is NOT OBJECT and NOT OBJECT PREFIX
    #   2. path is OBJECT and OBJECT PREFIX and HAS FOLDER SLASH and IS NON-EMPTY OBJECT
    if (path_is_object and not path_has_folder_slash) or (path_is_object and not path_is_prefix):
        logger.info(f"{s3_path} is an object. Downloading to {local_path} as file.")
        download_s3_object(
            s3_path=s3_path,
            local_path=local_path,
            exist_ok=exist_ok,
            transfer_config=transfer_config,
            force=force,
            size_only=size_only,
            **kwargs,
        )
    elif (path_is_prefix and not path_is_object) or (
        path_is_prefix and path_has_folder_slash and not path_is_non_empty_object_fn()
    ):
        logger.info(f"{s3_path} is an object prefix. Downloading to {local_path} as folder.")
        download_s3_object_prefix(
            s3_path=s3_path,
            local_path=local_path,
            exist_ok=exist_ok,
            transfer_config=transfer_config,
            force=force,
            size_only=size_only,
            **kwargs,
        )
    elif not path_is_object and not path_is_prefix:
        raise AWSError(f"{s3_path} is neither an object or prefix. Does it exist?")
    elif (
        path_is_object
        and path_is_prefix
        and path_has_folder_slash
        and path_is_non_empty_object_fn()
    ):
        raise AWSError(
            f"{s3_path} is an object and object prefix and has folder suffix "
            "and is non-empty. This is not supported"
        )
    else:
        raise AWSError(
            f"Unhandled scenario for {s3_path}: is_object={path_is_object}, "
            f"is_object_prefix={path_is_prefix}, "
            f"has_folder_slash={path_has_folder_slash}, "
            f"is_non_empty_object={path_is_non_empty_object_fn()}"
        )

download_to_json

download_to_json(s3_path, **kwargs)

Helper method to read a JSON file from S3.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `s3_path` | `S3URI` | S3 URI to the JSON file. | required |
| `**kwargs` | | Additional arguments passed to the S3 client. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `JSON` | The parsed JSON content. |

Raises:

| Type | Description |
| --- | --- |
| `AWSError` | If there is an error reading the JSON data. |
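
Example

Reading a small manifest straight into a Python object, without touching disk. The bucket, key, and manifest fields are hypothetical; the `S3URI` import location is an assumption.

```python
from aibs_informatics_aws_utils.s3 import download_to_json
from aibs_informatics_aws_utils.s3 import S3URI  # assumption: S3URI may be defined elsewhere

manifest = download_to_json(S3URI("s3://example-bucket/run-42/manifest.json"))
print(manifest["version"])  # assumes the (hypothetical) manifest has a "version" field
```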

Source code in src/aibs_informatics_aws_utils/s3.py, lines 111-132
def download_to_json(s3_path: S3URI, **kwargs) -> JSON:
    """Helper method to read a JSON file from S3.

    Args:
        s3_path (S3URI): S3 URI to the JSON file.
        **kwargs: Additional arguments passed to the S3 client.

    Returns:
        The parsed JSON content.

    Raises:
        AWSError: If there is an error reading the JSON data.
    """
    logger.info(f"Reading file from s3: {s3_path}")
    s3_obj = get_object(s3_path=s3_path, **kwargs)

    try:
        data = json.load(s3_obj.get()["Body"])
    except Exception as e:
        raise AWSError(f"Error reading json data from {s3_path} [{e}]")

    return data

generate_presigned_url

generate_presigned_url(
    s3_path,
    action=PresignedUrlAction.READ,
    expires_in=3600,
    **kwargs
)

Generate a pre-signed URL for an S3 object.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `s3_path` | `S3URI` | Intended S3 path of the presigned URL. | required |
| `action` | `PresignedUrlAction` | Desired action for presigned URL (READ or WRITE). | `READ` |
| `expires_in` | `int` | TTL of URL in seconds. | `3600` |
| `**kwargs` | | Additional arguments passed to the S3 client. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `str` | A pre-signed URL. |
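
Example

Generating a short-lived download link. The bucket and key are hypothetical, and the import locations of `PresignedUrlAction` and `S3URI` are assumptions.

```python
from aibs_informatics_aws_utils.s3 import PresignedUrlAction, generate_presigned_url
from aibs_informatics_aws_utils.s3 import S3URI  # assumption: S3URI may be defined elsewhere

url = generate_presigned_url(
    S3URI("s3://example-bucket/reports/summary.pdf"),
    action=PresignedUrlAction.READ,
    expires_in=900,  # link is valid for 15 minutes
)
print(url)
```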

Source code in src/aibs_informatics_aws_utils/s3.py, lines 1119-1144
def generate_presigned_url(
    s3_path: S3URI,
    action: PresignedUrlAction = PresignedUrlAction.READ,
    expires_in: int = 3600,
    **kwargs,
) -> str:
    """Generate a pre-signed URL for an S3 object.

    Args:
        s3_path (S3URI): Intended S3 path of the presigned URL.
        action (PresignedUrlAction): Desired action for presigned URL (READ or WRITE).
            Defaults to READ.
        expires_in (int): TTL of URL in seconds. Defaults to 3600.
        **kwargs: Additional arguments passed to the S3 client.

    Returns:
        A pre-signed URL.
    """
    s3 = get_s3_client(config=Config(signature_version="s3v4"), **kwargs)
    presigned_url = s3.generate_presigned_url(
        ClientMethod=action.value,
        Params={"Bucket": s3_path.bucket, "Key": s3_path.key},
        HttpMethod="GET" if action == PresignedUrlAction.READ else "PUT",
        ExpiresIn=expires_in,
    )
    return presigned_url

generate_presigned_urls

generate_presigned_urls(
    s3_paths,
    action=PresignedUrlAction.READ,
    expires_in=3600,
    **kwargs
)

Generate pre-signed URLs for given S3 paths.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `s3_paths` | `List[S3URI]` | List of S3 paths to generate URLs for. | required |
| `action` | `PresignedUrlAction` | Desired action for presigned URL (READ or WRITE). | `READ` |
| `expires_in` | `int` | TTL of URL in seconds. | `3600` |
| `**kwargs` | | Additional arguments passed to the S3 client. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `List[str]` | List of pre-signed URLs. |

Source code in src/aibs_informatics_aws_utils/s3.py, lines 1098-1116
def generate_presigned_urls(
    s3_paths: List[S3URI],
    action: PresignedUrlAction = PresignedUrlAction.READ,
    expires_in: int = 3600,
    **kwargs,
) -> List[str]:
    """Generate pre-signed URLs for given S3 paths.

    Args:
        s3_paths (List[S3URI]): List of S3 paths to generate URLs for.
        action (PresignedUrlAction): Desired action for presigned URL (READ or WRITE).
            Defaults to READ.
        expires_in (int): TTL of URL in seconds. Defaults to 3600.
        **kwargs: Additional arguments passed to the S3 client.

    Returns:
        List of pre-signed URLs.
    """
    return [generate_presigned_url(s3_path, action, expires_in, **kwargs) for s3_path in s3_paths]

generate_transfer_request

generate_transfer_request(
    source_path,
    destination_path,
    source_path_prefix=None,
    extra_args=None,
)

Create an S3 transfer request.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `source_path` | `Union[Path, S3URI]` | Source copy path. | required |
| `destination_path` | `Union[Path, S3URI]` | Destination copy path. | required |
| `source_path_prefix` | `Optional[str]` | Optional prefix to remove from source path. Defaults to source path key. | `None` |
| `extra_args` | `Optional[Dict[str, Any]]` | Extra arguments for the transfer. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `S3TransferRequest` | The appropriate transfer request (S3CopyRequest, S3UploadRequest, or S3DownloadRequest) based on source and destination types. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the source path prefix doesn't match the source path. |
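
Example

A sketch of how the source/destination types select the request class: a local source with an S3 destination yields an upload request, S3-to-S3 yields a copy request, and S3-to-local yields a download request. Paths and buckets are hypothetical; the `S3URI` import location is an assumption.

```python
from pathlib import Path

from aibs_informatics_aws_utils.s3 import generate_transfer_request
from aibs_informatics_aws_utils.s3 import S3URI  # assumption: S3URI may be defined elsewhere

# Local -> S3 produces an S3UploadRequest (hypothetical paths).
upload_req = generate_transfer_request(
    source_path=Path("/data/results/metrics.csv"),
    destination_path=S3URI("s3://example-bucket/results/metrics.csv"),
)

# S3 -> S3 produces an S3CopyRequest.
copy_req = generate_transfer_request(
    source_path=S3URI("s3://example-bucket/results/metrics.csv"),
    destination_path=S3URI("s3://example-archive/results/metrics.csv"),
)
print(type(upload_req).__name__, type(copy_req).__name__)
```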

Source code in src/aibs_informatics_aws_utils/s3.py, lines 761-810
def generate_transfer_request(
    source_path: Union[Path, S3URI],
    destination_path: Union[Path, S3URI],
    source_path_prefix: Optional[str] = None,
    extra_args: Optional[Dict[str, Any]] = None,
) -> S3TransferRequest:
    """Create an S3 transfer request.

    Args:
        source_path (Union[Path, S3URI]): Source copy path.
        destination_path (Union[Path, S3URI]): Destination copy path.
        source_path_prefix (Optional[str]): Optional prefix to remove from source path.
            Defaults to source path key.
        extra_args (Optional[Dict[str, Any]]): Extra arguments for the transfer.

    Returns:
        The appropriate transfer request (S3CopyRequest, S3UploadRequest,
            or S3DownloadRequest) based on source and destination types.

    Raises:
        ValueError: If the source path prefix doesn't match the source path.
    """
    relative_source_path = ""
    if source_path_prefix:
        source_key = source_path.key if isinstance(source_path, S3URI) else str(source_path)

        if not source_key.startswith(source_path_prefix):
            raise ValueError(
                f"Cannot generate S3CopyRequest with src={source_path}, dst={destination_path}. "
                f"source path prefix={source_path_prefix} is specified but does"
                " not match prefix or source path"
            )
        relative_source_path = source_key[len(source_path_prefix) :]

    if isinstance(destination_path, S3URI):
        # This will be sanitized by S3URI class (removing double slashes)
        new_destination_path = S3URI(destination_path + relative_source_path)
        if isinstance(source_path, S3URI):
            return S3CopyRequest(source_path, new_destination_path, extra_args=extra_args)
        else:
            return S3UploadRequest(source_path, new_destination_path, extra_args=extra_args)
    elif isinstance(source_path, S3URI) and isinstance(destination_path, Path):
        local_destination_path: Path = (
            Path(get_path_with_root(relative_source_path, destination_path))
            if relative_source_path
            else destination_path
        )
        return S3DownloadRequest(source_path, local_destination_path)
    else:
        raise ValueError("Local to local transfer is not ")

get_local_etag

get_local_etag(
    path, chunk_size_bytes=None, threshold_bytes=None
)

Calculates an expected AWS s3 upload etag for a local on-disk file. Takes into account multipart uploads, but does NOT account for additional encryption (like KMS keys)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `Path` | The path of the file that will be uploaded to S3. | required |
| `chunk_size_bytes` | `Optional[int]` | The multipart upload chunk size in bytes. If None, the chunk size is determined from the file size. | `None` |
| `threshold_bytes` | `Optional[int]` | The threshold in bytes for multipart uploads. If None, the threshold is determined from the file size. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `str` | The expected ETag. |
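
Example

A sketch of verifying an upload by comparing the locally computed ETag with the one S3 reports; both values include the surrounding double quotes, so they compare directly. The paths are hypothetical, and the `S3URI` import location is an assumption. For multipart objects, pass the chunk size and threshold from `determine_multipart_attributes` so both sides use the same part size.

```python
from pathlib import Path

from aibs_informatics_aws_utils.s3 import get_local_etag, get_object
from aibs_informatics_aws_utils.s3 import S3URI  # assumption: S3URI may be defined elsewhere

local_file = Path("/data/large.bin")                         # hypothetical local file
remote = get_object(S3URI("s3://example-bucket/large.bin"))  # hypothetical object

# Both ETags are quoted strings, e.g. '"9b2cf5..."' or '"9b2cf5...-12"' for multipart.
if get_local_etag(local_file) == remote.e_tag:
    print("local file matches the uploaded object")
```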

Source code in src/aibs_informatics_aws_utils/s3.py, lines 1593-1653
@retry(OSError)
def get_local_etag(
    path: Path, chunk_size_bytes: Optional[int] = None, threshold_bytes: Optional[int] = None
) -> str:
    """Calculates an expected AWS s3 upload etag for a local on-disk file.
    Takes into account multipart uploads, but does NOT account for additional encryption
    (like KMS keys)

    Args:
        path (Path): The path of the file that will be uploaded to s3
        chunk_size_bytes (Optional[int]): The default multipart upload chunksize in bytes.
            If None, we determine the chunk size based on file size.
        threshold_bytes (Optional[int]): The threshold in bytes for multipart uploads.
            If None, we determine the threshold based on file size.

    Returns:
        The expected etag
    """
    if chunk_size_bytes is None:
        chunk_size_bytes = determine_chunk_size(path)

    if threshold_bytes is None:
        threshold_bytes = determine_chunk_size(path)

    size_bytes = path.stat().st_size

    chunk_digests: List[bytes] = []
    buffer_size = min(chunk_size_bytes, LOCAL_ETAG_READ_BUFFER_BYTES)

    with open(path, "rb") as fp:
        chunk_hasher = hashlib.md5()
        chunk_bytes_read = 0

        while True:
            if chunk_bytes_read == chunk_size_bytes:
                chunk_digests.append(chunk_hasher.digest())
                chunk_hasher = hashlib.md5()
                chunk_bytes_read = 0

            bytes_remaining_in_chunk = chunk_size_bytes - chunk_bytes_read
            read_size = min(buffer_size, bytes_remaining_in_chunk)
            data = fp.read(read_size)
            if not data:
                break

            chunk_hasher.update(data)
            chunk_bytes_read += len(data)

        if chunk_bytes_read:
            chunk_digests.append(chunk_hasher.digest())

    if len(chunk_digests) == 0:  # Empty file (can never be multipart upload)
        expected_etag = f'"{hashlib.md5().hexdigest()}"'
    elif len(chunk_digests) == 1 and size_bytes < threshold_bytes:  # File smaller than threshold
        expected_etag = f'"{chunk_digests[0].hex()}"'
    else:  # We are dealing with a multipart upload
        digests = b"".join(chunk_digests)
        multipart_md5 = hashlib.md5(digests)
        expected_etag = f'"{multipart_md5.hexdigest()}-{len(chunk_digests)}"'

    return expected_etag

get_s3_path_stats

get_s3_path_stats(s3_path, **kwargs)

Get statistics for an S3 path including size, object count, and last modified time.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `s3_path` | `S3URI` | Path to the file or prefix in S3. | required |
| `**kwargs` | | Additional arguments passed to the S3 client. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `S3PathStats` | Statistics object containing last_modified, size_bytes, and object_count. |
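
Example

Works for a single object or a whole prefix; for a prefix, size and object count are aggregated across its objects. The prefix below is hypothetical, and the `S3URI` import location is an assumption.

```python
from aibs_informatics_aws_utils.s3 import get_s3_path_stats
from aibs_informatics_aws_utils.s3 import S3URI  # assumption: S3URI may be defined elsewhere

stats = get_s3_path_stats(S3URI("s3://example-bucket/datasets/run-42/"))
print(stats.object_count, stats.size_bytes, stats.last_modified)
```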

Source code in src/aibs_informatics_aws_utils/s3.py, lines 572-609
def get_s3_path_stats(s3_path: S3URI, **kwargs) -> S3PathStats:
    """Get statistics for an S3 path including size, object count, and last modified time.

    Args:
        s3_path (S3URI): Path to the file or prefix in S3.
        **kwargs: Additional arguments passed to the S3 client.

    Returns:
        Statistics object containing last_modified, size_bytes, and object_count.
    """
    s3 = get_s3_client(**kwargs)
    last_modified = get_current_time()
    bucket, key = s3_path.bucket, s3_path.key
    logger.info(f"Getting file stats: {s3_path}")

    size_bytes: Optional[int] = None
    object_count: Optional[int] = None

    try:
        file_info = s3.head_object(Bucket=bucket, Key=key)
        last_modified = file_info["LastModified"]
        size_bytes = file_info["ContentLength"]
        object_count = 1
    except ClientError:
        logger.debug("Caught client error, assuming it means this is a dir")
        # This means that the path is most likely a prefix, not an object key
        size_bytes, object_count = _get_prefix_size_and_count(
            bucket_name=bucket, key_prefix=key, **kwargs
        )
        last_modified = _get_prefix_last_modified(bucket_name=bucket, key_prefix=key, **kwargs)
    except Exception as e:
        logger.error("Caught unexpected exception.")
        logger.exception(e)
        raise e

    return S3PathStats(
        last_modified=last_modified, size_bytes=size_bytes, object_count=object_count
    )

is_folder

is_folder(s3_path, **kwargs)

Check if S3 Path is a "folder" or object.

To be a "folder", it must satisfy following conditions:

  • The Key Prefix HAS objects under prefix
  • All objects under Key Prefix are separated by /
    • i.e. key=key_prefix/...obj1, key=key_prefix/...obj2, etc.
Example
# For a bucket="s3://bucket" with the following keys:
#   /path/to/one/object1
#   /path/to/one/object2
#   /path/to/one_object1
#   /path/to/one_object2
#   /path/to/another
#   /path/to/another/object1
#   /path/to/another/object2

is_folder("s3://bucket/path/to/one")        # >> TRUE
is_folder("s3://bucket/path/to/another")    # >> TRUE
is_folder("s3://bucket/path/to/another/")   # >> TRUE

is_folder("s3://bucket/path/to/one_")       # >> FALSE
is_folder("s3://bucket/path/to/one_object1")  # >> FALSE
is_folder("s3://bucket/path/to/doesnotexist") # >> FALSE

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `s3_path` | `S3URI` | S3 URI to check. | required |
| `**kwargs` | | Additional arguments passed to the S3 client. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if s3 path is a folder. |

Source code in src/aibs_informatics_aws_utils/s3.py, lines 488-527
def is_folder(s3_path: S3URI, **kwargs) -> bool:
    """Check if S3 Path is a "folder" or object.

    To be a "folder", it must satisfy following conditions:

    - The Key Prefix HAS objects under prefix
    - All objects under Key Prefix are separated by `/`
        - i.e. `key=key_prefix/...obj1`, `key=key_prefix/...obj2`, etc.

    Example:
        ```python
        # For a bucket="s3://bucket" with the following keys:
        #   /path/to/one/object1
        #   /path/to/one/object2
        #   /path/to/one_object1
        #   /path/to/one_object2
        #   /path/to/another
        #   /path/to/another/object1
        #   /path/to/another/object2

        is_folder("s3://bucket/path/to/one")        # >> TRUE
        is_folder("s3://bucket/path/to/another")    # >> TRUE
        is_folder("s3://bucket/path/to/another/")   # >> TRUE

        is_folder("s3://bucket/path/to/one_")       # >> FALSE
        is_folder("s3://bucket/path/to/one_object1")  # >> FALSE
        is_folder("s3://bucket/path/to/doesnotexist") # >> FALSE
        ```

    Args:
        s3_path (S3URI): S3 URI to check.
        **kwargs: Additional arguments passed to the S3 client.

    Returns:
        True if s3 path is a folder.
    """
    return is_object_prefix(
        s3_path=S3URI.build(bucket_name=s3_path.bucket, key=s3_path.key_with_folder_suffix),
        **kwargs,
    )

is_folder_placeholder_object

is_folder_placeholder_object(s3_path, **kwargs)

Check if S3 Path is a "folder placeholder" object.

A "folder placeholder" object is defined as an S3 object that:

  • Has a key that ends with a / character.
  • Has a content length of zero bytes.

These objects are often used to represent folders in S3, which is a flat storage system. For these purposes, we want to ignore such objects when considering the contents of a folder.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `s3_path` | `S3URI` | S3 URI to check. | required |
| `**kwargs` | | Additional arguments passed to the S3 client. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the S3 path is a folder placeholder object, False otherwise. |

Source code in src/aibs_informatics_aws_utils/s3.py, lines 530-560
def is_folder_placeholder_object(s3_path: S3URI, **kwargs) -> bool:
    """Check if S3 Path is a "folder placeholder" object.

    A "folder placeholder" object is defined as an S3 object that:

    - Has a key that ends with a `/` character.
    - Has a content length of zero bytes.

    These objects are often used to represent folders in S3, which is a flat storage system.
    For these purposes, we want to ignore such objects when considering the contents of a folder.

    Args:
        s3_path (S3URI): S3 URI to check.
        **kwargs: Additional arguments passed to the S3 client.

    Returns:
        True if the S3 path is a folder placeholder object, False otherwise.
    """
    if not s3_path.has_folder_suffix():
        return False

    s3 = get_s3_client(**kwargs)
    try:
        obj = s3.head_object(Bucket=s3_path.bucket, Key=s3_path.key)
        return obj["ContentLength"] == 0
    except ClientError as e:
        if client_error_code_check(e, "404", "NoSuchKey", "NotFound"):
            return False
        raise AWSError(
            f"Error checking existence of {s3_path}: {get_client_error_message(e)}"
        ) from e

list_s3_paths

list_s3_paths(
    s3_path, include=None, exclude=None, **kwargs
)

List all S3 paths under a Key prefix (as defined by S3 path).

Include/Exclude patterns are applied to the RELATIVE KEY PATH.

Logic for how the include/exclude patterns are applied:

  • include/exclude: pattern provided? Y/N
  • I/E Match: If pattern provided, does S3 relative Key match? Y/N
| include | I Match | exclude | E Match | Append? |
| --- | --- | --- | --- | --- |
| N | - | N | - | Y |
| Y | Y | N | - | Y |
| Y | N | N | - | N |
| N | - | Y | Y | N |
| N | - | Y | N | Y |
| Y | Y | Y | Y | N |
| Y | N | Y | Y | N |
| Y | Y | Y | N | Y |
| Y | N | Y | N | N |

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `s3_path` | `S3URI` | The root key path under which to find objects. | required |
| `include` | `Optional[List[Pattern]]` | Optional list of regex patterns on which to retain objects if matching any. | `None` |
| `exclude` | `Optional[List[Pattern]]` | Optional list of regex patterns on which to filter out objects if matching any. | `None` |
| `**kwargs` | | Additional arguments passed to the S3 client. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `List[S3URI]` | List of S3 paths under root that satisfy filters. |
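
Example

A filtering sketch: the patterns are compiled regexes matched from the start of the key relative to the given prefix. The bucket and keys are hypothetical, and the `S3URI` import location is an assumption.

```python
import re

from aibs_informatics_aws_utils.s3 import list_s3_paths
from aibs_informatics_aws_utils.s3 import S3URI  # assumption: S3URI may be defined elsewhere

# Keep CSV objects under the prefix, but drop anything whose relative key starts with "tmp/".
paths = list_s3_paths(
    S3URI("s3://example-bucket/results/"),
    include=[re.compile(r".*\.csv$")],
    exclude=[re.compile(r"tmp/")],
)
for p in paths:
    print(p)
```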

Source code in src/aibs_informatics_aws_utils/s3.py, lines 1033-1090
def list_s3_paths(
    s3_path: S3URI,
    include: Optional[List[Pattern]] = None,
    exclude: Optional[List[Pattern]] = None,
    **kwargs,
) -> List[S3URI]:
    """List all S3 paths under a Key prefix (as defined by S3 path).

    Include/Exclude patterns are applied to the RELATIVE KEY PATH.

    Logic for how the include/exclude patterns are applied:

    - **include/exclude**: pattern provided? Y/N
    - **I/E Match**: If pattern provided, does S3 relative Key match? Y/N

    | include | I Match | exclude | E Match | Append? |
    |---------|---------|---------|---------|--------|
    | N       | -       | N       | -       | Y      |
    | Y       | Y       | N       | -       | Y      |
    | Y       | N       | N       | -       | N      |
    | N       | -       | Y       | Y       | N      |
    | N       | -       | Y       | N       | Y      |
    | Y       | Y       | Y       | Y       | N      |
    | Y       | N       | Y       | Y       | N      |
    | Y       | Y       | Y       | N       | Y      |
    | Y       | N       | Y       | N       | N      |

    Args:
        s3_path (S3URI): The root key path under which to find objects.
        include (Optional[List[Pattern]]): Optional list of regex patterns on which
            to retain objects if matching any. Defaults to None.
        exclude (Optional[List[Pattern]]): Optional list of regex patterns on which
            to filter out objects if matching any. Defaults to None.
        **kwargs: Additional arguments passed to the S3 client.

    Returns:
        List of S3 paths under root that satisfy filters.
    """

    empty_include = (include is None) or (not any(include))
    empty_exclude = (exclude is None) or (not any(exclude))

    s3 = get_s3_client(**kwargs)

    def match_results(value: str, patterns: List[Pattern]) -> List[bool]:
        return [_.match(value) is not None for _ in patterns]

    paginator = s3.get_paginator("list_objects_v2")

    s3_paths: List[S3URI] = []
    for response in paginator.paginate(Bucket=s3_path.bucket, Prefix=s3_path.key):
        for item in response.get("Contents", []):
            key = item.get("Key", "")
            relative_key = key[len(s3_path.key) :]
            if empty_include or any(match_results(relative_key, include)):  # type: ignore
                if empty_exclude or (not any(match_results(relative_key, exclude))):  # type: ignore
                    s3_paths.append(S3URI.build(bucket_name=s3_path.bucket, key=key))
    return s3_paths

move_s3_path

move_s3_path(
    source_path,
    destination_path,
    include=None,
    exclude=None,
    extra_args=None,
    transfer_config=None,
    **kwargs
)

Move S3 Path from source to destination.

There is no explicit "move" S3 method, so we combine COPY + DELETE operations.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `source_path` | `S3URI` | Source S3 path. | required |
| `destination_path` | `S3URI` | Destination S3 path. | required |
| `include` | `Optional[List[Pattern]]` | Patterns to include. | `None` |
| `exclude` | `Optional[List[Pattern]]` | Patterns to exclude. | `None` |
| `extra_args` | `Optional[Dict[str, Any]]` | Extra arguments for the copy. | `None` |
| `transfer_config` | `Optional[TransferConfig]` | Transfer configuration. | `None` |
| `**kwargs` | | Additional arguments passed to the S3 client. | `{}` |
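
Example

Because the move is implemented as copy-then-delete, only objects that copied successfully are deleted from the source. A minimal sketch, with hypothetical bucket names and an assumed `S3URI` import location:

```python
from aibs_informatics_aws_utils.s3 import move_s3_path
from aibs_informatics_aws_utils.s3 import S3URI  # assumption: S3URI may be defined elsewhere

move_s3_path(
    source_path=S3URI("s3://example-bucket/staging/run-42/"),
    destination_path=S3URI("s3://example-bucket/archive/run-42/"),
)
```
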
Source code in src/aibs_informatics_aws_utils/s3.py, lines 996-1030
def move_s3_path(
    source_path: S3URI,
    destination_path: S3URI,
    include: Optional[List[Pattern]] = None,
    exclude: Optional[List[Pattern]] = None,
    extra_args: Optional[Dict[str, Any]] = None,
    transfer_config: Optional[TransferConfig] = None,
    **kwargs,
):
    """Move S3 Path from source to destination.

    There is no explicit "move" S3 method, so we combine COPY + DELETE operations.

    Args:
        source_path (S3URI): Source S3 path.
        destination_path (S3URI): Destination S3 path.
        include (Optional[List[Pattern]]): Patterns to include. Defaults to None.
        exclude (Optional[List[Pattern]]): Patterns to exclude. Defaults to None.
        extra_args (Optional[Dict[str, Any]]): Extra arguments for the copy. Defaults to None.
        transfer_config (Optional[TransferConfig]): Transfer configuration. Defaults to None.
        **kwargs: Additional arguments passed to the S3 client.
    """
    logger.info(f"Moving {source_path} to {destination_path}. Starting copy")
    responses = sync_paths(
        source_path=source_path,
        destination_path=destination_path,
        include=include,
        exclude=exclude,
        extra_args=extra_args,
        transfer_config=transfer_config,
        **kwargs,
    )
    logger.info(f"Copy complete. Starting deletion of {source_path}")
    paths_to_delete = [_.request.source_path for _ in responses if not _.failed]
    delete_s3_objects(paths_to_delete, **kwargs)

process_transfer_requests

process_transfer_requests(
    *transfer_requests,
    transfer_config=None,
    force=False,
    size_only=False,
    suppress_errors=False,
    **kwargs
)

Process a list of S3 transfer requests.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `*transfer_requests` | `S3TransferRequest` | Variable number of transfer requests to process. | `()` |
| `transfer_config` | `Optional[TransferConfig]` | Transfer configuration. | `None` |
| `force` | `bool` | Whether to force the transfer. | `False` |
| `size_only` | `bool` | Whether to only check size when transferring. | `False` |
| `suppress_errors` | `bool` | Whether to suppress errors. | `False` |
| `**kwargs` | | Additional arguments passed to the S3 client. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `List[S3TransferResponse]` | List of transfer responses. |
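
Example

A sketch pairing this with `generate_transfer_request` above; with `suppress_errors=True`, failures are reported on the returned responses instead of raising. Paths and buckets are hypothetical, and the `S3URI` import location is an assumption.

```python
from pathlib import Path

from aibs_informatics_aws_utils.s3 import generate_transfer_request, process_transfer_requests
from aibs_informatics_aws_utils.s3 import S3URI  # assumption: S3URI may be defined elsewhere

requests = [
    generate_transfer_request(Path("/data/a.csv"), S3URI("s3://example-bucket/in/a.csv")),
    generate_transfer_request(
        S3URI("s3://example-bucket/in/b.csv"), S3URI("s3://example-archive/in/b.csv")
    ),
]

responses = process_transfer_requests(*requests, suppress_errors=True)
for response in responses:
    if response.failed:
        print(f"failed: {response.request.source_path}")
```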

Source code in src/aibs_informatics_aws_utils/s3.py, lines 813-886
def process_transfer_requests(
    *transfer_requests: S3TransferRequest,
    transfer_config: Optional[TransferConfig] = None,
    force: bool = False,
    size_only: bool = False,
    suppress_errors: bool = False,
    **kwargs,
) -> List[S3TransferResponse]:
    """Process a list of S3 transfer requests.

    Args:
        *transfer_requests (S3TransferRequest): Variable number of transfer requests to process.
        transfer_config (Optional[TransferConfig]): Transfer configuration. Defaults to None.
        force (bool): Whether to force the transfer. Defaults to False.
        size_only (bool): Whether to only check size when transferring. Defaults to False.
        suppress_errors (bool): Whether to suppress errors. Defaults to False.
        **kwargs: Additional arguments passed to the S3 client.

    Returns:
        List of transfer responses.
    """
    transfer_responses = []

    for i, request in enumerate(transfer_requests):
        try:
            if isinstance(request, S3CopyRequest):
                copy_s3_object(
                    source_path=request.source_path,
                    destination_path=request.destination_path,
                    extra_args=request.extra_args,
                    transfer_config=transfer_config,
                    force=force,
                    size_only=size_only,
                    **kwargs,
                )
            elif isinstance(request, S3UploadRequest):
                upload_file(
                    local_path=request.source_path,
                    s3_path=request.destination_path,
                    extra_args=request.extra_args,
                    transfer_config=transfer_config,
                    force=force,
                    size_only=size_only,
                    **kwargs,
                )
            elif isinstance(request, S3DownloadRequest):
                # This is a special case where s3 object path has a trailing slash.
                # This should be ignored if the object is empty and raise an error otherwise.
                if request.source_path.has_folder_suffix():
                    if get_object(request.source_path, **kwargs).content_length != 0:
                        raise ValueError(
                            "Cannot download S3 object to local path. Downloads of objects "
                            "with '/' suffixed keys and of non-zero size are NOT supported."
                        )
                else:
                    download_s3_object(
                        s3_path=request.source_path,
                        local_path=request.destination_path,
                        transfer_config=transfer_config,
                        force=force,
                        size_only=size_only,
                        **kwargs,
                    )
            transfer_responses.append(S3TransferResponse(request, False))
            logger.info(f"Processed s3 transfer request {i + 1} of {len(transfer_requests)}")
        except Exception as e:
            msg = f"Failed to copy {request.source_path} to {request.destination_path}: {e}"
            if not suppress_errors:
                logger.error(msg)
                logger.exception(msg)
                raise e
            logger.warning(msg)
            transfer_responses.append(S3TransferResponse(request, True, f"{e}"))
    return transfer_responses

should_sync

should_sync(
    source_path, destination_path, size_only=False, **kwargs
)

Check whether transfer from source to destination is required.

This logic matches the logic of the aws s3 sync command.

A transfer from SRC -> DST is necessary if any of the following are true:

  • DST does not exist
  • SRC was last modified more recently than DST
  • SRC size is different than DST
  • size_only is False and SRC ETag is different than DST

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `source_path` | `Union[Path, S3URI]` | Source path. | required |
| `destination_path` | `Union[Path, S3URI]` | Destination to transfer to. | required |
| `size_only` | `bool` | If True, limits content comparison to size and date only. | `False` |
| `**kwargs` | | Additional arguments passed to the S3 client. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if sync is needed, False otherwise. |
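
Example

A sketch of running the check on its own, for example to decide whether an upload is worth scheduling. The paths are hypothetical, and the `S3URI` import location is an assumption.

```python
from pathlib import Path

from aibs_informatics_aws_utils.s3 import should_sync
from aibs_informatics_aws_utils.s3 import S3URI  # assumption: S3URI may be defined elsewhere

if should_sync(
    source_path=Path("/data/results/metrics.csv"),
    destination_path=S3URI("s3://example-bucket/results/metrics.csv"),
):
    print("destination is missing or stale; transfer needed")
```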

Source code in src/aibs_informatics_aws_utils/s3.py, lines 1147-1251
def should_sync(
    source_path: Union[Path, S3URI],
    destination_path: Union[Path, S3URI],
    size_only: bool = False,
    **kwargs,
) -> bool:
    """Check whether transfer from source to destination is required.

    This logic matches that of the `aws s3 sync` command.

    A transfer from SRC -> DST is necessary if any of the following are true:

    - DST does not exist
    - SRC was last modified more recently than DST
    - SRC size is different than DST
    - `size_only` is False and SRC ETag is different than DST

    Args:
        source_path (Union[Path, S3URI]): Source path.
        destination_path (Union[Path, S3URI]): Destination to transfer to.
        size_only (bool): If True, limits content comparison to size and date only.
            Defaults to False.
        **kwargs: Additional arguments passed to the S3 client.

    Returns:
        True if sync is needed, False otherwise.
    """
    source_last_modified: datetime
    source_size_bytes: int
    source_hash: Callable[[], Optional[str]]
    dest_last_modified: Optional[datetime] = None
    dest_size_bytes: Optional[int] = None
    dest_hash: Callable[[], Optional[str]]
    multipart_chunk_size_bytes: Optional[int] = None
    multipart_threshold_bytes: Optional[int] = None

    if isinstance(destination_path, S3URI) and is_object(destination_path):
        dest_s3_object = get_object(destination_path, **kwargs)
        dest_last_modified = dest_s3_object.last_modified
        dest_size_bytes = dest_s3_object.content_length
        multipart_chunk_size_bytes, multipart_threshold_bytes = determine_multipart_attributes(
            destination_path, **kwargs
        )

        def dest_hash() -> Optional[str]:
            return dest_s3_object.e_tag if not size_only else None
    elif isinstance(destination_path, Path) and destination_path.exists():
        dest_local_path = destination_path
        local_stats = dest_local_path.stat()
        dest_last_modified = datetime.fromtimestamp(local_stats.st_mtime, tz=timezone.utc)
        dest_size_bytes = local_stats.st_size

        def dest_hash() -> Optional[str]:
            return (
                get_local_etag(
                    dest_local_path, multipart_chunk_size_bytes, multipart_threshold_bytes
                )
                if not size_only
                else None
            )
    else:
        return True

    if isinstance(source_path, S3URI) and is_object(source_path):
        src_s3_object = get_object(source_path, **kwargs)
        source_last_modified = src_s3_object.last_modified
        source_size_bytes = src_s3_object.content_length
        multipart_chunk_size_bytes, multipart_threshold_bytes = determine_multipart_attributes(
            source_path, **kwargs
        )

        def source_hash() -> Optional[str]:
            return src_s3_object.e_tag if not size_only else None
    elif isinstance(source_path, Path) and source_path.exists():
        src_local_path = source_path
        local_stats = src_local_path.stat()
        source_last_modified = datetime.fromtimestamp(local_stats.st_mtime, tz=timezone.utc)
        source_size_bytes = local_stats.st_size

        def source_hash() -> Optional[str]:
            return (
                get_local_etag(
                    src_local_path, multipart_chunk_size_bytes, multipart_threshold_bytes
                )
                if not size_only
                else None
            )
    else:
        raise ValueError(
            f"Cannot transfer, source path {source_path} does not exist! "
            f"is s3={isinstance(source_path, S3URI)}, is local={isinstance(source_path, Path)} "
            f"is object={isinstance(source_path, S3URI) and is_object(source_path, **kwargs)}, "
            f"is local exists={isinstance(source_path, Path) and source_path.exists()}, "
            f"type={type(source_path)}"
        )

    if dest_size_bytes is None or dest_last_modified is None:
        return True
    if source_size_bytes != dest_size_bytes:
        return True
    if source_last_modified.replace(microsecond=0) > dest_last_modified.replace(microsecond=0):
        return True
    if not size_only and source_hash() != dest_hash():
        return True
    return False
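
The ETag comparison above relies on get_local_etag, which computes an S3-style ETag for a local file. The following is a rough sketch of the idea only, not the library's exact implementation, assuming the AWS CLI's chunking scheme: single-part objects hash the whole file with MD5, while multipart objects hash the concatenation of per-part MD5 digests and append the part count.

```python
import hashlib
from pathlib import Path
from typing import Optional


def local_etag_sketch(
    path: Path,
    chunk_size_bytes: Optional[int] = None,
    threshold_bytes: Optional[int] = None,
) -> str:
    # Illustrative only; the library's get_local_etag may handle defaults and
    # thresholds differently. 8 MiB mirrors the AWS CLI's default chunk size.
    chunk_size = chunk_size_bytes or 8 * 1024 * 1024
    threshold = threshold_bytes or 8 * 1024 * 1024
    size = path.stat().st_size

    if size <= threshold:
        # Single-part upload: the ETag is the MD5 hex digest of the content.
        return hashlib.md5(path.read_bytes()).hexdigest()

    # Multipart upload: ETag is MD5(concat(per-part MD5 digests)) + "-<part count>".
    part_digests = []
    with path.open("rb") as f:
        while chunk := f.read(chunk_size):
            part_digests.append(hashlib.md5(chunk).digest())
    combined = hashlib.md5(b"".join(part_digests)).hexdigest()
    return f"{combined}-{len(part_digests)}"
```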

update_path_tags

update_path_tags(
    s3_path, tags, mode="append", recursive=True, **kwargs
)

Update tags for all objects at or prefixed by the specified path.

There are three modes for updating tags:

  • replace: Replace all existing tags with new tags
  • append: Merge new tags with existing tags
  • delete: Delete specified tags from existing tags (values do not matter)

If recursive is True and s3_path is an object prefix, all objects under the prefix will have their tags updated. If there is an object at s3_path, it will also have its tags updated.
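
A brief usage sketch of the three modes; the bucket, keys, and the S3URI import path are placeholders, not values from this library.

```python
from aibs_informatics_aws_utils.s3 import update_path_tags
from aibs_informatics_core.models.aws.core import S3URI  # assumed import path for S3URI

# Merge new tags onto every object under a prefix (and the prefix object itself, if any).
update_path_tags(
    S3URI("s3://my-bucket/results/"),  # hypothetical prefix
    {"project": "demo", "owner": "informatics"},
    mode="append",
)

# Remove a tag key from a single object; in delete mode the tag values are ignored.
update_path_tags(
    S3URI("s3://my-bucket/results/output.json"),  # hypothetical object
    {"owner": ""},
    mode="delete",
    recursive=False,
)
```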

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| s3_path | S3URI | S3 path or prefix to update tags for. | required |
| tags | Dict[str, str] | Tags to update. | required |
| mode | Literal['replace', 'append', 'delete'] | Tag update mode. Defaults to "append". | 'append' |
| recursive | bool | Whether to update tags recursively for all objects under prefix. Defaults to True. | True |
Source code in src/aibs_informatics_aws_utils/s3.py
def update_path_tags(
    s3_path: S3URI,
    tags: Dict[str, str],
    mode: Literal["replace", "append", "delete"] = "append",
    recursive: bool = True,
    **kwargs,
):
    """Update tags for all objects at or prefixed by the specified path.

    There are three modes for updating tags:

    - **replace**: Replace all existing tags with new tags
    - **append**: Merge new tags with existing tags
    - **delete**: Delete specified tags from existing tags (values do not matter)

    If recursive is True and s3_path is an object prefix, all objects under the prefix
    will have their tags updated. If there is an object at s3_path, it will also have
    its tags updated.

    Args:
        s3_path (S3URI): S3 path or prefix to update tags for.
        tags (Dict[str, str]): Tags to update.
        mode (Literal["replace", "append", "delete"]): Tag update mode. Defaults to "append".
        recursive (bool): Whether to update tags recursively for all objects under prefix.
            Defaults to True.
    """
    if recursive and is_object_prefix(s3_path, **kwargs):
        s3_paths = list_s3_paths(s3_path=s3_path, **kwargs)
        logger.info(f"Updating tags for {len(s3_paths)} objects under prefix {s3_path}")
        for nested_s3_path in s3_paths:
            update_path_tags(nested_s3_path, tags, mode, recursive=False, **kwargs)
    if is_object(s3_path, **kwargs):
        s3 = get_s3_client(**kwargs)

        current_tags = {
            tag["Key"]: tag["Value"]
            for tag in s3.get_object_tagging(
                Bucket=s3_path.bucket,
                Key=s3_path.key,
            ).get("TagSet", [])
        }

        if mode == "replace":
            new_tags = tags
        elif mode in ["append", "delete"]:
            new_tags = current_tags.copy()
            if mode == "append":
                new_tags.update(tags)
            else:
                for tag_key in tags.keys():
                    if tag_key in new_tags:
                        del new_tags[tag_key]
        else:
            raise ValueError(f"Unknown tag update mode: {mode}")  # pragma: no cover

        s3.put_object_tagging(
            Bucket=s3_path.bucket,
            Key=s3_path.key,
            Tagging={"TagSet": [{"Key": k, "Value": v} for k, v in new_tags.items()]},
        )

update_s3_storage_class

update_s3_storage_class(s3_path, target_storage_class)

Transition an object (or objects) represented by an s3_path to a target storage class.

Note

This function needs to be called again if it returns False.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| s3_path | S3URI | The s3_path representing an S3 key or prefix whose object(s) should have their storage class updated. | required |
| target_storage_class | S3StorageClass | The target storage class. | required |

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If an unsupported target_storage_class is provided. |
| RuntimeError | If the path or paths under the provided root s3_path have a storage class that does not support transitions (e.g. S3StorageClass.OUTPOSTS, S3StorageClass.REDUCED_REDUNDANCY). |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the s3_path successfully had its storage class updated. False if the update is not yet complete, specifically if a constituent object in an S3 archive storage class is still restoring, or a constituent object failed to transition to the desired storage class. |
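
Because archive restores are asynchronous, callers typically re-invoke the function until it returns True. A hedged sketch of that pattern follows; the S3URI/S3StorageClass import paths, the enum member name, and the bucket are assumptions, and a production caller would poll from a scheduled job or workflow wait state rather than a sleep loop.

```python
import time

from aibs_informatics_aws_utils.s3 import S3StorageClass, update_s3_storage_class  # assumed exports
from aibs_informatics_core.models.aws.core import S3URI  # assumed import path for S3URI

archive_prefix = S3URI("s3://my-bucket/archive/")  # hypothetical prefix

# Keep calling until every object has restored (if archived) and transitioned.
while not update_s3_storage_class(archive_prefix, S3StorageClass.INTELLIGENT_TIERING):
    time.sleep(600)  # restores can take hours; poll sparingly
```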
Source code in src/aibs_informatics_aws_utils/s3.py
def update_s3_storage_class(
    s3_path: S3URI,
    target_storage_class: S3StorageClass,
) -> bool:
    """Transition an object (or objects) represented by an s3_path to a target storage class.

    Note:
        This function needs to be called again if it returns False.

    Args:
        s3_path (S3URI): The s3_path representing an S3 key or prefix whose object(s)
            should have their storage class updated.
        target_storage_class (S3StorageClass): The target storage class.

    Raises:
        RuntimeError: If an unsupported target_storage_class is provided.
        RuntimeError: If the path or paths under the provided root s3_path have a storage class
            that does not support transitions (e.g. `S3StorageClass.OUTPOSTS`,
            `S3StorageClass.REDUCED_REDUNDANCY`).

    Returns:
        True if the s3_path successfully had its storage class updated.
            False if s3_path did not fully update its storage class, specifically:

            - A constituent object or objects under S3 archive storage class are still restoring
            - A constituent object or objects failed to transition to the desired storage class
    """

    if target_storage_class not in S3StorageClass.list_transitionable_storage_classes():
        raise RuntimeError(
            f"Error trying to update s3 storage class for s3_path ({s3_path}) "
            f"with unsupported target storage class ({target_storage_class.value})."
        )

    s3_paths = list_s3_paths(s3_path=s3_path)

    # 1. Iterate over all s3 paths under our s3_path and determine archive restores to be done.
    #    Also, start any storage class transitions that can be done.
    paths_to_restore: List[S3URI] = []
    paths_restoring: List[S3URI] = []
    failed_transitions: List[S3URI] = []
    for p in s3_paths:
        run_path_transition: bool = False
        s3_obj = get_object(p)
        logger.debug(
            f"current storage class: {s3_obj.storage_class}, target: {target_storage_class}"
        )
        current_storage_class = S3StorageClass.from_boto_s3_obj(s3_obj)  # type: ignore[arg-type]
        # Current storage class matches target: No-op
        if current_storage_class == target_storage_class:
            continue
        # Current storage class is archived: Check restore status
        elif current_storage_class in S3StorageClass.list_archive_storage_classes():
            o = S3RestoreStatus.from_raw_s3_restore_status(s3_obj.restore)
            logger.debug(
                f"s3 path ({p}), current: {current_storage_class}, "
                f"target: {target_storage_class}, restore status: {o}"
            )
            if o.restore_status == S3RestoreStatusEnum.NOT_STARTED:
                paths_to_restore.append(p)
            elif o.restore_status == S3RestoreStatusEnum.IN_PROGRESS:
                paths_restoring.append(p)
            elif o.restore_status == S3RestoreStatusEnum.FINISHED:
                run_path_transition = True
        # Current storage class does not match target: Needs transition
        elif current_storage_class in S3StorageClass.list_transitionable_storage_classes():
            run_path_transition = True
        # Current storage class cannot be transitioned
        else:
            raise RuntimeError(
                f"Error trying to update the s3 storage class for s3_path ({p}) "
                f"which has an unsupported current storage class: {current_storage_class}"
            )

        if run_path_transition:
            try:
                copy_s3_object(
                    source_path=p,
                    destination_path=p,
                    extra_args={"StorageClass": target_storage_class.value},
                )
            except ClientError as e:
                logger.error(
                    f"Failed to transition s3 path ({p}) to storage "
                    f"class ({target_storage_class.value}). Error details: {e}"
                )
                failed_transitions.append(p)

    # 2. Start off any restores that need to be done
    for p in paths_to_restore:
        s3_obj = get_object(p)
        s3_obj.restore_object(
            RestoreRequest={
                "Days": 2,
                "GlacierJobParameters": {"Tier": "Standard"},
            }
        )

    # 3. If there are restores (started or in progress) or failed transitions for objects
    #    under our `s3_path` return False and we'll need to call update_s3_storage class in
    #    the future again.
    if any(paths_to_restore) or any(paths_restoring):
        return False

    if any(failed_transitions):
        logger.warning(
            "The following paths failed to transition to the target_storage_class "
            f"({target_storage_class}): {failed_transitions}"
        )
        return False

    return True