
Parallel Module

The parallel module implements parallel execution, the fourth pillar of pytest-gremlins' speed strategy. Gremlins are distributed across multiple worker processes for faster results on multi-core machines.

Overview

Sequential execution:

Text Only
1000 gremlins x 50ms = 50 seconds (1 worker)

Parallel execution (ideal scaling, ignoring process startup and scheduling overhead):

Text Only
1000 gremlins x 50ms / 8 workers = 6.25 seconds
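The figures above divide evenly. Here is a minimal sketch of the same arithmetic. It assumes that each worker runs whole gremlins one after another and that process startup is free. It shows that the real cost is counted in rounds of `ceil(gremlins / workers)`. The helper name `estimate_wall_seconds` is hypothetical.

```python
import math


def estimate_wall_seconds(gremlins: int, ms_per_gremlin: float, workers: int = 1) -> float:
    """Idealized wall-clock time: gremlins run in ceil(gremlins / workers) rounds."""
    if workers < 1:
        raise ValueError('workers must be >= 1')
    rounds = math.ceil(gremlins / workers)
    return rounds * ms_per_gremlin / 1000


print(estimate_wall_seconds(1000, 50))     # 50.0 (sequential)
print(estimate_wall_seconds(1000, 50, 8))  # 6.25
print(estimate_wall_seconds(1001, 50, 8))  # 6.3: one extra gremlin costs a whole extra round
```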

The mutation switching architecture makes parallelization safe because each worker operates independently with its own ACTIVE_GREMLIN environment variable.
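The isolation argument can be illustrated with plain `subprocess`. Each child receives its own copy of the environment with a different `ACTIVE_GREMLIN`, so concurrent runs cannot observe each other's mutation. This is a sketch, not pytest-gremlins' worker code. `run_with_gremlin` and the one-line child program are hypothetical stand-ins for a real test run, and threads are used only to launch the children concurrently.

```python
import os
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor

# Hypothetical child program standing in for a test run: it only reports
# which gremlin it was told is active.
CHILD = "import os; print(os.environ.get('ACTIVE_GREMLIN', '<none>'))"


def run_with_gremlin(gremlin_id: str) -> str:
    """Run one child interpreter with ACTIVE_GREMLIN set in its own environment only."""
    env = {**os.environ, 'ACTIVE_GREMLIN': gremlin_id}  # a copy; the parent's env is untouched
    completed = subprocess.run(
        [sys.executable, '-c', CHILD],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return completed.stdout.strip()


gremlin_ids = ['g001', 'g002', 'g003']
with ThreadPoolExecutor(max_workers=3) as pool:
    # Isolation comes from the separate child processes, not from the threads.
    seen = dict(zip(gremlin_ids, pool.map(run_with_gremlin, gremlin_ids)))

print(seen)  # {'g001': 'g001', 'g002': 'g002', 'g003': 'g003'}
```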

Module Imports

The parallel package does not re-export its classes from __init__.py. Import directly from the submodules:

Python
from pytest_gremlins.parallel.pool import WorkerPool, WorkerResult
from pytest_gremlins.parallel.pool_config import PoolConfig
from pytest_gremlins.parallel.persistent_pool import PersistentWorkerPool
from pytest_gremlins.parallel.batch_executor import BatchExecutor
from pytest_gremlins.parallel.aggregator import ResultAggregator
from pytest_gremlins.parallel.distribution import (
    DistributionStrategy,
    RoundRobinDistribution,
    WeightedDistribution,
)

WorkerPool

Basic worker pool using ProcessPoolExecutor.


Python
WorkerPool(max_workers=None, timeout=30)

Manages a pool of worker processes for parallel mutation testing.

The worker pool wraps a ProcessPoolExecutor and provides lifecycle management for parallel gremlin execution. Workers are isolated processes that each set their own ACTIVE_GREMLIN environment variable.

Attributes:

- max_workers (int): Maximum number of worker processes.
- timeout (int): Timeout in seconds for individual gremlin tests.

Example

>>> with WorkerPool(max_workers=4) as pool:
...     # Submit work to pool
...     pass

Parameters:

- max_workers (int | None, default None): Maximum number of worker processes. None means the CPU count (or 4 if the CPU count cannot be determined).
- timeout (int, default 30): Timeout in seconds for individual tests.

Source code in src/pytest_gremlins/parallel/pool.py
Python
def __init__(
    self,
    max_workers: int | None = None,
    timeout: int = 30,
) -> None:
    """Initialize the worker pool.

    Args:
        max_workers: Maximum number of worker processes. Defaults to CPU count.
        timeout: Timeout in seconds for individual tests. Defaults to 30.
    """
    self._max_workers = max_workers if max_workers is not None else (os.cpu_count() or 4)
    self._timeout = timeout
    self._executor: ProcessPoolExecutor | None = None
    self._shutdown_called = False

max_workers property

Python
max_workers

Return the maximum number of workers.

timeout property

Python
timeout

Return the timeout in seconds.

submit

Python
submit(gremlin_id, test_command, rootdir, instrumented_dir, env_vars)

Submit a gremlin test for execution.

Parameters:

- gremlin_id (str, required): The ID of the gremlin to test.
- test_command (list[str], required): Command to run tests.
- rootdir (str, required): Root directory for test execution.
- instrumented_dir (str | None, required): Directory with instrumented sources, or None.
- env_vars (dict[str, str], required): Additional environment variables to set.

Returns:

- Future[WorkerResult]: Future that will contain the WorkerResult when complete.

Raises:

- RuntimeError: If the pool is not active (not in context).

Source code in src/pytest_gremlins/parallel/pool.py
Python
def submit(
    self,
    gremlin_id: str,
    test_command: list[str],
    rootdir: str,
    instrumented_dir: str | None,
    env_vars: dict[str, str],
) -> Future[WorkerResult]:
    """Submit a gremlin test for execution.

    Args:
        gremlin_id: The ID of the gremlin to test.
        test_command: Command to run tests.
        rootdir: Root directory for test execution.
        instrumented_dir: Directory with instrumented sources (or None).
        env_vars: Additional environment variables to set.

    Returns:
        Future that will contain the WorkerResult when complete.

    Raises:
        RuntimeError: If the pool is not active (not in context).
    """
    if self._executor is None:
        msg = 'WorkerPool is not active. Use as context manager.'
        raise RuntimeError(msg)

    # Add instrumented dir to env vars if provided
    all_env_vars = dict(env_vars)
    if instrumented_dir is not None:
        all_env_vars['PYTEST_GREMLINS_SOURCES_FILE'] = str(Path(instrumented_dir) / 'sources.json')

    # Suppress subprocess coverage tracking: sitecustomize.py fires at Python startup
    # before --no-cov takes effect, so we explicitly clear the env var here.
    all_env_vars['COVERAGE_PROCESS_START'] = ''

    return self._executor.submit(
        _run_gremlin_test,
        gremlin_id,
        test_command,
        rootdir,
        all_env_vars,
        self._timeout,
    )
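The environment preparation in `submit` can be pulled out into a standalone function. The sketch below is an illustrative copy of the logic shown above. The function name `build_worker_env` is hypothetical, and the `sources.json` file name and both variable names come from the source.

```python
from __future__ import annotations

from pathlib import Path


def build_worker_env(env_vars: dict[str, str], instrumented_dir: str | None) -> dict[str, str]:
    """Illustrative copy of the environment preparation in WorkerPool.submit."""
    all_env_vars = dict(env_vars)  # copy so the caller's dict is never mutated
    if instrumented_dir is not None:
        all_env_vars['PYTEST_GREMLINS_SOURCES_FILE'] = str(Path(instrumented_dir) / 'sources.json')
    # An empty COVERAGE_PROCESS_START keeps coverage's startup hook from starting measurement.
    all_env_vars['COVERAGE_PROCESS_START'] = ''
    return all_env_vars


base = {'PYTHONHASHSEED': '0'}
env = build_worker_env(base, '/tmp/instrumented')
print(sorted(env))  # ['COVERAGE_PROCESS_START', 'PYTEST_GREMLINS_SOURCES_FILE', 'PYTHONHASHSEED']
print(base)         # {'PYTHONHASHSEED': '0'}
```

Because `env_vars` is copied first, a caller can reuse one base dictionary across many submissions.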

shutdown

Python
shutdown(wait=True)

Shutdown the worker pool.

Parameters:

- wait (bool, default True): If True, wait for pending work to complete. If False, cancel pending work immediately.

Source code in src/pytest_gremlins/parallel/pool.py
Python
def shutdown(self, wait: bool = True) -> None:
    """Shutdown the worker pool.

    Args:
        wait: If True, wait for pending work to complete. If False, cancel
              pending work immediately.
    """
    if self._shutdown_called:
        return

    self._shutdown_called = True
    if self._executor is not None:
        self._executor.shutdown(wait=wait, cancel_futures=not wait)
        self._executor = None
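WorkerPool's `__enter__` and `__exit__` are not shown on this page. The sketch below assumes that entering the context creates the executor and exiting calls `shutdown()`, as the error message in `submit` suggests. Under that assumption, the lifecycle rules are: `submit` requires an active pool, `shutdown` is idempotent, and `wait=False` cancels queued work. `MiniPool` is hypothetical. It defaults to `ThreadPoolExecutor` only so that the sketch runs anywhere without pickling, whereas the real pool uses `ProcessPoolExecutor`.

```python
from __future__ import annotations

from concurrent.futures import Executor, Future, ThreadPoolExecutor
from typing import Any, Callable


class MiniPool:
    """Hypothetical stand-in that mirrors WorkerPool's lifecycle rules."""

    def __init__(
        self,
        max_workers: int = 2,
        executor_factory: Callable[..., Executor] = ThreadPoolExecutor,
    ) -> None:
        self._max_workers = max_workers
        self._executor_factory = executor_factory  # ProcessPoolExecutor in the real pool
        self._executor: Executor | None = None
        self._shutdown_called = False

    def __enter__(self) -> MiniPool:
        self._executor = self._executor_factory(max_workers=self._max_workers)
        return self

    def __exit__(self, *exc_info: object) -> None:
        self.shutdown(wait=True)

    def submit(self, fn: Callable[..., Any], *args: Any) -> Future[Any]:
        if self._executor is None:
            raise RuntimeError('MiniPool is not active. Use as context manager.')
        return self._executor.submit(fn, *args)

    def shutdown(self, wait: bool = True) -> None:
        if self._shutdown_called:  # idempotent: later calls do nothing
            return
        self._shutdown_called = True
        if self._executor is not None:
            # wait=False also cancels queued futures that have not started (Python 3.9+).
            self._executor.shutdown(wait=wait, cancel_futures=not wait)
            self._executor = None


with MiniPool() as pool:
    future = pool.submit(pow, 2, 10)
print(future.result())  # 1024

pool.shutdown()  # second call is a harmless no-op
try:
    pool.submit(pow, 2, 2)
except RuntimeError as exc:
    error = str(exc)
print(error)  # MiniPool is not active. Use as context manager.
```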

WorkerPool Parameters

Parameter Type Default Description
max_workers int | None CPU count Number of worker processes
timeout int 30 Timeout per gremlin (seconds)

Usage Example

Python
from concurrent.futures import as_completed
from pytest_gremlins.parallel.pool import WorkerPool

with WorkerPool(max_workers=4, timeout=30) as pool:
    futures = {}

    for gremlin_id in ['g001', 'g002', 'g003']:
        future = pool.submit(
            gremlin_id=gremlin_id,
            test_command=['pytest', '-x', 'tests/'],
            rootdir='/path/to/project',
            instrumented_dir='/tmp/instrumented',
            env_vars={},
        )
        futures[future] = gremlin_id

    for future in as_completed(futures):
        result = future.result()
        print(f'{result.gremlin_id}: {result.status.value}')

WorkerResult

Result from a worker process (serializable for multiprocessing).

WorkerResult dataclass

Python
WorkerResult(gremlin_id, status, killing_test=None, execution_time_ms=None, error_output='')

Result from a worker process.

This is a simplified result that can be passed between processes. Unlike GremlinResult, it doesn't contain AST nodes which cannot be serialized for multiprocessing.
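A flat dataclass like this can cross process boundaries because `ProcessPoolExecutor` pickles return values and sends them back to the parent. The sketch below shows that round trip using hypothetical stand-ins. `Status` replaces `GremlinResultStatus`, and its string values are assumed. `Result` copies WorkerResult's field list.

```python
from __future__ import annotations

import enum
import pickle
from dataclasses import dataclass


class Status(enum.Enum):  # hypothetical stand-in for GremlinResultStatus
    ZAPPED = 'zapped'
    SURVIVED = 'survived'
    TIMEOUT = 'timeout'
    ERROR = 'error'


@dataclass
class Result:  # hypothetical stand-in with the same fields as WorkerResult
    gremlin_id: str
    status: Status
    killing_test: str | None = None
    execution_time_ms: float | None = None
    error_output: str = ''


original = Result('g001', Status.ZAPPED, killing_test='test_add', execution_time_ms=41.5)
# ProcessPoolExecutor performs this same byte round trip on every returned result.
payload = pickle.dumps(original)
restored = pickle.loads(payload)
print(restored == original, restored.status is Status.ZAPPED)  # True True
```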

Attributes:

- gremlin_id (str): The ID of the gremlin that was tested.
- status (GremlinResultStatus): The outcome of testing the gremlin.
- killing_test (str | None): Name of the test that killed the gremlin, if any.
- execution_time_ms (float | None): Time taken to test this gremlin, in milliseconds.
- error_output (str): Captured stderr or exception message when status is ERROR.

WorkerResult Attributes

Attribute Type Description
gremlin_id str ID of the tested gremlin
status GremlinResultStatus ZAPPED, SURVIVED, TIMEOUT, or ERROR
killing_test str | None Test that killed the gremlin
execution_time_ms float | None Execution time in milliseconds
error_output str Captured stderr or exception message (ERROR results)

PoolConfig

Configuration for optimized worker pool settings.

PoolConfig dataclass

Python
PoolConfig(max_workers=_default_max_workers(), timeout=30, start_method='auto', warmup=True, batch_size=10, executor='subprocess')

Configuration for the persistent worker pool.

This dataclass encapsulates all configuration options for the worker pool, allowing for easy customization and validation of pool settings.

Attributes:

Name Type Description
max_workers int

Maximum number of worker processes. Defaults to CPU count.

timeout int

Timeout in seconds for individual gremlin tests. Defaults to 30.

start_method StartMethod

Process start method ('auto', 'spawn', 'fork', 'forkserver').

warmup bool

Whether to pre-warm workers on pool startup. Defaults to True.

batch_size int

Number of gremlins per batch. Defaults to 10.

executor str

Execution strategy ('auto', 'subprocess', 'fork', 'inprocess'). Defaults to 'subprocess'.

Example

>>> config = PoolConfig(max_workers=4, timeout=60)
>>> config.max_workers
4
>>> config.timeout
60

get_mp_context

Python
get_mp_context()

Create a multiprocessing context with the configured start method.

If start_method is 'auto', uses the optimal method for the platform.

Returns:

Type Description
BaseContext

A multiprocessing context configured with the appropriate start method.

Example

>>> config = PoolConfig(start_method='spawn')
>>> ctx = config.get_mp_context()
>>> ctx.get_start_method()
'spawn'

Source code in src/pytest_gremlins/parallel/pool_config.py
Python
def get_mp_context(self) -> multiprocessing.context.BaseContext:
    """Create a multiprocessing context with the configured start method.

    If start_method is 'auto', uses the optimal method for the platform.

    Returns:
        A multiprocessing context configured with the appropriate start method.

    Example:
        >>> config = PoolConfig(start_method='spawn')
        >>> ctx = config.get_mp_context()
        >>> ctx.get_start_method()
        'spawn'
    """
    method = self.start_method
    if method == 'auto':
        method = get_optimal_start_method()

    return multiprocessing.get_context(method)

PoolConfig Parameters

Parameter Type Default Description
max_workers int CPU count Number of worker processes
timeout int 30 Timeout per gremlin (seconds)
start_method StartMethod 'auto' Process start method
warmup bool True Pre-warm workers on startup
batch_size int 10 Gremlins per batch
executor str 'subprocess' Execution strategy: 'auto', 'subprocess', 'fork', or 'inprocess'

Start Methods

Method Description Platform
'auto' Optimal for platform: 'forkserver' where available, otherwise 'spawn' All
'spawn' Fresh interpreter (safest, slowest) All
'fork' Copy parent process (fast, unsafe with threads) Unix
'forkserver' Fork from pre-warmed server (recommended) Unix
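Not every method in the table above exists on every platform. `multiprocessing.get_context` raises `ValueError` for a method the platform lacks. The sketch below resolves `'auto'` using the same preference order as `get_optimal_start_method` and rejects unavailable methods with a clearer message. `resolve_start_method` is hypothetical, and PoolConfig may validate differently or not at all.

```python
import multiprocessing


def resolve_start_method(requested: str = 'auto') -> str:
    """Map 'auto' to a concrete method and reject methods this platform lacks."""
    available = multiprocessing.get_all_start_methods()
    if requested == 'auto':
        # Same preference as get_optimal_start_method: forkserver, else spawn.
        return 'forkserver' if 'forkserver' in available else 'spawn'
    if requested not in available:
        raise ValueError(f'start method {requested!r} is not available here; choose from {available}')
    return requested


ctx = multiprocessing.get_context(resolve_start_method('spawn'))
print(ctx.get_start_method())  # spawn

try:
    resolve_start_method('teleport')
except ValueError as exc:
    message = str(exc)
```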

Usage Example

Python
from pytest_gremlins.parallel.pool_config import PoolConfig

# Create optimized configuration
config = PoolConfig(
    max_workers=8,
    timeout=60,
    start_method='forkserver',
    warmup=True,
    batch_size=20,
)

# Get multiprocessing context
ctx = config.get_mp_context()
print(ctx.get_start_method())  # forkserver (Unix only; Windows supports only 'spawn')

get_optimal_start_method


Python
get_optimal_start_method()

Determine the optimal process start method for the current platform.

The start method affects subprocess creation performance:

- 'forkserver': Fastest on Linux/macOS. Forks from a pre-warmed server process.
- 'spawn': Default on Windows. Creates a fresh interpreter; slowest but safest.
- 'fork': Fast but unsafe with threads or certain libraries.

Returns:

- Literal['spawn', 'fork', 'forkserver']: The optimal start method for the current platform.

Example

>>> method = get_optimal_start_method()
>>> method in ('spawn', 'fork', 'forkserver')
True

Source code in src/pytest_gremlins/parallel/pool_config.py
Python
def get_optimal_start_method() -> Literal['spawn', 'fork', 'forkserver']:
    """Determine the optimal process start method for the current platform.

    The start method affects subprocess creation performance:
    - 'forkserver': Fastest on Linux/macOS. Forks from a pre-warmed server process.
    - 'spawn': Default on Windows. Creates fresh interpreter, slowest but safest.
    - 'fork': Fast but unsafe with threads or certain libraries.

    Returns:
        The optimal start method for the current platform.

    Example:
        >>> method = get_optimal_start_method()
        >>> method in ('spawn', 'fork', 'forkserver')
        True
    """
    available = multiprocessing.get_all_start_methods()

    # Prefer forkserver on platforms that support it (Linux, macOS)
    # It's faster than spawn and safer than fork
    if 'forkserver' in available:
        return 'forkserver'

    # Fall back to spawn (always available, Windows-only path)
    return 'spawn'  # pragma: no cover

PersistentWorkerPool

Optimized worker pool that keeps workers alive across multiple gremlin tests.


Python
PersistentWorkerPool(max_workers=None, timeout=30, *, config=None)

Manages a pool of persistent worker processes for mutation testing.

Unlike the standard WorkerPool which spawns a new subprocess per gremlin, this pool keeps worker processes alive and reuses them. Workers import modules once and stay warm, dramatically reducing startup overhead.

Supports configurable process start method and worker warmup via PoolConfig.

Attributes:

- max_workers (int): Maximum number of worker processes.
- timeout (int): Timeout in seconds for individual gremlin tests.
- config (PoolConfig): The PoolConfig used to configure this pool.
- is_warmed_up (bool): Whether workers have been pre-warmed.
- warmup_completed_count (int): Number of workers that completed warmup.

Example

>>> config = PoolConfig(max_workers=4, warmup=True)  # doctest: +SKIP
>>> pool = PersistentWorkerPool.from_config(config)  # doctest: +SKIP
>>> with pool:  # doctest: +SKIP
...     future = pool.submit('g001', ['pytest'], '.', None, {})  # doctest: +SKIP
...     result = future.result()  # doctest: +SKIP

Parameters:

- max_workers (int | None, default None): Maximum number of worker processes. Defaults to CPU count.
- timeout (int, default 30): Timeout in seconds for individual tests.
- config (PoolConfig | None, default None): Optional PoolConfig. If provided, max_workers and timeout are taken from it unless explicitly provided. Because 30 is also the default, an explicit timeout overrides the config only when it differs from 30.

Source code in src/pytest_gremlins/parallel/persistent_pool.py
Python
def __init__(
    self,
    max_workers: int | None = None,
    timeout: int = 30,
    *,
    config: PoolConfig | None = None,
) -> None:
    """Initialize the persistent worker pool.

    Args:
        max_workers: Maximum number of worker processes. Defaults to CPU count.
        timeout: Timeout in seconds for individual tests. Defaults to 30.
        config: Optional PoolConfig. If provided, max_workers and timeout
            are taken from it (unless explicitly provided).
    """
    if config is not None:
        self._config = config
        # Use config values, but allow explicit overrides
        self._max_workers = max_workers if max_workers is not None else config.max_workers
        self._timeout = timeout if timeout != 30 else config.timeout  # noqa: PLR2004
    else:
        # Create a default config
        effective_max_workers = max_workers if max_workers is not None else (os.cpu_count() or 4)
        self._config = PoolConfig(max_workers=effective_max_workers, timeout=timeout)
        self._max_workers = effective_max_workers
        self._timeout = timeout

    self._running = False
    self._executor: ProcessPoolExecutor | None = None
    self._mp_context: multiprocessing.context.BaseContext = self._config.get_mp_context()
    self._is_warmed_up = False
    self._warmup_completed_count = 0
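The constructor above decides whether `timeout` was "explicitly provided" by comparing it with the default value 30. The sketch below reproduces that rule and contrasts it with a `None`-sentinel variant, which would honour an explicit 30. `Cfg`, `merge_like_source`, and `merge_with_sentinel` are hypothetical names used only for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass

DEFAULT_TIMEOUT = 30


@dataclass
class Cfg:  # hypothetical minimal stand-in for PoolConfig
    max_workers: int = 4
    timeout: int = DEFAULT_TIMEOUT


def merge_like_source(cfg: Cfg, max_workers: int | None = None, timeout: int = DEFAULT_TIMEOUT) -> tuple[int, int]:
    """Same rule as __init__: timeout overrides the config only if it differs from 30."""
    workers = max_workers if max_workers is not None else cfg.max_workers
    effective_timeout = timeout if timeout != DEFAULT_TIMEOUT else cfg.timeout
    return workers, effective_timeout


def merge_with_sentinel(cfg: Cfg, max_workers: int | None = None, timeout: int | None = None) -> tuple[int, int]:
    """Variant using None to mean 'not provided', so an explicit 30 is honoured."""
    workers = max_workers if max_workers is not None else cfg.max_workers
    effective_timeout = timeout if timeout is not None else cfg.timeout
    return workers, effective_timeout


cfg = Cfg(max_workers=8, timeout=60)
print(merge_like_source(cfg, timeout=30))    # (8, 60): the explicit 30 is ignored
print(merge_with_sentinel(cfg, timeout=30))  # (8, 30)
```

`BatchExecutor.__init__` uses the same default-comparison pattern for `batch_size` (10) and `timeout` (30).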

max_workers property

Python
max_workers

Return the maximum number of workers.

timeout property

Python
timeout

Return the timeout in seconds.

config property

Python
config

Return the PoolConfig used by this pool.

is_running property

Python
is_running

Return whether the pool is currently running.

is_warmed_up property

Python
is_warmed_up

Return whether workers have been pre-warmed.

warmup_completed_count property

Python
warmup_completed_count

Return the number of workers that completed warmup.

from_config classmethod

Python
from_config(config)

Create a PersistentWorkerPool from a PoolConfig.

This is the preferred way to create a pool with custom settings.

Parameters:

- config (PoolConfig, required): The configuration to use.

Returns:

- Self: A new PersistentWorkerPool configured with the given settings.

Example

>>> config = PoolConfig(max_workers=4, start_method='forkserver')
>>> pool = PersistentWorkerPool.from_config(config)
>>> pool.max_workers
4

Source code in src/pytest_gremlins/parallel/persistent_pool.py
Python
@classmethod
def from_config(cls, config: PoolConfig) -> Self:
    """Create a PersistentWorkerPool from a PoolConfig.

    This is the preferred way to create a pool with custom settings.

    Args:
        config: The configuration to use.

    Returns:
        A new PersistentWorkerPool configured with the given settings.

    Example:
        >>> config = PoolConfig(max_workers=4, start_method='forkserver')
        >>> pool = PersistentWorkerPool.from_config(config)
        >>> pool.max_workers
        4
    """
    return cls(config=config)

submit

Python
submit(gremlin_id, test_command, rootdir, instrumented_dir, env_vars)

Submit a gremlin test for execution.

Parameters:

- gremlin_id (str, required): The ID of the gremlin to test.
- test_command (list[str], required): Command to run tests.
- rootdir (str, required): Root directory for test execution.
- instrumented_dir (str | None, required): Directory with instrumented sources, or None.
- env_vars (dict[str, str], required): Additional environment variables to set.

Returns:

- Future[WorkerResult]: Future that will contain the WorkerResult when complete.

Raises:

- RuntimeError: If the pool is not running.

Source code in src/pytest_gremlins/parallel/persistent_pool.py
Python
def submit(
    self,
    gremlin_id: str,
    test_command: list[str],
    rootdir: str,
    instrumented_dir: str | None,
    env_vars: dict[str, str],
) -> Future[WorkerResult]:
    """Submit a gremlin test for execution.

    Args:
        gremlin_id: The ID of the gremlin to test.
        test_command: Command to run tests.
        rootdir: Root directory for test execution.
        instrumented_dir: Directory with instrumented sources (or None).
        env_vars: Additional environment variables to set.

    Returns:
        Future that will contain the WorkerResult when complete.

    Raises:
        RuntimeError: If the pool is not running.
    """
    if not self._running or self._executor is None:
        msg = 'PersistentWorkerPool is not running. Use as context manager.'
        raise RuntimeError(msg)

    all_env_vars = dict(env_vars)
    if instrumented_dir is not None:
        all_env_vars['PYTEST_GREMLINS_SOURCES_FILE'] = f'{instrumented_dir}/sources.json'

    return self._executor.submit(
        _run_gremlin_test,
        gremlin_id,
        test_command,
        rootdir,
        all_env_vars,
        self._timeout,
    )

submit_batch

Python
submit_batch(gremlin_ids, test_command, rootdir, instrumented_dir, env_vars)

Submit a batch of gremlin tests for execution in a single subprocess.

Batch execution reduces subprocess overhead by testing multiple gremlins in one subprocess call. Tests all gremlins in the batch independently.

Parameters:

- gremlin_ids (list[str], required): List of gremlin IDs to test.
- test_command (list[str], required): Command to run tests.
- rootdir (str, required): Root directory for test execution.
- instrumented_dir (str | None, required): Directory with instrumented sources, or None.
- env_vars (dict[str, str], required): Additional environment variables to set.

Returns:

- Future[list[WorkerResult]]: Future that will contain a list with one WorkerResult per tested gremlin.

Raises:

- RuntimeError: If the pool is not running.

Source code in src/pytest_gremlins/parallel/persistent_pool.py
Python
def submit_batch(
    self,
    gremlin_ids: list[str],
    test_command: list[str],
    rootdir: str,
    instrumented_dir: str | None,
    env_vars: dict[str, str],
) -> Future[list[WorkerResult]]:
    """Submit a batch of gremlin tests for execution in a single subprocess.

    Batch execution reduces subprocess overhead by testing multiple gremlins
    in one subprocess call. Tests all gremlins in the batch independently.

    Args:
        gremlin_ids: List of gremlin IDs to test.
        test_command: Command to run tests.
        rootdir: Root directory for test execution.
        instrumented_dir: Directory with instrumented sources (or None).
        env_vars: Additional environment variables to set.

    Returns:
        Future that will contain list of WorkerResult for each tested gremlin.

    Raises:
        RuntimeError: If the pool is not running.
    """
    if not self._running or self._executor is None:
        msg = 'PersistentWorkerPool is not running. Use as context manager.'
        raise RuntimeError(msg)

    all_env_vars = dict(env_vars)
    if instrumented_dir is not None:
        all_env_vars['PYTEST_GREMLINS_SOURCES_FILE'] = f'{instrumented_dir}/sources.json'

    return self._executor.submit(
        _run_gremlin_batch,
        gremlin_ids,
        test_command,
        rootdir,
        all_env_vars,
        self._timeout,
    )
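`_run_gremlin_batch` is not shown on this page. The sketch below assumes that "tests all gremlins in the batch independently" means one gremlin's failure is recorded rather than aborting the rest of the batch. Under that assumption, a batch worker loops over its IDs inside a single call. `BatchItemResult`, `run_batch`, and `fake_run_one` are hypothetical, and plain strings stand in for `GremlinResultStatus`.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass
class BatchItemResult:  # hypothetical, simplified WorkerResult
    gremlin_id: str
    status: str
    error_output: str = ''


def run_batch(gremlin_ids: list[str], run_one: Callable[[str], str]) -> list[BatchItemResult]:
    """Test each gremlin in turn inside one worker call; a failure does not stop the rest."""
    results: list[BatchItemResult] = []
    for gremlin_id in gremlin_ids:
        try:
            results.append(BatchItemResult(gremlin_id, run_one(gremlin_id)))
        except Exception as exc:  # record the error and continue with the next gremlin
            results.append(BatchItemResult(gremlin_id, 'error', str(exc)))
    return results


def fake_run_one(gremlin_id: str) -> str:
    """Hypothetical test runner: g003 crashes, even-numbered gremlins are zapped."""
    if gremlin_id == 'g003':
        raise RuntimeError('collection failed')
    return 'zapped' if int(gremlin_id[1:]) % 2 == 0 else 'survived'


batch = run_batch(['g002', 'g003', 'g004'], fake_run_one)
print([(r.gremlin_id, r.status) for r in batch])
# [('g002', 'zapped'), ('g003', 'error'), ('g004', 'zapped')]
```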

Why Persistent Workers?

Standard approach (1 subprocess per gremlin):

Text Only
100 gremlins x 600ms startup = 60 seconds overhead

Persistent workers (reused processes):

Text Only
4 workers x 600ms startup = 2.4 seconds overhead

25x reduction in subprocess overhead.
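The comparison reduces to counting process starts. Here is a sketch of the arithmetic, assuming the same fixed 600 ms startup cost per process used above. The helper name is hypothetical.

```python
def startup_overhead_ms(process_starts: int, startup_ms: int = 600) -> int:
    """Total interpreter startup cost, assuming a fixed cost per process start."""
    return process_starts * startup_ms


per_gremlin = startup_overhead_ms(100)  # one subprocess per gremlin
persistent = startup_overhead_ms(4)     # one start per worker, reused afterwards
print(per_gremlin / 1000, persistent / 1000, per_gremlin / persistent)  # 60.0 2.4 25.0
```

The ratio equals the number of gremlins per worker (100 / 4 = 25), so the saving grows as the run gets larger.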

Usage Example

Python
from pytest_gremlins.parallel.pool_config import PoolConfig
from pytest_gremlins.parallel.persistent_pool import PersistentWorkerPool

config = PoolConfig(max_workers=4, warmup=True, start_method='forkserver')
pool = PersistentWorkerPool.from_config(config)

with pool:
    # Workers are warmed up
    print(f'Warmed up: {pool.is_warmed_up}')

    # Submit individual gremlins
    future = pool.submit(
        gremlin_id='g001',
        test_command=['pytest', '-x'],
        rootdir='/project',
        instrumented_dir='/tmp/inst',
        env_vars={},
    )
    result = future.result()

    # Or submit batches (even faster)
    batch_future = pool.submit_batch(
        gremlin_ids=['g002', 'g003', 'g004'],
        test_command=['pytest', '-x'],
        rootdir='/project',
        instrumented_dir='/tmp/inst',
        env_vars={},
    )
    batch_results = batch_future.result()

BatchExecutor

Coordinates batch execution of gremlin tests for reduced subprocess overhead.


Python
BatchExecutor(batch_size=10, max_workers=None, timeout=30, *, config=None)

Coordinates batch execution of gremlin tests.

Partitions gremlins into batches and executes them with reduced subprocess overhead. Each batch runs in a single subprocess, with gremlins tested sequentially within the batch.

Supports PoolConfig for advanced configuration including process start method selection and worker warmup.

Attributes:

- batch_size (int): Number of gremlins per batch.
- max_workers (int): Maximum number of parallel worker processes.
- config (PoolConfig): The PoolConfig used to configure the underlying pool.

Example

>>> config = PoolConfig(max_workers=4, batch_size=20, warmup=True)  # doctest: +SKIP
>>> executor = BatchExecutor.from_config(config)  # doctest: +SKIP
>>> results = executor.execute(['g001', 'g002'], ['pytest'], '.', None, {})  # doctest: +SKIP

Parameters:

- batch_size (int, default 10): Number of gremlins per batch.
- max_workers (int | None, default None): Maximum number of worker processes. Defaults to CPU count.
- timeout (int, default 30): Timeout in seconds for individual gremlin tests.
- config (PoolConfig | None, default None): Optional PoolConfig. If provided, batch_size, max_workers, and timeout are taken from it unless explicitly provided. An explicit batch_size or timeout overrides the config only when it differs from its default (10 and 30 respectively).

Source code in src/pytest_gremlins/parallel/batch_executor.py
Python
def __init__(
    self,
    batch_size: int = 10,
    max_workers: int | None = None,
    timeout: int = 30,
    *,
    config: PoolConfig | None = None,
) -> None:
    """Initialize the batch executor.

    Args:
        batch_size: Number of gremlins per batch. Defaults to 10.
        max_workers: Maximum number of worker processes. Defaults to CPU count.
        timeout: Timeout in seconds for individual gremlin tests.
        config: Optional PoolConfig. If provided, batch_size, max_workers,
            and timeout are taken from it (unless explicitly provided).
    """
    if config is not None:
        self._config = config
        # Use config values, but allow explicit overrides
        self._batch_size = batch_size if batch_size != 10 else config.batch_size  # noqa: PLR2004
        self._max_workers = max_workers if max_workers is not None else config.max_workers
        self._timeout = timeout if timeout != 30 else config.timeout  # noqa: PLR2004
    else:
        # Create a default config
        effective_max_workers = max_workers if max_workers is not None else (os.cpu_count() or 4)
        self._config = PoolConfig(
            max_workers=effective_max_workers,
            timeout=timeout,
            batch_size=batch_size,
        )
        self._batch_size = batch_size
        self._max_workers = effective_max_workers
        self._timeout = timeout

batch_size property

Python
batch_size

Return the batch size.

max_workers property

Python
max_workers

Return the maximum number of workers.

config property

Python
config

Return the PoolConfig used by this executor.

from_config classmethod

Python
from_config(config)

Create a BatchExecutor from a PoolConfig.

This is the preferred way to create an executor with custom settings.

Parameters:

- config (PoolConfig, required): The configuration to use.

Returns:

- Self: A new BatchExecutor configured with the given settings.

Example

>>> config = PoolConfig(max_workers=4, batch_size=20)
>>> executor = BatchExecutor.from_config(config)
>>> executor.batch_size
20

Source code in src/pytest_gremlins/parallel/batch_executor.py
Python
@classmethod
def from_config(cls, config: PoolConfig) -> Self:
    """Create a BatchExecutor from a PoolConfig.

    This is the preferred way to create an executor with custom settings.

    Args:
        config: The configuration to use.

    Returns:
        A new BatchExecutor configured with the given settings.

    Example:
        >>> config = PoolConfig(max_workers=4, batch_size=20)
        >>> executor = BatchExecutor.from_config(config)
        >>> executor.batch_size
        20
    """
    return cls(config=config)

partition

Python
partition(gremlin_ids)

Partition gremlin IDs into batches.

Parameters:

- gremlin_ids (list[str], required): List of gremlin IDs to partition.

Returns:

- list[list[str]]: List of batches in input order, each a list of at most batch_size gremlin IDs; the last batch may be shorter.

Source code in src/pytest_gremlins/parallel/batch_executor.py
Python
def partition(self, gremlin_ids: list[str]) -> list[list[str]]:
    """Partition gremlin IDs into batches.

    Args:
        gremlin_ids: List of gremlin IDs to partition.

    Returns:
        List of batches, where each batch is a list of gremlin IDs.
    """
    if not gremlin_ids:
        return []

    batches: list[list[str]] = []
    for i in range(0, len(gremlin_ids), self._batch_size):
        batch = gremlin_ids[i : i + self._batch_size]
        batches.append(batch)

    return batches

execute

Python
execute(gremlin_ids, test_command, rootdir, instrumented_dir, env_vars)

Execute gremlin tests in batches.

Creates a PersistentWorkerPool using the configured PoolConfig, partitions gremlins into batches, and executes them in parallel.

Parameters:

- gremlin_ids (list[str], required): List of gremlin IDs to test.
- test_command (list[str], required): Command to run tests.
- rootdir (str, required): Root directory for test execution.
- instrumented_dir (str | None, required): Directory with instrumented sources, or None.
- env_vars (dict[str, str], required): Additional environment variables to set.

Returns:

- list[WorkerResult]: One WorkerResult per tested gremlin, grouped by batch, with batches in completion order rather than input order.

Source code in src/pytest_gremlins/parallel/batch_executor.py
Python
def execute(
    self,
    gremlin_ids: list[str],
    test_command: list[str],
    rootdir: str,
    instrumented_dir: str | None,
    env_vars: dict[str, str],
) -> list[WorkerResult]:
    """Execute gremlin tests in batches.

    Creates a PersistentWorkerPool using the configured PoolConfig,
    partitions gremlins into batches, and executes them in parallel.

    Args:
        gremlin_ids: List of gremlin IDs to test.
        test_command: Command to run tests.
        rootdir: Root directory for test execution.
        instrumented_dir: Directory with instrumented sources (or None).
        env_vars: Additional environment variables to set.

    Returns:
        List of WorkerResult for each tested gremlin.
    """
    batches = self.partition(gremlin_ids)

    if not batches:
        return []

    all_env_vars = dict(env_vars)
    if instrumented_dir is not None:
        all_env_vars['PYTEST_GREMLINS_SOURCES_FILE'] = f'{instrumented_dir}/sources.json'

    all_results: list[WorkerResult] = []

    # Create pool using config for optimal settings
    pool = PersistentWorkerPool.from_config(self._config)

    with pool:
        futures = {
            pool.submit_batch(
                gremlin_ids=batch,
                test_command=test_command,
                rootdir=rootdir,
                instrumented_dir=instrumented_dir,
                env_vars=env_vars,
            ): batch
            for batch in batches
        }

        for future in as_completed(futures):
            batch_results = future.result()
            all_results.extend(batch_results)

    return all_results
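Because `execute` collects results with `as_completed`, the returned list follows completion order, not the order of `gremlin_ids`. The sketch below shows the same fan-out/fan-in shape. Threads and a fake batch runner stand in for `PersistentWorkerPool` and `_run_gremlin_batch`, and `partition` and `fake_batch` are hypothetical helpers. A final sort restores a reproducible order if a report needs one.

```python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor, as_completed


def partition(ids: list[str], size: int) -> list[list[str]]:
    """Same slicing as BatchExecutor.partition."""
    return [ids[i : i + size] for i in range(0, len(ids), size)]


def fake_batch(batch: list[str]) -> list[tuple[str, str]]:
    """Hypothetical batch runner returning (gremlin_id, status) pairs."""
    return [(gremlin_id, 'zapped') for gremlin_id in batch]


gremlin_ids = [f'g{i:03d}' for i in range(25)]
all_results: list[tuple[str, str]] = []
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = {pool.submit(fake_batch, batch): batch for batch in partition(gremlin_ids, 10)}
    for future in as_completed(futures):  # completion order, not submission order
        all_results.extend(future.result())

# Sort when a stable, reproducible report order is needed.
all_results.sort(key=lambda pair: pair[0])
print(len(all_results), all_results[0], all_results[-1])  # 25 ('g000', 'zapped') ('g024', 'zapped')
```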

Why Batch Execution?

Standard approach (1 subprocess call per gremlin):

Text Only
100 gremlins = 100 subprocess calls
100 x 600ms overhead = 60 seconds

Batch execution (batch_size=10):

Text Only
100 gremlins / 10 = 10 subprocess calls
10 x 600ms overhead = 6 seconds

10x reduction in subprocess overhead.
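The example above divides evenly. In general, the number of subprocess calls is the ceiling of gremlins / batch_size, because the last partial batch still costs a full call. This is consistent with `partition`, whose final slice may be shorter. Here is a sketch using the same assumed 600 ms per call. The helper name is hypothetical.

```python
import math


def subprocess_calls(gremlins: int, batch_size: int) -> int:
    """Number of batches, and therefore subprocess calls, for a given batch size."""
    return math.ceil(gremlins / batch_size)


for n in (100, 105):
    calls = subprocess_calls(n, 10)
    print(n, calls, calls * 600 / 1000)
# 100 10 6.0
# 105 11 6.6
```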

Usage Example

Python
from pytest_gremlins.parallel.pool_config import PoolConfig
from pytest_gremlins.parallel.batch_executor import BatchExecutor

config = PoolConfig(max_workers=4, batch_size=20, warmup=True)
executor = BatchExecutor.from_config(config)

# Partition gremlins into batches
gremlin_ids = [f'g{i:03d}' for i in range(100)]
batches = executor.partition(gremlin_ids)
print(f'{len(batches)} batches of {executor.batch_size}')

# Execute all gremlins
results = executor.execute(
    gremlin_ids=gremlin_ids,
    test_command=['pytest', '-x', 'tests/'],
    rootdir='/path/to/project',
    instrumented_dir='/tmp/instrumented',
    env_vars={},
)

for result in results:
    print(f'{result.gremlin_id}: {result.status.value}')

ResultAggregator

Thread-safe collection of results with progress tracking.


Python
ResultAggregator(total_gremlins)

Aggregates results from parallel worker processes.

Thread-safe collection of results with progress tracking and status counts. Results are stored as they arrive and can be retrieved sorted by gremlin ID.

Attributes:

- total_gremlins (int): Total number of gremlins being tested.
- completed (int): Number of gremlins that have been processed.

Example

>>> aggregator = ResultAggregator(total_gremlins=100)
>>> aggregator.add_result(WorkerResult('g001', GremlinResultStatus.ZAPPED))
>>> progress = aggregator.get_progress()  # (1, 100)

Parameters:

- total_gremlins (int, required): Total number of gremlins to be tested.

Source code in src/pytest_gremlins/parallel/aggregator.py
Python
def __init__(self, total_gremlins: int) -> None:
    """Initialize the result aggregator.

    Args:
        total_gremlins: Total number of gremlins to be tested.
    """
    self._total_gremlins = total_gremlins
    self._results: list[WorkerResult] = []
    self._lock = threading.Lock()
    self._zapped = 0
    self._survived = 0
    self._timeout = 0
    self._error = 0

total_gremlins property

Python
total_gremlins

Return the total number of gremlins.

completed property

Python
completed

Return the number of completed results.

zapped_count property

Python
zapped_count

Return the number of zapped gremlins.

survived_count property

Python
survived_count

Return the number of survived gremlins.

timeout_count property

Python
timeout_count

Return the number of timed-out gremlins.

error_count property

Python
error_count

Return the number of gremlins that ended in an error.

progress_percentage property

Python
progress_percentage

Return progress as a percentage.

Returns:

Type Description
float Progress from 0.0 to 100.0.

add_result

Python
add_result(result)

Add a result from a worker.

Thread-safe method to add a result to the aggregator.

Parameters:

Name Type Description Default
result WorkerResult The worker result to add. required
Source code in src/pytest_gremlins/parallel/aggregator.py
Python
def add_result(self, result: WorkerResult) -> None:
    """Add a result from a worker.

    Thread-safe method to add a result to the aggregator.

    Args:
        result: The worker result to add.
    """
    with self._lock:
        self._results.append(result)
        self._update_status_count(result.status)
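The lock matters whenever several threads in the coordinating process report results at the same time. The sketch below is a reduced, self-contained re-creation of that pattern. `Status`, `Result` and `MiniAggregator` are hypothetical stand-ins for `GremlinResultStatus`, `WorkerResult` and `ResultAggregator`. The `Counter` replaces the private `_update_status_count` helper, whose body this page does not show.

```python
from __future__ import annotations

import threading
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from enum import Enum


class Status(Enum):
    """Stand-in for GremlinResultStatus (member values are illustrative)."""

    ZAPPED = 'zapped'
    SURVIVED = 'survived'
    TIMEOUT = 'timeout'
    ERROR = 'error'


@dataclass(frozen=True)
class Result:
    """Stand-in for WorkerResult with only the fields this sketch needs."""

    gremlin_id: str
    status: Status


class MiniAggregator:
    """Hypothetical reduced ResultAggregator: one lock guards the list and the counts."""

    def __init__(self, total: int) -> None:
        self.total = total
        self._results: list[Result] = []
        self._counts: Counter[Status] = Counter()
        self._lock = threading.Lock()

    def add_result(self, result: Result) -> None:
        # Append and count in one critical section so the two never disagree
        with self._lock:
            self._results.append(result)
            self._counts[result.status] += 1

    def snapshot(self) -> tuple[int, dict[Status, int]]:
        # Copy under the lock so callers get a consistent view
        with self._lock:
            return len(self._results), dict(self._counts)


statuses = [Status.ZAPPED, Status.SURVIVED, Status.ZAPPED, Status.TIMEOUT] * 250
aggregator = MiniAggregator(total=len(statuses))

# Many threads call add_result concurrently; leaving the with-block waits for all of them
with ThreadPoolExecutor(max_workers=8) as pool:
    for i, status in enumerate(statuses):
        pool.submit(aggregator.add_result, Result(f'g{i:04d}', status))

completed, counts = aggregator.snapshot()
print(completed, counts[Status.ZAPPED], counts[Status.SURVIVED], counts[Status.TIMEOUT])
# 1000 500 250 250
```

Without the lock, an append and its counter update could interleave with another thread's. The list and the counts could then briefly disagree.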

add_error

Python
add_error(gremlin_id, error)

Record an error for a gremlin.

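Creates an ERROR status result when a worker fails. The exception's string form is truncated to 2,000 characters and stored in the result's error_output field.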

Parameters:

Name Type Description Default
gremlin_id str The ID of the gremlin that failed. required
error Exception The exception that occurred. required
Source code in src/pytest_gremlins/parallel/aggregator.py
Python
def add_error(self, gremlin_id: str, error: Exception) -> None:
    """Record an error for a gremlin.

    Creates an ERROR status result when a worker fails.

    Args:
        gremlin_id: The ID of the gremlin that failed.
        error: The exception that occurred.
    """
    result = WorkerResult(
        gremlin_id=gremlin_id,
        status=GremlinResultStatus.ERROR,
        error_output=str(error)[:2000],
    )
    self.add_result(result)

get_results

Python
get_results()

Get all results sorted by gremlin ID.

Returns:

Type Description
list[WorkerResult] List of WorkerResult objects sorted by gremlin_id.

Source code in src/pytest_gremlins/parallel/aggregator.py
Python
def get_results(self) -> list[WorkerResult]:
    """Get all results sorted by gremlin ID.

    Returns:
        List of WorkerResult objects sorted by gremlin_id.
    """
    with self._lock:
        return sorted(self._results, key=lambda r: r.gremlin_id)
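`get_results` sorts by the raw `gremlin_id` string, so the ordering is lexicographic. With unpadded numeric suffixes, IDs sort as 'g1', 'g10', 'g2'. The examples on this page zero-pad their IDs ('g001'), which keeps string order and numeric order aligned.

If you re-sort results yourself and your IDs are not padded, a natural-sort key is one option. `natural_key` below is a hypothetical helper, not part of pytest-gremlins.

```python
from __future__ import annotations

import re


def natural_key(gremlin_id: str) -> list[int | str]:
    """Split 'g10' into ['g', 10, ''] so digit runs compare as integers."""
    return [int(part) if part.isdigit() else part for part in re.split(r'(\d+)', gremlin_id)]


unpadded = ['g10', 'g2', 'g1']
print(sorted(unpadded))                   # ['g1', 'g10', 'g2']  (plain string order)
print(sorted(unpadded, key=natural_key))  # ['g1', 'g2', 'g10']
print(sorted(['g010', 'g002', 'g001']))   # ['g001', 'g002', 'g010']
```

For a list of `WorkerResult` objects, the equivalent call would be `sorted(results, key=lambda r: natural_key(r.gremlin_id))`.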

get_progress

Python
get_progress()

Get progress as (completed, total).

Returns:

Type Description
tuple[int, int] Tuple of (completed count, total count).

Source code in src/pytest_gremlins/parallel/aggregator.py
Python
def get_progress(self) -> tuple[int, int]:
    """Get progress as (completed, total).

    Returns:
        Tuple of (completed count, total count).
    """
    with self._lock:
        return (len(self._results), self._total_gremlins)

ResultAggregator Methods

Method Returns Description
add_result(result) None Add a worker result
add_error(gremlin_id, error) None Record an error
get_results() list[WorkerResult] Get all results (sorted)
get_progress() tuple[int, int] Get (completed, total)

ResultAggregator Properties

Property Type Description
total_gremlins int Total gremlins to test
completed int Completed count
zapped_count int Zapped gremlins
survived_count int Survived gremlins
timeout_count int Timed out gremlins
error_count int Error gremlins
progress_percentage float Progress (0.0 to 100.0)

Usage Example

Python
from pytest_gremlins.parallel.aggregator import ResultAggregator
from pytest_gremlins.parallel.pool import WorkerResult
from pytest_gremlins.reporting.results import GremlinResultStatus

aggregator = ResultAggregator(total_gremlins=100)

# Add results as they arrive
aggregator.add_result(WorkerResult(
    gremlin_id='g001',
    status=GremlinResultStatus.ZAPPED,
    killing_test='test_boundary',
))

# Progress reporting
completed, total = aggregator.get_progress()
print(f'Progress: {completed}/{total} ({aggregator.progress_percentage:.1f}%)')

# Final results
results = aggregator.get_results()
print(f'Zapped: {aggregator.zapped_count}')
print(f'Survived: {aggregator.survived_count}')

Distribution Strategies

Strategies for distributing gremlins across workers.

DistributionStrategy Protocol

DistributionStrategy

Bases: Protocol

Protocol for gremlin distribution strategies.

Implementations partition gremlins into buckets for parallel workers.

distribute

Python
distribute(gremlins, num_workers, test_counts=None)

Distribute gremlins across workers.

Parameters:

Name Type Description Default
gremlins list[Gremlin] List of gremlins to distribute. required
num_workers int Number of worker processes. required
test_counts dict[str, int] | None Optional mapping of gremlin_id to number of covering tests. None

Returns:

Type Description
list[list[Gremlin]] List of num_workers buckets, each containing gremlins for that worker.

Source code in src/pytest_gremlins/parallel/distribution.py
Python
def distribute(
    self,
    gremlins: list[Gremlin],
    num_workers: int,
    test_counts: dict[str, int] | None = None,
) -> list[list[Gremlin]]:
    """Distribute gremlins across workers.

    Args:
        gremlins: List of gremlins to distribute.
        num_workers: Number of worker processes.
        test_counts: Optional mapping of gremlin_id to number of covering tests.

    Returns:
        List of num_workers buckets, each containing gremlins for that worker.
    """
    ...
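`DistributionStrategy` is a `typing.Protocol`, so any class with a matching `distribute` method satisfies it structurally, with no inheritance required. The sketch below shows this with a hypothetical `ContiguousDistribution` that gives each worker one contiguous slice.

`Gremlin` is a stand-in dataclass, and `DistributionStrategy` is a local copy of the signature above. This page does not say whether pytest-gremlins lets you plug in a custom strategy.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Protocol


@dataclass(frozen=True)
class Gremlin:
    """Stand-in for pytest_gremlins' Gremlin; only gremlin_id matters here."""

    gremlin_id: str


class DistributionStrategy(Protocol):
    """Local copy of the protocol signature documented above."""

    def distribute(
        self,
        gremlins: list[Gremlin],
        num_workers: int,
        test_counts: dict[str, int] | None = None,
    ) -> list[list[Gremlin]]: ...


class ContiguousDistribution:
    """Hypothetical strategy: each worker gets one contiguous slice of the input."""

    def distribute(
        self,
        gremlins: list[Gremlin],
        num_workers: int,
        test_counts: dict[str, int] | None = None,  # ignored, as in round-robin
    ) -> list[list[Gremlin]]:
        # Ceiling division so the slices cover every gremlin
        size = -(-len(gremlins) // num_workers)
        return [gremlins[w * size:(w + 1) * size] for w in range(num_workers)]


def assign(strategy: DistributionStrategy, gremlins: list[Gremlin], workers: int) -> list[list[str]]:
    # Any object with a matching distribute() method type-checks as a DistributionStrategy
    return [[g.gremlin_id for g in bucket] for bucket in strategy.distribute(gremlins, workers)]


gremlins = [Gremlin(f'g{i}') for i in range(5)]
print(assign(ContiguousDistribution(), gremlins, 3))  # [['g0', 'g1'], ['g2', 'g3'], ['g4']]
```

Like the strategies documented here, the sketch always returns exactly `num_workers` buckets, even when some of them are empty.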

RoundRobinDistribution

Simple round-robin assignment.

RoundRobinDistribution

Simple round-robin distribution strategy.

Assigns gremlin N to worker N % num_workers. Fast and deterministic, but doesn't account for varying execution times.

Example

Text Only
strategy = RoundRobinDistribution()
gremlins = [g0, g1, g2, g3, g4]
result = strategy.distribute(gremlins, num_workers=3)
# result[0] = [g0, g3], result[1] = [g1, g4], result[2] = [g2]

distribute

Python
distribute(gremlins, num_workers, test_counts=None)

Distribute gremlins round-robin across workers.

Parameters:

Name Type Description Default
gremlins list[Gremlin] List of gremlins to distribute. required
num_workers int Number of worker processes. required
test_counts dict[str, int] | None Ignored for round-robin distribution. None

Returns:

Type Description
list[list[Gremlin]] List of num_workers buckets with gremlins distributed round-robin.

Source code in src/pytest_gremlins/parallel/distribution.py
Python
def distribute(
    self,
    gremlins: list[Gremlin],
    num_workers: int,
    test_counts: dict[str, int] | None = None,  # noqa: ARG002
) -> list[list[Gremlin]]:
    """Distribute gremlins round-robin across workers.

    Args:
        gremlins: List of gremlins to distribute.
        num_workers: Number of worker processes.
        test_counts: Ignored for round-robin distribution.

    Returns:
        List of num_workers buckets with gremlins distributed round-robin.
    """
    buckets: list[list[Gremlin]] = [[] for _ in range(num_workers)]

    for i, gremlin in enumerate(gremlins):
        worker_idx = i % num_workers
        buckets[worker_idx].append(gremlin)

    return buckets

Python
from pytest_gremlins.parallel.distribution import RoundRobinDistribution

strategy = RoundRobinDistribution()
gremlins = [g0, g1, g2, g3, g4]  # five Gremlin objects created elsewhere
buckets = strategy.distribute(gremlins, num_workers=3)

# buckets[0] = [g0, g3]  # Worker 0
# buckets[1] = [g1, g4]  # Worker 1
# buckets[2] = [g2]      # Worker 2
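The bucket contents above can be checked with plain strings in place of `Gremlin` objects. The sketch below re-implements the `i % num_workers` rule in a hypothetical `round_robin` function. It also gives an equivalent form using extended slicing, and shows that some buckets stay empty when there are more workers than gremlins.

```python
def round_robin(items: list[str], num_workers: int) -> list[list[str]]:
    """Same rule as RoundRobinDistribution: item i goes to bucket i % num_workers."""
    buckets: list[list[str]] = [[] for _ in range(num_workers)]
    for i, item in enumerate(items):
        buckets[i % num_workers].append(item)
    return buckets


def round_robin_slices(items: list[str], num_workers: int) -> list[list[str]]:
    # Extended slicing picks every num_workers-th item starting at offset w
    return [items[w::num_workers] for w in range(num_workers)]


ids = ['g0', 'g1', 'g2', 'g3', 'g4']
print(round_robin(ids, 3))         # [['g0', 'g3'], ['g1', 'g4'], ['g2']]
print(round_robin_slices(ids, 3))  # [['g0', 'g3'], ['g1', 'g4'], ['g2']]
print(round_robin(['g0'], 3))      # [['g0'], [], []]
```

Bucket sizes never differ by more than one, but nothing here accounts for per-gremlin cost. That gap is what the weighted strategy addresses.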

WeightedDistribution

Balances by test count (expensive gremlins distributed evenly).

WeightedDistribution

Weighted distribution strategy that balances by test count.

Assigns expensive gremlins (many covering tests) to different workers to avoid hotspots where one worker gets all the slow gremlins.

Uses a greedy algorithm: sort gremlins by weight descending, then assign each gremlin to the worker with the smallest current total weight.

Example

Text Only
strategy = WeightedDistribution()
gremlins = [g0, g1, g2, g3]  # g0, g1 are heavy (100 tests each)
test_counts = {'g0': 100, 'g1': 100, 'g2': 10, 'g3': 10}
result = strategy.distribute(gremlins, num_workers=2, test_counts=test_counts)
# Heavy gremlins distributed to different workers for balance

distribute

Python
distribute(gremlins, num_workers, test_counts=None)

Distribute gremlins weighted by test count.

Parameters:

Name Type Description Default
gremlins list[Gremlin] List of gremlins to distribute. required
num_workers int Number of worker processes. required
test_counts dict[str, int] | None Mapping of gremlin_id to number of covering tests. Gremlins not in this map get a weight of 1. If test_counts is None, distribution falls back to round-robin. None

Returns:

Type Description
list[list[Gremlin]] List of num_workers buckets with gremlins balanced by weight.

Source code in src/pytest_gremlins/parallel/distribution.py
Python
def distribute(
    self,
    gremlins: list[Gremlin],
    num_workers: int,
    test_counts: dict[str, int] | None = None,
) -> list[list[Gremlin]]:
    """Distribute gremlins weighted by test count.

    Args:
        gremlins: List of gremlins to distribute.
        num_workers: Number of worker processes.
        test_counts: Mapping of gremlin_id to number of covering tests.
                    Gremlins not in this map get weight of 1.

    Returns:
        List of num_workers buckets with gremlins balanced by weight.
    """
    buckets: list[list[Gremlin]] = [[] for _ in range(num_workers)]

    if not gremlins:
        return buckets

    # If no test counts, fall back to round-robin
    if test_counts is None:
        return RoundRobinDistribution().distribute(gremlins, num_workers)

    # Sort gremlins by weight (test count) descending
    # Gremlins not in test_counts get weight of 1
    weighted_gremlins = sorted(
        gremlins,
        key=lambda g: test_counts.get(g.gremlin_id, 1),
        reverse=True,
    )

    # Track total weight per worker
    worker_weights = [0] * num_workers

    # Greedy assignment: assign each gremlin to least-loaded worker
    for gremlin in weighted_gremlins:
        weight = test_counts.get(gremlin.gremlin_id, 1)
        # Find worker with minimum current weight
        min_worker = min(range(num_workers), key=lambda w: worker_weights[w])
        buckets[min_worker].append(gremlin)
        worker_weights[min_worker] += weight

    return buckets

Python
from pytest_gremlins.parallel.distribution import WeightedDistribution

strategy = WeightedDistribution()

# Heavy gremlins (100 tests each) get distributed to different workers
test_counts = {
    'g0': 100,  # Heavy
    'g1': 100,  # Heavy
    'g2': 10,   # Light
    'g3': 10,   # Light
}

gremlins = [g0, g1, g2, g3]  # Gremlin objects with IDs 'g0'..'g3'
buckets = strategy.distribute(gremlins, num_workers=2, test_counts=test_counts)

# buckets[0] = [g0, g2]  # 100 + 10 = 110
# buckets[1] = [g1, g3]  # 100 + 10 = 110
# Both workers end up with a total weight of 110
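The greedy rule above is the classic longest-processing-time-first (LPT) heuristic. In the source, `min()` scans all k workers for every gremlin. A min-heap keyed on (load, worker index) yields the same assignment with O(log k) work per gremlin, because ties still go to the lowest worker index.

This is a sketch with a stand-in `Gremlin` dataclass and a hypothetical `distribute_lpt_heap` function, not the library's implementation. Unlike the original, it does not fall back to round-robin when `test_counts` is None.

```python
import heapq
from dataclasses import dataclass


@dataclass(frozen=True)
class Gremlin:
    """Stand-in for pytest_gremlins' Gremlin; only gremlin_id is used here."""

    gremlin_id: str


def distribute_lpt_heap(
    gremlins: list[Gremlin],
    num_workers: int,
    test_counts: dict[str, int],
) -> list[list[Gremlin]]:
    """Greedy heaviest-first assignment using a min-heap of (load, worker index)."""
    buckets: list[list[Gremlin]] = [[] for _ in range(num_workers)]
    # Heaviest first; sorted() stays stable with reverse=True, so equal weights keep input order
    ordered = sorted(gremlins, key=lambda g: test_counts.get(g.gremlin_id, 1), reverse=True)
    # A list of (0, w) in ascending w is already a valid heap
    heap = [(0, w) for w in range(num_workers)]
    for gremlin in ordered:
        # Least-loaded worker; ties pop the lowest index, like min(range(...)) in the source
        load, worker = heapq.heappop(heap)
        buckets[worker].append(gremlin)
        heapq.heappush(heap, (load + test_counts.get(gremlin.gremlin_id, 1), worker))
    return buckets


g0, g1, g2, g3 = (Gremlin(f'g{i}') for i in range(4))
test_counts = {'g0': 100, 'g1': 100, 'g2': 10, 'g3': 10}
buckets = distribute_lpt_heap([g0, g1, g2, g3], num_workers=2, test_counts=test_counts)
print([[g.gremlin_id for g in b] for b in buckets])  # [['g0', 'g2'], ['g1', 'g3']]
```

For the handful of workers typical on one machine, the linear `min()` scan in the source is already cheap. The heap form mainly matters when k is large.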

CLI Integration

Enable parallel execution from the command line:

Bash
# Enable parallel execution (auto-detect workers)
pytest --gremlins --gremlin-parallel

# Specify worker count
pytest --gremlins --gremlin-parallel --gremlin-workers=8

# Enable batch mode
pytest --gremlins --gremlin-batch

# Customize batch size
pytest --gremlins --gremlin-batch --gremlin-batch-size=20

# Combine for maximum speed
pytest --gremlins --gremlin-parallel --gremlin-batch --gremlin-workers=8 --gremlin-batch-size=20

Performance Comparison

Mode Subprocess Calls Overhead Best For
Sequential 1 per gremlin High Small projects
Parallel 1 per gremlin High (but concurrent) Multi-core, many gremlins
Batch 1 per batch Low Large test suites
Parallel + Batch 1 per batch (concurrent) Low (and concurrent) Maximum speed

Example Timings

Text Only
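Project: 1000 gremlins, 8 CPU cores, ~600 ms per subprocess run
(the batch rows assume a 10-gremlin batch costs about the same as one
single-gremlin run, i.e. run time is dominated by subprocess startup)

Sequential:           1000 x 600ms = 600 seconds
Parallel (8 workers): 1000 x 600ms / 8 = 75 seconds
Batch (size=10):      100 x 600ms = 60 seconds
Parallel + Batch:     100 x 600ms / 8 = 7.5 seconds

Speedup: 80x (600 s / 7.5 s)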
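The arithmetic in the timings above can be reproduced with a small helper. The names are hypothetical, not part of pytest-gremlins. Like the table, the helper assumes every subprocess run costs the same `run_ms` regardless of batch size.

The second function counts whole rounds of runs per worker. It shows that the 7.5 s figure is an idealized bound: 100 batches over 8 workers need 13 rounds, or about 7.8 s.

```python
import math


def estimate_seconds(num_gremlins: int, run_ms: int, workers: int = 1, batch_size: int = 1) -> float:
    """Idealized wall time as in the table: runs split perfectly evenly across workers."""
    runs = math.ceil(num_gremlins / batch_size)  # one subprocess run per batch
    return runs * run_ms / (workers * 1000)


def estimate_seconds_rounds(num_gremlins: int, run_ms: int, workers: int = 1, batch_size: int = 1) -> float:
    """More conservative: workers process whole runs, so a partial round still costs a full run."""
    runs = math.ceil(num_gremlins / batch_size)
    return math.ceil(runs / workers) * run_ms / 1000


print(estimate_seconds(1000, 600))                                   # 600.0
print(estimate_seconds(1000, 600, workers=8))                        # 75.0
print(estimate_seconds(1000, 600, batch_size=10))                    # 60.0
print(estimate_seconds(1000, 600, workers=8, batch_size=10))         # 7.5
print(estimate_seconds_rounds(1000, 600, workers=8, batch_size=10))  # 7.8
```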

Best Practices

  1. Use forkserver on Unix - Faster worker startup than spawn, without fork's problems in multithreaded processes
  2. Enable warmup - Reduces latency on first batch
  3. Tune batch size - Balance between overhead and early termination
  4. Match workers to cores - Little benefit beyond the number of physical cores
  5. Monitor timeouts - Increase if legitimate tests are timing out
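Practices 1 and 4 can be sketched with the standard library alone. The helper names are hypothetical. This page does not show whether `PoolConfig` exposes a start-method setting, so the sketch configures a plain `concurrent.futures.ProcessPoolExecutor` through its `mp_context` parameter.

```python
from __future__ import annotations

import multiprocessing
import os
from concurrent.futures import ProcessPoolExecutor


def pick_start_method() -> str:
    """Prefer forkserver where the platform offers it; spawn is available everywhere."""
    return 'forkserver' if 'forkserver' in multiprocessing.get_all_start_methods() else 'spawn'


def make_executor(max_workers: int | None = None) -> ProcessPoolExecutor:
    # os.cpu_count() reports logical CPUs, which can exceed physical cores on SMT machines
    workers = max_workers or os.cpu_count() or 1
    context = multiprocessing.get_context(pick_start_method())
    return ProcessPoolExecutor(max_workers=workers, mp_context=context)


if __name__ == '__main__':
    # The main guard is required: spawn and forkserver children re-import the main module
    print(pick_start_method())
    with make_executor(max_workers=2) as executor:
        print(executor.submit(pow, 2, 10).result())  # 1024
```

Because `os.cpu_count()` counts logical CPUs, passing an explicit worker count is the simplest way to stay at the physical core count on machines with hyper-threading.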