Demand Context Manager

Context management utilities for demand execution.


Demand execution context management.

Provides context managers and utilities for setting up and managing demand execution environments including EFS volumes, batch jobs, and data synchronization.

BatchEFSConfiguration dataclass

BatchEFSConfiguration(mount_point_config, read_only=False)

Configuration for mounting an EFS volume in AWS Batch.

Encapsulates the mount point configuration and generates the necessary AWS Batch volume and mount point type definitions.

Attributes:

Name Type Description
mount_point_config MountPointConfiguration

The underlying mount point configuration.

read_only bool

Whether the mount should be read-only.

mount_point MountPointTypeDef

Generated AWS Batch mount point type definition.

volume VolumeTypeDef

Generated AWS Batch volume type definition.

Example
config = BatchEFSConfiguration.build(
    access_point="fsap-12345",
    mount_path="/opt/efs/scratch",
    read_only=False
)

mount_path property

mount_path

Get the mount path for this configuration.

Returns:

Type Description
Path

The container path where this volume is mounted.

build classmethod

build(access_point, mount_path, read_only=False)

Build a BatchEFSConfiguration from an access point.

Parameters:

Name Type Description Default
access_point str

The EFS access point ID.

required
mount_path Union[Path, str]

Path where the volume will be mounted.

required
read_only bool

Whether the mount should be read-only.

False

Returns:

Type Description

Configured BatchEFSConfiguration instance.

Source code in src/aibs_informatics_aws_lambda/handlers/demand/context_manager.py
@classmethod
def build(cls, access_point: str, mount_path: Union[Path, str], read_only: bool = False):
    """Build a BatchEFSConfiguration from an access point.

    Args:
        access_point (str): The EFS access point ID.
        mount_path (Union[Path, str]): Path where the volume will be mounted.
        read_only (bool): Whether the mount should be read-only.

    Returns:
        Configured BatchEFSConfiguration instance.
    """
    mount_point_config = MountPointConfiguration.build(
        mount_point=mount_path,
        access_point=access_point,
    )
    return BatchEFSConfiguration(mount_point_config=mount_point_config, read_only=read_only)

DemandExecutionContextManager dataclass

DemandExecutionContextManager(
    demand_execution,
    scratch_vol_configuration,
    shared_vol_configuration,
    tmp_vol_configuration=None,
    configuration=ContextManagerConfiguration(),
    env_base=EnvBase.from_env(),
)

Manages the context and configuration for demand executions.

Coordinates EFS volume configurations, data synchronization requests, and AWS Batch job building for demand executions.

This class handles:

- Path resolution between container and EFS paths
- Pre-execution data sync setup (inputs)
- Post-execution data sync setup (outputs)
- Batch job builder configuration
- Working directory and cleanup management

Attributes:

Name Type Description
demand_execution DemandExecution

The demand execution to manage.

scratch_vol_configuration BatchEFSConfiguration

EFS configuration for scratch storage.

shared_vol_configuration BatchEFSConfiguration

EFS configuration for shared/input storage.

tmp_vol_configuration Optional[BatchEFSConfiguration]

Optional EFS configuration for temp storage.

configuration ContextManagerConfiguration

Context manager configuration options.

env_base EnvBase

Environment base for resource naming.

Example
context_manager = DemandExecutionContextManager.from_demand_execution(
    demand_execution=execution,
    env_base=env_base,
)
batch_builder = context_manager.batch_job_builder
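
Building on the example above, a hedged sketch of a full demand lifecycle (run_data_sync, submit_batch_job, and remove_data_paths are hypothetical orchestration steps, not part of this module):

# 1. Stage inputs from S3 to EFS before the job runs.
for request in context_manager.pre_execution_data_sync_requests:
    run_data_sync(request)  # hypothetical sync step

# 2. Submit the AWS Batch job assembled by the context manager.
submit_batch_job(  # hypothetical submission step
    builder=context_manager.batch_job_builder,
    queue=context_manager.batch_job_queue_name,
)

# 3. After the job completes, upload outputs and clean up working data.
for request in context_manager.post_execution_data_sync_requests:
    run_data_sync(request)
for request in context_manager.post_execution_remove_data_paths_requests:
    remove_data_paths(request)  # hypothetical cleanup step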

batch_job_builder property

batch_job_builder

Get or create the batch job builder for this execution.

Lazily creates the BatchJobBuilder on first access with all necessary configurations.

Returns:

Type Description
BatchJobBuilder

Configured BatchJobBuilder instance.

batch_job_queue_name property

batch_job_queue_name

Get the batch job queue name for this execution.

Returns:

Type Description
str

The AWS Batch job queue name.

container_shared_path property

container_shared_path

Returns the container path for the shared volume

Example

/opt/efs/shared

Returns:

Type Description
Path

container path for shared volume

container_tmp_path property

container_tmp_path

Returns the container path for the tmp volume

Example

/opt/efs/scratch/tmp

Returns:

Type Description
Path

container path for tmp volume

container_working_path property

container_working_path

Returns the container path of the working directory for the demand execution

Example

/opt/efs/scratch/{EXECUTION_ID}

Returns:

Type Description
Path

container path of the working directory

efs_mount_points property

efs_mount_points

Returns a list of mount points for the EFS volumes used by the AWS Batch job

Returns:

Type Description
List[MountPointConfiguration]

list of mount point configurations

efs_shared_path property

efs_shared_path

Returns the global EFS path for the shared (inputs) data directory

Example

efs://fs-12345678:/shared

Returns:

Type Description
EFSPath

EFS URI for the shared data directory

efs_tmp_path property

efs_tmp_path

Returns the global EFS path for the tmp (scratch) data directory

Example

efs://fs-12345678:/scratch/tmp

Returns:

Type Description
EFSPath

EFS URI for the tmp data directory

efs_working_path property

efs_working_path

Returns the global EFS path of the working directory for the demand execution

Example

efs://fs-12345678:/scratch/{EXECUTION_ID}

Returns:

Type Description
EFSPath

EFS URI for the working directory
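
The container_* and efs_* properties describe the same storage locations from two coordinate systems: paths as the batch container sees them, and global EFS URIs used by the data sync machinery. An illustrative sketch using the example values from this page (execution ID and file system ID are hypothetical):

# Same working location, two views:
print(context_manager.container_working_path)  # /opt/efs/scratch/abc123
print(context_manager.efs_working_path)        # efs://fs-12345678:/scratch/abc123

# Same shared location, two views:
print(context_manager.container_shared_path)   # /opt/efs/shared
print(context_manager.efs_shared_path)         # efs://fs-12345678:/shared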

post_execution_data_sync_requests property

post_execution_data_sync_requests

Generate data sync requests for post-execution output upload.

Creates requests to sync output data from EFS to S3 after the batch job completes.

Returns:

Type Description
List[PrepareBatchDataSyncRequest]

List of data sync requests for output data.

post_execution_remove_data_paths_requests property

post_execution_remove_data_paths_requests

Generates requests to remove working data paths after post-execution data sync

Returns:

Type Description
List[RemoveDataPathsRequest]

list of remove data paths requests

pre_execution_data_sync_requests property

pre_execution_data_sync_requests

Generate data sync requests for pre-execution input staging.

Creates requests to sync input data from S3 to EFS before the batch job runs.

Returns:

Type Description
List[PrepareBatchDataSyncRequest]

List of data sync requests for input data.

from_demand_execution classmethod

from_demand_execution(
    demand_execution, env_base, configuration=None
)

Create a context manager from a demand execution.

Factory method that sets up default EFS configurations and creates the context manager.

Parameters:

Name Type Description Default
demand_execution DemandExecution

The demand execution to manage.

required
env_base EnvBase

Environment base for resource resolution.

required
configuration Optional[ContextManagerConfiguration]

Optional context manager configuration.

None

Returns:

Type Description

Configured DemandExecutionContextManager instance.

Source code in src/aibs_informatics_aws_lambda/handlers/demand/context_manager.py
@classmethod
def from_demand_execution(
    cls,
    demand_execution: DemandExecution,
    env_base: EnvBase,
    configuration: Optional[ContextManagerConfiguration] = None,
):
    """Create a context manager from a demand execution.

    Factory method that sets up default EFS configurations and
    creates the context manager.

    Args:
        demand_execution (DemandExecution): The demand execution to manage.
        env_base (EnvBase): Environment base for resource resolution.
        configuration (Optional[ContextManagerConfiguration]): Optional context
            manager configuration.

    Returns:
        Configured DemandExecutionContextManager instance.
    """
    vol_configuration = get_batch_efs_configuration(
        env_base=env_base,
        container_path=f"/opt/efs{EFS_SCRATCH_PATH}",
        access_point_name=EFS_SCRATCH_ACCESS_POINT_NAME,
        read_only=False,
    )
    shared_vol_configuration = get_batch_efs_configuration(
        env_base=env_base,
        container_path=f"/opt/efs{EFS_SHARED_PATH}",
        access_point_name=EFS_SHARED_ACCESS_POINT_NAME,
        read_only=True,
    )
    tmp_vol_configuration = None

    logger.info(f"Using following efs configuration: {vol_configuration}")
    return DemandExecutionContextManager(
        demand_execution=demand_execution,
        scratch_vol_configuration=vol_configuration,
        shared_vol_configuration=shared_vol_configuration,
        tmp_vol_configuration=tmp_vol_configuration,
        configuration=configuration or ContextManagerConfiguration(),
        env_base=env_base,
    )

generate_batch_job_builder

generate_batch_job_builder(
    demand_execution,
    env_base,
    working_path,
    tmp_path,
    scratch_mount_point,
    shared_mount_point,
    tmp_mount_point=None,
    env_file_write_mode=EnvFileWriteMode.ALWAYS,
)

Generate a BatchJobBuilder for the demand execution.

Creates a fully configured BatchJobBuilder with:

- Command construction with pre-commands for setup
- Environment variable handling (direct or via env file)
- Volume and mount point configurations
- Resource requirements from demand execution

Parameters:

Name Type Description Default
demand_execution DemandExecution

The demand execution to build a job for.

required
env_base EnvBase

Environment base for resource naming.

required
working_path EFSPath

EFS path for the working directory.

required
tmp_path EFSPath

EFS path for temporary files.

required
scratch_mount_point MountPointConfiguration

Mount configuration for scratch volume.

required
shared_mount_point MountPointConfiguration

Mount configuration for shared volume.

required
tmp_mount_point Optional[MountPointConfiguration]

Optional mount configuration for tmp volume.

None
env_file_write_mode EnvFileWriteMode

How to handle environment variable files.

ALWAYS

Returns:

Type Description
BatchJobBuilder

Configured BatchJobBuilder ready for job submission.

Raises:

Type Description
ValueError

If no command is specified in the demand execution.
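
A hedged invocation sketch, wiring in paths and mount points from a DemandExecutionContextManager (in practice the context manager's batch_job_builder property performs this assembly itself; the direct call is shown for illustration):

context_manager = DemandExecutionContextManager.from_demand_execution(
    demand_execution=execution,
    env_base=env_base,
)
builder = generate_batch_job_builder(
    demand_execution=context_manager.demand_execution,
    env_base=env_base,
    working_path=context_manager.efs_working_path,
    tmp_path=context_manager.efs_tmp_path,
    scratch_mount_point=context_manager.scratch_vol_configuration.mount_point_config,
    shared_mount_point=context_manager.shared_vol_configuration.mount_point_config,
    env_file_write_mode=EnvFileWriteMode.IF_REQUIRED,
)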

Source code in src/aibs_informatics_aws_lambda/handlers/demand/context_manager.py
def generate_batch_job_builder(  # noqa: C901
    demand_execution: DemandExecution,
    env_base: EnvBase,
    working_path: EFSPath,
    tmp_path: EFSPath,
    scratch_mount_point: MountPointConfiguration,
    shared_mount_point: MountPointConfiguration,
    tmp_mount_point: Optional[MountPointConfiguration] = None,
    env_file_write_mode: EnvFileWriteMode = EnvFileWriteMode.ALWAYS,
) -> BatchJobBuilder:
    """Generate a BatchJobBuilder for the demand execution.

    Creates a fully configured BatchJobBuilder with:
    - Command construction with pre-commands for setup
    - Environment variable handling (direct or via env file)
    - Volume and mount point configurations
    - Resource requirements from demand execution

    Args:
        demand_execution (DemandExecution): The demand execution to build a job for.
        env_base (EnvBase): Environment base for resource naming.
        working_path (EFSPath): EFS path for the working directory.
        tmp_path (EFSPath): EFS path for temporary files.
        scratch_mount_point (MountPointConfiguration): Mount configuration for scratch volume.
        shared_mount_point (MountPointConfiguration): Mount configuration for shared volume.
        tmp_mount_point (Optional[MountPointConfiguration]): Optional mount configuration
            for tmp volume.
        env_file_write_mode (EnvFileWriteMode): How to handle environment variable files.

    Returns:
        Configured BatchJobBuilder ready for job submission.

    Raises:
        ValueError: If no command is specified in the demand execution.
    """
    logger.info("Constructing BatchJobBuilder instance")

    demand_execution = demand_execution.copy()
    efs_mount_points = [scratch_mount_point, shared_mount_point]
    if tmp_mount_point is not None:
        efs_mount_points.append(tmp_mount_point)
    logger.info(f"Resolving local paths of working dir = {working_path} and tmp dir = {tmp_path}")
    container_working_path = get_local_path(working_path, mount_points=efs_mount_points)
    container_tmp_path = get_local_path(tmp_path, mount_points=efs_mount_points)

    logger.info(f"Setting container working directory = {container_working_path}")

    EXECUTION_ID_VAR = "EXECUTION_ID"
    WORKING_DIR_VAR = "WORKING_DIR"
    TMPDIR_VAR = "TMPDIR"

    environment: Dict[str, str] = {
        EXECUTION_ID_VAR: demand_execution.execution_id,
        WORKING_DIR_VAR: f"{container_working_path}",
        TMPDIR_VAR: f"{container_tmp_path}",
    }

    for job_param in demand_execution.execution_parameters.job_params:
        job_param.update_environment(environment)

    logger.info(f"Environment updated with {len(environment)} environment variables.")

    pre_commands = [
        f"mkdir -p ${{{WORKING_DIR_VAR}}}".split(" "),
        f"mkdir -p ${{{TMPDIR_VAR}}}".split(" "),
        f"cd ${{{WORKING_DIR_VAR}}}".split(" "),
    ]

    logger.info(f"Initial pre-commands: {pre_commands}")

    command = deepcopy(demand_execution.execution_parameters.command)
    if not command:
        logger.warning("No command specified, trying to resolve from manifest")
        # TODO: add logic to resolve default command from manifest
        raise ValueError("Must specify command for demand execution")

    logger.info(f"Command extracted from demand execution: {command}")

    # ------------------------------------------------------------------
    ##  Environment File Conditional Configuration Logic
    #
    # This step tries to write the environment variables to a file that will be mounted
    # to the container at runtime. This only works if the local machine that runs this code
    # has access to EFS file system.
    #
    logger.info(
        "Attempting to create environment file for environment variables. "
        f"Using {working_path} directory to write file."
    )
    # Here we define three path variables. They point to env file from the perspective of:
    #   1. The container path
    #   2. An EFS URI (pointing to location on EFS file system)
    #   3. The (future) path on the local machine running this code.
    container_environment_file = container_working_path / ".demand.env"
    efs_environment_file_uri = get_efs_path(
        container_environment_file, mount_points=efs_mount_points
    )
    local_environment_file = get_local_path(efs_environment_file_uri, raise_if_unmounted=False)

    # If the local environment file is not None, then the file is writable from this local
    # machine. In that case, we write the offloadable portion of the environment variables to it.
    if local_environment_file is None or env_file_write_mode == EnvFileWriteMode.NEVER:
        # If the environment file cannot be written to, then the environment variables are
        # passed directly to the container. This is a fallback option and will fail if the
        # environment variables are too long.
        if local_environment_file is None:
            reason = f"Could not write environment variables to file {efs_environment_file_uri}."
        else:
            reason = "Environment file write mode set to NEVER."

        logger.warning(
            f"{reason} Environment variables will be passed directly to the container. "
            "THIS MAY CAUSE THE CONTAINER TO FAIL IF THE ENVIRONMENT VARIABLES "
            "ARE LONGER THAN 8192 CHARACTERS!!!"
        )

    else:
        if env_file_write_mode == EnvFileWriteMode.IF_REQUIRED:
            env_size = sum([sys.getsizeof(k) + sys.getsizeof(v) for k, v in environment.items()])

            if env_size > 8192 * 0.9:
                logger.info(
                    f"Environment variables are too large to pass directly to container "
                    "(> 90% of 8192). Writing environment variables to file "
                    f"{efs_environment_file_uri}."
                )
                confirm_write = True
            else:
                confirm_write = False
        elif env_file_write_mode == EnvFileWriteMode.ALWAYS:
            logger.info(f"Writing environment variables to file {efs_environment_file_uri}.")
            confirm_write = True

        if confirm_write:
            # Steps for writing environment variables to file:
            #   1. Identify all environment variables that are not referenced in the command
            #       if not referenced, then add to environment file.
            #   2. Write environment file
            #   3. Add environment file to command
            ENVIRONMENT_FILE_VAR = "_ENVIRONMENT_FILE"

            # Step 1: split environment variables based on whether they are referenced in the command
            writable_environment = environment.copy()
            required_environment: Dict[str, str] = {}
            for arg in command + [_ for c in pre_commands for _ in c]:
                for match in re.findall(r"\$\{?([\w]+)\}?", arg):
                    if match in writable_environment:
                        required_environment[match] = writable_environment.pop(match)

            # Add the environment file variable to the required environment variables
            environment = required_environment.copy()
            environment[ENVIRONMENT_FILE_VAR] = container_environment_file.as_posix()

            # Step 2: write to the environment file
            local_environment_file.parent.mkdir(parents=True, exist_ok=True)
            write_env_file(writable_environment, local_environment_file)

            # Finally, add the environment file to the command
            pre_commands.append(f". ${{{ENVIRONMENT_FILE_VAR}}}".split(" "))

    # ------------------------------------------------------------------

    command_string = " && ".join([" ".join(_) for _ in pre_commands + [command]])
    logger.info(f"Final command string created: '{command_string}'")
    vol_configurations = [
        BatchEFSConfiguration(scratch_mount_point, read_only=False),
        BatchEFSConfiguration(shared_mount_point, read_only=True),
    ]
    if tmp_mount_point:
        vol_configurations.append(BatchEFSConfiguration(tmp_mount_point, read_only=False))
    logger.info("Constructing BatchJobBuilder instance...")
    assert demand_execution.execution_platform.aws_batch is not None
    return BatchJobBuilder(
        image=demand_execution.execution_image,
        job_definition_name=env_base.get_job_name(
            demand_execution.execution_type, demand_execution.get_execution_hash(False)
        ),
        job_name=env_base.get_job_name(
            demand_execution.execution_type, demand_execution.get_execution_hash(True)
        ),
        command=["/bin/bash", "-c", command_string],
        environment=environment,
        job_definition_tags={"USER": demand_execution.execution_metadata.user or "unknown"},
        resource_requirements=to_resource_requirements(
            gpu=demand_execution.resource_requirements.gpu,
            memory=demand_execution.resource_requirements.memory,
            vcpus=demand_execution.resource_requirements.vcpus,
        ),
        mount_points=[_.mount_point for _ in vol_configurations],
        volumes=[_.volume for _ in vol_configurations],
        env_base=env_base,
        # TODO: need to make this configurable
        privileged=True,
        job_role_arn=demand_execution.execution_platform.aws_batch.job_role
        if demand_execution.execution_platform.aws_batch
        else None,
    )
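
The reference-splitting step above is worth isolating: variables referenced as $VAR or ${VAR} in the command or pre-commands must remain in the container environment, while everything else can be offloaded to the env file. A minimal standalone sketch of that logic, reusing the same regex as the source:

import re
from typing import Dict, List, Tuple

def split_environment(
    environment: Dict[str, str], command_args: List[str]
) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Split env vars into (required in container, writable to env file)."""
    writable = environment.copy()
    required: Dict[str, str] = {}
    for arg in command_args:
        # Matches $VAR and ${VAR} references.
        for match in re.findall(r"\$\{?([\w]+)\}?", arg):
            if match in writable:
                required[match] = writable.pop(match)
    return required, writable

required, writable = split_environment(
    {"WORKING_DIR": "/opt/efs/scratch/abc", "BIG_BLOB": "x" * 9000},
    ["cd", "${WORKING_DIR}"],
)
assert "WORKING_DIR" in required and "BIG_BLOB" in writable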

get_batch_efs_configuration

get_batch_efs_configuration(
    env_base,
    container_path,
    access_point_name,
    file_system_name=None,
    read_only=False,
)

Get a BatchEFSConfiguration by resolving resources from AWS.

Resolves EFS file system and access point by name/tags and creates the corresponding batch configuration.

Parameters:

Name Type Description Default
env_base EnvBase

Environment base for tag-based resource resolution.

required
container_path str

Path where the volume will be mounted.

required
access_point_name str

Name of the EFS access point.

required
file_system_name Optional[str]

Optional file system name to resolve.

None
read_only bool

Whether the mount should be read-only.

False

Returns:

Type Description
BatchEFSConfiguration

Configured BatchEFSConfiguration instance.
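
A hedged usage sketch (the container path and access point name below are hypothetical; the module's own defaults rely on constants such as EFS_SCRATCH_ACCESS_POINT_NAME, as seen in from_demand_execution above):

scratch_config = get_batch_efs_configuration(
    env_base=env_base,
    container_path="/opt/efs/scratch",  # hypothetical mount location
    access_point_name="scratch",        # hypothetical access point name
    read_only=False,
)
print(scratch_config.mount_path)  # /opt/efs/scratch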

Source code in src/aibs_informatics_aws_lambda/handlers/demand/context_manager.py
def get_batch_efs_configuration(
    env_base: EnvBase,
    container_path: str,
    access_point_name: str,
    file_system_name: Optional[str] = None,
    read_only: bool = False,
) -> BatchEFSConfiguration:
    """Get a BatchEFSConfiguration by resolving resources from AWS.

    Resolves EFS file system and access point by name/tags and
    creates the corresponding batch configuration.

    Args:
        env_base (EnvBase): Environment base for tag-based resource resolution.
        container_path (str): Path where the volume will be mounted.
        access_point_name (str): Name of the EFS access point.
        file_system_name (Optional[str]): Optional file system name to resolve.
        read_only (bool): Whether the mount should be read-only.

    Returns:
        Configured BatchEFSConfiguration instance.
    """
    # TODO: add support for file_system_name (learn how to resolve file system name)
    if file_system_name:
        file_system_name = env_base.get_resource_name(file_system_name)
        file_system = get_efs_file_system(name=file_system_name, tags={"env_base": env_base})
        file_system_id = file_system["FileSystemId"]
        logger.info(
            f"Using file system {file_system_id} with name {file_system_name}. "
            f"Will search for access point {access_point_name}."
        )
    else:
        logger.info(
            f"No file system name provided. "
            f"Will search for access point {access_point_name} directly."
        )
        file_system_id = None

    access_point = get_efs_access_point(
        file_system_id=file_system_id,
        access_point_name=access_point_name,
        access_point_tags={"env_base": env_base},
    )
    logger.info(f"Using access point {access_point_name}")
    return BatchEFSConfiguration.build(
        access_point=access_point["AccessPointId"],
        mount_path=container_path,
        read_only=read_only,
    )

get_batch_job_queue_name

get_batch_job_queue_name(demand_execution)

Get the batch job queue name from a demand execution.

Parameters:

Name Type Description Default
demand_execution DemandExecution

The demand execution to get the queue name from.

required

Returns:

Type Description

The AWS Batch job queue name.

Raises:

Type Description
ValueError

If the demand execution lacks an AWS Batch platform.
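
A hedged handling sketch (execution is a hypothetical DemandExecution that may or may not target AWS Batch):

try:
    queue_name = get_batch_job_queue_name(execution)
except ValueError:
    # The execution was not configured with an AWS Batch platform.
    queue_name = None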

Source code in src/aibs_informatics_aws_lambda/handlers/demand/context_manager.py
def get_batch_job_queue_name(demand_execution: DemandExecution):
    """Get the batch job queue name from a demand execution.

    Args:
        demand_execution (DemandExecution): The demand execution to get the queue name from.

    Returns:
        The AWS Batch job queue name.

    Raises:
        ValueError: If the demand execution lacks an AWS Batch platform.
    """
    aws_batch_exec_platform = demand_execution.execution_platform.aws_batch
    if aws_batch_exec_platform is None:
        raise ValueError("Demand execution does not have an AWS Batch execution platform")
    return aws_batch_exec_platform.job_queue_name

update_demand_execution_parameter_inputs

update_demand_execution_parameter_inputs(
    demand_execution,
    container_shared_path,
    container_working_path,
    isolate_inputs=False,
)

Modifies demand execution input destinations to locations under the specified volume configuration

This updates the input destinations to a deterministic location under the specified volume configuration. This ensures that inputs shared between jobs can use the same cached results.

The path for any input param is composed of:
  • the volume's mount_path (where this volume is mounted in the container)
  • a sha256 hash value of the parameter's remote value

PATTERN: {MOUNT_PATH}/{SHA256_HASH(PARAM_REMOTE_VALUE)}

Example

Given volume:
  • mount path (/opt/efs/shared)

Given execution parameter inputs:
  • X (s3://bucket/prefix/A)
  • Y (s3://bucket/prefix/A)
  • Z (s3://bucket/prefix/B)

Output:
  • X -> /opt/efs/shared/abcdef...AAAA
  • Y -> /opt/efs/shared/abcdef...AAAA
  • Z -> /opt/efs/shared/abcdef...BBBB
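
A minimal sketch of the deterministic destination computation, assuming sha256_hexdigest is a plain SHA-256 hex digest of the remote value (hashlib stands in for it here):

import hashlib
from pathlib import Path

def input_destination(container_shared_path: Path, remote_value: str) -> Path:
    # PATTERN: {MOUNT_PATH}/{SHA256_HASH(PARAM_REMOTE_VALUE)}
    digest = hashlib.sha256(remote_value.encode("utf-8")).hexdigest()
    return container_shared_path / digest

shared = Path("/opt/efs/shared")
# X and Y share a remote value, so they resolve to the same cached location.
assert input_destination(shared, "s3://bucket/prefix/A") == input_destination(shared, "s3://bucket/prefix/A")
assert input_destination(shared, "s3://bucket/prefix/A") != input_destination(shared, "s3://bucket/prefix/B")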

Parameters:

Name Type Description Default
demand_execution DemandExecution

Demand execution object to modify (copied)

required
container_shared_path Path

Path where the shared volume is mounted.

required
container_working_path Path

Path where the working directory is mounted.

required
isolate_inputs bool

flag to determine if inputs should be isolated

False

Returns:

Type Description
DemandExecution

a demand execution with modified execution parameter inputs

Source code in src/aibs_informatics_aws_lambda/handlers/demand/context_manager.py
def update_demand_execution_parameter_inputs(
    demand_execution: DemandExecution,
    container_shared_path: Path,
    container_working_path: Path,
    isolate_inputs: bool = False,
) -> DemandExecution:
    """Modifies demand execution input destinations with the location of the volume configuration

    This updates the input destinations to a deterministic location under the volume configuration
    specified. This ensures that inputs shared between jobs can use the same cached results.

    The path for any input param is composed of:
        - the volume's mount_path (where this volume is mounted in the container)
        - a sha256 hash value of the parameter's remote value

    PATTERN: {MOUNT_PATH}/{SHA256_HASH(PARAM_REMOTE_VALUE)}

    Example:
        Given volume:
            - mount path (/opt/efs/shared)
        Given execution parameter inputs:
            - X (s3://bucket/prefix/A)
            - Y (s3://bucket/prefix/A)
            - Z (s3://bucket/prefix/B)
        Output:
           - X -> /opt/efs/shared/abcdef...AAAA
           - Y -> /opt/efs/shared/abcdef...AAAA
           - Z -> /opt/efs/shared/abcdef...BBBB

    Args:
        demand_execution (DemandExecution): Demand execution object to modify (copied)
        container_shared_path (Path): Path where the shared volume is mounted.
        container_working_path (Path): Path where the working directory is mounted.
        isolate_inputs (bool): flag to determine if inputs should be isolated

    Returns:
        a demand execution with modified execution parameter inputs
    """

    demand_execution = demand_execution.copy()
    execution_params = demand_execution.execution_parameters
    updated_params = {}
    for param in execution_params.downloadable_job_param_inputs:
        if isolate_inputs:
            local = container_working_path / param.value
            logger.info(f"Isolating input {param.name} from shared volume. Local path: {local}")
        else:
            local = container_shared_path / sha256_hexdigest(param.remote_value)
            logger.info(f"Using shared volume for input {param.name}. Local path: {local}")

        new_resolvable = Resolvable(local=local.as_posix(), remote=param.remote_value)
        updated_params[param.name] = new_resolvable

    execution_params.update_params(**updated_params)
    return demand_execution

update_demand_execution_parameter_outputs

update_demand_execution_parameter_outputs(
    demand_execution, container_working_path
)

Modifies the demand execution outputs' local paths relative to the container working path

This updates the output destinations to their absolute location under the container working path.

PATTERN: {CONTAINER_WORKING_PATH}/{PARAM_VALUE}

Example

Given volume:
  • mount path working dir (/opt/efs/scratch/UUID)

Given execution parameter outputs:
  • X (s3://bucket/prefix/A)
  • Y (s3://bucket/prefix/B)
  • Z (s3://bucket/prefix/C)

Output (local value):
  • X -> /opt/efs/scratch/UUID/X
  • Y -> /opt/efs/scratch/UUID/Y
  • Z -> /opt/efs/scratch/UUID/Z
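
The same resolution, sketched directly (the UUID working directory is hypothetical):

from pathlib import Path

container_working_path = Path("/opt/efs/scratch/UUID")  # hypothetical working dir

# Each output param's local value is its name resolved under the working dir.
for param_value in ["X", "Y", "Z"]:
    print((container_working_path / param_value).as_posix())
# /opt/efs/scratch/UUID/X
# /opt/efs/scratch/UUID/Y
# /opt/efs/scratch/UUID/Z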

Parameters:

Name Type Description Default
demand_execution DemandExecution

Demand execution object to modify (copied)

required
container_working_path Path

Path where the working directory is mounted.

required

Returns:

Type Description
DemandExecution

a demand execution with modified execution parameter outputs

Source code in src/aibs_informatics_aws_lambda/handlers/demand/context_manager.py
def update_demand_execution_parameter_outputs(
    demand_execution: DemandExecution,
    container_working_path: Path,
) -> DemandExecution:
    """Modifies the demand execution output's local paths relative to the container working path

    This updates the output destinations to their absolute location under the container working
    path.

    PATTERN: {CONTAINER_WORKING_PATH}/{PARAM_VALUE}

    Example:
        Given volume:
            - mount path working dir (/opt/efs/scratch/UUID)
        Given execution parameter outputs:
            - X (s3://bucket/prefix/A)
            - Y (s3://bucket/prefix/B)
            - Z (s3://bucket/prefix/C)
        Output (local value):
           - X -> /opt/efs/scratch/UUID/X
           - Y -> /opt/efs/scratch/UUID/Y
           - Z -> /opt/efs/scratch/UUID/Z

    Args:
        demand_execution (DemandExecution): Demand execution object to modify (copied)
        container_working_path (Path): Path where the working directory is mounted.

    Returns:
        a demand execution with modified execution parameter outputs
    """

    demand_execution = demand_execution.copy()
    execution_params = demand_execution.execution_parameters
    updated_params = {
        param.name: Uploadable(
            local=(container_working_path / param.value).as_posix(),
            remote=param.remote_value,
        )
        for param in execution_params.uploadable_job_param_outputs
    }
    execution_params.update_params(**updated_params)
    return demand_execution