Skip to content

Base Executor

Base class for creating executors with input/output validation.


BaseExecutor dataclass

BaseExecutor()

Bases: EnvBaseMixins, PostInitMixin, Generic[REQUEST, RESPONSE]

Abstract base executor for handling typed request/response workflows.

Subclasses must parameterize with request and response model types and implement the handle method. Provides serialization, deserialization, and CLI/remote I/O utilities.

build_from_env classmethod

build_from_env(**kwargs)

Creates an executor from environment

Must be able to create an executor from environment variables

Returns:

Type Description
E

Executor object

Source code in src/aibs_informatics_core/executors/base.py
241
242
243
244
245
246
247
248
249
250
@classmethod
def build_from_env(cls: type[E], **kwargs) -> E:
    """Construct an executor instance for the current environment.

    Implementations must be able to build an executor from environment
    variables. This version forwards ``kwargs`` unchanged to the class
    constructor.

    Args:
        **kwargs: Keyword arguments handed straight to ``cls(...)``.

    Returns:
        The newly constructed executor object.
    """
    executor = cls(**kwargs)
    return executor

deserialize_request classmethod

deserialize_request(request)

Deserialize a raw request

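Supported request types (Union[JSONObject, S3Path, str, Path, REQUEST]): (1) a dict object; (2) an S3 Path object (the executor must implement the S3 deserialization method); (3) a stringified object; (4) a path to a file containing a JSON object. A value that is already an instance of the request class is returned unchanged.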

Parameters:

Name Type Description Default
request Union[JSONObject, S3Path, str, Path]

raw request

required

Returns:

Type Description
REQUEST

Deserialized request

Source code in src/aibs_informatics_core/executors/base.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
@classmethod
def deserialize_request(cls, request: JSON) -> REQUEST:
    """Turn a raw request into an instance of the executor's request class.

    Accepted forms of ``request``
    (Union[JSONObject, S3Path, str, Path, REQUEST]):
        1. dict object
        2. S3 Path object (must implement s3 deserialization method)
        3. stringified object
        4. path to file of json object

    A value that is already an instance of the request class is returned
    as-is. Anything else is first passed through ``load_input`` and must
    come back as a dict, which is then given to the request class's
    ``from_dict``.

    Args:
        request (Union[JSONObject, S3Path, str, Path]): raw request

    Returns:
        Deserialized request

    Raises:
        ValueError: If the loaded content is not a dict.
        Exception: Any error raised by ``from_dict`` is logged and re-raised.
    """
    # Nothing to do when the caller already holds a request object.
    if isinstance(request, cls.get_request_cls()):
        return request

    loaded = cls.load_input(request)
    if not isinstance(loaded, dict):
        raise ValueError(f"Cannot deserialize request: {loaded}, type: {type(loaded)}")

    try:
        return cls.get_request_cls().from_dict(loaded)
    except Exception as e:
        logger.error(f"Could not deserialize object. error: {e}")
        raise e

get_executor_name classmethod

get_executor_name()

Returns a distinguishable name of the executor class

By default, it returns class name.

Source code in src/aibs_informatics_core/executors/base.py
232
233
234
235
236
237
238
239
@classmethod
def get_executor_name(cls) -> str:
    """Provide a name that identifies this executor class.

    The default is simply the class's ``__name__``.

    Returns:
        The executor's name.
    """
    executor_name = cls.__name__
    return executor_name

get_request_cls classmethod

get_request_cls()

Return the request model class from the generic type arguments.

Returns:

Type Description
type[REQUEST]

The request model class.

Source code in src/aibs_informatics_core/executors/base.py
70
71
72
73
74
75
76
77
@classmethod
def get_request_cls(cls) -> type[REQUEST]:
    """Look up the request model class among the generic type arguments.

    Returns:
        The first entry of ``cls._get_generic_args()``, i.e. the request
        model class.
    """
    generic_args = cls._get_generic_args()
    return generic_args[0]  # type: ignore

get_response_cls classmethod

get_response_cls()

Return the response model class from the generic type arguments.

Returns:

Type Description
type[RESPONSE]

The response model class.

Source code in src/aibs_informatics_core/executors/base.py
79
80
81
82
83
84
85
86
@classmethod
def get_response_cls(cls) -> type[RESPONSE]:
    """Look up the response model class among the generic type arguments.

    Returns:
        The second entry of ``cls._get_generic_args()``, i.e. the response
        model class.
    """
    generic_args = cls._get_generic_args()
    return generic_args[1]  # type: ignore

handle abstractmethod

handle(request)

Core logic for handling request

NOT IMPLEMENTED

Parameters:

Name Type Description Default
request REQUEST

Request object expected

required

Returns:

Type Description
RESPONSE | None

response object returned, Optional

Source code in src/aibs_informatics_core/executors/base.py
37
38
39
40
41
42
43
44
45
46
47
48
49
@abstractmethod
def handle(self, request: REQUEST) -> RESPONSE | None:  # pragma: no cover
    """Process a request; the core logic every executor must supply.

    This abstract placeholder is NOT IMPLEMENTED and always raises.

    Args:
        request (REQUEST): Request object expected

    Returns:
        response object returned, Optional

    Raises:
        NotImplementedError: Always, until a subclass overrides this method.
    """
    message = "Must implement handler logic here"
    raise NotImplementedError(message)

load_input classmethod

load_input(input)

Loads input from various sources

Supported input types:
  1. dict object
  2. S3 Path object (must implement s3 deserialization method)
  3. stringified object
  4. path to file of json object

Parameters:

Name Type Description Default
input Any

raw input. Can be any of the supported types.

required

Returns:

Type Description
JSON

content loaded from input

Source code in src/aibs_informatics_core/executors/base.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
@classmethod
def load_input(cls, input: Any) -> JSON:
    """Load content from one of several kinds of input.

    The input is inspected in this order:
        1. dict object: returned unchanged
        2. stringified object: a string accepted by ``is_json_str``, parsed
           with ``load_json_object``
        3. S3 Path object, or a string that ``S3Path.is_valid`` accepts
           (must implement s3 deserialization method)
        4. path to file of json object, given as ``str`` or ``Path``

    Args:
        input (Any): raw input. Can be any of the supported types.

    Returns:
        content loaded from input

    Raises:
        ValueError: If the input matches none of the supported types.
    """
    if isinstance(input, dict):
        # Already-loaded content passes straight through.
        return input

    is_text = isinstance(input, str)
    if is_text and is_json_str(input):
        return load_json_object(input)

    if isinstance(input, S3Path) or (is_text and S3Path.is_valid(input)):
        return cls.load_input__remote(S3Path(input))

    if isinstance(input, (str, Path)):
        return cls.load_input__file(Path(input))

    raise ValueError(f"Cannot read input: {input}, type: {type(input)}")

load_input__file classmethod

load_input__file(local_path)

Reads input from file

Parameters:

Name Type Description Default
local_path Path

Local path to read input from

required

Returns:

Type Description
JSON

JSON data read from file

Source code in src/aibs_informatics_core/executors/base.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
@classmethod
def load_input__file(cls, local_path: Path) -> JSON:
    """Load JSON content from a file on the local filesystem.

    Args:
        local_path (Path): Local path to read input from

    Returns:
        JSON data read from file

    Raises:
        ValueError: If the path does not exist or points at a directory.
    """
    if local_path.exists():
        if local_path.is_dir():
            raise ValueError(
                f"local path specified {local_path} cannot be read. Must be a file."
            )
        return load_json_object(local_path)
    raise ValueError(f"local path specified {local_path} does not exist")

load_input__remote classmethod

load_input__remote(remote_path)

Reads input from S3 location

Parameters:

Name Type Description Default
remote_path S3Path

S3 location to read input from

required

Returns:

Type Description
JSON

JSON data read from S3

Source code in src/aibs_informatics_core/executors/base.py
157
158
159
160
161
162
163
164
165
166
167
168
169
@classmethod
def load_input__remote(cls, remote_path: S3Path) -> JSON:
    """Load JSON content from an S3 location.

    This version provides no implementation and always raises.

    Args:
        remote_path (S3Path): S3 location to read input from

    Returns:
        JSON data read from S3

    Raises:
        NotImplementedError: Always, naming the class that lacks support.
    """
    message = f"{cls.__name__} has not provided an implementation for this form of input"
    raise NotImplementedError(message)

run_executor classmethod

run_executor(input, output_location=None, **kwargs)

Runs an executor

Parameters:

Name Type Description Default
input JSON

input to executor

required
output_location str | Path | None

Optional output location to write response to. Defaults to None.

None
Source code in src/aibs_informatics_core/executors/base.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
@classmethod
def run_executor(
    cls, input: JSON, output_location: str | Path | None = None, **kwargs
) -> JSON | None:
    """Runs an executor

    Builds the executor with ``build_from_env``, deserializes ``input`` into a
    request, passes it to ``handle`` and serializes whatever response comes
    back. When ``output_location`` is given, the serialized output is also
    written there.

    Args:
        input (JSON): input to executor
        output_location (str | Path | None, optional): Optional output location
            to write response to. Defaults to None.
        **kwargs: Forwarded to ``build_from_env``.

    Returns:
        The serialized response, or None when the handler returned None.
    """
    executor = cls.build_from_env(**kwargs)

    request = cls.deserialize_request(input)
    response = executor.handle(request)
    # Compare against None explicitly: a legitimate response object may be
    # falsy (e.g. it defines __bool__ or __len__) and must still be serialized.
    output = executor.serialize_response(response) if response is not None else None

    if output_location:
        if output is None:
            logger.warning(
                "Response is None but output location is specified. Writing empty dict"
            )
        # Only a missing output is replaced by an empty dict; any other
        # serialized value is written as-is.
        executor.write_output({} if output is None else output, output_location)

    return output

serialize_response classmethod

serialize_response(response)

Serializes response object into JSON

Parameters:

Name Type Description Default
response RESPONSE

The returned response object

required

Returns:

Type Description
JSONObject

serialized form of response object

Source code in src/aibs_informatics_core/executors/base.py
118
119
120
121
122
123
124
125
126
127
128
@classmethod
def serialize_response(cls, response: RESPONSE) -> JSONObject:
    """Convert a response object into its JSON form.

    Args:
        response (RESPONSE): The returned response object

    Returns:
        The result of calling ``to_dict()`` on the response.
    """
    serialized = response.to_dict()
    return serialized

write_output classmethod

write_output(output, path)

Writes output to location

Parameters:

Name Type Description Default
output JSON

serialized output object

required
path Path | str

location to write output to. Can be S3 or local file

required
Source code in src/aibs_informatics_core/executors/base.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
@classmethod
def write_output(cls, output: JSON, path: str | Path) -> None:
    """Write serialized output to an S3 or local destination.

    An ``S3Path`` instance, or a string that ``S3Path.is_valid`` accepts, is
    routed to ``write_output__remote``; everything else goes to
    ``write_output__file``.

    Args:
        output (JSON): serialized output object
        path (Path | str): location to write output to. Can be S3 or local file
    """
    targets_s3 = isinstance(path, S3Path) or (isinstance(path, str) and S3Path.is_valid(path))
    if not targets_s3:
        # Here we assume that it could be string or path to file
        return cls.write_output__file(output, Path(path))
    return cls.write_output__remote(output, S3Path(path))

write_output__file classmethod

write_output__file(output, local_path)

Writes output to file

Parameters:

Name Type Description Default
output JSON

serialized output json

required
local_path Path

Local path to write response to

required
Source code in src/aibs_informatics_core/executors/base.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
@classmethod
def write_output__file(cls, output: JSON, local_path: Path) -> None:
    """Write serialized output to a local file as JSON with sorted keys.

    Missing parent directories are created before writing.

    Args:
        output (JSON): serialized output json
        local_path (Path): Local path to write response to

    Raises:
        ValueError: If the path is a directory, or its parent exists but is
            not writable.
    """
    unwritable = local_path.is_dir()
    if not unwritable:
        # Only inspect the parent when the target itself is not a directory.
        parent_dir = local_path.parent
        unwritable = parent_dir.exists() and not os.access(parent_dir, os.W_OK)
    if unwritable:
        raise ValueError(
            f"local path specified {local_path} cannot be written to. Must be a file "
        )

    local_path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(output, sort_keys=True)
    local_path.write_text(serialized)

write_output__remote classmethod

write_output__remote(output, remote_path)

Writes output to Remote location

Parameters:

Name Type Description Default
output JSON

output object

required
remote_path S3Path

S3 location to write response to

required
Source code in src/aibs_informatics_core/executors/base.py
202
203
204
205
206
207
208
209
210
211
212
@classmethod
def write_output__remote(cls, output: JSON, remote_path: S3Path) -> None:
    """Write serialized output to a remote (S3) location.

    This version provides no implementation and always raises.

    Args:
        output (JSON): serialized output object
        remote_path (S3Path): S3 location to write response to

    Raises:
        NotImplementedError: Always, naming the class that lacks support.
    """
    message = f"{cls.__name__} has not provided an implementation for this form of serialization"
    raise NotImplementedError(message)