Skip to content

Step Functions Constructs

State machines, fragments, and custom states for AWS Step Functions.

Submodules

Fragments

Reusable state machine fragments:

fragments

Classes

EnvBaseStateMachineFragment

EnvBaseStateMachineFragment(
    scope: Construct, id: str, env_base: EnvBase
)

Bases: StateMachineFragment, StateMachineMixins

Environment-aware state machine fragment.

Combines StateMachineFragment with environment base naming conventions and state machine mixins.

Initialize an environment-aware state machine fragment.

Parameters:

Name Type Description Default
scope Construct

The construct scope.

required
id str

The construct ID.

required
env_base EnvBase

Environment base for resource naming.

required
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
def __init__(
    self,
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
) -> None:
    """Initialize an environment-aware state machine fragment.

    Args:
        scope (constructs.Construct): The construct scope.
        id (str): The construct ID.
        env_base (EnvBase): Environment base for resource naming.
    """
    super().__init__(scope, id)
    # Stored for naming helpers (e.g. env_base.get_state_machine_name)
    # used elsewhere in this class.
    self.env_base = env_base
Attributes
required_managed_policies property
required_managed_policies: Sequence[IManagedPolicy | str]

Get required managed policies for this fragment.

Returns:

Type Description
Sequence[IManagedPolicy | str]

Sequence of required managed policies.

required_inline_policy_statements property
required_inline_policy_statements: Sequence[PolicyStatement]

Get required inline policy statements for this fragment.

Returns:

Type Description
Sequence[PolicyStatement]

Sequence of required policy statements.

Functions
to_single_state
to_single_state(
    *,
    prefix_states: str | None = None,
    state_id: str | None = None,
    comment: str | None = None,
    input_path: str | None = None,
    output_path: str | None = "$[0]",
    result_path: str | None = None,
    result_selector: Mapping[str, Any] | None = None,
    arguments: Mapping[str, Any] | None = None,
    parameters: Mapping[str, Any] | None = None,
    query_language: QueryLanguage | None = None,
    state_name: str | None = None,
    assign: Mapping[str, Any] | None = None,
    outputs: Any = None
) -> Parallel

Convert the fragment to a single parallel state.

Parameters:

Name Type Description Default
prefix_states Optional[str]

Prefix for state names.

None
state_id Optional[str]

ID for the parallel state.

None
comment Optional[str]

Comment for the state.

None
input_path Optional[str]

Input path.

None
output_path Optional[str]

Output path. Defaults to "$[0]".

'$[0]'
result_path Optional[str]

Result path.

None
result_selector Optional[Mapping[str, Any]]

Result selector.

None

Returns:

Type Description
Parallel

A parallel state containing the fragment.

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
def to_single_state(
    self,
    *,
    prefix_states: str | None = None,
    state_id: str | None = None,
    comment: str | None = None,
    input_path: str | None = None,
    output_path: str | None = "$[0]",
    result_path: str | None = None,
    result_selector: Mapping[str, Any] | None = None,
    arguments: Mapping[str, Any] | None = None,
    parameters: Mapping[str, Any] | None = None,
    query_language: sfn.QueryLanguage | None = None,
    state_name: str | None = None,
    assign: Mapping[str, Any] | None = None,
    outputs: Any = None,
) -> sfn.Parallel:
    """Convert the fragment to a single parallel state.

    All keyword arguments are forwarded unchanged to the parent class's
    ``to_single_state``.

    Args:
        prefix_states (Optional[str]): Prefix for state names.
        state_id (Optional[str]): ID for the parallel state.
        comment (Optional[str]): Comment for the state.
        input_path (Optional[str]): Input path.
        output_path (Optional[str]): Output path. Defaults to "$[0]",
            which unwraps the single-element array a Parallel state emits.
        result_path (Optional[str]): Result path.
        result_selector (Optional[Mapping[str, Any]]): Result selector.
        arguments (Optional[Mapping[str, Any]]): Forwarded to the parallel state.
        parameters (Optional[Mapping[str, Any]]): Forwarded to the parallel state.
        query_language (Optional[sfn.QueryLanguage]): Forwarded to the parallel state.
        state_name (Optional[str]): Forwarded to the parallel state.
        assign (Optional[Mapping[str, Any]]): Forwarded to the parallel state.
        outputs (Any): Forwarded to the parallel state.

    Returns:
        A parallel state containing the fragment.
    """
    return super().to_single_state(
        prefix_states=prefix_states,
        state_id=state_id,
        comment=comment,
        input_path=input_path,
        output_path=output_path,
        result_path=result_path,
        result_selector=result_selector,
        arguments=arguments,
        parameters=parameters,
        query_language=query_language,
        state_name=state_name,
        assign=assign,
        outputs=outputs,
    )
to_state_machine
to_state_machine(
    state_machine_name: str,
    role: Role | None = None,
    logs: LogOptions | None = None,
    timeout: Duration | None = None,
) -> StateMachine

Convert the fragment to a complete state machine.

Parameters:

Name Type Description Default
state_machine_name str

Name for the state machine.

required
role Optional[Role]

IAM role. Creates default if None.

None
logs Optional[LogOptions]

Log options.

None
timeout Optional[Duration]

Execution timeout.

None

Returns:

Type Description
StateMachine

The created state machine.

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
def to_state_machine(
    self,
    state_machine_name: str,
    role: iam.Role | None = None,
    logs: sfn.LogOptions | None = None,
    timeout: cdk.Duration | None = None,
) -> sfn.StateMachine:
    """Convert the fragment to a complete state machine.

    Args:
        state_machine_name (str): Name for the state machine.
        role (Optional[iam.Role]): IAM role. Creates default if None;
            otherwise the fragment's required policies are attached to it.
        logs (Optional[sfn.LogOptions]): Log options. Creates default if None.
        timeout (Optional[cdk.Duration]): Execution timeout.

    Returns:
        The created state machine.
    """
    if role is None:
        # Build a fresh role preloaded with everything this fragment requires.
        role = create_role(
            self,
            state_machine_name,
            self.env_base,
            managed_policies=self.required_managed_policies,
            inline_policies_from_statements={
                "default": self.required_inline_policy_statements,
            },
        )
    else:
        # Augment the caller-supplied role with this fragment's requirements.
        for policy in self.required_managed_policies:
            if isinstance(policy, str):
                policy = iam.ManagedPolicy.from_aws_managed_policy_name(policy)
            role.add_managed_policy(policy)

        for statement in self.required_inline_policy_statements:
            role.add_to_policy(statement)

    # `role` is guaranteed non-None by the branches above, so the former
    # `role if role is not None else create_role(...)` fallback was dead code.
    return sfn.StateMachine(
        self,
        self.get_construct_id(state_machine_name),
        state_machine_name=self.env_base.get_state_machine_name(state_machine_name),
        logs=logs or create_log_options(self, state_machine_name, self.env_base),
        role=role,  # type: ignore[arg-type]
        definition_body=sfn.DefinitionBody.from_chainable(self.definition),
        timeout=timeout,
    )

Modules

base

Step Functions state machine fragment constructs.

This module provides base classes and utilities for building Step Functions state machine fragments and workflows.

Classes
StateMachineMixins

Bases: EnvBaseConstructMixins

Mixin class providing state machine helper methods.

Provides methods for retrieving Lambda functions and state machines with caching support.

Functions
get_fn
get_fn(function_name: str) -> IFunction

Get a Lambda function by name.

Parameters:

Name Type Description Default
function_name str

The function name.

required

Returns:

Type Description
IFunction

The Lambda function interface.

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
def get_fn(self, function_name: str) -> lambda_.IFunction:
    """Return the Lambda function interface for ``function_name``.

    Lookups are memoized on the instance so repeated calls for the same
    name reuse the same imported construct.

    Args:
        function_name (str): The function name.

    Returns:
        The Lambda function interface.
    """
    cache: dict[str, lambda_.IFunction] | None = getattr(self, "_function_cache", None)
    if cache is None:
        # First lookup on this instance: create the cache lazily.
        cache = {}
        setattr(self, "_function_cache", cache)
    if function_name in cache:
        return cache[function_name]
    imported = lambda_.Function.from_function_arn(
        scope=self.as_construct(),
        id=self.env_base.get_construct_id(function_name, "from-arn"),
        function_arn=build_lambda_arn(
            resource_type="function",
            resource_id=self.env_base.get_function_name(function_name),
        ),
    )
    cache[function_name] = imported
    return imported
get_state_machine_from_name
get_state_machine_from_name(
    state_machine_name: str,
) -> IStateMachine

Get a state machine by name.

Parameters:

Name Type Description Default
state_machine_name str

The state machine name.

required

Returns:

Type Description
IStateMachine

The state machine interface.

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
def get_state_machine_from_name(self, state_machine_name: str) -> sfn.IStateMachine:
    """Return the state machine interface for ``state_machine_name``.

    Lookups are memoized on the instance so repeated calls for the same
    name reuse the same imported construct.

    Args:
        state_machine_name (str): The state machine name.

    Returns:
        The state machine interface.
    """
    cache: dict[str, sfn.IStateMachine] | None = getattr(
        self, "_state_machine_cache", None
    )
    if cache is None:
        # First lookup on this instance: create the cache lazily.
        cache = {}
        setattr(self, "_state_machine_cache", cache)
    if state_machine_name in cache:
        return cache[state_machine_name]
    imported = sfn.StateMachine.from_state_machine_name(
        scope=self.as_construct(),
        id=self.env_base.get_construct_id(state_machine_name, "from-name"),
        state_machine_name=self.env_base.get_state_machine_name(state_machine_name),
    )
    cache[state_machine_name] = imported
    return imported
StateMachineFragment

Bases: StateMachineFragment

Base class for state machine fragments.

Provides common functionality for building reusable state machine fragments with definition management.

Attributes
definition property writable
definition: IChainable

Get the state machine definition.

Returns:

Type Description
IChainable

The chainable definition.

start_state property
start_state: State

Get the start state.

Returns:

Type Description
State

The definition's start state.

end_states property
end_states: list[INextable]

Get the end states.

Returns:

Type Description
list[INextable]

List of nextable end states.

Functions
enclose
enclose(
    id: str | None = None,
    input_path: str | None = None,
    result_path: str | None = None,
) -> Chain

Enclose the fragment within a parallel state.

Parameters:

Name Type Description Default
id Optional[str]

Identifier for the parallel state. Defaults to the node ID.

None
input_path Optional[str]

Input path for the enclosed state. Defaults to "$".

None
result_path Optional[str]

Result path for output. Defaults to same as input_path.

None

Returns:

Type Description
Chain

The enclosed state machine fragment as a chain.

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
def enclose(
    self,
    id: str | None = None,
    input_path: str | None = None,
    result_path: str | None = None,
) -> sfn.Chain:
    """Enclose the fragment within a parallel state.

    Wraps the fragment's definition in a single Parallel state and then,
    unless results are discarded, restructures the Parallel's array output
    back into a plain object at ``result_path``.

    Args:
        id (Optional[str]): Identifier for the parallel state.
            Defaults to the node ID.
        input_path (Optional[str]): Input path for the enclosed state.
            Defaults to "$".
        result_path (Optional[str]): Result path for output.
            Defaults to same as input_path.

    Returns:
        The enclosed state machine fragment as a chain.
    """
    id = id or self.node.id

    if input_path is None:
        input_path = "$"
    if result_path is None:
        result_path = input_path

    # Chain and StateMachineFragment both expose to_single_state; anything
    # else (a bare IChainable) must first be wrapped into a Chain.
    chain = (
        sfn.Chain.start(self.definition)
        if not isinstance(self.definition, (sfn.Chain, sfn.StateMachineFragment))
        else self.definition
    )

    # Chain.to_single_state takes an explicit id; the fragment variant
    # derives its own id, so the two calls differ.
    if isinstance(chain, sfn.Chain):
        parallel = chain.to_single_state(
            id=f"{id} Enclosure", input_path=input_path, result_path=result_path
        )
    else:
        parallel = chain.to_single_state(input_path=input_path, result_path=result_path)
    definition = sfn.Chain.start(parallel)

    # A Parallel state emits a one-element array; unwrap "[0]" back onto
    # result_path unless the caller is discarding the result.
    if result_path and result_path != sfn.JsonPath.DISCARD:
        restructure = sfn.Pass(
            self,
            f"{id} Enclosure Post",
            input_path=f"{result_path}[0]",
            result_path=result_path,
        )
        definition = definition.next(restructure)

    return definition
EnvBaseStateMachineFragment
EnvBaseStateMachineFragment(
    scope: Construct, id: str, env_base: EnvBase
)

Bases: StateMachineFragment, StateMachineMixins

Environment-aware state machine fragment.

Combines StateMachineFragment with environment base naming conventions and state machine mixins.

Initialize an environment-aware state machine fragment.

Parameters:

Name Type Description Default
scope Construct

The construct scope.

required
id str

The construct ID.

required
env_base EnvBase

Environment base for resource naming.

required
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
def __init__(
    self,
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
) -> None:
    """Initialize an environment-aware state machine fragment.

    Args:
        scope (constructs.Construct): The construct scope.
        id (str): The construct ID.
        env_base (EnvBase): Environment base for resource naming.
    """
    super().__init__(scope, id)
    # Stored for naming helpers (e.g. env_base.get_state_machine_name)
    # used elsewhere in this class.
    self.env_base = env_base
Attributes
required_managed_policies property
required_managed_policies: Sequence[IManagedPolicy | str]

Get required managed policies for this fragment.

Returns:

Type Description
Sequence[IManagedPolicy | str]

Sequence of required managed policies.

required_inline_policy_statements property
required_inline_policy_statements: Sequence[PolicyStatement]

Get required inline policy statements for this fragment.

Returns:

Type Description
Sequence[PolicyStatement]

Sequence of required policy statements.

Functions
to_single_state
to_single_state(
    *,
    prefix_states: str | None = None,
    state_id: str | None = None,
    comment: str | None = None,
    input_path: str | None = None,
    output_path: str | None = "$[0]",
    result_path: str | None = None,
    result_selector: Mapping[str, Any] | None = None,
    arguments: Mapping[str, Any] | None = None,
    parameters: Mapping[str, Any] | None = None,
    query_language: QueryLanguage | None = None,
    state_name: str | None = None,
    assign: Mapping[str, Any] | None = None,
    outputs: Any = None
) -> Parallel

Convert the fragment to a single parallel state.

Parameters:

Name Type Description Default
prefix_states Optional[str]

Prefix for state names.

None
state_id Optional[str]

ID for the parallel state.

None
comment Optional[str]

Comment for the state.

None
input_path Optional[str]

Input path.

None
output_path Optional[str]

Output path. Defaults to "$[0]".

'$[0]'
result_path Optional[str]

Result path.

None
result_selector Optional[Mapping[str, Any]]

Result selector.

None

Returns:

Type Description
Parallel

A parallel state containing the fragment.

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
def to_single_state(
    self,
    *,
    prefix_states: str | None = None,
    state_id: str | None = None,
    comment: str | None = None,
    input_path: str | None = None,
    output_path: str | None = "$[0]",
    result_path: str | None = None,
    result_selector: Mapping[str, Any] | None = None,
    arguments: Mapping[str, Any] | None = None,
    parameters: Mapping[str, Any] | None = None,
    query_language: sfn.QueryLanguage | None = None,
    state_name: str | None = None,
    assign: Mapping[str, Any] | None = None,
    outputs: Any = None,
) -> sfn.Parallel:
    """Convert the fragment to a single parallel state.

    All keyword arguments are forwarded unchanged to the parent class's
    ``to_single_state``.

    Args:
        prefix_states (Optional[str]): Prefix for state names.
        state_id (Optional[str]): ID for the parallel state.
        comment (Optional[str]): Comment for the state.
        input_path (Optional[str]): Input path.
        output_path (Optional[str]): Output path. Defaults to "$[0]",
            which unwraps the single-element array a Parallel state emits.
        result_path (Optional[str]): Result path.
        result_selector (Optional[Mapping[str, Any]]): Result selector.
        arguments (Optional[Mapping[str, Any]]): Forwarded to the parallel state.
        parameters (Optional[Mapping[str, Any]]): Forwarded to the parallel state.
        query_language (Optional[sfn.QueryLanguage]): Forwarded to the parallel state.
        state_name (Optional[str]): Forwarded to the parallel state.
        assign (Optional[Mapping[str, Any]]): Forwarded to the parallel state.
        outputs (Any): Forwarded to the parallel state.

    Returns:
        A parallel state containing the fragment.
    """
    return super().to_single_state(
        prefix_states=prefix_states,
        state_id=state_id,
        comment=comment,
        input_path=input_path,
        output_path=output_path,
        result_path=result_path,
        result_selector=result_selector,
        arguments=arguments,
        parameters=parameters,
        query_language=query_language,
        state_name=state_name,
        assign=assign,
        outputs=outputs,
    )
to_state_machine
to_state_machine(
    state_machine_name: str,
    role: Role | None = None,
    logs: LogOptions | None = None,
    timeout: Duration | None = None,
) -> StateMachine

Convert the fragment to a complete state machine.

Parameters:

Name Type Description Default
state_machine_name str

Name for the state machine.

required
role Optional[Role]

IAM role. Creates default if None.

None
logs Optional[LogOptions]

Log options.

None
timeout Optional[Duration]

Execution timeout.

None

Returns:

Type Description
StateMachine

The created state machine.

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
def to_state_machine(
    self,
    state_machine_name: str,
    role: iam.Role | None = None,
    logs: sfn.LogOptions | None = None,
    timeout: cdk.Duration | None = None,
) -> sfn.StateMachine:
    """Convert the fragment to a complete state machine.

    Args:
        state_machine_name (str): Name for the state machine.
        role (Optional[iam.Role]): IAM role. Creates default if None;
            otherwise the fragment's required policies are attached to it.
        logs (Optional[sfn.LogOptions]): Log options. Creates default if None.
        timeout (Optional[cdk.Duration]): Execution timeout.

    Returns:
        The created state machine.
    """
    if role is None:
        # Build a fresh role preloaded with everything this fragment requires.
        role = create_role(
            self,
            state_machine_name,
            self.env_base,
            managed_policies=self.required_managed_policies,
            inline_policies_from_statements={
                "default": self.required_inline_policy_statements,
            },
        )
    else:
        # Augment the caller-supplied role with this fragment's requirements.
        for policy in self.required_managed_policies:
            if isinstance(policy, str):
                policy = iam.ManagedPolicy.from_aws_managed_policy_name(policy)
            role.add_managed_policy(policy)

        for statement in self.required_inline_policy_statements:
            role.add_to_policy(statement)

    # `role` is guaranteed non-None by the branches above, so the former
    # `role if role is not None else create_role(...)` fallback was dead code.
    return sfn.StateMachine(
        self,
        self.get_construct_id(state_machine_name),
        state_machine_name=self.env_base.get_state_machine_name(state_machine_name),
        logs=logs or create_log_options(self, state_machine_name, self.env_base),
        role=role,  # type: ignore[arg-type]
        definition_body=sfn.DefinitionBody.from_chainable(self.definition),
        timeout=timeout,
    )
LazyLoadStateMachineFragment
LazyLoadStateMachineFragment(
    scope: Construct, id: str, env_base: EnvBase
)

Bases: EnvBaseStateMachineFragment

State machine fragment with lazy-loaded definition.

The definition is built on first access via build_definition().

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
def __init__(
    self,
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
) -> None:
    """Initialize an environment-aware state machine fragment.

    Args:
        scope (constructs.Construct): The construct scope.
        id (str): The construct ID.
        env_base (EnvBase): Environment base for resource naming.
    """
    # NOTE(review): super().__init__ is called with (scope, id) only, while
    # the class docs state the parent is EnvBaseStateMachineFragment, whose
    # __init__ requires env_base as a third argument. This may just be the
    # doc renderer showing an inherited method's source — confirm against
    # the actual file before changing anything.
    super().__init__(scope, id)
    self.env_base = env_base
Attributes
definition property writable
definition: IChainable

Get the state machine definition, building if needed.

Returns:

Type Description
IChainable

The chainable definition.

Functions
build_definition abstractmethod
build_definition() -> IChainable

Build the state machine definition.

Subclasses must implement this method.

Returns:

Type Description
IChainable

The built definition.

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
@abstractmethod
def build_definition(self) -> sfn.IChainable:
    """Build the state machine definition.

    Subclasses must implement this method.

    Returns:
        The built definition.

    Raises:
        NotImplementedError: Always, when the subclass does not override
            this method.
    """
    raise NotImplementedError("Must implement")
TaskWithPrePostStatus
TaskWithPrePostStatus(
    scope: Construct,
    id: str,
    env_base: EnvBase,
    task: IChainable | None,
)

Bases: LazyLoadStateMachineFragment

State machine fragment with pre/post status tracking.

Wraps a task with status updates for started, failed, and completed states.

Initialize a task with pre/post status tracking.

Parameters:

Name Type Description Default
scope Construct

The construct scope.

required
id str

The construct ID.

required
env_base EnvBase

Environment base for resource naming.

required
task Optional[IChainable]

The task to wrap.

required
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
def __init__(
    self,
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
    task: sfn.IChainable | None,
) -> None:
    """Initialize a task with pre/post status tracking.

    Args:
        scope (constructs.Construct): The construct scope.
        id (str): The construct ID.
        env_base (EnvBase): Environment base for resource naming.
        task (Optional[sfn.IChainable]): The task to wrap. May be None
            and assigned later via the ``task`` setter.
    """
    super().__init__(scope, id, env_base)
    # Wrapped task; the `task` property asserts it is set before use.
    self.task = task
    # Used to label the generated Pass/Parallel states for this task.
    self.task_name = id

    # Reference path into the raw execution input ("$" = whole input).
    self.raw_task_input_path = JsonReferencePath("$")
Attributes
task property writable
task: IChainable

Get the wrapped task.

Returns:

Type Description
IChainable

The task.

Raises:

Type Description
AssertionError

If task is not set.

task_name property writable
task_name: str

Get the task name.

Returns:

Type Description
str

The task name.

task__augment_input property
task__augment_input: IChainable | None

Runs right after the sfn.Pass 'START' state of the state machine fragment. Can be used to dynamically fill out the 'context' JsonReferencePath for subsequent lambdas/tasks to use. NOTE: Outputs of this chain are MAINTAINED

task__status_started property
task__status_started: IChainable | None

Runs right before "task__pre_run" if defined. Otherwise runs right before main task executes. NOTE: Outputs within this chain get DISCARDED

task__status_completed property
task__status_completed: IChainable | None

Runs right after "task__post_run" if defined. Otherwise runs right after main task is completed. NOTE: Outputs within this chain get DISCARDED

task__status_failed property
task__status_failed: IChainable | None

Runs if main task fails during execution

task__pre_run property
task__pre_run: IChainable | None

Runs right before main task executes. NOTE: Outputs within this chain get DISCARDED

