Skip to content

Multiprocessing

Functions for working with multiprocessing.


apply_args_and_kwargs

apply_args_and_kwargs(fn, args, kwargs)

Invoke a callable with positional and keyword arguments.

Used as the target function for pool.starmap.

Parameters:

Name Type Description Default
fn

The callable to invoke.

required
args Iterable[Any]

Positional arguments.

required
kwargs Mapping[str, Any]

Keyword arguments.

required

Returns:

Type Description

The return value of fn.

Source code in src/aibs_informatics_core/utils/multiprocessing.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def apply_args_and_kwargs(fn, args: Iterable[Any], kwargs: Mapping[str, Any]):
    """Invoke a callable with positional and keyword arguments.

    Used as the target function for ``pool.starmap``.

    Args:
        fn: The callable to invoke.
        args: Positional arguments.
        kwargs: Keyword arguments.

    Returns:
        The return value of ``fn``.
    """
    return fn(*args, **kwargs)  # pragma: no cover

parallel_starmap

parallel_starmap(
    callable,
    arguments,
    keyword_arguments=None,
    pool_class=None,
    processes=None,
    chunk_size=None,
    callback=None,
    error_callback=None,
)

Execute a callable in parallel across a pool of worker processes.

Parameters:

Name Type Description Default
callable Callable[[Any], U]

The function to call for each set of arguments.

required
arguments Sequence[T]

A sequence of argument tuples.

required
keyword_arguments Sequence[Mapping[str, Any]] | Mapping[str, Any] | None

Keyword arguments, either a single mapping applied to all calls or a sequence of mappings.

None
pool_class type[Pool] | None

The pool class to use. Defaults to multiprocessing.pool.Pool.

None
processes int | None

Number of worker processes.

None
chunk_size int | None

Chunk size for starmap_async.

None
callback Callable[[list[T]], Any] | None

Optional callback invoked with results on success.

None
error_callback Callable[[BaseException], None] | None

Optional callback invoked on error.

None

Returns:

Type Description
list[U]

A list of results from all invocations.

Source code in src/aibs_informatics_core/utils/multiprocessing.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def parallel_starmap(
    callable: Callable[[Any], U],
    arguments: Sequence[T],
    keyword_arguments: Sequence[Mapping[str, Any]] | Mapping[str, Any] | None = None,
    pool_class: type[mp_pool.Pool] | None = None,
    processes: int | None = None,
    chunk_size: int | None = None,
    callback: Callable[[list[T]], Any] | None = None,
    error_callback: Callable[[BaseException], None] | None = None,
) -> list[U]:
    """Execute a callable in parallel across a pool of worker processes.

    Args:
        callable: The function to call for each set of arguments.
        arguments: A sequence of argument tuples.
        keyword_arguments: Keyword arguments, either a single mapping applied to
            all calls or a sequence of mappings.
        pool_class: The pool class to use. Defaults to ``multiprocessing.pool.Pool``.
        processes: Number of worker processes.
        chunk_size: Chunk size for ``starmap_async``.
        callback: Optional callback invoked with results on success.
        error_callback: Optional callback invoked on error.

    Returns:
        A list of results from all invocations.
    """
    pool_class = pool_class or mp_pool.Pool
    with pool_class(processes=processes) as pool:
        starmap_arguments = zip(
            repeat(callable),
            arguments,
            (
                repeat(keyword_arguments or {})
                if not isinstance(keyword_arguments, Sequence)
                else keyword_arguments
            ),
        )

        async_results = [
            pool.starmap_async(
                _starmap_apply,
                (_,),
                chunksize=chunk_size,
                callback=callback,
                error_callback=error_callback,
            )
            for _ in starmap_arguments
        ]

        return [result for async_result in async_results for result in async_result.get()]

starmap_with_kwargs

starmap_with_kwargs(pool, fn, args_iter, kwargs_iter)

Apply a function using pool.starmap with both positional and keyword arguments.

Parameters:

Name Type Description Default
pool

A multiprocessing pool instance.

required
fn Callable

The callable to invoke.

required
args_iter Sequence[Iterable[Any]]

A sequence of positional argument iterables, one per call.

required
kwargs_iter Sequence[Mapping[str, Any]]

A sequence of keyword argument mappings, one per call.

required

Returns:

Type Description

A list of results from each invocation.

Source code in src/aibs_informatics_core/utils/multiprocessing.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def starmap_with_kwargs(
    pool,
    fn: Callable,
    args_iter: Sequence[Iterable[Any]],
    kwargs_iter: Sequence[Mapping[str, Any]],
):
    """Apply a function using ``pool.starmap`` with both positional and keyword arguments.

    Args:
        pool: A multiprocessing pool instance.
        fn: The callable to invoke.
        args_iter: A sequence of positional argument iterables, one per call.
        kwargs_iter: A sequence of keyword argument mappings, one per call.

    Returns:
        A list of results from each invocation.
    """
    args_for_starmap = zip(repeat(fn), args_iter, kwargs_iter)
    return pool.starmap(apply_args_and_kwargs, args_for_starmap)