import warnings
from nengo.exceptions import BuildError
import numpy as np
import scipy.sparse
from nengo_loihi.nxsdk_obfuscation import d
class LoihiBlock:
    """Class holding Loihi objects that can be placed on the chip.

    This class can be thought of as a block of the Loihi board, and is how
    Nengo Loihi keeps track of how Loihi Neuron cores will be configured.
    Generally, the job of the build process is to convert Nengo objects
    (ensembles, connections, nodes, and probes) to LoihiBlocks, which
    will then be used by the `.EmulatorInterface` or `.HardwareInterface`.

    Initially, parameters in a LoihiBlock are floating point values.
    The `.discretize_block` function converts them to integer values inplace
    for use on Loihi.

    Attributes
    ----------
    axons : list of Axon
        Axon objects outputting from the compartments in the block.
    compartment : Compartment
        Compartment object representing all compartments in the block.
    n_neurons : int
        The number of neurons in the block.
    named_synapses : dict {str: Synapse}
        Mapping from a name to a Synapse object.
    label : string
        A label for the block (for debugging purposes).
    probes : list of Probe
        Probes recording information from objects in the block.
    synapses : list of Synapse
        Synapse objects projecting to compartments in the block.
    """

    def __init__(self, n_neurons, label=None):
        self.n_neurons = n_neurons
        self.label = label

        # One compartment per neuron.
        self.compartment = Compartment(n_compartments=n_neurons)
        self.axons = []
        self.synapses = []
        self.named_synapses = {}
        self.probes = []

    def __str__(self):
        return "%s(%s)" % (type(self).__name__, self.label if self.label else "")

    def add_synapse(self, synapse, name=None):
        """Register ``synapse`` with this block, optionally under ``name``.

        Parameters
        ----------
        synapse : Synapse
            The synapse group to append to ``synapses``.
        name : str, optional
            If given, the synapse is also stored in ``named_synapses`` under
            this key. The name must not already be in use.

        Raises
        ------
        AssertionError
            If ``name`` is already present in ``named_synapses``.
        """
        # Check the name before touching any state, so that a rejected call
        # does not leave an orphan entry in ``self.synapses``.
        if name is not None:
            assert name not in self.named_synapses

        self.synapses.append(synapse)
        if name is not None:
            self.named_synapses[name] = synapse

    def add_axon(self, axon):
        """Append ``axon`` to the list of axons leaving this block."""
        self.axons.append(axon)

    def add_probe(self, probe):
        """Append ``probe`` to this block; its ``target`` must be this block."""
        assert probe.target is self
        self.probes.append(probe)
class Config:
    """Base class for parameter bundles that compare and hash by value.

    The methods here read ``self.params``, a sequence of attribute names;
    this class does not define it, so it has to be supplied by subclasses.
    Only the attributes named there take part in ``==`` and ``hash``.
    """

    def __eq__(self, obj):
        """Return True if ``obj`` is of this type and all ``params`` match."""
        if not isinstance(obj, type(self)):
            return False

        mine, theirs = vars(self), vars(obj)
        for name in self.params:
            if not mine[name] == theirs[name]:
                return False
        return True

    def __hash__(self):
        """Hash the tuple of ``params`` values, consistent with ``__eq__``."""
        values = vars(self)
        return hash(tuple([values[name] for name in self.params]))