task__post_run property
task__post_run: IChainable | None

Runs right after main task is completed. NOTE: Outputs within this chain get DISCARDED

Functions
build_definition
build_definition() -> IChainable

Build the task definition with status tracking.

Returns:

Type Description
IChainable

The complete task definition with pre/post status states.

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
def build_definition(self) -> sfn.IChainable:
    """Build the task definition with status tracking.

    Assembles, in order: input augmentation, "started" status, pre-run,
    the main task (with optional failure handling), post-run, "completed"
    status, and a final Pass that surfaces the task result.

    Returns:
        The complete task definition with pre/post status states.
    """
    # Evaluate each hook property exactly once; re-evaluating would raise
    # duplicate-construct errors.
    task__augment_input = self.task__augment_input
    task__status_started = self.task__status_started
    task__status_failed = self.task__status_failed
    task__status_completed = self.task__status_completed
    task__pre_run = self.task__pre_run
    task__post_run = self.task__post_run

    # ---------------------------
    # START DEFINITION
    # ---------------------------
    # Normalize the execution input into {"input": ..., "context": ...}.
    definition: sfn.Chain | sfn.Pass = sfn.Pass(
        self,
        f"{self.task_name} Start",
        parameters={
            "input": self.raw_task_input_path.as_jsonpath_object,
            "context": self.task_context,
        },
    )

    # -------------
    # AUGMENT INPUT
    # -------------
    # Outputs of this hook are kept: the parallel's single-element result
    # is unwrapped via "[0]" back onto the task input path.
    if task__augment_input:
        definition = definition.next(
            sfn.Chain.start(task__augment_input).to_single_state(
                f"{self.task_name} Augment Input",
                result_path=self.task_input_path.as_reference,
                output_path=f"{self.task_input_path.as_reference}[0]",
            )
        )

    # -------------
    # PRE TASK
    # -------------
    # Both pre-task hooks discard their outputs (result_path=DISCARD).
    if task__status_started:
        definition = definition.next(
            sfn.Chain.start(task__status_started).to_single_state(
                f"{self.task_name} Status Started", result_path=sfn.JsonPath.DISCARD
            )
        )
    if task__pre_run:
        definition = definition.next(
            sfn.Chain.start(task__pre_run).to_single_state(
                f"{self.task_name} Pre Run", result_path=sfn.JsonPath.DISCARD
            )
        )

    # -------------
    # TASK
    # -------------
    task_chain = sfn.Chain.start(self.task)
    task_enclosure = task_chain.to_single_state(
        f"{self.task_name} Run",
        input_path=self.task_input_path.as_reference,
        result_path=self.task_result_path.as_reference,
    )
    # Unwrap the Parallel state's one-element array result via "[0]".
    # fmt: off
    definition = (
        definition
        .next(task_enclosure)
        .next(
            sfn.Pass(
                self,
                f"{self.task_name} Run (Restructure)",
                input_path=f"{self.task_result_path.as_reference}[0]",
                result_path=self.task_result_path.as_reference,
            )
        )
    )
    # fmt: on

    # -------------
    # TASK FAILED
    # -------------
    # On failure, run the status hook (outputs discarded) and then fail
    # the execution explicitly; the error lands on task_error_path.
    if task__status_failed:
        task_enclosure.add_catch(
            sfn.Chain.start(task__status_failed)
            .to_single_state(
                f"{self.task_name} Status Failed", result_path=sfn.JsonPath.DISCARD
            )
            .next(
                sfn.Fail(
                    self,
                    f"{self.task_name} Fail State",
                    cause=f"Task {self.task_name} failed during execution.",
                )
            ),
            result_path=self.task_error_path.as_reference,
        )

    # -------------
    # POST TASK
    # -------------
    # Both post-task hooks discard their outputs (result_path=DISCARD).
    if task__post_run:
        definition = definition.next(
            sfn.Chain.start(task__post_run).to_single_state(
                f"{self.task_name} Post Run", result_path=sfn.JsonPath.DISCARD
            )
        )
    if task__status_completed:
        definition = definition.next(
            sfn.Chain.start(task__status_completed).to_single_state(
                f"{self.task_name} Status Complete", result_path=sfn.JsonPath.DISCARD
            )
        )

    # NOTE(review): the return value of .next() is deliberately not
    # reassigned here — the End state appears to be wired via side effect
    # on the chain's end states. Confirm this is intended before changing.
    definition.next(
        sfn.Pass(self, f"{self.task_name} End", input_path=self.task_result_path.as_reference)
    )

    # ---------------------------
    # COMPLETE DEFINITION
    # ---------------------------
    return definition
Functions
create_log_options
create_log_options(
    scope: Construct,
    id: str,
    env_base: EnvBase,
    removal_policy: RemovalPolicy | None = None,
    retention: RetentionDays | None = None,
) -> LogOptions

Create log options for a state machine.

Parameters:

Name Type Description Default
scope Construct

The construct scope.

required
id str

Identifier for the log group.

required
env_base EnvBase

Environment base for naming.

required
removal_policy Optional[RemovalPolicy]

Removal policy. Defaults to DESTROY.

None
retention Optional[RetentionDays]

Log retention period. Defaults to ONE_MONTH.

None

Returns:

Type Description
LogOptions

Log options configured with a CloudWatch log group.

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
def create_log_options(
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
    removal_policy: cdk.RemovalPolicy | None = None,
    retention: logs_.RetentionDays | None = None,
) -> sfn.LogOptions:
    """Create log options for a state machine.

    Args:
        scope (constructs.Construct): The construct scope.
        id (str): Identifier for the log group.
        env_base (EnvBase): Environment base for naming.
        removal_policy (Optional[cdk.RemovalPolicy]): Removal policy.
            Defaults to DESTROY.
        retention (Optional[logs_.RetentionDays]): Log retention period.
            Defaults to ONE_MONTH.

    Returns:
        Log options configured with a CloudWatch log group.
    """
    # Resolve defaults up front so the LogGroup call below stays flat.
    if removal_policy is None:
        removal_policy = cdk.RemovalPolicy.DESTROY
    if retention is None:
        retention = logs_.RetentionDays.ONE_MONTH

    log_group = logs_.LogGroup(
        scope,
        env_base.get_construct_id(id, "state-loggroup"),
        log_group_name=env_base.get_state_machine_log_group_name(id),
        removal_policy=removal_policy,
        retention=retention,
    )
    return sfn.LogOptions(destination=log_group)
create_role
create_role(
    scope: Construct,
    id: str,
    env_base: EnvBase,
    assumed_by: IPrincipal = ServicePrincipal(
        "states.amazonaws.com"
    ),
    managed_policies: Sequence[IManagedPolicy | str]
    | None = None,
    inline_policies: Mapping[str, PolicyDocument]
    | None = None,
    inline_policies_from_statements: Mapping[
        str, Sequence[PolicyStatement]
    ]
    | None = None,
    include_default_managed_policies: bool = True,
) -> Role

Create an IAM role for a state machine.

Parameters:

Name Type Description Default
scope Construct

The construct scope.

required
id str

Identifier for the role.

required
env_base EnvBase

Environment base for naming.

required
assumed_by IPrincipal

Principal that can assume the role. Defaults to states.amazonaws.com.

ServicePrincipal('states.amazonaws.com')
managed_policies Optional[Sequence[Union[IManagedPolicy, str]]]

Managed policies to attach.

None
inline_policies Optional[Mapping[str, PolicyDocument]]

Inline policy documents.

None
inline_policies_from_statements Optional[Mapping[str, Sequence[PolicyStatement]]]

Inline policies from statements.

None
include_default_managed_policies bool

Include default Step Functions managed policies. Defaults to True.

True

Returns:

Type Description
Role

The created IAM role.

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
def create_role(
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
    assumed_by: iam.IPrincipal | None = None,
    managed_policies: Sequence[iam.IManagedPolicy | str] | None = None,
    inline_policies: Mapping[str, iam.PolicyDocument] | None = None,
    inline_policies_from_statements: Mapping[str, Sequence[iam.PolicyStatement]] | None = None,
    include_default_managed_policies: bool = True,
) -> iam.Role:
    """Create an IAM role for a state machine.

    Args:
        scope (constructs.Construct): The construct scope.
        id (str): Identifier for the role.
        env_base (EnvBase): Environment base for naming.
        assumed_by (Optional[iam.IPrincipal]): Principal that can assume the role.
            Defaults to the Step Functions service principal
            (states.amazonaws.com) when omitted or None.
        managed_policies (Optional[Sequence[Union[iam.IManagedPolicy, str]]]):
            Managed policies to attach. String entries are resolved as AWS
            managed policy names.
        inline_policies (Optional[Mapping[str, iam.PolicyDocument]]):
            Inline policy documents.
        inline_policies_from_statements (Optional[Mapping[str, Sequence[iam.PolicyStatement]]]):
            Inline policies built from statements. Entries with the same name
            take precedence over `inline_policies`.
        include_default_managed_policies (bool): Include default Step Functions
            managed policies. Defaults to True.

    Returns:
        The created IAM role.
    """
    # Create the default principal lazily per call rather than as a function
    # call in the argument default (which is evaluated once at import time and
    # required `# type: ignore` workarounds).
    principal = assumed_by or iam.ServicePrincipal("states.amazonaws.com")

    construct_id = env_base.get_construct_id(id, "role")

    # Resolve string entries to AWS managed policies; pass through objects.
    resolved_managed_policies: list[iam.IManagedPolicy] = [
        iam.ManagedPolicy.from_aws_managed_policy_name(policy)
        if isinstance(policy, str)
        else policy
        for policy in (managed_policies or [])
    ]

    # Default managed policies grant broad Step Functions / CloudWatch access.
    if include_default_managed_policies:
        resolved_managed_policies.extend(
            iam.ManagedPolicy.from_aws_managed_policy_name(policy_name)
            for policy_name in (
                "AWSStepFunctionsFullAccess",
                "CloudWatchLogsFullAccess",
                "CloudWatchEventsFullAccess",
            )
        )

    # Merge inline policies; statement-derived documents win on name
    # collisions (same precedence as the original dict-merge ordering).
    merged_inline_policies: dict[str, iam.PolicyDocument] = dict(inline_policies or {})
    for policy_name, statements in (inline_policies_from_statements or {}).items():
        merged_inline_policies[policy_name] = iam.PolicyDocument(statements=statements)

    return iam.Role(
        scope,
        construct_id,
        assumed_by=principal,
        managed_policies=resolved_managed_policies,
        inline_policies=merged_inline_policies,
    )
create_state_machine
create_state_machine(
    scope: Construct,
    env_base: EnvBase,
    id: str,
    name: str | None,
    definition: IChainable,
    role: Role | None = None,
    logs: LogOptions | None = None,
    timeout: Duration | None = None,
) -> StateMachine

Create a state machine from a definition.

Parameters:

Name Type Description Default
scope Construct

The construct scope.

required
env_base EnvBase

Environment base for naming.

required
id str

Construct identifier.

required
name Optional[str]

State machine name.

required
definition IChainable

The state machine definition.

required
role Optional[Role]

IAM role for the state machine.

None
logs Optional[LogOptions]

Log options.

None
timeout Optional[Duration]

Execution timeout.

None

Returns:

Type Description
StateMachine

The created state machine.

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/base.py
def create_state_machine(
    scope: constructs.Construct,
    env_base: EnvBase,
    id: str,
    name: str | None,
    definition: sfn.IChainable,
    role: iam.Role | None = None,
    logs: sfn.LogOptions | None = None,
    timeout: cdk.Duration | None = None,
) -> sfn.StateMachine:
    """Create a state machine from a definition.

    When no log options are supplied, a CloudWatch log group is created with
    one-month retention and a DESTROY removal policy.

    Args:
        scope (constructs.Construct): The construct scope.
        env_base (EnvBase): Environment base for naming.
        id (str): Construct identifier.
        name (Optional[str]): State machine name.
        definition (sfn.IChainable): The state machine definition.
        role (Optional[iam.Role]): IAM role for the state machine.
        logs (Optional[sfn.LogOptions]): Log options. Defaults to a freshly
            created CloudWatch log group.
        timeout (Optional[cdk.Duration]): Execution timeout.

    Returns:
        The created state machine.
    """
    # Build the default log destination lazily so the log group construct is
    # only instantiated when the caller did not provide their own options.
    if logs is None:
        log_group = logs_.LogGroup(
            scope,
            env_base.get_construct_id(id, "state-loggroup"),
            log_group_name=env_base.get_state_machine_log_group_name(name or id),
            removal_policy=cdk.RemovalPolicy.DESTROY,
            retention=logs_.RetentionDays.ONE_MONTH,
        )
        logs = sfn.LogOptions(destination=log_group)

    state_machine_name = env_base.get_state_machine_name(name) if name else None

    return sfn.StateMachine(
        scope,
        env_base.get_construct_id(id),
        state_machine_name=state_machine_name,
        logs=logs,
        role=cast(iam.IRole, role),
        definition_body=sfn.DefinitionBody.from_chainable(definition),
        timeout=timeout,
    )

informatics

Classes
BatchInvokedExecutorFragment
BatchInvokedExecutorFragment(
    scope: Construct,
    id: str,
    env_base: EnvBase,
    name: str,
    image: str,
    executor: str,
    job_queue: str,
    bucket_name: str,
    key_prefix: str | None = None,
    payload_path: str | None = None,
    environment: Mapping[str, str] | str | None = None,
    memory: int | str | None = None,
    vcpus: int | str | None = None,
    mount_point_configs: list[MountPointConfiguration]
    | None = None,
    mount_points: list[MountPointTypeDef] | None = None,
    volumes: list[VolumeTypeDef] | None = None,
    platform_capabilities: list[Literal["EC2", "FARGATE"]]
    | str
    | None = None,
    job_role_arn: str | None = None,
)

Bases: BatchInvokedBaseFragment, AWSBatchMixins

Invoke an executor in an image via batch with a payload from s3

This targets any subclassing of aibs_informatics_core.executors.base.ExecutorBase - https://github.com/AllenInstitute/aibs-informatics-core/blob/main/src/aibs_informatics_core/executors/base.py

This fragment creates a state machine fragment that
  1. Puts a payload to s3
  2. Submits a batch job
  3. Gets the response from s3

The payload is written to s3://&lt;bucket_name&gt;/&lt;key_prefix&gt;/&lt;execution_name&gt;/request.json. The response is read from s3://&lt;bucket_name&gt;/&lt;key_prefix&gt;/&lt;execution_name&gt;/response.json

IMPORTANT
  • Batch job queue / compute environment must have permissions to read/write to the bucket.

Parameters:

Name Type Description Default
scope Construct

construct scope

required
id str

id

required
env_base EnvBase

env base

required
name str

Name of the lambda function. This can be a reference path (e.g. "$.name")

required
image str

Image URI or name. This can be a reference path (e.g. "$.image")

required
executor str

qualified name of executor class. This should describe a fully qualified path to function handler. This can be a reference path (e.g. "$.handler")

required
job_queue str

Job queue to submit job to. This can be a reference path (e.g. "$.job_queue")

required
bucket_name str

S3 Bucket name to write payload to and read response from. This can be a reference path (e.g. "$.bucket_name")

required
key_prefix str | None

Key prefix to write payload to and read response from. If not provided, scratch/ is used. Can be a reference path (e.g. "$.key_prefix")

None
payload_path str | None

Optionally specify the reference path of the event payload. Defaults to "$".

None
command List[str] | str | None

Command to run in container. Can be a reference path (e.g. "$.command"). If unspecified, the container's CMD is used.

required
environment Mapping[str, str] | str | None

environment variables to specify. This can be a reference path (e.g. "$.environment")

None
memory int | str | None

Memory in MiB (either int or reference path str). Defaults to None.

None
vcpus int | str | None

Number of vCPUs (either int or reference path str). Defaults to None.

None
mount_points List[MountPointTypeDef] | None

List of mount points to add to state machine. Defaults to None.

None
volumes List[VolumeTypeDef] | None

List of volumes to add to state machine. Defaults to None.

None
platform_capabilities List[Literal['EC2', 'FARGATE']] | str | None

platform capabilities to use. This can be a reference path (e.g. "$.platform_capabilities")

None
job_role_arn str | None

Job role arn to use for the job. This can be a reference path (e.g. "$.job_role_arn")

None
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/batch.py
def __init__(
    self,
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
    name: str,
    image: str,
    executor: str,
    job_queue: str,
    bucket_name: str,
    key_prefix: str | None = None,
    payload_path: str | None = None,
    environment: Mapping[str, str] | str | None = None,
    memory: int | str | None = None,
    vcpus: int | str | None = None,
    mount_point_configs: list[MountPointConfiguration] | None = None,
    mount_points: list[MountPointTypeDef] | None = None,
    volumes: list[VolumeTypeDef] | None = None,
    platform_capabilities: list[Literal["EC2", "FARGATE"]] | str | None = None,
    job_role_arn: str | None = None,
) -> None:
    """Invoke an executor in an image via batch with a payload from s3

    This targets any subclassing of `aibs_informatics_core.executors.base.ExecutorBase`
    - https://github.com/AllenInstitute/aibs-informatics-core/blob/main/src/aibs_informatics_core/executors/base.py


    This fragment creates a state machine fragment that:
        1. Puts a payload to s3
        2. Submits a batch job
        3. Gets the response from s3

    The payload is written to s3://<bucket_name>/<key_prefix>/<execution_name>/request.json
    The response is read from s3://<bucket_name>/<key_prefix>/<execution_name>/response.json

    IMPORTANT:
        - Batch job queue / compute environment must have permissions to read/write to the bucket.

    Args:
        scope (Construct): construct scope
        id (str): id
        env_base (EnvBase): env base
        name (str): Name of the batch job. This can be a reference path (e.g. "$.name")
        image (str): Image URI or name. This can be a reference path (e.g. "$.image")
        executor (str): qualified name of executor class. This should describe a fully qualified path to function handler. This can be a reference path (e.g. "$.handler")
        job_queue (str): Job queue to submit job to. This can be a reference path (e.g. "$.job_queue")
        bucket_name (str): S3 Bucket name to write payload to and read response from. This can be a reference path (e.g. "$.bucket_name")
        key_prefix (str | None): Key prefix to write payload to and read response from. If not provided, `scratch/` is used. Can be a reference path (e.g. "$.key_prefix")
        payload_path (str | None): Optionally specify the reference path of the event payload. Defaults to "$".
        environment (Mapping[str, str] | str | None): environment variables to specify. This can be a reference path (e.g. "$.environment")
        memory (int | str | None): Memory in MiB (either int or reference path str). Defaults to None.
        vcpus (int | str | None): Number of vCPUs (either int or reference path str). Defaults to None.
        mount_point_configs (list[MountPointConfiguration] | None): Mount point configurations used to derive mount_points and volumes. Mutually exclusive with mount_points/volumes. Defaults to None.
        mount_points (List[MountPointTypeDef] | None): List of mount points to add to state machine. Defaults to None.
        volumes (List[VolumeTypeDef] | None): List of volumes to add to state machine. Defaults to None.
        platform_capabilities (List[Literal["EC2", "FARGATE"]] | str | None): platform capabilities to use. This can be a reference path (e.g. "$.platform_capabilities")
        job_role_arn (str | None): Job role arn to use for the job. This can be a reference path (e.g. "$.job_role_arn")
    """  # noqa: E501
    super().__init__(scope, id, env_base)
    key_prefix = key_prefix or S3_SCRATCH_KEY_PREFIX

    # Request/response keys are unique per execution + task id, so concurrent
    # executions never collide on the scratch prefix.
    request_key = sfn.JsonPath.format(
        f"{key_prefix}{{}}/{{}}/request.json",
        sfn.JsonPath.execution_name,
        sfn.JsonPath.string_at("$.taskResult.prep.task_id"),
    )
    response_key = sfn.JsonPath.format(
        f"{key_prefix}{{}}/{{}}/response.json",
        sfn.JsonPath.execution_name,
        sfn.JsonPath.string_at("$.taskResult.prep.task_id"),
    )

    # Generate the unique task id up front; downstream states reference it at
    # $.taskResult.prep.task_id.
    start = sfn.Pass(
        self,
        f"{id} Prep S3 Keys",
        parameters={
            "task_id": sfn.JsonPath.uuid(),
        },
        result_path="$.taskResult.prep",
    )

    if mount_point_configs:
        if mount_points or volumes:
            raise ValueError("Cannot specify both mount_point_configs and mount_points")
        mount_points, volumes = self.convert_to_mount_point_and_volumes(mount_point_configs)

    put_payload = S3Operation.put_payload(
        self,
        f"{id} Put Request to S3",
        payload=payload_path or sfn.JsonPath.entire_payload,
        bucket_name=bucket_name,
        key=request_key,
        result_path="$.taskResult.put",
    )

    submit_job = SubmitJobFragment(
        self,
        id + "Batch",
        env_base=env_base,
        name=name,
        job_queue=job_queue,
        command=[
            "run_cli_executor",
            "--executor",
            executor,
            "--input",
            # FIX: reference the S3 location produced by put_payload above.
            # Previously raw "$.Bucket"/"$.Key" strings were passed, which are
            # embedded as literal text (not reference paths) and pointed at the
            # wrong location anyway -- put_payload writes its result to
            # $.taskResult.put (matching BatchInvokedLambdaFunction).
            sfn.JsonPath.format(
                "s3://{}/{}",
                sfn.JsonPath.string_at("$.taskResult.put.Bucket"),
                sfn.JsonPath.string_at("$.taskResult.put.Key"),
            ),
            "--output-location",
            sfn.JsonPath.format("s3://{}/{}", bucket_name, response_key),
        ],
        image=image,
        environment=environment,
        memory=memory,
        vcpus=vcpus,
        mount_points=mount_points or [],
        volumes=volumes or [],
        platform_capabilities=platform_capabilities,
        job_role_arn=job_role_arn,
    )

    get_response = S3Operation.get_payload(
        self,
        f"{id}",
        bucket_name=bucket_name,
        key=response_key,
    ).to_single_state(
        f"{id} Get Response from S3",
        output_path="$[0]",
    )

    self.definition = start.next(put_payload).next(submit_job).next(get_response)
Functions
BatchInvokedLambdaFunction
BatchInvokedLambdaFunction(
    scope: Construct,
    id: str,
    env_base: EnvBase,
    name: str,
    image: str,
    handler: str,
    job_queue: str,
    bucket_name: str,
    key_prefix: str | None = None,
    payload_path: str | None = None,
    command: list[str] | str | None = None,
    environment: Mapping[str, str] | None = None,
    memory: int | str | None = None,
    vcpus: int | str | None = None,
    gpu: int | str | None = None,
    mount_points: list[MountPointTypeDef]
    | str
    | None = None,
    volumes: list[VolumeTypeDef] | str | None = None,
    mount_point_configs: list[MountPointConfiguration]
    | None = None,
    platform_capabilities: list[Literal["EC2", "FARGATE"]]
    | str
    | None = None,
    job_role_arn: str | None = None,
)

