"""Cleanup and factorization strategies for VSA codebook operations.
This module provides cleanup strategies for vector symbolic architectures,
including brute-force codebook search and resonator networks for iterative
multi-factor unbinding.
Key Features:
- Abstract CleanupStrategy interface for extensibility
- BruteForceCleanup: Exhaustive codebook search (baseline)
- ResonatorCleanup: Iterative factorization (10-100x speedup)
- Support for single and multi-factor unbinding
Based on:
Kymn et al. (2024) "Attention Mechanisms in Vector Symbolic Architectures"
- Resonator Networks for multi-factor unbinding
- Typical convergence: 5-15 iterations with 0.99 threshold
- Performance: 10-100x speedup over brute-force
References:
Paper: Kymn et al. (2024) - Attention and Resonator specifications
Related: Kanerva (2009) - Hyperdimensional computing principles
Mathematical Foundation:
- Cleanup: Find argmax_i sim(query, codebook[i])
- Factorization: Iteratively unbind factors until convergence
- Convergence: similarity >= threshold or max_iterations reached
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple
from ..backends.base import Array
from ..models.base import VSAModel
class CleanupStrategy(ABC):
    """Interface shared by every codebook cleanup strategy.

    A strategy receives a query hypervector together with a labelled
    codebook and reports which codebook entry (or entries) the query most
    resembles. Concrete subclasses have to provide both operations:

    - ``cleanup()``: select the single best-matching codebook entry.
    - ``factorize()``: recover several factors from one composite vector.

    Examples:
        >>> strategy = BruteForceCleanup()
        >>>
        >>> # One factor
        >>> label, similarity = strategy.cleanup(query, codebook, model)
        >>> print(f"Best match: {label} (similarity: {similarity:.3f})")
        >>>
        >>> # Several factors
        >>> labels, similarities = strategy.factorize(
        ...     query, codebook, model, n_factors=3
        ... )
        >>> print(f"Factors: {labels}")

    Attributes:
        None (abstract class)

    References:
        Kanerva (2009): Hyperdimensional Computing
        Kymn et al. (2024): Attention Mechanisms in VSAs
    """

    @abstractmethod
    def cleanup(
        self,
        query: Array,
        codebook: Dict[str, Array],
        model: VSAModel,
    ) -> Tuple[str, float]:
        """Return the codebook entry that best matches ``query``.

        Args:
            query: Hypervector that should be cleaned up.
            codebook: Mapping from label to hypervector.
            model: VSA model used to compute similarities.

        Returns:
            A ``(label, similarity)`` pair describing the best match.

        Raises:
            TypeError: If an argument has the wrong type.
            ValueError: If the codebook is empty.

        Examples:
            >>> label, sim = strategy.cleanup(query, codebook, model)
            >>> print(f"Best: {label} with similarity {sim:.3f}")
        """
        ...

    @abstractmethod
    def factorize(
        self,
        query: Array,
        codebook: Dict[str, Array],
        model: VSAModel,
        n_factors: int = 2,
        max_iterations: int = 20,
        threshold: float = 0.99,
    ) -> Tuple[List[str], List[float]]:
        """Split a composite hypervector into its constituent factors.

        Args:
            query: Composite hypervector to decompose.
            codebook: Mapping from label to hypervector.
            model: VSA model providing bind/unbind/similarity operations.
            n_factors: How many factors to extract (default: 2).
            max_iterations: Iteration budget (default: 20).
            threshold: Similarity regarded as converged (default: 0.99).

        Returns:
            A ``(labels, similarities)`` pair: the label chosen for each
            factor and the similarity associated with each choice.

        Raises:
            TypeError: If an argument has the wrong type.
            ValueError: If ``n_factors < 1`` or the codebook is empty.

        Examples:
            >>> labels, sims = strategy.factorize(
            ...     query, codebook, model, n_factors=3
            ... )
            >>> print(f"Factors: {labels}")
            >>> print(f"Similarities: {[f'{s:.3f}' for s in sims]}")
        """
        ...
class BruteForceCleanup(CleanupStrategy):
    """Brute-force cleanup via exhaustive codebook search.

    Baseline strategy: the query is compared against every codebook entry
    with ``model.similarity`` and the entry with the highest similarity wins.
    One similarity evaluation is performed per codebook entry, so the cost of
    a lookup grows linearly with the codebook size.

    Examples:
        >>> cleanup = BruteForceCleanup()
        >>>
        >>> # Single cleanup
        >>> label, sim = cleanup.cleanup(query, codebook, model)
        >>> print(f"Found: {label}")
        >>>
        >>> # Multi-factor factorization
        >>> labels, sims = cleanup.factorize(query, codebook, model, n_factors=3)
        >>> print(f"Factors: {labels}")

    References:
        Kanerva (2009): Classic cleanup operation
    """

    @staticmethod
    def _check_common_types(query, codebook, model) -> None:
        """Raise TypeError unless query/codebook/model have usable types."""
        if query is None:
            raise TypeError("query cannot be None")
        if not isinstance(codebook, dict):
            raise TypeError(f"codebook must be dict, got {type(codebook)}")
        if not isinstance(model, VSAModel):
            raise TypeError(f"model must be VSAModel, got {type(model)}")

    @staticmethod
    def _check_query_shape(query, model) -> None:
        """Raise unless ``query`` is a 1-D vector of length ``model.dimension``."""
        # Only the backend calls are guarded: a backend that cannot interpret
        # ``query`` is reported as a TypeError with the original error chained.
        try:
            query_shape = model.backend.shape(query)
            expected_shape = (model.dimension,)
        except (AttributeError, TypeError) as e:
            raise TypeError(
                f"query must be a valid array compatible with model backend, got {type(query)}. "
                f"Backend error: {e}"
            ) from e
        if query_shape != expected_shape:
            raise ValueError(
                f"query must have shape {expected_shape}, got {query_shape}. "
                f"Ensure query is a 1-D hypervector matching model dimension."
            )

    def cleanup(
        self,
        query: Array,
        codebook: Dict[str, Array],
        model: VSAModel,
    ) -> Tuple[str, float]:
        """Find the best match via exhaustive search.

        Computes ``model.similarity(query, vector)`` for every codebook entry
        and returns the label with the highest similarity. On ties the entry
        that appears first in the codebook's iteration order is kept, because
        a later entry only replaces the current best when it is strictly
        greater.

        Args:
            query: Query hypervector to clean up.
            codebook: Dictionary mapping labels to hypervectors.
            model: VSA model for similarity computation.

        Returns:
            Tuple of (label, similarity) for the best match.

        Raises:
            TypeError: If ``query`` is None, ``codebook`` is not a dict,
                ``model`` is not a VSAModel, or the backend cannot report the
                shape of ``query``.
            ValueError: If the codebook is empty or ``query`` does not have
                shape ``(model.dimension,)``.

        Examples:
            >>> label, sim = cleanup.cleanup(query, codebook, model)
            >>> print(f"Best match: {label} (sim: {sim:.3f})")
        """
        self._check_common_types(query, codebook, model)
        if len(codebook) == 0:
            raise ValueError("codebook must not be empty")
        self._check_query_shape(query, model)

        # Linear scan keeping the first strict maximum.
        best_label = None
        best_similarity = float("-inf")
        for label, vector in codebook.items():
            similarity = model.similarity(query, vector)
            if similarity > best_similarity:
                best_similarity = similarity
                best_label = label
        return best_label, float(best_similarity)

    def factorize(
        self,
        query: Array,
        codebook: Dict[str, Array],
        model: VSAModel,
        n_factors: int = 2,
        max_iterations: int = 20,
        threshold: float = 0.99,
    ) -> Tuple[List[str], List[float]]:
        """Factorize via sequential cleanup and unbinding.

        Performs exactly ``n_factors`` rounds. Each round runs ``cleanup()``
        on the current vector, records the winning label and similarity, and
        then unbinds the winning codebook vector from the current vector
        before the next round.

        ``max_iterations`` and ``threshold`` belong to the shared strategy
        interface. This implementation validates them but they do not change
        how many factors are extracted or which factors are chosen.

        Args:
            query: Composite hypervector to factorize.
            codebook: Dictionary mapping labels to hypervectors.
            model: VSA model for unbind/similarity operations.
            n_factors: Number of factors to extract (default: 2).
            max_iterations: Must be an int >= 1 (default: 20); validated only.
            threshold: Must be numeric in [0.0, 1.0] (default: 0.99);
                validated only.

        Returns:
            Tuple of:
                - labels: List of factor labels in extraction order
                - similarities: List of similarities for each factor

        Raises:
            TypeError: If an argument has the wrong type, or the backend
                cannot report the shape of ``query``.
            ValueError: If ``n_factors < 1``, the codebook is empty,
                ``max_iterations < 1``, ``threshold`` is outside [0.0, 1.0],
                or ``query`` does not have shape ``(model.dimension,)``.

        Examples:
            >>> labels, sims = cleanup.factorize(
            ...     query, codebook, model, n_factors=3, threshold=0.95
            ... )
            >>> print(f"Extracted {len(labels)} factors")
        """
        # Type validation
        self._check_common_types(query, codebook, model)
        if not isinstance(n_factors, int):
            raise TypeError(f"n_factors must be int, got {type(n_factors)}")
        if not isinstance(max_iterations, int):
            raise TypeError(f"max_iterations must be int, got {type(max_iterations)}")
        if not isinstance(threshold, (int, float)):
            raise TypeError(f"threshold must be numeric, got {type(threshold)}")

        # Value validation
        if n_factors < 1:
            raise ValueError(f"n_factors must be >= 1, got {n_factors}")
        if len(codebook) == 0:
            raise ValueError("codebook must not be empty")
        if max_iterations < 1:
            raise ValueError(f"max_iterations must be >= 1, got {max_iterations}")
        if not (0.0 <= threshold <= 1.0):
            raise ValueError(f"threshold must be in [0.0, 1.0], got {threshold}")
        self._check_query_shape(query, model)

        labels: List[str] = []
        similarities: List[float] = []
        current = query
        for _ in range(n_factors):
            # cleanup() re-validates ``current``, so a malformed result from
            # model.unbind surfaces here rather than as a bad similarity.
            label, similarity = self.cleanup(current, codebook, model)
            labels.append(label)
            similarities.append(similarity)
            # Remove the factor just found so the next round sees the rest.
            current = model.unbind(current, codebook[label])
        return labels, similarities
class ResonatorCleanup(CleanupStrategy):
    """Resonator-style cleanup via iterative refinement of factor estimates.

    All factor estimates are held simultaneously and refined in sweeps,
    following the resonator network idea cited below.

    Algorithm:
        1. Initialize one estimate per factor from the codebook.
        2. For each sweep, for each factor:
            a. Unbind all other current estimates from the query.
            b. Score the result against every codebook entry.
            c. Replace the estimate (best entry, or a softmax-weighted
               blend of the top entries in soft mode).
        3. Stop when every factor reaches the similarity threshold, when
           the average similarity stops improving, or when
           ``max_iterations`` sweeps have run.

    The speedups and iteration counts reported in the cited paper are not
    verified by this code.

    Examples:
        >>> cleanup = ResonatorCleanup()
        >>>
        >>> # Single cleanup (same as brute-force)
        >>> label, sim = cleanup.cleanup(query, codebook, model)
        >>>
        >>> # Multi-factor factorization
        >>> labels, sims = cleanup.factorize(
        ...     query, codebook, model, n_factors=5, threshold=0.99
        ... )
        >>> print(f"Converged with {len(labels)} factors")

    Attributes:
        None (stateless)

    References:
        Kymn et al. (2024): Attention Mechanisms in VSAs
        - Section 3: Resonator Networks
        - Algorithm 1: Iterative factorization
    """

    def cleanup(
        self,
        query: Array,
        codebook: Dict[str, Array],
        model: VSAModel,
    ) -> Tuple[str, float]:
        """Find the best match via exhaustive search.

        Single-factor cleanup is delegated to ``BruteForceCleanup.cleanup``;
        use ``factorize()`` for the iterative multi-factor procedure.

        Args:
            query: Query hypervector to clean up.
            codebook: Dictionary mapping labels to hypervectors.
            model: VSA model for similarity computation.

        Returns:
            Tuple of (label, similarity) for the best match.

        Raises:
            TypeError: If arguments are not correct types.
            ValueError: If codebook is empty.

        Examples:
            >>> label, sim = cleanup.cleanup(query, codebook, model)
        """
        return BruteForceCleanup().cleanup(query, codebook, model)

    def factorize(
        self,
        query: Array,
        codebook: Dict[str, Array],
        model: VSAModel,
        n_factors: int = 2,
        max_iterations: int = 20,
        threshold: float = 0.99,
        # Refinements
        temperature: float = 20.0,
        top_k: int = 1,
        patience: int = 3,
        min_delta: float = 1e-4,
        mode: str = 'hard',
    ) -> Tuple[List[str], List[float]]:
        """Factorize via resonator-style iteration.

        Estimates are initialized deterministically: factor ``i`` starts at
        the ``i``-th codebook entry in iteration order, wrapping around when
        ``n_factors`` exceeds the codebook size. Each sweep then updates
        every factor in turn by unbinding all other current estimates from
        the query and scoring the result against the whole codebook.

        Args:
            query: Composite hypervector to factorize.
            codebook: Dictionary mapping labels to hypervectors.
            model: VSA model for unbind/similarity operations.
            n_factors: Number of factors to extract (default: 2).
            max_iterations: Maximum number of sweeps (default: 20).
            threshold: A factor counts as converged when its top-1 codebook
                similarity is at least this value (default: 0.99).
            temperature: Multiplier applied to the top similarities before
                the softmax in soft updates. Larger values put more weight
                on the best candidate (default: 20.0).
            top_k: Number of top candidates blended in a soft update. Any
                value above 1 enables soft updates; a soft update always
                blends at least two candidates, capped at the codebook size
                (default: 1).
            patience: Stop once this many consecutive sweeps fail to raise
                the average similarity by more than ``min_delta``
                (default: 3).
            min_delta: Minimum improvement in average similarity that
                counts as progress (default: 1e-4).
            mode: ``'soft'`` enables soft updates; any other value selects
                hard (top-1) updates unless ``top_k > 1`` (default: 'hard').

        Returns:
            Tuple of:
                - labels: Top-1 label for each factor
                - similarities: For each factor, the similarity between its
                  final estimate and the query with all other estimates
                  unbound

        Raises:
            TypeError: If an argument has the wrong type, or the backend
                cannot report the shape of ``query``.
            ValueError: If ``n_factors < 1``, the codebook is empty,
                ``max_iterations < 1``, ``threshold`` is outside [0.0, 1.0],
                or ``query`` does not have shape ``(model.dimension,)``.

        Examples:
            >>> labels, sims = cleanup.factorize(
            ...     query, codebook, model, n_factors=5
            ... )
            >>> print(f"Factors: {labels}")
            >>> print(f"Avg similarity: {sum(sims)/len(sims):.3f}")
        """
        labels, similarities, _history = self._resonate(
            query, codebook, model, n_factors, max_iterations, threshold,
            temperature, top_k, patience, min_delta, mode,
        )
        return labels, similarities

    def factorize_verbose(
        self,
        query: Array,
        codebook: Dict[str, Array],
        model: VSAModel,
        n_factors: int = 2,
        max_iterations: int = 20,
        threshold: float = 0.99,
        temperature: float = 20.0,
        top_k: int = 1,
        patience: int = 3,
        min_delta: float = 1e-4,
        mode: str = 'hard',
    ) -> Tuple[List[str], List[float], List[float]]:
        """Like factorize(), but also return the per-sweep similarity history.

        Arguments, validation, and the first two return values are the same
        as for ``factorize()``.

        Returns:
            Tuple of (labels, similarities, history), where ``history`` holds
            the average isolated similarity recorded after each sweep.

        Raises:
            TypeError: Under the same conditions as ``factorize()``.
            ValueError: Under the same conditions as ``factorize()``.
        """
        return self._resonate(
            query, codebook, model, n_factors, max_iterations, threshold,
            temperature, top_k, patience, min_delta, mode,
        )

    @staticmethod
    def _validate_factorize_args(
        query, codebook, model, n_factors, max_iterations, threshold
    ) -> None:
        """Raise TypeError/ValueError for malformed factorization arguments."""
        # Type validation
        if query is None:
            raise TypeError("query cannot be None")
        if not isinstance(codebook, dict):
            raise TypeError(f"codebook must be dict, got {type(codebook)}")
        if not isinstance(model, VSAModel):
            raise TypeError(f"model must be VSAModel, got {type(model)}")
        if not isinstance(n_factors, int):
            raise TypeError(f"n_factors must be int, got {type(n_factors)}")
        if not isinstance(max_iterations, int):
            raise TypeError(f"max_iterations must be int, got {type(max_iterations)}")
        if not isinstance(threshold, (int, float)):
            raise TypeError(f"threshold must be numeric, got {type(threshold)}")

        # Value validation
        if n_factors < 1:
            raise ValueError(f"n_factors must be >= 1, got {n_factors}")
        if len(codebook) == 0:
            raise ValueError("codebook must not be empty")
        if max_iterations < 1:
            raise ValueError(f"max_iterations must be >= 1, got {max_iterations}")
        if not (0.0 <= threshold <= 1.0):
            raise ValueError(f"threshold must be in [0.0, 1.0], got {threshold}")

        # Shape validation: query must be 1-D with length model.dimension.
        try:
            query_shape = model.backend.shape(query)
            expected_shape = (model.dimension,)
        except (AttributeError, TypeError) as e:
            raise TypeError(
                f"query must be a valid array compatible with model backend, got {type(query)}. "
                f"Backend error: {e}"
            ) from e
        if query_shape != expected_shape:
            raise ValueError(
                f"query must have shape {expected_shape}, got {query_shape}. "
                f"Ensure query is a 1-D hypervector matching model dimension."
            )

    @staticmethod
    def _isolate(query, estimates, target, model):
        """Unbind every estimate except ``estimates[target]`` from ``query``."""
        isolated = query
        for j, other in enumerate(estimates):
            if j != target:
                isolated = model.unbind(isolated, other)
        return isolated

    @staticmethod
    def _isolated_similarities(query, estimates, model) -> List[float]:
        """Similarity of each estimate to the query with the others unbound."""
        return [
            float(
                model.similarity(
                    ResonatorCleanup._isolate(query, estimates, i, model),
                    estimate,
                )
            )
            for i, estimate in enumerate(estimates)
        ]

    def _resonate(
        self,
        query,
        codebook,
        model,
        n_factors,
        max_iterations,
        threshold,
        temperature,
        top_k,
        patience,
        min_delta,
        mode,
    ) -> Tuple[List[str], List[float], List[float]]:
        """Shared core of factorize() and factorize_verbose().

        Validates the arguments, runs the refinement sweeps, and returns
        ``(labels, final_similarities, history)``.
        """
        self._validate_factorize_args(
            query, codebook, model, n_factors, max_iterations, threshold
        )

        # Deterministic start: cycle through the codebook in iteration order.
        codebook_labels = list(codebook.keys())
        estimate_labels = [
            codebook_labels[i % len(codebook_labels)] for i in range(n_factors)
        ]
        estimates = [codebook[label] for label in estimate_labels]

        # The update rule is fixed for the whole run, so decide it once.
        use_soft = (mode == 'soft') or (top_k > 1)
        if use_soft:
            # Imported lazily so hard-mode runs do not require numpy here.
            import numpy as _np

        history: List[float] = []
        best_avg = -1.0
        no_improve = 0
        for _sweep in range(max_iterations):
            converged = True
            for i in range(n_factors):
                isolated = self._isolate(query, estimates, i, model)

                # Score against the whole codebook, best first. The sort is
                # stable, so ties keep codebook iteration order.
                sims = [
                    (lbl, float(model.similarity(isolated, vec)))
                    for lbl, vec in codebook.items()
                ]
                sims.sort(key=lambda t: t[1], reverse=True)

                if not use_soft:
                    label, similarity = sims[0]
                    estimates[i] = codebook[label]
                    estimate_labels[i] = label
                else:
                    # Blend the top candidates with softmax weights.
                    k = min(max(2, top_k), len(sims))
                    top = sims[:k]
                    vals = _np.array([s for _, s in top], dtype=_np.float64)
                    # Similarities are scaled by ``temperature``: larger
                    # values sharpen the weights toward the best candidate.
                    logits = vals * float(temperature)
                    logits = logits - logits.max()  # numerical stability
                    w = _np.exp(logits)
                    w = w / (w.sum() + 1e-12)
                    parts = [
                        model.backend.multiply_scalar(codebook[lbl], float(wt))
                        for (lbl, _s), wt in zip(top, w.tolist())
                    ]
                    estimates[i] = model.backend.sum(
                        model.backend.stack(parts, axis=0), axis=0
                    )
                    # The blended estimate is reported under its top-1 label.
                    estimate_labels[i] = top[0][0]
                    similarity = float(top[0][1])

                if similarity < threshold:
                    converged = False

            # Plateau detection on the average isolated similarity.
            curr_sims = self._isolated_similarities(query, estimates, model)
            avg_sim = sum(curr_sims) / max(1, len(curr_sims))
            history.append(avg_sim)
            if avg_sim > best_avg + min_delta:
                best_avg = avg_sim
                no_improve = 0
            else:
                no_improve += 1
            if converged or no_improve >= patience:
                break

        final_sims = self._isolated_similarities(query, estimates, model)
        return estimate_labels, final_sims, history