class Compartment:
    """Stores information for configuring Loihi compartments.

    The information stored here will be associated with some block,
    and all compartments will share certain information.
    While compartments are usually thought of as neurons, we use compartments
    to implement Nengo ensembles, nodes, and connections through special
    decode neurons.

    Before `.discretize_compartment` has been called, most attributes in
    this class are floating-point values. Calling `.discretize_compartment`
    converts them to integer values inplace for use on Loihi.

    Attributes
    ----------
    bias : (n,) ndarray
        Compartment biases.
    enable_noise : (n,) ndarray
        Whether to enable noise for each compartment.
    decay_u : (n,) ndarray
        Input (synapse) decay constant for each compartment.
    decay_v : (n,) ndarray
        Voltage decay constant for each compartment.
    label : string
        A label for the block (for debugging purposes).
    n_compartments : int
        The number of compartments in the block.
    noise_at_membrane : {0, 1}
        Inject noise into current (0) or voltage (1).
    noise_exp : float or int
        Exponent for noise generation. Floating point values are base 10
        in units of current or voltage. Integer values are in base 2.
    noise_offset : float or int
        Offset for noise generation.
    refract_delay : (n,) ndarray
        Compartment refractory delays, in time steps.
    scale_u : bool
        Scale input (U) by decay_u so that the integral of U is
        the same before and after filtering.
    scale_v : bool
        Scale voltage (V) by decay_v so that the integral of V is
        the same before and after filtering.
    tau_s : float or None
        Time constant used to set decay_u. None if decay_u has not been set.
    vmax : float or int (range [2**9 - 1, 2**23 - 1])
        Maximum voltage for all compartments, in the same units as ``vth``.
    vmin : float or int (range [-2**23 + 1, 0])
        Minimum voltage for all compartments, in the same units as ``vth``.
    vth : (n,) ndarray
        Compartment voltage thresholds.
    """

    # threshold at which U/V scaling is allowed
    DECAY_SCALE_TH = 0.5 / d(b"NDA5Ng==", int)  # half of decay scaling unit

    def __init__(self, n_compartments, label=None):
        self.n_compartments = n_compartments
        self.label = label

        # parameters specific to compartments/block
        self.decay_u = np.ones(n_compartments, dtype=np.float32)
        # ^ default to no filter
        self.decay_v = np.zeros(n_compartments, dtype=np.float32)
        # ^ default to integration
        self.tau_s = None
        self.scale_u = True
        self.scale_v = False

        self.refract_delay = np.zeros(n_compartments, dtype=np.int32)
        self.vth = np.zeros(n_compartments, dtype=np.float32)
        self.bias = np.zeros(n_compartments, dtype=np.float32)
        self.enable_noise = np.zeros(n_compartments, dtype=bool)

        # parameters common to core
        self.vmin = 0
        self.vmax = np.inf
        self.noise_offset = 0
        self.noise_exp = 0
        self.noise_at_membrane = 0

    def __str__(self):
        return "%s(%s)" % (type(self).__name__, self.label if self.label else "")

    def _configure_filter(self, tau_s, dt):
        """Set ``decay_u`` (for all compartments) from a filter time constant.

        Parameters
        ----------
        tau_s : float
            Filter time constant. A value of 0 gives ``decay_u = 1``.
        dt : float
            Time step, in the same units as ``tau_s``.

        Raises
        ------
        BuildError
            If the resulting decay does not exceed ``DECAY_SCALE_TH``, in
            which case ``scale_u`` has been set to False.
        """
        # -expm1(-x) == 1 - exp(-x), evaluated accurately for small x.
        decay_u = 1 if tau_s == 0 else -(np.expm1(-dt / np.asarray(tau_s)))
        self.decay_u[:] = decay_u

        # The comparison yields ``np.bool_`` when ``decay_u`` is a NumPy value;
        # coerce so that ``scale_u`` is always a plain ``bool`` as documented.
        self.scale_u = bool(decay_u > self.DECAY_SCALE_TH)
        if not self.scale_u:
            raise BuildError(
                "Current (U) scaling is required. Perhaps a synapse time "
                "constant is too large in your model."
            )