Bases: BatchInvokedBaseFragment, AWSBatchMixins

Invoke a command on image via batch with a payload from s3

This fragment creates a state machine fragment that
  1. Puts a payload to s3
  2. Submits a batch job
  3. Gets the response from s3

The payload is written to s3://&lt;bucket_name&gt;/&lt;key_prefix&gt;/&lt;execution_name&gt;/request.json. The response is read from s3://&lt;bucket_name&gt;/&lt;key_prefix&gt;/&lt;execution_name&gt;/response.json

The batch job will be fed the following environment variables
  • AWS_LAMBDA_FUNCTION_NAME: name of lambda function
  • AWS_LAMBDA_FUNCTION_HANDLER: handler of lambda function
  • AWS_LAMBDA_EVENT_PAYLOAD: The s3 location of the event payload
  • AWS_LAMBDA_EVENT_RESPONSE_LOCATION: The s3 location to write the response to
IMPORTANT
  • Batch job queue / compute environment must have permissions to read/write to the bucket.

Parameters:

Name Type Description Default
scope Construct

construct scope

required
id str

id

required
env_base EnvBase

env base

required
name str

Name of the lambda function. This can be a reference path (e.g. "$.name")

required
image str

Image URI or name. This can be a reference path (e.g. "$.image")

required
handler str

handler of lambda function. This should describe a fully qualified path to function handler. This can be a reference path (e.g. "$.handler")

required
job_queue str

Job queue to submit job to. This can be a reference path (e.g. "$.job_queue")

required
bucket_name str

S3 Bucket name to write payload to and read response from. This can be a reference path (e.g. "$.bucket_name")

required
key_prefix str | None

Key prefix to write payload to and read response from. If not provided, scratch/ is used. Can be a reference path (e.g. "$.key_prefix")

None
payload_path str | None

Optionally specify the reference path of the event payload. Defaults to "$".

None
command List[str] | str | None

Command to run in container. Can be a reference path (e.g. "$.command"). If unspecified, the container's CMD is used.

None
environment Mapping[str, str] | None

Additional environment variables to specify. These are added to default environment variables.

None
memory int | str | None

Memory in MiB (either int or reference path str). Defaults to None.

None
vcpus int | str | None

Number of vCPUs (either int or reference path str). Defaults to None.

None
gpu int | str | None

Number of GPUs (either int or reference path str). Defaults to None.

None
mount_points List[MountPointTypeDef] | None

List of mount points to add to state machine. Defaults to None.

None
volumes List[VolumeTypeDef] | None

List of volumes to add to state machine. Defaults to None.

None
platform_capabilities List[Literal['EC2', 'FARGATE']] | str | None

platform capabilities to use. This can be a reference path (e.g. "$.platform_capabilities")

None
job_role_arn str | None

Job role arn to use for the job. This can be a reference path (e.g. "$.job_role_arn")

None
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/batch.py
def __init__(
    self,
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
    name: str,
    image: str,
    handler: str,
    job_queue: str,
    bucket_name: str,
    key_prefix: str | None = None,
    payload_path: str | None = None,
    command: list[str] | str | None = None,
    environment: Mapping[str, str] | None = None,
    memory: int | str | None = None,
    vcpus: int | str | None = None,
    gpu: int | str | None = None,
    mount_points: list[MountPointTypeDef] | str | None = None,
    volumes: list[VolumeTypeDef] | str | None = None,
    mount_point_configs: list[MountPointConfiguration] | None = None,
    platform_capabilities: list[Literal["EC2", "FARGATE"]] | str | None = None,
    job_role_arn: str | None = None,
) -> None:
    """Invoke a command on image via batch with a payload from s3

    This fragment creates a state machine fragment that:
        1. Puts a payload to s3
        2. Submits a batch job
        3. Gets the response from s3

    The payload is written to s3://<bucket_name>/<key_prefix>/<execution_name>/request.json
    The response is read from s3://<bucket_name>/<key_prefix>/<execution_name>/response.json

    The batch job will be fed the following environment variables:
        - AWS_LAMBDA_FUNCTION_NAME: name of lambda function
        - AWS_LAMBDA_FUNCTION_HANDLER: handler of lambda function
        - AWS_LAMBDA_EVENT_PAYLOAD: The s3 location of the event payload
        - AWS_LAMBDA_EVENT_RESPONSE_LOCATION: The s3 location to write the response to

    IMPORTANT:
        - Batch job queue / compute environment must have permissions to read/write to the bucket.


    Args:
        scope (Construct): construct scope
        id (str): id
        env_base (EnvBase): env base
        name (str): Name of the lambda function. This can be a reference path (e.g. "$.name")
        image (str): Image URI or name. This can be a reference path (e.g. "$.image")
        handler (str): handler of lambda function. This should describe a fully qualified path to function handler. This can be a reference path (e.g. "$.handler")
        job_queue (str): Job queue to submit job to. This can be a reference path (e.g. "$.job_queue")
        bucket_name (str): S3 Bucket name to write payload to and read response from. This can be a reference path (e.g. "$.bucket_name")
        key_prefix (str | None): Key prefix to write payload to and read response from. If not provided, `scratch/` is used. Can be a reference path (e.g. "$.key_prefix")
        payload_path (str | None): Optionally specify the reference path of the event payload. Defaults to "$".
        command (List[str] | str | None): Command to run in container. Can be a reference path (e.g. "$.command"). If unspecified, the container's CMD is used.
        environment (Mapping[str, str] | None): Additional environment variables to specify. These are added to default environment variables.
        memory (int | str | None): Memory in MiB (either int or reference path str). Defaults to None.
        vcpus (int | str | None): Number of vCPUs (either int or reference path str). Defaults to None.
        gpu (int | str | None): Number of GPUs (either int or reference path str). Defaults to None.
        mount_points (List[MountPointTypeDef] | None): List of mount points to add to state machine. Defaults to None.
        volumes (List[VolumeTypeDef] | None): List of volumes to add to state machine. Defaults to None.
        mount_point_configs (List[MountPointConfiguration] | None): Mount point configurations used to derive mount_points and volumes. Mutually exclusive with mount_points/volumes. Defaults to None.
        platform_capabilities (List[Literal["EC2", "FARGATE"]] | str | None): platform capabilities to use. This can be a reference path (e.g. "$.platform_capabilities")
        job_role_arn (str | None): Job role arn to use for the job. This can be a reference path (e.g. "$.job_role_arn")
    """  # noqa: E501
    super().__init__(scope, id, env_base)
    key_prefix = key_prefix or S3_SCRATCH_KEY_PREFIX

    # Request/response keys are unique per execution + task id, so concurrent
    # executions never collide under the shared key prefix.
    request_key = sfn.JsonPath.format(
        f"{key_prefix}{{}}/{{}}/request.json",
        sfn.JsonPath.execution_name,
        sfn.JsonPath.string_at("$.taskResult.prep.task_id"),
    )
    response_key = sfn.JsonPath.format(
        f"{key_prefix}{{}}/{{}}/response.json",
        sfn.JsonPath.execution_name,
        sfn.JsonPath.string_at("$.taskResult.prep.task_id"),
    )

    # Generate the unique task id up front; the key expressions above read it
    # back via $.taskResult.prep.task_id.
    start = sfn.Pass(
        self,
        f"{id} Prep S3 Keys",
        parameters={
            "task_id": sfn.JsonPath.uuid(),
        },
        result_path="$.taskResult.prep",
    )

    # mount_point_configs is a convenience alternative to supplying raw
    # mount_points/volumes; the two forms are mutually exclusive.
    if mount_point_configs:
        if mount_points or volumes:
            raise ValueError("Cannot specify both mount_point_configs and mount_points")
        mount_points, volumes = self.convert_to_mount_point_and_volumes(mount_point_configs)

    put_payload = S3Operation.put_payload(
        self,
        f"{id} Put Request to S3",
        payload=payload_path or sfn.JsonPath.entire_payload,
        bucket_name=bucket_name,
        key=request_key,
        result_path="$.taskResult.put",
    )

    # Environment variables consumed by the in-container lambda-handler shim:
    # where to read the event payload from and where to write the response.
    default_environment = {
        AWS_LAMBDA_FUNCTION_NAME_KEY: name,
        AWS_LAMBDA_FUNCTION_HANDLER_KEY: handler,
        AWS_LAMBDA_EVENT_PAYLOAD_KEY: sfn.JsonPath.format(
            "s3://{}/{}",
            sfn.JsonPath.string_at("$.taskResult.put.Bucket"),
            sfn.JsonPath.string_at("$.taskResult.put.Key"),
        ),
        AWS_LAMBDA_EVENT_RESPONSE_LOCATION_KEY: sfn.JsonPath.format(
            "s3://{}/{}",
            bucket_name,
            response_key,
        ),
        EnvBase.ENV_BASE_KEY: self.env_base,
        "AWS_REGION": self.aws_region,
        "AWS_ACCOUNT_ID": self.aws_account,
    }

    submit_job = SubmitJobFragment(
        self,
        f"{id} Batch",
        env_base=env_base,
        name=name,
        job_queue=job_queue,
        command=command or [],
        image=image,
        environment={
            # Caller-supplied variables first so defaults take precedence on
            # key collisions.
            **(environment if environment else {}),
            **default_environment,
        },
        memory=memory,
        vcpus=vcpus,
        gpu=gpu,
        mount_points=mount_points or [],
        volumes=volumes or [],
        platform_capabilities=platform_capabilities,
        job_role_arn=job_role_arn,
    )

    get_response = S3Operation.get_payload(
        self,
        f"{id}",
        bucket_name=bucket_name,
        key=response_key,
    ).to_single_state(
        f"{id} Get Response from S3",
        output_path="$[0]",
    )

    self.definition = start.next(put_payload).next(submit_job).next(get_response)
Functions
DataSyncFragment
DataSyncFragment(
    scope: Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: DockerImageAsset | str,
    batch_job_queue: JobQueue | str,
    scaffolding_bucket: Bucket,
    batch_job_role: Role | str | None = None,
    mount_point_configs: Iterable[MountPointConfiguration]
    | None = None,
)

Bases: BatchInvokedBaseFragment, EnvBaseConstructMixins

Sync data from one s3 bucket to another

Parameters:

Name Type Description Default
scope Construct

construct scope

required
id str

id

required
env_base EnvBase

env base

required
aibs_informatics_docker_asset DockerImageAsset | str

Docker image asset or image uri str for the aibs informatics aws lambda

required
batch_job_queue JobQueue | str

Default batch job queue or job queue name str that the batch job will be submitted to. This can be overridden by the payload.

required
scaffolding_bucket Bucket

Primary bucket used for request/response json blobs used in the batch invoked lambda function.

required
batch_job_role Optional[IRole | str]

Optional role to use for the batch job. If not provided, the default role created by the batch compute construct will be used.

None
mount_point_configs Optional[Iterable[MountPointConfiguration]]

List of mount point configurations to use. These can be overridden in the payload.

None
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/data_sync.py
def __init__(
    self,
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: ecr_assets.DockerImageAsset | str,
    batch_job_queue: batch.JobQueue | str,
    scaffolding_bucket: s3.Bucket,
    batch_job_role: iam.Role | str | None = None,
    mount_point_configs: Iterable[MountPointConfiguration] | None = None,
) -> None:
    """Sync data from one s3 bucket to another.

    Args:
        scope (Construct): construct scope
        id (str): id
        env_base (EnvBase): env base
        aibs_informatics_docker_asset (DockerImageAsset|str): Docker image asset
            or image uri str for the aibs informatics aws lambda
        batch_job_queue (JobQueue|str): Default batch job queue or job queue
            name str that the batch job will be submitted to. This can be
            overridden by the payload.
        scaffolding_bucket (Bucket): Primary bucket used for request/response
            json blobs used in the batch invoked lambda function.
        batch_job_role (Optional[IRole|str], optional): Optional role to use for
            the batch job. If not provided, the default role created by the
            batch compute construct will be used.
        mount_point_configs (Optional[Iterable[MountPointConfiguration]], optional):
            List of mount point configurations to use. These can be overridden
            in the payload.
    """
    super().__init__(scope, id, env_base)

    # Normalize the docker asset argument to a plain image URI string.
    if isinstance(aibs_informatics_docker_asset, str):
        image_uri = aibs_informatics_docker_asset
    else:
        image_uri = aibs_informatics_docker_asset.image_uri

    # Normalize the job queue argument to its name.
    if isinstance(batch_job_queue, str):
        self.batch_job_queue_name = batch_job_queue
    else:
        self.batch_job_queue_name = batch_job_queue.job_queue_name

    # Normalize the optional job role argument to an ARN string (or None).
    job_role_arn: str | None = None
    if batch_job_role:
        if isinstance(batch_job_role, str):
            job_role_arn = batch_job_role
        else:
            job_role_arn = batch_job_role.role_arn

    # Reshape the raw input into the structure the batch-invoked lambda
    # fragment expects: handler + image + the original payload.
    start = sfn.Pass(
        self,
        "Input Restructure",
        parameters={
            "handler": "aibs_informatics_aws_lambda.handlers.data_sync.data_sync_handler",
            "image": image_uri,
            "payload": sfn.JsonPath.object_at("$"),
        },
    )

    self.fragment = BatchInvokedLambdaFunction.with_defaults(
        self,
        "Data Sync",
        env_base=self.env_base,
        name="data-sync",
        job_queue=self.batch_job_queue_name,
        bucket_name=scaffolding_bucket.bucket_name,
        handler_path="$.handler",
        image_path="$.image",
        payload_path="$.payload",
        memory="1024",
        vcpus="1",
        mount_point_configs=list(mount_point_configs) if mount_point_configs else None,
        job_role_arn=job_role_arn,
        environment={
            EnvBase.ENV_BASE_KEY: self.env_base,
            "AWS_REGION": self.aws_region,
            "AWS_ACCOUNT_ID": self.aws_account,
        },
    )

    self.definition = start.next(self.fragment.to_single_state())
Functions
DistributedDataSyncFragment
DistributedDataSyncFragment(
    scope: Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: DockerImageAsset | str,
    batch_job_queue: JobQueue | str,
    scaffolding_bucket: Bucket,
    batch_job_role: str | Role | None = None,
    mount_point_configs: Iterable[MountPointConfiguration]
    | None = None,
)

Bases: BatchInvokedBaseFragment

Sync data from one s3 bucket to another using distributed batch jobs

Parameters:

Name Type Description Default
scope Construct

construct scope

required
id str

id

required
env_base EnvBase

env base

required
aibs_informatics_docker_asset DockerImageAsset | str

Docker image asset or image uri str for the aibs informatics aws lambda

required
batch_job_queue JobQueue | str

Default batch job queue or job queue name str that the batch job will be submitted to. This can be overridden by the payload.

required
scaffolding_bucket Bucket

Primary bucket used for request/response json blobs used in the batch invoked lambda function.

required
batch_job_role Optional[IRole | str]

Optional role to use for the batch job. If not provided, the default role created by the batch compute construct will be used.

None
mount_point_configs Optional[Iterable[MountPointConfiguration]]

List of mount point configurations to use. These can be overridden in the payload.

None
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/data_sync.py
def __init__(
    self,
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: ecr_assets.DockerImageAsset | str,
    batch_job_queue: batch.JobQueue | str,
    scaffolding_bucket: s3.Bucket,
    batch_job_role: str | iam.Role | None = None,
    mount_point_configs: Iterable[MountPointConfiguration] | None = None,
) -> None:
    """Sync data from one s3 bucket to another using distributed batch jobs

    The fragment chains three steps:
        1. Wrap the raw input under "$.request".
        2. Run a "prep" batch-invoked lambda that splits the request into a
           list of per-job sync requests.
        3. Fan out with a Map state, running one batch data sync job per
           prepared request.

    Args:
        scope (constructs.Construct): construct scope
        id (str): id
        env_base (EnvBase): env base
        aibs_informatics_docker_asset (DockerImageAsset|str): Docker image asset or image uri
            str for the aibs informatics aws lambda
        batch_job_queue (JobQueue|str): Default batch job queue or job queue name str that
            the batch job will be submitted to. This can be overridden by the payload.
        scaffolding_bucket (Bucket): Primary bucket used for request/response json blobs used
            in the batch invoked lambda function.
        batch_job_role (Optional[IRole|str], optional): Optional role to use for the batch job.
            If not provided, the default role created by the batch compute construct will be
            used.
        mount_point_configs (Optional[Iterable[MountPointConfiguration]], optional):
            List of mount point configurations to use. These can be overridden in the payload.
    """
    super().__init__(scope, id, env_base)
    # Wrap the raw execution input under "$.request" so later task results can
    # be written under "$.tasks.*" without clobbering the original payload.
    start_pass_state = sfn.Pass(
        self,
        f"{id}: Start",
        parameters={
            "request": sfn.JsonPath.object_at("$"),
        },
    )
    prep_batch_sync_task_name = "prep-batch-data-sync-requests"

    # Step 1: batch-invoked lambda that turns the incoming request into a list
    # of per-job sync requests. Its response is enclosed at
    # "$.tasks.<task name>.response" for the Map state below to consume.
    prep_batch_sync = BatchInvokedLambdaFunction(
        scope=scope,
        id=f"{id}: Prep Batch Data Sync",
        env_base=env_base,
        name=prep_batch_sync_task_name,
        payload_path="$.request",
        image=(
            aibs_informatics_docker_asset
            if isinstance(aibs_informatics_docker_asset, str)
            else aibs_informatics_docker_asset.image_uri
        ),
        handler="aibs_informatics_aws_lambda.handlers.data_sync.prepare_batch_data_sync_handler",
        job_queue=(
            batch_job_queue
            if isinstance(batch_job_queue, str)
            else batch_job_queue.job_queue_name
        ),
        bucket_name=scaffolding_bucket.bucket_name,
        memory=1024,
        vcpus=1,
        mount_point_configs=list(mount_point_configs) if mount_point_configs else None,
        job_role_arn=(
            batch_job_role if isinstance(batch_job_role, str) else batch_job_role.role_arn
        )
        if batch_job_role
        else None,
    ).enclose(result_path=f"$.tasks.{prep_batch_sync_task_name}.response")

    # Step 2: fan out — one batch data sync job per prepared request. The Map
    # result is discarded; the sync jobs act purely by side effect.
    batch_sync_map_state = sfn.Map(
        self,
        f"{id}: Batch Data Sync: Map Start",
        comment="Runs requests for batch sync in parallel",
        items_path=f"$.tasks.{prep_batch_sync_task_name}.response.requests",
        result_path=sfn.JsonPath.DISCARD,
    )

    # The per-item sync job gets more resources (4 GiB / 2 vCPU) than the prep
    # step since it performs the actual data transfer.
    batch_sync_map_state.iterator(
        BatchInvokedLambdaFunction(
            scope=scope,
            id=f"{id}: Batch Data Sync",
            env_base=env_base,
            name="batch-data-sync",
            payload_path="$",
            image=(
                aibs_informatics_docker_asset
                if isinstance(aibs_informatics_docker_asset, str)
                else aibs_informatics_docker_asset.image_uri
            ),
            handler="aibs_informatics_aws_lambda.handlers.data_sync.batch_data_sync_handler",
            job_queue=(
                batch_job_queue
                if isinstance(batch_job_queue, str)
                else batch_job_queue.job_queue_name
            ),
            bucket_name=scaffolding_bucket.bucket_name,
            memory=4096,
            vcpus=2,
            mount_point_configs=list(mount_point_configs) if mount_point_configs else None,
            job_role_arn=(
                batch_job_role if isinstance(batch_job_role, str) else batch_job_role.role_arn
            )
            if batch_job_role
            else None,
        )
    )
    # fmt: off
    self.definition = (
        start_pass_state
        .next(prep_batch_sync)
        .next(batch_sync_map_state)
    )
    # fmt: on
Functions
DemandExecutionFragment
DemandExecutionFragment(
    scope: Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: DockerImageAsset | str,
    scaffolding_bucket: Bucket,
    scaffolding_job_queue: JobQueue | str,
    batch_invoked_lambda_state_machine: StateMachine,
    data_sync_state_machine: StateMachine,
    shared_mount_point_config: MountPointConfiguration
    | None,
    scratch_mount_point_config: MountPointConfiguration
    | None,
    tmp_mount_point_config: MountPointConfiguration
    | None = None,
    context_manager_configuration: dict[str, Any]
    | None = None,
    tags: dict[str, str] | None = None,
)

