Skip to content

SQS

Utilities for working with Amazon Simple Queue Service.


delete_from_queue

delete_from_queue(queue_name, receipt_handle, region=None)

Delete a message from an SQS queue.

Parameters:

Name Type Description Default
queue_name str

The name of the SQS queue.

required
receipt_handle str

The receipt handle of the message to delete.

required
region Optional[str]

AWS region. Defaults to None (uses default region).

None

Returns:

Type Description

True if the message was successfully deleted.

Source code in src/aibs_informatics_aws_utils/sqs.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def delete_from_queue(
    queue_name: str, receipt_handle: str, region: Optional[str] = None
) -> bool:
    """Delete a message from an SQS queue.

    The queue URL is resolved from ``queue_name`` before the delete call is issued.

    Args:
        queue_name (str): The name of the SQS queue.
        receipt_handle (str): The receipt handle of the message to delete.
        region (Optional[str]): AWS region used to build the SQS client.
            Defaults to None, which is forwarded as-is to ``get_sqs_client``
            (presumably selecting the default region -- confirm against that helper).

    Returns:
        True once the delete call has returned without raising.
    """
    # Forward the caller-supplied region; it was previously accepted but ignored,
    # so a non-default region silently targeted the wrong endpoint.
    sqs = get_sqs_client(region=region)
    queue_url = sqs.get_queue_url(QueueName=queue_name)["QueueUrl"]

    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
    logger.info("Deleted message %s from queue %s", receipt_handle, queue_name)

    return True

send_sqs_message

send_sqs_message(
    queue_name,
    payload,
    message_deduplication_id=None,
    message_group_id=None,
    payload_json_encoder=DecimalEncoder,
)

Send a message to an SQS queue by providing a queue name

Parameters:

Name Type Description Default
queue_name str

The name of the queue that you want to send a message to. (e.g. 'aps-sync-request-queue.fifo')

required
payload dict

A dictionary representing the message payload you would like to send.

required
message_deduplication_id Optional[str]

An ID that can be used by SQS to remove messages that have the same deduplication_id. Do not set if your SQS queue already uses content based deduplication. Defaults to None.

None
message_group_id Optional[str]

Required for FIFO queues. Messages sent with the same message_group_id will obey FIFO rules. Messages with different message_group_ids may be interleaved. Defaults to None.

None
payload_json_encoder Type[JSONEncoder]

The JSONEncoder class that should be used to convert the input payload dictionary into a JSON string. By default uses a DecimalEncoder which can handle decimal.Decimal types.

DecimalEncoder

Raises:

Type Description
AWSError

If the provided queue_name cannot be resolved to an SQS url. HINT: Does the code calling this function have the correct SQS permissions?

RuntimeError

If the destination queue is a FIFO queue, then message_group_id MUST be provided.

Returns:

Type Description
str

Returns an MD5 digest of the sent message body.

Source code in src/aibs_informatics_aws_utils/sqs.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def send_sqs_message(
    queue_name: str,
    payload: dict,
    message_deduplication_id: Optional[str] = None,
    message_group_id: Optional[str] = None,
    payload_json_encoder: Type[json.JSONEncoder] = DecimalEncoder,
) -> str:
    """Send a message to an SQS queue by providing a queue name.

    The SQS client is created for the region returned by ``get_region()``.

    Args:
        queue_name (str): The name of the queue that you want to send a message to.
            (e.g. 'aps-sync-request-queue.fifo')
        payload (dict): A dictionary representing the message payload you would like to send.
        message_deduplication_id (Optional[str], optional): An ID that can be used by SQS
            to remove messages that have the same deduplication_id. Do not set if your
            SQS queue already uses content based deduplication. Defaults to None.
        message_group_id (Optional[str], optional): Required for FIFO queues.
            Messages sent with the same message_group_id will obey FIFO rules. Messages with
            different message_group_ids may be interleaved. Defaults to None.
        payload_json_encoder (Type[json.JSONEncoder], optional): The JSONEncoder
            class that should be used to convert the input `payload` dictionary into
            a json string. By default uses a DecimalEncoder which can handle decimal.Decimal types.

    Raises:
        AWSError: If resolving `queue_name` to an SQS url raises a ClientError. The
            original ClientError is attached as the cause.
            HINT: Does the code calling this function have the correct SQS permissions?
        RuntimeError: If `queue_name` ends with ".fifo" and `message_group_id` is None.
            This is checked after the queue url has been resolved.

    Returns:
        The MD5 digest of the sent message body, as reported by SQS.
    """
    sqs = get_sqs_client(region=get_region())
    try:
        queue_url_response = sqs.get_queue_url(QueueName=queue_name)
    except ClientError as err:
        # Chain explicitly so the underlying AWS error code stays visible to callers.
        raise AWSError(
            f"Could not find SQS queue with name: {queue_name}. "
            "Does the code calling send_sqs_message() have sqs:GetQueueUrl permissions?"
        ) from err

    send_sqs_message_args = {
        "QueueUrl": queue_url_response["QueueUrl"],
        "MessageBody": json.dumps(payload, cls=payload_json_encoder),
    }

    if message_group_id is not None:
        send_sqs_message_args["MessageGroupId"] = message_group_id
    elif queue_name.endswith(".fifo"):
        # FIFO queues are recognised purely by the ".fifo" name suffix here.
        raise RuntimeError("SQS messages for a FIFO queue *must* include a message_group_id!")

    if message_deduplication_id is not None:
        send_sqs_message_args["MessageDeduplicationId"] = message_deduplication_id

    response = sqs.send_message(**send_sqs_message_args)  # type: ignore  # complains about valid kwargs

    return response["MD5OfMessageBody"]

send_to_dispatch_queue

send_to_dispatch_queue(payload, env_base)

Send a message to the demand request dispatch queue.

Parameters:

Name Type Description Default
payload dict

The message payload as a dictionary.

required
env_base str

The environment base used to construct the queue name.

required

Raises:

Type Description
AWSError

If the queue cannot be found.

Returns:

Type Description

The MD5 digest of the sent message body.

Source code in src/aibs_informatics_aws_utils/sqs.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def send_to_dispatch_queue(payload: dict, env_base: str) -> str:
    """Send a message to the demand request dispatch queue.

    The queue name is built as ``"<env_base>-demand_request_queue"`` and the SQS
    client is created for the region returned by ``get_region()``.

    Args:
        payload (dict): The message payload as a dictionary. It is serialized with
            plain ``json.dumps`` (no custom encoder).
        env_base (str): The environment base used to construct the queue name.

    Raises:
        AWSError: If resolving the queue name to a url raises a ClientError. The
            original ClientError is logged and attached as the cause.

    Returns:
        The MD5 digest of the sent message body.
    """
    sqs = get_sqs_client(region=get_region())
    queue_name = "-".join([env_base, "demand_request_queue"])
    logger.info("Queue name: %s", queue_name)

    try:
        queue_url_response = sqs.get_queue_url(QueueName=queue_name)
    except ClientError as e:
        logger.exception(e)
        # Chain explicitly so the underlying AWS error stays attached to the AWSError.
        raise AWSError(f"Could not find SQS queue with name {queue_name}") from e

    queue_url = queue_url_response["QueueUrl"]

    response = sqs.send_message(QueueUrl=queue_url, MessageBody=json.dumps(payload))

    return response["MD5OfMessageBody"]