class Axon:
    """A group of axons targeting a specific Synapse object.

    Attributes
    ----------
    compartment_atoms : list of length ``block.n_neurons``
        Atom (weight index) associated with each block compartment.
    compartment_map : list of length ``block.n_neurons``
        Index of the axon in ``target`` targeted by each block compartment.
    n_axons : int
        The number of outgoing axons.
    target : Synapse
        Target synapses for these axons.
    """

    class Spike:
        """A spike targeting a particular axon within a Synapse object.

        The Synapse target is implicit, given by the Axon object that
        creates this Spike.

        Parameters
        ----------
        axon_id : int
            The index of the axon within the targeted Synapse object.
        atom : int, optional (Default: 0)
            An index into the target Synapse weights. This allows spikes
            targeting a particular axon to use different weights.
        """

        __slots__ = ["axon_id", "atom"]

        def __init__(self, axon_id, atom=0):
            self.axon_id = axon_id
            self.atom = atom

        def __repr__(self):
            return "%s(axon_id=%d, atom=%d)" % (
                type(self).__name__,
                self.axon_id,
                self.atom,
            )

    def __init__(self, n_axons, label=None):
        self.n_axons = n_axons
        self.label = label

        # Filled in later: ``target`` by whoever connects this axon group,
        # the two maps by ``set_compartment_axon_map``.
        self.target = None
        self.compartment_map = None
        self.compartment_atoms = None

    def __str__(self):
        return "%s(%s)" % (type(self).__name__, self.label if self.label else "")

    @property
    def pop_type(self):
        """The ``pop_type`` of the target (requires ``target`` to be set)."""
        return self.target.pop_type

    @property
    def slots_per_axon(self):
        """The number of axon_cfg slots occupied by each axon."""
        return 2 if self.pop_type == 32 else 1

    def axon_slots(self):
        """The total number of axon_cfg slots used by all axons."""
        return self.slots_per_axon * self.n_axons

    def map_axon(self, compartment_idxs):
        """Map compartment indices to target axon indices.

        Indexes ``compartment_map`` with ``compartment_idxs``; when no map has
        been set, the compartment indices are returned unchanged.
        """
        return (
            self.compartment_map[compartment_idxs]
            if self.compartment_map is not None
            else compartment_idxs
        )

    def map_atoms(self, compartment_idxs):
        """Map compartment indices to atoms.

        Indexes ``compartment_atoms`` with ``compartment_idxs``; when no atoms
        have been set, a list with one 0 per index is returned.
        """
        return (
            self.compartment_atoms[compartment_idxs]
            if self.compartment_atoms is not None
            else [0 for _ in compartment_idxs]
        )

    def map_spikes(self, compartment_idxs):
        """Build one ``Spike`` per compartment index.

        Returns a list with a ``Spike`` for each index whose mapped axon id is
        non-negative, and ``None`` in the positions where it is negative.
        """
        axon_ids = self.map_axon(compartment_idxs)
        atoms = self.map_atoms(compartment_idxs)
        return [
            self.Spike(axon_id, atom=atom) if axon_id >= 0 else None
            for axon_id, atom in zip(axon_ids, atoms)
        ]

    def set_compartment_axon_map(self, target_axons, atoms=None):
        """Set mapping from compartments to axons in target.

        Parameters
        ----------
        target_axons : array_like (``n_compartments``,)
            Indices indicating which target axon each compartment maps to.
            If < 0, the corresponding compartment will not be used with these
            axons.
        atoms : array_like (``n_compartments``,)
            Atoms to use for each compartment. Use only if ``pop_type != 0``.
        """
        # Stored as given (no copy or conversion).
        self.compartment_map = target_axons
        self.compartment_atoms = atoms