Bases: EnvBaseStateMachineFragment, EnvBaseConstructMixins

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/demand_execution.py
def __init__(
    self,
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: ecr_assets.DockerImageAsset | str,
    scaffolding_bucket: s3.Bucket,
    scaffolding_job_queue: batch.JobQueue | str,
    batch_invoked_lambda_state_machine: sfn.StateMachine,
    data_sync_state_machine: sfn.StateMachine,
    shared_mount_point_config: MountPointConfiguration | None,
    scratch_mount_point_config: MountPointConfiguration | None,
    tmp_mount_point_config: MountPointConfiguration | None = None,
    context_manager_configuration: dict[str, Any] | None = None,
    tags: dict[str, str] | None = None,
) -> None:
    """Scaffold, execute, and clean up a demand execution via AWS Batch.

    The fragment chains:
        normalize tags -> prepare scaffolding ->
        (create job definition/args || transfer inputs TO batch job) ->
        submit batch job -> transfer results FROM batch job / cleanup paths.

    Args:
        scope (constructs.Construct): construct scope
        id (str): id
        env_base (EnvBase): env base
        aibs_informatics_docker_asset (DockerImageAsset|str): Docker image asset or
            image uri str for the aibs informatics aws lambda
        scaffolding_bucket (Bucket): Bucket used for request/response json blobs of
            the batch invoked lambda functions.
        scaffolding_job_queue (JobQueue|str): Job queue (or job queue name str) used
            for the scaffolding batch invoked lambda functions.
        batch_invoked_lambda_state_machine (StateMachine): State machine that runs a
            lambda handler via batch.
        data_sync_state_machine (StateMachine): State machine used to transfer
            inputs/results to/from the batch job.
        shared_mount_point_config (Optional[MountPointConfiguration]): Shared file
            system mount config. Must be provided together with the scratch config.
        scratch_mount_point_config (Optional[MountPointConfiguration]): Scratch file
            system mount config. Must be provided together with the shared config.
        tmp_mount_point_config (Optional[MountPointConfiguration], optional): Optional
            tmp file system mount config.
        context_manager_configuration (Optional[dict[str, Any]], optional): Optional
            context manager configuration forwarded in the scaffolding request.
        tags (Optional[dict[str, str]], optional): Build-time tags merged into the
            execution metadata tags.

    Raises:
        ValueError: If exactly one of shared/scratch mount point configs is provided.
    """
    super().__init__(scope, id, env_base)

    # ----------------- Validation -----------------
    # shared and scratch mount point configurations must be provided together
    # (both set or both unset). NOTE: the previous check also raised when
    # NEITHER was provided, which contradicted the optional signature, the
    # error message wording, and the guarded mount-point handling below.
    if bool(shared_mount_point_config) != bool(scratch_mount_point_config):
        raise ValueError(
            "If shared or scratch mount point configurations are provided, "
            "both shared and scratch mount point configurations must be provided."
        )

    # ------------------- Setup -------------------

    # Reference-path roots used to stash intermediate results on the state.
    config_scaffolding_path = "config.scaffolding"
    config_setup_results_path = f"{config_scaffolding_path}.setup_results"
    config_batch_args_path = f"{config_setup_results_path}.batch_args"

    config_cleanup_results_path = "tasks.cleanup.cleanup_results"

    # Create common kwargs for the batch invoked lambda functions
    # - specify the bucket name and job queue
    # - specify the mount points and volumes if provided
    batch_invoked_lambda_kwargs: dict[str, Any] = {
        "bucket_name": scaffolding_bucket.bucket_name,
        "image": aibs_informatics_docker_asset
        if isinstance(aibs_informatics_docker_asset, str)
        else aibs_informatics_docker_asset.image_uri,
        "job_queue": scaffolding_job_queue
        if isinstance(scaffolding_job_queue, str)
        else scaffolding_job_queue.job_queue_name,
    }

    # Create request input for the demand scaffolding
    file_system_configurations = {}

    # Update arguments with mount points and volumes if provided
    if shared_mount_point_config or scratch_mount_point_config or tmp_mount_point_config:
        mount_points = []
        volumes = []
        if shared_mount_point_config:
            # update file system configurations for scaffolding function
            file_system_configurations["shared"] = {
                "file_system": shared_mount_point_config.file_system_id,
                "access_point": shared_mount_point_config.access_point_id,
                "container_path": shared_mount_point_config.mount_point,
            }
            # add to mount point and volumes list for batch invoked lambda functions
            mount_points.append(
                shared_mount_point_config.to_batch_mount_point("shared", sfn_format=True)
            )
            volumes.append(
                shared_mount_point_config.to_batch_volume("shared", sfn_format=True)
            )

        if scratch_mount_point_config:
            # update file system configurations for scaffolding function
            file_system_configurations["scratch"] = {
                "file_system": scratch_mount_point_config.file_system_id,
                "access_point": scratch_mount_point_config.access_point_id,
                "container_path": scratch_mount_point_config.mount_point,
            }
            # add to mount point and volumes list for batch invoked lambda functions
            mount_points.append(
                scratch_mount_point_config.to_batch_mount_point("scratch", sfn_format=True)
            )
            volumes.append(
                scratch_mount_point_config.to_batch_volume("scratch", sfn_format=True)
            )
        if tmp_mount_point_config:
            # update file system configurations for scaffolding function
            file_system_configurations["tmp"] = {
                "file_system": tmp_mount_point_config.file_system_id,
                "access_point": tmp_mount_point_config.access_point_id,
                "container_path": tmp_mount_point_config.mount_point,
            }
            # add to mount point and volumes list for batch invoked lambda functions
            mount_points.append(
                tmp_mount_point_config.to_batch_mount_point("tmp", sfn_format=True)
            )
            volumes.append(tmp_mount_point_config.to_batch_volume("tmp", sfn_format=True))

        batch_invoked_lambda_kwargs["mount_points"] = mount_points
        batch_invoked_lambda_kwargs["volumes"] = volumes

    request = {
        "demand_execution": sfn.JsonPath.object_at("$"),
        "file_system_configurations": file_system_configurations,
    }
    if context_manager_configuration:
        request["context_manager_configuration"] = context_manager_configuration

    start_state = sfn.Pass(
        self,
        "Start Demand Batch Task",
        parameters={
            "request": request,
        },
    )

    # normalization steps:
    # - merge build and runtime tags
    norm_parallel = CommonOperation.enclose_chainable(
        self,
        "Normalize Demand Execution",
        self.demand_execution_normalize_tags_chain(tags),
        input_path="$.request.demand_execution",
        result_path="$.request.demand_execution",
    )

    # Run the demand scaffolding handler via the batch-invoked-lambda state
    # machine; its output lands at "$.config.scaffolding".
    prep_scaffolding_task = CommonOperation.enclose_chainable(
        self,
        "Prepare Demand Scaffolding",
        sfn.Pass(
            self,
            "Pass: Prepare Demand Scaffolding",
            parameters={
                "handler": "aibs_informatics_aws_lambda.handlers.demand.scaffolding.handler",
                "payload": sfn.JsonPath.object_at("$"),
                **batch_invoked_lambda_kwargs,
            },
        ).next(
            sfn_tasks.StepFunctionsStartExecution(
                self,
                "SM: Prepare Demand Scaffolding",
                state_machine=batch_invoked_lambda_state_machine,
                integration_pattern=sfn.IntegrationPattern.RUN_JOB,
                associate_with_parent=False,
                input_path="$",
                output_path="$.Output",
            )
        ),
        input_path="$.request",
        result_path=f"$.{config_scaffolding_path}",
    )

    # Create the batch job definition and prepare job arguments via the
    # batch-invoked-lambda state machine.
    create_def_and_prepare_job_args_task = CommonOperation.enclose_chainable(
        self,
        "Create Definition and Prep Job Args",
        sfn.Pass(
            self,
            "Pass: Create Definition and Prep Job Args",
            parameters={
                "handler": "aibs_informatics_aws_lambda.handlers.batch.create.handler",
                "payload": sfn.JsonPath.object_at("$"),
                **batch_invoked_lambda_kwargs,
            },
        ).next(
            sfn_tasks.StepFunctionsStartExecution(
                self,
                "SM: Create Definition and Prep Job Args",
                state_machine=batch_invoked_lambda_state_machine,
                integration_pattern=sfn.IntegrationPattern.RUN_JOB,
                associate_with_parent=False,
                input_path="$",
                output_path="$.Output",
            )
        ),
        input_path="$.batch_create_request",
        result_path="$",
    )

    # Run job-definition creation and input transfers in parallel. The
    # result_selector keeps only branch 0's output (the batch args), stored
    # under the parent of config_batch_args_path.
    setup_tasks = (
        sfn.Parallel(
            self,
            "Execution Setup Steps",
            input_path=f"$.{config_scaffolding_path}.setup_configs",
            result_path=f"$.{'.'.join(config_batch_args_path.split('.')[:-1])}",
            result_selector={f"{config_batch_args_path.split('.')[-1]}.$": "$[0]"},
        )
        .branch(create_def_and_prepare_job_args_task)
        .branch(
            sfn.Map(
                self,
                "Transfer Inputs TO Batch Job",
                items_path="$.data_sync_requests",
            ).iterator(
                sfn_tasks.StepFunctionsStartExecution(
                    self,
                    "Transfer Input",
                    state_machine=data_sync_state_machine,
                    integration_pattern=sfn.IntegrationPattern.RUN_JOB,
                    associate_with_parent=False,
                    result_path=sfn.JsonPath.DISCARD,
                )
            )
        )
    )

    # CustomState is used because the CDK BatchSubmitJob task does not expose
    # all of the submitJob.sync parameters used here (e.g. Tags propagation).
    execution_task = sfn.CustomState(
        self,
        "Submit Batch Job",
        state_json={
            "Type": "Task",
            "Resource": "arn:aws:states:::batch:submitJob.sync",
            "Parameters": {
                "JobName.$": sfn.JsonPath.string_at(f"$.{config_batch_args_path}.job_name"),
                "JobDefinition.$": sfn.JsonPath.string_at(
                    f"$.{config_batch_args_path}.job_definition_arn"
                ),  # noqa: E501
                "JobQueue.$": sfn.JsonPath.string_at(
                    f"$.{config_batch_args_path}.job_queue_arn"
                ),  # noqa: E501
                "Parameters.$": sfn.JsonPath.object_at(
                    f"$.{config_batch_args_path}.parameters"
                ),  # noqa: E501
                "ContainerOverrides.$": sfn.JsonPath.object_at(
                    f"$.{config_batch_args_path}.container_overrides"
                ),  # noqa: E501
                "Tags.$": sfn.JsonPath.object_at(
                    "$.request.demand_execution.execution_metadata.tags"
                ),  # noqa: E501
                "PropagateTags": True,
            },
            "ResultPath": "$.tasks.batch_submit_task",
        },
    )

    # Cleanup: transfer results out, then (if requested) remove data paths.
    cleanup_tasks = sfn.Chain.start(
        sfn.Map(
            self,
            "Transfer Results FROM Batch Job",
            input_path=f"$.{config_scaffolding_path}.cleanup_configs.data_sync_requests",
            result_path=f"$.{config_cleanup_results_path}.transfer_results",
        ).iterator(
            sfn_tasks.StepFunctionsStartExecution(
                self,
                "Transfer Result",
                state_machine=data_sync_state_machine,
                integration_pattern=sfn.IntegrationPattern.RUN_JOB,
                associate_with_parent=False,
                result_path=sfn.JsonPath.DISCARD,
            )
        )
    ).next(
        sfn.Choice(self, "Cleanup Choice")
        .when(
            condition=sfn.Condition.is_present(
                f"$.{config_scaffolding_path}.cleanup_configs.remove_data_paths_requests"
            ),
            next=sfn.Chain.start(
                sfn.Map(
                    self,
                    "Map: Cleanup Data Paths",
                    input_path=f"$.{config_scaffolding_path}.cleanup_configs.remove_data_paths_requests",
                    result_path=f"$.{config_cleanup_results_path}.remove_data_paths_results",
                ).iterator(
                    CommonOperation.enclose_chainable(
                        self,
                        "Cleanup Data Path",
                        definition=sfn.Pass(
                            self,
                            "Pass: Cleanup Data Path",
                            parameters={
                                "handler": "aibs_informatics_aws_lambda.handlers.data_sync.remove_data_paths_handler",  # noqa: E501
                                "payload": sfn.JsonPath.object_at("$"),
                                **batch_invoked_lambda_kwargs,
                            },
                        ).next(
                            sfn_tasks.StepFunctionsStartExecution(
                                self,
                                "SM: Cleanup Data Path",
                                state_machine=batch_invoked_lambda_state_machine,
                                integration_pattern=sfn.IntegrationPattern.RUN_JOB,
                                associate_with_parent=False,
                                input_path="$",
                                output_path="$.Output",
                            )
                        ),
                    )
                )
            ),
        )
        .otherwise(sfn.Pass(self, "No Data Paths to Cleanup"))
    )

    # fmt: off
    definition = (
        start_state
        .next(norm_parallel)
        .next(prep_scaffolding_task)
        .next(setup_tasks)
        .next(execution_task)
        .next(cleanup_tasks)
    )
    # fmt: on
    self.definition = definition
Functions
demand_execution_normalize_tags_chain
demand_execution_normalize_tags_chain(
    tags: dict[str, str] | None
) -> IChainable

Merge build and runtime tags.

Chain Assumptions/Expectations: - input is the demand execution request - output is the demand execution request with merged tags

If runtime tags are not provided, only build‑time tags are used.

Parameters:

Name Type Description Default
tags Optional[Dict[str, str]]

build‑time tags

required

Returns: sfn.IChainable: the state machine fragment

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/demand_execution.py
def demand_execution_normalize_tags_chain(self, tags: dict[str, str] | None) -> sfn.IChainable:
    """Build a chain that merges build-time tags into the execution metadata.

    Chain Assumptions/Expectations:
        - input is the demand execution request
        - output is the demand execution request with merged tags

    Runtime tags already present on the request take precedence over the
    build-time defaults; if no runtime tags are provided, only the
    build-time tags are used.

    Args:
        tags (Optional[Dict[str, str]]): build-time tags
    Returns:
        sfn.IChainable: the state machine fragment
    """
    defaults: dict[str, str] = {}
    for key, value in (tags or {}).items():
        # Values that are JSONPath references must have their keys suffixed
        # with ".$" so Step Functions resolves them at runtime.
        if isinstance(value, str) and value.startswith("$"):
            defaults[f"{key}.$"] = value
        else:
            defaults[key] = value

    return CommonOperation.merge_defaults(
        self,
        "Merge Tags",
        input_path="$",
        target_path=JsonReferencePath("execution_metadata.tags").as_reference,
        defaults=defaults,
        check_if_target_present=True,
    )
CleanFileSystemFragment
CleanFileSystemFragment(
    scope: Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: DockerImageAsset | str,
    batch_job_queue: JobQueue | str,
    scaffolding_bucket: Bucket,
    mount_point_configs: Iterable[MountPointConfiguration]
    | None = None,
    memory: int = 1024,
    vcpus: int = 1,
)

Bases: BatchInvokedBaseFragment

Clean up the file system by scanning for outdated data paths and removing them

Parameters:

Name Type Description Default
scope Construct

construct scope

required
id str

id

required
env_base EnvBase

env base

required
aibs_informatics_docker_asset DockerImageAsset | str

Docker image asset or image uri str for the aibs informatics aws lambda

required
batch_job_queue JobQueue | str

Default batch job queue or job queue name str that the batch job will be submitted to. This can be overridden by the payload.

required
scaffolding_bucket Bucket

Primary bucket used for request/response json blobs used in the batch invoked lambda function.

required
mount_point_configs Optional[Iterable[MountPointConfiguration]]

List of mount point configurations to use. These can be overridden in the payload.

None
memory int

memory needed. Defaults to 1024. This memory value is used for both the outdated path scanner and removal of data paths.

1024
vcpus int

vcpus needed. Defaults to 1. This memory value is used for both the outdated path scanner and removal of data paths.

1
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/efs.py
def __init__(
    self,
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: ecr_assets.DockerImageAsset | str,
    batch_job_queue: batch.JobQueue | str,
    scaffolding_bucket: s3.Bucket,
    mount_point_configs: Iterable[MountPointConfiguration] | None = None,
    memory: int = 1024,
    vcpus: int = 1,
) -> None:
    """Clean up the file system by scanning for outdated data paths and removing them

    Args:
        scope (Construct): construct scope
        id (str): id
        env_base (EnvBase): env base
        aibs_informatics_docker_asset (DockerImageAsset|str): Docker image asset or image uri
            str for the aibs informatics aws lambda
        batch_job_queue (JobQueue|str): Default batch job queue or job queue name str that
            the batch job will be submitted to. This can be overridden by the payload.
        scaffolding_bucket (Bucket): Primary bucket used for request/response json blobs used
            in the batch invoked lambda function.
        mount_point_configs (Optional[Iterable[MountPointConfiguration]], optional):
            List of mount point configurations to use. These can be overridden in the payload.
        memory (int, optional): memory needed. Defaults to 1024.
            This memory value is used for both the outdated path scanner and removal of data paths.
        vcpus (int, optional): vcpus needed. Defaults to 1.
            This vcpus value is used for both the outdated path scanner and removal of data paths.
    """  # noqa: E501
    super().__init__(scope, id, env_base)

    # Resolve the image uri once; both sub-fragments use the same image.
    aibs_informatics_image_uri = (
        aibs_informatics_docker_asset
        if isinstance(aibs_informatics_docker_asset, str)
        else aibs_informatics_docker_asset.image_uri
    )

    # No-op entry state that anchors the chain with a readable name.
    start_pass_state = sfn.Pass(
        self,
        "Data Cleanup: Start",
    )

    # Step 1: scan the mounted file systems for outdated data paths.
    self.outdated_data_path_scanner = outdated_data_path_scanner_fragment(
        self,
        "Scan for Outdated Data Paths",
        env_base=self.env_base,
        aibs_informatics_docker_asset=aibs_informatics_image_uri,
        batch_job_queue=batch_job_queue,
        scaffolding_bucket=scaffolding_bucket,
        mount_point_configs=mount_point_configs,
        memory=memory,
        vcpus=vcpus,
    )

    # Step 2: remove the data paths reported by the scanner.
    self.remove_data_paths = remove_data_paths_fragment(
        self,
        "Remove Data Paths",
        env_base=self.env_base,
        aibs_informatics_docker_asset=aibs_informatics_image_uri,
        batch_job_queue=batch_job_queue,
        scaffolding_bucket=scaffolding_bucket,
        mount_point_configs=mount_point_configs,
        memory=memory,
        vcpus=vcpus,
    )

    # fmt: off
    self.definition = (
        start_pass_state
        .next(self.outdated_data_path_scanner.enclose())
        .next(self.remove_data_paths.enclose())
    )
    # fmt: on
Functions
Modules
batch
Classes
BatchInvokedLambdaFunction
BatchInvokedLambdaFunction(
    scope: Construct,
    id: str,
    env_base: EnvBase,
    name: str,
    image: str,
    handler: str,
    job_queue: str,
    bucket_name: str,
    key_prefix: str | None = None,
    payload_path: str | None = None,
    command: list[str] | str | None = None,
    environment: Mapping[str, str] | None = None,
    memory: int | str | None = None,
    vcpus: int | str | None = None,
    gpu: int | str | None = None,
    mount_points: list[MountPointTypeDef]
    | str
    | None = None,
    volumes: list[VolumeTypeDef] | str | None = None,
    mount_point_configs: list[MountPointConfiguration]
    | None = None,
    platform_capabilities: list[Literal["EC2", "FARGATE"]]
    | str
    | None = None,
    job_role_arn: str | None = None,
)

Bases: BatchInvokedBaseFragment, AWSBatchMixins

Invoke a command on image via batch with a payload from s3

This fragment creates a state machine fragment that
  1. Puts a payload to s3
  2. Submits a batch job
  3. Gets the response from s3

The payload is written to s3://&lt;bucket_name&gt;/&lt;key_prefix&gt;/&lt;execution_name&gt;/request.json. The response is read from s3://&lt;bucket_name&gt;/&lt;key_prefix&gt;/&lt;execution_name&gt;/response.json.

The batch job will be fed the following environment variables
  • AWS_LAMBDA_FUNCTION_NAME: name of lambda function
  • AWS_LAMBDA_FUNCTION_HANDLER: handler of lambda function
  • AWS_LAMBDA_EVENT_PAYLOAD: The s3 location of the event payload
  • AWS_LAMBDA_EVENT_RESPONSE_LOCATION: The s3 location to write the response to
IMPORTANT
  • Batch job queue / compute environment must have permissions to read/write to the bucket.

Parameters:

Name Type Description Default
scope Construct

construct scope

required
id str

id

required
env_base EnvBase

env base

required
name str

Name of the lambda function. This can be a reference path (e.g. "$.name")

required
image str

Image URI or name. This can be a reference path (e.g. "$.image")

required
handler str

handler of lambda function. This should describe a fully qualified path to function handler. This can be a reference path (e.g. "$.handler")

required
job_queue str

Job queue to submit job to. This can be a reference path (e.g. "$.job_queue")

required
bucket_name str

S3 Bucket name to write payload to and read response from. This can be a reference path (e.g. "$.bucket_name")

required
key_prefix str | None

Key prefix to write payload to and read response from. If not provided, scratch/ is used. Can be a reference path (e.g. "$.key_prefix")

None
payload_path str | None

Optionally specify the reference path of the event payload. Defaults to "$".

None
command List[str] | str | None

Command to run in container. Can be a reference path (e.g. "$.command"). If unspecified, the container's CMD is used.

None
environment Mapping[str, str] | None

Additional environment variables to specify. These are added to default environment variables.

None
memory int | str | None

Memory in MiB (either int or reference path str). Defaults to None.

None
vcpus int | str | None

Number of vCPUs (either int or reference path str). Defaults to None.

None
gpu int | str | None

Number of GPUs (either int or reference path str). Defaults to None.

None
mount_points List[MountPointTypeDef] | None

List of mount points to add to state machine. Defaults to None.

None
volumes List[VolumeTypeDef] | None

List of volumes to add to state machine. Defaults to None.

None
platform_capabilities List[Literal['EC2', 'FARGATE']] | str | None

platform capabilities to use. This can be a reference path (e.g. "$.platform_capabilities")

None
job_role_arn str | None

Job role arn to use for the job. This can be a reference path (e.g. "$.job_role_arn")

