Skip to content

CloudWatch Constructs

Monitoring dashboards, alarms, and metrics.

cw

Modules

config_generators

Modules
sfn
Classes
StateMachineMetricConfigGenerator dataclass
StateMachineMetricConfigGenerator(
    state_machine: IStateMachine,
    state_machine_name: str,
    dimension_map: dict = field(init=False),
)
Functions
get_execution_completion_metric
get_execution_completion_metric(
    name_override: str | None = None,
) -> GraphMetricConfig

get the execution completion metric for the state machine

Parameters:

Name Type Description Default
name_override Optional[str]

override for name used. Defaults to None.

None

Returns:

Type Description
GraphMetricConfig

GraphMetricConfig

Source code in src/aibs_informatics_cdk_lib/constructs_/cw/config_generators/sfn.py
def get_execution_completion_metric(
    self, name_override: str | None = None
) -> GraphMetricConfig:
    """Build the graph metric config for completed executions.

    Args:
        name_override (Optional[str], optional): name shown in the label in place
            of the state machine name. Defaults to None.

    Returns:
        GraphMetricConfig: "Sum" of the "ExecutionsSucceeded" metric, scoped by
            this generator's dimension map.
    """
    # Falls back to the state machine name when no (or an empty) override is given
    display_name = name_override or self.state_machine_name
    return GraphMetricConfig(
        metric="ExecutionsSucceeded",
        label=f"{display_name} Completed",
        statistic="Sum",
        dimension_map=self.dimension_map,
    )
get_execution_invocations_metric
get_execution_invocations_metric(
    name_override: str | None = None,
) -> GraphMetricConfig

get the execution invocations metric for the state machine

Parameters:

Name Type Description Default
name_override Optional[str]

override for name used. Defaults to None.

None

Returns:

Type Description
GraphMetricConfig

GraphMetricConfig

Source code in src/aibs_informatics_cdk_lib/constructs_/cw/config_generators/sfn.py
def get_execution_invocations_metric(
    self, name_override: str | None = None
) -> GraphMetricConfig:
    """Build the graph metric config for started executions.

    Args:
        name_override (Optional[str], optional): name shown in the label in place
            of the state machine name. Defaults to None.

    Returns:
        GraphMetricConfig: "Sum" of the "ExecutionsStarted" metric, scoped by
            this generator's dimension map.
    """
    # Falls back to the state machine name when no (or an empty) override is given
    display_name = name_override or self.state_machine_name
    return GraphMetricConfig(
        metric="ExecutionsStarted",
        label=f"{display_name} Started",
        statistic="Sum",
        dimension_map=self.dimension_map,
    )
get_execution_failures_metric
get_execution_failures_metric(
    name_override: str | None = None,
    discriminator: str | None = None,
    alarm_threshold: int = 1,
    alarm_evaluation_periods: int = 3,
    alarm_datapoints_to_alarm: int = 1,
) -> GraphMetricConfig

get the execution failures metric for the state machine

Parameters:

Name Type Description Default
name_override Optional[str]

override for name used. Defaults to state machine name.

None
discriminator Optional[str]

Required if grouping with other metric configs that specify the same metric math. Defaults to "0".

None
alarm_threshold int

Alarm threshold used. Defaults to 1.

1
alarm_evaluation_periods int

Alarm evaluation periods. Defaults to 3.

3
alarm_datapoints_to_alarm int

Alarm datapoints to alarm. Defaults to 1.

1

Returns:

Type Description
GraphMetricConfig

Graph metric config for execution failures

Source code in src/aibs_informatics_cdk_lib/constructs_/cw/config_generators/sfn.py
def get_execution_failures_metric(
    self,
    name_override: str | None = None,
    discriminator: str | None = None,
    alarm_threshold: int = 1,
    alarm_evaluation_periods: int = 3,
    alarm_datapoints_to_alarm: int = 1,
) -> GraphMetricConfig:
    """Build the graph metric config (with alarm) for execution errors.

    The graphed value is a math expression summing the state machine's failed,
    aborted, timed-out and throttled metrics.

    Args:
        name_override (Optional[str], optional): name used in the label and the
            alarm name. Defaults to state machine name.
        discriminator (Optional[str], optional): suffix appended to the metric ids
            of the math expression. Required if grouping with other metric configs
            that specify the same metric math. Defaults to "0".
        alarm_threshold (int, optional): Alarm threshold used. Defaults to 1.
        alarm_evaluation_periods (int, optional): Alarm evaluation periods.
            Defaults to 3.
        alarm_datapoints_to_alarm (int, optional): Alarm datapoints to alarm.
            Defaults to 1.

    Returns:
        Graph metric config for execution failures
    """
    display_name = name_override or self.state_machine_name
    suffix = discriminator or "0"

    # Metric ids referenced by the math expression; the suffix keeps them unique
    # when several such expressions share one graph.
    failed_id = f"failed_{suffix}"
    aborted_id = f"aborted_{suffix}"
    timed_out_id = f"timed_out_{suffix}"
    throttled_id = f"throttled_{suffix}"

    error_sources = {
        failed_id: self.state_machine.metric_failed(),
        aborted_id: self.state_machine.metric_aborted(),
        timed_out_id: self.state_machine.metric_timed_out(),
        throttled_id: self.state_machine.metric_throttled(),
    }
    error_alarm = AlarmMetricConfig(
        name=f"{display_name}-errors",
        threshold=alarm_threshold,
        evaluation_periods=alarm_evaluation_periods,
        datapoints_to_alarm=alarm_datapoints_to_alarm,
        comparison_operator=cw.ComparisonOperator.GREATER_THAN_THRESHOLD,
    )
    return GraphMetricConfig(
        metric="ExecutionErrors",
        statistic="Sum",
        label=f"{display_name} Errors",
        dimension_map=self.dimension_map,
        metric_expression=f"{failed_id} + {aborted_id} + {timed_out_id} + {throttled_id}",
        using_metrics=error_sources,
        alarm=error_alarm,
    )
