"""NumPy backend implementation for holovec operations.
This is the default backend and requires only NumPy as a dependency.
"""
from __future__ import annotations
from typing import Optional, Sequence, Tuple, Union
import numpy as np
from .base import Array, Backend
[docs]
class NumPyBackend(Backend):
"""NumPy-based backend for VSA operations.
This backend uses pure NumPy for all operations and serves as the
default backend with minimal dependencies.
"""
[docs]
def __init__(self, seed: Optional[int] = None):
"""Initialize NumPy backend with optional random seed.
Args:
seed: Random seed for reproducibility
"""
self._rng = np.random.default_rng(seed)
@property
def name(self) -> str:
return "numpy"
[docs]
def is_available(self) -> bool:
return True # NumPy is always available
# ===== Capability Probes =====
[docs]
def supports_complex(self) -> bool:
"""NumPy fully supports complex number operations."""
return True
[docs]
def supports_sparse(self) -> bool:
"""NumPy does not have native sparse array support.
Note: scipy.sparse provides sparse arrays but is not part of core NumPy.
"""
return False
[docs]
def supports_gpu(self) -> bool:
"""NumPy is CPU-only."""
return False
[docs]
def supports_jit(self) -> bool:
"""NumPy does not have JIT compilation.
Note: Numba can JIT-compile NumPy code but is a separate library.
"""
return False
[docs]
def supports_device(self, device: str) -> bool:
"""NumPy only supports CPU."""
return device.lower() in ('cpu', 'cpu:0')
# ===== Array Creation =====
[docs]
def zeros(self, shape: Union[int, Tuple[int, ...]], dtype: str = 'float32') -> Array:
shape = (shape,) if isinstance(shape, int) else shape
return np.zeros(shape, dtype=dtype)
[docs]
def ones(self, shape: Union[int, Tuple[int, ...]], dtype: str = 'float32') -> Array:
shape = (shape,) if isinstance(shape, int) else shape
return np.ones(shape, dtype=dtype)
[docs]
def random_normal(
self,
shape: Union[int, Tuple[int, ...]],
mean: float = 0.0,
std: float = 1.0,
dtype: str = 'float32',
seed: Optional[int] = None
) -> Array:
shape = (shape,) if isinstance(shape, int) else shape
rng = np.random.default_rng(seed) if seed is not None else self._rng
return rng.normal(mean, std, shape).astype(dtype)
[docs]
def random_binary(
self,
shape: Union[int, Tuple[int, ...]],
p: float = 0.5,
dtype: str = 'int32',
seed: Optional[int] = None
) -> Array:
shape = (shape,) if isinstance(shape, int) else shape
rng = np.random.default_rng(seed) if seed is not None else self._rng
return rng.binomial(1, p, shape).astype(dtype)
[docs]
def random_bipolar(
self,
shape: Union[int, Tuple[int, ...]],
p: float = 0.5,
dtype: str = 'float32',
seed: Optional[int] = None
) -> Array:
shape = (shape,) if isinstance(shape, int) else shape
rng = np.random.default_rng(seed) if seed is not None else self._rng
binary = rng.binomial(1, p, shape)
return (2 * binary - 1).astype(dtype)
[docs]
def random_phasor(
self,
shape: Union[int, Tuple[int, ...]],
dtype: str = 'complex64',
seed: Optional[int] = None
) -> Array:
shape = (shape,) if isinstance(shape, int) else shape
rng = np.random.default_rng(seed) if seed is not None else self._rng
# If last two dims are square, generate diagonal phasor matrices
if isinstance(shape, tuple) and len(shape) >= 2 and shape[-1] == shape[-2]:
batch = shape[:-2]
m = shape[-1]
# Angles for diagonal entries
angles = rng.uniform(0, 2 * np.pi, batch + (m,))
diag = np.exp(1j * angles).astype(dtype)
out = np.zeros(batch + (m, m), dtype=dtype)
# Fill diagonal efficiently by broadcasting
idx = np.arange(m)
out[..., idx, idx] = diag
return out
# Otherwise element-wise phasors
angles = rng.uniform(0, 2 * np.pi, shape)
return np.exp(1j * angles).astype(dtype)
[docs]
def array(self, data, dtype: Optional[str] = None) -> Array:
return np.array(data, dtype=dtype)
# ===== Element-wise Operations =====
[docs]
def multiply(self, a: Array, b: Array) -> Array:
return np.multiply(a, b)
[docs]
def add(self, a: Array, b: Array) -> Array:
return np.add(a, b)
[docs]
def subtract(self, a: Array, b: Array) -> Array:
return np.subtract(a, b)
[docs]
def divide(self, a: Array, b: Array) -> Array:
return np.divide(a, b)
[docs]
def xor(self, a: Array, b: Array) -> Array:
return np.bitwise_xor(a.astype(np.int32), b.astype(np.int32))
[docs]
def conjugate(self, a: Array) -> Array:
return np.conjugate(a)
# ===== Reductions =====
[docs]
def sum(self, a: Array, axis: Optional[int] = None, keepdims: bool = False) -> Array:
return np.sum(a, axis=axis, keepdims=keepdims)
[docs]
def mean(self, a: Array, axis: Optional[int] = None, keepdims: bool = False) -> Array:
return np.mean(a, axis=axis, keepdims=keepdims)
[docs]
def norm(self, a: Array, ord: Union[int, str] = 2, axis: Optional[int] = None) -> Array:
return np.linalg.norm(a, ord=ord, axis=axis)
[docs]
def dot(self, a: Array, b: Array) -> Array:
return np.dot(a, b)
[docs]
def max(self, a: Array, axis: Optional[int] = None, keepdims: bool = False) -> Array:
return np.max(a, axis=axis, keepdims=keepdims)
[docs]
def min(self, a: Array, axis: Optional[int] = None, keepdims: bool = False) -> Array:
return np.min(a, axis=axis, keepdims=keepdims)
[docs]
def argmax(self, a: Array, axis: Optional[int] = None) -> Array:
return np.argmax(a, axis=axis)
[docs]
def argmin(self, a: Array, axis: Optional[int] = None) -> Array:
return np.argmin(a, axis=axis)
# ===== Normalization =====
[docs]
def normalize(self, a: Array, ord: Union[int, str] = 2, axis: Optional[int] = None, eps: float = 1e-12) -> Array:
norm = np.linalg.norm(a, ord=ord, axis=axis, keepdims=True)
return a / (norm + eps)
[docs]
def softmax(self, a: Array, axis: int = -1) -> Array:
"""Softmax with numerical stability via max subtraction."""
# Subtract max for numerical stability
a_max = np.max(a, axis=axis, keepdims=True)
exp_a = np.exp(a - a_max)
return exp_a / np.sum(exp_a, axis=axis, keepdims=True)
# ===== FFT Operations =====
[docs]
def fft(self, a: Array) -> Array:
return np.fft.fft(a)
[docs]
def ifft(self, a: Array) -> Array:
return np.fft.ifft(a)
# ===== Circular Operations =====
[docs]
def circular_convolve(self, a: Array, b: Array) -> Array:
"""Circular convolution using real FFT for numerical stability."""
n = a.shape[-1]
fa = np.fft.rfft(a)
fb = np.fft.rfft(b)
return np.fft.irfft(fa * fb, n=n)
[docs]
def circular_correlate(self, a: Array, b: Array) -> Array:
"""Circular correlation using real FFT for numerical stability."""
n = a.shape[-1]
fa = np.fft.rfft(a)
fb = np.fft.rfft(b)
return np.fft.irfft(fa * np.conj(fb), n=n)
# ===== Permutations =====
[docs]
def permute(self, a: Array, indices: Array) -> Array:
return a[indices]
[docs]
def roll(self, a: Array, shift: int, axis: Optional[int] = None) -> Array:
return np.roll(a, shift, axis=axis)
# ===== Similarity Measures =====
[docs]
def cosine_similarity(self, a: Array, b: Array) -> float:
# Fast path: identical arrays
try:
if a is b or np.array_equal(a, b):
return 1.0
except Exception:
pass
na = np.linalg.norm(a)
nb = np.linalg.norm(b)
prod = na * nb
if prod == 0.0:
return 0.0
val = float(np.dot(a, b) / prod)
# Clamp to [-1, 1] to avoid slight numeric overshoot
if val > 1.0:
return 1.0
if val < -1.0:
return -1.0
return val
[docs]
def hamming_distance(self, a: Array, b: Array) -> float:
"""Hamming distance for binary/bipolar vectors."""
return float(np.sum(a != b))
[docs]
def euclidean_distance(self, a: Array, b: Array) -> float:
return float(np.linalg.norm(a - b))
# ===== Utilities =====
[docs]
def shape(self, a: Array) -> Tuple[int, ...]:
return a.shape
[docs]
def dtype(self, a: Array) -> str:
return str(a.dtype)
[docs]
def to_numpy(self, a: Array) -> np.ndarray:
return a # Already NumPy
[docs]
def from_numpy(self, a: np.ndarray) -> Array:
return a # Already NumPy
[docs]
def clip(self, a: Array, min_val: float, max_val: float) -> Array:
return np.clip(a, min_val, max_val)
[docs]
def abs(self, a: Array) -> Array:
return np.abs(a)
[docs]
def sign(self, a: Array) -> Array:
return np.sign(a)
[docs]
def threshold(self, a: Array, threshold: float, above: float = 1.0, below: float = 0.0) -> Array:
return np.where(a >= threshold, above, below)
[docs]
def where(self, condition: Array, x: Array, y: Array) -> Array:
return np.where(condition, x, y)
[docs]
def stack(self, arrays: Sequence[Array], axis: int = 0) -> Array:
return np.stack(arrays, axis=axis)
[docs]
def concatenate(self, arrays: Sequence[Array], axis: int = 0) -> Array:
return np.concatenate(arrays, axis=axis)
# ===== Matrix Operations =====
[docs]
def matmul(self, a: Array, b: Array) -> Array:
"""Matrix multiplication or batched matrix multiplication."""
return np.matmul(a, b)
[docs]
def matrix_transpose(self, a: Array) -> Array:
"""Transpose last two dimensions."""
# For 2D: simple transpose
if len(a.shape) == 2:
return np.transpose(a)
# For 3D+: transpose last two dims
axes = list(range(len(a.shape)))
axes[-2], axes[-1] = axes[-1], axes[-2]
return np.transpose(a, axes=axes)
[docs]
def matrix_trace(self, a: Array) -> Array:
"""Compute trace of each matrix."""
# For 2D: standard trace
if len(a.shape) == 2:
return np.trace(a)
# For 3D+: trace along last two dims
return np.trace(a, axis1=-2, axis2=-1)
[docs]
def svd(self, a: Array, full_matrices: bool = True) -> Tuple[Array, Array, Array]:
"""Compute Singular Value Decomposition.
For batched matrices (3D+), computes SVD for each matrix in the batch.
"""
# NumPy's svd doesn't handle batched operations natively in older versions
# Handle both 2D and batched cases
if len(a.shape) == 2:
# Simple 2D case
return np.linalg.svd(a, full_matrices=full_matrices)
else:
# Batched case: process each matrix individually
batch_shape = a.shape[:-2]
m, n = a.shape[-2:]
# Determine output shapes
if full_matrices:
u_shape = batch_shape + (m, m)
vh_shape = batch_shape + (n, n)
else:
k = min(m, n)
u_shape = batch_shape + (m, k)
vh_shape = batch_shape + (k, n)
s_shape = batch_shape + (min(m, n),)
# Pre-allocate output arrays
U_batch = np.zeros(u_shape, dtype=a.dtype)
S_batch = np.zeros(s_shape, dtype=a.real.dtype)
Vh_batch = np.zeros(vh_shape, dtype=a.dtype)
# Flatten batch dimensions for iteration
a_reshaped = a.reshape(-1, m, n)
U_reshaped = U_batch.reshape(-1, *U_batch.shape[-2:])
S_reshaped = S_batch.reshape(-1, S_batch.shape[-1])
Vh_reshaped = Vh_batch.reshape(-1, *Vh_batch.shape[-2:])
# Compute SVD for each matrix
for i in range(a_reshaped.shape[0]):
U, S, Vh = np.linalg.svd(a_reshaped[i], full_matrices=full_matrices)
U_reshaped[i] = U
S_reshaped[i] = S
Vh_reshaped[i] = Vh
return U_batch, S_batch, Vh_batch
[docs]
def reshape(self, a: Array, shape: Tuple[int, ...]) -> Array:
"""Reshape array."""
return np.reshape(a, shape)
# ===== Additional Operations for Encoders =====
[docs]
def power(self, a: Array, exponent: float) -> Array:
"""Component-wise power: a^exponent."""
return np.power(a, exponent)
[docs]
def angle(self, a: Array) -> Array:
"""Angle (phase) of complex numbers."""
return np.angle(a)
[docs]
def linspace(self, start: float, stop: float, num: int) -> Array:
"""Create linearly spaced array."""
return np.linspace(start, stop, num)
[docs]
def multiply_scalar(self, a: Array, scalar: float) -> Array:
"""Multiply array by scalar."""
return a * scalar
[docs]
def exp(self, a: Array) -> Array:
"""Element-wise exponential."""
return np.exp(a)
[docs]
def log(self, a: Array) -> Array:
"""Element-wise natural logarithm."""
return np.log(a)
[docs]
def real(self, a: Array) -> Array:
"""Real part of complex array."""
return np.real(a)
[docs]
def imag(self, a: Array) -> Array:
"""Imaginary part of complex array."""
return np.imag(a)