None
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/batch.py
def __init__(
    self,
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
    name: str,
    image: str,
    handler: str,
    job_queue: str,
    bucket_name: str,
    key_prefix: str | None = None,
    payload_path: str | None = None,
    command: list[str] | str | None = None,
    environment: Mapping[str, str] | None = None,
    memory: int | str | None = None,
    vcpus: int | str | None = None,
    gpu: int | str | None = None,
    mount_points: list[MountPointTypeDef] | str | None = None,
    volumes: list[VolumeTypeDef] | str | None = None,
    mount_point_configs: list[MountPointConfiguration] | None = None,
    platform_capabilities: list[Literal["EC2", "FARGATE"]] | str | None = None,
    job_role_arn: str | None = None,
) -> None:
    """Invoke a command on image via batch with a payload from s3

    This fragment creates a state machine fragment that:
        1. Puts a payload to s3
        2. Submits a batch job
        3. Gets the response from s3

    The payload is written to
        s3://<bucket_name>/<key_prefix><execution_name>/<task_id>/request.json
    The response is read from
        s3://<bucket_name>/<key_prefix><execution_name>/<task_id>/response.json
    where <task_id> is a UUID generated at the start of each execution.

    The batch job will be fed the following environment variables:
        - AWS_LAMBDA_FUNCTION_NAME: name of lambda function
        - AWS_LAMBDA_FUNCTION_HANDLER: handler of lambda function
        - AWS_LAMBDA_EVENT_PAYLOAD: The s3 location of the event payload
        - AWS_LAMBDA_EVENT_RESPONSE_LOCATION: The s3 location to write the response to

    IMPORTANT:
        - Batch job queue / compute environment must have permissions to read/write to the bucket.
        - `key_prefix` is used verbatim as a prefix (no separator is inserted), so it
          should normally end with "/" (the default scratch prefix presumably does --
          TODO confirm).

    Args:
        scope (Construct): construct scope
        id (str): id
        env_base (EnvBase): env base
        name (str): Name of the lambda function. This can be a reference path (e.g. "$.name")
        image (str): Image URI or name. This can be a reference path (e.g. "$.image")
        handler (str): handler of lambda function. This should describe a fully qualified path to function handler. This can be a reference path (e.g. "$.handler")
        job_queue (str): Job queue to submit job to. This can be a reference path (e.g. "$.job_queue")
        bucket_name (str): S3 Bucket name to write payload to and read response from. This can be a reference path (e.g. "$.bucket_name")
        key_prefix (str | None): Key prefix to write payload to and read response from. If not provided, `scratch/` is used. Can be a reference path (e.g. "$.key_prefix")
        payload_path (str | None): Optionally specify the reference path of the event payload. Defaults to "$".
        command (list[str] | str | None): Command to run in container. Can be a reference path (e.g. "$.command"). If unspecified, the container's CMD is used.
        environment (Mapping[str, str] | None): Additional environment variables to specify. These are added to default environment variables.
        memory (int | str | None): Memory in MiB (either int or reference path str). Defaults to None.
        vcpus (int | str | None): Number of vCPUs (either int or reference path str). Defaults to None.
        gpu (int | str | None): Number of GPUs (either int or reference path str). Defaults to None.
        mount_points (list[MountPointTypeDef] | str | None): List of mount points (or a reference path str) to add to state machine. Defaults to None.
        volumes (list[VolumeTypeDef] | str | None): List of volumes (or a reference path str) to add to state machine. Defaults to None.
        mount_point_configs (list[MountPointConfiguration] | None): Convenience alternative to `mount_points`/`volumes`; each config is converted into a matching mount point and volume. Mutually exclusive with `mount_points`/`volumes`. Defaults to None.
        platform_capabilities (list[Literal["EC2", "FARGATE"]] | str | None): platform capabilities to use. This can be a reference path (e.g. "$.platform_capabilities")
        job_role_arn (str | None): Job role arn to use for the job. This can be a reference path (e.g. "$.job_role_arn")
    """  # noqa: E501
    super().__init__(scope, id, env_base)
    key_prefix = key_prefix or S3_SCRATCH_KEY_PREFIX

    # Request/response objects live side by side under a per-execution,
    # per-task prefix: <key_prefix><execution_name>/<task_id>/{request,response}.json
    request_key = sfn.JsonPath.format(
        f"{key_prefix}{{}}/{{}}/request.json",
        sfn.JsonPath.execution_name,
        sfn.JsonPath.string_at("$.taskResult.prep.task_id"),
    )
    response_key = sfn.JsonPath.format(
        f"{key_prefix}{{}}/{{}}/response.json",
        sfn.JsonPath.execution_name,
        sfn.JsonPath.string_at("$.taskResult.prep.task_id"),
    )

    # Generate a unique task id so multiple uses of this fragment within one
    # execution do not collide on the same s3 keys.
    start = sfn.Pass(
        self,
        f"{id} Prep S3 Keys",
        parameters={
            "task_id": sfn.JsonPath.uuid(),
        },
        result_path="$.taskResult.prep",
    )

    # mount_point_configs is an exclusive, higher-level alternative to
    # specifying mount_points/volumes directly.
    if mount_point_configs:
        if mount_points or volumes:
            raise ValueError("Cannot specify both mount_point_configs and mount_points")
        mount_points, volumes = self.convert_to_mount_point_and_volumes(mount_point_configs)

    put_payload = S3Operation.put_payload(
        self,
        f"{id} Put Request to S3",
        payload=payload_path or sfn.JsonPath.entire_payload,
        bucket_name=bucket_name,
        key=request_key,
        result_path="$.taskResult.put",
    )

    # Environment consumed by the in-container lambda-handler runtime: which
    # handler to run, where to read the event, and where to write the response.
    default_environment = {
        AWS_LAMBDA_FUNCTION_NAME_KEY: name,
        AWS_LAMBDA_FUNCTION_HANDLER_KEY: handler,
        AWS_LAMBDA_EVENT_PAYLOAD_KEY: sfn.JsonPath.format(
            "s3://{}/{}",
            sfn.JsonPath.string_at("$.taskResult.put.Bucket"),
            sfn.JsonPath.string_at("$.taskResult.put.Key"),
        ),
        AWS_LAMBDA_EVENT_RESPONSE_LOCATION_KEY: sfn.JsonPath.format(
            "s3://{}/{}",
            bucket_name,
            response_key,
        ),
        EnvBase.ENV_BASE_KEY: self.env_base,
        "AWS_REGION": self.aws_region,
        "AWS_ACCOUNT_ID": self.aws_account,
    }

    submit_job = SubmitJobFragment(
        self,
        f"{id} Batch",
        env_base=env_base,
        name=name,
        job_queue=job_queue,
        command=command or [],
        image=image,
        # Later unpacking wins: default entries take precedence over any
        # caller-supplied keys of the same name.
        environment={
            **(environment if environment else {}),
            **default_environment,
        },
        memory=memory,
        vcpus=vcpus,
        gpu=gpu,
        mount_points=mount_points or [],
        volumes=volumes or [],
        platform_capabilities=platform_capabilities,
        job_role_arn=job_role_arn,
    )

    # NOTE(review): state id is passed positionally here, but
    # EnvBaseStateMachineFragment.to_single_state is keyword-only -- confirm the
    # fragment returned by S3Operation.get_payload accepts a positional id.
    get_response = S3Operation.get_payload(
        self,
        f"{id}",
        bucket_name=bucket_name,
        key=response_key,
    ).to_single_state(
        f"{id} Get Response from S3",
        output_path="$[0]",
    )

    self.definition = start.next(put_payload).next(submit_job).next(get_response)
Functions
BatchInvokedExecutorFragment
BatchInvokedExecutorFragment(
    scope: Construct,
    id: str,
    env_base: EnvBase,
    name: str,
    image: str,
    executor: str,
    job_queue: str,
    bucket_name: str,
    key_prefix: str | None = None,
    payload_path: str | None = None,
    environment: Mapping[str, str] | str | None = None,
    memory: int | str | None = None,
    vcpus: int | str | None = None,
    mount_point_configs: list[MountPointConfiguration]
    | None = None,
    mount_points: list[MountPointTypeDef] | None = None,
    volumes: list[VolumeTypeDef] | None = None,
    platform_capabilities: list[Literal["EC2", "FARGATE"]]
    | str
    | None = None,
    job_role_arn: str | None = None,
)

Bases: BatchInvokedBaseFragment, AWSBatchMixins

Invoke an executor in an image via batch with a payload from s3

This targets any subclassing of aibs_informatics_core.executors.base.ExecutorBase - https://github.com/AllenInstitute/aibs-informatics-core/blob/main/src/aibs_informatics_core/executors/base.py

This fragment creates a state machine fragment that
  1. Puts a payload to s3
  2. Submits a batch job
  3. Gets the response from s3

The payload is written to s3://&lt;bucket_name&gt;/&lt;key_prefix&gt;&lt;execution_name&gt;/&lt;task_id&gt;/request.json. The response is read from s3://&lt;bucket_name&gt;/&lt;key_prefix&gt;&lt;execution_name&gt;/&lt;task_id&gt;/response.json.

IMPORTANT
  • Batch job queue / compute environment must have permissions to read/write to the bucket.

Parameters:

Name Type Description Default
scope Construct

construct scope

required
id str

id

required
env_base EnvBase

env base

required
name str

Name of the lambda function. This can be a reference path (e.g. "$.name")

required
image str

Image URI or name. This can be a reference path (e.g. "$.image")

required
executor str

qualified name of executor class. This should describe a fully qualified path to function handler. This can be a reference path (e.g. "$.handler")

required
job_queue str

Job queue to submit job to. This can be a reference path (e.g. "$.job_queue")

required
bucket_name str

S3 Bucket name to write payload to and read response from. This can be a reference path (e.g. "$.bucket_name")

required
key_prefix str | None

Key prefix to write payload to and read response from. If not provided, scratch/ is used. Can be a reference path (e.g. "$.key_prefix")

None
payload_path str | None

Optionally specify the reference path of the event payload. Defaults to "$".

None
command List[str] | str | None

Command to run in container. Can be a reference path (e.g. "$.command"). If unspecified, the container's CMD is used.

required
environment Mapping[str, str] | str | None

environment variables to specify. This can be a reference path (e.g. "$.environment")

None
memory int | str | None

Memory in MiB (either int or reference path str). Defaults to None.

None
vcpus int | str | None

Number of vCPUs (either int or reference path str). Defaults to None.

None
mount_points List[MountPointTypeDef] | None

List of mount points to add to state machine. Defaults to None.

None
volumes List[VolumeTypeDef] | None

List of volumes to add to state machine. Defaults to None.

None
platform_capabilities List[Literal['EC2', 'FARGATE']] | str | None

platform capabilities to use. This can be a reference path (e.g. "$.platform_capabilities")

None
job_role_arn str | None

Job role arn to use for the job. This can be a reference path (e.g. "$.job_role_arn")

None
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/batch.py
def __init__(
    self,
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
    name: str,
    image: str,
    executor: str,
    job_queue: str,
    bucket_name: str,
    key_prefix: str | None = None,
    payload_path: str | None = None,
    environment: Mapping[str, str] | str | None = None,
    memory: int | str | None = None,
    vcpus: int | str | None = None,
    mount_point_configs: list[MountPointConfiguration] | None = None,
    mount_points: list[MountPointTypeDef] | None = None,
    volumes: list[VolumeTypeDef] | None = None,
    platform_capabilities: list[Literal["EC2", "FARGATE"]] | str | None = None,
    job_role_arn: str | None = None,
) -> None:
    """Invoke an executor in an image via batch with a payload from s3

    This targets any subclass of `aibs_informatics_core.executors.base.ExecutorBase`
    - https://github.com/AllenInstitute/aibs-informatics-core/blob/main/src/aibs_informatics_core/executors/base.py


    This fragment creates a state machine fragment that:
        1. Puts a payload to s3
        2. Submits a batch job that runs `run_cli_executor` against the payload
        3. Gets the response from s3

    The payload is written to
        s3://<bucket_name>/<key_prefix><execution_name>/<task_id>/request.json
    The response is read from
        s3://<bucket_name>/<key_prefix><execution_name>/<task_id>/response.json
    where <task_id> is a UUID generated at the start of each execution.

    IMPORTANT:
        - Batch job queue / compute environment must have permissions to read/write to the bucket.

    Args:
        scope (Construct): construct scope
        id (str): id
        env_base (EnvBase): env base
        name (str): Name of the lambda function. This can be a reference path (e.g. "$.name")
        image (str): Image URI or name. This can be a reference path (e.g. "$.image")
        executor (str): qualified name of executor class. This should describe a fully qualified path to the executor class. This can be a reference path (e.g. "$.executor")
        job_queue (str): Job queue to submit job to. This can be a reference path (e.g. "$.job_queue")
        bucket_name (str): S3 Bucket name to write payload to and read response from. This can be a reference path (e.g. "$.bucket_name")
        key_prefix (str | None): Key prefix to write payload to and read response from. If not provided, `scratch/` is used. Can be a reference path (e.g. "$.key_prefix")
        payload_path (str | None): Optionally specify the reference path of the event payload. Defaults to "$".
        environment (Mapping[str, str] | str | None): environment variables to specify. This can be a reference path (e.g. "$.environment")
        memory (int | str | None): Memory in MiB (either int or reference path str). Defaults to None.
        vcpus (int | str | None): Number of vCPUs (either int or reference path str). Defaults to None.
        mount_point_configs (list[MountPointConfiguration] | None): Convenience alternative to `mount_points`/`volumes`; each config is converted into a matching mount point and volume. Mutually exclusive with `mount_points`/`volumes`. Defaults to None.
        mount_points (list[MountPointTypeDef] | None): List of mount points to add to state machine. Defaults to None.
        volumes (list[VolumeTypeDef] | None): List of volumes to add to state machine. Defaults to None.
        platform_capabilities (list[Literal["EC2", "FARGATE"]] | str | None): platform capabilities to use. This can be a reference path (e.g. "$.platform_capabilities")
        job_role_arn (str | None): Job role arn to use for the job. This can be a reference path (e.g. "$.job_role_arn")
    """  # noqa: E501
    super().__init__(scope, id, env_base)
    key_prefix = key_prefix or S3_SCRATCH_KEY_PREFIX

    # Request/response objects live under a per-execution, per-task prefix.
    request_key = sfn.JsonPath.format(
        f"{key_prefix}{{}}/{{}}/request.json",
        sfn.JsonPath.execution_name,
        sfn.JsonPath.string_at("$.taskResult.prep.task_id"),
    )
    response_key = sfn.JsonPath.format(
        f"{key_prefix}{{}}/{{}}/response.json",
        sfn.JsonPath.execution_name,
        sfn.JsonPath.string_at("$.taskResult.prep.task_id"),
    )

    # Generate a unique task id so multiple uses of this fragment within one
    # execution do not collide on the same s3 keys.
    start = sfn.Pass(
        self,
        f"{id} Prep S3 Keys",
        parameters={
            "task_id": sfn.JsonPath.uuid(),
        },
        result_path="$.taskResult.prep",
    )

    # mount_point_configs is an exclusive, higher-level alternative to
    # specifying mount_points/volumes directly.
    if mount_point_configs:
        if mount_points or volumes:
            raise ValueError("Cannot specify both mount_point_configs and mount_points")
        mount_points, volumes = self.convert_to_mount_point_and_volumes(mount_point_configs)

    put_payload = S3Operation.put_payload(
        self,
        f"{id} Put Request to S3",
        payload=payload_path or sfn.JsonPath.entire_payload,
        bucket_name=bucket_name,
        key=request_key,
        result_path="$.taskResult.put",
    )

    # NOTE: construct id is intentionally left as `id + "Batch"` (no space) even
    # though the sibling fragment uses f"{id} Batch" -- changing it would alter
    # CloudFormation logical IDs for already-deployed stacks.
    submit_job = SubmitJobFragment(
        self,
        id + "Batch",
        env_base=env_base,
        name=name,
        job_queue=job_queue,
        command=[
            "run_cli_executor",
            "--executor",
            executor,
            "--input",
            # BUGFIX: read back the bucket/key recorded by the put-payload step
            # (result_path="$.taskResult.put") as JSON path references. The
            # previous plain strings "$.Bucket"/"$.Key" were embedded literally
            # into the command instead of being resolved at runtime.
            sfn.JsonPath.format(
                "s3://{}/{}",
                sfn.JsonPath.string_at("$.taskResult.put.Bucket"),
                sfn.JsonPath.string_at("$.taskResult.put.Key"),
            ),
            "--output-location",
            sfn.JsonPath.format("s3://{}/{}", bucket_name, response_key),
        ],
        image=image,
        environment=environment,
        memory=memory,
        vcpus=vcpus,
        mount_points=mount_points or [],
        volumes=volumes or [],
        platform_capabilities=platform_capabilities,
        job_role_arn=job_role_arn,
    )

    get_response = S3Operation.get_payload(
        self,
        f"{id}",
        bucket_name=bucket_name,
        key=response_key,
    ).to_single_state(
        f"{id} Get Response from S3",
        output_path="$[0]",
    )

    self.definition = start.next(put_payload).next(submit_job).next(get_response)
Functions
Functions
data_sync
Classes
DataSyncFragment
DataSyncFragment(
    scope: Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: DockerImageAsset | str,
    batch_job_queue: JobQueue | str,
    scaffolding_bucket: Bucket,
    batch_job_role: Role | str | None = None,
    mount_point_configs: Iterable[MountPointConfiguration]
    | None = None,
)

Bases: BatchInvokedBaseFragment, EnvBaseConstructMixins

Sync data from one s3 bucket to another

Parameters:

Name Type Description Default
scope Construct

construct scope

required
id str

id

required
env_base EnvBase

env base

required
aibs_informatics_docker_asset DockerImageAsset | str

Docker image asset or image uri str for the aibs informatics aws lambda

required
batch_job_queue JobQueue | str

Default batch job queue or job queue name str that the batch job will be submitted to. This can be overridden by the payload.

required
scaffolding_bucket Bucket

Primary bucket used for request/response json blobs used in the batch invoked lambda function.

required
batch_job_role Optional[IRole | str]

Optional role to use for the batch job. If not provided, the default role created by the batch compute construct will be used.

None
mount_point_configs Optional[Iterable[MountPointConfiguration]]

List of mount point configurations to use. These can be overridden in the payload.

None
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/data_sync.py
def __init__(
    self,
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: ecr_assets.DockerImageAsset | str,
    batch_job_queue: batch.JobQueue | str,
    scaffolding_bucket: s3.Bucket,
    batch_job_role: iam.Role | str | None = None,
    mount_point_configs: Iterable[MountPointConfiguration] | None = None,
) -> None:
    """Sync data from one s3 bucket to another.

    Builds a two-state definition: a Pass state that restructures the raw input
    into a handler/image/payload envelope, followed by a batch-invoked lambda
    function that runs the data-sync handler.

    Args:
        scope (Construct): construct scope
        id (str): id
        env_base (EnvBase): env base
        aibs_informatics_docker_asset (DockerImageAsset|str): Docker image asset or
            image uri str for the aibs informatics aws lambda
        batch_job_queue (JobQueue|str): Default batch job queue or job queue name str
            that the batch job will be submitted to. This can be overridden by the
            payload.
        scaffolding_bucket (Bucket): Primary bucket used for request/response json
            blobs used in the batch invoked lambda function.
        batch_job_role (Role|str|None, optional): Optional role to use for the batch
            job. If not provided, the default role created by the batch compute
            construct will be used.
        mount_point_configs (Iterable[MountPointConfiguration]|None, optional):
            List of mount point configurations to use. These can be overridden in
            the payload.
    """
    super().__init__(scope, id, env_base)

    # Normalize the construct-or-string inputs into plain strings up front.
    if isinstance(aibs_informatics_docker_asset, str):
        image_uri = aibs_informatics_docker_asset
    else:
        image_uri = aibs_informatics_docker_asset.image_uri

    if isinstance(batch_job_queue, str):
        self.batch_job_queue_name = batch_job_queue
    else:
        self.batch_job_queue_name = batch_job_queue.job_queue_name

    job_role_arn = None
    if batch_job_role:
        if isinstance(batch_job_role, str):
            job_role_arn = batch_job_role
        else:
            job_role_arn = batch_job_role.role_arn

    # Reshape the raw input into the envelope the batch-invoked lambda expects.
    start = sfn.Pass(
        self,
        "Input Restructure",
        parameters={
            "handler": "aibs_informatics_aws_lambda.handlers.data_sync.data_sync_handler",
            "image": image_uri,
            "payload": sfn.JsonPath.object_at("$"),
        },
    )

    self.fragment = BatchInvokedLambdaFunction.with_defaults(
        self,
        "Data Sync",
        env_base=self.env_base,
        name="data-sync",
        job_queue=self.batch_job_queue_name,
        bucket_name=scaffolding_bucket.bucket_name,
        handler_path="$.handler",
        image_path="$.image",
        payload_path="$.payload",
        memory="1024",
        vcpus="1",
        mount_point_configs=list(mount_point_configs) if mount_point_configs else None,
        job_role_arn=job_role_arn,
        environment={
            EnvBase.ENV_BASE_KEY: self.env_base,
            "AWS_REGION": self.aws_region,
            "AWS_ACCOUNT_ID": self.aws_account,
        },
    )

    self.definition = start.next(self.fragment.to_single_state())
Functions
DistributedDataSyncFragment
DistributedDataSyncFragment(
    scope: Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: DockerImageAsset | str,
    batch_job_queue: JobQueue | str,
    scaffolding_bucket: Bucket,
    batch_job_role: str | Role | None = None,
    mount_point_configs: Iterable[MountPointConfiguration]
    | None = None,
)

Bases: BatchInvokedBaseFragment

Sync data from one s3 bucket to another using distributed batch jobs

Parameters:

Name Type Description Default
scope Construct

construct scope

required
id str

id

required
env_base EnvBase

env base

required
aibs_informatics_docker_asset DockerImageAsset | str

Docker image asset or image uri str for the aibs informatics aws lambda

required
batch_job_queue JobQueue | str

Default batch job queue or job queue name str that the batch job will be submitted to. This can be overridden by the payload.

required
scaffolding_bucket Bucket

Primary bucket used for request/response json blobs used in the batch invoked lambda function.

required
batch_job_role Optional[IRole | str]

Optional role to use for the batch job. If not provided, the default role created by the batch compute construct will be used.

None
mount_point_configs Optional[Iterable[MountPointConfiguration]]

List of mount point configurations to use. These can be overridden in the payload.

None
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/data_sync.py
def __init__(
    self,
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: ecr_assets.DockerImageAsset | str,
    batch_job_queue: batch.JobQueue | str,
    scaffolding_bucket: s3.Bucket,
    batch_job_role: str | iam.Role | None = None,
    mount_point_configs: Iterable[MountPointConfiguration] | None = None,
) -> None:
    """Sync data from one s3 bucket to another using distributed batch jobs.

    Builds a three-step definition: a Pass state wrapping the input under
    `$.request`, a "prep" batch-invoked lambda that splits the request into
    per-chunk sync requests, and a Map state that runs one batch-invoked
    lambda per chunk in parallel.

    Args:
        scope (constructs.Construct): construct scope
        id (str): id
        env_base (EnvBase): env base
        aibs_informatics_docker_asset (DockerImageAsset|str): Docker image asset or
            image uri str for the aibs informatics aws lambda
        batch_job_queue (JobQueue|str): Default batch job queue or job queue name str
            that the batch job will be submitted to. This can be overridden by the
            payload.
        scaffolding_bucket (Bucket): Primary bucket used for request/response json
            blobs used in the batch invoked lambda function.
        batch_job_role (str|Role|None, optional): Optional role to use for the batch
            job. If not provided, the default role created by the batch compute
            construct will be used.
        mount_point_configs (Iterable[MountPointConfiguration]|None, optional):
            List of mount point configurations to use. These can be overridden in
            the payload.
    """
    super().__init__(scope, id, env_base)

    # Resolve the construct-or-string inputs once, up front, so the two
    # BatchInvokedLambdaFunction calls below stay readable.
    if isinstance(aibs_informatics_docker_asset, str):
        image_uri = aibs_informatics_docker_asset
    else:
        image_uri = aibs_informatics_docker_asset.image_uri

    if isinstance(batch_job_queue, str):
        job_queue_name = batch_job_queue
    else:
        job_queue_name = batch_job_queue.job_queue_name

    job_role_arn = None
    if batch_job_role:
        if isinstance(batch_job_role, str):
            job_role_arn = batch_job_role
        else:
            job_role_arn = batch_job_role.role_arn

    start_pass_state = sfn.Pass(
        self,
        f"{id}: Start",
        parameters={
            "request": sfn.JsonPath.object_at("$"),
        },
    )

    prep_task_name = "prep-batch-data-sync-requests"

    # Splits the incoming request into a list of per-chunk sync requests,
    # stored at $.tasks.<task>.response for the Map state to iterate.
    # NOTE: sub-constructs are parented to `scope` (not `self`), matching the
    # original construct tree so CloudFormation logical IDs are unchanged.
    prep_batch_sync = BatchInvokedLambdaFunction(
        scope=scope,
        id=f"{id}: Prep Batch Data Sync",
        env_base=env_base,
        name=prep_task_name,
        payload_path="$.request",
        image=image_uri,
        handler="aibs_informatics_aws_lambda.handlers.data_sync.prepare_batch_data_sync_handler",
        job_queue=job_queue_name,
        bucket_name=scaffolding_bucket.bucket_name,
        memory=1024,
        vcpus=1,
        mount_point_configs=list(mount_point_configs) if mount_point_configs else None,
        job_role_arn=job_role_arn,
    ).enclose(result_path=f"$.tasks.{prep_task_name}.response")

    batch_sync_map_state = sfn.Map(
        self,
        f"{id}: Batch Data Sync: Map Start",
        comment="Runs requests for batch sync in parallel",
        items_path=f"$.tasks.{prep_task_name}.response.requests",
        result_path=sfn.JsonPath.DISCARD,
    )

    # Each map iteration syncs one prepared chunk with a larger resource
    # allocation than the prep step.
    batch_sync_map_state.iterator(
        BatchInvokedLambdaFunction(
            scope=scope,
            id=f"{id}: Batch Data Sync",
            env_base=env_base,
            name="batch-data-sync",
            payload_path="$",
            image=image_uri,
            handler="aibs_informatics_aws_lambda.handlers.data_sync.batch_data_sync_handler",
            job_queue=job_queue_name,
            bucket_name=scaffolding_bucket.bucket_name,
            memory=4096,
            vcpus=2,
            mount_point_configs=list(mount_point_configs) if mount_point_configs else None,
            job_role_arn=job_role_arn,
        )
    )

    self.definition = start_pass_state.next(prep_batch_sync).next(batch_sync_map_state)
