Skip to content

Step Functions

Utilities for working with AWS Step Functions.


ExecutionArn

Bases: ExecutionArn

ARN representation for Step Functions executions.

from_components classmethod

from_components(
    state_machine_name,
    execution_name,
    region=None,
    account_id=None,
)

Create an ExecutionArn from components.

Parameters:

Name Type Description Default
state_machine_name str

The name of the state machine.

required
execution_name str

The name of the execution.

required
region Optional[str]

AWS region. Defaults to None (uses default region).

None
account_id Optional[str]

AWS account ID. Defaults to None (uses current).

None

Returns:

Type Description
ExecutionArn

An ExecutionArn constructed from the components.

Source code in src/aibs_informatics_aws_utils/stepfn.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
@classmethod
def from_components(  # type: ignore[override]
    cls,
    state_machine_name: str,
    execution_name: str,
    region: Optional[str] = None,
    account_id: Optional[str] = None,
) -> "ExecutionArn":
    """Assemble a Step Functions execution ARN from its individual parts.

    The result has the colon-separated layout
    ``arn:aws:states:<region>:<account>:execution:<state machine>:<execution>``.

    Args:
        state_machine_name (str): The name of the state machine.
        execution_name (str): The name of the execution.
        region (Optional[str]): Region override; the value (including None)
            is handed to ``get_region`` and whatever it returns is used.
        account_id (Optional[str]): Account ID override; when falsy,
            ``get_account_id()`` is called to supply one.

    Returns:
        An ExecutionArn wrapping the joined ARN string.
    """
    resolved_region = get_region(region)
    resolved_account = account_id or get_account_id()
    arn_fields = (
        "arn",
        "aws",
        "states",
        resolved_region,
        resolved_account,
        "execution",
        state_machine_name,
        execution_name,
    )
    return ExecutionArn(":".join(arn_fields))

StateMachineArn

Bases: StateMachineArn

ARN representation for Step Functions state machines.

from_components classmethod

from_components(
    state_machine_name, region=None, account_id=None
)

Create a StateMachineArn from components.

Parameters:

Name Type Description Default
state_machine_name str

The name of the state machine.

required
region Optional[str]

AWS region. Defaults to None (uses default region).

None
account_id Optional[str]

AWS account ID. Defaults to None (uses current).

None

Returns:

Type Description
StateMachineArn

A StateMachineArn constructed from the components.

Source code in src/aibs_informatics_aws_utils/stepfn.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@classmethod
def from_components(  # type: ignore
    cls,
    state_machine_name: str,
    region: Optional[str] = None,
    account_id: Optional[str] = None,
) -> "StateMachineArn":
    """Assemble a Step Functions state machine ARN from its individual parts.

    The result has the colon-separated layout
    ``arn:aws:states:<region>:<account>:stateMachine:<state machine>``.

    Args:
        state_machine_name (str): The name of the state machine.
        region (Optional[str]): Region override; the value (including None)
            is handed to ``get_region`` and whatever it returns is used.
        account_id (Optional[str]): Account ID override; when falsy,
            ``get_account_id()`` is called to supply one.

    Returns:
        A StateMachineArn wrapping the joined ARN string.
    """
    resolved_region = get_region(region)
    resolved_account = account_id or get_account_id()
    arn_fields = (
        "arn",
        "aws",
        "states",
        resolved_region,
        resolved_account,
        "stateMachine",
        state_machine_name,
    )
    return StateMachineArn(":".join(arn_fields))

build_execution_name

build_execution_name(payload, date=None)

Build a unique execution name from a payload string.

Creates a SHA256 hash of the payload (optionally combined with a date) to generate a deterministic execution name.

Parameters:

Name Type Description Default
payload str

The payload string to hash.

required
date Optional[datetime]

Optional datetime to include for uniqueness.

None

Raises:

Type Description
ValueError

If serialization or encoding fails.

Returns:

Type Description
str

A SHA256 hex digest suitable for use as an execution name.

