Parallel Module
The parallel module implements parallel execution, the fourth pillar of pytest-gremlins' speed strategy. It distributes gremlins across multiple worker processes so that mutation runs finish faster on multi-core machines.
Overview
Sequential execution:
Parallel execution:
The mutation switching architecture makes parallelization safe because each worker operates
independently with its own ACTIVE_GREMLIN environment variable.
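The isolation claim can be illustrated with plain subprocesses. The sketch below is not pytest-gremlins code: it only assumes that a worker selects its mutation by reading ACTIVE_GREMLIN from its own environment, and the helper name `run_with_gremlin` is hypothetical. It shows that a value set for one child process is not visible to its siblings or to the parent.

```python
import os
import subprocess
import sys

# Child program: report which gremlin this process believes is active.
CHILD_CODE = "import os; print(os.environ.get('ACTIVE_GREMLIN', 'none'))"


def run_with_gremlin(gremlin_id: str) -> str:
    """Run a child interpreter with its own ACTIVE_GREMLIN value (hypothetical helper)."""
    env = dict(os.environ)  # copy, so the parent environment is never modified
    env["ACTIVE_GREMLIN"] = gremlin_id
    completed = subprocess.run(
        [sys.executable, "-c", CHILD_CODE],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return completed.stdout.strip()


before = os.environ.get("ACTIVE_GREMLIN")
seen = [run_with_gremlin(gid) for gid in ("g001", "g002")]
after = os.environ.get("ACTIVE_GREMLIN")
```

Each child reports only the ID it was given, and the parent's environment is the same before and after the runs.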
Module Imports
The parallel package does not re-export its classes from __init__.py.
Import directly from the submodules:
from pytest_gremlins.parallel.pool import WorkerPool, WorkerResult
from pytest_gremlins.parallel.pool_config import PoolConfig
from pytest_gremlins.parallel.persistent_pool import PersistentWorkerPool
from pytest_gremlins.parallel.batch_executor import BatchExecutor
from pytest_gremlins.parallel.aggregator import ResultAggregator
from pytest_gremlins.parallel.distribution import (
    DistributionStrategy,
    RoundRobinDistribution,
    WeightedDistribution,
)
WorkerPool
Basic worker pool using ProcessPoolExecutor.
WorkerPool
Manages a pool of worker processes for parallel mutation testing.
The worker pool wraps a ProcessPoolExecutor and provides lifecycle management for parallel gremlin execution. Workers are isolated processes that each set their own ACTIVE_GREMLIN environment variable.
Attributes:
| Name | Type | Description |
|---|---|---|
| max_workers | int | Maximum number of worker processes. |
| timeout | int | Timeout in seconds for individual gremlin tests. |
Example
>>> with WorkerPool(max_workers=4) as pool:
...     # Submit work to pool
...     pass
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| max_workers | int \| None | Maximum number of worker processes. Defaults to CPU count. | None |
| timeout | int | Timeout in seconds for individual tests. Defaults to 30. | 30 |
Source code in src/pytest_gremlins/parallel/pool.py
submit
Submit a gremlin test for execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| gremlin_id | str | The ID of the gremlin to test. | required |
| test_command | list[str] | Command to run tests. | required |
| rootdir | str | Root directory for test execution. | required |
| instrumented_dir | str \| None | Directory with instrumented sources (or None). | required |
| env_vars | dict[str, str] | Additional environment variables to set. | required |
Returns:
| Type | Description |
|---|---|
| Future[WorkerResult] | Future that will contain the WorkerResult when complete. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If the pool is not active (not used as a context manager). |
Source code in src/pytest_gremlins/parallel/pool.py
shutdown
Shut down the worker pool.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| wait | bool | If True, wait for pending work to complete. If False, cancel pending work immediately. | True |
Source code in src/pytest_gremlins/parallel/pool.py
WorkerPool Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| max_workers | int \| None | CPU count | Number of worker processes |
| timeout | int | 30 | Timeout per gremlin (seconds) |
Usage Example
from concurrent.futures import as_completed
from pytest_gremlins.parallel.pool import WorkerPool
with WorkerPool(max_workers=4, timeout=30) as pool:
    futures = {}
    for gremlin_id in ['g001', 'g002', 'g003']:
        future = pool.submit(
            gremlin_id=gremlin_id,
            test_command=['pytest', '-x', 'tests/'],
            rootdir='/path/to/project',
            instrumented_dir='/tmp/instrumented',
            env_vars={},
        )
        futures[future] = gremlin_id
    for future in as_completed(futures):
        result = future.result()
        print(f'{result.gremlin_id}: {result.status.value}')
WorkerResult
Result from a worker process (serializable for multiprocessing).
WorkerResult (dataclass)
Result from a worker process.
This is a simplified result that can be passed between processes. Unlike GremlinResult, it doesn't contain AST nodes, which cannot be serialized for multiprocessing.
Attributes:
| Name | Type | Description |
|---|---|---|
| gremlin_id | str | The ID of the gremlin that was tested. |
| status | GremlinResultStatus | The outcome of testing the gremlin. |
| killing_test | str \| None | Name of test that killed the gremlin (if any). |
| execution_time_ms | float \| None | Time taken to test this gremlin. |
| error_output | str | Captured stderr or exception message when status is ERROR. |
WorkerResult Attributes
| Attribute | Type | Description |
|---|---|---|
| gremlin_id | str | ID of the tested gremlin |
| status | GremlinResultStatus | ZAPPED, SURVIVED, TIMEOUT, or ERROR |
| killing_test | str \| None | Test that killed the gremlin |
| execution_time_ms | float \| None | Execution time in milliseconds |
| error_output | str | Captured stderr or exception message when status is ERROR |
PoolConfig
Configuration for optimized worker pool settings.
PoolConfig (dataclass)
PoolConfig(max_workers=_default_max_workers(), timeout=30, start_method='auto', warmup=True, batch_size=10, executor='subprocess')
Configuration for the persistent worker pool.
This dataclass encapsulates all configuration options for the worker pool, allowing for easy customization and validation of pool settings.
Attributes:
| Name | Type | Description |
|---|---|---|
| max_workers | int | Maximum number of worker processes. Defaults to CPU count. |
| timeout | int | Timeout in seconds for individual gremlin tests. Defaults to 30. |
| start_method | StartMethod | Process start method ('auto', 'spawn', 'fork', 'forkserver'). |
| warmup | bool | Whether to pre-warm workers on pool startup. Defaults to True. |
| batch_size | int | Number of gremlins per batch. Defaults to 10. |
| executor | str | Execution strategy ('auto', 'subprocess', 'fork', 'inprocess'). Defaults to 'subprocess'. |
Example
>>> config = PoolConfig(max_workers=4, timeout=60)
>>> config.max_workers
4
>>> config.timeout
60
get_mp_context
Create a multiprocessing context with the configured start method.
If start_method is 'auto', uses the optimal method for the platform.
Returns:
| Type | Description |
|---|---|
| BaseContext | A multiprocessing context configured with the appropriate start method. |
Example
>>> config = PoolConfig(start_method='spawn')
>>> ctx = config.get_mp_context()
>>> ctx.get_start_method()
'spawn'
Source code in src/pytest_gremlins/parallel/pool_config.py
PoolConfig Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| max_workers | int | CPU count | Number of worker processes |
| timeout | int | 30 | Timeout per gremlin (seconds) |
| start_method | StartMethod | 'auto' | Process start method |
| warmup | bool | True | Pre-warm workers on startup |
| batch_size | int | 10 | Gremlins per batch |
| executor | str | 'subprocess' | Execution strategy ('auto', 'subprocess', 'fork', 'inprocess') |
Start Methods
| Method | Description | Platform |
|---|---|---|
| 'auto' | Optimal for platform | All |
| 'spawn' | Fresh interpreter (safest, slowest) | All |
| 'fork' | Copy parent process (fast, unsafe with threads) | Unix |
| 'forkserver' | Fork from pre-warmed server (recommended) | Unix |
Usage Example
from pytest_gremlins.parallel.pool_config import PoolConfig
# Create optimized configuration
config = PoolConfig(
    max_workers=8,
    timeout=60,
    start_method='forkserver',
    warmup=True,
    batch_size=20,
)
# Get multiprocessing context
ctx = config.get_mp_context()
print(ctx.get_start_method())  # 'forkserver'
get_optimal_start_method
Determine the optimal process start method for the current platform.
The start method affects subprocess creation performance:
- 'forkserver': Fastest on Linux/macOS. Forks from a pre-warmed server process.
- 'spawn': Default on Windows. Creates a fresh interpreter; slowest but safest.
- 'fork': Fast but unsafe with threads or certain libraries.
Returns:
| Type | Description |
|---|---|
| Literal['spawn', 'fork', 'forkserver'] | The optimal start method for the current platform. |
Example
>>> method = get_optimal_start_method()
>>> method in ('spawn', 'fork', 'forkserver')
True
Source code in src/pytest_gremlins/parallel/pool_config.py
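As a rough illustration of the platform check described above, the following sketch picks a start method using only the standard library. It is an assumption about how such a choice could be made, not the actual body of get_optimal_start_method; the function name `pick_start_method` is hypothetical.

```python
import multiprocessing
import sys


def pick_start_method() -> str:
    """Prefer 'forkserver' where it exists, otherwise fall back to 'spawn'.

    Hypothetical helper: the real selection logic may differ.
    """
    available = multiprocessing.get_all_start_methods()
    if sys.platform != "win32" and "forkserver" in available:
        return "forkserver"
    return "spawn"  # available on every platform


method = pick_start_method()
# A context object scopes the start method without changing the global default.
ctx = multiprocessing.get_context(method)
```

Using a context rather than multiprocessing.set_start_method keeps the choice local to the pool, so other code in the same process is unaffected.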
PersistentWorkerPool
Optimized worker pool that keeps workers alive across multiple gremlin tests.
PersistentWorkerPool
Manages a pool of persistent worker processes for mutation testing.
Unlike the standard WorkerPool, which spawns a new subprocess per gremlin, this pool keeps worker processes alive and reuses them. Workers import modules once and stay warm, dramatically reducing startup overhead.
Supports configurable process start method and worker warmup via PoolConfig.
Attributes:
| Name | Type | Description |
|---|---|---|
| max_workers | int | Maximum number of worker processes. |
| timeout | int | Timeout in seconds for individual gremlin tests. |
| config | PoolConfig | The PoolConfig used to configure this pool. |
| is_warmed_up | bool | Whether workers have been pre-warmed. |
| warmup_completed_count | int | Number of workers that completed warmup. |
Example
>>> config = PoolConfig(max_workers=4, warmup=True)  # doctest: +SKIP
>>> pool = PersistentWorkerPool.from_config(config)  # doctest: +SKIP
>>> with pool:  # doctest: +SKIP
...     future = pool.submit('g001', ['pytest'], '.', None, {})  # doctest: +SKIP
...     result = future.result()  # doctest: +SKIP
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| max_workers | int \| None | Maximum number of worker processes. Defaults to CPU count. | None |
| timeout | int | Timeout in seconds for individual tests. Defaults to 30. | 30 |
| config | PoolConfig \| None | Optional PoolConfig. If provided, max_workers and timeout are taken from it (unless explicitly provided). | None |
Source code in src/pytest_gremlins/parallel/persistent_pool.py
warmup_completed_count (property)
Return the number of workers that completed warmup.
from_config (classmethod)
Create a PersistentWorkerPool from a PoolConfig.
This is the preferred way to create a pool with custom settings.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | PoolConfig | The configuration to use. | required |
Returns:
| Type | Description |
|---|---|
| Self | A new PersistentWorkerPool configured with the given settings. |
Example
>>> config = PoolConfig(max_workers=4, start_method='forkserver')
>>> pool = PersistentWorkerPool.from_config(config)
>>> pool.max_workers
4
Source code in src/pytest_gremlins/parallel/persistent_pool.py
submit
Submit a gremlin test for execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| gremlin_id | str | The ID of the gremlin to test. | required |
| test_command | list[str] | Command to run tests. | required |
| rootdir | str | Root directory for test execution. | required |
| instrumented_dir | str \| None | Directory with instrumented sources (or None). | required |
| env_vars | dict[str, str] | Additional environment variables to set. | required |
Returns:
| Type | Description |
|---|---|
| Future[WorkerResult] | Future that will contain the WorkerResult when complete. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If the pool is not running. |
Source code in src/pytest_gremlins/parallel/persistent_pool.py
submit_batch
Submit a batch of gremlin tests for execution in a single subprocess.
Batch execution reduces subprocess overhead by testing multiple gremlins in one subprocess call. Each gremlin in the batch is still tested independently.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| gremlin_ids | list[str] | List of gremlin IDs to test. | required |
| test_command | list[str] | Command to run tests. | required |
| rootdir | str | Root directory for test execution. | required |
| instrumented_dir | str \| None | Directory with instrumented sources (or None). | required |
| env_vars | dict[str, str] | Additional environment variables to set. | required |
Returns:
| Type | Description |
|---|---|
| Future[list[WorkerResult]] | Future that will contain a list with one WorkerResult per tested gremlin. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If the pool is not running. |
Source code in src/pytest_gremlins/parallel/persistent_pool.py
Why Persistent Workers?
Standard approach (1 subprocess per gremlin):
Persistent workers (reused processes):
25x reduction in subprocess overhead.
Usage Example
from pytest_gremlins.parallel.pool_config import PoolConfig
from pytest_gremlins.parallel.persistent_pool import PersistentWorkerPool
config = PoolConfig(max_workers=4, warmup=True, start_method='forkserver')
pool = PersistentWorkerPool.from_config(config)
with pool:
    # Workers are warmed up
    print(f'Warmed up: {pool.is_warmed_up}')
    # Submit individual gremlins
    future = pool.submit(
        gremlin_id='g001',
        test_command=['pytest', '-x'],
        rootdir='/project',
        instrumented_dir='/tmp/inst',
        env_vars={},
    )
    result = future.result()
    # Or submit batches (even faster)
    batch_future = pool.submit_batch(
        gremlin_ids=['g002', 'g003', 'g004'],
        test_command=['pytest', '-x'],
        rootdir='/project',
        instrumented_dir='/tmp/inst',
        env_vars={},
    )
    batch_results = batch_future.result()
BatchExecutor
Coordinates batch execution of gremlin tests for reduced subprocess overhead.
BatchExecutor
Coordinates batch execution of gremlin tests.
Partitions gremlins into batches and executes them with reduced subprocess overhead. Each batch runs in a single subprocess, with gremlins tested sequentially within the batch.
Supports PoolConfig for advanced configuration including process start method selection and worker warmup.
Attributes:
| Name | Type | Description |
|---|---|---|
| batch_size | int | Number of gremlins per batch. |
| max_workers | int | Maximum number of parallel worker processes. |
| config | PoolConfig | The PoolConfig used to configure the underlying pool. |
Example
>>> config = PoolConfig(max_workers=4, batch_size=20, warmup=True)  # doctest: +SKIP
>>> executor = BatchExecutor.from_config(config)  # doctest: +SKIP
>>> results = executor.execute(['g001', 'g002'], ['pytest'], '.', None, {})  # doctest: +SKIP
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | Number of gremlins per batch. Defaults to 10. | 10 |
| max_workers | int \| None | Maximum number of worker processes. Defaults to CPU count. | None |
| timeout | int | Timeout in seconds for individual gremlin tests. | 30 |
| config | PoolConfig \| None | Optional PoolConfig. If provided, batch_size, max_workers, and timeout are taken from it (unless explicitly provided). | None |
Source code in src/pytest_gremlins/parallel/batch_executor.py
from_config (classmethod)
Create a BatchExecutor from a PoolConfig.
This is the preferred way to create an executor with custom settings.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | PoolConfig | The configuration to use. | required |
Returns:
| Type | Description |
|---|---|
| Self | A new BatchExecutor configured with the given settings. |
Example
>>> config = PoolConfig(max_workers=4, batch_size=20)
>>> executor = BatchExecutor.from_config(config)
>>> executor.batch_size
20
Source code in src/pytest_gremlins/parallel/batch_executor.py
partition
Partition gremlin IDs into batches.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| gremlin_ids | list[str] | List of gremlin IDs to partition. | required |
Returns:
| Type | Description |
|---|---|
| list[list[str]] | List of batches, where each batch is a list of gremlin IDs. |
Source code in src/pytest_gremlins/parallel/batch_executor.py
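For intuition, partitioning into batches can be sketched as fixed-size chunking. This is a minimal stand-alone function written for illustration, not the BatchExecutor.partition implementation; in particular, letting the final batch be shorter than batch_size is an assumption, since the reference above does not say how a remainder is handled.

```python
def partition(gremlin_ids: list[str], batch_size: int) -> list[list[str]]:
    """Split IDs into consecutive chunks of at most batch_size (illustrative only)."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [
        gremlin_ids[start:start + batch_size]
        for start in range(0, len(gremlin_ids), batch_size)
    ]


ids = [f"g{i:03d}" for i in range(25)]
batches = partition(ids, batch_size=10)
sizes = [len(batch) for batch in batches]
```

With 25 IDs and a batch size of 10, the chunks preserve input order and the last one simply holds the five leftover IDs.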
execute
Execute gremlin tests in batches.
Creates a PersistentWorkerPool using the configured PoolConfig, partitions gremlins into batches, and executes them in parallel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| gremlin_ids | list[str] | List of gremlin IDs to test. | required |
| test_command | list[str] | Command to run tests. | required |
| rootdir | str | Root directory for test execution. | required |
| instrumented_dir | str \| None | Directory with instrumented sources (or None). | required |
| env_vars | dict[str, str] | Additional environment variables to set. | required |
Returns:
| Type | Description |
|---|---|
| list[WorkerResult] | List of WorkerResult for each tested gremlin. |
Source code in src/pytest_gremlins/parallel/batch_executor.py
Why Batch Execution?
Standard approach (1 subprocess call per gremlin):
Batch execution (batch_size=10):
10x reduction in subprocess overhead.
Usage Example
from pytest_gremlins.parallel.pool_config import PoolConfig
from pytest_gremlins.parallel.batch_executor import BatchExecutor
config = PoolConfig(max_workers=4, batch_size=20, warmup=True)
executor = BatchExecutor.from_config(config)
# Partition gremlins into batches
gremlin_ids = [f'g{i:03d}' for i in range(100)]
batches = executor.partition(gremlin_ids)
print(f'{len(batches)} batches of {executor.batch_size}')
# Execute all gremlins
results = executor.execute(
    gremlin_ids=gremlin_ids,
    test_command=['pytest', '-x', 'tests/'],
    rootdir='/path/to/project',
    instrumented_dir='/tmp/instrumented',
    env_vars={},
)
for result in results:
    print(f'{result.gremlin_id}: {result.status.value}')
ResultAggregator
Thread-safe collection of results with progress tracking.
ResultAggregator
Aggregates results from parallel worker processes.
Thread-safe collection of results with progress tracking and status counts. Results are stored as they arrive and can be retrieved sorted by gremlin ID.
Attributes:
| Name | Type | Description |
|---|---|---|
| total_gremlins | int | Total number of gremlins being tested. |
| completed | int | Number of gremlins that have been processed. |
Example
>>> aggregator = ResultAggregator(total_gremlins=100)
>>> aggregator.add_result(WorkerResult('g001', GremlinResultStatus.ZAPPED))
>>> progress = aggregator.get_progress()  # (1, 100)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| total_gremlins | int | Total number of gremlins to be tested. | required |
Source code in src/pytest_gremlins/parallel/aggregator.py
progress_percentage (property)
Return progress as a percentage.
Returns:
| Type | Description |
|---|---|
| float | Progress from 0.0 to 100.0. |
add_result
Add a result from a worker.
Thread-safe method to add a result to the aggregator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| result | WorkerResult | The worker result to add. | required |
Source code in src/pytest_gremlins/parallel/aggregator.py
add_error
Record an error for a gremlin.
Creates an ERROR status result when a worker fails.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| gremlin_id | str | The ID of the gremlin that failed. | required |
| error | Exception | The exception that occurred. | required |
Source code in src/pytest_gremlins/parallel/aggregator.py
get_results
Get all results sorted by gremlin ID.
Returns:
| Type | Description |
|---|---|
| list[WorkerResult] | List of WorkerResult objects sorted by gremlin_id. |
Source code in src/pytest_gremlins/parallel/aggregator.py
get_progress
Get progress as (completed, total).
Returns:
| Type | Description |
|---|---|
| tuple[int, int] | Tuple of (completed count, total count). |
ResultAggregator Methods
| Method | Returns | Description |
|---|---|---|
| add_result(result) | None | Add a worker result |
| add_error(gremlin_id, error) | None | Record an error |
| get_results() | list[WorkerResult] | Get all results (sorted) |
| get_progress() | tuple[int, int] | Get (completed, total) |
ResultAggregator Properties
| Property | Type | Description |
|---|---|---|
| total_gremlins | int | Total gremlins to test |
| completed | int | Completed count |
| zapped_count | int | Zapped gremlins |
| survived_count | int | Survived gremlins |
| timeout_count | int | Timed out gremlins |
| error_count | int | Error gremlins |
| progress_percentage | float | Progress (0.0 to 100.0) |
Usage Example
from pytest_gremlins.parallel.aggregator import ResultAggregator
from pytest_gremlins.parallel.pool import WorkerResult
from pytest_gremlins.reporting.results import GremlinResultStatus
aggregator = ResultAggregator(total_gremlins=100)
# Add results as they arrive
aggregator.add_result(WorkerResult(
    gremlin_id='g001',
    status=GremlinResultStatus.ZAPPED,
    killing_test='test_boundary',
))
# Progress reporting
completed, total = aggregator.get_progress()
print(f'Progress: {completed}/{total} ({aggregator.progress_percentage:.1f}%)')
# Final results
results = aggregator.get_results()
print(f'Zapped: {aggregator.zapped_count}')
print(f'Survived: {aggregator.survived_count}')
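To show what "thread-safe" means in practice for a collector like this, here is a small stand-in that guards its state with a lock. The class `MiniAggregator`, its string statuses, and its `counts` attribute are hypothetical simplifications, not the ResultAggregator source; the sketch only assumes that results arrive from several threads at once.

```python
import threading


class MiniAggregator:
    """Hypothetical lock-protected result collector (not the library class)."""

    def __init__(self, total: int) -> None:
        self.total = total
        self.counts: dict[str, int] = {}
        self._results: dict[str, str] = {}
        self._lock = threading.Lock()

    def add_result(self, gremlin_id: str, status: str) -> None:
        # The lock makes the two updates appear atomic to other threads.
        with self._lock:
            self._results[gremlin_id] = status
            self.counts[status] = self.counts.get(status, 0) + 1

    def get_progress(self) -> tuple[int, int]:
        with self._lock:
            return len(self._results), self.total

    def get_results(self) -> list[tuple[str, str]]:
        with self._lock:
            return sorted(self._results.items())  # sorted by gremlin ID


agg = MiniAggregator(total=100)


def report(start: int) -> None:
    """Simulate one worker reporting 25 results."""
    for i in range(start, start + 25):
        agg.add_result(f"g{i:03d}", "zapped" if i % 2 == 0 else "survived")


threads = [threading.Thread(target=report, args=(s,)) for s in range(0, 100, 25)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

completed, total = agg.get_progress()
```

Holding the lock for both the result store and the status counter keeps the two views consistent even while other threads are still reporting.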
Distribution Strategies
Strategies for distributing gremlins across workers.
DistributionStrategy Protocol
DistributionStrategy
Bases: Protocol
Protocol for gremlin distribution strategies.
Implementations partition gremlins into buckets for parallel workers.
distribute
Distribute gremlins across workers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| gremlins | list[Gremlin] | List of gremlins to distribute. | required |
| num_workers | int | Number of worker processes. | required |
| test_counts | dict[str, int] \| None | Optional mapping of gremlin_id to number of covering tests. | None |
Returns:
| Type | Description |
|---|---|
| list[list[Gremlin]] | List of num_workers buckets, each containing gremlins for that worker. |
Source code in src/pytest_gremlins/parallel/distribution.py
RoundRobinDistribution
Simple round-robin assignment.
RoundRobinDistribution
Simple round-robin distribution strategy.
Assigns gremlin N to worker N % num_workers. Fast and deterministic, but doesn't account for varying execution times.
Example:
strategy = RoundRobinDistribution()
gremlins = [g0, g1, g2, g3, g4]
result = strategy.distribute(gremlins, num_workers=3)
# result[0] = [g0, g3], result[1] = [g1, g4], result[2] = [g2]
distribute
Distribute gremlins round-robin across workers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| gremlins | list[Gremlin] | List of gremlins to distribute. | required |
| num_workers | int | Number of worker processes. | required |
| test_counts | dict[str, int] \| None | Ignored for round-robin distribution. | None |
Returns:
| Type | Description |
|---|---|
| list[list[Gremlin]] | List of num_workers buckets with gremlins distributed round-robin. |
Source code in src/pytest_gremlins/parallel/distribution.py
from pytest_gremlins.parallel.distribution import RoundRobinDistribution
strategy = RoundRobinDistribution()
gremlins = [g0, g1, g2, g3, g4] # 5 gremlins
buckets = strategy.distribute(gremlins, num_workers=3)
# buckets[0] = [g0, g3] # Worker 0
# buckets[1] = [g1, g4] # Worker 1
# buckets[2] = [g2] # Worker 2
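The N % num_workers rule is short enough to write out in full. The function below is an illustrative re-implementation over plain strings, not the RoundRobinDistribution source; the name `round_robin` is hypothetical.

```python
def round_robin(items: list[str], num_workers: int) -> list[list[str]]:
    """Assign item N to bucket N % num_workers (illustrative sketch)."""
    buckets: list[list[str]] = [[] for _ in range(num_workers)]
    for index, item in enumerate(items):
        buckets[index % num_workers].append(item)
    return buckets


buckets = round_robin(["g0", "g1", "g2", "g3", "g4"], num_workers=3)
# buckets == [["g0", "g3"], ["g1", "g4"], ["g2"]]
```

Because the assignment depends only on position, the result is deterministic, but a run of slow gremlins at indices that share a remainder would all land on the same worker.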
WeightedDistribution
Balances by test count (expensive gremlins distributed evenly).
WeightedDistribution
Weighted distribution strategy that balances by test count.
Assigns expensive gremlins (many covering tests) to different workers to avoid hotspots where one worker gets all the slow gremlins.
Uses a greedy algorithm: sort gremlins by weight descending, then assign each gremlin to the worker with the smallest current total weight.
Example:
strategy = WeightedDistribution()
gremlins = [g0, g1, g2, g3]  # g0, g1 are heavy (100 tests each)
test_counts = {'g0': 100, 'g1': 100, 'g2': 10, 'g3': 10}
result = strategy.distribute(gremlins, num_workers=2, test_counts=test_counts)
# Heavy gremlins distributed to different workers for balance
distribute
Distribute gremlins weighted by test count.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| gremlins | list[Gremlin] | List of gremlins to distribute. | required |
| num_workers | int | Number of worker processes. | required |
| test_counts | dict[str, int] \| None | Mapping of gremlin_id to number of covering tests. Gremlins not in this map get a weight of 1. | None |
Returns:
| Type | Description |
|---|---|
| list[list[Gremlin]] | List of num_workers buckets with gremlins balanced by weight. |
Source code in src/pytest_gremlins/parallel/distribution.py
from pytest_gremlins.parallel.distribution import WeightedDistribution
strategy = WeightedDistribution()
# Heavy gremlins (100 tests each) get distributed to different workers
test_counts = {
    'g0': 100,  # Heavy
    'g1': 100,  # Heavy
    'g2': 10,   # Light
    'g3': 10,   # Light
}
gremlins = [g0, g1, g2, g3]
buckets = strategy.distribute(gremlins, num_workers=2, test_counts=test_counts)
# With the greedy rule (heaviest first, always to the lightest worker):
# one bucket holds g0 plus one light gremlin  # 100 + 10 = 110
# the other holds g1 plus the other light one # 100 + 10 = 110
# Balanced workload!
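The greedy rule (heaviest first, always onto the currently lightest worker) can be sketched with a min-heap of worker loads. This is an illustrative version over plain IDs, not the WeightedDistribution source; the function name and the tie-breaking by lowest worker index are assumptions.

```python
import heapq
from typing import Optional


def weighted_distribute(
    gremlin_ids: list[str],
    num_workers: int,
    test_counts: Optional[dict[str, int]] = None,
) -> list[list[str]]:
    """Greedy balancing by test count (illustrative sketch)."""
    weights = test_counts or {}
    # Heaviest first; missing IDs default to weight 1. sorted() is stable,
    # so equally heavy gremlins keep their input order.
    ordered = sorted(gremlin_ids, key=lambda gid: weights.get(gid, 1), reverse=True)
    buckets: list[list[str]] = [[] for _ in range(num_workers)]
    # Heap entries are (current load, worker index); ties go to the lower index.
    loads = [(0, worker) for worker in range(num_workers)]
    heapq.heapify(loads)
    for gid in ordered:
        load, worker = heapq.heappop(loads)
        buckets[worker].append(gid)
        heapq.heappush(loads, (load + weights.get(gid, 1), worker))
    return buckets


counts = {"g0": 100, "g1": 100, "g2": 10, "g3": 10}
buckets = weighted_distribute(["g0", "g1", "g2", "g3"], 2, counts)
totals = [sum(counts[g] for g in bucket) for bucket in buckets]
```

For this input each worker ends up with one heavy and one light gremlin, so both totals come to 110.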
CLI Integration
Enable parallel execution via command line:
# Enable parallel execution (auto-detect workers)
pytest --gremlins --gremlin-parallel
# Specify worker count
pytest --gremlins --gremlin-parallel --gremlin-workers=8
# Enable batch mode
pytest --gremlins --gremlin-batch
# Customize batch size
pytest --gremlins --gremlin-batch --gremlin-batch-size=20
# Combine for maximum speed
pytest --gremlins --gremlin-parallel --gremlin-batch --gremlin-workers=8 --gremlin-batch-size=20
Performance Comparison
| Mode | Subprocess Calls | Overhead | Best For |
|---|---|---|---|
| Sequential | 1 per gremlin | High | Small projects |
| Parallel | 1 per gremlin | High (but concurrent) | Multi-core, many gremlins |
| Batch | 1 per batch | Low | Large test suites |
| Parallel + Batch | Batches in parallel | Lowest | Maximum speed |
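Example Timings
Project: 1000 gremlins, 8 CPU cores. These figures are illustrative: they treat 600ms as the cost of one subprocess call and assume a batch costs about the same as a single call.
Sequential: 1000 x 600ms = 600 seconds
Parallel (8 workers): 1000 x 600ms / 8 = 75 seconds
Batch (size=10): 100 x 600ms = 60 seconds
Parallel + Batch: 100 x 600ms / 8 = 7.5 seconds
Speedup: 80x (600 seconds / 7.5 seconds)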
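The arithmetic behind these figures can be reproduced directly. The sketch below only restates the example's own numbers (1000 gremlins, 8 cores, batch size 10, 600 ms per subprocess call) and carries the same simplifying assumption that a batch costs as much as a single call; the variable names are illustrative.

```python
import math

gremlins = 1000
cores = 8
batch_size = 10
cost_per_call_ms = 600  # cost of one subprocess call, taken from the example

calls_unbatched = gremlins                         # one call per gremlin
calls_batched = math.ceil(gremlins / batch_size)   # one call per batch

sequential_ms = calls_unbatched * cost_per_call_ms
parallel_ms = sequential_ms / cores                # ideal scaling across cores
batch_ms = calls_batched * cost_per_call_ms
parallel_batch_ms = batch_ms / cores
speedup = sequential_ms / parallel_batch_ms

for label, value_ms in [
    ("Sequential", sequential_ms),
    ("Parallel", parallel_ms),
    ("Batch", batch_ms),
    ("Parallel + Batch", parallel_batch_ms),
]:
    print(f"{label}: {value_ms / 1000:g} seconds")
print(f"Speedup: {speedup:g}x")
```

Real runs will deviate from this ideal, since test time inside a batch and scheduling overhead are ignored here.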
Best Practices
- Use forkserver on Unix - Fastest subprocess creation
- Enable warmup - Reduces latency on the first batch
- Tune batch size - Balance lower subprocess overhead against early termination
- Match workers to cores - Workers beyond the number of physical cores bring no further benefit
- Monitor timeouts - Increase the timeout if legitimate tests are timing out