get_execution_timing_metric
get_execution_timing_metric(
    name_override: str | None = None,
    discriminator: str | None = None,
    time_unit: SFN_TIME_UNITS = "minutes",
) -> GraphMetricConfig

get the execution time metric for the state machine

Parameters:

Name Type Description Default
name_override Optional[str]

override for name used. Defaults to state machine name.

None
discriminator Optional[str]

Required if grouping with other metric configs that specify the same metric math. Defaults to "0".

None
time_unit SFN_TIME_UNITS

unit of time to use for metric. Defaults to "minutes".

'minutes'

Returns:

Type Description
GraphMetricConfig

GraphMetricConfig

Source code in src/aibs_informatics_cdk_lib/constructs_/cw/config_generators/sfn.py
def get_execution_timing_metric(
    self,
    name_override: str | None = None,
    discriminator: str | None = None,
    time_unit: SFN_TIME_UNITS = "minutes",
) -> GraphMetricConfig:
    """Build the graph metric config for average execution time.

    The graphed value is a math expression over the state machine's time metric
    (referenced as ``time_msec_<discriminator>``), divided down to ``time_unit``.

    Args:
        name_override (Optional[str], optional): name used in the label.
            Defaults to state machine name.
        discriminator (Optional[str], optional): suffix appended to the metric id
            of the math expression. Required if grouping with other metric configs
            that specify the same metric math. Defaults to "0".
        time_unit (SFN_TIME_UNITS, optional): unit of time to use for metric.
            Any value other than "seconds", "minutes" or "hours" leaves the
            metric unscaled. Defaults to "minutes".

    Returns:
        GraphMetricConfig
    """
    display_name = name_override or self.state_machine_name
    time_metric_id = f"time_msec_{discriminator or '0'}"

    # Expression suffix converting the raw value into the requested unit
    divisor_by_unit = {
        "seconds": " / 1000",
        "minutes": " / 1000 / 60",
        "hours": " / 1000 / 60 / 60",
    }
    divisor = divisor_by_unit.get(time_unit, "")

    return GraphMetricConfig(
        metric="ExecutionTime",
        statistic="Average",
        label=f"{display_name} Execution Time",
        dimension_map=self.dimension_map,
        metric_expression=f"{time_metric_id} {divisor}",
        using_metrics={time_metric_id: self.state_machine.metric_time()},
    )

dashboard

Classes
DashboardMixins

Bases: EnvBaseConstructMixins

Functions
add_graphs
add_graphs(
    grouped_metric_configs: list[GroupedGraphMetricConfig],
    namespace: str,
    period: Duration,
    alarm_id_discriminator: str,
    alarm_topic: Topic | None,
    dimensions: dict[str, Any],
) -> None

Adds graphs to a dashboard based on metrics configs.

Parameters:

Name Type Description Default
grouped_metric_configs List[GroupedGraphMetricConfig]

config

required
namespace str

default namespace for metrics, can be overridden in configs

required
period Duration

default duration

required
alarm_id_discriminator str

a non-token discriminator for alarms

required
alarm_name_discriminator str

common name alarm discriminator

required
alarm_topic Optional[Topic]

optional SNS topic to send alarm notifications

required
dimensions Dict[str, Any]

dimensions to generate metrics from

