Skip to content

Batch

Utilities for working with AWS Batch.


build_retry_strategy

build_retry_strategy(
    num_retries=5,
    evaluate_on_exit_configs=None,
    include_default_evaluate_on_exit_configs=True,
)

Build a Retry Strategy for a Job definition

By default, SPOT Termination retries are included. These can be excluded if desired.

https://aws.amazon.com/blogs/compute/introducing-retry-strategies-for-aws-batch/

Parameters:

Name Type Description Default
num_retries int

number of times to retry. Defaults to 5.

5
evaluate_on_exit_configs Optional[List[EvaluateOnExitTypeDef]]

list of EvaluateOnExit configs.

None
include_default_evaluate_on_exit_configs bool

Whether to include the default evaluate on exit configurations (appended after any caller-supplied configs). Defaults to True.

True

Returns:

Type Description
RetryStrategyTypeDef

The retry strategy configuration.

Source code in src/aibs_informatics_aws_utils/batch.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def build_retry_strategy(
    num_retries: int = 5,
    evaluate_on_exit_configs: Optional[List[EvaluateOnExitTypeDef]] = None,
    include_default_evaluate_on_exit_configs: bool = True,
) -> RetryStrategyTypeDef:
    """Assemble the retry strategy for a Batch job definition.

    Caller-supplied evaluate-on-exit rules are placed first. Unless disabled, three
    default rules are appended after them, in this order:

    1. RETRY when the status reason is "Task failed to start" and the reason matches
       "DockerTimeoutError*".
    2. RETRY when the status reason matches "Host EC2*".
    3. EXIT for any other status reason ("*").

    https://aws.amazon.com/blogs/compute/introducing-retry-strategies-for-aws-batch/

    Args:
        num_retries (int, optional): value stored in the strategy's ``attempts`` field.
            Defaults to 5.
        evaluate_on_exit_configs (Optional[List[EvaluateOnExitTypeDef]], optional):
            extra EvaluateOnExit rules, placed ahead of the defaults. Defaults to None.
        include_default_evaluate_on_exit_configs (bool, optional): Whether to append
            the default evaluate-on-exit rules listed above. Defaults to True.

    Returns:
        The retry strategy configuration.
    """
    # Start from a copy of the caller's rules so the input list is never mutated.
    combined_rules: List[EvaluateOnExitTypeDef] = list(evaluate_on_exit_configs or [])

    if include_default_evaluate_on_exit_configs:
        docker_timeout_retry = EvaluateOnExitTypeDef(
            action="RETRY",
            onStatusReason="Task failed to start",
            onReason="DockerTimeoutError*",
        )
        host_ec2_retry = EvaluateOnExitTypeDef(action="RETRY", onStatusReason="Host EC2*")
        # Catch-all placed last so it applies only when no earlier rule matched.
        catch_all_exit = EvaluateOnExitTypeDef(action="EXIT", onStatusReason="*")
        combined_rules += [docker_timeout_retry, host_ec2_retry, catch_all_exit]

    return RetryStrategyTypeDef(attempts=num_retries, evaluateOnExit=combined_rules)

register_job_definition

register_job_definition(
    job_definition_name,
    container_properties,
    parameters=None,
    job_definition_type="container",
    retry_strategy=None,
    tags=None,
    propagate_tags=False,
    region=None,
)

Register a job definition with Batch.

If a matching job definition already exists (same command, image, jobRoleArn, parameters, type, tags, and retry strategy), the existing definition is returned instead of creating a new revision.

Parameters:

Name Type Description Default
job_definition_name str

The name of the job definition.

required
container_properties ContainerPropertiesTypeDef

Container configuration.

required
parameters Optional[Mapping[str, str]]

Default parameter substitution values.

None
job_definition_type JobDefinitionTypeType

Type of job. Defaults to "container".

'container'
retry_strategy Optional[RetryStrategyTypeDef]

Retry strategy configuration.

None
tags Optional[Mapping[str, str]]

Tags to apply to the job definition.

None
propagate_tags bool

Whether to propagate tags to jobs. Defaults to False.

False
region Optional[str]

AWS region. Defaults to None (uses default region).

None

Returns:

Type Description
Union[JobDefinitionTypeDef, RegisterJobDefinitionResponseTypeDef]

The existing job definition if matching, otherwise the new registration response.

