r"""
Monte Carlo simulation base class and orchestration logic.
This module provides:
Classes
:class:`MonteCarloSimulation` — Abstract base class for defining simulations
The simulation class handles:
- Reproducible seeding via :class:`numpy.random.SeedSequence`
- Sequential and parallel execution (delegated to backends)
- Statistics computation via the stats engine
- Result assembly and percentile handling
Example
-------
>>> from mcframework.simulation import MonteCarloSimulation
>>> class DiceSim(MonteCarloSimulation):
... def single_simulation(self, _rng=None):
... rng = self._rng(_rng, self.rng)
... return float(rng.integers(1, 7, size=2).sum())
>>> sim = DiceSim(name="2d6")
>>> sim.set_seed(42)
>>> result = sim.run(10_000) # doctest: +SKIP
See Also
--------
mcframework.backends
Execution backends for sequential and parallel execution.
mcframework.stats_engine
Statistical metrics and confidence intervals.
"""
from __future__ import annotations
import logging
import multiprocessing as mp
import time
import warnings
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Callable, Iterable, Mapping
import numpy as np
from .backends import ProcessBackend, SequentialBackend, ThreadBackend
from .stats_engine import (
DEFAULT_ENGINE,
CIMethod,
StatsContext,
StatsEngine,
_ensure_ctx,
ci_mean,
mean,
std,
)
from .stats_engine import percentiles as pct
if TYPE_CHECKING:
import cupy
import torch
from .core import SimulationResult
# Module-level logger; handlers/levels are configured by the application.
logger = logging.getLogger(__name__)
# Public API of this module.
__all__ = ["MonteCarloSimulation"]
[docs]
class MonteCarloSimulation(ABC):
    r"""
    Abstract base class for Monte Carlo simulations.

    Subclass this and implement :meth:`single_simulation`. The framework takes care of
    reproducible seeding, (optional) parallel execution, statistics, and percentiles.

    Examples
    --------
    >>> from mcframework.simulation import MonteCarloSimulation
    >>> class PiSim(MonteCarloSimulation):
    ...     def single_simulation(self, _rng=None, n_points: int = 10_000):
    ...         rng = self._rng(_rng, self.rng)
    ...         x, y = rng.random(n_points), rng.random(n_points)
    ...         return 4.0 * ((x*x + y*y) <= 1.0).mean()
    ...
    >>> sim = PiSim()
    >>> sim.set_seed(42)
    >>> res = sim.run(10_000, parallel=True, compute_stats=True)  # doctest: +SKIP

    Notes
    -----
    **Parallel backend.**
    The ``parallel_backend`` attribute can be ``"auto"``, ``"thread"``, or ``"process"``.
    With NumPy RNGs (which release the GIL), threads are usually faster and avoid
    process-spawn overhead.

    **Percentiles.**
    If ``compute_stats=True``, the stats engine computes defaults
    ``_PCTS`` = ``(5, 25, 50, 75, 95)`` and merges them with user-requested
    percentiles. The original user request is preserved in
    ``result.metadata["requested_percentiles"]`` and enforced by
    :meth:`MonteCarloFramework.compare_results` for percentile metrics.
    """
    # Default percentiles the stats engine computes when the caller does not
    # request any explicitly (see _handle_percentiles / _compute_stats_with_engine).
    _PCTS = (5, 25, 50, 75, 95)
    # Minimum simulations to use parallel execution (soft limit; "auto" backend
    # falls back to sequential below this count).
    _PARALLEL_THRESHOLD = 20_000
    # Number of chunks per worker for load balancing (ensures dynamic work distribution)
    _CHUNKS_PER_WORKER = 8
    #: Whether this simulation supports batch GPU execution.
    #: Override in subclass by setting ``supports_batch = True``.
    _supports_batch: bool = False