Functions
Functions
demand_execution
Classes
DemandExecutionFragment
DemandExecutionFragment(
    scope: Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: DockerImageAsset | str,
    scaffolding_bucket: Bucket,
    scaffolding_job_queue: JobQueue | str,
    batch_invoked_lambda_state_machine: StateMachine,
    data_sync_state_machine: StateMachine,
    shared_mount_point_config: MountPointConfiguration
    | None,
    scratch_mount_point_config: MountPointConfiguration
    | None,
    tmp_mount_point_config: MountPointConfiguration
    | None = None,
    context_manager_configuration: dict[str, Any]
    | None = None,
    tags: dict[str, str] | None = None,
)

Bases: EnvBaseStateMachineFragment, EnvBaseConstructMixins

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/demand_execution.py
def __init__(
    self,
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: ecr_assets.DockerImageAsset | str,
    scaffolding_bucket: s3.Bucket,
    scaffolding_job_queue: batch.JobQueue | str,
    batch_invoked_lambda_state_machine: sfn.StateMachine,
    data_sync_state_machine: sfn.StateMachine,
    shared_mount_point_config: MountPointConfiguration | None,
    scratch_mount_point_config: MountPointConfiguration | None,
    tmp_mount_point_config: MountPointConfiguration | None = None,
    context_manager_configuration: dict[str, Any] | None = None,
    tags: dict[str, str] | None = None,
) -> None:
    """Initialize the demand execution fragment.

    Builds a state machine definition that:
      1. normalizes the demand execution request (merging build/runtime tags),
      2. prepares demand scaffolding via a batch-invoked lambda,
      3. runs setup steps in parallel (job definition creation + input transfers),
      4. submits the batch job (`batch:submitJob.sync`), and
      5. transfers results back out and cleans up data paths.

    Args:
        scope (constructs.Construct): construct scope
        id (str): construct id
        env_base (EnvBase): environment base for resource naming
        aibs_informatics_docker_asset (ecr_assets.DockerImageAsset | str):
            docker image asset or image uri for the aibs informatics lambda image
        scaffolding_bucket (s3.Bucket): bucket used for request/response json
            blobs of batch-invoked lambda functions
        scaffolding_job_queue (batch.JobQueue | str): job queue (or queue name)
            that scaffolding batch jobs are submitted to
        batch_invoked_lambda_state_machine (sfn.StateMachine): state machine
            that runs lambda handlers inside batch jobs
        data_sync_state_machine (sfn.StateMachine): state machine used for data
            transfers to/from the batch job
        shared_mount_point_config (MountPointConfiguration | None): shared EFS
            mount point config; must be provided together with scratch
        scratch_mount_point_config (MountPointConfiguration | None): scratch EFS
            mount point config; must be provided together with shared
        tmp_mount_point_config (MountPointConfiguration | None): optional tmp
            EFS mount point config
        context_manager_configuration (dict[str, Any] | None): optional context
            manager configuration forwarded in the scaffolding request
        tags (dict[str, str] | None): build-time tags merged into the execution
            metadata tags

    Raises:
        ValueError: if exactly one of shared/scratch mount point configurations
            is provided.
    """
    super().__init__(scope, id, env_base)

    # ----------------- Validation -----------------
    # shared and scratch mount point configs must be provided together or
    # omitted together. NOTE: the previous condition
    # `not (a and b) or not (a or b)` also raised when NEITHER was provided,
    # contradicting the Optional type hints and the error message; the intended
    # check is an exclusive-or on presence.
    if bool(shared_mount_point_config) != bool(scratch_mount_point_config):
        raise ValueError(
            "If shared or scratch mount point configurations are provided, "
            "both shared and scratch mount point configurations must be provided."
        )

    # ------------------- Setup -------------------

    config_scaffolding_path = "config.scaffolding"
    config_setup_results_path = f"{config_scaffolding_path}.setup_results"
    config_batch_args_path = f"{config_setup_results_path}.batch_args"

    config_cleanup_results_path = "tasks.cleanup.cleanup_results"

    # Create common kwargs for the batch invoked lambda functions
    # - specify the bucket name and job queue
    # - specify the mount points and volumes if provided
    batch_invoked_lambda_kwargs: dict[str, Any] = {
        "bucket_name": scaffolding_bucket.bucket_name,
        "image": aibs_informatics_docker_asset
        if isinstance(aibs_informatics_docker_asset, str)
        else aibs_informatics_docker_asset.image_uri,
        "job_queue": scaffolding_job_queue
        if isinstance(scaffolding_job_queue, str)
        else scaffolding_job_queue.job_queue_name,
    }

    # Create request input for the demand scaffolding
    file_system_configurations = {}

    # Update arguments with mount points and volumes if provided
    if shared_mount_point_config or scratch_mount_point_config or tmp_mount_point_config:
        mount_points = []
        volumes = []
        if shared_mount_point_config:
            # update file system configurations for scaffolding function
            file_system_configurations["shared"] = {
                "file_system": shared_mount_point_config.file_system_id,
                "access_point": shared_mount_point_config.access_point_id,
                "container_path": shared_mount_point_config.mount_point,
            }
            # add to mount point and volumes list for batch invoked lambda functions
            mount_points.append(
                shared_mount_point_config.to_batch_mount_point("shared", sfn_format=True)
            )
            volumes.append(
                shared_mount_point_config.to_batch_volume("shared", sfn_format=True)
            )

        if scratch_mount_point_config:
            # update file system configurations for scaffolding function
            file_system_configurations["scratch"] = {
                "file_system": scratch_mount_point_config.file_system_id,
                "access_point": scratch_mount_point_config.access_point_id,
                "container_path": scratch_mount_point_config.mount_point,
            }
            # add to mount point and volumes list for batch invoked lambda functions
            mount_points.append(
                scratch_mount_point_config.to_batch_mount_point("scratch", sfn_format=True)
            )
            volumes.append(
                scratch_mount_point_config.to_batch_volume("scratch", sfn_format=True)
            )
        if tmp_mount_point_config:
            # update file system configurations for scaffolding function
            file_system_configurations["tmp"] = {
                "file_system": tmp_mount_point_config.file_system_id,
                "access_point": tmp_mount_point_config.access_point_id,
                "container_path": tmp_mount_point_config.mount_point,
            }
            # add to mount point and volumes list for batch invoked lambda functions
            mount_points.append(
                tmp_mount_point_config.to_batch_mount_point("tmp", sfn_format=True)
            )
            volumes.append(tmp_mount_point_config.to_batch_volume("tmp", sfn_format=True))

        batch_invoked_lambda_kwargs["mount_points"] = mount_points
        batch_invoked_lambda_kwargs["volumes"] = volumes

    request = {
        "demand_execution": sfn.JsonPath.object_at("$"),
        "file_system_configurations": file_system_configurations,
    }
    if context_manager_configuration:
        request["context_manager_configuration"] = context_manager_configuration

    start_state = sfn.Pass(
        self,
        "Start Demand Batch Task",
        parameters={
            "request": request,
        },
    )

    # normalization steps:
    # - merge build and runtime tags

    norm_parallel = CommonOperation.enclose_chainable(
        self,
        "Normalize Demand Execution",
        self.demand_execution_normalize_tags_chain(tags),
        input_path="$.request.demand_execution",
        result_path="$.request.demand_execution",
    )

    prep_scaffolding_task = CommonOperation.enclose_chainable(
        self,
        "Prepare Demand Scaffolding",
        sfn.Pass(
            self,
            "Pass: Prepare Demand Scaffolding",
            parameters={
                "handler": "aibs_informatics_aws_lambda.handlers.demand.scaffolding.handler",
                "payload": sfn.JsonPath.object_at("$"),
                **batch_invoked_lambda_kwargs,
            },
        ).next(
            sfn_tasks.StepFunctionsStartExecution(
                self,
                "SM: Prepare Demand Scaffolding",
                state_machine=batch_invoked_lambda_state_machine,
                integration_pattern=sfn.IntegrationPattern.RUN_JOB,
                associate_with_parent=False,
                input_path="$",
                output_path="$.Output",
            )
        ),
        input_path="$.request",
        result_path=f"$.{config_scaffolding_path}",
    )

    create_def_and_prepare_job_args_task = CommonOperation.enclose_chainable(
        self,
        "Create Definition and Prep Job Args",
        sfn.Pass(
            self,
            "Pass: Create Definition and Prep Job Args",
            parameters={
                "handler": "aibs_informatics_aws_lambda.handlers.batch.create.handler",
                "payload": sfn.JsonPath.object_at("$"),
                **batch_invoked_lambda_kwargs,
            },
        ).next(
            sfn_tasks.StepFunctionsStartExecution(
                self,
                "SM: Create Definition and Prep Job Args",
                state_machine=batch_invoked_lambda_state_machine,
                integration_pattern=sfn.IntegrationPattern.RUN_JOB,
                associate_with_parent=False,
                input_path="$",
                output_path="$.Output",
            )
        ),
        input_path="$.batch_create_request",
        result_path="$",
    )

    setup_tasks = (
        sfn.Parallel(
            self,
            "Execution Setup Steps",
            input_path=f"$.{config_scaffolding_path}.setup_configs",
            result_path=f"$.{'.'.join(config_batch_args_path.split('.')[:-1])}",
            result_selector={f"{config_batch_args_path.split('.')[-1]}.$": "$[0]"},
        )
        .branch(create_def_and_prepare_job_args_task)
        .branch(
            sfn.Map(
                self,
                "Transfer Inputs TO Batch Job",
                items_path="$.data_sync_requests",
            ).iterator(
                sfn_tasks.StepFunctionsStartExecution(
                    self,
                    "Transfer Input",
                    state_machine=data_sync_state_machine,
                    integration_pattern=sfn.IntegrationPattern.RUN_JOB,
                    associate_with_parent=False,
                    result_path=sfn.JsonPath.DISCARD,
                )
            )
        )
    )

    execution_task = sfn.CustomState(
        self,
        "Submit Batch Job",
        state_json={
            "Type": "Task",
            "Resource": "arn:aws:states:::batch:submitJob.sync",
            "Parameters": {
                "JobName.$": sfn.JsonPath.string_at(f"$.{config_batch_args_path}.job_name"),
                "JobDefinition.$": sfn.JsonPath.string_at(
                    f"$.{config_batch_args_path}.job_definition_arn"
                ),  # noqa: E501
                "JobQueue.$": sfn.JsonPath.string_at(
                    f"$.{config_batch_args_path}.job_queue_arn"
                ),  # noqa: E501
                "Parameters.$": sfn.JsonPath.object_at(
                    f"$.{config_batch_args_path}.parameters"
                ),  # noqa: E501
                "ContainerOverrides.$": sfn.JsonPath.object_at(
                    f"$.{config_batch_args_path}.container_overrides"
                ),  # noqa: E501
                "Tags.$": sfn.JsonPath.object_at(
                    "$.request.demand_execution.execution_metadata.tags"
                ),  # noqa: E501
                "PropagateTags": True,
            },
            "ResultPath": "$.tasks.batch_submit_task",
        },
    )

    cleanup_tasks = sfn.Chain.start(
        sfn.Map(
            self,
            "Transfer Results FROM Batch Job",
            input_path=f"$.{config_scaffolding_path}.cleanup_configs.data_sync_requests",
            result_path=f"$.{config_cleanup_results_path}.transfer_results",
        ).iterator(
            sfn_tasks.StepFunctionsStartExecution(
                self,
                "Transfer Result",
                state_machine=data_sync_state_machine,
                integration_pattern=sfn.IntegrationPattern.RUN_JOB,
                associate_with_parent=False,
                result_path=sfn.JsonPath.DISCARD,
            )
        )
    ).next(
        sfn.Choice(self, "Cleanup Choice")
        .when(
            condition=sfn.Condition.is_present(
                f"$.{config_scaffolding_path}.cleanup_configs.remove_data_paths_requests"
            ),
            next=sfn.Chain.start(
                sfn.Map(
                    self,
                    "Map: Cleanup Data Paths",
                    input_path=f"$.{config_scaffolding_path}.cleanup_configs.remove_data_paths_requests",
                    result_path=f"$.{config_cleanup_results_path}.remove_data_paths_results",
                ).iterator(
                    CommonOperation.enclose_chainable(
                        self,
                        "Cleanup Data Path",
                        definition=sfn.Pass(
                            self,
                            "Pass: Cleanup Data Path",
                            parameters={
                                "handler": "aibs_informatics_aws_lambda.handlers.data_sync.remove_data_paths_handler",  # noqa: E501
                                "payload": sfn.JsonPath.object_at("$"),
                                **batch_invoked_lambda_kwargs,
                            },
                        ).next(
                            sfn_tasks.StepFunctionsStartExecution(
                                self,
                                "SM: Cleanup Data Path",
                                state_machine=batch_invoked_lambda_state_machine,
                                integration_pattern=sfn.IntegrationPattern.RUN_JOB,
                                associate_with_parent=False,
                                input_path="$",
                                output_path="$.Output",
                            )
                        ),
                    )
                )
            ),
        )
        .otherwise(sfn.Pass(self, "No Data Paths to Cleanup"))
    )

    # fmt: off
    definition = (
        start_state
        .next(norm_parallel)
        .next(prep_scaffolding_task)
        .next(setup_tasks)
        .next(execution_task)
        .next(cleanup_tasks)
    )
    # fmt: on
    self.definition = definition
Functions
demand_execution_normalize_tags_chain
demand_execution_normalize_tags_chain(
    tags: dict[str, str] | None
) -> IChainable

Merge build and runtime tags.

Chain Assumptions/Expectations: the input is the demand execution request; the output is the same request with merged tags.

If runtime tags are not provided, only build‑time tags are used.

Parameters:

Name Type Description Default
tags Optional[Dict[str, str]]

build‑time tags

required

Returns: sfn.IChainable: the state machine fragment

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/demand_execution.py
def demand_execution_normalize_tags_chain(self, tags: dict[str, str] | None) -> sfn.IChainable:
    """Build a chain that merges build-time tags into the execution request.

    Chain Assumptions/Expectations:
        - the input is the demand execution request
        - the output is the same request with its ``execution_metadata.tags``
          merged against the build-time defaults

    If runtime tags are not provided, only build-time tags are used.

    Args:
        tags (Optional[Dict[str, str]]): build-time tags
    Returns:
        sfn.IChainable: the state machine fragment
    """
    tags_reference_path = JsonReferencePath("execution_metadata.tags")

    # Keys whose values are JSON path references ("$...") need the ".$"
    # suffix so Step Functions resolves them at runtime instead of passing
    # the literal string through.
    default_tags: dict[str, str] = {}
    for key, value in (tags or {}).items():
        if isinstance(value, str) and value.startswith("$"):
            default_tags[f"{key}.$"] = value
        else:
            default_tags[key] = value

    return CommonOperation.merge_defaults(
        self,
        "Merge Tags",
        input_path="$",
        target_path=tags_reference_path.as_reference,
        defaults=default_tags,
        check_if_target_present=True,
    )
Functions
efs
Classes
CleanFileSystemFragment
CleanFileSystemFragment(
    scope: Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: DockerImageAsset | str,
    batch_job_queue: JobQueue | str,
    scaffolding_bucket: Bucket,
    mount_point_configs: Iterable[MountPointConfiguration]
    | None = None,
    memory: int = 1024,
    vcpus: int = 1,
)

Bases: BatchInvokedBaseFragment

Clean up the file system by scanning for outdated data paths and removing them

Parameters:

Name Type Description Default
scope Construct

construct scope

required
id str

id

required
env_base EnvBase

env base

required
aibs_informatics_docker_asset DockerImageAsset | str

Docker image asset or image uri str for the aibs informatics aws lambda

required
batch_job_queue JobQueue | str

Default batch job queue or job queue name str that the batch job will be submitted to. This can be overridden by the payload.

required
primary_bucket Bucket

Primary bucket used for request/response json blobs used in the batch invoked lambda function.

required
mount_point_configs Optional[Iterable[MountPointConfiguration]]

List of mount point configurations to use. These can be overridden in the payload.

None
memory int

memory needed. Defaults to 1024. This memory value is used for both the outdated path scanner and removal of data paths.

1024
vcpus int

vcpus needed. Defaults to 1. This vcpu value is used for both the outdated path scanner and removal of data paths.

1
Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/efs.py
def __init__(
    self,
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: ecr_assets.DockerImageAsset | str,
    batch_job_queue: batch.JobQueue | str,
    scaffolding_bucket: s3.Bucket,
    mount_point_configs: Iterable[MountPointConfiguration] | None = None,
    memory: int = 1024,
    vcpus: int = 1,
) -> None:
    """Clean the file system: scan for outdated data paths, then remove them.

    Args:
        scope (Construct): construct scope
        id (str): id
        env_base (EnvBase): env base
        aibs_informatics_docker_asset (DockerImageAsset|str): Docker image asset
            or image uri str for the aibs informatics aws lambda
        batch_job_queue (JobQueue|str): Default batch job queue or job queue
            name str that the batch job will be submitted to. May be overridden
            by the payload.
        scaffolding_bucket (Bucket): Primary bucket used for request/response
            json blobs used in the batch invoked lambda function.
        mount_point_configs (Optional[Iterable[MountPointConfiguration]], optional):
            Mount point configurations to use. These can be overridden in the
            payload.
        memory (int, optional): memory needed. Defaults to 1024. Applied to both
            the outdated path scanner and the removal step.
        vcpus (int, optional): vcpus needed. Defaults to 1. Applied to both
            the outdated path scanner and the removal step.
    """  # noqa: E501
    super().__init__(scope, id, env_base)

    # Resolve the image reference once so both sub-fragments share it.
    if isinstance(aibs_informatics_docker_asset, str):
        image_uri = aibs_informatics_docker_asset
    else:
        image_uri = aibs_informatics_docker_asset.image_uri

    # Keyword arguments common to both sub-fragments.
    common_kwargs: dict[str, Any] = {
        "env_base": self.env_base,
        "aibs_informatics_docker_asset": image_uri,
        "batch_job_queue": batch_job_queue,
        "scaffolding_bucket": scaffolding_bucket,
        "mount_point_configs": mount_point_configs,
        "memory": memory,
        "vcpus": vcpus,
    }

    start_state = sfn.Pass(
        self,
        "Data Cleanup: Start",
    )

    self.outdated_data_path_scanner = outdated_data_path_scanner_fragment(
        self,
        "Scan for Outdated Data Paths",
        **common_kwargs,
    )

    self.remove_data_paths = remove_data_paths_fragment(
        self,
        "Remove Data Paths",
        **common_kwargs,
    )

    # Scan first, then remove whatever the scan identified.
    self.definition = start_state.next(
        self.outdated_data_path_scanner.enclose()
    ).next(self.remove_data_paths.enclose())
Functions
Functions
get_data_path_stats_fragment
get_data_path_stats_fragment(
    scope: Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: DockerImageAsset | str,
    batch_job_queue: JobQueue | str,
    scaffolding_bucket: Bucket,
    mount_point_configs: Iterable[MountPointConfiguration]
    | None = None,
    memory: int = 1024,
    vcpus: int = 1,
) -> BatchInvokedLambdaFunction

Returns a BatchInvokedLambdaFunction fragment for getting data path stats of EFS/S3 path

Parameters:

Name Type Description Default
scope Construct

scope

required
id str

id of the fragment

required
env_base EnvBase

env base

required
aibs_informatics_docker_asset Union[DockerImageAsset, str]

docker image asset or image uri that has the get_data_path_stats_handler function

required
batch_job_queue Union[JobQueue, str]

default batch job queue or job queue name str that the batch job will be submitted to. This can be overridden by the payload.

required
scaffolding_bucket Bucket

primary bucket used for request/response json blobs used in

required
mount_point_configs Optional[Iterable[MountPointConfiguration]]

Default EFS volumes to mount. Defaults to None.

None
memory int

memory needed. Defaults to 1024.

1024
vcpus int

vcpus needed. Defaults to 1.

1

Returns:

Type Description
BatchInvokedLambdaFunction

BatchInvokedLambdaFunction fragment for getting data path stats

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/efs.py
def get_data_path_stats_fragment(
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: ecr_assets.DockerImageAsset | str,
    batch_job_queue: batch.JobQueue | str,
    scaffolding_bucket: s3.Bucket,
    mount_point_configs: Iterable[MountPointConfiguration] | None = None,
    memory: int = 1024,
    vcpus: int = 1,
) -> BatchInvokedLambdaFunction:
    """Build a BatchInvokedLambdaFunction fragment that reports stats for an EFS/S3 data path.

    Args:
        scope (constructs.Construct): scope
        id (str): id of the fragment
        env_base (EnvBase): env base
        aibs_informatics_docker_asset (Union[ecr_assets.DockerImageAsset, str]): docker image
            asset or image uri that provides the get_data_path_stats_handler function
        batch_job_queue (Union[batch.JobQueue, str]): default batch job queue or job queue
            name str that the batch job will be submitted to. May be overridden by the payload.
        scaffolding_bucket (s3.Bucket): primary bucket used for request/response json blobs
        mount_point_configs (Optional[Iterable[MountPointConfiguration]], optional): Default
            EFS volumes to mount. Defaults to None.
        memory (int, optional): memory needed. Defaults to 1024.
        vcpus (int, optional): vcpus needed. Defaults to 1.

    Returns:
        BatchInvokedLambdaFunction fragment for getting data path stats
    """  # noqa: E501
    # Accept either a string image uri or a DockerImageAsset.
    if isinstance(aibs_informatics_docker_asset, str):
        image_uri = aibs_informatics_docker_asset
    else:
        image_uri = aibs_informatics_docker_asset.image_uri

    # Accept either a queue name string or a JobQueue construct.
    if isinstance(batch_job_queue, str):
        job_queue_name = batch_job_queue
    else:
        job_queue_name = batch_job_queue.job_queue_name

    return BatchInvokedLambdaFunction(
        scope=scope,
        id=id,
        env_base=env_base,
        name="get-data-path-stats",
        image=image_uri,
        handler="aibs_informatics_aws_lambda.handlers.data_sync.get_data_path_stats_handler",
        job_queue=job_queue_name,
        bucket_name=scaffolding_bucket.bucket_name,
        memory=memory,
        vcpus=vcpus,
        mount_point_configs=list(mount_point_configs) if mount_point_configs else None,
    )
