Source code for holovec.utils.cleanup

"""Cleanup and factorization strategies for VSA codebook operations.

This module provides cleanup strategies for vector symbolic architectures,
including brute-force codebook search and resonator networks for iterative
multi-factor unbinding.

Key Features:
    - Abstract CleanupStrategy interface for extensibility
    - BruteForceCleanup: Exhaustive codebook search (baseline)
    - ResonatorCleanup: Iterative factorization (10-100x speedup)
    - Support for single and multi-factor unbinding

Based on:
    Kymn et al. (2024) "Attention Mechanisms in Vector Symbolic Architectures"
    - Resonator Networks for multi-factor unbinding
    - Typical convergence: 5-15 iterations with 0.99 threshold
    - Performance: 10-100x speedup over brute-force

References:
    Paper: Kymn et al. (2024) - Attention and Resonator specifications
    Related: Kanerva (2009) - Hyperdimensional computing principles

Mathematical Foundation:
    - Cleanup: Find argmax_i sim(query, codebook[i])
    - Factorization: Iteratively unbind factors until convergence
    - Convergence: similarity >= threshold or max_iterations reached
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple

from ..backends.base import Array
from ..models.base import VSAModel


class CleanupStrategy(ABC):
    """Abstract base class for cleanup strategies.

    Cleanup strategies search a codebook to find the closest match(es) to
    a query hypervector. Different strategies offer trade-offs between
    speed, accuracy, and support for multi-factor unbinding.

    Implementing classes must define:
        - cleanup(): Single-factor codebook search
        - factorize(): Multi-factor iterative unbinding

    Examples:
        >>> # Create a cleanup strategy
        >>> strategy = BruteForceCleanup()
        >>>
        >>> # Single-factor cleanup
        >>> label, similarity = strategy.cleanup(query, codebook, model)
        >>> print(f"Best match: {label} (similarity: {similarity:.3f})")
        >>>
        >>> # Multi-factor factorization
        >>> labels, similarities = strategy.factorize(
        ...     query, codebook, model, n_factors=3
        ... )
        >>> print(f"Factors: {labels}")

    Attributes:
        None (abstract class)

    References:
        Kanerva (2009): Hyperdimensional Computing
        Kymn et al. (2024): Attention Mechanisms in VSAs
    """

    @abstractmethod
    def cleanup(
        self,
        query: Array,
        codebook: Dict[str, Array],
        model: VSAModel,
    ) -> Tuple[str, float]:
        """Find the best matching codebook entry for a query.

        Args:
            query: Query hypervector to clean up
            codebook: Dictionary mapping labels to hypervectors
            model: VSA model for similarity computation

        Returns:
            Tuple of (label, similarity) for the best match

        Raises:
            TypeError: If arguments are not correct types
            ValueError: If codebook is empty

        Examples:
            >>> label, sim = strategy.cleanup(query, codebook, model)
            >>> print(f"Best: {label} with similarity {sim:.3f}")
        """
        pass

    @abstractmethod
    def factorize(
        self,
        query: Array,
        codebook: Dict[str, Array],
        model: VSAModel,
        n_factors: int = 2,
        max_iterations: int = 20,
        threshold: float = 0.99,
    ) -> Tuple[List[str], List[float]]:
        """Factorize a composition into constituent factors.

        Iteratively unbinds factors from a composite hypervector by
        finding the best match, unbinding it, and repeating.

        Args:
            query: Composite hypervector to factorize
            codebook: Dictionary mapping labels to hypervectors
            model: VSA model for bind/unbind/similarity operations
            n_factors: Number of factors to extract (default: 2)
            max_iterations: Maximum iterations per factor (default: 20)
            threshold: Convergence threshold for similarity (default: 0.99)

        Returns:
            Tuple of:
                - labels: List of factor labels in extraction order
                - similarities: List of similarities for each factor

        Raises:
            TypeError: If arguments are not correct types
            ValueError: If n_factors < 1 or codebook is empty

        Examples:
            >>> # Factorize a 3-factor composition
            >>> labels, sims = strategy.factorize(
            ...     query, codebook, model, n_factors=3
            ... )
            >>> print(f"Factors: {labels}")
            >>> print(f"Similarities: {[f'{s:.3f}' for s in sims]}")
        """
        pass


[docs] class BruteForceCleanup(CleanupStrategy): """Brute-force cleanup via exhaustive codebook search. This is the baseline cleanup strategy that computes similarity between the query and every codebook entry, returning the best match. Simple and effective, but slow for large codebooks. Performance: - Time complexity: O(n × d) for n items, d dimensions - Space complexity: O(1) - Best for: Small codebooks (< 1000 items) Examples: >>> # Create strategy >>> cleanup = BruteForceCleanup() >>> >>> # Single cleanup >>> label, sim = cleanup.cleanup(query, codebook, model) >>> print(f"Found: {label}") >>> >>> # Multi-factor factorization >>> labels, sims = cleanup.factorize(query, codebook, model, n_factors=3) >>> print(f"Factors: {labels}") References: Kanerva (2009): Classic cleanup operation """
[docs] def cleanup( self, query: Array, codebook: Dict[str, Array], model: VSAModel, ) -> Tuple[str, float]: """Find best match via exhaustive search. Computes similarity between query and every codebook entry, returning the label with highest similarity. Args: query: Query hypervector to clean up codebook: Dictionary mapping labels to hypervectors model: VSA model for similarity computation Returns: Tuple of (label, similarity) for the best match Raises: TypeError: If arguments are not correct types ValueError: If codebook is empty Examples: >>> label, sim = cleanup.cleanup(query, codebook, model) >>> print(f"Best match: {label} (sim: {sim:.3f})") """ # Type validation if query is None: raise TypeError("query cannot be None") if not isinstance(codebook, dict): raise TypeError(f"codebook must be dict, got {type(codebook)}") if not isinstance(model, VSAModel): raise TypeError(f"model must be VSAModel, got {type(model)}") # Value validation if len(codebook) == 0: raise ValueError("codebook must not be empty") # Array shape validation (ensure query is 1-D vector matching model dimension) try: query_shape = model.backend.shape(query) expected_shape = (model.dimension,) if query_shape != expected_shape: raise ValueError( f"query must have shape {expected_shape}, got {query_shape}. " f"Ensure query is a 1-D hypervector matching model dimension." ) except (AttributeError, TypeError) as e: raise TypeError( f"query must be a valid array compatible with model backend, got {type(query)}. " f"Backend error: {e}" ) # Compute similarities for all entries best_label = None best_similarity = float('-inf') for label, vector in codebook.items(): similarity = model.similarity(query, vector) if similarity > best_similarity: best_similarity = similarity best_label = label return best_label, float(best_similarity)
[docs] def factorize( self, query: Array, codebook: Dict[str, Array], model: VSAModel, n_factors: int = 2, max_iterations: int = 20, threshold: float = 0.99, ) -> Tuple[List[str], List[float]]: """Factorize via iterative cleanup and unbinding. Repeatedly finds the best match, unbinds it from the query, and continues until n_factors are extracted or convergence. Args: query: Composite hypervector to factorize codebook: Dictionary mapping labels to hypervectors model: VSA model for bind/unbind/similarity operations n_factors: Number of factors to extract (default: 2) max_iterations: Maximum iterations per factor (default: 20) threshold: Convergence threshold for similarity (default: 0.99) Returns: Tuple of: - labels: List of factor labels in extraction order - similarities: List of similarities for each factor Raises: TypeError: If arguments are not correct types ValueError: If n_factors < 1 or codebook is empty Examples: >>> labels, sims = cleanup.factorize( ... query, codebook, model, n_factors=3, threshold=0.95 ... ) >>> print(f"Extracted {len(labels)} factors") """ # Type validation if query is None: raise TypeError("query cannot be None") if not isinstance(codebook, dict): raise TypeError(f"codebook must be dict, got {type(codebook)}") if not isinstance(model, VSAModel): raise TypeError(f"model must be VSAModel, got {type(model)}") if not isinstance(n_factors, int): raise TypeError(f"n_factors must be int, got {type(n_factors)}") if not isinstance(max_iterations, int): raise TypeError(f"max_iterations must be int, got {type(max_iterations)}") if not isinstance(threshold, (int, float)): raise TypeError(f"threshold must be numeric, got {type(threshold)}") # Value validation if n_factors < 1: raise ValueError(f"n_factors must be >= 1, got {n_factors}") if len(codebook) == 0: raise ValueError("codebook must not be empty") if max_iterations < 1: raise ValueError(f"max_iterations must be >= 1, got {max_iterations}") if not (0.0 <= threshold <= 1.0): raise ValueError(f"threshold must be in [0.0, 1.0], got {threshold}") # Array shape validation (ensure query is 1-D vector matching model dimension) try: query_shape = model.backend.shape(query) expected_shape = (model.dimension,) if query_shape != expected_shape: raise ValueError( f"query must have shape {expected_shape}, got {query_shape}. " f"Ensure query is a 1-D hypervector matching model dimension." ) except (AttributeError, TypeError) as e: raise TypeError( f"query must be a valid array compatible with model backend, got {type(query)}. " f"Backend error: {e}" ) # Extract factors iteratively labels = [] similarities = [] current = query for _ in range(n_factors): # Find best match label, similarity = self.cleanup(current, codebook, model) labels.append(label) similarities.append(similarity) # Check convergence if similarity >= threshold: # High similarity - factor found pass # Unbind the found factor and continue factor_vector = codebook[label] current = model.unbind(current, factor_vector) return labels, similarities
[docs] class ResonatorCleanup(CleanupStrategy): """Resonator network cleanup via iterative refinement. Implements the resonator network algorithm from Kymn et al. (2024), which uses iterative attention mechanisms to refine factor estimates. Achieves 10-100x speedup over brute-force for multi-factor unbinding. Algorithm: 1. Initialize estimates for all factors 2. For each iteration: a. Unbind other factors to isolate target b. Cleanup against codebook c. Update estimate 3. Repeat until convergence or max_iterations Performance: - Convergence: Typically 5-15 iterations - Speedup: 10-100x over brute-force - Best for: Multi-factor compositions (3+ factors) Examples: >>> # Create resonator cleanup >>> cleanup = ResonatorCleanup() >>> >>> # Single cleanup (same as brute-force) >>> label, sim = cleanup.cleanup(query, codebook, model) >>> >>> # Multi-factor with resonator (much faster) >>> labels, sims = cleanup.factorize( ... query, codebook, model, n_factors=5, threshold=0.99 ... ) >>> print(f"Converged with {len(labels)} factors") Attributes: None (stateless) References: Kymn et al. (2024): Attention Mechanisms in VSAs - Section 3: Resonator Networks - Algorithm 1: Iterative factorization """
[docs] def cleanup( self, query: Array, codebook: Dict[str, Array], model: VSAModel, ) -> Tuple[str, float]: """Find best match via exhaustive search. For single-factor cleanup, resonator networks reduce to brute-force search. Use factorize() for multi-factor speedup. Args: query: Query hypervector to clean up codebook: Dictionary mapping labels to hypervectors model: VSA model for similarity computation Returns: Tuple of (label, similarity) for the best match Raises: TypeError: If arguments are not correct types ValueError: If codebook is empty Examples: >>> label, sim = cleanup.cleanup(query, codebook, model) """ # For single cleanup, resonator = brute-force # Use the brute-force implementation brute_force = BruteForceCleanup() return brute_force.cleanup(query, codebook, model)
[docs] def factorize( self, query: Array, codebook: Dict[str, Array], model: VSAModel, n_factors: int = 2, max_iterations: int = 20, threshold: float = 0.99, # Refinements temperature: float = 20.0, top_k: int = 1, patience: int = 3, min_delta: float = 1e-4, mode: str = 'hard', ) -> Tuple[List[str], List[float]]: """Factorize via resonator network iteration. Uses iterative attention to refine factor estimates simultaneously, achieving much faster convergence than sequential unbinding. Algorithm (from Kymn et al. 2024): 1. Initialize: estimates = [random from codebook] × n_factors 2. Repeat for max_iterations: a. For each factor i: - Unbind all other estimates from query - Cleanup result against codebook - Update estimate[i] b. Check convergence (all similarities >= threshold) 3. Return final estimates and similarities Args: query: Composite hypervector to factorize codebook: Dictionary mapping labels to hypervectors model: VSA model for bind/unbind/similarity operations n_factors: Number of factors to extract (default: 2) max_iterations: Maximum iterations (default: 20) threshold: Convergence threshold for similarity (default: 0.99) Returns: Tuple of: - labels: List of factor labels - similarities: List of similarities for each factor Raises: TypeError: If arguments are not correct types ValueError: If n_factors < 1 or codebook is empty Examples: >>> # Fast multi-factor unbinding >>> labels, sims = cleanup.factorize( ... query, codebook, model, n_factors=5 ... ) >>> print(f"Factors: {labels}") >>> print(f"Avg similarity: {sum(sims)/len(sims):.3f}") """ # Type validation if query is None: raise TypeError("query cannot be None") if not isinstance(codebook, dict): raise TypeError(f"codebook must be dict, got {type(codebook)}") if not isinstance(model, VSAModel): raise TypeError(f"model must be VSAModel, got {type(model)}") if not isinstance(n_factors, int): raise TypeError(f"n_factors must be int, got {type(n_factors)}") if not isinstance(max_iterations, int): raise TypeError(f"max_iterations must be int, got {type(max_iterations)}") if not isinstance(threshold, (int, float)): raise TypeError(f"threshold must be numeric, got {type(threshold)}") # Value validation if n_factors < 1: raise ValueError(f"n_factors must be >= 1, got {n_factors}") if len(codebook) == 0: raise ValueError("codebook must not be empty") if max_iterations < 1: raise ValueError(f"max_iterations must be >= 1, got {max_iterations}") if not (0.0 <= threshold <= 1.0): raise ValueError(f"threshold must be in [0.0, 1.0], got {threshold}") # Array shape validation (ensure query is 1-D vector matching model dimension) try: query_shape = model.backend.shape(query) expected_shape = (model.dimension,) if query_shape != expected_shape: raise ValueError( f"query must have shape {expected_shape}, got {query_shape}. " f"Ensure query is a 1-D hypervector matching model dimension." ) except (AttributeError, TypeError) as e: raise TypeError( f"query must be a valid array compatible with model backend, got {type(query)}. " f"Backend error: {e}" ) # Initialize estimates with deterministic codebook entries (cycle) codebook_labels = list(codebook.keys()) estimates = [] estimate_labels = [] for i in range(n_factors): # Use modulo to cycle through codebook if n_factors > codebook size label = codebook_labels[i % len(codebook_labels)] estimates.append(codebook[label]) estimate_labels.append(label) # Iterative refinement with optional early stopping best_avg = -1.0 no_improve = 0 for iteration in range(max_iterations): converged = True for i in range(n_factors): # Unbind all OTHER estimates from query to isolate factor i isolated = query for j in range(n_factors): if j != i: isolated = model.unbind(isolated, estimates[j]) # Compute similarities to entire codebook sims: List[Tuple[str, float]] = [] for lbl, vec in codebook.items(): sims.append((lbl, float(model.similarity(isolated, vec)))) # Sort by similarity desc sims.sort(key=lambda t: t[1], reverse=True) # Hard vs soft update use_soft = (mode == 'soft') or (top_k > 1) if not use_soft: label, similarity = sims[0] estimates[i] = codebook[label] estimate_labels[i] = label else: # Take top-K and softmax-weight them k = min(max(2, top_k), len(sims)) top = sims[:k] import numpy as _np vals = _np.array([s for _, s in top], dtype=_np.float64) # temperature > 0; larger → flatter logits = vals * float(temperature) logits = logits - logits.max() w = _np.exp(logits) w = w / (w.sum() + 1e-12) # Bundle weighted parts = [] for (lbl, _s), wt in zip(top, w.tolist()): parts.append(model.backend.multiply_scalar(codebook[lbl], float(wt))) estimates[i] = model.backend.sum(model.backend.stack(parts, axis=0), axis=0) # Label: top-1 for reporting estimate_labels[i] = top[0][0] similarity = float(top[0][1]) # Check convergence for this factor if similarity < threshold: converged = False # Global early stopping on plateau # Compute avg isolated similarity across factors curr_sims = [] for i in range(n_factors): isolated = query for j in range(n_factors): if j != i: isolated = model.unbind(isolated, estimates[j]) curr_sims.append(float(model.similarity(isolated, estimates[i]))) avg_sim = sum(curr_sims) / max(1, len(curr_sims)) if avg_sim > best_avg + min_delta: best_avg = avg_sim no_improve = 0 else: no_improve += 1 if converged or no_improve >= patience: break # Compute final similarities (as in original API) similarities: List[float] = [] for i in range(n_factors): isolated = query for j in range(n_factors): if j != i: isolated = model.unbind(isolated, estimates[j]) similarity = model.similarity(isolated, estimates[i]) similarities.append(float(similarity)) return estimate_labels, similarities
[docs] def factorize_verbose( self, query: Array, codebook: Dict[str, Array], model: VSAModel, n_factors: int = 2, max_iterations: int = 20, threshold: float = 0.99, temperature: float = 20.0, top_k: int = 1, patience: int = 3, min_delta: float = 1e-4, mode: str = 'hard', ) -> Tuple[List[str], List[float], List[float]]: """Like factorize(), but also returns avg-similarity history per iteration.""" # Lightweight wrapper: capture avg similarity after each iteration # Re-implement loop to record history. # Initialize estimates codebook_labels = list(codebook.keys()) estimates = [] estimate_labels = [] for i in range(n_factors): label = codebook_labels[i % len(codebook_labels)] estimates.append(codebook[label]) estimate_labels.append(label) history: List[float] = [] best_avg = -1.0 no_improve = 0 for _iter in range(max_iterations): converged = True for i in range(n_factors): isolated = query for j in range(n_factors): if j != i: isolated = model.unbind(isolated, estimates[j]) sims = [(lbl, float(model.similarity(isolated, vec))) for lbl, vec in codebook.items()] sims.sort(key=lambda t: t[1], reverse=True) use_soft = (mode == 'soft') or (top_k > 1) if not use_soft: label, similarity = sims[0] estimates[i] = codebook[label] estimate_labels[i] = label else: k = min(max(2, top_k), len(sims)) top = sims[:k] import numpy as _np vals = _np.array([s for _, s in top], dtype=_np.float64) logits = vals * float(temperature) logits = logits - logits.max() w = _np.exp(logits) w = w / (w.sum() + 1e-12) parts = [] for (lbl, _s), wt in zip(top, w.tolist()): parts.append(model.backend.multiply_scalar(codebook[lbl], float(wt))) estimates[i] = model.backend.sum(model.backend.stack(parts, axis=0), axis=0) estimate_labels[i] = top[0][0] similarity = float(top[0][1]) if similarity < threshold: converged = False # record avg similarity across factors curr_sims = [] for i in range(n_factors): isolated = query for j in range(n_factors): if j != i: isolated = model.unbind(isolated, estimates[j]) curr_sims.append(float(model.similarity(isolated, estimates[i]))) avg_sim = sum(curr_sims) / max(1, len(curr_sims)) history.append(avg_sim) if avg_sim > best_avg + min_delta: best_avg = avg_sim no_improve = 0 else: no_improve += 1 if converged or no_improve >= patience: break # Final similarities final_sims = [] for i in range(n_factors): isolated = query for j in range(n_factors): if j != i: isolated = model.unbind(isolated, estimates[j]) final_sims.append(float(model.similarity(isolated, estimates[i]))) return estimate_labels, final_sims, history