import re
import warnings
from collections.abc import Mapping
from keyword import iskeyword
import nengo
import numpy as np
from nengo.exceptions import NengoWarning, ValidationError
from nengo_spa import semantic_pointer
from nengo_spa.algebras.hrr_algebra import HrrAlgebra
from nengo_spa.exceptions import SpaParseError
from nengo_spa.semantic_pointer import AbsorbingElement, Identity, Zero
from nengo_spa.typechecks import is_integer, is_iterable, is_number
from nengo_spa.vector_generation import UnitLengthVectors
valid_sp_regex = re.compile("^[A-Z][_a-zA-Z0-9]*$")
special_sps = {
"AbsorbingElement": AbsorbingElement,
"Identity": Identity,
"Zero": Zero,
}
reserved_sp_names = {"None", "True", "False"} | set(special_sps.keys())
class Vocabulary(Mapping):
"""A collection of semantic pointers, each with their own text label.
The Vocabulary can also act as a dictionary, with keys as the names
of the semantic pointers and values as the `.SemanticPointer` objects
themselves. The names of Semantic Pointers must be valid Python 2
identifiers starting with a capital letter.
Every vocabulary knows the special elements *AbsorbingElement*, *Identity*,
and *Zero*. However, these are not included in the keys returned by `.keys`
or the vectors returned by `.vectors`.
Parameters
    ----------
dimensions : int
Number of dimensions for each semantic pointer.
    strict : bool, optional
        Whether to raise an error when asked for a Semantic Pointer that
        does not exist in the vocabulary. If a non-strict vocabulary is
        asked for a pointer that does not exist, the missing pointer will
        be created and added to the vocabulary automatically. A strict
        vocabulary will throw an error if asked for a pointer that does
        not exist in the vocabulary.
max_similarity : float, optional
When randomly generating pointers, ensure that the cosine of the
angle between the new pointer and all existing pointers is less
        than this amount. If the system is unable to find such a pointer
        after 100 tries, a warning is issued and the best candidate found
        is used.
pointer_gen : generator or np.random.RandomState, optional
Generator used to create vectors for new Semantic Pointers. Defaults to
`.UnitLengthVectors`. If a `np.random.RandomState` is passed, it will
be used by `.UnitLengthVectors`.
    name : str, optional
A name to display in the string representation of this vocabulary.
algebra : AbstractAlgebra, optional
Defines the vector symbolic operators used for Semantic Pointers in the
vocabulary. Defaults to `.HrrAlgebra`.
Attributes
----------
keys : sequence
The names of all known semantic pointers (e.g., ``['A', 'B', 'C']``).
max_similarity : float
When randomly generating pointers, ensure that the cosine of the
angle between the new pointer and all existing pointers is less
        than this amount. If the system is unable to find such a pointer
        after 100 tries, a warning is issued and the best candidate found
        is used.
    strict : bool
        Whether to raise an error when asked for a Semantic Pointer that
        does not exist in the vocabulary. If a non-strict vocabulary is
        asked for a pointer that does not exist, the missing pointer will
        be created and added to the vocabulary automatically. A strict
        vocabulary will throw an error if asked for a pointer that does
        not exist in the vocabulary.
vectors : ndarray
All of the semantic pointer vectors in a matrix, in the same order
as in `keys`.
    algebra : AbstractAlgebra
Defines the vector symbolic operators used for Semantic Pointers in the
vocabulary.
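
    Examples
    --------
    A minimal usage sketch (the pointer names *A* and *B* are
    illustrative)::

        vocab = Vocabulary(32)
        vocab.populate("A; B")
        a = vocab["A"]  # a SemanticPointer instance
        similarities = vocab.dot(vocab.parse("A * B"))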
"""
def __init__(
self,
dimensions,
strict=True,
max_similarity=0.1,
pointer_gen=None,
name=None,
algebra=None,
):
if algebra is None:
algebra = HrrAlgebra()
self.algebra = algebra
if not is_integer(dimensions) or dimensions < 1:
raise ValidationError(
"dimensions must be a positive integer", attr="dimensions", obj=self
)
if pointer_gen is None:
pointer_gen = UnitLengthVectors(dimensions)
elif isinstance(pointer_gen, np.random.RandomState):
pointer_gen = UnitLengthVectors(dimensions, pointer_gen)
if not is_iterable(pointer_gen) or isinstance(pointer_gen, str):
raise ValidationError(
"pointer_gen must be iterable or RandomState",
attr="pointer_gen",
obj=self,
)
self.dimensions = dimensions
self.strict = strict
self.max_similarity = max_similarity
self._key2idx = {}
self._keys = []
self._vectors = np.zeros((0, dimensions), dtype=float)
self.pointer_gen = pointer_gen
self.name = name
@property
def vectors(self):
v = self._vectors.view()
v.setflags(write=False)
return v
def __str__(self):
name = "" if self.name is None else f'"{self.name}" '
return f"{self.dimensions}-dimensional vocab {name} at 0x{id(self):x}"
    def create_pointer(self, attempts=100, transform=None):
"""Create a new semantic pointer and add it to the vocabulary.
This will take into account the `max_similarity` attribute. If a
pointer satisfying max_similarity is not generated after the specified
number of attempts, the candidate pointer with lowest maximum cosine
similarity with all existing pointers is returned.
Parameters
----------
attempts : int, optional
Maximum number of attempts to create a Semantic Pointer not
exceeding `max_similarity`.
        transform : str, optional
            A transform to apply to the generated vector. Needs to be the
            name of a method of `.SemanticPointer`, including the call
            parentheses, as the string is evaluated on the generated
            pointer. Currently, the only sensible value is ``'unitary()'``.
Returns
-------
SemanticPointer
The generated Semantic Pointer.
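
        Examples
        --------
        A usage sketch (assumes an existing vocabulary ``vocab``; note
        that the returned pointer still has to be added via `.add`)::

            p = vocab.create_pointer()
            u = vocab.create_pointer(transform='unitary()')
            vocab.add('U', u)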
"""
best_p = None
best_sim = np.inf
for _ in range(attempts):
# note: p will get its algebra from vocab.algebra
p = semantic_pointer.SemanticPointer(next(self.pointer_gen), vocab=self)
if transform is not None:
p = eval("p." + transform, dict(self), {"p": p})
if len(self) == 0:
best_p = p
break
else:
p_sim = np.max(np.dot(self._vectors, p.v))
if p_sim < best_sim:
best_p = p
best_sim = p_sim
if p_sim < self.max_similarity:
break
else:
warnings.warn(
"Could not create a semantic pointer with "
f"max_similarity={self.max_similarity:1.2f} "
f"(D={self.dimensions}, M={len(self._key2idx)}, "
f"similarity={best_sim:1.2f})"
)
return best_p
def __contains__(self, key):
return key in special_sps or key in self._key2idx
def __len__(self):
return len(self._vectors)
def __iter__(self):
return iter(self._keys)
def __getitem__(self, key):
"""Return the semantic pointer with the requested name."""
# __tracebackhide__ is used in py.test to hide stack frames from the
# traceback. That means py.test might try to look up this attribute
# in a test which will result in an exception hiding the actual
# exception. By raising a KeyError we indicate that there is no
# __tracebackhide__ attribute on this object and preserve the relevant
# exception.
if key == "__tracebackhide__":
raise KeyError()
if key in special_sps:
return special_sps[key](self.dimensions, self)
if not self.strict and key not in self:
self.add(key, self.create_pointer())
# note: pointer will get its algebra from vocab.algebra
return semantic_pointer.SemanticPointer(
self._vectors[self._key2idx[key]], vocab=self, name=key
)
def __hash__(self):
return hash(id(self))
    def add(self, key, p):
"""Add the semantic pointer *p* to the vocabulary.
Parameters
----------
key : str
Name of the Semantic Pointer. Must be a valid Python 2 identifier
starting with a capital letter. Must not be *AbsorbingElement*,
*Identity*, or *Zero*.
p : SemanticPointer or array_like
Semantic Pointer to add.
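
        A usage sketch (the names ``B`` and ``C`` are illustrative; raw
        arrays are wrapped in a `.SemanticPointer` automatically)::

            vocab.add('B', vocab.create_pointer())
            vocab.add('C', np.eye(vocab.dimensions)[0])  # raw vector is wrapped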
"""
if not valid_sp_regex.match(key) or iskeyword(key) or key in reserved_sp_names:
raise SpaParseError(
f"Invalid Semantic Pointer name {key!r}. Valid names are valid "
"Python 2 identifiers beginning with a capital letter."
)
if not isinstance(p, semantic_pointer.SemanticPointer):
# note: p will get its algebra from vocab.algebra
p = semantic_pointer.SemanticPointer(p, vocab=self)
if key in self._key2idx:
raise ValidationError(
f"The semantic pointer {key!r} already exists", attr="", obj=self
)
        is_different_vocab = p.vocab is not None and p.vocab is not self
        is_different_algebra = p.algebra is not self.algebra  # algebra is never None
        if is_different_vocab or is_different_algebra:
raise ValidationError(
"Cannot add a semantic pointer that belongs to a different "
"vocabulary or algebra.",
attr="",
obj=self,
)
self._key2idx[key] = len(self._key2idx)
self._keys.append(key)
self._vectors = np.vstack([self._vectors, p.v])
    def populate(self, pointers):
"""Populate the vocabulary with semantic pointers given an expression.
In its most basic form *pointers* is a string of names separated with
``;``::
vocab.populate('A; B; C')
Semantic Pointers can be constructed from other Semantic Pointers::
vocab.populate('A; B; C = 0.3 * A + 1.4 * C')
        Those constructed Semantic Pointers are **not** normalized to
        unit length. This can be done by appending a ``normalized()``
        call. In the same way, unitary Semantic Pointers can be obtained
        with ``unitary()``::
vocab.populate('A.unitary(); B; C = (A+B).normalized()')
Parameters
----------
        pointers : str
            The expression defining the semantic pointers to add to the
            vocabulary.
"""
        if not pointers.strip():
            return  # Do nothing (and don't fail) for empty string.
for p_expr in pointers.split(";"):
assign_split = p_expr.split("=", 1)
modifier_split = p_expr.split(".", 1)
if len(assign_split) > 1:
name, value_expr = assign_split
value = eval(value_expr.strip(), {}, self)
elif len(modifier_split) > 1:
name = modifier_split[0]
value = self.create_pointer(transform=modifier_split[1])
else:
name = p_expr
value = self.create_pointer()
self.add(name.strip(), value)
    def parse(self, text):
"""Evaluate a text string and return the corresponding SemanticPointer.
This uses the Python ``eval()`` function, so any Python operators that
have been defined for SemanticPointers are valid (``+``, ``-``, ``*``,
``~``, ``()``). Valid semantic pointer terms must start
with a capital letter.
If the expression returns a scalar (int or float), a scaled version
of the identity SemanticPointer will be returned.
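
        A usage sketch (the pointer names are illustrative; in a
        non-strict vocabulary, unknown names are created on the fly)::

            p = vocab.parse('A * B + 0.5 * C')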
"""
# The following line does everything. Note that self is being
# passed in as the locals dictionary, and thanks to the __getitem__
# implementation, this will automatically create new semantic
# pointers as needed.
try:
value = eval(text, {}, self)
        except NameError as err:
            raise SpaParseError(
                f"Error parsing expression {text!r} with {self}: {err}"
            ) from err
if is_number(value):
value *= Identity(self.dimensions)
elif not isinstance(value, semantic_pointer.SemanticPointer):
            raise SpaParseError(
                f"The result of parsing {text!r} is not a SemanticPointer."
            )
return value
    def parse_n(self, *texts):
"""Applies `parse` to each item in *texts* and returns the result."""
return [self.parse(t) for t in texts]
    def dot(self, v):
"""Returns the dot product with all terms in the Vocabulary.
Parameters
----------
v : SemanticPointer or array_like
SemanticPointer to calculate dot product with.
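
        A usage sketch (returns one value per pointer, in the order of
        `keys`)::

            similarities = vocab.dot(vocab.parse('A'))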
"""
if isinstance(v, semantic_pointer.SemanticPointer):
v = v.v
return np.dot(self._vectors, v)
    def create_subset(self, keys):
"""Returns a subset of this vocabulary.
Creates and returns a subset of the current vocabulary that contains
all the semantic pointers found in keys.
Parameters
----------
keys : sequence
List or set of semantic pointer names to be copied over to the
new vocabulary.
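
        A usage sketch (assumes *A* and *B* already exist in the
        vocabulary)::

            subset = vocab.create_subset(['A', 'B'])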
"""
# Make new Vocabulary object
subset = Vocabulary(
self.dimensions,
self.strict,
self.max_similarity,
pointer_gen=self.pointer_gen,
algebra=self.algebra,
)
# Copy over the new keys
for key in keys:
subset.add(key, self[key].reinterpret(subset))
return subset
class VocabularyMap(Mapping):
"""Maps dimensionalities to corresponding vocabularies.
Acts like a Python dictionary.
Parameters
----------
vocabs : sequence of Vocabulary
A list of vocabularies to add to the mapping. The dimensionalities
will be determined from the vocabulary objects.
    rng : numpy.random.RandomState, optional
Random number generator to use for newly created vocabularies (with
`.get_or_create`).
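
    A usage sketch (the dimensionalities are illustrative)::

        vocab_map = VocabularyMap([Vocabulary(16), Vocabulary(32)])
        v16 = vocab_map[16]
        v64 = vocab_map.get_or_create(64)  # created on demand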
"""
def __init__(self, vocabs=None, rng=None):
if vocabs is None:
vocabs = []
self.rng = rng
self._vocabs = {}
try:
for vo in vocabs:
self.add(vo)
        except (AttributeError, TypeError) as err:
            raise ValueError(
                "The `vocabs` argument requires a list of Vocabulary "
                "instances or `None`."
            ) from err
    def add(self, vocab):
"""Add a vocabulary to the map.
The dimensionality will be determined from the vocabulary.
Parameters
----------
        vocab : Vocabulary
Vocabulary to add.
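
        A usage sketch (a 16-dimensional vocabulary is illustrative)::

            vocab_map.add(Vocabulary(16))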
"""
if vocab.dimensions in self._vocabs:
warnings.warn(
f"Duplicate vocabularies with dimension {vocab.dimensions}. "
"Using the last entry in the vocab list with "
"that dimensionality."
)
self._vocabs[vocab.dimensions] = vocab
def __delitem__(self, dimensions):
del self._vocabs[dimensions]
    def discard(self, vocab):
"""Discard (remove) a vocabulary from the mapping.
Parameters
----------
        vocab : int or Vocabulary
            If an integer is given, the vocabulary associated with that
            dimensionality will be discarded. If a `.Vocabulary` is given,
            that specific instance will be discarded.
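
        A usage sketch (removing a vocabulary either by instance or by
        its dimensionality)::

            vocab_map.discard(vocab)  # by instance
            vocab_map.discard(16)     # by dimensionality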
"""
if isinstance(vocab, int):
del self._vocabs[vocab]
elif self._vocabs.get(vocab.dimensions, None) is vocab:
del self._vocabs[vocab.dimensions]
def __getitem__(self, dimensions):
return self._vocabs[dimensions]
    def get_or_create(self, dimensions):
"""Gets or creates a vocabulary of given dimensionality.
If the mapping already maps the given dimensionality to a vocabulary,
it will be returned. Otherwise, a new vocabulary will be created,
added to the mapping, and returned.
Parameters
----------
dimensions : int
Dimensionality of vocabulary to return.
Returns
-------
Vocabulary
Vocabulary of given dimensionality.
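
        A usage sketch (the first call creates a non-strict
        64-dimensional vocabulary; the second returns the same instance)::

            v = vocab_map.get_or_create(64)
            assert vocab_map.get_or_create(64) is v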
"""
if dimensions not in self._vocabs:
self._vocabs[dimensions] = Vocabulary(
dimensions,
strict=False,
pointer_gen=UnitLengthVectors(dimensions, self.rng),
)
return self._vocabs[dimensions]
def __iter__(self):
return iter(self._vocabs)
def __len__(self):
return len(self._vocabs)
def __contains__(self, vocab):
if isinstance(vocab, int):
return vocab in self._vocabs
else:
return (
vocab.dimensions in self._vocabs
and self._vocabs[vocab.dimensions] is vocab
)
class VocabularyMapParam(nengo.params.Parameter):
"""Nengo parameter that accepts `.VocabularyMap` instances.
Sequences of `.Vocabulary` will be coerced to `.VocabularyMap`.
"""
def coerce(self, instance, vocab_set):
        vocab_set = super().coerce(instance, vocab_set)
if vocab_set is not None and not isinstance(vocab_set, VocabularyMap):
try:
vocab_set = VocabularyMap(vocab_set)
            except ValueError as err:
                raise ValidationError(
                    "Must be of type 'VocabularyMap' or compatible "
                    f"(got type {type(vocab_set).__name__}).",
                    attr=self.name,
                    obj=instance,
                ) from err
return vocab_set
class VocabularyOrDimParam(nengo.params.Parameter):
"""Nengo parameter that accepts `.Vocabulary` or integer dimensionality.
    If an integer is assigned, the vocabulary will be retrieved from the
instance's *vocabs* attribute with *vocabs.get_or_create(dimensions)*.
Thus, a class using *VocabularyOrDimParam* should also have an attribute
*vocabs* of type `VocabularyMap`.
"""
coerce_defaults = False
def coerce(self, instance, value):
        value = super().coerce(instance, value)
if value is not None:
if is_integer(value):
if value < 1:
raise ValidationError(
"Vocabulary dimensionality must be at least 1.",
attr=self.name,
obj=instance,
)
value = instance.vocabs.get_or_create(value)
elif not isinstance(value, Vocabulary):
raise ValidationError(
"Must be of type 'Vocabulary' or an integer (got type "
f"{type(value).__name__}).",
attr=self.name,
obj=instance,
)
return value