@property
def supports_batch(self) -> bool:
"""
Whether this simulation supports batch GPU execution.
Subclasses that implement :meth:`~mcframework.core.MonteCarloSimulation.torch_batch`
or :meth:`~mcframework.core.MonteCarloSimulation.cupy_batch` should set this to
``True`` either as a class attribute or by setting ``self.supports_batch = True``.
Returns
-------
bool
``True`` if the simulation supports vectorized Torch execution.
Examples
--------
>>> class MySim(MonteCarloSimulation):
... supports_batch = True # Class-level override
... def torch_batch(self, n, *, device, generator):
... ...
"""
# Check instance attribute first, then class attribute
return getattr(self, "_supports_batch", False)
@supports_batch.setter
def supports_batch(self, value: bool) -> None:
self._supports_batch = value
@staticmethod
def _rng(
rng: np.random.Generator | None,
default: np.random.Generator | None = None,
) -> np.random.Generator:
r"""
Choose the RNG to use inside :meth:`single_simulation`.
Parameters
----------
rng : :class:`numpy.random.Generator` or None
RNG passed down by the framework (per-worker/per-chunk stream).
default : :class:`numpy.random.Generator` or None
Fallback RNG, typically ``self.rng``.
Returns
-------
:class:`numpy.random.Generator`
The generator to use.
Notes
-----
This helper makes subclass code concise:
>>> def single_simulation(self, _rng=None):
... rng = self._rng(_rng, self.rng)
... return float(rng.normal())
"""
return rng if rng is not None else default # type: ignore[return-value]
[docs]
def __init__(self, name: str = "Simulation"):
self.name = name
self.seed_seq: np.random.SeedSequence | None = None
self.rng = np.random.default_rng()
self.backend: str = "auto"
@property
def parallel_backend(self) -> str:
"""Legacy alias for :attr:`backend` (deprecated)."""
return self.backend
@parallel_backend.setter
def parallel_backend(self, value: str) -> None:
"""Legacy alias for :attr:`backend` (deprecated)."""
self.backend = value
def __getstate__(self):
"""Avoid pickling the RNG (not pickleable)."""
state = self.__dict__.copy()
state["rng"] = None
return state
def __setstate__(self, state):
"""Recreate the RNG after unpickling."""
self.__dict__.update(state)
if self.seed_seq is not None:
self.rng = np.random.default_rng(self.seed_seq)
else:
self.rng = np.random.default_rng()
[docs]
    @abstractmethod
    def single_simulation(self, *args, **kwargs) -> float:
        r"""
        Perform a single simulation draw.

        Subclasses must implement this method. The framework passes a
        per-worker/per-chunk RNG via the ``_rng`` keyword; resolve it with
        :meth:`_rng` (see the class-level example).

        Returns
        -------
        float
            The result of one simulation run.
        """
        raise NotImplementedError  # pragma: no cover
[docs]
    def torch_batch(
        self,
        n: int,
        *,
        device: "torch.device",
        generator: "torch.Generator",
    ) -> "torch.Tensor":
        """
        Optional vectorized Torch implementation.

        Override this method in subclasses to enable GPU-accelerated batch execution.
        When implemented alongside ``supports_batch = True``, the framework will use
        this method instead of repeated ``single_simulation`` calls.

        Parameters
        ----------
        n : int
            Number of simulation draws.
        device : torch.device
            Device to use for the simulation (``"cpu"``, ``"mps"``, or ``"cuda"``).
        generator : torch.Generator
            Explicit Torch generator for reproducible random sampling. This generator
            is seeded from :class:`numpy.random.SeedSequence` to maintain the same
            spawning semantics as the NumPy backend.

        Returns
        -------
        torch.Tensor
            A 1D tensor of length ``n`` containing simulation results. Use float32
            for MPS compatibility; the framework promotes to float64 after moving
            to CPU.

        Raises
        ------
        NotImplementedError
            If the subclass does not implement this method.

        Notes
        -----
        **RNG discipline.** All random sampling must use the provided ``generator``
        explicitly. Never use global Torch RNG (``torch.manual_seed``).

        **Dtype policy (device-specific):**

        - **MPS (Apple Silicon):** Must return float32 (Metal doesn't support float64).
          Framework promotes to float64 on CPU.
        - **CUDA (NVIDIA):** Can return float32 or float64. Float64 preferred for zero
          conversion overhead and full precision.
        - **CPU:** Can return float32 or float64. Float64 preferred for consistency
          with framework precision.

        This method is optional and must be implemented by subclasses that support
        the Torch backend. If not implemented, the framework will fall back to the
        NumPy backend.

        Examples
        --------
        >>> class PiSim(MonteCarloSimulation):
        ...     supports_batch = True
        ...     def torch_batch(self, n, *, device, generator):
        ...         import torch
        ...         x = torch.rand(n, device=device, generator=generator)
        ...         y = torch.rand(n, device=device, generator=generator)
        ...         inside = (x * x + y * y) <= 1.0
        ...         return 4.0 * inside.float()  # float32 for MPS compatibility
        """
        raise NotImplementedError
