Data Sync File System

Handlers for file system operations.

Handlers

  • GetDataPathStatsHandler - Retrieves statistics about data paths
  • ListDataPathsHandler - Lists data paths
  • OutdatedDataPathScannerHandler - Scans for outdated data paths
  • RemoveDataPathsHandler - Removes data paths

File system operation handlers.

Provides Lambda handlers for file system operations including listing paths, getting statistics, scanning for outdated files, and removing paths.

GetDataPathStatsHandler dataclass

GetDataPathStatsHandler()

Bases: LambdaHandler[GetDataPathStatsRequest, GetDataPathStatsResponse]

Handler for retrieving statistics about a data path.

Returns size, modification time, and child path information.

handle

handle(request)

Get statistics for the specified path.

Parameters:

Name     Type                     Description                              Default
request  GetDataPathStatsRequest  Request containing the path to analyze.  required

Returns:

Type                      Description
GetDataPathStatsResponse  Response containing path statistics and child information.

Source code in src/aibs_informatics_aws_lambda/handlers/data_sync/file_system.py
def handle(self, request: GetDataPathStatsRequest) -> GetDataPathStatsResponse:
    """Get statistics for the specified path.

    Args:
        request (GetDataPathStatsRequest): Request containing the path to analyze.

    Returns:
        Response containing path statistics and child information.
    """
    root = get_file_system(request.path)
    node = root.node
    return GetDataPathStatsResponse(
        path=node.path,
        path_stats=node.path_stats,
        children={
            child_path: child_node.path_stats
            for child_path, child_node in node.children.items()
        },
    )
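
The response shape built above can be sketched with stand-in types. This is a minimal illustration, not the library's implementation: `PathStats`, `Node`, and `summarize` are hypothetical names, and the fields on `PathStats` are assumptions. It shows that only the immediate children of the requested node are reported, each keyed by its child path.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class PathStats:
    # Hypothetical stats fields; the real path_stats object may differ.
    size_bytes: int
    object_count: int


@dataclass
class Node:
    # Hypothetical stand-in for a file system tree node.
    path: str
    path_stats: PathStats
    children: Dict[str, "Node"] = field(default_factory=dict)


def summarize(node: Node) -> dict:
    """Collect the node's own stats plus the stats of its direct children."""
    return {
        "path": node.path,
        "path_stats": node.path_stats,
        "children": {
            child_path: child.path_stats
            for child_path, child in node.children.items()
        },
    }


root = Node(
    "/data",
    PathStats(size_bytes=30, object_count=2),
    {
        "a": Node("/data/a", PathStats(size_bytes=10, object_count=1)),
        "b": Node("/data/b", PathStats(size_bytes=20, object_count=1)),
    },
)
summary = summarize(root)
```

Deeper descendants do not appear in `children`; their sizes are only visible through whatever aggregate the parent's stats carry.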

ListDataPathsHandler dataclass

ListDataPathsHandler()

Bases: LambdaHandler[ListDataPathsRequest, ListDataPathsResponse]

Handler for listing paths under a root directory.

Supports include and exclude patterns for filtering results.

handle

handle(request)

List all paths under the specified root.

Parameters:

Name     Type                  Description                                              Default
request  ListDataPathsRequest  Request containing the root path and optional patterns.  required

Returns:

Type                   Description
ListDataPathsResponse  Response containing the list of matching paths.

Source code in src/aibs_informatics_aws_lambda/handlers/data_sync/file_system.py
def handle(self, request: ListDataPathsRequest) -> ListDataPathsResponse:
    """List all paths under the specified root.

    Args:
        request (ListDataPathsRequest): Request containing the root path and optional patterns.

    Returns:
        Response containing the list of matching paths.
    """
    root = get_file_system(request.path)
    paths: List[DataPath] = sorted([n.path for n in root.node.list_nodes()])

    if request.include_patterns or request.exclude_patterns:
        new_paths = []
        for path in paths:
            rel_path = strip_path_root(path, root.node.path)
            if request.include_patterns:
                # Keep only paths that match at least one include pattern.
                if not any(i.match(rel_path) for i in request.include_patterns):
                    continue
            if request.exclude_patterns:
                # Drop paths that match any exclude pattern.
                if any(i.match(rel_path) for i in request.exclude_patterns):
                    continue
            new_paths.append(path)
        paths = new_paths
    return ListDataPathsResponse(paths=paths)
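
The include/exclude filtering above can be sketched in isolation. This is a minimal illustration under stated assumptions: the patterns are taken to be compiled regular expressions (the source only shows that they expose `.match`), `filter_paths` is a hypothetical helper, and the root stripping is a rough approximation of `strip_path_root`, whose exact behavior is not shown.

```python
import re
from typing import List, Optional, Pattern


def filter_paths(
    paths: List[str],
    root: str,
    include: Optional[List[Pattern[str]]] = None,
    exclude: Optional[List[Pattern[str]]] = None,
) -> List[str]:
    """Return sorted paths whose root-relative form passes both filters."""
    kept = []
    for path in sorted(paths):
        # Approximation of strip_path_root: drop the root prefix and any
        # leading slash so patterns see a relative path.
        rel = path[len(root):].lstrip("/") if path.startswith(root) else path
        # A path must match at least one include pattern, if any are given.
        if include and not any(p.match(rel) for p in include):
            continue
        # A path is dropped if it matches any exclude pattern.
        if exclude and any(p.match(rel) for p in exclude):
            continue
        kept.append(path)
    return kept


paths = ["/data/raw/a.csv", "/data/raw/b.tmp", "/data/out/c.csv"]
result = filter_paths(
    paths,
    "/data",
    include=[re.compile(r".*\.csv$")],
    exclude=[re.compile(r"out/")],
)
```

If the patterns are indeed regular expressions, note that `match` anchors at the start of the string, so a pattern such as `out/` only excludes relative paths that begin with `out/`.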

OutdatedDataPathScannerHandler dataclass

OutdatedDataPathScannerHandler()

Bases: LambdaHandler[OutdatedDataPathScannerRequest, OutdatedDataPathScannerResponse]

Handler for scanning and identifying outdated data paths.

Identifies stale paths by comparing each node's last_modified time against the days_since_last_accessed threshold, while respecting a minimum size threshold to maintain EFS throughput performance.

handle

handle(request)

Scan for outdated paths to delete.

Uses a two-step process:

1. Identify stale nodes: those for which request.current_time - node.last_modified exceeds request.days_since_last_accessed days.
2. Sort stale nodes oldest-first and select them for deletion for as long as the tracked volume size stays above request.min_size_bytes_allowed.

Parameters:

Name     Type                            Description                          Default
request  OutdatedDataPathScannerRequest  Request containing scan parameters.  required

Returns:

Type                             Description
OutdatedDataPathScannerResponse  Response containing paths eligible for deletion.

Source code in src/aibs_informatics_aws_lambda/handlers/data_sync/file_system.py
def handle(self, request: OutdatedDataPathScannerRequest) -> OutdatedDataPathScannerResponse:
    """Scan for outdated paths to delete.

    Uses a two-step process:
    1. Identify stale nodes whose days_since_last_accessed exceeds the threshold.
    2. Sort stale nodes oldest-first and select for deletion while
       maintaining the minimum size requirement.

    Args:
        request (OutdatedDataPathScannerRequest): Request containing scan parameters.

    Returns:
        Response containing paths eligible for deletion.
    """
    fs = get_file_system(request.path)

    stale_nodes: List[Node] = []
    days_since_last_accessed = timedelta(days=request.days_since_last_accessed)
    unvisited_nodes: List[Node] = [fs.node]

    self.logger.info(
        f"Checking for nodes older than {request.days_since_last_accessed} days. "
        f"Max depth = {request.max_depth}"
    )
    # Step 1)
    while unvisited_nodes:
        node = unvisited_nodes.pop()
        if (request.current_time - node.last_modified) > days_since_last_accessed:
            if (
                request.min_depth is None
                or (node.depth - fs.node.depth) >= request.min_depth
                or not node.has_children()
            ):
                stale_nodes.append(node)
            else:
                unvisited_nodes.extend(node.children.values())
        elif request.max_depth is None or (node.depth - fs.node.depth) < request.max_depth:
            unvisited_nodes.extend(node.children.values())

    # Step 2)
    # Get the current size of the EFS volume, this is used to ensure we do not delete too
    # many files and allows us to maintain a minimum desired EFS throughput performance.
    # For more details see: https://docs.aws.amazon.com/efs/latest/ug/performance.html
    current_efs_size_bytes = fs.node.size_bytes
    paths_to_delete: List[str] = []

    # Sort so newest nodes are first, nodes are considered starting from the list end (oldest)
    nodes_to_delete = sorted(stale_nodes, key=lambda n: n.last_modified, reverse=True)
    while nodes_to_delete and current_efs_size_bytes > request.min_size_bytes_allowed:
        node = nodes_to_delete.pop()
        paths_to_delete.append(node.path)
        current_efs_size_bytes -= node.size_bytes

    return OutdatedDataPathScannerResponse(
        paths=sorted(paths_to_delete),
    )
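
Step 2 above can be sketched on its own. This is a minimal illustration, not the handler itself: `Node` and `select_for_deletion` are hypothetical stand-ins, and the traversal from Step 1 is omitted by passing the stale nodes in directly.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List


@dataclass
class Node:
    # Hypothetical stand-in carrying only the fields Step 2 needs.
    path: str
    last_modified: datetime
    size_bytes: int


def select_for_deletion(
    stale: List[Node], total_size_bytes: int, min_size_bytes_allowed: int
) -> List[str]:
    """Pick the oldest stale nodes while the tracked size is above the floor."""
    remaining = total_size_bytes
    # Newest first, so pop() takes the oldest node from the end of the list.
    candidates = sorted(stale, key=lambda n: n.last_modified, reverse=True)
    selected: List[str] = []
    while candidates and remaining > min_size_bytes_allowed:
        node = candidates.pop()
        selected.append(node.path)
        remaining -= node.size_bytes
    return sorted(selected)


now = datetime(2024, 1, 31)
stale = [
    Node("/efs/a", now - timedelta(days=30), 40),
    Node("/efs/b", now - timedelta(days=20), 40),
    Node("/efs/c", now - timedelta(days=10), 40),
]
selected = select_for_deletion(stale, total_size_bytes=150, min_size_bytes_allowed=100)
```

Here the two oldest nodes are selected and the tracked size ends at 70 bytes. Because the size check runs before each selection rather than after, the last selected node can take the total below `min_size_bytes_allowed`.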

RemoveDataPathsHandler dataclass

RemoveDataPathsHandler()

Bases: LambdaHandler[RemoveDataPathsRequest, RemoveDataPathsResponse]

Handler for removing data paths.

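Supports removing local paths and EFS paths. EFS paths are converted to local paths using the detected mount points before removal. S3 path removal is not currently supported; S3 URIs are skipped with a logged warning.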

handle

handle(request)

Remove the specified paths.

Parameters:

Name     Type                    Description                                      Default
request  RemoveDataPathsRequest  Request containing the list of paths to remove.  required

Returns:

Type                     Description
RemoveDataPathsResponse  Response containing the total bytes removed and the paths that were actually removed.

Source code in src/aibs_informatics_aws_lambda/handlers/data_sync/file_system.py
def handle(self, request: RemoveDataPathsRequest) -> RemoveDataPathsResponse:
    """Remove the specified paths.

    Args:
        request (RemoveDataPathsRequest): Request containing the list of paths to remove.

    Returns:
        Response containing the total bytes removed and paths processed.
    """
    self.logger.info(f"Removing {len(request.paths)} paths")

    mount_points = None
    size_bytes_removed = 0
    paths_removed = []
    for path in request.paths:
        if isinstance(path, S3URI):
            # TODO: add support for S3URI when more guardrails are in place
            # path_stats = get_s3_path_stats(path)
            # delete_s3_path(path)
            # size_bytes_removed += path_stats.size_bytes
            self.logger.warning(f"Skipping S3URI path deletion ({path}). Not supported yet.")
        else:
            if isinstance(path, EFSPath):
                self.logger.info(f"Converting EFSPath ({path}) to local path")
                if mount_points is None:
                    mount_points = detect_mount_points()
                path = get_local_path(efs_path=path, mount_points=mount_points)
            elif not isinstance(path, Path):
                path = Path(path)
            try:
                size_bytes = get_path_size_bytes(path)
                self.logger.info(f"Removing {path} (size {size_bytes} bytes)")
                remove_path(path)
                size_bytes_removed += size_bytes
                paths_removed.append(path)
            except FileNotFoundError as e:
                self.logger.warning(f"File at {path} does not exist anymore. Reason: {e}")
    return RemoveDataPathsResponse(size_bytes_removed, paths_removed)
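
The measure-then-remove loop above can be sketched with the standard library alone. This is a minimal illustration under stated assumptions: `path_size_bytes` and `remove_paths` are hypothetical stand-ins for `get_path_size_bytes` and `remove_path`, whose implementations are not shown, and EFS and S3 handling is left out.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Iterable, List, Tuple


def path_size_bytes(path: Path) -> int:
    """Size of a file, or total size of all files under a directory."""
    if path.is_file():
        return path.stat().st_size
    if not path.exists():
        raise FileNotFoundError(path)
    return sum(p.stat().st_size for p in path.rglob("*") if p.is_file())


def remove_paths(paths: Iterable[Path]) -> Tuple[int, List[Path]]:
    """Remove each path, tallying bytes freed and skipping missing paths."""
    total = 0
    removed: List[Path] = []
    for raw in paths:
        path = Path(raw)
        try:
            # Measure before deleting so the freed size can be reported.
            size = path_size_bytes(path)
            if path.is_dir():
                shutil.rmtree(path)
            else:
                path.unlink()
        except FileNotFoundError:
            # The path vanished or never existed; do not count it.
            continue
        total += size
        removed.append(path)
    return total, removed


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "old"
    target.mkdir()
    (target / "blob.bin").write_bytes(b"x" * 128)
    missing = Path(tmp) / "gone"
    total, removed = remove_paths([target, missing])
    target_exists_after = target.exists()
```

As in the handler, a missing path is tolerated rather than treated as a failure, so the returned list reflects only the paths that were actually deleted.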