Source code in src/aibs_informatics_aws_utils/stepfn.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def build_execution_name(payload: str, date: Optional[datetime] = None) -> str:
    """Build a deterministic execution name by hashing a payload string.

    The payload (with ``date.isoformat()`` appended when a date is given) is
    encoded with the default codec and hashed with SHA256. The same inputs
    therefore always yield the same name.

    Args:
        payload (str): The payload string to hash.
        date (Optional[datetime]): Optional datetime whose ISO-8601 form is
            appended to the payload before hashing.

    Raises:
        ValueError: If the payload cannot be concatenated or encoded. The
            original exception is attached as ``__cause__``.

    Returns:
        The 64-character SHA256 hex digest.
    """
    try:
        str_to_encode = payload + date.isoformat() if date else payload
        return hashlib.sha256(str_to_encode.encode()).hexdigest()
    except TypeError as e:
        raise ValueError(f"JSON serialization failed for {payload}: {e}") from e
    except UnicodeError as e:
        # str.encode() raises UnicodeEncodeError (e.g. for lone surrogates);
        # UnicodeError covers it as well as the previously listed
        # UnicodeDecodeError, which encoding can never raise.
        raise ValueError(f"String Encoding failed for {payload}: {e}") from e
    except Exception as e:
        # Anything else (e.g. AttributeError for a non-str payload) is still
        # surfaced as ValueError so callers have a single type to catch.
        raise ValueError(f"Exception {e} raised for {payload}") from e

describe_execution

describe_execution(
    execution_arn, included_data="ALL_DATA", region=None
)

Describe a Step Functions execution.

Parameters:

Name Type Description Default
execution_arn str

The ARN of the execution to describe.

required
included_data IncludedDataType

Data to include. Defaults to "ALL_DATA".

'ALL_DATA'
region Optional[str]

AWS region. Defaults to None (uses default region).

None

Returns:

Type Description
DescribeExecutionOutputTypeDef

The execution description including status, input, and output.

Source code in src/aibs_informatics_aws_utils/stepfn.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def describe_execution(
    execution_arn: str, included_data: IncludedDataType = "ALL_DATA", region: Optional[str] = None
) -> DescribeExecutionOutputTypeDef:
    """Fetch the description of a single Step Functions execution.

    Args:
        execution_arn (str): The ARN of the execution to describe.
        included_data (IncludedDataType): Forwarded as ``includedData`` to the
            client call. Defaults to "ALL_DATA".
        region (Optional[str]): Region override; resolved through
            ``get_region`` before the client is created.

    Returns:
        The unmodified response of the client's ``describe_execution`` call.
    """
    resolved_region = get_region(region=region)
    client = get_sfn_client(region=resolved_region)
    return client.describe_execution(executionArn=execution_arn, includedData=included_data)

get_execution_arn

get_execution_arn(
    state_machine_name,
    execution_name,
    env_base=None,
    region=None,
)

Get an execution ARN by state machine and execution name.

Parameters:

Name Type Description Default
state_machine_name str

The name of the state machine.

required
execution_name str

The name of the execution.

required
env_base Optional[EnvBase]

Environment base for filtering state machines.

None
region Optional[str]

AWS region. Defaults to None (uses default region).

None

Raises:

Type Description
InvalidAmazonResourceNameError

If no matching execution is found.

Returns:

Type Description
ExecutionArn

The ExecutionArn for the matching execution.