Source code in src/aibs_informatics_aws_utils/batch.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
@retry(ClientError)
def register_job_definition(
    job_definition_name: str,
    container_properties: ContainerPropertiesTypeDef,
    parameters: Optional[Mapping[str, str]] = None,
    job_definition_type: JobDefinitionTypeType = "container",
    retry_strategy: Optional[RetryStrategyTypeDef] = None,
    tags: Optional[Mapping[str, str]] = None,
    propagate_tags: bool = False,
    region: Optional[str] = None,
) -> Union[JobDefinitionTypeDef, RegisterJobDefinitionResponseTypeDef]:
    """Register a job definition with Batch.

    If a matching job definition already exists (same command, image, jobRoleArn,
    parameters, type, tags, and retry strategy), the existing definition is returned
    instead of creating a new revision.

    For ``parameters``, ``tags`` and ``retry_strategy``, ``None`` and an empty mapping
    are treated as the same value, both when comparing against the latest definition
    and when building the registration request. ``propagate_tags`` and the remaining
    container properties are not part of the comparison.

    Args:
        job_definition_name (str): The name of the job definition.
        container_properties (ContainerPropertiesTypeDef): Container configuration.
        parameters (Optional[Mapping[str, str]]): Default parameter substitution values.
        job_definition_type (JobDefinitionTypeType): Type of job. Defaults to "container".
        retry_strategy (Optional[RetryStrategyTypeDef]): Retry strategy configuration.
        tags (Optional[Mapping[str, str]]): Tags to apply to the job definition.
        propagate_tags (bool): Whether to propagate tags to jobs. Defaults to False.
        region (Optional[str]): AWS region. Defaults to None (uses default region).

    Returns:
        The existing job definition if matching, otherwise the new registration response.
    """
    batch = get_batch_client(region=region)

    # Normalize the optional arguments once. The request below always sends an empty
    # mapping in place of None, so the comparison must use the same normalized values;
    # otherwise a None argument never matches a stored empty mapping and every call
    # registers a new revision.
    expected_parameters = parameters or {}
    expected_tags = tags or {}
    expected_retry_strategy = retry_strategy or {}

    # First we check to make sure that we aren't creating unnecessary revisions
    # of the same job definition.
    latest = get_latest_job_definition(job_definition_name=job_definition_name, region=region)
    logger.info(f"Previously registered batch job definition: {latest}")
    if latest:
        latest_container_properties = latest.get("containerProperties", {})
        container_matches = all(
            latest_container_properties.get(key) == container_properties.get(key)
            for key in ("command", "image", "jobRoleArn")
        )
        if (
            container_matches
            and (latest.get("parameters") or {}) == expected_parameters
            and latest.get("type") == job_definition_type
            and (latest.get("tags") or {}) == expected_tags
            and (latest.get("retryStrategy") or {}) == expected_retry_strategy
        ):
            logger.info(
                f"Latest job definition (name={job_definition_name}) matches expected. "
                "Skipping register new job definition call"
            )
            return latest
    register_job_definition_kwargs = RegisterJobDefinitionRequestTypeDef(
        jobDefinitionName=job_definition_name,
        type=job_definition_type,
        parameters=expected_parameters,
        containerProperties=container_properties,
        propagateTags=propagate_tags,
        retryStrategy=expected_retry_strategy,
        tags=expected_tags,
    )
    logger.info(
        f"Registering job definition with following properties: {register_job_definition_kwargs}"
    )
    response = batch.register_job_definition(**register_job_definition_kwargs)  # type: ignore[arg-type]
    return response

to_key_value_pairs

to_key_value_pairs(environment, remove_null_values=True)

Converts a map style of environment variables into a list of key-value pairs

Parameters:

Name Type Description Default
environment Dict[str, str]

map of environment variable keys and values

required
remove_null_values bool

Whether to withhold pairs where value is None. Defaults to True

True

Returns:

Type Description
List[KeyValuePairTypeDef]

List of name,value json blobs representing env variables

Source code in src/aibs_informatics_aws_utils/batch.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def to_key_value_pairs(
    environment: Dict[str, str],
    remove_null_values: bool = True,
) -> List[KeyValuePairTypeDef]:
    """Turn an environment-variable mapping into a name-sorted list of key-value pairs.

    Args:
        environment (Dict[str, str]): mapping of environment variable names to values
        remove_null_values (bool): When True, entries whose value is None are left out
            of the result. Defaults to True

    Returns:
        List of ``{"name": ..., "value": ...}`` entries, ordered by name
    """
    pairs: List[KeyValuePairTypeDef] = []
    for name, value in environment.items():
        # Skip unset variables only when the caller asked for them to be dropped.
        if remove_null_values and value is None:
            continue
        pairs.append(KeyValuePairTypeDef(name=name, value=value))

    # Sort by variable name so the output order does not depend on dict ordering.
    pairs.sort(key=lambda pair: pair.get("name", ""))
    return pairs

to_resource_requirements

to_resource_requirements(gpu=None, memory=None, vcpus=None)

Converts Batch resource requirement parameters into a list of ResourceRequirement objects

The returned list only includes dictionary entries for resources that specify an explicit value. Anything unset will be dropped.

Parameters:

Name Type Description Default
gpu Optional[int]

number of GPUs to use. Defaults to None.

None
memory Optional[int]

amount of memory in MiB. Defaults to None.

None
vcpus Optional[int]

Number of VCPUs to use. Defaults to None.

None

Returns:

Type Description
List[ResourceRequirementTypeDef]

list of resource requirements

Source code in src/aibs_informatics_aws_utils/batch.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def to_resource_requirements(
    gpu: Optional[int] = None,
    memory: Optional[int] = None,
    vcpus: Optional[int] = None,
) -> List[ResourceRequirementTypeDef]:
    """Build the list of Batch ResourceRequirement entries for the given resources.

    Only resources given an explicit (non-None) value produce an entry; the rest are
    omitted. Entries appear in the fixed order GPU, MEMORY, VCPU, and each value is
    converted to its string form.

    Args:
        gpu (Optional[int], optional): number of GPUs to use. Defaults to None.
        memory (Optional[int], optional): amount of memory in MiB. Defaults to None.
        vcpus (Optional[int], optional): Number of VCPUs to use. Defaults to None.

    Returns:
        list of resource requirements
    """
    candidates: tuple[tuple[Literal["GPU", "MEMORY", "VCPU"], Optional[int]], ...] = (
        ("GPU", gpu),
        ("MEMORY", memory),
        ("VCPU", vcpus),
    )

    requirements: List[ResourceRequirementTypeDef] = []
    for resource_type, amount in candidates:
        # None means "not requested"; an explicit 0 is still emitted.
        if amount is None:
            continue
        requirements.append(ResourceRequirementTypeDef(type=resource_type, value=str(amount)))
    return requirements