outdated_data_path_scanner_fragment
outdated_data_path_scanner_fragment(
    scope: Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: DockerImageAsset | str,
    batch_job_queue: JobQueue | str,
    scaffolding_bucket: Bucket,
    mount_point_configs: Iterable[MountPointConfiguration]
    | None = None,
    memory: int = 1024,
    vcpus: int = 1,
) -> BatchInvokedLambdaFunction

Returns a BatchInvokedLambdaFunction fragment for scanning outdated data paths of EFS/S3 path root

Parameters:

Name Type Description Default
scope Construct

scope

required
id str

id of the fragment

required
env_base EnvBase

env base

required
aibs_informatics_docker_asset Union[DockerImageAsset, str]

docker image asset or image uri that has the outdated_data_path_scanner_handler function

required
batch_job_queue Union[JobQueue, str]

default batch job queue or job queue name str that the batch job will be submitted to. This can be overridden by the payload.

required
scaffolding_bucket Bucket

primary bucket used for request/response json blobs used in

required
mount_point_configs Optional[Iterable[MountPointConfiguration]]

Default EFS volumes to mount. Defaults to None.

None
memory int

memory needed. Defaults to 1024.

1024
vcpus int

vcpus needed. Defaults to 1.

1

Returns:

Type Description
BatchInvokedLambdaFunction

BatchInvokedLambdaFunction fragment for scanning outdated data paths

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/efs.py
def outdated_data_path_scanner_fragment(
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: ecr_assets.DockerImageAsset | str,
    batch_job_queue: batch.JobQueue | str,
    scaffolding_bucket: s3.Bucket,
    mount_point_configs: Iterable[MountPointConfiguration] | None = None,
    memory: int = 1024,
    vcpus: int = 1,
) -> BatchInvokedLambdaFunction:
    """Build a BatchInvokedLambdaFunction fragment that scans an EFS/S3 root for outdated data paths.

    Args:
        scope (constructs.Construct): scope
        id (str): id of the fragment
        env_base (EnvBase): env base
        aibs_informatics_docker_asset (Union[ecr_assets.DockerImageAsset, str]): docker image
            asset or image uri that provides the outdated_data_path_scanner_handler function
        batch_job_queue (Union[batch.JobQueue, str]): default batch job queue or job queue
            name str that the batch job will be submitted to. May be overridden by the payload.
        scaffolding_bucket (s3.Bucket): primary bucket used for request/response json blobs
        mount_point_configs (Optional[Iterable[MountPointConfiguration]], optional): Default
            EFS volumes to mount. Defaults to None.
        memory (int, optional): memory needed. Defaults to 1024.
        vcpus (int, optional): vcpus needed. Defaults to 1.

    Returns:
        BatchInvokedLambdaFunction fragment for scanning outdated data paths
    """  # noqa: E501
    # Accept either a string image uri or a DockerImageAsset.
    if isinstance(aibs_informatics_docker_asset, str):
        image_uri = aibs_informatics_docker_asset
    else:
        image_uri = aibs_informatics_docker_asset.image_uri

    # Accept either a queue name string or a JobQueue construct.
    if isinstance(batch_job_queue, str):
        job_queue_name = batch_job_queue
    else:
        job_queue_name = batch_job_queue.job_queue_name

    return BatchInvokedLambdaFunction(
        scope=scope,
        id=id,
        env_base=env_base,
        name="outdated-data-path-scanner",
        image=image_uri,
        handler="aibs_informatics_aws_lambda.handlers.data_sync.outdated_data_path_scanner_handler",
        job_queue=job_queue_name,
        bucket_name=scaffolding_bucket.bucket_name,
        memory=memory,
        vcpus=vcpus,
        mount_point_configs=list(mount_point_configs) if mount_point_configs else None,
    )
remove_data_paths_fragment
remove_data_paths_fragment(
    scope: Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: DockerImageAsset | str,
    batch_job_queue: JobQueue | str,
    scaffolding_bucket: Bucket,
    mount_point_configs: Iterable[MountPointConfiguration]
    | None = None,
    memory: int = 1024,
    vcpus: int = 1,
) -> BatchInvokedLambdaFunction

Returns a BatchInvokedLambdaFunction fragment for removing data paths (EFS / S3) during execution of a Step Function

Parameters:

Name Type Description Default
scope Construct

scope

required
id str

id of the fragment

required
env_base EnvBase

env base

required
aibs_informatics_docker_asset Union[DockerImageAsset, str]

docker image asset or image uri that has the remove_data_paths_handler function

required
batch_job_queue Union[JobQueue, str]

default batch job queue or job queue name str that the batch job will be submitted to. This can be overridden by the payload.

required
scaffolding_bucket Bucket

primary bucket used for request/response json blobs used in

required
mount_point_configs Optional[Iterable[MountPointConfiguration]]

Default EFS volumes to mount. Defaults to None.

None
memory int

memory needed. Defaults to 1024.

1024
vcpus int

vcpus needed. Defaults to 1.

1

Returns:

Type Description
BatchInvokedLambdaFunction

BatchInvokedLambdaFunction fragment for removing data paths

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/informatics/efs.py
def remove_data_paths_fragment(
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
    aibs_informatics_docker_asset: ecr_assets.DockerImageAsset | str,
    batch_job_queue: batch.JobQueue | str,
    scaffolding_bucket: s3.Bucket,
    mount_point_configs: Iterable[MountPointConfiguration] | None = None,
    memory: int = 1024,
    vcpus: int = 1,
) -> BatchInvokedLambdaFunction:
    """Build a BatchInvokedLambdaFunction fragment that removes data paths (EFS / S3).

    Args:
        scope (constructs.Construct): scope
        id (str): id of the fragment
        env_base (EnvBase): env base
        aibs_informatics_docker_asset (Union[ecr_assets.DockerImageAsset, str]): docker image
            asset or image uri that provides the remove_data_paths_handler function
        batch_job_queue (Union[batch.JobQueue, str]): default batch job queue or job queue
            name str that the batch job will be submitted to. May be overridden by the payload.
        scaffolding_bucket (s3.Bucket): primary bucket used for request/response json blobs
        mount_point_configs (Optional[Iterable[MountPointConfiguration]], optional): Default
            EFS volumes to mount. Defaults to None.
        memory (int, optional): memory needed. Defaults to 1024.
        vcpus (int, optional): vcpus needed. Defaults to 1.

    Returns:
        BatchInvokedLambdaFunction fragment for removing data paths
    """  # noqa: E501
    # Accept either a string image uri or a DockerImageAsset.
    if isinstance(aibs_informatics_docker_asset, str):
        image_uri = aibs_informatics_docker_asset
    else:
        image_uri = aibs_informatics_docker_asset.image_uri

    # Accept either a queue name string or a JobQueue construct.
    if isinstance(batch_job_queue, str):
        job_queue_name = batch_job_queue
    else:
        job_queue_name = batch_job_queue.job_queue_name

    return BatchInvokedLambdaFunction(
        scope=scope,
        id=id,
        env_base=env_base,
        name="remove-data-paths",
        image=image_uri,
        handler="aibs_informatics_aws_lambda.handlers.data_sync.remove_data_paths_handler",
        job_queue=job_queue_name,
        bucket_name=scaffolding_bucket.bucket_name,
        memory=memory,
        vcpus=vcpus,
        mount_point_configs=list(mount_point_configs) if mount_point_configs else None,
    )

lambda_

Classes
LambdaFunctionFragment
LambdaFunctionFragment(
    scope: Construct,
    id: str,
    env_base: EnvBase,
    lambda_function: Function,
)

Bases: EnvBaseStateMachineFragment

Creates a single task state machine fragment for a generic lambda function

        [Pass]          # Start State. Augments input.
          ||            #
      [lambda fn]
   (x)  //   \ (o)     #
    [Fail]    ||        # Catch state for failed lambda execution
              ||        #
            [Pass]      # End state. Reforms output of lambda as sfn output

Returns:

Type Description
None

The state machine fragment

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/fragments/lambda_.py
def __init__(
    self,
    scope: constructs.Construct,
    id: str,
    env_base: EnvBase,
    lambda_function: lambda_.Function,
) -> None:
    """Single-task state machine fragment wrapping a generic lambda function.

    The fragment's definition is a one-state chain containing a LambdaInvoke
    task that returns the lambda's payload directly
    (``payload_response_only=True``).

    Args:
        scope (constructs.Construct): The construct scope.
        id (str): The construct ID.
        env_base (EnvBase): Environment base for resource naming.
        lambda_function (lambda_.Function): The lambda function to invoke.
    """
    super().__init__(scope, id, env_base)

    # The task state unwraps the invocation response to the bare payload.
    invoke_task = stepfn_tasks.LambdaInvoke(
        self,
        f"{lambda_function.function_name} Function Execution",
        lambda_function=cast(lambda_.IFunction, lambda_function),
        payload_response_only=True,
    )
    self.definition = sfn.Chain.start(invoke_task)
Functions

States

Custom Step Function states:

states

Modules

batch

Classes
BatchOperation
Functions
register_job_definition classmethod
register_job_definition(
    scope: Construct,
    id: str,
    command: list[str] | str | None,
    image: str,
    job_definition_name: str,
    job_role_arn: str | None = None,
    environment: Mapping[str, str] | str | None = None,
    memory: int | str | None = None,
    vcpus: int | str | None = None,
    gpu: int | str | None = None,
    mount_points: list[MountPointTypeDef]
    | str
    | None = None,
    volumes: list[VolumeTypeDef] | str | None = None,
    platform_capabilities: list[Literal["EC2", "FARGATE"]]
    | str
    | None = None,
    result_path: str | None = "$",
    output_path: str | None = "$",
) -> Chain

Creates chain to register new job definition

Following parameters support reference paths: - command - image - job_definition_name - environment - memory - vcpus - gpu

Parameters:

Name Type Description Default
scope Construct

scope

required
id str

ID prefix

required
command Union[List[str], str]

List of strings or string representing command to run Supports reference paths (e.g. "$.foo.bar")

required
image str

image URI or name. Supports reference paths (e.g. "$.foo.bar")

required
job_definition_name str

name of job definition. Supports reference paths (e.g. "$.foo.bar")

required
job_role_arn Optional[str]

Optional job role arn to use for the job. Supports reference paths (e.g. "$.foo.bar")

None
environment Optional[Union[Mapping[str, str], str]]

Optional environment variables. Supports reference paths both as individual values as well as for the entire list of variables. However, if a reference path is used for the entire list, the list must be a list of mappings with Name/Value keys".

None
memory Optional[Union[int, str]]

Optionally specify memory. Supports reference paths (e.g. "$.foo.bar")

None
vcpus Optional[Union[int, str]]

Optionally specify vcpus. Defaults to None.

None
gpu Optional[Union[int, str]]

Optionally specify GPU. Defaults to None.

None
mount_points Optional[List[MountPointTypeDef]]

Optionally specify mount points. Defaults to None.

None
volumes Optional[List[VolumeTypeDef]]

Optionally specify volumes. Defaults to None.

None

Returns:

Type Description
Chain

sfn.Chain: chain to register job definition

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/states/batch.py
@classmethod
def register_job_definition(
    cls,
    scope: constructs.Construct,
    id: str,
    command: list[str] | str | None,
    image: str,
    job_definition_name: str,
    job_role_arn: str | None = None,
    environment: Mapping[str, str] | str | None = None,
    memory: int | str | None = None,
    vcpus: int | str | None = None,
    gpu: int | str | None = None,
    mount_points: list[MountPointTypeDef] | str | None = None,
    volumes: list[VolumeTypeDef] | str | None = None,
    platform_capabilities: list[Literal["EC2", "FARGATE"]] | str | None = None,
    result_path: str | None = "$",
    output_path: str | None = "$",
) -> sfn.Chain:
    """Creates chain to register new job definition

    Following parameters support reference paths:
    - command
    - image
    - job_definition_name
    - environment
    - memory
    - vcpus
    - gpu

    Args:
        scope (constructs.Construct): scope
        id (str): ID prefix
        command (Union[List[str], str]): List of strings or string representing command to run
            Supports reference paths (e.g. "$.foo.bar")
        image (str): image URI or name.
            Supports reference paths (e.g. "$.foo.bar")
        job_definition_name (str): name of job definition. A runtime UUID suffix is appended.
            Supports reference paths (e.g. "$.foo.bar")
        job_role_arn (Optional[str], optional): Optional job role arn to use for the job.
            Supports reference paths (e.g. "$.foo.bar")
        environment (Optional[Union[Mapping[str, str], str]], optional): Optional environment variables.
            Supports reference paths both as individual values as well as for the entire list of variables.
            However, if a reference path is used for the entire list, the list must be a list of mappings with Name/Value keys".
        memory (Optional[Union[int, str]], optional): Optionally specify memory.
            Supports reference paths (e.g. "$.foo.bar")
        vcpus (Optional[Union[int, str]], optional): Optionally specify vcpus. Defaults to None.
        gpu (Optional[Union[int, str]], optional): Optionally specify GPU. Defaults to None.
        mount_points (Optional[List[MountPointTypeDef]], optional): Optionally specify mount points. Defaults to None.
        volumes (Optional[List[VolumeTypeDef]], optional): Optionally specify volumes. Defaults to None.
        platform_capabilities (Optional[Union[List[Literal["EC2", "FARGATE"]], str]], optional):
            Optional platform capabilities for the job definition. May also be a reference path.
            Defaults to None.
        result_path (Optional[str], optional): result path under which the prepared request
            (and the API call result) are stored. Defaults to "$".
        output_path (Optional[str], optional): output path applied to the API call state.
            Defaults to "$".

    Returns:
        sfn.Chain: chain to register job definition
    """  # noqa: E501

    # Append a runtime UUID so each execution registers a uniquely named
    # job definition instead of colliding on the caller-provided name.
    job_definition_name = sfn.JsonPath.format(
        "{}-{}", job_definition_name, sfn.JsonPath.uuid()
    )
    # Normalize environment to Batch's Name/Value pair list, unless the
    # caller supplied a reference path string for the entire list.
    environment_pairs: list[KeyValuePairTypeDef] | str | None
    if not isinstance(environment, str):
        environment_pairs = to_key_value_pairs(dict(environment or {}))
    else:
        environment_pairs = environment

    request: RegisterJobDefinitionRequestTypeDef = {
        "jobDefinitionName": job_definition_name,
        "type": "container",
        "containerProperties": {
            "image": image,
            "command": command,
            "environment": environment_pairs,
            "resourceRequirements": to_resource_requirements(gpu, memory, vcpus),  # type: ignore # must be string
            "mountPoints": mount_points,
            "volumes": volumes,
        },
        "retryStrategy": build_retry_strategy(include_default_evaluate_on_exit_configs=True),
    }  # type: ignore
    if platform_capabilities:
        request["platformCapabilities"] = platform_capabilities  # type: ignore[typeddict-item] # the typing does not understand that this can be a reference path
    if job_role_arn:
        assert "containerProperties" in request  # mollifies mypy
        request["containerProperties"]["jobRoleArn"] = job_role_arn
    # The SFN SDK integration expects PascalCase parameter keys.
    parameters = convert_key_case(request, pascalcase)

    # 1) Pass state materializes the request (resolving reference paths)
    #    under result_path so the API call state can reference it.
    start = sfn.Pass(
        scope,
        id + " RegisterJobDefinition Prep",
        parameters=convert_reference_paths(parameters),  # type: ignore  # misundertands type
        result_path=result_path or "$",
    )
    # 2) Task state calls batch:registerJobDefinition, passing every
    #    prepared top-level key through as a reference path ("<key>.$").
    register = sfn.CustomState(
        scope,
        id + " RegisterJobDefinition API Call",
        state_json={
            "Type": "Task",
            "Resource": "arn:aws:states:::aws-sdk:batch:registerJobDefinition",
            "Parameters": {
                f"{k}.$": f"{result_path if result_path else '$'}.{k}"
                for k in parameters.keys()
            },
            "ResultSelector": {
                "JobDefinitionArn.$": "$.JobDefinitionArn",
                "JobDefinitionName.$": "$.JobDefinitionName",
                "Revision.$": "$.Revision",
            },
            "ResultPath": result_path,
            "OutputPath": output_path,
            "Retry": [
                {
                    "ErrorEquals": ["Batch.BatchException"],
                    # Interval at attempt n = IntervalSeconds x BackoffRate ^(n-1)
                    # Total time from first try: 3 + 6 + 12 + 24 + 48 + 96 = 189 seconds
                    "IntervalSeconds": 3,
                    "MaxAttempts": 7,
                    "BackoffRate": 2.0,
                    "JitterStrategy": "FULL",
                },
            ],
        },
    )
    chain: sfn.Chain | sfn.Pass = start
    if gpu is not None:
        # When a GPU count was provided, drop resource requirement entries
        # whose Value is 0 or '0' (e.g. an unused GPU requirement) before
        # the register call.
        chain = chain.next(
            sfn.Pass(
                scope,
                id + " Register Definition Filter Resource Requirements",
                input_path=f"{result_path or '$'}.ContainerProperties.ResourceRequirements[?(@.Value != 0 && @.Value != '0')]",  # noqa: E501
                result_path=f"{result_path or '$'}.ContainerProperties.ResourceRequirements",
            )
        )
    return chain.next(register)

common

Classes
CommonOperation
Functions
merge_defaults classmethod
merge_defaults(
    scope: Construct,
    id: str,
    defaults: dict[str, Any],
    input_path: str = "$",
    target_path: str = "$",
    result_path: str | None = None,
    order_of_preference: Literal[
        "target", "default"
    ] = "target",
    check_if_target_present: bool = False,
) -> Chain

Wrapper chain that merges input with defaults.

Notes
  • reference paths in defaults should be relative to the input path

Parameters:

Name Type Description Default
scope Construct

construct scope

required
id str

identifier for the states created

required
defaults dict[str, Any]

default values to merge with input. If any reference paths are present in the defaults, they should be relative to the input path.

required
input_path str

Input path of object to merge. Defaults to "$".

'$'
target_path str

target path to merge with. This should be relative to the input_path parameter. Defaults to "$".

'$'
result_path Optional[str]

result path to store merged results. If not specified, it defaults to the target_path relative to input_path. If specified, it is considered an absolute path.

None
order_of_preference Literal['target', 'default']

If "target", the target path will be merged with the defaults. If "default", the defaults will be merged with the target path. Defaults to "target".

'target'
check_if_target_present bool

If true, check if the target path is present in the input. If not, the defaults will be used as the result. Otherwise, the defaults will be merged with the target path. This is useful for optional parameters that may or may not be present in the input. Defaults to False.

False

Returns:

Type Description
Chain

sfn.Chain: the new chain that merges defaults with input

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/states/common.py
@classmethod
def merge_defaults(
    cls,
    scope: constructs.Construct,
    id: str,
    defaults: dict[str, Any],
    input_path: str = "$",
    target_path: str = "$",
    result_path: str | None = None,
    order_of_preference: Literal["target", "default"] = "target",
    check_if_target_present: bool = False,
) -> sfn.Chain:
    """Wrapper chain that merges input with defaults.

    Notes:
        - reference paths in defaults should be relative to the input path

    Args:
        scope (constructs.Construct): construct scope
        id (str): identifier for the states created
        defaults (dict[str, Any]): default values to merge with input. If any reference paths
            are present in the defaults, they should be relative to the input path.
        input_path (str, optional): Input path of object to merge. Defaults to "$".
        target_path (str, optional): target path to merge with. This should be relative to
            the input_path parameter. Defaults to "$".
        result_path (Optional[str], optional): result path to store merged results.
            If not specified, it defaults to the target_path relative to input_path.
            If specified, it is considered an absolute path.
        order_of_preference (Literal["target", "default"], optional): If "target", the target
            path will be merged with the defaults. If "default", the defaults will be merged
            with the target path. Defaults to "target".
        check_if_target_present (bool, optional): If true, check if the target path is present
            in the input. If not, the defaults will be used as the result. Otherwise, the
            defaults will be merged with the target path. This is useful for optional
            parameters that may or may not be present in the input.
            Defaults to False.

    Returns:
        sfn.Chain: the new chain that merges defaults with input
    """
    input_path = JsonReferencePath(input_path)
    target_path = JsonReferencePath(target_path)
    result_path = (
        JsonReferencePath(result_path) if result_path else input_path.extend(target_path)
    )

    # The preferred source is passed as the second json_merge argument so
    # its fields take precedence in the merged result.
    pref1 = "$.target" if order_of_preference == "target" else "$.default"
    pref2 = "$.default" if order_of_preference == "target" else "$.target"

    # NOTE: all construct IDs below are prefixed with `id` (consistent with
    # the "{id} Restructure" state and sibling methods) so this method can
    # be invoked more than once within the same scope without CDK
    # construct ID collisions.
    merge_task = sfn.Pass(
        scope,
        f"{id} Merge Pass",
        parameters={
            "merged": sfn.JsonPath.json_merge(
                sfn.JsonPath.object_at(pref2),
                sfn.JsonPath.object_at(pref1),
            ),
        },
        output_path="$.merged",
    )
    chain_start: sfn.Pass | sfn.Choice
    if check_if_target_present:
        # Branch based on presence of the target
        choice = sfn.Choice(scope, f"{id} Check Target")
        present_pass = sfn.Pass(
            scope,
            f"{id} Target Present",
            parameters={
                "target": sfn.JsonPath.object_at(target_path.as_reference),
                "default": defaults,
            },
        )
        not_present_pass = sfn.Pass(
            scope,
            f"{id} Target Not Present",
            parameters={
                # Merge defaults against an empty object when target is absent.
                "target": {},
                "default": defaults,
            },
        )
        # Chain both branches into the merge task
        present_pass.next(merge_task)
        not_present_pass.next(merge_task)
        choice.when(
            sfn.Condition.is_present(target_path.as_reference),
            present_pass,
        ).otherwise(not_present_pass)
        chain_start = choice
    else:
        init_pass = sfn.Pass(
            scope,
            f"{id} Init Pass",
            parameters={
                "target": sfn.JsonPath.object_at(target_path.as_reference),
                "default": defaults,
            },
        )
        init_pass.next(merge_task)
        chain_start = init_pass

    # Enclose in a single Parallel state; the Parallel output is a
    # one-element list, unwrapped by the Restructure pass below.
    parallel = sfn.Chain.start(chain_start).to_single_state(
        id=id,
        input_path=input_path.as_reference,
        result_path=result_path.as_reference,
    )
    restructure = sfn.Pass(
        scope,
        f"{id} Restructure",
        input_path=f"{result_path.as_reference}[0]",
        result_path=result_path.as_reference,
    )
    return parallel.next(restructure)