Source code in src/aibs_informatics_aws_utils/stepfn.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def get_execution_arn(
    state_machine_name: str,
    execution_name: str,
    env_base: Optional[EnvBase] = None,
    region: Optional[str] = None,
) -> ExecutionArn:
    """Get an execution ARN by state machine and execution name.

    The state machine is located with ``get_state_machine`` and its
    executions are paged through until one whose name equals
    ``execution_name`` is found.

    Args:
        state_machine_name (str): The name of the state machine.
        execution_name (str): The name of the execution.
        env_base (Optional[EnvBase]): Environment base for filtering state machines.
        region (Optional[str]): Region override; resolved once through
            ``get_region`` and used for both the client and the state
            machine lookup.

    Raises:
        InvalidAmazonResourceNameError: If no matching execution is found.

    Returns:
        The ExecutionArn for the first matching execution.
    """
    # Resolve the region once so the client and the state machine lookup are
    # guaranteed to target the same region.
    region = get_region(region=region)
    sfn = get_sfn_client(region=region)
    state_machine = get_state_machine(name=state_machine_name, env_base=env_base, region=region)

    paginator = sfn.get_paginator("list_executions")
    for page in paginator.paginate(stateMachineArn=state_machine["stateMachineArn"]):
        for execution in page["executions"]:
            if execution["name"] == execution_name:
                return ExecutionArn(execution["executionArn"])

    # Every page was scanned without returning, so there is no such execution.
    raise InvalidAmazonResourceNameError(
        f"Could not find an execution ARN with "
        f"execution_name={execution_name}, "
        f"state_machine_name={state_machine['name']}"
    )

get_execution_history

get_execution_history(
    execution_arn,
    reverse_order=False,
    include_execution_data=False,
    region=None,
)

Get the execution history for a Step Functions execution.

Parameters:

Name Type Description Default
execution_arn Union[ExecutionArn, str]

The ARN of the execution.

required
reverse_order bool

Return events in reverse chronological order.

False
include_execution_data bool

Include input/output data in events.

False
region Optional[str]

AWS region. Defaults to None (uses default region).

None

Returns:

Type Description
List[HistoryEventTypeDef]

List of history events for the execution.

Source code in src/aibs_informatics_aws_utils/stepfn.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def get_execution_history(
    execution_arn: Union[ExecutionArn, str],
    reverse_order: bool = False,
    include_execution_data: bool = False,
    region: Optional[str] = None,
) -> List[HistoryEventTypeDef]:
    """Collect every history event recorded for a Step Functions execution.

    All pages of the ``get_execution_history`` paginator are consumed and
    their ``events`` entries are concatenated in page order.

    Args:
        execution_arn (Union[ExecutionArn, str]): The ARN of the execution;
            it is passed through the ``ExecutionArn`` constructor first.
        reverse_order (bool): Forwarded as ``reverseOrder``.
        include_execution_data (bool): Forwarded as ``includeExecutionData``.
        region (Optional[str]): Region override; resolved through
            ``get_region`` before the client is created.

    Returns:
        A flat list of the history events from all pages.
    """
    client = get_sfn_client(region=get_region(region=region))
    arn = ExecutionArn(execution_arn)
    pages = client.get_paginator("get_execution_history").paginate(
        executionArn=arn,
        reverseOrder=reverse_order,
        includeExecutionData=include_execution_data,
    )

    events: List[HistoryEventTypeDef] = []
    for page in pages:
        # A page without an "events" key contributes nothing.
        events.extend(page.get("events", []))
    return events

get_state_machine

get_state_machine(name, env_base=None, region=None)

Get a state machine by name suffix.

Parameters:

Name Type Description Default
name str

The name suffix to match against state machine names.

required
env_base Optional[EnvBase]

Optional environment base to filter by prefix.

None
region Optional[str]

AWS region. Defaults to None (uses default region).

None

Raises:

Type Description
ResourceNotFoundError

If no matching state machine is found.

AttributeError

If multiple state machines match the criteria.

Returns:

Type Description
StateMachineListItemTypeDef

The matching state machine metadata.

