"""
The Simulator class is the access point for the main features of NengoDL,
including `running <.Simulator.run_steps>` and `training <.Simulator.fit>`
a model.
"""
import collections
import contextlib
import copy
from functools import partial
import logging
import textwrap
import warnings
import jinja2
from nengo import (
Connection,
Direct,
Ensemble,
Network,
Node,
Probe,
)
from nengo import rc as nengo_rc
from nengo.builder.connection import BuiltConnection
from nengo.builder.ensemble import BuiltEnsemble
from nengo.ensemble import Neurons
from nengo.exceptions import (
    NengoWarning,
    ReadonlyError,
    SimulationError,
    SimulatorClosed,
    ValidationError,
)
from nengo.solvers import NoSolver
from nengo.transforms import Convolution, Dense, Sparse, SparseMatrix
from nengo.utils.magic import decorator
import numpy as np
import tensorflow as tf
from nengo_dl import callbacks, compat, config, utils
from nengo_dl.builder import NengoBuilder, NengoModel
from nengo_dl.tensor_graph import TensorGraph
logger = logging.getLogger(__name__)
@decorator
def with_self(wrapped, instance, args, kwargs):
"""A decorator that can be used to ensure that any TensorFlow operations happening
within a method will use the settings associated with this Simulator."""
keras_dtype = tf.keras.backend.floatx()
tf.keras.backend.set_floatx(instance.tensor_graph.dtype)
with tf.device(instance.tensor_graph.device):
output = wrapped(*args, **kwargs)
tf.keras.backend.set_floatx(keras_dtype)
return output
@decorator
def require_open(wrapped, instance, args, kwargs):
"""A decorator that can be used to mark methods that require the Simulator to
be open."""
if instance.closed:
raise SimulatorClosed(
"Cannot call %s after simulator is closed" % wrapped.__name__
)
return wrapped(*args, **kwargs)
def fill_docs(*args, **kwargs):
"""Stores documentation for common arguments in one place, to avoid duplication,
and then fills them in automatically in the docstring."""
docs = {
"x": """
{% set uses_y = func_name in ("fit", "evaluate") %}
{% if func_name in ("predict_on_batch", "run_steps") %}
{% set batch_size = 1 %}
{% else %}
{% set batch_size = 50 %}
{% endif %}
Data for input Nodes in the model. This argument is optional; if
it is not specified, then data will automatically be generated
according to the inputs specified in the Node definitions (e.g., by calling
the output function associated with that Node).
``{{ param_name }}`` can be specified as:
- A dictionary of {`nengo.Node` or str: `numpy.ndarray`}
indicating the input values for the given nodes. Nodes can be referred
to by the Node object itself or by a string name, which will be
``Node.label`` if one was specified or ``"node"``
(duplicate names will have a number appended, corresponding to the order
found in `nengo.Network.all_nodes`).
- A list of `numpy.ndarray` indicating the input values for each
input Node, ordered according to the order in which the Nodes were
added to the model (this corresponds to the order found in
`nengo.Network.all_nodes`).
- A `numpy.ndarray` indicating the input value for a single input Node.
{% if func_name not in ("predict_on_batch", "run_steps") %}
- A generator or ``tf.data.Dataset`` that produces one of the above.
{% endif %}
All inputs should have shape ``(batch_size, n_steps, node.size_out)``.
For example, if the model only has a single input Node, then
``{{ param_name }}`` can simply be an ndarray of data for that Node.
.. testcode::
with nengo.Network() as net:
a = nengo.Node([0])
p = nengo.Probe(a)
with nengo_dl.Simulator(net) as sim:
{% if uses_y %}
sim.compile(loss="mse")
sim.{{ func_name }}(
{{ param_name }}=np.ones((50, 10, 1)), y=np.ones((50, 10, 1)))
{% elif func_name == "run_steps" %}
sim.{{ func_name }}(
10, {{ param_name }}=np.ones(({{ batch_size }}, 10, 1)))
{% else %}
sim.{{ func_name }}(
{{ param_name }}=np.ones(({{ batch_size }}, 10, 1)))
{% endif %}
{% if uses_y %}
.. testoutput::
:hide:
...
{% endif %}
If the network has multiple inputs, then ``{{ param_name }}`` can be specified
as a dictionary mapping `nengo.Node` objects to arrays, e.g.
.. testcode::
with nengo.Network() as net:
a = nengo.Node([0])
b = nengo.Node([0, 0])
p = nengo.Probe(a)
with nengo_dl.Simulator(net) as sim:
{% if uses_y %}
sim.compile(loss="mse")
sim.{{ func_name }}(
{{ param_name }}={
a: np.ones((50, 10, 1)),
b: np.ones((50, 10, 2))
},
y=np.ones((50, 10, 1))
)
{% elif func_name == "run_steps" %}
sim.{{ func_name }}(
10,
{{ param_name }}={
a: np.ones(({{ batch_size }}, 10, 1)),
b: np.ones(({{ batch_size }}, 10, 2))
}
)
{% else %}
sim.{{ func_name }}(
{{ param_name }}={
a: np.ones(({{ batch_size }}, 10, 1)),
b: np.ones(({{ batch_size }}, 10, 2))
}
)
{% endif %}
{% if uses_y %}
.. testoutput::
:hide:
...
{% endif %}
If an input value is not specified for one of the Nodes in the model then
data will be filled in automatically according to the Node definition.
{% if func_name not in ("predict_on_batch", "run_steps") %}
For dynamic input types (e.g., ``tf.data`` pipelines or generators), NengoDL
tries to avoid introspecting/altering the data before the
simulation starts, as this may have unintended side-effects. So data must be
specified via one of the standard Keras methods (arrays, list of arrays, or
string name dictionary; using a dictionary of Node objects is not supported).
In addition, data must be explicitly provided for all input nodes (it will not
be automatically generated if data is not specified).
In addition, when using dynamic inputs, data must be provided for the
special ``"n_steps"`` input. This specifies the number of timesteps that the
simulation will run for. Technically this is just a single scalar value
(e.g., ``10``). But Keras requires that all input data be batched, so that
input value needs to be duplicated into an array with size
``(batch_size, 1)`` (where all entries have the same value, e.g. ``10``).
{% if uses_y %}
Also keep in mind that when using a dynamic input for ``x`` the ``y`` parameter
is unused, and instead the generator should return ``(x, y)`` pairs.
{% endif %}
.. testcode::
with nengo.Network() as net:
a = nengo.Node([0], label="a")
p = nengo.Probe(a, label="p")
with nengo_dl.Simulator(net) as sim:
{% if uses_y %}
dataset = tf.data.Dataset.from_tensor_slices(
({"a": tf.ones((50, 10, 1)),
"n_steps": tf.ones((50, 1), dtype=tf.int32) * 10},
{"p": tf.ones((50, 10, 1))})
).batch(sim.minibatch_size)
sim.compile(loss="mse")
sim.{{ func_name }}({{ param_name }}=dataset)
{% else %}
dataset = tf.data.Dataset.from_tensor_slices(
{"a": tf.ones((50, 10, 1)),
"n_steps": tf.ones((50, 1), dtype=tf.int32) * 10}
).batch(sim.minibatch_size)
sim.{{ func_name }}({{ param_name }}=dataset)
{% endif %}
{% if uses_y %}
.. testoutput::
:hide:
...
{% endif %}
{% endif %}
""",
"y": """
Target values for Probes in the model. These can be specified in the same
ways as the input values in ``x``, except using Probes instead of Nodes.
All targets should have shape ``(batch_size, n_steps, probe.size_in)``.
For example,
.. testcode::
with nengo.Network() as net:
a = nengo.Node([0])
p = nengo.Probe(a)
with nengo_dl.Simulator(net) as sim:
sim.compile(loss="mse")
sim.{{ func_name }}(
x={a: np.zeros((50, 10, 1))}, y={p: np.zeros((50, 10, 1))})
.. testoutput::
:hide:
...
Note that data is only specified for the probes used in the loss function
(specified when calling `.Simulator.compile`). For example, if we have two
probes, but only one is used during training (the other is used for data
collection during inference), we could set this up like:
.. testcode::
with nengo.Network() as net:
a = nengo.Node([0])
b = nengo.Node([0])
p_a = nengo.Probe(a)
p_b = nengo.Probe(b)
with nengo_dl.Simulator(net) as sim:
# compiled loss function only depends on p_a
sim.compile(loss={p_a: "mse"})
# only specify data for p_a
sim.{{ func_name }}(
x={a: np.zeros((50, 10, 1))}, y={p_a: np.zeros((50, 10, 1))})
.. testoutput::
:hide:
...
``y`` is not used if ``x`` is a generator. Instead, the generator passed to
``x`` should yield ``(x, y)`` tuples, where ``y`` is in one of the formats
described above.
""",
"n_steps": """
The number of simulation steps to be executed. This parameter is optional;
if not specified, the number of simulation steps will be inferred from the
input data. However, this parameter can be useful if you don't want to
specify input data (you just want to use the inputs defined by the Nengo
Nodes), or if your model does not have any input Nodes (so there is no data
to be passed in).
""",
"stateful": """
        This parameter controls whether or not the saved internal simulation state
will be updated after a run completes. If ``stateful=False`` then the initial
state of future runs will be unaffected by this run. With ``stateful=True``,
future runs will begin from the terminal state of this run.
For example,
.. code-block:: python
# begins in state0, terminates in state1
sim.{{ func_name }}(..., stateful=False)
# begins in state0, terminates in state2
sim.{{ func_name }}(..., stateful=True)
# begins in state2, terminates in state3
sim.{{ func_name }}(..., stateful=False)
# begins in state2, terminates in state4
sim.{{ func_name }}(..., stateful=True)
Note that `.Simulator.reset` can be used to reset the state to initial
conditions at any point.
""",
}
# use default name for args
for arg in args:
kwargs[arg] = arg
env = jinja2.Environment(trim_blocks=True, lstrip_blocks=True)
def fill_documentation(func):
rendered_docs = {}
for name, template in kwargs.items():
doc = docs[template]
# fill in variables
doc = env.from_string(doc).render(param_name=name, func_name=func.__name__)
# correct indentation
doc = textwrap.indent(doc, " " * 4)
doc = doc.strip()
rendered_docs[name] = doc
# insert result into docstring
func.__doc__ = env.from_string(func.__doc__).render(**rendered_docs)
return func
return fill_documentation
class Simulator:  # pylint: disable=too-many-public-methods
"""
Simulate network using the ``nengo_dl`` backend.
Parameters
----------
network : `nengo.Network`
A network object to be built and then simulated.
dt : float
Length of a simulator timestep, in seconds.
seed : int
Seed for all stochastic operators used in this simulator.
model : `~nengo.builder.Model`
Pre-built model object (mainly used for debugging).
device : None or ``"/cpu:0"`` or ``"/gpu:[0-n]"``
This specifies the computational device on which the simulation will
run. The default is ``None``, which means that operations will be assigned
according to TensorFlow's internal logic (generally speaking, this means that
things will be assigned to the GPU if GPU support is available,
otherwise everything will be assigned to the CPU). The device can be set
manually by passing the `TensorFlow device specification
<https://www.tensorflow.org/api_docs/python/tf/Graph#device>`_ to this
parameter. For example, setting ``device="/cpu:0"`` will force everything
to run on the CPU. This may be worthwhile for small models, where the extra
overhead of communicating with the GPU outweighs the actual computations. On
systems with multiple GPUs, ``device="/gpu:0"``/``"/gpu:1"``/etc. will select
which one to use.
unroll_simulation : int
This controls how many simulation iterations are executed each time through
the outer simulation loop. That is, we could run 20 timesteps as
.. code-block:: python
for i in range(20):
<run 1 step>
or
.. code-block:: python
for i in range(5):
<run 1 step>
<run 1 step>
<run 1 step>
<run 1 step>
This is an optimization process known as "loop unrolling", and
``unroll_simulation`` controls how many simulation steps are unrolled. The
first example above would correspond to ``unroll_simulation=1``, and the
second would be ``unroll_simulation=4``.
Unrolling the simulation will result in faster simulation speed, but increased
build time and memory usage.
In general, unrolling the simulation will have no impact on the output of a
simulation. The only case in which unrolling may have an impact is if
the number of simulation steps is not evenly divisible by
``unroll_simulation``. In that case extra simulation steps will be executed,
which could change the internal state of the simulation and
will affect any subsequent calls to ``sim.run``. So it is recommended that the
number of steps always be evenly divisible by ``unroll_simulation``.
minibatch_size : int
The number of simultaneous inputs that will be passed through the
network. For example, a single call to `.Simulator.run` will process
``minibatch_size`` input instances in parallel. Or when calling
`.Simulator.predict`/`.Simulator.fit` with a batch of data, that data will be
divided up into ``minibatch_size`` chunks.
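        For example, a brief sketch of the batching behaviour (assuming a
        network ``net`` with a single 1-d input Node and a Probe; shapes are
        illustrative):

        .. code-block:: python

            with nengo_dl.Simulator(net, minibatch_size=16) as sim:
                sim.compile(loss="mse")
                # 64 examples are processed as 4 minibatches of 16
                sim.fit(x=np.ones((64, 10, 1)), y=np.ones((64, 10, 1)))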
progress_bar : bool
If True (default), display progress information when building a model. This will
also be the default for the ``progress_bar`` argument within `.Simulator.run`
and `.Simulator.run_steps`.
Attributes
----------
data : `.SimulationData`
Stores simulation data and parameter values (in particular, the recorded output
from probes after calling `.Simulator.run` can be accessed through
``sim.data[my_probe]``).
model : `nengo.builder.Model`
Built Nengo model, containing the data that defines the network to be simulated.
keras_model : ``tf.keras.Model``
Keras Model underlying the simulation (implements the inference/training loops).
tensor_graph : `.tensor_graph.TensorGraph`
Keras Layer implementing the Nengo simulation (built into ``keras_model``).
"""
def __init__(
self,
network,
dt=0.001,
seed=None,
model=None,
device=None,
unroll_simulation=1,
minibatch_size=None,
progress_bar=True,
):
self.closed = None
self.unroll = unroll_simulation
self.minibatch_size = 1 if minibatch_size is None else minibatch_size
self.data = SimulationData(self, minibatch_size is not None)
if seed is None:
if network is not None and network.seed is not None:
seed = network.seed + 1
else:
seed = np.random.randint(np.iinfo(np.int32).max)
if device is None and not utils.tf_gpu_installed:
warnings.warn(
"No GPU support detected. See "
"https://www.nengo.ai/nengo-dl/installation.html#installing-tensorflow "
"for instructions on setting up TensorFlow with GPU support."
)
logger.info("Running on CPU")
else:
logger.info(
"Running on %s",
"CPU/GPU" if device is None else ("CPU" if "cpu" in device else "GPU"),
)
self.progress_bar = progress_bar
ProgressBar = utils.ProgressBar if progress_bar else utils.NullProgressBar
# build model (uses default nengo builder)
nengo_precision = nengo_rc.get("precision", "bits")
nengo_rc.set(
"precision",
"bits",
config.get_setting(model or network, "dtype", "float32")[-2:],
)
if model is None:
self.model = NengoModel(
dt=float(dt),
label="%s, dt=%f" % (network, dt),
builder=NengoBuilder(),
fail_fast=False,
)
else:
if dt != model.dt:
warnings.warn(
"Model dt (%g) does not match Simulator "
"dt (%g)" % (model.dt, dt),
NengoWarning,
)
self.model = model
if network is not None:
p = ProgressBar("Building network", "Build")
self.model.build(network, progress=p)
nengo_rc.set("precision", "bits", nengo_precision)
self.stateful = config.get_setting(self.model, "stateful", True)
# set up tensorflow graph plan
with ProgressBar(
"Optimizing graph", "Optimization", max_value=None
) as progress:
self.tensor_graph = TensorGraph(
self.model,
self.dt,
unroll_simulation,
self.minibatch_size,
device,
progress,
seed,
)
# build keras models
with ProgressBar(
"Constructing graph", "Construction", max_value=None
) as progress:
self._build_keras(progress)
# initialize sim attributes
self._n_steps = self._time = 0
for p in self.model.probes:
self.model.params[p] = []
self.closed = False
@with_self
def _build_keras(self, progress=None):
"""
Build the underlying Keras model that drives the simulation.
Parameters
----------
progress : `.utils.ProgressBar`
Progress bar for construction stage.
"""
self.node_inputs, n_steps = self.tensor_graph.build_inputs()
inputs = list(self.node_inputs.values()) + [n_steps]
outputs = self.tensor_graph(inputs, stateful=self.stateful, progress=progress)
self.keras_model = tf.keras.Model(
inputs=inputs, outputs=outputs, name="keras_model",
)
# set more informative output names
# keras names them like LayerName_i, whereas we would like to have the names
# associated with the probes
self.keras_model.output_names = [
self.tensor_graph.io_names[p] for p in self.model.probes
] + ["steps_run"]
self.tensor_graph.build_post()
    @require_open
@with_self
def reset(
self,
seed=None,
include_trainable=True,
include_probes=True,
include_processes=True,
):
"""
Resets the simulator to initial conditions.
Parameters
----------
seed : int
If not None, overwrite the default simulator seed with this value
(note: this becomes the new default simulator seed).
include_trainable : bool
If True (default), also reset any online or offline training that has been
performed on simulator parameters (e.g., connection weights).
include_probes : bool
If True (default), also clear probe data.
include_processes: bool
If True (default), also reset all `nengo.Process` objects in the model.
Notes
-----
Changing the TensorFlow seed only affects ops created from then on; it has
no impact on existing ops (either changing their seed or resetting their random
state). So calling `.Simulator.reset` will likely have no impact on any
TensorFlow randomness (it will still affect numpy randomness, such as in a
`nengo.Process`, as normal).
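        For example, a brief usage sketch (assuming an open Simulator ``sim``):

        .. code-block:: python

            sim.run_steps(10)
            # reset parameters and state, but keep the recorded probe data
            sim.reset(include_probes=False)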
"""
reset_vars = (
list(self.tensor_graph.saved_state.items()) if self.stateful else []
)
if include_trainable:
reset_vars.extend(self.tensor_graph.base_params.items())
if compat.eager_enabled():
for key, var in reset_vars:
var.assign(
# TODO: cache these instead of regenerating each time
self.tensor_graph.initial_values[key](var.shape, dtype=var.dtype)
)
else:
tf.keras.backend.batch_get_value([var.initializer for _, var in reset_vars])
if include_probes:
for p in self.model.probes:
self.model.params[p] = []
self._update_steps()
# update rng
if seed is not None:
warnings.warn(
"Changing the seed will not affect any TensorFlow operations "
"created before the seed was updated"
)
self.tensor_graph.seed = seed
if include_processes:
self.tensor_graph.build_post()
    @require_open
@with_self
def soft_reset(self, include_trainable=False, include_probes=False):
"""
Deprecated, use `.Simulator.reset` instead.
"""
warnings.warn(
"Simulator.soft_reset is deprecated, use Simulator.reset("
"include_trainable=False, include_probes=False, include_processes=False) "
"instead",
DeprecationWarning,
)
self.reset(
seed=None,
include_trainable=include_trainable,
include_probes=include_probes,
include_processes=False,
)
    @require_open
@fill_docs("x", "n_steps", "stateful")
def predict(self, x=None, n_steps=None, stateful=False, **kwargs):
"""
Generate output predictions for the input samples.
Computation is (optionally) done in batches.
This function implements the `tf.keras.Model.predict
<https://www.tensorflow.org/api_docs/python/tf/keras/Model#predict>`_ API.
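        For example, a minimal sketch (assuming a network ``net`` containing a
        single Probe ``p``):

        .. code-block:: python

            with nengo_dl.Simulator(net) as sim:
                output = sim.predict(n_steps=10)
                # output[p] has shape (batch_size, n_steps, p.size_in)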
Parameters
----------
x
{{ x }}
n_steps : int
{{ n_steps }}
stateful : bool
            {{ stateful }}
kwargs: dict
Will be passed on to `tf.keras.Model.predict
<https://www.tensorflow.org/api_docs/python/tf/keras/Model#predict>`_.
Returns
-------
probe_values : dict of {`nengo.Probe`: `numpy.ndarray`}
Output values from all the Probes in the network.
"""
return self._call_keras(
"predict", x=x, n_steps=n_steps, stateful=stateful, **kwargs
)
    @require_open
@fill_docs("x", "n_steps", "stateful")
def predict_on_batch(self, x=None, n_steps=None, stateful=False, **kwargs):
"""
Generate output predictions for a single minibatch of input samples.
Batch size is determined by ``sim.minibatch_size`` (i.e., inputs must have
        shape ``(sim.minibatch_size, n_steps, node.size_out)``).
This function implements the `tf.keras.Model.predict_on_batch
<https://www.tensorflow.org/api_docs/python/tf/keras/Model#predict_on_batch>`_
API.
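        For example, a minimal sketch (assuming a Simulator created with the
        default ``minibatch_size=1`` and a single 1-d input Node):

        .. code-block:: python

            output = sim.predict_on_batch(np.ones((1, 10, 1)))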
Parameters
----------
x
{{ x }}
n_steps : int
{{ n_steps }}
stateful : bool
{{ stateful }}
kwargs: dict
Will be passed on to `tf.keras.Model.predict_on_batch
<https://www.tensorflow.org/api_docs/python/tf/keras/Model#predict_on_batch>`_.
Returns
-------
probe_values : dict of {`nengo.Probe`: `numpy.ndarray`}
Output values from all the Probes in the network.
"""
# need to reset if simulator is stateful but this call is not stateful
need_reset = not stateful and self.stateful
# predict_on_batch doesn't support callbacks, so we do it manually
if need_reset:
cbk = callbacks.IsolateState(self)
# note: setting stateful to self.stateful so that the inner _call_keras won't
# try to do any resetting
output = self._call_keras(
"predict_on_batch", x=x, n_steps=n_steps, stateful=self.stateful, **kwargs
)
if need_reset:
cbk.reset()
self._update_steps()
return output
    @require_open
@with_self
def compile(self, *args, loss=None, metrics=None, loss_weights=None, **kwargs):
"""
Configure the model for training/evaluation.
Parameters
----------
args
Will be passed on to `tf.keras.Model.compile
<https://www.tensorflow.org/api_docs/python/tf/keras/Model#compile>`_.
loss
Loss functions define the error that will be minimized during
training.
Losses can be specified as:
- A `tf.losses.Loss
<https://www.tensorflow.org/api_docs/python/tf/keras/losses>`_ instance.
- A string matching the name of one of the loss functions above.
- A function that accepts two arguments (``y_true, y_pred``) and returns
a loss value (represented as a ``tf.Tensor``).
- A list of some combination of the above, indicating different loss
functions for each output Probe (ordered according to the order in
which Probes were added to the model, which corresponds to the order
found in ``Simulator.model.probes``).
- A dictionary mapping Probe instances or names to loss functions.
The total loss minimized during training will be the sum over the loss
computed on each Probe (possibly weighted by ``loss_weights``).
For example,
.. testcode::
with nengo.Network() as net:
node0 = nengo.Node([0])
node1 = nengo.Node([0])
probe0 = nengo.Probe(node0)
probe1 = nengo.Probe(node1)
with nengo_dl.Simulator(net) as sim:
sim.compile(loss={probe0: "mse", probe1: tf.losses.mae})
would compile ``probe0`` to use mean squared error and ``probe1`` to use
mean absolute error.
metrics
Metrics are additional values (generally different kinds of losses) that
will be computed during training for tracking purposes, but do not affect
the result of the training.
They can be specified in all the same ways as ``loss`` above.
In addition, multiple metrics can be specified for each output Probe when
using a list or dict, by providing multiple functions in a list (e.g.,
``metrics={my_probe: ["mae", "mse"]}``).
loss_weights : list or dict
Scalar weights that will be applied to the loss value computed for each
output probe before summing them to compute the overall training loss. Can
be a list (order corresponding to the order in ``loss``) or a dict mapping
Probe instances/names to weights.
kwargs
Will be passed on to `tf.keras.Model.compile
<https://www.tensorflow.org/api_docs/python/tf/keras/Model#compile>`_.
"""
# convert inputs to canonical name dict form
loss = self._standardize_data(loss, self.model.probes, broadcast_unary=True)
metrics = self._standardize_data(
metrics, self.model.probes, broadcast_unary=True
)
loss_weights = self._standardize_data(loss_weights, self.model.probes)
self.keras_model.compile(
*args, loss=loss, metrics=metrics, loss_weights=loss_weights, **kwargs
)
    @require_open
@fill_docs("x", "y", "n_steps", "stateful")
def fit(self, x=None, y=None, n_steps=None, stateful=False, **kwargs):
"""
Trains the model on some dataset.
Note that if the model contains spiking neurons, during the execution of this
function those neurons will be swapped for the equivalent non-spiking
implementation (as opposed to, e.g., `Simulator.evaluate`, which will
use the spiking implementation).
Optimizer and loss functions are defined separately in `.Simulator.compile`.
This function implements the `tf.keras.Model.fit
<https://www.tensorflow.org/api_docs/python/tf/keras/Model#fit>`_ API.
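        For example, a short sketch (the optimizer and epoch count are
        illustrative):

        .. code-block:: python

            with nengo_dl.Simulator(net) as sim:
                sim.compile(optimizer=tf.optimizers.SGD(), loss="mse")
                history = sim.fit(
                    x=np.ones((50, 10, 1)), y=np.ones((50, 10, 1)), epochs=2)
                print(history.history["loss"])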
Parameters
----------
x
{{ x }}
y
{{ y }}
n_steps : int
{{ n_steps }}
stateful : bool
{{ stateful }}
kwargs: dict
Will be passed on to `tf.keras.Model.fit
<https://www.tensorflow.org/api_docs/python/tf/keras/Model#fit>`_.
Returns
-------
history : ``tf.keras.callbacks.History``
The history has two attributes: ``history.epoch`` is the list of epoch
numbers, and ``history.history`` is a dictionary keyed by metric names
(e.g., "loss") containing a list of values of those metrics from each epoch.
"""
# if validation data is None or a dataset we don't do anything, but
# otherwise we apply the same data augmentation/validation
# as for x and y
if isinstance(kwargs.get("validation_data", None), (list, tuple)):
validation_data = kwargs["validation_data"]
x_val = validation_data[0]
x_val = self._generate_inputs(x_val, n_steps=n_steps)
self._check_data(x_val, n_steps=n_steps)
y_val = validation_data[1]
y_val = self._standardize_data(y_val, self.model.probes)
self._check_data(y_val, n_steps=None, nodes=False)
if len(validation_data) == 2:
kwargs["validation_data"] = (x_val, y_val)
else:
kwargs["validation_data"] = (x_val, y_val, validation_data[2])
return self._call_keras(
"fit", x=x, y=y, n_steps=n_steps, stateful=stateful, **kwargs
)
    @require_open
@fill_docs("x", "y", "n_steps", "stateful")
def evaluate(self, x=None, y=None, n_steps=None, stateful=False, **kwargs):
"""
Compute the loss and metric values for the network.
Loss functions and other metrics are defined separately in `.Simulator.compile`.
This function implements the `tf.keras.Model.evaluate
<https://www.tensorflow.org/api_docs/python/tf/keras/Model#evaluate>`_ API.
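        For example, a short sketch (assuming ``sim`` has already been
        compiled with a loss function):

        .. code-block:: python

            outputs = sim.evaluate(
                x=np.ones((50, 10, 1)), y=np.ones((50, 10, 1)))
            print(outputs["loss"])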
Parameters
----------
x
{{ x }}
y
{{ y }}
n_steps : int
{{ n_steps }}
stateful : bool
{{ stateful }}
kwargs: dict
Will be passed on to `tf.keras.Model.evaluate
<https://www.tensorflow.org/api_docs/python/tf/keras/Model#evaluate>`_.
Returns
-------
outputs : dict of {str: `numpy.ndarray`}
Computed loss/metric values. The overall loss will be in
``outputs["loss"]``, and values for each Probe will be in
``outputs["probe_name_loss"]`` or ``outputs["probe_name_metric_name"]``.
"""
return self._call_keras(
"evaluate", x=x, y=y, n_steps=n_steps, stateful=stateful, **kwargs
)
@with_self
def _call_keras(
self, func_type, x=None, y=None, n_steps=None, stateful=False, **kwargs
):
"""
Internal base function for all the predict, fit, and evaluate functions.
Parameters
----------
func_type : "predict" or "predict_on_batch" or "fit" or "evaluate"
The underlying function to call on the Keras model.
x
See description in documentation of ``<func_type>`` method.
y
See description in documentation of ``<func_type>`` method.
n_steps : int
See description in documentation of ``<func_type>`` method.
stateful : bool
See description in documentation of ``<func_type>`` method.
kwargs : dict
Will be passed to the underlying Keras function.
Returns
-------
See description in documentation of ``<func_type>`` method.
"""
if func_type.startswith("fit") and self.tensor_graph.inference_only:
raise SimulationError(
"Network was created with inference_only=True, cannot "
"be run in training mode"
)
if stateful and not self.stateful:
warnings.warn(
"Ignoring stateful=True, since the model was created with the "
"stateful=False config setting."
)
if "batch_size" in kwargs:
# note: the keras "batch size" parameter refers to minibatch size
# (i.e., the number of elements passed to the network in each iteration,
# rather than the total number of elements in the data)
warnings.warn(
"Batch size is determined statically via Simulator.minibatch_size; "
"ignoring value passed to `%s`" % func_type
)
if "on_batch" not in func_type:
kwargs["batch_size"] = (
self.minibatch_size if compat.eager_enabled() else None
)
# TODO: apply standardize/generate/check data to generator somehow
# maybe move it into a callback where the generated data is available?
x = self._generate_inputs(x, n_steps=n_steps)
self._check_data(
x,
n_steps=n_steps,
batch_size=self.minibatch_size if "on_batch" in func_type else None,
)
if isinstance(x, dict):
input_steps = x["n_steps"][0, 0]
input_batch = x["n_steps"].shape[0]
else:
input_steps = None
input_batch = self.minibatch_size if "on_batch" in func_type else None
if y is not None:
y = self._standardize_data(y, self.model.probes)
# we set n_steps=None because targets do not necessarily need to have
# the same number of timesteps as input (depending on the loss function)
self._check_data(y, n_steps=None, batch_size=input_batch, nodes=False)
if kwargs.get("validation_split", 0) != 0 and input_batch is not None:
# validation_split is only a kwarg in `fit`, but we do it here because
# we need to know `input_batch`.
# split math set up to match
# `keras.engine.training_utils.split_training_and_validation_data`.
split = int(input_batch * (1 - kwargs["validation_split"]))
if (
split % self.minibatch_size != 0
or (input_batch - split) % self.minibatch_size != 0
):
raise ValidationError(
"Split data is not evenly divisible by minibatch size",
"validation_split",
)
# warn for synapses with n_steps=1
# note: we don't warn if stateful, since there could be effects across runs
if not stateful:
if compat.eager_enabled():
target_probes = [
p
for p, e in zip(self.model.probes, self.keras_model.output_names)
if self.keras_model.compiled_loss is None
or self.keras_model.compiled_loss._losses is None
or e in self.keras_model.compiled_loss._losses
]
else:
target_probes = [
p
for p, e in zip(
self.model.probes,
getattr(self.keras_model, "_training_endpoints", []),
)
if not e.should_skip_target()
]
            if (
                input_steps == 1
                and self.model.toplevel is not None
                and any(
                    x.synapse is not None
                    for x in (self.model.toplevel.all_connections + target_probes)
                )
            ):
warnings.warn(
"Running for one timestep, but the network contains "
"synaptic filters (which will introduce at least a "
"one-timestep delay); did you mean to set synapse=None?"
)
# set up callback to reset state after execution.
# only necessary if simulator is stateful but this call is not stateful
if not stateful and self.stateful:
kwargs["callbacks"] = (kwargs.get("callbacks", None) or []) + [
callbacks.IsolateState(self)
]
# call underlying keras function
if "predict" in func_type:
func_args = dict(x=x, **kwargs)
else:
func_args = dict(x=x, y=y, **kwargs)
outputs = getattr(self.keras_model, func_type)(**func_args)
# update n_steps/time
if stateful:
self._update_steps()
# process keras outputs
if func_type.startswith("predict"):
# reorganize results (will be flattened) back into dict
if not isinstance(outputs, list):
outputs = [outputs]
return collections.OrderedDict(zip(self.model.probes, outputs))
elif func_type.startswith("evaluate"):
# return outputs as named dict
return collections.OrderedDict(zip(self.keras_model.metrics_names, outputs))
else:
# return training history
return outputs
    def step(self, **kwargs):
"""
Run the simulation for one time step.
Parameters
----------
kwargs : dict
See `.run_steps`
Notes
-----
Progress bar is disabled by default when running via this method.
"""
kwargs.setdefault("progress_bar", False)
self.run_steps(1, **kwargs)
    def run(self, time_in_seconds, **kwargs):
"""
Run the simulation for the given length of time.
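        For example, a minimal sketch (with the default ``dt=0.001``):

        .. code-block:: python

            sim.run(1.0)  # executes 1000 simulation timesteps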
Parameters
----------
time_in_seconds : float
Run the simulator for the given number of simulated seconds.
kwargs : dict
See `.run_steps`
"""
if time_in_seconds < 0:
raise ValidationError(
"Must be positive (got %g)" % (time_in_seconds,), attr="time_in_seconds"
)
steps = int(np.round(float(time_in_seconds) / self.dt))
if steps == 0:
warnings.warn(
"%g results in running for 0 timesteps. Simulator "
"still at time %g." % (time_in_seconds, self.time)
)
else:
self.run_steps(steps, **kwargs)
    @require_open
@fill_docs("stateful", data="x")
def run_steps(self, n_steps, data=None, progress_bar=None, stateful=True):
"""
Run the simulation for the given number of steps.
Parameters
----------
n_steps : int
The number of simulation steps to be executed.
data :
{{ data }}
progress_bar : bool
If True, print information about the simulation status to standard
output.
stateful : bool
{{ stateful }}
Notes
-----
If ``unroll_simulation=x`` is specified, and ``n_steps > x``, this will
        repeatedly execute ``x`` timesteps until the number of steps
executed is >= ``n_steps``.
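        For example, a sketch of the unrolling behaviour (the values are
        illustrative):

        .. code-block:: python

            with nengo_dl.Simulator(net, unroll_simulation=2) as sim:
                sim.run_steps(5)  # warns, and actually executes 6 steps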
"""
actual_steps = self.unroll * int(np.ceil(n_steps / self.unroll))
# error checking
if actual_steps != n_steps:
warnings.warn(
"Number of steps (%d) is not an even multiple of "
"`unroll_simulation` (%d). Simulation will run for %d steps, "
"which may have unintended side effects."
% (n_steps, self.unroll, actual_steps),
RuntimeWarning,
)
if progress_bar is None:
progress_bar = self.progress_bar
progress = (
utils.ProgressBar("Simulating", "Simulation", max_value=None)
if progress_bar
else utils.NullProgressBar()
)
with progress:
# run the simulation
try:
output = self.predict_on_batch(
data, n_steps=actual_steps, stateful=stateful
)
except (tf.errors.InternalError, tf.errors.UnknownError) as e:
if "nengo.exceptions.SimulationError" in e.message:
raise SimulationError(
"SimulationError detected; this most likely means that a "
"Python function (e.g. in a Node or Direct ensemble) caused "
"an error. See the full error log above."
)
else:
raise e # pragma: no cover (unknown errors)
# update stored probe data
for probe, val in output.items():
if probe.sample_every is not None:
# downsample probe according to `sample_every`
period = probe.sample_every / self.dt
steps = np.arange(self.n_steps - actual_steps, self.n_steps)
val = val[:, (steps + 1) % period < 1]
self.model.params[probe].append(val)
    def train(self, *args, **kwargs):
"""Deprecated, use `.Simulator.compile` and `.Simulator.fit` instead."""
raise SimulationError(
"Simulator.train has been deprecated, use Simulator.compile/fit instead"
)
    def loss(self, *args, **kwargs):
"""Deprecated, use `.Simulator.compile` and `.Simulator.evaluate` instead."""
raise SimulationError(
"Simulator.loss has been deprecated, use Simulator.compile/evaluate instead"
)
    @require_open
@with_self
def save_params(self, path, include_state=False, include_non_trainable=None):
"""
Save network parameters to the given ``path``.
Parameters
----------
path : str
Filepath of parameter output file.
include_state : bool
If True (default False) also save the internal simulation state.
.. versionchanged:: 3.2.0
Renamed from ``include_non_trainable`` to ``include_state``.
Notes
-----
This function is useful for saving/loading entire models; for
saving/loading individual objects within a model, see
`.get_nengo_params`.
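        For example, a round-trip sketch (the filepath is illustrative):

        .. code-block:: python

            with nengo_dl.Simulator(net) as sim:
                sim.save_params("./my_params")  # writes my_params.npz
            with nengo_dl.Simulator(net) as sim2:
                sim2.load_params("./my_params")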
"""
if include_non_trainable is not None:
warnings.warn(
"include_non_trainable is deprecated, use include_state instead",
DeprecationWarning,
)
include_state = include_non_trainable
params = list(self.keras_model.weights)
if include_state:
params.extend(self.tensor_graph.saved_state.values())
np.savez_compressed(path + ".npz", *tf.keras.backend.batch_get_value(params))
logger.info("Model parameters saved to %s.npz", path)
    @require_open
@with_self
def load_params(self, path, include_state=False, include_non_trainable=None):
"""
Load network parameters from the given ``path``.
Parameters
----------
path : str
Filepath of parameter input file.
include_state : bool
            If True (default False) also load the internal simulation state.
.. versionchanged:: 3.2.0
Renamed from ``include_non_trainable`` to ``include_state``.
Notes
-----
This function is useful for saving/loading entire models; for
saving/loading individual objects within a model, see
`.get_nengo_params`.
"""
if include_non_trainable is not None:
warnings.warn(
"include_non_trainable is deprecated, use include_state instead",
DeprecationWarning,
)
include_state = include_non_trainable
params = list(self.keras_model.weights)
if include_state:
params.extend(self.tensor_graph.saved_state.values())
with np.load(path + ".npz") as vals:
if len(params) != len(vals.files):
raise SimulationError(
"Number of saved parameters in %s (%d) != number of variables in "
"the model (%d)" % (path, len(vals.files), len(params))
)
tf.keras.backend.batch_set_value(
zip(params, (vals["arr_%d" % i] for i in range(len(vals.files))))
)
logger.info("Model parameters loaded from %s.npz", path)
    @require_open
def freeze_params(self, objs):
"""
Stores the live parameter values from the simulation back into a
Nengo object definition.
This can be helpful for reusing a NengoDL model inside a different
Simulator. For example:
.. testcode::
with nengo.Network() as net:
ens = nengo.Ensemble(10, 1)
with nengo_dl.Simulator(net) as sim:
# < run some optimization >
sim.freeze_params(net)
with nengo.Simulator(net) as sim2:
# run the network in the default Nengo simulator, with the
# trained parameters
sim2.run(1.0)
.. testoutput::
:hide:
...
Parameters
----------
        objs : (list of) ``NengoObject``
The Nengo object(s) into which parameter values will be stored.
Note that these objects must be members of the Network used to
initialize the Simulator.
Notes
-----
This modifies the source object in-place, and it may slightly modify
the structure of that object. The goal is to have the object produce
the same output as it would if run in the NengoDL simulator. It may
        not be possible to accurately freeze all possible objects; if you run
into errors in this process, try manually extracting the parameters you
need in your model (from ``sim.data``).
"""
if not isinstance(objs, (list, tuple)):
objs = [objs]
for obj in objs:
if obj not in [self.model.toplevel] + self.model.toplevel.all_objects:
                raise ValueError(
                    "%s is not a member of the Network used to "
                    "initialize the Simulator" % obj
                )
if not isinstance(obj, (Network, Ensemble, Connection)):
raise TypeError(
"Objects of type %s do not have parameters to store" % type(obj)
)
if isinstance(obj, Network):
todo = obj.all_ensembles + obj.all_connections
else:
todo = [obj]
for o, params in zip(todo, self.get_nengo_params(todo)):
for k, v in params.items():
setattr(o, k, v)
    def get_nengo_params(self, nengo_objs, as_dict=False):
"""
Extract model parameters in a form that can be used to initialize
Nengo objects in a different model.
For example:
.. testcode::
with nengo.Network() as net:
a = nengo.Ensemble(10, 1)
b = nengo.Ensemble(10, 1)
c = nengo.Connection(a, b)
with nengo_dl.Simulator(net) as sim:
# < do some optimization >
params = sim.get_nengo_params([a, b, c])
with nengo.Network() as new_net:
# < build some other network >
# now we want to insert two connected ensembles with
# the same parameters as our previous network:
d = nengo.Ensemble(10, 1, **params[0])
e = nengo.Ensemble(10, 1, **params[1])
f = nengo.Connection(d, e, **params[2])
Note that this function only returns trainable parameters (e.g. connection
weights, biases, or encoders), or parameters that directly interact with
those parameters (e.g. gains). Other arguments that are independent of the
trainable parameters (e.g. ``Ensemble.neuron_type`` or ``Connection.synapse``)
should be specified manually (since they may change between models).
Parameters
----------
nengo_objs : (list of) `~nengo.Ensemble` or `~nengo.Connection`
A single object or list of objects for which we want to get the
parameters.
as_dict : bool
If True, return the values as a dictionary keyed by object label,
instead of a list (the default). Note that in this case labels
must be unique.
Returns
-------
params : (list or dict) of dicts
kwarg dicts corresponding to ``nengo_objs`` (passing these
dicts as kwargs when creating new Nengo objects will result in a
new object with the same parameters as the source object). A
single kwarg dict if a single object was passed in, or a list
(dict if ``as_dict=True``) of kwargs corresponding to multiple
input objects.
"""
if isinstance(nengo_objs, (list, tuple)):
scalar = False
else:
scalar = True
nengo_objs = [nengo_objs]
# convert neurons to the parent ensemble
nengo_objs = [
obj.ensemble if isinstance(obj, Neurons) else obj for obj in nengo_objs
]
# find all the data we need to fetch
fetches = []
for obj in nengo_objs:
if isinstance(obj, Connection):
if compat.conn_has_weights(obj):
fetches.append((obj, "weights"))
elif isinstance(obj, Ensemble):
if isinstance(obj.neuron_type, Direct):
# we cannot transfer direct ensemble parameters, because
# the nengo builder ignores the encoders specified for
# a direct ensemble
raise ValueError(
"get_nengo_params will not work correctly for "
"Direct neuron ensembles. Try manually translating "
"your network using `sim.data` instead."
)
fetches.extend([(obj, "scaled_encoders"), (obj, "bias")])
else:
raise ValueError(
"Can only get Nengo parameters for Ensembles or Connections"
)
# get parameter values from simulation
data = self.data.get_params(*fetches)
# store parameter values in a form that can be loaded in nengo
params = []
idx = 0
for obj in nengo_objs:
if isinstance(obj, Connection):
if not compat.conn_has_weights(obj):
params.append({"transform": None})
continue
weights = data[idx]
idx += 1
if isinstance(obj.transform, Convolution):
transform = copy.copy(obj.transform)
# manually bypass the read-only check (we are sure that
# nothing else has a handle to the new transform at this
# point, so this won't cause any problems)
Convolution.init.data[transform] = weights
params.append({"transform": transform})
elif isinstance(obj.transform, Sparse):
transform = copy.copy(obj.transform)
if isinstance(transform.init, SparseMatrix):
init = SparseMatrix(
transform.init.indices, weights, transform.init.shape
)
else:
init = transform.init.tocoo()
init = SparseMatrix(
np.stack((init.row, init.col), axis=-1), weights, init.shape
)
Sparse.init.data[transform] = init
params.append({"transform": transform})
elif isinstance(obj.transform, (Dense, compat.NoTransform)):
if isinstance(obj.pre_obj, Ensemble):
# decoded connection
params.append(
{
"solver": NoSolver(weights.T, weights=False),
"function": lambda x, weights=weights: np.zeros(
weights.shape[0]
),
"transform": compat.default_transform,
}
)
else:
if all(x == 1 for x in weights.shape):
weights = np.squeeze(weights)
params.append({"transform": weights})
else:
raise NotImplementedError(
"Cannot get parameters of Connections with transform type '%s'"
% type(obj.transform).__name__
)
else:
# note: we don't want to change the original gain (even though
# it is rolled into the encoder values), because connections
# direct to `ens.neurons` will still use the gains (and those
# gains are not updated during training, only the encoders)
gain = self.model.params[obj].gain
# the encoders we get from the simulation are the actual
# weights we want in the simulation. but during the build
# process, gains and radius will be applied to the encoders.
# so we need to undo that scaling here, so that the build
# process will result in the correct values.
encoders = data[idx] * obj.radius / gain[:, None]
params.append(
{
"encoders": encoders,
"normalize_encoders": False,
"gain": gain,
"bias": data[idx + 1],
"max_rates": Ensemble.max_rates.default,
"intercepts": Ensemble.intercepts.default,
}
)
idx += 2
# return params in appropriate format
if scalar:
return params[0]
if as_dict:
param_dict = {}
for obj, p in zip(nengo_objs, params):
if obj.label in param_dict:
raise ValueError(
"Duplicate label ('%s') detected; cannot return "
"parameters with as_dict=True" % obj.label
)
else:
param_dict[obj.label] = p
params = param_dict
return params
    @require_open
@with_self
def check_gradients(self, inputs=None, outputs=None, atol=1e-5, rtol=1e-3):
"""
Perform gradient checks for the network (used to verify that the
analytic gradients are correct).
Raises a simulation error if the difference between analytic and
numeric gradient is greater than ``atol + rtol * numeric_grad``
(elementwise).
Parameters
----------
inputs : list of `numpy.ndarray`
Input values for all the input Nodes in the model (ordered according to
the order in which Nodes were added to the model). If None, will use all
zeros.
outputs : list of `~nengo.Probe`
Compute gradients wrt this output (if None, computes wrt each
output probe).
atol : float
Absolute error tolerance.
rtol : float
Relative (to numeric grad) error tolerance.
Notes
-----
Calling this method will reset the internal simulation state.
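        For example, a usage sketch (assuming a small differentiable network
        ``net``):

        .. code-block:: python

            with nengo_dl.Simulator(net) as sim:
                # raises SimulationError if the gradients do not match
                grads = sim.check_gradients()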
"""
if self.tensor_graph.inference_only:
raise SimulationError(
"Network was created with inference_only=True, cannot "
"compute gradients"
)
if inputs is None:
n_steps = self.unroll * 2
inputs = [
np.zeros(
tuple(n_steps if s is None else s for s in x.shape),
x.dtype.as_numpy_dtype(),
)
for x in self.keras_model.inputs[:-1]
]
else:
n_steps = inputs[0].shape[1]
if outputs is None:
outputs = self.model.probes
# compute_gradients expects to be called with a function that works in
# specific ways, so we wrap the model to work in the way it expects
@tf.function
@tf.autograph.experimental.do_not_convert
def arg_func(*args, output=None):
for i, x in enumerate(args):
x.set_shape(inputs[i].shape)
args += (tf.ones((self.minibatch_size, 1), dtype=np.int32) * n_steps,)
out = self.tensor_graph(args, training=True)
self.tensor_graph.build_post()
if self.stateful:
# reset state
for key, var in self.tensor_graph.saved_state.items():
var.assign(
self.tensor_graph.initial_values[key](
var.shape, dtype=var.dtype
)
)
# drop steps_run
out = out[:-1]
# get selected output
out = out[self.model.probes.index(output)]
return out
self.reset(
include_probes=False, include_trainable=False, include_processes=False
)
ctx = (
# noop
contextlib.suppress
if compat.eager_enabled()
else tf.compat.v1.keras.backend.get_session().as_default
)
grads = dict()
for output in outputs:
with ctx():
analytic, numeric = tf.test.compute_gradient(
partial(arg_func, output=output), inputs
)
grads[output] = dict()
grads[output]["analytic"] = analytic
grads[output]["numeric"] = numeric
for a, n in zip(analytic, numeric):
if np.any(np.isnan(a)) or np.any(np.isnan(n)):
raise SimulationError("NaNs detected in gradient")
fail = abs(a - n) >= atol + rtol * abs(n)
if np.any(fail):
raise SimulationError(
"Gradient check failed\n"
"numeric values:\n%s\n"
"analytic values:\n%s\n" % (n[fail], a[fail])
)
logger.info("Gradient check passed")
return grads
    def trange(self, sample_every=None, dt=None):
"""
Create a vector of simulation step times matching probed data.
Note that the range does not start at 0 as one might expect, but at
the first timestep (i.e., ``dt``).
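        For example, a sketch (with the default ``dt=0.001``):

        .. code-block:: python

            sim.run_steps(5)
            sim.trange()  # array([0.001, 0.002, 0.003, 0.004, 0.005])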
Parameters
----------
sample_every : float (Default: None)
The sampling period of the probe to create a range for.
If None, a time value for every ``dt`` will be produced.
"""
if dt is not None:
if sample_every is not None:
raise ValidationError(
"Cannot specify both `dt` and `sample_every`. "
"Use `sample_every` only.",
attr="dt",
obj=self,
)
warnings.warn(
"`dt` is deprecated. Use `sample_every` instead.", DeprecationWarning
)
sample_every = dt
period = 1 if sample_every is None else sample_every / self.dt
steps = np.arange(1, self.n_steps + 1)
return self.dt * steps[steps % period < 1]
    def close(self):
"""
Close the simulation, freeing resources.
Notes
-----
The simulation cannot be restarted after it is closed.
"""
if not self.closed:
self.keras_model = None
self.tensor_graph = None
self._closed_attrs = ["keras_model", "tensor_graph"]
self.closed = True
    def get_name(self, obj):
"""
Returns the standardized string name for input Nodes or output Probes.
These are used when referring to inputs/outputs by string in Keras.
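        For example, a minimal sketch (assuming ``node`` is an input Node in
        the network):

        .. code-block:: python

            sim.get_name(node)  # e.g. "node", or the Node's label if set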
Parameters
----------
obj : `nengo.Node` or `nengo.Probe`
Input Node or output Probe
Returns
-------
name : str
Name of the given object
"""
if isinstance(obj, Node):
if obj not in self.node_inputs:
raise ValidationError(
"%s is not an input Node (a nengo.Node with "
"size_in==0), or is from a different network." % obj,
"obj",
)
elif isinstance(obj, Probe):
if obj not in self.tensor_graph.probe_arrays:
raise ValidationError("%s is from a different network." % obj, "obj")
else:
raise ValidationError(
"%s is of an unknown type (%s); should be nengo.Node "
"or nengo.Probe" % (obj, type(obj)),
"obj",
)
return self.tensor_graph.io_names[obj]
def _standardize_data(self, data, objects, broadcast_unary=False):
"""
Converts data to the standardized input format (named string dicts).
Parameters
----------
data : `numpy.ndarray` or list or dict
Input data in one of the formats supported by fit/predict/eval.
objects : list of `nengo.Node` or `nengo.Probe`
List of input Nodes or output Probes in the model (depending on which
kind of data is being standardized).
broadcast_unary: bool
If True, singular (e.g. non-list/dict) inputs will be applied to all
``objects``, otherwise will only be applied to first item in ``objects``.
Returns
-------
data : dict of {str: object}
Elements of data reorganized into standardized data structure (named
string dict).
"""
if data is None:
return data
if not isinstance(data, (list, tuple, dict)):
# convert unary inputs to length-1 lists
data = [data]
if broadcast_unary:
data *= len(objects)
if isinstance(data, (list, tuple)):
if len(data) != len(objects):
warnings.warn(
"Number of elements (%d) in %s does not match number of "
"%ss (%d); consider using an explicit input dictionary in this "
"case, so that the assignment of data to objects is unambiguous."
% (
len(data),
[type(d).__name__ for d in data],
type(objects[0]).__name__,
len(objects),
)
)
# convert list to named dict
data = collections.OrderedDict(
(self.get_name(obj), val) for obj, val in zip(objects, data)
)
elif isinstance(data, dict):
# convert objects to string names
data = collections.OrderedDict(
(obj if isinstance(obj, str) else self.get_name(obj), val)
for obj, val in data.items()
)
return data
def _generate_inputs(self, data=None, n_steps=None):
"""
Generate inputs for the network (the output values of each Node with
no incoming connections).
Parameters
----------
data : list or dict of {`~nengo.Node` or str: `~numpy.ndarray`}
Override the values of input Nodes with the given data. Arrays
should have shape ``(sim.minibatch_size, n_steps, node.size_out)``.
n_steps : int
Number of simulation timesteps for which to generate input data
Returns
-------
        node_vals : dict of {str: `~numpy.ndarray`}
Simulation values for all the input Nodes in the network.
"""
if data is None:
data = {}
if not isinstance(data, (list, tuple, dict, np.ndarray, tf.Tensor)):
# data is some kind of generator, so we don't try to modify it (too many
# different types of generators this could be)
if n_steps is not None:
raise SimulationError(
"Cannot automatically add n_steps to generator with type %s; "
"please specify n_steps manually as the first element in the "
"values yielded from generator, remembering that it needs to "
"be repeated to have shape (batch_size, 1)" % type(data)
)
return data
data = self._standardize_data(data, list(self.node_inputs.keys()))
if len(data) == 0:
data_batch = data_steps = None
else:
data_batch, data_steps = next(iter(data.values())).shape[:2]
batch_size = self.minibatch_size if data_batch is None else data_batch
if n_steps is None:
if data_steps is None:
raise ValidationError(
"Must specify either input data or n_steps", "data"
)
n_steps = data_steps
input_vals = collections.OrderedDict()
# fill in data for input nodes
for node, output in self.tensor_graph.input_funcs.items():
name = self.get_name(node)
if name in data:
node_val = data[name]
elif isinstance(output, np.ndarray):
# tile to n_steps/minibatch size
node_val = np.tile(output[None, None, :], (batch_size, n_steps, 1))
else:
# call output function to determine value
node_val = np.zeros(
(batch_size, n_steps, node.size_out),
dtype=np.dtype(self.tensor_graph.dtype),
)
for i in range(n_steps):
# note: need to copy the output of func, as func
# may mutate its outputs in-place on subsequent calls.
# this assignment will broadcast the output along the
# minibatch dimension if required.
# note: we still call the function even if the output
# is not being used in the graph, because it may have side-effects
node_val[:, i] = [
func((i + self.n_steps + 1) * self.dt) for func in output
]
input_vals[name] = node_val
for name in data:
if name not in input_vals:
raise ValidationError(
"Input contained entry for '%s', which is not a valid input name"
% name,
"data",
)
# fill in n_steps
input_vals["n_steps"] = np.resize(n_steps, (batch_size, 1)).astype(np.int32)
return input_vals
def _check_data(self, data, batch_size=None, n_steps=None, nodes=True):
"""
Performs error checking on simulation data.
Parameters
----------
data : dict of {str: `~numpy.ndarray` or ``tf.Tensor``}
Array of data associated with given objects in model (Nodes or
Probes)
batch_size : int
Number of elements in batch (if None, will just verify that all
data items have same batch size)
n_steps : int
Number of simulation steps (if None, will just verify that all
data items have same number of steps)
nodes : bool
If True the data being validated is associated with Nodes, if False the
data is associated with Probes.
Notes
-----
This may modify ``data`` in-place, if it contains data that is not evenly
divisible by ``Simulator.minibatch_size``.
"""
if not isinstance(data, dict):
# data is a generator, so don't perform validation
return
# make sure data is evenly divisible by minibatch size
for k, v in data.items():
try:
data_batch = v.shape[0]
except IndexError:
# v is a scalar
continue
if (
data_batch > self.minibatch_size
and data_batch % self.minibatch_size != 0
):
warnings.warn(
"Number of elements in input data (%d) is not evenly divisible by "
"Simulator.minibatch_size (%d); input data will be truncated."
% (data_batch, self.minibatch_size)
)
data_batch -= data_batch % self.minibatch_size
data[k] = v[:data_batch]
# exclude n_steps from normal data checking
data_n_steps = data.get("n_steps", None)
data = {k: val for k, val in data.items() if k != "n_steps"}
for name, x in data.items():
# check that name is valid
if nodes:
valid_names = [self.get_name(n) for n in self.node_inputs]
if name not in valid_names:
raise ValidationError(
"'%s' is not a valid node name; perhaps the name is wrong (it "
"should match the `label` on the Node), or this is not an "
"input Node (a Node with size_in==0) in this network. "
"Valid names are: %s." % (name, valid_names),
"data",
)
else:
valid_names = [self.get_name(p) for p in self.model.probes]
if name not in valid_names:
raise ValidationError(
"'%s' is not a valid probe name; perhaps the name is wrong (it "
"should match the `label` on the Probe), or this is not a "
"Probe in this network. Valid names are: %s."
% (name, valid_names),
"data",
)
# generic shape checks
if len(x.shape) != 3:
raise ValidationError(
"should have rank 3 (batch_size, n_steps, dimensions), "
"found rank %d" % len(x.shape),
"%s data" % name,
)
if x.shape[0] < self.minibatch_size:
raise ValidationError(
"Batch size of data (%d) less than Simulator `minibatch_size` (%d)"
% (x.shape[0], self.minibatch_size),
"%s data" % name,
)
if nodes and x.shape[1] % self.unroll != 0:
raise ValidationError(
"The number of timesteps in input data (%s) must be evenly "
"divisible by unroll_simulation (%s)" % (x.shape[1], self.unroll),
"data",
)
# check that shapes match the given values (if specified) or are
# internally consistent (if not)
args = [batch_size, n_steps]
labels = ["batch size", "number of timesteps"]
for i in range(2):
if args[i] is None:
if i == 1 and not nodes:
# we don't apply this check to probes, because target values can
# have different values for n_steps (as long as it matches what is
# expected by the loss function)
continue
if len(data) > 0:
val = next(iter(data.values())).shape[i]
for n, x in data.items():
if x.shape[i] != val:
raise ValidationError(
"Elements have different %s: %s vs %s"
% (labels[i], val, x.shape[i]),
"data",
)
else:
for n, x in data.items():
if x.shape[i] != args[i]:
raise ValidationError(
"Data for %s has %s=%s, which does not match "
"expected size (%s)" % (n, labels[i], x.shape[i], args[i]),
"data",
)
if (
n_steps is not None
and not self.tensor_graph.use_loop
and n_steps != self.unroll
):
raise ValidationError(
"When use_loop=False, n_steps (%d) must exactly match "
"unroll_simulation (%d)" % (n_steps, self.unroll),
"n_steps",
)
if nodes:
# validate special n_steps input
if data_n_steps is None:
raise ValidationError("Must specify 'n_steps' in input data", "data")
if (
batch_size is None
and (data_n_steps.ndim != 2 or data_n_steps.shape[1] != 1)
) or (batch_size is not None and data_n_steps.shape != (batch_size, 1)):
raise ValidationError(
"'n_steps' has wrong shape; should be %s (note that this is just "
"the integer n_steps value repeated)" % ((batch_size, 1),),
"data",
)
if not np.all(data_n_steps == data_n_steps[0, 0]):
raise ValidationError(
"'n_steps' should all have the same value", "data"
)
if n_steps is not None and not np.all(data_n_steps == n_steps):
raise ValidationError(
"`n_steps` input does not match the requested number of steps",
"data",
)
@with_self
def _update_steps(self):
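"""Update the cached ``n_steps``/``time`` values to match the number of steps
that have been executed in the simulation."""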
self._n_steps = tf.keras.backend.get_value(
self.tensor_graph.get_tensor(self.model.step)
).item()
self._time = self._n_steps * self.dt
@property
def dt(self):
"""The time (in seconds) represented by one simulation timestep."""
return self.model.dt
@dt.setter
def dt(self, _):
raise ReadonlyError(attr="dt", obj=self)
@property
def n_steps(self):
"""The current simulation timestep."""
return self._n_steps
@property
def time(self):
"""The current simulation time."""
return self._time
@property
def seed(self):
"""The simulation random seed."""
return self.tensor_graph.seed
@require_open
def __enter__(self):
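"""Apply this Simulator's device and dtype settings for the duration of the
``with`` block."""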
self._device_context = tf.device(self.tensor_graph.device)
self._device_context.__enter__()
self._keras_dtype = tf.keras.backend.floatx()
tf.keras.backend.set_floatx(self.tensor_graph.dtype)
return self
@require_open
def __exit__(self, *args):
tf.keras.backend.set_floatx(self._keras_dtype)
self._device_context.__exit__(*args)
self.close()
def __del__(self):
"""
Raise a RuntimeWarning if the Simulator is deallocated while open.
"""
if self.closed is not None and not self.closed:
warnings.warn(
"Simulator with model=%s was deallocated while open. "
"Simulators should be closed manually to ensure resources "
"are properly freed." % self.model,
RuntimeWarning,
)
self.close()
def __getstate__(self):
raise NotImplementedError(
"TensorFlow does not support pickling; see "
"https://www.nengo.ai/nengo-dl/simulator.html"
"#saving-and-loading-parameters "
"for information on how to save/load a NengoDL model."
)
def __getattribute__(self, name):
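# block access to attributes that are released when the Simulator is closed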
if super().__getattribute__("closed") and name in super().__getattribute__(
"_closed_attrs"
):
raise SimulatorClosed(
"Cannot access Simulator.%s after Simulator is closed" % name
)
return super().__getattribute__(name)
class SimulationData(collections.abc.Mapping):
"""
Data structure used to access simulation data from the model.
The main use case for this is to access Probe data; for example,
``probe_data = sim.data[my_probe]``. However, it is also
used to access the parameters of objects in the model; for example, after
the model has been optimized via `.Simulator.fit`, the updated
encoder values for an ensemble can be accessed via
``trained_encoders = sim.data[my_ens].encoders``.
Parameters
----------
sim : `.Simulator`
The simulator from which data will be drawn
minibatched : bool
If False, discard the minibatch dimension on probe data
Notes
-----
SimulationData shouldn't be created/accessed directly by the user, but
rather via ``sim.data`` (which is an instance of SimulationData).
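For example, a minimal sketch showing both access patterns (assuming the
standard ``nengo``/``nengo_dl`` imports, with a toy network defined inline):
.. testcode::
with nengo.Network() as net:
ens = nengo.Ensemble(10, 1)
probe = nengo.Probe(ens)
with nengo_dl.Simulator(net) as sim:
sim.run_steps(5)
probe_data = sim.data[probe]  # ndarray with shape (5, 1)
encoders = sim.data[ens].encoders  # ndarray with shape (10, 1)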
"""
def __init__(self, sim, minibatched):
self.sim = sim
self.minibatched = minibatched
def __getitem__(self, obj):
"""Return the data associated with ``obj``.
Parameters
----------
obj : `~nengo.Probe` or `~nengo.Ensemble` or `~nengo.Connection`
Object whose simulation data is being accessed
Returns
-------
data : `~numpy.ndarray` or \
`~nengo.builder.ensemble.BuiltEnsemble` or \
`~nengo.builder.connection.BuiltConnection`
Array containing probed data if ``obj`` is a
`~nengo.Probe`, otherwise the corresponding
parameter object
"""
if obj not in self.sim.model.params:
raise ValidationError(
"Object is not in parameters of model %s" % self.sim.model, str(obj)
)
data = self.sim.model.params[obj]
if isinstance(obj, Probe):
if len(data) == 0:
return []
data = np.concatenate(data, axis=1)
if not self.minibatched:
data = data[0]
data.setflags(write=False)
elif isinstance(obj, Ensemble):
if isinstance(obj.neuron_type, Direct):
# direct mode ensemble
gain = bias = None
scaled_encoders = encoders = self.get_params((obj, "scaled_encoders"))[
0
]
else:
# get the live simulation values
scaled_encoders, bias = self.get_params(
(obj, "scaled_encoders"), (obj, "bias")
)
# infer gain and encoders, which are rolled into scaled_encoders
# (scaled_encoders = encoders * gain / radius, so gain can be recovered
# by comparing the norms of scaled_encoders and the built encoders)
gain = (
obj.radius
* np.linalg.norm(scaled_encoders, axis=-1)
/ np.linalg.norm(data.encoders, axis=-1)
)
encoders = obj.radius * scaled_encoders / gain[:, None]
# figure out max_rates/intercepts from neuron model
max_rates, intercepts = obj.neuron_type.max_rates_intercepts(gain, bias)
data = BuiltEnsemble(
data.eval_points,
encoders,
intercepts,
max_rates,
scaled_encoders,
gain,
bias,
)
elif isinstance(obj, Connection):
# get the live simulation values
weights = (
self.get_params((obj, "weights"))[0]
if compat.conn_has_weights(obj)
else None
)
# the original transform cannot be recovered from the built weights
transform = None
data = BuiltConnection(
data.eval_points, data.solver_info, weights, transform
)
return data
def get_params(self, *obj_attrs):
"""
Returns the current parameter values for the given objects.
Parameters
----------
obj_attrs : list of (``NengoObject``, str)
The Nengo object and attribute of that object for which we want
to know the parameter values (each object-attribute pair specified
as a tuple argument to the function).
Returns
-------
params : list of `~numpy.ndarray`
Current values of the requested parameters
Notes
-----
Parameter values should be accessed through ``sim.data[my_obj]``
(which will call this function if necessary), rather than directly
through this function.
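For illustration, a minimal sketch of a direct call (toy network defined
inline; as noted above, ``sim.data[my_obj]`` is the preferred access path):
.. testcode::
with nengo.Network() as net:
ens = nengo.Ensemble(10, 1)
with nengo_dl.Simulator(net) as sim:
# fetch two parameter arrays in a single batched read
scaled_encoders, bias = sim.data.get_params(
(ens, "scaled_encoders"), (ens, "bias")
)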
"""
if self.sim.closed:
warnings.warn(
"Checking parameters after simulator is closed; "
"cannot fetch live values, so the initial values "
"will be returned."
)
return [
getattr(self.sim.model.params[obj], attr) for obj, attr in obj_attrs
]
params = []
sigs = []
fetches = {}
for obj, attr in obj_attrs:
sig_obj, sig_attr = self._attr_map(obj, attr)
sig = self.sim.model.sig[sig_obj][sig_attr]
sigs.append(sig)
if sig not in self.sim.tensor_graph.signals:
# if sig isn't in tensor_graph.signals then that means it isn't used
# anywhere in the simulation (and therefore never changes), so
# we can safely return the static build value
params.append(getattr(self.sim.model.params[obj], attr))
else:
# this is a live parameter value we need to fetch from the
# simulation. we queue them up and fetch them all at once to
# be more efficient
placeholder = object()
fetches[placeholder] = self.sim.tensor_graph.get_tensor(sig)
params.append(placeholder)
# get the live parameter values
fetched = dict(
zip(
fetches.keys(), tf.keras.backend.batch_get_value(list(fetches.values()))
)
)
# final updating of parameters
for i, sig in enumerate(sigs):
# fill in placeholder values
if type(params[i]) is object:
params[i] = fetched[params[i]]
if sig.minibatched and not self.minibatched:
# drop minibatch dimension
params[i] = params[i][0]
return params
def _attr_map(self, obj, attr):
"""
Maps from ``sim.data[obj].attr`` to the equivalent
``model.sig[obj][attr]``.
Parameters
----------
obj : ``NengoObject``
The nengo object for which we want to know the parameters
attr : str
The parameter of ``obj`` to be returned
Returns
-------
obj : ``NengoObject``
The nengo object to key into ``model.sig``
attr : str
The name of the signal corresponding to the input ``attr``
"""
if isinstance(obj, Ensemble) and attr == "bias":
return obj.neurons, attr
elif isinstance(obj, Ensemble) and attr == "scaled_encoders":
return obj, "encoders"
return obj, attr
def __len__(self):
return len(self.sim.model.params)
def __iter__(self):
return iter(self.sim.model.params)