[docs]
def cupy_batch(
self, n: int, *, device: "torch.device", rng: cupy.random.RandomState) -> "cupy.ndarray":
"""
Optional vectorized cuRAND implementation using CuPy.
Override this method in subclasses to enable GPU-accelerated batch execution.
When implemented alongside ``supports_batch = True``, the framework will use
this method instead of repeated ``single_simulation`` calls.
Parameters
----------
n : int
Number of simulation draws.
device : torch.device
Device to use for the simulation (``"cuda"``).
rng : cupy.random.RandomState
cuRAND generator for reproducible random sampling.
Returns
-------
cupy.ndarray
A 1D array of length ``n`` containing simulation results.
"""
raise NotImplementedError
[docs]
def set_seed(self, seed: int | None) -> None:
r"""
Set the random seed for reproducible experiments.
Parameters
----------
seed : int or None
Seed for :class:`numpy.random.SeedSequence`. :data:`None` chooses entropy
from the OS.
Notes
-----
The framework spawns independent child sequences per worker/chunk via
:meth:`numpy.random.SeedSequence.spawn`, ensuring deterministic parallel
streams given the same ``seed`` and block layout.
"""
self.seed_seq = np.random.SeedSequence(seed)
self.rng = np.random.default_rng(self.seed_seq)
# Valid backend values for execution (torch added for GPU-ready API)
_VALID_BACKENDS = ("auto", "sequential", "thread", "process", "torch")
def _validate_run_params(
self,
n_simulations: int,
n_workers: int | None,
confidence: float,
ci_method: str,
backend: str = "auto",
) -> None:
"""Validate parameters for run() method."""
if n_simulations <= 0:
raise ValueError("n_simulations must be positive")
if n_workers is not None and n_workers <= 0:
raise ValueError("n_workers must be positive")
if not 0.0 < confidence < 1.0:
raise ValueError("confidence must be in the interval (0, 1)")
if ci_method not in ("auto", "z", "t", "bootstrap"):
raise ValueError(f"ci_method must be one of 'auto', 'z', 't', 'bootstrap', got '{ci_method}'")
if backend not in self._VALID_BACKENDS:
raise ValueError(f"backend must be one of {self._VALID_BACKENDS}, got '{backend}'")
def _compute_stats_with_engine(
self,
results: np.ndarray,
n_simulations: int,
confidence: float,
ci_method: str,
stats_engine: StatsEngine | None,
extra_context: Mapping[str, Any] | None,
) -> tuple[dict[str, Any], dict[int, float]]:
"""
Compute statistics using the stats engine.
Returns
-------
tuple[dict[str, Any], dict[int, float]]
(stats dict, percentiles dict)
"""
eng = stats_engine or DEFAULT_ENGINE
if eng is None:
return {}, {}
engine_defaults = self._PCTS
# Convert string ci_method to enum
ci_method_enum = CIMethod(ci_method)
# Create StatsContext object
try:
ctx = StatsContext(
n=n_simulations,
percentiles=engine_defaults,
confidence=confidence,
ci_method=ci_method_enum,
**(dict(extra_context) if extra_context else {}),
)
except (TypeError, ValueError) as e:
logger.warning("Invalid context parameters: %s. Using defaults.", e)
ctx = StatsContext(
n=n_simulations,
percentiles=engine_defaults,
confidence=confidence,
ci_method=ci_method_enum,
)
# Compute stats
try:
result = eng.compute(results, ctx)
# Extract metrics from ComputeResult
stats = result.metrics if hasattr(result, "metrics") else {}
except Exception as e: # pylint: disable=broad-exception-caught
logger.error("Stats engine failed: %s", e)
stats = {}
# Merge engine stats with baseline (engine wins on collisions)
baseline = self._compute_stats_block(results, ctx)
merged_stats = dict(baseline)
merged_stats.update(stats if isinstance(stats, dict) else {})
stats = merged_stats
# Pull percentiles returned by the engine (if any)
engine_perc: dict[int, float] = {}
if isinstance(stats, dict) and "percentiles" in stats:
engine_perc = stats.pop("percentiles") or {}
percentile_map = {int(k): float(v) for k, v in engine_perc.items()}
return stats, percentile_map
def _handle_percentiles(
self,
results: np.ndarray,
percentiles: Iterable[int] | None,
compute_stats: bool,
percentile_map: dict[int, float],
) -> tuple[dict[int, float], list[int], bool]:
"""
Handle percentile computation and tracking.
Returns
-------
tuple[dict[int, float], list[int], bool]
(final percentile_map, requested_percentiles list, engine_defaults_used flag)
"""
user_percentiles_provided = percentiles is not None
user_pcts: tuple[int, ...] = tuple(int(p) for p in (percentiles or ()))
if not compute_stats:
# No stats engine: only compute user-requested percentiles
if not user_percentiles_provided:
final_map = {}
else:
final_map = self._percentiles(results, user_pcts) if user_pcts else {}
requested_percentiles = list(user_pcts) if user_percentiles_provided else []
return final_map, requested_percentiles, False
# If the user requested extra percentiles beyond engine defaults, compute & merge them
if user_pcts:
percentile_map.update(self._percentiles(results, user_pcts))
requested_percentiles = list(user_pcts)
return percentile_map, requested_percentiles, True
[docs]
    def run(  # pylint: disable=too-many-arguments
        self,
        n_simulations: int,
        *,
        backend: str = "auto",
        torch_device: str = "cpu",
        cuda_device_id: int = 0,
        cuda_use_curand: bool = False,
        cuda_batch_size: int | None = None,
        cuda_use_streams: bool = True,
        parallel: bool | None = None,  # Deprecated, use backend instead
        n_workers: int | None = None,
        progress_callback: Callable[[int, int], None] | None = None,
        percentiles: Iterable[int] | None = None,
        compute_stats: bool = True,
        stats_engine: StatsEngine | None = None,
        confidence: float = 0.95,
        ci_method: str = "auto",
        extra_context: Mapping[str, Any] | None = None,
        **simulation_kwargs: Any,
    ) -> "SimulationResult":
        r"""
        Run the Monte Carlo simulation.

        Parameters
        ----------
        n_simulations : int
            Number of simulation draws.
        backend : {"auto", "sequential", "thread", "process", "torch"}, default ``"auto"``
            Execution backend to use:

            - ``"auto"`` — Sequential for small jobs, parallel (thread/process) for large jobs
            - ``"sequential"`` — Single-threaded execution
            - ``"thread"`` — Thread-based parallelism (best when NumPy releases GIL)
            - ``"process"`` — Process-based parallelism (required on Windows for true parallelism)
            - ``"torch"`` — Torch batch execution (requires ``supports_batch = True``)
        torch_device : {"cpu", "mps", "cuda"}, default ``"cpu"``
            Torch device for ``backend="torch"``. Ignored for other backends.

            - ``"cpu"`` — Safe default, works everywhere
            - ``"mps"`` — Apple Metal Performance Shaders (M1/M2/M3 Macs)
            - ``"cuda"`` — NVIDIA GPU acceleration
        cuda_device_id : int, default 0
            CUDA device index for multi-GPU systems. Only used when
            ``backend="torch"`` and ``torch_device="cuda"``.
        cuda_use_curand : bool, default False
            Use cuRAND (via CuPy) instead of torch.Generator for maximum
            GPU performance. Requires CuPy and ``curand_batch()`` implementation.
        cuda_batch_size : int or None, default None
            Fixed batch size for CUDA execution. If None, automatically
            estimates optimal batch size based on available GPU memory.
        cuda_use_streams : bool, default True
            Use CUDA streams for overlapped execution. Recommended for performance.
        parallel : bool, optional
            **Deprecated.** Use ``backend`` instead. If provided, ``parallel=True`` maps to
            ``backend="auto"`` with parallel preference, ``parallel=False`` maps to
            ``backend="sequential"``.
        n_workers : int, optional
            Worker count for parallel backends. Defaults to CPU count.
        progress_callback : callable, optional
            A function ``f(completed: int, total: int)`` called periodically.
        percentiles : iterable of int, optional
            Percentiles to compute from raw results. If ``None`` and
            ``compute_stats=True``, the stats engine's defaults (``_PCTS``)
            are used; if ``compute_stats=False``, **no** percentiles are computed
            unless explicitly provided.
        compute_stats : bool, default ``True``
            Compute additional metrics via a :class:`~mcframework.stats_engine.StatsEngine`.
        stats_engine : StatsEngine, optional
            Custom engine (defaults to ``mcframework.stats_engine.DEFAULT_ENGINE``).
        confidence : float, default ``0.95``
            Confidence level for CI-related metrics.
        ci_method : {"auto", "z", "t", "bootstrap"}, default ``"auto"``
            Which critical values the stats engine should use (matches the
            choices accepted by parameter validation).
        extra_context : mapping, optional
            Extra context forwarded to the stats engine.
        **simulation_kwargs : Any
            Keyword arguments forwarded to :meth:`single_simulation`.

        Returns
        -------
        SimulationResult
            See :class:`~mcframework.core.SimulationResult`.

        Notes
        -----
        **MPS determinism caveat.** When using ``torch_device="mps"``, the framework
        preserves RNG stream structure but does not guarantee bitwise reproducibility
        due to Metal backend scheduling and float32 arithmetic. Statistical properties
        (mean, variance, CI coverage) remain correct.

        See Also
        --------
        :meth:`~mcframework.core.MonteCarloFramework.run_simulation` : Run a registered simulation by name.
        """
        # Handle deprecated parallel parameter
        if parallel is not None:
            # Check if user also explicitly provided backend (not using default)
            if backend != "auto":
                # User provided both - warn and ignore the deprecated parameter
                warnings.warn(
                    f"Both 'parallel' and 'backend' parameters provided. "
                    f"The deprecated 'parallel={parallel}' is ignored; using backend='{backend}'.",
                    DeprecationWarning,
                    stacklevel=2,
                )
            else:
                # Only parallel provided - apply deprecated behavior with warning
                warnings.warn(
                    "The 'parallel' parameter is deprecated. Use 'backend' instead: "
                    "backend='sequential' for parallel=False, "
                    "backend='thread' or 'process' for parallel=True.",
                    DeprecationWarning,
                    stacklevel=2,
                )
                if parallel:
                    # parallel=True -> let auto-resolution handle small-job fallback
                    backend = "auto"
                else:
                    backend = "sequential"
        # Validate parameters
        self._validate_run_params(n_simulations, n_workers, confidence, ci_method, backend)
        # Execute simulation using appropriate backend (timed for the result)
        t0 = time.perf_counter()
        results = self._execute_with_backend(
            backend, n_simulations, n_workers, progress_callback,
            torch_device=torch_device,
            cuda_device_id=cuda_device_id,
            cuda_use_curand=cuda_use_curand,
            cuda_batch_size=cuda_batch_size,
            cuda_use_streams=cuda_use_streams,
            **simulation_kwargs,
        )
        exec_time = time.perf_counter() - t0
        # Compute stats and percentiles
        stats: dict[str, Any] = {}
        percentile_map: dict[int, float] = {}
        if compute_stats:
            stats, percentile_map = self._compute_stats_with_engine(
                results, n_simulations, confidence, ci_method, stats_engine, extra_context
            )
        percentile_map, requested_percentiles, engine_defaults_used = self._handle_percentiles(
            results, percentiles, compute_stats, percentile_map
        )
        return self._create_result(
            results,
            n_simulations,
            exec_time,
            percentile_map,
            stats,
            requested_percentiles,
            engine_defaults_used,
        )
