Skip to content

Athena

Utilities for working with AWS Athena.


get_query_execution

get_query_execution(query_execution_id)

Get the status and details of an Athena query execution.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `query_execution_id` | `str` | The unique identifier of the query execution. | *required* |

Raises:

Type Description
AWSError

If the query execution cannot be retrieved.

Returns:

Type Description
GetQueryExecutionOutputTypeDef

The query execution details including status and results location.

Source code in src/aibs_informatics_aws_utils/athena.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def get_query_execution(query_execution_id: str) -> GetQueryExecutionOutputTypeDef:
    """Get the status and details of an Athena query execution.

    Args:
        query_execution_id (str): The unique identifier of the query execution.

    Raises:
        AWSError: If the query execution cannot be retrieved. Any exception
            raised while calling Athena is wrapped, with the original chained
            as ``__cause__``.

    Returns:
        The query execution details including status and results location.
    """
    athena = get_athena_client()
    try:
        return athena.get_query_execution(QueryExecutionId=query_execution_id)
    except Exception as e:
        # Deliberately broad: callers only need to handle AWSError, whether the
        # failure was an API error or something else raised by the client call.
        logger.error(
            f"Error getting query execution: {query_execution_id} {e}", exc_info=True
        )
        raise AWSError(f"Error getting query execution: {query_execution_id} {e}") from e

query_waiter

query_waiter(query_execution_id, timeout=60)

Wait for an Athena query to complete.

Polls the query execution status until it reaches a terminal state (SUCCEEDED, FAILED, CANCELLED) or times out.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `query_execution_id` | `str` | The unique identifier of the query execution. | *required* |
| `timeout` | `int` | Maximum time to wait in seconds. Defaults to 60. | `60` |

Returns:

Type Description
Tuple[ATHENA_QUERY_WAITER_STATUS, QueryExecutionStatusTypeDef]

A tuple of (status, status_details) where status is one of SUCCEEDED, FAILED, CANCELLED, or TIMEOUT.

Source code in src/aibs_informatics_aws_utils/athena.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def query_waiter(
    query_execution_id: str,
    timeout: int = 60,
    *,
    poll_interval: float = 0.2,
) -> Tuple[ATHENA_QUERY_WAITER_STATUS, QueryExecutionStatusTypeDef]:
    """Wait for an Athena query to complete.

    Polls the query execution status until it reaches a terminal state
    (SUCCEEDED, FAILED, CANCELLED) or the timeout elapses.

    Args:
        query_execution_id (str): The unique identifier of the query execution.
        timeout (int): Maximum time to wait in seconds. Defaults to 60.
        poll_interval (float): Seconds to sleep between status polls
            (keyword-only). Defaults to 0.2 (200ms).

    Raises:
        AWSError: Propagated from ``get_query_execution`` if a status poll fails.

    Returns:
        A tuple of (status, status_details) where status is one of
            SUCCEEDED, FAILED, CANCELLED, or TIMEOUT. On timeout,
            status_details is the last status that was observed.
    """
    # Monotonic clock: wall-clock adjustments must not shorten or extend the wait.
    deadline = time.monotonic() + timeout
    logger.info(f"Polling for status of query execution: {query_execution_id}")
    while True:
        stats = get_query_execution(query_execution_id=query_execution_id)
        logger.info(f"Query Execution Status: {stats}")
        status = stats["QueryExecution"].get("Status", {})
        state = status.get("State")
        if state in ("SUCCEEDED", "FAILED", "CANCELLED", "TIMEOUT"):
            return state, status  # type: ignore[return-value]
        # Check the deadline *before* sleeping so we never oversleep it.
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return "TIMEOUT", status
        # Cap the sleep at the remaining budget; the next iteration then does
        # one final poll at (roughly) the deadline before giving up.
        time.sleep(min(poll_interval, remaining))

start_query_execution

start_query_execution(
    query_string,
    work_group=None,
    execution_parameters=None,
    **kwargs
)

Start an Athena query execution.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `query_string` | `str` | The SQL query string to execute. | *required* |
| `work_group` | `Optional[str]` | The name of the workgroup to execute the query in. | `None` |
| `execution_parameters` | `Optional[List[str]]` | Optional list of query execution parameters. | `None` |
| `**kwargs` | `Unpack[StartQueryExecutionInputTypeDef]` | Additional arguments passed to the Athena `start_query_execution` API. | `{}` |

Raises:

Type Description
AWSError

If the query execution fails to start.

Returns:

Type Description
StartQueryExecutionOutputTypeDef

The start query execution response containing the QueryExecutionId.

Source code in src/aibs_informatics_aws_utils/athena.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def start_query_execution(
    query_string: str,
    work_group: Optional[str] = None,
    execution_parameters: Optional[List[str]] = None,
    **kwargs: Unpack[StartQueryExecutionInputTypeDef],
) -> StartQueryExecutionOutputTypeDef:
    """Submit a SQL query to Athena for execution.

    The request always carries ``QueryString``. ``WorkGroup`` and
    ``ExecutionParameters`` are included only when a truthy value is given.
    Extra keyword arguments are merged in last, so they take precedence over
    the values built from the named parameters.

    Args:
        query_string (str): SQL text to run.
        work_group (Optional[str]): Workgroup to run the query in; omitted if falsy.
        execution_parameters (Optional[List[str]]): Values for query execution
            parameters; omitted if falsy.
        **kwargs: Extra fields forwarded as-is to Athena's
            ``start_query_execution`` call.

    Raises:
        AWSError: Wrapping the ``ClientError`` raised when the request fails.

    Returns:
        The start query execution response containing the QueryExecutionId.
    """
    client = get_athena_client()

    request = StartQueryExecutionInputTypeDef(QueryString=query_string)
    # Optional fields are sent only when set; falsy values ("" / []) are dropped.
    for field_name, field_value in (
        ("WorkGroup", work_group),
        ("ExecutionParameters", execution_parameters),
    ):
        if field_value:
            request[field_name] = field_value  # type: ignore[literal-required]
    # Caller-supplied kwargs are applied last and may override anything above.
    request.update(kwargs)

    try:
        return client.start_query_execution(**request)
    except ClientError as err:
        logger.error(f"Error executing : {request} {err}", exc_info=True)
        raise AWSError(f"Error starting query execution: {request} {err}") from err