Step Functions Constructs
State machines, fragments, and custom states for AWS Step Functions.
Submodules
Fragments
Reusable state machine fragments:
fragments
Classes
EnvBaseStateMachineFragment
Bases: StateMachineFragment, StateMachineMixins
Environment-aware state machine fragment.
Combines StateMachineFragment with environment base naming conventions and state machine mixins.
Initialize an environment-aware state machine fragment.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| scope | Construct | The construct scope. | required |
| id | str | The construct ID. | required |
| env_base | EnvBase | Environment base for resource naming. | required |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
Attributes
required_managed_policies (property)
Get required managed policies for this fragment.
Returns:
| Type | Description |
|---|---|
| Sequence[IManagedPolicy \| str] | Sequence of required managed policies. |
required_inline_policy_statements (property)
Get required inline policy statements for this fragment.
Returns:
| Type | Description |
|---|---|
| Sequence[PolicyStatement] | Sequence of required policy statements. |
Functions
to_single_state
to_single_state(
*,
prefix_states: str | None = None,
state_id: str | None = None,
comment: str | None = None,
input_path: str | None = None,
output_path: str | None = "$[0]",
result_path: str | None = None,
result_selector: Mapping[str, Any] | None = None,
arguments: Mapping[str, Any] | None = None,
parameters: Mapping[str, Any] | None = None,
query_language: QueryLanguage | None = None,
state_name: str | None = None,
assign: Mapping[str, Any] | None = None,
outputs: Any = None
) -> Parallel
Convert the fragment to a single parallel state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| prefix_states | Optional[str] | Prefix for state names. | None |
| state_id | Optional[str] | ID for the parallel state. | None |
| comment | Optional[str] | Comment for the state. | None |
| input_path | Optional[str] | Input path. | None |
| output_path | Optional[str] | Output path. Defaults to "$[0]". | '$[0]' |
| result_path | Optional[str] | Result path. | None |
| result_selector | Optional[Mapping[str, Any]] | Result selector. | None |
Returns:
| Type | Description |
|---|---|
| Parallel | A parallel state containing the fragment. |
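A Parallel state's result is an array with one element per branch. That explains why to_single_state defaults output_path to "$[0]": when the fragment is the only branch, "$[0]" unwraps the fragment's own output. The following is a minimal pure-Python model of those semantics. The helper names are hypothetical, and this is not the library's implementation.

```python
from __future__ import annotations

from typing import Any, Callable, Sequence

Branch = Callable[[Any], Any]  # a branch maps the state input to its output


def run_parallel(state_input: Any, branches: Sequence[Branch]) -> list[Any]:
    """Model a Parallel state: every branch gets the same input and the
    state's result is a list of branch outputs, in branch order."""
    return [branch(state_input) for branch in branches]


def apply_index_output_path(result: list[Any], output_path: str | None) -> Any:
    """Support only the OutputPath forms used here: unset/"$" and "$[i]"."""
    if output_path is None or output_path == "$":
        return result
    if output_path.startswith("$[") and output_path.endswith("]"):
        return result[int(output_path[2:-1])]
    raise ValueError(f"unsupported output path in this sketch: {output_path}")


def wrapped_fragment(event: dict) -> dict:
    # Stand-in for the fragment's own states.
    return {**event, "processed": True}


raw = run_parallel({"id": 1}, [wrapped_fragment])
print(raw)                                   # [{'id': 1, 'processed': True}]
print(apply_index_output_path(raw, "$[0]"))  # {'id': 1, 'processed': True}
```

If you also set result_path, the Parallel result is merged into the input instead of replacing it. In that case "$[0]" may no longer point at the fragment's output, and output_path likely needs to change too.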
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
to_state_machine
to_state_machine(
state_machine_name: str,
role: Role | None = None,
logs: LogOptions | None = None,
timeout: Duration | None = None,
) -> StateMachine
Convert the fragment to a complete state machine.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state_machine_name | str | Name for the state machine. | required |
| role | Optional[Role] | IAM role. If None, a default role is created. | None |
| logs | Optional[LogOptions] | Log options. | None |
| timeout | Optional[Duration] | Execution timeout. | None |
Returns:
| Type | Description |
|---|---|
| StateMachine | The created state machine. |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
Modules
base
Step Functions state machine fragment constructs.
This module provides base classes and utilities for building Step Functions state machine fragments and workflows.
Classes
StateMachineMixins
Bases: EnvBaseConstructMixins
Mixin class providing state machine helper methods.
Provides methods for retrieving Lambda functions and state machines with caching support.
get_fn
Get a Lambda function by name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| function_name | str | The function name. | required |
Returns:
| Type | Description |
|---|---|
| IFunction | The Lambda function interface. |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
get_state_machine_from_name
Get a state machine by name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state_machine_name | str | The state machine name. | required |
Returns:
| Type | Description |
|---|---|
| IStateMachine | The state machine interface. |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
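StateMachineMixins is described as providing lookups "with caching support". The following is a minimal sketch of name-keyed caching, in which a hypothetical import_fn stands in for the real resource import. Caching of this kind likely matters in CDK when the construct ID is derived from the name, because importing the same resource twice under the same construct ID in one scope fails with a duplicate-ID error.

```python
from __future__ import annotations

from typing import Callable


class CachedLookupMixin:
    """Sketch of name-keyed caching for construct lookups (hypothetical).

    `import_fn` stands in for whatever actually imports the resource; the
    point is that repeated lookups of one name return the same object.
    """

    def __init__(self, import_fn: Callable[[str], object]) -> None:
        self._import_fn = import_fn
        self._fn_cache: dict[str, object] = {}

    def get_fn(self, function_name: str) -> object:
        # Import only on the first request for this name; reuse afterwards.
        if function_name not in self._fn_cache:
            self._fn_cache[function_name] = self._import_fn(function_name)
        return self._fn_cache[function_name]


calls: list[str] = []


def fake_import(name: str) -> object:
    calls.append(name)  # record every real import
    return object()


mixin = CachedLookupMixin(fake_import)
a = mixin.get_fn("my-function")
b = mixin.get_fn("my-function")
print(a is b, calls)  # True ['my-function']
```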
StateMachineFragment
Bases: StateMachineFragment
Base class for state machine fragments.
Provides common functionality for building reusable state machine fragments with definition management.
definition (property, writable)
Get the state machine definition.
Returns:
| Type | Description |
|---|---|
| IChainable | The chainable definition. |
start_state (property)
Get the start state.
Returns:
| Type | Description |
|---|---|
| State | The definition's start state. |
end_states (property)
Get the end states.
Returns:
| Type | Description |
|---|---|
| list[INextable] | List of nextable end states. |
enclose
enclose(
id: str | None = None,
input_path: str | None = None,
result_path: str | None = None,
) -> Chain
Enclose the fragment within a parallel state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| id | Optional[str] | Identifier for the parallel state. Defaults to the node ID. | None |
| input_path | Optional[str] | Input path for the enclosed state. Defaults to "$". | None |
| result_path | Optional[str] | Result path for the output. Defaults to the same value as input_path. | None |
Returns:
| Type | Description |
|---|---|
| Chain | The enclosed state machine fragment as a chain. |
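The documented defaults for enclose are that input_path falls back to "$" and result_path falls back to the same value as input_path. Together, these mean the fragment's result is written back to where its input came from. The following is a minimal pure-Python model of that path handling. It assumes only "$" and dotted "$.a.b" paths, and all names are hypothetical. It also passes the body's result through directly, ignoring how the real implementation unwraps the Parallel state's array output.

```python
from __future__ import annotations

import copy
from typing import Any, Callable


def _keys(path: str) -> list[str]:
    # Supports only "$" and dotted paths such as "$.a.b" in this sketch.
    if path == "$":
        return []
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    return path[2:].split(".")


def get_path(doc: Any, path: str) -> Any:
    for key in _keys(path):
        doc = doc[key]
    return doc


def set_path(doc: Any, path: str, value: Any) -> Any:
    keys = _keys(path)
    if not keys:
        return value  # result path "$": the result replaces the whole input
    out = copy.deepcopy(doc)  # leave the caller's input untouched
    target = out
    for key in keys[:-1]:
        target = target.setdefault(key, {})
    target[keys[-1]] = value
    return out


def enclose_like(state_input: dict, body: Callable[[Any], Any],
                 input_path: str | None = None,
                 result_path: str | None = None) -> Any:
    input_path = input_path or "$"           # documented default
    result_path = result_path or input_path  # documented default
    result = body(get_path(state_input, input_path))
    return set_path(state_input, result_path, result)


event = {"request": {"x": 2}, "meta": "keep"}
double = lambda d: {"x": d["x"] * 2}
print(enclose_like(event, double, input_path="$.request"))
# {'request': {'x': 4}, 'meta': 'keep'}
```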
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
EnvBaseStateMachineFragment
Bases: StateMachineFragment, StateMachineMixins
Environment-aware state machine fragment.
Combines StateMachineFragment with environment base naming conventions and state machine mixins.
Initialize an environment-aware state machine fragment.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| scope | Construct | The construct scope. | required |
| id | str | The construct ID. | required |
| env_base | EnvBase | Environment base for resource naming. | required |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
required_managed_policies (property)
Get required managed policies for this fragment.
Returns:
| Type | Description |
|---|---|
| Sequence[IManagedPolicy \| str] | Sequence of required managed policies. |
required_inline_policy_statements (property)
Get required inline policy statements for this fragment.
Returns:
| Type | Description |
|---|---|
| Sequence[PolicyStatement] | Sequence of required policy statements. |
to_single_state
to_single_state(
*,
prefix_states: str | None = None,
state_id: str | None = None,
comment: str | None = None,
input_path: str | None = None,
output_path: str | None = "$[0]",
result_path: str | None = None,
result_selector: Mapping[str, Any] | None = None,
arguments: Mapping[str, Any] | None = None,
parameters: Mapping[str, Any] | None = None,
query_language: QueryLanguage | None = None,
state_name: str | None = None,
assign: Mapping[str, Any] | None = None,
outputs: Any = None
) -> Parallel
Convert the fragment to a single parallel state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| prefix_states | Optional[str] | Prefix for state names. | None |
| state_id | Optional[str] | ID for the parallel state. | None |
| comment | Optional[str] | Comment for the state. | None |
| input_path | Optional[str] | Input path. | None |
| output_path | Optional[str] | Output path. Defaults to "$[0]". | '$[0]' |
| result_path | Optional[str] | Result path. | None |
| result_selector | Optional[Mapping[str, Any]] | Result selector. | None |
Returns:
| Type | Description |
|---|---|
| Parallel | A parallel state containing the fragment. |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
to_state_machine
to_state_machine(
state_machine_name: str,
role: Role | None = None,
logs: LogOptions | None = None,
timeout: Duration | None = None,
) -> StateMachine
Convert the fragment to a complete state machine.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state_machine_name | str | Name for the state machine. | required |
| role | Optional[Role] | IAM role. If None, a default role is created. | None |
| logs | Optional[LogOptions] | Log options. | None |
| timeout | Optional[Duration] | Execution timeout. | None |
Returns:
| Type | Description |
|---|---|
| StateMachine | The created state machine. |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
LazyLoadStateMachineFragment
Bases: EnvBaseStateMachineFragment
State machine fragment with lazy-loaded definition.
The definition is built on first access via build_definition().
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
definition (property, writable)
Get the state machine definition, building it if needed.
Returns:
| Type | Description |
|---|---|
| IChainable | The chainable definition. |
build_definition (abstract method)
Build the state machine definition.
Subclasses must implement this method.
Returns:
| Type | Description |
|---|---|
| IChainable | The built definition. |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
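The following is a minimal sketch of the lazy-load pattern described above, in plain Python with hypothetical class names. build_definition() runs on the first read of definition, and the result is reused afterwards. Assigning definition directly (the property is documented as writable) bypasses the build. One plausible motivation, not confirmed by this reference, is that a subclass can finish setting its own attributes in __init__ before the definition is constructed.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any


class LazyDefinitionSketch(ABC):
    """Sketch of the lazy-load pattern: build once, on first access."""

    def __init__(self) -> None:
        self._definition: Any = None

    @property
    def definition(self) -> Any:
        if self._definition is None:
            self._definition = self.build_definition()
        return self._definition

    @definition.setter
    def definition(self, value: Any) -> None:
        # Writable: an explicit assignment skips build_definition().
        self._definition = value

    @abstractmethod
    def build_definition(self) -> Any:
        """Subclasses return the chainable definition."""


class CountingFragment(LazyDefinitionSketch):
    def __init__(self) -> None:
        super().__init__()
        self.builds = 0

    def build_definition(self) -> Any:
        self.builds += 1
        return ["Start", "Task", "End"]  # stand-in for an IChainable


frag = CountingFragment()
print(frag.builds)  # 0: nothing is built at construction time
frag.definition
frag.definition
print(frag.builds)  # 1: built once, then reused
```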
TaskWithPrePostStatus
Bases: LazyLoadStateMachineFragment
State machine fragment with pre/post status tracking.
Wraps a task with status updates for started, failed, and completed states.
Initialize a task with pre/post status tracking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| scope | Construct | The construct scope. | required |
| id | str | The construct ID. | required |
| env_base | EnvBase | Environment base for resource naming. | required |
| task | Optional[IChainable] | The task to wrap. | required |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
task (property, writable)
Get the wrapped task.
Returns:
| Type | Description |
|---|---|
| IChainable | The task. |
Raises:
| Type | Description |
|---|---|
| AssertionError | If the task is not set. |
task_name (property, writable)
Get the task name.
Returns:
| Type | Description |
|---|---|
| str | The task name. |
task__augment_input (property)
Runs right after the sfn.Pass 'START' state of the state machine fragment. Can be used to dynamically fill out the 'contexts' JsonReferencePath for subsequent Lambda functions and tasks to use. NOTE: Outputs of this chain are MAINTAINED.
task__status_started (property)
Runs right before "task__pre_run" if defined; otherwise runs right before the main task executes. NOTE: Outputs within this chain get DISCARDED.
task__status_completed (property)
Runs right after "task__post_run" if defined; otherwise runs right after the main task completes. NOTE: Outputs within this chain get DISCARDED.
task__status_failed (property)
Runs if the main task fails during execution.
task__pre_run (property)
Runs right before the main task executes. NOTE: Outputs within this chain get DISCARDED.
task__post_run (property)
Runs right after the main task completes. NOTE: Outputs within this chain get DISCARDED.
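Taken together, the hook descriptions above imply an execution order. The following hypothetical pure-Python model (not the library's implementation) runs the optional hooks in that order. It keeps the output of task__augment_input and drops the outputs of the status and pre/post-run hooks. Two behaviors are assumptions, flagged in the comments: that the main task's output is carried forward, and that a failure is re-raised after task__status_failed runs.

```python
from __future__ import annotations

from typing import Any, Callable, Optional

Hook = Optional[Callable[[dict], Any]]


def run_with_status(
    event: dict,
    task: Callable[[dict], dict],
    *,
    augment_input: Hook = None,
    status_started: Hook = None,
    pre_run: Hook = None,
    post_run: Hook = None,
    status_completed: Hook = None,
    status_failed: Hook = None,
    log: list[str] | None = None,
) -> dict:
    """Hypothetical model of the documented hook order."""
    log = [] if log is None else log

    def run_discarded(name: str, hook: Hook, state: dict) -> None:
        # DISCARDED: the hook runs for its side effects; its output is dropped.
        if hook is not None:
            log.append(name)
            hook(state)

    state = dict(event)
    if augment_input is not None:
        log.append("augment_input")
        state = augment_input(state)  # MAINTAINED: output becomes the new state
    run_discarded("status_started", status_started, state)
    run_discarded("pre_run", pre_run, state)
    log.append("task")
    try:
        state = task(state)  # assumption: the main task's output is carried forward
    except Exception:
        run_discarded("status_failed", status_failed, state)
        raise  # assumption: the failure still propagates after the status update
    run_discarded("post_run", post_run, state)
    run_discarded("status_completed", status_completed, state)
    return state


steps: list[str] = []
result = run_with_status(
    {"job": "a"},
    task=lambda s: {**s, "done": True},
    augment_input=lambda s: {**s, "context": {"run": 1}},
    status_started=lambda s: {"ignored": True},
    status_completed=lambda s: {"ignored": True},
    log=steps,
)
print(steps)   # ['augment_input', 'status_started', 'task', 'status_completed']
print(result)  # {'job': 'a', 'context': {'run': 1}, 'done': True}
```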
build_definition
Build the task definition with status tracking.
Returns:
| Type | Description |
|---|---|
| IChainable | The complete task definition with pre/post status states. |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
Functions
create_log_options
create_log_options(
scope: Construct,
id: str,
env_base: EnvBase,
removal_policy: RemovalPolicy | None = None,
retention: RetentionDays | None = None,
) -> LogOptions
Create log options for a state machine.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| scope | Construct | The construct scope. | required |
| id | str | Identifier for the log group. | required |
| env_base | EnvBase | Environment base for naming. | required |
| removal_policy | Optional[RemovalPolicy] | Removal policy. Defaults to DESTROY. | None |
| retention | Optional[RetentionDays] | Log retention period. Defaults to ONE_MONTH. | None |
Returns:
| Type | Description |
|---|---|
| LogOptions | Log options configured with a CloudWatch log group. |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
create_role
create_role(
scope: Construct,
id: str,
env_base: EnvBase,
assumed_by: IPrincipal = ServicePrincipal(
"states.amazonaws.com"
),
managed_policies: Sequence[IManagedPolicy | str]
| None = None,
inline_policies: Mapping[str, PolicyDocument]
| None = None,
inline_policies_from_statements: Mapping[
str, Sequence[PolicyStatement]
]
| None = None,
include_default_managed_policies: bool = True,
) -> Role
Create an IAM role for a state machine.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| scope | Construct | The construct scope. | required |
| id | str | Identifier for the role. | required |
| env_base | EnvBase | Environment base for naming. | required |
| assumed_by | IPrincipal | Principal that can assume the role. Defaults to states.amazonaws.com. | ServicePrincipal('states.amazonaws.com') |
| managed_policies | Optional[Sequence[Union[IManagedPolicy, str]]] | Managed policies to attach. | None |
| inline_policies | Optional[Mapping[str, PolicyDocument]] | Inline policy documents. | None |
| inline_policies_from_statements | Optional[Mapping[str, Sequence[PolicyStatement]]] | Inline policies built from policy statements. | None |
| include_default_managed_policies | bool | Include default Step Functions managed policies. Defaults to True. | True |
Returns:
| Type | Description |
|---|---|
| Role | The created IAM role. |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
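The following sketch shows one way the create_role policy arguments could combine, assuming policies are represented as plain IAM JSON rather than CDK objects. Managed policy names are collected, with the defaults first when include_default_managed_policies is True. Each entry in inline_policies_from_statements is wrapped in a standard "2012-10-17" policy document and added alongside inline_policies. The default policy name is a hypothetical placeholder, because the actual defaults are not listed in this reference.

```python
from __future__ import annotations

from typing import Mapping, Sequence

# Hypothetical placeholder: the real default managed policies are not listed here.
DEFAULT_MANAGED_POLICIES = ("<default-step-functions-policy>",)


def assemble_role_policies(
    managed_policies: Sequence[str] | None = None,
    inline_policies: Mapping[str, dict] | None = None,
    inline_policies_from_statements: Mapping[str, Sequence[dict]] | None = None,
    include_default_managed_policies: bool = True,
) -> tuple[list[str], dict[str, dict]]:
    """Combine policy arguments into (managed policy names, inline documents)."""
    managed = list(DEFAULT_MANAGED_POLICIES) if include_default_managed_policies else []
    managed.extend(managed_policies or [])
    inline = dict(inline_policies or {})
    for name, statements in (inline_policies_from_statements or {}).items():
        # Wrap bare statements in a standard IAM policy document.
        # Assumption: a name present in both mappings is overwritten here.
        inline[name] = {"Version": "2012-10-17", "Statement": list(statements)}
    return managed, inline


read_bucket = {
    "Effect": "Allow",
    "Action": ["s3:GetObject"],
    "Resource": "arn:aws:s3:::example-bucket/*",
}
managed, inline = assemble_role_policies(
    managed_policies=["arn:aws:iam::aws:policy/AWSXrayWriteOnlyAccess"],
    inline_policies_from_statements={"read-example-bucket": [read_bucket]},
)
print(managed)
print(inline["read-example-bucket"]["Statement"][0]["Action"])  # ['s3:GetObject']
```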
create_state_machine
create_state_machine(
scope: Construct,
env_base: EnvBase,
id: str,
name: str | None,
definition: IChainable,
role: Role | None = None,
logs: LogOptions | None = None,
timeout: Duration | None = None,
) -> StateMachine
Create a state machine from a definition.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| scope | Construct | The construct scope. | required |
| env_base | EnvBase | Environment base for naming. | required |
| id | str | Construct identifier. | required |
| name | Optional[str] | State machine name. | required |
| definition | IChainable | The state machine definition. | required |
| role | Optional[Role] | IAM role for the state machine. | None |
| logs | Optional[LogOptions] | Log options. | None |
| timeout | Optional[Duration] | Execution timeout. | None |
Returns:
| Type | Description |
|---|---|
| StateMachine | The created state machine. |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
informatics
Classes
BatchInvokedExecutorFragment
BatchInvokedExecutorFragment(
scope: Construct,
id: str,
env_base: EnvBase,
name: str,
image: str,
executor: str,
job_queue: str,
bucket_name: str,
key_prefix: str | None = None,
payload_path: str | None = None,
environment: Mapping[str, str] | str | None = None,
memory: int | str | None = None,
vcpus: int | str | None = None,
mount_point_configs: list[MountPointConfiguration]
| None = None,
mount_points: list[MountPointTypeDef] | None = None,
volumes: list[VolumeTypeDef] | None = None,
platform_capabilities: list[Literal["EC2", "FARGATE"]]
| str
| None = None,
job_role_arn: str | None = None,
)
Bases: BatchInvokedBaseFragment, AWSBatchMixins
Invoke an executor in an image via AWS Batch with a payload from S3.
This targets any subclass of aibs_informatics_core.executors.base.ExecutorBase:
- https://github.com/AllenInstitute/aibs-informatics-core/blob/main/src/aibs_informatics_core/executors/base.py
This fragment creates a state machine fragment that:
- Puts a payload to S3
- Submits a Batch job
- Gets the response from S3
The payload is written to s3://
IMPORTANT:
- The Batch job queue / compute environment must have permission to read from and write to the bucket.
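The three steps listed above can be modeled in plain Python, with an in-memory dictionary standing in for the S3 bucket. This is a hypothetical sketch. The exact key layout is not given in this reference, so the request and response keys are invented for illustration. The run_job callable stands in for submitting and awaiting the Batch job. A plausible reason for passing the payload through S3 rather than inline is Step Functions' 256 KiB limit on state input and output size.

```python
from __future__ import annotations

import json
import uuid
from typing import Callable, Dict

# Hypothetical in-memory stand-in for an S3 bucket: key -> object bytes.
FakeBucket = Dict[str, bytes]
JobRunner = Callable[[FakeBucket, str, str], None]


def invoke_via_batch(payload: dict, bucket: FakeBucket, run_job: JobRunner,
                     key_prefix: str = "example-prefix/") -> dict:
    run_id = uuid.uuid4().hex
    request_key = f"{key_prefix}{run_id}/request.json"    # hypothetical key layout
    response_key = f"{key_prefix}{run_id}/response.json"  # hypothetical key layout
    bucket[request_key] = json.dumps(payload).encode()    # 1. put the payload to S3
    run_job(bucket, request_key, response_key)            # 2. "submit" the Batch job
    return json.loads(bucket[response_key])               # 3. read the response back


def fake_job(bucket: FakeBucket, request_key: str, response_key: str) -> None:
    # The job needs both read and write access to the bucket.
    request = json.loads(bucket[request_key])
    bucket[response_key] = json.dumps({"total": sum(request["values"])}).encode()


bucket: FakeBucket = {}
print(invoke_via_batch({"values": [1, 2, 3]}, bucket, fake_job))  # {'total': 6}
```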
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| scope | Construct | Construct scope. | required |
| id | str | Construct ID. | required |
| env_base | EnvBase | Environment base. | required |
| name | str | Name of the Lambda function. This can be a reference path (e.g. "$.name"). | required |
| image | str | Image URI or name. This can be a reference path (e.g. "$.image"). | required |
| executor | str | Qualified name of the executor class. This should describe a fully qualified path to the function handler. This can be a reference path (e.g. "$.handler"). | required |
| job_queue | str | Job queue to submit the job to. This can be a reference path (e.g. "$.job_queue"). | required |
| bucket_name | str | S3 bucket name to write the payload to and read the response from. This can be a reference path (e.g. "$.bucket_name"). | required |
| key_prefix | str \| None | Key prefix to write the payload to and read the response from. If not provided, | None |
| payload_path | str \| None | Optionally specify the reference path of the event payload. Defaults to "$". | None |
| command | List[str] \| str \| None | Command to run in the container. Can be a reference path (e.g. "$.command"). If unspecified, the container's CMD is used. | required |
| environment | Mapping[str, str] \| str \| None | Environment variables to specify. This can be a reference path (e.g. "$.environment"). | None |
| memory | int \| str \| None | Memory in MiB (either an int or a reference path str). Defaults to None. | None |
| vcpus | int \| str \| None | Number of vCPUs (either an int or a reference path str). Defaults to None. | None |
| mount_points | List[MountPointTypeDef] \| None | List of mount points to add to the state machine. Defaults to None. | None |
| volumes | List[VolumeTypeDef] \| None | List of volumes to add to the state machine. Defaults to None. | None |
| platform_capabilities | List[Literal['EC2', 'FARGATE']] \| str \| None | Platform capabilities to use. This can be a reference path (e.g. "$.platform_capabilities"). | None |
| job_role_arn | str \| None | Job role ARN to use for the job. This can be a reference path (e.g. "$.job_role_arn"). | None |
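Many parameters above accept either a literal value or a reference path such as "$.image". In Amazon States Language, a Parameters key ending in ".$" has its value resolved as a path against the state input. One way to support both forms is therefore to add that suffix only for path values. The following is a minimal sketch with a hypothetical helper name. The library may handle this differently, for example through the CDK's JsonPath.string_at helper, which renders to the same ".$" form.

```python
from __future__ import annotations

from typing import Any, Mapping


def to_asl_parameters(values: Mapping[str, Any]) -> dict[str, Any]:
    """Render literal-or-reference-path values as States Language Parameters.

    Keys whose value is a path string ("$...") get a ".$" suffix so that
    Step Functions resolves them against the state input at run time.
    """
    params: dict[str, Any] = {}
    for key, value in values.items():
        if value is None:
            continue  # unset optional arguments are simply omitted
        if isinstance(value, str) and value.startswith("$"):
            params[f"{key}.$"] = value
        else:
            params[key] = value
    return params


print(to_asl_parameters({"image": "$.image", "memory": 1024, "vcpus": "$.vcpus", "gpu": None}))
# {'image.$': '$.image', 'memory': 1024, 'vcpus.$': '$.vcpus'}
```

This heuristic has one caveat: a literal string value that happens to start with "$" would be misread as a path.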
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/batch.py
BatchInvokedLambdaFunction
BatchInvokedLambdaFunction(
scope: Construct,
id: str,
env_base: EnvBase,
name: str,
image: str,
handler: str,
job_queue: str,
bucket_name: str,
key_prefix: str | None = None,
payload_path: str | None = None,
command: list[str] | str | None = None,
environment: Mapping[str, str] | None = None,
memory: int | str | None = None,
vcpus: int | str | None = None,
gpu: int | str | None = None,
mount_points: list[MountPointTypeDef]
| str
| None = None,
volumes: list[VolumeTypeDef] | str | None = None,
mount_point_configs: list[MountPointConfiguration]
| None = None,
platform_capabilities: list[Literal["EC2", "FARGATE"]]
| str
| None = None,
job_role_arn: str | None = None,
)
Bases: BatchInvokedBaseFragment, AWSBatchMixins
Invoke a command on an image via AWS Batch with a payload from S3.
This fragment creates a state machine fragment that:
- Puts a payload to S3
- Submits a Batch job
- Gets the response from S3
The payload is written to s3://
The Batch job is given the following environment variables:
- AWS_LAMBDA_FUNCTION_NAME: name of the Lambda function
- AWS_LAMBDA_FUNCTION_HANDLER: handler of the Lambda function
- AWS_LAMBDA_EVENT_PAYLOAD: the S3 location of the event payload
- AWS_LAMBDA_EVENT_RESPONSE_LOCATION: the S3 location to write the response to
IMPORTANT:
- The Batch job queue / compute environment must have permission to read from and write to the bucket.
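The following sketch shows the job environment described above, assuming plain dictionaries. It combines the four documented AWS_LAMBDA_* variables with the optional environment mapping. It then converts the result to the list of {"name", "value"} pairs that AWS Batch container overrides use. The helper names and example values are hypothetical, and which side wins on a name clash is an assumption.

```python
from __future__ import annotations

from typing import Mapping


def build_job_environment(
    name: str,
    handler: str,
    payload_uri: str,
    response_uri: str,
    environment: Mapping[str, str] | None = None,
) -> dict[str, str]:
    defaults = {
        "AWS_LAMBDA_FUNCTION_NAME": name,
        "AWS_LAMBDA_FUNCTION_HANDLER": handler,
        "AWS_LAMBDA_EVENT_PAYLOAD": payload_uri,
        "AWS_LAMBDA_EVENT_RESPONSE_LOCATION": response_uri,
    }
    # Additional variables are added to the defaults.
    # Assumption: on a name clash, the documented default wins.
    return {**dict(environment or {}), **defaults}


def to_batch_key_value_pairs(env: Mapping[str, str]) -> list[dict[str, str]]:
    # AWS Batch container overrides take environment as [{"name": ..., "value": ...}].
    return [{"name": k, "value": v} for k, v in sorted(env.items())]


env = build_job_environment(
    name="example-function",                 # hypothetical values throughout
    handler="example_pkg.handlers.handler",
    payload_uri="s3://example-bucket/request.json",
    response_uri="s3://example-bucket/response.json",
    environment={"LOG_LEVEL": "DEBUG"},
)
print(to_batch_key_value_pairs(env)[0])
# {'name': 'AWS_LAMBDA_EVENT_PAYLOAD', 'value': 's3://example-bucket/request.json'}
```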
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| scope | Construct | Construct scope. | required |
| id | str | Construct ID. | required |
| env_base | EnvBase | Environment base. | required |
| name | str | Name of the Lambda function. This can be a reference path (e.g. "$.name"). | required |
| image | str | Image URI or name. This can be a reference path (e.g. "$.image"). | required |
| handler | str | Handler of the Lambda function. This should describe a fully qualified path to the function handler. This can be a reference path (e.g. "$.handler"). | required |
| job_queue | str | Job queue to submit the job to. This can be a reference path (e.g. "$.job_queue"). | required |
| bucket_name | str | S3 bucket name to write the payload to and read the response from. This can be a reference path (e.g. "$.bucket_name"). | required |
| key_prefix | str \| None | Key prefix to write the payload to and read the response from. If not provided, | None |
| payload_path | str \| None | Optionally specify the reference path of the event payload. Defaults to "$". | None |
| command | List[str] \| str \| None | Command to run in the container. Can be a reference path (e.g. "$.command"). If unspecified, the container's CMD is used. | None |
| environment | Mapping[str, str] \| None | Additional environment variables to specify. These are added to the default environment variables. | None |
| memory | int \| str \| None | Memory in MiB (either an int or a reference path str). Defaults to None. | None |
| vcpus | int \| str \| None | Number of vCPUs (either an int or a reference path str). Defaults to None. | None |
| gpu | int \| str \| None | Number of GPUs (either an int or a reference path str). Defaults to None. | None |
| mount_points | List[MountPointTypeDef] \| None | List of mount points to add to the state machine. Defaults to None. | None |
| volumes | List[VolumeTypeDef] \| None | List of volumes to add to the state machine. Defaults to None. | None |
| platform_capabilities | List[Literal['EC2', 'FARGATE']] \| str \| None | Platform capabilities to use. This can be a reference path (e.g. "$.platform_capabilities"). | None |
| job_role_arn | str \| None | Job role ARN to use for the job. This can be a reference path (e.g. "$.job_role_arn"). | None |
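The handler parameter is described as a fully qualified path to a function. The following is a minimal sketch of resolving such a dotted path to a callable with importlib, assuming the "package.module.function" form. The helper is hypothetical, and the code that actually runs inside the container is not shown in this reference.

```python
from __future__ import annotations

import importlib
from typing import Any, Callable


def resolve_handler(qualified_name: str) -> Callable[..., Any]:
    """Resolve "package.module.function" to the function object."""
    module_name, _, attr = qualified_name.rpartition(".")
    if not module_name:
        raise ValueError(f"expected a dotted path, got {qualified_name!r}")
    handler = getattr(importlib.import_module(module_name), attr)
    if not callable(handler):
        raise TypeError(f"{qualified_name!r} is not callable")
    return handler


dumps = resolve_handler("json.dumps")  # stdlib function used as a stand-in handler
print(dumps({"ok": True}))  # {"ok": true}
```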
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/batch.py
DataSyncFragment
DataSyncFragment(
scope: Construct,
id: str,
env_base: EnvBase,
aibs_informatics_docker_asset: DockerImageAsset | str,
batch_job_queue: JobQueue | str,
scaffolding_bucket: Bucket,
batch_job_role: Role | str | None = None,
mount_point_configs: Iterable[MountPointConfiguration]
| None = None,
)
Bases: BatchInvokedBaseFragment, EnvBaseConstructMixins
Sync data from one S3 bucket to another.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| scope | Construct | Construct scope. | required |
| id | str | Construct ID. | required |
| env_base | EnvBase | Environment base. | required |
| aibs_informatics_docker_asset | DockerImageAsset \| str | Docker image asset or image URI str for the aibs informatics aws lambda. | required |
| batch_job_queue | JobQueue \| str | Default Batch job queue, or job queue name str, that the Batch job will be submitted to. This can be overridden by the payload. | required |
| scaffolding_bucket | Bucket | Primary bucket used for the request/response JSON blobs of the batch-invoked Lambda function. | required |
| batch_job_role | Optional[IRole \| str] | Optional role to use for the Batch job. If not provided, the default role created by the Batch compute construct will be used. | None |
| mount_point_configs | Optional[Iterable[MountPointConfiguration]] | List of mount point configurations to use. These can be overridden in the payload. | None |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/data_sync.py
DistributedDataSyncFragment
DistributedDataSyncFragment(
scope: Construct,
id: str,
env_base: EnvBase,
aibs_informatics_docker_asset: DockerImageAsset | str,
batch_job_queue: JobQueue | str,
scaffolding_bucket: Bucket,
batch_job_role: str | Role | None = None,
mount_point_configs: Iterable[MountPointConfiguration]
| None = None,
)
Bases: BatchInvokedBaseFragment
Sync data from one S3 bucket to another using distributed Batch jobs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| scope | Construct | Construct scope. | required |
| id | str | Construct ID. | required |
| env_base | EnvBase | Environment base. | required |
| aibs_informatics_docker_asset | DockerImageAsset \| str | Docker image asset or image URI str for the aibs informatics aws lambda. | required |
| batch_job_queue | JobQueue \| str | Default Batch job queue, or job queue name str, that the Batch job will be submitted to. This can be overridden by the payload. | required |
| scaffolding_bucket | Bucket | Primary bucket used for the request/response JSON blobs of the batch-invoked Lambda function. | required |
| batch_job_role | Optional[IRole \| str] | Optional role to use for the Batch job. If not provided, the default role created by the Batch compute construct will be used. | None |
| mount_point_configs | Optional[Iterable[MountPointConfiguration]] | List of mount point configurations to use. These can be overridden in the payload. | None |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/data_sync.py
DemandExecutionFragment
DemandExecutionFragment(
scope: Construct,
id: str,
env_base: EnvBase,
aibs_informatics_docker_asset: DockerImageAsset | str,
scaffolding_bucket: Bucket,
scaffolding_job_queue: JobQueue | str,
batch_invoked_lambda_state_machine: StateMachine,
data_sync_state_machine: StateMachine,
shared_mount_point_config: MountPointConfiguration
| None,
scratch_mount_point_config: MountPointConfiguration
| None,
tmp_mount_point_config: MountPointConfiguration
| None = None,
context_manager_configuration: dict[str, Any]
| None = None,
tags: dict[str, str] | None = None,
)
Bases: EnvBaseStateMachineFragment, EnvBaseConstructMixins
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/demand_execution.py
demand_execution_normalize_tags_chain
Merge build-time and runtime tags.
Chain assumptions/expectations:
- The input is the demand execution request.
- The output is the demand execution request with merged tags.
If runtime tags are not provided, only build-time tags are used.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tags | Optional[Dict[str, str]] | Build-time tags. | required |
Returns:
| Type | Description |
|---|---|
| sfn.IChainable | The state machine fragment. |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/demand_execution.py
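The following is a minimal sketch of the tag merge, assuming tags are plain string dictionaries and using a hypothetical helper name. The precedence rule (runtime tags override build-time tags with the same key) is an assumption, because the description above does not state it.

```python
from __future__ import annotations

from typing import Mapping


def merge_tags(build_tags: Mapping[str, str] | None,
               runtime_tags: Mapping[str, str] | None) -> dict[str, str]:
    merged = dict(build_tags or {})
    # With no runtime tags, only the build-time tags remain (as documented).
    # Assumption: runtime tags override build-time tags with the same key.
    merged.update(runtime_tags or {})
    return merged


print(merge_tags({"team": "infra", "env": "dev"}, None))           # {'team': 'infra', 'env': 'dev'}
print(merge_tags({"team": "infra", "env": "dev"}, {"env": "prod"}))  # {'team': 'infra', 'env': 'prod'}
```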
CleanFileSystemFragment
CleanFileSystemFragment(
scope: Construct,
id: str,
env_base: EnvBase,
aibs_informatics_docker_asset: DockerImageAsset | str,
batch_job_queue: JobQueue | str,
scaffolding_bucket: Bucket,
mount_point_configs: Iterable[MountPointConfiguration]
| None = None,
memory: int = 1024,
vcpus: int = 1,
)
Bases: BatchInvokedBaseFragment
Clean up the file system by scanning for outdated data paths and removing them.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| scope | Construct | Construct scope. | required |
| id | str | Construct ID. | required |
| env_base | EnvBase | Environment base. | required |
| aibs_informatics_docker_asset | DockerImageAsset \| str | Docker image asset or image URI str for the aibs informatics aws lambda. | required |
| batch_job_queue | JobQueue \| str | Default Batch job queue, or job queue name str, that the Batch job will be submitted to. This can be overridden by the payload. | required |
| scaffolding_bucket | Bucket | Primary bucket used for the request/response JSON blobs of the batch-invoked Lambda function. | required |
| mount_point_configs | Optional[Iterable[MountPointConfiguration]] | List of mount point configurations to use. These can be overridden in the payload. | None |
| memory | int | Memory needed. Defaults to 1024. This value is used for both the outdated path scanner and the removal of data paths. | 1024 |
| vcpus | int | vCPUs needed. Defaults to 1. This value is used for both the outdated path scanner and the removal of data paths. | 1 |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/efs.py
Modules
batch
BatchInvokedLambdaFunction
BatchInvokedLambdaFunction(
scope: Construct,
id: str,
env_base: EnvBase,
name: str,
image: str,
handler: str,
job_queue: str,
bucket_name: str,
key_prefix: str | None = None,
payload_path: str | None = None,
command: list[str] | str | None = None,
environment: Mapping[str, str] | None = None,
memory: int | str | None = None,
vcpus: int | str | None = None,
gpu: int | str | None = None,
mount_points: list[MountPointTypeDef]
| str
| None = None,
volumes: list[VolumeTypeDef] | str | None = None,
mount_point_configs: list[MountPointConfiguration]
| None = None,
platform_capabilities: list[Literal["EC2", "FARGATE"]]
| str
| None = None,
job_role_arn: str | None = None,
)
Bases: BatchInvokedBaseFragment, AWSBatchMixins
Invoke a command on an image via AWS Batch with a payload from S3.
This fragment creates a state machine fragment that:
- Puts a payload to S3
- Submits a Batch job
- Gets the response from S3
The payload is written to s3://
The Batch job is given the following environment variables:
- AWS_LAMBDA_FUNCTION_NAME: name of the Lambda function
- AWS_LAMBDA_FUNCTION_HANDLER: handler of the Lambda function
- AWS_LAMBDA_EVENT_PAYLOAD: the S3 location of the event payload
- AWS_LAMBDA_EVENT_RESPONSE_LOCATION: the S3 location to write the response to
IMPORTANT:
- The Batch job queue / compute environment must have permission to read from and write to the bucket.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scope
|
Construct
|
construct scope |
required |
id
|
str
|
id |
required |
env_base
|
EnvBase
|
env base |
required |
name
|
str
|
Name of the lambda function. This can be a reference path (e.g. "$.name") |
required |
image
|
str
|
Image URI or name. This can be a reference path (e.g. "$.image") |
required |
handler
|
str
|
Handler of the lambda function, given as a fully qualified path to the function handler. This can be a reference path (e.g. "$.handler") |
required |
job_queue
|
str
|
Job queue to submit job to. This can be a reference path (e.g. "$.job_queue") |
required |
bucket_name
|
str
|
S3 Bucket name to write payload to and read response from. This can be a reference path (e.g. "$.bucket_name") |
required |
key_prefix
|
str | None
|
Key prefix to write payload to and read response from. If not provided, |
None
|
payload_path
|
str | None
|
Optionally specify the reference path of the event payload. Defaults to "$". |
None
|
command
|
List[str] | str | None
|
Command to run in container. Can be a reference path (e.g. "$.command"). If unspecified, the container's CMD is used. |
None
|
environment
|
Mapping[str, str] | None
|
Additional environment variables to specify. These are added to default environment variables. |
None
|
memory
|
int | str | None
|
Memory in MiB (either int or reference path str). Defaults to None. |
None
|
vcpus
|
int | str | None
|
Number of vCPUs (either int or reference path str). Defaults to None. |
None
|
gpu
|
int | str | None
|
Number of GPUs (either int or reference path str). Defaults to None. |
None
|
mount_points
|
List[MountPointTypeDef] | str | None
|
List of mount points to add to state machine. Defaults to None. |
None
|
volumes
|
List[VolumeTypeDef] | str | None
|
List of volumes to add to state machine. Defaults to None. |
None
|
platform_capabilities
|
List[Literal['EC2', 'FARGATE']] | str | None
|
platform capabilities to use. This can be a reference path (e.g. "$.platform_capabilities") |
None
|
job_role_arn
|
str | None
|
Job role arn to use for the job. This can be a reference path (e.g. "$.job_role_arn") |
None
|
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/batch.py
BatchInvokedExecutorFragment
¶BatchInvokedExecutorFragment(
scope: Construct,
id: str,
env_base: EnvBase,
name: str,
image: str,
executor: str,
job_queue: str,
bucket_name: str,
key_prefix: str | None = None,
payload_path: str | None = None,
environment: Mapping[str, str] | str | None = None,
memory: int | str | None = None,
vcpus: int | str | None = None,
mount_point_configs: list[MountPointConfiguration]
| None = None,
mount_points: list[MountPointTypeDef] | None = None,
volumes: list[VolumeTypeDef] | None = None,
platform_capabilities: list[Literal["EC2", "FARGATE"]]
| str
| None = None,
job_role_arn: str | None = None,
)
Bases: BatchInvokedBaseFragment, AWSBatchMixins
Invoke an executor in an image via AWS Batch with a payload from S3.
This targets any subclass of aibs_informatics_core.executors.base.ExecutorBase
- https://github.com/AllenInstitute/aibs-informatics-core/blob/main/src/aibs_informatics_core/executors/base.py
This fragment creates a state machine fragment that
- Puts a payload to s3
- Submits a batch job
- Gets the response from s3
The payload is written to s3://
IMPORTANT
- Batch job queue / compute environment must have permissions to read/write to the bucket.
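The `executor` argument is a fully qualified class name. How the container turns that string into a class is not documented here; a common approach, sketched below as an assumption using only `importlib`, splits off the last dotted component and imports the module. `resolve_qualified_name` is hypothetical and handles module-level classes only.

```python
import importlib


def resolve_qualified_name(qualified_name: str) -> type:
    """Import and return the class named by a dotted path like 'pkg.module.Class'."""
    module_name, _, attr_name = qualified_name.rpartition(".")
    if not module_name:
        raise ValueError(f"not a fully qualified name: {qualified_name!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# A stdlib class stands in for an ExecutorBase subclass in this sketch.
cls = resolve_qualified_name("collections.OrderedDict")
print(cls.__name__)  # OrderedDict
```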
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scope
|
Construct
|
construct scope |
required |
id
|
str
|
id |
required |
env_base
|
EnvBase
|
env base |
required |
name
|
str
|
Name of the lambda function. This can be a reference path (e.g. "$.name") |
required |
image
|
str
|
Image URI or name. This can be a reference path (e.g. "$.image") |
required |
executor
|
str
|
Fully qualified name of the executor class. This can be a reference path (e.g. "$.executor") |
required |
job_queue
|
str
|
Job queue to submit job to. This can be a reference path (e.g. "$.job_queue") |
required |
bucket_name
|
str
|
S3 Bucket name to write payload to and read response from. This can be a reference path (e.g. "$.bucket_name") |
required |
key_prefix
|
str | None
|
Key prefix to write payload to and read response from. If not provided, |
None
|
payload_path
|
str | None
|
Optionally specify the reference path of the event payload. Defaults to "$". |
None
|
command
|
List[str] | str | None
|
Command to run in container. Can be a reference path (e.g. "$.command"). If unspecified, the container's CMD is used. |
required |
environment
|
Mapping[str, str] | str | None
|
environment variables to specify. This can be a reference path (e.g. "$.environment") |
None
|
memory
|
int | str | None
|
Memory in MiB (either int or reference path str). Defaults to None. |
None
|
vcpus
|
int | str | None
|
Number of vCPUs (either int or reference path str). Defaults to None. |
None
|
mount_points
|
List[MountPointTypeDef] | None
|
List of mount points to add to state machine. Defaults to None. |
None
|
volumes
|
List[VolumeTypeDef] | None
|
List of volumes to add to state machine. Defaults to None. |
None
|
platform_capabilities
|
List[Literal['EC2', 'FARGATE']] | str | None
|
platform capabilities to use. This can be a reference path (e.g. "$.platform_capabilities") |
None
|
job_role_arn
|
str | None
|
Job role arn to use for the job. This can be a reference path (e.g. "$.job_role_arn") |
None
|
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/batch.py
data_sync
¶
DataSyncFragment
¶DataSyncFragment(
scope: Construct,
id: str,
env_base: EnvBase,
aibs_informatics_docker_asset: DockerImageAsset | str,
batch_job_queue: JobQueue | str,
scaffolding_bucket: Bucket,
batch_job_role: Role | str | None = None,
mount_point_configs: Iterable[MountPointConfiguration]
| None = None,
)
Bases: BatchInvokedBaseFragment, EnvBaseConstructMixins
Sync data from one S3 bucket to another.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scope
|
Construct
|
construct scope |
required |
id
|
str
|
id |
required |
env_base
|
EnvBase
|
env base |
required |
aibs_informatics_docker_asset
|
DockerImageAsset | str
|
Docker image asset or image uri str for the aibs informatics aws lambda |
required |
batch_job_queue
|
JobQueue | str
|
Default batch job queue or job queue name str that the batch job will be submitted to. This can be overridden by the payload. |
required |
scaffolding_bucket
|
Bucket
|
Primary bucket used for request/response json blobs used in the batch invoked lambda function. |
required |
batch_job_role
|
Optional[IRole | str]
|
Optional role to use for the batch job. If not provided, the default role created by the batch compute construct will be used. |
None
|
mount_point_configs
|
Optional[Iterable[MountPointConfiguration]]
|
List of mount point configurations to use. These can be overridden in the payload. |
None
|
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/data_sync.py
DistributedDataSyncFragment
¶DistributedDataSyncFragment(
scope: Construct,
id: str,
env_base: EnvBase,
aibs_informatics_docker_asset: DockerImageAsset | str,
batch_job_queue: JobQueue | str,
scaffolding_bucket: Bucket,
batch_job_role: str | Role | None = None,
mount_point_configs: Iterable[MountPointConfiguration]
| None = None,
)
Bases: BatchInvokedBaseFragment
Sync data from one S3 bucket to another using distributed batch jobs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scope
|
Construct
|
construct scope |
required |
id
|
str
|
id |
required |
env_base
|
EnvBase
|
env base |
required |
aibs_informatics_docker_asset
|
DockerImageAsset | str
|
Docker image asset or image uri str for the aibs informatics aws lambda |
required |
batch_job_queue
|
JobQueue | str
|
Default batch job queue or job queue name str that the batch job will be submitted to. This can be overridden by the payload. |
required |
scaffolding_bucket
|
Bucket
|
Primary bucket used for request/response json blobs used in the batch invoked lambda function. |
required |
batch_job_role
|
Optional[IRole | str]
|
Optional role to use for the batch job. If not provided, the default role created by the batch compute construct will be used. |
None
|
mount_point_configs
|
Optional[Iterable[MountPointConfiguration]]
|
List of mount point configurations to use. These can be overridden in the payload. |
None
|
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/data_sync.py
demand_execution
¶
DemandExecutionFragment
¶DemandExecutionFragment(
scope: Construct,
id: str,
env_base: EnvBase,
aibs_informatics_docker_asset: DockerImageAsset | str,
scaffolding_bucket: Bucket,
scaffolding_job_queue: JobQueue | str,
batch_invoked_lambda_state_machine: StateMachine,
data_sync_state_machine: StateMachine,
shared_mount_point_config: MountPointConfiguration
| None,
scratch_mount_point_config: MountPointConfiguration
| None,
tmp_mount_point_config: MountPointConfiguration
| None = None,
context_manager_configuration: dict[str, Any]
| None = None,
tags: dict[str, str] | None = None,
)
Bases: EnvBaseStateMachineFragment, EnvBaseConstructMixins
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/demand_execution.py
demand_execution_normalize_tags_chain
¶Merge build and runtime tags.
Chain assumptions/expectations:
- input is the demand execution request
- output is the demand execution request with merged tags
If runtime tags are not provided, only build‑time tags are used.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
tags
|
Optional[Dict[str, str]]
|
build‑time tags |
required |
Returns: sfn.IChainable: the state machine fragment
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/demand_execution.py
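The tag normalization above can be pictured as a dictionary merge. This sketch runs locally rather than as Step Functions states, and its precedence rule (runtime values win over build-time values on key collisions) is an assumption, since the docstring does not say which side wins. `merge_tags` is a hypothetical name.

```python
from __future__ import annotations


def merge_tags(
    build_tags: dict[str, str] | None,
    runtime_tags: dict[str, str] | None,
) -> dict[str, str]:
    """Merge build-time and runtime tags; runtime values win on collisions (assumed)."""
    merged = dict(build_tags or {})
    merged.update(runtime_tags or {})
    return merged


print(merge_tags({"team": "informatics", "env": "dev"}, {"env": "prod"}))
# {'team': 'informatics', 'env': 'prod'}
print(merge_tags({"team": "informatics"}, None))
# {'team': 'informatics'}
```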
efs
¶
CleanFileSystemFragment
¶CleanFileSystemFragment(
scope: Construct,
id: str,
env_base: EnvBase,
aibs_informatics_docker_asset: DockerImageAsset | str,
batch_job_queue: JobQueue | str,
scaffolding_bucket: Bucket,
mount_point_configs: Iterable[MountPointConfiguration]
| None = None,
memory: int = 1024,
vcpus: int = 1,
)
Bases: BatchInvokedBaseFragment
Clean up the file system by scanning for outdated data paths and removing them
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scope
|
Construct
|
construct scope |
required |
id
|
str
|
id |
required |
env_base
|
EnvBase
|
env base |
required |
aibs_informatics_docker_asset
|
DockerImageAsset | str
|
Docker image asset or image uri str for the aibs informatics aws lambda |
required |
batch_job_queue
|
JobQueue | str
|
Default batch job queue or job queue name str that the batch job will be submitted to. This can be overridden by the payload. |
required |
scaffolding_bucket
|
Bucket
|
Primary bucket used for request/response json blobs used in the batch invoked lambda function. |
required |
mount_point_configs
|
Optional[Iterable[MountPointConfiguration]]
|
List of mount point configurations to use. These can be overridden in the payload. |
None
|
memory
|
int
|
Memory needed. Defaults to 1024. This value is used for both the outdated path scanner and the removal of data paths. |
1024
|
vcpus
|
int
|
vCPUs needed. Defaults to 1. This vCPU value is used for both the outdated path scanner and the removal of data paths. |
1
|
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/efs.py
get_data_path_stats_fragment
¶get_data_path_stats_fragment(
scope: Construct,
id: str,
env_base: EnvBase,
aibs_informatics_docker_asset: DockerImageAsset | str,
batch_job_queue: JobQueue | str,
scaffolding_bucket: Bucket,
mount_point_configs: Iterable[MountPointConfiguration]
| None = None,
memory: int = 1024,
vcpus: int = 1,
) -> BatchInvokedLambdaFunction
Returns a BatchInvokedLambdaFunction fragment for getting data path stats of an EFS/S3 path
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scope
|
Construct
|
scope |
required |
id
|
str
|
id of the fragment |
required |
env_base
|
EnvBase
|
env base |
required |
aibs_informatics_docker_asset
|
Union[DockerImageAsset, str]
|
docker image asset or image uri that has the get_data_path_stats_handler function |
required |
batch_job_queue
|
Union[JobQueue, str]
|
default batch job queue or job queue name str that the batch job will be submitted to. This can be overridden by the payload. |
required |
scaffolding_bucket
|
Bucket
|
primary bucket used for request/response json blobs used in the batch invoked lambda function. |
required |
mount_point_configs
|
Optional[Iterable[MountPointConfiguration]]
|
Default EFS volumes to mount. Defaults to None. |
None
|
memory
|
int
|
memory needed. Defaults to 1024. |
1024
|
vcpus
|
int
|
vcpus needed. Defaults to 1. |
1
|
Returns:
| Type | Description |
|---|---|
BatchInvokedLambdaFunction
|
BatchInvokedLambdaFunction fragment for getting data path stats |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/efs.py
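To make "data path stats" concrete, the sketch below computes a total size and file count for a local directory with `os.walk`. It is a hypothetical local stand-in: the real handler's output fields, and its S3 support, are not described here, so `PathStats` and `local_path_stats` are illustrative names only.

```python
import os
import tempfile
from dataclasses import dataclass


@dataclass
class PathStats:
    size_bytes: int
    file_count: int


def local_path_stats(root: str) -> PathStats:
    """Sum file sizes and count files under a local directory (hypothetical stand-in)."""
    size, count = 0, 0
    for dirpath, _dirnames, filenames in os.walk(root):
        for filename in filenames:
            size += os.path.getsize(os.path.join(dirpath, filename))
            count += 1
    return PathStats(size_bytes=size, file_count=count)


with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "sub"))
    with open(os.path.join(tmp, "a.txt"), "w") as f:
        f.write("abc")  # 3 bytes
    with open(os.path.join(tmp, "sub", "b.txt"), "w") as f:
        f.write("hello")  # 5 bytes
    stats = local_path_stats(tmp)
print(stats)  # PathStats(size_bytes=8, file_count=2)
```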
outdated_data_path_scanner_fragment
¶outdated_data_path_scanner_fragment(
scope: Construct,
id: str,
env_base: EnvBase,
aibs_informatics_docker_asset: DockerImageAsset | str,
batch_job_queue: JobQueue | str,
scaffolding_bucket: Bucket,
mount_point_configs: Iterable[MountPointConfiguration]
| None = None,
memory: int = 1024,
vcpus: int = 1,
) -> BatchInvokedLambdaFunction
Returns a BatchInvokedLambdaFunction fragment for scanning outdated data paths under an EFS/S3 root path
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scope
|
Construct
|
scope |
required |
id
|
str
|
id of the fragment |
required |
env_base
|
EnvBase
|
env base |
required |
aibs_informatics_docker_asset
|
Union[DockerImageAsset, str]
|
docker image asset or image uri that has the outdated_data_path_scanner_handler function |
required |
batch_job_queue
|
Union[JobQueue, str]
|
default batch job queue or job queue name str that the batch job will be submitted to. This can be overridden by the payload. |
required |
scaffolding_bucket
|
Bucket
|
primary bucket used for request/response json blobs used in the batch invoked lambda function. |
required |
mount_point_configs
|
Optional[Iterable[MountPointConfiguration]]
|
Default EFS volumes to mount. Defaults to None. |
None
|
memory
|
int
|
memory needed. Defaults to 1024. |
1024
|
vcpus
|
int
|
vcpus needed. Defaults to 1. |
1
|
Returns:
| Type | Description |
|---|---|
BatchInvokedLambdaFunction
|
BatchInvokedLambdaFunction fragment for scanning outdated data paths |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/efs.py
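What counts as "outdated" is decided by the scanner handler and is not specified on this page. The sketch below assumes a simple age rule (direct children of a root whose modification time is older than a cutoff) on a local directory; `find_outdated_paths` and the seven-day threshold are hypothetical.

```python
from __future__ import annotations

import os
import tempfile
import time


def find_outdated_paths(root: str, max_age_seconds: float, now: float | None = None) -> list[str]:
    """List direct children of root whose mtime is older than max_age_seconds (assumed rule)."""
    now = time.time() if now is None else now
    outdated = []
    with os.scandir(root) as entries:
        for entry in entries:
            if now - entry.stat().st_mtime > max_age_seconds:
                outdated.append(entry.path)
    return sorted(outdated)


with tempfile.TemporaryDirectory() as tmp:
    old_path = os.path.join(tmp, "old.dat")
    new_path = os.path.join(tmp, "new.dat")
    for path in (old_path, new_path):
        open(path, "w").close()
    ten_days_ago = time.time() - 10 * 24 * 3600
    os.utime(old_path, (ten_days_ago, ten_days_ago))  # backdate one file
    found = find_outdated_paths(tmp, max_age_seconds=7 * 24 * 3600)
    result = [os.path.basename(p) for p in found]
print(result)  # ['old.dat']
```

A removal step, like the one `remove_data_paths_fragment` wraps, would then consume such a list.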
remove_data_paths_fragment
¶remove_data_paths_fragment(
scope: Construct,
id: str,
env_base: EnvBase,
aibs_informatics_docker_asset: DockerImageAsset | str,
batch_job_queue: JobQueue | str,
scaffolding_bucket: Bucket,
mount_point_configs: Iterable[MountPointConfiguration]
| None = None,
memory: int = 1024,
vcpus: int = 1,
) -> BatchInvokedLambdaFunction
Returns a BatchInvokedLambdaFunction fragment for removing data paths (EFS / S3) during execution of a Step Function
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scope
|
Construct
|
scope |
required |
id
|
str
|
id of the fragment |
required |
env_base
|
EnvBase
|
env base |
required |
aibs_informatics_docker_asset
|
Union[DockerImageAsset, str]
|
docker image asset or image uri that has the remove_data_paths_handler function |
required |
batch_job_queue
|
Union[JobQueue, str]
|
default batch job queue or job queue name str that the batch job will be submitted to. This can be overridden by the payload. |
required |
scaffolding_bucket
|
Bucket
|
primary bucket used for request/response json blobs used in the batch invoked lambda function. |
required |
mount_point_configs
|
Optional[Iterable[MountPointConfiguration]]
|
Default EFS volumes to mount. Defaults to None. |
None
|
memory
|
int
|
memory needed. Defaults to 1024. |
1024
|
vcpus
|
int
|
vcpus needed. Defaults to 1. |
1
|
Returns:
| Type | Description |
|---|---|
BatchInvokedLambdaFunction
|
BatchInvokedLambdaFunction fragment for removing data paths |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/efs.py
lambda_
¶
Classes¶
LambdaFunctionFragment
¶
Bases: EnvBaseStateMachineFragment
Creates a single-task state machine fragment for a generic lambda function.
[Pass]              # Start state. Augments input.
  ||
[lambda fn]
(x) //    \ (o)
[Fail]      ||      # Catch state for failed lambda execution
            ||
         [Pass]     # End state. Reforms output of lambda as sfn output
Returns:
| Type | Description |
|---|---|
None
|
The state machine fragment |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/lambda_.py
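For orientation, the diagram corresponds roughly to the Amazon States Language shape below, written as a Python dict. This is an illustrative hand-written definition, not what the fragment actually synthesizes: state names, the function name, and the Pass settings are hypothetical. It uses the `arn:aws:states:::lambda:invoke` integration, whose result wraps the function output in `Payload`.

```python
import json

definition = {
    "StartAt": "Start",
    "States": {
        "Start": {"Type": "Pass", "Next": "InvokeLambda"},  # augments input
        "InvokeLambda": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {"FunctionName": "my-function", "Payload.$": "$"},
            "Catch": [{"ErrorEquals": ["States.ALL"], "Next": "Failed"}],
            "Next": "End",
        },
        "Failed": {"Type": "Fail"},  # catch state for a failed invocation
        "End": {"Type": "Pass", "InputPath": "$.Payload", "End": True},  # unwrap lambda output
    },
}


def reachable(defn: dict) -> set[str]:
    """Collect states reachable from StartAt via Next and Catch transitions."""
    seen, stack = set(), [defn["StartAt"]]
    while stack:
        name = stack.pop()
        if name in seen:
            continue
        seen.add(name)
        state = defn["States"][name]
        if "Next" in state:
            stack.append(state["Next"])
        stack.extend(catcher["Next"] for catcher in state.get("Catch", []))
    return seen


print(json.dumps(definition)[:30])
```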
States¶
Custom Step Function states:
states
¶
Modules¶
batch
¶
Classes¶
BatchOperation
¶
register_job_definition
classmethod
¶register_job_definition(
scope: Construct,
id: str,
command: list[str] | str | None,
image: str,
job_definition_name: str,
job_role_arn: str | None = None,
environment: Mapping[str, str] | str | None = None,
memory: int | str | None = None,
vcpus: int | str | None = None,
gpu: int | str | None = None,
mount_points: list[MountPointTypeDef]
| str
| None = None,
volumes: list[VolumeTypeDef] | str | None = None,
platform_capabilities: list[Literal["EC2", "FARGATE"]]
| str
| None = None,
result_path: str | None = "$",
output_path: str | None = "$",
) -> Chain
Creates a chain to register a new job definition.
The following parameters support reference paths:
- command
- image
- job_definition_name
- environment
- memory
- vcpus
- gpu
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scope
|
Construct
|
scope |
required |
id
|
str
|
ID prefix |
required |
command
|
Union[List[str], str]
|
List of strings or string representing the command to run. Supports reference paths (e.g. "$.foo.bar") |
required |
image
|
str
|
image URI or name. Supports reference paths (e.g. "$.foo.bar") |
required |
job_definition_name
|
str
|
name of job definition. Supports reference paths (e.g. "$.foo.bar") |
required |
job_role_arn
|
Optional[str]
|
Optional job role arn to use for the job. Supports reference paths (e.g. "$.foo.bar") |
None
|
environment
|
Optional[Union[Mapping[str, str], str]]
|
Optional environment variables. Supports reference paths both for individual values and for the entire list of variables. However, if a reference path is used for the entire list, it must resolve to a list of mappings with Name/Value keys. |
None
|
memory
|
Optional[Union[int, str]]
|
Optionally specify memory. Supports reference paths (e.g. "$.foo.bar") |
None
|
vcpus
|
Optional[Union[int, str]]
|
Optionally specify vCPUs. Defaults to None. |
None
|
gpu
|
Optional[Union[int, str]]
|
Optionally specify GPU. Defaults to None. |
None
|
mount_points
|
Optional[List[MountPointTypeDef]]
|
Optionally specify mount points. Defaults to None. |
None
|
volumes
|
Optional[List[VolumeTypeDef]]
|
Optionally specify volumes. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
Chain
|
sfn.Chain: chain to register job definition |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/states/batch.py
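When `environment` is given as a reference path for the whole list, the referenced value must already be in Name/Value form. A minimal sketch of converting a plain mapping into that form follows; `to_batch_environment` is a hypothetical helper, and sorting by name is only for stable output.

```python
from collections.abc import Mapping


def to_batch_environment(env: Mapping[str, str]) -> list[dict[str, str]]:
    """Convert {"KEY": "value"} into the Name/Value list form, sorted by name."""
    return [{"Name": name, "Value": value} for name, value in sorted(env.items())]


print(to_batch_environment({"B": "2", "A": "1"}))
# [{'Name': 'A', 'Value': '1'}, {'Name': 'B', 'Value': '2'}]
```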
common
¶
Classes¶
CommonOperation
¶
merge_defaults
classmethod
¶merge_defaults(
scope: Construct,
id: str,
defaults: dict[str, Any],
input_path: str = "$",
target_path: str = "$",
result_path: str | None = None,
order_of_preference: Literal[
"target", "default"
] = "target",
check_if_target_present: bool = False,
) -> Chain
Wrapper chain that merges input with defaults.
Notes
- reference paths in defaults should be relative to the input path
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scope
|
Construct
|
construct scope |
required |
id
|
str
|
identifier for the states created |
required |
defaults
|
dict[str, Any]
|
default values to merge with input. If any reference paths are present in the defaults, they should be relative to the input path. |
required |
input_path
|
str
|
Input path of object to merge. Defaults to "$". |
'$'
|
target_path
|
str
|
target path to merge with. This should be relative to the input_path parameter. Defaults to "$". |
'$'
|
result_path
|
Optional[str]
|
result path to store merged results. If not specified, it defaults to the target_path relative to input_path. If specified, it is considered an absolute path. |
None
|
order_of_preference
|
Literal['target', 'default']
|
If "target", the target path will be merged with the defaults. If "default", the defaults will be merged with the target path. Defaults to "target". |
'target'
|
check_if_target_present
|
bool
|
If true, check if the target path is present in the input. If not, the defaults will be used as the result. Otherwise, the defaults will be merged with the target path. This is useful for optional parameters that may or may not be present in the input. Defaults to False. |
False
|
Returns:
| Type | Description |
|---|---|
Chain
|
sfn.Chain: the new chain that merges defaults with input |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/states/common.py
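The effect of `order_of_preference` and `check_if_target_present` can be modelled on plain dicts. This local sketch makes two assumptions not stated above: the merge is recursive, and `"target"` means values at the target path win over the defaults. `deep_merge` and `merge_with_defaults` are hypothetical names, not the library's API.

```python
from __future__ import annotations

import copy
from typing import Any, Literal


def deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    """Recursively merge override into a copy of base; override wins on conflicts."""
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


def merge_with_defaults(
    target: dict[str, Any] | None,
    defaults: dict[str, Any],
    order_of_preference: Literal["target", "default"] = "target",
) -> dict[str, Any]:
    """Hypothetical local model of merge_defaults."""
    if target is None:  # like check_if_target_present=True with a missing target
        return copy.deepcopy(defaults)
    if order_of_preference == "target":
        return deep_merge(defaults, target)
    return deep_merge(target, defaults)


defaults = {"memory": 1024, "retry": {"attempts": 3, "backoff": 2}}
target = {"memory": 4096, "retry": {"attempts": 1}}
print(merge_with_defaults(target, defaults))
# {'memory': 4096, 'retry': {'attempts': 1, 'backoff': 2}}
print(merge_with_defaults(target, defaults, "default"))
# {'memory': 1024, 'retry': {'attempts': 3, 'backoff': 2}}
```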
enclose_chainable
classmethod
¶enclose_chainable(
scope: Construct,
id: str,
definition: IChainable,
input_path: str | None = None,
result_path: str | None = None,
) -> Chain
Enclose the current state machine fragment within a parallel state.
Notes
- If input_path is not provided, it will default to "$"
- If result_path is not provided, it will default to input_path
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
id
|
str
|
an identifier for the parallel state |
required |
input_path
|
Optional[str]
|
input path for the enclosed state. Defaults to "$". |
None
|
result_path
|
Optional[str]
|
result path to put output of enclosed state. Defaults to same as input_path. |
None
|
Returns:
| Type | Description |
|---|---|
Chain
|
sfn.Chain: the new state machine fragment |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/states/common.py
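The path defaults in the Notes can be simulated without any CDK constructs: take the value at `input_path`, run the enclosed logic on it, and write the output back at `result_path`. In the sketch, a Python callable stands in for the enclosed chain, and only simple dotted paths such as `$.a.b` are supported; `enclose`, `get_path`, and `_split` are hypothetical names.

```python
from __future__ import annotations

import copy
from typing import Any, Callable


def _split(path: str) -> list[str]:
    """Turn a simple JSONPath like '$.a.b' into ['a', 'b'] ('$' becomes [])."""
    if path == "$":
        return []
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    return path[2:].split(".")


def get_path(data: Any, path: str) -> Any:
    for key in _split(path):
        data = data[key]
    return data


def enclose(
    data: dict[str, Any],
    inner: Callable[[Any], Any],
    input_path: str | None = None,
    result_path: str | None = None,
) -> Any:
    """Run inner on data at input_path and store its output at result_path."""
    input_path = input_path or "$"
    result_path = result_path or input_path  # documented default
    output = inner(copy.deepcopy(get_path(data, input_path)))
    keys = _split(result_path)
    if not keys:
        return output  # result_path "$" replaces the whole document
    result = copy.deepcopy(data)
    node = result
    for key in keys[:-1]:
        node = node.setdefault(key, {})
    node[keys[-1]] = output
    return result


doc = {"job": {"n": 2}, "other": "kept"}
print(enclose(doc, lambda job: {"n": job["n"] * 10}, input_path="$.job"))
# {'job': {'n': 20}, 'other': 'kept'}
print(enclose(doc, lambda job: job["n"] + 1, input_path="$.job", result_path="$.count"))
# {'job': {'n': 2}, 'other': 'kept', 'count': 3}
```

The point of enclosing is isolation: the inner logic only sees its slice of the state, and the rest of the document passes through untouched.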
s3
¶
Classes¶
S3Operation
¶
put_object
classmethod
¶put_object(
scope: Construct,
id: str,
bucket_name: str,
key: str,
body: Any,
result_path: str | None = "$",
output_path: str | None = "$",
) -> Chain
Create a chain to put a body of text to S3
This chain consists of two states
- Pass state (resolving references and restructuring inputs)
- API call to put object.
The context at the end state of the chain should contain the following fields:
- Bucket
- Key
All parameters can be either a reference (e.g. $.path.to.my.value) or an explicit value.
Examples:
Context: {}
Definition: S3Operation.put_object(..., bucket_name="bucket", key="key", body="body")
Result: text "body" is put at s3://bucket/key
Context: {"bucket": "woah", "key": "wait/what", "nested": {"a": "b"}}
Definition: S3Operation.put_object(..., bucket_name="$.bucket", key="$.key", body="$.nested")
Result: text '{"a": "b"}' is put at s3://woah/wait/what
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scope
|
Construct
|
scope construct |
required |
id
|
str
|
An ID prefix |
required |
bucket_name
|
str
|
explicit or reference to name of bucket |
required |
key
|
str
|
explicit or reference to name of key |
required |
body
|
Any
|
explicit or reference to body to upload |
required |
Returns:
| Type | Description |
|---|---|
Chain
|
sfn.Chain |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/states/s3.py
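The "explicit value or reference" rule and the two examples can be reproduced with an in-memory model, where a dict keyed by `(bucket, key)` stands in for S3. This is a sketch under assumptions: `resolve` handles only `$` and dotted `$.a.b` paths, non-string bodies are serialized with `json.dumps` (consistent with the second example), and both function names are hypothetical.

```python
from __future__ import annotations

import json
from typing import Any


def resolve(value: Any, context: dict[str, Any]) -> Any:
    """Return the referenced context value for '$'-prefixed strings, else the value itself."""
    if not (isinstance(value, str) and value.startswith("$")):
        return value
    if value == "$":
        return context
    node: Any = context
    for key in value.removeprefix("$.").split("."):
        node = node[key]
    return node


def put_object(store: dict[tuple[str, str], str], context: dict[str, Any],
               bucket_name: Any, key: Any, body: Any) -> dict[str, str]:
    """Resolve inputs, 'upload' the text, and return the Bucket/Key fields."""
    bucket = resolve(bucket_name, context)
    object_key = resolve(key, context)
    resolved_body = resolve(body, context)
    text = resolved_body if isinstance(resolved_body, str) else json.dumps(resolved_body)
    store[(bucket, object_key)] = text
    return {"Bucket": bucket, "Key": object_key}


fake_s3: dict[tuple[str, str], str] = {}
context = {"bucket": "woah", "key": "wait/what", "nested": {"a": "b"}}
print(put_object(fake_s3, context, bucket_name="$.bucket", key="$.key", body="$.nested"))
# {'Bucket': 'woah', 'Key': 'wait/what'}
print(fake_s3[("woah", "wait/what")])  # {"a": "b"}
```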
get_object
classmethod
¶get_object(
scope: Construct,
id: str,
bucket_name: str,
key: str,
result_path: str | None = "$",
output_path: str | None = "$",
) -> Chain
Creates a chain to get a body of text from S3
This chain consists of two states
- Pass state (resolving references and restructuring inputs)
- API call to get object.
The context at the end state of the chain should contain the following fields:
- Body
- Bucket
- Key
All parameters can be either a reference (e.g. $.path.to.my.value) or an explicit value.
Examples:
Context: {}
Definition: S3Operation.get_object(..., bucket_name="bucket", key="key")
Result: text "body" is fetched from s3://bucket/key
Context: {"bucket": "woah", "key": "wait/what"}
Definition: S3Operation.get_object(..., bucket_name="$.bucket", key="$.key")
Result: text '{"a": "b"}' is fetched from s3://woah/wait/what, giving {"Body": '{"a": "b"}'}
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scope
|
Construct
|
scope construct |
required |
id
|
str
|
An ID prefix |
required |
bucket_name
|
str
|
explicit or reference to name of bucket |
required |
key
|
str
|
explicit or reference to name of key |
required |
Returns:
| Type | Description |
|---|---|
Chain
|
sfn.Chain |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/states/s3.py
put_payload
classmethod
¶put_payload(
scope: Construct,
id: str,
payload: str,
bucket_name: str,
key: str | None = None,
result_path: str | None = "$",
output_path: str | None = "$",
) -> Chain
Puts a payload to S3 and returns the location of the payload in S3.
All parameters can be either a reference (e.g. $.path.to.my.value) or an explicit value.
Examples:
Context: {"a": "b"}
Definition: S3Operation.put_payload(..., bucket_name="bucket", payload="$")
Result: text '{"a": "b"}' is written to s3://bucket/1234.../...1234
Context: {"bucket": "woah", "key": "wait/what", "data": {"a": "b"}}
Definition: S3Operation.put_payload(..., bucket_name="$.bucket", key="$.key", payload="$.data")
Result: text '{"a": "b"}' is written to s3://woah/wait/what
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scope
|
Construct
|
cdk construct |
required |
id
|
str
|
id |
required |
payload
|
str
|
explicit value or reference path in the context object (e.g. "$", "$.path") |
required |
bucket_name
|
str
|
explicit value or reference path for bucket name |
required |
key
|
Optional[str]
|
explicit value or reference path for key. If not provided, the following Key is generated: {$$.Execution.Name}/{UUID} |
None
|
Returns:
| Type | Description |
|---|---|
Chain
|
sfn.Chain |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/states/s3.py
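When `key` is omitted, the documented default key is `{$$.Execution.Name}/{UUID}`. The sketch below reproduces that shape in plain Python with `uuid.uuid4()`; inside a state machine this would likely be built with intrinsic functions such as `States.Format` and `States.UUID`, but that is an assumption about the implementation. `default_payload_key` is hypothetical.

```python
import uuid


def default_payload_key(execution_name: str) -> str:
    """Mirror the documented default key shape: {$$.Execution.Name}/{UUID}."""
    return f"{execution_name}/{uuid.uuid4()}"


key = default_payload_key("exec-2024-01-01")
print(key)  # e.g. exec-2024-01-01/<random UUID>; differs on every call
```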
get_payload
classmethod
¶get_payload(
scope: Construct,
id: str,
bucket_name: str,
key: str,
result_path: str | None = "$",
) -> Chain
Gets a payload from S3.
This chain fetches the object and then passes its body through a JSON parser.
The resulting payload is stored at the path specified by result_path.
Examples:
Use case #1: no result path
Context: {"bucket": "woah", "key": "wait/what"}
Definition: S3Operation.get_payload(..., bucket_name="$.bucket", key="$.key")
Result: text '{"a": "b"}' is fetched from s3://woah/wait/what
Use case #2: result path specified
Context: {"bucket": "woah", "key": "wait/what"}
Definition: S3Operation.get_payload(..., bucket_name="$.bucket", key="$.key", result_path="$.payload")
Result: text '{"a": "b"}' is fetched from s3://woah/wait/what, giving {"bucket": "woah", "key": "wait/what", "payload": {"a": "b"}}
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scope
|
Construct
|
cdk construct |
required |
id
|
str
|
id |
required |
bucket_name
|
str
|
bucket name. Can be a reference path |
required |
key
|
str
|
key name. Can be a reference path |
required |
result_path
|
Optional[str]
|
path to store the payload. Defaults to "$". |
'$'
|
Returns:
| Type | Description |
|---|---|
Chain
|
sfn.Chain: chain |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/states/s3.py
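The parse-then-place behaviour of the two use cases can be checked with a small local model. Here the S3 fetch is replaced by a body string passed in directly; `apply_get_payload` is a hypothetical name, and only `$` or dotted `$.a.b` result paths are handled.

```python
from __future__ import annotations

import copy
import json
from typing import Any


def apply_get_payload(context: dict[str, Any], body: str, result_path: str = "$") -> Any:
    """Parse the fetched body as JSON and place it at result_path."""
    payload = json.loads(body)
    if result_path == "$":
        return payload  # the payload replaces the whole state
    keys = result_path.removeprefix("$.").split(".")
    merged = copy.deepcopy(context)
    node = merged
    for key in keys[:-1]:
        node = node.setdefault(key, {})
    node[keys[-1]] = payload
    return merged


context = {"bucket": "woah", "key": "wait/what"}
body = '{"a": "b"}'  # stands in for the object fetched from s3://woah/wait/what
print(apply_get_payload(context, body))
# {'a': 'b'}
print(apply_get_payload(context, body, result_path="$.payload"))
# {'bucket': 'woah', 'key': 'wait/what', 'payload': {'a': 'b'}}
```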
Utilities¶
Step Function utility functions:
utils
¶
Classes¶
JsonReferencePath
¶
Bases: str
A str extension with properties that provide functionality for defining JsonPath reference expressions. More details: https://github.com/json-path/JsonPath
Primarily supports "$" reference.
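The value of subclassing `str` is that a path object can be passed anywhere a plain string is expected while still carrying helpers. The properties of the real `JsonReferencePath` are not listed on this page, so the class below is a hypothetical illustration of the pattern: `RefPath`, `is_reference`, `as_reference`, and `extend` are invented names.

```python
class RefPath(str):
    """Hypothetical str subclass illustrating the JsonReferencePath idea."""

    @property
    def is_reference(self) -> bool:
        """True when the path is rooted at '$'."""
        return self.startswith("$")

    @property
    def as_reference(self) -> "RefPath":
        """Return a '$'-rooted version of the path ('' becomes '$')."""
        if self.is_reference:
            return self
        return RefPath(f"$.{self}" if self else "$")

    def extend(self, *keys: str) -> "RefPath":
        """Append child segments, e.g. RefPath('$.a').extend('b') gives '$.a.b'."""
        return RefPath(".".join([self, *keys]))


path = RefPath("job.inputs").as_reference
print(path, path.is_reference)  # $.job.inputs True
print(path.extend("bucket"))  # $.job.inputs.bucket
```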
Functions¶
convert_to_sfn_api_action_case
¶
Converts a dictionary of parameters to the format expected by Step Functions for AWS SDK service integrations.
Even if a native API specifies a parameter in camelCase, the Step Functions SDK integration expects it in PascalCase.
https://docs.aws.amazon.com/step-functions/latest/dg/supported-services-awssdk.html#use-awssdk-integ
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
parameters
|
Dict[str, Any]
|
parameters for SDK action |
required |
Returns:
| Type | Description |
|---|---|
T
|
Dict[str, Any]: parameters for SDK action in pascal case |
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/utils.py
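The camelCase to PascalCase conversion can be sketched as upper-casing the first character of each key. Whether the real function recurses into nested dicts and lists is not stated here; the sketch assumes it does. `to_pascal_case_keys` is a hypothetical name, and reference-path keys such as `payload.$` keep their `.$` suffix.

```python
from typing import Any


def to_pascal_case_keys(value: Any) -> Any:
    """Recursively upper-case the first letter of every string dict key."""
    if isinstance(value, dict):
        return {
            (key[:1].upper() + key[1:] if isinstance(key, str) else key): to_pascal_case_keys(item)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [to_pascal_case_keys(item) for item in value]
    return value  # leaf values (including strings in lists) are left untouched


params = {"jobDefinitionName": "demo", "containerProperties": {"image": "busybox", "command": ["echo", "hi"]}}
print(to_pascal_case_keys(params))
# {'JobDefinitionName': 'demo', 'ContainerProperties': {'Image': 'busybox', 'Command': ['echo', 'hi']}}
```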
enclosed_chain
¶
enclosed_chain(
scope: Construct,
id: str,
definition: IChainable,
input_path: str | None = None,
result_path: str | None = None,
) -> Chain
Enclose the current state machine fragment within a parallel state.
Notes
- If input_path is not provided, it will default to "$"
- If result_path is not provided, it will default to input_path
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
id
|
str
|
an identifier for the parallel state |
required |
input_path
|
Optional[str]
|
input path for the enclosed state. Defaults to "$". |
None
|
result_path
|
Optional[str]
|
result path to put output of enclosed state. Defaults to same as input_path. |
None
|
Returns:
| Type | Description |
|---|---|
Chain
|
sfn.Chain: the new state machine fragment |