def _resolve_backend_type(self, requested: str | None = None) -> str:
"""
Resolve the effective parallel backend type.
This method only resolves *parallel* backends (``"thread"`` or ``"process"``).
It does not handle ``"sequential"`` — that is handled by :meth:`_execute_with_backend`.
Parameters
----------
requested : str or None
Explicitly requested backend type. If None, uses ``self.backend``.
Returns
-------
str
Resolved parallel backend type: ``"thread"`` or ``"process"``.
Notes
-----
``"auto"`` maps to:
* ``"thread"`` on POSIX-like platforms where NumPy releases the GIL.
* ``"process"`` on Windows where threads tend to serialize under the GIL.
Invalid values fall back to ``"auto"`` and are then resolved.
"""
# Import from core for backward compatibility with tests that monkeypatch
# mcframework.core._is_windows_platform
from . import core as _core # pylint: disable=import-outside-toplevel
backend = requested or self.backend
if backend not in self._VALID_BACKENDS:
logger.warning(
"backend must be one of %s, got '%s'. Defaulting to 'auto'.",
self._VALID_BACKENDS,
backend,
)
backend = "auto"
if backend == "auto":
on_windows = _core._is_windows_platform() # pylint: disable=protected-access
resolved = "process" if on_windows else "thread"
if on_windows:
logger.info("Parallel backend 'auto' resolved to 'process' on Windows platform.")
return resolved
return backend
def _create_backend(
self, backend: str, n_workers: int | None
) -> SequentialBackend | ThreadBackend | ProcessBackend:
r"""
Create and instantiate the appropriate execution backend.
Parameters
----------
backend : str
Backend type: ``"sequential"``, ``"thread"``, or ``"process"``.
n_workers : int or None
Number of workers for parallel backends.
Returns
-------
SequentialBackend, ThreadBackend, or ProcessBackend
Configured backend instance.
"""
if backend == "sequential":
return SequentialBackend()
if backend == "torch":
# Torch backend is handled separately via _run_torch_batch
raise RuntimeError(
"Torch backend should be dispatched via _run_torch_batch, not _create_backend."
)
# Parallel backends need n_workers
if n_workers is None:
n_workers = mp.cpu_count() # pragma: no cover
if backend == "thread":
return ThreadBackend(n_workers=n_workers)
return ProcessBackend(n_workers=n_workers)
    def _execute_with_backend(
        self,
        backend: str,
        n_simulations: int,
        n_workers: int | None,
        progress_callback: Callable[[int, int], None] | None,
        *,
        torch_device: str = "cpu",
        cuda_device_id: int = 0,
        cuda_use_curand: bool = False,
        cuda_batch_size: int | None = None,
        cuda_use_streams: bool = True,
        **simulation_kwargs: Any,
    ) -> np.ndarray:
        r"""
        Execute simulation draws using the specified backend.

        Parameters
        ----------
        backend : str
            Backend type: ``"auto"``, ``"sequential"``, ``"thread"``, ``"process"``, or ``"torch"``.
        n_simulations : int
            Number of simulation draws.
        n_workers : int or None
            Number of workers for parallel backends.
        progress_callback : callable or None
            Progress reporting callback.
        torch_device : str, default ``"cpu"``
            Torch device type (``"cpu"``, ``"mps"``, ``"cuda"``). Only used for ``backend="torch"``.
        cuda_device_id : int, default 0
            CUDA device index. Only used when ``torch_device="cuda"``.
        cuda_use_curand : bool, default False
            Use cuRAND via CuPy. Only used when ``torch_device="cuda"``.
        cuda_batch_size : int or None, default None
            Fixed batch size for CUDA (None = adaptive). Only used when ``torch_device="cuda"``.
        cuda_use_streams : bool, default True
            Enable CUDA streams. Only used when ``torch_device="cuda"``.
        **simulation_kwargs : Any
            Arguments forwarded to ``single_simulation``.

        Returns
        -------
        np.ndarray
            Array of simulation results.

        Notes
        -----
        For ``"auto"`` backend:

        - Small jobs (< ``_PARALLEL_THRESHOLD``) use sequential execution
        - Large jobs resolve to thread/process based on platform

        For ``"torch"`` backend:

        - Requires ``supports_batch = True`` and :meth:`torch_batch` implementation
        - Does not forward ``simulation_kwargs`` to the batch method
        """
        # Early dispatch to Torch if explicitly requested
        if backend == "torch":
            # Imported lazily so importing this module never requires torch.
            from .backends import TorchBackend  # pylint: disable=import-outside-toplevel
            # Build device kwargs for CUDA; CPU/MPS take no extra options.
            device_kwargs: dict[str, Any] = {}
            if torch_device == "cuda":
                device_kwargs = {
                    "device_id": cuda_device_id,
                    "use_curand": cuda_use_curand,
                    "batch_size": cuda_batch_size,
                    "use_streams": cuda_use_streams,
                }
            torch_backend = TorchBackend(device=torch_device, **device_kwargs)
            return torch_backend.run(self, n_simulations, self.seed_seq, progress_callback)
        # Resolve "auto" backend
        if backend == "auto":
            if n_workers is None:
                n_workers = mp.cpu_count()  # pragma: no cover
            # Small job fallback to sequential (parallel overhead would dominate)
            if n_workers <= 1 or n_simulations < self._PARALLEL_THRESHOLD:
                backend = "sequential"
            else:
                backend = self._resolve_backend_type()
        # Log execution info
        if backend == "sequential":
            logger.info("Computing %d simulations sequentially...", n_simulations)
        else:
            # Explicit "thread"/"process" requests may arrive with n_workers=None.
            if n_workers is None:
                n_workers = mp.cpu_count()  # pragma: no cover
            logger.info(
                "Computing %d simulations in parallel using %s backend with %d workers...",
                n_simulations, backend, n_workers
            )
        # Create and run with backend
        backend_instance = self._create_backend(backend, n_workers)
        return backend_instance.run(
            self, n_simulations, self.seed_seq, progress_callback, **simulation_kwargs
        )