Source code in src/aibs_informatics_aws_utils/stepfn.py
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
def get_state_machine(
    name: str, env_base: Optional[EnvBase] = None, region: Optional[str] = None
) -> StateMachineListItemTypeDef:
    """Find the single state machine whose name ends with ``name``.

    Args:
        name (str): Suffix that the state machine's name must end with.
        env_base (Optional[EnvBase]): Forwarded to ``get_state_machines`` to
            narrow the candidates.
        region (Optional[str]): Region override; resolved through
            ``get_region`` before listing.

    Raises:
        ResourceNotFoundError: If no state machine name has the suffix.
        AttributeError: If more than one state machine name has the suffix.

    Returns:
        The one matching state machine list item.
    """
    region = get_region(region=region)

    candidates = []
    for candidate in get_state_machines(env_base=env_base, region=region):
        if candidate["name"].endswith(name):
            candidates.append(candidate)

    if not candidates:
        raise ResourceNotFoundError(
            f"No state machines with env_base={env_base} and name suffix={name} exist"
        )
    if len(candidates) > 1:
        # An ambiguous suffix is an error rather than an arbitrary pick.
        raise AttributeError(
            f"More than 1 state machines with env_base={env_base} "
            f"and name suffix={name} exist: {candidates}"
        )
    return candidates[0]

get_state_machines

get_state_machines(env_base=None, region=None)

List all state machines, optionally filtered by environment base.

Parameters:

Name Type Description Default
env_base Optional[EnvBase]

Optional environment base to filter by prefix.

None
region Optional[str]

AWS region. Defaults to None (uses default region).

None

Returns:

Type Description
List[StateMachineListItemTypeDef]

List of state machine metadata items.

Source code in src/aibs_informatics_aws_utils/stepfn.py
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
def get_state_machines(
    env_base: Optional[EnvBase] = None, region: Optional[str] = None
) -> List[StateMachineListItemTypeDef]:
    """List state machines, optionally keeping only those with a name prefix.

    Args:
        env_base (Optional[EnvBase]): When truthy, only state machines whose
            name starts with this value are returned; otherwise all are.
        region (Optional[str]): Region override; resolved through
            ``get_region`` before the client is created.

    Returns:
        The state machine list items from every page of ``list_state_machines``.
    """
    region = get_region(region=region)
    client = get_sfn_client(region=region)
    pages = client.get_paginator("list_state_machines").paginate()

    selected: List[StateMachineListItemTypeDef] = []
    for page in pages:
        for item in page["stateMachines"]:
            if env_base and not item["name"].startswith(env_base):
                continue
            selected.append(item)
    return selected

start_execution

start_execution(
    state_machine_name,
    state_machine_input,
    reuse_existing_execution=False,
    execution_name=None,
    env_base=None,
    region=None,
)

Starts a StepFn Execution

Notes
  • The state machine Arn is resolved using the name.
  • If ExecutionAlreadyExists and reuse_existing_execution=False, a unique execution name will be generated using aibs_informatics_core.utils.time.get_current_time().

Parameters:

Name Type Description Default
state_machine_name str

Name of state machine to execute

required
state_machine_input str

Serialized payload input for execution.

required
reuse_existing_execution bool

If True and an execution with the same name already exists, return the existing execution's ARN instead of starting a new one. Defaults to False.

False
env_base EnvBase

environment base. Defaults to None.

None
region str

AWS region. Defaults to None.

None

Returns:

Type Description
ExecutionArn

the execution arn