required
Source code in src/aibs_informatics_cdk_lib/constructs_/cw/dashboard.py
def add_graphs(
    self,
    grouped_metric_configs: list[GroupedGraphMetricConfig],
    namespace: str,
    period: cdk.Duration,
    alarm_id_discriminator: str,
    alarm_topic: sns.Topic | None,
    dimensions: dict[str, Any],
) -> None:
    """Adds graphs to a dashboard based on metrics configs.

    Widgets are laid out in rows of at most four on a 24-unit wide grid. Within
    a row, widgets that request a "width" keep it; the width left over is split
    evenly between the widgets that did not request one. If any alarms were
    created, a full-width alarm status widget is appended after the graphs.

    The passed configs are deep-copied, so the caller's objects are not mutated.

    Args:
        grouped_metric_configs (List[GroupedGraphMetricConfig]): config
        namespace (str): default namespace for metrics, can be overridden in configs
        period (cdk.Duration): default duration
        alarm_id_discriminator (str): a non token discriminator for alarms
        alarm_topic (Optional[sns.Topic]): optional SNS topic to send alarm notifications
        dimensions (Dict[str, Any]): dimensions to generate metrics from
    """
    # First, calculate widths dynamically
    MAX_PER_ROW = 4
    TOTAL_WIDTH = 24
    grouped_metric_configs = deepcopy(grouped_metric_configs)
    for row_start in range(0, len(grouped_metric_configs), MAX_PER_ROW):
        row_configs = grouped_metric_configs[row_start : row_start + MAX_PER_ROW]
        # A missing (or zero) width means "no preference"
        unsized_configs = [config for config in row_configs if not config.get("width")]
        if not unsized_configs:
            continue
        requested_width = sum(config.get("width") or 0 for config in row_configs)
        default_widget_width = (TOTAL_WIDTH - requested_width) // len(unsized_configs)
        if default_widget_width <= 0:
            # Row is already fully (or over-) subscribed: leave the unsized widgets
            # without a width rather than assigning a zero/negative one.
            continue
        # Only fill in widgets without a width; explicit requests are preserved.
        for config in unsized_configs:
            config["width"] = default_widget_width

    # Next generate the graph widgets and alarms
    graph_widgets, metric_alarms = self.create_widgets_and_alarms(
        grouped_metric_configs=grouped_metric_configs,
        namespace=namespace,
        period=period,
        alarm_id_discriminator=alarm_id_discriminator,
        alarm_topic=alarm_topic,
        dimensions=dimensions,
    )

    for row_start in range(0, len(graph_widgets), MAX_PER_ROW):
        self.dashboard.add_widgets(*graph_widgets[row_start : row_start + MAX_PER_ROW])
    if metric_alarms:
        max_alarms_per_row = 6  # This is how many fit with full screen (improve me)
        # True division so a partially filled row still counts as a row; floor
        # division here produced a height of 0 for fewer than 6 alarms.
        alarm_rows = ceil(len(metric_alarms) / max_alarms_per_row)
        alarm_widget_height = ceil(alarm_rows * 1.5)
        self.dashboard.add_widgets(
            cw.AlarmStatusWidget(
                alarms=metric_alarms,
                height=alarm_widget_height,
                width=24,
            )
        )
create_widgets_and_alarms
create_widgets_and_alarms(
    grouped_metric_configs: list[GroupedGraphMetricConfig],
    namespace: str,
    period: Duration,
    alarm_id_discriminator: str,
    alarm_topic: Topic | None,
    dimensions: dict[str, Any],
) -> tuple[list[IWidget], list[IAlarm]]

Create graph widgets and alarms from configs

Parameters:

Name Type Description Default
grouped_metric_configs List[GroupedGraphMetricConfig]

configs

required
namespace str

default metric namespace

required
period Duration

default duration

required
alarm_id_discriminator str

alarm discriminator name

required
alarm_topic Optional[Topic]

optional sns topic for alarms

required
dimensions Dict[str, Any]
required

Returns:

Type Description
tuple[list[IWidget], list[IAlarm]]

Tuple[List[cw.IWidget], List[cw.IAlarm]]: List of widgets and list of alarms

