Source code for holovec.backends.numpy_backend

"""NumPy backend implementation for holovec operations.

This is the default backend and requires only NumPy as a dependency.
"""

from __future__ import annotations

from typing import Optional, Sequence, Tuple, Union

import numpy as np

from .base import Array, Backend


[docs] class NumPyBackend(Backend): """NumPy-based backend for VSA operations. This backend uses pure NumPy for all operations and serves as the default backend with minimal dependencies. """
[docs] def __init__(self, seed: Optional[int] = None): """Initialize NumPy backend with optional random seed. Args: seed: Random seed for reproducibility """ self._rng = np.random.default_rng(seed)
@property def name(self) -> str: return "numpy"
[docs] def is_available(self) -> bool: return True # NumPy is always available
# ===== Capability Probes =====
[docs] def supports_complex(self) -> bool: """NumPy fully supports complex number operations.""" return True
[docs] def supports_sparse(self) -> bool: """NumPy does not have native sparse array support. Note: scipy.sparse provides sparse arrays but is not part of core NumPy. """ return False
[docs] def supports_gpu(self) -> bool: """NumPy is CPU-only.""" return False
[docs] def supports_jit(self) -> bool: """NumPy does not have JIT compilation. Note: Numba can JIT-compile NumPy code but is a separate library. """ return False
[docs] def supports_device(self, device: str) -> bool: """NumPy only supports CPU.""" return device.lower() in ('cpu', 'cpu:0')
# ===== Array Creation =====
[docs] def zeros(self, shape: Union[int, Tuple[int, ...]], dtype: str = 'float32') -> Array: shape = (shape,) if isinstance(shape, int) else shape return np.zeros(shape, dtype=dtype)
[docs] def ones(self, shape: Union[int, Tuple[int, ...]], dtype: str = 'float32') -> Array: shape = (shape,) if isinstance(shape, int) else shape return np.ones(shape, dtype=dtype)
[docs] def random_normal( self, shape: Union[int, Tuple[int, ...]], mean: float = 0.0, std: float = 1.0, dtype: str = 'float32', seed: Optional[int] = None ) -> Array: shape = (shape,) if isinstance(shape, int) else shape rng = np.random.default_rng(seed) if seed is not None else self._rng return rng.normal(mean, std, shape).astype(dtype)
[docs] def random_uniform( self, shape: Union[int, Tuple[int, ...]], low: float = 0.0, high: float = 1.0, dtype: str = 'float32', seed: Optional[int] = None ) -> Array: shape = (shape,) if isinstance(shape, int) else shape rng = np.random.default_rng(seed) if seed is not None else self._rng return rng.uniform(low, high, shape).astype(dtype)
[docs] def random_binary( self, shape: Union[int, Tuple[int, ...]], p: float = 0.5, dtype: str = 'int32', seed: Optional[int] = None ) -> Array: shape = (shape,) if isinstance(shape, int) else shape rng = np.random.default_rng(seed) if seed is not None else self._rng return rng.binomial(1, p, shape).astype(dtype)
[docs] def random_bipolar( self, shape: Union[int, Tuple[int, ...]], p: float = 0.5, dtype: str = 'float32', seed: Optional[int] = None ) -> Array: shape = (shape,) if isinstance(shape, int) else shape rng = np.random.default_rng(seed) if seed is not None else self._rng binary = rng.binomial(1, p, shape) return (2 * binary - 1).astype(dtype)
[docs] def random_phasor( self, shape: Union[int, Tuple[int, ...]], dtype: str = 'complex64', seed: Optional[int] = None ) -> Array: shape = (shape,) if isinstance(shape, int) else shape rng = np.random.default_rng(seed) if seed is not None else self._rng # If last two dims are square, generate diagonal phasor matrices if isinstance(shape, tuple) and len(shape) >= 2 and shape[-1] == shape[-2]: batch = shape[:-2] m = shape[-1] # Angles for diagonal entries angles = rng.uniform(0, 2 * np.pi, batch + (m,)) diag = np.exp(1j * angles).astype(dtype) out = np.zeros(batch + (m, m), dtype=dtype) # Fill diagonal efficiently by broadcasting idx = np.arange(m) out[..., idx, idx] = diag return out # Otherwise element-wise phasors angles = rng.uniform(0, 2 * np.pi, shape) return np.exp(1j * angles).astype(dtype)
[docs] def array(self, data, dtype: Optional[str] = None) -> Array: return np.array(data, dtype=dtype)
# ===== Element-wise Operations =====
[docs] def multiply(self, a: Array, b: Array) -> Array: return np.multiply(a, b)
[docs] def add(self, a: Array, b: Array) -> Array: return np.add(a, b)
[docs] def subtract(self, a: Array, b: Array) -> Array: return np.subtract(a, b)
[docs] def divide(self, a: Array, b: Array) -> Array: return np.divide(a, b)
[docs] def xor(self, a: Array, b: Array) -> Array: return np.bitwise_xor(a.astype(np.int32), b.astype(np.int32))
[docs] def conjugate(self, a: Array) -> Array: return np.conjugate(a)
# ===== Reductions =====
[docs] def sum(self, a: Array, axis: Optional[int] = None, keepdims: bool = False) -> Array: return np.sum(a, axis=axis, keepdims=keepdims)
[docs] def mean(self, a: Array, axis: Optional[int] = None, keepdims: bool = False) -> Array: return np.mean(a, axis=axis, keepdims=keepdims)
[docs] def norm(self, a: Array, ord: Union[int, str] = 2, axis: Optional[int] = None) -> Array: return np.linalg.norm(a, ord=ord, axis=axis)
[docs] def dot(self, a: Array, b: Array) -> Array: return np.dot(a, b)
[docs] def max(self, a: Array, axis: Optional[int] = None, keepdims: bool = False) -> Array: return np.max(a, axis=axis, keepdims=keepdims)
[docs] def min(self, a: Array, axis: Optional[int] = None, keepdims: bool = False) -> Array: return np.min(a, axis=axis, keepdims=keepdims)
[docs] def argmax(self, a: Array, axis: Optional[int] = None) -> Array: return np.argmax(a, axis=axis)
[docs] def argmin(self, a: Array, axis: Optional[int] = None) -> Array: return np.argmin(a, axis=axis)
# ===== Normalization =====
[docs] def normalize(self, a: Array, ord: Union[int, str] = 2, axis: Optional[int] = None, eps: float = 1e-12) -> Array: norm = np.linalg.norm(a, ord=ord, axis=axis, keepdims=True) return a / (norm + eps)
[docs] def softmax(self, a: Array, axis: int = -1) -> Array: """Softmax with numerical stability via max subtraction.""" # Subtract max for numerical stability a_max = np.max(a, axis=axis, keepdims=True) exp_a = np.exp(a - a_max) return exp_a / np.sum(exp_a, axis=axis, keepdims=True)
# ===== FFT Operations =====
[docs] def fft(self, a: Array) -> Array: return np.fft.fft(a)
[docs] def ifft(self, a: Array) -> Array: return np.fft.ifft(a)
# ===== Circular Operations =====
[docs] def circular_convolve(self, a: Array, b: Array) -> Array: """Circular convolution using real FFT for numerical stability.""" n = a.shape[-1] fa = np.fft.rfft(a) fb = np.fft.rfft(b) return np.fft.irfft(fa * fb, n=n)
[docs] def circular_correlate(self, a: Array, b: Array) -> Array: """Circular correlation using real FFT for numerical stability.""" n = a.shape[-1] fa = np.fft.rfft(a) fb = np.fft.rfft(b) return np.fft.irfft(fa * np.conj(fb), n=n)
# ===== Permutations =====
[docs] def permute(self, a: Array, indices: Array) -> Array: return a[indices]
[docs] def roll(self, a: Array, shift: int, axis: Optional[int] = None) -> Array: return np.roll(a, shift, axis=axis)
# ===== Similarity Measures =====
[docs] def cosine_similarity(self, a: Array, b: Array) -> float: # Fast path: identical arrays try: if a is b or np.array_equal(a, b): return 1.0 except Exception: pass na = np.linalg.norm(a) nb = np.linalg.norm(b) prod = na * nb if prod == 0.0: return 0.0 val = float(np.dot(a, b) / prod) # Clamp to [-1, 1] to avoid slight numeric overshoot if val > 1.0: return 1.0 if val < -1.0: return -1.0 return val
[docs] def hamming_distance(self, a: Array, b: Array) -> float: """Hamming distance for binary/bipolar vectors.""" return float(np.sum(a != b))
[docs] def euclidean_distance(self, a: Array, b: Array) -> float: return float(np.linalg.norm(a - b))
# ===== Utilities =====
[docs] def shape(self, a: Array) -> Tuple[int, ...]: return a.shape
[docs] def dtype(self, a: Array) -> str: return str(a.dtype)
[docs] def to_numpy(self, a: Array) -> np.ndarray: return a # Already NumPy
[docs] def from_numpy(self, a: np.ndarray) -> Array: return a # Already NumPy
[docs] def clip(self, a: Array, min_val: float, max_val: float) -> Array: return np.clip(a, min_val, max_val)
[docs] def abs(self, a: Array) -> Array: return np.abs(a)
[docs] def sign(self, a: Array) -> Array: return np.sign(a)
[docs] def threshold(self, a: Array, threshold: float, above: float = 1.0, below: float = 0.0) -> Array: return np.where(a >= threshold, above, below)
[docs] def where(self, condition: Array, x: Array, y: Array) -> Array: return np.where(condition, x, y)
[docs] def stack(self, arrays: Sequence[Array], axis: int = 0) -> Array: return np.stack(arrays, axis=axis)
[docs] def concatenate(self, arrays: Sequence[Array], axis: int = 0) -> Array: return np.concatenate(arrays, axis=axis)
# ===== Matrix Operations =====
[docs] def matmul(self, a: Array, b: Array) -> Array: """Matrix multiplication or batched matrix multiplication.""" return np.matmul(a, b)
[docs] def matrix_transpose(self, a: Array) -> Array: """Transpose last two dimensions.""" # For 2D: simple transpose if len(a.shape) == 2: return np.transpose(a) # For 3D+: transpose last two dims axes = list(range(len(a.shape))) axes[-2], axes[-1] = axes[-1], axes[-2] return np.transpose(a, axes=axes)
[docs] def matrix_trace(self, a: Array) -> Array: """Compute trace of each matrix.""" # For 2D: standard trace if len(a.shape) == 2: return np.trace(a) # For 3D+: trace along last two dims return np.trace(a, axis1=-2, axis2=-1)
[docs] def svd(self, a: Array, full_matrices: bool = True) -> Tuple[Array, Array, Array]: """Compute Singular Value Decomposition. For batched matrices (3D+), computes SVD for each matrix in the batch. """ # NumPy's svd doesn't handle batched operations natively in older versions # Handle both 2D and batched cases if len(a.shape) == 2: # Simple 2D case return np.linalg.svd(a, full_matrices=full_matrices) else: # Batched case: process each matrix individually batch_shape = a.shape[:-2] m, n = a.shape[-2:] # Determine output shapes if full_matrices: u_shape = batch_shape + (m, m) vh_shape = batch_shape + (n, n) else: k = min(m, n) u_shape = batch_shape + (m, k) vh_shape = batch_shape + (k, n) s_shape = batch_shape + (min(m, n),) # Pre-allocate output arrays U_batch = np.zeros(u_shape, dtype=a.dtype) S_batch = np.zeros(s_shape, dtype=a.real.dtype) Vh_batch = np.zeros(vh_shape, dtype=a.dtype) # Flatten batch dimensions for iteration a_reshaped = a.reshape(-1, m, n) U_reshaped = U_batch.reshape(-1, *U_batch.shape[-2:]) S_reshaped = S_batch.reshape(-1, S_batch.shape[-1]) Vh_reshaped = Vh_batch.reshape(-1, *Vh_batch.shape[-2:]) # Compute SVD for each matrix for i in range(a_reshaped.shape[0]): U, S, Vh = np.linalg.svd(a_reshaped[i], full_matrices=full_matrices) U_reshaped[i] = U S_reshaped[i] = S Vh_reshaped[i] = Vh return U_batch, S_batch, Vh_batch
[docs] def reshape(self, a: Array, shape: Tuple[int, ...]) -> Array: """Reshape array.""" return np.reshape(a, shape)
# ===== Additional Operations for Encoders =====
[docs] def power(self, a: Array, exponent: float) -> Array: """Component-wise power: a^exponent.""" return np.power(a, exponent)
[docs] def angle(self, a: Array) -> Array: """Angle (phase) of complex numbers.""" return np.angle(a)
[docs] def linspace(self, start: float, stop: float, num: int) -> Array: """Create linearly spaced array.""" return np.linspace(start, stop, num)
[docs] def multiply_scalar(self, a: Array, scalar: float) -> Array: """Multiply array by scalar.""" return a * scalar
[docs] def exp(self, a: Array) -> Array: """Element-wise exponential.""" return np.exp(a)
[docs] def log(self, a: Array) -> Array: """Element-wise natural logarithm.""" return np.log(a)
[docs] def real(self, a: Array) -> Array: """Real part of complex array.""" return np.real(a)
[docs] def imag(self, a: Array) -> Array: """Imaginary part of complex array.""" return np.imag(a)