r"""
Torch CUDA backend for NVIDIA GPU acceleration.

This module provides:

Classes
    :class:`TorchCUDABackend` — GPU-accelerated batch execution on NVIDIA GPUs

Functions
    :func:`is_cuda_available` — Check CUDA availability
    :func:`validate_cuda_device` — Validate CUDA is usable

Features
--------
**Adaptive Batch Sizing**: Automatically estimates optimal batch size based on
available GPU memory to prevent OOM errors while maximizing throughput.

**Dual RNG Modes**:
- ``torch.Generator`` (default) — PyTorch's Philox RNG, fully deterministic
- ``cuRAND`` (optional) — Native GPU RNG via CuPy, maximum performance

**CUDA Optimizations**:
- CUDA streams for overlapped execution
- Native float64 support (zero conversion overhead vs MPS)
- Efficient memory management via PyTorch's caching allocator

**Defensive Validation**: Comprehensive checks for ``supports_batch`` attribute
and required batch methods before execution.

Notes
-----
**Native float64 support**: Unlike MPS (Apple Silicon), CUDA fully supports
float64 tensors. The backend intelligently handles both float32 and float64,
promoting to float64 only when necessary.

**Batch size estimation**: Uses a probe run to estimate per-sample memory
requirements, then calculates optimal batch size to use ~75% of available
GPU memory.
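For example, with 12 GB free and a probe showing ~8 KB per sample, the target
would be ``int(12e9 * 0.75 / 8192)``, i.e. roughly 1.1 million samples per
batch (figures illustrative).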

Examples
--------
>>> # Simple usage with defaults
>>> if is_cuda_available():
...     sim.run(1_000_000, backend="torch", torch_device="cuda")  # doctest: +SKIP

>>> # Advanced: Direct backend construction with custom settings
>>> if is_cuda_available():
...     from mcframework.backends import TorchCUDABackend
...     backend = TorchCUDABackend(device_id=0, batch_size=100_000, use_streams=True)
...     results = backend.run(sim, n_simulations=10_000_000, seed_seq=sim.seed_seq)
... # doctest: +SKIP
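
>>> # Sketch of the batch contract this backend validates; ``MySim`` and its
>>> # normal draw are illustrative, not part of the framework
>>> import torch
>>> class MySim(MonteCarloSimulation):
...     supports_batch = True
...     def torch_batch(self, n, device, generator):
...         # One float64 result per draw, generated entirely on the GPU
...         return torch.randn(
...             n, device=device, generator=generator, dtype=torch.float64
...         )
... # doctest: +SKIP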
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any, Callable

import numpy as np

from .torch_base import import_torch, make_torch_generator

if TYPE_CHECKING:
    import torch

    from ..simulation import MonteCarloSimulation

logger = logging.getLogger(__name__)

__all__ = [
    "TorchCUDABackend",
    "is_cuda_available",
    "validate_cuda_device",
]


def is_cuda_available() -> bool:
    """
    Check if CUDA is available.

    Returns
    -------
    bool
        True if CUDA is available and PyTorch was built with CUDA support.

    Examples
    --------
    >>> if is_cuda_available():
    ...     backend = TorchCUDABackend()  # doctest: +SKIP
    """
    try:
        th = import_torch()
        return th.cuda.is_available()
    except ImportError:
        return False


def validate_cuda_device(device_id: int = 0) -> None:
    """
    Validate that a CUDA device is available and usable.

    Parameters
    ----------
    device_id : int, default 0
        CUDA device index to validate.

    Raises
    ------
    ImportError
        If PyTorch is not installed.
    RuntimeError
        If CUDA is not available or the device index is invalid.

    Examples
    --------
    >>> validate_cuda_device()  # doctest: +SKIP
    >>> validate_cuda_device(device_id=1)  # Check second GPU  # doctest: +SKIP
    """
    th = import_torch()

    if not th.cuda.is_available():
        raise RuntimeError(
            "CUDA device requested but not available. "
            "Ensure NVIDIA drivers and the CUDA toolkit are installed."
        )

    device_count = th.cuda.device_count()
    if device_id >= device_count:
        raise RuntimeError(
            f"CUDA device {device_id} requested but only {device_count} device(s) available."
        )


class TorchCUDABackend:
    r"""
    Torch CUDA batch execution backend for NVIDIA GPUs.

    CUDA backend with adaptive batch sizing, dual RNG modes, and native
    float64 support. Requires simulations to implement
    :meth:`~mcframework.core.MonteCarloSimulation.torch_batch` (or
    :meth:`~mcframework.core.MonteCarloSimulation.curand_batch` for cuRAND
    mode) and set ``supports_batch = True``.

    Parameters
    ----------
    device_id : int, default 0
        CUDA device index to use. Use :func:`torch.cuda.device_count` to
        check available devices.
    use_curand : bool, default False
        Use cuRAND (via CuPy) instead of ``torch.Generator`` for RNG.
        Requires a CuPy installation and a simulation that implements
        :meth:`~mcframework.core.MonteCarloSimulation.curand_batch`.
    batch_size : int or None, default None
        Fixed batch size for simulation execution. If None, automatically
        estimates the optimal batch size based on available GPU memory.
    use_streams : bool, default True
        Use CUDA streams for overlapped execution. Recommended for
        performance.

    Attributes
    ----------
    device_type : str
        Always ``"cuda"``.
    device : torch.device
        CUDA device object for this backend.
    device_id : int
        CUDA device index.
    use_curand : bool
        Whether cuRAND mode is enabled.
    batch_size : int or None
        Fixed batch size, or None for adaptive.
    use_streams : bool
        Whether CUDA streams are enabled.

    Notes
    -----
    **RNG architecture**: Uses explicit generators seeded from
    :class:`numpy.random.SeedSequence` via ``spawn()``. Never uses global
    RNG state (:func:`torch.manual_seed` or
    :meth:`cupy.random.RandomState.seed`).

    **Adaptive batching**: When ``batch_size=None``, performs a probe run
    with 1000 samples to estimate memory requirements, then calculates the
    optimal batch size to use ~75% of available GPU memory.

    **Native float64**: CUDA fully supports float64 tensors. If the
    simulation's :meth:`~mcframework.core.MonteCarloSimulation.torch_batch`
    or :meth:`~mcframework.core.MonteCarloSimulation.curand_batch` returns
    float64, the backend uses it directly with zero conversion overhead. If
    float32, it converts to float64 on the GPU before moving results to the
    CPU for stats engine compatibility.

    **CUDA streams**: When ``use_streams=True``, executes each batch in a
    dedicated stream for better GPU utilization and overlapped execution.

    Examples
    --------
    >>> # Default configuration (adaptive batching, torch.Generator)
    >>> if is_cuda_available():
    ...     backend = TorchCUDABackend(device_id=0)
    ...     results = backend.run(sim, n_simulations=1_000_000, seed_seq=seed_seq)
    ... # doctest: +SKIP

    >>> # High-performance configuration (fixed batching, cuRAND via CuPy)
    >>> if is_cuda_available():
    ...     backend = TorchCUDABackend(
    ...         device_id=0,
    ...         use_curand=True,
    ...         batch_size=100_000,
    ...         use_streams=True,
    ...     )
    ...     results = backend.run(sim, n_simulations=10_000_000, seed_seq=seed_seq)
    ... # doctest: +SKIP

    See Also
    --------
    is_cuda_available : Check CUDA availability before instantiation.
    TorchMPSBackend : Apple Silicon alternative.
    TorchCPUBackend : CPU fallback.
    """

    device_type: str = "cuda"
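
    # Seeding model (illustrative numbers): batches never touch global RNG
    # state; each batch gets an independent child of the caller's
    # SeedSequence, e.g.
    #
    #     root = np.random.SeedSequence(1234)
    #     children = root.spawn(3)  # one child per batch
    #
    # and make_torch_generator(device, child) then yields a deterministic
    # torch.Generator per batch, so reruns with the same root seed
    # reproduce identical streams.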

    def __init__(
        self,
        device_id: int = 0,
        use_curand: bool = False,
        batch_size: int | None = None,
        use_streams: bool = True,
    ):
        """
        Initialize the Torch CUDA backend with the specified configuration.

        Parameters
        ----------
        device_id : int, default 0
            CUDA device index to use.
        use_curand : bool, default False
            Use cuRAND via CuPy instead of torch.Generator.
        batch_size : int or None, default None
            Fixed batch size (None = adaptive).
        use_streams : bool, default True
            Enable CUDA streams for overlapped execution.

        Raises
        ------
        ImportError
            If PyTorch is not installed, or if CuPy is required but not
            installed.
        RuntimeError
            If CUDA is not available or the device index is invalid.
        """
        validate_cuda_device(device_id)

        th = import_torch()
        self.device_id = device_id
        self.device = th.device(f"cuda:{device_id}")
        self.use_curand = use_curand
        self.batch_size = batch_size
        self.use_streams = use_streams

        # Validate CuPy availability if cuRAND mode is requested
        if use_curand:
            try:
                import cupy as cp  # noqa: F401  # pylint: disable=import-outside-toplevel,import-error,unused-import
            except ImportError as e:
                raise ImportError(
                    "cuRAND mode requires CuPy. Install with: pip install mcframework[cuda]"
                ) from e

        # Log device info
        device_name = th.cuda.get_device_name(device_id)
        logger.info(
            "Initialized CUDA backend on device %d: %s (cuRAND=%s, batch_size=%s, streams=%s)",
            device_id,
            device_name,
            use_curand,
            batch_size or "adaptive",
            use_streams,
        )

    def _validate_simulation_compatible(
        self,
        sim: "MonteCarloSimulation",
    ) -> None:
        """
        Validate that the simulation supports batch execution with the
        required methods.

        This is the defensive validation layer (Priority 0) that ensures:

        1. The simulation has a ``supports_batch`` attribute.
        2. ``supports_batch`` is explicitly True.
        3. The required batch method exists (``torch_batch`` or
           ``curand_batch``).

        Parameters
        ----------
        sim : MonteCarloSimulation
            Simulation instance to validate.

        Raises
        ------
        AttributeError
            If the simulation class is missing the 'supports_batch' attribute.
        ValueError
            If the simulation has ``supports_batch = False``.
        NotImplementedError
            If the required batch method is not implemented.
        """
        # Check 1: supports_batch attribute exists
        if not hasattr(sim, 'supports_batch'):
            raise AttributeError(
                f"Simulation class '{sim.__class__.__name__}' is missing 'supports_batch' attribute. "
                f"GPU execution requires explicit declaration to prevent accidental usage. "
                f"To enable CUDA acceleration, add 'supports_batch = True' as a class attribute "
                f"and implement either torch_batch() or curand_batch()."
            )

        # Check 2: supports_batch is explicitly True
        if not sim.supports_batch:
            raise ValueError(
                f"Simulation '{sim.name}' does not support Torch batch execution. "
                f"Class '{sim.__class__.__name__}' has supports_batch = False. "
                f"Set supports_batch = True and implement torch_batch() or curand_batch() "
                f"to enable CUDA acceleration."
            )

        # Check 3: the required batch method exists and is overridden
        if self.use_curand:
            # cuRAND mode requires a curand_batch method
            if not hasattr(sim, 'curand_batch') or not callable(getattr(sim, 'curand_batch')):
                raise NotImplementedError(
                    f"Simulation '{sim.__class__.__name__}' requested cuRAND mode "
                    f"but does not implement curand_batch(). "
                    f"Either implement curand_batch(n, device_id, rng) or use the default "
                    f"torch.Generator mode (use_curand=False)."
                )
        else:
            # Default mode requires torch_batch to be overridden from the
            # base class (the base method is a stub).
            # pylint: disable-next=import-outside-toplevel
            from ..simulation import MonteCarloSimulation

            base_torch_batch = MonteCarloSimulation.torch_batch
            sim_torch_batch = sim.__class__.torch_batch

            if sim_torch_batch is base_torch_batch:
                # Method is not overridden - still the base class stub
                raise NotImplementedError(
                    f"Simulation '{sim.__class__.__name__}' has supports_batch = True "
                    f"but does not implement torch_batch(). "
                    f"Either implement torch_batch(n, device, generator) or set "
                    f"use_curand=True with a curand_batch() implementation."
                )

    def _estimate_available_memory(self) -> tuple[int, int]:
        """
        Query available GPU memory on the target device.

        Returns
        -------
        tuple[int, int]
            (free_memory_bytes, total_memory_bytes)
        """
        th = import_torch()
        free_mem, total_mem = th.cuda.mem_get_info(self.device_id)
        return free_mem, total_mem

    def _estimate_batch_size(
        self,
        sim: "MonteCarloSimulation",
        n_simulations: int,
        seed_seq: np.random.SeedSequence | None,
    ) -> int:
        """
        Estimate the optimal batch size based on available GPU memory.

        Performs a probe run with 1000 samples to estimate per-sample memory
        requirements, then calculates a batch size that uses ~75% of
        available memory.

        Parameters
        ----------
        sim : MonteCarloSimulation
            Simulation instance to profile.
        n_simulations : int
            Total number of simulations requested.
        seed_seq : SeedSequence or None
            Seed sequence for the probe run.

        Returns
        -------
        int
            Estimated optimal batch size, clamped to [1000, n_simulations].
        """
        th = import_torch()

        # Query available memory
        free_mem, total_mem = self._estimate_available_memory()
        logger.debug(
            "CUDA memory: %.2f GB free / %.2f GB total",
            free_mem / 1e9,
            total_mem / 1e9,
        )

        # Perform a probe run with a small batch
        probe_size = min(1000, n_simulations)
        th.cuda.reset_peak_memory_stats(self.device_id)
        mem_before = th.cuda.memory_allocated(self.device_id)

        try:
            if self.use_curand:
                # cuRAND probe. Note: memory_allocated() only tracks
                # PyTorch's caching allocator, so CuPy allocations may
                # report ~0 here and fall through to the default below.
                import cupy as cp  # pylint: disable=import-outside-toplevel,import-error

                cp.cuda.Device(self.device_id).use()
                child_seed = seed_seq.spawn(1)[0] if seed_seq else None
                if child_seed:
                    seed_int = int(child_seed.generate_state(1, dtype=np.uint64)[0])
                    rng = cp.random.RandomState(seed=seed_int)
                else:
                    rng = cp.random.RandomState()
                _ = sim.curand_batch(probe_size, self.device_id, rng)
                cp.cuda.Device(self.device_id).synchronize()
            else:
                # torch.Generator probe
                generator = make_torch_generator(self.device, seed_seq)
                _ = sim.torch_batch(probe_size, device=self.device, generator=generator)
                th.cuda.synchronize(self.device_id)

            mem_after = th.cuda.memory_allocated(self.device_id)
            mem_used = mem_after - mem_before
            per_sample_mem = mem_used / probe_size

        except Exception as e:  # pylint: disable=broad-exception-caught
            logger.warning("Probe run failed: %s. Using conservative batch size.", e)
            per_sample_mem = 1024  # conservative fallback: 1 KB per sample

        # Target ~75% of free memory, reserving the remaining ~25% for
        # PyTorch overhead and other allocations
        usable_mem = free_mem * 0.75
        estimated_batch_size = int(usable_mem / per_sample_mem) if per_sample_mem > 0 else 10_000

        # Clamp to [1000, n_simulations] (n_simulations wins if smaller)
        batch_size = min(n_simulations, max(1000, estimated_batch_size))

        logger.info(
            "Estimated batch size: %d (%.2f KB per sample, %.2f GB available)",
            batch_size,
            per_sample_mem / 1e3,
            usable_mem / 1e9,
        )

        return batch_size

    def _run_single_batch(
        self,
        sim: "MonteCarloSimulation",
        batch_size: int,
        seed_seq: np.random.SeedSequence | None,
    ) -> "torch.Tensor":
        """
        Execute a single batch of simulations.

        Parameters
        ----------
        sim : MonteCarloSimulation
            Simulation instance to run.
        batch_size : int
            Number of simulations in this batch.
        seed_seq : SeedSequence or None
            Seed sequence for this batch.

        Returns
        -------
        torch.Tensor
            Batch results as a float64 tensor on the CPU.
        """
        th = import_torch()

        if self.use_curand:
            # cuRAND path
            import cupy as cp  # pylint: disable=import-outside-toplevel,import-error

            cp.cuda.Device(self.device_id).use()

            # Create a cuRAND generator from the SeedSequence
            if seed_seq:
                child_seed = seed_seq.spawn(1)[0]
                seed_int = int(child_seed.generate_state(1, dtype=np.uint64)[0])
                rng = cp.random.RandomState(seed=seed_int)
            else:
                rng = cp.random.RandomState()

            if self.use_streams:
                # Execute in a dedicated stream
                stream = cp.cuda.Stream()
                with stream:
                    samples_cp = sim.curand_batch(batch_size, self.device_id, rng)
                stream.synchronize()
            else:
                samples_cp = sim.curand_batch(batch_size, self.device_id, rng)
                cp.cuda.Device(self.device_id).synchronize()

            # Convert the CuPy array to a PyTorch tensor (shares device
            # memory via __cuda_array_interface__)
            samples = th.as_tensor(samples_cp, device=self.device)
        else:
            # torch.Generator path (default)
            generator = make_torch_generator(self.device, seed_seq)

            if self.use_streams:
                # Execute in a dedicated stream
                stream = th.cuda.Stream(device=self.device)
                with th.cuda.stream(stream):
                    samples = sim.torch_batch(batch_size, device=self.device, generator=generator)
                stream.synchronize()
            else:
                samples = sim.torch_batch(batch_size, device=self.device, generator=generator)
                th.cuda.synchronize(self.device_id)

        # CUDA-specific dtype handling (native float64 support)
        if samples.dtype == th.float64:
            # Already float64 - just move to CPU (zero conversion overhead)
            samples = samples.detach().cpu()
            logger.debug("CUDA backend: Using native float64 (optimal path)")
        else:
            # float32 from the simulation - convert on GPU before CPU transfer
            samples = samples.detach().to(th.float64).cpu()
            logger.debug("CUDA backend: Converting float32 → float64 on GPU")

        return samples

    def run(
        self,
        sim: "MonteCarloSimulation",
        n_simulations: int,
        seed_seq: np.random.SeedSequence | None,
        progress_callback: Callable[[int, int], None] | None = None,
        **_simulation_kwargs: Any,
    ) -> np.ndarray:
        r"""
        Run simulations using Torch CUDA batch execution with adaptive batching.

        Parameters
        ----------
        sim : MonteCarloSimulation
            The simulation instance to run. Must have ``supports_batch = True``
            and implement :meth:`torch_batch` (or :meth:`curand_batch` for
            cuRAND mode).
        n_simulations : int
            Number of simulation draws to perform.
        seed_seq : SeedSequence or None
            Seed sequence for reproducible random streams.
        progress_callback : callable or None
            Optional callback ``f(completed, total)`` for progress reporting.
        **_simulation_kwargs : Any
            Ignored for the Torch backend (the batch method handles all
            parameters).

        Returns
        -------
        np.ndarray
            Array of simulation results with shape ``(n_simulations,)``.
            Results are float64 regardless of the internal tensor dtype.

        Raises
        ------
        AttributeError
            If the simulation class is missing the 'supports_batch' attribute.
        ValueError
            If the simulation does not support batch execution.
        NotImplementedError
            If the simulation does not implement the required batch method.
        RuntimeError
            If a CUDA out-of-memory error occurs during execution.

        Notes
        -----
        **Adaptive batching**: When ``batch_size=None`` (default),
        automatically estimates the optimal batch size. Large workloads are
        split across multiple batches with progress tracking.

        **Memory safety**: Estimates the batch size from available GPU memory
        up front to reduce OOM risk and relies on PyTorch's caching allocator
        for efficient memory reuse; on OOM, the error is logged with guidance
        and re-raised.

        **Determinism**: With the same seed, produces identical results
        (bitwise for torch.Generator, statistical for cuRAND).
        """
        th = import_torch()

        # Priority 0: Defensive validation
        self._validate_simulation_compatible(sim)

        # Determine the batch size (adaptive or fixed)
        if self.batch_size is None:
            batch_size = self._estimate_batch_size(sim, n_simulations, seed_seq)
        else:
            batch_size = min(self.batch_size, n_simulations)

        logger.info(
            "Computing %d simulations using CUDA backend (device %d, batch_size=%d, cuRAND=%s)...",
            n_simulations,
            self.device_id,
            batch_size,
            self.use_curand,
        )

        # Single batch: fast path
        if n_simulations <= batch_size:
            samples = self._run_single_batch(sim, n_simulations, seed_seq)
            if progress_callback:
                progress_callback(n_simulations, n_simulations)
            return samples.numpy()

        # Multiple batches: adaptive execution with progress tracking
        results_list = []
        completed = 0

        # Spawn a seed sequence for each batch
        n_batches = (n_simulations + batch_size - 1) // batch_size
        if seed_seq:
            batch_seeds = seed_seq.spawn(n_batches)
        else:
            batch_seeds = [None] * n_batches

        for batch_idx, batch_seed in enumerate(batch_seeds):
            remaining = n_simulations - completed
            current_batch_size = min(batch_size, remaining)

            try:
                # Execute the batch
                batch_samples = self._run_single_batch(sim, current_batch_size, batch_seed)
                results_list.append(batch_samples)
                completed += current_batch_size

                # Progress callback
                if progress_callback:
                    progress_callback(completed, n_simulations)

                # Periodically release cached blocks between large batches
                if batch_idx % 10 == 0 and batch_idx > 0:
                    th.cuda.empty_cache()

            except RuntimeError as e:
                if "out of memory" in str(e).lower():
                    # OOM: free cached blocks and surface the error with guidance
                    logger.error(
                        "CUDA OOM error at batch %d. Try reducing batch_size "
                        "or using adaptive batching.",
                        batch_idx,
                    )
                    th.cuda.empty_cache()
                raise

        # Concatenate all batches
        all_samples = th.cat(results_list, dim=0)

        # Log memory statistics
        max_mem = th.cuda.max_memory_allocated(self.device_id)
        logger.info("CUDA execution complete. Peak memory: %.2f GB", max_mem / 1e9)

        return all_samples.numpy()
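

if __name__ == "__main__":  # pragma: no cover
    # Standalone sketch of the probe-based batch sizing that
    # _estimate_batch_size implements above; runnable with PyTorch alone via
    # ``python -m mcframework.backends.torch_cuda``. The probe tensor stands
    # in for a real torch_batch() call, and all figures are illustrative.
    _th = import_torch()
    if _th.cuda.is_available():
        _th.cuda.reset_peak_memory_stats(0)
        _before = _th.cuda.memory_allocated(0)
        # Probe: allocate 1000 float64 samples, mirroring the probe run
        _probe = _th.rand(1000, device=_th.device("cuda:0"), dtype=_th.float64)
        _th.cuda.synchronize(0)
        _per_sample = max(1.0, (_th.cuda.memory_allocated(0) - _before) / 1000)
        _free, _total = _th.cuda.mem_get_info(0)
        # Target ~75% of free memory, as the backend does
        print(f"~{int(_free * 0.75 / _per_sample):,} samples/batch "
              f"({_per_sample:.0f} B/sample, {_free / 1e9:.1f} GB free)")
    else:
        print("CUDA not available; nothing to probe.")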