
Data Sync Operations

Handlers for data synchronization operations.

Handlers

  • GetJSONFromFileHandler - Retrieves JSON data from a file
  • PutJSONToFileHandler - Writes JSON data to a file
  • DataSyncHandler - Simple data sync task
  • BatchDataSyncHandler - Handles batch of data sync tasks
  • PrepareBatchDataSyncHandler - Prepares batch data sync tasks

Data synchronization operation handlers.

Provides Lambda handlers for reading, writing, and synchronizing data between local file systems and S3.

BatchDataSyncHandler dataclass

BatchDataSyncHandler()

Bases: LambdaHandler[BatchDataSyncRequest, BatchDataSyncResponse]

Handler for processing batches of data synchronization requests.

Processes multiple sync requests sequentially, with optional partial failure handling.

handle

handle(request)

Process a batch of data sync requests.

Parameters:

  • request (BatchDataSyncRequest): Request containing a list of sync requests. Required.

Returns:

  • BatchDataSyncResponse: Response containing aggregated results and any failed requests.

Raises:

  • Exception: If a sync fails and allow_partial_failure is False.

Source code in src/aibs_informatics_aws_lambda/handlers/data_sync/operations.py
def handle(self, request: BatchDataSyncRequest) -> BatchDataSyncResponse:
    """Process a batch of data sync requests.

    Args:
        request (BatchDataSyncRequest): Request containing a list of sync requests.

    Returns:
        Response containing aggregated results and any failed requests.

    Raises:
        Exception: If a sync fails and allow_partial_failure is False.
    """
    self.logger.info(f"Received {len(request.requests)} requests to transfer")
    if isinstance(request.requests, S3URI):
        self.logger.info(f"Request is stored at {request.requests}... fetching content.")
        _ = download_to_json(request.requests)
        assert isinstance(_, list)
        batch_requests = [DataSyncRequest.from_dict(__) for __ in _]
    else:
        batch_requests = request.requests

    batch_result = BatchDataSyncResult()
    response = BatchDataSyncResponse(result=batch_result, failed_requests=[])

    for i, _ in enumerate(batch_requests):
        sync_operations = DataSyncOperations(_)
        self.logger.info(
            f"[{i + 1}/{len(batch_requests)}] "
            f"Syncing content from {_.source_path} to {_.destination_path}"
        )
        try:
            result = sync_operations.sync(
                source_path=_.source_path,
                destination_path=_.destination_path,
                source_path_prefix=_.source_path_prefix,
            )
            if result.bytes_transferred is not None:
                batch_result.add_bytes_transferred(result.bytes_transferred)
            if result.files_transferred is not None:
                batch_result.add_files_transferred(result.files_transferred)
            batch_result.increment_successful_requests_count()
        except Exception as e:
            batch_result.increment_failed_requests_count()
            response.add_failed_request(_)
            self.logger.error(
                f"Failed to sync content from {_.source_path} to {_.destination_path}"
            )
            self.logger.error(e)
            if not request.allow_partial_failure:
                raise e
    return response
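
A minimal usage sketch (e.g. from a unit test), assuming the handler can be instantiated and called directly and that the request models accept the keyword arguments shown; bucket names and prefixes are hypothetical, and imports are omitted (the classes come from the same modules used by the source above).

handler = BatchDataSyncHandler()
response = handler.handle(
    BatchDataSyncRequest(
        requests=[
            DataSyncRequest(
                source_path=S3URI("s3://example-bucket/raw/"),          # hypothetical source
                destination_path=S3URI("s3://example-bucket/mirror/"),  # hypothetical destination
            )
        ],
        allow_partial_failure=True,  # collect failures instead of raising on the first error
    )
)
print(response.result, response.failed_requests)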

DataSyncHandler dataclass

DataSyncHandler()

Bases: LambdaHandler[DataSyncRequest, DataSyncResponse]

Handler for synchronizing data between source and destination paths.

Supports syncing between local file systems and S3.

handle

handle(request)

Synchronize data from source to destination.

Parameters:

  • request (DataSyncRequest): Request containing source and destination paths. Required.

Returns:

  • DataSyncResponse: Response containing the sync result.

Source code in src/aibs_informatics_aws_lambda/handlers/data_sync/operations.py
def handle(self, request: DataSyncRequest) -> DataSyncResponse:
    """Synchronize data from source to destination.

    Args:
        request (DataSyncRequest): Request containing source and destination paths.

    Returns:
        Response containing the sync result.
    """
    sync_operations = DataSyncOperations(request)
    result = sync_operations.sync_task(request)
    return DataSyncResponse(request=request, result=result)
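
A single-request sketch, assuming DataSyncRequest accepts the source_path and destination_path fields shown elsewhere in this module's source; the S3 prefix and local directory below are hypothetical, and imports are omitted.

request = DataSyncRequest(
    source_path=S3URI("s3://example-bucket/dataset/"),  # hypothetical S3 source
    destination_path=Path("/mnt/efs/dataset/"),         # hypothetical local destination (assumes pathlib.Path is accepted)
)
response = DataSyncHandler().handle(request)
print(response.result)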

GetJSONFromFileHandler dataclass

GetJSONFromFileHandler()

Bases: LambdaHandler[GetJSONFromFileRequest, GetJSONFromFileResponse]

Handler for retrieving JSON content from a file.

Supports loading JSON from both local files and S3 locations.

handle

handle(request)

Load JSON content from the specified path.

Parameters:

  • request (GetJSONFromFileRequest): Request containing the file path. Required.

Returns:

  • GetJSONFromFileResponse: Response containing the loaded JSON content.

Raises:

  • Exception: If the content cannot be fetched.

Source code in src/aibs_informatics_aws_lambda/handlers/data_sync/operations.py
def handle(self, request: GetJSONFromFileRequest) -> GetJSONFromFileResponse:
    """Load JSON content from the specified path.

    Args:
        request (GetJSONFromFileRequest): Request containing the file path.

    Returns:
        Response containing the loaded JSON content.

    Raises:
        Exception: If the content cannot be fetched.
    """
    try:
        path = request.path

        self.logger.info(f"Fetching content from {path}")
        if isinstance(path, S3URI):
            self.logger.info("Downloading from S3")
            content = download_to_json(s3_path=path)
        else:
            self.logger.info("Loading from path")
            content = load_json(path)
        return GetJSONFromFileResponse(content=content)
    except Exception as e:
        self.logger.error(f"Could not fetch content from {request.path}")
        raise e
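
A usage sketch with a hypothetical S3 location; the path field mirrors the source above, and a local path may be passed instead, in which case the load_json branch is used.

response = GetJSONFromFileHandler().handle(
    GetJSONFromFileRequest(path=S3URI("s3://example-bucket/config/settings.json"))  # hypothetical path
)
print(response.content)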

PrepareBatchDataSyncHandler dataclass

PrepareBatchDataSyncHandler()

Bases: LambdaHandler[PrepareBatchDataSyncRequest, PrepareBatchDataSyncResponse]

Handler for preparing batch data synchronization requests.

Analyzes the source path and partitions files into optimally-sized batches for parallel processing using a first-fit decreasing bin-packing strategy.

build_node_batches classmethod

build_node_batches(nodes, batch_size_bytes_limit)

Batch nodes based on a size threshold.

This is a version of the classic "Bin Packing" problem. https://en.wikipedia.org/wiki/Bin_packing_problem

The following solution implements the first-fit decreasing algorithm.

Notes
  • nodes can have sizes greater than the limit

Parameters:

  • nodes (List[Node]): List of nodes to batch. Required.
  • batch_size_bytes_limit (int): Size limit in bytes for a batch of nodes. Required.

Returns:

  • List[List[Node]]: List of node batches (list of lists).

Source code in src/aibs_informatics_aws_lambda/handlers/data_sync/operations.py
@classmethod
def build_node_batches(
    cls, nodes: List[Node], batch_size_bytes_limit: int
) -> List[List[Node]]:
    """Batch nodes based on threshold

    This is a version of the classic "Bin Packing" problem.
    https://en.wikipedia.org/wiki/Bin_packing_problem

    The following solution implements the first-fit decreasing algorithm.

    Notes:
        - nodes can have sizes greater than the limit

    Args:
        nodes (List[Node]): List of nodes to batch.
        batch_size_bytes_limit (int): Size limit in bytes for a batch of nodes.

    Returns:
        List of node batches (list of lists).
    """

    ## We will use a revised version of the bin packing problem:
    # https://en.wikipedia.org/wiki/Bin_packing_problem

    # Step 1:   Sort the nodes by size (descending order)
    unbatched_nodes = sorted(nodes, key=lambda node: node.size_bytes, reverse=True)

    # Step 2:   Group nodes in order to maximize the data synced per request
    #           (bin packing problem)
    node_batches: List[List[Node]] = []

    # (Optimize) Convert all nodes that are larger than the threshold into single requests.
    while unbatched_nodes and unbatched_nodes[0].size_bytes > batch_size_bytes_limit:
        node_batches.append([unbatched_nodes.pop(0)])

    # Compute the batch node sizes (they should all be larger than the limit)
    node_batch_sizes = [
        sum([node.size_bytes for node in node_batch]) for node_batch in node_batches
    ]

    for node in unbatched_nodes:
        for i in range(len(node_batch_sizes)):
            if node_batch_sizes[i] + node.size_bytes <= batch_size_bytes_limit:
                node_batch_sizes[i] += node.size_bytes
                node_batches[i].append(node)
                break
        else:
            node_batch_sizes.append(node.size_bytes)
            node_batches.append([node])

    return node_batches
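
To make the first-fit decreasing behavior concrete, here is a standalone illustration that packs plain byte sizes rather than Node objects; it mirrors the loop above but is not part of the handler.

def first_fit_decreasing(sizes, limit):
    """Place each size (largest first) into the first batch with room, else open a new batch."""
    batches, totals = [], []
    for size in sorted(sizes, reverse=True):
        for i, total in enumerate(totals):
            if total + size <= limit:
                batches[i].append(size)
                totals[i] += size
                break
        else:
            batches.append([size])
            totals.append(size)
    return batches

print(first_fit_decreasing([700, 400, 300, 300, 200, 100], limit=1000))
# -> [[700, 300], [400, 300, 200, 100]]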

handle

handle(request)

Prepare batch data sync requests.

Partitions the source data into optimally-sized batches.

Parameters:

  • request (PrepareBatchDataSyncRequest): Request containing source path and configuration. Required.

Returns:

  • PrepareBatchDataSyncResponse: Response containing prepared batch requests.

Source code in src/aibs_informatics_aws_lambda/handlers/data_sync/operations.py
def handle(self, request: PrepareBatchDataSyncRequest) -> PrepareBatchDataSyncResponse:
    """Prepare batch data sync requests.

    Partitions the source data into optimally-sized batches.

    Args:
        request (PrepareBatchDataSyncRequest):
            Request containing source path and configuration.

    Returns:
        Response containing prepared batch requests.
    """
    self.logger.info("Preparing S3 Batch Sync Requests")
    root: Union[S3FileSystem, LocalFileSystem]
    if isinstance(request.source_path, S3URI):
        root = S3FileSystem.from_path(request.source_path)
    else:
        root = LocalFileSystem.from_path(request.source_path)
    batch_size_bytes_limit = request.batch_size_bytes_limit or self.DEFAULT_SOFT_MAX_BYTES

    ## We will use a revised version of the bin packing problem:
    # https://en.wikipedia.org/wiki/Bin_packing_problem

    # Step 1A: Partition nodes s.t. we deal with fewer paths in total.
    self.logger.info(f"Partitioning batch size bytes limit: {batch_size_bytes_limit}")
    nodes = root.partition(size_bytes_limit=batch_size_bytes_limit)

    batch_data_sync_requests: List[BatchDataSyncRequest] = []

    node_batches = self.build_node_batches(nodes, batch_size_bytes_limit)
    self.logger.info(f"Batched {len(nodes)} nodes into {len(node_batches)} batches")
    for node_batch in node_batches:
        data_sync_requests: List[DataSyncRequest] = []
        for node in sorted(node_batch):
            data_sync_requests.append(
                DataSyncRequest(
                    source_path=self.build_source_path(request, node),
                    destination_path=self.build_destination_path(request, node),
                    source_path_prefix=request.source_path_prefix,
                    max_concurrency=request.max_concurrency,
                    retain_source_data=request.retain_source_data,
                    require_lock=request.require_lock,
                    force=request.force,
                    size_only=request.size_only,
                    fail_if_missing=request.fail_if_missing,
                    include_detailed_response=request.include_detailed_response,
                    remote_to_local_config=request.remote_to_local_config,
                )
            )
        batch_data_sync_requests.append(BatchDataSyncRequest(requests=data_sync_requests))

    if request.temporary_request_payload_path:
        self.logger.info(
            f"Uploading batch requests to {request.temporary_request_payload_path}"
        )
        new_batch_data_sync_requests = []

        for i, batch_data_sync_request in enumerate(batch_data_sync_requests):
            upload_json(
                [cast(DataSyncRequest, _).to_dict() for _ in batch_data_sync_request.requests],
                s3_path=(
                    s3_path := request.temporary_request_payload_path / f"request_{i}.json"
                ),
            )
            new_batch_data_sync_requests.append(BatchDataSyncRequest(requests=s3_path))
        return PrepareBatchDataSyncResponse(requests=new_batch_data_sync_requests)
    else:
        return PrepareBatchDataSyncResponse(requests=batch_data_sync_requests)
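
A preparation sketch for a large S3 prefix; source_path and batch_size_bytes_limit appear in the source above, while destination_path is an assumed field name, and all values are hypothetical. Imports are omitted.

request = PrepareBatchDataSyncRequest(
    source_path=S3URI("s3://example-bucket/dataset/"),        # hypothetical source prefix
    destination_path=S3URI("s3://example-archive/dataset/"),  # assumed field name, hypothetical value
    batch_size_bytes_limit=50 * 2**30,                        # hypothetical 50 GiB soft cap
)
response = PrepareBatchDataSyncHandler().handle(request)
print(f"prepared {len(response.requests)} BatchDataSyncRequest partitions")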

PutJSONToFileHandler dataclass

PutJSONToFileHandler()

Bases: LambdaHandler[PutJSONToFileRequest, PutJSONToFileResponse]

Handler for writing JSON content to a file.

Supports writing to both local files and S3 locations. If no path is provided, generates a scratch S3 path.

handle

handle(request)

Write JSON content to the specified path.

Parameters:

  • request (PutJSONToFileRequest): Request containing the content and optional path. Required.

Returns:

  • Optional[PutJSONToFileResponse]: Response containing the path where content was written.

Raises:

  • ValueError: If no path is provided and bucket name cannot be inferred.

Source code in src/aibs_informatics_aws_lambda/handlers/data_sync/operations.py
def handle(self, request: PutJSONToFileRequest) -> Optional[PutJSONToFileResponse]:
    """Write JSON content to the specified path.

    Args:
        request (PutJSONToFileRequest): Request containing the content and optional path.

    Returns:
        Response containing the path where content was written.

    Raises:
        ValueError: If no path is provided and bucket name cannot be inferred.
    """
    path, content = request.path, request.content

    if path is None:
        bucket_name = get_env_var(DEFAULT_BUCKET_NAME_ENV_VAR, default_value=None)
        if bucket_name is None:
            raise ValueError(
                "No path provided and Could not infer bucket "
                f"name from {DEFAULT_BUCKET_NAME_ENV_VAR} environment variable"
            )
        path = S3URI.build(
            bucket_name=bucket_name,
            key=get_s3_scratch_key(
                content=content,
                unique_id=UniqueID(self.context.aws_request_id),
            ),
        )

    self.logger.info(f"Writing content to {path}")
    self.logger.info(f"Content to write: {content}")
    if isinstance(path, S3URI):
        self.logger.info("Uploading to S3")
        upload_json(content, s3_path=path, extra_args=SCRATCH_EXTRA_ARGS)
    else:
        self.logger.info("Writing to file")
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(content, indent=4, sort_keys=True))
    return PutJSONToFileResponse(path=path)
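
A sketch of writing JSON to an explicit S3 location (values hypothetical); if path is omitted, the handler instead generates a scratch key as shown above, which requires the default bucket environment variable to be set.

response = PutJSONToFileHandler().handle(
    PutJSONToFileRequest(
        content={"run_id": "abc123", "status": "ok"},        # hypothetical payload
        path=S3URI("s3://example-bucket/results/run.json"),  # hypothetical destination
    )
)
print(response.path)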

get_s3_scratch_key

get_s3_scratch_key(
    filename=None, content=None, unique_id=None
)

Generates a scratch file s3 key

The key is constructed from filename, content and unique ID.

If filename is not provided, a hexdigest is created from content (which will be random if content is None).

Parameters:

  • filename (Optional[str]): Optional name of file. If None, a file hash is generated. Defaults to None.
  • content (Optional[JSON]): Optional content of file to put. Only used if filename is not provided. Defaults to None.
  • unique_id (Optional[UniqueID]): A unique ID used in key generation. If None, a random UUID is generated. Defaults to None.

Returns:

  • S3Key: S3 Scratch key (not guaranteed to be empty).

Source code in src/aibs_informatics_aws_lambda/handlers/data_sync/operations.py
def get_s3_scratch_key(
    filename: Optional[str] = None,
    content: Optional[JSON] = None,
    unique_id: Optional[UniqueID] = None,
) -> S3Key:
    """Generates a scratch file s3 key

    The key is constructed from filename, content and unique ID.

    If filename is not provided, a hexdigest is created from content (which will
    be random if content is None).

    Args:
        filename (Optional[str]): Optional name of file.
            If None, file hash is generated.
        content (Optional[JSON]): Optional content of file to put.
            Only used if filename is not provided. Defaults to None.
        unique_id (Optional[UniqueID]): A unique ID used in key generation.
            If None, a random UUID is generated.

    Returns:
        S3 Scratch key (not guaranteed to be empty)
    """
    file_hash = filename or sha256_hexdigest(content=content)
    return S3Key(f"scratch/{unique_id or UniqueID.create()}/{file_hash}")
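
For reference, the shape of the generated key follows directly from the return statement above; the values below are placeholders.

key = get_s3_scratch_key(filename="manifest.json", unique_id=UniqueID.create())
# -> "scratch/<generated uuid>/manifest.json"

key = get_s3_scratch_key(content={"a": 1})
# -> "scratch/<generated uuid>/<sha256 hexdigest of the content>"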