class SynapseConfig(Config):
    """Format parameters shared by a group of synapses.

    Instances compare and hash by the attributes listed in ``params``
    (behaviour inherited from ``Config``). Numeric constants in this class
    are obtained through the ``d`` helper imported at the top of the file.
    """

    # Lookup tables translating the ``idx_bits`` / ``weight_bits`` codes into
    # the values returned by ``real_idx_bits`` / ``real_weight_bits``.
    INDEX_BITS_MAP = d(b"WzAsIDYsIDcsIDgsIDksIDEwLCAxMSwgMTJd", "list_int")
    WEIGHT_BITS_MAP = d(b"WzAsIDEsIDIsIDMsIDQsIDUsIDYsIDhd", "list_int")

    # Attribute names that take part in equality and hashing.
    params = (
        "weight_limit_mant",
        "weight_limit_exp",
        "weight_exp",
        "disc_max_weight",
        "learning_cfg",
        "tag_bits",
        "delay_bits",
        "weight_bits",
        "reuse_synapse_data",
        "n_synapses",
        "idx_offset",
        "idx_mult",
        "skip_bits",
        "idx_bits",
        "synapse_type",
        "fanout_type",
        "compression",
        "stdp_cfg",
        "ignore_delay",
    )

    def __init__(
        self,
        weight_limit_mant=0,
        weight_limit_exp=0,
        weight_exp=0,
        disc_max_weight=0,
        learning_cfg=0,
        tag_bits=0,
        delay_bits=0,
        weight_bits=0,
        reuse_synapse_data=0,
        n_synapses=0,
        idx_offset=0,
        idx_mult=0,
        skip_bits=0,
        idx_bits=0,
        synapse_type=0,
        fanout_type=0,
        compression=0,
        stdp_cfg=0,
        ignore_delay=0,
    ):
        # Every keyword is stored under an attribute of the same name.
        self.weight_limit_mant = weight_limit_mant
        self.weight_limit_exp = weight_limit_exp
        self.weight_exp = weight_exp
        self.disc_max_weight = disc_max_weight

        self.learning_cfg = learning_cfg
        self.tag_bits = tag_bits
        self.delay_bits = delay_bits
        self.weight_bits = weight_bits

        self.reuse_synapse_data = reuse_synapse_data
        self.n_synapses = n_synapses
        self.idx_offset = idx_offset
        self.idx_mult = idx_mult
        self.skip_bits = skip_bits
        self.idx_bits = idx_bits

        self.synapse_type = synapse_type
        self.fanout_type = fanout_type
        self.compression = compression
        self.stdp_cfg = stdp_cfg
        self.ignore_delay = ignore_delay

    @classmethod
    def get_real_weight_exp(cls, weight_exp):
        """Return ``weight_exp`` shifted by a fixed offset."""
        offset = d(b"Ng==", int)
        return offset + weight_exp

    @classmethod
    def get_scale(cls, weight_exp):
        """Return two raised to the real weight exponent for ``weight_exp``."""
        exponent = cls.get_real_weight_exp(weight_exp)
        return 2 ** exponent

    @property
    def is_mixed(self):
        """Whether ``fanout_type`` equals 1."""
        return self.fanout_type == 1

    @property
    def real_idx_bits(self):
        """The ``INDEX_BITS_MAP`` entry selected by ``idx_bits``."""
        return self.INDEX_BITS_MAP[self.idx_bits]

    @property
    def real_weight_bits(self):
        """The ``WEIGHT_BITS_MAP`` entry selected by ``weight_bits``."""
        return self.WEIGHT_BITS_MAP[self.weight_bits]

    @property
    def real_weight_exp(self):
        """``get_real_weight_exp`` applied to this config's ``weight_exp``."""
        return self.get_real_weight_exp(self.weight_exp)

    @property
    def scale(self):
        """``get_scale`` applied to this config's ``weight_exp``."""
        return self.get_scale(self.weight_exp)

    @property
    def shift_bits(self):
        """Number of bits the weight is right-shifted by."""
        return d(b"OA==", int) - self.real_weight_bits + self.is_mixed

    def bits_per_axon(self, n_weights):
        """For an axon with n weights, compute the weight memory bits used.

        The weights are split into groups of at most ``n_synapses + 1``; each
        group carries a fixed header and is rounded up to a whole memory unit.

        Raises
        ------
        NotImplementedError
            If ``compression`` is not one of the two supported modes.
        """
        per_weight = self.real_weight_bits + self.delay_bits + self.tag_bits
        if self.compression == d(b"MA==", int):
            # this mode additionally stores an index with every weight
            per_weight += self.real_idx_bits
        elif self.compression != d(b"Mw==", int):
            raise NotImplementedError("Compression %s" % (self.compression,))

        synapse_idx_bits = d(b"NA==", int)
        n_synapses_bits = d(b"Ng==", int)
        unit = d(b"NjQ=", int)
        group_len = self.n_synapses + 1

        total = 0
        for start in range(0, n_weights, group_len):
            count = min(n_weights - start, group_len)
            raw = count * per_weight + synapse_idx_bits + n_synapses_bits
            # round up to nearest memory unit (ceiling via negated floor-div)
            total += -unit * (-raw // unit)
        return total

    def set(self, **kwargs):
        """Overwrite existing attributes from keyword arguments.

        Each keyword must name an attribute that already exists; this is
        checked with an assertion just before that attribute is assigned.
        """
        for name, new_value in kwargs.items():
            assert hasattr(self, name)
            setattr(self, name, new_value)
class Synapse:
    """A group of Loihi synapses that share some properties.

    Attributes
    ----------
    axon_compartment_bases : list or None
        List providing base (compartment offset) for each input axon.
    axon_to_weight_map : dict or None
        Map from input axon index to weight index, to allow weights to be
        re-used by axons. If None, the weight index for an input axon is the
        axon index.
    indices : (population, axon, compartment) ndarray
        The synapse indices.
    learning : bool
        Whether synaptic tracing and learning is enabled for these synapses.
    learning_rate : float
        The learning rate.
    learning_wgt_exp : int
        The weight exponent used on this connection if learning is enabled.
    n_axons : int
        Number of input axons to this group of synapses.
    pop_type : int (0, 16, 32)
        Whether these synapses are discrete (0), pop16, or pop32. This
        determines the type of axons these synapses can connect to.
    synapse_cfg : SynapseConfig
        The synapse format object for these synapses.
    tracing_mag : float
        Magnitude by which the learning trace is increased for each spike.
    tracing_tau : int
        Decay time constant for the learning trace, in timesteps (not seconds).
    weights : (n_axons,) list of (n_populations, n_compartments) ndarray
        The synapse weights. Organized as a list of arrays so each axon
        can have a different number of target compartments.
    """

    def __init__(self, n_axons, label=None):
        self.n_axons = n_axons
        self.label = label
        self.synapse_cfg = None
        self.weights = None
        self.indices = None
        self.axon_compartment_bases = None
        self.axon_to_weight_map = None

        self.learning = False
        self.learning_rate = 1.0
        self.learning_wgt_exp = None
        self.tracing_tau = None
        self.tracing_mag = None
        self.pop_type = 0  # one of (0, 16, 32) for discrete, pop16, pop32

    def __str__(self):
        return "%s(%s)" % (type(self).__name__, self.label if self.label else "")

    def atom_bits(self):
        """Bits needed to index the largest number of populations per axon."""
        max_populations = max(w.shape[0] for w in self.weights)
        return int(np.ceil(np.log2(max_populations)))

    def atom_bits_extra(self):
        """Atom bits beyond a fixed allowance (0 if within the allowance)."""
        atom_bits = self.atom_bits()
        assert atom_bits <= d(b"OQ==", int), "Too many atom bits"
        return max(atom_bits - d(b"NQ==", int), 0)

    def axon_bits(self):
        """Bits available for the axon index, which depends on ``pop_type``."""
        if self.pop_type == 16:
            # extra atom bits are taken out of the axon bits
            return d(b"MTA=", int) - self.atom_bits_extra()
        else:
            return d(b"MTI=", int)

    def axon_compartment_base(self, axon_idx):
        """Compartment offset for ``axon_idx``.

        Returns 0 when no bases are set, and None for an unused axon.
        """
        if self.axon_compartment_bases is None:
            return 0
        base = self.axon_compartment_bases[axon_idx]
        # negative indicates unused axon
        return base if base >= 0 else None

    def axon_populations(self, axon_idx):
        """Number of populations (weight rows) used by ``axon_idx``."""
        weight_idx = self.axon_weight_idx(axon_idx)
        return self.weights[weight_idx].shape[0]

    def axon_weight_idx(self, axon_idx):
        """Weight index for ``axon_idx`` (the axon index itself if unmapped)."""
        return (
            self.axon_to_weight_map[axon_idx]
            if self.axon_to_weight_map is not None
            else axon_idx
        )

    def axon_weights_indices(self, axon_idx, atom=0):
        """Return the ``(weights, indices)`` rows for one axon and atom."""
        weight_idx = self.axon_weight_idx(axon_idx)
        w = self.weights[weight_idx]
        i = self.indices[weight_idx]
        return w[atom, :], i[atom, :]

    def bits(self):
        """Total weight memory bits, summed over all weight arrays."""
        return sum(self.synapse_cfg.bits_per_axon(w.size) for w in self.weights)

    def format(self, **kwargs):
        """Update ``synapse_cfg`` from ``kwargs``, creating it if necessary."""
        if self.synapse_cfg is None:
            self.synapse_cfg = SynapseConfig()
        self.synapse_cfg.set(**kwargs)

    def idx_bits(self):
        """Return the ``INDEX_BITS_MAP`` code able to hold the largest index."""
        n_indices = self.max_ind() + 1
        # With no indices at all, log2(0) would be -inf and int() would raise;
        # zero bits are sufficient in that case.
        bits = int(np.ceil(np.log2(n_indices))) if n_indices > 0 else 0
        assert (
            bits <= SynapseConfig.INDEX_BITS_MAP[-1]
        ), "bits out of range, ensemble too large?"
        # pick the first table entry that is wide enough
        bits = next(i for i, v in enumerate(SynapseConfig.INDEX_BITS_MAP) if v >= bits)
        return bits

    def idxs_per_synapse(self):
        """Indices used per synapse; the value depends on ``learning``."""
        return d(b"Mg==", int) if self.learning else d(b"MQ==", int)

    def max_abs_weight(self):
        """Largest absolute weight (empty arrays count as ``-inf``)."""
        return max(np.abs(w).max() if w.size > 0 else -np.inf for w in self.weights)

    def max_ind(self):
        """Largest compartment index, or -1 if every index array is empty."""
        # Index arrays are 2-D, so an axon with no targets has shape
        # (n_populations, 0): ``len`` is non-zero but ``max`` would raise.
        # Test ``size`` instead, as ``max_abs_weight`` does.
        return max(i.max() if i.size > 0 else -1 for i in self.indices)

    def _set_weights_indices(self, weights, indices=None):
        """Store per-axon weights and indices as lists of 2-D arrays.

        Parameters
        ----------
        weights : sequence of array_like
            One entry per axon; 1-D entries are treated as a single
            population. Stored as float32.
        indices : sequence of array_like, optional
            Target compartment indices matching ``weights`` in shape. If
            None, each row of weights targets compartments ``0..n-1``.
        """
        # ``np.array(..., copy=False)`` raises on NumPy >= 2 whenever a copy
        # or cast is required; ``asarray`` copies only if needed everywhere.
        weights = [np.atleast_2d(np.asarray(w, dtype=np.float32)) for w in weights]
        assert all(
            w.ndim == 2 for w in weights
        ), "Weights must be shape (n_axons,) (n_populations, n_compartments)"
        assert all(
            w.shape[0] == weights[0].shape[0] for w in weights
        ), "All axon weights must have the same number of populations"
        self.weights = weights

        if indices is None:
            # broadcast a column of zeros against 0..n-1 to get one row of
            # consecutive indices per population
            indices = [
                np.zeros((w.shape[0], 1), dtype=np.int32)
                + np.arange(w.shape[1], dtype=np.int32)
                for w in self.weights
            ]
        indices = [np.atleast_2d(np.asarray(i, dtype=np.int32)) for i in indices]
        assert all(
            i.ndim == 2 for i in indices
        ), "Indices must be shape (n_axons,) (n_populations, n_compartments)"
        assert all(
            i.shape == w.shape for i, w in zip(indices, weights)
        ), "Indices shapes must match weights shapes"
        assert len(weights) == len(indices)
        self.indices = indices

    def set_weights(self, weights):
        """Set discrete (non-population) weights, one row per axon.

        Parameters
        ----------
        weights : (n_axons, n_compartments) array_like or scipy sparse
            Dense weights target every compartment. For sparse input only
            the stored entries of each row are kept, together with their
            column indices.
        """
        # ``issparse`` also accepts the newer sparse array classes, which are
        # not ``spmatrix`` instances.
        if scipy.sparse.issparse(weights):
            csr = weights.tocsr()
            weights_by_row, idxs_by_row = [], []
            for i in range(weights.shape[0]):
                i0, i1 = csr.indptr[i : i + 2]
                weights_by_row.append(csr.data[i0:i1])
                idxs_by_row.append(csr.indices[i0:i1])

            weights = weights_by_row
            indices = idxs_by_row
        else:
            weights = np.asarray(weights, dtype=np.float32)
            assert weights.ndim == 2
            indices = None

        assert len(weights) == self.n_axons, "Must have different weights for each axon"
        self._set_weights_indices(weights, indices=indices)

        self.format(
            compression=d(b"Mw==", int),
            idx_bits=self.idx_bits(),
            fanout_type=d(b"MQ==", int),
            n_synapses=d(b"NjM=", int),
            weight_bits=d(b"Nw==", int),
        )

    def set_learning(
        self, learning_rate=1.0, tracing_tau=2, tracing_mag=1.0, wgt_exp=4
    ):
        """Enable learning on these synapses and store its parameters.

        ``tracing_tau`` must be integer-valued. The stored ``learning_rate``
        is the given rate multiplied by ``learn_epoch``.
        """
        assert tracing_tau == int(tracing_tau), "tracing_tau must be integer"
        self.learning = True
        self.tracing_tau = int(tracing_tau)
        self.tracing_mag = tracing_mag
        # stdp_cfg hard-coded for now (see hardware.builder)
        self.format(learning_cfg=d(b"MQ==", int), stdp_cfg=d(b"MA==", int))

        self.train_epoch = 2
        self.learn_epoch_k = 1
        self.learn_epoch = self.train_epoch * 2 ** self.learn_epoch_k
        self.learning_rate = learning_rate * self.learn_epoch
        self.learning_wgt_exp = wgt_exp

    def set_population_weights(
        self, weights, indices, axon_to_weight_map, compartment_bases, pop_type=None
    ):
        """Set population weights with explicit indices and axon mappings.

        ``pop_type`` defaults to 16 when not given.
        """
        self._set_weights_indices(weights, indices)
        self.axon_to_weight_map = axon_to_weight_map
        self.axon_compartment_bases = compartment_bases
        self.pop_type = 16 if pop_type is None else pop_type

        idx_bits = self.idx_bits()
        self.format(
            compression=d(b"MA==", int),
            idx_bits=idx_bits,
            fanout_type=d(b"MQ==", int),
            n_synapses=d(b"NjM=", int),
            weight_bits=d(b"Nw==", int),
        )

    def size(self):
        """Total number of weights across all weight arrays."""
        return sum(w.size for w in self.weights)
class Probe:
    """Record data from compartment states of a LoihiBlock.

    Parameters
    ----------
    target : LoihiBlock
        The block to record values from. Use ``slice`` to record from a subset
        of compartments.
    key : string ('current', 'voltage', 'spiked')
        The compartment attribute to probe.
    slice : slice or list
        Select a subset of the compartments in the block to record from.
    synapse : nengo.synapses.Synapse
        A synapse to use for filtering the probe.
    weights : np.ndarray
        A linear transformation to apply to the outputs.
    """

    # Alias for the builtin, which the ``slice`` parameter of ``__init__``
    # shadows inside that method.
    _slice = slice

    def __init__(self, target=None, key=None, slice=None, weights=None, synapse=None):
        self.target = target
        self.key = key
        # default to ``slice(None)``, i.e. select everything
        self.slice = slice if slice is not None else self._slice(None)
        self.weights = weights
        self.synapse = synapse

        # Both start out unset here; nothing else in this class modifies them.
        self.use_snip = False
        self.snip_info = None