# Backward compatibility aliases
def _resolve_parallel_backend(self, requested: str | None = None) -> str:
"""Deprecated: Use _resolve_backend_type instead."""
return self._resolve_backend_type(requested)
def _create_parallel_backend(self, n_workers: int) -> ThreadBackend | ProcessBackend:
"""Deprecated: Use _create_backend instead."""
backend_type = self._resolve_backend_type()
if backend_type == "thread":
return ThreadBackend(n_workers=n_workers)
return ProcessBackend(n_workers=n_workers)
@staticmethod
def _percentiles(arr: np.ndarray, ps: Iterable[int]) -> dict[int, float]:
"""Return a ``{percentile: value}`` map computed via :func:`numpy.percentile`."""
return {int(p): float(np.percentile(arr, int(p))) for p in ps}
@staticmethod
def _compute_stats_block(results: np.ndarray, ctx) -> dict[str, object]:
"""
Build the stats dict expected by tests:
- 'mean': float
- 'std' : float
- 'ci_mean' : (low, high)
"""
ctx = _ensure_ctx(ctx, results)
results = np.asarray(results, dtype=float).ravel()
if results.size == 0:
return {"mean": float("nan"), "std": float("nan"), "ci_mean": (float("nan"), float("nan"))}
m = mean(results, ctx)
s = std(results, ctx)
ci = ci_mean(results, ctx)
return {
"mean": float(m) if m is not None else float("nan"),
"std": float(s) if s is not None else float("nan"),
"ci_mean": (float(ci["low"]), float(ci["high"])),
"confidence": float(ci["confidence"]),
"method": ci["method"],
"se": float(ci["se"]),
"crit": float(ci["crit"]),
}
@staticmethod
def _compute_percentiles_block(results: np.ndarray, ctx) -> dict[float, float]:
"""
Build the percentiles dict from whatever is requested in ctx.
Accepts either ctx.percentiles or ctx.requested_percentiles.
Returns {q: value} with q as float (e.g., 5.0, 50.0, 95.0).
"""
ctx = _ensure_ctx(ctx, results)
results = np.asarray(results, dtype=float).ravel()
req = getattr(ctx, "percentiles", None) or getattr(ctx, "requested_percentiles", None) or []
req = list(req)
if not req:
return {}
vals = pct(results, ctx) # aligned to req
if isinstance(vals, Mapping):
return {float(q): float(vals[q]) for q in req}
vals_arr = np.asarray(vals, dtype=float).ravel()
if vals_arr.size != len(req):
msg = "pct() must return as many values as requested percentiles"
raise ValueError(msg)
return {float(q): float(v) for q, v in zip(req, vals_arr)}
def _create_result(
self,
results: np.ndarray,
n_simulations: int,
execution_time: float,
percentiles: dict[int, float],
stats: dict[str, Any],
requested_percentiles: list[int],
engine_defaults_used: bool,
) -> "SimulationResult":
r"""
Assemble a :class:`SimulationResult` and merge any stats-engine percentiles.
Notes
-----
Preserves the user's requested percentiles in
``metadata["requested_percentiles"]`` and whether engine defaults were used
in ``metadata["engine_defaults_used"]``.
"""
# Import here to avoid circular dependency
from .core import SimulationResult # pylint: disable=import-outside-toplevel
mean_val = float(np.mean(results))
std_sample = float(np.std(results, ddof=1)) if results.size > 1 else 0.0
stats = dict(stats) if stats else {}
# If the stats engine also returned percentiles merge once.
if "percentiles" in stats:
stats_percentiles = stats.pop("percentiles")
for k, v in stats_percentiles.items():
percentiles.setdefault(int(k), float(v))
# Gather metadata and include user-requested percentiles
meta = {
"simulation_name": self.name,
"timestamp": time.time(),
"n": n_simulations,
"seed_entropy": self.seed_seq.entropy if self.seed_seq else None,
"requested_percentiles": requested_percentiles,
"engine_defaults_used": engine_defaults_used,
}
return SimulationResult(
results=results,
n_simulations=n_simulations,
execution_time=execution_time,
mean=mean_val,
std=std_sample,
percentiles=percentiles,
stats=stats,
metadata=meta,
)