enclose_chainable classmethod
enclose_chainable(
    scope: Construct,
    id: str,
    definition: IChainable,
    input_path: str | None = None,
    result_path: str | None = None,
) -> Chain

Enclose the current state machine fragment within a parallel state.

Notes
  • If input_path is not provided, it will default to "$"
  • If result_path is not provided, it will default to input_path

Parameters:

Name Type Description Default
id str

an identifier for the parallel state

required
input_path Optional[str]

input path for the enclosed state. Defaults to "$".

None
result_path Optional[str]

result path to put output of enclosed state. Defaults to same as input_path.

None

Returns:

Type Description
Chain

sfn.Chain: the new state machine fragment

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/states/common.py
@classmethod
def enclose_chainable(
    cls,
    scope: constructs.Construct,
    id: str,
    definition: sfn.IChainable,
    input_path: str | None = None,
    result_path: str | None = None,
) -> sfn.Chain:
    """Enclose a chainable definition within a parallel state.

    Notes:
        - If input_path is not provided, it will default to "$"
        - If result_path is not provided, it will default to input_path
        - The parallel state outputs a one-element list; a trailing Pass
          state unwraps it back to result_path (unless results are discarded)

    Args:
        scope (constructs.Construct): construct scope for the created states
        id (str): an identifier for the parallel state
        definition (sfn.IChainable): the definition to enclose
        input_path (Optional[str], optional): input path for the enclosed state.
            Defaults to "$".
        result_path (Optional[str], optional): result path to put output of enclosed state.
            Defaults to same as input_path.

    Returns:
        sfn.Chain: the new state machine fragment
    """
    if input_path is None:
        input_path = "$"
    if result_path is None:
        result_path = input_path

    # Wrap plain chainables in a Chain; Chain and StateMachineFragment
    # already expose to_single_state directly.
    chain = (
        sfn.Chain.start(definition)
        if not isinstance(definition, (sfn.Chain, sfn.StateMachineFragment))
        else definition
    )

    # Chain.to_single_state requires an explicit id; the fragment variant
    # derives its own.
    if isinstance(chain, sfn.Chain):
        parallel = chain.to_single_state(
            id=f"{id} Enclosure", input_path=input_path, result_path=result_path
        )
    else:
        parallel = chain.to_single_state(input_path=input_path, result_path=result_path)
    definition = sfn.Chain.start(parallel)

    # Parallel states yield a list of branch outputs; pull out element [0]
    # so the result shape matches the enclosed definition's output.
    if result_path and result_path != sfn.JsonPath.DISCARD:
        restructure = sfn.Pass(
            scope,
            f"{id} Enclosure Post",
            input_path=f"{result_path}[0]",
            result_path=result_path,
        )
        definition = definition.next(restructure)

    return definition

s3

Classes
S3Operation
Functions
put_object classmethod
put_object(
    scope: Construct,
    id: str,
    bucket_name: str,
    key: str,
    body: Any,
    result_path: str | None = "$",
    output_path: str | None = "$",
) -> Chain

Create a chain to put a body of text to S3

This chain consists of two states
  1. Pass state (resolving references and restructuring inputs)
  2. API Call to put object.
The context at the end state of the chain should contain the following fields
  • Bucket
  • Key

All parameters can be either a reference (e.g. $.path.to.my.value) or an explicit value.

Examples:

Context: {} Definition: S3Operation.put_object(..., bucket_name="bucket", key="key", body="body") Result: # text "body" is put at s3://bucket/key

Context: {"bucket": "woah", "key": "wait/what", "nested": {"a": "b"}} Definition: S3Operation.put_object(..., bucket_name="$.bucket", key="$.key", body="$.nested") Result: # text '{"a": "b"}' is put at s3://woah/wait/what

Parameters:

Name Type Description Default
scope Construct

scope construct

required
id str

An ID prefix

required
bucket_name str

explicit or reference to name of bucket

required
key str

explicit or reference to name of key

required
body Any

explicit or reference to body to upload

required

Returns:

Type Description
Chain

sfn.Chain

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/states/s3.py
@classmethod
def put_object(
    cls,
    scope: constructs.Construct,
    id: str,
    bucket_name: str,
    key: str,
    body: Any,
    result_path: str | None = "$",
    output_path: str | None = "$",
) -> sfn.Chain:
    """Build a chain that writes a body of text to S3.

    The chain has three states:
        1. Pass state resolving references and restructuring the input.
        2. Task state calling the ``s3:putObject`` SDK integration.
        3. Pass state reshaping the output to just Bucket/Key.

    After the final state, the context (at ``result_path``) contains:
        - Bucket
        - Key

    Every argument may be a literal value or a reference path
    (e.g. ``$.path.to.my.value``).

    Examples:
        Context:
            {}
        Definition:
            S3Operation.put_object(..., bucket_name="bucket", key="key", body="body")
        Result:
            # text "body" is put at s3://bucket/key
            {"Bucket": "bucket", "Key": "key"}

        Context:
            {"bucket": "woah", "key": "wait/what", "nested": {"a": "b"}}
        Definition:
            S3Operation.put_object(..., bucket_name="$.bucket", key="$.key", body="$.nested")
        Result:
            # text '{"a": "b"}' is put at s3://woah/wait/what
            {"Bucket": "woah", "Key": "wait/what"}

    Args:
        scope (constructs.Construct): scope construct
        id (str): An ID prefix
        bucket_name (str): explicit or reference to name of bucket
        key (str): explicit or reference to name of key
        body (Any): explicit or reference to body to upload

    Returns:
        sfn.Chain
    """
    base_path = result_path or "$"

    prep_state = sfn.Pass(
        scope,
        id + " PutObject Prep",
        parameters=convert_reference_paths_in_mapping(
            {"Bucket": bucket_name, "Key": key, "Body": body}
        ),
        result_path=base_path,
    )

    # The API call reads Bucket/Key/Body from the prepared location and
    # discards its own result (the Post state reshapes the context).
    api_state = sfn.CustomState(
        scope,
        id + " PutObject API Call",
        state_json={
            "Type": "Task",
            "Resource": "arn:aws:states:::aws-sdk:s3:putObject",
            "Parameters": {
                "Bucket.$": f"{base_path}.Bucket",
                "Key.$": f"{base_path}.Key",
                "Body.$": f"{base_path}.Body",
            },
            "ResultPath": sfn.JsonPath.DISCARD,
            "Retry": [
                {
                    "ErrorEquals": ["S3.S3Exception"],
                    # Interval at attempt n = IntervalSeconds x BackoffRate ^(n-1)
                    # Total time from first try: 3 + 6 + 12 + 24 + 48 = 93 seconds
                    "IntervalSeconds": 3,
                    "MaxAttempts": 5,
                    "BackoffRate": 2.0,
                    "JitterStrategy": "FULL",
                },
            ],
        },
    )

    # Drop the (potentially large) Body field, keeping only the location.
    post_state = sfn.Pass(
        scope,
        id + " PutObject Post",
        parameters=convert_reference_paths_in_mapping(
            {
                "Bucket": f"{base_path}.Bucket",
                "Key": f"{base_path}.Key",
            }
        ),
        result_path=result_path,
        output_path=output_path,
    )

    return sfn.Chain.start(prep_state).next(api_state).next(post_state)
get_object classmethod
get_object(
    scope: Construct,
    id: str,
    bucket_name: str,
    key: str,
    result_path: str | None = "$",
    output_path: str | None = "$",
) -> Chain

Creates a chain to get a body of text from S3

This chain consists of two states
  1. Pass state (resolving references and restructuring inputs)
  2. API Call to get object.
The context at the end state of the chain should contain the following fields
  • Body
  • Bucket
  • Key

All parameters can be either a reference (e.g. $.path.to.my.value) or an explicit value.

Examples:

Context: {} Definition: S3Operation.get_object(..., bucket_name="bucket", key="key") Result: # text "body" is fetched from s3://bucket/key

Context: {"bucket": "woah", "key": "wait/what"} Definition: S3Operation.get_object(..., bucket_name="$.bucket", key="$.key") Result: # text '{"a": "b"}' is fetched from s3://woah/wait/what {"Body": '{"a": "b"}'}

Parameters:

Name Type Description Default
scope Construct

scope construct

required
id str

An ID prefix

required
bucket_name str

explicit or reference to name of bucket

required
key str

explicit or reference to name of key

required

Returns:

Type Description
Chain

sfn.Chain

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/states/s3.py
@classmethod
def get_object(
    cls,
    scope: constructs.Construct,
    id: str,
    bucket_name: str,
    key: str,
    result_path: str | None = "$",
    output_path: str | None = "$",
) -> sfn.Chain:
    """Build a chain that reads an object body from S3.

    The chain has two states:
        1. Pass state resolving references and restructuring the input.
        2. Task state calling the ``s3:getObject`` SDK integration.

    The API call's result is reduced (via ResultSelector) to:
        - Body

    Every argument may be a literal value or a reference path
    (e.g. ``$.path.to.my.value``).

    Examples:
        Context:
            {}
        Definition:
            S3Operation.get_object(..., bucket_name="bucket", key="key")
        Result:
            # text "body" is fetched from s3://bucket/key
            {"Body": "body"}

        Context:
            {"bucket": "woah", "key": "wait/what"}
        Definition:
            S3Operation.get_object(..., bucket_name="$.bucket", key="$.key")
        Result:
            # text '{"a": "b"}' is fetched from s3://woah/wait/what
            {"Body": '{"a": "b"}'}

    Args:
        scope (constructs.Construct): scope construct
        id (str): An ID prefix
        bucket_name (str): explicit or reference to name of bucket
        key (str): explicit or reference to name of key

    Returns:
        sfn.Chain
    """
    prep_state = sfn.Pass(
        scope,
        id + " GetObject Prep",
        parameters=convert_reference_paths_in_mapping(
            {"Bucket": bucket_name, "Key": key}
        ),
        result_path=result_path or "$",
    )

    base_path = result_path if result_path else "$"

    api_state = sfn.CustomState(
        scope,
        id + " GetObject API Call",
        state_json={
            "Type": "Task",
            "Resource": "arn:aws:states:::aws-sdk:s3:getObject",
            "Parameters": {
                "Bucket.$": f"{base_path}.Bucket",
                "Key.$": f"{base_path}.Key",
            },
            # Keep only the object body from the full GetObject response.
            "ResultSelector": {
                "Body.$": "$.Body",
            },
            "ResultPath": result_path,
            "OutputPath": output_path,
            "Retry": [
                {
                    "ErrorEquals": ["S3.S3Exception"],
                    # Interval at attempt n = IntervalSeconds x BackoffRate ^(n-1)
                    # Total time from first try: 3 + 6 + 12 + 24 + 48 = 93 seconds
                    "IntervalSeconds": 3,
                    "MaxAttempts": 5,
                    "BackoffRate": 2.0,
                    "JitterStrategy": "FULL",
                },
            ],
        },
    )

    return sfn.Chain.start(prep_state).next(api_state)
put_payload classmethod
put_payload(
    scope: Construct,
    id: str,
    payload: str,
    bucket_name: str,
    key: str | None = None,
    result_path: str | None = "$",
    output_path: str | None = "$",
) -> Chain

Puts a payload to s3 and returns the location of the payload in s3

All parameters can be either a reference (e.g. $.path.to.my.value) or an explicit value.

Examples:

Context: {"a": "b"} Definition: S3Operation.put_payload(..., bucket_name="bucket", payload="$") Result: # text '{"a": "b"}' is written to s3://bucket/1234.../...1234

Context: {"bucket": "woah", "key": "wait/what", "data": {"a": "b"}} Definition: S3Operation.put_payload(..., bucket_name="$.bucket", payload="$.data") Result: # text '{"a": "b"}' is written to s3://woah/wait/what

Parameters:

Name Type Description Default
scope Construct

cdk construct

required
id str

id

required
payload str

explicit value or reference path in the context object (e.g. "$", "$.path")

required
bucket_name str

explicit value or reference path for bucket name

required
key Optional[str]

explicit value or reference path for key. If not provided, the following Key is generated: {$$.Execution.Name}/{UUID}

None

Returns:

Type Description
Chain

sfn.Chain

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/states/s3.py
@classmethod
def put_payload(
    cls,
    scope: constructs.Construct,
    id: str,
    payload: str,
    bucket_name: str,
    key: str | None = None,
    result_path: str | None = "$",
    output_path: str | None = "$",
) -> sfn.Chain:
    """Write a payload to S3 and return its location.

    Thin wrapper around :meth:`put_object` that generates a key at runtime
    when one is not supplied.

    Every argument may be a literal value or a reference path
    (e.g. ``$.path.to.my.value``).

    Examples:
        Context:
            {"a": "b"}
        Definition:
            S3Operation.put_payload(..., bucket_name="bucket", payload="$")
        Result:
            # text '{"a": "b"}' is written to s3://bucket/1234.../...1234
            {"Bucket": "bucket", "Key": "1234.../...1234"}

        Context:
            {"bucket": "woah", "key": "wait/what", "data": {"a": "b"}}
        Definition:
            S3Operation.put_payload(..., bucket_name="$.bucket", payload="$.data")
        Result:
            # text '{"a": "b"}' is written to s3://woah/wait/what
            {"Bucket": "woah", "Key": "wait/what"}

    Args:
        scope (constructs.Construct): cdk construct
        id (str): id
        payload (str): explicit value or reference path in the context object
            (e.g. "$", "$.path")
        bucket_name (str): explicit value or reference path for bucket name
        key (Optional[str], optional): explicit value or reference path for key.
            If not provided, the following Key is generated:
                {$$.Execution.Name}/{UUID}

    Returns:
        sfn.Chain
    """
    if not key:
        # No key supplied: build "{execution name}/{uuid}" at runtime.
        key = sfn.JsonPath.format(
            "{}/{}", sfn.JsonPath.execution_name, sfn.JsonPath.uuid()
        )

    return S3Operation.put_object(
        scope, id, bucket_name, key, payload, result_path, output_path
    )
get_payload classmethod
get_payload(
    scope: Construct,
    id: str,
    bucket_name: str,
    key: str,
    result_path: str | None = "$",
) -> Chain

Gets a payload from s3

This chain fetches object and then passes the body through a json parser

The resulting payload will be stored to path specified by result_path

Examples:

Use Case #1 - No result path Context: {"bucket": "woah", "key": "wait/what"} Definition: S3Operation.get_payload(..., bucket_name="$.bucket", key="$.key") Result: # text '{"a": "b"}' is fetched from s3://woah/wait/what

Use Case #2 - result path specified Context: {"bucket": "woah", "key": "wait/what"} Definition: S3Operation.get_payload( ..., bucket_name="$.bucket", key="$.key", result_path="$.payload" ) Result: # text '{"a": "b"}' is fetched from s3://woah/wait/what { "bucket": "woah", "key": "wait/what", "payload": {"a": "b"} }

Parameters:

Name Type Description Default
scope Construct

cdk construct

required
id str

id

required
bucket_name str

bucket name. Can be a reference path

required
key str

key name. Can be a reference path

required
result_path Optional[str]

path to store the payload. Defaults to "$".

'$'

Returns:

Type Description
Chain

sfn.Chain: chain

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/states/s3.py
@classmethod
def get_payload(
    cls,
    scope: constructs.Construct,
    id: str,
    bucket_name: str,
    key: str,
    result_path: str | None = "$",
) -> sfn.Chain:
    """Fetch an object from S3 and parse its body as JSON.

    The resulting chain has three states:
      1. fetch the object (via ``S3Operation.get_object``),
      2. parse the raw ``Body`` text into JSON,
      3. restructure so only the parsed payload remains at ``result_path``.

    Examples:
        Use Case #1 - No result path
            Context: ``{"bucket": "woah", "key": "wait/what"}``
            Definition: ``S3Operation.get_payload(..., bucket_name="$.bucket", key="$.key")``
            Result: text ``'{"a": "b"}'`` is fetched from s3://woah/wait/what,
            and the output is ``{"a": "b"}``.

        Use Case #2 - result path specified
            Context: ``{"bucket": "woah", "key": "wait/what"}``
            Definition::

                S3Operation.get_payload(
                    ..., bucket_name="$.bucket", key="$.key", result_path="$.payload"
                )

            Result: text ``'{"a": "b"}'`` is fetched from s3://woah/wait/what,
            and the output is
            ``{"bucket": "woah", "key": "wait/what", "payload": {"a": "b"}}``.

    Args:
        scope (constructs.Construct): cdk construct
        id (str): id
        bucket_name (str): bucket name. Can be a reference path
        key (str): key name. Can be a reference path
        result_path (Optional[str], optional): path to store the payload. Defaults to "$".

    Returns:
        sfn.Chain: chain
    """
    fetch = S3Operation.get_object(scope, id, bucket_name, key, result_path)

    # The S3 GetObject response stores the object content as a JSON *string*
    # under `.Body`; run it through States.StringToJson, temporarily wrapped
    # under a "Payload" key at result_path.
    parse_body = sfn.Pass(
        scope,
        f"{id} Post",
        parameters={
            "Payload": sfn.JsonPath.string_to_json(
                sfn.JsonPath.string_at(f"{result_path}.Body")
            )
        },
        result_path=result_path,
    )

    # Drop the "Payload" wrapper so the parsed value itself sits at result_path.
    unwrap = sfn.Pass(
        scope,
        f"{id} Restructure",
        input_path=f"{result_path}.Payload",
        result_path=result_path,
    )

    return fetch.next(parse_body).next(unwrap)

Utilities

Step Function utility functions:

utils

Classes

JsonReferencePath

Bases: str

str extension with properties that provide some functionality for defining JsonPath reference expressions. More details: https://github.com/json-path/JsonPath

Primarily supports "$" reference.

Attributes
as_key property
as_key: str

Returns reference path as a key. This appends ".$"

as_reference property
as_reference: str

Returns reference path as a value. This prepends "$."

Functions
sanitize classmethod
sanitize(s: str) -> str

Sanitizes a string so that it contains no consecutive periods and no leading or trailing periods.

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/utils.py
@classmethod
def sanitize(cls, s: str) -> str:
    """Sanitize a string into a valid reference-path segment.

    Collapses runs of consecutive periods into a single period and strips
    leading/trailing periods, so the result contains no empty path segments.

    Args:
        s (str): string to sanitize

    Returns:
        str: the sanitized string
    """
    # NOTE: the previous f-string wrapper was redundant — sub()/strip()
    # already return a str.
    return cls._EXTRA_PERIODS_PATTERN.sub(".", s).strip(".")

Functions

convert_to_sfn_api_action_case

convert_to_sfn_api_action_case(parameters: T) -> T

Converts a dictionary of parameters to the format expected by the Step Functions for service integration.

Even if a native API specifies a parameter in camelCase, the Step Functions SDK expects it in pascal case.

https://docs.aws.amazon.com/step-functions/latest/dg/supported-services-awssdk.html#use-awssdk-integ

Parameters:

Name Type Description Default
parameters Dict[str, Any]

parameters for SDK action

required

Returns:

Type Description
T

Dict[str, Any]: parameters for SDK action in pascal case

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/utils.py
def convert_to_sfn_api_action_case(parameters: T) -> T:
    """Convert SDK action parameters to the casing Step Functions expects.

    Step Functions AWS SDK service integrations require PascalCase parameter
    names, even when the underlying native API documents them in camelCase.

    https://docs.aws.amazon.com/step-functions/latest/dg/supported-services-awssdk.html#use-awssdk-integ

    Args:
        parameters (Dict[str, Any]): parameters for SDK action

    Returns:
        Dict[str, Any]: parameters for SDK action in pascal case
    """  # noqa: E501
    converted: T = convert_key_case(parameters, pascalcase)
    return converted

enclosed_chain

enclosed_chain(
    scope: Construct,
    id: str,
    definition: IChainable,
    input_path: str | None = None,
    result_path: str | None = None,
) -> Chain

Enclose the current state machine fragment within a parallel state.

Notes
  • If input_path is not provided, it will default to "$"
  • If result_path is not provided, it will default to input_path

Parameters:

Name Type Description Default
id str

an identifier for the parallel state

required
input_path Optional[str]

input path for the enclosed state. Defaults to "$".

None
result_path Optional[str]

result path to put output of enclosed state. Defaults to same as input_path.

None

Returns:

Type Description
Chain

sfn.Chain: the new state machine fragment

Source code in src/aibs_informatics_cdk_lib/constructs_/sfn/utils.py
def enclosed_chain(
    scope: constructs.Construct,
    id: str,
    definition: sfn.IChainable,
    input_path: str | None = None,
    result_path: str | None = None,
) -> sfn.Chain:
    """Enclose a state machine definition within a parallel state.

    Wrapping the definition in a single Parallel state isolates its
    intermediate state from the surrounding execution context: only the
    enclosed chain's output, unwrapped from the Parallel state's array
    result, is written back at ``result_path``.

    Notes:
        - If input_path is not provided, it will default to "$"
        - If result_path is not provided, it will default to input_path

    Args:
        scope (constructs.Construct): construct scope in which the enclosing
            parallel state and post-processing state are created
        id (str): an identifier for the parallel state
        definition (sfn.IChainable): the states to enclose
        input_path (Optional[str], optional): input path for the enclosed state.
            Defaults to "$".
        result_path (Optional[str], optional): result path to put output of enclosed state.
            Defaults to same as input_path.

    Returns:
        sfn.Chain: the new state machine fragment
    """
    if input_path is None:
        input_path = "$"
    if result_path is None:
        result_path = input_path

    # Normalize a bare IChainable into a Chain; Chain and
    # StateMachineFragment both already expose to_single_state.
    chain = (
        sfn.Chain.start(definition)
        if not isinstance(definition, (sfn.Chain, sfn.StateMachineFragment))
        else definition
    )

    # Chain.to_single_state requires an explicit state id; a fragment derives
    # its own id from its construct id.
    if isinstance(chain, sfn.Chain):
        parallel = chain.to_single_state(
            id=f"{id} Enclosure", input_path=input_path, result_path=result_path
        )
    else:
        parallel = chain.to_single_state(input_path=input_path, result_path=result_path)
    definition = sfn.Chain.start(parallel)

    # A Parallel state always yields a one-element array; unwrap element 0
    # back onto result_path (skipped when the result is discarded).
    if result_path and result_path != sfn.JsonPath.DISCARD:
        restructure = sfn.Pass(
            scope,
            f"{id} Enclosure Post",
            input_path=f"{result_path}[0]",
            result_path=result_path,
        )
        definition = definition.next(restructure)

    return definition