"""Tools for automatically converting a Keras model to a Nengo network."""
import collections
import copy
import logging
import warnings
import nengo
import numpy as np
from packaging import version
import tensorflow as tf
from tensorflow.python.keras import engine
from tensorflow.python.keras.layers import BatchNormalizationV1, BatchNormalizationV2
from nengo_dl import compat
from nengo_dl.config import configure_settings
from nengo_dl.neurons import LeakyReLU
from nengo_dl.simulator import Simulator
from nengo_dl.tensor_node import Layer, TensorNode
logger = logging.getLogger(__name__)
class Converter:
"""
Converts a Keras model to a Nengo network composed of native Nengo objects.
Parameters
----------
model : ``tf.keras.Model``
Keras model to be converted
allow_fallback : bool
If True, allow layers that cannot be converted to native Nengo
objects to be added as a `.TensorNode` instead. Note that if this occurs, the
converted Nengo network will only be runnable in the NengoDL simulator.
inference_only : bool
Allow layers to be converted in such a way that inference behaviour will
match the source model but training behaviour will not.
If ``inference_only=False`` then some
layers cannot be converted to native Nengo objects (but you can
still use ``allow_fallback=True`` to use a `.TensorNode` instead).
max_to_avg_pool : bool
If True, convert max pooling layers to average pooling layers. Note that this
will change the behaviour of the network, so parameters will probably need to
be re-trained in NengoDL. If ``max_to_avg_pool=False`` then max pooling layers
cannot be converted to native Nengo objects (but you can
still use ``allow_fallback=True`` to use a `.TensorNode` instead).
split_shared_weights : bool
In Keras, applying the same ``Layer`` object to different input layers will
result in multiple instances of the given layer that share the same weights.
This is not supported in Nengo. If ``split_shared_weights=True`` then those
shared weights will be split into independent sets of weights. They will all
be initialized to the same value, so the initial behaviour of the model will
be unchanged, but if any further training is performed on the network then
the weights in each of those instances may diverge.
swap_activations : dict
A dictionary mapping from TensorFlow activation functions or Nengo neuron types
to TensorFlow activation functions or Nengo neuron types. This can be used to
change all the activation types in a model to some other type. This is in
addition to the default activation map (see `.LayerConverter`). It can be keyed
based on either TensorFlow or Nengo activation types, and will be applied both
before and after the default activation map, in order to support whatever swap
type is most useful for a given model. In particular, ``swap_activations``
can be useful for swapping rate neuron types to spiking neuron types,
through e.g. ``{tf.nn.relu: nengo.SpikingRectifiedLinear()}`` or
``{nengo.RectifiedLinear(): nengo.SpikingRectifiedLinear()}``. Or it can be
used to swap activation types that don't have a native Nengo implementation,
        e.g. ``{tf.keras.activations.elu: tf.keras.activations.relu}``.
scale_firing_rates: float or dict
Scales the inputs of neurons by ``x``, and the outputs by ``1/x``.
The idea is that this parameter can be used to increase the firing rates of
spiking neurons (by scaling the input), without affecting the overall output
(because the output spikes are being scaled back down). Note that this is only
strictly true for neuron types with linear activation functions (e.g. ReLU).
Nonlinear neuron types (e.g. LIF) will be skewed by this linear scaling on the
input/output. ``scale_firing_rates`` can be specified as a float, which will
be applied to all layers in the model, or as a dictionary mapping Keras model
layers to a scale factor, allowing different scale factors to be applied to
different layers.
synapse : float or `nengo.synapses.Synapse`
Synaptic filter to be applied on the output of all neurons. This can be useful
to smooth out the noise introduced by spiking neurons. Note, however, that
increasing the synaptic filtering will make the network less responsive to
rapid changes in the input, and you may need to present each input value
for more timesteps in order to allow the network output to settle.
Attributes
----------
model : ``tf.keras.Model``
The input Keras model (if input was a Sequential model then this will be the
equivalent Functional model).
net : `nengo.Network`
The converted Nengo network.
inputs : `.Converter.KerasTensorDict`
Maps from Keras model inputs to input Nodes in the converted Nengo network.
For example, ``my_node = Converter(my_model).inputs[my_model.input]``.
outputs : `.Converter.KerasTensorDict`
Maps from Keras model outputs to output Probes in the converted Nengo network.
For example, ``my_probe = Converter(my_model).outputs[my_model.output]``.
layers : `.Converter.KerasTensorDict`
Maps from Keras model layers to the converted Nengo object.
For example, ``my_neurons = Converter(my_model).layers[my_layer]``.
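    Examples
    --------
    A minimal sketch of the intended workflow (``my_model`` is assumed to be a
    built ``tf.keras.Model``, and ``input_data`` an array of shape
    ``(batch_size, n_steps, input_size)``)::
        converter = Converter(my_model)
        with nengo_dl.Simulator(converter.net) as sim:
            outputs = sim.predict({converter.inputs[my_model.input]: input_data})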
"""
converters = {}
def __init__(
self,
model,
allow_fallback=True,
inference_only=False,
max_to_avg_pool=False,
split_shared_weights=False,
swap_activations=None,
scale_firing_rates=None,
synapse=None,
):
self.allow_fallback = allow_fallback
self.inference_only = inference_only
self.max_to_avg_pool = max_to_avg_pool
self.split_shared_weights = split_shared_weights
self.swap_activations = Converter.TrackedDict(swap_activations or {})
self.scale_firing_rates = scale_firing_rates
self.synapse = synapse
self._layer_converters = {}
# convert model
self.net = self.get_converter(model).convert(None)
# set model from the converter in case the converter has changed the model type
# (i.e. if the model is sequential, it will be converted to a functional model)
self.model = self.get_converter(model).layer
if self.swap_activations.unused_keys():
warnings.warn(
"swap_activations contained %s, but there were no layers in the model "
"with that activation type" % (self.swap_activations.unused_keys(),)
)
self.layers = self.net.layers
with self.net:
configure_settings(inference_only=self.inference_only)
# convert inputs to input nodes
self.inputs = Converter.KerasTensorDict()
for input in self.model.inputs:
input_obj = self.net.inputs[input]
logger.info("Setting input node %s (%s)", input_obj, input)
input_size = input_obj.size_in
input_obj.size_in = 0
input_obj.output = np.zeros(input_size)
self.inputs[input] = input_obj
# add probes to outputs
self.outputs = Converter.KerasTensorDict()
for output in self.model.outputs:
output_obj = self.net.outputs[output]
logger.info("Probing %s (%s)", output_obj, output)
self.outputs[output] = nengo.Probe(
output_obj,
synapse=self.synapse
if isinstance(output_obj, nengo.ensemble.Neurons)
else None,
)
    def verify(self, training=False, inputs=None, atol=1e-8, rtol=1e-5):
"""
Verify that output of converted Nengo network matches the original Keras model.
Parameters
----------
training : bool
If True, check that optimizing the converted Nengo network produces the same
results as optimizing the original Keras model.
inputs : list of `numpy.ndarray`
Testing values for model inputs (if not specified, array of ones will be
used).
atol : float
Absolute tolerance for difference between Nengo and Keras outputs.
rtol : float
            Relative (to Nengo) tolerance for difference between Nengo and Keras
outputs.
Returns
-------
success : bool
True if output of Nengo network matches output of Keras model.
Raises
------
ValueError
If output of Nengo network does not match output of Keras model.
"""
epochs = 3
if inputs is None:
batch_size = 2
inp_vals = [np.ones((batch_size,) + x.shape[1:]) for x in self.model.inputs]
else:
batch_size = inputs[0].shape[0]
inp_vals = inputs
# get keras model output
if training:
out_vals = [
np.ones((batch_size,) + x.shape[1:]) for x in self.model.outputs
]
self.model.compile(optimizer=tf.optimizers.SGD(0.1), loss=tf.losses.mse)
self.model.fit(inp_vals, out_vals, epochs=epochs)
keras_out = self.model.predict(inp_vals)
if not isinstance(keras_out, (list, tuple)):
keras_out = [keras_out]
# get nengo sim output
inp_vals = [np.reshape(x, (batch_size, 1, -1)) for x in inp_vals]
with Simulator(self.net, minibatch_size=batch_size) as sim:
if training:
keras_params = sum(
np.prod(w.shape) for w in self.model.trainable_weights
)
nengo_params = sum(
np.prod(w.shape) for w in sim.keras_model.trainable_weights
)
if keras_params != nengo_params:
raise ValueError(
"Number of trainable parameters in Nengo network (%d) does not "
"match number of trainable parameters in Keras model (%d)"
% (nengo_params, keras_params)
)
out_vals = [np.reshape(x, (batch_size, 1, -1)) for x in out_vals]
sim.compile(optimizer=tf.optimizers.SGD(0.1), loss=tf.losses.mse)
sim.fit(inp_vals, out_vals, epochs=epochs)
sim_out = sim.predict(inp_vals)
for i, out in enumerate(self.model.outputs):
keras_vals = np.ravel(keras_out[i])
nengo_vals = np.ravel(sim_out[self.outputs[out]])
fails = np.logical_not(
np.isclose(keras_vals, nengo_vals, atol=atol, rtol=rtol)
)
if np.any(fails):
logger.info("Verification failure")
logger.info("Keras:\n%s", keras_vals[fails])
logger.info("Nengo:\n%s", nengo_vals[fails])
raise ValueError(
"Output of Keras model does not match output of converted "
"Nengo network (max difference=%.2E; set log level to INFO to see "
"all failures)" % max(abs(keras_vals[fails] - nengo_vals[fails]))
)
return True
    def get_converter(self, layer):
"""
Get instantiated `.LayerConverter` for the given ``Layer`` instance.
Note that this caches the results, so calling the function multiple times
with the same Layer instance will return the same LayerConverter instance.
Parameters
----------
layer : ``tf.keras.layers.Layer``
The Keras Layer being converted.
Returns
-------
converter : `.LayerConverter`
LayerConverter class for converting ``layer`` to Nengo objects.
"""
if layer in self._layer_converters:
# already have an instantiated converter for this layer
converter = self._layer_converters[layer]
if converter.has_weights and not self.split_shared_weights:
# TODO: allow fallback
raise ValueError(
"Multiple applications of layer %s detected; this is not supported "
"unless split_shared_weights=True" % layer
)
return converter
# check if there is a registered builder for this layer type
ConverterClass = self.converters.get(type(layer), None)
# perform custom checks in layer converters
if ConverterClass is None:
error_msg = "Layer type %s does not have a registered converter" % type(
layer
)
else:
convertible, error_msg = ConverterClass.convertible(layer, self)
if not convertible:
ConverterClass = None
if ConverterClass is None:
# this means that there is no LayerConverter compatible with this layer
# (either because it has an unknown type, or it failed the ``.convertible``
# check due to its internal parameterization)
if self.allow_fallback:
warnings.warn(
"%sFalling back to TensorNode."
% (error_msg + ". " if error_msg else "")
)
ConverterClass = self.converters[None]
else:
raise TypeError(
"%sUnable to convert layer %s to native Nengo objects; set "
"allow_fallback=True if you would like to use a TensorNode "
"instead, or consider registering a custom LayerConverter for this "
"layer type." % (error_msg + ". " if error_msg else "", layer.name)
)
converter = ConverterClass(layer, self)
self._layer_converters[layer] = converter
return converter
    @classmethod
def register(cls, keras_layer):
"""
A decorator for adding a class to the converter registry.
Parameters
----------
keras_layer : ``tf.keras.layers.Layer``
The Layer associated with the conversion function being registered.
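        Examples
        --------
        A sketch of registering a custom converter; ``MyLayer`` and
        ``ConvertMyLayer`` are hypothetical names::
            @Converter.register(MyLayer)
            class ConvertMyLayer(LayerConverter):
                def convert(self, node_id):
                    ...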
"""
def register_converter(convert_cls):
if keras_layer in cls.converters:
warnings.warn(
"Layer '%s' already has a converter. Overwriting." % keras_layer
)
cls.converters[keras_layer] = convert_cls
return convert_cls
return register_converter
    class KerasTensorDict(collections.abc.Mapping):
"""
A dictionary-like object that has extra logic to handle Layer/Tensor keys.
"""
def __init__(self):
self.dict = collections.OrderedDict()
def _get_key(self, key):
if isinstance(key, tf.keras.layers.Layer):
if len(key.inbound_nodes) > 1:
raise KeyError(
"Layer %s is ambiguous because it has been called multiple "
"times; use a specific set of layer outputs as key instead"
% key
)
# get output tensor
key = key.output
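            # tf.Tensor objects are not hashable, so we use their (hashable)
            # ``ref()`` wrappers as dictionary keys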
key = tuple(
x.ref() if isinstance(x, tf.Tensor) else x for x in tf.nest.flatten(key)
)
return key
def __setitem__(self, key, val):
self.dict[self._get_key(key)] = val
def __getitem__(self, key):
return self.dict[self._get_key(key)]
def __iter__(self):
return iter(self.dict)
def __len__(self):
return len(self.dict)
    class TrackedDict(collections.abc.Mapping):
"""
A dictionary-like object that keeps track of which keys have been accessed.
"""
def __init__(self, dict):
self.dict = dict
self.read = set()
def __getitem__(self, key):
self.read.add(key)
return self.dict[key]
def __iter__(self):
return iter(self.dict)
def __len__(self):
return len(self.dict)
        def unused_keys(self):
"""Returns any keys in the dictionary that have never been read."""
return set(self.dict.keys()) - self.read
class LayerConverter:
"""
Base class for converter classes, which contain the logic for mapping some Keras
layer type to Nengo objects.
Subclasses must implement the `.LayerConverter.convert` method. They may optionally
extend `.LayerConverter.convertible` if this layer type requires custom logic for
whether or not a layer can be converted to Nengo objects.
Subclasses should override the ``unsupported_args`` class parameter if there are
certain non-default Layer attributes that are not supported by the converter.
    This is a list of attribute names that must have their default value for the
    layer to be convertible. The default value is assumed to be ``None``;
    alternatively, an entry can be a tuple of ``("attribute_name", default_value)``.
    If there are parameters
that are supported in inference mode but not in training mode, they should be
added to the ``unsupported_training_args`` parameter.
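    For example, `.ConvertAvgPool` below sets
    ``unsupported_args = [("padding", "valid")]``, meaning that average pooling
    layers with any non-default ``padding`` value cannot be converted.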
Subclasses should override the ``has_weights`` class parameter if the layer type
being converted contains internal weights (this affects how the converter will
handle duplicate layers).
Parameters
----------
layer : ``tf.keras.layers.Layer``
The Layer object being converted.
converter : `.Converter`
The parent Converter class running the conversion process.
"""
# maps from TensorFlow activation functions to Nengo neuron types
activation_map = {
None: None,
tf.keras.activations.linear: None,
tf.keras.activations.relu: nengo.RectifiedLinear(),
tf.nn.relu: nengo.RectifiedLinear(),
tf.keras.activations.sigmoid: nengo.Sigmoid(tau_ref=1),
tf.nn.sigmoid: nengo.Sigmoid(tau_ref=1),
}
if version.parse(nengo.__version__) > version.parse("3.0.0"):
activation_map.update(
{
tf.keras.activations.tanh: compat.Tanh(tau_ref=1),
tf.nn.tanh: compat.Tanh(tau_ref=1),
}
)
# attributes of the Keras layer that are not supported for non-default values.
# the default value is assumed to be None, or a tuple of
# ("attr_name", default_value) can be specified
unsupported_args = []
# attributes that are supported in inference_only mode but otherwise not
unsupported_training_args = []
# whether or not this layer contains trainable weights (this indicates whether
# this layer is affected by split_shared_weights)
has_weights = False
def __init__(self, layer, converter):
self.layer = layer
self.converter = converter
    def add_nengo_obj(self, node_id, biases=None, activation=None):
"""
Builds a Nengo object for the given Node of this layer.
Parameters
----------
node_id : int
The index of the Keras Node currently being built on this layer.
biases : `numpy.ndarray` or None
If not None, add trainable biases with the given value.
activation : callable or None
The TensorFlow activation function to be used (``None`` will be
interpreted as linear activation).
Returns
-------
obj : `nengo.Node` or `nengo.ensemble.Neurons` or `nengo_dl.TensorNode`
The Nengo object whose output corresponds to the output of the given Keras
Node.
"""
name = self.layer.name + ".%d" % node_id
# apply manually specified swaps
activation = self.converter.swap_activations.get(activation, activation)
if activation in self.activation_map or isinstance(
activation, nengo.neurons.NeuronType
):
activation = self.activation_map.get(activation, activation)
# apply any nengo->nengo swaps
activation = self.converter.swap_activations.get(activation, activation)
if activation is None:
# linear activation, uses a passthrough Node
obj = nengo.Node(
size_in=np.prod(self.output_shape(node_id)), label=name,
)
else:
# use ensemble to implement the appropriate neuron type
# apply firing rate scaling
if self.converter.scale_firing_rates is None:
scale_firing_rates = None
elif isinstance(self.converter.scale_firing_rates, dict):
# look up specific layer rate
scale_firing_rates = self.converter.scale_firing_rates.get(
self.layer, None
)
else:
# constant scale applied to all layers
scale_firing_rates = self.converter.scale_firing_rates
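            # the scaling is applied on the input side via the neuron gain (and
            # biases), and undone on the output side via the neuron amplitude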
if scale_firing_rates is None:
gain = 1
else:
gain = scale_firing_rates
if biases is not None:
biases *= scale_firing_rates
if hasattr(activation, "amplitude"):
# copy activation so that we can change amplitude without
# affecting other instances
activation = copy.copy(activation)
# bypass read-only protection
type(activation).amplitude.data[
activation
] /= scale_firing_rates
else:
warnings.warn(
"Firing rate scaling being applied to activation type "
"that does not support amplitude (%s); this will change "
"the output" % type(activation)
)
obj = nengo.Ensemble(
np.prod(self.output_shape(node_id)),
1,
neuron_type=activation,
gain=nengo.dists.Choice([gain]),
bias=nengo.dists.Choice([0]) if biases is None else biases,
label=name,
).neurons
if biases is None:
# ensembles always have biases, so if biases=None we just use
# all-zero biases and mark them as non-trainable
self.set_trainable(obj, False)
elif self.converter.allow_fallback:
warnings.warn(
"Activation type %s does not have a native Nengo equivalent; "
"falling back to a TensorNode" % activation
)
obj = TensorNode(
activation,
shape_in=self.output_shape(node_id),
pass_time=False,
label=name,
)
else:
raise TypeError("Unsupported activation type (%s)" % self.layer.activation)
if biases is not None and isinstance(obj, (nengo.Node, TensorNode)):
# obj doesn't have its own biases, so use a connection from a constant node
# (so that the bias values will be trainable)
bias_node = nengo.Node([1], label="%s.bias" % name)
nengo.Connection(bias_node, obj, transform=biases[:, None], synapse=None)
logger.info("Created %s (size=%d)", obj, obj.size_out)
return obj
    def add_connection(self, node_id, obj, input_idx=0, trainable=False, **kwargs):
"""
Adds a Connection from one of the inputs of the Node being built to the
Nengo object.
Parameters
----------
node_id : int
The index of the Keras Node currently being built on this layer.
obj : ``NengoObject``
The Nengo object implementing this Node.
input_idx : int
Which of the inputs we want to add a Connection for (in the case of
layers that have multiple inputs).
trainable : bool
Whether or not the weights associated with the created Connection
should be trainable.
kwargs : dict
Will be passed on to `nengo.Connection`.
Returns
-------
conn : `nengo.Connection`
The constructed Connection object.
"""
pre = self.get_input_obj(node_id, tensor_idx=input_idx)
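        # the synaptic filter is only applied to connections coming from
        # neurons, since its purpose is to smooth spiking output (see the
        # ``synapse`` parameter of `.Converter`)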
conn = nengo.Connection(
pre,
obj,
synapse=self.converter.synapse
if isinstance(pre, nengo.ensemble.Neurons)
else None,
**kwargs,
)
self.set_trainable(conn, trainable)
logger.info(
"Connected %s to %s (trainable=%s)", conn.pre, conn.post, trainable,
)
return conn
def _get_shape(self, input_output, node_id, include_batch=False):
"""
Looks up the input or output shape of this Node.
Parameters
----------
input_output : "input" or "output"
Whether we want the input or output shape.
node_id : int
The node whose shape we want to look up.
include_batch : bool
Whether or not the returned shape should include the batch dimension.
Returns
-------
shape : (list of) tuple of int
A single tuple shape if the node has one input/output, or a list of shapes
            if the node has multiple inputs/outputs.
"""
# note: layer.get_input/output_shape_at is generally equivalent to
# layer.input/output_shape, except when the layer is called multiple times
# with different shapes, in which case input/output_shape is not well defined
func = getattr(self.layer, "get_%s_shape_at" % input_output)
# get the shape
shape = func(node_id)
if not include_batch:
if isinstance(shape, list):
# multiple inputs/outputs; trim the batch from each one
shape = [s[1:] for s in shape]
else:
shape = shape[1:]
return shape
    def output_shape(self, node_id, include_batch=False):
"""
Returns the output shape of the given node.
Parameters
----------
node_id : int
The node whose shape we want to look up.
include_batch : bool
Whether or not the returned shape should include the batch dimension.
Returns
-------
shape : (list of) tuple of int
A single tuple shape if the node has one output, or a list of shapes
            if the node has multiple outputs.
"""
return self._get_shape("output", node_id, include_batch=include_batch)
    @staticmethod
def set_trainable(obj, trainable):
"""
Set trainable config attribute of object.
Parameters
----------
obj : ``NengoObject``
The object to be assigned.
trainable: bool
Trainability value of ``obj``.
"""
nengo.Network.context[-1].config[obj].trainable = trainable
    @classmethod
def convertible(cls, layer, converter):
"""
Check whether the given Keras layer is convertible to native Nengo objects.
Parameters
----------
layer : ``tf.keras.layers.Layer``
The Keras Layer we want to convert.
converter : `.Converter`
The Converter object running the conversion process.
Returns
-------
convertible : bool
True if the layer can be converted to native Nengo objects, else False.
"""
# check if the layer uses any unsupported arguments
unsupported = cls.unsupported_args
if not converter.inference_only:
unsupported = unsupported + cls.unsupported_training_args
for arg in unsupported:
if isinstance(arg, str):
default = None
else:
arg, default = arg
val = getattr(layer, arg)
if val != default:
msg = "%s.%s has value %s != %s, which is not supported" % (
layer.name,
arg,
val,
default,
)
if arg in cls.unsupported_training_args:
msg += " (unless inference_only=True)"
return False, msg
return True, None
    def convert(self, node_id):
"""
        Convert the given node of this layer to Nengo objects.
Parameters
----------
node_id : int
The index of the inbound node to be converted.
Returns
-------
output : ``NengoObject``
Nengo object whose output corresponds to the output of the Keras layer node.
"""
raise NotImplementedError("Subclasses must implement convert")
@Converter.register(
tf.keras.Model
if version.parse(tf.__version__) < version.parse("2.3.0rc0")
else engine.functional.Functional
)
class ConvertModel(LayerConverter):
"""Convert ``tf.keras.Model`` to Nengo objects."""
    def convert(self, node_id):
logger.info("=" * 30)
logger.info("Converting model %s", self.layer.name)
# functional models should already have been built when the model
# was instantiated
assert self.layer.built
# trace the model to find all the nodes that need to be built into
# the Nengo network
ordered_nodes, _ = compat._build_map(self.layer.outputs)
layer_ids = [
(node.layer, node.layer.inbound_nodes.index(node)) for node in ordered_nodes
]
with nengo.Network(
label=self.layer.name + ("" if node_id is None else ".%d" % node_id)
) as net:
# add the "trainable" attribute to all objects
configure_settings(trainable=None)
net.layer_map = collections.defaultdict(dict)
for layer, layer_node_id in layer_ids:
# should never be rebuilding the same node
assert layer_node_id not in net.layer_map[layer]
logger.info("-" * 30)
logger.info("Converting layer %s node %d", layer.name, layer_node_id)
# get the layerconverter object
layer_converter = self.converter.get_converter(layer)
# build the Nengo objects
nengo_layer = layer_converter.convert(layer_node_id)
assert isinstance(
nengo_layer,
(nengo.Node, nengo.ensemble.Neurons, TensorNode, nengo.Network),
)
# add output of layer_converter to layer_map
net.layer_map[layer][layer_node_id] = (
list(nengo_layer.outputs.values())
if isinstance(nengo_layer, nengo.Network)
else [nengo_layer]
)
# data structures to track converted objects
net.inputs = Converter.KerasTensorDict()
for input in self.layer.inputs:
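                # _keras_history is a (layer, node_index, tensor_index) triple
                # identifying where this tensor was produced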
input_layer, input_node_id, input_tensor_id = input._keras_history
net.inputs[input] = net.layer_map[input_layer][input_node_id][
input_tensor_id
]
net.outputs = Converter.KerasTensorDict()
for output in self.layer.outputs:
output_layer, output_node_id, output_tensor_id = output._keras_history
net.outputs[output] = net.layer_map[output_layer][output_node_id][
output_tensor_id
]
net.layers = Converter.KerasTensorDict()
for layer in self.layer.layers:
for layer_node_id, node_outputs in net.layer_map[layer].items():
for nengo_obj in node_outputs:
output_tensors = layer.inbound_nodes[
layer_node_id
].output_tensors
net.layers[output_tensors] = nengo_obj
if node_id is not None:
# add incoming connections in parent network
for i, inp in enumerate(net.inputs.values()):
self.add_connection(node_id, inp, input_idx=i)
logger.info("=" * 30)
return net
@Converter.register(tf.keras.Sequential)
class ConvertSequential(ConvertModel):
"""Convert ``tf.keras.Sequential`` to Nengo objects."""
def __init__(self, seq_model, converter):
# convert sequential model to functional model
warnings.warn(
"Converting sequential model to functional model; "
"use `Converter.model` to refer to the functional model (rather than the "
"original sequential model) when working with the output of the Converter"
)
input_shape = seq_model.layers[0].input_shape
inp = x = tf.keras.Input(batch_shape=input_shape)
for layer in seq_model.layers:
x = layer(x)
func_model = tf.keras.Model(inp, x)
super().__init__(func_model, converter)
@Converter.register(None)
class ConvertFallback(LayerConverter):
"""
Convert layers which do not have a native Nengo equivalent into a
`.TensorNode`.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
        # copy the layer so that any changes (e.g., rebuilding it) do not
        # affect the source model
layer = self.layer.__class__.from_config(self.layer.get_config())
if self.layer.built:
layer.build(self.input_shape(0, include_batch=True))
layer.set_weights(self.layer.get_weights())
self.tensor_layer = Layer(layer)
    def convert(self, node_id):
logger.info("Using TensorNode %s", self.tensor_layer)
input_obj = self.get_input_obj(node_id)
output = self.tensor_layer(
input_obj, shape_in=self.input_shape(node_id), label=self.layer.name
)
logger.info("Applying to %s, created %s", input_obj, output)
return output
class ConvertAvgPool(LayerConverter):
"""Base class for converting average pooling layers to Nengo objects."""
# "same" padding not supported because TensorFlow average pooling does not count
# the padded cells in the average, which we don't have a way to do using Convolution
unsupported_args = [("padding", "valid")]
    def convert(self, node_id, dimensions):
output = self.add_nengo_obj(node_id)
def to_tuple(val):
if isinstance(val, int):
return (val,) * dimensions
return val
spatial_shape = (
self.input_shape(node_id)[:-1]
if self.layer.data_format == "channels_last"
else self.input_shape(node_id)[1:]
)
# the default values here are for GlobalAveragePooling (which doesn't have
# these attributes)
pool_size = to_tuple(getattr(self.layer, "pool_size", spatial_shape))
padding = getattr(self.layer, "padding", "valid")
strides = to_tuple(getattr(self.layer, "strides", 1))
# the idea here is that we set up a convolutional transform with weights 1/n,
# which will have the effect of implementing average pooling
n_filters = (
self.output_shape(node_id)[-1]
if self.layer.data_format == "channels_last"
else self.output_shape(node_id)[0]
)
n_pool = np.prod(pool_size)
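        # each output channel averages over its own input channel only, so the
        # kernel is an identity across channels, scaled by 1/n_pool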
kernel = np.reshape(
[np.eye(n_filters) / n_pool] * n_pool, pool_size + (n_filters, n_filters),
)
pool_conv = nengo.Convolution(
n_filters=n_filters,
input_shape=self.input_shape(node_id),
padding=padding,
strides=strides,
kernel_size=pool_size,
init=kernel,
channels_last=self.layer.data_format == "channels_last",
)
self.add_connection(node_id, output, transform=pool_conv)
return output
    @classmethod
def convertible(cls, layer, converter):
if (
isinstance(
layer,
(
tf.keras.layers.MaxPool1D,
tf.keras.layers.MaxPool2D,
tf.keras.layers.MaxPool3D,
tf.keras.layers.GlobalMaxPool1D,
tf.keras.layers.GlobalMaxPool2D,
tf.keras.layers.GlobalMaxPool3D,
),
)
and not converter.max_to_avg_pool
):
msg = (
"Cannot convert max pooling layers to native Nengo objects; consider "
"setting max_to_avg_pool=True to use average pooling instead"
)
return False, msg
unsupported = cls.unsupported_args
if not hasattr(layer, "padding"):
# global layers don't have this attribute, so we temporarily remove it
# from the unsupported args
cls.unsupported_args = []
try:
convertible = super().convertible(layer, converter)
finally:
# reset the unsupported args
cls.unsupported_args = unsupported
return convertible
@Converter.register(tf.keras.layers.Activation)
class ConvertActivation(LayerConverter):
"""Convert ``tf.keras.layers.Activation`` to Nengo objects."""
    def convert(self, node_id):
output = self.add_nengo_obj(node_id, activation=self.layer.activation)
self.add_connection(node_id, output)
return output
@Converter.register(tf.keras.layers.Add)
class ConvertAdd(LayerConverter):
"""Convert ``tf.keras.layers.Add`` to Nengo objects."""
    def convert(self, node_id):
output = self.add_nengo_obj(node_id)
for i in range(len(self.layer.input)):
self.add_connection(node_id, output, input_idx=i)
return output
@Converter.register(tf.keras.layers.Average)
class ConvertAverage(LayerConverter):
"""Convert ``tf.keras.layers.Average`` to Nengo objects."""
    def convert(self, node_id):
output = self.add_nengo_obj(node_id)
for i in range(len(self.layer.input)):
self.add_connection(
node_id, output, input_idx=i, transform=1 / len(self.layer.input)
)
return output
@Converter.register(tf.keras.layers.AvgPool1D)
@Converter.register(tf.keras.layers.MaxPool1D)
@Converter.register(tf.keras.layers.GlobalAvgPool1D)
@Converter.register(tf.keras.layers.GlobalMaxPool1D)
class ConvertAvgPool1D(ConvertAvgPool):
"""
Convert ``tf.keras.layers.AvgPool1D`` to Nengo objects.
Also works for ``tf.keras.layers.GlobalAvgPool1D``, and
``tf.keras.layers.MaxPool1D``/``GlobalMaxPool1D`` (if ``max_to_avg_pool=True``).
"""
    def convert(self, node_id):
return super().convert(node_id, dimensions=1)
@Converter.register(tf.keras.layers.AvgPool2D)
@Converter.register(tf.keras.layers.MaxPool2D)
@Converter.register(tf.keras.layers.GlobalAvgPool2D)
@Converter.register(tf.keras.layers.GlobalMaxPool2D)
class ConvertAvgPool2D(ConvertAvgPool):
"""
Convert ``tf.keras.layers.AvgPool2D`` to Nengo objects.
Also works for ``tf.keras.layers.GlobalAvgPool2D``, and
``tf.keras.layers.MaxPool2D``/``GlobalMaxPool2D`` (if ``max_to_avg_pool=True``).
"""
    def convert(self, node_id):
return super().convert(node_id, dimensions=2)
@Converter.register(tf.keras.layers.AvgPool3D)
@Converter.register(tf.keras.layers.MaxPool3D)
@Converter.register(tf.keras.layers.GlobalAvgPool3D)
@Converter.register(tf.keras.layers.GlobalMaxPool3D)
class ConvertAvgPool3D(ConvertAvgPool):
"""
Convert ``tf.keras.layers.AvgPool3D`` to Nengo objects.
Also works for ``tf.keras.layers.GlobalAvgPool3D``, and
``tf.keras.layers.MaxPool3D``/``GlobalMaxPool3D`` (if ``max_to_avg_pool=True``).
"""
    def convert(self, node_id):
return super().convert(node_id, dimensions=3)
@Converter.register(BatchNormalizationV1)
@Converter.register(BatchNormalizationV2)
class ConvertBatchNormalization(LayerConverter):
"""Convert ``tf.keras.layers.BatchNormalization`` to Nengo objects."""
    def convert(self, node_id):
# look up the batch normalization parameters
if self.layer.scale:
gamma = tf.keras.backend.get_value(self.layer.gamma)
else:
gamma = 1
if self.layer.center:
beta = tf.keras.backend.get_value(self.layer.beta)
else:
beta = 0
mean, variance = tf.keras.backend.batch_get_value(
(self.layer.moving_mean, self.layer.moving_variance)
)
# compute the fixed affine transform values for this layer
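        # batch normalization computes
        # gamma * (x - mean) / sqrt(variance + epsilon) + beta,
        # which simplifies to x * scale + bias, as computed below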
variance += self.layer.epsilon
stddev = np.sqrt(variance)
scale = gamma / stddev
bias = beta - scale * mean
# build output object
output = self.add_nengo_obj(node_id)
# the batch normalization parameters will be n-dimensional, where n is the
# length of the axis specified in the batch normalization layer. so we need
# to set up a connection structure so that all the elements of the output
# corresponding to one of those axis elements will share the same parameter
assert len(self.layer.axis) == 1
assert self.layer.axis[0] > 0
axis = self.layer.axis[0] - 1 # not counting batch dimension
idxs = np.arange(output.size_in).reshape(self.output_shape(node_id))
slices = [slice(None) for _ in range(len(idxs.shape))]
# broadcast scale/bias along the non-axis dimensions, so that we can apply the
# same scale/bias to all those elements
broadcast_scale = np.zeros(self.output_shape(node_id))
broadcast_bias = np.zeros(self.output_shape(node_id))
for i in range(idxs.shape[axis]):
slices[axis] = i
broadcast_scale[tuple(slices)] = scale[i]
broadcast_bias[tuple(slices)] = bias[i]
broadcast_scale = np.ravel(broadcast_scale)
broadcast_bias = np.ravel(broadcast_bias)
# connect up bias node to output
bias_node = nengo.Node(
broadcast_bias, label="%s.%d.bias" % (self.layer.name, node_id)
)
conn = nengo.Connection(bias_node, output, synapse=None)
self.set_trainable(conn, False)
# connect input to output, scaled by the batch normalization scale
self.add_connection(node_id, output, transform=broadcast_scale)
return output
    @classmethod
def convertible(cls, layer, converter):
if not converter.inference_only:
msg = (
"Cannot convert BatchNormalization layer to native Nengo objects "
"unless inference_only=True"
)
return False, msg
return super().convertible(layer, converter)
@Converter.register(tf.keras.layers.Concatenate)
class ConvertConcatenate(LayerConverter):
"""Convert ``tf.keras.layers.Concatenate`` to Nengo objects."""
    def convert(self, node_id):
output = self.add_nengo_obj(node_id)
        # subtract 1 from positive axis values, since we are not counting the
        # batch dimension
axis = self.layer.axis - 1 if self.layer.axis > 0 else self.layer.axis
idxs = np.arange(np.prod(self.output_shape(node_id))).reshape(
self.output_shape(node_id)
)
slices = [slice(None) for _ in range(idxs.ndim)]
offsets = np.cumsum([shape[axis] for shape in self.input_shape(node_id)])
offsets = np.concatenate(([0], offsets))
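        # input i occupies the slice offsets[i]:offsets[i + 1] along the
        # concatenation axis of the output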
for i in range(len(self.layer.input)):
slices[axis] = slice(offsets[i], offsets[i + 1])
self.add_connection(
node_id, output[np.ravel(idxs[tuple(slices)])], input_idx=i
)
return output
    @classmethod
def convertible(cls, layer, converter):
if layer.axis == 0:
msg = "Cannot concatenate along batch dimension (axis 0)"
return False, msg
return super().convertible(layer, converter)
class ConvertConv(LayerConverter):
"""Base class for converting convolutional layers to Nengo objects."""
has_weights = True
    def convert(self, node_id, dimensions):
# look up parameter values from source layer
if self.layer.use_bias:
kernel, biases = tf.keras.backend.batch_get_value(
(self.layer.kernel, self.layer.bias)
)
else:
kernel = tf.keras.backend.get_value(self.layer.kernel)
biases = None
# create nengo object to implement activation function
output = self.add_nengo_obj(node_id, activation=self.layer.activation)
if self.layer.use_bias:
# conv layer biases are per-output-channel, rather than per-output-element,
# so we need to set up a nengo connection structure that will have one
# bias parameter shared across all the spatial dimensions
# add trainable bias weights
bias_node = nengo.Node([1], label="%s.%d.bias" % (self.layer.name, node_id))
bias_relay = nengo.Node(
size_in=len(biases),
label="%s.%d.bias_relay" % (self.layer.name, node_id),
)
nengo.Connection(
bias_node, bias_relay, transform=biases[:, None], synapse=None
)
# use a non-trainable sparse transform to broadcast biases along all
# non-channel dimensions
broadcast_indices = []
idxs = np.arange(np.prod(self.output_shape(node_id))).reshape(
self.output_shape(node_id)
)
slices = [slice(None) for _ in range(len(self.output_shape(node_id)))]
n_spatial = np.prod(
self.output_shape(node_id)[:-1]
if self.layer.data_format == "channels_last"
else self.output_shape(node_id)[1:]
)
axis = -1 if self.layer.data_format == "channels_last" else 0
for i in range(self.output_shape(node_id)[axis]):
slices[axis] = i
broadcast_indices.extend(
tuple(zip(np.ravel(idxs[tuple(slices)]), [i] * n_spatial))
)
conn = nengo.Connection(
bias_relay,
output,
transform=nengo.Sparse(
(output.size_in, bias_relay.size_out), indices=broadcast_indices
),
synapse=None,
)
self.set_trainable(conn, False)
# set up a convolutional transform that matches the layer parameters
transform = nengo.Convolution(
n_filters=self.layer.filters,
input_shape=self.input_shape(node_id),
kernel_size=self.layer.kernel_size,
strides=self.layer.strides,
padding=self.layer.padding,
channels_last=self.layer.data_format == "channels_last",
init=kernel,
)
self.add_connection(node_id, output, transform=transform, trainable=True)
return output
@Converter.register(tf.keras.layers.Conv1D)
class ConvertConv1D(ConvertConv):
"""Convert ``tf.keras.layers.Conv1D`` to Nengo objects."""
unsupported_args = [
("dilation_rate", (1,)),
]
unsupported_training_args = [
"kernel_regularizer",
"bias_regularizer",
"activity_regularizer",
"kernel_constraint",
"bias_constraint",
]
    def convert(self, node_id):
return super().convert(node_id, dimensions=1)
@Converter.register(tf.keras.layers.Conv2D)
class ConvertConv2D(ConvertConv):
"""Convert ``tf.keras.layers.Conv2D`` to Nengo objects."""
unsupported_args = [
("dilation_rate", (1, 1)),
]
unsupported_training_args = [
"kernel_regularizer",
"bias_regularizer",
"activity_regularizer",
"kernel_constraint",
"bias_constraint",
]
    def convert(self, node_id):
return super().convert(node_id, dimensions=2)
@Converter.register(tf.keras.layers.Conv3D)
class ConvertConv3D(ConvertConv):
"""Convert ``tf.keras.layers.Conv3D`` to Nengo objects."""
unsupported_args = [
("dilation_rate", (1, 1, 1)),
]
unsupported_training_args = [
"kernel_regularizer",
"bias_regularizer",
"activity_regularizer",
"kernel_constraint",
"bias_constraint",
]
    def convert(self, node_id):
return super().convert(node_id, dimensions=3)
@Converter.register(tf.keras.layers.Dense)
class ConvertDense(LayerConverter):
"""Convert ``tf.keras.layers.Dense`` to Nengo objects."""
unsupported_training_args = [
"kernel_regularizer",
"bias_regularizer",
"activity_regularizer",
"kernel_constraint",
"bias_constraint",
]
has_weights = True
    def convert(self, node_id):
# look up parameter values from source layer
if self.layer.use_bias:
weights, biases = tf.keras.backend.batch_get_value(
(self.layer.kernel, self.layer.bias)
)
else:
weights = tf.keras.backend.get_value(self.layer.kernel)
biases = None
# create nengo object to implement activation function and biases
output = self.add_nengo_obj(
node_id, activation=self.layer.activation, biases=biases
)
# add connection to implement the dense weights
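        # (Keras stores the kernel with shape (size_in, size_out), while Nengo
        # transforms have shape (size_out, size_in), hence the transpose)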
self.add_connection(node_id, output, transform=weights.T, trainable=True)
return output
@Converter.register(tf.keras.layers.Flatten)
class ConvertFlatten(LayerConverter):
"""Convert ``tf.keras.layers.Flatten`` to Nengo objects."""
    def convert(self, node_id):
        # no-op, same as Reshape (see ConvertReshape for details)
return self.get_input_obj(node_id)
@Converter.register(tf.keras.layers.ReLU)
class ConvertReLU(LayerConverter):
"""Convert ``tf.keras.layers.ReLU`` to Nengo objects."""
unsupported_args = ["max_value", ("threshold", 0)]
    def convert(self, node_id):
if self.layer.negative_slope == 0:
activation = tf.nn.relu
else:
activation = LeakyReLU(negative_slope=self.layer.negative_slope)
output = self.add_nengo_obj(node_id, biases=None, activation=activation)
self.add_connection(node_id, output)
return output
@Converter.register(tf.keras.layers.LeakyReLU)
class ConvertLeakyReLU(LayerConverter):
"""Convert ``tf.keras.layers.LeakyReLU`` to Nengo objects."""
    def convert(self, node_id):
output = self.add_nengo_obj(
node_id, biases=None, activation=LeakyReLU(negative_slope=self.layer.alpha)
)
self.add_connection(node_id, output)
return output
@Converter.register(tf.keras.layers.Reshape)
class ConvertReshape(LayerConverter):
"""Convert ``tf.keras.layers.Reshape`` to Nengo objects."""
    def convert(self, node_id):
# nengo doesn't pass shape information between objects (everything is just a
# vector), so we don't actually need to do anything here, we just return
# the input layer. layers that require shape information can look it up from
# the input_shape attribute of their layer
return self.get_input_obj(node_id)
class ConvertUpSampling(LayerConverter):
"""Base class for converting upsampling layers to Nengo objects."""
    def convert(self, node_id, dimensions):
output = self.add_nengo_obj(node_id)
channels_last = (
getattr(self.layer, "data_format", "channels_last") == "channels_last"
)
reps = np.atleast_1d(self.layer.size)
input_shape = self.input_shape(node_id)
input_shape = input_shape[:-1] if channels_last else input_shape[1:]
output_shape = self.output_shape(node_id)
output_shape = output_shape[:-1] if channels_last else output_shape[1:]
# figure out nearest neighbour (in the source array) for each point in the
# upsampled array
input_pts = np.stack(
np.meshgrid(
*[
np.arange(0, x * reps[i], reps[i], dtype=np.float32)
for i, x in enumerate(input_shape)
],
indexing="ij",
),
axis=-1,
)
input_pts += (reps - 1) / 2 # shift to centre of upsampled block
input_pts = np.reshape(input_pts, (-1, dimensions))
upsampled_pts = np.stack(
np.meshgrid(*[np.arange(x) for x in output_shape], indexing="ij",), axis=-1,
)
upsampled_pts = np.reshape(upsampled_pts, (-1, dimensions))
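        # compute pairwise distances between all input points and all upsampled
        # points, then pick the closest input point for each upsampled point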
dists = np.linalg.norm(
input_pts[..., None] - upsampled_pts.T[None, ...], axis=1
)
nearest = np.argmin(dists, axis=0)
# duplicate along channel axis
idxs = np.arange(np.prod(self.input_shape(node_id))).reshape(
self.input_shape(node_id)
)
if channels_last:
idxs = np.reshape(idxs, (-1, idxs.shape[-1]))
idxs = idxs[nearest]
else:
idxs = np.reshape(idxs, (idxs.shape[0], -1))
idxs = idxs[:, nearest]
# connect from pre, using idxs to perform upsampling
pre = self.get_input_obj(node_id, tensor_idx=0)[np.ravel(idxs)]
conn = nengo.Connection(
pre,
output,
synapse=self.converter.synapse
if isinstance(pre, nengo.ensemble.Neurons)
else None,
)
self.set_trainable(conn, False)
return output
@Converter.register(tf.keras.layers.UpSampling1D)
class ConvertUpSampling1D(ConvertUpSampling):
"""Convert ``tf.keras.layers.UpSampling1D`` to Nengo objects."""
    def convert(self, node_id):
return super().convert(node_id, dimensions=1)
@Converter.register(tf.keras.layers.UpSampling2D)
class ConvertUpSampling2D(ConvertUpSampling):
"""Convert ``tf.keras.layers.UpSampling2D`` to Nengo objects."""
unsupported_args = [("interpolation", "nearest")]
    def convert(self, node_id):
return super().convert(node_id, dimensions=2)
@Converter.register(tf.keras.layers.UpSampling3D)
class ConvertUpSampling3D(ConvertUpSampling):
"""Convert ``tf.keras.layers.UpSampling3D`` to Nengo objects."""
    def convert(self, node_id):
return super().convert(node_id, dimensions=3)
class ConvertZeroPadding(LayerConverter):
"""Base class for converting zero-padding layers to Nengo objects."""
    def convert(self, node_id, dimensions):
output = self.add_nengo_obj(node_id)
        # ZeroPadding1D doesn't have data_format; it assumes channels_last
channels_first = (
getattr(self.layer, "data_format", "channels_last") == "channels_first"
)
# the strategy here is that we'll create a nengo node of the full padded size,
# and then connect up the input to the subset of those node elements
# corresponding to the inner, non-padded elements. so we need to figure out
# what the indices are that we need to connect to.
# build slices representing the non-padded elements within the output shape
slices = []
if channels_first:
slices.append(slice(None))
for i in range(dimensions):
if dimensions == 1:
top_pad, bottom_pad = self.layer.padding
else:
top_pad, bottom_pad = self.layer.padding[i]
length = self.output_shape(node_id)[i + channels_first]
slices.append(slice(top_pad, length - bottom_pad))
# apply slices to index array to get the list of indices we want to connect to
idxs = np.arange(output.size_in).reshape(self.output_shape(node_id))
idxs = np.ravel(idxs[tuple(slices)])
# connect up the input to the appropriate indices
self.add_connection(node_id, output[idxs])
return output
@Converter.register(tf.keras.layers.ZeroPadding1D)
class ConvertZeroPadding1D(ConvertZeroPadding):
"""Convert ``tf.keras.layers.ZeroPadding1D`` to Nengo objects."""
    def convert(self, node_id):
return super().convert(node_id, dimensions=1)
@Converter.register(tf.keras.layers.ZeroPadding2D)
class ConvertZeroPadding2D(ConvertZeroPadding):
"""Convert ``tf.keras.layers.ZeroPadding2D`` to Nengo objects."""
    def convert(self, node_id):
return super().convert(node_id, dimensions=2)
@Converter.register(tf.keras.layers.ZeroPadding3D)
class ConvertZeroPadding3D(ConvertZeroPadding):
"""Convert ``tf.keras.layers.ZeroPadding3D`` to Nengo objects."""
    def convert(self, node_id):
return super().convert(node_id, dimensions=3)