Source code in src/aibs_informatics_cdk_lib/constructs_/cw/dashboard.py
def create_widgets_and_alarms(
    self,
    grouped_metric_configs: list[GroupedGraphMetricConfig],
    namespace: str,
    period: cdk.Duration,
    alarm_id_discriminator: str,
    alarm_topic: sns.Topic | None,
    dimensions: dict[str, Any],
) -> tuple[list[cw.IWidget], list[cw.IAlarm]]:
    """Create graph widgets and alarms from configs

    One graph widget is produced per grouped config. Each metric config in a
    group is either a pre-built ``cw.Metric`` / ``cw.MathExpression`` (graphed
    as-is) or a metric name, from which a ``cw.MathExpression`` (when a
    "metric_expression" is given) or a ``cw.Metric`` is built. A metric config
    with an "alarm" entry additionally gets an alarm and a horizontal threshold
    annotation on its axis.

    Args:
        grouped_metric_configs (List[GroupedGraphMetricConfig]): configs
        namespace (str): default metric namespace; a group or an individual
            metric config can override it with its own "namespace"
        period (cdk.Duration): period used for metrics built from a metric name
        alarm_id_discriminator (str): passed together with the alarm name to
            ``self.get_construct_id`` to build the alarm's construct id
        alarm_topic (Optional[sns.Topic]): optional sns topic for alarms
        dimensions (Dict[str, Any]): base dimensions; overlaid by the group's
            and then the metric's own "dimension_map"

    Returns:
        Tuple[List[cw.IWidget], List[cw.IAlarm]]: List of widgets and list of alarms
    """
    self_stack = cdk.Stack.of(self.dashboard)

    graph_widgets: list[cw.IWidget] = []
    metric_alarms: list[cw.IAlarm] = []
    for grouped_metric_config in grouped_metric_configs:
        lr_graph_metrics: dict[Literal["left", "right"], list[cw.Metric]] = defaultdict(list)
        lr_annotations: dict[Literal["left", "right"], list[cw.HorizontalAnnotation]] = (
            defaultdict(list)
        )

        graph_metric_namespace = grouped_metric_config.get("namespace", namespace)
        graph_dimension_map = {**dimensions, **grouped_metric_config.get("dimension_map", {})}
        for metric_config in grouped_metric_config["metrics"]:
            metric = metric_config["metric"]
            if isinstance(metric, (cw.Metric, cw.MathExpression)):
                # Pre-built CDK metrics are graphed as-is.
                # NOTE: a "label" in the config is not applied to them.
                graph_metric = metric
            else:
                metric_name = str(metric)
                # Default label: the metric name with "." replaced by spaces and
                # spaces inserted at lower/upper case boundaries.
                metric_label = metric_config.get(
                    "label",
                    re.sub(
                        r"([a-z])([A-Z])",
                        r"\1 \2",
                        re.sub(r"([A-Z])([a-z])", r" \1\2", metric_name.replace(".", " ")),
                    ),
                )

                metric_expression = metric_config.get("metric_expression")
                if metric_expression:
                    graph_metric = cw.MathExpression(
                        expression=metric_expression,
                        using_metrics=metric_config.get("using_metrics", {}),
                        label=metric_label,
                    )
                else:
                    graph_metric = cw.Metric(
                        metric_name=metric_name,
                        namespace=metric_config.get("namespace", graph_metric_namespace),
                        label=metric_label,
                        statistic=metric_config["statistic"],
                        period=period,
                        # Precedence: base dimensions < group-level < metric-level
                        dimensions_map={
                            **graph_dimension_map,
                            **metric_config.get("dimension_map", {}),
                        },
                        unit=metric_config.get("unit"),
                    )

            metric_axis = metric_config.get("axis_side", "left")
            lr_graph_metrics[metric_axis].append(graph_metric)  # type: ignore # MathExpression implements IMetric

            metric_alarm_config = metric_config.get("alarm")
            if metric_alarm_config:
                alarm_name = metric_alarm_config["name"]
                alarm = graph_metric.create_alarm(
                    self_stack,
                    self.get_construct_id(alarm_name, alarm_id_discriminator),
                    # TODO: every time a change is made to these alarms, Cfn throws an error
                    #       for trying to modify what is a custom resource. So instead, let
                    #       the name be autogenerated.
                    # alarm_name=alarm_name,
                    alarm_description=f"Alarm for {alarm_name}",
                    threshold=metric_alarm_config["threshold"],
                    evaluation_periods=metric_alarm_config["evaluation_periods"],
                    datapoints_to_alarm=metric_alarm_config["datapoints_to_alarm"],
                    comparison_operator=to_comparison_operator(
                        metric_alarm_config["comparison_operator"]
                    ),
                )
                # Draw the alarm threshold on the same axis as the metric
                lr_annotations[metric_axis].append(
                    cw.HorizontalAnnotation(
                        value=metric_alarm_config["threshold"],
                        color=graph_metric.color,
                    )
                )
                metric_alarms.append(alarm)
                if alarm_topic:
                    alarm.add_alarm_action(cw_actions.SnsAction(alarm_topic))  # type: ignore # SnsAction implements IAlarmAction

        graph_widgets.append(
            cw.GraphWidget(
                title=grouped_metric_config["title"],
                left=lr_graph_metrics["left"],
                left_annotations=lr_annotations["left"],
                left_y_axis=grouped_metric_config.get("left_y_axis"),
                right=lr_graph_metrics["right"],
                right_annotations=lr_annotations["right"],
                right_y_axis=grouped_metric_config.get("right_y_axis"),
                height=grouped_metric_config.get("height", 10),
                width=grouped_metric_config.get("width"),
            )
        )
    return graph_widgets, metric_alarms