Source code in src/aibs_informatics_aws_utils/stepfn.py
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
def start_execution(
    state_machine_name: str,
    state_machine_input: str,
    reuse_existing_execution: bool = False,
    execution_name: Optional[str] = None,
    env_base: Optional[EnvBase] = None,
    region: Optional[str] = None,
) -> ExecutionArn:
    """Starts a StepFn Execution

    Notes:
        * The state machine Arn is resolved using the name.
        * If ExecutionAlreadyExists and `reuse_existing_execution`=True, the
          first execution ARN found in the error message is returned.
        * If ExecutionAlreadyExists and `reuse_existing_execution`=False,
          a unique execution name will be generated using
          aibs_informatics_core.utils.time.get_current_time() and the start
          is retried.

    Args:
        state_machine_name (str): Name of state machine to execute
        state_machine_input (str): Serialized payload input for execution.
        reuse_existing_execution (bool, optional): Return the existing execution
            instead of starting a new one when the name is already taken.
            Defaults to False.
        execution_name (str, optional): Name for the execution. When falsy, a
            name is derived from `state_machine_input` with
            `build_execution_name`. Defaults to None.
        env_base (EnvBase, optional): environment base. Defaults to None.
        region (str, optional): AWS region. Defaults to None.

    Raises:
        ResourceNotFoundError: If the service reports StateMachineDoesNotExist
            (also whatever `get_state_machine` raises during lookup).
        AWSError: For any other ClientError or unexpected exception raised
            while starting the execution.

    Returns:
        the execution arn
    """
    region = get_region(region=region)
    # The client must target the same region the state machine is resolved in;
    # otherwise the ARN found below may not exist for the client's region.
    sfn = get_sfn_client(region=region)

    state_machine = get_state_machine(name=state_machine_name, env_base=env_base, region=region)
    state_machine_full_name = state_machine["name"]
    state_machine_arn = state_machine["stateMachineArn"]

    def _start_execution(execution_name: str):
        try:
            logger.info(
                f"Starting execution of step function {state_machine_arn} [{execution_name}]",
            )

            response = sfn.start_execution(
                stateMachineArn=state_machine_arn,
                name=execution_name,
                input=state_machine_input,
            )

        except ClientError as e:
            if get_client_error_code(e) == "ExecutionAlreadyExists":
                if reuse_existing_execution:
                    logger.info(f"ExecutionAlreadyExists for {state_machine_input} so skipping.")
                    # The existing execution's ARN is pulled out of the error text.
                    return ExecutionArn.findall(get_client_error_message(e))[0]
                else:
                    # Retry under a fresh name that mixes in the current time.
                    return _start_execution(
                        build_execution_name(state_machine_input, date=get_current_time())
                    )
            elif get_client_error_code(e) == "StateMachineDoesNotExist":
                msg = (
                    f"State machine {state_machine_full_name} [{state_machine_arn}] does not exist"
                )
                logger.error(msg)
                raise ResourceNotFoundError(msg) from e
            else:
                raise AWSError(
                    f"Start StepFn failed with unknown ClientError for {state_machine_arn}: {e}"
                ) from e
        except Exception as e:
            raise AWSError(f"Unknown failure for {state_machine_arn}") from e
        return ExecutionArn(response["executionArn"])

    execution_name = execution_name or build_execution_name(state_machine_input)
    return _start_execution(execution_name)

stop_execution

stop_execution(
    execution_arn, error=None, cause=None, region=None
)

Stop a running Step Functions execution.

Parameters:

Name Type Description Default
execution_arn Union[ExecutionArn, str]

The ARN of the execution to stop.

required
error Optional[str]

Optional error code to record.

None
cause Optional[str]

Optional cause description to record.

None
region Optional[str]

AWS region. Defaults to None (uses default region).

None

Returns:

Type Description
datetime

The datetime when the execution was stopped.

Source code in src/aibs_informatics_aws_utils/stepfn.py
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
def stop_execution(
    execution_arn: Union[ExecutionArn, str],
    error: Optional[str] = None,
    cause: Optional[str] = None,
    region: Optional[str] = None,
) -> datetime:
    """Request that a Step Functions execution be stopped.

    Args:
        execution_arn (Union[ExecutionArn, str]): The ARN of the execution to
            stop; it is passed through the ``ExecutionArn`` constructor first.
        error (Optional[str]): Error code to record; a falsy value is sent as
            an empty string.
        cause (Optional[str]): Cause description to record; a falsy value is
            sent as an empty string.
        region (Optional[str]): Region override; resolved through
            ``get_region`` before the client is created.

    Returns:
        The ``stopDate`` value from the client's response.
    """
    client = get_sfn_client(region=get_region(region=region))
    stop_kwargs = {
        "executionArn": ExecutionArn(execution_arn),
        "error": error or "",
        "cause": cause or "",
    }
    return client.stop_execution(**stop_kwargs)["stopDate"]